ROMANCE DAWN for the new world

Microsoft Azure を中心とした技術情報を書いています。

Azure Synapse Analytics で Dedicated SQL Pool から SQL Database にデータを取り込むパイプラインを作成する

Azure Synapse Analytics を使って、売上分析プラットフォームを作ってみました。
ここでは、売上分析プラットフォームの Serving Layer を作る部分を記載します。
想定シナリオや全体アーキテクチャについては、次の記事を参照してください。
gooner.hateblo.jp

Serving Layer

f:id:TonyTonyKun:20210105172513p:plain
Serving Layer は、DWH から Data Mart を作る責務を持ちます。
Mapping Data Flow を使って、Dedicated SQL Pool の売上データから SQL Database に顧客別売上データを取り込むパイプラインを作成します。

Datasets を作成する

利用するリソースごとに、2つの Datasets を作成します。

Datasets for Dedicated SQL Pool

Dedicated SQL Pool から売上データを取り込むための Datasets を作成します。
Linked service には Dedicated SQL Pool を指定し、DbName の SQLDW1 と Table の Sales を入力します。

f:id:TonyTonyKun:20210104162122p:plain

Linked service を指定する際の注意点として、"Azure Synapse Dedicated SQL Pool" ではなく、"Azure Synapse Analytics" を選択してください。Mapping Data Flow から Datasets を指定する際に、なぜか "Azure Synapse Dedicated SQL Pool" を選択できないためです。

Datasets for SQL Database

SQL Database の顧客マスターを参照するための Datasets を作成します。
Linked service には SQL Database を指定し、Table で SalesByCustomer を入力します。

f:id:TonyTonyKun:20210105152859p:plain

Mapping Data Flow を作成する

Dedicated SQL Pool の売上データから SQL Database に顧客別売上データを取り込む Mapping Data Flow の完成イメージです。

f:id:TonyTonyKun:20210105151148p:plain

Source

Dedicated SQL Pool から売上データを参照したいので、先ほど作成した Dedicated SQL Pool 用の Datasets を指定します。

f:id:TonyTonyKun:20210105154239p:plain

売上データを参照するためのクエリ(SELECT * FROM Sales)を入力し、import projection ボタンをクリックしてスキーマを取り込みます。

f:id:TonyTonyKun:20210105154429p:plain

Aggregate

顧客別にデータを集計するため、Group By で StandardCustomerCode を指定します。

f:id:TonyTonyKun:20210105155036p:plain

商品単価の平均(UnitPriceAvg)と商品数の合計(QuantitySum)を返すように列を設定します。

f:id:TonyTonyKun:20210105155045p:plain

Sink

SQL Database に顧客別売上データを取り込み取りたいので、先ほど作成した SQL Database 用の Datasets を指定します。

f:id:TonyTonyKun:20210105155848p:plain

パイプラインが実行される都度、データを作り直したいので、Table action で Truncate Table を選択します。

f:id:TonyTonyKun:20210105155858p:plain

以上で、Mapping Data Flow の作成が完了しました。

Pipeline を作成する

Mapping Data Flow を実行するパイプラインを作成します。
パイプラインに先ほど作成した Mapping Data Flow を追加するだけで、特別な設定は必要ありません。

パイプラインを動作確認する

パイプラインの Trigger now から実行してみます。
パイプラインが成功すると、顧客別売上データが SQL Database の SalesByCustomer テーブルに取り込まれたことを確認できます。

f:id:TonyTonyKun:20210105160755p:plain

Trigger を作成する

最後に、日次実行するトリガーを作成して完了です。Tumbling window を 24 時間の間隔で実行します。

f:id:TonyTonyKun:20210105161029p:plain
Schedule でも代用できますが、リトライポリシーを設定できる Tumbling window を選択しました。
docs.microsoft.com

まとめ

Azure Synapse Analytics を使って、売上分析プラットフォームの Serving Layer を作ってみました。
Mapping Data Flow のアクティビティを使いこなせるようになって、もう少し複雑なクレンジングも作れるようになりたいところです。
このレイヤーは、Power BI などの分析ツール側に Data Mart を作ってしまう方法もあるので、要件に合わせて選択することになると思います。

今回のソースコードは、こちらで公開しています。
github.com

Azure Synapse Analytics で Parquet から Dedicated SQL Pool にデータを取り込むパイプラインを作成する

Azure Synapse Analytics を使って、売上分析プラットフォームを作ってみました。
ここでは、売上分析プラットフォームの Batch Layer を作る部分について記載します。
想定シナリオや全体アーキテクチャについては、次の記事を参照してください。
gooner.hateblo.jp

Batch Layer

f:id:TonyTonyKun:20210105172203p:plain
Batch Layer は、DataLake から DWH を作る責務を持ちます。
Mapping Data Flow を使って、Azure Data Lake Storage Gen2(ADLS)の Parquet から Dedicated SQL Pool に売上データを取り込むパイプラインを作成します。
売上データを取り込む過程で、SQL Database の顧客マスターと突き合わせて、各店舗のローカルコードを標準コードに変換します。

Datasets を作成する

利用するリソースごとに、3つの Datasets を作成します。

Datasets for ADLS

ADLS に配置されている Parquet を参照するための Datasets を作成します。
Linked service には、Synapse Analytics の Default Storage(ADLS)を指定し、file path でコンテナ名の sales を入力します。

f:id:TonyTonyKun:20210104161354p:plain

sales コンテナの直下に店舗毎に日付別のフォルダ(e.g. sales/store-a/2021/01/01)を作成して Parquet ファイルを配置しています。
Datasets に引数を作りたかったのですが、Mapping Data Flow から値を引き渡すことができませんでした。そのため、ここではコンテナ名のみを入力して、Mapping Data Flow 側で file path を上書きするように構成します。

Datasets for SQL Database

SQL Database の顧客マスターを参照するための Datasets を作成します。
Linked service には SQL Database を指定し、Table で CustomerView を選択します。

f:id:TonyTonyKun:20210104161527p:plain

Datasets for Dedicated SQL Pool

Dedicated SQL Pool に売上データを取り込むための Datasets を作成します。
Linked service には Dedicated SQL Pool を指定し、DbName の SQLDW1 と Table の Sales を入力します。

f:id:TonyTonyKun:20210104162122p:plain

Linked service を指定する際の注意点として、"Azure Synapse Dedicated SQL Pool" ではなく、"Azure Synapse Analytics" を選択してください。Mapping Data Flow から Datasets を指定する際に、なぜか "Azure Synapse Dedicated SQL Pool" を選択できないためです。

Mapping Data Flow を作成する

ADLS の Parquet から Dedicated SQL Pool にデータを取り込む Mapping Data Flow の完成イメージです。

f:id:TonyTonyKun:20210104162356p:plain

Parameter

Mapping Data Flow の Parameter を設定します。
store は、パイプラインを全店舗で共通化して、Trigger を店舗毎に作る想定のため、店舗を指定します。
folderPath は、日次でフォルダ分けされた Parquet ファイルの場所を指定します。

f:id:TonyTonyKun:20210104162453p:plain

Source(ADLS 用)

ADLS に配置されている Parquet を参照したいので、先ほど作成した ADLS 用の Datasets を指定します。

f:id:TonyTonyKun:20210104162703p:plain

Wildcard paths には、$store + "/" + $folderPath を入力します。これにより、Parameter で受け取った値を使って動的に読み取りパスを決定することができます。

f:id:TonyTonyKun:20210104162813p:plain

Source(SQL Database 用)

SQL Database に配置されている顧客マスターを参照したいので、先ほど作成した SQL Database 用の Datasets を指定します。

f:id:TonyTonyKun:20210104162939p:plain

Filter

SQL Database の CustomerView は、全店舗の顧客マスターが返されるため、Parameter で受け取った store で Filter します。

f:id:TonyTonyKun:20210104163111p:plain

Join

2つの Source(Parquet の売上データと SQL Database の顧客マスター)を Customer Code をキーに Left Outer Join します。

f:id:TonyTonyKun:20210104163248p:plain

Select

2つの Source を Join した結果のうち、必要ない列を削除して、Dedicated SQL Pool に取り込む列名を設定します。

f:id:TonyTonyKun:20210104163424p:plain

Sink

Dedicated SQL Pool に売上データを取り込み取りたいので、先ほど作成した Dedicated SQL Pool 用の Datasets を指定します。

f:id:TonyTonyKun:20210104163706p:plain

以上で、Mapping Data Flow の作成が完了しました。

Pipeline を作成する

Mapping Data Flow を実行するパイプラインを作成します。
f:id:TonyTonyKun:20210106191256p:plain

Parameter

Mapping Data Flow と同様に、パイプライン の Parameter を設定します。

f:id:TonyTonyKun:20210104163945p:plain

Data Flow

先ほど作成した Mapping Data Flow を指定します。Staging の領域が必要となるので、Synapse Analytics の Default Storage(ADLS)に任意のコンテナを指定します。

f:id:TonyTonyKun:20210104164231p:plain

パイプラインの Parameter を Mapping Data Flow の Parameter に引き渡します。

f:id:TonyTonyKun:20210104164056p:plain

"store": @pipeline().parameters.store,
"folderPath": @{formatDateTime(pipeline().parameters.windowStart,'yyyy')}/@{formatDateTime(pipeline().parameters.windowStart,'MM')}/@{formatDateTime(pipeline().parameters.windowStart,'dd')}

Validation

Validation Activity を使って、ADLS の Parquet ファイルの存在をチェックします。
フォルダだけでなく、ファイルの存在もチェックしたいので、Child Items には TRUE を指定します。それ以外の Datasets と Parameter は、同様に設定します。

f:id:TonyTonyKun:20210106191117p:plain

以上で、パイプラインの作成が完了しました。

パイプラインを動作確認する

パイプラインの Trigger now から、Parameter を指定して実行してみます。

f:id:TonyTonyKun:20210104164416p:plain

パイプラインが成功すると、標準顧客コードをマッピングした売上データが Dedicated SQL Pool の Sales テーブルに取り込まれたことを確認できます。

f:id:TonyTonyKun:20210104164641p:plain

Trigger を作成する

最後に、日次実行するトリガーを作成して完了です。Tumbling window を 24 時間の間隔で実行します。

f:id:TonyTonyKun:20210104165040p:plain

トリガーの実行開始時刻を windowStart でパイプラインに引き渡します。トリガーを店舗ごとに作る想定なので、store を決め打ちしています。

f:id:TonyTonyKun:20210104165051p:plain

{
    "store": store-a,
    "windowStart": @trigger().outputs.windowStartTime
}

Schedule でも代用できますが、リトライポリシーを設定できる Tumbling window を選択しました。
docs.microsoft.com

まとめ

Azure Synapse Analytics を使って、売上分析プラットフォームの Batch Layer を作ってみました。
Mapping Data Flow のアクティビティを使いこなせるようになって、もう少し複雑なクレンジングも作れるようになりたいところです。

今回のソースコードは、こちらで公開しています。
github.com

Azure Synapse Analytics で XML を Parquet に変換するパイプラインを作成する

Azure Synapse Analytics を使って、売上分析プラットフォームを作ってみました。
ここでは、売上分析プラットフォームの Ingest Layer を作る部分について記載します。
想定シナリオや全体アーキテクチャについては、次の記事を参照してください。
gooner.hateblo.jp

Ingest Layer

f:id:TonyTonyKun:20210105172003p:plain
Ingest Layer は、収集したデータから DataLake を作る責務を持ちます。
Copy Data を使って、Blob Storage の XML を Parquet に変換して Azure Data Lake Storage Gen2(ADLS)にコピーするパイプラインを作成します。
techcommunity.microsoft.com
この記事によれば、Mapping Data Flow でも XML に対応しているようですが、うまく動かなかったので、今回は Copy Data を使うことにしました。

ディレクトリ構成

Blob Storage と ADLS のディレクトリは、日次でパイプラインを実行したいので日付で分ける構成とします。
この構成であれば、何らかの障害発生時に再試行しやすいですし、ファイル数が増えてもパフォーマンスに影響を与えにくいはずです。

f:id:TonyTonyKun:20210108140818p:plain

XML ファイルをアップロードする店舗ごとに認証認可(SASトークン)を設定したいので、Blob Storage のコンテナを店舗(store-a)にしています。
一方、ADSL は Datasets でコンテナを固定値にする必要があったので、データ種別(Sales)をコンテナにしています。

Datasets を作成する

利用するリソースごとに、3つの Datasets を作成します。

Dataset for Blob Storage

各店舗から Blob Storage にアップロードされた XML を参照するための Datasets を作成します。
store は、パイプラインを全店舗で共通化して、Trigger を店舗毎に作る想定のため、店舗を受け取ります。
folderPath は、日次でフォルダ分けされた XML の場所を受け取ります。

f:id:TonyTonyKun:20210102144840p:plain

Linked service には、Blob Storage を指定し、file path でパイプラインから受け取った値を使って動的に読み取りパスを決定します。

f:id:TonyTonyKun:20210105105922p:plain

@dataset().store / sales/@{dataset().folderPath}

Dataset for ADLS

ADLS に Parquet をコピーするための Datasets を作成します。
Blob Storage の Datasets と同様に、Parameter を設定します。

f:id:TonyTonyKun:20210102145400p:plain

Linked service には、Synapse Analytics の Default Storage(ADLS)を指定し、file path でパイプラインから受け取った値を使って動的にコピー先パスを決定します。

f:id:TonyTonyKun:20210105110112p:plain

sales / @{dataset().store}/@{dataset().folderPath}

Pipeline を作成する

Copy Data で XML を Parquet に変換するパイプラインの完成イメージです。
f:id:TonyTonyKun:20210106190053p:plain

Parameter

Datasets と同様に、パイプライン の Parameter を設定します。

f:id:TonyTonyKun:20210102145604p:plain

Source

Blob Storage に配置されている XML を参照したいので、先ほど作成した Blob Storage 用の Datasets を指定します。
パイプラインの Parameter を Datasets の Parameter に引き渡します。

f:id:TonyTonyKun:20210102145855p:plain

"store": @pipeline().parameters.store,
"folderPath": @{formatDateTime(pipeline().parameters.windowStart,'yyyy')}/@{formatDateTime(pipeline().parameters.windowStart,'MM')}/@{formatDateTime(pipeline().parameters.windowStart,'dd')}

file path type では Wildcard file path を選択し、*.xml を指定します。

Sink

ADLS に Parquet に変換したファイルをコピーしたいので、先ほど作成した ADLS 用の Datasets を指定します。
Blob Storage の Datasets と同様に、パイプライン の Parameter を引き渡します。

f:id:TonyTonyKun:20210102150024p:plain

Mapping

import schema ボタンを押すと、XML からスキーマが読み込まれるので、Parquet に変換する際のデータ型を指定します。
XML スキーマが単一レコードの場合はこれだけで OK ですが、今回は1つの XML ファイルに複数レコードがあるので、Collection reference にもチェックを入れます。

f:id:TonyTonyKun:20210102150205p:plain

Validation

Validation Activity を使って、Blob Storage の XML ファイルの存在をチェックします。
フォルダだけでなく、ファイルの存在もチェックしたいので、Child Items には TRUE を指定します。それ以外の Datasets と Parameter は、同様に設定します。

f:id:TonyTonyKun:20210106190345p:plain

パイプラインを動作確認する

パイプラインの Trigger now から、Parameter を指定して実行してみます。

f:id:TonyTonyKun:20210105110605p:plain

パイプラインが成功すると、XML を Parquet に変換した売上データが ADLS にコピーされたことを確認できます。

f:id:TonyTonyKun:20210105110759p:plain

Trigger を作成する

最後に、日次実行するトリガーを作成して完了です。Tumbling window を 24 時間の間隔で実行します。

f:id:TonyTonyKun:20210102180049p:plain

トリガーの実行開始時刻を windowStart でパイプラインに引き渡します。トリガーを店舗ごとに作る想定なので、store を決め打ちしています。

f:id:TonyTonyKun:20210105110932p:plain
Schedule でも代用できますが、リトライポリシーを設定できる Tumbling window を選択しました。
docs.microsoft.com

まとめ

Azure Synapse Analytics を使って、売上分析プラットフォームの Ingest Layer を作ってみました。
Copy Data が XML に対応したので、解析するパーサーを作りこまなくても、ある程度の解析は可能です。
コピー元ファイルの存在チェックに Validation Activity を使いましたが、パイプライン自体はエラー終了と判断されます。正常終了として扱いたいケースもあるので、そのあたりはもう少し工夫が必要かと思います。

今回のソースコードは、こちらで公開しています。
github.com

Appendix

XML スキーマがもう少し複雑になり、record 要素に kind 属性が追加され、売上データの本体価格(amount)と消費税(tax)を分けるケースを見てみましょう。

<?xml version='1.0' encoding='UTF-8'?>
<dataset>
    <record kind = "amount">
        <id>1</id>
        <order_date>2021-01-01T00:00:00Z</order_date>
        <customer_code>A0001</customer_code>
        <product_code>KE-774</product_code>
        <unit_price>500.00</unit_price>
        <quantity>2</quantity>
    </record>
    <record kind = "tax">
        <id>2</id>
        <order_date>2021-01-01T00:00:00Z</order_date>
        <customer_code>A0001</customer_code>
        <product_code>KE-774</product_code>
        <consumption_tax>100.00</consumption_tax>
    </record>
</dataset>

このケースでは、Advanced editor に切り替えての対応が必要となります。
kind 属性は @kind という書式で読み込まれます。さらに、消費税の場合には consumption_tax 要素が必要なので、手動で追加します。

f:id:TonyTonyKun:20210102173547p:plain

このようにスキーマを定義すると、kind 属性ごとのレコードが Parquet に変換されたことを確認できます。

f:id:TonyTonyKun:20210102173557p:plain