YUKI Hiroshi
null+****@clear*****
Tue Jan 6 16:18:54 JST 2015
YUKI Hiroshi 2015-01-06 16:18:54 +0900 (Tue, 06 Jan 2015) New Revision: f3050b1238014a98afa1e9a930e2d7fdf2108f37 https://github.com/droonga/droonga-engine/commit/f3050b1238014a98afa1e9a930e2d7fdf2108f37 Message: Make EngineState independent from Cluster Conflicts: lib/droonga/dispatcher.rb Modified files: lib/droonga/dispatcher.rb lib/droonga/engine.rb lib/droonga/engine_state.rb lib/droonga/handler_messenger.rb Modified: lib/droonga/dispatcher.rb (+22 -13) =================================================================== --- lib/droonga/dispatcher.rb 2015-01-06 15:59:32 +0900 (38c928a) +++ lib/droonga/dispatcher.rb 2015-01-06 16:18:54 +0900 (f9aea4c) @@ -46,16 +46,21 @@ module Droonga end end - attr_reader :engine_state + attr_reader :engine_state, :cluster - def initialize(engine_state, catalog) + def initialize(engine_state, cluster, catalog) @engine_state = engine_state + @cluster = cluster @forwarder = @engine_state.forwarder + @cluster.on_change = lambda do + @forwarder.resume + end @replier = @engine_state.replier @catalog = catalog @adapter_runners = create_adapter_runners @farm = Farm.new(@engine_state.name, @catalog, @engine_state.loop, :engine_state => @engine_state, + :cluster => @cluster, :dispatcher => self, :forwarder => @forwarder) @collector_runners = create_collector_runners @@ -116,7 +121,7 @@ module Droonga def forward(message, destination) logger.trace("forward start") unless local?(destination) - return if @engine_state.cluster.forward(message, destination) + return if****@clust*****(message, destination) end @forwarder.forward(message, destination) logger.trace("forward done") @@ -163,7 +168,7 @@ module Droonga else steps = message["steps"] if steps - session_planner = SessionPlanner.new(@engine_state, steps) + session_planner = SessionPlanner.new(@engine_state, @cluster, steps) dataset = message["dataset"] || @message["dataset"] collector_runner = @collector_runners[dataset] session = session_planner.create_session(id, self, collector_runner) @@ -187,7 +192,7 @@ module Droonga "type" => "dispatcher", "to" => destination, } - @engine_state.cluster.forward(forward_message, forward_destination) || + @cluster.forward(forward_message, forward_destination) || @forwarder.forward(forward_message, forward_destination) end end @@ -200,13 +205,9 @@ module Droonga dataset =****@catal*****(step["dataset"]) if dataset if write_step?(step) - target_nodes = @engine_state.cluster.writable_nodes - logger.trace("dispatch_steps: target_nodes = writable_nodes", - :nodes => target_nodes) + target_nodes =****@clust*****_nodes else - target_nodes = @engine_state.cluster.forwardable_nodes - logger.trace("dispatch_steps: target_nodes = forwardable_nodes", - :nodes => target_nodes) + target_nodes =****@clust*****_nodes end routes = dataset.compute_routes(step, target_nodes) step["routes"] = routes @@ -318,8 +319,9 @@ module Droonga class SessionPlanner attr_reader :steps - def initialize(engine_state, steps) + def initialize(engine_state, cluster, steps) @engine_state = engine_state + @cluster = cluster @steps = steps end @@ -361,7 +363,7 @@ module Droonga (step["outputs"] || []).each do |output| descendants[output] = [] @descendants[output].each do |index| - responsive_routes = @engine_state.select_responsive_routes(step["routes"]) + responsive_routes = select_responsive_routes(step["routes"]) @steps[index]["n_of_expects"] += responsive_routes.size descendants[output].concat(@steps[index]["routes"]) end @@ -369,6 +371,13 @@ module Droonga step["descendants"] = descendants end end + + def select_responsive_routes(routes) + selected_nodes =****@clust*****_nodes + routes.select do |route| + selected_nodes.include?(@engine_state.farm_path(route)) + end + end end end end Modified: lib/droonga/engine.rb (+7 -5) =================================================================== --- lib/droonga/engine.rb 2015-01-06 15:59:32 +0900 (d5fb0da) +++ lib/droonga/engine.rb 2015-01-06 16:18:54 +0900 (06d4958) @@ -20,6 +20,7 @@ require "fileutils" require "droonga/engine/version" require "droonga/loggable" require "droonga/engine_state" +require "droonga/cluster" require "droonga/catalog_loader" require "droonga/dispatcher" require "droonga/file_observer" @@ -32,8 +33,9 @@ module Droonga attr_writer :on_ready def initialize(loop, name, internal_name) @state = EngineState.new(loop, name, internal_name) + @cluster = Cluster.new(loop) @catalog = load_catalog - @state.catalog = @catalog + @state.catalog =****@clust***** = @catalog @dispatcher = create_dispatcher @node_metadata_observer = FileObserver.new(loop, Path.node_metadata) @node_metadata_observer.on_change = lambda do @@ -50,7 +52,7 @@ module Droonga @on_ready.call if @on_ready end @state.start - @state.cluster.start_observe + @cluster.start_observe @node_metadata_observer.start @dispatcher.start logger.trace("start: done") @@ -58,7 +60,7 @@ module Droonga def stop_gracefully logger.trace("stop_gracefully: start") - @state.cluster.stop_observe + @cluster.stop_observe @node_metadata_observer.stop on_finish = lambda do logger.trace("stop_gracefully/on_finish: start") @@ -83,7 +85,7 @@ module Droonga def stop_immediately logger.trace("stop_immediately: start") save_last_processed_message_timestamp - @state.cluster.stop_observe + @cluster.stop_observe @node_metadata_observer.stop @dispatcher.stop_immediately @state.shutdown @@ -112,7 +114,7 @@ module Droonga end def create_dispatcher - Dispatcher.new(@state, @catalog) + Dispatcher.new(@state, @cluster, @catalog) end def save_last_processed_message_timestamp Modified: lib/droonga/engine_state.rb (+1 -18) =================================================================== --- lib/droonga/engine_state.rb 2015-01-06 15:59:32 +0900 (28a65e8) +++ lib/droonga/engine_state.rb 2015-01-06 16:18:54 +0900 (a454921) @@ -21,7 +21,6 @@ require "droonga/loggable" require "droonga/event_loop" require "droonga/buffered_forwarder" require "droonga/replier" -require "droonga/cluster" module Droonga class EngineState @@ -33,8 +32,7 @@ module Droonga attr_reader :forwarder attr_reader :replier attr_writer :on_ready - attr_reader :catalog - attr_reader :cluster + attr_accessor :catalog attr_accessor :on_finish def initialize(loop, name, internal_name) @@ -43,21 +41,13 @@ module Droonga @internal_name = internal_name @sessions = {} @current_id = 0 - @cluster = Cluster.new(@loop) @forwarder = Forwarder.new(@loop, :buffering => true) - @cluster.on_change = lambda do - @forwarder.resume - end @replier = Replier.new(@forwarder) @on_ready = nil @on_finish = nil @catalog = nil end - def catalog=(catalog) - @catalog =****@clust***** = catalog - end - def start logger.trace("start start") @forwarder.start @@ -118,13 +108,6 @@ module Droonga @on_ready.call if @on_ready end - def select_responsive_routes(routes) - selected_nodes =****@clust*****_nodes - routes.select do |route| - selected_nodes.include?(farm_path(route)) - end - end - private def log_tag "engine_state" Modified: lib/droonga/handler_messenger.rb (+2 -1) =================================================================== --- lib/droonga/handler_messenger.rb 2015-01-06 15:59:32 +0900 (a25d6c5) +++ lib/droonga/handler_messenger.rb 2015-01-06 16:18:54 +0900 (148e991) @@ -18,7 +18,7 @@ require "droonga/forwarder" module Droonga class HandlerMessenger - attr_reader :database_name, :dispatcher, :engine_state + attr_reader :database_name, :dispatcher, :engine_state, :cluster def initialize(forwarder, message, options={}) @forwarder = forwarder @@ -27,6 +27,7 @@ module Droonga @replier = Replier.new(@forwarder) @dispatcher = options[:dispatcher] @engine_state = options[:engine_state] + @cluster = options[:cluster] @database_name = options[:database] end -------------- next part -------------- HTML����������������������������...下載