[Groonga-commit] droonga/fluent-plugin-droonga at 7c50c20 [master] Support n_workers > 0 configuration

Back to archive index

Kouhei Sutou null+****@clear*****
Thu Sep 26 18:00:53 JST 2013


Kouhei Sutou	2013-09-26 18:00:53 +0900 (Thu, 26 Sep 2013)

  New Revision: 7c50c20c63fee4e2262de9f5fb23a58cfc247005
  https://github.com/droonga/fluent-plugin-droonga/commit/7c50c20c63fee4e2262de9f5fb23a58cfc247005

  Message:
    Support n_workers > 0 configuration

  Modified files:
    lib/droonga/engine.rb
    lib/droonga/executor.rb
    lib/droonga/server.rb
    lib/droonga/worker.rb

  Modified: lib/droonga/engine.rb (+5 -55)
===================================================================
--- lib/droonga/engine.rb    2013-09-26 17:41:18 +0900 (cd35395)
+++ lib/droonga/engine.rb    2013-09-26 18:00:53 +0900 (23d28e6)
@@ -28,11 +28,7 @@ module Droonga
     DEFAULT_OPTIONS = {
       :queue_name => "DroongaQueue",
       :n_workers  => 0,
-      :with_server  => false
     }
-    # TODO: It doesn't work fine when n_workers > 0 && number of databases > 1
-    #       since more than one ServerEngine instance can't be in a process.
-    #       It causes dump_uncaught_error in the SignalThread.
 
     def initialize(options={})
       @options = DEFAULT_OPTIONS.merge(options)
@@ -43,74 +39,28 @@ module Droonga
         Droonga::JobQueue.ensure_schema(@options[:database],
                                         @options[:queue_name])
       end
-      if @options[:n_workers] > 0 || @options[:with_server]
-        @message_input, @message_output = IO.pipe
-        @message_input.sync = true
-        @message_output.sync = true
-        start_supervisor
-      end
-      if @options[:with_server]
-        start_emitter
-      else
-        @executor = Executor.new(@options)
-      end
+      start_supervisor if @options[:n_workers] > 0
+      @executor = Executor.new(@options)
     end
 
     def shutdown
       $log.trace("engine: shutdown: start")
-      shutdown_emitter if @emitter
       @executor.shutdown if @executor
-      if @supervisor
-        shutdown_supervisor
-        @message_input.close unless @message_input.closed?
-        @message_output.close unless @message_output.closed?
-      end
+      shutdown_supervisor if @supervisor
       $log.trace("engine: shutdown: done")
     end
 
     def emit(tag, time, record, synchronous=nil)
       $log.trace("tag: <#{tag}>")
-      if @executor
-        @executor.dispatch(tag, time, record, synchronous)
-      else
-        @emitter.write(MessagePack.pack([tag, time, record, synchronous]))
-        @loop_breaker.signal
-      end
+      @executor.dispatch(tag, time, record, synchronous)
     end
 
     private
-    def start_emitter
-      @loop = Coolio::Loop.new
-      @emitter = Coolio::IO.new(@message_output)
-      @emitter.on_write_complete do
-        $log.trace("emitter: written")
-      end
-      @emitter.attach(@loop)
-      @loop_breaker = Coolio::AsyncWatcher.new
-      @loop_breaker.attach(@loop)
-      @emitter_thread = Thread.new do
-        @loop.run
-      end
-    end
-
-    def shutdown_emitter
-      $log.trace("emitter: shutdown: start")
-      @emitter.close
-      $log.trace("emitter: shutdown: emitter: closed")
-      @loop.stop
-      @loop_breaker.signal
-      $log.trace("emitter: shutdown: loop: stopped")
-      @emitter_thread.join
-      $log.trace("emitter: shutdown: done")
-    end
-
     def start_supervisor
-      server = @options[:with_server] ? Server : nil
-      @supervisor = ServerEngine::Supervisor.new(server, Worker) do
+      @supervisor = ServerEngine::Supervisor.new(Server, Worker) do
         force_options = {
           :worker_type   => "process",
           :workers       => @options[:n_workers],
-          :message_input => @message_input,
           :log_level     => $log.level,
         }
         @options.merge(force_options)

  Modified: lib/droonga/executor.rb (+0 -8)
===================================================================
--- lib/droonga/executor.rb    2013-09-26 17:41:18 +0900 (bb649b4)
+++ lib/droonga/executor.rb    2013-09-26 18:00:53 +0900 (7b274b9)
@@ -57,14 +57,6 @@ module Droonga
       end
     end
 
-    def unblock_queue
-      3.times do |i|
-        super
-        @queue.unblock
-        sleep(i ** 2 * 0.1)
-      end
-    end
-
     def add_handler(name)
       plugin = HandlerPlugin.new(name)
       @handlers << plugin.instantiate(self)

  Modified: lib/droonga/server.rb (+16 -71)
===================================================================
--- lib/droonga/server.rb    2013-09-26 17:41:18 +0900 (78b1596)
+++ lib/droonga/server.rb    2013-09-26 18:00:53 +0900 (28cc399)
@@ -15,93 +15,38 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "msgpack"
-require "cool.io"
 require "groonga"
 
-require "droonga/executor"
-
 module Droonga
   module Server
-    class Receiver
-      def initialize(input)
-        @input = input
-      end
-
-      def run
-        $log.trace
-        @io = Coolio::IO.new(@input)
-        unpacker = MessagePack::Unpacker.new
-        @io.on_read do |data|
-          $log.trace("receiver: received: <#{data.bytesize}>")
-          unpacker.feed_each(data) do |message|
-            yield message
-          end
-        end
-        @loop = Coolio::Loop.new
-        @loop.attach(@io)
-        @loop_breaker = Coolio::AsyncWatcher.new
-        @loop.attach(@loop_breaker)
-        @running = true
-        @loop.run
-      end
-
-      def stop
-        unless @running
-          $log.trace("receiver: stop: not needed")
-          return
-        end
-
-        $log.trace("receiver: stop: start")
-        @io.close
-        $log.trace("receiver: stop: closed")
-        @loop.stop
-        @running = false
-        @loop_breaker.signal
-        $log.trace("receiver: stop: done")
-      end
-    end
-
-    def initialize
-      super
-      @message_input = config[:message_input]
-      @executor = Executor.new(config)
-    end
-
     def before_run
-      @receiver = Receiver.new(@message_input)
-      @receiver_thread = Thread.new do
-        @receiver.run do |message|
-          $log.trace("received: start")
-          @executor.dispatch(*message)
-          $log.trace("received: done")
-        end
-      end
+      $log.trace("server: before_run: start")
+      # TODO: Use JobQueue object
+      @context = Groonga::Context.new
+      @database =****@conte*****_database(config[:database])
+      @queue = @context[config[:queue_name]]
+      $log.trace("server: before_run: done")
     end
 
     def after_run
       $log.trace("server: after_run: start")
-
-      $log.trace("server: after_run: receiver: start")
-      @receiver_thread.join
-      $log.trace("server: after_run: receiver: done")
-
-      $log.trace("server: after_run: groonga: start")
-      @executor.shutdown
-      $log.trace("server: after_run: groonga: done")
-
+      @queue.close
+      @database.close
+      @context.close
       $log.trace("server: after_run: done")
     end
 
     def stop(stop_graceful)
       $log.trace("server: stop: start")
 
-      $log.trace("server: stop: receiver: stop: start")
-      @receiver.stop
-      $log.trace("server: stop: receiver: stop: done")
-
       $log.trace("server: stop: queue: unblock: start")
-      @executor.unblock_queue
+      3.times do |i|
+        $log.trace("server: stop: queue: unblock: #{i}: start")
+        super(stop_graceful)
+        @queue.unblock
+        sleep(i ** 2 * 0.1)
+        $log.trace("server: stop: queue: unblock: #{i}: done")
+      end
       $log.trace("server: stop: queue: unblock: done")
 
       $log.trace("server: stop: done")

  Modified: lib/droonga/worker.rb (+0 -19)
===================================================================
--- lib/droonga/worker.rb    2013-09-26 17:41:18 +0900 (ba5aa95)
+++ lib/droonga/worker.rb    2013-09-26 18:00:53 +0900 (54b198f)
@@ -42,24 +42,5 @@ module Droonga
       @running = false
       $log.trace("worker: stop: done")
     end
-
-    private
-    def shutdown_workers
-      @pool.each do |pid|
-        Process.kill(:TERM, pid)
-      end
-      queue = @context[@queue_name]
-      3.times do |i|
-        break if****@pool*****?
-        queue.unblock
-        @pool.reject! do |pid|
-          not Process.waitpid(pid, Process::WNOHANG).nil?
-        end
-        sleep(i ** 2 * 0.1)
-      end
-      @pool.each do |pid|
-        Process.kill(:KILL, pid)
-      end
-    end
   end
 end
-------------- next part --------------
HTML����������������������������...
下載 



More information about the Groonga-commit mailing list
Back to archive index