Kouhei Sutou
null+****@clear*****
Mon Dec 9 17:31:05 JST 2013
Kouhei Sutou 2013-12-09 17:31:05 +0900 (Mon, 09 Dec 2013) New Revision: 1875a7d1f91b00012b72f0e0167dc6a9d4a8b6d4 https://github.com/droonga/fluent-plugin-droonga/commit/1875a7d1f91b00012b72f0e0167dc6a9d4a8b6d4 Message: Use TCP/IP based job queue instead of Groonga's table based job queue It decreases shutdown time but it may also decrease messaging throughput. Added files: lib/droonga/event_loop.rb lib/droonga/message_pusher.rb lib/droonga/message_receiver.rb Removed files: lib/droonga/job_queue.rb test/unit/test_job_queue_schema.rb Modified files: lib/droonga/partition.rb lib/droonga/processor.rb lib/droonga/server.rb lib/droonga/worker.rb Added: lib/droonga/event_loop.rb (+42 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/event_loop.rb 2013-12-09 17:31:05 +0900 (e656f8b) @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "coolio" + +module Droonga + class EventLoop + def initialize + @loop = Coolio::Loop.new + @loop_breaker = Coolio::AsyncWatcher.new + @loop_breaker.attach(@loop) + end + + def run + @loop.run + end + + def stop + @loop.stop + @loop_breaker.signal + end + + def attach(watcher) + @loop.attach(watcher) + @loop_breaker.signal + end + end +end Deleted: lib/droonga/job_queue.rb (+0 -87) 100644 =================================================================== --- lib/droonga/job_queue.rb 2013-12-09 17:29:56 +0900 (96ed113) +++ /dev/null @@ -1,87 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "droonga/job_queue_schema" -require "msgpack" - -module Droonga - class JobQueue - class << self - def ensure_schema(database_path, queue_name) - schema = JobQueueSchema.new(database_path, queue_name) - schema.ensure_created - end - - def open(database_path, queue_name) - job_queue = new(database_path, queue_name) - job_queue.open - job_queue - end - end - - def initialize(database_path, queue_name) - @database_path = database_path - @queue_name = queue_name - end - - def open - @context = Groonga::Context.new - @database =****@conte*****_database(@database_path) - @context.encoding = :none - - @queue = @context[@queue_name] - end - - def push_message(message) - $log.trace("#{log_tag}: push_message: start") - packed_message = message.to_msgpack - @queue.push do |record| - record.message = packed_message - end - $log.trace("#{log_tag}: push_message: done") - end - - def pull_message - packed_message = nil - @queue.pull do |record| - if record - packed_message = record.message - record.delete - end - end - return nil unless packed_message - MessagePack.unpack(packed_message) - end - - def unblock - @queue.unblock - end - - def close - @queue = nil - if @database - @database.close - @context.close - @database = @context = nil - end - end - - def log_tag - "[#{Process.ppid}][#{Process.pid}] job_queue" - end - end -end Added: lib/droonga/message_pusher.rb (+62 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/message_pusher.rb 2013-12-09 17:31:05 +0900 (e940379) @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "msgpack" + +require "droonga/event_loop" + +module Droonga + class MessagePusher + attr_reader :raw_receiver + def initialize + @loop = EventLoop.new + end + + def start + @raw_receiver = TCPServer.new("127.0.0.1", 0) + @loop_thread = Thread.new do + @loop.run + end + end + + def shutdown + $log.trace("#{log_tag}: shutdown: start") + @raw_receiver.close + @loop.stop + @loop_thread.join + $log.trace("#{log_tag}: shutdown: done") + end + + def push(message) + $log.trace("#{log_tag}: push: start") + packed_message = message.to_msgpack + _, port, _, ip_address = @raw_receiver.addr + sender = Coolio::TCPSocket.connect(ip_address, port) + sender.write(message.to_msgpack) + sender.on_write_complete do + close + end + @loop.attach(sender) + $log.trace("#{log_tag}: push: done") + end + + private + def log_tag + "message_pusher" + end + end +end Added: lib/droonga/message_receiver.rb (+63 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/message_receiver.rb 2013-12-09 17:31:05 +0900 (323c296) @@ -0,0 +1,63 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 Droonga Project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require "msgpack" + +require "droonga/event_loop" + +module Droonga + class MessageReceiver + def initialize(receiver, &callback) + @loop = EventLoop.new + @receiver = Coolio::Server.new(receiver, Coolio::Socket) do |connection| + setup_receive_handler(connection) + end + @loop.attach(@receiver) + @callback = callback + end + + def run + @loop.run + end + + def stop + $log.trace("#{log_tag}: shutdown: start") + @receiver.close + @loop.stop + $log.trace("#{log_tag}: shutdown: done") + end + + private + def setup_receive_handler(connection) + unpacker = MessagePack::Unpacker.new + on_read = lambda do |data| + $log.trace("#{log_tag}: on_read: start") + unpacker.feed_each(data) do |message| + @callback.call(message) + end + $log.trace("#{log_tag}: on_read: done") + end + connection.on_read do |data| + on_read.call(data) + end + end + + def log_tag + "message_puller" + end + end +end Modified: lib/droonga/partition.rb (+21 -2) =================================================================== --- lib/droonga/partition.rb 2013-12-09 17:29:56 +0900 (97614ba) +++ lib/droonga/partition.rb 2013-12-09 17:31:05 +0900 (661d5aa) @@ -19,6 +19,7 @@ require "serverengine" require "droonga/server" require "droonga/worker" +require "droonga/message_pusher" require "droonga/processor" module Droonga @@ -26,18 +27,22 @@ module Droonga def initialize(options={}) @options = options @n_workers = @options[:n_workers] || 0 - @processor = Processor.new(@options) + @message_pusher = MessagePusher.new + @processor = Processor.new(@message_pusher, @options) @supervisor = nil end def start + ensure_database @processor.start + @message_pusher.start start_supervisor if @n_workers > 0 end def shutdown $log.trace("partition: shutdown: start") shutdown_supervisor if @supervisor + @message_pusher.shutdown @processor.shutdown $log.trace("partition: shutdown: done") end @@ -49,6 +54,19 @@ module Droonga end private + def ensure_database + database_path = @options[:database] + return if File.exist?(database_path) + FileUtils.mkdir_p(File.dirname(database_path)) + context = Groonga::Context.new + begin + context.create_database(database_path) do + end + ensure + context.close + end + end + def start_supervisor @supervisor = ServerEngine::Supervisor.new(Server, Worker) do force_options = { @@ -56,7 +74,8 @@ module Droonga :workers => @options[:n_workers], :log_level => $log.level, :server_process_name => "Server[#{@options[:database]}] #$0", - :worker_process_name => "Worker[#{@options[:database]}] #$0" + :worker_process_name => "Worker[#{@options[:database]}] #$0", + :message_receiver => @message_pusher.raw_receiver, } @options.merge(force_options) end Modified: lib/droonga/processor.rb (+3 -9) =================================================================== --- lib/droonga/processor.rb 2013-12-09 17:29:56 +0900 (d98a927) +++ lib/droonga/processor.rb 2013-12-09 17:31:05 +0900 (fb449db) @@ -15,29 +15,23 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/job_queue" require "droonga/handler" module Droonga class Processor - def initialize(options={}) + def initialize(message_pusher, options={}) + @message_pusher = message_pusher @options = options - @database_name = @options[:database] - @queue_name = @options[:options] || "DroongaQueue" @n_workers = @options[:n_workers] || 0 end def start - Droonga::JobQueue.ensure_schema(@database_name, - @queue_name) - @job_queue = JobQueue.open(@database_name, @queue_name) @handler = Handler.new(@options) end def shutdown $log.trace("processor: shutdown: start") @handler.shutdown - @job_queue.close $log.trace("processor: shutdown: done") end @@ -53,7 +47,7 @@ module Droonga if @n_workers.zero? or synchronous @handler.process(envelope) else - @job_queue.push_message(envelope) + @message_pusher.push(envelope) end else $log.trace("proessor: process: ignore #{command}") Modified: lib/droonga/server.rb (+0 -67) =================================================================== --- lib/droonga/server.rb 2013-12-09 17:29:56 +0900 (f4a51c7) +++ lib/droonga/server.rb 2013-12-09 17:31:05 +0900 (a906653) @@ -15,8 +15,6 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/job_queue" - module Droonga module Server def before_run @@ -39,70 +37,5 @@ module Droonga def log_tag "[#{Process.ppid}][#{Process.pid}] server" end - - def start_worker(wid) - worker = super(wid) - worker.extend(WorkerStopper) - worker - end - - module WorkerStopper - def send_stop(stop_graceful) - in_signal_sending do - open_queue do |queue| - $log.trace("#{log_tag}: stop: start") - - $log.trace("#{log_tag}: stop: queue: unblock: start") - max_n_retries = 20 - max_n_retries.times do |i| - $log.trace("#{log_tag}: stop: queue: unblock: #{i}: start") - super(stop_graceful) - queue.unblock - alive_p = alive? - $log.trace("#{log_tag}: stop: queue: unblock: #{i}: done: " + - "#{alive_p}") - break unless alive_p - sleep(i * 0.1) - end - $log.trace("#{log_tag}: stop: queue: unblock: done") - - $log.trace("#{log_tag}: stop: done") - end - end - end - - def send_reload - in_signal_sending do - open_queue do |queue| - $log.trace("#{log_tag}: reload: start") - super - queue.unblock - $log.trace("#{log_tag}: reload: done") - end - end - end - - private - def log_tag - "[#{Process.ppid}][#{Process.pid}][#{@wid}] server: worker-stopper" - end - - def in_signal_sending - Thread.new do - yield - end - end - - def open_queue - config =****@worke***** - queue = JobQueue.open(config[:database], - config[:queue_name] || "DroongaQueue") - begin - yield(queue) - ensure - queue.close - end - end - end end end Modified: lib/droonga/worker.rb (+10 -19) =================================================================== --- lib/droonga/worker.rb 2013-12-09 17:29:56 +0900 (04c6432) +++ lib/droonga/worker.rb 2013-12-09 17:31:05 +0900 (f861a48) @@ -15,44 +15,35 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/job_queue" require "droonga/handler" +require "droonga/message_receiver" module Droonga module Worker def initialize - @database_name = config[:database] - @queue_name = config[:queue_name] || "DroongaQueue" + @handler = Handler.new(config) + receiver_socket = config[:message_receiver] + @message_receiver = MessageReceiver.new(receiver_socket) do |message| + process(message) + end end def run $log.trace("#{log_tag}: run: start") - handler = Handler.new(config) - job_queue = JobQueue.open(@database_name, @queue_name) - @running = true - while @running - process(handler, job_queue) - end - handler.shutdown - job_queue.close + @message_receiver.run $log.trace("#{log_tag}: run: done") end def stop $log.trace("#{log_tag}: stop: start") - @running = false + @message_receiver.stop $log.trace("#{log_tag}: stop: done") end private - def process(handler, job_queue) + def process(envelope) $log.trace("#{log_tag}: process: start") - envelope = job_queue.pull_message - unless envelope - $log.trace("#{log_tag}: process: abort: no message") - return - end - handler.process(envelope) + @handler.process(envelope) $log.trace("#{log_tag}: process: done") end Deleted: test/unit/test_job_queue_schema.rb (+0 -45) 100644 =================================================================== --- test/unit/test_job_queue_schema.rb 2013-12-09 17:29:56 +0900 (624b41e) +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright (C) 2013 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "helper" - -require "droonga/job_queue_schema" - -class JobQueueSchemaTest < Test::Unit::TestCase - def setup - @database_path = @temporary_directory + "droonga/db" - @queue_name = "DroongaQueue" - end - - def test_ensure_created - schema = Droonga::JobQueueSchema.new(@database_path.to_s, @queue_name) - - assert_not_predicate(@database_path, :exist?) - schema.ensure_created - assert_predicate(@database_path, :exist?) - - context = Groonga::Context.new - dumped_commands = nil - context.open_database(@database_path.to_s) do |database| - dumped_commands = Groonga::DatabaseDumper.dump(:context => context, - :database => database) - end - context.close - assert_equal(<<-SCHEMA, dumped_commands) -table_create #{@queue_name} TABLE_NO_KEY -column_create #{@queue_name} message COLUMN_SCALAR Text -SCHEMA - end -end -------------- next part -------------- HTML����������������������������...下載