[Groonga-commit] droonga/fluent-plugin-droonga at 634fff3 [master] Define utility methods to distribute "broadcast" and "scatter" messages easily

Back to archive index

YUKI Hiroshi null+****@clear*****
Mon Feb 3 12:12:53 JST 2014


YUKI Hiroshi	2014-02-03 12:12:53 +0900 (Mon, 03 Feb 2014)

  New Revision: 634fff3db02d77258688d484fa72a9a6b7cb447c
  https://github.com/droonga/fluent-plugin-droonga/commit/634fff3db02d77258688d484fa72a9a6b7cb447c

  Message:
    Define utility methods to distribute "broadcast" and "scatter" messages easily

  Modified files:
    lib/droonga/distributor_plugin.rb
    lib/droonga/plugin/distributor/crud.rb
    lib/droonga/plugin/distributor/groonga.rb
    lib/droonga/plugin/distributor/watch.rb

  Modified: lib/droonga/distributor_plugin.rb (+17 -0)
===================================================================
--- lib/droonga/distributor_plugin.rb    2014-02-03 12:12:32 +0900 (b5c149d)
+++ lib/droonga/distributor_plugin.rb    2014-02-03 12:12:53 +0900 (db6d94d)
@@ -31,6 +31,23 @@ module Droonga
       @distributor.distribute(messages)
     end
 
+    def scatter(message, options={})
+      planner = DistributedCommandPlanner.new(message)
+      planner.scatter
+      planner.key = options[:key]
+      planner.reduce(options[:reduce])
+      planner.plan
+      distribute(planner.messages)
+    end
+
+    def broadcast(message, options={})
+      planner = DistributedCommandPlanner.new(message)
+      planner.broadcast(:write => options[:write])
+      planner.reduce(options[:reduce])
+      planner.plan
+      distribute(planner.messages)
+    end
+
     private
     def process_error(command, error, arguments)
       if error.is_a?(MessageProcessingError)

  Modified: lib/droonga/plugin/distributor/crud.rb (+7 -10)
===================================================================
--- lib/droonga/plugin/distributor/crud.rb    2014-02-03 12:12:32 +0900 (e910e92)
+++ lib/droonga/plugin/distributor/crud.rb    2014-02-03 12:12:53 +0900 (f1efe14)
@@ -23,28 +23,25 @@ module Droonga
 
     command :add
     def add(message)
-      scatter_all(message)
+      scatter(message)
     end
 
     command :update
     def update(message)
-      scatter_all(message)
+      scatter(message)
     end
 
     # TODO: What is this?
     command :reset
     def reset(message)
-      scatter_all(message)
+      scatter(message)
     end
 
     private
-    def scatter_all(message)
-      planner = DistributedCommandPlanner.new(message)
-      planner.key = message["body"]["key"] || rand.to_s
-      planner.scatter
-      planner.reduce("success", "type" => "and")
-      planner.plan
-      distribute(planner.messages)
+    def scatter(message)
+      super(message,
+            :key => message["body"]["key"] || rand.to_s,
+            :reduce => { "success" => "type" => "and" })
     end
   end
 end

  Modified: lib/droonga/plugin/distributor/groonga.rb (+7 -9)
===================================================================
--- lib/droonga/plugin/distributor/groonga.rb    2014-02-03 12:12:32 +0900 (e41e76b)
+++ lib/droonga/plugin/distributor/groonga.rb    2014-02-03 12:12:53 +0900 (6579060)
@@ -26,7 +26,7 @@ module Droonga
       unless message["dataset"]
         raise "dataset must be set. FIXME: This error should return client."
       end
-      broadcast_all(message)
+      broadcast(message)
     end
 
     command :table_remove
@@ -34,21 +34,19 @@ module Droonga
       unless message["dataset"]
         raise "dataset must be set. FIXME: This error should return client."
       end
-      broadcast_all(message)
+      broadcast(message)
     end
 
     command :column_create
     def column_create(message)
-      broadcast_all(message)
+      broadcast(message)
     end
 
     private
-    def broadcast_all(message)
-      planner = DistributedCommandPlanner.new(message)
-      planner.broadcast(:write => true)
-      planner.reduce("result", "type" => "or")
-      planner.plan
-      distribute(planner.messages)
+    def broadcast(message)
+      super(message,
+            :write => true,
+            :reduce => { "result" => "type" => "or" })
     end
   end
 end

  Modified: lib/droonga/plugin/distributor/watch.rb (+8 -10)
===================================================================
--- lib/droonga/plugin/distributor/watch.rb    2014-02-03 12:12:32 +0900 (19a0894)
+++ lib/droonga/plugin/distributor/watch.rb    2014-02-03 12:12:53 +0900 (76d4c25)
@@ -23,31 +23,29 @@ module Droonga
 
     command "watch.feed" => :feed
     def feed(message)
-      broadcast_all(message)
+      broadcast(message)
     end
 
     command "watch.subscribe" => :subscribe
     def subscribe(message)
-      broadcast_all(message)
+      broadcast(message)
     end
 
     command "watch.unsubscribe" => :unsubscribe
     def unsubscribe(message)
-      broadcast_all(message)
+      broadcast(message)
     end
 
     command "watch.sweep" => :sweep
     def sweep(message)
-      broadcast_all(message)
+      broadcast(message)
     end
 
     private
-    def broadcast_all(message)
-      planner = DistributedCommandPlanner.new(message)
-      planner.broadcast(:write => true)
-      planner.reduce("success", "type" => "or")
-      planner.plan
-      distribute(planner.messages)
+    def broadcast(message)
+      super(message,
+            :write => true,
+            :reduce => { "success" => "type" => "and" })
     end
   end
 end
-------------- next part --------------
HTML����������������������������...
下載 



More information about the Groonga-commit mailing list
Back to archive index