YUKI Hiroshi
null+****@clear*****
Mon Feb 3 12:12:53 JST 2014
YUKI Hiroshi 2014-02-03 12:12:53 +0900 (Mon, 03 Feb 2014) New Revision: 634fff3db02d77258688d484fa72a9a6b7cb447c https://github.com/droonga/fluent-plugin-droonga/commit/634fff3db02d77258688d484fa72a9a6b7cb447c Message: Define utility methods to distribute "broadcast" and "scatter" messages easily Modified files: lib/droonga/distributor_plugin.rb lib/droonga/plugin/distributor/crud.rb lib/droonga/plugin/distributor/groonga.rb lib/droonga/plugin/distributor/watch.rb Modified: lib/droonga/distributor_plugin.rb (+17 -0) =================================================================== --- lib/droonga/distributor_plugin.rb 2014-02-03 12:12:32 +0900 (b5c149d) +++ lib/droonga/distributor_plugin.rb 2014-02-03 12:12:53 +0900 (db6d94d) @@ -31,6 +31,23 @@ module Droonga @distributor.distribute(messages) end + def scatter(message, options={}) + planner = DistributedCommandPlanner.new(message) + planner.scatter + planner.key = options[:key] + planner.reduce(options[:reduce]) + planner.plan + distribute(planner.messages) + end + + def broadcast(message, options={}) + planner = DistributedCommandPlanner.new(message) + planner.broadcast(:write => options[:write]) + planner.reduce(options[:reduce]) + planner.plan + distribute(planner.messages) + end + private def process_error(command, error, arguments) if error.is_a?(MessageProcessingError) Modified: lib/droonga/plugin/distributor/crud.rb (+7 -10) =================================================================== --- lib/droonga/plugin/distributor/crud.rb 2014-02-03 12:12:32 +0900 (e910e92) +++ lib/droonga/plugin/distributor/crud.rb 2014-02-03 12:12:53 +0900 (f1efe14) @@ -23,28 +23,25 @@ module Droonga command :add def add(message) - scatter_all(message) + scatter(message) end command :update def update(message) - scatter_all(message) + scatter(message) end # TODO: What is this? command :reset def reset(message) - scatter_all(message) + scatter(message) end private - def scatter_all(message) - planner = DistributedCommandPlanner.new(message) - planner.key = message["body"]["key"] || rand.to_s - planner.scatter - planner.reduce("success", "type" => "and") - planner.plan - distribute(planner.messages) + def scatter(message) + super(message, + :key => message["body"]["key"] || rand.to_s, + :reduce => { "success" => "type" => "and" }) end end end Modified: lib/droonga/plugin/distributor/groonga.rb (+7 -9) =================================================================== --- lib/droonga/plugin/distributor/groonga.rb 2014-02-03 12:12:32 +0900 (e41e76b) +++ lib/droonga/plugin/distributor/groonga.rb 2014-02-03 12:12:53 +0900 (6579060) @@ -26,7 +26,7 @@ module Droonga unless message["dataset"] raise "dataset must be set. FIXME: This error should return client." end - broadcast_all(message) + broadcast(message) end command :table_remove @@ -34,21 +34,19 @@ module Droonga unless message["dataset"] raise "dataset must be set. FIXME: This error should return client." end - broadcast_all(message) + broadcast(message) end command :column_create def column_create(message) - broadcast_all(message) + broadcast(message) end private - def broadcast_all(message) - planner = DistributedCommandPlanner.new(message) - planner.broadcast(:write => true) - planner.reduce("result", "type" => "or") - planner.plan - distribute(planner.messages) + def broadcast(message) + super(message, + :write => true, + :reduce => { "result" => "type" => "or" }) end end end Modified: lib/droonga/plugin/distributor/watch.rb (+8 -10) =================================================================== --- lib/droonga/plugin/distributor/watch.rb 2014-02-03 12:12:32 +0900 (19a0894) +++ lib/droonga/plugin/distributor/watch.rb 2014-02-03 12:12:53 +0900 (76d4c25) @@ -23,31 +23,29 @@ module Droonga command "watch.feed" => :feed def feed(message) - broadcast_all(message) + broadcast(message) end command "watch.subscribe" => :subscribe def subscribe(message) - broadcast_all(message) + broadcast(message) end command "watch.unsubscribe" => :unsubscribe def unsubscribe(message) - broadcast_all(message) + broadcast(message) end command "watch.sweep" => :sweep def sweep(message) - broadcast_all(message) + broadcast(message) end private - def broadcast_all(message) - planner = DistributedCommandPlanner.new(message) - planner.broadcast(:write => true) - planner.reduce("success", "type" => "or") - planner.plan - distribute(planner.messages) + def broadcast(message) + super(message, + :write => true, + :reduce => { "success" => "type" => "and" }) end end end -------------- next part -------------- HTML����������������������������... 下載