Kouhei Sutou
null+****@clear*****
Thu Nov 8 22:15:22 JST 2012
Kouhei Sutou 2012-11-08 22:15:22 +0900 (Thu, 08 Nov 2012) New Revision: 40bd35bd639cf9bde420d1b6256c6cdc47e29a5e https://github.com/groonga/fluent-plugin-groonga/commit/40bd35bd639cf9bde420d1b6256c6cdc47e29a5e Log: in: always require real server Modified files: lib/fluent/plugin/in_groonga.rb test/run-test.rb test/test_input.rb Modified: lib/fluent/plugin/in_groonga.rb (+37 -84) =================================================================== --- lib/fluent/plugin/in_groonga.rb 2012-11-07 22:49:31 +0900 (a3f227f) +++ lib/fluent/plugin/in_groonga.rb 2012-11-08 22:15:22 +0900 (b09c613) @@ -16,10 +16,7 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA require "English" - -require "webrick/config" -require "webrick/httprequest" -require "webrick/httpresponse" +require "webrick/httputils" require "http_parser" @@ -34,18 +31,17 @@ module Fluent config_param :protocol, :string, :default => "http" config_param :bind, :string, :default => "0.0.0.0" config_param :port, :integer, :default => 10041 - config_param :proxy_protocol, :string, :default => nil - config_param :proxy_host, :string, :default => nil - config_param :proxy_port, :integer, :default => 10041 + config_param :real_host, :string + config_param :real_port, :integer, :default => 10041 def configure(conf) super - @proxy_factory = ProxyFactory.new(@proxy_protocol, @proxy_host, @proxy_port) + repeater_factory = RepeaterFactory.new(@real_host, @real_port) case @protocol when "http" - @input = HTTPInput.new(@bind, @port, @proxy_factory) + @input = HTTPInput.new(@bind, @port, repeater_factory) when "gqtp" - @input = GQTPInput.new(@bind, @port, @proxy_factory) + @input = GQTPInput.new(@bind, @port, repeater_factory) else message = "unknown protocol: <#{@protocol.inspect}>" $log.error message @@ -61,30 +57,39 @@ module Fluent @input.shutdown end - class ProxyFactory - def initialize(protocol, host, port) - @protocol = protocol + class RepeaterFactory + def initialize(host, port) @host = host @port = port end def connect(client) - case @protocol - when "http" - HTTPGroongaProxy.connect(@host, @port, client) - else - nil - end + Repeater.connect(@host, @port, client) + end + end + + class Repeater < Coolio::TCPSocket + def initialize(socket, handler) + super(socket) + @handler = handler + end + + def on_read(data) + @handler.write(data) + end + + def on_close + @handler.close end end class HTTPInput include DetachMultiProcessMixin - def initialize(bind, port, proxy_factory) + def initialize(bind, port, repeater_factory) @bind = bind @port = port - @proxy_factory = proxy_factory + @repeater_factory = repeater_factory end def start @@ -92,8 +97,7 @@ module Fluent detach_multi_process do @loop = Coolio::Loop.new - @socket = Coolio::TCPServer.new(listen_socket, nil, - Handler, @loop, @proxy_factory) + @socket = Coolio::TCPServer.new(listen_socket, nil, Handler, self) @loop.attach(@socket) @shutdown_notifier = Coolio::AsyncWatcher.new @@ -112,6 +116,12 @@ module Fluent @thread.join end + def create_repeater(client) + repeater = @repeater_factory.connect(client) + repeater.attach(@loop) + repeater + end + private def run @loop.run @@ -121,45 +131,19 @@ module Fluent end class Handler < Coolio::Socket - class << self - @@response_config = nil - def response_config - @@response_config ||= WEBrick::Config::HTTP.dup.update( - :Logger => $log - ) - end - end - - def initialize(socket, loop, proxy_factory) + def initialize(socket, input) super(socket) - @loop = loop - @proxy_factory = proxy_factory - @completed = false + @input = input end - alias_method :<<, :write - def on_connect @parser = HTTP::Parser.new(self) - @proxy = @proxy_factory.connect(self) - if @proxy - @proxy.attach(@loop) - @response = nil - else - @response = WEBrick::HTTPResponse.new(self.class.response_config) - end + @repeater =****@input*****_repeater(self) end def on_read(data) @parser << data - @proxy.write(data) if @proxy - end - - def on_write_complete - return unless @completed - if @response - close - end + @repeater.write(data) end def on_message_begin @@ -167,13 +151,6 @@ module Fluent end def on_headers_complete(headers) - expect = nil - headers.each do |name, value| - case name.downcase - when "content-type" - @content_type = value - end - end end def on_body(chunk) @@ -187,16 +164,7 @@ module Fluent when /\A\/d\// command = $POSTMATCH process(command, params, @body) - else - if @response - @response.status = "404" - end - end - if @response - @response["connection"] = "close" - @response.send_response(self) end - @completed = true end private @@ -211,20 +179,5 @@ module Fluent end end end - - class HTTPGroongaProxy < Coolio::TCPSocket - def initialize(socket, handler) - super(socket) - @handler = handler - end - - def on_read(data) - @handler.write(data) - end - - def on_close - @handler.close - end - end end end Modified: test/run-test.rb (+2 -0) =================================================================== --- test/run-test.rb 2012-11-07 22:49:31 +0900 (e76e879) +++ test/run-test.rb 2012-11-08 22:15:22 +0900 (0183bbc) @@ -17,6 +17,8 @@ # $VERBOSE = true +Thread.abort_on_exception = true + base_dir = File.expand_path(File.join(File.dirname(__FILE__), "..")) lib_dir = File.join(base_dir, "lib") test_dir = File.join(base_dir, "test") Modified: test/test_input.rb (+42 -1) =================================================================== --- test/test_input.rb 2012-11-07 22:49:31 +0900 (6dabf72) +++ test/test_input.rb 2012-11-08 22:15:22 +0900 (e313155) @@ -18,10 +18,14 @@ require "time" require "cgi/util" require "net/http" +require "webrick/config" +require "webrick/httpresponse" require "fluent/test" require "fluent/plugin/in_groonga" +require "http_parser" + class GroongaInputTest < Test::Unit::TestCase setup def setup_fluent @@ -43,6 +47,41 @@ EOC end class HTTPTest < self + setup :before => :append + def setup_real_server + @real_host = "127.0.0.1" + @real_port = 29292 + @real_server = TCPServer.new(@real_host, @real_port) + @repeater = nil + response_config = WEBrick::Config::HTTP.dup.update(:Logger => $log) + @real_response = WEBrick::HTTPResponse.new(response_config) + Thread.new do + @repeater = @real_server.accept + @real_server.close + parser = HTTP::Parser.new + parser.on_message_complete = lambda do + @real_response.send_response(@repeater) + @repeater.close + end + + loop do + break if****@repea*****? + data =****@repea*****(4096) + break if data.nil? + parser << data + end + end + end + + teardown + def teardown_real_server + @real_server.close unless @real_server.closed? + + if @repeater and not****@repea*****? + @repeater.close + end + end + def setup @host = "127.0.0.1" @port = 2929 @@ -56,6 +95,8 @@ EOC protocol http bind #{@host} port #{@port} + real_host #{@real_host} + real_port #{@real_port} EOC end @@ -63,7 +104,6 @@ EOC @driver.expect_emit("groonga.command.table_create", @now, {"name" => "Users"}) - @driver.run do get("/d/table_create", "name" => "Users") assert_equal("200", @last_response.code) @@ -92,6 +132,7 @@ EOJ def test_not_command @driver.run do + @real_response.status = 404 get("/index.html") assert_equal("404", @last_response.code) end -------------- next part -------------- HTML����������������������������... 下載