[Groonga-commit] droonga/fluent-plugin-droonga at 27b1d3d [master] Move most Collector's code to Session

Back to archive index

Kouhei Sutou null+****@clear*****
Tue Dec 24 14:29:56 JST 2013


Kouhei Sutou	2013-12-24 14:29:56 +0900 (Tue, 24 Dec 2013)

  New Revision: 27b1d3db1e4dce4011cccd7ef263b5124d60eb46
  https://github.com/droonga/fluent-plugin-droonga/commit/27b1d3db1e4dce4011cccd7ef263b5124d60eb46

  Message:
    Move most Collector's code to Session
    
    Now, Collector is not created for each request. We reuse the same
    instance like other pluggable components.

  Copied files:
    lib/droonga/session.rb
      (from lib/droonga/collector.rb)
  Modified files:
    lib/droonga/collector.rb
    lib/droonga/collector_plugin.rb
    lib/droonga/dispatcher.rb

  Modified: lib/droonga/collector.rb (+2 -76)
===================================================================
--- lib/droonga/collector.rb    2013-12-24 14:10:41 +0900 (c2be942)
+++ lib/droonga/collector.rb    2013-12-24 14:29:56 +0900 (4f37e1e)
@@ -22,87 +22,13 @@ module Droonga
   class Collector
     include Pluggable
 
-    def initialize(id, dispatcher, components, tasks, inputs)
-      @id = id
-      @dispatcher = dispatcher
-      @components = components
-      @tasks = tasks
-      @n_dones = 0
-      @inputs = inputs
+    def initialize
       load_plugins(["basic"]) # TODO: make customizable
     end
 
-    def done?
-      @n_dones ==****@tasks*****
-    end
-
-    def start
-      tasks = @inputs[nil]
-      tasks.each do |task|
-        component = task["component"]
-        type = component["type"]
-        command = component["command"]
-        synchronous = nil
-        descendants = {}
-        component["descendants"].each do |name, indices|
-          descendants[name] = indices.collect do |index|
-            @components[index]["routes"].map do |route|
-              @dispatcher.farm_path(route)
-            end
-          end
-        end
-        message = {
-          "id"          => @id,
-          "task"        => task,
-          "descendants" => descendants
-        }
-        @dispatcher.process_in_farm(task["route"], message, command, synchronous)
-        @n_dones += 1
-      end
-    end
-
-    def receive(name, value)
-      tasks = @inputs[name]
-      unless tasks
-        #TODO: result arrived before its query
-        return
-      end
-      tasks.each do |task|
-        task["n_of_inputs"] += 1
-        component = task["component"]
-        type = component["type"]
-        command = "collector_" + type
-        n_of_expects = component["n_of_expects"]
-        message = {
-          "task"=>task,
-          "name"=>name,
-          "value"=>value
-        }
-        process(command, message)
-        return if task["n_of_inputs"] < n_of_expects
-        #the task is done
-        result = task["values"]
-        post = component["post"]
-        @dispatcher.reply(result) if post
-        component["descendants"].each do |name, indices|
-          message = {
-            "id" => @id,
-            "input" => name,
-            "value" => result[name]
-          }
-          indices.each do |index|
-            @components[index]["routes"].each do |route|
-              @dispatcher.dispatch(message, route)
-            end
-          end
-        end
-        @n_dones += 1
-      end
-    end
-
     private
     def instantiate_plugin(name)
-      CollectorPlugin.repository.instantiate(name, @dispatcher)
+      CollectorPlugin.repository.instantiate(name)
     end
 
     def log_tag

  Modified: lib/droonga/collector_plugin.rb (+1 -2)
===================================================================
--- lib/droonga/collector_plugin.rb    2013-12-24 14:10:41 +0900 (41a381b)
+++ lib/droonga/collector_plugin.rb    2013-12-24 14:29:56 +0900 (b272e36)
@@ -22,9 +22,8 @@ module Droonga
     extend PluginRegisterable
 
     attr_reader :task, :input_name, :component, :output_values, :body, :output_names
-    def initialize(dispatcher)
+    def initialize
       super()
-      @dispatcher = dispatcher
     end
 
     def process(command, message)

  Modified: lib/droonga/dispatcher.rb (+12 -11)
===================================================================
--- lib/droonga/dispatcher.rb    2013-12-24 14:10:41 +0900 (44a41f9)
+++ lib/droonga/dispatcher.rb    2013-12-24 14:29:56 +0900 (d4ef7f1)
@@ -22,6 +22,7 @@ require "droonga/distributor"
 require "droonga/catalog"
 require "droonga/collector"
 require "droonga/farm"
+require "droonga/session"
 
 module Droonga
   class Dispatcher
@@ -31,7 +32,7 @@ module Droonga
       @options = options
       @name = @options[:name]
       @farm = Farm.new(name)
-      @collectors = {}
+      @sessions = {}
       @current_id = 0
       @local = Regexp.new("^#{@name}")
       @input_adapter =
@@ -41,6 +42,7 @@ module Droonga
       @loop = EventLoop.new
       @forwarder = Forwarder.new(@loop)
       @distributor = Distributor.new(self, @options)
+      @collector = Collector.new
     end
 
     def start
@@ -84,21 +86,21 @@ module Droonga
 
     def process_internal_message(message)
       id = message["id"]
-      collector = @collectors[id]
-      if collector
-        collector.receive(message["input"], message["value"])
+      session = @sessions[id]
+      if session
+        session.receive(message["input"], message["value"])
       else
         components = message["components"]
         if components
           planner = Planner.new(self, components)
-          collector = planner.create_collector(id)
-          @collectors[id] = collector
+          session = planner.create_session(id, @collector)
+          @sessions[id] = session
         else
           #todo: take cases receiving result before its query into account
         end
-        collector.start
+        session.start
       end
-      @collectors.delete(id) if collector.done?
+      @sessions.delete(id) if session.done?
     end
 
     def dispatch(message, destination)
@@ -158,7 +160,7 @@ module Droonga
         @components = components
       end
 
-      def create_collector(id)
+      def create_session(id, collector)
         resolve_descendants
         tasks = []
         inputs = {}
@@ -178,8 +180,7 @@ module Droonga
             end
           end
         end
-        collector = Collector.new(id, @dispatcher, @components, tasks, inputs)
-        return collector
+        Session.new(id, @dispatcher, collector, @components, tasks, inputs)
       end
 
       def resolve_descendants

  Copied: lib/droonga/session.rb (+6 -16) 87%
===================================================================
--- lib/droonga/collector.rb    2013-12-24 14:10:41 +0900 (c2be942)
+++ lib/droonga/session.rb    2013-12-24 14:29:56 +0900 (a2b923b)
@@ -1,5 +1,3 @@
-# -*- coding: utf-8 -*-
-#
 # Copyright (C) 2013 Droonga Project
 #
 # This library is free software; you can redistribute it and/or
@@ -15,21 +13,17 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/pluggable"
-require "droonga/collector_plugin"
-
 module Droonga
-  class Collector
-    include Pluggable
-
-    def initialize(id, dispatcher, components, tasks, inputs)
+  class Session
+    def initialize(id, dispatcher, collector, components, tasks, inputs)
       @id = id
       @dispatcher = dispatcher
+      @collector = collector
       @components = components
       @tasks = tasks
       @n_dones = 0
       @inputs = inputs
-      load_plugins(["basic"]) # TODO: make customizable
+      @responses = {}
     end
 
     def done?
@@ -78,7 +72,7 @@ module Droonga
           "name"=>name,
           "value"=>value
         }
-        process(command, message)
+        @collector.process(command, message)
         return if task["n_of_inputs"] < n_of_expects
         #the task is done
         result = task["values"]
@@ -101,12 +95,8 @@ module Droonga
     end
 
     private
-    def instantiate_plugin(name)
-      CollectorPlugin.repository.instantiate(name, @dispatcher)
-    end
-
     def log_tag
-      "collector"
+      "session"
     end
   end
 end
-------------- next part --------------
HTML����������������������������...
下載 



More information about the Groonga-commit mailing list
Back to archive index