• R/O
  • SSH
  • HTTPS

okuyama: 提交


Commit MetaInfo

修訂1004 (tree)
時間2013-01-11 21:50:21
作者okuyamaoo

Log Message

書き込み、削除処理をOkuyamaClietの拡張クラスで一時的にバッファするように変更

Change Summary

差異

--- trunk/okuyamaFuse/src/BufferedOkuyamaClient.java (nonexistent)
+++ trunk/okuyamaFuse/src/BufferedOkuyamaClient.java (revision 1004)
@@ -0,0 +1,464 @@
1+package fuse.okuyamafs;
2+
3+import java.util.*;
4+import java.util.concurrent.*;
5+import java.util.concurrent.atomic.*;
6+
7+import okuyama.imdst.client.*;
8+import okuyama.imdst.util.*;
9+
10+
11+public class BufferedOkuyamaClient extends OkuyamaClient {
12+
13+ protected static Map putBufferedDataMap = new ConcurrentHashMap(150);
14+ protected static Map deleteBufferedDataMap = new ConcurrentHashMap(150);
15+
16+ protected static ArrayBlockingQueue okuyamaRequestQueue = new ArrayBlockingQueue(300);
17+
18+ protected static OkuyamaClientFactory factory = null;
19+
20+ protected static OkuyamaSendWorker[] workerList = null;
21+
22+ protected static Object[] sendSyncObject = null;
23+
24+ protected static int parallel = 10;
25+
26+ protected OkuyamaClient client = null;
27+
28+
29+
30+ public BufferedOkuyamaClient(OkuyamaClient client) {
31+ this.client = client;
32+ }
33+
34+
35+ public String[] setNewValue(String key, String value) throws OkuyamaClientException {
36+ return this.client.setNewValue(key, value);
37+ }
38+
39+ public String[] setNewObjectValue(String key, Object value) throws OkuyamaClientException {
40+ return this.client.setNewObjectValue(key, value);
41+ }
42+
43+
44+
45+ /**
46+ * 一度しか呼ばない
47+ *
48+ */
49+ public static void initClientMaster(OkuyamaClientFactory factory) throws Exception {
50+ BufferedOkuyamaClient.factory = factory;
51+
52+ workerList = new OkuyamaSendWorker[parallel];
53+ sendSyncObject = new Object[parallel];
54+ for (int idx = 0; idx < parallel; idx++) {
55+ workerList[idx] = new OkuyamaSendWorker();
56+ sendSyncObject[idx] = new Object();
57+ workerList[idx].start();
58+ }
59+ }
60+
61+
62+
63+ public void close() throws OkuyamaClientException {
64+ this.client.close();
65+ }
66+
67+
68+ public boolean setValue(String key, String value) throws OkuyamaClientException {
69+
70+ try {
71+ synchronized(sendSyncObject[((key.hashCode() << 1) >>> 1) % parallel]) {
72+ putBufferedDataMap.put(key, value);
73+ deleteBufferedDataMap.remove(key);
74+ Object[] request = new Object[3];
75+
76+ request[0] = new Integer(1);
77+ request[1] = key;
78+ request[2] = value;
79+ okuyamaRequestQueue.put(request);
80+ }
81+ } catch (Exception ee) {
82+ throw new OkuyamaClientException(ee);
83+ }
84+ return true;
85+ }
86+
87+ public String[] getValue(String key) throws OkuyamaClientException {
88+ String[] ret = null;
89+
90+ try {
91+ synchronized(sendSyncObject[((key.hashCode() << 1) >>> 1) % parallel]) {
92+ String value = (String)putBufferedDataMap.get(key);
93+
94+ String[] realClientRet = null;
95+
96+ if (value == null) {
97+ realClientRet = this.client.getValue(key);
98+ if (realClientRet != null && realClientRet[0].equals("true")) {
99+ value = realClientRet[1];
100+ }
101+ }
102+
103+ if (value != null) {
104+ if (deleteBufferedDataMap.containsKey(key)) {
105+ value = null;
106+ }
107+ }
108+
109+ if (value == null) {
110+
111+ ret = new String[1];
112+ ret[0] = "false";
113+ } else {
114+
115+ ret = new String[2];
116+ ret[0] = "true";
117+ ret[1] = value;
118+ }
119+ }
120+ } catch (Exception ee) {
121+ throw new OkuyamaClientException(ee);
122+ }
123+ return ret;
124+ }
125+
126+
127+ public Object[] getObjectValue(String key) throws OkuyamaClientException {
128+ Object[] ret = null;
129+
130+ try {
131+ synchronized(sendSyncObject[((key.hashCode() << 1) >>> 1) % parallel]) {
132+ Object value = (Object)putBufferedDataMap.get(key);
133+
134+ Object[] realClientRet = null;
135+
136+ if (value == null) {
137+ realClientRet = this.client.getObjectValue(key);
138+ if (realClientRet != null && realClientRet[0].equals("true")) {
139+ value = realClientRet[1];
140+ }
141+ }
142+
143+ if (value != null) {
144+ if (deleteBufferedDataMap.containsKey(key)) {
145+ value = null;
146+ }
147+ }
148+
149+ if (value == null) {
150+
151+ ret = new Object[1];
152+ ret[0] = "false";
153+ } else {
154+
155+ ret = new Object[2];
156+ ret[0] = "true";
157+ ret[1] = value;
158+ }
159+ }
160+ } catch (Exception ee) {
161+ throw new OkuyamaClientException(ee);
162+ }
163+ return ret;
164+ }
165+
166+
167+ public boolean setObjectValue(String key, Object value) throws OkuyamaClientException {
168+
169+ try {
170+ synchronized(sendSyncObject[((key.hashCode() << 1) >>> 1) % parallel]) {
171+ putBufferedDataMap.put(key, value);
172+ deleteBufferedDataMap.remove(key);
173+ Object[] request = new Object[3];
174+
175+ request[0] = new Integer(2);
176+ request[1] = key;
177+ request[2] = value;
178+ okuyamaRequestQueue.put(request);
179+ }
180+ } catch (Exception ee) {
181+ throw new OkuyamaClientException(ee);
182+ }
183+ return true;
184+ }
185+
186+ public boolean sendByteValue(String key, byte[] value) throws OkuyamaClientException {
187+
188+ try {
189+ synchronized(sendSyncObject[((key.hashCode() << 1) >>> 1) % parallel]) {
190+ putBufferedDataMap.put(key, value);
191+ deleteBufferedDataMap.remove(key);
192+ Object[] request = new Object[3];
193+
194+ request[0] = new Integer(3);
195+ request[1] = key;
196+ request[2] = value;
197+ okuyamaRequestQueue.put(request);
198+ }
199+ } catch (Exception ee) {
200+ throw new OkuyamaClientException(ee);
201+ }
202+ return true;
203+ }
204+
205+
206+
207+
208+ public String[] removeValue(String key) throws OkuyamaClientException {
209+ String[] ret = null;
210+ try {
211+
212+ synchronized(sendSyncObject[((key.hashCode() << 1) >>> 1) % parallel]) {
213+ Object removeRet = putBufferedDataMap.remove(key);
214+ if (removeRet == null) {
215+ String[] realClientRmRet = this.client.getValue(key);
216+ if (realClientRmRet[0].equals("false")) {
217+ ret = new String[1];
218+ ret[0] = "false";
219+ return ret;
220+ }
221+ }
222+ deleteBufferedDataMap.put(key, new Integer(1));
223+ Object[] request = new Object[2];
224+
225+ request[0] = new Integer(4);
226+ request[1] = key;
227+ okuyamaRequestQueue.put(request);
228+
229+ ret = new String[2];
230+ ret[0] = "true";
231+ }
232+ } catch (Exception ee) {
233+ throw new OkuyamaClientException(ee);
234+ }
235+ return ret;
236+ }
237+
238+
239+ public boolean requestRemoveValue(String key) throws OkuyamaClientException {
240+
241+ try {
242+ synchronized(sendSyncObject[((key.hashCode() << 1) >>> 1) % parallel]) {
243+ putBufferedDataMap.remove(key);
244+ deleteBufferedDataMap.put(key, new Integer(1));
245+ Object[] request = new Object[2];
246+
247+ request[0] = new Integer(4);
248+ request[1] = key;
249+ okuyamaRequestQueue.put(request);
250+ }
251+ } catch (Exception ee) {
252+ throw new OkuyamaClientException(ee);
253+ }
254+ return true;
255+ }
256+
257+ public String[] responseRemoveValue(String key) throws OkuyamaClientException {
258+ String[] ret = null;
259+
260+ try {
261+ int i = 0;
262+ while (true) {
263+ i++;
264+ synchronized(sendSyncObject[((key.hashCode() << 1) >>> 1) % parallel]) {
265+ if (deleteBufferedDataMap.containsKey(key) == false) break;
266+ if ((i % 100) == 0) Thread.sleep(10);
267+ }
268+ }
269+ ret = new String[2];
270+ ret[0] = "true";
271+ } catch (Exception ee) {
272+ throw new OkuyamaClientException(ee);
273+ }
274+ return ret;
275+ }
276+
277+
278+ public Object[] readByteValue(String key) throws OkuyamaClientException {
279+ Object[] ret = null;
280+
281+ try {
282+ synchronized(sendSyncObject[((key.hashCode() << 1) >>> 1) % parallel]) {
283+ byte[] value = (byte[])putBufferedDataMap.get(key);
284+
285+ Object[] realClientRet = null;
286+
287+ if (value == null) {
288+ realClientRet = this.client.readByteValue(key);
289+ if (realClientRet != null && realClientRet[0].equals("true")) {
290+ value = (byte[])realClientRet[1];
291+ }
292+ }
293+
294+ if (value != null) {
295+ if (deleteBufferedDataMap.containsKey(key)) {
296+ value = null;
297+ }
298+ }
299+
300+ if (value == null) {
301+
302+ ret = new Object[1];
303+ ret[0] = "false";
304+ } else {
305+
306+ ret = new Object[2];
307+ ret[0] = "true";
308+ ret[1] = value;
309+ }
310+ }
311+ } catch (Exception ee) {
312+ throw new OkuyamaClientException(ee);
313+ }
314+ return ret;
315+ }
316+}
317+
318+class OkuyamaSendWorker extends Thread {
319+
320+
321+ public void run () {
322+
323+ OkuyamaClient client = null;
324+ Object[] requestData = null;
325+
326+
327+ while (true) {
328+
329+ try {
330+
331+ // リクエストがnullの場合だけQueueから取り出す。
332+ // 正常にokuyamaに伝播した場合、nullとするからである。Exception発生時はnull化されない
333+ if (requestData == null) {
334+ // [0] = type(1=put, 2=remove), [1]=DataType(1=byte, 2=String, 3=Object), [2]=Key, [3]=Value
335+
336+ while (true) {
337+ requestData = (Object[])BufferedOkuyamaClient.okuyamaRequestQueue.poll(1000, TimeUnit.MILLISECONDS);
338+
339+ if (requestData != null) break;
340+ if (OkuyamaFilesystem.jvmShutdownStatus == true) break;
341+ }
342+
343+ if (requestData == null && OkuyamaFilesystem.jvmShutdownStatus == true) break;
344+ }
345+
346+ client = BufferedOkuyamaClient.factory.getClient(300*1000);
347+
348+
349+ String key = (String)requestData[1];
350+
351+ synchronized(BufferedOkuyamaClient.sendSyncObject[((key.hashCode() << 1) >>> 1) % BufferedOkuyamaClient.parallel]) {
352+
353+ int method = ((Integer)requestData[0]).intValue();
354+
355+ switch (method) {
356+ case 1 :
357+ // setValueの処理
358+ String nowBufferedValueStr = (String)BufferedOkuyamaClient.putBufferedDataMap.get(key);
359+ String requestValueStr = (String)requestData[2];
360+
361+ // 現在バッファ中のObjectのアドレスと登録QueueのObjectのアドレスが同値の場合は
362+ // 登録する意味がある。異なる場合は後続のリクエストに上書きされているので、
363+ // いづれそちらが行われるので反映しても無駄となる。
364+ // 削除処理がこのJobの後にQueueに入った場合も、バッファが削除されているので、
365+ // 反映しても無駄である
366+ if (nowBufferedValueStr == requestValueStr) {
367+ if (client.setValue(key ,requestValueStr)) {
368+
369+ BufferedOkuyamaClient.putBufferedDataMap.remove(key);
370+ } else {
371+ client = null;
372+ client = BufferedOkuyamaClient.factory.getClient();
373+ if (client.setValue(key ,requestValueStr)) {
374+ BufferedOkuyamaClient.putBufferedDataMap.remove(key);
375+ } else {
376+ throw new Exception("setValue - error");
377+ }
378+ }
379+ }
380+
381+ break;
382+ case 2 :
383+
384+ // setObjectValueの処理
385+ Object nowBufferedValueObj = BufferedOkuyamaClient.putBufferedDataMap.get(key);
386+ Object requestValueObj = requestData[2];
387+
388+ // 現在バッファ中のObjectのアドレスと登録QueueのObjectのアドレスが同値の場合は
389+ // 登録する意味がある。異なる場合は後続のリクエストに上書きされているので、
390+ // いづれそちらが行われるので反映しても無駄となる。
391+ // 削除処理がこのJobの後にQueueに入った場合も、バッファが削除されているので、
392+ // 反映しても無駄である
393+ if (nowBufferedValueObj == requestValueObj) {
394+ if (client.setObjectValue(key ,requestValueObj)) {
395+
396+ BufferedOkuyamaClient.putBufferedDataMap.remove(key);
397+ } else {
398+ client = null;
399+ client = BufferedOkuyamaClient.factory.getClient();
400+ if (client.setObjectValue(key ,requestValueObj)) {
401+ BufferedOkuyamaClient.putBufferedDataMap.remove(key);
402+ } else {
403+ throw new Exception("setObjectValue - error");
404+ }
405+ }
406+ }
407+
408+
409+ break;
410+ case 3 :
411+
412+ // sendByteValueの処理
413+ byte[] nowBufferedValueBytes = (byte[])BufferedOkuyamaClient.putBufferedDataMap.get(key);
414+ byte[] requestValueBytes = (byte[])requestData[2];
415+
416+ // 現在バッファ中のObjectのアドレスと登録QueueのObjectのアドレスが同値の場合は
417+ // 登録する意味がある。異なる場合は後続のリクエストに上書きされているので、
418+ // いづれそちらが行われるので反映しても無駄となる。
419+ // 削除処理がこのJobの後にQueueに入った場合も、バッファが削除されているので、
420+ // 反映しても無駄である
421+ if (nowBufferedValueBytes == requestValueBytes) {
422+
423+
424+ if (client.sendByteValue(key ,requestValueBytes)) {
425+
426+ BufferedOkuyamaClient.putBufferedDataMap.remove(key);
427+ } else {
428+
429+ client = null;
430+ client = BufferedOkuyamaClient.factory.getClient();
431+ if (client.sendByteValue(key ,requestValueBytes)) {
432+ BufferedOkuyamaClient.putBufferedDataMap.remove(key);
433+ } else {
434+ throw new Exception("sendByteValue - error");
435+ }
436+ }
437+ }
438+
439+ break;
440+ case 4 :
441+
442+ // removeValueの処理
443+ // Removeは削除をokuyamaへ実行後、Removeのマーキングバッファから該当Keyを削除
444+ if (BufferedOkuyamaClient.deleteBufferedDataMap.containsKey(key)) {
445+
446+ String[] removeStr = client.removeValue(key);
447+ BufferedOkuyamaClient.deleteBufferedDataMap.remove(key);
448+ }
449+ break;
450+ }
451+ }
452+ } catch (Exception ee) {
453+ ee.printStackTrace();
454+ System.exit(1);
455+ } finally {
456+ try {
457+ client.close();
458+ } catch (Exception e2) {
459+ }
460+ requestData = null;
461+ }
462+ }
463+ }
464+}
\ No newline at end of file
--- trunk/okuyamaFuse/src/CoreMapFactory.java (revision 1003)
+++ trunk/okuyamaFuse/src/CoreMapFactory.java (revision 1004)
@@ -2,6 +2,9 @@
22 import java.util.*;
33 import java.util.concurrent.ConcurrentHashMap;
44
5+import okuyama.imdst.client.*;
6+import okuyama.imdst.util.*;
7+
58 /**
69 * OkuyamaFuse.<br>
710 *
@@ -28,6 +31,13 @@
2831 } else {
2932 // OkuyamaFs
3033 parameterMap.put("okuyamainfo", args);
34+
35+ try {
36+ BufferedOkuyamaClient.initClientMaster(OkuyamaClientFactory.getFactory(args, OkuyamaFsMapUtil.okuyamaClientPoolSize));
37+ } catch (Exception e) {
38+ e.printStackTrace();
39+ System.exit(1);
40+ }
3141 }
3242 }
3343
--- trunk/okuyamaFuse/src/OkuyamaFsMap.java (revision 1003)
+++ trunk/okuyamaFuse/src/OkuyamaFsMap.java (revision 1004)
@@ -109,7 +109,7 @@
109109 public OkuyamaClient createClient() {
110110 OkuyamaClient client = null;
111111 try {
112- client = factory.getClient(300*1000);
112+ client = new BufferedOkuyamaClient(factory.getClient(300*1000));
113113 } catch (Exception e) {
114114 e.printStackTrace();
115115 }
@@ -513,6 +513,36 @@
513513 try {
514514
515515 //List clientList = new ArrayList(20);
516+ OkuyamaClient client = createClient();
517+ for (int idx = 0; idx < keyList.length; idx++) {
518+
519+ if (keyList[idx] != null) {
520+
521+ Object key = keyList[idx];
522+
523+ String removeKey = type + "\t" + (String)key;
524+ dataCache.remove(removeKey);
525+ String[] removeRet = client.removeValue(removeKey);
526+ if(!removeRet[0].equals("true")) {
527+ ret = false;
528+ }
529+ }
530+ }
531+
532+ client.close();
533+ } catch (Exception e) {
534+ e.printStackTrace();
535+ }
536+ return ret;
537+ }
538+/*
539+ public boolean removeMulti(Object[] keyList) {
540+
541+ boolean ret = true;
542+
543+ try {
544+
545+ //List clientList = new ArrayList(20);
516546 Object[] clientList = new Object[keyList.length];
517547 for (int idx = 0; idx < keyList.length; idx++) {
518548
@@ -552,7 +582,7 @@
552582 }
553583 return ret;
554584 }
555-
585+*/
556586 public boolean removeExistObject(Object key) {
557587 OkuyamaClient client = createClient();
558588 try {
@@ -636,7 +666,7 @@
636666 break;
637667 }
638668
639- client = this.factory.getClient(300*1000);
669+ client = new BufferedOkuyamaClient(this.factory.getClient(300*1000));
640670 //long start = System.nanoTime();
641671 Object[] responseSet = client.readByteValue(key);
642672 //long end = System.nanoTime();
@@ -750,7 +780,7 @@
750780 }
751781 clientUseCount = 0;
752782 }
753- if (client == null) client = this.factory.getClient(300*1000);
783+ if (client == null) client = new BufferedOkuyamaClient(this.factory.getClient(300*1000));
754784
755785 clientUseCount++;
756786 //long start = System.nanoTime();
@@ -831,8 +861,8 @@
831861 this.myPool = myPool;
832862 this.nowPreFetchMarker = nowPreFetchMarker;
833863 try {
834- this.dataReadDaemon = new ResponseCheckDaemon[40];
835- for (int idx = 0; idx < 40; idx++) {
864+ this.dataReadDaemon = new ResponseCheckDaemon[10];
865+ for (int idx = 0; idx < 10; idx++) {
836866 this.dataReadDaemon[idx] = new ResponseCheckDaemon(this.factory);
837867 this.dataReadDaemon[idx].start();
838868 }
@@ -868,13 +898,13 @@
868898 this.nowPreFetchMarker.put(dataKey, 1);
869899
870900 long keyIdx = Long.parseLong(startKeyIndexSplit[2]);
871- for (int idx = 0; idx < 10; idx=idx+40) {
901+ for (int idx = 0; idx < 10; idx=idx+10) {
872902
873- String[] getKeys = new String[40];
874- List requestSendGrpIdxList = new ArrayList(40);
903+ String[] getKeys = new String[10];
904+ List requestSendGrpIdxList = new ArrayList(10);
875905 Map tmp = new HashMap();
876906
877- for (int grpIdx = 0; grpIdx < 40; grpIdx++) {
907+ for (int grpIdx = 0; grpIdx < 10; grpIdx++) {
878908 String preFetchRealKey = dataKey + "\t" + (keyIdx + 3 + idx + grpIdx);
879909 if (!this.storeCache.containsKey(preFetchRealKey)) {
880910
Show on old repository browser