o2on svn commit
o2on-****@lists*****
2009年 8月 8日 (土) 04:13:53 JST
Revision: 142 http://sourceforge.jp/projects/o2on/svn/view?view=rev&revision=142 Author: nawota Date: 2009-08-08 04:13:53 +0900 (Sat, 08 Aug 2009) Log Message: ----------- add opy2on files. Added Paths: ----------- trunk/opy2on/ trunk/opy2on/README trunk/opy2on/lib/ trunk/opy2on/lib/o2on_const.py trunk/opy2on/lib/o2on_dat.py trunk/opy2on/lib/o2on_im.py trunk/opy2on/lib/o2on_job.py trunk/opy2on/lib/o2on_key.py trunk/opy2on/lib/o2on_node.py trunk/opy2on/lib/o2on_profile.py trunk/opy2on/lib/o2on_server.py trunk/opy2on/lib/o2on_util.py trunk/opy2on/o2on_config.py.sample trunk/opy2on/opy2on.py Property changes on: trunk/opy2on ___________________________________________________________________ Added: svn:ignore + db o2on_config.py profile util dat error-*.txt Added: trunk/opy2on/README =================================================================== --- trunk/opy2on/README (rev 0) +++ trunk/opy2on/README 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,110 @@ +これはなに? + +o2on(http://o2on.sourceforge.jp/) の Python での(不完全な)クローン +opy2on (http://pc12.2ch.net/test/read.cgi/tech/1231570128/ 356さん命名) + +※ 本家 o2on の動作と完全に一致するわけではないです。詳しくは「特徴」の + 部分を読んでください。 + +必要なもの + +- python 2.6 +- pycrypto +- D-Bus を使うならば python-dbus + +起動 + +- o2on_config.py.sample を見ながら好みにあわせて修正し、 o2on_config.py + を作る。 + - debug に協力してくださる方は、 OutputErrorFile を True にしてくださ + い。エラー時に大変役に立ちます。 + - Xが動いている時に起動するつもりの方は、 UseDBus = True の時の動きを + 使用WindowManagerとともに教えてください。 + - 重いぞ!という方は RecordProfile を True にしてください。ProfileDir + 下にプロファイルが出力されます。 +- ポートを開けたり閉めたりする。 +- ./opy2on で起動。 + +操作 + +- 以下のコマンドを認識します。 + - datq 検索中のdat一覧表示 + - exit プログラムの終了 + - exportnode ノードをexportnodes.xmlに出力 + - help ヘルプを表示 + - keys datキーを表示 + - PAGER を設定しておいてください + - myid 自ノードのIDを表示 + - mynode 自ノードのハッシュを表示 + - mypubkey 自ノードの公開鍵を表示 + - nodes ノード情報を表示 + - stat ノード数、検索中dat数、所有dat数、datキー数を表示 +- 全ての表示はUTF-8で出力されます。 +- adminサーバも動いています。 + - http://<admin サーバ>/shutdown/really でも終了します。 + +特徴 (というかほとんど直すべきとこの列挙) + +- dat を gzip で保存可能 +- Ping をしなおす時間を設定できる +- プロキシに dat をリクエストした時の反応が違う -> 下記参照 +- ping にたいして localなIPを返してくるノードがいたのでそのIPは無視している +- ノードリストが0になると Web などのノードリストから再読みこみします。 +- D-Bus を使ってポップアップします。 +- 処理されないエラーがおきると自動的に終了します。 + +- ●は対応してないです +- プロキシで range が指定されてもキャッシュから読んだ時は全体で返答 + - 複数台からプロキシに接続することをかんがえてないような気がします…。 +- オリジナルの url を記憶していないので dat をあげる時にはいつも + DAT-Path で送信します +- フラグを解釈していないです +- 大量な dat を持った環境でのテストができていない (メモリ食いすぎるかも) + - sqlite化とかするべきなのだろうな… +- GUI が動いてないです。 +- saku キーに対応してないです +- ノード名に日本語をうまく使えないです + - 一応エラーが出ないようにはなったけれど POST した時に日本語部分が消えます +- UPnP は対応してないです +- IPフィルタがないです + - IPフィルタとかは iptables なんかを使えばいいと思ってます。 + - 帯域制限も iproute とか使えばいいと思います。 + +対応状況 + +- P2P client + - dat o + - collection o + - im o + - broadcast x + - profile x + - ping o + - store o + - findnode o + - findvalue o + +- P2P server + - dat o + - collection o + - im o + - broadcast x + - profile x + - ping o + - store o + - findnode o + - findvalue o + +proxy server キャッシュへの保存の方法 +- 持ってない dat 全体が取得された -> キャッシュに保存 +- 持ってる dat の差分が取得された -> キャッシュに追記 +- 持ってない dat がリクエストされた + - RequestNonExistDat が True の時はこっそり取得 + - こっそり取得するから巡回してるとこなら dat をインポートしなくてもいいかも + +FIXME + +- HTTPヘッダは大文字・小文字区別しない -> 本家 o2on が区別してるから困る… + - とりあえずうけつける分だけでも区別しないようにしよう。 +- リロードバーボンにならないように 2chへの接続は適度に遅延をつけよう。 (navi2ch を参考に) +- エンコードまわりうまくいってるのだろうか…? +- CPU食いすぎな気がする Added: trunk/opy2on/lib/o2on_const.py =================================================================== --- trunk/opy2on/lib/o2on_const.py (rev 0) +++ trunk/opy2on/lib/o2on_const.py 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,21 @@ +#!/usr/bin/python +import os.path + +regHosts = r'(2ch\.net|bbspink\.com|machibbs\.com|machi\.to)' + +DBDir = "db" +DatQueryFile = os.path.join(DBDir, "datq.pkl") +KeyQueryFile = os.path.join(DBDir, "keyq.pkl") +DatDBFile = os.path.join(DBDir, "datdb.pkl") +KeyDBFile = os.path.join(DBDir, "keydb.pkl") +NodeDBFile = os.path.join(DBDir, "nodedb.pkl") +ProfileFile = os.path.join(DBDir, "profile.pkl") +IMDBFile = os.path.join(DBDir, "imdb.pkl") + +ProtocolName = "O2" +ProtocolVer = 0.2 +AppName = "opy2on" +AppMajorVer = 0 +AppMinorVer = 0 +AppBuildNo = 1 + Added: trunk/opy2on/lib/o2on_dat.py =================================================================== --- trunk/opy2on/lib/o2on_dat.py (rev 0) +++ trunk/opy2on/lib/o2on_dat.py 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,237 @@ +#!/usr/bin/python + +import threading +import cPickle +import os.path +import re +import gzip +import random +import time + +from o2on_const import regHosts, DatQueryFile, DatDBFile +import o2on_config +import o2on_util + +class Dat: + def __init__(self, path=None): + self.reset() + self.published = 0 + if path: self.setpath(path) + def __cmp__(self,x): + return cmp(self.hash(), x.hash()) + def reset(self): + self.domain = None + self.board = None + self.datnum = None + def setpath(self,path): + self.reset() + regDatPath = re.compile(r'^'+regHosts+'/([^/]+)/(\d+)$') + regDatUrl = re.compile(r'^http://[^.]+\.'+regHosts+'/test/read\.cgi/([^/]+)/(\d+)/$') + m = regDatPath.match(path) + if not m: m = regDatUrl.match(path) + if not m: return False + d = m.group(1) + if d == 'machi.to': d = 'machibbs.com' + self.domain = d + self.board = m.group(2) + self.datnum = m.group(3) + return True + def path(self): + if self.valid(): + return "/".join((self.domain, self.board, self.datnum)) + def fullboard(self): + if self.domain and self.board: + return self.domain+":"+self.board + raise Exception + def valid(self): + return ((self.domain and self.board and self.datnum) != None) + def datpath(self): + if self.valid(): + return os.path.join(o2on_config.DatDir, self.domain, self.board, + self.datnum[:4],self.datnum+".dat") + raise Exception + def data(self): + dp = self.datpath() + if os.path.exists(dp): f=open(dp) + elif os.path.exists(dp+".gz"): f=gzip.GzipFile(dp+".gz") + else: f=None + if f: + res=f.read() + f.close() + return res + return None + def hash(self): + return o2on_util.datkeyhash(self.path()) + def save(self,data, start=0): + if not self.valid(): return + bl = o2on_config.DatCollectionBoardList + if bl != None: + if not self.fullboard() in bl: return False + if(len(data)<2): return False + if(data[-1] != "\n"): return False + if(data[-2] == "\r"): return False + if start == 0: + m = re.compile(r'^([^\n]*)\n').match(data) + first = m.group(1) + if not re.compile(r'^.*<>.*<>.*<>.*<>.*$').match(first): return False + output = self.datpath() + if os.path.exists(output): + if o2on_config.DatSaveAsGzip: + f = open(output,'rb') + g = gzip.GzipFile(output+".gz",'w') + g.write(f.read()) + g.close() + f.close() + os.remove(output) + else: + if(os.path.getsize(output)<start): return True + if(os.path.getsize(output)==start): + f=open(output,'ab') + f.write(data) + f.close() + return True + f=open(output,'rb') + saved = f.read() + f.close() + if saved[start:] == data[:(len(saved)-start)]: + l = start + len(data) - len(saved) + if l >0: + f=open(output,'ab') + f.write(data[(len(saved)-start):]) + f.close() + return True + return True + if os.path.exists(output+".gz"): + f=gzip.GzipFile(output+".gz",'rb') + saved = f.read() + f.close() + if(len(saved)<start): return True + if(len(saved)==start): + f=gzip.GzipFile(output+".gz",'ab') + f.write(data) + f.close() + return True + if saved[start:] == data[:(len(saved)-start)]: + l = start + len(data) - len(saved) + if l >0: + f=gzip.GzipFile(output+".gz",'ab') + f.write(data[(len(saved)-start):]) + f.close() + return True + return True + else: + if start>0: return True + if not os.path.exists(os.path.dirname(output)): + os.makedirs(os.path.dirname(output)) + if o2on_config.DatSaveAsGzip: + f = gzip.GzipFile(output+".gz",'w') + else: + f = open(output, 'wb') + f.write(data) + f.close() + return True + +class DatDB: + def __init__(self,g): + self.glob = g + self.lock = threading.Lock() + with self.lock: + self.hashmap = {} + self.boardmap = {} + self.publishmap = {} + self.load() + if len(self.hashmap) == 0: + g.logger.log("DATDB", "Generating DatDB") + self.generate() + self.save() + g.logger.log("DATDB","Generated DatDB") + def __len__(self): + with self.lock: + return len(self.hashmap) + def getRandomInBoard(self,board): + if board in self.boardmap: + h = random.choice(self.boardmap[board]) + return self.hashmap[h] + return None + def choice(self): + return self.hashmap[random.choice(self.hashmap.keys())] + def get(self,x): + with self.lock: + return self.hashmap.get(x) + def has_key(self,key): + with self.lock: + return o2on_util.datkeyhash(key) in self.hashmap + def add_dat(self, dat): + with self.lock: + befdat = self.hashmap.get(dat.hash()) + self.hashmap[dat.hash()] = dat + if not dat.fullboard() in self.boardmap: + self.boardmap[dat.fullboard()] = [] + self.boardmap[dat.fullboard()].append(dat.hash()) + if not befdat: + dat.published = int(time.time()) + if dat.published not in self.publishmap: + self.publishmap[dat.published]=[] + self.publishmap[dat.published].append(dat.hash()) + else: + dat.published = befdat.published + def add(self, path, data, start=0): + dat = Dat(path) + if dat.save(data, start): self.add_dat(dat) + def published(self, datid, publish_time): + if len(datid) != 20: raise Exception + with self.lock: + if datid not in self.hashmap: raise Exception + dat = self.hashmap[datid] + self.publishmap[dat.published].remove(datid) + dat.published = publish_time + if publish_time not in self.publishmap: self.publishmap[publish_time]=[] + self.publishmap[publish_time].append(datid) + def dat_to_publish(self, last_published_before, limit): + res = [] + if limit == 0: return res + for x in sorted(self.publishmap.keys()): + for y in self.publishmap[x]: + res.append(self.hashmap[y]) + limit -= 1 + if limit == 0: return res + return res + def generate(self): + regdat = re.compile('^(\d+)\.dat(?:\.gz)?$') + sep = re.escape(os.sep) + regdatdir = re.compile(regHosts+sep+'(.+)'+sep+'\d{4}$') + with self.lock: + self.hashmap = {} + self.boardmap = {} + self.publishmap = {0:[]} + for root, dirs, files in os.walk(o2on_config.DatDir): + for f in files: + m1 = regdat.match(f) + if not m1: continue + m2 = regdatdir.search(root) + if not m2: continue + path = m2.group(1)+"/"+m2.group(2)+"/"+m1.group(1) + d = Dat(path) + with self.lock: + self.hashmap[d.hash()] = d + if not d.fullboard() in self.boardmap: + self.boardmap[d.fullboard()] = [] + self.boardmap[d.fullboard()].append(d.hash()) + self.publishmap[0].append(d.hash()) + self.glob.logger.log("DATDB", "added %s" % path) + def load(self): + if(os.path.isfile(DatDBFile)): + pkl_file = open(DatDBFile,"rb") + with self.lock: + self.hashmap = cPickle.load(pkl_file) + self.boardmap = cPickle.load(pkl_file) + self.publishmap = cPickle.load(pkl_file) + pkl_file.close() + def save(self): + pkl_file = open(DatDBFile,"wb") + with self.lock: + cPickle.dump(self.hashmap, pkl_file,-1) + cPickle.dump(self.boardmap, pkl_file,-1) + cPickle.dump(self.publishmap, pkl_file,-1) + pkl_file.close() + Added: trunk/opy2on/lib/o2on_im.py =================================================================== --- trunk/opy2on/lib/o2on_im.py (rev 0) +++ trunk/opy2on/lib/o2on_im.py 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,81 @@ +#!/usr/bin/python + +import threading +import os.path +import cPickle +import datetime + +from o2on_const import IMDBFile +from o2on_node import e2ip, ip2e +from binascii import hexlify, unhexlify + +class IMessage: + def __init__(self): + self.ip = None + self.port = None + self.id = None + self.pubkey = None + self.name = "" + self.date = 0 + self.msg = None + self.key = None + self.mine = False + self.paths = [] + self.broadcast = False + def __eq__(self, x): + if self.broadcast != x.broadcast: return False + if self.broadcast: + return self.ip == x.ip and self.port == x.port and self.msg == x.msg + else: + return self.key == x.key + def from_node(self, n): + self.ip = n.ip + self.port = n.port + self.id = n.id + self.pubkey = n.pubkey + self.name = n.name + def from_xml_node(self, xml_node): + res = {} + for c in ('ip','port','id','pubkey','name','msg',): + try: res[c] = xml_node.getElementsByTagName(c)[0].childNodes[0].data + except IndexError: continue + if res.get('ip'): self.ip = e2ip(res.get('ip')) + if res.get('port'): self.port = int(res.get('port')) + if res.get('id'): self.id = unhexlify(res.get('id')) + if res.get('pubkey'): self.pubkey = unhexlify(res.get('pubkey')) + if res.get('name'): self.name = res.get('name').encode('utf-8') + if res.get('msg'): self.msg = res.get('msg').encode('utf-8') + +class IMDB: + def __init__(self, g): + self.lock = threading.Lock() + with self.lock: + self.ims = [] + self.glob = g + self.load() + def im_list(self): + res = [] + with self.lock: + for i in self.ims: + x = (i.mine, str(datetime.datetime.fromtimestamp(int(i.date))), + hexlify(i.id), ip2e(i.ip), i.port, i.name.decode('utf-8'), + i.msg.decode('utf-8')) + res.append(x) + return res + def add(self,im): + with self.lock: + self.ims.append(im) + def save(self): + self.expire() + f = open(IMDBFile,'wb') + with self.lock: + cPickle.dump(self.ims, f, -1) + f.close() + def load(self): + if os.path.isfile(IMDBFile): + f = open(IMDBFile,'rb') + with self.lock: + self.ims = cPickle.load(f) + f.close() + def expire(self): + pass Added: trunk/opy2on/lib/o2on_job.py =================================================================== --- trunk/opy2on/lib/o2on_job.py (rev 0) +++ trunk/opy2on/lib/o2on_job.py 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,416 @@ +#!/usr/bin/python +# -*- coding: utf-8 + +from binascii import hexlify, unhexlify +import threading +import time +import socket +import BaseHTTPServer +import gzip +import random +import re +import time +import socket +import os +import httplib +import traceback +import sys + +import o2on_server +import o2on_config +import o2on_util +import o2on_node +import o2on_key + +if o2on_config.RecordProfile: + import cProfile + +class JobThread(threading.Thread): + def __init__(self, name, s, g): + threading.Thread.__init__(self) + self.finish = False + self.awake = False + self.name = name + self.glob = g + self.sec = s + def stop(self): + self.finish = True + def wakeup(self): + self.awake = True + def run(self): + try: + if o2on_config.RecordProfile: + if not os.path.exists(os.path.dirname(o2on_config.ProfileDir)): + os.makedirs(os.path.dirname(o2on_config.ProfileDir)) + profname = os.path.join(o2on_config.ProfileDir, + "o2on_"+"_".join(self.name.split(" "))+".prof") + cProfile.runctx('self.dummy()', None, {'self':self,}, profname) + else: self.dummy() + except Exception,inst: + if o2on_config.OutputErrorFile: + f = open('error-'+str(int(time.time()))+'.txt', 'w') + traceback.print_exc(file=f) + f.close() + self.glob.logger.popup("ERROR", str(inst)) + self.glob.shutdown.set() + def dummy(self): + self.glob.logger.log("JOBMANAGER", "job %s started" % self.name) + while not self.finish: + #t = time.time() + self.dojob(self.glob.nodedb, self.glob.logger, self.glob.prof, self.glob.datdb, + self.glob.datquery) + if self.finish: break + diff = int(self.sec) #int(self.sec - (time.time()-t)) + if 0<diff: + #self.glob.logger.log("job %s sleep for %d secs" % (self.name,diff)) + self.awake = False + for x in range(0,diff): + if self.finish or self.awake: break + time.sleep(1) + self.glob.logger.log("JOBMANAGER", "job %s finished" % self.name) + def dojob(self, nodes, logger, prof, datdb): + pass + +class NodeCollectorThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"findnode",180,g) + def dojob(self, nodes, logger, prof, datdb, datq): + if len(nodes) == 0: + if len(o2on_config.Node_List_URLs)>0: + logger.popup("NODECOLLECTOR","load node from web") + nodes.node_from_web() + target = o2on_util.randomid() + for x in nodes.neighbors_nodes(target, False): + if self.finish: break + logger.log("NODECOLLECTOR", "findnode to %s" % (hexlify(x.id))) + try: + newnodes = x.findnode(target) + except o2on_node.NodeRemovable: + nodes.remove(x) + nodes.save() + self.glob.keydb.remove_bynodeid(x.id) + self.glob.keydb.save() + except o2on_node.NodeRefused: + pass + except socket.error, inst: + self.glob.logger.log("NODECOLLECTOR", inst) + else: + for n in newnodes: + logger.log("NODECOLLECTOR","\tadd node %s" % (hexlify(n.id))) + nodes.add_node(n) + nodes.add_node(x) + nodes.save() + time.sleep(1) + if len(nodes)<30: self.sec = 30 + else: self.sec = 180 + +class DatCollectorThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"dat collector",60,g) + def dojob(self, nodes, logger, prof, datdb, datq): + board = nodes.get_random_board() + if not board: return + for n in nodes.get_nodes_for_board(board): + if self.finish: break + logger.log("DATCOLLECTOR","dat (%s) to %s" % (board,hexlify(n.id))) + try: + dat = n.dat(None, board, datdb) + except o2on_node.NodeRemovable: + nodes.remove(n) + nodes.save() + self.glob.keydb.remove_bynodeid(n.id) + self.glob.keydb.save() + except o2on_node.NodeRefused: + pass + except socket.error, inst: + self.glob.logger.log("DATCOLLECTOR", inst) + else: + if dat: + logger.log("DATCOLLECTOR","\tGot dat %s" % dat.path()) + nodes.add_node(n) + datdb.add_dat(dat) + nodes.save() + datdb.save() + break + time.sleep(1) + +class GetIPThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"get global IP",60,g) + def dojob(self, nodes, logger, prof, datdb, datq): + regLocalIP = re.compile(r'^(?:10\.|192\.168\.|172\.(?:1[6-9]|2[0-9]|3[01])\.)') + for n in nodes.neighbors_nodes(prof.mynode.id, False, 100): + if self.finish: break + logger.log("GETIP","getIP to %s" % hexlify(n.id)) + try: + r = n.ping(True) + except o2on_node.NodeRemovable: + nodes.remove(n) + nodes.save() + self.glob.keydb.remove_bynodeid(n.id) + self.glob.keydb.save() + except o2on_node.NodeRefused: + pass + except socket.error, inst: + self.glob.logger.log("GETIP", inst) + else: + if r: + if not prof.mynode.ip: + ip = o2on_node.e2ip(r[:8]) + if not regLocalIP.match(ip): + prof.mynode.ip = ip + self.finish = True + logger.popup("GETIP","Got Global IP %s" % ip) + nodes.add_node(n) + break + else: break + +class AskNodeCollectionThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"ask node collection",60,g) + def dojob(self, nodedb, logger, prof, datdb, datq): + for n in nodedb.neighbors_nodes(o2on_util.randomid(), False): + if self.finish: break + logger.log("ASKNODECOLLECTION", "node collection to %s" % (hexlify(n.id))) + try: + colboards = n.collection(self.glob) + except o2on_node.NodeRemovable: + nodedb.remove(n) + nodedb.save() + self.glob.keydb.remove_bynodeid(n.id) + self.glob.keydb.save() + except o2on_node.NodeRefused: + pass + except socket.error, inst: + logger.log("ASKNODECOLLECTION", inst) + else: + logger.log("ASKNODECOLLECTION", + "\tadd collection for %s" % (hexlify(n.id))) + nodedb.reset_collection_for_node(n) + for b in colboards: + nodedb.add_collection(b,n) + nodedb.add_node(n) + nodedb.save() + +class PublishOrigThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"publish original",60,g) + def dojob(self, nodedb, logger, prof, datdb, datq): + mynode = prof.mynode + if not mynode.ip or mynode.port == 0: return + if len(nodedb)<10: return + dats = datdb.dat_to_publish(time.time()-(3*60*60), 500) + keys = [] + for d in dats: + if self.finish: return + k = o2on_key.Key() + k.from_dat(d) + k.from_node(prof.mynode) + keys.append(k) + publish_nodes = {} + for k in keys: + if self.finish: return + for n in nodedb.neighbors_nodes(k.hash, False): + if self.finish: return + if n.id not in publish_nodes: publish_nodes[n.id] = [] + publish_nodes[n.id].append(k) + for n in publish_nodes: + if self.finish: return + node = nodedb[n] + if not node: continue + logger.log("PUBLISHORIGINAL","publish original to %s" % (hexlify(n))) + try: + node.store("dat", publish_nodes[n]) + except o2on_node.NodeRemovable: + nodedb.remove(node) + nodedb.save() + self.glob.keydb.remove_bynodeid(node.id) + self.glob.keydb.save() + except o2on_node.NodeRefused: + pass + except socket.error, inst: + logger.log("PUBLISHORIGINAL", inst) + else: + nodedb.add_node(node) + pubtime = int(time.time()) + for k in publish_nodes[n]: + datdb.published(k.hash,pubtime) + self.glob.keydb.save() + datdb.save() + logger.log("PUBLISHORIGINAL","\tpublished original") + nodedb.save() + time.sleep(1) + +class PublishKeyThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"publish key",120,g) + def dojob(self, nodedb, logger, prof, datdb, datq): + mynode = prof.mynode + if not mynode.ip or mynode.port == 0: + return + publish_nodes = {} + #t = time.time() + keys = self.glob.keydb.keys_to_publish(int(time.time()-30*60)) + #logger.log("get keys to publush %d" % (time.time()-t)) + for k in keys: + if self.finish: return + for n in nodedb.neighbors_nodes(k.hash, False): + if self.finish: return + if n.id not in publish_nodes: publish_nodes[n.id] = [] + publish_nodes[n.id].append(k) + for n in publish_nodes: + if self.finish: return + node = nodedb[n] + if not node: continue + logger.log("PUBLISHKEY","publish key to %s" % (hexlify(n))) + try: + node.store("dat", publish_nodes[n]) + except o2on_node.NodeRemovable: + nodedb.remove(node) + nodedb.save() + self.glob.keydb.remove_bynodeid(node.id) + self.glob.keydb.save() + except o2on_node.NodeRefused: + pass + except socket.error, inst: + logger.log("PUBLISHKEY", inst) + else: + nodedb.add_node(node) + pubtime = int(time.time()) + for k in publish_nodes[n]: + self.glob.keydb.published(k.hash,pubtime) + self.glob.keydb.save() + logger.log("PUBLISHKEY","\tpublished key") + nodedb.save() + time.sleep(1) + +class SearchThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"search",60,g) + def stop(self): + JobThread.stop(self) + self.glob.datquery.semap.release() + def dojob(self, nodedb, logger, prof, datdb, datq): + d = datq.pop() + if self.finish: return + if datdb.has_key(d): + datq.save() + return + kid = o2on_util.datkeyhash(d) + datq.add(d) + datq.save() + reckey = [] + next = nodedb.neighbors_nodes(kid,False,5) + sent = [] + key = self.glob.keydb.get(kid) + if key: reckey.append(key) + while True: + if self.finish: return + neighbors = next + if len(neighbors)==0: break + next = [] + for node in neighbors: + if self.finish: return + logger.log("SEARCH","findvalue to %s for %s" % (hexlify(node.id),d)) + sent.append(node.id) + try: + res = node.findvalue(kid) + except o2on_node.NodeRemovable: + nodedb.remove(node) + nodedb.save() + self.glob.keydb.remove_bynodeid(node.id) + self.glob.keydb.save() + except o2on_node.NodeRefused: + pass + except socket.error, inst: + logger.log("SEARCH", inst) + else: + nodedb.add_node(node) + if not res: res = [] + for x in res: + if isinstance(x, o2on_node.Node): + nodedb.add_node(x) + if x.id != prof.mynode.id and x not in next and x.id not in sent: + logger.log("SEARCH","\tadd new neighbors %s" % hexlify(x.id)) + next.append(x) + elif isinstance(x, o2on_key.Key): + logger.log("SEARCH","\tadd new key") + if x not in reckey: reckey.append(x) + nodedb.save() + if len(reckey) == 0: logger.log("SEARCH","\tfailed to get key for %s" % d) + for key in reckey: + self.glob.keydb.add(key) + self.glob.keyquery.add(key) + self.glob.keydb.save() + +class DatQueryThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"dat query",60,g) + def stop(self): + JobThread.stop(self) + self.glob.keyquery.semap.release() + def dojob(self, nodedb, logger, prof, datdb, datq): + k = self.glob.keyquery.pop() + if self.finish: return + node = nodedb[k.nodeid] + if not node: node = o2on_node.Node(k.nodeid, k.ip, k.port) + logger.log("DATQUERY","dat query %s to %s" % (hexlify(k.hash),hexlify(node.id))) + try: + dat = node.dat(k.hash, self.glob) + except o2on_node.NodeRemovable: + nodedb.remove(node) + nodedb.save() + self.glob.keydb.remove_bynodeid(node.id) + self.glob.keydb.save() + except o2on_node.NodeRefused: + pass + except socket.error, inst: + logger.log("DATQUERY", inst) + else: + logger.popup("DATQUERY", "Got queried dat %s" % dat.path()) + nodedb.add_node(node) + datdb.add_dat(dat) + nodedb.save() + datdb.save() + self.glob.keyquery.save() + +# Server thread + +class ProxyServerThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"proxy server",0,g) + self.serv = o2on_server.O2ONServer(o2on_server.ProxyServerHandler, + o2on_config.ProxyPort, g) + def stop(self): + JobThread.stop(self) + self.serv.shutdown() + def dojob(self, nodes, logger, prof, datdb, datq): + self.serv.serve_forever() + +class P2PServerThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"p2p server",0,g) + if g.prof.mynode.port == 0: + self.serv = None + else: + self.serv = o2on_server.O2ONServer(o2on_server.P2PServerHandler, + g.prof.mynode.port, g) + def stop(self): + JobThread.stop(self) + if self.serv: self.serv.shutdown() + def dojob(self, nodes, logger, prof, datdb, datq): + if not self.serv: + self.finish = True + return + self.serv.serve_forever() + +class AdminServerThread(JobThread): + def __init__(self, g): + JobThread.__init__(self,"admin server",0,g) + self.serv = o2on_server.O2ONServer(o2on_server.AdminServerHandler, + o2on_config.AdminPort, g) + def stop(self): + JobThread.stop(self) + self.serv.shutdown() + def dojob(self, nodes, logger, prof, datdb, datq): + self.serv.serve_forever() Added: trunk/opy2on/lib/o2on_key.py =================================================================== --- trunk/opy2on/lib/o2on_key.py (rev 0) +++ trunk/opy2on/lib/o2on_key.py 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,236 @@ +#!/usr/bin/python +# -*- coding: utf-8 + +import os.path +import re +import cPickle +import os.path +from binascii import unhexlify, hexlify +import xml.dom.minidom +import threading +import subprocess +import os +import datetime +import hashlib +import time + +import sys + +from o2on_const import KeyDBFile, regHosts +import o2on_config +from o2on_util import hash_xor_bitlength +import o2on_dat +import o2on_node + +class Key: + def __init__(self): + self.hash = None + self.nodeid = None + self.ip = None + self.port = None + self.size = None + self.url = "" + self.title = "" + self.note = "" + self.published = 0 + self.ikhash = None + def __cmp__(self,x): + return cmp(self.idkeyhash(), x.idkeyhash()) + def idkeyhash(self): + if not self.ikhash: self.ikhash = hashlib.sha1(self.hash + self.nodeid).digest() + return self.ikhash + def from_key(self, key): + if self.idkeyhash() == key.idkeyhash(): + self.ip = key.ip + self.port = key.port + if self.size < key.size: self.size = key.size + if key.url!="": self.url = key.url + if key.title!="":self.title = key.title + if key.note!="":self.note = key.note + def from_dat(self, dat): + self.hash = dat.hash() + data = dat.data() + self.size = len(data) + first = data.split("\n",1)[0] + try: + first = first.decode('cp932').encode('utf-8') + except UnicodeDecodeError, inst: + try: + first = first.decode('euc_jp').encode('utf-8') + except UnicodeDecodeError, inst: raise inst + m = re.compile(r'^.*<>.*<>.*<>.*<>(.*)$').match(first) + if m: self.title = m.group(1) + #reg = re.compile(r'^(?:2ch\.net|machibbs\.com)/(?:cyugoku|hokkaidou|k(?:an(?:a|to)|inki|(?:ousinet|yusy)u)|o(?:(?:kinaw|sak)a)|sikoku|t(?:a(?:(?:m|war)a)|o(?:kyo|u(?:hoku|kai))))') + #if re.compile("^\s*$").match(self.title) and \ + # not reg.match(dat.path()): print dat.datpath() + self.url = "http://xxx.%s/test/read.cgi/%s/%s/" % (dat.domain, + dat.board, + dat.datnum) + def from_node(self, node): + self.nodeid = node.id + self.ip = node.ip + self.port = node.port + def xml(self): + if not self.valid(): return "" + board_xml = "<key>\r\n" + board_xml += "<hash>%s</hash>\r\n" % hexlify(self.hash) + board_xml += "<nodeid>%s</nodeid>\r\n" % hexlify(self.nodeid) + board_xml += "<ip>%s</ip>\r\n" % o2on_node.ip2e(self.ip) + board_xml += "<port>%d</port>\r\n" % (self.port or 0) + board_xml += "<size>%d</size>\r\n" % self.size + board_xml += "<url>%s</url>\r\n" % self.url + title = self.title.decode('utf-8') + if len(title)>32: title = title[:31]+"…".decode('utf-8') + board_xml += "<title><![CDATA[%s]]></title>\r\n" % title + note = self.note.decode('utf-8') + if len(note)>32: note = note[:31]+"…".decode('utf-8') + board_xml += "<note><![CDATA[%s]]></note>\r\n" % note + board_xml += "</key>\r\n" + return board_xml + def valid(self): + return self.hash and self.nodeid and self.ip and self.port + def from_xml_node(self, xml_node): + res = {} + for c in ('hash', 'nodeid', 'ip', 'port', 'size', 'url', 'title', 'note'): + try: res[c] = xml_node.getElementsByTagName(c)[0].childNodes[0].data + except IndexError: continue + if res.get('hash'): self.hash = unhexlify(res.get('hash')) + if res.get('nodeid'): self.nodeid = unhexlify(res.get('nodeid')) + if res.get('ip'): self.ip = o2on_node.e2ip(res.get('ip')) + self.port = res.get('port', self.port) + if self.port: self.port = int(self.port) + self.size = res.get('size', self.size) + if self.size: self.size = int(self.size) + self.url = res.get('url', self.url) + if res.get('title'): self.title = res.get('title').encode('utf-8') + if res.get('note'): self.note = res.get('note').encode('utf-8') + +class KeyDB: + def __init__(self, g): + self.lock = threading.Lock() + with self.lock: + self.keys = {} + self.lenmap = {} + self.publishmap = {} + self.glob = g + self.load() + def __len__(self): + with self.lock: + return len(self.keys) + def show(self): + pager = os.environ.get('PAGER') + if not pager: self.glob.logger.log("KEYDB", "PAGER env must be set") + proc = subprocess.Popen(pager, shell=True, stdin=subprocess.PIPE) + pipe = proc.stdin + try: + with self.lock: + for l in sorted(self.lenmap.keys()): + for h in self.lenmap[l]: + key = self.keys[h] + s = "%3d %s:%d\t%s\t%s\t%s\t%s\t%dB\t%s\n" % \ + (l, o2on_node.ip2e(key.ip), + key.port, key.url, + key.title.decode('utf-8'), + key.note.decode('utf-8'), + str(datetime.datetime.fromtimestamp(int(key.published))), + key.size, + hexlify(key.hash)) + pipe.write(s.encode('utf-8')) + pipe.close() + proc.wait() + except IOError, inst: + if inst.errno == 32:pass # Broken pipe + else: raise inst + self.glob.logger.log("KEYDB", "Finished to show keys") + def key_list(self): + res = [] + with self.lock: + for l in sorted(self.lenmap.keys()): + for h in self.lenmap[l]: + key = self.keys[h] + x = (l, o2on_node.ip2e(key.ip), + key.port, key.url, + key.title.decode('utf-8'), + key.note.decode('utf-8'), + str(datetime.datetime.fromtimestamp(int(key.published))), + key.size, + hexlify(key.hash)) + res.append(x) + return res + def keys_to_publish(self, last_published_before): + res = [] + for x in sorted(self.publishmap.keys()): + for y in self.publishmap[x]: + res.append(self.keys[y]) + return res + def published(self, idkeyhash, publish_time): + if len(idkeyhash) != 20: raise Exception + with self.lock: + if idkeyhash not in self.keys: return + key = self.keys[idkeyhash] + self.publishmap[key.published].remove(idkeyhash) + key.published = publish_time + if publish_time not in self.publishmap: self.publishmap[publish_time]=[] + self.publishmap[publish_time].append(idkeyhash) + def get(self, target): + with self.lock: + return self.keys.get(target) + def remove_bynodeid(self, nid): + if len(nid) != 20:raise Exception + removes = [] + with self.lock: + for k in self.keys.values(): + if k.nodeid == nid: removes.append(k) + for k in removes: + self.remove(k) + def remove(self,k): + with self.lock: + del self.keys[k.idkeyhash()] + l = hash_xor_bitlength(self.glob.prof.mynode.id, k.hash) + self.lenmap[l].remove(k.idkeyhash()) + self.publishmap[k.published].remove(k.idkeyhash()) + if len(self.lenmap[l]) == 0: del self.lenmap[l] + if len(self.publishmap[k.published])==0: del self.publishmap[k.published] + def add(self, k): + if not k.valid(): return + k.published = int(time.time()) + if k.idkeyhash() in self.keys: + self.keys[k.idkeyhash()].from_key(k) + return + bl = hash_xor_bitlength(self.glob.prof.mynode.id, k.hash) + with self.lock: + if len(self.keys) < 3000: + pass + else: + maxlen = max(self.lenmap.keys()) + if bl <= maxlen: + maxkey = self.keys[self.lenmap[maxlen][0]] + del self.keys[self.lenmap[maxlen][0]] + del self.lenmap[maxlen][0] + if len(self.lenmap[maxlen]) == 0: + del self.lenmap[maxlen] + self.publishmap[maxkey.published].remove(maxkey.idkeyhash()) + if len(self.publishmap[maxkey.published]) == 0: + del self.publishmap[maxkey.published] + else: return + self.keys[k.idkeyhash()] = k + if bl not in self.lenmap: self.lenmap[bl] = [] + self.lenmap[bl].append(k.idkeyhash()) + if k.published not in self.publishmap: + self.publishmap[k.published] = [] + self.publishmap[k.published].append(k.idkeyhash()) + def save(self): + f = open(KeyDBFile,'wb') + with self.lock: + cPickle.dump(self.keys, f, -1) + cPickle.dump(self.lenmap, f, -1) + cPickle.dump(self.publishmap,f,-1) + f.close() + def load(self): + if os.path.isfile(KeyDBFile): + f = open(KeyDBFile,'rb') + with self.lock: + self.keys = cPickle.load(f) + self.lenmap = cPickle.load(f) + self.publishmap = cPickle.load(f) + f.close() Added: trunk/opy2on/lib/o2on_node.py =================================================================== --- trunk/opy2on/lib/o2on_node.py (rev 0) +++ trunk/opy2on/lib/o2on_node.py 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,564 @@ +#!/usr/bin/python +# -*- coding: utf-8 + +import os +from Crypto.Cipher import AES # pycrypto +import xml.dom.minidom +import re +import urllib2 +import cPickle +from struct import unpack, pack +from binascii import unhexlify, hexlify +import httplib +import socket +import threading +import random +import time + +import o2on_config +from o2on_util import hash_xor_bitlength +import o2on_const +import o2on_util +import o2on_dat +import o2on_key + +class NodeRemovable: + pass +class NodeRefused: + pass + +AESKEY = "0000000000000000" + +def aescounter(): + return "\x00" * 16 + +def e2port(s): + aesobj = AES.new(AESKEY, AES.MODE_CTR, counter=aescounter) + return unpack("<H",aesobj.decrypt(unhexlify(s)+"\x00"*14)[0:2])[0] + +def e2ip(s): + aesobj = AES.new(AESKEY, AES.MODE_CTR, counter=aescounter) + x = aesobj.decrypt(unhexlify(s)+"\x00"*12) + return ("%d.%d.%d.%d" % (int(unpack("B",x[0])[0]), + int(unpack("B",x[1])[0]), + int(unpack("B",x[2])[0]), + int(unpack("B",x[3])[0]))) + +def port2e(p): + aesobj = AES.new(AESKEY, AES.MODE_CTR, counter=aescounter) + return hexlify(aesobj.encrypt(pack("<H",p)+"\x00"*14)[:2]) + +def ip2e(ip): + aesobj = AES.new(AESKEY, AES.MODE_CTR, counter=aescounter) + regIP = re.compile(r'(\d+)\.(\d+)\.(\d+)\.(\d+)') + m = regIP.match(ip) + if not m: raise Exception + return hexlify(aesobj.encrypt(pack("B", int(m.group(1)))+ + pack("B", int(m.group(2)))+ + pack("B", int(m.group(3)))+ + pack("B", int(m.group(4)))+"\x00"*12)[:4]) + +def hash_xor(a,b): + res = "" + for i in range(0,len(a)): + res += chr(ord(a[i]) ^ ord(b[i])) + return res + +def hash_bittest(x, pos): + return (((ord(x[pos/8]) >> (pos%8)) & 0x01 != 0)) + +common_header = {} +def build_common_header(prof): + global common_header + if not prof.mynode.id: raise Exception("My ID is NULL") + if prof.mynode.port==None: raise Exception("My Port is NULL") + if not prof.mynode.name: prof.mynode.name="" + if not prof.mynode.pubkey: raise Exception("My pubkey is NULL") + common_header = {'Connection': "close", + 'X-O2-Node-ID': hexlify(prof.mynode.id), + 'X-O2-Port': str(prof.mynode.port), + 'X-O2-Node-Name': prof.mynode.name, + 'X-O2-Node-Flags':'--D', + 'User-Agent': prof.mynode.ua, + 'X-O2-RSA-Public-Key': hexlify(prof.mynode.pubkey)} + +class Node: + def __init__(self, node_id=None, i=None, p=None): + self.id = node_id + self.ip = i + self.port = p + self.name = None + self.pubkey = None + self.ua = "" + self.lock = threading.Lock() + self.lastping = None + self.removable = False + self.flag = "???" + self.flag_running = False + self.flag_history = False + self.flag_dat = False + def __getstate__(self): + return (self.id, self.ip,self.port, self.name, self.pubkey,self.ua,self.flag) + def __setstate__(self,x): + self.id, self.ip, self.port, self.name, self.pubkey,self.ua,self.flag = x[:7] + self.setflag(self.flag) + self.lock = threading.Lock() + self.lastping = None + self.removable = False + def __cmp__(self,x): + return cmp(self.id, x.id) + def from_node(self, n): + if self.id == n.id: + self.ip = n.ip + self.port = n.port + self.pubkey = n.pubkey + # TODO pubkey の変更には気をつけたほうがいい + if n.ua !="": self.ua = n.ua + if n.name !="": self.name = n.name + if n.flag !="": self.setflag(n.flag) + def from_xml(self, xmlnode): + try: + self.ip = e2ip(xmlnode.getElementsByTagName("ip")[0].childNodes[0].data) + self.port = int(xmlnode.getElementsByTagName("port")[0].childNodes[0].data) + self.id = unhexlify(xmlnode.getElementsByTagName("id")[0].childNodes[0].data) + except KeyError: return + names = xmlnode.getElementsByTagName("name") + if len(names)>0 and len(names[0].childNodes)>0: + self.name = names[0].childNodes[0].data.encode('utf-8') + pubkeys = xmlnode.getElementsByTagName("pubkey") + if len(pubkeys)>0 and len(pubkeys[0].childNodes)>0: + self.pubkey = unhexlify(pubkeys[0].childNodes[0].data) + def xml(self): + data = "<node>\r\n" + data += "<id>%s</id>\r\n" % hexlify(self.id) + data += "<ip>%s</ip>\r\n" % ip2e(self.ip) + data += "<port>%s</port>\r\n" % self.port + if self.name: + data += "<name><![CDATA[%s]]></name>\r\n" % self.name.decode('utf-8') + if self.pubkey: + data += "<pubkey>%s</pubkey>\r\n" % hexlify(self.pubkey) + data += "</node>\r\n" + return data + def setflag(self,flag): + self.flag = flag + self.flag_running = "r" in flag + self.flag_history = "t" in flag + self.flag_dat = "D" in flag + def request(self, method, path, body, addheaders): + if self.removable: raise NodeRemovable + headers = common_header.copy() + if method == 'POST': + headers['X-O2-Node-Name'] = \ + headers['X-O2-Node-Name'].decode('utf-8').encode('ascii','replace') + for x in addheaders: headers[x] = addheaders[x] + with self.lock: + conn = httplib.HTTPConnection(self.ip, self.port) + try: + socket.setdefaulttimeout(o2on_config.SocketTimeout) + conn.connect() + socket.setdefaulttimeout(None) + conn.request(method,path,body, headers) + r = conn.getresponse() + conn.close() + except socket.timeout: + socket.setdefaulttimeout(None) + self.removable = True + raise NodeRemovable + except socket.error, inst: + socket.setdefaulttimeout(None) + if inst.errno in (113, 111): raise NodeRemovable + if inst.errno in (110, 104): raise NodeRefused + else: raise inst + except httplib.BadStatusLine: + socket.setdefaulttimeout(None) + raise NodeRefused + else: + name = r.getheader('X-O2-Node-Name') + pubkey = r.getheader('X-O2-RSA-Public-Key') + ua = r.getheader('Server') + flag = r.getheader('X-O2-Node-Flags') + if not name or not pubkey or not ua: raise NodeRemovable + self.name = name + self.pubkey = unhexlify(pubkey) + self.ua = ua + if flag: self.setflag(flag) + return r + def im(self, mynode, msg): + headers = {} + im_xml = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\r\n" + im_xml += "<messages>\r\n" + im_xml += "<message>\r\n" + mynode_xml = mynode.xml() + mynode_xml = mynode_xml[len("<node>\r\n"):] + mynode_xml = mynode_xml[:-len("</node>\r\n")] + im_xml += mynode_xml + im_xml += "<msg><![CDATA[%s]]></msg>\r\n" % msg + im_xml += "</message>\r\n" + im_xml += "</messages>\r\n" + im_xml = im_xml.encode('utf-8') + headers['Content-Type'] = 'text/xml; charset=utf-8' + headers['Content-Length'] = str(len(im_xml)) + r = self.request("POST", "/im", im_xml, headers) + if r.status != 200: raise Exception("im status %d" % r.status) + return True + def store(self,category, keys): + headers={"X-O2-Key-Category": category} + board_xml = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\r\n" + board_xml += "<keys>\r\n" + for key in keys: board_xml += key.xml() + board_xml += "</keys>\r\n" + board_xml = board_xml.encode('utf-8') + headers['Content-Type'] = 'text/xml; charset=utf-8' + headers['Content-Length'] = str(len(board_xml)) + r = self.request("POST", "/store", board_xml, headers) + if r.status != 200: raise Exception("store status %d" % r.status) + return + def findnode(self, targetid): + headers = {"X-O2-Target-Key" : hexlify(targetid)} + r = self.request("GET","/findnode",None,headers) + data = None + result = [] + try: + if r.status == 200: + l = r.getheader('Content-Length') + if l: l = int(l) + else: return [] + data = r.read(l) + elif r.status == 404: pass + else: raise Exception("status error %d" % r.status) + except socket.error: + pass + else: + if data: + dom = xml.dom.minidom.parseString(data) + nn = dom.getElementsByTagName("nodes") + if len(nn): + for n in nn[0].getElementsByTagName("node"): + node = Node() + node.from_xml(n) + if node.ip and node.port and node.id: + result.append(node) + dom.unlink() + return result + def dat(self, dathash, board, datdb=None): + headers = {} + omiyage = None + if datdb and board: + omiyage = datdb.getRandomInBoard(board) + if(dathash): + headers['X-O2-Target-Key'] = dathash + elif(board): + headers['X-O2-Target-Board'] = board + if omiyage: + data = omiyage.data() + headers['X-O2-DAT-Path'] = omiyage.path() + headers['Content-Type'] = 'text/plain; charset=shift_jis' + headers['Content-Length'] = str(len(data)) + if omiyage: + r = self.request("POST","/dat", data,headers) + else: + r = self.request("GET","/dat",None,headers) + stat = r.status + data = r.read() + if stat == 200: + path = None + path = r.getheader("X-O2-Original-DAT-URL") + if not path: path = r.getheader("X-O2-DAT-Path") + if not path: return None + if ".." in path: return None + dat = o2on_dat.Dat(path) + if dat.save(data): return dat + elif stat == 404: pass + elif stat == 400: pass + else: raise Exception("dat status %d" % stat) + return None + def ping(self, force=False): + if not force and self.lastping and \ + (time.time() - self.lastping < o2on_config.RePingSec): + return True + r = self.request("GET","/ping",None,{}) + if r.status == 200: + self.lastping = int(time.time()) + l = r.getheader('Content-Length') + if l: l = int(l) + else: return False + return r.read(l) + else: raise Exception("ping status %d" % r.status) + return False + def collection(self, glob): + board_xml = o2on_util.xml_collecting_boards(glob) + headers = {'Content-Type':'text/xml; charset=utf-8', + 'Content-Length':len(board_xml)} + r = self.request("POST","/collection",board_xml,headers) + result = [] + if r.status == 200: + data = r.read() + # 本家o2on の bug 対策 + if data.rfind("</boards>") == -1: + index = data.rfind("<boards>") + data = data[:index] + "</boards>" + data[index+len("<boards>"):] + if len(data): + dom = xml.dom.minidom.parseString(data) + nn = dom.getElementsByTagName("boards") + if len(nn): + for b in nn[0].getElementsByTagName("board"): + result.append(b.childNodes[0].data) + dom.unlink() + else: raise Exception("collection status %d" % r.status) + return result + def findvalue(self, kid): + headers = {"X-O2-Target-Key":hexlify(kid)} + r = self.request("GET","/findvalue",None,headers) + if r.status == 200: + res = [] + dom = xml.dom.minidom.parseString(r.read()) + nodes = dom.getElementsByTagName("nodes") + keys = dom.getElementsByTagName("keys") + if len(nodes): + for n in nodes[0].getElementsByTagName("node"): + node = Node() + node.from_xml(n) + if node.ip and node.port and node.id: + res.append(node) + elif len(keys): + for k in keys[0].getElementsByTagName("key"): + key = o2on_key.Key() + key.from_xml_node(k) + res.append(key) + return res + elif r.status == 404: pass + else: raise Exception("findvalue status %d" % r.status) +class NodeDB: + def __init__(self, glob): + self.glob = glob + self.KBuckets = [] + self.port0nodes = [] + self.lock = threading.Lock() + for x in range(0,160): self.KBuckets.append([]) + self.nodes = dict() + self.boardmap = dict() + self.load() + if len(self.nodes) == 0: self.node_from_web() + def __len__(self): + with self.lock: + return len(self.nodes) + def __getitem__(self,x): + if x == self.glob.prof.mynode.id: + return self.glob.prof.mynode + with self.lock: + return self.nodes.get(x) + def show(self): + self.glob.logger.begin() + with self.lock: + if len(self.nodes) == 0: + self.glob.logger.log("NODEDB", "No nodes") + self.glob.logger.end() + return + for l in range(0,160): + for x in self.KBuckets[l]: + node = self.nodes[x] + if node.name: name = node.name.decode('utf-8') + else: name = "名無しさん".decode('utf-8') + s = "%3d %-8s\t%s %s:%d\t%24s\t%s" % (l+1, name, + node.flag, + ip2e(node.ip), node.port, + node.ua, hexlify(node.id)) + s = s.encode('utf-8') + self.glob.logger.log("NODEDB", s) + self.glob.logger.end() + def node_list(self): + with self.lock: + res = [] + if len(self.nodes) == 0: return res + for l in range(0,160): + for x in self.KBuckets[l]: + node = self.nodes[x] + if node.name: name = node.name.decode('utf-8') + else: name = "名無しさん".decode('utf-8') + x = (l+1, hexlify(node.id), ip2e(node.ip), node.port,name, + node.flag, ip2e(node.ip), node.port, + node.ua, hexlify(node.id)) + res.append(x) + return res + def exportnode(self): + with self.lock: + f = open('exportnodes.xml','w') + f.write("<nodes>\n") + for x in self.nodes: + n = self.nodes[x] + f.write("<str>%s%s%s</str>\n" % (hexlify(n.id), ip2e(n.ip), port2e(n.port))) + f.write("</nodes>\n") + f.close() + def get_nodes_for_board(self, board): + with self.lock: + if not board in self.boardmap: return [] + return map(lambda x: self.nodes[x], self.boardmap[board]) + def get_random_board(self): + with self.lock: + if len(self.boardmap) == 0: return None + return random.choice(self.boardmap.keys()) + def reset_collection_for_node(self,n): + with self.lock: + zeroboard = [] + for x in self.boardmap: + if n.id in self.boardmap[x]: + self.boardmap[x].remove(n.id) + if len(self.boardmap[x]) == 0: + zeroboard.append(x) + for z in zeroboard: del self.boardmap[z] + def add_collection(self, board, n): + if board not in self.glob.allboards: return + r = None + with self.lock: + if not board in self.boardmap: + self.boardmap[board] = [n.id] + elif len(self.boardmap[board])<10: + self.boardmap[board].append(n.id) + else: + nt = self.nodes.get(self.boardmap[board][0]) + if not nt: + raise Exception + del self.boardmap[board][0] + self.boardmap[board].append(n.id) + else: + r = nt.ping() + if r: + self.boardmap[board].append(self.boardmap[board][0]) + del self.boardmap[board][0] + else: + del self.boardmap[board][0] + self.boardmap[board].append(n.id) + if n.id not in self.nodes: self.add_node(n) + if r: self.add_node(nt) + def choice(self): + with self.lock: + return self.nodes[random.choice(self.nodes.keys())] + def remove(self, x): + if x.port == 0: + with self.lock: + if x in self.port0nodes: + self.port0nodes.remove(x) + return + with self.lock: + if x.id in self.nodes: + del self.nodes[x.id] + self.KBuckets[hash_xor_bitlength( + self.glob.prof.mynode.id, x.id)-1].remove(x.id) + zeroboard = [] + for b in self.boardmap.keys(): + if x.id in self.boardmap[b]: + self.boardmap[b].remove(x.id) + if len(self.boardmap[b]) == 0: + zeroboard.append(b) + for b in zeroboard: self.boardmap[b] + def save(self): + with self.lock: + pkl_file = open(o2on_const.NodeDBFile, 'wb') + cPickle.dump(self.nodes, pkl_file,-1) + cPickle.dump(self.boardmap, pkl_file,-1) + cPickle.dump(self.port0nodes, pkl_file,-1) + pkl_file.close() + def load(self): + if(os.path.isfile(o2on_const.NodeDBFile)): + pkl_file = open(o2on_const.NodeDBFile, 'rb') + try: + tmp = cPickle.load(pkl_file) + except EOFError: + tmp = dict() + with self.lock: + try: self.boardmap = cPickle.load(pkl_file) + except EOFError: tmp2 = dict() + with self.lock: + try: self.port0nodes = cPickle.load(pkl_file) + except: self.port0nodes = [] + pkl_file.close() + for x in tmp: self.add_node(tmp[x]) + def add_node(self,node): + if node.port == 0: + self.glob.logger.log("NODEDB", "Added port0 node %s" % hexlify(node.id)) + with self.lock: + if node in self.port0nodes: + self.port0nodes.remove(node) + self.port0nodes.append(node) + return + bitlen = hash_xor_bitlength(self.glob.prof.mynode.id, node.id)-1 + if(bitlen<0): return + with self.lock: + if node.id in self.KBuckets[bitlen]: + n = self.nodes[node.id] + n.from_node(node) + self.KBuckets[bitlen].remove(node.id) + self.KBuckets[bitlen].append(node.id) + elif len(self.KBuckets[bitlen]) < max(20, bitlen/2): + self.KBuckets[bitlen].append(node.id) + self.nodes[node.id] = node + else: + n = self.nodes[self.KBuckets[bitlen][0]] + r = n.ping() + if r: + del self.KBuckets[bitlen][0] + self.KBuckets[bitlen].append(n.id) + else: + del self.nodes[n.id] + del self.KBuckets[bitlen][0] + zeroboard = [] + for b in self.boardmap.keys(): + if n.id in self.boardmap[b]: + self.boardmap[b].remove(n.id) + if len(self.boardmap[b]) == 0: + zeroboard.append(b) + for b in zeroboard: del self.boardmap[b] + self.KBuckets[bitlen].append(node.id) + self.nodes[node.id] = node + def neighbors_nodes(self,target, includeself, cnt=3): + myid = self.glob.prof.mynode.id + d = hash_xor(target, myid) + bitlen = hash_xor_bitlength(target, myid) - 1 + result = [] + if(bitlen>=0): + with self.lock: + if(len(self.KBuckets[bitlen])): + for x in sorted(map(lambda x: hash_xor(x,target), + self.KBuckets[bitlen])): + result.append(self.nodes[hash_xor(x,target)]) + if(len(result)>=cnt): return result + for l in range(bitlen-1,-1,-1): + if(hash_bittest(d,l) and len(self.KBuckets[l])): + for x in sorted(map(lambda x: hash_xor(x,target), + self.KBuckets[l])): + result.append(self.nodes[hash_xor(x,target)]) + if(len(result)>=cnt): return result + if(includeself and self.glob.prof.mynode.ip and self.glob.prof.mynode.port != 0): + result.append(self.glob.prof.mynode) + if(len(result)>=cnt): return result + if(bitlen>=0): + for l in range(0,bitlen): + with self.lock: + if(not hash_bittest(d,l) and len(self.KBuckets[l])): + for x in sorted(map(lambda x: hash_xor(x,target), + self.KBuckets[l])): + result.append(self.nodes[hash_xor(x,target)]) + if(len(result)>=cnt): return result + for l in range(bitlen+1,160): + with self.lock: + if(len(self.KBuckets[l])): + for x in sorted(map(lambda x: hash_xor(x,target), + self.KBuckets[l])): + result.append(self.nodes[hash_xor(x,target)]) + if(len(result)>=cnt): return result + return result + def node_from_web(self): + nodeurls = o2on_config.Node_List_URLs + regNode = re.compile("([0-9a-f]{40})([0-9a-f]{8})([0-9a-f]{4})") + for url in nodeurls: + xmldata = urllib2.urlopen(url) + dom = xml.dom.minidom.parseString(xmldata.read()) + n = dom.getElementsByTagName("nodes") + if len(n): + for y in n[0].getElementsByTagName("str"): + if len(y.childNodes) == 1: + m = regNode.search(y.childNodes[0].data) + self.add_node(Node(unhexlify(m.group(1)), + e2ip(m.group(2)), + e2port(m.group(3)))) + dom.unlink() Added: trunk/opy2on/lib/o2on_profile.py =================================================================== --- trunk/opy2on/lib/o2on_profile.py (rev 0) +++ trunk/opy2on/lib/o2on_profile.py 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,67 @@ +#!/usr/bin/python + +from Crypto.PublicKey import RSA # pycrypto +import os +import cPickle +from binascii import unhexlify + +import o2on_util +import o2on_config +import o2on_node +from o2on_const import * + +class MyRSA: + def __init__(self, rsa): + self.rsa = rsa + def hex(self): + s = hex(self.rsa.n)[2:-1] + return "0" * (320-len(s)) + s + def pubkey(self): + return unhexlify(self.hex()) + +def uname(logger): + if os.name == 'posix': + try: + return os.uname()[0]+" "+os.uname()[4] + except AttributeError: + logger.popup("Unknown POSIX %s.\nPlease report it." % os.name) + return "Unknown POSIX" + elif os.name == 'nt': + return "Win" + try: + uname = os.uname()[0]+" "+os.uname()[4] + except AttributeError: + uname = "" + logger.popup("GLOBAL","Unknown OS %s (%s).\nPlease report it." % (os.name, uname)) + return "Unknown" + +class Profile: + def __init__(self, l): + self.logger = l + self.mynode = o2on_node.Node() + self.mynode.id = o2on_util.randomid() + self.mynode.ip = None + self.mynode.port = o2on_config.P2PPort + self.mynode.name = o2on_config.NodeName[:8].encode('utf-8') + self.mynode.pubkey = None + self.rsa = None + self.mynode.ua = "%s/%.1f (%s/%1d.%02d.%04d; %s)" % \ + (ProtocolName, ProtocolVer, AppName, AppMajorVer, AppMinorVer, AppBuildNo, + uname(l)) + self.load() + if not self.rsa: + self.logger.log("PROFILE","generating RSA key") + self.rsa = MyRSA(RSA.generate(160*8,os.urandom)) + self.save() + self.mynode.pubkey = self.rsa.pubkey() + def save(self): + pkl_file = open(ProfileFile,"wb") + cPickle.dump(self.rsa, pkl_file,-1) + cPickle.dump(self.mynode.id, pkl_file,-1) + pkl_file.close() + def load(self): + if(os.path.isfile(ProfileFile)): + pkl_file = open(ProfileFile,"rb") + self.rsa = cPickle.load(pkl_file) + self.mynode.id = cPickle.load(pkl_file) + pkl_file.close() Added: trunk/opy2on/lib/o2on_server.py =================================================================== --- trunk/opy2on/lib/o2on_server.py (rev 0) +++ trunk/opy2on/lib/o2on_server.py 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,859 @@ +#!/usr/bin/python +# -*- coding: utf-8 + +import BaseHTTPServer +import re +import socket +from urlparse import urlparse +import httplib +import os.path +import gzip +import base64 +import zlib +import StringIO +from binascii import hexlify, unhexlify +import xml.dom.minidom +import urllib +import cgi +import time +import traceback +import sys + +import o2on_config +from o2on_const import regHosts, ProtocolVer, AppName +import o2on_node +import o2on_dat +from o2on_node import ip2e, port2e, e2ip +import o2on_key +import o2on_im + +class O2ONServer(BaseHTTPServer.HTTPServer): + def __init__(self, handler, port, g): + BaseHTTPServer.HTTPServer.__init__(self, + ('', port), + handler) + self.glob = g + self.requests = [] + def shutdown(self): + for r in self.requests: + r.shutdown(socket.SHUT_RDWR) + r.close() + BaseHTTPServer.HTTPServer.shutdown(self) + def finish_request(self, request, client_address): + self.requests.append(request) + try: + BaseHTTPServer.HTTPServer.finish_request(self, request, client_address) + except Exception,inst: + if isinstance(inst, socket.error) and inst.errno in (104, 32): + pass + else: + if o2on_config.OutputErrorFile: + f = open('error-'+str(int(time.time()))+'.txt', 'w') + f.write(str(inst)+"\n") + traceback.print_exc(file=f) + f.close() + self.glob.logger.popup("ERROR", str(inst)) + self.glob.shutdown.set() + self.requests.remove(request) + +class ProxyServerHandler(BaseHTTPServer.BaseHTTPRequestHandler): + URLTYPE_NORMAL = 0 + URLTYPE_DAT = 1 + URLTYPE_KAKO_DAT = 2 + URLTYPE_KAKO_GZ = 3 + URLTYPE_OFFLAW = 4 + URLTYPE_MACHI = 5 + URLTYPE_UNKNOWN = 6 + regs = (re.compile( + r'^http://[^.]+\.'+regHosts+r'(?::\d+)?/test/read.cgi/[^/]+/\d+/$'), + re.compile( + r'^http://[^.]+\.'+regHosts+r'(?::\d+)?/([^/]+)/dat/(\d+)\.dat$'), + re.compile( + r'^http://[^.]+\.'+regHosts+r'(?::\d+)?/([^/]+)/kako/\d+/\d+/(\d+)\.dat$'), + re.compile( + r'^http://[^.]+\.'+regHosts+r'(?::\d+)?/([^/]+)/kako/\d+/\d+/(\d+)\.dat\.gz$'), + re.compile( + r'^http://[^.]+\.'+regHosts+r'(?::\d+)?/test/offlaw.cgi/[^/]+/\d+/\?raw='), + re.compile( + r'^http://[^.]+\.'+regHosts+r'(?::\d+)?/[^/]+/read.pl\?BBS=[^&]+&KEY=\d+'),) + def __init__(self,request, client_address, server): + BaseHTTPServer.BaseHTTPRequestHandler.__init__(self,request, client_address, server) + def urltype(self): + x = 0 + for r in self.regs: + if r.match(self.path): return x + x += 1 + return x + def get_requested_header(self): + h = self.headers.headers + hr = {} + for x in h: + f = x.split(": ",1) + hr[f[0]] = f[1][:-2] + return hr + def get_connection(self, remove=[]): + h = self.headers.headers + hr = {} + for x in h: + f = x.split(": ",1) + if f[0] != "Proxy-Connection": + hr[f[0]] = f[1][:-2] + hr["Connection"] = "close" + p = urlparse(self.path) + if "@" in p.netloc: + x = p.netloc.split("@",1) + loc = x[1] + hr["Authorization"] = "Basic "+base64.b64encode(x[0]) + else: + loc = p.netloc + for r in remove: + if r in hr: del hr[r] + conn = httplib.HTTPConnection(loc) + conn.request("GET",p.path, None, hr) + return conn + def msg(self,r): + res = '' + for x in r.getheaders(): + if x[0] in ("transfer-encoding",): + pass + else: res+=x[0]+': '+x[1]+'\r\n' + return res + def normal_proxy(self): + try: + conn = self.get_connection() + r= conn.getresponse() + conn.close() + except socket.timeout: + return + self.wfile.write("HTTP/%d.%d %d %s\r\n" % + (r.version/10,r.version%10,r.status,r.reason)) + self.wfile.write(self.msg(r)) + self.wfile.write("\r\n") + self.wfile.write(r.read()) + self.wfile.close() + def datpath(self): + m = self.regs[self.URLTYPE_DAT].match(self.path) + if not m: m = self.regs[self.URLTYPE_KAKO_DAT].match(self.path) + if not m: m = self.regs[self.URLTYPE_KAKO_GZ].match(self.path) + if not m: return None + return os.path.join(o2on_config.DatDir, m.group(1), m.group(2), + m.group(3)[:4],m.group(3)+".dat") + def datkey(self): + m = self.regs[self.URLTYPE_DAT].match(self.path) + if not m: m = self.regs[self.URLTYPE_KAKO_DAT].match(self.path) + if not m: m = self.regs[self.URLTYPE_KAKO_GZ].match(self.path) + if not m: return None + return "/".join((m.group(1), m.group(2), m.group(3))) + def do_GET(self): + logger = self.server.glob.logger + logger.log("PROXY", "proxy requested %s" % self.path) + ut = self.urltype() + if ut in (self.URLTYPE_UNKNOWN, self.URLTYPE_NORMAL, self.URLTYPE_MACHI): + self.normal_proxy() + return + try: + conn = self.get_connection() + r= conn.getresponse() + conn.close() + except socket.timeout: + r = None + data = None + header = None + if r: + logger.log("PROXY", "\tresponse %s" % r.status) + if ut != self.URLTYPE_OFFLAW and r.status in (200,206,304): + logger.log("PROXY", "\tgot response from server") + data = r.read() + if r.getheader("content-encoding") == "gzip": + sf = StringIO.StringIO(data) + dec = gzip.GzipFile(fileobj=sf) + datdata = dec.read() + else: + datdata = data + self.wfile.write("HTTP/%d.%d %d %s\r\n" % + (r.version/10,r.version%10,r.status,r.reason)) + self.wfile.write(self.msg(r)) + self.wfile.write("\r\n") + self.wfile.write(data) + self.wfile.close() + dk = self.datkey() + dp = self.datpath() + if r.status == 200: + if not self.server.glob.datdb.has_key(dk): + # 持ってない dat が取得された + logger.log("PROXY", "\tsave responsed dat for myself") + self.server.glob.datdb.add(dk, datdata) + else: + if self.server.glob.datdb.has_key(dk): + if r.status == 206: + # 持ってる dat の差分 + rg = r.getheader('Content-Range') + start = 0 + if rg: + m=re.compile(r'bytes (\d+)-').search(rg) + start = int(m.group(1)) + logger.log("PROXY", "\tsave diff dat for myself (%d-)" % start) + self.server.glob.datdb.add(dk, datdata, start) + elif o2on_config.RequestNonExistDat: + # 持ってない dat がリクエストされた -> こっそり取得 + logger.log("PROXY", "\trequest whole dat for myself :-)") + try: + conn = self.get_connection(['If-Modified-Since', 'Range']) + r2= conn.getresponse() + conn.close() + except socket.timeout: + r2 = None + if r2 and r2.status == 200: + data = r2.read() + if r.getheader("content-encoding") == "gzip": + data = zlib.decompress(data) + self.server.glob.datdb.add(dk, data) + elif ut in (self.URLTYPE_DAT, self.URLTYPE_KAKO_DAT, self.URLTYPE_KAKO_GZ): + logger.log("PROXY", "\ttry to read dat from cache") + dp = self.datpath() + wdata = None + if ut == self.URLTYPE_KAKO_GZ: + if os.path.exists(dp): + f=open(dp) + wdata=zlib.compress(f.read()) + elif os.path.exists(dp+".gz"): + f=open(dp+".gz",'r') + wdata=f.read() + else: f= None + if f: f.close() + else: + if os.path.exists(dp): + f=open(dp) + wdata=f.read() + elif os.path.exists(dp+".gz"): + f=gzip.GzipFile(dp+".gz",'r') + wdata=f.read() + else: f= None + if f: f.close() + if wdata: # FIXME range, gzip + logger.log("PROXY", "\tfound cached dat") + #reqheader = self.get_requested_header() + self.wfile.write("HTTP/1.0 200 OK\r\n") + self.wfile.write("Content-Type: text/plain\r\n") + self.wfile.write("\r\n") + self.wfile.write(wdata) + self.wfile.close() + f.close() + # gzip で書き直す + if os.path.exists(dp) and o2on_config.DatSaveAsGzip: + f = open(dp) + g = gzip.GzipFile(dp+".gz",'w') + g.write(f.read()) + g.close() + f.close() + os.remove(dp) + else: + logger.popup("PROXY", "no cached dat. query for the dat.\n%s" % self.datkey()) + self.wfile.write("HTTP/%d.%d %d %s\r\n" % + (r.version/10,r.version%10,r.status,r.reason)) + self.wfile.write(self.msg(r)) + self.wfile.write("\r\n") + self.wfile.write(r.read()) + self.wfile.close() + self.server.glob.datquery.add(self.datkey()) + +common_header = {} +def build_common_header(prof): + global common_header + if not prof.mynode.id: raise Exception("My ID is NULL") + if prof.mynode.port==None: raise Exception("My Port is NULL") + if not prof.mynode.name: prof.mynode.name="" + if not prof.mynode.pubkey: raise Exception("My pubkey is NULL") + common_header = {'Connection': "close", + 'X-O2-Node-ID': hexlify(prof.mynode.id), + 'X-O2-Port': str(prof.mynode.port), + 'X-O2-Node-Name': prof.mynode.name, + 'X-O2-Node-Flags':'--D', + 'Server': prof.mynode.ua, + 'X-O2-RSA-Public-Key': hexlify(prof.mynode.pubkey)} + +class P2PServerHandler(BaseHTTPServer.BaseHTTPRequestHandler): + regPath = re.compile(r'^(?:http://[^/]+)?/([^/]+)') + def do_dat(self, node): + logger = self.server.glob.logger + if self.command == 'POST' and self.headers.getheader('Content-Length'): + l = int(self.headers.getheader('Content-Length')) + logger.log("P2PSERVER", + "Client gave me omiyage dat %s" % hexlify(node.id)) + data = self.rfile.read(l) + daturl = self.headers.get('X-O2-Original-DAT-URL') + dat = o2on_dat.Dat() + if daturl: + if ".." in daturl: return self.response_400("DAT URL include ..") + if not dat.setpath(daturl): return self.response_400("invalid dat url") + else: + datpath = self.headers.get('X-O2-DAT-Path') + if datpath: + if ".." in datpath: return self.response_400("datpath include ..") + if not dat.setpath(datpath): return self.response_400("invalid datpath") + if not dat.save(data): + logger.log("P2PSERVER", + "I don't like this omiyage dat %s" % self.client_address[0]) + return self.response_400("invalid omiyage") + else: + self.server.glob.datdb.add_dat(dat) + # give dat + targetkey = self.headers.get('X-O2-Target-Key') + targetboard = self.headers.get('X-O2-Target-Board') + if targetkey: + dat = self.server.glob.datdb.get(targetkey) + elif targetboard: + dat = self.server.glob.datdb.getRandomInBoard(targetboard) + else: dat = self.server.glob.datdb.choice(targetboard) + if not dat: return self.response_404() + headers = common_header.copy() + data = dat.data() + headers['X-O2-DAT-Path'] = dat.path() + headers['Content-Type'] = 'text/plain; charset=shift_jis' + headers['Content-Length'] = str(len(data)) + self.wfile.write("HTTP/1.0 200 OK\r\n") + for h in headers: self.wfile.write("%s: %s\r\n" % (h,headers[h])) + self.wfile.write("\r\n") + self.wfile.write(data) + self.wfile.close() + def do_collection(self, node): + boards = o2on_config.DatCollectionBoardList + if boards == None: + boards = self.server.glob.allboards + data = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\r\n" + data += "<boards>\r\n" + for b in boards: + data += "<board>%s</board>\r\n" % b + data += "</boards>\r\n" + headers = common_header.copy() + headers['Content-Type'] = 'text/xml; charset=utf-8' + headers['Content-Length'] = str(len(data)) + self.wfile.write("HTTP/1.0 200 OK\r\n") + for h in headers: self.wfile.write("%s: %s\r\n" % (h, headers[h])) + self.wfile.write("\r\n") + self.wfile.write(data) + self.wfile.close() + self.server.glob.logger.log("P2PSERVER", "gave collection %s" % hexlify(node.id)) + if self.command == 'POST': + l = int(self.headers.getheader('Content-Length')) + data = self.rfile.read(l) + # o2on の bug 対策 + if data.rfind("</boards>") == -1: + index = data.rfind("<boards>") + data = data[:index] + "</boards>" + data[index+len("<boards>"):] + if len(data): + dom = xml.dom.minidom.parseString(data) + nn = dom.getElementsByTagName("boards") + result = [] + if len(nn): + for b in nn[0].getElementsByTagName("board"): + result.append(b.childNodes[0].data) + dom.unlink() + self.server.glob.logger.log("P2PSERVER", "got collection %s" % hexlify(node.id)) + self.server.glob.nodedb.reset_collection_for_node(node) + for b in result: self.server.glob.nodedb.add_collection(b,node) + def do_ping(self, node): + headers = common_header.copy() + headers['Content-Type'] = 'text/plain' + headers['Content-Length'] = "8" + self.wfile.write("HTTP/1.0 200 OK\r\n") + for h in headers: self.wfile.write("%s: %s\r\n" % (h,headers[h])) + self.wfile.write("\r\n") + self.wfile.write(ip2e(node.ip)) + self.wfile.close() + self.server.glob.logger.log("P2PSERVER", "respond to ping %s" % hexlify(node.id)) + def do_findnode(self, node): + target = self.headers.get('X-O2-Target-Key') + if not target: return self.response_400("no target key to findnode") + target = unhexlify(target) + neighbors = self.server.glob.nodedb.neighbors_nodes(target, True) + if neighbors: + data = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\r\n" + data += "<nodes>\r\n" + for node in neighbors: data += node.xml() + data += "</nodes>\r\n" + data = data.encode('utf-8') + headers = common_header.copy() + headers['Content-Type'] = 'text/xml; charset=utf-8' + headers['Content-Length'] = str(len(data)) + self.wfile.write("HTTP/1.0 200 OK\r\n") + for h in headers: self.wfile.write("%s: %s\r\n" % (h,headers[h])) + self.wfile.write("\r\n") + self.wfile.write(data) + self.wfile.close() + else: return self.response_404() + def do_store(self, node): + headers = common_header.copy() + self.wfile.write("HTTP/1.0 200 OK\r\n") + for h in headers: self.wfile.write("%s: %s\r\n" % (h,headers[h])) + self.wfile.write("\r\n") + self.wfile.close() + if self.headers.getheader('Content-Length'): + l = int(self.headers.getheader('Content-Length')) + category = self.headers.get('X-O2-Key-Category') + if not category: category = 'dat' + if category == 'dat': + data = self.rfile.read(l) + dom = xml.dom.minidom.parseString(data) + top = dom.getElementsByTagName("keys") + if len(top): + for k in top[0].getElementsByTagName("key"): + key = o2on_key.Key() + key.from_xml_node(k) + self.server.glob.keydb.add(key) + dom.unlink() + else: self.server.glob.logger.log("P2PSERVER","Unknown Category %s" % category) + def do_im(self, node): + headers = common_header.copy() + self.wfile.write("HTTP/1.0 200 OK\r\n") + for h in headers: self.wfile.write("%s: %s\r\n" % (h,headers[h])) + self.wfile.write("\r\n") + self.wfile.close() + if self.headers.getheader('Content-Length'): + l = int(self.headers.getheader('Content-Length')) + data = self.rfile.read(l) + dom = xml.dom.minidom.parseString(data) + top = dom.getElementsByTagName("messages") + if len(top): + for n in top[0].getElementsByTagName("message"): + im = o2on_im.IMessage() + im.from_xml_node(n) + im.date = int(time.time()) + self.server.glob.imdb.add(im) + self.server.glob.logger.popup("IM", "Received Message!") + self.server.glob.imdb.save() + dom.unlink() + def do_findvalue(self,node): + target = self.headers.get('X-O2-Target-Key') + if not target: return self.response_400("no target key to findvalue") + self.server.glob.logger.log("P2PSERVER", + "\tfindvalue from %s for %s" % (hexlify(node.id), target)) + target = unhexlify(target) + xml_data = None + key = self.server.glob.keydb.get(target) + if key: + xml_data = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\r\n" + xml_data += "<keys>\r\n" + xml_data += key.xml(self.server.glob) + xml_data += "</keys>\r\n" + else: + neighbors = self.server.glob.nodedb.neighbors_nodes(target, True) + if len(neighbors)>0: + xml_data = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\r\n" + xml_data += "<nodes>\r\n" + for node in neighbors: + if node.ip and node.port: + xml_data += "<node>\r\n" + xml_data += "<id>%s</id>\r\n" % hexlify(node.id) + xml_data += "<ip>%s</ip>\r\n" % ip2e(node.ip) + xml_data += "<port>%s</port>\r\n" % node.port + if node.name: + xml_data += "<name><![CDATA[%s]]></name>\r\n" % \ + node.name.decode('utf-8') + if node.pubkey: + xml_data += "<pubkey>%s</pubkey>\r\n" % hexlify(node.pubkey) + xml_data += "</node>\r\n" + xml_data += "</nodes>\r\n" + if xml_data: + xml_data = xml_data.encode('utf-8') + headers = common_header.copy() + headers['Content-Type'] = 'text/xml; charset=utf-8' + headers['Content-Length'] = str(len(xml_data)) + self.wfile.write("HTTP/1.0 200 OK\r\n") + for h in headers: self.wfile.write("%s: %s\r\n" % (h,headers[h])) + self.wfile.write("\r\n") + self.wfile.write(xml_data) + self.wfile.close() + else: return self.response_404() + job = {'dat':do_dat, + 'collection': do_collection, + 'ping': do_ping, + 'findnode':do_findnode, + 'store':do_store, + 'findvalue':do_findvalue, + 'im':do_im,} + def response_400(self, reason=""): + logger = self.server.glob.logger + logger.log("P2PSERVER", + "response 400 %s (%s)" % (self.client_address[0], reason)) + logger.log("P2PSERVER", "\tpath was %s" % self.path) + logger.log("P2PSERVER", "\theader was %s" % self.headers) + header = common_header.copy() + self.wfile.write("HTTP/1.0 400 Bad Request\r\n") + for h in header: self.wfile.write("%s: %s" % (h,header[h])) + self.wfile.write("\r\n") + self.wfile.close() + def response_404(self): + #print "p2p server response 404 %s" % self.client_address[0] + header = common_header.copy() + self.wfile.write("HTTP/1.0 404 Not Found\r\n") + for h in header: self.wfile.write("%s: %s" % (h,header[h])) + self.wfile.write("\r\n") + self.wfile.close() + def get_requested_header(self): + h = self.headers.headers + hr = {} + for x in h: + f = x.split(": ",1) + hr[f[0]] = f[1][:-2] + return hr + def do_POST(self): + self.do_GET() + def do_GET(self): + self.server.glob.logger.log("P2PSERVER", "connection came %s" % (self.path)) + + nid = self.headers.getheader('X-O2-Node-ID') + if not nid: return self.response_400("No NodeID") + port = self.headers.getheader('X-O2-Port') + if not port: return self.response_400("No Port") + port = int(port) + node = o2on_node.Node(unhexlify(nid), self.client_address[0], port) + + if not self.headers.getheader('X-O2-RSA-Public-Key'): + return self.response_400("No public key") + node.pubkey = unhexlify(self.headers.getheader('X-O2-RSA-Public-Key')) + + name = self.headers.getheader('X-O2-Node-Name') + if name: node.name = name.decode('utf-8').encode('utf-8') + flag = self.headers.getheader('X-O2-Node-Flags') + if flag: node.setflag(flag) + + ua = self.headers.getheader('User-Agent') + if not ua: return self.response_400("No UA") + if len(ua)<6: return self.response_403() + m = re.compile(r'O2/(\d+(?:\.\d+)?)').match(ua) + if not m: return self.response_403() + if float(m.group(1)) < ProtocolVer: return self.response_403() + node.ua = ua + + self.server.glob.nodedb.add_node(node) + m = self.regPath.match(self.path) + if m: + func = self.job.get(m.group(1)) + if func: return func(self, node) + return self.response_404() + + +class AdminServerHandler(BaseHTTPServer.BaseHTTPRequestHandler): + default = "status" + regPath = re.compile(r'^(?:http://[^/]+)?/([^/]+)(?:/?(.*?)/?)?$') + html_header = """\ +<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" + "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> +<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="ja" lang="ja" dir="ltr"> + <head> + <title>%s - Admin %s</title> + <style type="text/css"> +<!-- +#navlist +{ +padding: 3px 0; +margin-left: 0; +border-bottom: 1px solid #778; +font: bold 12px Verdana, sans-serif; +} + +#navlist li +{ +list-style: none; +margin: 0; +display: inline; +} + +#navlist li a +{ +padding: 3px 0.5em; +margin-left: 3px; +border: 1px solid #778; +border-bottom: none; +background: #DDE; +text-decoration: none; +} + +#navlist li a:link { color: #448; } +#navlist li a:visited { color: #667; } + +#navlist li a:hover +{ +color: #000; +background: #AAE; +border-color: #227; +} + +#navlist li a#current +{ +background: white; +border-bottom: 1px solid white; +} + +#mine +{ +background: blue; +} +--> + </style> + </head> +<body> +""" + html_footer = """\ +</body> +</html> +""" + pages = (("status", "状態"), + ("nodes", "ノード"), + ("keys", "datキー"), + ("dats", "所有dat"), + ("datq", "dat検索"), + ("im", "IM"), + ("shutdown", "シャットダウン"),) + def send_nav(self, cur): + self.wfile.write("<ul id=\"navlist\">\n") + for x in self.pages: + if x[0] == cur: + self.wfile.write( + "<li id=\"active\"><a href=\"/%s\" id=\"current\">%s</a></li>\n" % x) + else: + self.wfile.write("<li><a href=\"/%s\">%s</a></li>\n" % x) + self.wfile.write("</ul>\n") + def send_common(self, cur, curname): + self.send_response(200) + self.send_header('Content-Type', 'text/html; charset=utf-8') + self.send_header('Connection', 'close') + self.end_headers() + self.wfile.write(self.html_header % (AppName, curname)) + self.send_nav(cur) + def im_send(self,args): + if not re.compile(r'^[0-9a-f]{40}$').match(args[1]) or \ + not re.compile(r'^[0-9a-f]{8}$').match(args[2]) or \ + not re.compile(r'^\d+$').match(args[3]) or \ + not self.server.glob.prof.mynode.ip: + self.send_common("im", "Instant Messenger Send") + self.wfile.write("""\ +<div class='section'> + <h2 class='section_title'>IM送信エラー</h2> + <div class='section_body'> + <p>グローバルIPが確定していないか、送信先がおかしいです。</p> + </div> +</div> +""") + self.wfile.write(self.html_footer) + return + nodedb = self.server.glob.nodedb + nid = unhexlify(args[1]) + ip = e2ip(args[2]) + port = int(args[3]) + node = nodedb[nid] or o2on_node.Node(nid,ip,port) + if node.name: name = "%s (ID: %s)" % (node.name.decode('utf-8'), args[1]) + else: name = "ID: %s" % args[1] + l = self.headers.get('Content-Length') + + if self.command == "GET" or not l: + self.send_common("im", "Instant Messenger Send") + self.wfile.write(("""\ +<div class='section'> + <h2 class='section_title'>IM送信</h2> + <div class='section_body'> + <p>%sにIMを送信。</p> + <form action='/im/send/%s/%s/%s' method='POST'> + <input type='text' name='immsg' size='50' maxlength='256'/><br /> + <input type='submit' id='imsend' value='送信' /> + </form> + </div> +</div> +""".decode('utf-8') % (name, args[1], args[2], args[3])).encode('utf-8')) + self.wfile.write(self.html_footer) + else: + l = int(l) + data = self.rfile.read(l) + m=re.compile(r'^immsg=(.*)$').match(data) + data = urllib.unquote_plus(m.group(1)).decode('utf-8') + + result = "失敗" + try: + node.im(self.server.glob.prof.mynode, data) + except o2on_node.NodeRemovable: + nodedb.remove(node) + nodedb.save() + self.server.glob.keydb.remove_bynodeid(node.id) + self.server.glob.keydb.save() + except o2on_node.NodeRefused: + pass + else: + result = "成功" + nodedb.add_node(node) + nodedb.save() + im = o2on_im.IMessage() + im.from_node(self.server.glob.prof.mynode) + im.msg = data.encode('utf-8') + im.date = int(time.time()) + im.mine = True + self.server.glob.imdb.add(im) + self.server.glob.imdb.save() + self.send_common("im", "Instant Messenger Sent") + self.wfile.write(("""\ +<div class='section'> + <h2 class='section_title'>IMを送信しました。</h2> + <div class='section_body'> + <p>%sに以下のIMを送信し、%sしました。</p> + <p>%s</p> + <p><a href='/im'>もどる</a></p> + </div> +</div> +""".decode('utf-8') % (name,result.decode('utf-8'),data)).encode('utf-8')) + self.wfile.write(self.html_footer) + def im(self,args): + if len(args)==4 and args[0] == "send": return self.im_send(args) + imdb = self.server.glob.imdb + self.send_common("im", "Instant Messenger") + self.wfile.write("""\ +<div class='section'> + <h2 class='section_title'>IM</h2> + <div class='section_body'> + <table> + <tr><th>日時</th><th>名前</th><th>メッセージ</th></tr> +""") + for x in imdb.im_list(): + if x[0]: + self.wfile.write(("<tr class=\"mine\"><td>%s</td><td>"\ + "<a href='/im/send/%s/%s/%d'>%s</a></td>"\ + "<td>%s</td></tr>" % x[1:]).encode('utf-8')) + else: + self.wfile.write(("<tr class=\"other\"><td>%s</td><td>"\ + "<a href='/im/send/%s/%s/%d'>%s</a></td>"\ + "<td>%s</td></tr>" % x[1:]).encode('utf-8')) + self.wfile.write("""\ + </table> + </div> +</div> +""") + self.wfile.write(self.html_footer) + def keys(self,args): + keydb = self.server.glob.keydb + self.send_common("keys", "Key") + self.wfile.write("""\ +<div class='section'> + <h2 class='section_title'>キー情報</h2> + <div class='section_body'> + <p>キー数 %d</p> + <table> + <tr> + <th>d</th><th>IP</th><th>Port</th><th>URL</th><th>title</th><th>note</th><th>date</th> + <th>size</th><th>hash</th> + </tr> +""" % (len(keydb))) + for x in keydb.key_list(): + self.wfile.write(("""\ +<tr> + <td>%d</td><td>%s</td><td>%d</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td> + <td>%d</td><td>%s</td>""" % x).encode('utf-8')) + self.wfile.write("""\ + </table> + </div> +</div> +""") + self.wfile.write(self.html_footer) + def nodes(self,args): + nodedb = self.server.glob.nodedb + self.send_common("nodes", "Nodes") + self.wfile.write("""\ +<div class='section'> + <h2 class='section_title'>ノード情報</h2> + <div class='section_body'> + <p>ノード数 %d</p> + <table> + <tr><th>d</th><th>Name</th><th>flg</th><th>IP</th><th>Port</th><th>UA</th><th>ID</th></tr> +""" % (len(nodedb))) + for x in nodedb.node_list(): + self.wfile.write( + ("<tr><td>%d</td><td><a href='/im/send/%s/%s/%d'>%s</a></td><td>%s</td>"\ + "<td>%s</td><td>%d</td><td>%s</td><td>%s</td></tr>" % x).encode('utf-8')) + self.wfile.write("""\ + </table> + </div> +</div> +""") + self.wfile.write(self.html_footer) + def status(self,args): + self.send_common("status", "Status Summary") + glob = self.server.glob + prof = glob.prof + if prof.mynode.ip: + ip = "%s (%s)" % (prof.mynode.ip, ip2e(prof.mynode.ip)) + else: + ip = "未取得" + name = prof.mynode.name or "なし" + if prof.mynode.ip: + nodehash = hexlify(glob.prof.mynode.id)+ip2e(glob.prof.mynode.ip)+\ + port2e(glob.prof.mynode.port) + else: nodehash = "IP未取得" + self.wfile.write("""\ +<p class='section'> + <h2 class='section_title'>自ノード情報</h2> + <div class='section_body'> + <table> + <tr><th>ID</th><th>IP</th><th>ポート</th><th>ノード名</th><th>UA</th><th>ハッシュ</th></tr> + <tr><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td><td>%s</td></tr> + </table> + </div> +</p> +""" % (hexlify(prof.mynode.id), ip, prof.mynode.port, name, prof.mynode.ua, nodehash)) + self.wfile.write("""\ +<p class='section'> + <h2 class='section_title'>概要</h2> + <div class='section_body'> + <table> + <tr> + <th><a href='/nodes'>ノード数</a></th> + <th><a href='/dats'>dat数</a></th> + <th><a href='/keys'>datキー数</a></th> + <th><a href='/datq'>検索中dat数</a></th></tr> + <tr><td>%d</td><td>%d</td><td>%d</td><td>%d</td></tr> + </table> + </div> +</p> +""" % (len(glob.nodedb), len(glob.datdb), len(glob.keydb), len(glob.datquery))) + self.wfile.write(self.html_footer) + def shutdown(self, args): + if len(args) == 1 and args[0] == "really": + self.send_common("shutdown", "Shutdown") + self.wfile.write("""\ +<p class='section'> + <h2 class='section_title'>シャットダウン</h2> + <div class='section_body'> + <p>opy2onにシャットダウンコマンドを送信しました。</p> + </div> +</p> +""") + self.wfile.write(self.html_footer) + self.server.glob.shutdown.set() + else: + self.send_common("shutdown", "Shutdown") + self.wfile.write("""\ +<p class='section'> + <h2 class='section_title'>シャットダウン</h2> + <div class='section_body'> + <p>opy2onをシャットダウンしますか?</p> + <p><a href="/shutdown/really">はい</a> / <a href="/">いいえ</a></p> + </div> +</p> +""") + self.wfile.write(self.html_footer) + def do_POST(self): + self.do_GET() + def do_GET(self): + m = self.regPath.match(self.path) + if m: + path = m.group(1) + if m.group(2) != "": args = m.group(2).split("/") + else: args = [] + else: + path = self.default + args = [] + if not hasattr(self, path): + self.send_error(404) + return + method = getattr(self, path) + method(args) + # BaseHTTPServer の log を抑制 + def log_message(self, format, *args): + pass Added: trunk/opy2on/lib/o2on_util.py =================================================================== --- trunk/opy2on/lib/o2on_util.py (rev 0) +++ trunk/opy2on/lib/o2on_util.py 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,109 @@ +#!/usr/bin/python + +import random +from struct import pack +import threading +import hashlib +import re +import os.path +import cPickle + +import o2on_config +from o2on_const import DatQueryFile, KeyQueryFile, AppName + +if o2on_config.UseDBus: import dbus + +def randomid(): + return "".join(map(lambda x: pack("B",random.randint(0,0xff)),range(0,20))) + +def datkeyhash(key): + return hashlib.sha1(key.encode('utf_16_le')).digest() +def datfullboard(key): + m=re.compile(r'^([^/]+)/([^/]+)').match(key) + return m.group(1)+":"+m.group(2) + +class Logger: + def __init__(self): + self.lock = threading.RLock() + self.bus = None + def begin(self): self.lock.acquire() + def end(self): self.lock.release() + def log(self, categ, s): + if o2on_config.NoLog: return + categ = categ[:10] + categ += " " * (10-len(categ)) + if isinstance(s, str): pass #s=s.encode('utf-8','replace') + elif isinstance(s, unicode): pass + else: s = str(s) + with self.lock: + for l in map(lambda x: "[%s] %s" % (categ, s), s.split("\n")): + print l + def popup(self, categ, s): + if o2on_config.UseDBus: + if not self.bus: self.bus = dbus.SessionBus() + obj = self.bus.get_object("org.freedesktop.Notifications", + "/org/freedesktop/Notifications") + obj.Notify(AppName, 0, '', AppName+" "+categ,s, [], {}, -1, + dbus_interface="org.freedesktop.Notifications") + else: + self.log(categ, s) + +def hash_xor_bitlength(a,b): + if len(a) != 20 or len(b) != 20: raise Exception + for i in range(len(a)-1,-1,-1): + xored = ord(a[i]) ^ ord(b[i]) + for j in range(7,-1,-1): + if (xored & (1<<j)): + return i*8+j+1 + return 0 + +def xml_collecting_boards(glob): + boards = o2on_config.DatCollectionBoardList or glob.allboards + board_xml = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\r\n" + board_xml += "<boards>\r\n" + for b in boards: board_xml += "<board>%s</board>\r\n" % b + board_xml += "</boards>\r\n" + return board_xml + +class Query: + def __init__(self, filename): + self.lock = threading.Lock() + self.file = filename + with self.lock: + self.list = [] + self.load() + with self.lock: + self.semap = threading.Semaphore(len(self.list)) + def __len__(self): + with self.lock: return len(self.list) + def __str__(self): + with self.lock: + return "\n".join(map(str, self.list)) + def save(self): + pkl_file = open(self.file,"wb") + with self.lock: + cPickle.dump(self.list, pkl_file,-1) + pkl_file.close() + def load(self): + if(os.path.isfile(self.file)): + pkl_file = open(self.file,"rb") + with self.lock: + self.list = cPickle.load(pkl_file) + pkl_file.close() + def add(self,x): + with self.lock: + if not x in self.list: + self.list.append(x) + self.semap.release() + def pop(self): + self.semap.acquire() + with self.lock: + if len(self.list): + return self.list.pop(0) + +class DatQuery(Query): + def __init__(self): + Query.__init__(self, DatQueryFile) +class KeyQuery(Query): + def __init__(self): + Query.__init__(self, KeyQueryFile) Added: trunk/opy2on/o2on_config.py.sample =================================================================== --- trunk/opy2on/o2on_config.py.sample (rev 0) +++ trunk/opy2on/o2on_config.py.sample 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,70 @@ +#!/usr/bin/python +# -*- coding: utf-8 + +# P2Pサーバのポート +P2PPort = 0 + +# プロキシのポート +ProxyPort = 8000 + +# Adminサーバのポート +AdminPort = 9999 + +# ノードをインポートするURL http, ftp, file などが使える +Node_List_URLs = ["http://o2on.jf.land.to/node/node.xml", + "http://o2on.sourceforge.jp/nodes/"] + +# dat保存ディレトクリ +DatDir = "dat" + +# 集めるdatのリスト None にしておくと全て +DatCollectionBoardList = [] +# DatCollectionBoardList = None +# DatCollectionBoardList = ["2ch.net:tech", "2ch.net:unix", "2ch.net:linux"] + +# dat を gzip で保存するかどうか +DatSaveAsGzip = False +# DatSaveAsGzip = True + +# 接続タイムアウト (単位: 秒) +SocketTimeout = 20 + +# このノードの名前 今のところ日本語(ascii以外)はうまく動かない。 +NodeName = "" + +# 持ってない dat がプロキシにリクエストされたら取得しておくかどうか +RequestNonExistDat = False +# RequestNonExistDat = True + +# 2channel.brd のパス +Path2channel_brd = "2channel.brd" + +# ping 後にふたたびpingを行なう時間 (単位: 秒) +RePingSec = 300 + +# メッセージの通知にD-Busを使うかどうか +# True にしておくといくつかのメッセージがpopupされる +UseDBus = False +# UseDBus = True + +# ログを標準出力に出力するかどうか +NoLog = False +# NoLog = True + +############################ +# 以下は基本的にデバッグ用 # +############################ + +# pingでローカルなIPアドレスを報告されても無視する +IgnoreLocalIP = True + +# プロファイルを取得する。どこで時間をとっているのかを計測します。 +RecordProfile = False + +# プロファイルの出力先 +ProfileDir = "profile" + +# エラーが起きた時に error-<時間>.txt にエラーメッセージを出力するかど +# うか。 +OutputErrorFile = False +# OutputError = True Added: trunk/opy2on/opy2on.py =================================================================== --- trunk/opy2on/opy2on.py (rev 0) +++ trunk/opy2on/opy2on.py 2009-08-07 19:13:53 UTC (rev 142) @@ -0,0 +1,200 @@ +#!/usr/bin/python +# -*- coding: utf-8 + +from binascii import hexlify +import socket +import sys +import os.path +import os +import re +import traceback +import time +import threading + +sys.path.append("lib") + +import o2on_profile +import o2on_node +import o2on_util +import o2on_config +import o2on_server +import o2on_dat +import o2on_const +import o2on_key +import o2on_job +import o2on_im + +def showstat(args): + glob.logger.begin() + glob.logger.log("GLOBAL", "nodes %d" % len(glob.nodedb)) + glob.logger.log("GLOBAL", "datquery %d" % len(glob.datquery)) + glob.logger.log("GLOBAL", "dat %d" % len(glob.datdb)) + glob.logger.log("GLOBAL", "key %d" % len(glob.keydb)) + glob.logger.end() + +def showmynode(args): + if glob.prof.mynode.ip: + glob.logger.log("GLOBAL", + "my node is %s%s%s" % (hexlify(glob.prof.mynode.id), + o2on_node.ip2e(glob.prof.mynode.ip), + o2on_node.port2e(glob.prof.mynode.port))) + else: + glob.logger.log("GLOBAL", "Didn't get global IP") + +def read_2channel_brd(): + res = [] + regBoard = re.compile(r'^\s+[^.]+\.'+o2on_const.regHosts+r'\s+([a-z0-9]+)\s') + if os.path.isfile(o2on_config.Path2channel_brd): + f = open(o2on_config.Path2channel_brd) + while True: + line = f.readline() + if line == '':break + m = regBoard.match(line) + if m: + res.append(m.group(1)+":"+m.group(2)) + f.close() + return res + +def show_myid(x): + glob.logger.log("GLOBAL", "my ID is %s" % hexlify(glob.prof.mynode.id)) + +def show_mypubkey(x): + glob.logger.log("GLOBAL", "my pubkey is %s" % hexlify(glob.prof.mynode.pubkey)) + +def exportnode(x): + glob.nodedb.exportnode() + +def showdatquery(x): + glob.logger.begin() + glob.logger.log("GLOBAL", "-"*80) + for x in str(glob.datquery).split("\n"): + glob.logger.log("GLOBAL", x) + glob.logger.log("GLOBAL", "-"*80) + glob.logger.end() + +def showhelp(x): + glob.logger.begin() + glob.logger.log("GLOBAL", "datq: show searching dat") + glob.logger.log("GLOBAL", "exit: finish program") + glob.logger.log("GLOBAL", "exportnode: export nodes to 'exportnodes.xml'") + glob.logger.log("GLOBAL", "help: show this help") + glob.logger.log("GLOBAL", "keys: show keys") + glob.logger.log("GLOBAL", "myid: show my ID") + glob.logger.log("GLOBAL", "mynode: show my node info") + glob.logger.log("GLOBAL", "mypubkey: show my RSA public key") + glob.logger.log("GLOBAL", "nodes: show nodes") + glob.logger.log("GLOBAL", + "stat: show the numbers of nodes, searching dats, owning dats, keys") + glob.logger.end() + +def readcommand(glob): + try: + regexit = re.compile(r'^exit\s*$') + while True: + foo = sys.stdin.readline() + if regexit.match(foo): break + if foo == '': break + args = re.compile("\s+").split(foo) + procd = False + for c in commands.keys(): + if c == args[0]: + commands[c](args[1:]) + procd = True + break + if not procd: glob.logger.log("GLOBAL", "No such command: %s" % args[0]) + except Exception,inst: + if o2on_config.OutputErrorFile: + f = open('error-'+str(int(time.time()))+'.txt', 'w') + traceback.print_exc(file=f) + f.close() + self.glob.logger.popup("ERROR", str(inst)) + glob.shutdown.set() + +class dammy: pass + +#socket.setdefaulttimeout(o2on_config.SocketTimeout) + +if not os.path.exists(o2on_const.DBDir): + os.makedirs(o2on_const.DBDir) + +glob = dammy() +glob.logger = o2on_util.Logger() +glob.prof = o2on_profile.Profile(glob.logger) + +glob.nodedb = o2on_node.NodeDB(glob) +glob.datdb = o2on_dat.DatDB(glob) +glob.keydb = o2on_key.KeyDB(glob) +glob.imdb = o2on_im.IMDB(glob) + +glob.datquery = o2on_util.DatQuery() +glob.keyquery = o2on_util.KeyQuery() +glob.allboards = read_2channel_brd() + +glob.shutdown = threading.Event() + +o2on_node.build_common_header(glob.prof) +o2on_server.build_common_header(glob.prof) + +th = threading.Thread(target=readcommand, args=(glob,)) +th.setDaemon(True) +th.start() + +show_myid(None) +show_mypubkey(None) + +jobs = ( + o2on_job.GetIPThread(glob), + o2on_job.ProxyServerThread(glob), + o2on_job.AdminServerThread(glob), + o2on_job.NodeCollectorThread(glob), + o2on_job.DatCollectorThread(glob), + o2on_job.AskNodeCollectionThread(glob), + o2on_job.PublishOrigThread(glob), + o2on_job.PublishKeyThread(glob), + o2on_job.SearchThread(glob), + o2on_job.DatQueryThread(glob), + o2on_job.P2PServerThread(glob), + ) + +for j in jobs: j.start() + +commands = { + "datq": showdatquery, + 'exportnode': exportnode, + "help": showhelp, + "keys": (lambda x: glob.keydb.show()), + "myid": show_myid, + "mynode" : showmynode, + "mypubkey": show_mypubkey, + "nodes": (lambda x: glob.nodedb.show()), + "stat": showstat, +} + +try: + glob.shutdown.wait() +except KeyboardInterrupt: + pass +except Exception,inst: + if o2on_config.OutputErrorFile: + f = open('error-'+str(int(time.time()))+'.txt', 'w') + traceback.print_exc(file=f) + f.close() + glob.logger.popup("ERROR", str(inst)) + +glob.logger.log("GLOBAL", "Finish Jobs") +for j in jobs: j.stop() +glob.logger.popup("GLOBAL", "Waiting for Jobs to stop") +n = len(jobs) +c = 0 +for j in jobs: + j.join(1) + while j.isAlive(): + glob.logger.popup("GLOBAL", "Waiting for %s" % j.name) + j.join(7) + c += 1 + glob.logger.log("GLOBAL", "Finished %d/%d" % (c, n)) +glob.imdb.save() +glob.keydb.save() +glob.datdb.save() +glob.nodedb.save() +glob.logger.popup("GLOBAL", "Finished Completely") Property changes on: trunk/opy2on/opy2on.py ___________________________________________________________________ Added: svn:executable + *