• R/O
  • SSH
  • HTTPS

okuyama: 提交


Commit MetaInfo

修訂1019 (tree)
時間2013-04-22 20:15:10
作者okuyamaoo

Log Message

Recover process update

Change Summary

差異

--- trunk/src/okuyama/imdst/helper/AbstractMasterManagerHelper.java (revision 1018)
+++ trunk/src/okuyama/imdst/helper/AbstractMasterManagerHelper.java (revision 1019)
@@ -1191,6 +1191,246 @@
11911191
11921192
11931193 /**
1194+ * ノードに現在の生存状態を確認する.<br>
1195+ *
1196+ * @param nodeName ノード名
1197+ * @param port ポート番号
1198+ * @param logger ロガー
1199+ * @return String[] 結果 配列の1番目:"true" or "false", 配列の2番目:1番目が"true"の場合ステータス文字列
1200+ */
1201+ protected String[] getNodeArrivalStatus(String nodeName, int port, ILogger logger) {
1202+ String[] retStrs = new String[2];
1203+ retStrs[0] = "true";
1204+ String connectionFullName = nodeName + ":" + port;
1205+
1206+ KeyNodeConnector keyNodeConnector = null;
1207+
1208+ String[] retParams = null;
1209+ boolean cacheConnectUse = false;
1210+
1211+ retStrs = null;
1212+
1213+ keyNodeConnector = null;
1214+ retParams = null;
1215+
1216+ try {
1217+
1218+ // キャッシュが存在する場合はそこから取得
1219+ if (keyNodeConnectPool.containsKey(connectionFullName)) {
1220+ if((keyNodeConnector = (KeyNodeConnector)((ArrayBlockingQueue)keyNodeConnectPool.get(connectionFullName)).poll()) != null) {
1221+ if (!checkConnectionEffective(connectionFullName, keyNodeConnector.getConnetTime())) {
1222+ keyNodeConnector = null;
1223+ }
1224+ }
1225+ }
1226+
1227+ // コネクションがなければ自身で接続
1228+ if (keyNodeConnector == null) {
1229+
1230+ // 接続
1231+ keyNodeConnector = new KeyNodeConnector(nodeName, port, connectionFullName);
1232+ keyNodeConnector.connect(ImdstDefine.nodeConnectionOpenPingTimeout * 5);
1233+ }
1234+
1235+ // タイムアウト設定
1236+ keyNodeConnector.setSoTimeout(ImdstDefine.nodeConnectionPingTimeout * 5);
1237+
1238+ // Key値でデータノード名を保存
1239+ StringBuilder buf = new StringBuilder(25);
1240+ // パラメータ作成 処理タイプ[セパレータ]キー値のハッシュ値文字列[セパレータ]データノード名
1241+ buf.append("32");
1242+ buf.append(ImdstDefine.keyHelperClientParamSep);
1243+
1244+ // 送信
1245+ keyNodeConnector.println(buf.toString());
1246+ keyNodeConnector.flush();
1247+
1248+ // 返却値取得
1249+ String retParam = keyNodeConnector.readLineWithReady(buf.toString());
1250+
1251+ retParams = retParam.split(ImdstDefine.keyHelperClientParamSep);
1252+
1253+ if (!retParams[1].equals("true")) {
1254+
1255+ retStrs = new String[1];
1256+ retStrs[0] = "false";
1257+ } else {
1258+
1259+
1260+ retStrs = new String[3];
1261+ retStrs[0] = "true";
1262+ retStrs[1] = retParams[2];
1263+ retStrs[2] = retParams[3];
1264+ }
1265+
1266+ if (retStrs[0].equals("true")) {
1267+ try {
1268+
1269+ // 正しく終了した場合のみコネクションをキャッシュに戻す
1270+ if (keyNodeConnector != null) {
1271+ keyNodeConnector.setSoTimeout(ImdstDefine.nodeConnectionTimeout);
1272+ addKeyNodeCacheConnectionPool(keyNodeConnector);
1273+ keyNodeConnector = null;
1274+ }
1275+ } catch(Exception e1) {
1276+ // 無視
1277+ logger.error("", e1);
1278+ }
1279+ }
1280+ } catch(Exception e) {
1281+ retStrs = new String[1];
1282+ retStrs[0] = "false";
1283+ logger.info("Node Status Check Error Node Name = [" + nodeName + "] Port [" + port + "]");
1284+ logger.info(e);
1285+ //e.printStackTrace();
1286+ } finally {
1287+ try {
1288+
1289+ if (keyNodeConnector != null) keyNodeConnector.close();
1290+
1291+ } catch(Exception e2) {
1292+ // 無視
1293+ logger.error("", e2);
1294+ }
1295+ }
1296+ return retStrs;
1297+ }
1298+
1299+
1300+ /**
1301+ * 指定されたノードを強制的に終了する.<br>
1302+ *
1303+ * @param nodeName ノード名
1304+ * @param port ポート番号
1305+ * @param logger ロガー
1306+ */
1307+ protected void execForceShutdownNode(String nodeName, int port, ILogger logger) {
1308+ String connectionFullName = nodeName + ":" + port;
1309+
1310+ KeyNodeConnector keyNodeConnector = null;
1311+
1312+ String[] retParams = null;
1313+ boolean cacheConnectUse = false;
1314+
1315+ keyNodeConnector = null;
1316+ retParams = null;
1317+
1318+ try {
1319+
1320+ // 一回目のPingはCacheのコネクションを積極的に使う
1321+ // キャッシュが存在する場合はそこから取得
1322+ if (keyNodeConnectPool.containsKey(connectionFullName)) {
1323+ if((keyNodeConnector = (KeyNodeConnector)((ArrayBlockingQueue)keyNodeConnectPool.get(connectionFullName)).poll()) != null) {
1324+ if (!checkConnectionEffective(connectionFullName, keyNodeConnector.getConnetTime())) {
1325+ keyNodeConnector = null;
1326+ }
1327+ }
1328+ }
1329+
1330+ // コネクションがなければ自身で接続
1331+ if (keyNodeConnector == null) {
1332+
1333+ // 接続
1334+ keyNodeConnector = new KeyNodeConnector(nodeName, port, connectionFullName);
1335+ keyNodeConnector.connect(ImdstDefine.nodeConnectionOpenPingTimeout * 5);
1336+ }
1337+
1338+ // タイムアウト設定
1339+ keyNodeConnector.setSoTimeout(ImdstDefine.nodeConnectionPingTimeout * 5);
1340+
1341+ // Key値でデータノード名を保存
1342+ StringBuilder buf = new StringBuilder(25);
1343+ // パラメータ作成 処理タイプ[セパレータ]キー値のハッシュ値文字列[セパレータ]データノード名
1344+ buf.append("888");
1345+ buf.append(ImdstDefine.keyHelperClientParamSep);
1346+
1347+ // 送信
1348+ keyNodeConnector.println(buf.toString());
1349+ keyNodeConnector.flush();
1350+ } catch(Exception e) {
1351+ logger.info("Node force shutdown Error Node Name = [" + nodeName + "] Port [" + port + "]");
1352+ logger.info(e);
1353+ //e.printStackTrace();
1354+ } finally {
1355+ try {
1356+
1357+ if (keyNodeConnector != null) keyNodeConnector.close();
1358+
1359+ } catch(Exception e2) {
1360+ // 無視
1361+ logger.error("", e2);
1362+ }
1363+ }
1364+ }
1365+
1366+
1367+ /**
1368+ * 指定されたノードが行なっているリカバリ処理を停止する.<br>
1369+ *
1370+ * @param nodeName ノード名
1371+ * @param port ポート番号
1372+ * @param logger ロガー
1373+ */
1374+ protected void stopRecoverDataOutputOperation(String nodeName, int port, ILogger logger) {
1375+ String connectionFullName = nodeName + ":" + port;
1376+
1377+ KeyNodeConnector keyNodeConnector = null;
1378+
1379+ String[] retParams = null;
1380+ boolean cacheConnectUse = false;
1381+
1382+ keyNodeConnector = null;
1383+ retParams = null;
1384+
1385+ try {
1386+
1387+ // キャッシュが存在する場合はそこから取得
1388+ if (keyNodeConnectPool.containsKey(connectionFullName)) {
1389+ if((keyNodeConnector = (KeyNodeConnector)((ArrayBlockingQueue)keyNodeConnectPool.get(connectionFullName)).poll()) != null) {
1390+ if (!checkConnectionEffective(connectionFullName, keyNodeConnector.getConnetTime())) {
1391+ keyNodeConnector = null;
1392+ }
1393+ }
1394+ }
1395+
1396+ // コネクションがなければ自身で接続
1397+ if (keyNodeConnector == null) {
1398+
1399+ // 接続
1400+ keyNodeConnector = new KeyNodeConnector(nodeName, port, connectionFullName);
1401+ keyNodeConnector.connect(ImdstDefine.nodeConnectionOpenPingTimeout * 5);
1402+ }
1403+
1404+ // タイムアウト設定
1405+ keyNodeConnector.setSoTimeout(1000 * 20);
1406+
1407+ // Key値でデータノード名を保存
1408+ StringBuilder buf = new StringBuilder(25);
1409+ // パラメータ作成 処理タイプ[セパレータ]キー値のハッシュ値文字列[セパレータ]データノード名
1410+ buf.append("887");
1411+ buf.append(ImdstDefine.keyHelperClientParamSep);
1412+
1413+ // 送信
1414+ keyNodeConnector.println(buf.toString());
1415+ keyNodeConnector.flush();
1416+ } catch(Exception e) {
1417+ logger.info("Node stop recover operation stop Error Node Name = [" + nodeName + "] Port [" + port + "]");
1418+ logger.info(e);
1419+ //e.printStackTrace();
1420+ } finally {
1421+ try {
1422+
1423+ if (keyNodeConnector != null) keyNodeConnector.close();
1424+
1425+ } catch(Exception e2) {
1426+ // 無視
1427+ logger.error("", e2);
1428+ }
1429+ }
1430+ }
1431+
1432+
1433+ /**
11941434 * 実行依頼のパラメータと結果パラータをQueueに書き込む.<br>
11951435 * 正し、書き込む命令は限定可能とし、書き込むパラメータは成功したものだけとする.<br>
11961436 *
--- trunk/src/okuyama/imdst/helper/KeyManagerHelper.java (revision 1018)
+++ trunk/src/okuyama/imdst/helper/KeyManagerHelper.java (revision 1019)
@@ -6,6 +6,7 @@
66 import javax.script.*;
77 import java.util.concurrent.atomic.AtomicInteger;
88
9+import okuyama.base.JavaMain;
910 import okuyama.base.lang.BatchException;
1011 import okuyama.base.job.AbstractHelper;
1112 import okuyama.base.job.IJob;
@@ -773,6 +774,22 @@
773774 pw.flush();
774775 //retParamBuf = null;
775776 break;
777+
778+ case 32 :
779+
780+ // KeyMapManagerが現在どのようなステータスで稼働しているかを返す
781+ // 通常稼働 or 復旧対象 or 復旧データ取得先
782+ // 差分データ取得機能On、差分データ取得機能Off
783+ int keyMapManagerOperationStatus = this.keyMapManager.myOperationStatus;
784+ int keyMapManagerDiffModeStatus = this.keyMapManager.myDiffModeOperationStatus;
785+ retParamBuf.append("32");
786+ retParamBuf.append(ImdstDefine.keyHelperClientParamSep);
787+ retParamBuf.append("true");
788+ retParamBuf.append(ImdstDefine.keyHelperClientParamSep);
789+ retParamBuf.append(keyMapManagerOperationStatus);
790+ retParamBuf.append(ImdstDefine.keyHelperClientParamSep);
791+ retParamBuf.append(keyMapManagerDiffModeStatus);
792+ break;
776793 case 40 :
777794
778795 // Key値に紐づいている指定のTagを消す
@@ -961,6 +978,24 @@
961978 }
962979 retParamBuf.append(this.keyMapManager.keyObjectExport(memoryObjBkupFilePath));
963980 break;
981+ case 887 :
982+
983+ // データ復旧を強制終了
984+ this.keyMapManager.outputDataStopSignal = true;
985+ System.out.println("Stop Output Data Start");
986+ Thread.sleep(5000);
987+
988+ this.keyMapManager.outputDataStopSignal = false;
989+ System.out.println("Stop Output Data End");
990+
991+ this.keyMapManager.diffDataModeOff();
992+ retParamBuf.append("Stop Diffmode");
993+ break;
994+ case 888 :
995+
996+ // 強制終了
997+ JavaMain.shutdownMainProcess();
998+ break;
964999 default :
9651000
9661001 logger.debug("KeyManagerHelper No Method =[" + clientParameterList[0] + "]");
@@ -1119,7 +1154,6 @@
11191154 retStrs[2] = "NG:Max Data Size Over";
11201155 }
11211156 } catch (BatchException be) {
1122-be.printStackTrace();
11231157 logger.debug("KeyManagerHelper - setDatanode - Error = [" + key + "]", be);
11241158 //logger.debug("KeyManagerHelper - setDatanode - Error", be);
11251159 retStrs[0] = "1";
--- trunk/src/okuyama/imdst/helper/KeyNodeWatchHelper.java (revision 1018)
+++ trunk/src/okuyama/imdst/helper/KeyNodeWatchHelper.java (revision 1019)
@@ -114,7 +114,8 @@
114114
115115 logger.info("************************************************************");
116116 logger.info(nodeDt[0] + ":" + nodeDt[1] + " Node Check Start");
117-
117+ boolean nodeRecoverExec = false;
118+
118119 pingRet = this.execNodePing(nodeDt[0], new Integer(nodeDt[1]).intValue(), logger);
119120 if (pingRet[1] != null) {
120121 this.nodeStatusStr = pingRet[1];
@@ -127,7 +128,58 @@
127128 StatusUtil.setNodeStatusDt(nodeDt[0] + ":" + nodeDt[1], "Node Check Dead");
128129 } else if (!super.isNodeArrival(nodeInfo)) {
129130
130- // ノードが復旧
131+ // Node復旧を依頼
132+ nodeRecoverExec = true;
133+ } else {
134+ String[] nodeArrivalStatus = super.getNodeArrivalStatus(nodeDt[0], new Integer(nodeDt[1]).intValue(), logger);
135+
136+ if (nodeArrivalStatus != null && nodeArrivalStatus[0].equals("true") && nodeArrivalStatus[1].equals("1") && nodeArrivalStatus[2].equals("1")) {
137+ // 正常に稼働中
138+ logger.info(nodeDt[0] + ":" + nodeDt[1] + " Node Check Arrival");
139+ logger.info(nodeDt[0] + ":" + nodeDt[1] + " Server Status [" + this.nodeStatusStr + "]");
140+ StatusUtil.setNodeStatusDt(nodeDt[0] + ":" + nodeDt[1], "[" + this.nodeStatusStr + "]");
141+
142+ // データサイズが転送されていれば格納
143+ if (i < nodeCount) {
144+ if(this.nodeStatusStr != null) {
145+ String[] sizeWork = this.nodeStatusStr.split("Save Data Size=");
146+ if(sizeWork.length > 1) {
147+
148+ StatusUtil.setNodeDataSize(new Integer(i), sizeWork[1].trim().substring(1,sizeWork[1].length()-1).trim().split(":"));
149+ }
150+ }
151+ }
152+ // SlaveMasterNodeにも伝搬する
153+ super.setArriveNode(nodeDt[0] + ":" + nodeDt[1]);
154+ } else {
155+ if (nodeArrivalStatus != null && nodeArrivalStatus[0].equals("true") && (nodeArrivalStatus[1].equals("2") && (nodeArrivalStatus[2].equals("1") || nodeArrivalStatus[2].equals("2")))) {
156+ // 復旧中のDataNodeが復旧を終えずにノードに加わろうとしている。
157+ // 強制終了
158+ super.execForceShutdownNode(nodeDt[0], new Integer(nodeDt[1]).intValue(), logger);
159+ Thread.sleep(3000);
160+
161+ // ノードダウン
162+ logger.info(nodeDt[0] + ":" + nodeDt[1] + " Node Check Restoration abnormal state killed processing.");
163+ super.setDeadNode(nodeInfo, 1, null, true);
164+ StatusUtil.setNodeStatusDt(nodeDt[0] + ":" + nodeDt[1], "Node Check Dead");
165+ } else if (nodeArrivalStatus != null && nodeArrivalStatus[0].equals("true") && (nodeArrivalStatus[1].equals("3") || nodeArrivalStatus[2].equals("2"))) {
166+ // 復旧データの取得元のDataNodeが復旧を終えずに放置されている
167+ // 復旧及び、差分モードを強制off
168+ super.stopRecoverDataOutputOperation(nodeDt[0], new Integer(nodeDt[1]).intValue(), logger);
169+ logger.info(nodeDt[0] + ":" + nodeDt[1] + " Node Check Arrival");
170+ logger.info(nodeDt[0] + ":" + nodeDt[1] + " This Datanode is recover data output processing now. killed the operation.");
171+ // SlaveMasterNodeにも伝搬する
172+ super.setArriveNode(nodeDt[0] + ":" + nodeDt[1]);
173+ } else if (nodeArrivalStatus != null && nodeArrivalStatus[0].equals("true") && nodeArrivalStatus[1].equals("4")) {
174+ // ノード復旧が必要
175+ nodeRecoverExec = true;
176+ }
177+ }
178+ }
179+
180+
181+ if (nodeRecoverExec) {
182+ // ノード復旧が必要
131183 logger.info("Node Name [" + nodeInfo +"] Reboot");
132184
133185 // ノード復旧処理を記録
@@ -155,23 +207,26 @@
155207 StatusUtil.setNodeStatusDt(nodeInfo, "Recover Start");
156208
157209 // 復旧開始
158- if(nodeDataRecover(nodeInfo, (String)subNodeList.get(i), logger)) {
159-
160- // リカバー成功
161- // 該当ノードの復帰を登録
162- logger.info(nodeInfo + " - Recover Success");
163-
164- // 復旧処理ノードから記録を消す
165- rebootNodeMap.remove(nodeInfo);
166- StatusUtil.setNodeStatusDt(nodeInfo, "Recover Success");
210+ if (ImdstDefine.notPromotionMainMasterNodeStatus == true) {
211+ logger.info(nodeInfo + " - Recover pass");
167212 } else {
168- // リカバー失敗
169- logger.info(nodeInfo + " - Recover Miss");
170- // リカバー終了を伝える
171- super.setRecoverNode(false, "");
172- StatusUtil.setNodeStatusDt(nodeInfo, "Recover Miss");
213+ if(nodeDataRecover(nodeInfo, (String)subNodeList.get(i), logger)) {
214+
215+ // リカバー成功
216+ // 該当ノードの復帰を登録
217+ logger.info(nodeInfo + " - Recover Success");
218+
219+ // 復旧処理ノードから記録を消す
220+ rebootNodeMap.remove(nodeInfo);
221+ StatusUtil.setNodeStatusDt(nodeInfo, "Recover Success");
222+ } else {
223+ // リカバー失敗
224+ logger.info(nodeInfo + " - Recover Miss");
225+ // リカバー終了を伝える
226+ super.setRecoverNode(false, "");
227+ StatusUtil.setNodeStatusDt(nodeInfo, "Recover Miss");
228+ }
173229 }
174-
175230 logger.info(nodeInfo + " - Recover End");
176231 } else {
177232 // ノードの復旧を記録
@@ -179,29 +234,13 @@
179234 rebootNodeMap.remove(nodeInfo);
180235 super.setArriveNode(nodeInfo);
181236 }
182- } else {
183- logger.info(nodeDt[0] + ":" + nodeDt[1] + " Node Check Arrival");
184- logger.info(nodeDt[0] + ":" + nodeDt[1] + " Server Status [" + this.nodeStatusStr + "]");
185- StatusUtil.setNodeStatusDt(nodeDt[0] + ":" + nodeDt[1], "[" + this.nodeStatusStr + "]");
186-
187- // データサイズが転送されていれば格納
188- if (i < nodeCount) {
189- if(this.nodeStatusStr != null) {
190- String[] sizeWork = this.nodeStatusStr.split("Save Data Size=");
191- if(sizeWork.length > 1) {
192-
193- StatusUtil.setNodeDataSize(new Integer(i), sizeWork[1].trim().substring(1,sizeWork[1].length()-1).trim().split(":"));
194- }
195- }
196- }
197- // SlaveMasterNodeにも伝搬する
198- super.setArriveNode(nodeDt[0] + ":" + nodeDt[1]);
199237 }
200238 logger.info(nodeDt[0] + ":" + nodeDt[1] + " Node Check End");
239+ logger.info("------------------------------------------------------------");
201240
202241
203- logger.info("------------------------------------------------------------");
204242 // ノードチェック(Sub)
243+ boolean subNodeRecoverExec = false;
205244 if (subNodeList != null && i < subNodeList.size()) {
206245 String subNodeInfo = (String)subNodeList.get(i);
207246 String[] subNodeDt = subNodeInfo.split(":");
@@ -217,7 +256,59 @@
217256 super.setDeadNode(subNodeInfo, 2,null, true);
218257 StatusUtil.setNodeStatusDt(subNodeDt[0] + ":" + subNodeDt[1], "SubNode Check Dead");
219258 } else if (!super.isNodeArrival(subNodeInfo)) {
259+ // SubNodeのリカバリが必要
260+ subNodeRecoverExec = true;
261+ } else {
262+ String[] nodeArrivalStatus = super.getNodeArrivalStatus(subNodeDt[0], new Integer(subNodeDt[1]).intValue(), logger);
263+
264+ if (nodeArrivalStatus != null && nodeArrivalStatus[0].equals("true") && nodeArrivalStatus[1].equals("1") && nodeArrivalStatus[2].equals("1")) {
220265
266+ logger.info(subNodeDt[0] + ":" + subNodeDt[1] + " Sub Node Check Arrival");
267+ logger.info(subNodeDt[0] + ":" + subNodeDt[1] + " Server Status [" + this.nodeStatusStr + "]");
268+ StatusUtil.setNodeStatusDt(subNodeDt[0] + ":" + subNodeDt[1], "[" + this.nodeStatusStr + "]");
269+
270+ // データサイズが転送されていれば格納
271+ if (i < nodeCount) {
272+ if(this.nodeStatusStr != null) {
273+ String[] sizeWork = this.nodeStatusStr.split("Save Data Size=");
274+ if(sizeWork.length > 1) {
275+
276+ StatusUtil.setNodeDataSize(new Integer(i), sizeWork[1].trim().substring(1,sizeWork[1].length()-1).trim().split(":"));
277+ }
278+ }
279+ }
280+
281+ // SlaveMasterNodeにも伝搬する
282+ super.setArriveNode(subNodeDt[0] + ":" + subNodeDt[1]);
283+ } else {
284+
285+ if (nodeArrivalStatus != null && nodeArrivalStatus[0].equals("true") && (nodeArrivalStatus[1].equals("2") && (nodeArrivalStatus[2].equals("2") || nodeArrivalStatus[2].equals("1")))) {
286+ // 復旧中のDataNodeが復旧を終えずにノードに加わろうとしている。
287+ // 強制終了
288+ super.execForceShutdownNode(subNodeDt[0], new Integer(subNodeDt[1]).intValue(), logger);
289+ Thread.sleep(3000);
290+ // ノードダウン
291+ logger.info(subNodeDt[0] + ":" + subNodeDt[1] + " SubNode Check Restoration abnormal state killed processing.");
292+ super.setDeadNode(subNodeInfo, 2,null, true);
293+ StatusUtil.setNodeStatusDt(subNodeDt[0] + ":" + subNodeDt[1], "SubNode Check Dead");
294+ } else if (nodeArrivalStatus != null && nodeArrivalStatus[0].equals("true") && (nodeArrivalStatus[1].equals("3") || nodeArrivalStatus[2].equals("2"))) {
295+ // 復旧データの取得元のDataNodeが復旧を終えずに放置されている
296+ // 復旧及び、差分モードを強制off
297+ super.stopRecoverDataOutputOperation(subNodeDt[0], new Integer(subNodeDt[1]).intValue(), logger);
298+
299+ logger.info(subNodeDt[0] + ":" + subNodeDt[1] + " Node Check Arrival");
300+ logger.info(subNodeDt[0] + ":" + subNodeDt[1] + " This Datanode is recover data output processing now. killed the operation.");
301+ // SlaveMasterNodeにも伝搬する
302+ super.setArriveNode(subNodeDt[0] + ":" + subNodeDt[1]);
303+ } else if (nodeArrivalStatus != null && nodeArrivalStatus[0].equals("true") && nodeArrivalStatus[1].equals("4")) {
304+ // SubNodeのリカバリが必要
305+ subNodeRecoverExec = true;
306+ }
307+ }
308+ }
309+
310+ if (subNodeRecoverExec) {
311+
221312 // 停止していたノードが復帰した場合
222313 // 停止中に登録予定であったデータを登録する
223314 logger.info("Node Name [" + subNodeInfo +"] Reboot");
@@ -239,43 +330,29 @@
239330 logger.info(subNodeInfo + " - Recover Start");
240331 StatusUtil.setNodeStatusDt(subNodeInfo, "Recover Start");
241332 // 復旧開始
242- if(nodeDataRecover(subNodeInfo, nodeInfo, logger)) {
243-
244- // リカバー成功
245- // 該当ノードの復帰を登録
246- logger.info(subNodeInfo + " - Recover Success");
247-
248- // 復旧処理ノードから記録を消す
249- rebootNodeMap.remove(subNodeInfo);
250- StatusUtil.setNodeStatusDt(subNodeInfo, "Recover Success");
333+ if (ImdstDefine.notPromotionMainMasterNodeStatus == true) {
334+ logger.info(subNodeInfo + " - Recover pass");
251335 } else {
252-
253- // リカバー失敗
254- logger.info(subNodeInfo + " - Recover Miss");
255- // リカバー終了を伝える
256- super.setRecoverNode(false, "");
257- StatusUtil.setNodeStatusDt(subNodeInfo, "Recover Miss");
336+
337+ if(nodeDataRecover(subNodeInfo, nodeInfo, logger)) {
338+
339+ // リカバー成功
340+ // 該当ノードの復帰を登録
341+ logger.info(subNodeInfo + " - Recover Success");
342+
343+ // 復旧処理ノードから記録を消す
344+ rebootNodeMap.remove(subNodeInfo);
345+ StatusUtil.setNodeStatusDt(subNodeInfo, "Recover Success");
346+ } else {
347+
348+ // リカバー失敗
349+ logger.info(subNodeInfo + " - Recover Miss");
350+ // リカバー終了を伝える
351+ super.setRecoverNode(false, "");
352+ StatusUtil.setNodeStatusDt(subNodeInfo, "Recover Miss");
353+ }
258354 }
259-
260355 logger.info(subNodeInfo + " - Recover End");
261- } else {
262- logger.info(subNodeDt[0] + ":" + subNodeDt[1] + " Sub Node Check Arrival");
263- logger.info(subNodeDt[0] + ":" + subNodeDt[1] + " Server Status [" + this.nodeStatusStr + "]");
264- StatusUtil.setNodeStatusDt(subNodeDt[0] + ":" + subNodeDt[1], "[" + this.nodeStatusStr + "]");
265-
266- // データサイズが転送されていれば格納
267- if (i < nodeCount) {
268- if(this.nodeStatusStr != null) {
269- String[] sizeWork = this.nodeStatusStr.split("Save Data Size=");
270- if(sizeWork.length > 1) {
271-
272- StatusUtil.setNodeDataSize(new Integer(i), sizeWork[1].trim().substring(1,sizeWork[1].length()-1).trim().split(":"));
273- }
274- }
275- }
276-
277- // SlaveMasterNodeにも伝搬する
278- super.setArriveNode(subNodeDt[0] + ":" + subNodeDt[1]);
279356 }
280357 logger.info(subNodeDt[0] + ":" + subNodeDt[1] + " Sub Node Check End");
281358 logger.info("************************************************************");
--- trunk/src/okuyama/imdst/helper/MasterManagerHelper.java (revision 1018)
+++ trunk/src/okuyama/imdst/helper/MasterManagerHelper.java (revision 1019)
@@ -703,6 +703,11 @@
703703
704704 if (retParams != null && retParams[0].equals("2")) retParams[0] = "23-f";
705705 break;
706+ case 24 :
707+
708+ // Key値にTagを紐付ける
709+ retParams = this.insertKeyPairTag(clientParameterList[1], clientParameterList[2], clientParameterList[3]);
710+ break;
706711 case 30 :
707712
708713 // 各キーノードへデータロック依頼
@@ -1216,7 +1221,7 @@
12161221 * @return String[] 結果
12171222 * @throws BatchException
12181223 */
1219- private String[] setKeyValue(String keyStr, String tagStr, String transactionCode, String dataStr) throws BatchException {
1224+ public String[] setKeyValue(String keyStr, String tagStr, String transactionCode, String dataStr) throws BatchException {
12201225 return setKeyValue(keyStr, tagStr, transactionCode, dataStr, false);
12211226 }
12221227
@@ -1235,7 +1240,7 @@
12351240 * @return String[] 結果
12361241 * @throws BatchException
12371242 */
1238- private String[] setKeyValue(String keyStr, String tagStr, String transactionCode, String dataStr, boolean fixPrefix) throws BatchException {
1243+ public String[] setKeyValue(String keyStr, String tagStr, String transactionCode, String dataStr, boolean fixPrefix) throws BatchException {
12391244 //logger.debug("MasterManagerHelper - setKeyValue - start");
12401245 String[] retStrs = new String[3];
12411246
@@ -1262,16 +1267,17 @@
12621267 return retStrs;
12631268 }
12641269
1265- // Value値チェック
1266- if (!this.checkValueLength(dataStr)) {
1267- // 保存失敗
1268- retStrs[0] = "1";
1269- retStrs[1] = "false";
1270- retStrs[2] = "Value Length Error";
1271- return retStrs;
1270+ if (dataStr != null || (tagStr == null && dataStr == null)) {
1271+ // Value値チェック
1272+ if (!this.checkValueLength(dataStr)) {
1273+ // 保存失敗
1274+ retStrs[0] = "1";
1275+ retStrs[1] = "false";
1276+ retStrs[2] = "Value Length Error";
1277+ return retStrs;
1278+ }
12721279 }
12731280
1274-
12751281 // Tag値を保存
12761282 if (tagStr != null && !tagStr.equals("")) {
12771283
@@ -1342,6 +1348,14 @@
13421348 }
13431349 }
13441350
1351+ // Tagのみ登録の場合はここで終了
1352+ if (dataStr == null && tagStr != null) {
1353+ retStrs[0] = "1";
1354+ retStrs[1] = "true";
1355+ retStrs[2] = "OK";
1356+ return retStrs;
1357+ }
1358+
13451359 // キー値とデータを保存
13461360 // 保存先問い合わせ
13471361 String[] keyNodeInfo = DataDispatcher.dispatchKeyNode(keyStr, false);
@@ -1394,8 +1408,41 @@
13941408 return retStrs;
13951409 }
13961410
1411+ /**
1412+ * KeyへTagの紐付けのみ行う.<br>
1413+ * 処理フロー.<br>
1414+ *
1415+ * @param keyStr key値の文字列
1416+ * @param tagStr tag値の文字列
1417+ * @return String[] 結果
1418+ * @throws BatchException
1419+ */
1420+ public String[] insertKeyPairTag(String keyStr, String tagStr, String transactionCode) throws BatchException {
1421+ // Tag値チェック
1422+ if (tagStr != null && !tagStr.equals("")) {
1423+ // Tag値が存在しない
1424+ String[] retStrs = new String[3];
1425+ retStrs[0] = "24";
1426+ retStrs[1] = "error";
1427+ retStrs[2] = "Tag parameter Indispensable Error";
1428+ return retStrs;
1429+ }
13971430
1431+ String[] tagInsertRet = setKeyValue(keyStr, tagStr, transactionCode, null, false);
1432+ if (tagInsertRet != null && tagInsertRet[1].equals("true")) {
1433+ tagInsertRet[0] = "24";
1434+ tagInsertRet[1] = "true";
1435+ tagInsertRet[2] = "OK";
1436+ } else {
1437+ tagInsertRet = new String[3];
1438+ tagInsertRet[0] = "24";
1439+ tagInsertRet[1] = "false";
1440+ tagInsertRet[2] = "Tag insert fail";
1441+ }
1442+ return tagInsertRet;
1443+ }
13981444
1445+
13991446 /**
14001447 * Key-Valueを保存する.<br>
14011448 * ただし保存時にValueをN-gram(ユニグラム)方式にてインデックスを作成する<br>
--- trunk/src/okuyama/imdst/util/KeyMapManager.java (revision 1018)
+++ trunk/src/okuyama/imdst/util/KeyMapManager.java (revision 1019)
@@ -156,6 +156,11 @@
156156 private Object diffSync = new Object();
157157 private List diffDataPoolingListForFileBase = null;
158158
159+ // 現在のKeyMapManagerの状態を"通常(1)" or "復旧中(2)" or "復旧データ取得元(3)"の3種類で管理する
160+ public int myOperationStatus = 1;
161+
162+ // 現在のKeyMapManagerのdiffデータモード状態を"通常(1)" or "diffモード(2)"の2種類で管理する
163+ public int myDiffModeOperationStatus = 1;
159164
160165 // ノード間でのデータ移動時に削除として蓄積するMap
161166 private ConcurrentHashMap moveAdjustmentDataMap = null;
@@ -183,6 +188,9 @@
183188 // 初期化メソッド
184189 // Transactionを管理する場合に呼び出す
185190 public KeyMapManager(String keyMapFilePath, String workKeyMapFilePath, boolean workFileMemory, int keySize, boolean dataMemory, boolean dataManage, String diskCacheFile) throws BatchException {
191+ if (ImdstDefine.recoverRequired == true) {
192+ myOperationStatus = 4;
193+ }
186194 this.keyObjBkupMode = true;
187195 this.diskCacheFile = diskCacheFile;
188196 this.bkupObjCheck(keyMapFilePath);
@@ -194,6 +202,9 @@
194202 // 初期化メソッド
195203 // Key値はメモリを使用する場合に使用
196204 public KeyMapManager(String keyMapFilePath, String workKeyMapFilePath, boolean workFileMemory, int keySize, boolean dataMemory, String diskCacheFile) throws BatchException {
205+ if (ImdstDefine.recoverRequired == true) {
206+ myOperationStatus = 4;
207+ }
197208 this.keyObjBkupMode = true;
198209 this.diskCacheFile = diskCacheFile;
199210 this.bkupObjCheck(keyMapFilePath);
@@ -203,6 +214,10 @@
203214 // 初期化メソッド
204215 // Key値はメモリを使用する場合に使用
205216 public KeyMapManager(String keyMapFilePath, String workKeyMapFilePath, boolean workFileMemory, int keySize, boolean dataMemory, int memoryLimitSize, String[] virtualStorageDirs, String diskCacheFile) throws BatchException {
217+ if (ImdstDefine.recoverRequired == true) {
218+ myOperationStatus = 4;
219+ }
220+
206221 this.keyObjBkupMode = true;
207222 this.diskCacheFile = diskCacheFile;
208223 this.bkupObjCheck(keyMapFilePath);
@@ -215,6 +230,10 @@
215230 // 初期化メソッド
216231 // Keyもファイルの場合
217232 public KeyMapManager(String keyMapFilePath, String workKeyMapFilePath, boolean workFileMemory, int keySize, boolean dataMemory, String[] dirs, String diskCacheFile) throws BatchException {
233+ if (ImdstDefine.recoverRequired == true) {
234+ myOperationStatus = 4;
235+ }
236+
218237 boolean renewFlg = false;
219238 this.diskCacheFile = diskCacheFile;
220239 for (int idx = 0; idx < dirs.length; idx++) {
@@ -2326,7 +2345,7 @@
23262345 synchronized (diffSync) {
23272346
23282347 if (flg) {
2329-
2348+ this.myDiffModeOperationStatus = 2;
23302349 this.diffDataPoolingListForFileBase = new FileBaseDataList(this.nodeKeyMapFilePath + ".difftmplist");
23312350 } else {
23322351
@@ -2334,6 +2353,7 @@
23342353 this.diffDataPoolingListForFileBase.clear();
23352354 this.diffDataPoolingListForFileBase = null;
23362355 }
2356+ this.myDiffModeOperationStatus = 1;
23372357 }
23382358 this.diffDataPoolingFlg = flg;
23392359 try {
@@ -2349,7 +2369,7 @@
23492369 synchronized (diffSync) {
23502370
23512371 if (flg) {
2352-
2372+ this.myDiffModeOperationStatus = 2;
23532373 this.diffDataPoolingListForFileBase = new FileBaseDataList(this.nodeKeyMapFilePath + ".difftmplist");
23542374 } else {
23552375
@@ -2357,6 +2377,7 @@
23572377 this.diffDataPoolingListForFileBase.clear();
23582378 this.diffDataPoolingListForFileBase = null;
23592379 }
2380+ this.myDiffModeOperationStatus = 1;
23602381 }
23612382 this.diffDataPoolingFlg = flg;
23622383 }
@@ -2370,6 +2391,7 @@
23702391 this.diffDataPoolingListForFileBase.clear();
23712392 this.diffDataPoolingListForFileBase = null;
23722393 }
2394+ this.myDiffModeOperationStatus = 1;
23732395
23742396 this.diffDataPoolingFlg = false;
23752397 }
@@ -2382,7 +2404,7 @@
23822404 try {
23832405
23842406 synchronized(poolKeyLock) {
2385-
2407+ this.myOperationStatus = 3;
23862408 logger.info("outputKeyMapObj2Stream - synchronized - start");
23872409 String allDataSep = "";
23882410 StringBuilder allDataBuf = new StringBuilder(ImdstDefine.stringBufferLarge_3Size);
@@ -2556,6 +2578,8 @@
25562578 } catch (Exception e) {
25572579 e.printStackTrace();
25582580 logger.error("outputKeyMapObj2Stream - Error =[" + e.getMessage() + "]");
2581+ } finally {
2582+ this.myOperationStatus = 1;
25592583 }
25602584 }
25612585 }
@@ -2569,7 +2593,7 @@
25692593
25702594 synchronized(poolKeyLock) {
25712595 String nextWrite = null;
2572-
2596+ this.myOperationStatus = 3;
25732597 logger.info("outputDiffKeyMapObj2Stream - synchronized - start");
25742598 String allDataSep = "";
25752599 StringBuilder allDataBuf = new StringBuilder(ImdstDefine.stringBufferLarge_3Size);
@@ -2671,6 +2695,8 @@
26712695 //blocking = true;
26722696 //StatusUtil.setStatusAndMessage(1, "outputDiffKeyMapObj2Stream - Error [" + e.getMessage() + "]");
26732697 //throw new BatchException(e);
2698+ } finally {
2699+ this.myOperationStatus = 1;
26742700 }
26752701 }
26762702 }
@@ -2680,6 +2706,7 @@
26802706 public void inputKeyMapObj2Stream(BufferedReader br, PrintWriter pw, int dataLineCount) throws BatchException {
26812707 if (!blocking) {
26822708 try {
2709+ this.myOperationStatus = 2;
26832710 int i = 0;
26842711 String[] oneDatas = null;
26852712 boolean setDataExec = false;
@@ -2799,6 +2826,7 @@
27992826 }
28002827 }
28012828 logger.info("inputKeyMapObj2Stream - synchronized - end");
2829+
28022830 } catch (Exception e) {
28032831 try {
28042832 pw.println("-1");
@@ -2932,7 +2960,12 @@
29322960 }
29332961 }
29342962 logger.info("inputDiffKeyMapObj2Stream - synchronized - end");
2963+ this.myOperationStatus = 1;
2964+
2965+ // 起動時にリカバリが必要な指定が入っている場合もそれをoffとする
2966+ if (ImdstDefine.recoverRequired == true) ImdstDefine.recoverRequired = false;
29352967 } catch (Exception e) {
2968+ this.myOperationStatus = 2;
29362969 logger.error("inputDiffKeyMapObj2Stream - Error");
29372970 blocking = true;
29382971 StatusUtil.setStatusAndMessage(1, "inputDiffKeyMapObj2Stream - Error [" + e.getMessage() + "]");
--- trunk/src/okuyama/imdst/util/ImdstDefine.java (revision 1018)
+++ trunk/src/okuyama/imdst/util/ImdstDefine.java (revision 1019)
@@ -11,8 +11,17 @@
1111 */
1212 public class ImdstDefine {
1313
14- public static final String okuyamaVersion = "VERSION okuyama-0.9.4";
14+ public static final String okuyamaVersion = "VERSION okuyama-0.9.5";
1515
16+ // -- MasterNodeをマルチクラスターのスレーブで起動する場合にtrueになる
17+ public volatile static boolean slaveClusterMasterNode = false;
18+ // -- MasterNodeをマルチクラスターで動かしている場合にリカバリを行う指定
19+ public volatile static boolean rebuildOkuyamaClusterMode =false;
20+
21+ // -- MainMasterNodeに昇格しないMasterNodeを作る際にtrueとする。
22+ public volatile static boolean notPromotionMainMasterNodeStatus = false;
23+
24+
1625 // -- KeyMapファイルに関係する定数 -------------------------------------------------
1726 // KeyNodeのWorkファイルでのセパレータ
1827 //public static final String keyWorkFileSep = "#imdst7386#";
@@ -554,6 +563,9 @@
554563 // DataNodeがリカバリ時、ノード追加時にデータを一度に転送する上限数を制御する
555564 public volatile static int lowSpecDataNodeSendDataCount = 2000;
556565
566+ // DataNodeがリカバリが必要な状態であることを明示して起動する場合にtrueとする。
567+ public volatile static boolean recoverRequired = false;
557568
569+
558570 public volatile static boolean fileBaseMapTimeDebug = false;
559571 }
\ No newline at end of file
--- trunk/src/okuyama/imdst/client/OkuyamaClient.java (revision 1018)
+++ trunk/src/okuyama/imdst/client/OkuyamaClient.java (revision 1019)
@@ -240,6 +240,7 @@
240240
241241 // okuyamaをマルチクラスター化した場合のスレーブのクラスターのMasterNodeのアドレス:port名
242242 private String slaveOkuyamaClusterNode = null;
243+ private boolean slaveClusterUse = false;
243244
244245 /**
245246 * コンストラクタ
--- trunk/src/okuyama/imdst/process/ServerPreprocess.java (revision 1018)
+++ trunk/src/okuyama/imdst/process/ServerPreprocess.java (revision 1019)
@@ -40,8 +40,11 @@
4040 * -efsmo ImdstDefine.executeFileStoreMapObject / バックアップ用のスナップショットObjectを出力するかの有無(デフォルト出力)
4141 * -lsdn ImdstDefine.lowSpecDataNode / DataNodeがLowSpecのサーバで稼働しているもしくはディスクが遅い、リカバリorノード追加時の負荷を下げたい場合にtrueとする
4242 * -lsdnsdc ImdstDefine.lowSpecDataNodeSendDataCount / DataNodeがリカバリ時、ノード追加時にデータを一度に転送する上限数を制御する
43+ * -scmn ImdstDefine.slaveClusterMasterNode / okuyamaをMasterNodeをマルチクラスターで起動する場合に当該ノードをスレーブノードとして起動する場合に、trueとする
44+ * -rocm ImdstDefine.rebuildOkuyamaClusterMode / okuyamaをMasterNodeをマルチクラスターで起動した場合にメインとなるクラスターがダウンし、Slaveからリビルドする場合にtrueとして起動
45+ * -npmmns ImdstDefine.notPromotionMainMasterNodeStatus / MainMasterNodeに昇格しないMasterNodeを作成する場合にtrueとする
46+ * -rr ImdstDefine.recoverRequired / DataNodeがリカバリが必要な場合にtrueとして起動する
4347 *
44- *
4548 * <br>
4649 * @author T.Okuyama
4750 * @license GPL(Lv3)
@@ -402,6 +405,44 @@
402405 }
403406 }
404407 }
408+
409+ // -scmn
410+ if (startOptions[i].trim().equals("-scmn")) {
411+ if (startOptions.length > (i+1)) {
412+ if (startOptions[i+1] != null && startOptions[i+1].trim().equals("true")) {
413+ ImdstDefine.slaveClusterMasterNode = true;
414+ }
415+ }
416+ }
417+
418+ // -rocm
419+ if (startOptions[i].trim().equals("-rocm")) {
420+ if (startOptions.length > (i+1)) {
421+ if (startOptions[i+1] != null && startOptions[i+1].trim().equals("true")) {
422+ ImdstDefine.rebuildOkuyamaClusterMode = true;
423+ }
424+ }
425+ }
426+
427+ // -npmmns
428+ if (startOptions[i].trim().equals("-npmmns")) {
429+ if (startOptions.length > (i+1)) {
430+ if (startOptions[i+1] != null && startOptions[i+1].trim().equals("true")) {
431+ ImdstDefine.notPromotionMainMasterNodeStatus = true;
432+ }
433+ }
434+ }
435+
436+ // -rr
437+ if (startOptions[i].trim().equals("-rr")) {
438+ if (startOptions.length > (i+1)) {
439+ if (startOptions[i+1] != null && startOptions[i+1].trim().equals("true")) {
440+ ImdstDefine.recoverRequired = true;
441+ }
442+ }
443+ }
444+
445+
405446 }
406447 }
407448 } catch (Exception e) {
Show on old repository browser