[Zope3-checkins] CVS: ZODB4/src/zodb/storage - bdbfull.py:1.12.4.2

Barry Warsaw barry@wooz.org
Mon, 10 Mar 2003 14:36:25 -0500


Update of /cvs-repository/ZODB4/src/zodb/storage
In directory cvs.zope.org:/tmp/cvs-serv14724

Modified Files:
      Tag: opaque-pickles-branch
	bdbfull.py 
Log Message:
More implementation efforts toward fragrance-free pickles.


=== ZODB4/src/zodb/storage/bdbfull.py 1.12.4.1 => 1.12.4.2 ===
--- ZODB4/src/zodb/storage/bdbfull.py:1.12.4.1	Mon Feb 10 18:05:44 2003
+++ ZODB4/src/zodb/storage/bdbfull.py	Mon Mar 10 14:36:23 2003
@@ -28,7 +28,8 @@
 from zodb.conflict import ConflictResolvingStorage, ResolvedSerial
 from zodb.interfaces import ITransactionAttrs
 from zodb.storage.interfaces import StorageSystemError
-from zodb.storage.base import db, BerkeleyBase, PackStop, _WorkThread
+from zodb.storage.base import db, BerkeleyBase, PackStop, _WorkThread, \
+     splitrefs
 from zodb.storage._helper import incr
 
 ABORT = 'A'
@@ -270,8 +271,10 @@
             flag = self._pending.get(tid)
             assert flag in (ABORT, COMMIT)
             if flag == ABORT:
+                self.log('aborting pending transaction %r', tid)
                 self._withtxn(self._doabort, tid)
             else:
+                self.log('recovering pending transaction %r', tid)
                 self._withtxn(self._docommit, tid)
         # Initialize our cache of the next available version id.
         c = self._versions.cursor()
@@ -572,7 +575,7 @@
         finally:
             self._lock_release()
 
-    def _dorestore(self, txn, oid, serial, data, version, prev_txn):
+    def _dorestore(self, txn, oid, serial, data, version, prev_txn, refs):
         tid = self._serial
         vid = nvrevid = ovid = ZERO
         prevrevid = prev_txn
@@ -633,6 +636,8 @@
             self._serials.put(oid, serial, txn=txn)
         # Update the rest of the tables
         self._metadata.put(revid, vid+nvrevid+lrevid+prevrevid, txn=txn)
+        if refs:
+            self._referents.put(revid, EMPTYSTRING.join(refs), txn=txn)
         self._txnoids.put(tid, oid, txn=txn)
         self._oids.put(oid, PRESENT, txn=txn)
         if vid <> ZERO:
@@ -642,7 +647,7 @@
         if not version or ovid <> ZERO:
             self._objrevs.put(tid+oid, prevrevid, txn=txn)
 
-    def restore(self, oid, serial, data, version, prev_txn, transaction):
+    def restore(self, oid, serial, data, version, prev_txn, refs, transaction):
         # A lot like store() but without all the consistency checks.  This
         # should only be used when we /know/ the data is good, hence the
         # method name.  While the signature looks like store() there are some
@@ -670,7 +675,7 @@
         self._lock_acquire()
         try:
             self._withtxn(
-                self._dorestore, oid, serial, data, version, prev_txn)
+                self._dorestore, oid, serial, data, version, prev_txn, refs)
         finally:
             self._lock_release()
 
@@ -1017,6 +1022,7 @@
         return serial, tid
 
     def _loadSerialEx(self, oid, serial):
+        revid = oid+serial
         # Just like loadSerial, except that it returns the pickle data, the
         # version this object revision is living in, and a backpointer.  The
         # backpointer is None if the lrevid for this metadata record is the
@@ -1026,7 +1032,7 @@
         try:
             # Get the pointer to the pickle for the given serial number.  Let
             # KeyErrors percolate up.
-            metadata = self._metadata[oid+serial]
+            metadata = self._metadata[revid]
             vid, ign, lrevid = unpack('>8s8s8s', metadata[:24])
             if vid == ZERO:
                 version = ''
@@ -1035,14 +1041,20 @@
             # Check for an zombification event, possible with transactional
             # undo.  Use data==None to specify that.
             if lrevid == DNE:
-                return None, version, None
+                return None, version, None, None
             backpointer = None
             if lrevid <> serial:
                 # This transaction shares its pickle data with a previous
                 # transaction.  We need to let the caller know, esp. when it's
                 # the iterator code, so that it can pass this information on.
                 backpointer = lrevid
-            return self._pickles[oid+lrevid], version, backpointer
+            # Also return the list of oids referred to by this object
+            refs = self._referents.get(revid)
+            if refs is None:
+                refs = []
+            else:
+                refs = splitrefs(refs)
+            return self._pickles[oid+lrevid], version, backpointer, refs
         finally:
             self._lock_release()
 
@@ -1595,7 +1607,7 @@
                     # object revision
                     referents = self._referents.get(oid+lrevid)
                     if referents:
-                        for oid in self._splitoids(referents):
+                        for oid in splitrefs(referents):
                             self._oidqueue.append(oid, txn)
             # Pop the next oid off the queue and do it all again
             rec = self._oidqueue.consume(txn)
@@ -1676,27 +1688,6 @@
                 c.close()
             self._lock_release()
 
-    def _alltxnoids(self, tid):
-        self._lock_acquire()
-        c = self._txnoids.cursor()
-        try:
-            oids = []
-            oidkeys = {}
-            try:
-                rec = c.set(tid)
-            except db.DBNotFoundError:
-                rec = None
-            while rec:
-                # Ignore the key
-                oid = rec[1]
-                if not oidkeys.has_key(oid):
-                    oids.append(oid)
-                    oidkeys[oid] = 1
-                rec = c.next_dup()
-            return oids
-        finally:
-            c.close()
-            self._lock_release()
 
 
 class _GetItemBase:
@@ -1712,12 +1703,16 @@
 
     Transactions *must* be accessed sequentially (e.g. with a for loop).
     """
+
+    __implements__ = IStorageIterator
+
     def __init__(self, storage, start, stop):
         self._storage = storage
         self._tid = start
         self._stop = stop
         self._closed = False
         self._first = True
+        self._iters = []
 
     def next(self):
         """Return the ith item in the sequence of transaction data.
@@ -1736,9 +1731,13 @@
         if self._stop is not None and tid > self._stop:
             raise IndexError
         self._tid = tid
-        return _RecordsIterator(self._storage, tid, packedp, user, desc, ext)
+        it = _RecordsIterator(self._storage, tid, packedp, user, desc, ext)
+        self._iters.append(it)
+        return it
 
     def close(self):
+        for it in self._iters:
+            it.close()
         self._closed = True
 
 
@@ -1750,7 +1749,7 @@
     Items *must* be accessed sequentially (e.g. with a for loop).
     """
 
-    __implements__ = ITransactionAttrs
+    __implements__ = ITransactionAttrs, ITransactionRecordIterator
 
     # Transaction id as an 8-byte timestamp string
     tid = None
@@ -1782,10 +1781,10 @@
             self._extension = pickle.loads(ext)
         except EOFError:
             self._extension = {}
-        # Internal pointer
-        self._oids = self._storage._alltxnoids(self.tid)
-        # To make .pop() more efficient
-        self._oids.reverse()
+        # BAW: touching the storage's private parts!
+        self._table = self._storage._txnoids
+        self._cursor = None
+        self._rec = None
 
     def next(self):
         """Return the ith item in the sequence of record data.
@@ -1794,14 +1793,45 @@
         IndexError will be raised after all of the items have been
         returned.
         """
-        # Let IndexError percolate up
-        oid = self._oids.pop()
-        data, version, lrevid = self._storage._loadSerialEx(oid, self.tid)
-        return _Record(oid, self.tid, version, data, lrevid)
+        # Initialize a txnoids cursor and set it to the start of the oids
+        # touched by this transaction.  We do this here to ensure the cursor
+        # is closed if there are any problems.  A hole in this approach is if
+        # the client never exhausts the iterator.  Then I think we have a
+        # problem because I don't think the environment can be closed if
+        # there's an open cursor, but you also cannot close the cursor if the
+        # environment is already closed (core dumps), so an __del__ doesn't
+        # help a whole lot.
+        try:
+            if self._cursor is None:
+                self._cursor = self._table.cursor()
+                try:
+                    self._rec = self._cursor.set(self.tid)
+                except db.DBNotFoundError:
+                    pass
+            # Cursor exhausted?
+            if self._rec is None:
+                self.close()
+                raise IndexError
+            oid = self._rec[1]
+            self._rec = self._cursor.next_dup()
+            data, version, lrevid, refs = self._storage._loadSerialEx(
+                oid, self.tid)
+            return _Record(oid, self.tid, version, data, lrevid, refs)
+        except:
+            self.close()
+            raise
+
+    def close(self):
+        if self._cursor:
+            self._cursor.close()
+            self._cursor = None
 
 
 
 class _Record:
+
+    __implements__ = IDataRecord
+
     # Object Id
     oid = None
     # Object serial number (i.e. revision id)
@@ -1812,13 +1842,16 @@
     data = None
     # The pointer to the transaction containing the pickle data, if not None
     data_txn = None
+    # The list of oids of objects referred to by this object
+    refs = []
 
-    def __init__(self, oid, serial, version, data, data_txn):
+    def __init__(self, oid, serial, version, data, data_txn, refs):
         self.oid = oid
         self.serial = serial
         self.version = version
         self.data = data
         self.data_txn = data_txn
+        self.refs = refs