ROMANCE DAWN for the new world

Microsoft Azure を中心とした技術情報を書いています。

Azure Synapse Analytics で Parquet から Dedicated SQL Pool にデータを取り込むパイプラインを作成する

Azure Synapse Analytics を使って、売上分析プラットフォームを作ってみました。
ここでは、売上分析プラットフォームの Batch Layer を作る部分について記載します。
想定シナリオや全体アーキテクチャについては、次の記事を参照してください。
gooner.hateblo.jp

Batch Layer

f:id:TonyTonyKun:20210105172203p:plain
Batch Layer は、DataLake から DWH を作る責務を持ちます。
Mapping Data Flow を使って、Azure Data Lake Storage Gen2(ADLS)の Parquet から Dedicated SQL Pool に売上データを取り込むパイプラインを作成します。
売上データを取り込む過程で、SQL Database の顧客マスターと突き合わせて、各店舗のローカルコードを標準コードに変換します。

Datasets を作成する

利用するリソースごとに、3つの Datasets を作成します。

Datasets for ADLS

ADLS に配置されている Parquet を参照するための Datasets を作成します。
Linked service には、Synapse Analytics の Default Storage(ADLS)を指定し、file path でコンテナ名の sales を入力します。

f:id:TonyTonyKun:20210104161354p:plain

sales コンテナの直下に店舗毎に日付別のフォルダ(e.g. sales/store-a/2021/01/01)を作成して Parquet ファイルを配置しています。
Datasets に引数を作りたかったのですが、Mapping Data Flow から値を引き渡すことができませんでした。そのため、ここではコンテナ名のみを入力して、Mapping Data Flow 側で file path を上書きするように構成します。

Datasets for SQL Database

SQL Database の顧客マスターを参照するための Datasets を作成します。
Linked service には SQL Database を指定し、Table で CustomerView を選択します。

f:id:TonyTonyKun:20210104161527p:plain

Datasets for Dedicated SQL Pool

Dedicated SQL Pool に売上データを取り込むための Datasets を作成します。
Linked service には Dedicated SQL Pool を指定し、DbName の SQLDW1 と Table の Sales を入力します。

f:id:TonyTonyKun:20210104162122p:plain

Linked service を指定する際の注意点として、"Azure Synapse Dedicated SQL Pool" ではなく、"Azure Synapse Analytics" を選択してください。Mapping Data Flow から Datasets を指定する際に、なぜか "Azure Synapse Dedicated SQL Pool" を選択できないためです。

Mapping Data Flow を作成する

ADLS の Parquet から Dedicated SQL Pool にデータを取り込む Mapping Data Flow の完成イメージです。

f:id:TonyTonyKun:20210104162356p:plain

Parameter

Mapping Data Flow の Parameter を設定します。
store は、パイプラインを全店舗で共通化して、Trigger を店舗毎に作る想定のため、店舗を指定します。
folderPath は、日次でフォルダ分けされた Parquet ファイルの場所を指定します。

f:id:TonyTonyKun:20210104162453p:plain

Source(ADLS 用)

ADLS に配置されている Parquet を参照したいので、先ほど作成した ADLS 用の Datasets を指定します。

f:id:TonyTonyKun:20210104162703p:plain

Wildcard paths には、$store + "/" + $folderPath を入力します。これにより、Parameter で受け取った値を使って動的に読み取りパスを決定することができます。

f:id:TonyTonyKun:20210104162813p:plain

Source(SQL Database 用)

SQL Database に配置されている顧客マスターを参照したいので、先ほど作成した SQL Database 用の Datasets を指定します。

f:id:TonyTonyKun:20210104162939p:plain

Filter

SQL Database の CustomerView は、全店舗の顧客マスターが返されるため、Parameter で受け取った store で Filter します。

f:id:TonyTonyKun:20210104163111p:plain

Join

2つの Source(Parquet の売上データと SQL Database の顧客マスター)を Customer Code をキーに Left Outer Join します。

f:id:TonyTonyKun:20210104163248p:plain

Select

2つの Source を Join した結果のうち、必要ない列を削除して、Dedicated SQL Pool に取り込む列名を設定します。

f:id:TonyTonyKun:20210104163424p:plain

Sink

Dedicated SQL Pool に売上データを取り込み取りたいので、先ほど作成した Dedicated SQL Pool 用の Datasets を指定します。

f:id:TonyTonyKun:20210104163706p:plain

以上で、Mapping Data Flow の作成が完了しました。

Pipeline を作成する

Mapping Data Flow を実行するパイプラインを作成します。
f:id:TonyTonyKun:20210106191256p:plain

Parameter

Mapping Data Flow と同様に、パイプライン の Parameter を設定します。

f:id:TonyTonyKun:20210104163945p:plain

Data Flow

先ほど作成した Mapping Data Flow を指定します。Staging の領域が必要となるので、Synapse Analytics の Default Storage(ADLS)に任意のコンテナを指定します。

f:id:TonyTonyKun:20210104164231p:plain

パイプラインの Parameter を Mapping Data Flow の Parameter に引き渡します。

f:id:TonyTonyKun:20210104164056p:plain

"store": @pipeline().parameters.store,
"folderPath": @{formatDateTime(pipeline().parameters.windowStart,'yyyy')}/@{formatDateTime(pipeline().parameters.windowStart,'MM')}/@{formatDateTime(pipeline().parameters.windowStart,'dd')}

Validation

Validation Activity を使って、ADLS の Parquet ファイルの存在をチェックします。
フォルダだけでなく、ファイルの存在もチェックしたいので、Child Items には TRUE を指定します。それ以外の Datasets と Parameter は、同様に設定します。

f:id:TonyTonyKun:20210106191117p:plain

以上で、パイプラインの作成が完了しました。

パイプラインを動作確認する

パイプラインの Trigger now から、Parameter を指定して実行してみます。

f:id:TonyTonyKun:20210104164416p:plain

パイプラインが成功すると、標準顧客コードをマッピングした売上データが Dedicated SQL Pool の Sales テーブルに取り込まれたことを確認できます。

f:id:TonyTonyKun:20210104164641p:plain

Trigger を作成する

最後に、日次実行するトリガーを作成して完了です。Tumbling window を 24 時間の間隔で実行します。

f:id:TonyTonyKun:20210104165040p:plain

トリガーの実行開始時刻を windowStart でパイプラインに引き渡します。トリガーを店舗ごとに作る想定なので、store を決め打ちしています。

f:id:TonyTonyKun:20210104165051p:plain

{
    "store": store-a,
    "windowStart": @trigger().outputs.windowStartTime
}

Schedule でも代用できますが、リトライポリシーを設定できる Tumbling window を選択しました。
docs.microsoft.com

まとめ

Azure Synapse Analytics を使って、売上分析プラットフォームの Batch Layer を作ってみました。
Mapping Data Flow のアクティビティを使いこなせるようになって、もう少し複雑なクレンジングも作れるようになりたいところです。

今回のソースコードは、こちらで公開しています。
github.com