allura
修訂 | 4a3e4835d96aeb1f2fd8d8478f1e4ec3b6db8baf (tree) |
---|---|
時間 | 2010-04-24 06:01:55 |
作者 | Rick Copeland <rcopeland@geek...> |
Commiter | Rick Copeland |
[#266] - Make reactor processes auto-restart on unhandled exceptions
This commit includes
@@ -10,6 +10,25 @@ from carrot.messaging import Consumer, ConsumerSet | ||
10 | 10 | |
11 | 11 | from . import base |
12 | 12 | |
13 | +class RestartableProcess(object): | |
14 | + | |
15 | + def __init__(self, log, *args, **kwargs): | |
16 | + self._log = log | |
17 | + self._args, self._kwargs = args, kwargs | |
18 | + self.reinit() | |
19 | + | |
20 | + def reinit(self): | |
21 | + self._process = Process(*self._args, **self._kwargs) | |
22 | + | |
23 | + def check(self): | |
24 | + if not self.is_alive(): | |
25 | + self._log.error('Process %d has died, restarting', self.pid) | |
26 | + self.reinit() | |
27 | + self.start() | |
28 | + | |
29 | + def __getattr__(self, name): | |
30 | + return getattr(self._process, name) | |
31 | + | |
13 | 32 | class ReactorSetupCommand(base.Command): |
14 | 33 | |
15 | 34 | summary = 'Configure the RabbitMQ queues and bindings for the given set of plugins' |
@@ -63,14 +82,14 @@ class ReactorCommand(base.Command): | ||
63 | 82 | |
64 | 83 | def command(self): |
65 | 84 | self.basic_setup() |
66 | - processes = [ Process(target=self.periodic_main, args=()) ] | |
85 | + processes = [ RestartableProcess(target=self.periodic_main, args=()) ] | |
67 | 86 | configs = [ |
68 | 87 | dict(plugin_name=name, |
69 | 88 | method=method, xn=xn, qn=qn, keys=keys) |
70 | 89 | for name, plugin in self.plugins |
71 | 90 | for method, xn, qn, keys in plugin_consumers(name, plugin) ] |
72 | 91 | for x in xrange(self.options.proc): |
73 | - processes.append(Process(target=self.multi_worker_main, | |
92 | + processes.append(RestartableProcess(target=self.multi_worker_main, | |
74 | 93 | args=(configs,))) |
75 | 94 | continue |
76 | 95 | if self.options.dry_run: return configs |
@@ -82,7 +101,9 @@ class ReactorCommand(base.Command): | ||
82 | 101 | for p in processes: |
83 | 102 | p.start() |
84 | 103 | while True: |
85 | - time.sleep(300) | |
104 | + for x in xrange(60): | |
105 | + time.sleep(5) | |
106 | + for p in processes: p.check() | |
86 | 107 | base.log.info('=== Mark ===') |
87 | 108 | |
88 | 109 | def multi_worker_main(self, configs): |
@@ -98,6 +119,7 @@ class ReactorCommand(base.Command): | ||
98 | 119 | cset.add_consumer(c) |
99 | 120 | if self.options.dry_run: return |
100 | 121 | else: # pragma no cover |
122 | + base.log.info('Ready to handle messages') | |
101 | 123 | for x in cset.iterconsume(): |
102 | 124 | pass |
103 | 125 |
@@ -252,5 +274,3 @@ def debug(): # pragma no cover | ||
252 | 274 | p.setup(sys._getframe(), None) |
253 | 275 | p.cmdloop() |
254 | 276 | p.forget() |
255 | - | |
256 | - |