nimbus (1.2.4) | 2018-01-25 20:02 |
nimbus-sample (1.2.4) | 2018-01-26 17:06 |
アプリケーション開発において、非同期処理が必要になるケースがあります。非同期処理では、処理を依頼する側と処理を実行する側が存在し、その間にはキューが必要となります。
キュー機能を抽象化したインタフェースがQueueです。
Queue経由で渡された処理依頼を、Queueの後ろで待ち受け、非同期で処理を行う機能が必要になります。
そのような非同期処理コンテナ機能を抽象化したインタフェースがQueueHandlerContainerです。
また、非同期処理コンテナ上で「任意の処理を行う」機能を抽象化したインタフェースが、QueueHandlerです。
Queueの実装には、性能分散のために、分流を行うものが存在します。
投入されたオブジェクトを一定のルールで分流を行う必要があります。
このようなQueueの分流を抽象化したインタフェースがDistributedQueueSelectorです。
このインタフェースは、アプリケーション向けではなく、分流Queue実装向けです。
関連するパッケージは、以下です。
アプリケーション向けインタフェースQueueを使った簡単なアプリケーションのサンプルを示します。
- import jp.ossc.nimbus.core.ServiceManagerFactory;
- import jp.ossc.nimbus.service.queue.Queue;
- // Queueを取得
- final Queue queue = (Queue)ServiceManagerFactory.getServiceObject("Queue");
- // Queueから引き抜くスレッドを作成する
- Thread getterThread = new Thread(new Runnable(){
- public void run(){
- for(int i = 0; i < 10; i++){
- Object obj = queue.get();
- System.out.println(obj);
- }
- }
- }, "Getter Thread");
- // Queueから引き抜くスレッドを開始する
- getterThread.start();
- // Queueに詰める
- for(int i = 1; i <= 10; i++){
- queue.push(new Integer(i));
- }
- // Queueから引き抜き終わるまで待機する
- getterThread.join();
実装サービスの一覧は以下のとおりです。
実装サービス | 実装概要 |
jp.ossc.nimbus.service.queue.DefaultQueueService | Queueのデフォルト実装サービス |
jp.ossc.nimbus.service.queue.DelayQueueService | 引き抜きを遅延させるQueue実装サービス |
jp.ossc.nimbus.service.queue.DistributedQueueService | 内部で分流して性能分散を行うQueue実装サービス |
jp.ossc.nimbus.service.queue.ThreadLocalQueueService | スレッド単位でエントリを投入/引き抜きできるQueue実装サービス |
jp.ossc.nimbus.service.queue.SharedQueueService | 複数のJVM間でエントリを共有して投入/引き抜きできるQueue実装サービス |
アプリケーション向けインタフェースQueueHandlerContainerは、Queueインタフェースを継承しています。
但し、投入専用となっており、外部から引き抜く事はできません。引き抜きは、QueueHandlerContainer内部のスレッドによって行われ、QueueHandlerContainerに設定されたQueueHandlerによって処理されます。
以下に、QueueHandlerContainerを使った非同期処理を行うアプリケーションのサンプルを示します。
- import jp.ossc.nimbus.core.ServiceManagerFactory;
- import jp.ossc.nimbus.service.queue.QueueHandlerContainer;
- import jp.ossc.nimbus.service.queue.AsynchContext;
- // QueueHandlerContainerを取得
- final QueueHandlerContainer container = (QueueHandlerContainer)ServiceManagerFactory.getServiceObject("QueueHandlerContainer");
- // QueueHandlerContainerに非同期処理要求(応答なし)を投入する
- for(int i = 1; i <= 10; i++){
- container.push(new AsynchContext(new Integer(i)));
- }
- // 応答Queueを取得
- // 通常、処理毎に使い捨ての応答Queueが必要なので、サービス定義でinstance="factory"を宣言しておく
- final Queue responseQueue = (Queue)ServiceManagerFactory.getServiceObject("ResponseQueue");
- // QueueHandlerContainerに非同期処理要求(応答あり)を投入する
- for(int i = 1; i <= 10; i++){
- container.push(new AsynchContext(new Integer(i), responseQueue));
- }
- // 要求した回数分、応答待ちをする
- for(int i = 1; i <= 10; i++){
- // 各要求毎に1秒まで応答待ちをする
- // タイムアウトした場合は、nullが返る
- AsynchContext context = (AsynchContext)responseQueue.get(1000l);
- if(context != null){
- // 非同期処理で例外が発生していないかチェックする
- // 例外が発生している場合は、発生した例外がthrowされる
- context.checkError();
- // 応答を取得する
- Object output = context.getOutput();
- System.out.println(output);
- }
- }
実装サービスの一覧は以下のとおりです。
QueueHandlerContainer向けインタフェースQueueHandlerは、QueueHandlerContainerに投入された非同期処理要求を処理します。
以下に、非同期処理要求を処理するQueueHandlerの実装例を示します。
- import jp.ossc.nimbus.core.ServiceBase;
- import jp.ossc.nimbus.service.queue.QueueHandler;
- public class SampleQueueHandlerService extends ServiceBase implements QueueHandler{
- public void handleDequeuedObject(Object obj) throws Throwable{
- // Queueからエントリを取り出すと、呼び出される
- if(obj == null){
- return;
- }
- AsynchContext context = (AsynchContext)obj;
- System.out.println(Thread.currentThread().getName() + " : " + context.getInput());
- // 応答を返す場合は、AsynchContextに応答を設定して、応答QueueにAsynchContextを投入する
- if(context.getResponseQueue() != null){
- context.setOutput(Thread.currentThread().getName());
- context.getResponseQueue().push(context);
- }
- }
- public boolean handleError(Object obj, Throwable th) throws Throwable{
- // handleDequeuedObject(Object)で例外がthrowされると呼び出される
- getLogger().write("WARN", "Error occurred in " + Thread.currentThread().getName() + " : " + obj, th);
- // trueを返すと、再度handleDequeuedObject(Object)が呼び出される
- // falseを返すと、終了する
- return true;
- }
- public void handleRetryOver(Object obj, Throwable th) throws Throwable{
- // handleDequeuedObject(Object)で例外をthrowされ、リトライ回数を越えていると呼び出される
- AsynchContext context = (AsynchContext)obj;
- getLogger().write("ERROR", "Fatal error occurred in " + Thread.currentThread().getName() + " : " + obj, th);
- // 応答を返す場合は、AsynchContextに例外を設定して、応答QueueにAsynchContextを投入する
- if(context.getResponseQueue() != null){
- context.setThrowable(th);
- context.getResponseQueue().push(context);
- }
- }
- }
実装サービスの一覧は以下のとおりです。
実装サービス | 実装概要 |
jp.ossc.nimbus.service.queue.BeanFlowInvokerCallQueueHandlerService | 非同期処理を業務フローで処理する |
DistributedQueueSelectorは、分流Queue実装向けのインタフェースです。
以下の分流Queue実装から使用されます。
実装サービスの一覧は以下のとおりです。
サンプルは、以下。