[Zope3-checkins] CVS: ZODB4/src/zodb/storage - bdbfull.py:1.12.4.2
Barry Warsaw
barry@wooz.org
Mon, 10 Mar 2003 14:36:25 -0500
Update of /cvs-repository/ZODB4/src/zodb/storage
In directory cvs.zope.org:/tmp/cvs-serv14724
Modified Files:
Tag: opaque-pickles-branch
bdbfull.py
Log Message:
More implementation efforts toward fragrance-free pickles.
=== ZODB4/src/zodb/storage/bdbfull.py 1.12.4.1 => 1.12.4.2 ===
--- ZODB4/src/zodb/storage/bdbfull.py:1.12.4.1 Mon Feb 10 18:05:44 2003
+++ ZODB4/src/zodb/storage/bdbfull.py Mon Mar 10 14:36:23 2003
@@ -28,7 +28,8 @@
from zodb.conflict import ConflictResolvingStorage, ResolvedSerial
from zodb.interfaces import ITransactionAttrs
from zodb.storage.interfaces import StorageSystemError
-from zodb.storage.base import db, BerkeleyBase, PackStop, _WorkThread
+from zodb.storage.base import db, BerkeleyBase, PackStop, _WorkThread, \
+ splitrefs
from zodb.storage._helper import incr
ABORT = 'A'
@@ -270,8 +271,10 @@
flag = self._pending.get(tid)
assert flag in (ABORT, COMMIT)
if flag == ABORT:
+ self.log('aborting pending transaction %r', tid)
self._withtxn(self._doabort, tid)
else:
+ self.log('recovering pending transaction %r', tid)
self._withtxn(self._docommit, tid)
# Initialize our cache of the next available version id.
c = self._versions.cursor()
@@ -572,7 +575,7 @@
finally:
self._lock_release()
- def _dorestore(self, txn, oid, serial, data, version, prev_txn):
+ def _dorestore(self, txn, oid, serial, data, version, prev_txn, refs):
tid = self._serial
vid = nvrevid = ovid = ZERO
prevrevid = prev_txn
@@ -633,6 +636,8 @@
self._serials.put(oid, serial, txn=txn)
# Update the rest of the tables
self._metadata.put(revid, vid+nvrevid+lrevid+prevrevid, txn=txn)
+ if refs:
+ self._referents.put(revid, EMPTYSTRING.join(refs), txn=txn)
self._txnoids.put(tid, oid, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
if vid <> ZERO:
@@ -642,7 +647,7 @@
if not version or ovid <> ZERO:
self._objrevs.put(tid+oid, prevrevid, txn=txn)
- def restore(self, oid, serial, data, version, prev_txn, transaction):
+ def restore(self, oid, serial, data, version, prev_txn, refs, transaction):
# A lot like store() but without all the consistency checks. This
# should only be used when we /know/ the data is good, hence the
# method name. While the signature looks like store() there are some
@@ -670,7 +675,7 @@
self._lock_acquire()
try:
self._withtxn(
- self._dorestore, oid, serial, data, version, prev_txn)
+ self._dorestore, oid, serial, data, version, prev_txn, refs)
finally:
self._lock_release()
@@ -1017,6 +1022,7 @@
return serial, tid
def _loadSerialEx(self, oid, serial):
+ revid = oid+serial
# Just like loadSerial, except that it returns the pickle data, the
# version this object revision is living in, and a backpointer. The
# backpointer is None if the lrevid for this metadata record is the
@@ -1026,7 +1032,7 @@
try:
# Get the pointer to the pickle for the given serial number. Let
# KeyErrors percolate up.
- metadata = self._metadata[oid+serial]
+ metadata = self._metadata[revid]
vid, ign, lrevid = unpack('>8s8s8s', metadata[:24])
if vid == ZERO:
version = ''
@@ -1035,14 +1041,20 @@
# Check for a zombification event, possible with transactional
# undo. Use data==None to specify that.
if lrevid == DNE:
- return None, version, None
+ return None, version, None, None
backpointer = None
if lrevid <> serial:
# This transaction shares its pickle data with a previous
# transaction. We need to let the caller know, esp. when it's
# the iterator code, so that it can pass this information on.
backpointer = lrevid
- return self._pickles[oid+lrevid], version, backpointer
+ # Also return the list of oids referred to by this object
+ refs = self._referents.get(revid)
+ if refs is None:
+ refs = []
+ else:
+ refs = splitrefs(refs)
+ return self._pickles[oid+lrevid], version, backpointer, refs
finally:
self._lock_release()
@@ -1595,7 +1607,7 @@
# object revision
referents = self._referents.get(oid+lrevid)
if referents:
- for oid in self._splitoids(referents):
+ for oid in splitrefs(referents):
self._oidqueue.append(oid, txn)
# Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume(txn)
@@ -1676,27 +1688,6 @@
c.close()
self._lock_release()
- def _alltxnoids(self, tid):
- self._lock_acquire()
- c = self._txnoids.cursor()
- try:
- oids = []
- oidkeys = {}
- try:
- rec = c.set(tid)
- except db.DBNotFoundError:
- rec = None
- while rec:
- # Ignore the key
- oid = rec[1]
- if not oidkeys.has_key(oid):
- oids.append(oid)
- oidkeys[oid] = 1
- rec = c.next_dup()
- return oids
- finally:
- c.close()
- self._lock_release()
class _GetItemBase:
@@ -1712,12 +1703,16 @@
Transactions *must* be accessed sequentially (e.g. with a for loop).
"""
+
+ __implements__ = IStorageIterator
+
def __init__(self, storage, start, stop):
self._storage = storage
self._tid = start
self._stop = stop
self._closed = False
self._first = True
+ self._iters = []
def next(self):
"""Return the ith item in the sequence of transaction data.
@@ -1736,9 +1731,13 @@
if self._stop is not None and tid > self._stop:
raise IndexError
self._tid = tid
- return _RecordsIterator(self._storage, tid, packedp, user, desc, ext)
+ it = _RecordsIterator(self._storage, tid, packedp, user, desc, ext)
+ self._iters.append(it)
+ return it
def close(self):
+ for it in self._iters:
+ it.close()
self._closed = True
@@ -1750,7 +1749,7 @@
Items *must* be accessed sequentially (e.g. with a for loop).
"""
- __implements__ = ITransactionAttrs
+ __implements__ = ITransactionAttrs, ITransactionRecordIterator
# Transaction id as an 8-byte timestamp string
tid = None
@@ -1782,10 +1781,10 @@
self._extension = pickle.loads(ext)
except EOFError:
self._extension = {}
- # Internal pointer
- self._oids = self._storage._alltxnoids(self.tid)
- # To make .pop() more efficient
- self._oids.reverse()
+ # BAW: touching the storage's private parts!
+ self._table = self._storage._txnoids
+ self._cursor = None
+ self._rec = None
def next(self):
"""Return the ith item in the sequence of record data.
@@ -1794,14 +1793,45 @@
IndexError will be raised after all of the items have been
returned.
"""
- # Let IndexError percolate up
- oid = self._oids.pop()
- data, version, lrevid = self._storage._loadSerialEx(oid, self.tid)
- return _Record(oid, self.tid, version, data, lrevid)
+ # Initialize a txnoids cursor and set it to the start of the oids
+ # touched by this transaction. We do this here to ensure the cursor
+ # is closed if there are any problems. A hole in this approach is if
+ # the client never exhausts the iterator. Then I think we have a
+ # problem because I don't think the environment can be closed if
+ # there's an open cursor, but you also cannot close the cursor if the
+ # environment is already closed (core dumps), so a __del__ doesn't
+ # help a whole lot.
+ try:
+ if self._cursor is None:
+ self._cursor = self._table.cursor()
+ try:
+ self._rec = self._cursor.set(self.tid)
+ except db.DBNotFoundError:
+ pass
+ # Cursor exhausted?
+ if self._rec is None:
+ self.close()
+ raise IndexError
+ oid = self._rec[1]
+ self._rec = self._cursor.next_dup()
+ data, version, lrevid, refs = self._storage._loadSerialEx(
+ oid, self.tid)
+ return _Record(oid, self.tid, version, data, lrevid, refs)
+ except:
+ self.close()
+ raise
+
+ def close(self):
+ if self._cursor:
+ self._cursor.close()
+ self._cursor = None
class _Record:
+
+ __implements__ = IDataRecord
+
# Object Id
oid = None
# Object serial number (i.e. revision id)
@@ -1812,13 +1842,16 @@
data = None
# The pointer to the transaction containing the pickle data, if not None
data_txn = None
+ # The list of oids of objects referred to by this object
+ refs = []
- def __init__(self, oid, serial, version, data, data_txn):
+ def __init__(self, oid, serial, version, data, data_txn, refs):
self.oid = oid
self.serial = serial
self.version = version
self.data = data
self.data_txn = data_txn
+ self.refs = refs