[Groonga-commit] droonga/droonga-engine at 004cdcb [master] Report progress by data absorber

Back to archive index

YUKI Hiroshi null+****@clear*****
Fri Nov 21 17:40:44 JST 2014


YUKI Hiroshi	2014-11-21 17:40:44 +0900 (Fri, 21 Nov 2014)

  New Revision: 004cdcb87786436b6f7273f62a9af71c8d42bdc3
  https://github.com/droonga/droonga-engine/commit/004cdcb87786436b6f7273f62a9af71c8d42bdc3

  Message:
    Report progress by data absorber

  Modified files:
    bin/droonga-engine-absorb-data
    lib/droonga/data_absorber.rb

  Modified: bin/droonga-engine-absorb-data (+1 -0)
===================================================================
--- bin/droonga-engine-absorb-data    2014-11-21 17:01:06 +0900 (11c6a23)
+++ bin/droonga-engine-absorb-data    2014-11-21 17:40:44 +0900 (59b7941)
@@ -24,6 +24,7 @@ require "droonga/catalog_generator"
 require "droonga/path"
 require "droonga/data_absorber"
 require "droonga/serf"
+require "droonga/client"
 
 options = OpenStruct.new
 options.port    = Droonga::CatalogGenerator::DEFAULT_PORT

  Modified: lib/droonga/data_absorber.rb (+113 -21)
===================================================================
--- lib/droonga/data_absorber.rb    2014-11-21 17:01:06 +0900 (0b5a4fc)
+++ lib/droonga/data_absorber.rb    2014-11-21 17:40:44 +0900 (d01063a)
@@ -15,31 +15,53 @@
 
 require "open3"
 
+require "droonga/loggable"
+require "droonga/client"
+
 module Droonga
   class DataAbsorber
+    include Loggable
+
     DEFAULT_MESSAGES_PER_SECOND = 100
 
+    TIME_UNKNOWN = -1
+    PROGRESS_UNKNOWN = -1
+
     class << self
       def absorb(params)
-        drndump = params[:drndump] || "drndump"
+        new(params).absorb
+      end
+    end
+
+    attr_reader :params
+    def initialize(params)
+      @params = params
+
+      @params[:messages_per_second] ||= DEFAULT_MESSAGES_PER_SECOND
+      @params[:drndump] ||= "drndump"
+      # We should use droonga-send instead of droonga-request,
+      # because droonga-request is too slow.
+      @params[:client] ||= "droonga-send"
+    end
+
+      def absorb
+        drndump = @params[:drndump]
         drndump_options = []
-        drndump_options += ["--host", params[:source_host]] if params[:source_host]
-        drndump_options += ["--port", params[:port].to_s] if params[:port]
-        drndump_options += ["--tag", params[:tag]] if params[:tag]
-        drndump_options += ["--dataset", params[:dataset]] if params[:dataset]
-        drndump_options += ["--receiver-host", params[:destination_host]]
-        drndump_options += ["--receiver-port", params[:receiver_port].to_s] if params[:receiver_port]
-
-        #TODO: We should use droonga-send instead of droonga-request,
-        #      because droonga-request is too slow.
-        client = params[:client] || "droonga-send"
+        drndump_options += ["--host", @params[:source_host]] if @params[:source_host]
+        drndump_options += ["--port", @params[:port].to_s] if @params[:port]
+        drndump_options += ["--tag", @params[:tag]] if @params[:tag]
+        drndump_options += ["--dataset", @params[:dataset]] if @params[:dataset]
+        drndump_options += ["--receiver-host", @params[:destination_host]]
+        drndump_options += ["--receiver-port", @params[:receiver_port].to_s] if @params[:receiver_port]
+
+        client = @params[:client]
         client_options = []
         if client.include?("droonga-request")
-          client_options += ["--host", params[:destination_host]]
-          client_options += ["--port", params[:port].to_s] if params[:port]
-          client_options += ["--tag", params[:tag]] if params[:tag]
-          client_options += ["--receiver-host", params[:destination_host]]
-          client_options += ["--receiver-port", params[:receiver_port].to_s] if params[:receiver_port]
+          client_options += ["--host", @params[:destination_host]]
+          client_options += ["--port", @params[:port].to_s] if @params[:port]
+          client_options += ["--tag", @params[:tag]] if @params[:tag]
+          client_options += ["--receiver-host", @params[:destination_host]]
+          client_options += ["--receiver-port", @params[:receiver_port].to_s] if @params[:receiver_port]
         elsif client.include?("droonga-send")
           #XXX Don't use round-robin with multiple endpoints
           #    even if there are too much data.
@@ -49,12 +71,11 @@ module Droonga
           #    So, we always use just one endpoint for now,
           #    even if there are too much data.
           server = "droonga:#{params[:destination_host]}"
-          server = "#{server}:#{params[:port].to_s}" if params[:port]
-          server = "#{server}/#{params[:tag].to_s}" if params[:tag]
+          server = "#{server}:#{params[:port].to_s}" if @params[:port]
+          server = "#{server}/#{params[:tag].to_s}" if @params[:tag]
           client_options += ["--server", server]
           #XXX We should restrict the traffic to avoid overflowing!
-          params[:messages_per_second] ||= DEFAULT_MESSAGES_PER_SECOND
-          client_options += ["--messages-per-second", params[:messages_per_second]]
+          client_options += ["--messages-per-second", @params[:messages_per_second]]
         else
           raise ArgumentError.new("Unknwon type client: #{client}")
         end
@@ -62,14 +83,85 @@ module Droonga
         drndump_command_line = [drndump] + drndump_options
         client_command_line = [client] + client_options
 
+        calculated_required_time = required_time_in_seconds
+        unless calculated_required_time == TIME_UNKNOWN
+          logger.info("calculated required time: #{calculated_required_time}sec")
+          if block_given?
+            yield(:required_time_in_seconds => calculated_required_time)
+          end
+
+        start = Time.new.to_i
         env = {}
         Open3.pipeline_r([env, *drndump_command_line],
                          [env, *client_command_line]) do |last_stdout, thread|
           last_stdout.each do |output|
-            yield output if block_given?
+            progress = nil
+            if calculated_required_time == TIME_UNKNOWN or
+               calculated_required_time <= 0
+              progress = PROGRESS_UNKNOWN
+            else
+              progress = (Time.new.to_i - start) / calculated_required_time
+            end
+            yield(:progress => progress,
+                  :output   => output) 
           end
         end
       end
+
+    def required_time_in_seconds
+      @params[:client].include?("droonga-send")
+        total_n_source_records / @params[:messages_per_second]
+      else
+        TIME_UNKNOWN
+      end
+    end
+
+    def source_client
+      options = {
+        :host          => @params[:source_host],
+        :port          => @params[:port],
+        :tag           => @params[:tag],
+        :progocol      => :droonga,
+        :receiver_host => @params[:destination_host],
+        :receiver_port => 0,
+      }
+      @source_client ||= Droonga::Client.new(options)
+    end
+
+    def source_tables
+      response = source_client.request("dataset" => @params[:dataset],
+                                       "type"    => "table_list")
+      body = response["body"][1]
+      tables = body[1..-1]
+      tables.collect do |table|
+        table[1]
+      end
+    end
+
+    def total_n_source_records
+      queries = {}
+      source_tables.each do |table|
+        queries["n_records_of_#{table}"] = {
+          "source" => table,
+          "output" => {
+            "elements" => ["count"],
+          },
+        }
+      end
+      response = source_client.request("dataset" => @params[:dataset],
+                                       "type"    => "search",
+                                       "body"    => {
+                                         "queries" => queries,
+                                       })
+      n_records = 0
+      response["body"].each do |query_name, result|
+        n_records += result["count"]
+      end
+      n_records
+    end
+
+    def log_tag
+      "data-absorber"
     end
   end
 end
-------------- next part --------------
HTML����������������������������...
下載 



More information about the Groonga-commit mailing list
Back to archive index