Kouhei Sutou
null+****@clear*****
Tue Dec 24 14:29:56 JST 2013
Kouhei Sutou 2013-12-24 14:29:56 +0900 (Tue, 24 Dec 2013) New Revision: 27b1d3db1e4dce4011cccd7ef263b5124d60eb46 https://github.com/droonga/fluent-plugin-droonga/commit/27b1d3db1e4dce4011cccd7ef263b5124d60eb46 Message: Move most Collector's code to Session Now, Collector is not created for each request. We reuse the same instance like other pluggable components. Copied files: lib/droonga/session.rb (from lib/droonga/collector.rb) Modified files: lib/droonga/collector.rb lib/droonga/collector_plugin.rb lib/droonga/dispatcher.rb Modified: lib/droonga/collector.rb (+2 -76) =================================================================== --- lib/droonga/collector.rb 2013-12-24 14:10:41 +0900 (c2be942) +++ lib/droonga/collector.rb 2013-12-24 14:29:56 +0900 (4f37e1e) @@ -22,87 +22,13 @@ module Droonga class Collector include Pluggable - def initialize(id, dispatcher, components, tasks, inputs) - @id = id - @dispatcher = dispatcher - @components = components - @tasks = tasks - @n_dones = 0 - @inputs = inputs + def initialize load_plugins(["basic"]) # TODO: make customizable end - def done? - @n_dones ==****@tasks***** - end - - def start - tasks = @inputs[nil] - tasks.each do |task| - component = task["component"] - type = component["type"] - command = component["command"] - synchronous = nil - descendants = {} - component["descendants"].each do |name, indices| - descendants[name] = indices.collect do |index| - @components[index]["routes"].map do |route| - @dispatcher.farm_path(route) - end - end - end - message = { - "id" => @id, - "task" => task, - "descendants" => descendants - } - @dispatcher.process_in_farm(task["route"], message, command, synchronous) - @n_dones += 1 - end - end - - def receive(name, value) - tasks = @inputs[name] - unless tasks - #TODO: result arrived before its query - return - end - tasks.each do |task| - task["n_of_inputs"] += 1 - component = task["component"] - type = component["type"] - command = "collector_" + type - n_of_expects = component["n_of_expects"] - message = { - "task"=>task, - "name"=>name, - "value"=>value - } - process(command, message) - return if task["n_of_inputs"] < n_of_expects - #the task is done - result = task["values"] - post = component["post"] - @dispatcher.reply(result) if post - component["descendants"].each do |name, indices| - message = { - "id" => @id, - "input" => name, - "value" => result[name] - } - indices.each do |index| - @components[index]["routes"].each do |route| - @dispatcher.dispatch(message, route) - end - end - end - @n_dones += 1 - end - end - private def instantiate_plugin(name) - CollectorPlugin.repository.instantiate(name, @dispatcher) + CollectorPlugin.repository.instantiate(name) end def log_tag Modified: lib/droonga/collector_plugin.rb (+1 -2) =================================================================== --- lib/droonga/collector_plugin.rb 2013-12-24 14:10:41 +0900 (41a381b) +++ lib/droonga/collector_plugin.rb 2013-12-24 14:29:56 +0900 (b272e36) @@ -22,9 +22,8 @@ module Droonga extend PluginRegisterable attr_reader :task, :input_name, :component, :output_values, :body, :output_names - def initialize(dispatcher) + def initialize super() - @dispatcher = dispatcher end def process(command, message) Modified: lib/droonga/dispatcher.rb (+12 -11) =================================================================== --- lib/droonga/dispatcher.rb 2013-12-24 14:10:41 +0900 (44a41f9) +++ lib/droonga/dispatcher.rb 2013-12-24 14:29:56 +0900 (d4ef7f1) @@ -22,6 +22,7 @@ require "droonga/distributor" require "droonga/catalog" require "droonga/collector" require "droonga/farm" +require "droonga/session" module Droonga class Dispatcher @@ -31,7 +32,7 @@ module Droonga @options = options @name = @options[:name] @farm = Farm.new(name) - @collectors = {} + @sessions = {} @current_id = 0 @local = Regexp.new("^#{@name}") @input_adapter = @@ -41,6 +42,7 @@ module Droonga @loop = EventLoop.new @forwarder = Forwarder.new(@loop) @distributor = Distributor.new(self, @options) + @collector = Collector.new end def start @@ -84,21 +86,21 @@ module Droonga def process_internal_message(message) id = message["id"] - collector = @collectors[id] - if collector - collector.receive(message["input"], message["value"]) + session = @sessions[id] + if session + session.receive(message["input"], message["value"]) else components = message["components"] if components planner = Planner.new(self, components) - collector = planner.create_collector(id) - @collectors[id] = collector + session = planner.create_session(id, @collector) + @sessions[id] = session else #todo: take cases receiving result before its query into account end - collector.start + session.start end - @collectors.delete(id) if collector.done? + @sessions.delete(id) if session.done? end def dispatch(message, destination) @@ -158,7 +160,7 @@ module Droonga @components = components end - def create_collector(id) + def create_session(id, collector) resolve_descendants tasks = [] inputs = {} @@ -178,8 +180,7 @@ module Droonga end end end - collector = Collector.new(id, @dispatcher, @components, tasks, inputs) - return collector + Session.new(id, @dispatcher, collector, @components, tasks, inputs) end def resolve_descendants Copied: lib/droonga/session.rb (+6 -16) 87% =================================================================== --- lib/droonga/collector.rb 2013-12-24 14:10:41 +0900 (c2be942) +++ lib/droonga/session.rb 2013-12-24 14:29:56 +0900 (a2b923b) @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # Copyright (C) 2013 Droonga Project # # This library is free software; you can redistribute it and/or @@ -15,21 +13,17 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/pluggable" -require "droonga/collector_plugin" - module Droonga - class Collector - include Pluggable - - def initialize(id, dispatcher, components, tasks, inputs) + class Session + def initialize(id, dispatcher, collector, components, tasks, inputs) @id = id @dispatcher = dispatcher + @collector = collector @components = components @tasks = tasks @n_dones = 0 @inputs = inputs - load_plugins(["basic"]) # TODO: make customizable + @responses = {} end def done? @@ -78,7 +72,7 @@ module Droonga "name"=>name, "value"=>value } - process(command, message) + @collector.process(command, message) return if task["n_of_inputs"] < n_of_expects #the task is done result = task["values"] @@ -101,12 +95,8 @@ module Droonga end private - def instantiate_plugin(name) - CollectorPlugin.repository.instantiate(name, @dispatcher) - end - def log_tag - "collector" + "session" end end end -------------- next part -------------- HTML����������������������������...下載