ROMANCE DAWN for the new world

Microsoft Azure を中心とした技術情報を書いています。

Azure Synapse Analytics で XML を Parquet に変換するパイプラインを作成する

Azure Synapse Analytics を使って、売上分析プラットフォームを作ってみました。
ここでは、売上分析プラットフォームの Ingest Layer を作る部分について記載します。
想定シナリオや全体アーキテクチャについては、次の記事を参照してください。
gooner.hateblo.jp

Ingest Layer

f:id:TonyTonyKun:20210105172003p:plain
Ingest Layer は、収集したデータから DataLake を作る責務を持ちます。
Copy Data を使って、Blob Storage の XML を Parquet に変換して Azure Data Lake Storage Gen2(ADLS)にコピーするパイプラインを作成します。
techcommunity.microsoft.com
この記事によれば、Mapping Data Flow でも XML に対応しているようですが、うまく動かなかったので、今回は Copy Data を使うことにしました。

ディレクトリ構成

Blob Storage と ADLS のディレクトリは、日次でパイプラインを実行したいので日付で分ける構成とします。
この構成であれば、何らかの障害発生時に再試行しやすいですし、ファイル数が増えてもパフォーマンスに影響を与えにくいはずです。

f:id:TonyTonyKun:20210108140818p:plain

XML ファイルをアップロードする店舗ごとに認証認可(SASトークン)を設定したいので、Blob Storage のコンテナを店舗(store-a)にしています。
一方、ADSL は Datasets でコンテナを固定値にする必要があったので、データ種別(Sales)をコンテナにしています。

Datasets を作成する

利用するリソースごとに、3つの Datasets を作成します。

Dataset for Blob Storage

各店舗から Blob Storage にアップロードされた XML を参照するための Datasets を作成します。
store は、パイプラインを全店舗で共通化して、Trigger を店舗毎に作る想定のため、店舗を受け取ります。
folderPath は、日次でフォルダ分けされた XML の場所を受け取ります。

f:id:TonyTonyKun:20210102144840p:plain

Linked service には、Blob Storage を指定し、file path でパイプラインから受け取った値を使って動的に読み取りパスを決定します。

f:id:TonyTonyKun:20210105105922p:plain

@dataset().store / sales/@{dataset().folderPath}

Dataset for ADLS

ADLS に Parquet をコピーするための Datasets を作成します。
Blob Storage の Datasets と同様に、Parameter を設定します。

f:id:TonyTonyKun:20210102145400p:plain

Linked service には、Synapse Analytics の Default Storage(ADLS)を指定し、file path でパイプラインから受け取った値を使って動的にコピー先パスを決定します。

f:id:TonyTonyKun:20210105110112p:plain

sales / @{dataset().store}/@{dataset().folderPath}

Pipeline を作成する

Copy Data で XML を Parquet に変換するパイプラインの完成イメージです。
f:id:TonyTonyKun:20210106190053p:plain

Parameter

Datasets と同様に、パイプライン の Parameter を設定します。

f:id:TonyTonyKun:20210102145604p:plain

Source

Blob Storage に配置されている XML を参照したいので、先ほど作成した Blob Storage 用の Datasets を指定します。
パイプラインの Parameter を Datasets の Parameter に引き渡します。

f:id:TonyTonyKun:20210102145855p:plain

"store": @pipeline().parameters.store,
"folderPath": @{formatDateTime(pipeline().parameters.windowStart,'yyyy')}/@{formatDateTime(pipeline().parameters.windowStart,'MM')}/@{formatDateTime(pipeline().parameters.windowStart,'dd')}

file path type では Wildcard file path を選択し、*.xml を指定します。

Sink

ADLS に Parquet に変換したファイルをコピーしたいので、先ほど作成した ADLS 用の Datasets を指定します。
Blob Storage の Datasets と同様に、パイプライン の Parameter を引き渡します。

f:id:TonyTonyKun:20210102150024p:plain

Mapping

import schema ボタンを押すと、XML からスキーマが読み込まれるので、Parquet に変換する際のデータ型を指定します。
XML スキーマが単一レコードの場合はこれだけで OK ですが、今回は1つの XML ファイルに複数レコードがあるので、Collection reference にもチェックを入れます。

f:id:TonyTonyKun:20210102150205p:plain

Validation

Validation Activity を使って、Blob Storage の XML ファイルの存在をチェックします。
フォルダだけでなく、ファイルの存在もチェックしたいので、Child Items には TRUE を指定します。それ以外の Datasets と Parameter は、同様に設定します。

f:id:TonyTonyKun:20210106190345p:plain

パイプラインを動作確認する

パイプラインの Trigger now から、Parameter を指定して実行してみます。

f:id:TonyTonyKun:20210105110605p:plain

パイプラインが成功すると、XML を Parquet に変換した売上データが ADLS にコピーされたことを確認できます。

f:id:TonyTonyKun:20210105110759p:plain

Trigger を作成する

最後に、日次実行するトリガーを作成して完了です。Tumbling window を 24 時間の間隔で実行します。

f:id:TonyTonyKun:20210102180049p:plain

トリガーの実行開始時刻を windowStart でパイプラインに引き渡します。トリガーを店舗ごとに作る想定なので、store を決め打ちしています。

f:id:TonyTonyKun:20210105110932p:plain
Schedule でも代用できますが、リトライポリシーを設定できる Tumbling window を選択しました。
docs.microsoft.com

まとめ

Azure Synapse Analytics を使って、売上分析プラットフォームの Ingest Layer を作ってみました。
Copy Data が XML に対応したので、解析するパーサーを作りこまなくても、ある程度の解析は可能です。
コピー元ファイルの存在チェックに Validation Activity を使いましたが、パイプライン自体はエラー終了と判断されます。正常終了として扱いたいケースもあるので、そのあたりはもう少し工夫が必要かと思います。

今回のソースコードは、こちらで公開しています。
github.com

Appendix

XML スキーマがもう少し複雑になり、record 要素に kind 属性が追加され、売上データの本体価格(amount)と消費税(tax)を分けるケースを見てみましょう。

<?xml version='1.0' encoding='UTF-8'?>
<dataset>
    <record kind = "amount">
        <id>1</id>
        <order_date>2021-01-01T00:00:00Z</order_date>
        <customer_code>A0001</customer_code>
        <product_code>KE-774</product_code>
        <unit_price>500.00</unit_price>
        <quantity>2</quantity>
    </record>
    <record kind = "tax">
        <id>2</id>
        <order_date>2021-01-01T00:00:00Z</order_date>
        <customer_code>A0001</customer_code>
        <product_code>KE-774</product_code>
        <consumption_tax>100.00</consumption_tax>
    </record>
</dataset>

このケースでは、Advanced editor に切り替えての対応が必要となります。
kind 属性は @kind という書式で読み込まれます。さらに、消費税の場合には consumption_tax 要素が必要なので、手動で追加します。

f:id:TonyTonyKun:20210102173547p:plain

このようにスキーマを定義すると、kind 属性ごとのレコードが Parquet に変換されたことを確認できます。

f:id:TonyTonyKun:20210102173557p:plain