[Groonga-commit] droonga/fluent-plugin-droonga at 1875a7d [master] Use TCP/IP based job queue instead of Groonga's table based job queue

Back to archive index

Kouhei Sutou null+****@clear*****
Mon Dec 9 17:31:05 JST 2013


Kouhei Sutou	2013-12-09 17:31:05 +0900 (Mon, 09 Dec 2013)

  New Revision: 1875a7d1f91b00012b72f0e0167dc6a9d4a8b6d4
  https://github.com/droonga/fluent-plugin-droonga/commit/1875a7d1f91b00012b72f0e0167dc6a9d4a8b6d4

  Message:
    Use TCP/IP based job queue instead of Groonga's table based job queue
    
    It decreases shutdown time but it may also decrease messaging
    throughput.

  Added files:
    lib/droonga/event_loop.rb
    lib/droonga/message_pusher.rb
    lib/droonga/message_receiver.rb
  Removed files:
    lib/droonga/job_queue.rb
    test/unit/test_job_queue_schema.rb
  Modified files:
    lib/droonga/partition.rb
    lib/droonga/processor.rb
    lib/droonga/server.rb
    lib/droonga/worker.rb

  Added: lib/droonga/event_loop.rb (+42 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/event_loop.rb    2013-12-09 17:31:05 +0900 (e656f8b)
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "coolio"
+
+module Droonga
+  class EventLoop
+    def initialize
+      @loop = Coolio::Loop.new
+      @loop_breaker = Coolio::AsyncWatcher.new
+      @loop_breaker.attach(@loop)
+    end
+
+    def run
+      @loop.run
+    end
+
+    def stop
+      @loop.stop
+      @loop_breaker.signal
+    end
+
+    def attach(watcher)
+      @loop.attach(watcher)
+      @loop_breaker.signal
+    end
+  end
+end

  Deleted: lib/droonga/job_queue.rb (+0 -87) 100644
===================================================================
--- lib/droonga/job_queue.rb    2013-12-09 17:29:56 +0900 (96ed113)
+++ /dev/null
@@ -1,87 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2013 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "droonga/job_queue_schema"
-require "msgpack"
-
-module Droonga
-  class JobQueue
-    class << self
-      def ensure_schema(database_path, queue_name)
-        schema = JobQueueSchema.new(database_path, queue_name)
-        schema.ensure_created
-      end
-
-      def open(database_path, queue_name)
-        job_queue = new(database_path, queue_name)
-        job_queue.open
-        job_queue
-      end
-    end
-
-    def initialize(database_path, queue_name)
-      @database_path = database_path
-      @queue_name = queue_name
-    end
-
-    def open
-      @context = Groonga::Context.new
-      @database =****@conte*****_database(@database_path)
-      @context.encoding = :none
-
-      @queue = @context[@queue_name]
-    end
-
-    def push_message(message)
-      $log.trace("#{log_tag}: push_message: start")
-      packed_message = message.to_msgpack
-      @queue.push do |record|
-        record.message = packed_message
-      end
-      $log.trace("#{log_tag}: push_message: done")
-    end
-
-    def pull_message
-      packed_message = nil
-      @queue.pull do |record|
-        if record
-          packed_message = record.message
-          record.delete
-        end
-      end
-      return nil unless packed_message
-      MessagePack.unpack(packed_message)
-    end
-
-    def unblock
-      @queue.unblock
-    end
-
-    def close
-      @queue = nil
-      if @database
-        @database.close
-        @context.close
-        @database = @context = nil
-      end
-    end
-
-    def log_tag
-      "[#{Process.ppid}][#{Process.pid}] job_queue"
-    end
-  end
-end

  Added: lib/droonga/message_pusher.rb (+62 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/message_pusher.rb    2013-12-09 17:31:05 +0900 (e940379)
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "msgpack"
+
+require "droonga/event_loop"
+
+module Droonga
+  class MessagePusher
+    attr_reader :raw_receiver
+    def initialize
+      @loop = EventLoop.new
+    end
+
+    def start
+      @raw_receiver = TCPServer.new("127.0.0.1", 0)
+      @loop_thread = Thread.new do
+        @loop.run
+      end
+    end
+
+    def shutdown
+      $log.trace("#{log_tag}: shutdown: start")
+      @raw_receiver.close
+      @loop.stop
+      @loop_thread.join
+      $log.trace("#{log_tag}: shutdown: done")
+    end
+
+    def push(message)
+      $log.trace("#{log_tag}: push: start")
+      packed_message = message.to_msgpack
+      _, port, _, ip_address = @raw_receiver.addr
+      sender = Coolio::TCPSocket.connect(ip_address, port)
+      sender.write(message.to_msgpack)
+      sender.on_write_complete do
+        close
+      end
+      @loop.attach(sender)
+      $log.trace("#{log_tag}: push: done")
+    end
+
+    private
+    def log_tag
+      "message_pusher"
+    end
+  end
+end

  Added: lib/droonga/message_receiver.rb (+63 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/message_receiver.rb    2013-12-09 17:31:05 +0900 (323c296)
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "msgpack"
+
+require "droonga/event_loop"
+
+module Droonga
+  class MessageReceiver
+    def initialize(receiver, &callback)
+      @loop = EventLoop.new
+      @receiver = Coolio::Server.new(receiver, Coolio::Socket) do |connection|
+        setup_receive_handler(connection)
+      end
+      @loop.attach(@receiver)
+      @callback = callback
+    end
+
+    def run
+      @loop.run
+    end
+
+    def stop
+      $log.trace("#{log_tag}: shutdown: start")
+      @receiver.close
+      @loop.stop
+      $log.trace("#{log_tag}: shutdown: done")
+    end
+
+    private
+    def setup_receive_handler(connection)
+      unpacker = MessagePack::Unpacker.new
+      on_read = lambda do |data|
+        $log.trace("#{log_tag}: on_read: start")
+        unpacker.feed_each(data) do |message|
+          @callback.call(message)
+        end
+        $log.trace("#{log_tag}: on_read: done")
+      end
+      connection.on_read do |data|
+        on_read.call(data)
+      end
+    end
+
+    def log_tag
+      "message_puller"
+    end
+  end
+end

  Modified: lib/droonga/partition.rb (+21 -2)
===================================================================
--- lib/droonga/partition.rb    2013-12-09 17:29:56 +0900 (97614ba)
+++ lib/droonga/partition.rb    2013-12-09 17:31:05 +0900 (661d5aa)
@@ -19,6 +19,7 @@ require "serverengine"
 
 require "droonga/server"
 require "droonga/worker"
+require "droonga/message_pusher"
 require "droonga/processor"
 
 module Droonga
@@ -26,18 +27,22 @@ module Droonga
     def initialize(options={})
       @options = options
       @n_workers = @options[:n_workers] || 0
-      @processor = Processor.new(@options)
+      @message_pusher = MessagePusher.new
+      @processor = Processor.new(@message_pusher, @options)
       @supervisor = nil
     end
 
     def start
+      ensure_database
       @processor.start
+      @message_pusher.start
       start_supervisor if @n_workers > 0
     end
 
     def shutdown
       $log.trace("partition: shutdown: start")
       shutdown_supervisor if @supervisor
+      @message_pusher.shutdown
       @processor.shutdown
       $log.trace("partition: shutdown: done")
     end
@@ -49,6 +54,19 @@ module Droonga
     end
 
     private
+    def ensure_database
+      database_path = @options[:database]
+      return if File.exist?(database_path)
+      FileUtils.mkdir_p(File.dirname(database_path))
+      context = Groonga::Context.new
+      begin
+        context.create_database(database_path) do
+        end
+      ensure
+        context.close
+      end
+    end
+
     def start_supervisor
       @supervisor = ServerEngine::Supervisor.new(Server, Worker) do
         force_options = {
@@ -56,7 +74,8 @@ module Droonga
           :workers       => @options[:n_workers],
           :log_level     => $log.level,
           :server_process_name => "Server[#{@options[:database]}] #$0",
-          :worker_process_name => "Worker[#{@options[:database]}] #$0"
+          :worker_process_name => "Worker[#{@options[:database]}] #$0",
+          :message_receiver => @message_pusher.raw_receiver,
         }
         @options.merge(force_options)
       end

  Modified: lib/droonga/processor.rb (+3 -9)
===================================================================
--- lib/droonga/processor.rb    2013-12-09 17:29:56 +0900 (d98a927)
+++ lib/droonga/processor.rb    2013-12-09 17:31:05 +0900 (fb449db)
@@ -15,29 +15,23 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/job_queue"
 require "droonga/handler"
 
 module Droonga
   class Processor
-    def initialize(options={})
+    def initialize(message_pusher, options={})
+      @message_pusher = message_pusher
       @options = options
-      @database_name = @options[:database]
-      @queue_name = @options[:options] || "DroongaQueue"
       @n_workers = @options[:n_workers] || 0
     end
 
     def start
-      Droonga::JobQueue.ensure_schema(@database_name,
-                                      @queue_name)
-      @job_queue = JobQueue.open(@database_name, @queue_name)
       @handler = Handler.new(@options)
     end
 
     def shutdown
       $log.trace("processor: shutdown: start")
       @handler.shutdown
-      @job_queue.close
       $log.trace("processor: shutdown: done")
     end
 
@@ -53,7 +47,7 @@ module Droonga
         if @n_workers.zero? or synchronous
           @handler.process(envelope)
         else
-          @job_queue.push_message(envelope)
+          @message_pusher.push(envelope)
         end
       else
         $log.trace("proessor: process: ignore #{command}")

  Modified: lib/droonga/server.rb (+0 -67)
===================================================================
--- lib/droonga/server.rb    2013-12-09 17:29:56 +0900 (f4a51c7)
+++ lib/droonga/server.rb    2013-12-09 17:31:05 +0900 (a906653)
@@ -15,8 +15,6 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/job_queue"
-
 module Droonga
   module Server
     def before_run
@@ -39,70 +37,5 @@ module Droonga
     def log_tag
       "[#{Process.ppid}][#{Process.pid}] server"
     end
-
-    def start_worker(wid)
-      worker = super(wid)
-      worker.extend(WorkerStopper)
-      worker
-    end
-
-    module WorkerStopper
-      def send_stop(stop_graceful)
-        in_signal_sending do
-          open_queue do |queue|
-            $log.trace("#{log_tag}: stop: start")
-
-            $log.trace("#{log_tag}: stop: queue: unblock: start")
-            max_n_retries = 20
-            max_n_retries.times do |i|
-              $log.trace("#{log_tag}: stop: queue: unblock: #{i}: start")
-              super(stop_graceful)
-              queue.unblock
-              alive_p = alive?
-              $log.trace("#{log_tag}: stop: queue: unblock: #{i}: done: " +
-                         "#{alive_p}")
-              break unless alive_p
-              sleep(i * 0.1)
-            end
-            $log.trace("#{log_tag}: stop: queue: unblock: done")
-
-            $log.trace("#{log_tag}: stop: done")
-          end
-        end
-      end
-
-      def send_reload
-        in_signal_sending do
-          open_queue do |queue|
-            $log.trace("#{log_tag}: reload: start")
-            super
-            queue.unblock
-            $log.trace("#{log_tag}: reload: done")
-          end
-        end
-      end
-
-      private
-      def log_tag
-        "[#{Process.ppid}][#{Process.pid}][#{@wid}] server: worker-stopper"
-      end
-
-      def in_signal_sending
-        Thread.new do
-          yield
-        end
-      end
-
-      def open_queue
-        config =****@worke*****
-        queue = JobQueue.open(config[:database],
-                              config[:queue_name] || "DroongaQueue")
-        begin
-          yield(queue)
-        ensure
-          queue.close
-        end
-      end
-    end
   end
 end

  Modified: lib/droonga/worker.rb (+10 -19)
===================================================================
--- lib/droonga/worker.rb    2013-12-09 17:29:56 +0900 (04c6432)
+++ lib/droonga/worker.rb    2013-12-09 17:31:05 +0900 (f861a48)
@@ -15,44 +15,35 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/job_queue"
 require "droonga/handler"
+require "droonga/message_receiver"
 
 module Droonga
   module Worker
     def initialize
-      @database_name = config[:database]
-      @queue_name = config[:queue_name] || "DroongaQueue"
+      @handler = Handler.new(config)
+      receiver_socket = config[:message_receiver]
+      @message_receiver = MessageReceiver.new(receiver_socket) do |message|
+        process(message)
+      end
     end
 
     def run
       $log.trace("#{log_tag}: run: start")
-      handler = Handler.new(config)
-      job_queue = JobQueue.open(@database_name, @queue_name)
-      @running = true
-      while @running
-        process(handler, job_queue)
-      end
-      handler.shutdown
-      job_queue.close
+      @message_receiver.run
       $log.trace("#{log_tag}: run: done")
     end
 
     def stop
       $log.trace("#{log_tag}: stop: start")
-      @running = false
+      @message_receiver.stop
       $log.trace("#{log_tag}: stop: done")
     end
 
     private
-    def process(handler, job_queue)
+    def process(envelope)
       $log.trace("#{log_tag}: process: start")
-      envelope = job_queue.pull_message
-      unless envelope
-        $log.trace("#{log_tag}: process: abort: no message")
-        return
-      end
-      handler.process(envelope)
+      @handler.process(envelope)
       $log.trace("#{log_tag}: process: done")
     end
 

  Deleted: test/unit/test_job_queue_schema.rb (+0 -45) 100644
===================================================================
--- test/unit/test_job_queue_schema.rb    2013-12-09 17:29:56 +0900 (624b41e)
+++ /dev/null
@@ -1,45 +0,0 @@
-# Copyright (C) 2013 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "helper"
-
-require "droonga/job_queue_schema"
-
-class JobQueueSchemaTest < Test::Unit::TestCase
-  def setup
-    @database_path = @temporary_directory + "droonga/db"
-    @queue_name = "DroongaQueue"
-  end
-
-  def test_ensure_created
-    schema = Droonga::JobQueueSchema.new(@database_path.to_s, @queue_name)
-
-    assert_not_predicate(@database_path, :exist?)
-    schema.ensure_created
-    assert_predicate(@database_path, :exist?)
-
-    context = Groonga::Context.new
-    dumped_commands = nil
-    context.open_database(@database_path.to_s) do |database|
-      dumped_commands = Groonga::DatabaseDumper.dump(:context => context,
-                                                     :database => database)
-    end
-    context.close
-    assert_equal(<<-SCHEMA, dumped_commands)
-table_create #{@queue_name} TABLE_NO_KEY
-column_create #{@queue_name} message COLUMN_SCALAR Text
-SCHEMA
-  end
-end
-------------- next part --------------
HTML����������������������������...
下載 



More information about the Groonga-commit mailing list
Back to archive index