[Checkins] SVN: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py Use multiprocessing to run replay clients in separate processes to
Jim Fulton
jim at zope.com
Tue Jan 12 10:03:19 EST 2010
Log message for revision 108062:
Use multiprocessing to run replay clients in separate processes to
overcome the GIL.
Changed:
U zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py
-=-
Modified: zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py
===================================================================
--- zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py 2010-01-12 14:15:00 UTC (rev 108061)
+++ zc.zeoinputlog/branches/replay/src/zc/zeoinputlog/replay.py 2010-01-12 15:03:19 UTC (rev 108062)
@@ -50,6 +50,7 @@
import cPickle
import logging
import marshal
+import multiprocessing
import os
import sys
import threading
@@ -65,6 +66,8 @@
import ZODB.TimeStamp
import ZODB.utils
+sys.setcheckinterval(999)
+
logging.basicConfig()
def time_stamp(timetime):
@@ -270,23 +273,28 @@
closed = 0
queueing = True
- def __init__(self, addr, session=None, handlers=None):
- if handlers is None:
- handlers = Handlers()
- self.handlers = handlers
+ def __init__(self, addr, session, inq, outq):
self.session = session
self.addr = addr
self.event = threading.Event()
self.queue = []
self.messages = {}
- self.times = handlers.times
- self.active = handlers.active
- self.errtimes = handlers.errtimes
+ def output(op, *args):
+ outq.put((op, args))
+ self.output = output
+ self.outq = outq
self.lock = threading.Lock()
- zc.ngi.async.connect(addr, self)
+ self.ngi = zc.ngi.async.SelectImplementation()
+ self.ngi.connect(addr, self)
+ self.event.wait()
+ while 1:
+ callargs = inq.get()
+ if callargs == 'stop':
+ break
+ self.call(*callargs)
def connected(self, connection):
- self.handlers.connected += 1
+ self.output('connect')
self.protocol = None
self.connection = zc.ngi.adapters.Sized(connection)
self.connection.setHandler(self)
@@ -303,12 +311,11 @@
messages = sorted((v[2], v[0], v[1]) for v in self.messages.values())
print time.ctime(), self.session, [v[:2] for v in messages]
redo = [(0, v[1], v[2]) for v in messages
- if v[1] in ('sendBlob', 'loadEx')]
+ if v[1] in ('sendBlob', 'loadEx', 'loadBefore')]
self.queue.extend(redo)
- self.handlers.abandoned += len(messages) - len(redo)
+ self.output('disconnect', len(messages) - len(redo))
self.messages.clear()
- zc.ngi.async.connect(self.addr, self)
- self.handlers.connected -= 1
+ self.ngi.connect(self.addr, self)
def stop_queueing(self):
assert self.queueing
@@ -325,8 +332,6 @@
self.event.set()
return
- self.handlers.inputs += 1
-
try:
msgid, flags, op, args = cPickle.loads(message)
except:
@@ -334,27 +339,11 @@
traceback.print_exception(*sys.exc_info())
return
- #print ' got', self.session, msgid, flags, op
if (op == '.reply'):
ret = args
op, args, start = self.messages.pop(msgid)
elapsed = time.time()-start
- #print ' reply', op, [
- # (v[0], v[2]) for v in self.messages.values()
- # ], elapsed
- self.handlers.replies += 1
- self.active.remove(self.session)
- if (isinstance(ret, tuple)
- and len(ret) == 2
- and isinstance(ret[1], Exception)
- ):
- n, t = self.errtimes.get(op, zz)
- self.errtimes[op] = n+1, t+elapsed
- print ' OOPS', op, args, elapsed, ret[0].__name__, ret[1]
- self.handlers.errors += 1
- else:
- n, t = self.times.get(op, zz)
- self.times[op] = n+1, t+elapsed
+ self.output('reply', op, args, ret, elapsed)
self.stop_queueing()
@@ -376,24 +365,72 @@
if not async:
#print ' prev out', self.session, [
# (v[0], v[2]) for v in self.messages.values()]
+ self.queueing = True
self.messages[self.msgid] = op, args, time.time()
- self.queueing = True
- self.active.add(self.session)
self.connection.write(cPickle.dumps((self.msgid, async, op, args)))
+ if not async:
+ self.output('request', op, args)
+
zz = 0, 0
class Handlers:
- connected = async = calls = inputs = replies = errors = pending = 0
- abandoned = 0
+ async = abandoned = 0
- def __init__(self):
+ def __init__(self, disconnected):
self.errtimes = {}
self.times = {}
- self.active = set()
+ self.disconnected = disconnected
+ self.connected = self.active = self.calls = self.replies = 0
+ self.errors = 0
+ self.event = threading.Event()
+ def __repr__(self):
+ return ("%(connected)s %(disconnected)s %(active)s"
+ " %(calls)s %(replies)s %(errors)s"
+ % self.__dict__)
+
+ def run(self, queue):
+ while 1:
+ got = queue.get()
+ op, args = got
+ getattr(self, op)(*args)
+
+ def connect(self):
+ self.disconnected -= 1
+ if not self.disconnected:
+ self.event.set()
+ self.connected += 1
+
+ def disconnect(self, abandoned):
+ self.disconnected += 1
+ self.connected -= 1
+ self.abandoned += abandoned
+ self.active -= abandoned
+
+ def request(self, op, args):
+ self.active += 1
+ self.calls += 1
+
+ def reply(self, op, args, ret, elapsed):
+ self.active -= 1
+ self.replies += 1
+ if (isinstance(ret, tuple)
+ and len(ret) == 2
+ and isinstance(ret[1], Exception)
+ ):
+ n, t = self.errtimes.get(op, zz)
+ self.errtimes[op] = n+1, t+elapsed
+ print ' OOPS', op, args, elapsed, ret[0].__name__, ret[1]
+ self.errors += 1
+ else:
+ n, t = self.times.get(op, zz)
+ self.times[op] = n+1, t+elapsed
+
+
+
def parse_addr(addr):
addr = addr.split(':')
return addr[0], int(addr[1])
@@ -439,22 +476,29 @@
log = Log(log)
- handlers = Handlers()
-
# Set up the client connections
sessions = {}
nhandlers = 0
+ handlers_queue = multiprocessing.Queue()
for session, timetime, msgid, async, op, args in log:
if session not in sessions:
- handler = Handler(addr, nhandlers, handlers)
- sessions[session] = handler
+ handler_queue = multiprocessing.Queue()
+ process = multiprocessing.Process(
+ target = Handler,
+ args = (addr, nhandlers, handler_queue, handlers_queue),
+ ).start()
+ sessions[session] = handler_queue
nhandlers += 1
- for handler in sessions.values():
- handler.event.wait(10)
- if not handler.event.is_set():
- raise ValueError("Couldn't connect.")
+ handlers = Handlers(len(sessions))
+ thread = threading.Thread(target=handlers.run, args=(handlers_queue, ))
+ thread.setDaemon(True)
+ thread.start()
+ handlers.event.wait(10)
+ if not handlers.event.is_set():
+ raise ValueError("Couldn't connect.", handlers)
+
# Now, we're ready to replay.
cs = ZEO.ClientStorage.ClientStorage(addr)
@@ -469,10 +513,9 @@
for t in Transactions(source):
sys.stdout.flush()
- pending = handlers.calls - handlers.replies - handlers.abandoned
- while pending > 10000:
+ pending = handlers.active - handlers.abandoned
+ while handlers.active > 10000:
time.sleep(.01)
- pending = handlers.calls - handlers.replies - handlers.abandoned
if nt and (nt%1000 == 0):
last_times = print_times(last_times, handlers.times,
@@ -490,21 +533,17 @@
lasttt = tt
work = nt + nr + handlers.calls + handlers.async
print nt, time.strftime('%H:%M:%S', time.localtime(time.time())),
- print ZODB.TimeStamp.TimeStamp(time_stamp(tt)),
- print handlers.connected, len(handlers.active), handlers.calls,
- print handlers.replies,
- print handlers.errors, handlers.async, pending, speed, speed1
+ print ZODB.TimeStamp.TimeStamp(time_stamp(tt)), handlers,
+ print speed, speed1
while logrecord[1] < tt:
ni += 1
session, _, _, async, op, args = logrecord
logrecord = logiter.next()
if op in ('getAuthProtocol', 'register', 'tpc_finish'):
continue
- sessions[session].call(async, op, args)
+ sessions[session].put((async, op, args))
if async:
handlers.async += 1
- else:
- handlers.calls += 1
#print op, args
if op == 'vote':
handlers.async += 1
More information about the checkins
mailing list