• R/O
  • HTTP
  • SSH
  • HTTPS

提交

標籤
無標籤

Frequently used words (click to add to your profile)

javac++androidlinuxc#windowsobjective-ccocoa誰得qtpythonphprubygameguibathyscaphec計画中(planning stage)翻訳omegatframeworktwitterdomtestvb.netdirectxゲームエンジンbtronarduinopreviewer

UltraMonkey-L7 V3(multi-thread implementation)


Commit MetaInfo

修訂6ea7bd6f07f1a87e64a426aa9fdc2a585a6bb7c3 (tree)
時間2012-07-31 11:40:39
作者Hiroaki Nakano <nakano.hiroaki@nttc...>
CommiterHiroaki Nakano

Log Message

Merge branch 'ssl_fix_by_CW' into v3.0.4-devel

Change Summary

差異

--- a/l7vsd/include/tcp_session.h
+++ b/l7vsd/include/tcp_session.h
@@ -129,7 +129,7 @@ public:
129129 void down_thread_run();
130130 //! realserver remove
131131 //! @param[in] target endpoint
132- void realserver_remove(boost::asio::ip::tcp::endpoint &);
132+ void realserver_remove(const boost::asio::ip::tcp::endpoint &);
133133 protected:
134134 typedef data_buff_base<boost::asio::ip::tcp> tcp_data;
135135 typedef boost::asio::ip::tcp::endpoint endpoint;
@@ -536,6 +536,7 @@ protected:
536536 virtual void down_thread_client_handle_async_write_some(const TCP_PROCESS_TYPE_TAG);
537537 virtual void down_thread_sorryserver_async_read_some_handler(const boost::system::error_code &error_code, std::size_t len);
538538 virtual void down_thread_sorryserver_handle_async_read_some(const TCP_PROCESS_TYPE_TAG);
539+ virtual void up_thread_client_ssl_socket_clear_socket_handler();
539540
540541 //! down thread receive from realserver and raise module event of handle_realserver_recv
541542 //! @param[in] process_type is process type
--- a/l7vsd/include/tcp_ssl_socket.h
+++ b/l7vsd/include/tcp_ssl_socket.h
@@ -131,7 +131,7 @@ public:
131131
132132 virtual std::size_t read_some(const boost::asio::mutable_buffers_1 &buffers, boost::system::error_code &error_code) {
133133 boost::mutex::scoped_lock lock(ssl_mutex);
134- if (write_con > 0) {
134+ if (write_con > 0 || handshake_con > 0) {
135135 error_code = boost::asio::error::try_again;
136136 return 0;
137137 }
@@ -150,7 +150,7 @@ public:
150150 virtual size_t write_some(const boost::asio::const_buffers_1 &buffer, boost::system::error_code &error_code) {
151151 boost::mutex::scoped_lock lock(ssl_mutex);
152152
153- if (read_con > 0) {
153+ if (read_con > 0 || handshake_con > 0) {
154154 error_code = boost::asio::error::try_again;
155155 return 0;
156156 }
--- a/l7vsd/include/virtualservice.h
+++ b/l7vsd/include/virtualservice.h
@@ -268,6 +268,10 @@ protected:
268268 bool downqos_alert_flag; //! downstream QoS alert flag
269269 bool sessionpool_alert_flag; //! sessionpool alert flag
270270
271+ bool adm_cmd_wait_flag; //! wait for l7vsadm done
272+ boost::mutex adm_cmd_wait_flag_mutex;
273+ boost::condition adm_cmd_wait_flag_cond;
274+
271275 void load_parameter(l7vs::error_code &);
272276
273277 virtual void handle_replication_interrupt(
--- a/l7vsd/src/tcp_session.cpp
+++ b/l7vsd/src/tcp_session.cpp
@@ -595,7 +595,9 @@ void tcp_session::set_virtual_service_message(const TCP_VIRTUAL_SERVICE_MESSAGE_
595595 fmt % boost::this_thread::get_id();
596596 Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, fmt.str(), __FILE__, __LINE__);
597597 }
598- break;
598+ //break;
599+ realserver_remove(endpoint_);
600+ return;
599601 case SORRY_STATE_ENABLE:
600602 //----Debug log----------------------------------------------------------------------
601603 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
@@ -895,7 +897,19 @@ void tcp_session::up_thread_run()
895897
896898 if (ssl_flag) {
897899 client_ssl_socket.wait_async_event_all_end();
898- client_ssl_socket.clear_socket();
900+ upthread_status = UPTHREAD_LOCK;
901+ parent_dispatcher.post(boost::bind(&tcp_session::up_thread_client_ssl_socket_clear_socket_handler,this));
902+ boost::mutex::scoped_lock lock(upthread_status_mutex);
903+ while (unlikely(upthread_status == UPTHREAD_LOCK)) {
904+ to_time(LOCKTIMEOUT, xt);
905+ upthread_status_cond.timed_wait(lock, xt);
906+ tcp_thread_message *msg = up_thread_message_que.pop();
907+ if (msg) { // message is alive.
908+ up_thread_next_call_function.second(LOCAL_PROC);
909+ delete msg;
910+ msg = NULL;
911+ }
912+ } // lockmode while loop end.
899913 }
900914
901915 upthread_status = UPTHREAD_SLEEP;
@@ -1191,7 +1205,7 @@ void tcp_session::up_thread_client_accept_fail_event(const TCP_PROCESS_TYPE_TAG
11911205
11921206 boost::format fmt("Thread ID[%d] tcp_ssl_socket::handshake[%s]");
11931207 fmt % boost::this_thread::get_id() % handshake_error_code.message();
1194- Logger::putLogInfo(LOG_CAT_L7VSD_SESSION, 71, fmt.str(), __FILE__, __LINE__);
1208+ Logger::putLogError(LOG_CAT_L7VSD_SESSION, 71, fmt.str(), __FILE__, __LINE__);
11951209
11961210 }
11971211 up_thread_next_call_function = up_thread_function_array[func_tag];
@@ -1199,22 +1213,40 @@ void tcp_session::up_thread_client_accept_fail_event(const TCP_PROCESS_TYPE_TAG
11991213
12001214 //! real server remove
12011215 //! @param[in] target endpoint
1202-void tcp_session::realserver_remove(endpoint &target_endpoint)
1216+void tcp_session::realserver_remove(const endpoint &target_endpoint)
12031217 {
1218+
1219+ if (target_endpoint != realserver_endpoint && target_endpoint != connecting_endpoint) return;
1220+
12041221 tcp_thread_message *up_msg = new tcp_thread_message;
1205- up_thread_function_pair up_func = up_thread_function_array[UP_FUNC_REALSERVER_CHECK];
1222+ up_thread_function_pair up_func = up_thread_function_array[UP_FUNC_EXIT];
12061223 up_msg->message = up_func.second;
12071224 up_msg->endpoint_info = target_endpoint;
12081225 #ifdef DEBUG
1209- up_msg->func_tag_name = func_tag_to_string(UP_FUNC_REALSERVER_CHECK);
1226+ up_msg->func_tag_name = func_tag_to_string(UP_FUNC_EXIT);
12101227 {
12111228 boost::format fmt("Thread ID[%d] up_queue.push : %s");
1212- fmt % boost::this_thread::get_id() % func_tag_to_string(UP_FUNC_REALSERVER_CHECK);
1229+ fmt % boost::this_thread::get_id() % func_tag_to_string(UP_FUNC_EXIT);
12131230 Logger::putLogInfo(LOG_CAT_L7VSD_SESSION, 999, fmt.str(), __FILE__, __LINE__);
12141231 }
12151232 #endif
12161233 while (!up_thread_message_que.push(up_msg)) {}
12171234 upthread_status_cond.notify_one();
1235+
1236+ tcp_thread_message *down_msg = new tcp_thread_message;
1237+ down_thread_function_pair down_func = down_thread_function_array[DOWN_FUNC_EXIT];
1238+ down_msg->message = down_func.second;
1239+ down_msg->endpoint_info = target_endpoint;
1240+#ifdef DEBUG
1241+ down_msg->func_tag_name = func_tag_to_string(DOWN_FUNC_EXIT);
1242+ {
1243+ boost::format fmt("Thread ID[%d] down_queue.push : %s");
1244+ fmt % boost::this_thread::get_id() % func_tag_to_string(DOWN_FUNC_EXIT);
1245+ Logger::putLogInfo(LOG_CAT_L7VSD_SESSION, 999, fmt.str(), __FILE__, __LINE__);
1246+ }
1247+#endif
1248+ while (!down_thread_message_que.push(down_msg)) {}
1249+ downthread_status_cond.notify_one();
12181250 }
12191251
12201252
@@ -3874,7 +3906,7 @@ void tcp_session::down_thread_realserver_handle_async_read_some(const tcp_sessio
38743906 realserver_socket->async_read_some(boost::asio::buffer(down_thread_data_dest_side.get_data()), handler);
38753907 else
38763908 func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
3877- realserver_socket_mutex.lock();
3909+ realserver_socket_mutex.unlock();
38783910 } else { //recv error
38793911 func_tag = DOWN_FUNC_REALSERVER_DISCONNECT;
38803912 boost::format fmt("Thread ID[%d] down_thread_realserver_handle_async_read_some recv error:%s");
@@ -4017,6 +4049,30 @@ void tcp_session::down_thread_sorryserver_async_read_some_handler(const boost::s
40174049 downthread_status_cond.notify_one();
40184050 }
40194051
4052+void tcp_session::up_thread_client_ssl_socket_clear_socket_handler()
4053+{
4054+ if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_SESSION))) {
4055+ boost::format formatter("Thread ID[%d] FUNC IN up_thread_client_ssl_socket_clear_socket_handler");
4056+ formatter % boost::this_thread::get_id();
4057+ Logger::putLogDebug(LOG_CAT_L7VSD_SESSION, 999, formatter.str(), __FILE__, __LINE__);
4058+ }
4059+
4060+ client_ssl_socket.clear_socket();
4061+
4062+ tcp_thread_message *mes = new tcp_thread_message();
4063+ mes->message = up_que_function_map[UP_FUNC_PAUSE_OFF_EVENT];
4064+#ifdef DEBUG
4065+ mes->func_tag_name = func_tag_to_string(UP_FUNC_PAUSE_OFF_EVENT);
4066+ {
4067+ boost::format fmt("Thread ID[%d] up_queue.push : %s");
4068+ fmt % boost::this_thread::get_id() % func_tag_to_string(UP_FUNC_PAUSE_OFF_EVENT);
4069+ Logger::putLogInfo(LOG_CAT_L7VSD_SESSION, 999, fmt.str(), __FILE__, __LINE__);
4070+ }
4071+#endif
4072+ while (!up_thread_message_que.push(mes)) {}
4073+ upthread_status_cond.notify_one();
4074+}
4075+
40204076 void tcp_session::down_thread_sorryserver_handle_async_read_some(tcp_session::TCP_PROCESS_TYPE_TAG)
40214077 {
40224078
--- a/l7vsd/src/virtualservice_base.cpp
+++ b/l7vsd/src/virtualservice_base.cpp
@@ -69,6 +69,7 @@ l7vs::virtualservice_base::virtualservice_base(const l7vs::l7vsd &invsd,
6969 upqos_alert_flag = false;
7070 downqos_alert_flag = false;
7171 sessionpool_alert_flag = false;
72+ adm_cmd_wait_flag = false;
7273
7374 rs_list.clear();
7475 protomod = NULL;
--- a/l7vsd/src/virtualservice_tcp.cpp
+++ b/l7vsd/src/virtualservice_tcp.cpp
@@ -352,6 +352,13 @@ void l7vs::virtualservice_tcp::handle_accept(const l7vs::session_thread_control
352352 return;
353353 }
354354
355+ {
356+ boost::mutex::scoped_lock lock(adm_cmd_wait_flag_mutex);
357+ if (unlikely(adm_cmd_wait_flag)){
358+ adm_cmd_wait_flag_cond.wait(lock);
359+ }
360+ }
361+
355362 session_thread_control *stc_ptr_noconst = const_cast<session_thread_control *>(stc_ptr);
356363
357364 if (unlikely(err == boost::asio::error::operation_aborted)) { // nomal exit case
@@ -1290,9 +1297,6 @@ void l7vs::virtualservice_tcp::add_realserver(const l7vs::virtualservice_element
12901297 }
12911298 }
12921299
1293- //pause active sessions
1294- active_sessions.do_all(boost::bind(&session_thread_control::session_pause_on, _1));
1295-
12961300 //add realserver
12971301 for (std::vector<realserver_element>::iterator itr = in_element.realserver_vector.begin();
12981302 itr != in_element.realserver_vector.end();
@@ -1305,9 +1309,6 @@ void l7vs::virtualservice_tcp::add_realserver(const l7vs::virtualservice_element
13051309 rs_list.push_back(rs);
13061310 }
13071311
1308- //run active sessions
1309- active_sessions.do_all(boost::bind(&session_thread_control::session_pause_off, _1));
1310-
13111312 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
13121313 boost::format formatter("out_function: void virtualservice_tcp::add_realserver( "
13131314 "const l7vs::virtualservice_element& in,"
@@ -1384,8 +1385,11 @@ void l7vs::virtualservice_tcp::edit_realserver(const l7vs::virtualservice_elemen
13841385 }
13851386 }
13861387
1387- //pause active sessions
1388- active_sessions.do_all(boost::bind(&session_thread_control::session_pause_on, _1));
1388+ //lock adm_cmd_wait_flag on
1389+ adm_cmd_wait_flag_mutex.lock();
1390+ adm_cmd_wait_flag = true;
1391+ adm_cmd_wait_flag_cond.notify_one();
1392+ adm_cmd_wait_flag_mutex.unlock();
13891393
13901394 //edit realserver
13911395 for (std::vector<realserver_element>::iterator itr = in_element.realserver_vector.begin();
@@ -1406,8 +1410,11 @@ void l7vs::virtualservice_tcp::edit_realserver(const l7vs::virtualservice_elemen
14061410 }
14071411 }
14081412
1409- //run active sessions
1410- active_sessions.do_all(boost::bind(&session_thread_control::session_pause_off, _1));
1413+ //lock adm_cmd_wait_flag off
1414+ adm_cmd_wait_flag_mutex.lock();
1415+ adm_cmd_wait_flag = false;
1416+ adm_cmd_wait_flag_cond.notify_one();
1417+ adm_cmd_wait_flag_mutex.unlock();
14111418
14121419 err.setter(false, "");
14131420 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
@@ -1486,8 +1493,11 @@ void l7vs::virtualservice_tcp::del_realserver(const l7vs::virtualservice_element
14861493 }
14871494 }
14881495
1489- //pause active sessions
1490- active_sessions.do_all(boost::bind(&session_thread_control::session_pause_on, _1));
1496+ //lock adm_cmd_wait_flag on
1497+ adm_cmd_wait_flag_mutex.lock();
1498+ adm_cmd_wait_flag = true;
1499+ adm_cmd_wait_flag_cond.notify_one();
1500+ adm_cmd_wait_flag_mutex.unlock();
14911501
14921502 //del realserver
14931503 for (std::vector<realserver_element>::iterator itr = in_element.realserver_vector.begin();
@@ -1503,8 +1513,11 @@ void l7vs::virtualservice_tcp::del_realserver(const l7vs::virtualservice_element
15031513 }
15041514 }
15051515
1506- //run active sessions
1507- active_sessions.do_all(boost::bind(&session_thread_control::session_pause_off, _1));
1516+ //lock adm_cmd_wait_flag off
1517+ adm_cmd_wait_flag_mutex.lock();
1518+ adm_cmd_wait_flag = false;
1519+ adm_cmd_wait_flag_cond.notify_one();
1520+ adm_cmd_wait_flag_mutex.unlock();
15081521
15091522 if (unlikely(LOG_LV_DEBUG == Logger::getLogLevel(LOG_CAT_L7VSD_VIRTUALSERVICE))) {
15101523 boost::format formatter("out_function: void virtualservice_tcp::del_realserver( "