ROMANCE DAWN for the new world

Microsoft Azure を中心とした技術情報を書いています。

Azure Service Bus のイベント駆動型メッセージ プログラミング モデル

クラウドデザインパターンを読んでいて、Pipes and Filters や Priority Queue で Azure Service Bus のキューを使用する際に、新しいプログラミング モデルが使われていることに気が付きました。イベント駆動型メッセージ プログラミング モデルと呼ばれており、Azure SDK 2.0(2013/04)で機能追加されていたようです。メッセージを受信するためのループを作るのではなく、プッシュ方式でメッセージを受信できるようになりました。

サービスバスの作成

事前準備として、Azure 管理ポータルでサービスバスのキューを作成しておきます。サービスバスの名前空間とキューの名前を任意で指定し、それ以外はデフォルトのままで構いません。キューは、コードでも作成できますが、今回は管理ポータルで作成することにします。

プロジェクトの作成

Visual Studio 2013 で、Cloud Services のプロジェクトを作成します。送信用の Web Role と受信用の Worker Role を追加します。ASP.NET Web API でキューを送信するので、テンプレートには「Web API」を選択します。それぞれのプロジェクトに、ServiceBus SDK の NuGet パッケージをインストールします。

  • Install-Package WindowsAzure.ServiceBus

サービスバスの接続情報を設定

Web Role と Worker Role のプロジェクトに、サービスバスの接続情報を設定します。

  • ServiceBusConnectionString・・・管理ポータルから取得できる接続文字列
  • QueueName・・・キューの名前

queue

受信用の Worker Role

受信ループを作るとか、数秒待機するとか、Complete や Abandon を操作するとか面倒な実装が必要ないだけでなく、同時に複数のメッセージを処理できるようもなりました。

#WorkerRole.cs
public class WorkerRole : RoleEntryPoint
{
    private ManualResetEvent completedEvent = new ManualResetEvent(false);
    private QueueClient client;
 
    public override void Run()
    {
        var options = new OnMessageOptions();
        options.AutoComplete = true;        // 自分でCompleteを呼ばない
        options.MaxConcurrentCalls = 10;    // 同時に処理するメッセージ数
        options.ExceptionReceived += options_ExceptionReceived;
 
        this.client.OnMessageAsync(
             async (msg) =>
             {
                 // 繰り返し処理しても完了できなかったら、メッセージを削除
                 if (msg.DeliveryCount > 10)
                 {
                     await msg.DeadLetterAsync();
                     Trace.TraceWarning("Maximum Message Count Exceeded: {0} for MessageID: {1} ", 10, msg.MessageId);
                     return;
                 }
 
                // メッセージを受信して、何かしらの処理を実行
                await Task.Delay(TimeSpan.FromSeconds(2));
                Trace.TraceInformation("Recived Message ID : {0} Body : {1}", msg.MessageId, msg.GetBody<string>());
             },
             options);
 
        this.completedEvent.WaitOne();
    }
 
    void options_ExceptionReceived(object sender, ExceptionReceivedEventArgs e)
    {
        var exceptionMessage = "null";
        if (e != null && e.Exception != null)
        {
            exceptionMessage = e.Exception.Message;
            Trace.TraceError("Exception in QueueClient.ExceptionReceived: {0}", exceptionMessage);
        }
    }
 
    public override bool OnStart()
    {
        ServicePointManager.DefaultConnectionLimit = 12;
 
        var connectionString = CloudConfigurationManager.GetSetting("ServiceBusConnectionString");
        var queueName = CloudConfigurationManager.GetSetting("QueueName");
        this.client = QueueClient.CreateFromConnectionString(connectionString, queueName);
 
        return base.OnStart();
    }
 
    public override void OnStop()
    {
        this.client.Close();
        this.completedEvent.Set();
        base.OnStop();
    }
}

QueueClient クラスの OnMessageAsync() メソッドの引数コールバックが、キューのメッセージを受信されたときに実行されます。ManualResetEvent クラスでメインスレッドを待機させてるところがポイントです。

送信用のWeb Role

ASP.NET Web API の Controller クラスを作り、Get メソッドに送信ロジックを実装します。ここは、従来のプログラミング モデルと一緒です。

#SendController.cs
public class SendController : ApiController
{
    private QueueClient client;
 
    public SendController()
    {
        var connectionString = CloudConfigurationManager.GetSetting("ServiceBusConnectionString");
        var queueName = CloudConfigurationManager.GetSetting("QueueName");
        this.client = QueueClient.CreateFromConnectionString(connectionString, queueName);
    }
 
    public async Task<string> Get()
    {
        var message = new BrokeredMessage("Gooner");
        message.MessageId = Guid.NewGuid().ToString();
        await this.client.SendAsync(message);
        return "OK";
    }
 
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            this.client.Close();
        }
        base.Dispose(disposing);
    }
}

まとめ

受信ループとか作らないスッキリとしたコードで、プッシュ方式によるメッセージを受信できるのはいい感じです。Topics / Subscription の場合は、QueueClient を TopicsClient と SubscriptionClient に置き換えるぐらいで、大きな違いはありません。クラウドデザインパターンの Pipes and Filters や Priority Queue では、ServiceBusPipeFilter や PriorityWorkerRole クラスを作ってロジックを共通化しています。

Azure の Queue は、Storage にもありますが、Service Bus の方が便利な機能が多いので、Web Role と Worker Role をシンプルに繋ぐだけというシナリオを除いては、Service Bus を使ったほうが良さそうです。両者の機能比較は、こちらのサイトにまとまっています。