Kouhei Sutou
null+****@clear*****
Tue Aug 20 14:09:55 JST 2013
Kouhei Sutou 2013-08-20 14:09:55 +0900 (Tue, 20 Aug 2013) New Revision: 04225040c57c6b67f551901dd3465dc1f967550c https://github.com/droonga/fluent-plugin-droonga/commit/04225040c57c6b67f551901dd3465dc1f967550c Message: Use serverengine gem to manage workers Added files: lib/droonga/engine.rb lib/droonga/server.rb Modified files: fluent-plugin-droonga.gemspec lib/droonga/worker.rb lib/fluent/plugin/out_droonga.rb Modified: fluent-plugin-droonga.gemspec (+1 -0) =================================================================== --- fluent-plugin-droonga.gemspec 2013-08-19 16:08:45 +0900 (09b1d18) +++ fluent-plugin-droonga.gemspec 2013-08-20 14:09:55 +0900 (1b89081) @@ -31,6 +31,7 @@ Gem::Specification.new do |gem| gem.add_dependency "rroonga", ">= 3.0.3" gem.add_dependency "groonga-command", ">= 1.0.3" gem.add_dependency "fluent-logger" + gem.add_dependency "serverengine" gem.add_development_dependency "rake" gem.add_development_dependency "bundler" gem.add_development_dependency "test-unit" Added: lib/droonga/engine.rb (+111 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/engine.rb 2013-08-20 14:09:55 +0900 (53bc70c) @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 droonga project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "serverengine" +require "msgpack" +require "cool.io" + +require "droonga/server" +require "droonga/worker" + +module Droonga + class Engine + DEFAULT_OPTIONS = { + :database => "droonga/db", + :queue_name => "DroongaQueue", + :handlers => ["proxy"], + :n_workers => 1, + } + + def initialize(options={}) + @options = DEFAULT_OPTIONS.merge(options) + end + + def start + @message_input, @message_output = IO.pipe + @message_input.sync = true + @message_output.sync = true + start_supervisor + start_emitter + end + + def shutdown + $log.trace("engine: shutdown: start") + shutdown_emitter + shutdown_supervisor + @message_input.close unless @message_input.closed? + @message_output.close unless @message_output.closed? + $log.trace("engine: shutdown: done") + end + + def emit(tag, time, record) + $log.trace("tag: <#{tag}>") + @emitter.write(MessagePack.pack([tag, time, record])) + @loop_breaker.signal + end + + private + def start_emitter + @loop = Coolio::Loop.new + @emitter = Coolio::IO.new(@message_output) + @emitter.on_write_complete do + $log.trace("emitter: written") + end + @emitter.attach(@loop) + @loop_breaker = Coolio::AsyncWatcher.new + @loop_breaker.attach(@loop) + @emitter_thread = Thread.new do + @loop.run + end + end + + def shutdown_emitter + $log.trace("emitter: shutdown: start") + @emitter.close + $log.trace("emitter: shutdown: emitter: closed") + @loop.stop + @loop_breaker.signal + $log.trace("emitter: shutdown: loop: stopped") + @emitter_thread.join + $log.trace("emitter: shutdown: done") + end + + def start_supervisor + @supervisor = ServerEngine::Supervisor.new(Server, Worker) do + force_options = { + :worker_type => "process", + :workers => @options[:n_workers], + :message_input => @message_input, + :log_level => $log.level, + } + @options.merge(force_options) + end + @supervisor.logger = $log + @supervisor_thread = Thread.new do + @supervisor.main + end + end + + def shutdown_supervisor + $log.trace("supervisor: shutdown: start") + @supervisor.stop(true) + $log.trace("supervisor: shutdown: stopped") + @supervisor_thread.join + $log.trace("supervisor: shutdown: done") + end + end +end Added: lib/droonga/server.rb (+128 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/server.rb 2013-08-20 14:09:55 +0900 (7119de5) @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 droonga project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "msgpack" +require "cool.io" +require "groonga" + +require "droonga/job_queue" + +module Droonga + module Server + class Receiver + def initialize(input) + @input = input + end + + def run + $log.trace + @io = Coolio::IO.new(@input) + unpacker = MessagePack::Unpacker.new + @io.on_read do |data| + $log.trace("receiver: received: <#{data.bytesize}>") + unpacker.feed_each(data) do |message| + yield message + end + end + @loop = Coolio::Loop.new + @loop.attach(@io) + @loop_breaker = Coolio::AsyncWatcher.new + @loop.attach(@loop_breaker) + @running = true + @loop.run + end + + def stop + unless @running + $log.trace("receiver: stop: not needed") + return + end + + $log.trace("receiver: stop: start") + @io.close + $log.trace("receiver: stop: closed") + @loop.stop + @running = false + @loop_breaker.signal + $log.trace("receiver: stop: done") + end + end + + def initialize + super + @name = config[:name] + @context = Groonga::Context.new + @message_input = config[:message_input] + @database_name = config[:database] || "droonga/db" + @queue_name = config[:queue_name] || "DroongaQueue" + Droonga::JobQueue.ensure_schema(@database_name, @queue_name) + end + + def before_run + @database =****@conte*****_database(@database_name) + @context.encoding = :none + + @receiver = Receiver.new(@message_input) + @receiver_thread = Thread.new do + @receiver.run do |message| + $log.trace("received: start") + packed_message = message.to_msgpack + queue = @context[@queue_name] + queue.push do |record| + record.message = packed_message + end + $log.trace("received: done") + end + end + end + + def after_run + $log.trace("server: after_run: start") + + $log.trace("server: after_run: receiver: start") + @receiver_thread.join + $log.trace("server: after_run: receiver: done") + + $log.trace("server: after_run: groonga: start") + @database.close + @context.close + @database = @context = nil + $log.trace("server: after_run: groonga: done") + + $log.trace("server: after_run: done") + end + + def stop(stop_graceful) + $log.trace("server: stop: start") + + $log.trace("server: stop: receiver: stop: start") + @receiver.stop + $log.trace("server: stop: receiver: stop: done") + + $log.trace("server: stop: queue: unblock: start") + queue = @context[@queue_name] + 3.times do |i| + super + queue.unblock + sleep(i ** 2 * 0.1) + end + $log.trace("server: stop: queue: unblock: done") + + $log.trace("server: stop: done") + end + end +end Modified: lib/droonga/worker.rb (+25 -54) =================================================================== --- lib/droonga/worker.rb 2013-08-19 16:08:45 +0900 (ae9110a) +++ lib/droonga/worker.rb 2013-08-20 14:09:55 +0900 (91287f1) @@ -26,40 +26,50 @@ require "droonga/catalog" require "droonga/proxy" module Droonga - class Worker + module Worker attr_reader :context, :envelope, :name - def initialize(options={}) - @pool = [] + def initialize @handlers = [] @outputs = {} - @name = options[:name] - @database_name = options[:database] || "droonga/db" - @queue_name = options[:queue_name] || "DroongaQueue" - Droonga::JobQueue.ensure_schema(@database_name, @queue_name) - @handler_names = options[:handlers] || ["proxy"] + @name = config[:name] + @database_name = config[:database] || "droonga/db" + @queue_name = config[:queue_name] || "DroongaQueue" + @handler_names = config[:handlers] || ["proxy"] load_handlers - @pool_size = options[:pool_size] || 1 - @pool = spawn prepare end - def shutdown - shutdown_workers + def run + $log.trace("worker: run: start") + @queue = @context[@queue_name] + @running = true + while @running + $log.trace("worker: run: pull_message: start") + message = pull_message + $log.trace("worker: run: pull_message: done") + next unless message + body, command, arguments = parse_message(message) + handler = find_handler(command) + handler.handle(command, body, *arguments) if handler + end @handlers.each do |handler| handler.shutdown end @outputs.each do |dest, output| output[:logger].close if output[:logger] end + @queue = nil @database.close @context.close @database = @context = nil + $log.trace("worker: run: done") end - def dispatch(*message) - body, type, arguments = parse_message(message) - post_or_push(message, body, "type" => type, "arguments" => arguments) + def stop + $log.trace("worker: stop: start") + @running = false + $log.trace("worker: stop: done") end def add_handler(name) @@ -192,9 +202,7 @@ module Droonga def pull_message packed_message = nil - @status = :IDLE @queue.pull do |record| - @status = :BUSY if record packed_message = record.message record.delete @@ -204,43 +212,6 @@ module Droonga MessagePack.unpack(packed_message) end - def start - @finish = false - @status = :IDLE - # TODO: doesn't work - Signal.trap(:TERM) do - @finish = true - exit! 0 if @status == :IDLE - end - @queue = @context[@queue_name] - while !@finish - message = pull_message - next unless message - body, command, arguments = parse_message(message) - handler = find_handler(command) - handler.handle(command, body, *arguments) if handler - end - end - - def spawn - pool = [] - @pool_size.times do - pid = Process.fork - if pid - pool << pid - next - end - # child process - begin - prepare - start - shutdown - exit! 0 - end - end - pool - end - def load_handlers @handler_names.each do |handler_name| plugin = Droonga::Plugin.new("handler", handler_name) Modified: lib/fluent/plugin/out_droonga.rb (+6 -5) =================================================================== --- lib/fluent/plugin/out_droonga.rb 2013-08-19 16:08:45 +0900 (5b985a1) +++ lib/fluent/plugin/out_droonga.rb 2013-08-20 14:09:55 +0900 (f89205e) @@ -15,7 +15,7 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/worker" +require "droonga/engine" module Fluent class DroongaOutput < Output @@ -31,21 +31,22 @@ module Fluent def start super - @worker = Droonga::Worker.new(:database => @database, + @engine = Droonga::Engine.new(:database => @database, :queue_name => @queue_name, - :pool_size => @n_workers, + :n_workers => @n_workers, :handlers => @handlers, :name => @name) + @engine.start end def shutdown + @engine.shutdown super - @worker.shutdown end def emit(tag, es, chain) es.each do |time, record| - @worker.dispatch(tag, time, record) + @engine.emit(tag, time, record) end chain.next end -------------- next part -------------- HTML����������������������������... 下載