[Groonga-commit] droonga/droonga-engine at f3050b1 [buffered-forward] Make EngineState independent from Cluster

Back to archive index

YUKI Hiroshi null+****@clear*****
Tue Jan 6 16:18:54 JST 2015


YUKI Hiroshi	2015-01-06 16:18:54 +0900 (Tue, 06 Jan 2015)

  New Revision: f3050b1238014a98afa1e9a930e2d7fdf2108f37
  https://github.com/droonga/droonga-engine/commit/f3050b1238014a98afa1e9a930e2d7fdf2108f37

  Message:
    Make EngineState independent from Cluster
    
    Conflicts:
    	lib/droonga/dispatcher.rb

  Modified files:
    lib/droonga/dispatcher.rb
    lib/droonga/engine.rb
    lib/droonga/engine_state.rb
    lib/droonga/handler_messenger.rb

  Modified: lib/droonga/dispatcher.rb (+22 -13)
===================================================================
--- lib/droonga/dispatcher.rb    2015-01-06 15:59:32 +0900 (38c928a)
+++ lib/droonga/dispatcher.rb    2015-01-06 16:18:54 +0900 (f9aea4c)
@@ -46,16 +46,21 @@ module Droonga
       end
     end
 
-    attr_reader :engine_state
+    attr_reader :engine_state, :cluster
 
-    def initialize(engine_state, catalog)
+    def initialize(engine_state, cluster, catalog)
       @engine_state = engine_state
+      @cluster = cluster
       @forwarder = @engine_state.forwarder
+      @cluster.on_change = lambda do
+        @forwarder.resume
+      end
       @replier = @engine_state.replier
       @catalog = catalog
       @adapter_runners = create_adapter_runners
       @farm = Farm.new(@engine_state.name, @catalog, @engine_state.loop,
                        :engine_state => @engine_state,
+                       :cluster => @cluster,
                        :dispatcher => self,
                        :forwarder  => @forwarder)
       @collector_runners = create_collector_runners
@@ -116,7 +121,7 @@ module Droonga
     def forward(message, destination)
       logger.trace("forward start")
       unless local?(destination)
-        return if @engine_state.cluster.forward(message, destination)
+        return if****@clust*****(message, destination)
       end
       @forwarder.forward(message, destination)
       logger.trace("forward done")
@@ -163,7 +168,7 @@ module Droonga
       else
         steps = message["steps"]
         if steps
-          session_planner = SessionPlanner.new(@engine_state, steps)
+          session_planner = SessionPlanner.new(@engine_state, @cluster, steps)
           dataset = message["dataset"] || @message["dataset"]
           collector_runner = @collector_runners[dataset]
           session = session_planner.create_session(id, self, collector_runner)
@@ -187,7 +192,7 @@ module Droonga
           "type" => "dispatcher",
           "to"   => destination,
         }
-        @engine_state.cluster.forward(forward_message, forward_destination) ||
+        @cluster.forward(forward_message, forward_destination) ||
           @forwarder.forward(forward_message, forward_destination)
       end
     end
@@ -200,13 +205,9 @@ module Droonga
         dataset =****@catal*****(step["dataset"])
         if dataset
           if write_step?(step)
-            target_nodes = @engine_state.cluster.writable_nodes
-            logger.trace("dispatch_steps: target_nodes = writable_nodes",
-                         :nodes => target_nodes)
+            target_nodes =****@clust*****_nodes
           else
-            target_nodes = @engine_state.cluster.forwardable_nodes
-            logger.trace("dispatch_steps: target_nodes = forwardable_nodes",
-                         :nodes => target_nodes)
+            target_nodes =****@clust*****_nodes
           end
           routes = dataset.compute_routes(step, target_nodes)
           step["routes"] = routes
@@ -318,8 +319,9 @@ module Droonga
     class SessionPlanner
       attr_reader :steps
 
-      def initialize(engine_state, steps)
+      def initialize(engine_state, cluster, steps)
         @engine_state = engine_state
+        @cluster = cluster
         @steps = steps
       end
 
@@ -361,7 +363,7 @@ module Droonga
           (step["outputs"] || []).each do |output|
             descendants[output] = []
             @descendants[output].each do |index|
-              responsive_routes = @engine_state.select_responsive_routes(step["routes"])
+              responsive_routes = select_responsive_routes(step["routes"])
               @steps[index]["n_of_expects"] += responsive_routes.size
               descendants[output].concat(@steps[index]["routes"])
             end
@@ -369,6 +371,13 @@ module Droonga
           step["descendants"] = descendants
         end
       end
+
+      def select_responsive_routes(routes)
+        selected_nodes =****@clust*****_nodes
+        routes.select do |route|
+          selected_nodes.include?(@engine_state.farm_path(route))
+        end
+      end
     end
   end
 end

  Modified: lib/droonga/engine.rb (+7 -5)
===================================================================
--- lib/droonga/engine.rb    2015-01-06 15:59:32 +0900 (d5fb0da)
+++ lib/droonga/engine.rb    2015-01-06 16:18:54 +0900 (06d4958)
@@ -20,6 +20,7 @@ require "fileutils"
 require "droonga/engine/version"
 require "droonga/loggable"
 require "droonga/engine_state"
+require "droonga/cluster"
 require "droonga/catalog_loader"
 require "droonga/dispatcher"
 require "droonga/file_observer"
@@ -32,8 +33,9 @@ module Droonga
     attr_writer :on_ready
     def initialize(loop, name, internal_name)
       @state = EngineState.new(loop, name, internal_name)
+      @cluster = Cluster.new(loop)
       @catalog = load_catalog
-      @state.catalog = @catalog
+      @state.catalog =****@clust***** = @catalog
       @dispatcher = create_dispatcher
       @node_metadata_observer = FileObserver.new(loop, Path.node_metadata)
       @node_metadata_observer.on_change = lambda do
@@ -50,7 +52,7 @@ module Droonga
         @on_ready.call if @on_ready
       end
       @state.start
-      @state.cluster.start_observe
+      @cluster.start_observe
       @node_metadata_observer.start
       @dispatcher.start
       logger.trace("start: done")
@@ -58,7 +60,7 @@ module Droonga
 
     def stop_gracefully
       logger.trace("stop_gracefully: start")
-      @state.cluster.stop_observe
+      @cluster.stop_observe
       @node_metadata_observer.stop
       on_finish = lambda do
         logger.trace("stop_gracefully/on_finish: start")
@@ -83,7 +85,7 @@ module Droonga
     def stop_immediately
       logger.trace("stop_immediately: start")
       save_last_processed_message_timestamp
-      @state.cluster.stop_observe
+      @cluster.stop_observe
       @node_metadata_observer.stop
       @dispatcher.stop_immediately
       @state.shutdown
@@ -112,7 +114,7 @@ module Droonga
     end
 
     def create_dispatcher
-      Dispatcher.new(@state, @catalog)
+      Dispatcher.new(@state, @cluster, @catalog)
     end
 
     def save_last_processed_message_timestamp

  Modified: lib/droonga/engine_state.rb (+1 -18)
===================================================================
--- lib/droonga/engine_state.rb    2015-01-06 15:59:32 +0900 (28a65e8)
+++ lib/droonga/engine_state.rb    2015-01-06 16:18:54 +0900 (a454921)
@@ -21,7 +21,6 @@ require "droonga/loggable"
 require "droonga/event_loop"
 require "droonga/buffered_forwarder"
 require "droonga/replier"
-require "droonga/cluster"
 
 module Droonga
   class EngineState
@@ -33,8 +32,7 @@ module Droonga
     attr_reader :forwarder
     attr_reader :replier
     attr_writer :on_ready
-    attr_reader :catalog
-    attr_reader :cluster
+    attr_accessor :catalog
     attr_accessor :on_finish
 
     def initialize(loop, name, internal_name)
@@ -43,21 +41,13 @@ module Droonga
       @internal_name = internal_name
       @sessions = {}
       @current_id = 0
-      @cluster = Cluster.new(@loop)
       @forwarder = Forwarder.new(@loop, :buffering => true)
-      @cluster.on_change = lambda do
-        @forwarder.resume
-      end
       @replier = Replier.new(@forwarder)
       @on_ready = nil
       @on_finish = nil
       @catalog = nil
     end
 
-    def catalog=(catalog)
-      @catalog =****@clust***** = catalog
-    end
-
     def start
       logger.trace("start start")
       @forwarder.start
@@ -118,13 +108,6 @@ module Droonga
       @on_ready.call if @on_ready
     end
 
-    def select_responsive_routes(routes)
-      selected_nodes =****@clust*****_nodes
-      routes.select do |route|
-        selected_nodes.include?(farm_path(route))
-      end
-    end
-
     private
     def log_tag
       "engine_state"

  Modified: lib/droonga/handler_messenger.rb (+2 -1)
===================================================================
--- lib/droonga/handler_messenger.rb    2015-01-06 15:59:32 +0900 (a25d6c5)
+++ lib/droonga/handler_messenger.rb    2015-01-06 16:18:54 +0900 (148e991)
@@ -18,7 +18,7 @@ require "droonga/forwarder"
 
 module Droonga
   class HandlerMessenger
-    attr_reader :database_name, :dispatcher, :engine_state
+    attr_reader :database_name, :dispatcher, :engine_state, :cluster
 
     def initialize(forwarder, message, options={})
       @forwarder = forwarder
@@ -27,6 +27,7 @@ module Droonga
       @replier = Replier.new(@forwarder)
       @dispatcher = options[:dispatcher]
       @engine_state = options[:engine_state]
+      @cluster = options[:cluster]
       @database_name = options[:database]
     end
 
-------------- next part --------------
HTML����������������������������...
下載 



More information about the Groonga-commit mailing list
Back to archive index