[Zodb-checkins] SVN: ZODB/trunk/src/ Merge gocept-iteration branch.

Christian Theune ct at gocept.com
Sat Aug 30 09:21:50 EDT 2008

Log message for revision 90616:
  Merge gocept-iteration branch.

  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/CommitLog.py
  U   ZODB/trunk/src/ZEO/ServerStub.py
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/tests/ConnectionTests.py
  A   ZODB/trunk/src/ZEO/tests/IterationTests.py
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  U   ZODB/trunk/src/ZODB/BaseStorage.py
  U   ZODB/trunk/src/ZODB/DemoStorage.py
  U   ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/trunk/src/ZODB/FileStorage/__init__.py
  U   ZODB/trunk/src/ZODB/MappingStorage.py
  U   ZODB/trunk/src/ZODB/blob.py
  U   ZODB/trunk/src/ZODB/fsrecover.py
  U   ZODB/trunk/src/ZODB/interfaces.py
  U   ZODB/trunk/src/ZODB/tests/IteratorStorage.py
  U   ZODB/trunk/src/ZODB/tests/PackableStorage.py
  U   ZODB/trunk/src/ZODB/tests/RecoveryStorage.py
  U   ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py
  U   ZODB/trunk/src/ZODB/tests/testFileStorage.py
  U   ZODB/trunk/src/ZODB/tests/testMappingStorage.py
  U   ZODB/trunk/src/ZODB/utils.py

Modified: ZODB/trunk/src/CHANGES.txt
--- ZODB/trunk/src/CHANGES.txt	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/CHANGES.txt	2008-08-30 13:21:49 UTC (rev 90616)
@@ -8,6 +8,9 @@
 New Features
+- Cleaned up the storage iteration API and provided an iterator implementation
+  for ZEO.
 - Versions are no-longer supported.
 - ZEO cache files can be larger than 4G. Note that older ZEO cache

Modified: ZODB/trunk/src/ZEO/ClientStorage.py
--- ZODB/trunk/src/ZEO/ClientStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -29,8 +29,9 @@
 import time
 import types
 import logging
+import weakref
-from zope.interface import implements
+import zope.interface
 from ZEO import ServerStub
 from ZEO.cache import ClientCache
 from ZEO.TransactionBuffer import TransactionBuffer
@@ -38,11 +39,12 @@
 from ZEO.auth import get_module
 from ZEO.zrpc.client import ConnectionManager
+import ZODB.interfaces
 import ZODB.lock_file
+import ZODB.BaseStorage
 from ZODB import POSException
 from ZODB import utils
 from ZODB.loglevels import BLATHER
-from ZODB.interfaces import IBlobStorage
 from ZODB.blob import rename_or_copy_blob
 from persistent.TimeStamp import TimeStamp
@@ -103,8 +105,11 @@
-    implements(IBlobStorage)
+    # ClientStorage does not declare any interfaces here. Interfaces are
+    # declared according to the server's storage once a connection is
+    # established.
     # Classes we instantiate.  A subclass might override.
     TransactionBufferClass = TransactionBuffer
     ClientCacheClass = ClientCache
@@ -260,6 +265,9 @@
         self._password = password
         self._realm = realm
+        self._iterators = weakref.WeakValueDictionary()
+        self._iterator_ids = set()
         # Flag tracking disconnections in the middle of a transaction.  This
         # is reset in tpc_begin() and set in notifyDisconnected().
         self._midtxn_disconnect = 0
@@ -270,7 +278,7 @@
         self._verification_invalidations = None
         self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
-                      'supportsUndo':0}
+                      'supportsUndo': 0, 'interfaces': ()}
         self._tbuf = self.TransactionBufferClass()
         self._db = None
@@ -526,6 +534,15 @@
+        # Decorate ClientStorage with all interfaces that the backend storage
+        # supports.
+        remote_interfaces = []
+        for module_name, interface_name in self._info['interfaces']:
+            module = __import__(module_name, globals(), locals(), [interface_name])
+            interface = getattr(module, interface_name)
+            remote_interfaces.append(interface)
+        zope.interface.directlyProvides(self, remote_interfaces)
     def _handle_extensions(self):
         for name in self.getExtensionMethods().keys():
             if not hasattr(self, name):
@@ -633,6 +650,7 @@
         self._server = disconnected_stub
         self._midtxn_disconnect = 1
+        self._iterator_gc()
     def __len__(self):
         """Return the size of the storage."""
@@ -933,14 +951,7 @@
             raise POSException.POSKeyError("No blob file", oid, serial)
         # First, we'll create the directory for this oid, if it doesn't exist. 
-        targetpath = self.fshelper.getPathForOID(oid)
-        if not os.path.exists(targetpath):
-            try:
-                os.makedirs(targetpath, 0700)
-            except OSError:
-                # We might have lost a race.  If so, the directory
-                # must exist now
-                assert os.path.exists(targetpath)
+        self.fshelper.createPathForOID(oid)
         # OK, it's not here and we (or someone) needs to get it.  We
         # want to avoid getting it multiple times.  We want to avoid
@@ -1075,6 +1086,7 @@
             del self._serials[:]
+            self._iterator_gc()
     def tpc_finish(self, txn, f=None):
@@ -1104,6 +1116,7 @@
             assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
+            self._iterator_gc()
     def _update_cache(self, tid):
@@ -1176,6 +1189,25 @@
             return []
         return self._server.undoLog(first, last)
+    # Recovery support
+    def copyTransactionsFrom(self, other, verbose=0):
+        """Copy transactions from another storage.
+        This is typically used for converting data from one storage to
+        another.  `other` must have an .iterator() method.
+        """
+        ZODB.BaseStorage.copy(other, self, verbose)
+    def restore(self, oid, serial, data, version, prev_txn, transaction):
+        """Write data already committed in a separate database."""
+        assert not version
+        self._check_trans(transaction)
+        self._server.restorea(oid, serial, data, prev_txn, id(transaction))
+        # Don't update the transaction buffer, because current data are
+        # unaffected.
+        return self._check_serials()
     # Below are methods invoked by the StorageServer
     def serialnos(self, args):
@@ -1249,3 +1281,103 @@
     invalidate = invalidateVerify
     end = endVerify
     Invalidate = invalidateTrans
+    # IStorageIteration
+    def iterator(self, start=None, stop=None):
+        """Return an IStorageTransactionInformation iterator."""
+        # iids are "iterator IDs" that can be used to query an iterator whose
+        # status is held on the server.
+        iid = self._server.iterator_start(start, stop)
+        return self._setup_iterator(TransactionIterator, iid)
+    def _setup_iterator(self, factory, iid, *args):
+        self._iterators[iid] = iterator = factory(self, iid, *args)
+        self._iterator_ids.add(iid)
+        return iterator
+    def _forget_iterator(self, iid):
+        self._iterators.pop(iid, None)
+        self._iterator_ids.remove(iid)
+    def _iterator_gc(self):
+        iids = self._iterator_ids - set(self._iterators)
+        try:
+            self._server.iterator_gc(list(iids))
+        except ClientDisconnected:
+            # We could not successfully garbage-collect iterators.
+            # The server might have been restarted, so the IIDs might mean
+            # something different now. We simply forget our unused IIDs to
+            # avoid gc'ing foreign iterators.
+            # In the case that the server was not restarted, we accept the
+            # risk of leaking resources on the ZEO server.
+            pass
+        self._iterator_ids -= iids
+class TransactionIterator(object):
+    def __init__(self, storage, iid, *args):
+        self._storage = storage 
+        self._iid = iid
+        self._ended = False
+    def __iter__(self):
+        return self
+    def next(self):
+        if self._ended:
+            raise ZODB.interfaces.StorageStopIteration()
+        tx_data = self._storage._server.iterator_next(self._iid)
+        if tx_data is None:
+            # The iterator is exhausted, and the server has already
+            # disposed of it.
+            self._ended = True
+            self._storage._forget_iterator(self._iid)
+            raise ZODB.interfaces.StorageStopIteration()
+        return ClientStorageTransactionInformation(self._storage, self, *tx_data)
+class ClientStorageTransactionInformation(ZODB.BaseStorage.TransactionRecord):
+    def __init__(self, storage, txiter, tid, status, user, description, extension):
+        self._storage = storage
+        self._txiter = txiter
+        self._completed = False
+        self._riid = None
+        self.tid = tid
+        self.status = status
+        self.user = user
+        self.description = description
+        self.extension = extension
+    def __iter__(self):
+        riid = self._storage._server.iterator_record_start(self._txiter._iid, self.tid)
+        return self._storage._setup_iterator(RecordIterator, riid)
+class RecordIterator(object):
+    def __init__(self, storage, riid):
+        self._riid = riid
+        self._completed = False
+        self._storage = storage
+    def __iter__(self):
+        return self
+    def next(self):
+        if self._completed:
+            # We finished iteration once already and the server can't know
+            # about the iteration anymore.
+            raise ZODB.interfaces.StorageStopIteration()
+        item = self._storage._server.iterator_record_next(self._riid)
+        if item is None:
+            # The iterator is exhausted, and the server has already
+            # disposed of it.
+            self._completed = True
+            raise ZODB.interfaces.StorageStopIteration()
+        return ZODB.BaseStorage.DataRecord(*item)

Modified: ZODB/trunk/src/ZEO/CommitLog.py
--- ZODB/trunk/src/ZEO/CommitLog.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/CommitLog.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -18,10 +18,13 @@
 concurrent commits are achieved by logging actions up until the
 tpc_vote().  At that point, the entire transaction is committed on the
 real storage.
 import cPickle
 import tempfile
 class CommitLog:
     def __init__(self):
@@ -35,9 +38,13 @@
         return self.file.tell()
     def store(self, oid, serial, data):
-        self.pickler.dump((oid, serial, data))
+        self.pickler.dump(('s', oid, serial, data))
         self.stores += 1
+    def restore(self, oid, serial, data, prev_txn):
+        self.pickler.dump(('r', oid, serial, data, prev_txn))
+        self.stores += 1
     def get_loader(self):
         self.read = 1

Modified: ZODB/trunk/src/ZEO/ServerStub.py
--- ZODB/trunk/src/ZEO/ServerStub.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/ServerStub.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -209,6 +209,10 @@
     def storea(self, oid, serial, data, id):
         self.rpc.callAsync('storea', oid, serial, data, '', id)
+    def restorea(self, oid, serial, data, prev_txn, id):
+        self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
     def storeBlob(self, oid, serial, data, blobfilename, txn):
         # Store a blob to the server.  We don't want to real all of
@@ -292,6 +296,22 @@
     def undoInfo(self, first, last, spec):
         return self.rpc.call('undoInfo', first, last, spec)
+    def iterator_start(self, start, stop):
+        return self.rpc.call('iterator_start', start, stop)
+    def iterator_next(self, iid):
+        return self.rpc.call('iterator_next', iid)
+    def iterator_record_start(self, txn_iid, tid):
+        return self.rpc.call('iterator_record_start', txn_iid, tid)
+    def iterator_record_next(self, iid):
+        return self.rpc.call('iterator_record_next', iid)
+    def iterator_gc(self, iids):
+        return self.rpc.call('iterator_gc', iids)
 class ExtensionMethodWrapper:
     def __init__(self, rpc, name):
         self.rpc = rpc

Modified: ZODB/trunk/src/ZEO/StorageServer.py
--- ZODB/trunk/src/ZEO/StorageServer.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -29,12 +29,14 @@
 import threading
 import time
 import warnings
+import itertools
 import transaction
 import ZODB.serialize
 import ZEO.zrpc.error
+import zope.interface
 from ZEO import ClientStub
 from ZEO.CommitLog import CommitLog
 from ZEO.monitor import StorageStats, StatsServer
@@ -53,7 +55,6 @@
 logger = logging.getLogger('ZEO.StorageServer')
 # TODO:  This used to say "ZSS", which is now implied in the logger name.
 # Can this be either set to str(os.getpid()) (if that makes sense) or removed?
 _label = "" # default label used for logging.
@@ -110,6 +111,11 @@
         self._extensions = {}
         for func in self.extensions:
             self._extensions[func.func_name] = None
+        self._iterators = {}
+        self._iterator_ids = itertools.count()
+        # Stores the last item that was handed out for a
+        # transaction iterator.
+        self._txn_iterators_last = {}
     def finish_auth(self, authenticated):
         if not self.auth_realm:
@@ -272,6 +278,12 @@
             supportsUndo = supportsUndo()
+        # Communicate the backend storage interfaces to the client
+        storage_provides = zope.interface.providedBy(storage)
+        interfaces = []
+        for candidate in storage_provides.__iro__:
+            interfaces.append((candidate.__module__, candidate.__name__))
         return {'length': len(storage),
                 'size': storage.getSize(),
                 'name': storage.getName(),
@@ -279,6 +291,7 @@
                 'supportsVersions': False,
                 'extensionMethods': self.getExtensionMethods(),
                 'supports_record_iternext': hasattr(self, 'record_iternext'),
+                'interfaces': tuple(interfaces),
     def get_size_info(self):
@@ -477,6 +490,11 @@
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data)
+    def restorea(self, oid, serial, data, prev_txn, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        self.stats.stores += 1
+        self.txnlog.restore(oid, serial, data, prev_txn)
     def storeBlobStart(self):
         assert self.blob_tempfile is None
         self.blob_tempfile = tempfile.mkstemp(
@@ -544,16 +562,7 @@
                 # Unexpected errors are logged and passed to the client
                 self.log("store error: %s, %s" % sys.exc_info()[:2],
                          logging.ERROR, exc_info=True)
-            # Try to pickle the exception.  If it can't be pickled,
-            # the RPC response would fail, so use something else.
-            pickler = cPickle.Pickler()
-            pickler.fast = 1
-            try:
-                pickler.dump(err, 1)
-            except:
-                msg = "Couldn't pickle storage exception: %s" % repr(err)
-                self.log(msg, logging.ERROR)
-                err = StorageServerError(msg)
+            err = self._marshal_error(err)
             # The exception is reported back as newserial for this oid
             newserial = [(oid, err)]
@@ -575,6 +584,37 @@
         return err is None
+    def _restore(self, oid, serial, data, prev_txn):
+        err = None
+        try:
+            self.storage.restore(oid, serial, data, '', prev_txn, self.transaction)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except Exception, err:
+            self.store_failed = 1
+            if not isinstance(err, TransactionError):
+                # Unexpected errors are logged and passed to the client
+                self.log("store error: %s, %s" % sys.exc_info()[:2],
+                         logging.ERROR, exc_info=True)
+            err = self._marshal_error(err)
+            # The exception is reported back as newserial for this oid
+            self.serials.append((oid, err))
+        return err is None
+    def _marshal_error(self, error):
+        # Try to pickle the exception.  If it can't be pickled,
+        # the RPC response would fail, so use something that can be pickled.
+        pickler = cPickle.Pickler()
+        pickler.fast = 1
+        try:
+            pickler.dump(error, 1)
+        except:
+            msg = "Couldn't pickle storage exception: %s" % repr(error)
+            self.log(msg, logging.ERROR)
+            error = StorageServerError(msg)
+        return error
     def _vote(self):
         if not self.store_failed:
             # Only call tpc_vote of no store call failed, otherwise
@@ -627,8 +667,18 @@
         self._tpc_begin(self.transaction, self.tid, self.status)
         loads, loader = self.txnlog.get_loader()
         for i in range(loads):
-            # load oid, serial, data, version
-            if not self._store(*loader.load()):
+            store = loader.load()
+            store_type = store[0]
+            store_args = store[1:]
+            if store_type == 's':
+                do_store = self._store
+            elif store_type == 'r':
+                do_store = self._restore
+            else:
+                raise ValueError('Invalid store type: %r' % store_type)
+            if not do_store(*store_args):
         # Blob support
@@ -683,6 +733,62 @@
     abortVersion = commitVersion
+    # IStorageIteration support
+    def iterator_start(self, start, stop):
+        iid = self._iterator_ids.next()
+        self._iterators[iid] = iter(self.storage.iterator(start, stop))
+        return iid
+    def iterator_next(self, iid):
+        iterator = self._iterators[iid]
+        try:
+            info = iterator.next()
+        except StopIteration:
+            del self._iterators[iid]
+            item = None
+            if iid in self._txn_iterators_last:
+                del self._txn_iterators_last[iid]
+        else:
+            item = (info.tid,
+                    info.status,
+                    info.user,
+                    info.description,
+                    info.extension)
+            # Keep a reference to the last iterator result to allow starting a
+            # record iterator off it.
+            self._txn_iterators_last[iid] = info
+        return item
+    def iterator_record_start(self, txn_iid, tid):
+        record_iid = self._iterator_ids.next()
+        txn_info = self._txn_iterators_last[txn_iid]
+        if txn_info.tid != tid:
+            raise Exception(
+                'Out-of-order request for record iterator for transaction %r' % tid)
+        self._iterators[record_iid] = iter(txn_info)
+        return record_iid
+    def iterator_record_next(self, iid):
+        iterator = self._iterators[iid]
+        try:
+            info = iterator.next()
+        except StopIteration:
+            del self._iterators[iid]
+            item = None
+        else:
+            item = (info.oid,
+                    info.tid,
+                    info.data,
+                    info.version,
+                    info.data_txn)
+        return item
+    def iterator_gc(self, iids):
+        for iid in iids:
+            self._iterators.pop(iid, None)
 class StorageServerDB:
     def __init__(self, server, storage_id):

Modified: ZODB/trunk/src/ZEO/tests/ConnectionTests.py
--- ZODB/trunk/src/ZEO/tests/ConnectionTests.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/tests/ConnectionTests.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -102,7 +102,8 @@
         logging.info("setUp() %s", self.id())
-        self.file = tempfile.mktemp()
+        fd, self.file = tempfile.mkstemp()
+        os.close(fd)
         self.addr = []
         self._pids = []
         self._servers = []

Copied: ZODB/trunk/src/ZEO/tests/IterationTests.py (from rev 90352, ZODB/branches/gocept-iteration/src/ZEO/tests/IterationTests.py)
--- ZODB/trunk/src/ZEO/tests/IterationTests.py	                        (rev 0)
+++ ZODB/trunk/src/ZEO/tests/IterationTests.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -0,0 +1,101 @@
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+"""ZEO iterator protocol tests."""
+import transaction
+class IterationTests:
+    def checkIteratorGCProtocol(self):
+        # Test garbage collection on protocol level.
+        server = self._storage._server
+        iid = server.iterator_start(None, None)
+        # None signals the end of iteration.
+        self.assertEquals(None, server.iterator_next(iid))
+        # The server has already disposed of the iterator.
+        self.assertRaises(KeyError, server.iterator_next, iid)
+        iid = server.iterator_start(None, None)
+        # This time, we tell the server to throw the iterator away.
+        server.iterator_gc([iid])
+        self.assertRaises(KeyError, server.iterator_next, iid)
+    def checkIteratorExhaustionStorage(self):
+        # Test the storage's garbage collection mechanism.
+        iterator = self._storage.iterator()
+        self.assertEquals(1, len(self._storage._iterator_ids))
+        iid = list(self._storage._iterator_ids)[0]
+        self.assertEquals([], list(iterator))
+        self.assertEquals(0, len(self._storage._iterator_ids))
+        # The iterator has run through, so the server has already disposed of it.
+        self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+    def checkIteratorGCSpanTransactions(self):
+        iterator = self._storage.iterator()
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.tpc_vote(t)
+        self._storage.tpc_finish(t)
+        self.assertEquals([], list(iterator))
+    def checkIteratorGCStorageCommitting(self):
+        # We want the iterator to be garbage-collected, so we don't keep any
+        # hard references to it. The storage tracks its ID, though.
+        self._storage.iterator()
+        self.assertEquals(1, len(self._storage._iterator_ids))
+        iid = list(self._storage._iterator_ids)[0]
+        # GC happens at the transaction boundary. After that, both the storage
+        # and the server have forgotten the iterator.
+        self._dostore()
+        self.assertEquals(0, len(self._storage._iterator_ids))
+        self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+    def checkIteratorGCStorageTPCAborting(self):
+        self._storage.iterator()
+        iid = list(self._storage._iterator_ids)[0]
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.tpc_abort(t)
+        self.assertEquals(0, len(self._storage._iterator_ids))
+        self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+    def checkIteratorGCStorageDisconnect(self):
+        self._storage.iterator()
+        iid = list(self._storage._iterator_ids)[0]
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        # Show that after disconnecting, the client side GCs the iterators
+        # as well. I'm calling this directly to avoid accidentally
+        # calling tpc_abort implicitly.
+        self._storage.notifyDisconnected()
+        self.assertEquals(0, len(self._storage._iterator_ids))
+    def checkIteratorParallel(self):
+        self._dostore()
+        self._dostore()
+        iter1 = self._storage.iterator()
+        iter2 = self._storage.iterator()
+        txn_info1 = iter1.next()
+        txn_info2 = iter2.next()
+        self.assertEquals(txn_info1.tid, txn_info2.tid)
+        txn_info1 = iter1.next()
+        txn_info2 = iter2.next()
+        self.assertEquals(txn_info1.tid, txn_info2.tid)
+        self.assertRaises(StopIteration, iter1.next)
+        self.assertRaises(StopIteration, iter2.next)

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -39,7 +39,7 @@
 from ZODB.tests import StorageTestBase, BasicStorage,  \
      TransactionalUndoStorage,  \
      PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
-     MTStorage, ReadOnlyStorage
+     MTStorage, ReadOnlyStorage, IteratorStorage, RecoveryStorage
 from ZODB.tests.testDemoStorage import DemoStorageWrappedBase
@@ -47,7 +47,8 @@
 import ZEO.zrpc.connection
-from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
+from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests, \
+     IterationTests
 from ZEO.tests.forker import get_port
 import ZEO.tests.ConnectionTests
@@ -56,7 +57,6 @@
 logger = logging.getLogger('ZEO.tests.testZEO')
 class DummyDB:
     def invalidate(self, *args):
@@ -158,7 +158,7 @@
     # Locally defined (see above)
-    MiscZEOTests
+    MiscZEOTests,
     """Combine tests from various origins in one class."""
@@ -196,6 +196,15 @@
             for pid in self._pids:
                 os.waitpid(pid, 0)
+    def runTest(self):
+        try:
+            super(GenericTests, self).runTest()
+        except:
+            self._failed = True
+            raise
+        else:
+            self._failed = False
     def open(self, read_only=0):
         # Needed to support ReadOnlyStorage tests.  Ought to be a
         # cleaner way.
@@ -226,9 +235,75 @@
+    IteratorStorage.IteratorStorage,
+    IterationTests.IterationTests,
     """Extend GenericTests with tests that MappingStorage can't pass."""
+class FileStorageRecoveryTests(StorageTestBase.StorageTestBase,
+                               RecoveryStorage.RecoveryStorage):
+    level = 2
+    def setUp(self):
+        self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
+        self._dst = ZODB.FileStorage.FileStorage("Dest.fs", create=True)
+    def getConfig(self):
+        filename = self.__fs_base = tempfile.mktemp()
+        return """\
+        <filestorage 1>
+        path %s
+        </filestorage>
+        """ % filename
+    def _new_storage(self):
+        port = get_port()
+        zconf = forker.ZEOConfig(('', port))
+        zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
+                                                              zconf, port)
+        blob_cache_dir = tempfile.mkdtemp()
+        self._pids.append(pid)
+        self._servers.append(adminaddr)
+        self._conf_paths.append(path)
+        self.blob_cache_dirs.append(blob_cache_dir)
+        storage = ClientStorage(
+            zport, '1', cache_size=20000000,
+            min_disconnect_poll=0.5, wait=1,
+            wait_timeout=60, blob_dir=blob_cache_dir)
+        storage.registerDB(DummyDB())
+        return storage
+    def setUp(self):
+        self._pids = []
+        self._servers = []
+        self._conf_paths = []
+        self.blob_cache_dirs = []
+        self._storage = self._new_storage()
+        self._dst = self._new_storage()
+    def tearDown(self):
+        self._storage.close()
+        self._dst.close()
+        for p in self._conf_paths:
+            os.remove(p)
+        for p in self.blob_cache_dirs:
+            ZODB.blob.remove_committed_dir(p)
+        for server in self._servers:
+            forker.shutdown_zeo_server(server)
+        if hasattr(os, 'waitpid'):
+            # Not in Windows Python until 2.3
+            for pid in self._pids:
+                os.waitpid(pid, 0)
+    def new_dest(self):
+        return self._new_storage()
 class FileStorageTests(FullGenericTests):
     """Test ZEO backed by a FileStorage."""
     level = 2
@@ -241,12 +316,36 @@
         """ % filename
+    def checkInterfaceFromRemoteStorage(self):
+        # ClientStorage itself doesn't implement IStorageIteration, but the
+        # FileStorage on the other end does, and thus the ClientStorage
+        # instance that is connected to it reflects this.
+        self.failIf(ZODB.interfaces.IStorageIteration.implementedBy(
+            ZEO.ClientStorage.ClientStorage))
+        self.failUnless(ZODB.interfaces.IStorageIteration.providedBy(
+            self._storage))
+        # This is communicated using ClientStorage's _info object:
+        self.assertEquals((('ZODB.interfaces', 'IStorageIteration'),
+                           ('zope.interface', 'Interface')),
+                          self._storage._info['interfaces'])
 class MappingStorageTests(GenericTests):
     """ZEO backed by a Mapping storage."""
     def getConfig(self):
         return """<mappingstorage 1/>"""
+    def checkSimpleIteration(self):
+        # The test base class IteratorStorage assumes that we keep undo data
+        # to construct our iterator, which we don't, so we disable this test.
+        pass
+    def checkUndoZombie(self):
+        # The test base class IteratorStorage assumes that we keep undo data
+        # to construct our iterator, which we don't, so we disable this test.
+        pass
 class DemoStorageTests(
@@ -260,6 +359,11 @@
         """ % tempfile.mktemp()
+    def checkUndoZombie(self):
+        # The test base class IteratorStorage assumes that we keep undo data
+        # to construct our iterator, which we don't, so we disable this test.
+        pass
 class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
     """Make sure a heartbeat is being sent and that it does no harm
@@ -554,7 +658,7 @@
-class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
+class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
     """ZEO backed by a BlobStorage-adapted FileStorage."""
     def setUp(self):
@@ -645,7 +749,7 @@
-class BlobWritableCacheTests(GenericTests, CommonBlobTests):
+class BlobWritableCacheTests(FullGenericTests, CommonBlobTests):
     def setUp(self):
         self.blobdir = self.blob_cache_dir = tempfile.mkdtemp()
@@ -823,7 +927,7 @@
     >>> st = StorageServerWrapper(sv, 'fs')
     >>> s = st.server
-Now, if we ask fior the invalidations since the last committed
+Now, if we ask for the invalidations since the last committed
 transaction, we'll get a result:
     >>> tid, oids = s.getInvalidations(last[-1])
@@ -849,7 +953,8 @@
-test_classes = [FileStorageTests, MappingStorageTests, DemoStorageTests,
+test_classes = [FileStorageTests, FileStorageRecoveryTests,
+                MappingStorageTests, DemoStorageTests,
                 BlobAdaptedFileStorageTests, BlobWritableCacheTests]
 def test_suite():

Modified: ZODB/trunk/src/ZODB/BaseStorage.py
--- ZODB/trunk/src/ZODB/BaseStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/BaseStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -21,14 +21,18 @@
 import logging
 from struct import pack as _structpack, unpack as _structunpack
+import zope.interface
 from persistent.TimeStamp import TimeStamp
+import ZODB.interfaces
 from ZODB import POSException
 from ZODB.utils import z64, oid_repr
 from ZODB.UndoLogCompatible import UndoLogCompatible
 log = logging.getLogger("ZODB.BaseStorage")
 class BaseStorage(UndoLogCompatible):
     """Base class that supports storage implementations.
@@ -188,6 +192,7 @@
                 ext = cPickle.dumps(ext, 1)
                 ext = ""
             self._ude = user, desc, ext
             if tid is None:
@@ -338,7 +343,7 @@
             if verbose:
                 print oid_repr(oid), r.version, len(r.data)
             if restoring:
-                dest.restore(oid, r.tid, r.data, '',
+                dest.restore(oid, r.tid, r.data, r.version,
                              r.data_txn, transaction)
                 pre = preget(oid, None)
@@ -348,10 +353,36 @@
-    fiter.close()
-class TransactionRecord:
+class TransactionRecord(object):
     """Abstract base class for iterator protocol"""
-class DataRecord:
+    zope.interface.implements(ZODB.interfaces.IStorageTransactionInformation)
+    def __init__(self, tid, status, user, description, extension):
+        self.tid = tid
+        self.status = status
+        self.user = user
+        self.description = description
+        self.extension = extension
+    # XXX This is a workaround to make the TransactionRecord compatible with a
+    # transaction object because it is passed to tpc_begin().
+    def _ext_set(self, value):
+        self.extension = value
+    def _ext_get(self):
+        return self.extension
+    _extension = property(fset=_ext_set, fget=_ext_get)
+class DataRecord(object):
     """Abstract base class for iterator protocol"""
+    zope.interface.implements(ZODB.interfaces.IStorageRecordInformation)
+    def __init__(self, oid, tid, data, version, prev):
+        self.oid = oid
+        self.tid = tid
+        self.data = data
+        self.version = version
+        self.data_txn = prev

Modified: ZODB/trunk/src/ZODB/DemoStorage.py
--- ZODB/trunk/src/ZODB/DemoStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/DemoStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -80,15 +80,19 @@
+import cPickle
 import base64, time
+import ZODB.BaseStorage
+import ZODB.interfaces
+import zope.interface
 from ZODB import POSException
 from ZODB.utils import z64, oid_repr
-from ZODB.BaseStorage import BaseStorage
 from persistent.TimeStamp import TimeStamp
-from cPickle import loads
 from BTrees import OOBTree
-class DemoStorage(BaseStorage):
+class DemoStorage(ZODB.BaseStorage.BaseStorage):
     """Demo storage
     Demo storages provide useful storages for writing tests because
@@ -104,9 +108,10 @@
+    zope.interface.implements(ZODB.interfaces.IStorageIteration)
     def __init__(self, name='Demo Storage', base=None, quota=None):
-        BaseStorage.__init__(self, name, base)
+        ZODB.BaseStorage.BaseStorage.__init__(self, name, base)
         # We use a BTree because the items are sorted!
         self._data = OOBTree.OOBTree()
@@ -133,7 +138,7 @@
     # by the base storage, leading to a variety of "impossible" problems.
     def new_oid(self):
         if self._base is None:
-            return BaseStorage.new_oid(self)
+            return ZODB.BaseStorage.BaseStorage.new_oid(self)
             return self._base.new_oid()
@@ -317,6 +322,10 @@
         self._tsize = self._size + 120 + len(u) + len(d) + len(e)
     def _finish(self, tid, user, desc, ext):
+        if not self._tindex:
+            # No data, so we don't update anything.
+            return
         self._size = self._tsize
         self._data[tid] = None, user, desc, ext, tuple(self._tindex)
@@ -364,7 +373,7 @@
                      'time': TimeStamp(tid).timeTime(),
                      'user_name': u, 'description': d}
                 if e:
-                    d.update(loads(e))
+                    d.update(cPickle.loads(e))
                 if filter is None or filter(d):
                     if i >= first:
@@ -569,3 +578,27 @@
     def close(self):
         if self._base is not None:
+    def iterator(self, start=None, end=None):
+        # First iterate over the base storage
+        if self._base is not None:
+            for transaction in self._base.iterator(start, end):
+                yield transaction
+        # Then iterate over our local transactions
+        for tid, transaction in self._data.items():
+            if tid >= start and tid <= end:
+                yield TransactionRecord(tid, transaction)
+class TransactionRecord(ZODB.BaseStorage.TransactionRecord):
+    def __init__(self, tid, transaction):
+        packed, user, description, extension, records = transaction
+        super(TransactionRecord, self).__init__(
+            tid, packed, user, description, extension)
+        self.records = transaction
+    def __iter__(self):
+        for record in self.records:
+            oid, prev, version, data, tid = record
+            yield ZODB.BaseStorage.DataRecord(oid, tid, data, version, prev)

Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -29,6 +29,8 @@
 # Not all platforms have fsync
 fsync = getattr(os, "fsync", None)
+import zope.interface
+import ZODB.interfaces
 from ZODB import BaseStorage, ConflictResolution, POSException
 from ZODB.POSException import UndoError, POSKeyError, MultipleUndoErrors
 from persistent.TimeStamp import TimeStamp
@@ -88,6 +90,8 @@
+    zope.interface.implements(ZODB.interfaces.IStorageIteration)
     # Set True while a pack is in progress; undo is blocked for the duration.
     _pack_is_in_progress = False
@@ -1127,7 +1131,7 @@
             return [(trans.tid, [(r.oid, '') for r in trans])
-                    for trans in FileIterator(self._file, pos=pos)]
+                    for trans in FileIterator(self._file_name, pos=pos)]
@@ -1516,31 +1520,16 @@
-class Iterator:
-    """A General simple iterator that uses the Python for-loop index protocol
-    """
-    __index=-1
-    __current=None
-    def __getitem__(self, i):
-        __index=self.__index
-        while i > __index:
-            __index=__index+1
-            self.__current=self.next(__index)
-        self.__index=__index
-        return self.__current
-class FileIterator(Iterator, FileStorageFormatter):
+class FileIterator(FileStorageFormatter):
     """Iterate over the transactions in a FileStorage file.
     _ltid = z64
     _file = None
-    def __init__(self, file, start=None, stop=None, pos=4L):
-        if isinstance(file, str):
-            file = open(file, 'rb')
+    def __init__(self, filename, start=None, stop=None, pos=4L):
+        assert isinstance(filename, str)
+        file = open(filename, 'rb')
         self._file = file
         if file.read(4) != packed_version:
             raise FileStorageFormatError(file.name)
@@ -1602,14 +1591,17 @@
                     panic("%s has inconsistent transaction length at %s "
                           "(%s != %s)", file.name, pos, u64(rtl), u64(stl))
-    def next(self, index=0):
+    # Iterator protocol
+    def __iter__(self):
+        return self
+    def next(self):
         if self._file is None:
-            # A closed iterator.  Is IOError the best we can do?  For
-            # now, mimic a read on a closed file.
-            raise IOError('iterator is closed')
+            raise ZODB.interfaces.StorageStopIteration()
         pos = self._pos
-        while 1:
+        while True:
             # Read the transaction record
                 h = self._read_txn_header(pos)
@@ -1625,11 +1617,11 @@
             self._ltid = h.tid
             if self._stop is not None and h.tid > self._stop:
-                raise IndexError(index)
+                break
             if h.status == "c":
                 # Assume we've hit the last, in-progress transaction
-                raise IndexError(index)
+                break
             if pos + h.tlen + 8 > self._file_size:
                 # Hm, the data were truncated or the checkpoint flag wasn't
@@ -1679,8 +1671,8 @@
-                result = RecordIterator(h.tid, h.status, h.user, h.descr,
-                                        e, pos, tend, self._file, tpos)
+                result = TransactionRecord(h.tid, h.status, h.user, h.descr,
+                                           e, pos, tend, self._file, tpos)
             # Read the (intentionally redundant) transaction length
@@ -1693,23 +1685,25 @@
             return result
-        raise IndexError(index)
+        self.close()
+        raise ZODB.interfaces.StorageStopIteration()
-class RecordIterator(Iterator, BaseStorage.TransactionRecord,
-                     FileStorageFormatter):
+class TransactionRecord(BaseStorage.TransactionRecord, FileStorageFormatter):
     """Iterate over the transactions in a FileStorage file."""
     def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
-        self.tid = tid
-        self.status = status
-        self.user = user
-        self.description = desc
-        self._extension = ext
+        BaseStorage.TransactionRecord.__init__(
+            self, tid, status, user, desc, ext)
         self._pos = pos
         self._tend = tend
         self._file = file
         self._tpos = tpos
-    def next(self, index=0):
+    def __iter__(self):
+        return self
+    def next(self):
         pos = self._pos
         while pos < self._tend:
             # Read the data records for this transaction
@@ -1738,20 +1732,18 @@
                     # Should it go to the original data like BDBFullStorage?
                     prev_txn = self.getTxnFromData(h.oid, h.back)
-            r = Record(h.oid, h.tid, data, prev_txn, pos)
-            return r
+            return Record(h.oid, h.tid, data, prev_txn, pos)
-        raise IndexError(index)
+        raise ZODB.interfaces.StorageStopIteration()
 class Record(BaseStorage.DataRecord):
-    """An abstract database record."""
     def __init__(self, oid, tid, data, prev, pos):
-        self.oid = oid
-        self.tid = tid
-        self.data = data
-        self.data_txn = prev
+        super(Record, self).__init__(oid, tid, data, '', prev)
         self.pos = pos
 class UndoSearch:
     def __init__(self, file, pos, first, last, filter=None):

Modified: ZODB/trunk/src/ZODB/FileStorage/__init__.py
--- ZODB/trunk/src/ZODB/FileStorage/__init__.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/FileStorage/__init__.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -1,4 +1,8 @@
 # this is a package
-from ZODB.FileStorage.FileStorage import FileStorage, RecordIterator
+from ZODB.FileStorage.FileStorage import FileStorage, TransactionRecord
 from ZODB.FileStorage.FileStorage import FileIterator, Record, packed_version
+# BBB Alias for compatibility
+RecordIterator = TransactionRecord

Modified: ZODB/trunk/src/ZODB/MappingStorage.py
--- ZODB/trunk/src/ZODB/MappingStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/MappingStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -21,16 +21,16 @@
 The Mapping storage uses a single data structure to map object ids to data.
+import ZODB.BaseStorage
 from ZODB.utils import u64, z64
-from ZODB.BaseStorage import BaseStorage
 from ZODB import POSException
 from persistent.TimeStamp import TimeStamp
-class MappingStorage(BaseStorage):
+class MappingStorage(ZODB.BaseStorage.BaseStorage):
     def __init__(self, name='Mapping Storage'):
-        BaseStorage.__init__(self, name)
+        ZODB.BaseStorage.BaseStorage.__init__(self, name)
         # ._index maps an oid to a string s.  s[:8] is the tid of the
         # transaction that created oid's current state, and s[8:] is oid's
         # current state.

Modified: ZODB/trunk/src/ZODB/blob.py
--- ZODB/trunk/src/ZODB/blob.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/blob.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -324,6 +324,21 @@
         return os.path.join(self.base_dir, utils.oid_repr(oid))
+    def createPathForOID(self, oid):
+        """Given an OID, creates a directory on the filesystem where
+        the blob data relating to that OID is stored, if it doesn't exist.
+        """
+        path = self.getPathForOID(oid)
+        if os.path.exists(path):
+            return
+        try:
+            os.makedirs(path, 0700)
+        except OSError:
+            # We might have lost a race.  If so, the directory
+            # must exist now
+            assert os.path.exists(path)
     def getBlobFilename(self, oid, tid):
         """Given an oid and a tid, return the full filename of the
         'committed' blob file related to that oid and tid.

Modified: ZODB/trunk/src/ZODB/fsrecover.py
--- ZODB/trunk/src/ZODB/fsrecover.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/fsrecover.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -82,7 +82,7 @@
 import ZODB.FileStorage
 from ZODB.utils import u64
-from ZODB.FileStorage import RecordIterator
+from ZODB.FileStorage import TransactionRecord
 from persistent.TimeStamp import TimeStamp
@@ -146,8 +146,8 @@
         except: e={}
     else: e={}
-    result = RecordIterator(tid, status, user, description, e, pos, tend,
-                            f, tpos)
+    result = TransactionRecord(tid, status, user, description, e, pos, tend,
+                               f, tpos)
     pos = tend
     # Read the (intentionally redundant) transaction length

Modified: ZODB/trunk/src/ZODB/interfaces.py
--- ZODB/trunk/src/ZODB/interfaces.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/interfaces.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -286,6 +286,7 @@
         begins or until the connection is reopened.
 class IStorageDB(Interface):
     """Database interface exposed to storages
@@ -418,6 +419,7 @@
         should also close all the Connections.
 class IStorage(Interface):
     """A storage is responsible for storing and retrieving data of objects.
@@ -710,6 +712,7 @@
 class IStorageRestoreable(IStorage):
     """Copying Transactions
@@ -744,7 +747,7 @@
         #   failed to take into account records after the pack time.
-    def restore(oid, serial, data, prev_txn, transaction):
+    def restore(oid, serial, data, version, prev_txn, transaction):
         """Write data already committed in a separate database
         The restore method is used when copying data from one database
@@ -775,41 +778,44 @@
         Nothing is returned.
 class IStorageRecordInformation(Interface):
     """Provide information about a single storage record
     oid = Attribute("The object id")
+    tid = Attribute("The transaction id")
     data = Attribute("The data record")
+    version = Attribute("The version id")
+    data_txn = Attribute("The previous transaction id")
 class IStorageTransactionInformation(Interface):
-    """Provide information about a storage transaction
+    """Provide information about a storage transaction.
+    Can be iterated over to retrieve the records modified in the transaction.
     tid = Attribute("Transaction id")
     status = Attribute("Transaction Status") # XXX what are valid values?
     user = Attribute("Transaction user")
     description = Attribute("Transaction Description")
-    extension = Attribute("Transaction extension data")
+    extension = Attribute("A dictionary carrying the transaction's extension data")
     def __iter__():
-        """Return an iterable of IStorageRecordInformation
+        """Iterate over the transaction's records given as
+        IStorageRecordInformation objects.
 class IStorageIteration(Interface):
-    """API for iterating over the contents of a storage
+    """API for iterating over the contents of a storage."""
-    Note that this is a future API.  Some storages now provide an
-    approximation of this.
-    """
     def iterator(start=None, stop=None):
         """Return an IStorageTransactionInformation iterator.
-        An IStorageTransactionInformation iterator is returned for
-        iterating over the transactions in the storage.
         If the start argument is not None, then iteration will start
         with the first transaction whose identifier is greater than or
         equal to start.
@@ -818,8 +824,12 @@
         the last transaction whose identifier is less than or equal to
+        The iterator provides access to the data as available at the time when
+        the iterator was retrieved.
 class IStorageUndoable(IStorage):
     """A storage supporting transactional undo.
@@ -932,6 +942,7 @@
 class IBlob(Interface):
     """A BLOB supports efficient handling of large data within ZODB."""
@@ -986,5 +997,12 @@
         If Blobs use this, then commits can be performed with a simple rename.
 class BlobError(Exception):
+class StorageStopIteration(IndexError, StopIteration):
+    """A combination of StopIteration and IndexError to provide a
+    backwards-compatible exception.
+    """

Modified: ZODB/trunk/src/ZODB/tests/IteratorStorage.py
--- ZODB/trunk/src/ZODB/tests/IteratorStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/IteratorStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -15,6 +15,7 @@
 Any storage that supports the iterator() method should be able to pass
 all these tests.
 from ZODB.tests.MinPO import MinPO
@@ -23,13 +24,16 @@
 from transaction import Transaction
+import itertools
 class IteratorCompare:
     def iter_verify(self, txniter, revids, val0):
         eq = self.assertEqual
         oid = self._oid
         val = val0
-        for reciter, revid in zip(txniter, revids + [None]):
+        for reciter, revid in itertools.izip(txniter, revids + [None]):
             eq(reciter.tid, revid)
             for rec in reciter:
                 eq(rec.oid, oid)
@@ -37,8 +41,8 @@
                 eq(zodb_unpickle(rec.data), MinPO(val))
                 val = val + 1
         eq(val, val0 + len(revids))
-        txniter.close()
 class IteratorStorage(IteratorCompare):
     def checkSimpleIteration(self):
@@ -51,13 +55,6 @@
         txniter = self._storage.iterator()
         self.iter_verify(txniter, [revid1, revid2, revid3], 11)
-    def checkClose(self):
-        self._oid = oid = self._storage.new_oid()
-        revid1 = self._dostore(oid, data=MinPO(11))
-        txniter = self._storage.iterator()
-        txniter.close()
-        self.assertRaises(IOError, txniter.__getitem__, 0)
     def checkUndoZombie(self):
         oid = self._storage.new_oid()
         revid = self._dostore(oid, data=MinPO(94))
@@ -89,7 +86,7 @@
         iter = self._storage.iterator()
         count = 0
         for txn in iter:
-            self.assertEqual(txn._extension, {})
+            self.assertEqual(txn.extension, {})
             count +=1
         self.assertEqual(count, 1)
@@ -129,9 +126,34 @@
                     match = True
         if not match:
             self.fail("Could not find transaction with matching id")
+    def checkIterateRepeatedly(self):
+        self._dostore()
+        transactions = self._storage.iterator()
+        self.assertEquals(1, len(list(transactions)))
+        # The iterator can only be consumed once:
+        self.assertEquals(0, len(list(transactions)))
+    def checkIterateRecordsRepeatedly(self):
+        self._dostore()
+        tinfo = self._storage.iterator().next()
+        self.assertEquals(1, len(list(tinfo)))
+        # The iterator can only be consumed once:
+        self.assertEquals(0, len(list(tinfo)))
+    def checkIterateWhileWriting(self):
+        self._dostore()
+        iterator = self._storage.iterator()
+        # We have one transaction with 1 modified object.
+        txn_1 = iterator.next()
+        self.assertEquals(1, len(list(txn_1)))
+        # We store another transaction with 1 object, the already running
+        # iterator does not pick this up.
+        self._dostore()
+        self.assertRaises(StopIteration, iterator.next)
 class ExtendedIteratorStorage(IteratorCompare):
     def checkExtendedIteration(self):
@@ -173,28 +195,36 @@
         txniter = self._storage.iterator(revid3, revid3)
         self.iter_verify(txniter, [revid3], 13)
 class IteratorDeepCompare:
     def compare(self, storage1, storage2):
         eq = self.assertEqual
         iter1 = storage1.iterator()
         iter2 = storage2.iterator()
-        for txn1, txn2 in zip(iter1, iter2):
+        for txn1, txn2 in itertools.izip(iter1, iter2):
             eq(txn1.tid,         txn2.tid)
             eq(txn1.status,      txn2.status)
             eq(txn1.user,        txn2.user)
             eq(txn1.description, txn2.description)
-            eq(txn1._extension,  txn2._extension)
-            for rec1, rec2 in zip(txn1, txn2):
+            eq(txn1.extension,  txn2.extension)
+            itxn1 = iter(txn1)
+            itxn2 = iter(txn2)
+            for rec1, rec2 in itertools.izip(itxn1, itxn2):
                 eq(rec1.oid,     rec2.oid)
                 eq(rec1.tid,  rec2.tid)
                 eq(rec1.data,    rec2.data)
             # Make sure there are no more records left in rec1 and rec2,
             # meaning they were the same length.
-            self.assertRaises(IndexError, txn1.next)
-            self.assertRaises(IndexError, txn2.next)
+            # Additionally, check that we're backwards compatible to the
+            # IndexError we used to raise before.
+            self.assertRaises(IndexError, itxn1.next)
+            self.assertRaises(IndexError, itxn2.next)
+            self.assertRaises(StopIteration, itxn1.next)
+            self.assertRaises(StopIteration, itxn2.next)
         # Make sure there are no more records left in txn1 and txn2, meaning
         # they were the same length
         self.assertRaises(IndexError, iter1.next)
         self.assertRaises(IndexError, iter2.next)
-        iter1.close()
-        iter2.close()
+        self.assertRaises(StopIteration, iter1.next)
+        self.assertRaises(StopIteration, iter2.next)

Modified: ZODB/trunk/src/ZODB/tests/PackableStorage.py
--- ZODB/trunk/src/ZODB/tests/PackableStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/PackableStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -30,6 +30,7 @@
 from persistent import Persistent
 from persistent.mapping import PersistentMapping
 import transaction
+import ZODB.interfaces
 from ZODB import DB
 from ZODB.serialize import referencesf
 from ZODB.tests.MinPO import MinPO
@@ -149,6 +150,16 @@
+    def _sanity_check(self):
+        # Iterate over the storage to make sure it's sane.
+        if not ZODB.interfaces.IStorageIteration.providedBy(self._storage):
+            return
+        it = self._storage.iterator()
+        for txn in it:
+            for data in txn:
+                pass
 class PackableStorage(PackableStorageBase):
     def checkPackEmptyStorage(self):
@@ -253,17 +264,8 @@
             self.fail('a thread is still alive')
-        # Iterate over the storage to make sure it's sane, but not every
-        # storage supports iterators.
-        if not hasattr(self._storage, "iterator"):
-            return
+        self._sanity_check()
-        it = self._storage.iterator()
-        for txn in it:
-            for data in txn:
-                pass
-        it.close()
     def checkPackWhileWriting(self):
@@ -304,14 +306,7 @@
             packt = time.time()
-        # Iterate over the storage to make sure it's sane.
-        if not hasattr(self._storage, "iterator"):
-            return
-        it = self._storage.iterator()
-        for txn in it:
-            for data in txn:
-                pass
-        it.close()
+        self._sanity_check()
     def checkPackWithMultiDatabaseReferences(self):
         databases = {}

Modified: ZODB/trunk/src/ZODB/tests/RecoveryStorage.py
--- ZODB/trunk/src/ZODB/tests/RecoveryStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/RecoveryStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -22,6 +22,7 @@
 import time
 class RecoveryStorage(IteratorDeepCompare):
     # Requires a setUp() that creates a self._dst destination storage
     def checkSimpleRecovery(self):
@@ -49,12 +50,16 @@
         # copy the final transaction manually.  even though there
         # was a pack, the restore() ought to succeed.
         it = self._storage.iterator()
-        final = list(it)[-1]
+        # Get the last transaction and its record iterator. Record iterators
+        # can't be accessed out-of-order, so we need to do this in a slightly
+        # roundabout way:
+        for final  in it:
+            records = list(final)
         self._dst.tpc_begin(final, final.tid, final.status)
-        for r in final:
+        for r in records:
             self._dst.restore(r.oid, r.tid, r.data, '', r.data_txn,
-        it.close()

Modified: ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py
--- ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -636,11 +636,13 @@
             for j in range(OBJECTS):
                 oid = s.new_oid()
                 obj = MinPO(i * OBJECTS + j)
-                revid = s.store(oid, None, zodb_pickle(obj), '', t)
-                orig.append((tid, oid, revid))
+                s.store(oid, None, zodb_pickle(obj), '', t)
+                orig.append((tid, oid))
+        orig = [(tid, oid, s.getTid(oid)) for tid, oid in orig]
         i = 0
         for tid, oid, revid in orig:
             self._dostore(oid, revid=revid, data=MinPO(revid),
@@ -668,14 +670,11 @@
         #     OBJECTS * BATCHES modifications, followed by
         #     BATCHES undos
-        iter = s.iterator()
-        offset = 0
+        transactions = s.iterator()
         eq = self.assertEqual
         for i in range(BATCHES):
-            txn = iter[offset]
-            offset += 1
+            txn = transactions.next()
             tid = p64(i + 1)
             eq(txn.tid, tid)
@@ -687,13 +686,11 @@
             eq(L1, L2)
         for i in range(BATCHES * OBJECTS):
-            txn = iter[offset]
-            offset += 1
+            txn = transactions.next()
             eq(len([rec for rec in txn if rec.data_txn is None]), 1)
         for i in range(BATCHES):
-            txn = iter[offset]
-            offset += 1
+            txn = transactions.next()
             # The undos are performed in reverse order.
             otid = p64(BATCHES - i)
@@ -704,7 +701,7 @@
             eq(L1, L2)
-        self.assertRaises(IndexError, iter.__getitem__, offset)
+        self.assertRaises(StopIteration, transactions.next)
     def checkUndoLogMetadata(self):
         # test that the metadata is correct in the undo log

Modified: ZODB/trunk/src/ZODB/tests/testFileStorage.py
--- ZODB/trunk/src/ZODB/tests/testFileStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/testFileStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -308,6 +308,7 @@
                 self.assertNotEqual(next_oid, None)
 class FileStorageRecoveryTest(
@@ -326,6 +327,40 @@
     def new_dest(self):
         return ZODB.FileStorage.FileStorage('Dest.fs')
+class FileStorageNoRestore(ZODB.FileStorage.FileStorage):
+    @property
+    def restore(self):
+        raise Exception
+class FileStorageNoRestoreRecoveryTest(
+    StorageTestBase.StorageTestBase,
+    RecoveryStorage.RecoveryStorage,
+    ):
+    # This test actually verifies a code path of
+    # BaseStorage.copyTransactionsFrom. For simplicity of implementation, we
+    # use a FileStorage deprived of its restore method.
+    def setUp(self):
+        self._storage = FileStorageNoRestore("Source.fs", create=True)
+        self._dst = FileStorageNoRestore("Dest.fs", create=True)
+    def tearDown(self):
+        self._storage.close()
+        self._dst.close()
+        self._storage.cleanup()
+        self._dst.cleanup()
+    def new_dest(self):
+        return FileStorageNoRestore('Dest.fs')
+    def checkRestoreAcrossPack(self):
+        # Skip this check as it calls restore directly.
+        pass
 class SlowFileStorageTest(BaseFileStorageTests):
     level = 2
@@ -492,7 +527,8 @@
     suite = unittest.TestSuite()
     for klass in [FileStorageTests, Corruption.FileStorageCorruptTests,
-                  FileStorageRecoveryTest, SlowFileStorageTest]:
+                  FileStorageRecoveryTest, FileStorageNoRestoreRecoveryTest,
+                  SlowFileStorageTest]:
         suite.addTest(unittest.makeSuite(klass, "check"))

Modified: ZODB/trunk/src/ZODB/tests/testMappingStorage.py
--- ZODB/trunk/src/ZODB/tests/testMappingStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/testMappingStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -37,6 +37,7 @@
         # have this limit, so we inhibit this test here.
 def test_suite():
     suite = unittest.makeSuite(MappingStorageTests, 'check')
     return suite

Modified: ZODB/trunk/src/ZODB/utils.py
--- ZODB/trunk/src/ZODB/utils.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/utils.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -295,5 +295,3 @@
     handle, filename = mkstemp(dir=dir)
     return filename

More information about the Zodb-checkins mailing list