[Groonga-commit] droonga/drnbench at 2e141f8 [master] Run benchmark clients with multiple processes

Back to archive index

YUKI Hiroshi null+****@clear*****
Thu Nov 27 16:16:39 JST 2014


YUKI Hiroshi	2014-11-27 16:16:39 +0900 (Thu, 27 Nov 2014)

  New Revision: 2e141f8c556265a81d95dd551d73e12051dd797f
  https://github.com/droonga/drnbench/commit/2e141f8c556265a81d95dd551d73e12051dd797f

  Message:
    Run benchmark clients with multiple processes

  Modified files:
    drnbench.gemspec
    lib/drnbench/client/http-droonga.rb
    lib/drnbench/client/http.rb
    lib/drnbench/request-response/result.rb
    lib/drnbench/request-response/runner.rb

  Modified: drnbench.gemspec (+1 -0)
===================================================================
--- drnbench.gemspec    2014-10-07 16:21:31 +0900 (0d5fd66)
+++ drnbench.gemspec    2014-11-27 16:16:39 +0900 (176bfd7)
@@ -51,6 +51,7 @@ Gem::Specification.new do |spec|
   spec.add_runtime_dependency("json")
   spec.add_runtime_dependency("droonga-client")
   spec.add_runtime_dependency("drntest")
+  spec.add_runtime_dependency("facter")
 
   spec.add_development_dependency("bundler")
   spec.add_development_dependency("rake")

  Modified: lib/drnbench/client/http-droonga.rb (+2 -2)
===================================================================
--- lib/drnbench/client/http-droonga.rb    2014-10-07 16:21:31 +0900 (b6735e0)
+++ lib/drnbench/client/http-droonga.rb    2014-11-27 16:16:39 +0900 (091755d)
@@ -19,9 +19,9 @@ module Drnbench
     DEFAULT_COMMAND   = "search"
     DEFAULT_METHOD    = "POST"
 
-    def initialize(params, config)
+    def initialize(params)
       super
-      @command = params["command"] || DEFAULT_COMMAND
+      @command = params[:command] || DEFAULT_COMMAND
     end
 
     private

  Modified: lib/drnbench/client/http.rb (+27 -12)
===================================================================
--- lib/drnbench/client/http.rb    2014-10-07 16:21:31 +0900 (5d6b4f0)
+++ lib/drnbench/client/http.rb    2014-11-27 16:16:39 +0900 (6407fa6)
@@ -16,28 +16,36 @@
 require "thread"
 require "droonga/client"
 require "json"
+require "drb"
 
 module Drnbench
   class HttpClient
-    attr_reader :requests, :results, :wait
+    attr_reader :runner, :results, :wait
 
     SUPPORTED_HTTP_METHODS = ["GET", "POST"]
 
     @@count = 0
 
-    def initialize(params, config)
-      @requests = params[:requests]
-      @result   = params[:result]
-      @config   = config
+    def initialize(params)
+      @runner   = params[:runner]
+      @config   = params[:config]
       @count    = 0
       @id       = @@count
       @@count += 1
+      @thread = nil
     end
 
     def run
       @thread = Thread.new do
+        start_time = Time.now
         loop do
-          request =****@reque*****
+          if****@runne*****?
+            puts "WORNING: requests queue becomes empty! (#{Time.now - start_time} sec)"
+            stop
+            break
+          end
+
+          request =****@runne*****_request
           request = fixup_request(request)
 
           client = Droonga::Client.new(:protocol => :http,
@@ -51,21 +59,21 @@ module Drnbench
           @last_start_time = start_time
           begin
           response = client.request(request)
-            @result << {
+            @runner.push_result(
               :request => request,
               :status => response.code,
               :elapsed_time => Time.now - start_time,
               :client => @id,
               :index => @count,
-            }
+            )
           rescue Timeout::Error
-            @result << {
+            @runner.push_result(
               :request => request,
               :status => "0",
               :elapsed_time => Time.now - start_time,
               :client => @id,
               :index => @count,
-            }
+            )
           end
           @last_request = nil
           @last_start_time = nil
@@ -77,20 +85,27 @@ module Drnbench
     end
 
     def stop
+      return unless @thread
+
       @thread.exit
+      @thread = nil
 
       if @last_request
-        @result << {
+        @runner.push_result(
           :request => @last_request,
           :status => "0",
           :elapsed_time => Time.now - @last_start_time,
           :client => @id,
           :index => @count,
           :last => true,
-        }
+        )
       end
     end
 
+    def running?
+      not****@threa*****?
+    end
+
     private
     def fixup_request(request)
       request["host"] ||=****@confi*****_host

  Modified: lib/drnbench/request-response/result.rb (+4 -0)
===================================================================
--- lib/drnbench/request-response/result.rb    2014-10-07 16:21:31 +0900 (cea99fa)
+++ lib/drnbench/request-response/result.rb    2014-11-27 16:16:39 +0900 (6bd15c5)
@@ -44,6 +44,10 @@ module Drnbench
       end
 
       def <<(result)
+        push(result)
+      end
+
+      def push(result)
         clear_cached_statistics
 
         @results << result

  Modified: lib/drnbench/request-response/runner.rb (+133 -20)
===================================================================
--- lib/drnbench/request-response/runner.rb    2014-10-07 16:21:31 +0900 (e13aa7f)
+++ lib/drnbench/request-response/runner.rb    2014-11-27 16:16:39 +0900 (68deb25)
@@ -13,6 +13,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+require "facter"
 require "drnbench/client/http"
 require "drnbench/client/http-droonga"
 require "drnbench/request-response/result"
@@ -23,6 +24,10 @@ module Drnbench
     class Runner
       attr_reader :n_clients, :result
 
+      MESSAGE_EXIT     = "exit"
+      MESSAGE_START    = "start"
+      MESSAGE_COMPLETE = "complete"
+
       def initialize(n_clients, config)
         n_clients = 1 if n_clients.zero?
         @n_clients = n_clients
@@ -37,50 +42,158 @@ module Drnbench
         @result
       end
 
+      def pop_request
+        @requests_queue.pop
+      end
+
+      def push_result(result)
+        @result << result
+      end
+
+      def empty?
+        @requests_queue.empty?
+      end
+
       private
       def process_requests
-        requests_queue = Queue.new
+        @requests_queue = Queue.new
         @requests.each do |request|
-          requests_queue.push(request)
+          @requests_queue.push(request)
         end
-
         @result = Result.new(:n_clients => @n_clients,
                              :duration => @config.duration,
                              :n_slow_requests => @config.n_slow_requests)
 
-        client_params = {
-          :requests => requests_queue,
-          :result   => @result,
-        }
-        @clients = @n_clients.times.collect do |index|
-          client = nil
+        setup_child_processes
+        initiate_child_processes
+        wait_for_given_duration
+        kill_child_processes
+
+        @result
+      end
+
+      def setup_child_processes
+        @child_process_pipes = []
+        n_processes.times.each do |index|
+          setup_child_process
+        end
+      end
+
+      def setup_child_process
+        parent_read, child_write = IO.pipe
+        child_read, parent_write = IO.pipe
+        @child_process_pipes << [parent_read, parent_write]
+
+        child_process_requests_queue = Queue.new
+        n_requests_per_process.times.each do |index|
+          child_process_requests_queue.push(@requests_queue.pop)
+        end
+
+        fork do
+          parent_write.close
+          parent_read.close
+          druby_uri = child_read.gets.chomp
+          @parent = DRbObject.new_with_uri(druby_uri)
+
+          @requests_queue = child_process_requests_queue
+          @result = []
+
+          clients = setup_clients(n_clients_per_process)
+
+          loop do
+            message = child_read.gets
+            if message and message.chomp == MESSAGE_EXIT
+              clients.each(&:stop)
+              @result.each do |result|
+                @parent.push_result(result)
+              end
+              child_write.puts(MESSAGE_COMPLETE)
+              child_write.close
+              exit!
+            end
+            sleep(3)
+          end
+        end
+        child_read.close
+        child_write.close
+      end
+
+      def setup_clients(count)
+        count.times.collect do |index|
           case****@confi*****
           when :http
-            client = HttpClient.new(client_params, @config)
+            client = HttpClient.new(:runner => self,
+                                    :config => @config)
           when :http_droonga
-            client = HttpDroongaClient.new(client_params, @config)
+            client = HttpDroongaClient.new(:runner => self,
+                                           :config => @config)
           else
             raise ArgumentError.new("Unknown mode: #{@config.mode}")
           end
           client.run
           client
         end
+      end
 
+      def initiate_child_processes
+        DRb.start_service("druby://localhost:0", self)
+        @child_process_pipes.each do |input, output|
+          output.puts(DRb.uri)
+        end
+      end
+
+      ONE_MINUTE_IN_SECONDS = 60
+      ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60
+
+      def wait_for_given_duration
         start_time = Time.now
-        while Time.now - start_time < @config.duration
+        last_message = ""
+        loop do
           sleep 1
-          if requests_queue.empty?
-            puts "WORNING: requests queue becomes empty! (#{Time.now - start_time} sec)"
-            @result.duration = Time.now - start_time
-            break
-          end
+          elapsed_time = (Time.now - start_time).to_i
+          break if elapsed_time >=****@confi*****
+
+          remaining_seconds  =****@confi***** - elapsed_time
+          remaining_hours    = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor
+          remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS
+          remaining_minutes  = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor
+          remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS
+          remaining_time     = sprintf("%02i:%02i:%02i", remaining_hours, remaining_minutes, remaining_seconds)
+          next_message = "#{remaining_time} remaining..."
+          printf("%s", "#{" " * last_message.size}\r")
+          printf("%s", "#{next_message}\r")
+          last_message = next_message
+        end
+      end
+
+      def kill_child_processes
+        @child_process_pipes.each do |input, output|
+          output.puts(MESSAGE_EXIT)
         end
 
-        @clients.each do |client|
-          client.stop
+        loop do
+          @child_process_pipes = @child_process_pipes.reject do |input, output|
+            message = input.gets
+            message and message.chomp == MESSAGE_COMPLETE
+          end
+          break if @child_process_pipes.empty?
         end
+      end
 
-        @result
+      def n_processes
+        [[@n_clients, 1].max, max_n_processes].min
+      end
+
+      def max_n_processes
+        Facter["processorcount"].value.to_i
+      end
+
+      def n_clients_per_process
+        (@n_clients.to_f / n_processes).round
+      end
+
+      def n_requests_per_process
+        (@requests.size.to_f / n_processes).round
       end
     end
   end
-------------- next part --------------
HTML����������������������������...
下載 



More information about the Groonga-commit mailing list
Back to archive index