YUKI Hiroshi
null+****@clear*****
Tue Jul 1 17:11:13 JST 2014
YUKI Hiroshi 2014-07-01 17:11:13 +0900 (Tue, 01 Jul 2014) New Revision: 8d32be896cc7afcfa716b12636964c24462e5b5c https://github.com/droonga/droonga-engine/commit/8d32be896cc7afcfa716b12636964c24462e5b5c Message: Fetch catalog.json from the source host Modified files: bin/droonga-engine-join lib/droonga/command/serf_event_handler.rb Modified: bin/droonga-engine-join (+2 -1) =================================================================== --- bin/droonga-engine-join 2014-06-29 08:43:42 +0900 (2792f74) +++ bin/droonga-engine-join 2014-07-01 17:11:13 +0900 (d247140) @@ -102,12 +102,13 @@ options.tag = dataset.replicas.tag options.port = dataset.replicas.port options.joining_node = "#{options.joining_host}:#{options.port}/#{options.tag}" +options.source_node = "#{options.replica_source_host}:#{options.port}/#{options.tag}" puts "Joining new replica to the cluster..." Droonga::Serf.send_query(options.joining_node, "join", "node" => options.joining_node, "type" => "replica", - "source" => options.replica_source_host, + "source" => options.source_node, "copy" => options.copy) puts "Done." Modified: lib/droonga/command/serf_event_handler.rb (+52 -11) =================================================================== --- lib/droonga/command/serf_event_handler.rb 2014-06-29 08:43:42 +0900 (bdef561) +++ lib/droonga/command/serf_event_handler.rb 2014-07-01 17:11:13 +0900 (717790a) @@ -82,6 +82,10 @@ module Droonga remove_replicas when "absorb_data" absorb_data + when "publish_catalog" + publish_catalog + when "unpublish_catalog" + unpublish_catalog end end @@ -106,16 +110,22 @@ module Droonga end def join_as_replica - source = @payload["source"] - return unless source + source_node = @payload["source"] + return unless source_node - puts "source = #{source}" + puts "source_node = #{source_node}" - generator = create_current_catalog_generator - dataset = generator.dataset_for_host(source) || + source_host = source_node.split(":").first + + catalog = fetch_catalog(source_node) + generator = create_current_catalog_generator(catalog) + dataset = generator.dataset_for_host(source_host) || generator.dataset_for_host(host) return unless dataset + # restart self with the fetched catalog. + SafeFileWriter.write(Path.catalog, JSON.pretty_generate(catalog)) + dataset_name = dataset.name tag = dataset.replicas.tag port = dataset.replicas.port @@ -126,7 +136,7 @@ module Droonga puts "tag = #{tag}" if @payload["copy"] - puts "starting to copy data from #{source}" + puts "starting to copy data from #{source_host}" modify_catalog do |modifier| modifier.datasets[dataset_name].replicas.hosts = [host] @@ -134,7 +144,7 @@ module Droonga sleep(1) # wait for restart DataAbsorber.absorb(:dataset => dataset_name, - :source_host => source, + :source_host => source_host, :destination_host => host, :port => port, :tag => tag) @@ -157,6 +167,39 @@ module Droonga "hosts" => [host]) end + def fetch_catalog(source_node) + source_host = source_node.split(":").first + port = 10032 + rand(10000) + + Serf.send_query(source_node, "publish_catalog", + "node" => source_node, + "port" => port) + + url = "http://#{source_host}:#{port}/" + connection = Faraday.new(url) do |builder| + builder.response(:follow_redirects) + builder.adapter(Faraday.default_adapter) + end + response = connection.get + catalog = response.body + + Serf.send_query(source_node, "unpublish_catalog", + "node" => source_node) + + JSON.parse(catalog) + end + + def publish_catalog + port = @payload["port"] + return unless port + + # TODO: implement me! + end + + def unpublish_catalog + # TODO: implement me! + end + def set_replicas dataset = @payload["dataset"] return unless dataset @@ -209,15 +252,13 @@ module Droonga SafeFileWriter.write(Path.catalog, JSON.pretty_generate(generator.generate)) end - def create_current_catalog_generator - current_catalog = JSON.parse(Path.catalog.read) + def create_current_catalog_generator(current_catalog=nil) + current_catalog ||= JSON.parse(Path.catalog.read) generator = CatalogGenerator.new generator.load(current_catalog) end def absorb_data - return unless event_for_me? - source = @payload["source"] return unless source -------------- next part -------------- HTML����������������������������... 下載