[ZODB-Dev] ZEO, FileStorage, and Undo
Jeremy Hylton
jeremy@alum.mit.edu
Thu, 22 Aug 2002 12:13:02 -0400
Here's a patch.
Jeremy
Index: ZEO/StorageServer.py
===================================================================
RCS file: /cvs-repository/ZODB3/ZEO/StorageServer.py,v
retrieving revision 1.44
diff -c -r1.44 StorageServer.py
*** ZEO/StorageServer.py 12 Aug 2002 19:42:40 -0000 1.44
--- ZEO/StorageServer.py 22 Aug 2002 16:14:06 -0000
***************
*** 281,286 ****
--- 281,302 ----
n = 1
return [self.__storage.new_oid() for i in range(n)]
+ # Depending on the search criteria, undoLog() and undoInfo() can
+ # spend a very long time searching through the storage for
+ # undoable transactions. To avoid locking up the server, run this
+ # in a separate thread just like pack().
+
+ def undoInfo(self, first, last, spec):
+ t = SlowMethodThread(self.__storage.undoInfo, first, last, spec)
+ t.start()
+ return t.delay
+
+ def undoLog(self, first, last):
+ delay = MTDelay()
+ t = SlowMethodThread(self.__storage.UndoLog, first, last)
+ t.start()
+ return t.delay
+
def undo(self, transaction_id):
oids = self.__storage.undo(transaction_id)
if oids:
***************
*** 582,584 ****
--- 598,624 ----
new_strategy.store(oid, serial, data, version)
meth = getattr(new_strategy, self.name)
return meth(*self.args)
+
+ class SlowMethodThread(threading. Thread):
+ """Thread to run potentially slow storage methods.
+
+ Clients can use the t.delay attribute to access the MTDelay object
+ used to send a zrpc response at the right time.
+ """
+
+ # Some storage methods can take a long time to complete. If we
+ # run these methods via a standard asyncore read handler, they
+ # will block all other server activity until they complete. To
+ # avoid blocking, we spawn a separate thread, return an MTDelay()
+ # object, and have the thread reply() when it finishes.
+
+ def __init__(self, method, *args):
+ threading.Thread.__init__(self)
+ self._method = method
+ self._args = args
+ self.delay = MTDelay()
+
+ def run(self):
+ # XXX What happens if this raises an exception?
+ result = self._method(*self._args)
+ self._delay.reply(result)
Index: ZODB/FileStorage.py
===================================================================
RCS file: /cvs-repository/ZODB3/ZODB/FileStorage.py,v
retrieving revision 1.95
diff -c -r1.95 FileStorage.py
*** ZODB/FileStorage.py 14 Aug 2002 22:07:09 -0000 1.95
--- ZODB/FileStorage.py 22 Aug 2002 16:14:06 -0000
***************
*** 1070,1117 ****
def undoLog(self, first=0, last=-20, filter=None):
if last < 0:
last = first - last + 1
! self._lock_acquire()
! try:
! if self._packt is None:
! raise UndoError(
! 'Undo is currently disabled for database maintenance.<p>')
! pos = self._pos
! r = []
! i = 0
! # BAW: Why 39 please? This makes no sense (see also below).
! while i < last and pos > 39:
! self._file.seek(pos - 8)
! pos = pos - U64(self._file.read(8)) - 8
! self._file.seek(pos)
! h = self._file.read(TRANS_HDR_LEN)
! tid, tl, status, ul, dl, el = struct.unpack(">8s8scHHH", h)
! if tid < self._packt or status == 'p':
break
! if status != ' ':
! continue
! d = u = ''
! if ul:
! u = self._file.read(ul)
! if dl:
! d = self._file.read(dl)
! e = {}
! if el:
! try:
! e = loads(read(el))
! except:
! pass
! d = {'id': base64.encodestring(tid).rstrip(),
! 'time': TimeStamp(tid).timeTime(),
! 'user_name': u,
! 'description': d}
! d.update(e)
! if filter is None or filter(d):
! if i >= first:
! r.append(d)
! i += 1
! return r
! finally:
! self._lock_release()
def transactionalUndo(self, transaction_id, transaction):
"""Undo a transaction, given by transaction_id.
--- 1070,1124 ----
def undoLog(self, first=0, last=-20, filter=None):
if last < 0:
last = first - last + 1
! if self._packt is None:
! raise UndoError(
! 'Undo is currently disabled for database maintenance.<p>')
! pos = self._pos
! r = []
! i = 0
! # BAW: Why 39 please? This makes no sense (see also below).
! while i < last and pos > 39:
! self._lock_acquire()
! try:
! i, pos = self.undoFindNext(pos, r, first, i, filter)
! if i is None:
break
! finally:
! self._lock_release()
! return r
!
! def undoFindNext(self, pos, result, first, i, filter):
! self._file.seek(pos - 8)
! pos = pos - U64(self._file.read(8)) - 8
! self._file.seek(pos)
! h = self._file.read(TRANS_HDR_LEN)
! tid, tl, status, ul, dl, el = struct.unpack(">8s8scHHH", h)
! if tid < self._packt or status == 'p':
! return None, None
! if status != ' ':
! return i, pos
! d = u = ''
! if ul:
! u = self._file.read(ul)
! if dl:
! d = self._file.read(dl)
! e = {}
! if el:
! try:
! e = loads(self._file.read(el))
! except:
! pass
! d = {'id': base64.encodestring(tid).rstrip(),
! 'time': TimeStamp(tid).timeTime(),
! 'user_name': u,
! 'description': d}
! d.update(e)
! if filter is None or filter(d):
! if i >= first:
! result.append(d)
! i += 1
! return i, pos
!
def transactionalUndo(self, transaction_id, transaction):
"""Undo a transaction, given by transaction_id.