ROMANCE DAWN for the new world

Microsoft Azure を中心とした技術情報を書いています。

Azure Synapse Analytics で Parquet から Dedicated SQL Pool にデータを取り込むパイプラインを作成する

Azure Synapse Analytics を使って、売上分析プラットフォームを作ってみました。
ここでは、売上分析プラットフォームの Batch Layer を作る部分について記載します。
想定シナリオや全体アーキテクチャについては、次の記事を参照してください。
gooner.hateblo.jp

Batch Layer

f:id:TonyTonyKun:20210105172203p:plain
Batch Layer は、DataLake から DWH を作る責務を持ちます。
Mapping Data Flow を使って、Azure Data Lake Storage Gen2(ADLS)の Parquet から Dedicated SQL Pool に売上データを取り込むパイプラインを作成します。
売上データを取り込む過程で、SQL Database の顧客マスターと突き合わせて、各店舗のローカルコードを標準コードに変換します。

Datasets を作成する

利用するリソースごとに、3つの Datasets を作成します。

Datasets for ADLS

ADLS に配置されている Parquet を参照するための Datasets を作成します。
Linked service には、Synapse Analytics の Default Storage(ADLS)を指定し、file path でコンテナ名の sales を入力します。

f:id:TonyTonyKun:20210104161354p:plain

sales コンテナの直下に店舗毎に日付別のフォルダ(e.g. sales/store-a/2021/01/01)を作成して Parquet ファイルを配置しています。
Datasets に引数を作りたかったのですが、Mapping Data Flow から値を引き渡すことができませんでした。そのため、ここではコンテナ名のみを入力して、Mapping Data Flow 側で file path を上書きするように構成します。

Datasets for SQL Database

SQL Database の顧客マスターを参照するための Datasets を作成します。
Linked service には SQL Database を指定し、Table で CustomerView を選択します。

f:id:TonyTonyKun:20210104161527p:plain

Datasets for Dedicated SQL Pool

Dedicated SQL Pool に売上データを取り込むための Datasets を作成します。
Linked service には Dedicated SQL Pool を指定し、DbName の SQLDW1 と Table の Sales を入力します。

f:id:TonyTonyKun:20210104162122p:plain

Linked service を指定する際の注意点として、"Azure Synapse Dedicated SQL Pool" ではなく、"Azure Synapse Analytics" を選択してください。Mapping Data Flow から Datasets を指定する際に、なぜか "Azure Synapse Dedicated SQL Pool" を選択できないためです。

Mapping Data Flow を作成する

ADLS の Parquet から Dedicated SQL Pool にデータを取り込む Mapping Data Flow の完成イメージです。

f:id:TonyTonyKun:20210104162356p:plain

Parameter

Mapping Data Flow の Parameter を設定します。
store は、パイプラインを全店舗で共通化して、Trigger を店舗毎に作る想定のため、店舗を指定します。
folderPath は、日次でフォルダ分けされた Parquet ファイルの場所を指定します。

f:id:TonyTonyKun:20210104162453p:plain

Source(ADLS 用)

ADLS に配置されている Parquet を参照したいので、先ほど作成した ADLS 用の Datasets を指定します。

f:id:TonyTonyKun:20210104162703p:plain

Wildcard paths には、$store + "/" + $folderPath を入力します。これにより、Parameter で受け取った値を使って動的に読み取りパスを決定することができます。

f:id:TonyTonyKun:20210104162813p:plain

Source(SQL Database 用)

SQL Database に配置されている顧客マスターを参照したいので、先ほど作成した SQL Database 用の Datasets を指定します。

f:id:TonyTonyKun:20210104162939p:plain

Filter

SQL Database の CustomerView は、全店舗の顧客マスターが返されるため、Parameter で受け取った store で Filter します。

f:id:TonyTonyKun:20210104163111p:plain

Join

2つの Source(Parquet の売上データと SQL Database の顧客マスター)を Customer Code をキーに Left Outer Join します。

f:id:TonyTonyKun:20210104163248p:plain

Select

2つの Source を Join した結果のうち、必要ない列を削除して、Dedicated SQL Pool に取り込む列名を設定します。

f:id:TonyTonyKun:20210104163424p:plain

Sink

Dedicated SQL Pool に売上データを取り込み取りたいので、先ほど作成した Dedicated SQL Pool 用の Datasets を指定します。

f:id:TonyTonyKun:20210104163706p:plain

以上で、Mapping Data Flow の作成が完了しました。

Pipeline を作成する

Mapping Data Flow を実行するパイプラインを作成します。
f:id:TonyTonyKun:20210106191256p:plain

Parameter

Mapping Data Flow と同様に、パイプライン の Parameter を設定します。

f:id:TonyTonyKun:20210104163945p:plain

Data Flow

先ほど作成した Mapping Data Flow を指定します。Staging の領域が必要となるので、Synapse Analytics の Default Storage(ADLS)に任意のコンテナを指定します。

f:id:TonyTonyKun:20210104164231p:plain

パイプラインの Parameter を Mapping Data Flow の Parameter に引き渡します。

f:id:TonyTonyKun:20210104164056p:plain

"store": @pipeline().parameters.store,
"folderPath": @{formatDateTime(pipeline().parameters.windowStart,'yyyy')}/@{formatDateTime(pipeline().parameters.windowStart,'MM')}/@{formatDateTime(pipeline().parameters.windowStart,'dd')}

Validation

Validation Activity を使って、ADLS の Parquet ファイルの存在をチェックします。
フォルダだけでなく、ファイルの存在もチェックしたいので、Child Items には TRUE を指定します。それ以外の Datasets と Parameter は、同様に設定します。

f:id:TonyTonyKun:20210106191117p:plain

以上で、パイプラインの作成が完了しました。

パイプラインを動作確認する

パイプラインの Trigger now から、Parameter を指定して実行してみます。

f:id:TonyTonyKun:20210104164416p:plain

パイプラインが成功すると、標準顧客コードをマッピングした売上データが Dedicated SQL Pool の Sales テーブルに取り込まれたことを確認できます。

f:id:TonyTonyKun:20210104164641p:plain

Trigger を作成する

最後に、日次実行するトリガーを作成して完了です。Tumbling window を 24 時間の間隔で実行します。

f:id:TonyTonyKun:20210104165040p:plain

トリガーの実行開始時刻を windowStart でパイプラインに引き渡します。トリガーを店舗ごとに作る想定なので、store を決め打ちしています。

f:id:TonyTonyKun:20210104165051p:plain

{
    "store": store-a,
    "windowStart": @trigger().outputs.windowStartTime
}

Schedule でも代用できますが、リトライポリシーを設定できる Tumbling window を選択しました。
docs.microsoft.com

まとめ

Azure Synapse Analytics を使って、売上分析プラットフォームの Batch Layer を作ってみました。
Mapping Data Flow のアクティビティを使いこなせるようになって、もう少し複雑なクレンジングも作れるようになりたいところです。

今回のソースコードは、こちらで公開しています。
github.com

Azure Synapse Analytics で XML を Parquet に変換するパイプラインを作成する

Azure Synapse Analytics を使って、売上分析プラットフォームを作ってみました。
ここでは、売上分析プラットフォームの Ingest Layer を作る部分について記載します。
想定シナリオや全体アーキテクチャについては、次の記事を参照してください。
gooner.hateblo.jp

Ingest Layer

f:id:TonyTonyKun:20210105172003p:plain
Ingest Layer は、収集したデータから DataLake を作る責務を持ちます。
Copy Data を使って、Blob Storage の XML を Parquet に変換して Azure Data Lake Storage Gen2(ADLS)にコピーするパイプラインを作成します。
techcommunity.microsoft.com
この記事によれば、Mapping Data Flow でも XML に対応しているようですが、うまく動かなかったので、今回は Copy Data を使うことにしました。

ディレクトリ構成

Blob Storage と ADLS のディレクトリは、日次でパイプラインを実行したいので日付で分ける構成とします。
この構成であれば、何らかの障害発生時に再試行しやすいですし、ファイル数が増えてもパフォーマンスに影響を与えにくいはずです。

f:id:TonyTonyKun:20210108140818p:plain

XML ファイルをアップロードする店舗ごとに認証認可(SASトークン)を設定したいので、Blob Storage のコンテナを店舗(store-a)にしています。
一方、ADSL は Datasets でコンテナを固定値にする必要があったので、データ種別(Sales)をコンテナにしています。

Datasets を作成する

利用するリソースごとに、3つの Datasets を作成します。

Dataset for Blob Storage

各店舗から Blob Storage にアップロードされた XML を参照するための Datasets を作成します。
store は、パイプラインを全店舗で共通化して、Trigger を店舗毎に作る想定のため、店舗を受け取ります。
folderPath は、日次でフォルダ分けされた XML の場所を受け取ります。

f:id:TonyTonyKun:20210102144840p:plain

Linked service には、Blob Storage を指定し、file path でパイプラインから受け取った値を使って動的に読み取りパスを決定します。

f:id:TonyTonyKun:20210105105922p:plain

@dataset().store / sales/@{dataset().folderPath}

Dataset for ADLS

ADLS に Parquet をコピーするための Datasets を作成します。
Blob Storage の Datasets と同様に、Parameter を設定します。

f:id:TonyTonyKun:20210102145400p:plain

Linked service には、Synapse Analytics の Default Storage(ADLS)を指定し、file path でパイプラインから受け取った値を使って動的にコピー先パスを決定します。

f:id:TonyTonyKun:20210105110112p:plain

sales / @{dataset().store}/@{dataset().folderPath}

Pipeline を作成する

Copy Data で XML を Parquet に変換するパイプラインの完成イメージです。
f:id:TonyTonyKun:20210106190053p:plain

Parameter

Datasets と同様に、パイプライン の Parameter を設定します。

f:id:TonyTonyKun:20210102145604p:plain

Source

Blob Storage に配置されている XML を参照したいので、先ほど作成した Blob Storage 用の Datasets を指定します。
パイプラインの Parameter を Datasets の Parameter に引き渡します。

f:id:TonyTonyKun:20210102145855p:plain

"store": @pipeline().parameters.store,
"folderPath": @{formatDateTime(pipeline().parameters.windowStart,'yyyy')}/@{formatDateTime(pipeline().parameters.windowStart,'MM')}/@{formatDateTime(pipeline().parameters.windowStart,'dd')}

file path type では Wildcard file path を選択し、*.xml を指定します。

Sink

ADLS に Parquet に変換したファイルをコピーしたいので、先ほど作成した ADLS 用の Datasets を指定します。
Blob Storage の Datasets と同様に、パイプライン の Parameter を引き渡します。

f:id:TonyTonyKun:20210102150024p:plain

Mapping

import schema ボタンを押すと、XML からスキーマが読み込まれるので、Parquet に変換する際のデータ型を指定します。
XML スキーマが単一レコードの場合はこれだけで OK ですが、今回は1つの XML ファイルに複数レコードがあるので、Collection reference にもチェックを入れます。

f:id:TonyTonyKun:20210102150205p:plain

Validation

Validation Activity を使って、Blob Storage の XML ファイルの存在をチェックします。
フォルダだけでなく、ファイルの存在もチェックしたいので、Child Items には TRUE を指定します。それ以外の Datasets と Parameter は、同様に設定します。

f:id:TonyTonyKun:20210106190345p:plain

パイプラインを動作確認する

パイプラインの Trigger now から、Parameter を指定して実行してみます。

f:id:TonyTonyKun:20210105110605p:plain

パイプラインが成功すると、XML を Parquet に変換した売上データが ADLS にコピーされたことを確認できます。

f:id:TonyTonyKun:20210105110759p:plain

Trigger を作成する

最後に、日次実行するトリガーを作成して完了です。Tumbling window を 24 時間の間隔で実行します。

f:id:TonyTonyKun:20210102180049p:plain

トリガーの実行開始時刻を windowStart でパイプラインに引き渡します。トリガーを店舗ごとに作る想定なので、store を決め打ちしています。

f:id:TonyTonyKun:20210105110932p:plain
Schedule でも代用できますが、リトライポリシーを設定できる Tumbling window を選択しました。
docs.microsoft.com

まとめ

Azure Synapse Analytics を使って、売上分析プラットフォームの Ingest Layer を作ってみました。
Copy Data が XML に対応したので、解析するパーサーを作りこまなくても、ある程度の解析は可能です。
コピー元ファイルの存在チェックに Validation Activity を使いましたが、パイプライン自体はエラー終了と判断されます。正常終了として扱いたいケースもあるので、そのあたりはもう少し工夫が必要かと思います。

今回のソースコードは、こちらで公開しています。
github.com

Appendix

XML スキーマがもう少し複雑になり、record 要素に kind 属性が追加され、売上データの本体価格(amount)と消費税(tax)を分けるケースを見てみましょう。

<?xml version='1.0' encoding='UTF-8'?>
<dataset>
    <record kind = "amount">
        <id>1</id>
        <order_date>2021-01-01T00:00:00Z</order_date>
        <customer_code>A0001</customer_code>
        <product_code>KE-774</product_code>
        <unit_price>500.00</unit_price>
        <quantity>2</quantity>
    </record>
    <record kind = "tax">
        <id>2</id>
        <order_date>2021-01-01T00:00:00Z</order_date>
        <customer_code>A0001</customer_code>
        <product_code>KE-774</product_code>
        <consumption_tax>100.00</consumption_tax>
    </record>
</dataset>

このケースでは、Advanced editor に切り替えての対応が必要となります。
kind 属性は @kind という書式で読み込まれます。さらに、消費税の場合には consumption_tax 要素が必要なので、手動で追加します。

f:id:TonyTonyKun:20210102173547p:plain

このようにスキーマを定義すると、kind 属性ごとのレコードが Parquet に変換されたことを確認できます。

f:id:TonyTonyKun:20210102173557p:plain

2020 年振り返り

今年も、しばやんさんが作った 2020 年の人気記事ランキング生成 を使わせてもらい、1年を振り返ってみます。

  1. HttpClient を使って同期で通信する
  2. ASP.NET Web API で multipart / form-data を使ってファイルをアップロードする
  3. ASP.NET でクライアントの IP アドレスを取得する
  4. プロキシ環境下で Web API を呼び出す
  5. ASP.NET MVC の Ajax 通信で例外を処理する
  6. ASP.NET MVC で JSON の一部として PartialView を返す方法
  7. ASP.NET Web API で返す JSON のプロパティを指定する
  8. Azure Application Gateway V2 に App Service Certificate を使って HTTPS を構成する
  9. ASP.NET Web API を経由して Azure Blob Storage にアクセスする
  10. ファイルをアップロードする API の Swagger ドキュメントを書く

例年通り、ASP.NET 関連の記事が多く読まれているなかで、Azure の記事が1つだけラインクインしました。今年はブログを書く本数を少し増やしているので、来年のランキングには反映されてほしいところです。

イベント登壇

イベント登壇は、まだ新型コロナウイルスが流行していなかった1月の Ignite The Tour Osaka だけでした。会場は、東京で言うとビックサイトみたいな雰囲気でとにかく広く、参加者の時間と気持ちの余裕に繋がって、満足度の高いイベントでした。
gooner.hateblo.jp
登壇ではないですが、7月の de:code 2020 には Microsoft MVP パーソナルスポンサーとして参加し、Azure Pipelines でデプロイする .NET Core 向けの YAML ファイルのサンプルを公開しました。
初めてのオンライン開催で運営側にいくつか課題があったので、来年は改善されることを期待したいです。
gooner.hateblo.jp

コミュニティ運営

今年は JAZUG 10 周年だったので、例年よりも大きな規模で楽しいイベントをあれこれ企画していましたが、オンライン開催となりました。
Azure に興味をもって JAZUG に参加・運営するようになって人生が変わった人間のひとりなので、10 年という区切りは感慨深いです。
JAZUG に関わるたくさんの方々から祝辞を頂くことができました。いろいろと落ち着いたら、オフラインで会いたいですね。
jazug.connpass.com
正直なところ、コロナ禍でコミュニティへのモチベーションが下がっていたので、来年はオンラインの登壇などにもチャレンジして楽しんでいけたらと思います。

Microsoft MVP

3 月には、3 回目の Microsoft MVP Global Summit に参加しました。例年はシアトルの Microsoft 本社に行って参加していましたが、オンライン開催となりました。深夜に連日のセッション視聴は家族にも迷惑がかかりますし、すでに仕事がリモートワークになっていたので、箱根のホテルに 5 日間ほど滞在しながら参加することにしました。

f:id:TonyTonyKun:20201229211442j:plain
箱根湯本駅前の須雲川

孤独のグルメに登場するお店の宮ノ下のいろり家にも行きました。台風19号の影響で箱根登山鉄道が運休していたので、バスで移動しました。
ステーキ丼の美味しさは、旨さの箱根越えです。

f:id:TonyTonyKun:20201229211509j:plain
いろり家のステーキ丼

食レポみたいになってしまいましたが、温泉駆動のグローバルサミットもなかなか良かったです。
今年も Microsoft MVP for Microsoft Azure を無事に更新できたので、来年のオンライン開催もどこかの温泉から参加したい。次は一緒に行く人、誰か誘ってみようかなー

仕事

4 年間お世話になったアークウェイを退職し、ネクストスケープに転職しました。
これまでのプログラマ18年と技術コンサル4年で培った経験の集大成として、未来志向なお客さんのためにチームでモノを作れる仕事ができるように、動きやすい組織を作ったり、若手を育成したり、楽しくやれるといいなと思います。

コロナ禍での転職となりましたが、皆が暖かく迎えてくれましたし、一緒に働くことを楽しみしてもらえて嬉しかったです。ちょっと期待が高すぎるプレッシャーはありますが。そんな仲間たちとアドベントカレンダーを書いて、最後までバトンが繋げることができました。
qiita.com

退職の際にまとまった休みが取れたので、京都に行ってきました。こんなに人の少ない京都は初めてで、ゆったりと観光できて快適でした。
gooner.hateblo.jp

まとめ

新型コロナウイルスの影響で、これまでの生活スタイルが大きく変わりましたが、それなりに充実した1年でした。
早く安心して飲みに行ったり、旅行に行ったりできるようになってほしいですね。
本年もお世話になりました。よい年をお迎えください。

おまけ

2020 年自分が選ぶ今年の4枚。

f:id:TonyTonyKun:20200921192535j:plain
龍安寺の石庭
f:id:TonyTonyKun:20200921184341j:plain
渡月橋
f:id:TonyTonyKun:20200223110219j:plain
通天閣
f:id:TonyTonyKun:20201231100441j:plain
昇仙峡