• R/O
  • SSH
  • HTTPS

okuyama: 提交


Commit MetaInfo

修訂832 (tree)
時間2011-11-20 21:41:06
作者okuyamaoo

Log Message

FileBaseDataMap CompressMode Add

Change Summary

差異

--- trunk/src/okuyama/imdst/util/FileBaseDataMap.java (revision 831)
+++ trunk/src/okuyama/imdst/util/FileBaseDataMap.java (revision 832)
@@ -782,90 +782,87 @@
782782 int hashCode = ((Integer)instructionObj[2]).intValue();
783783 StringBuilder buf = new StringBuilder(this.lineDataSize);
784784 CacheContainer accessor = null;
785- RandomAccessFile raf = null;
785+
786786 BufferedWriter wr = null;
787787
788788 buf.append(this.fillCharacter(key, keyDataLength));
789789 buf.append(this.fillCharacter(value, oneDataLength));
790790
791- File file = this.dataFileList[hashCode % numberOfDataFiles];
792791
793- if (this.innerCache != null)
794- accessor = (CacheContainer)this.innerCache.get(file.getAbsolutePath());
795792
796-
797793 synchronized (this.dataFileList[hashCode % numberOfDataFiles]) {
798- if (accessor == null || accessor.isClosed == true) {
799794
800- raf = new RandomAccessFile(file, "rwd");
801- wr = new BufferedWriter(new FileWriter(file, true));
802- accessor = new CacheContainer();
803- accessor.raf = raf;
804- accessor.wr = wr;
805- accessor.file = file;
806- innerCache.put(file.getAbsolutePath(), accessor);
807- } else {
808-
809- raf = accessor.raf;
810- wr = accessor.wr;
795+ File compressFile = this.dataFileList[hashCode % numberOfDataFiles];
796+ byte[] compressData = null;
797+ StringBuilder decompressDataStr =null;
798+ byte[] decompressData = null;
799+ if (compressFile.exists()) {
800+ BufferedInputStream bis = new BufferedInputStream(new FileInputStream(compressFile));
801+ compressData = new byte[new Long(compressFile.length()).intValue()];
802+ bis.read(compressData);
803+ bis.close();
804+ decompressData = SystemUtil.dataDecompress(compressData);
811805 }
812806
813-
814807 // KeyData Write File
815- for (int tryIdx = 0; tryIdx < 2; tryIdx++) {
816- try {
808+ try {
817809
818810 // Key値の場所を特定する
819811 long start1 = System.nanoTime();
820- long[] dataLineNoRet = this.getLinePoint(key, raf);
812+ long[] dataLineNoRet = this.getLinePoint(key, decompressData);
821813 long end1 = System.nanoTime();
822814
823- if (dataLineNoRet[0] == -1) {
815+ if (dataLineNoRet[0] == -1) {
824816 long start2 = System.nanoTime();
825- wr.write(buf.toString());
826- SystemUtil.diskAccessSync(wr);
817+
818+ byte[] fixNewData = null;
819+ byte[] bufBytes = buf.toString().getBytes();
820+
821+ if (decompressData == null || decompressData.length < 1) {
822+ fixNewData = bufBytes;
823+ } else {
824+ fixNewData = new byte[bufBytes.length + decompressData.length];
825+ for (int cpIdx = 0; cpIdx < decompressData.length; cpIdx++) {
826+ fixNewData[cpIdx] = decompressData[cpIdx];
827+ }
828+
829+ int newCpIdx = decompressData.length;
830+ for (int cpBufIdx = 0; cpBufIdx < bufBytes.length; cpBufIdx++) {
831+ fixNewData[newCpIdx] = bufBytes[cpBufIdx];
832+ newCpIdx++;
833+ }
834+ }
835+
836+ decompressData = fixNewData;
827837 long end2 = System.nanoTime();
828- if (((end2 - start2) > (1000 * 1000 * 10)) || ((end1 - start1) > (1000 * 1000 * 10))) {
829- System.out.println("1=" + ((end1 - start1) / 1000 /1000) + " 2=" + ((end2 - start2) / 1000 /1000));
830- }
831- // The size of an increment
832- this.totalSize.getAndIncrement();
833- } else {
838+if (((end2 - start2) > (1000 * 1000 * 100)) || ((end1 - start1) > (1000 * 1000 * 100))) {
839+System.out.println("1=" + ((end1 - start1) / 1000 /1000) + " 2=" + ((end2 - start2) / 1000 /1000));
840+}
841+ // The size of an increment
842+ this.totalSize.getAndIncrement();
843+ } else {
834844
835- // 過去に存在したデータなら1増分
836- boolean increMentFlg = false;
837- if (dataLineNoRet[1] == -1) increMentFlg = true;
838- //if (this.get(key, hashCode) == null) increMentFlg = true;
839-
840- raf.seek(dataLineNoRet[0] * (lineDataSize));
841-
842- raf.write(buf.toString().getBytes(), 0, lineDataSize);
843-
844- if (increMentFlg) this.totalSize.getAndIncrement();
845+ // 過去に存在したデータなら1増分
846+ boolean increMentFlg = false;
847+ if (dataLineNoRet[1] == -1) increMentFlg = true;
848+ //if (this.get(key, hashCode) == null) increMentFlg = true;
849+ int insIdx = new Long((dataLineNoRet[0] * (lineDataSize))).intValue();
850+ byte[] insBytes = buf.toString().getBytes();
851+ for (int i = 0; i < lineDataSize; i++) {
852+
853+ decompressData[insIdx] = insBytes[i];
845854 }
846855
847- break;
848- } catch (IOException ie) {
849-
850- // IOExceptionの場合は1回のみファイルを再度開く
851- if (tryIdx == 1) throw ie;
852- try {
853-
854- if (raf != null) raf.close();
855- if (wr != null) wr.close();
856-
857- raf = new RandomAccessFile(file, "rwd");
858- wr = new BufferedWriter(new FileWriter(file, true));
859- accessor = new CacheContainer();
860- accessor.raf = raf;
861- accessor.wr = wr;
862- accessor.file = file;
863- innerCache.put(file.getAbsolutePath(), accessor);
864- } catch (Exception e) {
865- throw e;
866- }
856+ if (increMentFlg) this.totalSize.getAndIncrement();
867857 }
858+ } catch (IOException ie) {
868859 }
860+
861+ compressData = SystemUtil.dataCompress(decompressData);
862+ BufferedOutputStream compressBos = new BufferedOutputStream(new FileOutputStream(compressFile, false));
863+ compressBos.write(compressData);
864+ compressBos.flush();
865+ compressBos.close();
869866 }
870867
871868 // 削除処理の場合
@@ -908,7 +905,6 @@
908905 * @param hashCode This is a key value hash code
909906 */
910907 public void put(String key, String value, int hashCode) {
911-
912908 Object[] instructionObj = new Object[3];
913909 instructionObj[0] = key;
914910 instructionObj[1] = value;
@@ -929,6 +925,7 @@
929925 if (this.delayWriteQueue.size() > (delayWriteQueueSize - 2000)) Thread.sleep(10);
930926 if (this.delayWriteQueue.size() > (delayWriteQueueSize - 1000)) Thread.sleep(20);
931927
928+
932929 this.delayWriteQueue.put(instructionObj);
933930 //end2 = System.nanoTime();
934931 this.delayWriteRequestCount++;
@@ -945,14 +942,17 @@
945942
946943
947944 // 指定のキー値が指定のファイル内でどこにあるかを調べる
948- private long[] getLinePoint(String key, RandomAccessFile raf) throws Exception {
945+ private long[] getLinePoint(String key, byte[] targetData) throws Exception {
946+
949947 long[] ret = {-1, 0};
948+ if (targetData == null) return ret;
949+
950950 long line = -1;
951951 long lineCount = 0L;
952952
953953 byte[] keyBytes = key.getBytes();
954954 byte[] equalKeyBytes = new byte[keyBytes.length + 1];
955- byte[] lineBufs = new byte[this.getDataSize];
955+ byte[] lineBufs = null;
956956 boolean matchFlg = true;
957957
958958 // マッチング用配列作成
@@ -964,46 +964,38 @@
964964
965965 try {
966966
967- raf.seek(0);
968- int readLen = -1;
969- while((readLen = SystemUtil.diskAccessSync(raf, lineBufs)) != -1) {
967+ int readLen = targetData.length;
968+ lineBufs = targetData;
969+ int loop = readLen / lineDataSize;
970970
971- matchFlg = true;
971+ for (int loopIdx = 0; loopIdx < loop; loopIdx++) {
972972
973- int loop = readLen / lineDataSize;
973+ int assist = (lineDataSize * loopIdx);
974974
975- for (int loopIdx = 0; loopIdx < loop; loopIdx++) {
976-
977- int assist = (lineDataSize * loopIdx);
978-
979- matchFlg = true;
980- if (equalKeyBytes[equalKeyBytes.length - 1] == lineBufs[assist + (equalKeyBytes.length - 1)]) {
981- for (int i = 0; i < equalKeyBytes.length; i++) {
982- if (equalKeyBytes[i] != lineBufs[assist + i]) {
983- matchFlg = false;
984- break;
985- }
975+ matchFlg = true;
976+ if (equalKeyBytes[equalKeyBytes.length - 1] == lineBufs[assist + (equalKeyBytes.length - 1)]) {
977+ for (int i = 0; i < equalKeyBytes.length; i++) {
978+ if (equalKeyBytes[i] != lineBufs[assist + i]) {
979+ matchFlg = false;
980+ break;
986981 }
987- } else {
988- matchFlg = false;
989982 }
983+ } else {
984+ matchFlg = false;
985+ }
990986
991- // マッチした場合のみ返す
992- if (matchFlg) {
987+ // マッチした場合のみ返す
988+ if (matchFlg) {
993989
994- line = lineCount;
995- // 削除データか確かめる
996- if (lineBufs[assist + keyDataLength] == FileBaseDataMap.paddingSymbol) ret[1] = -1;
997- break;
998- }
990+ line = lineCount;
991+ // 削除データか確かめる
992+ if (lineBufs[assist + keyDataLength] == FileBaseDataMap.paddingSymbol) ret[1] = -1;
993+ break;
994+ }
999995
1000- lineCount++;
1001- }
1002- if (matchFlg) break;
996+ lineCount++;
1003997 }
1004998
1005- } catch (IOException ie) {
1006- throw ie;
1007999 } catch (Exception e) {
10081000 throw e;
10091001 }
@@ -1048,7 +1040,7 @@
10481040 String ret = null;
10491041 byte[] keyBytes = key.getBytes();
10501042 byte[] equalKeyBytes = new byte[keyBytes.length + 1];
1051- byte[] lineBufs = new byte[this.getDataSize];
1043+ byte[] lineBufs = null;
10521044 boolean matchFlg = true;
10531045
10541046 // マッチング用配列作成
@@ -1061,101 +1053,60 @@
10611053 //end1 = System.nanoTime();
10621054 try {
10631055 //start2 = System.nanoTime();
1064- File file = this.dataFileList[hashCode % numberOfDataFiles];
10651056 CacheContainer accessor = null;
1066- RandomAccessFile raf = null;
1067- BufferedWriter wr = null;
10681057
1069- if (innerCache != null)
1070- accessor = (CacheContainer)this.innerCache.get(file.getAbsolutePath());
1071-
10721058 synchronized (this.dataFileList[hashCode % numberOfDataFiles]) {
1073- if (accessor == null || accessor.isClosed) {
1074-
1075- raf = new RandomAccessFile(file, "rwd");
1076- wr = new BufferedWriter(new FileWriter(file, true));
1077- accessor = new CacheContainer();
1078- accessor.raf = raf;
1079- accessor.wr = wr;
1080- accessor.file = file;
1081- innerCache.put(file.getAbsolutePath(), accessor);
1082- } else {
1083-
1084- raf = accessor.raf;
1059+
1060+ File compressFile = this.dataFileList[hashCode % numberOfDataFiles];
1061+ byte[] compressData = null;
1062+ byte[] decompressData = null;
1063+ if (compressFile.exists()) {
1064+ BufferedInputStream bis = new BufferedInputStream(new FileInputStream(compressFile));
1065+ compressData = new byte[new Long(compressFile.length()).intValue()];
1066+ bis.read(compressData);
1067+ bis.close();
1068+ decompressData = SystemUtil.dataDecompress(compressData);
1069+ lineBufs = decompressData;
10851070 }
1086-//end2 = System.nanoTime();
1087-//start3 = System.nanoTime();
1088- for (int tryIdx = 0; tryIdx < 2; tryIdx++) {
1071+ int readLen = 0;
1072+ if (decompressData != null && decompressData.length > 0) {
1073+ readLen = decompressData.length;
1074+ }
1075+
1076+ matchFlg = true;
10891077
1090- try {
1091-//start4 = System.nanoTime();
1092- raf.seek(0);
1093-//end4 = System.nanoTime();
1094- int readLen = -1;
1095-//start5 = System.nanoTime();
1096- while((readLen = SystemUtil.diskAccessSync(raf, lineBufs)) != -1) {
1097-//end5 = System.nanoTime();
1098-//timeList.add((end5 - start5));
1099- matchFlg = true;
1078+ int loop = readLen / lineDataSize;
11001079
1101- int loop = readLen / lineDataSize;
1080+ for (int loopIdx = 0; loopIdx < loop; loopIdx++) {
11021081
1103- for (int loopIdx = 0; loopIdx < loop; loopIdx++) {
1082+ int assist = (lineDataSize * loopIdx);
11041083
1105- int assist = (lineDataSize * loopIdx);
1084+ matchFlg = true;
11061085
1107- matchFlg = true;
1086+ if (equalKeyBytes[equalKeyBytes.length - 1] == lineBufs[assist + (equalKeyBytes.length - 1)]) {
11081087
1109- if (equalKeyBytes[equalKeyBytes.length - 1] == lineBufs[assist + (equalKeyBytes.length - 1)]) {
1088+ for (int i = 0; i < equalKeyBytes.length; i++) {
11101089
1111- for (int i = 0; i < equalKeyBytes.length; i++) {
1090+ if (equalKeyBytes[i] != lineBufs[assist + i]) {
1091+ matchFlg = false;
1092+ break;
1093+ }
1094+ }
1095+ } else {
11121096
1113- if (equalKeyBytes[i] != lineBufs[assist + i]) {
1114- matchFlg = false;
1115- break;
1116- }
1117- }
1118- } else {
1097+ matchFlg = false;
1098+ }
11191099
1120- matchFlg = false;
1121- }
1100+ // マッチした場合のみ配列化
1101+ if (matchFlg) {
11221102
1123- // マッチした場合のみ配列化
1124- if (matchFlg) {
1103+ tmpBytes = new byte[lineDataSize];
11251104
1126- tmpBytes = new byte[lineDataSize];
1105+ for (int i = 0; i < lineDataSize; i++) {
11271106
1128- for (int i = 0; i < lineDataSize; i++) {
1129-
1130- tmpBytes[i] = lineBufs[assist + i];
1131- }
1132- break;
1133- }
1134- }
1135-//start5 = System.nanoTime();
1136- if (matchFlg) break;
1107+ tmpBytes[i] = lineBufs[assist + i];
11371108 }
11381109 break;
1139- } catch (IOException ie) {
1140-
1141- // IOExceptionの場合は1回のみファイルをサイド開く
1142- if (tryIdx == 1) throw ie;
1143-
1144- try {
1145-
1146- if (raf != null) raf.close();
1147- if (wr != null) wr.close();
1148-
1149- raf = new RandomAccessFile(file, "rwd");
1150- wr = new BufferedWriter(new FileWriter(file, true));
1151- accessor = new CacheContainer();
1152- accessor.raf = raf;
1153- accessor.wr = wr;
1154- accessor.file = file;
1155- innerCache.put(file.getAbsolutePath(), accessor);
1156- } catch (Exception e) {
1157- throw e;
1158- }
11591110 }
11601111 }
11611112 }
@@ -1275,19 +1226,31 @@
12751226 List keys = null;
12761227 byte[] datas = null;
12771228 StringBuilder keysBuf = null;
1278- RandomAccessFile raf = null;
1229+
12791230
12801231 try {
12811232 if (this.nowIterationFileIndex < this.dataFileList.length) {
12821233
12831234 keys = new ArrayList();
1284- datas = new byte[new Long(this.dataFileList[this.nowIterationFileIndex].length()).intValue()];
1235+
1236+ File compressFile = this.dataFileList[this.nowIterationFileIndex];
1237+ byte[] compressData = null;
1238+ byte[] decompressData = null;
1239+ if (compressFile.exists()) {
1240+ BufferedInputStream bis = new BufferedInputStream(new FileInputStream(compressFile));
1241+ compressData = new byte[new Long(compressFile.length()).intValue()];
1242+ bis.read(compressData);
1243+ bis.close();
1244+ decompressData = SystemUtil.dataDecompress(compressData);
1245+ }
12851246
1286- raf = new RandomAccessFile(this.dataFileList[this.nowIterationFileIndex], "rwd");
1247+
1248+ datas = decompressData;
12871249
1288- raf.seek(0);
12891250 int readLen = -1;
1290- readLen = SystemUtil.diskAccessSync(raf, datas);
1251+ if (decompressData != null && decompressData.length > 0) {
1252+ readLen = decompressData.length;
1253+ }
12911254
12921255 if (readLen > 0) {
12931256
@@ -1321,9 +1284,7 @@
13211284 } finally {
13221285
13231286 try {
1324- if(raf != null) raf.close();
13251287
1326- raf = null;
13271288 datas = null;
13281289 } catch (Exception e2) {
13291290 e2.printStackTrace();
Show on old repository browser