[Zodb-checkins] SVN: ZODB/trunk/src/ Merge gocept-iteration branch.

Christian Theune ct at gocept.com
Sat Aug 30 09:21:50 EDT 2008


Log message for revision 90616:
  Merge gocept-iteration branch.
  

Changed:
  U   ZODB/trunk/src/CHANGES.txt
  U   ZODB/trunk/src/ZEO/ClientStorage.py
  U   ZODB/trunk/src/ZEO/CommitLog.py
  U   ZODB/trunk/src/ZEO/ServerStub.py
  U   ZODB/trunk/src/ZEO/StorageServer.py
  U   ZODB/trunk/src/ZEO/tests/ConnectionTests.py
  A   ZODB/trunk/src/ZEO/tests/IterationTests.py
  U   ZODB/trunk/src/ZEO/tests/testZEO.py
  U   ZODB/trunk/src/ZODB/BaseStorage.py
  U   ZODB/trunk/src/ZODB/DemoStorage.py
  U   ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
  U   ZODB/trunk/src/ZODB/FileStorage/__init__.py
  U   ZODB/trunk/src/ZODB/MappingStorage.py
  U   ZODB/trunk/src/ZODB/blob.py
  U   ZODB/trunk/src/ZODB/fsrecover.py
  U   ZODB/trunk/src/ZODB/interfaces.py
  U   ZODB/trunk/src/ZODB/tests/IteratorStorage.py
  U   ZODB/trunk/src/ZODB/tests/PackableStorage.py
  U   ZODB/trunk/src/ZODB/tests/RecoveryStorage.py
  U   ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py
  U   ZODB/trunk/src/ZODB/tests/testFileStorage.py
  U   ZODB/trunk/src/ZODB/tests/testMappingStorage.py
  U   ZODB/trunk/src/ZODB/utils.py

-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/CHANGES.txt	2008-08-30 13:21:49 UTC (rev 90616)
@@ -8,6 +8,9 @@
 New Features
 ------------
 
+- Cleaned-up the storage iteration API and provided an iterator implementation
+  for ZEO.
+
 - Versions are no-longer supported.
 
 - ZEO cache files can be larger than 4G. Note that older ZEO cache
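
As a quick illustration of the cleaned-up iteration API mentioned in the entry
above (a sketch only; 'Data.fs' is a placeholder, and any storage providing
IStorageIteration -- including a ClientStorage connected to such a backend --
behaves the same way):

    import ZODB.FileStorage
    from ZODB.utils import oid_repr

    storage = ZODB.FileStorage.FileStorage('Data.fs')
    for txn_info in storage.iterator():        # IStorageTransactionInformation
        print repr(txn_info.tid), txn_info.user, txn_info.description
        for record in txn_info:                # IStorageRecordInformation
            print oid_repr(record.oid), len(record.data or '')
    storage.close()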

Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/ClientStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -29,8 +29,9 @@
 import time
 import types
 import logging
+import weakref
 
-from zope.interface import implements
+import zope.interface
 from ZEO import ServerStub
 from ZEO.cache import ClientCache
 from ZEO.TransactionBuffer import TransactionBuffer
@@ -38,11 +39,12 @@
 from ZEO.auth import get_module
 from ZEO.zrpc.client import ConnectionManager
 
+import ZODB.interfaces
 import ZODB.lock_file
+import ZODB.BaseStorage
 from ZODB import POSException
 from ZODB import utils
 from ZODB.loglevels import BLATHER
-from ZODB.interfaces import IBlobStorage
 from ZODB.blob import rename_or_copy_blob
 from persistent.TimeStamp import TimeStamp
 
@@ -103,8 +105,11 @@
 
     """
 
-    implements(IBlobStorage)
+    # ClientStorage does not declare any interfaces here. Interfaces are
+    # declared according to the server's storage once a connection is
+    # established.
 
+
     # Classes we instantiate.  A subclass might override.
     TransactionBufferClass = TransactionBuffer
     ClientCacheClass = ClientCache
@@ -260,6 +265,9 @@
         self._password = password
         self._realm = realm
 
+        self._iterators = weakref.WeakValueDictionary()
+        self._iterator_ids = set()
+
         # Flag tracking disconnections in the middle of a transaction.  This
         # is reset in tpc_begin() and set in notifyDisconnected().
         self._midtxn_disconnect = 0
@@ -270,7 +278,7 @@
         self._verification_invalidations = None
 
         self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
-                      'supportsUndo':0}
+                      'supportsUndo': 0, 'interfaces': ()}
 
         self._tbuf = self.TransactionBufferClass()
         self._db = None
@@ -526,6 +534,15 @@
 
         self._handle_extensions()
 
+        # Decorate ClientStorage with all interfaces that the backend storage
+        # supports.
+        remote_interfaces = []
+        for module_name, interface_name in self._info['interfaces']:
+            module = __import__(module_name, globals(), locals(), [interface_name])
+            interface = getattr(module, interface_name)
+            remote_interfaces.append(interface)
+        zope.interface.directlyProvides(self, remote_interfaces)
+
     def _handle_extensions(self):
         for name in self.getExtensionMethods().keys():
             if not hasattr(self, name):
@@ -633,6 +650,7 @@
         self._ready.clear()
         self._server = disconnected_stub
         self._midtxn_disconnect = 1
+        self._iterator_gc()
 
     def __len__(self):
         """Return the size of the storage."""
@@ -933,14 +951,7 @@
             raise POSException.POSKeyError("No blob file", oid, serial)
 
         # First, we'll create the directory for this oid, if it doesn't exist. 
-        targetpath = self.fshelper.getPathForOID(oid)
-        if not os.path.exists(targetpath):
-            try:
-                os.makedirs(targetpath, 0700)
-            except OSError:
-                # We might have lost a race.  If so, the directory
-                # must exist now
-                assert os.path.exists(targetpath)
+        self.fshelper.createPathForOID(oid)
 
         # OK, it's not here and we (or someone) needs to get it.  We
         # want to avoid getting it multiple times.  We want to avoid
@@ -1075,6 +1086,7 @@
             self._tbuf.clear()
             self._seriald.clear()
             del self._serials[:]
+            self._iterator_gc()
             self.end_transaction()
 
     def tpc_finish(self, txn, f=None):
@@ -1104,6 +1116,7 @@
             assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
         finally:
             self._load_lock.release()
+            self._iterator_gc()
             self.end_transaction()
 
     def _update_cache(self, tid):
@@ -1176,6 +1189,25 @@
             return []
         return self._server.undoLog(first, last)
 
+    # Recovery support
+
+    def copyTransactionsFrom(self, other, verbose=0):
+        """Copy transactions from another storage.
+
+        This is typically used for converting data from one storage to
+        another.  `other` must have an .iterator() method.
+        """
+        ZODB.BaseStorage.copy(other, self, verbose)
+
+    def restore(self, oid, serial, data, version, prev_txn, transaction):
+        """Write data already committed in a separate database."""
+        assert not version
+        self._check_trans(transaction)
+        self._server.restorea(oid, serial, data, prev_txn, id(transaction))
+        # Don't update the transaction buffer, because current data are
+        # unaffected.
+        return self._check_serials()
+
     # Below are methods invoked by the StorageServer
 
     def serialnos(self, args):
@@ -1249,3 +1281,103 @@
     invalidate = invalidateVerify
     end = endVerify
     Invalidate = invalidateTrans
+
+    # IStorageIteration
+
+    def iterator(self, start=None, stop=None):
+        """Return an IStorageTransactionInformation iterator."""
+        # iids are "iterator IDs" that can be used to query an iterator whose
+        # status is held on the server.
+        iid = self._server.iterator_start(start, stop)
+        return self._setup_iterator(TransactionIterator, iid)
+
+    def _setup_iterator(self, factory, iid, *args):
+        self._iterators[iid] = iterator = factory(self, iid, *args)
+        self._iterator_ids.add(iid)
+        return iterator
+
+    def _forget_iterator(self, iid):
+        self._iterators.pop(iid, None)
+        self._iterator_ids.remove(iid)
+
+    def _iterator_gc(self):
+        iids = self._iterator_ids - set(self._iterators)
+        try:
+            self._server.iterator_gc(list(iids))
+        except ClientDisconnected:
+            # We could not successfully garbage-collect iterators.
+            # The server might have been restarted, so the IIDs might mean
+            # something different now. We simply forget our unused IIDs to
+            # avoid gc'ing foreign iterators.
+            # In the case that the server was not restarted, we accept the
+            # risk of leaking resources on the ZEO server.
+            pass
+        self._iterator_ids -= iids
+
+
+class TransactionIterator(object):
+
+    def __init__(self, storage, iid, *args):
+        self._storage = storage 
+        self._iid = iid
+        self._ended = False
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if self._ended:
+            raise ZODB.interfaces.StorageStopIteration()
+
+        tx_data = self._storage._server.iterator_next(self._iid)
+        if tx_data is None:
+            # The iterator is exhausted, and the server has already
+            # disposed of it.
+            self._ended = True
+            self._storage._forget_iterator(self._iid)
+            raise ZODB.interfaces.StorageStopIteration()
+
+        return ClientStorageTransactionInformation(self._storage, self, *tx_data)
+
+
+class ClientStorageTransactionInformation(ZODB.BaseStorage.TransactionRecord):
+
+    def __init__(self, storage, txiter, tid, status, user, description, extension):
+        self._storage = storage
+        self._txiter = txiter
+        self._completed = False
+        self._riid = None
+
+        self.tid = tid
+        self.status = status
+        self.user = user
+        self.description = description
+        self.extension = extension
+
+    def __iter__(self):
+        riid = self._storage._server.iterator_record_start(self._txiter._iid, self.tid)
+        return self._storage._setup_iterator(RecordIterator, riid)
+
+
+class RecordIterator(object):
+
+    def __init__(self, storage, riid):
+        self._riid = riid
+        self._completed = False
+        self._storage = storage
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        if self._completed:
+            # We finished iteration once already and the server can't know
+            # about the iteration anymore.
+            raise ZODB.interfaces.StorageStopIteration()
+        item = self._storage._server.iterator_record_next(self._riid)
+        if item is None:
+            # The iterator is exhausted, and the server has already
+            # disposed of it.
+            self._completed = True
+            raise ZODB.interfaces.StorageStopIteration()
+        return ZODB.BaseStorage.DataRecord(*item)
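
The TransactionIterator and RecordIterator above are thin client-side proxies
over iterator state held on the server, addressed by iterator ids (iids).  A
rough sketch of the protocol flow they drive (method names as in the ServerStub
changes below; client_storage is a placeholder for a connected ClientStorage):

    server = client_storage._server              # ZEO.ServerStub.StorageServer

    iid = server.iterator_start(None, None)      # create a server-side iterator
    while True:
        tx = server.iterator_next(iid)           # (tid, status, user, description, extension)
        if tx is None:                           # exhausted; the server drops the iid itself
            break
        riid = server.iterator_record_start(iid, tx[0])
        while True:
            rec = server.iterator_record_next(riid)   # (oid, tid, data, version, data_txn)
            if rec is None:
                break
    # iids that were abandoned without being exhausted are cleaned up lazily
    # at transaction boundaries via ClientStorage._iterator_gc().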

Modified: ZODB/trunk/src/ZEO/CommitLog.py
===================================================================
--- ZODB/trunk/src/ZEO/CommitLog.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/CommitLog.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -18,10 +18,13 @@
 concurrent commits are achieved by logging actions up until the
 tpc_vote().  At that point, the entire transaction is committed on the
 real storage.
+
 """
+
 import cPickle
 import tempfile
 
+
 class CommitLog:
 
     def __init__(self):
@@ -35,9 +38,13 @@
         return self.file.tell()
 
     def store(self, oid, serial, data):
-        self.pickler.dump((oid, serial, data))
+        self.pickler.dump(('s', oid, serial, data))
         self.stores += 1
 
+    def restore(self, oid, serial, data, prev_txn):
+        self.pickler.dump(('r', oid, serial, data, prev_txn))
+        self.stores += 1
+
     def get_loader(self):
         self.read = 1
         self.file.seek(0)
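
The type tag added to each log entry above lets the server replay a mixed
sequence of store and restore calls in commit order.  A small sketch of the
round trip (the oids, serials and string payloads are dummy values; the
dispatch on the tag mirrors the StorageServer change further down):

    from ZEO.CommitLog import CommitLog
    from ZODB.utils import z64

    log = CommitLog()
    log.store(z64, z64, 'pickled state 1')           # a regular store
    log.restore(z64, z64, 'pickled state 2', None)   # a restore, no prev_txn

    loads, loader = log.get_loader()
    for i in range(loads):
        entry = loader.load()
        if entry[0] == 's':              # entry[1:] == (oid, serial, data)
            oid, serial, data = entry[1:]
        elif entry[0] == 'r':            # entry[1:] == (oid, serial, data, prev_txn)
            oid, serial, data, prev_txn = entry[1:]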

Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/ServerStub.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -209,6 +209,10 @@
     def storea(self, oid, serial, data, id):
         self.rpc.callAsync('storea', oid, serial, data, '', id)
 
+    def restorea(self, oid, serial, data, prev_txn, id):
+        self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
+
+
     def storeBlob(self, oid, serial, data, blobfilename, txn):
 
         # Store a blob to the server.  We don't want to read all of
@@ -292,6 +296,22 @@
     def undoInfo(self, first, last, spec):
         return self.rpc.call('undoInfo', first, last, spec)
 
+    def iterator_start(self, start, stop):
+        return self.rpc.call('iterator_start', start, stop)
+
+    def iterator_next(self, iid):
+        return self.rpc.call('iterator_next', iid)
+
+    def iterator_record_start(self, txn_iid, tid):
+        return self.rpc.call('iterator_record_start', txn_iid, tid)
+
+    def iterator_record_next(self, iid):
+        return self.rpc.call('iterator_record_next', iid)
+
+    def iterator_gc(self, iids):
+        return self.rpc.call('iterator_gc', iids)
+
+
 class ExtensionMethodWrapper:
     def __init__(self, rpc, name):
         self.rpc = rpc

Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/StorageServer.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -29,12 +29,14 @@
 import threading
 import time
 import warnings
+import itertools
 
 import transaction
 
 import ZODB.serialize
 import ZEO.zrpc.error
 
+import zope.interface
 from ZEO import ClientStub
 from ZEO.CommitLog import CommitLog
 from ZEO.monitor import StorageStats, StatsServer
@@ -53,7 +55,6 @@
 
 logger = logging.getLogger('ZEO.StorageServer')
 
-
 # TODO:  This used to say "ZSS", which is now implied in the logger name.
 # Can this be either set to str(os.getpid()) (if that makes sense) or removed?
 _label = "" # default label used for logging.
@@ -110,6 +111,11 @@
         self._extensions = {}
         for func in self.extensions:
             self._extensions[func.func_name] = None
+        self._iterators = {}
+        self._iterator_ids = itertools.count()
+        # Stores the last item that was handed out for a
+        # transaction iterator.
+        self._txn_iterators_last = {}
 
     def finish_auth(self, authenticated):
         if not self.auth_realm:
@@ -272,6 +278,12 @@
         else:
             supportsUndo = supportsUndo()
 
+        # Communicate the backend storage interfaces to the client
+        storage_provides = zope.interface.providedBy(storage)
+        interfaces = []
+        for candidate in storage_provides.__iro__:
+            interfaces.append((candidate.__module__, candidate.__name__))
+
         return {'length': len(storage),
                 'size': storage.getSize(),
                 'name': storage.getName(),
@@ -279,6 +291,7 @@
                 'supportsVersions': False,
                 'extensionMethods': self.getExtensionMethods(),
                 'supports_record_iternext': hasattr(self, 'record_iternext'),
+                'interfaces': tuple(interfaces),
                 }
 
     def get_size_info(self):
@@ -477,6 +490,11 @@
         self.stats.stores += 1
         self.txnlog.store(oid, serial, data)
 
+    def restorea(self, oid, serial, data, prev_txn, id):
+        self._check_tid(id, exc=StorageTransactionError)
+        self.stats.stores += 1
+        self.txnlog.restore(oid, serial, data, prev_txn)
+
     def storeBlobStart(self):
         assert self.blob_tempfile is None
         self.blob_tempfile = tempfile.mkstemp(
@@ -544,16 +562,7 @@
                 # Unexpected errors are logged and passed to the client
                 self.log("store error: %s, %s" % sys.exc_info()[:2],
                          logging.ERROR, exc_info=True)
-            # Try to pickle the exception.  If it can't be pickled,
-            # the RPC response would fail, so use something else.
-            pickler = cPickle.Pickler()
-            pickler.fast = 1
-            try:
-                pickler.dump(err, 1)
-            except:
-                msg = "Couldn't pickle storage exception: %s" % repr(err)
-                self.log(msg, logging.ERROR)
-                err = StorageServerError(msg)
+            err = self._marshal_error(err)
             # The exception is reported back as newserial for this oid
             newserial = [(oid, err)]
         else:
@@ -575,6 +584,37 @@
 
         return err is None
 
+    def _restore(self, oid, serial, data, prev_txn):
+        err = None
+        try:
+            self.storage.restore(oid, serial, data, '', prev_txn, self.transaction)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except Exception, err:
+            self.store_failed = 1
+            if not isinstance(err, TransactionError):
+                # Unexpected errors are logged and passed to the client
+                self.log("store error: %s, %s" % sys.exc_info()[:2],
+                         logging.ERROR, exc_info=True)
+            err = self._marshal_error(err)
+            # The exception is reported back as newserial for this oid
+            self.serials.append((oid, err))
+
+        return err is None
+
+    def _marshal_error(self, error):
+        # Try to pickle the exception.  If it can't be pickled,
+        # the RPC response would fail, so use something that can be pickled.
+        pickler = cPickle.Pickler()
+        pickler.fast = 1
+        try:
+            pickler.dump(error, 1)
+        except:
+            msg = "Couldn't pickle storage exception: %s" % repr(error)
+            self.log(msg, logging.ERROR)
+            error = StorageServerError(msg)
+        return error
+
     def _vote(self):
         if not self.store_failed:
             # Only call tpc_vote if no store call failed, otherwise
@@ -627,8 +667,18 @@
         self._tpc_begin(self.transaction, self.tid, self.status)
         loads, loader = self.txnlog.get_loader()
         for i in range(loads):
-            # load oid, serial, data, version
-            if not self._store(*loader.load()):
+            store = loader.load()
+            store_type = store[0]
+            store_args = store[1:]
+
+            if store_type == 's':
+                do_store = self._store
+            elif store_type == 'r':
+                do_store = self._restore
+            else:
+                raise ValueError('Invalid store type: %r' % store_type)
+
+            if not do_store(*store_args):
                 break
 
         # Blob support
@@ -683,6 +733,62 @@
 
     abortVersion = commitVersion
 
+    # IStorageIteration support
+
+    def iterator_start(self, start, stop):
+        iid = self._iterator_ids.next()
+        self._iterators[iid] = iter(self.storage.iterator(start, stop))
+        return iid
+
+    def iterator_next(self, iid):
+        iterator = self._iterators[iid]
+        try:
+            info = iterator.next()
+        except StopIteration:
+            del self._iterators[iid]
+            item = None
+            if iid in self._txn_iterators_last:
+                del self._txn_iterators_last[iid]
+        else:
+            item = (info.tid,
+                    info.status,
+                    info.user,
+                    info.description,
+                    info.extension)
+            # Keep a reference to the last iterator result to allow starting a
+            # record iterator off it.
+            self._txn_iterators_last[iid] = info
+        return item
+
+    def iterator_record_start(self, txn_iid, tid):
+        record_iid = self._iterator_ids.next()
+        txn_info = self._txn_iterators_last[txn_iid]
+        if txn_info.tid != tid:
+            raise Exception(
+                'Out-of-order request for record iterator for transaction %r' % tid)
+        self._iterators[record_iid] = iter(txn_info)
+        return record_iid
+
+    def iterator_record_next(self, iid):
+        iterator = self._iterators[iid]
+        try:
+            info = iterator.next()
+        except StopIteration:
+            del self._iterators[iid]
+            item = None
+        else:
+            item = (info.oid,
+                    info.tid,
+                    info.data,
+                    info.version,
+                    info.data_txn)
+        return item
+
+    def iterator_gc(self, iids):
+        for iid in iids:
+            self._iterators.pop(iid, None)
+
+
 class StorageServerDB:
 
     def __init__(self, server, storage_id):
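
Together, the 'interfaces' entry added to get_info() above and the import loop
added to ClientStorage implement the interface hand-off from the backend
storage to the client.  Condensed into one place (a sketch; storage and
client_storage are placeholders for the backend storage and the ClientStorage
instance being decorated):

    import zope.interface

    # Server side: serialize the interface resolution order as importable names.
    interfaces = tuple((iface.__module__, iface.__name__)
                       for iface in zope.interface.providedBy(storage).__iro__)

    # Client side: re-import each interface and declare it directly on the proxy.
    remote_interfaces = []
    for module_name, name in interfaces:
        module = __import__(module_name, globals(), locals(), [name])
        remote_interfaces.append(getattr(module, name))
    zope.interface.directlyProvides(client_storage, remote_interfaces)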

Modified: ZODB/trunk/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/ConnectionTests.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/tests/ConnectionTests.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -102,7 +102,8 @@
         """
         self.__super_setUp()
         logging.info("setUp() %s", self.id())
-        self.file = tempfile.mktemp()
+        fd, self.file = tempfile.mkstemp()
+        os.close(fd)
         self.addr = []
         self._pids = []
         self._servers = []

Copied: ZODB/trunk/src/ZEO/tests/IterationTests.py (from rev 90352, ZODB/branches/gocept-iteration/src/ZEO/tests/IterationTests.py)
===================================================================
--- ZODB/trunk/src/ZEO/tests/IterationTests.py	                        (rev 0)
+++ ZODB/trunk/src/ZEO/tests/IterationTests.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -0,0 +1,101 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""ZEO iterator protocol tests."""
+
+import transaction
+
+
+class IterationTests:
+
+    def checkIteratorGCProtocol(self):
+        # Test garbage collection on protocol level.
+        server = self._storage._server
+
+        iid = server.iterator_start(None, None)
+        # None signals the end of iteration.
+        self.assertEquals(None, server.iterator_next(iid))
+        # The server has already disposed of the iterator.
+        self.assertRaises(KeyError, server.iterator_next, iid)
+
+        iid = server.iterator_start(None, None)
+        # This time, we tell the server to throw the iterator away.
+        server.iterator_gc([iid])
+        self.assertRaises(KeyError, server.iterator_next, iid)
+
+    def checkIteratorExhaustionStorage(self):
+        # Test the storage's garbage collection mechanism.
+        iterator = self._storage.iterator()
+        self.assertEquals(1, len(self._storage._iterator_ids))
+        iid = list(self._storage._iterator_ids)[0]
+        self.assertEquals([], list(iterator))
+        self.assertEquals(0, len(self._storage._iterator_ids))
+
+        # The iterator has run through, so the server has already disposed of it.
+        self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+
+    def checkIteratorGCSpanTransactions(self):
+        iterator = self._storage.iterator()
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.tpc_vote(t)
+        self._storage.tpc_finish(t)
+        self.assertEquals([], list(iterator))
+
+    def checkIteratorGCStorageCommitting(self):
+        # We want the iterator to be garbage-collected, so we don't keep any
+        # hard references to it. The storage tracks its ID, though.
+        self._storage.iterator()
+
+        self.assertEquals(1, len(self._storage._iterator_ids))
+        iid = list(self._storage._iterator_ids)[0]
+
+        # GC happens at the transaction boundary. After that, both the storage
+        # and the server have forgotten the iterator.
+        self._dostore()
+        self.assertEquals(0, len(self._storage._iterator_ids))
+        self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+
+    def checkIteratorGCStorageTPCAborting(self):
+        self._storage.iterator()
+        iid = list(self._storage._iterator_ids)[0]
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        self._storage.tpc_abort(t)
+        self.assertEquals(0, len(self._storage._iterator_ids))
+        self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+
+    def checkIteratorGCStorageDisconnect(self):
+        self._storage.iterator()
+        iid = list(self._storage._iterator_ids)[0]
+        t = transaction.Transaction()
+        self._storage.tpc_begin(t)
+        # Show that after disconnecting, the client side GCs the iterators
+        # as well. I'm calling this directly to avoid accidentally
+        # calling tpc_abort implicitly.
+        self._storage.notifyDisconnected()
+        self.assertEquals(0, len(self._storage._iterator_ids))
+
+    def checkIteratorParallel(self):
+        self._dostore()
+        self._dostore()
+        iter1 = self._storage.iterator()
+        iter2 = self._storage.iterator()
+        txn_info1 = iter1.next()
+        txn_info2 = iter2.next()
+        self.assertEquals(txn_info1.tid, txn_info2.tid)
+        txn_info1 = iter1.next()
+        txn_info2 = iter2.next()
+        self.assertEquals(txn_info1.tid, txn_info2.tid)
+        self.assertRaises(StopIteration, iter1.next)
+        self.assertRaises(StopIteration, iter2.next)

Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -39,7 +39,7 @@
 from ZODB.tests import StorageTestBase, BasicStorage,  \
      TransactionalUndoStorage,  \
      PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
-     MTStorage, ReadOnlyStorage
+     MTStorage, ReadOnlyStorage, IteratorStorage, RecoveryStorage
 
 from ZODB.tests.testDemoStorage import DemoStorageWrappedBase
 
@@ -47,7 +47,8 @@
 
 import ZEO.zrpc.connection
 
-from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
+from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests, \
+     IterationTests
 from ZEO.tests.forker import get_port
 
 import ZEO.tests.ConnectionTests
@@ -56,7 +57,6 @@
 
 logger = logging.getLogger('ZEO.tests.testZEO')
 
-
 class DummyDB:
     def invalidate(self, *args):
         pass
@@ -158,7 +158,7 @@
     CommitLockTests.CommitLockVoteTests,
     ThreadTests.ThreadTests,
     # Locally defined (see above)
-    MiscZEOTests
+    MiscZEOTests,
     ):
 
     """Combine tests from various origins in one class."""
@@ -196,6 +196,15 @@
             for pid in self._pids:
                 os.waitpid(pid, 0)
 
+    def runTest(self):
+        try:
+            super(GenericTests, self).runTest()
+        except:
+            self._failed = True
+            raise
+        else:
+            self._failed = False
+
     def open(self, read_only=0):
         # Needed to support ReadOnlyStorage tests.  Ought to be a
         # cleaner way.
@@ -226,9 +235,75 @@
     PackableStorage.PackableUndoStorage,
     RevisionStorage.RevisionStorage,
     TransactionalUndoStorage.TransactionalUndoStorage,
+    IteratorStorage.IteratorStorage,
+    IterationTests.IterationTests,
     ):
     """Extend GenericTests with tests that MappingStorage can't pass."""
 
+class FileStorageRecoveryTests(StorageTestBase.StorageTestBase,
+                               RecoveryStorage.RecoveryStorage):
+
+    level = 2
+
+    def setUp(self):
+        self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
+        self._dst = ZODB.FileStorage.FileStorage("Dest.fs", create=True)
+
+    def getConfig(self):
+        filename = self.__fs_base = tempfile.mktemp()
+        return """\
+        <filestorage 1>
+        path %s
+        </filestorage>
+        """ % filename
+
+    def _new_storage(self):
+        port = get_port()
+        zconf = forker.ZEOConfig(('', port))
+        zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
+                                                              zconf, port)
+        blob_cache_dir = tempfile.mkdtemp()
+
+        self._pids.append(pid)
+        self._servers.append(adminaddr)
+        self._conf_paths.append(path)
+        self.blob_cache_dirs.append(blob_cache_dir)
+
+        storage = ClientStorage(
+            zport, '1', cache_size=20000000,
+            min_disconnect_poll=0.5, wait=1,
+            wait_timeout=60, blob_dir=blob_cache_dir)
+        storage.registerDB(DummyDB())
+        return storage
+
+    def setUp(self):
+        self._pids = []
+        self._servers = []
+        self._conf_paths = []
+        self.blob_cache_dirs = []
+
+        self._storage = self._new_storage()
+        self._dst = self._new_storage()
+
+    def tearDown(self):
+        self._storage.close()
+        self._dst.close()
+
+        for p in self._conf_paths:
+            os.remove(p)
+        for p in self.blob_cache_dirs:
+            ZODB.blob.remove_committed_dir(p)
+        for server in self._servers:
+            forker.shutdown_zeo_server(server)
+        if hasattr(os, 'waitpid'):
+            # Not in Windows Python until 2.3
+            for pid in self._pids:
+                os.waitpid(pid, 0)
+
+    def new_dest(self):
+        return self._new_storage()
+
+
 class FileStorageTests(FullGenericTests):
     """Test ZEO backed by a FileStorage."""
     level = 2
@@ -241,12 +316,36 @@
         </filestorage>
         """ % filename
 
+    def checkInterfaceFromRemoteStorage(self):
+        # ClientStorage itself doesn't implement IStorageIteration, but the
+        # FileStorage on the other end does, and thus the ClientStorage
+        # instance that is connected to it reflects this.
+        self.failIf(ZODB.interfaces.IStorageIteration.implementedBy(
+            ZEO.ClientStorage.ClientStorage))
+        self.failUnless(ZODB.interfaces.IStorageIteration.providedBy(
+            self._storage))
+        # This is communicated using ClientStorage's _info object:
+        self.assertEquals((('ZODB.interfaces', 'IStorageIteration'),
+                           ('zope.interface', 'Interface')),
+                          self._storage._info['interfaces'])
+
+
 class MappingStorageTests(GenericTests):
     """ZEO backed by a Mapping storage."""
 
     def getConfig(self):
         return """<mappingstorage 1/>"""
 
+    def checkSimpleIteration(self):
+        # The test base class IteratorStorage assumes that we keep undo data
+        # to construct our iterator, which we don't, so we disable this test.
+        pass
+
+    def checkUndoZombie(self):
+        # The test base class IteratorStorage assumes that we keep undo data
+        # to construct our iterator, which we don't, so we disable this test.
+        pass
+
 class DemoStorageTests(
     GenericTests,
     ):
@@ -260,6 +359,11 @@
         </demostorage>
         """ % tempfile.mktemp()
 
+    def checkUndoZombie(self):
+        # The test base class IteratorStorage assumes that we keep undo data
+        # to construct our iterator, which we don't, so we disable this test.
+        pass
+
 class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
     """Make sure a heartbeat is being sent and that it does no harm
 
@@ -554,7 +658,7 @@
         self._storage.close()
 
 
-class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
+class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
     """ZEO backed by a BlobStorage-adapted FileStorage."""
 
     def setUp(self):
@@ -645,7 +749,7 @@
         check_data(filename)
 
 
-class BlobWritableCacheTests(GenericTests, CommonBlobTests):
+class BlobWritableCacheTests(FullGenericTests, CommonBlobTests):
 
     def setUp(self):
         self.blobdir = self.blob_cache_dir = tempfile.mkdtemp()
@@ -823,7 +927,7 @@
     >>> st = StorageServerWrapper(sv, 'fs')
     >>> s = st.server
     
-Now, if we ask fior the invalidations since the last committed
+Now, if we ask for the invalidations since the last committed
 transaction, we'll get a result:
 
     >>> tid, oids = s.getInvalidations(last[-1])
@@ -849,7 +953,8 @@
     """
 
 
-test_classes = [FileStorageTests, MappingStorageTests, DemoStorageTests,
+test_classes = [FileStorageTests, FileStorageRecoveryTests,
+                MappingStorageTests, DemoStorageTests,
                 BlobAdaptedFileStorageTests, BlobWritableCacheTests]
 
 def test_suite():

Modified: ZODB/trunk/src/ZODB/BaseStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/BaseStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/BaseStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -21,14 +21,18 @@
 import logging
 from struct import pack as _structpack, unpack as _structunpack
 
+import zope.interface
+
 from persistent.TimeStamp import TimeStamp
 
+import ZODB.interfaces
 from ZODB import POSException
 from ZODB.utils import z64, oid_repr
 from ZODB.UndoLogCompatible import UndoLogCompatible
 
 log = logging.getLogger("ZODB.BaseStorage")
 
+
 class BaseStorage(UndoLogCompatible):
     """Base class that supports storage implementations.
 
@@ -188,6 +192,7 @@
                 ext = cPickle.dumps(ext, 1)
             else:
                 ext = ""
+
             self._ude = user, desc, ext
 
             if tid is None:
@@ -338,7 +343,7 @@
             if verbose:
                 print oid_repr(oid), r.version, len(r.data)
             if restoring:
-                dest.restore(oid, r.tid, r.data, '',
+                dest.restore(oid, r.tid, r.data, r.version,
                              r.data_txn, transaction)
             else:
                 pre = preget(oid, None)
@@ -348,10 +353,36 @@
         dest.tpc_vote(transaction)
         dest.tpc_finish(transaction)
 
-    fiter.close()
 
-class TransactionRecord:
+class TransactionRecord(object):
     """Abstract base class for iterator protocol"""
 
-class DataRecord:
+    zope.interface.implements(ZODB.interfaces.IStorageTransactionInformation)
+
+    def __init__(self, tid, status, user, description, extension):
+        self.tid = tid
+        self.status = status
+        self.user = user
+        self.description = description
+        self.extension = extension
+
+    # XXX This is a workaround to make the TransactionRecord compatible with a
+    # transaction object because it is passed to tpc_begin().
+    def _ext_set(self, value):
+        self.extension = value
+    def _ext_get(self):
+        return self.extension
+    _extension = property(fset=_ext_set, fget=_ext_get)
+
+
+class DataRecord(object):
     """Abstract base class for iterator protocol"""
+
+    zope.interface.implements(ZODB.interfaces.IStorageRecordInformation)
+
+    def __init__(self, oid, tid, data, version, prev):
+        self.oid = oid
+        self.tid = tid
+        self.data = data
+        self.version = version
+        self.data_txn = prev
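
The restore()/copy() plumbing above is what copyTransactionsFrom() builds on;
its typical use is converting data from one storage to another while preserving
transaction ids and metadata.  A minimal sketch (the file names are
placeholders, and 'Source.fs' is assumed to exist already):

    import ZODB.FileStorage

    src = ZODB.FileStorage.FileStorage('Source.fs', read_only=True)
    dst = ZODB.FileStorage.FileStorage('Dest.fs', create=True)

    # Replays every transaction record via dst.restore(), keeping the tids.
    dst.copyTransactionsFrom(src)

    src.close()
    dst.close()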

Modified: ZODB/trunk/src/ZODB/DemoStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/DemoStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/DemoStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -80,15 +80,19 @@
 
 """
 
+import cPickle
 import base64, time
+
+import ZODB.BaseStorage
+import ZODB.interfaces
+import zope.interface
 from ZODB import POSException
 from ZODB.utils import z64, oid_repr
-from ZODB.BaseStorage import BaseStorage
 from persistent.TimeStamp import TimeStamp
-from cPickle import loads
 from BTrees import OOBTree
 
-class DemoStorage(BaseStorage):
+
+class DemoStorage(ZODB.BaseStorage.BaseStorage):
     """Demo storage
 
     Demo storages provide useful storages for writing tests because
@@ -104,9 +108,10 @@
     
     """
     
+    zope.interface.implements(ZODB.interfaces.IStorageIteration)
 
     def __init__(self, name='Demo Storage', base=None, quota=None):
-        BaseStorage.__init__(self, name, base)
+        ZODB.BaseStorage.BaseStorage.__init__(self, name, base)
 
         # We use a BTree because the items are sorted!
         self._data = OOBTree.OOBTree()
@@ -133,7 +138,7 @@
     # by the base storage, leading to a variety of "impossible" problems.
     def new_oid(self):
         if self._base is None:
-            return BaseStorage.new_oid(self)
+            return ZODB.BaseStorage.BaseStorage.new_oid(self)
         else:
             return self._base.new_oid()
 
@@ -317,6 +322,10 @@
         self._tsize = self._size + 120 + len(u) + len(d) + len(e)
 
     def _finish(self, tid, user, desc, ext):
+        if not self._tindex:
+            # No data, so we don't update anything.
+            return
+
         self._size = self._tsize
 
         self._data[tid] = None, user, desc, ext, tuple(self._tindex)
@@ -364,7 +373,7 @@
                      'time': TimeStamp(tid).timeTime(),
                      'user_name': u, 'description': d}
                 if e:
-                    d.update(loads(e))
+                    d.update(cPickle.loads(e))
                 if filter is None or filter(d):
                     if i >= first:
                         r.append(d)
@@ -569,3 +578,27 @@
     def close(self):
         if self._base is not None:
             self._base.close()
+
+    def iterator(self, start=None, end=None):
+        # First iterate over the base storage
+        if self._base is not None:
+            for transaction in self._base.iterator(start, end):
+                yield transaction
+        # Then iterate over our local transactions
+        for tid, transaction in self._data.items():
+            if ((start is None or tid >= start)
+                    and (end is None or tid <= end)):
+                yield TransactionRecord(tid, transaction)
+
+
+class TransactionRecord(ZODB.BaseStorage.TransactionRecord):
+    
+    def __init__(self, tid, transaction):
+        packed, user, description, extension, records = transaction
+        super(TransactionRecord, self).__init__(
+            tid, packed, user, description, extension)
+        self.records = records
+
+    def __iter__(self):
+        for record in self.records:
+            oid, prev, version, data, tid = record
+            yield ZODB.BaseStorage.DataRecord(oid, tid, data, version, prev)

Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -29,6 +29,8 @@
 # Not all platforms have fsync
 fsync = getattr(os, "fsync", None)
 
+import zope.interface
+import ZODB.interfaces
 from ZODB import BaseStorage, ConflictResolution, POSException
 from ZODB.POSException import UndoError, POSKeyError, MultipleUndoErrors
 from persistent.TimeStamp import TimeStamp
@@ -88,6 +90,8 @@
                   ConflictResolution.ConflictResolvingStorage,
                   FileStorageFormatter):
 
+    zope.interface.implements(ZODB.interfaces.IStorageIteration)
+
     # Set True while a pack is in progress; undo is blocked for the duration.
     _pack_is_in_progress = False
 
@@ -1127,7 +1131,7 @@
 
             seek(0)
             return [(trans.tid, [(r.oid, '') for r in trans])
-                    for trans in FileIterator(self._file, pos=pos)]
+                    for trans in FileIterator(self._file_name, pos=pos)]
         finally:
             self._lock_release()
         
@@ -1516,31 +1520,16 @@
     file.seek(pos)
     file.truncate()
 
-class Iterator:
-    """A General simple iterator that uses the Python for-loop index protocol
-    """
-    __index=-1
-    __current=None
 
-    def __getitem__(self, i):
-        __index=self.__index
-        while i > __index:
-            __index=__index+1
-            self.__current=self.next(__index)
-
-        self.__index=__index
-        return self.__current
-
-
-class FileIterator(Iterator, FileStorageFormatter):
+class FileIterator(FileStorageFormatter):
     """Iterate over the transactions in a FileStorage file.
     """
     _ltid = z64
     _file = None
 
-    def __init__(self, file, start=None, stop=None, pos=4L):
-        if isinstance(file, str):
-            file = open(file, 'rb')
+    def __init__(self, filename, start=None, stop=None, pos=4L):
+        assert isinstance(filename, str)
+        file = open(filename, 'rb')
         self._file = file
         if file.read(4) != packed_version:
             raise FileStorageFormatError(file.name)
@@ -1602,14 +1591,17 @@
                     panic("%s has inconsistent transaction length at %s "
                           "(%s != %s)", file.name, pos, u64(rtl), u64(stl))
 
-    def next(self, index=0):
+    # Iterator protocol
+    def __iter__(self):
+        return self
+
+    def next(self):
         if self._file is None:
-            # A closed iterator.  Is IOError the best we can do?  For
-            # now, mimic a read on a closed file.
-            raise IOError('iterator is closed')
+            raise ZODB.interfaces.StorageStopIteration()
 
         pos = self._pos
-        while 1:
+        while True:
+
             # Read the transaction record
             try:
                 h = self._read_txn_header(pos)
@@ -1625,11 +1617,11 @@
             self._ltid = h.tid
 
             if self._stop is not None and h.tid > self._stop:
-                raise IndexError(index)
+                break
 
             if h.status == "c":
                 # Assume we've hit the last, in-progress transaction
-                raise IndexError(index)
+                break
 
             if pos + h.tlen + 8 > self._file_size:
                 # Hm, the data were truncated or the checkpoint flag wasn't
@@ -1679,8 +1671,8 @@
                     except:
                         pass
 
-                result = RecordIterator(h.tid, h.status, h.user, h.descr,
-                                        e, pos, tend, self._file, tpos)
+                result = TransactionRecord(h.tid, h.status, h.user, h.descr,
+                                           e, pos, tend, self._file, tpos)
 
             # Read the (intentionally redundant) transaction length
             self._file.seek(tend)
@@ -1693,23 +1685,25 @@
 
             return result
 
-        raise IndexError(index)
+        self.close()
+        raise ZODB.interfaces.StorageStopIteration()
 
-class RecordIterator(Iterator, BaseStorage.TransactionRecord,
-                     FileStorageFormatter):
+
+class TransactionRecord(BaseStorage.TransactionRecord, FileStorageFormatter):
     """Iterate over the transactions in a FileStorage file."""
+
     def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
-        self.tid = tid
-        self.status = status
-        self.user = user
-        self.description = desc
-        self._extension = ext
+        BaseStorage.TransactionRecord.__init__(
+            self, tid, status, user, desc, ext)
         self._pos = pos
         self._tend = tend
         self._file = file
         self._tpos = tpos
 
-    def next(self, index=0):
+    def __iter__(self):
+        return self
+
+    def next(self):
         pos = self._pos
         while pos < self._tend:
             # Read the data records for this transaction
@@ -1738,20 +1732,18 @@
                     # Should it go to the original data like BDBFullStorage?
                     prev_txn = self.getTxnFromData(h.oid, h.back)
 
-            r = Record(h.oid, h.tid, data, prev_txn, pos)
-            return r
+            return Record(h.oid, h.tid, data, prev_txn, pos)
 
-        raise IndexError(index)
+        raise ZODB.interfaces.StorageStopIteration()
 
+
 class Record(BaseStorage.DataRecord):
-    """An abstract database record."""
+
     def __init__(self, oid, tid, data, prev, pos):
-        self.oid = oid
-        self.tid = tid
-        self.data = data
-        self.data_txn = prev
+        super(Record, self).__init__(oid, tid, data, '', prev)
         self.pos = pos
 
+
 class UndoSearch:
 
     def __init__(self, file, pos, first, last, filter=None):

Modified: ZODB/trunk/src/ZODB/FileStorage/__init__.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/__init__.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/FileStorage/__init__.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -1,4 +1,8 @@
 # this is a package
 
-from ZODB.FileStorage.FileStorage import FileStorage, RecordIterator
+from ZODB.FileStorage.FileStorage import FileStorage, TransactionRecord
 from ZODB.FileStorage.FileStorage import FileIterator, Record, packed_version
+
+
+# BBB Alias for compatibility
+RecordIterator = TransactionRecord

Modified: ZODB/trunk/src/ZODB/MappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/MappingStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/MappingStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -21,16 +21,16 @@
 The Mapping storage uses a single data structure to map object ids to data.
 """
 
+import ZODB.BaseStorage
 from ZODB.utils import u64, z64
-from ZODB.BaseStorage import BaseStorage
 from ZODB import POSException
 from persistent.TimeStamp import TimeStamp
 
 
-class MappingStorage(BaseStorage):
+class MappingStorage(ZODB.BaseStorage.BaseStorage):
 
     def __init__(self, name='Mapping Storage'):
-        BaseStorage.__init__(self, name)
+        ZODB.BaseStorage.BaseStorage.__init__(self, name)
         # ._index maps an oid to a string s.  s[:8] is the tid of the
         # transaction that created oid's current state, and s[8:] is oid's
         # current state.

Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/blob.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -324,6 +324,21 @@
         """
         return os.path.join(self.base_dir, utils.oid_repr(oid))
 
+    def createPathForOID(self, oid):
+        """Given an OID, creates a directory on the filesystem where
+        the blob data relating to that OID is stored, if it doesn't exist.
+
+        """
+        path = self.getPathForOID(oid)
+        if os.path.exists(path):
+            return
+        try:
+            os.makedirs(path, 0700)
+        except OSError:
+            # We might have lost a race.  If so, the directory
+            # must exist now
+            assert os.path.exists(path)
+
     def getBlobFilename(self, oid, tid):
         """Given an oid and a tid, return the full filename of the
         'committed' blob file related to that oid and tid.

Modified: ZODB/trunk/src/ZODB/fsrecover.py
===================================================================
--- ZODB/trunk/src/ZODB/fsrecover.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/fsrecover.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -82,7 +82,7 @@
 
 import ZODB.FileStorage
 from ZODB.utils import u64
-from ZODB.FileStorage import RecordIterator
+from ZODB.FileStorage import TransactionRecord
 
 from persistent.TimeStamp import TimeStamp
 
@@ -146,8 +146,8 @@
         except: e={}
     else: e={}
 
-    result = RecordIterator(tid, status, user, description, e, pos, tend,
-                            f, tpos)
+    result = TransactionRecord(tid, status, user, description, e, pos, tend,
+                               f, tpos)
     pos = tend
 
     # Read the (intentionally redundant) transaction length

Modified: ZODB/trunk/src/ZODB/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/interfaces.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/interfaces.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -286,6 +286,7 @@
         begins or until the connection is reopened.
         """
 
+
 class IStorageDB(Interface):
     """Database interface exposed to storages
 
@@ -418,6 +419,7 @@
         should also close all the Connections.
         """
 
+
 class IStorage(Interface):
     """A storage is responsible for storing and retrieving data of objects.
     """
@@ -710,6 +712,7 @@
 
         """
 
+
 class IStorageRestoreable(IStorage):
     """Copying Transactions
 
@@ -744,7 +747,7 @@
         #   failed to take into account records after the pack time.
         
 
-    def restore(oid, serial, data, prev_txn, transaction):
+    def restore(oid, serial, data, version, prev_txn, transaction):
         """Write data already committed in a separate database
 
         The restore method is used when copying data from one database
@@ -775,41 +778,44 @@
         Nothing is returned.
         """
 
+
 class IStorageRecordInformation(Interface):
     """Provide information about a single storage record
     """
 
     oid = Attribute("The object id")
+    tid = Attribute("The transaction id")
     data = Attribute("The data record")
+    version = Attribute("The version id")
+    data_txn = Attribute("The previous transaction id")
 
+
 class IStorageTransactionInformation(Interface):
-    """Provide information about a storage transaction
+    """Provide information about a storage transaction.
+
+    Can be iterated over to retrieve the records modified in the transaction.
+
     """
 
     tid = Attribute("Transaction id")
     status = Attribute("Transaction Status") # XXX what are valid values?
     user = Attribute("Transaction user")
     description = Attribute("Transaction Description")
-    extension = Attribute("Transaction extension data")
+    extension = Attribute("A dictionary carrying the transaction's extension data")
 
     def __iter__():
-        """Return an iterable of IStorageRecordInformation
+        """Iterate over the transaction's records given as
+        IStorageRecordInformation objects.
+
         """
 
+
 class IStorageIteration(Interface):
-    """API for iterating over the contents of a storage
+    """API for iterating over the contents of a storage."""
 
-    Note that this is a future API.  Some storages now provide an
-    approximation of this.
-
-    """
-
     def iterator(start=None, stop=None):
         """Return an IStorageTransactionInformation iterator.
 
-        An IStorageTransactionInformation iterator is returned for
-        iterating over the transactions in the storage.
-
         If the start argument is not None, then iteration will start
         with the first transaction whose identifier is greater than or
         equal to start.
@@ -818,8 +824,12 @@
         the last transaction whose identifier is less than or equal to
         stop.
 
+        The iterator provides access to the data as available at the time when
+        the iterator was retrieved.
+
         """
 
+
 class IStorageUndoable(IStorage):
     """A storage supporting transactional undo.
     """
@@ -932,6 +942,7 @@
         
         """
 
+
 class IBlob(Interface):
     """A BLOB supports efficient handling of large data within ZODB."""
 
@@ -986,5 +997,12 @@
         If Blobs use this, then commits can be performed with a simple rename.
         """
 
+
 class BlobError(Exception):
     pass
+
+
+class StorageStopIteration(IndexError, StopIteration):
+    """A combination of StopIteration and IndexError to provide a
+    backwards-compatible exception.
+    """

Modified: ZODB/trunk/src/ZODB/tests/IteratorStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/IteratorStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/IteratorStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -15,6 +15,7 @@
 
 Any storage that supports the iterator() method should be able to pass
 all these tests.
+
 """
 
 from ZODB.tests.MinPO import MinPO
@@ -23,13 +24,16 @@
 
 from transaction import Transaction
 
+import itertools
+
+
 class IteratorCompare:
 
     def iter_verify(self, txniter, revids, val0):
         eq = self.assertEqual
         oid = self._oid
         val = val0
-        for reciter, revid in zip(txniter, revids + [None]):
+        for reciter, revid in itertools.izip(txniter, revids + [None]):
             eq(reciter.tid, revid)
             for rec in reciter:
                 eq(rec.oid, oid)
@@ -37,8 +41,8 @@
                 eq(zodb_unpickle(rec.data), MinPO(val))
                 val = val + 1
         eq(val, val0 + len(revids))
-        txniter.close()
 
+
 class IteratorStorage(IteratorCompare):
 
     def checkSimpleIteration(self):
@@ -51,13 +55,6 @@
         txniter = self._storage.iterator()
         self.iter_verify(txniter, [revid1, revid2, revid3], 11)
 
-    def checkClose(self):
-        self._oid = oid = self._storage.new_oid()
-        revid1 = self._dostore(oid, data=MinPO(11))
-        txniter = self._storage.iterator()
-        txniter.close()
-        self.assertRaises(IOError, txniter.__getitem__, 0)
-
     def checkUndoZombie(self):
         oid = self._storage.new_oid()
         revid = self._dostore(oid, data=MinPO(94))
@@ -89,7 +86,7 @@
         iter = self._storage.iterator()
         count = 0
         for txn in iter:
-            self.assertEqual(txn._extension, {})
+            self.assertEqual(txn.extension, {})
             count +=1
         self.assertEqual(count, 1)
 
@@ -129,9 +126,34 @@
                     match = True
         if not match:
             self.fail("Could not find transaction with matching id")
- 
 
+    def checkIterateRepeatedly(self):
+        self._dostore()
+        transactions = self._storage.iterator()
+        self.assertEquals(1, len(list(transactions)))
+        # The iterator can only be consumed once:
+        self.assertEquals(0, len(list(transactions)))
 
+    def checkIterateRecordsRepeatedly(self):
+        self._dostore()
+        tinfo = self._storage.iterator().next()
+        self.assertEquals(1, len(list(tinfo)))
+        # The iterator can only be consumed once:
+        self.assertEquals(0, len(list(tinfo)))
+
+    def checkIterateWhileWriting(self):
+        self._dostore()
+        iterator = self._storage.iterator()
+        # We have one transaction with 1 modified object.
+        txn_1 = iterator.next()
+        self.assertEquals(1, len(list(txn_1)))
+
+        # We store another transaction with 1 object, the already running
+        # iterator does not pick this up.
+        self._dostore()
+        self.assertRaises(StopIteration, iterator.next)
+
+
 class ExtendedIteratorStorage(IteratorCompare):
 
     def checkExtendedIteration(self):
@@ -173,28 +195,36 @@
         txniter = self._storage.iterator(revid3, revid3)
         self.iter_verify(txniter, [revid3], 13)
 
+
 class IteratorDeepCompare:
+
     def compare(self, storage1, storage2):
         eq = self.assertEqual
         iter1 = storage1.iterator()
         iter2 = storage2.iterator()
-        for txn1, txn2 in zip(iter1, iter2):
+        for txn1, txn2 in itertools.izip(iter1, iter2):
             eq(txn1.tid,         txn2.tid)
             eq(txn1.status,      txn2.status)
             eq(txn1.user,        txn2.user)
             eq(txn1.description, txn2.description)
-            eq(txn1._extension,  txn2._extension)
-            for rec1, rec2 in zip(txn1, txn2):
+            eq(txn1.extension,  txn2.extension)
+            itxn1 = iter(txn1)
+            itxn2 = iter(txn2)
+            for rec1, rec2 in itertools.izip(itxn1, itxn2):
                 eq(rec1.oid,     rec2.oid)
                 eq(rec1.tid,  rec2.tid)
                 eq(rec1.data,    rec2.data)
             # Make sure there are no more records left in rec1 and rec2,
             # meaning they were the same length.
-            self.assertRaises(IndexError, txn1.next)
-            self.assertRaises(IndexError, txn2.next)
+            # Additionally, check that we're backwards compatible to the
+            # IndexError we used to raise before.
+            self.assertRaises(IndexError, itxn1.next)
+            self.assertRaises(IndexError, itxn2.next)
+            self.assertRaises(StopIteration, itxn1.next)
+            self.assertRaises(StopIteration, itxn2.next)
         # Make sure there are no more records left in txn1 and txn2, meaning
         # they were the same length
         self.assertRaises(IndexError, iter1.next)
         self.assertRaises(IndexError, iter2.next)
-        iter1.close()
-        iter2.close()
+        self.assertRaises(StopIteration, iter1.next)
+        self.assertRaises(StopIteration, iter2.next)

Modified: ZODB/trunk/src/ZODB/tests/PackableStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/PackableStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/PackableStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -30,6 +30,7 @@
 from persistent import Persistent
 from persistent.mapping import PersistentMapping
 import transaction
+import ZODB.interfaces
 from ZODB import DB
 from ZODB.serialize import referencesf
 from ZODB.tests.MinPO import MinPO
@@ -149,6 +150,16 @@
             self._storage.tpc_vote(t)
             self._storage.tpc_finish(t)
 
+    def _sanity_check(self):
+        # Iterate over the storage to make sure it's sane.
+        if not ZODB.interfaces.IStorageIteration.providedBy(self._storage):
+            return
+        it = self._storage.iterator()
+        for txn in it:
+            for data in txn:
+                pass
+
+
 class PackableStorage(PackableStorageBase):
 
     def checkPackEmptyStorage(self):
@@ -253,17 +264,8 @@
 
             self.fail('a thread is still alive')
 
-        # Iterate over the storage to make sure it's sane, but not every
-        # storage supports iterators.
-        if not hasattr(self._storage, "iterator"):
-            return
+        self._sanity_check()
 
-        it = self._storage.iterator()
-        for txn in it:
-            for data in txn:
-                pass
-        it.close()
-
     def checkPackWhileWriting(self):
         self._PackWhileWriting(pack_now=False)
 
@@ -304,14 +306,7 @@
             packt = time.time()
         thread.join()
 
-        # Iterate over the storage to make sure it's sane.
-        if not hasattr(self._storage, "iterator"):
-            return
-        it = self._storage.iterator()
-        for txn in it:
-            for data in txn:
-                pass
-        it.close()
+        self._sanity_check()
 
     def checkPackWithMultiDatabaseReferences(self):
         databases = {}
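
The new _sanity_check helper above replaces the old
hasattr(self._storage, "iterator") duck-typing with an explicit
zope.interface check. A minimal sketch of that capability-check pattern,
assuming only that the storage declares its interfaces (the function name is
illustrative, not part of this patch):

    import ZODB.interfaces

    def supports_iteration(storage):
        # True when the storage declares the new iteration interface.
        return ZODB.interfaces.IStorageIteration.providedBy(storage)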

Modified: ZODB/trunk/src/ZODB/tests/RecoveryStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/RecoveryStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/RecoveryStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -22,6 +22,7 @@
 
 import time
 
+
 class RecoveryStorage(IteratorDeepCompare):
     # Requires a setUp() that creates a self._dst destination storage
     def checkSimpleRecovery(self):
@@ -49,12 +50,16 @@
         # copy the final transaction manually.  even though there
         # was a pack, the restore() ought to succeed.
         it = self._storage.iterator()
-        final = list(it)[-1]
+        # Get the last transaction and its record iterator. Record iterators
+        # can't be accessed out-of-order, so we need to do this in a
+        # somewhat roundabout way:
+        for final in it:
+            records = list(final)
+
         self._dst.tpc_begin(final, final.tid, final.status)
-        for r in final:
+        for r in records:
             self._dst.restore(r.oid, r.tid, r.data, '', r.data_txn,
                               final)
-        it.close()
         self._dst.tpc_vote(final)
         self._dst.tpc_finish(final)
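
The rewritten loop above reflects a constraint of the new API: a
transaction's record iterator is only usable while that transaction is the
outer iterator's current one, so the records are materialized with list()
before the loop advances. The same idea as a standalone helper, purely as a
sketch (the name is illustrative, not part of this patch):

    def last_transaction_with_records(storage):
        # Walk forward once, keeping the most recent transaction and a
        # materialized copy of its records.
        final = records = None
        for txn in storage.iterator():
            final = txn
            records = list(txn)
        return final, records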
 

Modified: ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -636,11 +636,13 @@
             for j in range(OBJECTS):
                 oid = s.new_oid()
                 obj = MinPO(i * OBJECTS + j)
-                revid = s.store(oid, None, zodb_pickle(obj), '', t)
-                orig.append((tid, oid, revid))
+                s.store(oid, None, zodb_pickle(obj), '', t)
+                orig.append((tid, oid))
             s.tpc_vote(t)
             s.tpc_finish(t)
 
+        orig = [(tid, oid, s.getTid(oid)) for tid, oid in orig]
+
         i = 0
         for tid, oid, revid in orig:
             self._dostore(oid, revid=revid, data=MinPO(revid),
@@ -668,14 +670,11 @@
         #     OBJECTS * BATCHES modifications, followed by
         #     BATCHES undos
 
-        iter = s.iterator()
-        offset = 0
-
+        transactions = s.iterator()
         eq = self.assertEqual
 
         for i in range(BATCHES):
-            txn = iter[offset]
-            offset += 1
+            txn = transactions.next()
 
             tid = p64(i + 1)
             eq(txn.tid, tid)
@@ -687,13 +686,11 @@
             eq(L1, L2)
 
         for i in range(BATCHES * OBJECTS):
-            txn = iter[offset]
-            offset += 1
+            txn = transactions.next()
             eq(len([rec for rec in txn if rec.data_txn is None]), 1)
 
         for i in range(BATCHES):
-            txn = iter[offset]
-            offset += 1
+            txn = transactions.next()
 
             # The undos are performed in reverse order.
             otid = p64(BATCHES - i)
@@ -704,7 +701,7 @@
             L2.sort()
             eq(L1, L2)
 
-        self.assertRaises(IndexError, iter.__getitem__, offset)
+        self.assertRaises(StopIteration, transactions.next)
 
     def checkUndoLogMetadata(self):
         # test that the metadata is correct in the undo log
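
The TransactionalUndoStorage.py changes above drop indexed access into the
transaction iterator (iter[offset]) in favour of plain forward consumption.
A minimal before/after sketch, assuming a storage `s` that supports the new
iteration API (names are illustrative only):

    transactions = s.iterator()

    # old style, no longer available:
    #     txn = transactions[offset]

    # new style, strictly in order:
    first = transactions.next()
    rest = [txn.tid for txn in transactions]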

Modified: ZODB/trunk/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testFileStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/testFileStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -308,6 +308,7 @@
             else:
                 self.assertNotEqual(next_oid, None)
 
+
 class FileStorageRecoveryTest(
     StorageTestBase.StorageTestBase,
     RecoveryStorage.RecoveryStorage,
@@ -326,6 +327,40 @@
     def new_dest(self):
         return ZODB.FileStorage.FileStorage('Dest.fs')
 
+
+class FileStorageNoRestore(ZODB.FileStorage.FileStorage):
+
+    @property
+    def restore(self):
+        raise Exception
+
+
+class FileStorageNoRestoreRecoveryTest(
+    StorageTestBase.StorageTestBase,
+    RecoveryStorage.RecoveryStorage,
+    ):
+    # This test actually verifies a code path of
+    # BaseStorage.copyTransactionsFrom. For simplicity of implementation, we
+    # use a FileStorage deprived of its restore method.
+
+    def setUp(self):
+        self._storage = FileStorageNoRestore("Source.fs", create=True)
+        self._dst = FileStorageNoRestore("Dest.fs", create=True)
+
+    def tearDown(self):
+        self._storage.close()
+        self._dst.close()
+        self._storage.cleanup()
+        self._dst.cleanup()
+
+    def new_dest(self):
+        return FileStorageNoRestore('Dest.fs')
+
+    def checkRestoreAcrossPack(self):
+        # Skip this check as it calls restore directly.
+        pass
+
+
 class SlowFileStorageTest(BaseFileStorageTests):
 
     level = 2
@@ -492,7 +527,8 @@
 
     suite = unittest.TestSuite()
     for klass in [FileStorageTests, Corruption.FileStorageCorruptTests,
-                  FileStorageRecoveryTest, SlowFileStorageTest]:
+                  FileStorageRecoveryTest, FileStorageNoRestoreRecoveryTest,
+                  SlowFileStorageTest]:
         suite.addTest(unittest.makeSuite(klass, "check"))
     suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
                                        tearDown=ZODB.tests.util.tearDown))
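
FileStorageNoRestore above shadows the inherited restore() with a property
that raises, forcing BaseStorage.copyTransactionsFrom onto the code path it
takes when restore() is unavailable. A toy sketch of the same shadowing
trick, outside ZODB (class and names are illustrative, not part of this
patch):

    class Widget(object):
        def frobnicate(self):
            return 'ok'

    class NoFrobnicateWidget(Widget):
        # Shadow the inherited method with a property that always raises;
        # under Python 2, hasattr() swallows the exception, so capability
        # checks report the method as missing.
        @property
        def frobnicate(self):
            raise Exception('frobnicate disabled')

    assert hasattr(Widget(), 'frobnicate')
    assert not hasattr(NoFrobnicateWidget(), 'frobnicate')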

Modified: ZODB/trunk/src/ZODB/tests/testMappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testMappingStorage.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/testMappingStorage.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -37,6 +37,7 @@
         # have this limit, so we inhibit this test here.
         pass
 
+
 def test_suite():
     suite = unittest.makeSuite(MappingStorageTests, 'check')
     return suite

Modified: ZODB/trunk/src/ZODB/utils.py
===================================================================
--- ZODB/trunk/src/ZODB/utils.py	2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/utils.py	2008-08-30 13:21:49 UTC (rev 90616)
@@ -295,5 +295,3 @@
     handle, filename = mkstemp(dir=dir)
     os.close(handle)
     return filename
-
-


