クラウドデザインパターンを読んでいて、Pipes and Filters や Priority Queue で Azure Service Bus のキューを使用する際に、新しいプログラミング モデルが使われていることに気が付きました。イベント駆動型メッセージ プログラミング モデルと呼ばれており、Azure SDK 2.0(2013/04)で機能追加されていたようです。メッセージを受信するためのループを作るのではなく、プッシュ方式でメッセージを受信できるようになりました。
サービスバスの作成
事前準備として、Azure 管理ポータルでサービスバスのキューを作成しておきます。サービスバスの名前空間とキューの名前を任意で指定し、それ以外はデフォルトのままで構いません。キューは、コードでも作成できますが、今回は管理ポータルで作成することにします。
プロジェクトの作成
Visual Studio 2013 で、Cloud Services のプロジェクトを作成します。送信用の Web Role と受信用の Worker Role を追加します。ASP.NET Web API でキューを送信するので、テンプレートには「Web API」を選択します。それぞれのプロジェクトに、ServiceBus SDK の NuGet パッケージをインストールします。
- Install-Package WindowsAzure.ServiceBus
サービスバスの接続情報を設定
Web Role と Worker Role のプロジェクトに、サービスバスの接続情報を設定します。
- ServiceBusConnectionString・・・管理ポータルから取得できる接続文字列
- QueueName・・・キューの名前
受信用の Worker Role
受信ループを作るとか、数秒待機するとか、Complete や Abandon を操作するとか面倒な実装が必要ないだけでなく、同時に複数のメッセージを処理できるようもなりました。
#WorkerRole.cs public class WorkerRole : RoleEntryPoint { private ManualResetEvent completedEvent = new ManualResetEvent(false); private QueueClient client; public override void Run() { var options = new OnMessageOptions(); options.AutoComplete = true; // 自分でCompleteを呼ばない options.MaxConcurrentCalls = 10; // 同時に処理するメッセージ数 options.ExceptionReceived += options_ExceptionReceived; this.client.OnMessageAsync( async (msg) => { // 繰り返し処理しても完了できなかったら、メッセージを削除 if (msg.DeliveryCount > 10) { await msg.DeadLetterAsync(); Trace.TraceWarning("Maximum Message Count Exceeded: {0} for MessageID: {1} ", 10, msg.MessageId); return; } // メッセージを受信して、何かしらの処理を実行 await Task.Delay(TimeSpan.FromSeconds(2)); Trace.TraceInformation("Recived Message ID : {0} Body : {1}", msg.MessageId, msg.GetBody<string>()); }, options); this.completedEvent.WaitOne(); } void options_ExceptionReceived(object sender, ExceptionReceivedEventArgs e) { var exceptionMessage = "null"; if (e != null && e.Exception != null) { exceptionMessage = e.Exception.Message; Trace.TraceError("Exception in QueueClient.ExceptionReceived: {0}", exceptionMessage); } } public override bool OnStart() { ServicePointManager.DefaultConnectionLimit = 12; var connectionString = CloudConfigurationManager.GetSetting("ServiceBusConnectionString"); var queueName = CloudConfigurationManager.GetSetting("QueueName"); this.client = QueueClient.CreateFromConnectionString(connectionString, queueName); return base.OnStart(); } public override void OnStop() { this.client.Close(); this.completedEvent.Set(); base.OnStop(); } }
QueueClient クラスの OnMessageAsync() メソッドの引数コールバックが、キューのメッセージを受信されたときに実行されます。ManualResetEvent クラスでメインスレッドを待機させてるところがポイントです。
送信用のWeb Role
ASP.NET Web API の Controller クラスを作り、Get メソッドに送信ロジックを実装します。ここは、従来のプログラミング モデルと一緒です。
#SendController.cs public class SendController : ApiController { private QueueClient client; public SendController() { var connectionString = CloudConfigurationManager.GetSetting("ServiceBusConnectionString"); var queueName = CloudConfigurationManager.GetSetting("QueueName"); this.client = QueueClient.CreateFromConnectionString(connectionString, queueName); } public async Task<string> Get() { var message = new BrokeredMessage("Gooner"); message.MessageId = Guid.NewGuid().ToString(); await this.client.SendAsync(message); return "OK"; } protected override void Dispose(bool disposing) { if (disposing) { this.client.Close(); } base.Dispose(disposing); } }
まとめ
受信ループとか作らないスッキリとしたコードで、プッシュ方式によるメッセージを受信できるのはいい感じです。Topics / Subscription の場合は、QueueClient を TopicsClient と SubscriptionClient に置き換えるぐらいで、大きな違いはありません。クラウドデザインパターンの Pipes and Filters や Priority Queue では、ServiceBusPipeFilter や PriorityWorkerRole クラスを作ってロジックを共通化しています。
Azure の Queue は、Storage にもありますが、Service Bus の方が便利な機能が多いので、Web Role と Worker Role をシンプルに繋ぐだけというシナリオを除いては、Service Bus を使ったほうが良さそうです。両者の機能比較は、こちらのサイトにまとまっています。