ROMANCE DAWN for the new world

Microsoft Azure を中心とした技術情報を書いています。

Azure Synapse Analytics を使って売上分析プラットフォームを作ってみた

Azure Synapse Analytics は、データのインジェスト→分析→可視化のための開発やモニタリングを統合して管理できるデータ分析プラットフォームです。
先日少し試してみましたが、ポテンシャルの高さを感じる良いサービスです。
gooner.hateblo.jp
今回は、Azure Synapse Analytics を使って売上分析プラットフォームを作ってみます。

売上分析プラットフォームのアーキテクチャ

売上分析プラットフォームでは、各店舗の POS システムから売上データを収集し、全店舗のデータをまとめた横断的な売上分析を行う想定シナリオです。

  • 各店舗の売上データは、日次単位で XML 形式のファイルがアップロードされる
  • 各店舗の顧客マスターは、バラバラのローカルコードが使われているため、標準顧客マスターのコードをマッピングする
  • 全店舗の売上データを集めて、顧客ごとの売上状況を分析する

次のようなアーキテクチャで、売上分析プラットフォームを構築します。

f:id:TonyTonyKun:20210105171758p:plain

データの準備

売上データ

Blob Storage にアップロードする XML のデータは、mockaroo というサイトを使って、売上データっぽいスキーマを作ります。

f:id:TonyTonyKun:20210104172316p:plain

作成した XML は、以下のようなスキーマです。1つの XML ファイルに 1000 件の売上データを作ります。顧客マスターのローカルコード(customer_code)には、A0001/A0002/A0003 という3つのコードをランダムにセットしています。

<?xml version='1.0' encoding='UTF-8'?>
<dataset>
    <record>
        <id>1</id>
        <order_date>2021-01-01T00:00:00Z</order_date>
        <customer_code>A0001</customer_code>
        <product_code>KE-774</product_code>
        <unit_price>414.17</unit_price>
        <quantity>2</quantity>
    </record>
    <record>
        <id>2</id>
        <order_date>2021-01-01T00:00:00Z</order_date>
        <customer_code>A0002</customer_code>
        <product_code>MF-050</product_code>
        <unit_price>395.07</unit_price>
        <quantity>3</quantity>
    </record>
</dataset>

顧客マスターのマッピングデータ

SQL Database に、顧客マスターをマッピングするための3つのテーブルを作ります。

  • 顧客マスター(Customer)
  • 標準顧客マスター(StandardCustomer)
  • 顧客マッピング(CustomerMapping)

f:id:TonyTonyKun:20210104173454p:plain

顧客マスターには、店舗を判別する Store 列(e.g. store-a, store-b)を作り、Store + CustomerCode で UNIQUE Key としています。
売上データのローカルコードと突き合わせて、標準コードに変換するための View も作ります。

f:id:TonyTonyKun:20210104173629p:plain

リソースの作成

売上分析プラットフォームで使う Azure リソースを作成し、Azure Synapse Studio で作成する成果物を GitHub で管理できるように設定します。

  • Azure Synapse Analytics(ADLS / Dedicated SQL Pool)
  • Blob Storage
  • SQL Database(DbName:Sales)

docs.microsoft.com
Synapse Analytics で管理される ADLS と Dedicated SQL Pool は、デフォルトで Managed Service Identity(MSI)の認証が設定されているので、外部サービスに対しても追加で設定しておきます。
Azure Portal から Access control(IAM) で、Blob Storage に Storage Blob Data Contributor を設定します。MSI の名前は、ワークスペース名で登録されています。

f:id:TonyTonyKun:20210110132419p:plain

同様に SQL Database にも SQL DB Contributor を設定することに加えて、Active Directory admin で Azure AD の管理者を登録する必要があります。

f:id:TonyTonyKun:20210110133205p:plain

Ingest Layer

Ingest Layer は、収集したデータから DataLake を作る責務を持ちます。
Copy Data を使って、Blob Storage の XML を Parquet に変換して ADLS にコピーするパイプラインを作成します。

f:id:TonyTonyKun:20210106190053p:plain

詳細は、こちらの記事を参照してください。
gooner.hateblo.jp
gooner.hateblo.jp

Batch Layer

Batch Layer は、DataLake から DWH を作る責務を持ちます。
Mapping Data Flow を使って、ADLS の Parquet から Dedicated SQL Pool に売上データを取り込むパイプラインを作成します。
売上データを取り込む過程で、SQL Database の標準顧客マスターと突き合わせて、各店舗のローカルコードを標準コードに変換します。

f:id:TonyTonyKun:20210104162356p:plain

詳細は、こちらの記事を参照してください。
gooner.hateblo.jp

Serving Layer

Serving Layer は、DWH から Data Mart を作る責務を持ちます。
Mapping Data Flow を使って、Dedicated SQL Pool の売上データから SQL Database に顧客別売上データを取り込むパイプラインを作成します。

f:id:TonyTonyKun:20210105151148p:plain

詳細は、こちらの記事を参照してください。
gooner.hateblo.jp

まとめ

Azure Synapse Analytics を使って売上分析プラットフォームを作ってみました。
Spark Pool の Notebook でコードを書いてパイプラインを作るつもりでしたが、Dedicated SQL Pool 向けのコネクタが Append に対応していませんでした。
このレベルのシナリオであれば、Copy Data と Mapping Data Flow で構築できることが分かったので、これはこれでいいのですが、次回は Spark Pool を使ったシナリオを掘り下げていきたいです。

今回のソースコードは、こちらで公開しています。
github.com

Azure Synapse Analytics で Dedicated SQL Pool から SQL Database にデータを取り込むパイプラインを作成する

Azure Synapse Analytics を使って、売上分析プラットフォームを作ってみました。
ここでは、売上分析プラットフォームの Serving Layer を作る部分を記載します。
想定シナリオや全体アーキテクチャについては、次の記事を参照してください。
gooner.hateblo.jp

Serving Layer

f:id:TonyTonyKun:20210105172513p:plain
Serving Layer は、DWH から Data Mart を作る責務を持ちます。
Mapping Data Flow を使って、Dedicated SQL Pool の売上データから SQL Database に顧客別売上データを取り込むパイプラインを作成します。

Datasets を作成する

利用するリソースごとに、2つの Datasets を作成します。

Datasets for Dedicated SQL Pool

Dedicated SQL Pool から売上データを取り込むための Datasets を作成します。
Linked service には Dedicated SQL Pool を指定し、DbName の SQLDW1 と Table の Sales を入力します。

f:id:TonyTonyKun:20210104162122p:plain

Linked service を指定する際の注意点として、"Azure Synapse Dedicated SQL Pool" ではなく、"Azure Synapse Analytics" を選択してください。Mapping Data Flow から Datasets を指定する際に、なぜか "Azure Synapse Dedicated SQL Pool" を選択できないためです。

Datasets for SQL Database

SQL Database の顧客マスターを参照するための Datasets を作成します。
Linked service には SQL Database を指定し、Table で SalesByCustomer を入力します。

f:id:TonyTonyKun:20210105152859p:plain

Mapping Data Flow を作成する

Dedicated SQL Pool の売上データから SQL Database に顧客別売上データを取り込む Mapping Data Flow の完成イメージです。

f:id:TonyTonyKun:20210105151148p:plain

Source

Dedicated SQL Pool から売上データを参照したいので、先ほど作成した Dedicated SQL Pool 用の Datasets を指定します。

f:id:TonyTonyKun:20210105154239p:plain

売上データを参照するためのクエリ(SELECT * FROM Sales)を入力し、import projection ボタンをクリックしてスキーマを取り込みます。

f:id:TonyTonyKun:20210105154429p:plain

Aggregate

顧客別にデータを集計するため、Group By で StandardCustomerCode を指定します。

f:id:TonyTonyKun:20210105155036p:plain

商品単価の平均(UnitPriceAvg)と商品数の合計(QuantitySum)を返すように列を設定します。

f:id:TonyTonyKun:20210105155045p:plain

Sink

SQL Database に顧客別売上データを取り込み取りたいので、先ほど作成した SQL Database 用の Datasets を指定します。

f:id:TonyTonyKun:20210105155848p:plain

パイプラインが実行される都度、データを作り直したいので、Table action で Truncate Table を選択します。

f:id:TonyTonyKun:20210105155858p:plain

以上で、Mapping Data Flow の作成が完了しました。

Pipeline を作成する

Mapping Data Flow を実行するパイプラインを作成します。
パイプラインに先ほど作成した Mapping Data Flow を追加するだけで、特別な設定は必要ありません。

パイプラインを動作確認する

パイプラインの Trigger now から実行してみます。
パイプラインが成功すると、顧客別売上データが SQL Database の SalesByCustomer テーブルに取り込まれたことを確認できます。

f:id:TonyTonyKun:20210105160755p:plain

Trigger を作成する

最後に、日次実行するトリガーを作成して完了です。Tumbling window を 24 時間の間隔で実行します。

f:id:TonyTonyKun:20210105161029p:plain
Schedule でも代用できますが、リトライポリシーを設定できる Tumbling window を選択しました。
docs.microsoft.com

まとめ

Azure Synapse Analytics を使って、売上分析プラットフォームの Serving Layer を作ってみました。
Mapping Data Flow のアクティビティを使いこなせるようになって、もう少し複雑なクレンジングも作れるようになりたいところです。
このレイヤーは、Power BI などの分析ツール側に Data Mart を作ってしまう方法もあるので、要件に合わせて選択することになると思います。

今回のソースコードは、こちらで公開しています。
github.com

Azure Synapse Analytics で Parquet から Dedicated SQL Pool にデータを取り込むパイプラインを作成する

Azure Synapse Analytics を使って、売上分析プラットフォームを作ってみました。
ここでは、売上分析プラットフォームの Batch Layer を作る部分について記載します。
想定シナリオや全体アーキテクチャについては、次の記事を参照してください。
gooner.hateblo.jp

Batch Layer

f:id:TonyTonyKun:20210105172203p:plain
Batch Layer は、DataLake から DWH を作る責務を持ちます。
Mapping Data Flow を使って、Azure Data Lake Storage Gen2(ADLS)の Parquet から Dedicated SQL Pool に売上データを取り込むパイプラインを作成します。
売上データを取り込む過程で、SQL Database の顧客マスターと突き合わせて、各店舗のローカルコードを標準コードに変換します。

Datasets を作成する

利用するリソースごとに、3つの Datasets を作成します。

Datasets for ADLS

ADLS に配置されている Parquet を参照するための Datasets を作成します。
Linked service には、Synapse Analytics の Default Storage(ADLS)を指定し、file path でコンテナ名の sales を入力します。

f:id:TonyTonyKun:20210104161354p:plain

sales コンテナの直下に店舗毎に日付別のフォルダ(e.g. sales/store-a/2021/01/01)を作成して Parquet ファイルを配置しています。
Datasets に引数を作りたかったのですが、Mapping Data Flow から値を引き渡すことができませんでした。そのため、ここではコンテナ名のみを入力して、Mapping Data Flow 側で file path を上書きするように構成します。

Datasets for SQL Database

SQL Database の顧客マスターを参照するための Datasets を作成します。
Linked service には SQL Database を指定し、Table で CustomerView を選択します。

f:id:TonyTonyKun:20210104161527p:plain

Datasets for Dedicated SQL Pool

Dedicated SQL Pool に売上データを取り込むための Datasets を作成します。
Linked service には Dedicated SQL Pool を指定し、DbName の SQLDW1 と Table の Sales を入力します。

f:id:TonyTonyKun:20210104162122p:plain

Linked service を指定する際の注意点として、"Azure Synapse Dedicated SQL Pool" ではなく、"Azure Synapse Analytics" を選択してください。Mapping Data Flow から Datasets を指定する際に、なぜか "Azure Synapse Dedicated SQL Pool" を選択できないためです。

Mapping Data Flow を作成する

ADLS の Parquet から Dedicated SQL Pool にデータを取り込む Mapping Data Flow の完成イメージです。

f:id:TonyTonyKun:20210104162356p:plain

Parameter

Mapping Data Flow の Parameter を設定します。
store は、パイプラインを全店舗で共通化して、Trigger を店舗毎に作る想定のため、店舗を指定します。
folderPath は、日次でフォルダ分けされた Parquet ファイルの場所を指定します。

f:id:TonyTonyKun:20210104162453p:plain

Source(ADLS 用)

ADLS に配置されている Parquet を参照したいので、先ほど作成した ADLS 用の Datasets を指定します。

f:id:TonyTonyKun:20210104162703p:plain

Wildcard paths には、$store + "/" + $folderPath を入力します。これにより、Parameter で受け取った値を使って動的に読み取りパスを決定することができます。

f:id:TonyTonyKun:20210104162813p:plain

Source(SQL Database 用)

SQL Database に配置されている顧客マスターを参照したいので、先ほど作成した SQL Database 用の Datasets を指定します。

f:id:TonyTonyKun:20210104162939p:plain

Filter

SQL Database の CustomerView は、全店舗の顧客マスターが返されるため、Parameter で受け取った store で Filter します。

f:id:TonyTonyKun:20210104163111p:plain

Join

2つの Source(Parquet の売上データと SQL Database の顧客マスター)を Customer Code をキーに Left Outer Join します。

f:id:TonyTonyKun:20210104163248p:plain

Select

2つの Source を Join した結果のうち、必要ない列を削除して、Dedicated SQL Pool に取り込む列名を設定します。

f:id:TonyTonyKun:20210104163424p:plain

Sink

Dedicated SQL Pool に売上データを取り込み取りたいので、先ほど作成した Dedicated SQL Pool 用の Datasets を指定します。

f:id:TonyTonyKun:20210104163706p:plain

以上で、Mapping Data Flow の作成が完了しました。

Pipeline を作成する

Mapping Data Flow を実行するパイプラインを作成します。
f:id:TonyTonyKun:20210106191256p:plain

Parameter

Mapping Data Flow と同様に、パイプライン の Parameter を設定します。

f:id:TonyTonyKun:20210104163945p:plain

Data Flow

先ほど作成した Mapping Data Flow を指定します。Staging の領域が必要となるので、Synapse Analytics の Default Storage(ADLS)に任意のコンテナを指定します。

f:id:TonyTonyKun:20210104164231p:plain

パイプラインの Parameter を Mapping Data Flow の Parameter に引き渡します。

f:id:TonyTonyKun:20210104164056p:plain

"store": @pipeline().parameters.store,
"folderPath": @{formatDateTime(pipeline().parameters.windowStart,'yyyy')}/@{formatDateTime(pipeline().parameters.windowStart,'MM')}/@{formatDateTime(pipeline().parameters.windowStart,'dd')}

Validation

Validation Activity を使って、ADLS の Parquet ファイルの存在をチェックします。
フォルダだけでなく、ファイルの存在もチェックしたいので、Child Items には TRUE を指定します。それ以外の Datasets と Parameter は、同様に設定します。

f:id:TonyTonyKun:20210106191117p:plain

以上で、パイプラインの作成が完了しました。

パイプラインを動作確認する

パイプラインの Trigger now から、Parameter を指定して実行してみます。

f:id:TonyTonyKun:20210104164416p:plain

パイプラインが成功すると、標準顧客コードをマッピングした売上データが Dedicated SQL Pool の Sales テーブルに取り込まれたことを確認できます。

f:id:TonyTonyKun:20210104164641p:plain

Trigger を作成する

最後に、日次実行するトリガーを作成して完了です。Tumbling window を 24 時間の間隔で実行します。

f:id:TonyTonyKun:20210104165040p:plain

トリガーの実行開始時刻を windowStart でパイプラインに引き渡します。トリガーを店舗ごとに作る想定なので、store を決め打ちしています。

f:id:TonyTonyKun:20210104165051p:plain

{
    "store": store-a,
    "windowStart": @trigger().outputs.windowStartTime
}

Schedule でも代用できますが、リトライポリシーを設定できる Tumbling window を選択しました。
docs.microsoft.com

まとめ

Azure Synapse Analytics を使って、売上分析プラットフォームの Batch Layer を作ってみました。
Mapping Data Flow のアクティビティを使いこなせるようになって、もう少し複雑なクレンジングも作れるようになりたいところです。

今回のソースコードは、こちらで公開しています。
github.com