[Groonga-commit] droonga/droonga-engine at da739cc [master] Implement droonga-engine-join with a class

Back to archive index

YUKI Hiroshi null+****@clear*****
Wed Nov 26 15:40:38 JST 2014


YUKI Hiroshi	2014-11-26 15:40:38 +0900 (Wed, 26 Nov 2014)

  New Revision: da739cce2eba57bb3c5b3e9941e51355959d8120
  https://github.com/droonga/droonga-engine/commit/da739cce2eba57bb3c5b3e9941e51355959d8120

  Message:
    Implement droonga-engine-join with a class

  Modified files:
    bin/droonga-engine-join

  Modified: bin/droonga-engine-join (+79 -46)
===================================================================
--- bin/droonga-engine-join    2014-11-21 22:03:14 +0900 (17c9cc2)
+++ bin/droonga-engine-join    2014-11-26 15:40:38 +0900 (29c5360)
@@ -26,37 +26,57 @@ require "droonga/safe_file_writer"
 require "droonga/data_absorber"
 require "droonga/serf"
 
-options = nil
-begin
-  options = Slop.parse(:help => true) do |option|
-    option.on("no-copy", "Don't copy data from the source cluster.",
-              :default => false)
-
-    option.separator("Connections:")
-    option.on(:host=,
-              "Host name of the new node to be joined.",
-              :required => true)
-    option.on("replica-source-host=",
-              "Host name of the soruce node in the cluster to be connected.",
-              :required => true)
-    option.on(:dataset=,
-              "Dataset name of for the node to be joined.",
-              :default => Droonga::CatalogGenerator::DEFAULT_DATASET)
-    option.on(:port=,
-              "Port number of the source cluster to be connected.",
-              :as => Integer,
-              :default => Droonga::CatalogGenerator::DEFAULT_PORT)
-    option.on(:tag=,
-              "Tag name of the soruce cluster to be connected.",
-              :default => Droonga::CatalogGenerator::DEFAULT_TAG)
+class JoinCommand
+  def run
+    parse_options
+    set_node_role
+    do_join
+    sleep(5) #TODO: wait for restarting of the joining node. this should be done more safely.
+    do_copy unless options["no-copy"]
+    set_effective_message_timestamp
+    update_other_nodes
+    reset_node_role
+    puts("Done.")
+    exit(true)
+  end
+
+  private
+  def parse_options
+    options = Slop.parse(:help => true) do |option|
+      option.on("no-copy", "Don't copy data from the source cluster.",
+                :default => false)
+
+      option.separator("Connections:")
+      option.on(:host=,
+                "Host name of the new node to be joined.",
+                :required => true)
+      option.on("replica-source-host=",
+                "Host name of the soruce node in the cluster to be connected.",
+                :required => true)
+      option.on(:dataset=,
+                "Dataset name of for the node to be joined.",
+                :default => Droonga::CatalogGenerator::DEFAULT_DATASET)
+      option.on(:port=,
+                "Port number of the source cluster to be connected.",
+                :as => Integer,
+                :default => Droonga::CatalogGenerator::DEFAULT_PORT)
+      option.on(:tag=,
+                "Tag name of the soruce cluster to be connected.",
+                :default => Droonga::CatalogGenerator::DEFAULT_TAG)
+    end
+    @options = options
+  rescue Slop::MissingOptionError => error
+    $stderr.puts(error)
+    exit(false)
   end
-rescue Slop::MissingOptionError => e
-  $stderr.puts(e)
-  exit(false)
-end
 
-joining_node = "#{options[:host]}:#{options[:port]}/#{options[:tag]}"
-source_node  = "#{options["replica-source-host"]}:#{options[:port]}/#{options[:tag]}"
+  def joining_node
+    "#{@options[:host]}:#{@options[:port]}/#{@options[:tag]}"
+  end
+
+  def source_node
+    "#{@options["replica-source-host"]}:#{@options[:port]}/#{@options[:tag]}"
+  end
 
 def run_remote_command(target, command, options)
   serf = Droonga::Serf.new(nil, target)
@@ -66,6 +86,11 @@ def run_remote_command(target, command, options)
   result[:response]
 end
 
+  def absorber
+    @absorber ||= prepare_absorber
+  end
+
+  def prepare_absorber
 absorber_options = {
   :dataset          => options[:dataset],
   :source_host      => options["replica-source-host"],
@@ -74,7 +99,9 @@ absorber_options = {
   :tag              => options[:tag],
 }
 absorber = Droonga::DataAbsorber.new(absorber_options)
+  end
 
+  def set_node_role
 if absorber.source_node_suspendable?
   run_remote_command(source_node, "change_role",
                      "node" => source_node,
@@ -83,8 +110,20 @@ end
 run_remote_command(joining_node, "change_role",
                    "node" => joining_node,
                    "role" => "destination")
+  end
+
+  def reset_node_role
+    if absorber.source_node_suspendable?
+      run_remote_command(source_node, "change_role",
+                         "node" => source_node,
+                         "role" => "")
+    end
+    run_remote_command(joining_node, "change_role",
+                       "node" => joining_node,
+                       "role" => "")
+  end
 
-start_time_in_seconds = Time.new.to_i
+  def do_join
 puts("Joining new replica to the cluster...")
 run_remote_command(joining_node, "join",
                    "node"    => joining_node,
@@ -92,9 +131,10 @@ run_remote_command(joining_node, "join",
                    "source"  => source_node,
                    "dataset" => options[:dataset],
                    "copy"    => !options["no-copy"])
-sleep(5) #TODO: wait for restarting of the joining node. this should be done more safely.
+  end
 
-unless options["no-copy"]
+  def do_copy
+  @start_time_in_seconds = Time.new.to_i
   puts("Copying data from the source node...")
   last_progress = ""
   while true
@@ -107,7 +147,7 @@ unless options["no-copy"]
       break unless absorbing
     end
 
-    progress = absorber.report_progress(start_time_in_seconds)
+    progress = absorber.report_progress(@start_time_in_seconds)
     if progress
       printf("%s", "#{" " * last_progress.size}\r")
       printf("%s", "#{progress}\r")
@@ -115,8 +155,9 @@ unless options["no-copy"]
     end
   end
   puts ""
-end
+  end
 
+  def set_effective_message_timestamp
 response = run_remote_command(source_node, "report_status",
                               "node" => source_node,
                               "key" => "last_processed_message_timestamp")
@@ -129,22 +170,14 @@ if timestamp and not timestamp.empty?
                                 "key" => "effective_message_timestamp",
                                 "value" => timestamp)
 end
+  end
 
+  def update_other_nodes
 puts("Update existing hosts in the cluster...")
 run_remote_command(source_node, "add_replicas",
                    "dataset" => options[:dataset],
                    "hosts"   => [options[:host]])
-
-if absorber.source_node_suspendable?
-  run_remote_command(source_node, "change_role",
-                     "node" => source_node,
-                     "role" => "")
+  end
 end
-run_remote_command(joining_node, "change_role",
-                   "node" => joining_node,
-                   "role" => "")
-
-
-puts("Done.")
 
-exit(true)
+JoinCommand.new.run
-------------- next part --------------
HTML����������������������������...
下載 



More information about the Groonga-commit mailing list
Back to archive index