YUKI Hiroshi
null+****@clear*****
Thu Nov 27 16:16:39 JST 2014
YUKI Hiroshi 2014-11-27 16:16:39 +0900 (Thu, 27 Nov 2014) New Revision: 2e141f8c556265a81d95dd551d73e12051dd797f https://github.com/droonga/drnbench/commit/2e141f8c556265a81d95dd551d73e12051dd797f Message: Run benchmark clients with multiple processes Modified files: drnbench.gemspec lib/drnbench/client/http-droonga.rb lib/drnbench/client/http.rb lib/drnbench/request-response/result.rb lib/drnbench/request-response/runner.rb Modified: drnbench.gemspec (+1 -0) =================================================================== --- drnbench.gemspec 2014-10-07 16:21:31 +0900 (0d5fd66) +++ drnbench.gemspec 2014-11-27 16:16:39 +0900 (176bfd7) @@ -51,6 +51,7 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency("json") spec.add_runtime_dependency("droonga-client") spec.add_runtime_dependency("drntest") + spec.add_runtime_dependency("facter") spec.add_development_dependency("bundler") spec.add_development_dependency("rake") Modified: lib/drnbench/client/http-droonga.rb (+2 -2) =================================================================== --- lib/drnbench/client/http-droonga.rb 2014-10-07 16:21:31 +0900 (b6735e0) +++ lib/drnbench/client/http-droonga.rb 2014-11-27 16:16:39 +0900 (091755d) @@ -19,9 +19,9 @@ module Drnbench DEFAULT_COMMAND = "search" DEFAULT_METHOD = "POST" - def initialize(params, config) + def initialize(params) super - @command = params["command"] || DEFAULT_COMMAND + @command = params[:command] || DEFAULT_COMMAND end private Modified: lib/drnbench/client/http.rb (+27 -12) =================================================================== --- lib/drnbench/client/http.rb 2014-10-07 16:21:31 +0900 (5d6b4f0) +++ lib/drnbench/client/http.rb 2014-11-27 16:16:39 +0900 (6407fa6) @@ -16,28 +16,36 @@ require "thread" require "droonga/client" require "json" +require "drb" module Drnbench class HttpClient - attr_reader :requests, :results, :wait + attr_reader :runner, :results, :wait SUPPORTED_HTTP_METHODS = ["GET", "POST"] @@count = 0 - def initialize(params, config) - @requests = params[:requests] - @result = params[:result] - @config = config + def initialize(params) + @runner = params[:runner] + @config = params[:config] @count = 0 @id = @@count @@count += 1 + @thread = nil end def run @thread = Thread.new do + start_time = Time.now loop do - request =****@reque***** + if****@runne*****? + puts "WORNING: requests queue becomes empty! (#{Time.now - start_time} sec)" + stop + break + end + + request =****@runne*****_request request = fixup_request(request) client = Droonga::Client.new(:protocol => :http, @@ -51,21 +59,21 @@ module Drnbench @last_start_time = start_time begin response = client.request(request) - @result << { + @runner.push_result( :request => request, :status => response.code, :elapsed_time => Time.now - start_time, :client => @id, :index => @count, - } + ) rescue Timeout::Error - @result << { + @runner.push_result( :request => request, :status => "0", :elapsed_time => Time.now - start_time, :client => @id, :index => @count, - } + ) end @last_request = nil @last_start_time = nil @@ -77,20 +85,27 @@ module Drnbench end def stop + return unless @thread + @thread.exit + @thread = nil if @last_request - @result << { + @runner.push_result( :request => @last_request, :status => "0", :elapsed_time => Time.now - @last_start_time, :client => @id, :index => @count, :last => true, - } + ) end end + def running? + not****@threa*****? + end + private def fixup_request(request) request["host"] ||=****@confi*****_host Modified: lib/drnbench/request-response/result.rb (+4 -0) =================================================================== --- lib/drnbench/request-response/result.rb 2014-10-07 16:21:31 +0900 (cea99fa) +++ lib/drnbench/request-response/result.rb 2014-11-27 16:16:39 +0900 (6bd15c5) @@ -44,6 +44,10 @@ module Drnbench end def <<(result) + push(result) + end + + def push(result) clear_cached_statistics @results << result Modified: lib/drnbench/request-response/runner.rb (+133 -20) =================================================================== --- lib/drnbench/request-response/runner.rb 2014-10-07 16:21:31 +0900 (e13aa7f) +++ lib/drnbench/request-response/runner.rb 2014-11-27 16:16:39 +0900 (68deb25) @@ -13,6 +13,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. +require "facter" require "drnbench/client/http" require "drnbench/client/http-droonga" require "drnbench/request-response/result" @@ -23,6 +24,10 @@ module Drnbench class Runner attr_reader :n_clients, :result + MESSAGE_EXIT = "exit" + MESSAGE_START = "start" + MESSAGE_COMPLETE = "complete" + def initialize(n_clients, config) n_clients = 1 if n_clients.zero? @n_clients = n_clients @@ -37,50 +42,158 @@ module Drnbench @result end + def pop_request + @requests_queue.pop + end + + def push_result(result) + @result << result + end + + def empty? + @requests_queue.empty? + end + private def process_requests - requests_queue = Queue.new + @requests_queue = Queue.new @requests.each do |request| - requests_queue.push(request) + @requests_queue.push(request) end - @result = Result.new(:n_clients => @n_clients, :duration => @config.duration, :n_slow_requests => @config.n_slow_requests) - client_params = { - :requests => requests_queue, - :result => @result, - } - @clients = @n_clients.times.collect do |index| - client = nil + setup_child_processes + initiate_child_processes + wait_for_given_duration + kill_child_processes + + @result + end + + def setup_child_processes + @child_process_pipes = [] + n_processes.times.each do |index| + setup_child_process + end + end + + def setup_child_process + parent_read, child_write = IO.pipe + child_read, parent_write = IO.pipe + @child_process_pipes << [parent_read, parent_write] + + child_process_requests_queue = Queue.new + n_requests_per_process.times.each do |index| + child_process_requests_queue.push(@requests_queue.pop) + end + + fork do + parent_write.close + parent_read.close + druby_uri = child_read.gets.chomp + @parent = DRbObject.new_with_uri(druby_uri) + + @requests_queue = child_process_requests_queue + @result = [] + + clients = setup_clients(n_clients_per_process) + + loop do + message = child_read.gets + if message and message.chomp == MESSAGE_EXIT + clients.each(&:stop) + @result.each do |result| + @parent.push_result(result) + end + child_write.puts(MESSAGE_COMPLETE) + child_write.close + exit! + end + sleep(3) + end + end + child_read.close + child_write.close + end + + def setup_clients(count) + count.times.collect do |index| case****@confi***** when :http - client = HttpClient.new(client_params, @config) + client = HttpClient.new(:runner => self, + :config => @config) when :http_droonga - client = HttpDroongaClient.new(client_params, @config) + client = HttpDroongaClient.new(:runner => self, + :config => @config) else raise ArgumentError.new("Unknown mode: #{@config.mode}") end client.run client end + end + def initiate_child_processes + DRb.start_service("druby://localhost:0", self) + @child_process_pipes.each do |input, output| + output.puts(DRb.uri) + end + end + + ONE_MINUTE_IN_SECONDS = 60 + ONE_HOUR_IN_SECONDS = ONE_MINUTE_IN_SECONDS * 60 + + def wait_for_given_duration start_time = Time.now - while Time.now - start_time < @config.duration + last_message = "" + loop do sleep 1 - if requests_queue.empty? - puts "WORNING: requests queue becomes empty! (#{Time.now - start_time} sec)" - @result.duration = Time.now - start_time - break - end + elapsed_time = (Time.now - start_time).to_i + break if elapsed_time >=****@confi***** + + remaining_seconds =****@confi***** - elapsed_time + remaining_hours = (remaining_seconds / ONE_HOUR_IN_SECONDS).floor + remaining_seconds -= remaining_hours * ONE_HOUR_IN_SECONDS + remaining_minutes = (remaining_seconds / ONE_MINUTE_IN_SECONDS).floor + remaining_seconds -= remaining_minutes * ONE_MINUTE_IN_SECONDS + remaining_time = sprintf("%02i:%02i:%02i", remaining_hours, remaining_minutes, remaining_seconds) + next_message = "#{remaining_time} remaining..." + printf("%s", "#{" " * last_message.size}\r") + printf("%s", "#{next_message}\r") + last_message = next_message + end + end + + def kill_child_processes + @child_process_pipes.each do |input, output| + output.puts(MESSAGE_EXIT) end - @clients.each do |client| - client.stop + loop do + @child_process_pipes = @child_process_pipes.reject do |input, output| + message = input.gets + message and message.chomp == MESSAGE_COMPLETE + end + break if @child_process_pipes.empty? end + end - @result + def n_processes + [[@n_clients, 1].max, max_n_processes].min + end + + def max_n_processes + Facter["processorcount"].value.to_i + end + + def n_clients_per_process + (@n_clients.to_f / n_processes).round + end + + def n_requests_per_process + (@requests.size.to_f / n_processes).round end end end -------------- next part -------------- HTML����������������������������...下載