[Zodb-checkins] SVN: ZODB/trunk/src/ Merge gocept-iteration branch.
Christian Theune
ct at gocept.com
Sat Aug 30 09:21:50 EDT 2008
Log message for revision 90616:
Merge gocept-iteration branch.
Changed:
U ZODB/trunk/src/CHANGES.txt
U ZODB/trunk/src/ZEO/ClientStorage.py
U ZODB/trunk/src/ZEO/CommitLog.py
U ZODB/trunk/src/ZEO/ServerStub.py
U ZODB/trunk/src/ZEO/StorageServer.py
U ZODB/trunk/src/ZEO/tests/ConnectionTests.py
A ZODB/trunk/src/ZEO/tests/IterationTests.py
U ZODB/trunk/src/ZEO/tests/testZEO.py
U ZODB/trunk/src/ZODB/BaseStorage.py
U ZODB/trunk/src/ZODB/DemoStorage.py
U ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
U ZODB/trunk/src/ZODB/FileStorage/__init__.py
U ZODB/trunk/src/ZODB/MappingStorage.py
U ZODB/trunk/src/ZODB/blob.py
U ZODB/trunk/src/ZODB/fsrecover.py
U ZODB/trunk/src/ZODB/interfaces.py
U ZODB/trunk/src/ZODB/tests/IteratorStorage.py
U ZODB/trunk/src/ZODB/tests/PackableStorage.py
U ZODB/trunk/src/ZODB/tests/RecoveryStorage.py
U ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py
U ZODB/trunk/src/ZODB/tests/testFileStorage.py
U ZODB/trunk/src/ZODB/tests/testMappingStorage.py
U ZODB/trunk/src/ZODB/utils.py
-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/CHANGES.txt 2008-08-30 13:21:49 UTC (rev 90616)
@@ -8,6 +8,9 @@
New Features
------------
+- Cleaned up the storage iteration API and provided an iterator implementation
+ for ZEO.
+
- Versions are no-longer supported.
- ZEO cache files can be larger than 4G. Note that older ZEO cache
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/ClientStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -29,8 +29,9 @@
import time
import types
import logging
+import weakref
-from zope.interface import implements
+import zope.interface
from ZEO import ServerStub
from ZEO.cache import ClientCache
from ZEO.TransactionBuffer import TransactionBuffer
@@ -38,11 +39,12 @@
from ZEO.auth import get_module
from ZEO.zrpc.client import ConnectionManager
+import ZODB.interfaces
import ZODB.lock_file
+import ZODB.BaseStorage
from ZODB import POSException
from ZODB import utils
from ZODB.loglevels import BLATHER
-from ZODB.interfaces import IBlobStorage
from ZODB.blob import rename_or_copy_blob
from persistent.TimeStamp import TimeStamp
@@ -103,8 +105,11 @@
"""
- implements(IBlobStorage)
+ # ClientStorage does not declare any interfaces here. Interfaces are
+ # declared according to the server's storage once a connection is
+ # established.
+
# Classes we instantiate. A subclass might override.
TransactionBufferClass = TransactionBuffer
ClientCacheClass = ClientCache
@@ -260,6 +265,9 @@
self._password = password
self._realm = realm
+ self._iterators = weakref.WeakValueDictionary()
+ self._iterator_ids = set()
+
# Flag tracking disconnections in the middle of a transaction. This
# is reset in tpc_begin() and set in notifyDisconnected().
self._midtxn_disconnect = 0
@@ -270,7 +278,7 @@
self._verification_invalidations = None
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
- 'supportsUndo':0}
+ 'supportsUndo': 0, 'interfaces': ()}
self._tbuf = self.TransactionBufferClass()
self._db = None
@@ -526,6 +534,15 @@
self._handle_extensions()
+ # Decorate ClientStorage with all interfaces that the backend storage
+ # supports.
+ remote_interfaces = []
+ for module_name, interface_name in self._info['interfaces']:
+ module = __import__(module_name, globals(), locals(), [interface_name])
+ interface = getattr(module, interface_name)
+ remote_interfaces.append(interface)
+ zope.interface.directlyProvides(self, remote_interfaces)
+
def _handle_extensions(self):
for name in self.getExtensionMethods().keys():
if not hasattr(self, name):
@@ -633,6 +650,7 @@
self._ready.clear()
self._server = disconnected_stub
self._midtxn_disconnect = 1
+ self._iterator_gc()
def __len__(self):
"""Return the size of the storage."""
@@ -933,14 +951,7 @@
raise POSException.POSKeyError("No blob file", oid, serial)
# First, we'll create the directory for this oid, if it doesn't exist.
- targetpath = self.fshelper.getPathForOID(oid)
- if not os.path.exists(targetpath):
- try:
- os.makedirs(targetpath, 0700)
- except OSError:
- # We might have lost a race. If so, the directory
- # must exist now
- assert os.path.exists(targetpath)
+ self.fshelper.createPathForOID(oid)
# OK, it's not here and we (or someone) needs to get it. We
# want to avoid getting it multiple times. We want to avoid
@@ -1075,6 +1086,7 @@
self._tbuf.clear()
self._seriald.clear()
del self._serials[:]
+ self._iterator_gc()
self.end_transaction()
def tpc_finish(self, txn, f=None):
@@ -1104,6 +1116,7 @@
assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
finally:
self._load_lock.release()
+ self._iterator_gc()
self.end_transaction()
def _update_cache(self, tid):
@@ -1176,6 +1189,25 @@
return []
return self._server.undoLog(first, last)
+ # Recovery support
+
+ def copyTransactionsFrom(self, other, verbose=0):
+ """Copy transactions from another storage.
+
+ This is typically used for converting data from one storage to
+ another. `other` must have an .iterator() method.
+ """
+ ZODB.BaseStorage.copy(other, self, verbose)
+
+ def restore(self, oid, serial, data, version, prev_txn, transaction):
+ """Write data already committed in a separate database."""
+ assert not version
+ self._check_trans(transaction)
+ self._server.restorea(oid, serial, data, prev_txn, id(transaction))
+ # Don't update the transaction buffer, because current data are
+ # unaffected.
+ return self._check_serials()
+
# Below are methods invoked by the StorageServer
def serialnos(self, args):
@@ -1249,3 +1281,103 @@
invalidate = invalidateVerify
end = endVerify
Invalidate = invalidateTrans
+
+ # IStorageIteration
+
+ def iterator(self, start=None, stop=None):
+ """Return an IStorageTransactionInformation iterator."""
+ # iids are "iterator IDs" that can be used to query an iterator whose
+ # status is held on the server.
+ iid = self._server.iterator_start(start, stop)
+ return self._setup_iterator(TransactionIterator, iid)
+
+ def _setup_iterator(self, factory, iid, *args):
+ self._iterators[iid] = iterator = factory(self, iid, *args)
+ self._iterator_ids.add(iid)
+ return iterator
+
+ def _forget_iterator(self, iid):
+ self._iterators.pop(iid, None)
+ self._iterator_ids.remove(iid)
+
+ def _iterator_gc(self):
+ iids = self._iterator_ids - set(self._iterators)
+ try:
+ self._server.iterator_gc(list(iids))
+ except ClientDisconnected:
+ # We could not successfully garbage-collect iterators.
+ # The server might have been restarted, so the IIDs might mean
+ # something different now. We simply forget our unused IIDs to
+ # avoid gc'ing foreign iterators.
+ # In the case that the server was not restarted, we accept the
+ # risk of leaking resources on the ZEO server.
+ pass
+ self._iterator_ids -= iids
+
+
+class TransactionIterator(object):
+
+ def __init__(self, storage, iid, *args):
+ self._storage = storage
+ self._iid = iid
+ self._ended = False
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self._ended:
+ raise ZODB.interfaces.StorageStopIteration()
+
+ tx_data = self._storage._server.iterator_next(self._iid)
+ if tx_data is None:
+ # The iterator is exhausted, and the server has already
+ # disposed of it.
+ self._ended = True
+ self._storage._forget_iterator(self._iid)
+ raise ZODB.interfaces.StorageStopIteration()
+
+ return ClientStorageTransactionInformation(self._storage, self, *tx_data)
+
+
+class ClientStorageTransactionInformation(ZODB.BaseStorage.TransactionRecord):
+
+ def __init__(self, storage, txiter, tid, status, user, description, extension):
+ self._storage = storage
+ self._txiter = txiter
+ self._completed = False
+ self._riid = None
+
+ self.tid = tid
+ self.status = status
+ self.user = user
+ self.description = description
+ self.extension = extension
+
+ def __iter__(self):
+ riid = self._storage._server.iterator_record_start(self._txiter._iid, self.tid)
+ return self._storage._setup_iterator(RecordIterator, riid)
+
+
+class RecordIterator(object):
+
+ def __init__(self, storage, riid):
+ self._riid = riid
+ self._completed = False
+ self._storage = storage
+
+ def __iter__(self):
+ return self
+
+ def next(self):
+ if self._completed:
+ # We finished iteration once already and the server can't know
+ # about the iteration anymore.
+ raise ZODB.interfaces.StorageStopIteration()
+ item = self._storage._server.iterator_record_next(self._riid)
+ if item is None:
+ # The iterator is exhausted, and the server has already
+ # disposed of it.
+ self._completed = True
+ raise ZODB.interfaces.StorageStopIteration()
+ return ZODB.BaseStorage.DataRecord(*item)
Modified: ZODB/trunk/src/ZEO/CommitLog.py
===================================================================
--- ZODB/trunk/src/ZEO/CommitLog.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/CommitLog.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -18,10 +18,13 @@
concurrent commits are achieved by logging actions up until the
tpc_vote(). At that point, the entire transaction is committed on the
real storage.
+
"""
+
import cPickle
import tempfile
+
class CommitLog:
def __init__(self):
@@ -35,9 +38,13 @@
return self.file.tell()
def store(self, oid, serial, data):
- self.pickler.dump((oid, serial, data))
+ self.pickler.dump(('s', oid, serial, data))
self.stores += 1
+ def restore(self, oid, serial, data, prev_txn):
+ self.pickler.dump(('r', oid, serial, data, prev_txn))
+ self.stores += 1
+
def get_loader(self):
self.read = 1
self.file.seek(0)
Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/ServerStub.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -209,6 +209,10 @@
def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, '', id)
+ def restorea(self, oid, serial, data, prev_txn, id):
+ self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
+
+
def storeBlob(self, oid, serial, data, blobfilename, txn):
# Store a blob to the server. We don't want to real all of
@@ -292,6 +296,22 @@
def undoInfo(self, first, last, spec):
return self.rpc.call('undoInfo', first, last, spec)
+ def iterator_start(self, start, stop):
+ return self.rpc.call('iterator_start', start, stop)
+
+ def iterator_next(self, iid):
+ return self.rpc.call('iterator_next', iid)
+
+ def iterator_record_start(self, txn_iid, tid):
+ return self.rpc.call('iterator_record_start', txn_iid, tid)
+
+ def iterator_record_next(self, iid):
+ return self.rpc.call('iterator_record_next', iid)
+
+ def iterator_gc(self, iids):
+ return self.rpc.call('iterator_gc', iids)
+
+
class ExtensionMethodWrapper:
def __init__(self, rpc, name):
self.rpc = rpc
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -29,12 +29,14 @@
import threading
import time
import warnings
+import itertools
import transaction
import ZODB.serialize
import ZEO.zrpc.error
+import zope.interface
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer
@@ -53,7 +55,6 @@
logger = logging.getLogger('ZEO.StorageServer')
-
# TODO: This used to say "ZSS", which is now implied in the logger name.
# Can this be either set to str(os.getpid()) (if that makes sense) or removed?
_label = "" # default label used for logging.
@@ -110,6 +111,11 @@
self._extensions = {}
for func in self.extensions:
self._extensions[func.func_name] = None
+ self._iterators = {}
+ self._iterator_ids = itertools.count()
+ # Stores the last item that was handed out for a
+ # transaction iterator.
+ self._txn_iterators_last = {}
def finish_auth(self, authenticated):
if not self.auth_realm:
@@ -272,6 +278,12 @@
else:
supportsUndo = supportsUndo()
+ # Communicate the backend storage interfaces to the client
+ storage_provides = zope.interface.providedBy(storage)
+ interfaces = []
+ for candidate in storage_provides.__iro__:
+ interfaces.append((candidate.__module__, candidate.__name__))
+
return {'length': len(storage),
'size': storage.getSize(),
'name': storage.getName(),
@@ -279,6 +291,7 @@
'supportsVersions': False,
'extensionMethods': self.getExtensionMethods(),
'supports_record_iternext': hasattr(self, 'record_iternext'),
+ 'interfaces': tuple(interfaces),
}
def get_size_info(self):
@@ -477,6 +490,11 @@
self.stats.stores += 1
self.txnlog.store(oid, serial, data)
+ def restorea(self, oid, serial, data, prev_txn, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ self.stats.stores += 1
+ self.txnlog.restore(oid, serial, data, prev_txn)
+
def storeBlobStart(self):
assert self.blob_tempfile is None
self.blob_tempfile = tempfile.mkstemp(
@@ -544,16 +562,7 @@
# Unexpected errors are logged and passed to the client
self.log("store error: %s, %s" % sys.exc_info()[:2],
logging.ERROR, exc_info=True)
- # Try to pickle the exception. If it can't be pickled,
- # the RPC response would fail, so use something else.
- pickler = cPickle.Pickler()
- pickler.fast = 1
- try:
- pickler.dump(err, 1)
- except:
- msg = "Couldn't pickle storage exception: %s" % repr(err)
- self.log(msg, logging.ERROR)
- err = StorageServerError(msg)
+ err = self._marshal_error(err)
# The exception is reported back as newserial for this oid
newserial = [(oid, err)]
else:
@@ -575,6 +584,37 @@
return err is None
+ def _restore(self, oid, serial, data, prev_txn):
+ err = None
+ try:
+ self.storage.restore(oid, serial, data, '', prev_txn, self.transaction)
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except Exception, err:
+ self.store_failed = 1
+ if not isinstance(err, TransactionError):
+ # Unexpected errors are logged and passed to the client
+ self.log("store error: %s, %s" % sys.exc_info()[:2],
+ logging.ERROR, exc_info=True)
+ err = self._marshal_error(err)
+ # The exception is reported back as newserial for this oid
+ self.serials.append((oid, err))
+
+ return err is None
+
+ def _marshal_error(self, error):
+ # Try to pickle the exception. If it can't be pickled,
+ # the RPC response would fail, so use something that can be pickled.
+ pickler = cPickle.Pickler()
+ pickler.fast = 1
+ try:
+ pickler.dump(error, 1)
+ except:
+ msg = "Couldn't pickle storage exception: %s" % repr(error)
+ self.log(msg, logging.ERROR)
+ error = StorageServerError(msg)
+ return error
+
def _vote(self):
if not self.store_failed:
# Only call tpc_vote of no store call failed, otherwise
@@ -627,8 +667,18 @@
self._tpc_begin(self.transaction, self.tid, self.status)
loads, loader = self.txnlog.get_loader()
for i in range(loads):
- # load oid, serial, data, version
- if not self._store(*loader.load()):
+ store = loader.load()
+ store_type = store[0]
+ store_args = store[1:]
+
+ if store_type == 's':
+ do_store = self._store
+ elif store_type == 'r':
+ do_store = self._restore
+ else:
+ raise ValueError('Invalid store type: %r' % store_type)
+
+ if not do_store(*store_args):
break
# Blob support
@@ -683,6 +733,62 @@
abortVersion = commitVersion
+ # IStorageIteration support
+
+ def iterator_start(self, start, stop):
+ iid = self._iterator_ids.next()
+ self._iterators[iid] = iter(self.storage.iterator(start, stop))
+ return iid
+
+ def iterator_next(self, iid):
+ iterator = self._iterators[iid]
+ try:
+ info = iterator.next()
+ except StopIteration:
+ del self._iterators[iid]
+ item = None
+ if iid in self._txn_iterators_last:
+ del self._txn_iterators_last[iid]
+ else:
+ item = (info.tid,
+ info.status,
+ info.user,
+ info.description,
+ info.extension)
+ # Keep a reference to the last iterator result to allow starting a
+ # record iterator off it.
+ self._txn_iterators_last[iid] = info
+ return item
+
+ def iterator_record_start(self, txn_iid, tid):
+ record_iid = self._iterator_ids.next()
+ txn_info = self._txn_iterators_last[txn_iid]
+ if txn_info.tid != tid:
+ raise Exception(
+ 'Out-of-order request for record iterator for transaction %r' % tid)
+ self._iterators[record_iid] = iter(txn_info)
+ return record_iid
+
+ def iterator_record_next(self, iid):
+ iterator = self._iterators[iid]
+ try:
+ info = iterator.next()
+ except StopIteration:
+ del self._iterators[iid]
+ item = None
+ else:
+ item = (info.oid,
+ info.tid,
+ info.data,
+ info.version,
+ info.data_txn)
+ return item
+
+ def iterator_gc(self, iids):
+ for iid in iids:
+ self._iterators.pop(iid, None)
+
+
class StorageServerDB:
def __init__(self, server, storage_id):
Modified: ZODB/trunk/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/ConnectionTests.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/tests/ConnectionTests.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -102,7 +102,8 @@
"""
self.__super_setUp()
logging.info("setUp() %s", self.id())
- self.file = tempfile.mktemp()
+ fd, self.file = tempfile.mkstemp()
+ os.close(fd)
self.addr = []
self._pids = []
self._servers = []
Copied: ZODB/trunk/src/ZEO/tests/IterationTests.py (from rev 90352, ZODB/branches/gocept-iteration/src/ZEO/tests/IterationTests.py)
===================================================================
--- ZODB/trunk/src/ZEO/tests/IterationTests.py (rev 0)
+++ ZODB/trunk/src/ZEO/tests/IterationTests.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -0,0 +1,101 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+#
+##############################################################################
+"""ZEO iterator protocol tests."""
+
+import transaction
+
+
+class IterationTests:
+
+ def checkIteratorGCProtocol(self):
+ # Test garbage collection on protocol level.
+ server = self._storage._server
+
+ iid = server.iterator_start(None, None)
+ # None signals the end of iteration.
+ self.assertEquals(None, server.iterator_next(iid))
+ # The server has disposed of the iterator already.
+ self.assertRaises(KeyError, server.iterator_next, iid)
+
+ iid = server.iterator_start(None, None)
+ # This time, we tell the server to throw the iterator away.
+ server.iterator_gc([iid])
+ self.assertRaises(KeyError, server.iterator_next, iid)
+
+ def checkIteratorExhaustionStorage(self):
+ # Test the storage's garbage collection mechanism.
+ iterator = self._storage.iterator()
+ self.assertEquals(1, len(self._storage._iterator_ids))
+ iid = list(self._storage._iterator_ids)[0]
+ self.assertEquals([], list(iterator))
+ self.assertEquals(0, len(self._storage._iterator_ids))
+
+ # The iterator has run through, so the server has already disposed of it.
+ self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+
+ def checkIteratorGCSpanTransactions(self):
+ iterator = self._storage.iterator()
+ t = transaction.Transaction()
+ self._storage.tpc_begin(t)
+ self._storage.tpc_vote(t)
+ self._storage.tpc_finish(t)
+ self.assertEquals([], list(iterator))
+
+ def checkIteratorGCStorageCommitting(self):
+ # We want the iterator to be garbage-collected, so we don't keep any
+ # hard references to it. The storage tracks its ID, though.
+ self._storage.iterator()
+
+ self.assertEquals(1, len(self._storage._iterator_ids))
+ iid = list(self._storage._iterator_ids)[0]
+
+ # GC happens at the transaction boundary. After that, both the storage
+ # and the server have forgotten the iterator.
+ self._dostore()
+ self.assertEquals(0, len(self._storage._iterator_ids))
+ self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+
+ def checkIteratorGCStorageTPCAborting(self):
+ self._storage.iterator()
+ iid = list(self._storage._iterator_ids)[0]
+ t = transaction.Transaction()
+ self._storage.tpc_begin(t)
+ self._storage.tpc_abort(t)
+ self.assertEquals(0, len(self._storage._iterator_ids))
+ self.assertRaises(KeyError, self._storage._server.iterator_next, iid)
+
+ def checkIteratorGCStorageDisconnect(self):
+ self._storage.iterator()
+ iid = list(self._storage._iterator_ids)[0]
+ t = transaction.Transaction()
+ self._storage.tpc_begin(t)
+ # Show that after disconnecting, the client side GCs the iterators
+ # as well. I'm calling this directly to avoid accidentally
+ # calling tpc_abort implicitly.
+ self._storage.notifyDisconnected()
+ self.assertEquals(0, len(self._storage._iterator_ids))
+
+ def checkIteratorParallel(self):
+ self._dostore()
+ self._dostore()
+ iter1 = self._storage.iterator()
+ iter2 = self._storage.iterator()
+ txn_info1 = iter1.next()
+ txn_info2 = iter2.next()
+ self.assertEquals(txn_info1.tid, txn_info2.tid)
+ txn_info1 = iter1.next()
+ txn_info2 = iter2.next()
+ self.assertEquals(txn_info1.tid, txn_info2.tid)
+ self.assertRaises(StopIteration, iter1.next)
+ self.assertRaises(StopIteration, iter2.next)
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -39,7 +39,7 @@
from ZODB.tests import StorageTestBase, BasicStorage, \
TransactionalUndoStorage, \
PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
- MTStorage, ReadOnlyStorage
+ MTStorage, ReadOnlyStorage, IteratorStorage, RecoveryStorage
from ZODB.tests.testDemoStorage import DemoStorageWrappedBase
@@ -47,7 +47,8 @@
import ZEO.zrpc.connection
-from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
+from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests, \
+ IterationTests
from ZEO.tests.forker import get_port
import ZEO.tests.ConnectionTests
@@ -56,7 +57,6 @@
logger = logging.getLogger('ZEO.tests.testZEO')
-
class DummyDB:
def invalidate(self, *args):
pass
@@ -158,7 +158,7 @@
CommitLockTests.CommitLockVoteTests,
ThreadTests.ThreadTests,
# Locally defined (see above)
- MiscZEOTests
+ MiscZEOTests,
):
"""Combine tests from various origins in one class."""
@@ -196,6 +196,15 @@
for pid in self._pids:
os.waitpid(pid, 0)
+ def runTest(self):
+ try:
+ super(GenericTests, self).runTest()
+ except:
+ self._failed = True
+ raise
+ else:
+ self._failed = False
+
def open(self, read_only=0):
# Needed to support ReadOnlyStorage tests. Ought to be a
# cleaner way.
@@ -226,9 +235,75 @@
PackableStorage.PackableUndoStorage,
RevisionStorage.RevisionStorage,
TransactionalUndoStorage.TransactionalUndoStorage,
+ IteratorStorage.IteratorStorage,
+ IterationTests.IterationTests,
):
"""Extend GenericTests with tests that MappingStorage can't pass."""
+class FileStorageRecoveryTests(StorageTestBase.StorageTestBase,
+ RecoveryStorage.RecoveryStorage):
+
+ level = 2
+
+ def setUp(self):
+ self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
+ self._dst = ZODB.FileStorage.FileStorage("Dest.fs", create=True)
+
+ def getConfig(self):
+ filename = self.__fs_base = tempfile.mktemp()
+ return """\
+ <filestorage 1>
+ path %s
+ </filestorage>
+ """ % filename
+
+ def _new_storage(self):
+ port = get_port()
+ zconf = forker.ZEOConfig(('', port))
+ zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
+ zconf, port)
+ blob_cache_dir = tempfile.mkdtemp()
+
+ self._pids.append(pid)
+ self._servers.append(adminaddr)
+ self._conf_paths.append(path)
+ self.blob_cache_dirs.append(blob_cache_dir)
+
+ storage = ClientStorage(
+ zport, '1', cache_size=20000000,
+ min_disconnect_poll=0.5, wait=1,
+ wait_timeout=60, blob_dir=blob_cache_dir)
+ storage.registerDB(DummyDB())
+ return storage
+
+ def setUp(self):
+ self._pids = []
+ self._servers = []
+ self._conf_paths = []
+ self.blob_cache_dirs = []
+
+ self._storage = self._new_storage()
+ self._dst = self._new_storage()
+
+ def tearDown(self):
+ self._storage.close()
+ self._dst.close()
+
+ for p in self._conf_paths:
+ os.remove(p)
+ for p in self.blob_cache_dirs:
+ ZODB.blob.remove_committed_dir(p)
+ for server in self._servers:
+ forker.shutdown_zeo_server(server)
+ if hasattr(os, 'waitpid'):
+ # Not in Windows Python until 2.3
+ for pid in self._pids:
+ os.waitpid(pid, 0)
+
+ def new_dest(self):
+ return self._new_storage()
+
+
class FileStorageTests(FullGenericTests):
"""Test ZEO backed by a FileStorage."""
level = 2
@@ -241,12 +316,36 @@
</filestorage>
""" % filename
+ def checkInterfaceFromRemoteStorage(self):
+ # ClientStorage itself doesn't implement IStorageIteration, but the
+ # FileStorage on the other end does, and thus the ClientStorage
+ # instance that is connected to it reflects this.
+ self.failIf(ZODB.interfaces.IStorageIteration.implementedBy(
+ ZEO.ClientStorage.ClientStorage))
+ self.failUnless(ZODB.interfaces.IStorageIteration.providedBy(
+ self._storage))
+ # This is communicated using ClientStorage's _info object:
+ self.assertEquals((('ZODB.interfaces', 'IStorageIteration'),
+ ('zope.interface', 'Interface')),
+ self._storage._info['interfaces'])
+
+
class MappingStorageTests(GenericTests):
"""ZEO backed by a Mapping storage."""
def getConfig(self):
return """<mappingstorage 1/>"""
+ def checkSimpleIteration(self):
+ # The test base class IteratorStorage assumes that we keep undo data
+ # to construct our iterator, which we don't, so we disable this test.
+ pass
+
+ def checkUndoZombie(self):
+ # The test base class IteratorStorage assumes that we keep undo data
+ # to construct our iterator, which we don't, so we disable this test.
+ pass
+
class DemoStorageTests(
GenericTests,
):
@@ -260,6 +359,11 @@
</demostorage>
""" % tempfile.mktemp()
+ def checkUndoZombie(self):
+ # The test base class IteratorStorage assumes that we keep undo data
+ # to construct our iterator, which we don't, so we disable this test.
+ pass
+
class HeartbeatTests(ZEO.tests.ConnectionTests.CommonSetupTearDown):
"""Make sure a heartbeat is being sent and that it does no harm
@@ -554,7 +658,7 @@
self._storage.close()
-class BlobAdaptedFileStorageTests(GenericTests, CommonBlobTests):
+class BlobAdaptedFileStorageTests(FullGenericTests, CommonBlobTests):
"""ZEO backed by a BlobStorage-adapted FileStorage."""
def setUp(self):
@@ -645,7 +749,7 @@
check_data(filename)
-class BlobWritableCacheTests(GenericTests, CommonBlobTests):
+class BlobWritableCacheTests(FullGenericTests, CommonBlobTests):
def setUp(self):
self.blobdir = self.blob_cache_dir = tempfile.mkdtemp()
@@ -823,7 +927,7 @@
>>> st = StorageServerWrapper(sv, 'fs')
>>> s = st.server
-Now, if we ask fior the invalidations since the last committed
+Now, if we ask for the invalidations since the last committed
transaction, we'll get a result:
>>> tid, oids = s.getInvalidations(last[-1])
@@ -849,7 +953,8 @@
"""
-test_classes = [FileStorageTests, MappingStorageTests, DemoStorageTests,
+test_classes = [FileStorageTests, FileStorageRecoveryTests,
+ MappingStorageTests, DemoStorageTests,
BlobAdaptedFileStorageTests, BlobWritableCacheTests]
def test_suite():
Modified: ZODB/trunk/src/ZODB/BaseStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/BaseStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/BaseStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -21,14 +21,18 @@
import logging
from struct import pack as _structpack, unpack as _structunpack
+import zope.interface
+
from persistent.TimeStamp import TimeStamp
+import ZODB.interfaces
from ZODB import POSException
from ZODB.utils import z64, oid_repr
from ZODB.UndoLogCompatible import UndoLogCompatible
log = logging.getLogger("ZODB.BaseStorage")
+
class BaseStorage(UndoLogCompatible):
"""Base class that supports storage implementations.
@@ -188,6 +192,7 @@
ext = cPickle.dumps(ext, 1)
else:
ext = ""
+
self._ude = user, desc, ext
if tid is None:
@@ -338,7 +343,7 @@
if verbose:
print oid_repr(oid), r.version, len(r.data)
if restoring:
- dest.restore(oid, r.tid, r.data, '',
+ dest.restore(oid, r.tid, r.data, r.version,
r.data_txn, transaction)
else:
pre = preget(oid, None)
@@ -348,10 +353,36 @@
dest.tpc_vote(transaction)
dest.tpc_finish(transaction)
- fiter.close()
-class TransactionRecord:
+class TransactionRecord(object):
"""Abstract base class for iterator protocol"""
-class DataRecord:
+ zope.interface.implements(ZODB.interfaces.IStorageTransactionInformation)
+
+ def __init__(self, tid, status, user, description, extension):
+ self.tid = tid
+ self.status = status
+ self.user = user
+ self.description = description
+ self.extension = extension
+
+ # XXX This is a workaround to make the TransactionRecord compatible with a
+ # transaction object because it is passed to tpc_begin().
+ def _ext_set(self, value):
+ self.extension = value
+ def _ext_get(self):
+ return self.extension
+ _extension = property(fset=_ext_set, fget=_ext_get)
+
+
+class DataRecord(object):
"""Abstract base class for iterator protocol"""
+
+ zope.interface.implements(ZODB.interfaces.IStorageRecordInformation)
+
+ def __init__(self, oid, tid, data, version, prev):
+ self.oid = oid
+ self.tid = tid
+ self.data = data
+ self.version = version
+ self.data_txn = prev
Modified: ZODB/trunk/src/ZODB/DemoStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/DemoStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/DemoStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -80,15 +80,19 @@
"""
+import cPickle
import base64, time
+
+import ZODB.BaseStorage
+import ZODB.interfaces
+import zope.interface
from ZODB import POSException
from ZODB.utils import z64, oid_repr
-from ZODB.BaseStorage import BaseStorage
from persistent.TimeStamp import TimeStamp
-from cPickle import loads
from BTrees import OOBTree
-class DemoStorage(BaseStorage):
+
+class DemoStorage(ZODB.BaseStorage.BaseStorage):
"""Demo storage
Demo storages provide useful storages for writing tests because
@@ -104,9 +108,10 @@
"""
+ zope.interface.implements(ZODB.interfaces.IStorageIteration)
def __init__(self, name='Demo Storage', base=None, quota=None):
- BaseStorage.__init__(self, name, base)
+ ZODB.BaseStorage.BaseStorage.__init__(self, name, base)
# We use a BTree because the items are sorted!
self._data = OOBTree.OOBTree()
@@ -133,7 +138,7 @@
# by the base storage, leading to a variety of "impossible" problems.
def new_oid(self):
if self._base is None:
- return BaseStorage.new_oid(self)
+ return ZODB.BaseStorage.BaseStorage.new_oid(self)
else:
return self._base.new_oid()
@@ -317,6 +322,10 @@
self._tsize = self._size + 120 + len(u) + len(d) + len(e)
def _finish(self, tid, user, desc, ext):
+ if not self._tindex:
+ # No data, so we don't update anything.
+ return
+
self._size = self._tsize
self._data[tid] = None, user, desc, ext, tuple(self._tindex)
@@ -364,7 +373,7 @@
'time': TimeStamp(tid).timeTime(),
'user_name': u, 'description': d}
if e:
- d.update(loads(e))
+ d.update(cPickle.loads(e))
if filter is None or filter(d):
if i >= first:
r.append(d)
@@ -569,3 +578,27 @@
def close(self):
if self._base is not None:
self._base.close()
+
+ def iterator(self, start=None, end=None):
+ # First iterate over the base storage
+ if self._base is not None:
+ for transaction in self._base.iterator(start, end):
+ yield transaction
+ # Then iterate over our local transactions
+ for tid, transaction in self._data.items():
+ if tid >= start and tid <= end:
+ yield TransactionRecord(tid, transaction)
+
+
+class TransactionRecord(ZODB.BaseStorage.TransactionRecord):
+
+ def __init__(self, tid, transaction):
+ packed, user, description, extension, records = transaction
+ super(TransactionRecord, self).__init__(
+ tid, packed, user, description, extension)
+ self.records = transaction
+
+ def __iter__(self):
+ for record in self.records:
+ oid, prev, version, data, tid = record
+ yield ZODB.BaseStorage.DataRecord(oid, tid, data, version, prev)
Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -29,6 +29,8 @@
# Not all platforms have fsync
fsync = getattr(os, "fsync", None)
+import zope.interface
+import ZODB.interfaces
from ZODB import BaseStorage, ConflictResolution, POSException
from ZODB.POSException import UndoError, POSKeyError, MultipleUndoErrors
from persistent.TimeStamp import TimeStamp
@@ -88,6 +90,8 @@
ConflictResolution.ConflictResolvingStorage,
FileStorageFormatter):
+ zope.interface.implements(ZODB.interfaces.IStorageIteration)
+
# Set True while a pack is in progress; undo is blocked for the duration.
_pack_is_in_progress = False
@@ -1127,7 +1131,7 @@
seek(0)
return [(trans.tid, [(r.oid, '') for r in trans])
- for trans in FileIterator(self._file, pos=pos)]
+ for trans in FileIterator(self._file_name, pos=pos)]
finally:
self._lock_release()
@@ -1516,31 +1520,16 @@
file.seek(pos)
file.truncate()
-class Iterator:
- """A General simple iterator that uses the Python for-loop index protocol
- """
- __index=-1
- __current=None
- def __getitem__(self, i):
- __index=self.__index
- while i > __index:
- __index=__index+1
- self.__current=self.next(__index)
-
- self.__index=__index
- return self.__current
-
-
-class FileIterator(Iterator, FileStorageFormatter):
+class FileIterator(FileStorageFormatter):
"""Iterate over the transactions in a FileStorage file.
"""
_ltid = z64
_file = None
- def __init__(self, file, start=None, stop=None, pos=4L):
- if isinstance(file, str):
- file = open(file, 'rb')
+ def __init__(self, filename, start=None, stop=None, pos=4L):
+ assert isinstance(filename, str)
+ file = open(filename, 'rb')
self._file = file
if file.read(4) != packed_version:
raise FileStorageFormatError(file.name)
@@ -1602,14 +1591,17 @@
panic("%s has inconsistent transaction length at %s "
"(%s != %s)", file.name, pos, u64(rtl), u64(stl))
- def next(self, index=0):
+ # Iterator protocol
+ def __iter__(self):
+ return self
+
+ def next(self):
if self._file is None:
- # A closed iterator. Is IOError the best we can do? For
- # now, mimic a read on a closed file.
- raise IOError('iterator is closed')
+ raise ZODB.interfaces.StorageStopIteration()
pos = self._pos
- while 1:
+ while True:
+
# Read the transaction record
try:
h = self._read_txn_header(pos)
@@ -1625,11 +1617,11 @@
self._ltid = h.tid
if self._stop is not None and h.tid > self._stop:
- raise IndexError(index)
+ break
if h.status == "c":
# Assume we've hit the last, in-progress transaction
- raise IndexError(index)
+ break
if pos + h.tlen + 8 > self._file_size:
# Hm, the data were truncated or the checkpoint flag wasn't
@@ -1679,8 +1671,8 @@
except:
pass
- result = RecordIterator(h.tid, h.status, h.user, h.descr,
- e, pos, tend, self._file, tpos)
+ result = TransactionRecord(h.tid, h.status, h.user, h.descr,
+ e, pos, tend, self._file, tpos)
# Read the (intentionally redundant) transaction length
self._file.seek(tend)
@@ -1693,23 +1685,25 @@
return result
- raise IndexError(index)
+ self.close()
+ raise ZODB.interfaces.StorageStopIteration()
-class RecordIterator(Iterator, BaseStorage.TransactionRecord,
- FileStorageFormatter):
+
+class TransactionRecord(BaseStorage.TransactionRecord, FileStorageFormatter):
"""Iterate over the transactions in a FileStorage file."""
+
def __init__(self, tid, status, user, desc, ext, pos, tend, file, tpos):
- self.tid = tid
- self.status = status
- self.user = user
- self.description = desc
- self._extension = ext
+ BaseStorage.TransactionRecord.__init__(
+ self, tid, status, user, desc, ext)
self._pos = pos
self._tend = tend
self._file = file
self._tpos = tpos
- def next(self, index=0):
+ def __iter__(self):
+ return self
+
+ def next(self):
pos = self._pos
while pos < self._tend:
# Read the data records for this transaction
@@ -1738,20 +1732,18 @@
# Should it go to the original data like BDBFullStorage?
prev_txn = self.getTxnFromData(h.oid, h.back)
- r = Record(h.oid, h.tid, data, prev_txn, pos)
- return r
+ return Record(h.oid, h.tid, data, prev_txn, pos)
- raise IndexError(index)
+ raise ZODB.interfaces.StorageStopIteration()
+
class Record(BaseStorage.DataRecord):
- """An abstract database record."""
+
def __init__(self, oid, tid, data, prev, pos):
- self.oid = oid
- self.tid = tid
- self.data = data
- self.data_txn = prev
+ super(Record, self).__init__(oid, tid, data, '', prev)
self.pos = pos
+
class UndoSearch:
def __init__(self, file, pos, first, last, filter=None):
Modified: ZODB/trunk/src/ZODB/FileStorage/__init__.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/__init__.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/FileStorage/__init__.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -1,4 +1,8 @@
# this is a package
-from ZODB.FileStorage.FileStorage import FileStorage, RecordIterator
+from ZODB.FileStorage.FileStorage import FileStorage, TransactionRecord
from ZODB.FileStorage.FileStorage import FileIterator, Record, packed_version
+
+
+# BBB Alias for compatibility
+RecordIterator = TransactionRecord
Modified: ZODB/trunk/src/ZODB/MappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/MappingStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/MappingStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -21,16 +21,16 @@
The Mapping storage uses a single data structure to map object ids to data.
"""
+import ZODB.BaseStorage
from ZODB.utils import u64, z64
-from ZODB.BaseStorage import BaseStorage
from ZODB import POSException
from persistent.TimeStamp import TimeStamp
-class MappingStorage(BaseStorage):
+class MappingStorage(ZODB.BaseStorage.BaseStorage):
def __init__(self, name='Mapping Storage'):
- BaseStorage.__init__(self, name)
+ ZODB.BaseStorage.BaseStorage.__init__(self, name)
# ._index maps an oid to a string s. s[:8] is the tid of the
# transaction that created oid's current state, and s[8:] is oid's
# current state.
Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/blob.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -324,6 +324,21 @@
"""
return os.path.join(self.base_dir, utils.oid_repr(oid))
+ def createPathForOID(self, oid):
+ """Given an OID, creates a directory on the filesystem where
+ the blob data relating to that OID is stored, if it doesn't exist.
+
+ """
+ path = self.getPathForOID(oid)
+ if os.path.exists(path):
+ return
+ try:
+ os.makedirs(path, 0700)
+ except OSError:
+ # We might have lost a race. If so, the directory
+ # must exist now
+ assert os.path.exists(path)
+
def getBlobFilename(self, oid, tid):
"""Given an oid and a tid, return the full filename of the
'committed' blob file related to that oid and tid.
Modified: ZODB/trunk/src/ZODB/fsrecover.py
===================================================================
--- ZODB/trunk/src/ZODB/fsrecover.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/fsrecover.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -82,7 +82,7 @@
import ZODB.FileStorage
from ZODB.utils import u64
-from ZODB.FileStorage import RecordIterator
+from ZODB.FileStorage import TransactionRecord
from persistent.TimeStamp import TimeStamp
@@ -146,8 +146,8 @@
except: e={}
else: e={}
- result = RecordIterator(tid, status, user, description, e, pos, tend,
- f, tpos)
+ result = TransactionRecord(tid, status, user, description, e, pos, tend,
+ f, tpos)
pos = tend
# Read the (intentionally redundant) transaction length
Modified: ZODB/trunk/src/ZODB/interfaces.py
===================================================================
--- ZODB/trunk/src/ZODB/interfaces.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/interfaces.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -286,6 +286,7 @@
begins or until the connection is reopened.
"""
+
class IStorageDB(Interface):
"""Database interface exposed to storages
@@ -418,6 +419,7 @@
should also close all the Connections.
"""
+
class IStorage(Interface):
"""A storage is responsible for storing and retrieving data of objects.
"""
@@ -710,6 +712,7 @@
"""
+
class IStorageRestoreable(IStorage):
"""Copying Transactions
@@ -744,7 +747,7 @@
# failed to take into account records after the pack time.
- def restore(oid, serial, data, prev_txn, transaction):
+ def restore(oid, serial, data, version, prev_txn, transaction):
"""Write data already committed in a separate database
The restore method is used when copying data from one database
@@ -775,41 +778,44 @@
Nothing is returned.
"""
+
class IStorageRecordInformation(Interface):
"""Provide information about a single storage record
"""
oid = Attribute("The object id")
+ tid = Attribute("The transaction id")
data = Attribute("The data record")
+ version = Attribute("The version id")
+ data_txn = Attribute("The previous transaction id")
+
class IStorageTransactionInformation(Interface):
- """Provide information about a storage transaction
+ """Provide information about a storage transaction.
+
+ Can be iterated over to retrieve the records modified in the transaction.
+
"""
tid = Attribute("Transaction id")
status = Attribute("Transaction Status") # XXX what are valid values?
user = Attribute("Transaction user")
description = Attribute("Transaction Description")
- extension = Attribute("Transaction extension data")
+ extension = Attribute("A dictionary carrying the transaction's extension data")
def __iter__():
- """Return an iterable of IStorageRecordInformation
+ """Iterate over the transaction's records given as
+ IStorageRecordInformation objects.
+
"""
+
class IStorageIteration(Interface):
- """API for iterating over the contents of a storage
+ """API for iterating over the contents of a storage."""
- Note that this is a future API. Some storages now provide an
- approximation of this.
-
- """
-
def iterator(start=None, stop=None):
"""Return an IStorageTransactionInformation iterator.
- An IStorageTransactionInformation iterator is returned for
- iterating over the transactions in the storage.
-
If the start argument is not None, then iteration will start
with the first transaction whose identifier is greater than or
equal to start.
@@ -818,8 +824,12 @@
the last transaction whose identifier is less than or equal to
stop.
+ The iterator provides access to the data as available at the time when
+ the iterator was retrieved.
+
"""
+
class IStorageUndoable(IStorage):
"""A storage supporting transactional undo.
"""
@@ -932,6 +942,7 @@
"""
+
class IBlob(Interface):
"""A BLOB supports efficient handling of large data within ZODB."""
@@ -986,5 +997,12 @@
If Blobs use this, then commits can be performed with a simple rename.
"""
+
class BlobError(Exception):
pass
+
+
+class StorageStopIteration(IndexError, StopIteration):
+ """A combination of StopIteration and IndexError to provide a
+ backwards-compatible exception.
+ """
Modified: ZODB/trunk/src/ZODB/tests/IteratorStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/IteratorStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/IteratorStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -15,6 +15,7 @@
Any storage that supports the iterator() method should be able to pass
all these tests.
+
"""
from ZODB.tests.MinPO import MinPO
@@ -23,13 +24,16 @@
from transaction import Transaction
+import itertools
+
+
class IteratorCompare:
def iter_verify(self, txniter, revids, val0):
eq = self.assertEqual
oid = self._oid
val = val0
- for reciter, revid in zip(txniter, revids + [None]):
+ for reciter, revid in itertools.izip(txniter, revids + [None]):
eq(reciter.tid, revid)
for rec in reciter:
eq(rec.oid, oid)
@@ -37,8 +41,8 @@
eq(zodb_unpickle(rec.data), MinPO(val))
val = val + 1
eq(val, val0 + len(revids))
- txniter.close()
+
class IteratorStorage(IteratorCompare):
def checkSimpleIteration(self):
@@ -51,13 +55,6 @@
txniter = self._storage.iterator()
self.iter_verify(txniter, [revid1, revid2, revid3], 11)
- def checkClose(self):
- self._oid = oid = self._storage.new_oid()
- revid1 = self._dostore(oid, data=MinPO(11))
- txniter = self._storage.iterator()
- txniter.close()
- self.assertRaises(IOError, txniter.__getitem__, 0)
-
def checkUndoZombie(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(94))
@@ -89,7 +86,7 @@
iter = self._storage.iterator()
count = 0
for txn in iter:
- self.assertEqual(txn._extension, {})
+ self.assertEqual(txn.extension, {})
count +=1
self.assertEqual(count, 1)
@@ -129,9 +126,34 @@
match = True
if not match:
self.fail("Could not find transaction with matching id")
-
+ def checkIterateRepeatedly(self):
+ self._dostore()
+ transactions = self._storage.iterator()
+ self.assertEquals(1, len(list(transactions)))
+ # The iterator can only be consumed once:
+ self.assertEquals(0, len(list(transactions)))
+ def checkIterateRecordsRepeatedly(self):
+ self._dostore()
+ tinfo = self._storage.iterator().next()
+ self.assertEquals(1, len(list(tinfo)))
+ # The iterator can only be consumed once:
+ self.assertEquals(0, len(list(tinfo)))
+
+ def checkIterateWhileWriting(self):
+ self._dostore()
+ iterator = self._storage.iterator()
+ # We have one transaction with 1 modified object.
+ txn_1 = iterator.next()
+ self.assertEquals(1, len(list(txn_1)))
+
+ # We store another transaction with 1 object, the already running
+ # iterator does not pick this up.
+ self._dostore()
+ self.assertRaises(StopIteration, iterator.next)
+
+
class ExtendedIteratorStorage(IteratorCompare):
def checkExtendedIteration(self):
@@ -173,28 +195,36 @@
txniter = self._storage.iterator(revid3, revid3)
self.iter_verify(txniter, [revid3], 13)
+
class IteratorDeepCompare:
+
def compare(self, storage1, storage2):
eq = self.assertEqual
iter1 = storage1.iterator()
iter2 = storage2.iterator()
- for txn1, txn2 in zip(iter1, iter2):
+ for txn1, txn2 in itertools.izip(iter1, iter2):
eq(txn1.tid, txn2.tid)
eq(txn1.status, txn2.status)
eq(txn1.user, txn2.user)
eq(txn1.description, txn2.description)
- eq(txn1._extension, txn2._extension)
- for rec1, rec2 in zip(txn1, txn2):
+ eq(txn1.extension, txn2.extension)
+ itxn1 = iter(txn1)
+ itxn2 = iter(txn2)
+ for rec1, rec2 in itertools.izip(itxn1, itxn2):
eq(rec1.oid, rec2.oid)
eq(rec1.tid, rec2.tid)
eq(rec1.data, rec2.data)
# Make sure there are no more records left in rec1 and rec2,
# meaning they were the same length.
- self.assertRaises(IndexError, txn1.next)
- self.assertRaises(IndexError, txn2.next)
+ # Additionally, check that we're backwards compatible with the
+ # IndexError we used to raise before.
+ self.assertRaises(IndexError, itxn1.next)
+ self.assertRaises(IndexError, itxn2.next)
+ self.assertRaises(StopIteration, itxn1.next)
+ self.assertRaises(StopIteration, itxn2.next)
# Make sure there are no more records left in txn1 and txn2, meaning
# they were the same length
self.assertRaises(IndexError, iter1.next)
self.assertRaises(IndexError, iter2.next)
- iter1.close()
- iter2.close()
+ self.assertRaises(StopIteration, iter1.next)
+ self.assertRaises(StopIteration, iter2.next)
Modified: ZODB/trunk/src/ZODB/tests/PackableStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/PackableStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/PackableStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -30,6 +30,7 @@
from persistent import Persistent
from persistent.mapping import PersistentMapping
import transaction
+import ZODB.interfaces
from ZODB import DB
from ZODB.serialize import referencesf
from ZODB.tests.MinPO import MinPO
@@ -149,6 +150,16 @@
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
+ def _sanity_check(self):
+ # Iterate over the storage to make sure it's sane.
+ if not ZODB.interfaces.IStorageIteration.providedBy(self._storage):
+ return
+ it = self._storage.iterator()
+ for txn in it:
+ for data in txn:
+ pass
+
+
class PackableStorage(PackableStorageBase):
def checkPackEmptyStorage(self):
@@ -253,17 +264,8 @@
self.fail('a thread is still alive')
- # Iterate over the storage to make sure it's sane, but not every
- # storage supports iterators.
- if not hasattr(self._storage, "iterator"):
- return
+ self._sanity_check()
- it = self._storage.iterator()
- for txn in it:
- for data in txn:
- pass
- it.close()
-
def checkPackWhileWriting(self):
self._PackWhileWriting(pack_now=False)
@@ -304,14 +306,7 @@
packt = time.time()
thread.join()
- # Iterate over the storage to make sure it's sane.
- if not hasattr(self._storage, "iterator"):
- return
- it = self._storage.iterator()
- for txn in it:
- for data in txn:
- pass
- it.close()
+ self._sanity_check()
def checkPackWithMultiDatabaseReferences(self):
databases = {}
Modified: ZODB/trunk/src/ZODB/tests/RecoveryStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/RecoveryStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/RecoveryStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -22,6 +22,7 @@
import time
+
class RecoveryStorage(IteratorDeepCompare):
# Requires a setUp() that creates a self._dst destination storage
def checkSimpleRecovery(self):
@@ -49,12 +50,16 @@
# copy the final transaction manually. even though there
# was a pack, the restore() ought to succeed.
it = self._storage.iterator()
- final = list(it)[-1]
+ # Get the last transaction and its record iterator. Record iterators
+ # can't be accessed out-of-order, so we need to do this in a slightly
+ # complicated way:
+ for final in it:
+ records = list(final)
+
self._dst.tpc_begin(final, final.tid, final.status)
- for r in final:
+ for r in records:
self._dst.restore(r.oid, r.tid, r.data, '', r.data_txn,
final)
- it.close()
self._dst.tpc_vote(final)
self._dst.tpc_finish(final)
Modified: ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -636,11 +636,13 @@
for j in range(OBJECTS):
oid = s.new_oid()
obj = MinPO(i * OBJECTS + j)
- revid = s.store(oid, None, zodb_pickle(obj), '', t)
- orig.append((tid, oid, revid))
+ s.store(oid, None, zodb_pickle(obj), '', t)
+ orig.append((tid, oid))
s.tpc_vote(t)
s.tpc_finish(t)
+ orig = [(tid, oid, s.getTid(oid)) for tid, oid in orig]
+
i = 0
for tid, oid, revid in orig:
self._dostore(oid, revid=revid, data=MinPO(revid),
@@ -668,14 +670,11 @@
# OBJECTS * BATCHES modifications, followed by
# BATCHES undos
- iter = s.iterator()
- offset = 0
-
+ transactions = s.iterator()
eq = self.assertEqual
for i in range(BATCHES):
- txn = iter[offset]
- offset += 1
+ txn = transactions.next()
tid = p64(i + 1)
eq(txn.tid, tid)
@@ -687,13 +686,11 @@
eq(L1, L2)
for i in range(BATCHES * OBJECTS):
- txn = iter[offset]
- offset += 1
+ txn = transactions.next()
eq(len([rec for rec in txn if rec.data_txn is None]), 1)
for i in range(BATCHES):
- txn = iter[offset]
- offset += 1
+ txn = transactions.next()
# The undos are performed in reverse order.
otid = p64(BATCHES - i)
@@ -704,7 +701,7 @@
L2.sort()
eq(L1, L2)
- self.assertRaises(IndexError, iter.__getitem__, offset)
+ self.assertRaises(StopIteration, transactions.next)
def checkUndoLogMetadata(self):
# test that the metadata is correct in the undo log
Modified: ZODB/trunk/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testFileStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/testFileStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -308,6 +308,7 @@
else:
self.assertNotEqual(next_oid, None)
+
class FileStorageRecoveryTest(
StorageTestBase.StorageTestBase,
RecoveryStorage.RecoveryStorage,
@@ -326,6 +327,40 @@
def new_dest(self):
return ZODB.FileStorage.FileStorage('Dest.fs')
+
+class FileStorageNoRestore(ZODB.FileStorage.FileStorage):
+
+ @property
+ def restore(self):
+ raise Exception
+
+
+class FileStorageNoRestoreRecoveryTest(
+ StorageTestBase.StorageTestBase,
+ RecoveryStorage.RecoveryStorage,
+ ):
+ # This test actually verifies a code path of
+ # BaseStorage.copyTransactionsFrom. For simplicity of implementation, we
+ # use a FileStorage deprived of its restore method.
+
+ def setUp(self):
+ self._storage = FileStorageNoRestore("Source.fs", create=True)
+ self._dst = FileStorageNoRestore("Dest.fs", create=True)
+
+ def tearDown(self):
+ self._storage.close()
+ self._dst.close()
+ self._storage.cleanup()
+ self._dst.cleanup()
+
+ def new_dest(self):
+ return FileStorageNoRestore('Dest.fs')
+
+ def checkRestoreAcrossPack(self):
+ # Skip this check as it calls restore directly.
+ pass
+
+
class SlowFileStorageTest(BaseFileStorageTests):
level = 2
@@ -492,7 +527,8 @@
suite = unittest.TestSuite()
for klass in [FileStorageTests, Corruption.FileStorageCorruptTests,
- FileStorageRecoveryTest, SlowFileStorageTest]:
+ FileStorageRecoveryTest, FileStorageNoRestoreRecoveryTest,
+ SlowFileStorageTest]:
suite.addTest(unittest.makeSuite(klass, "check"))
suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
tearDown=ZODB.tests.util.tearDown))
Modified: ZODB/trunk/src/ZODB/tests/testMappingStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testMappingStorage.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/tests/testMappingStorage.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -37,6 +37,7 @@
# have this limit, so we inhibit this test here.
pass
+
def test_suite():
suite = unittest.makeSuite(MappingStorageTests, 'check')
return suite
Modified: ZODB/trunk/src/ZODB/utils.py
===================================================================
--- ZODB/trunk/src/ZODB/utils.py 2008-08-30 13:16:43 UTC (rev 90615)
+++ ZODB/trunk/src/ZODB/utils.py 2008-08-30 13:21:49 UTC (rev 90616)
@@ -295,5 +295,3 @@
handle, filename = mkstemp(dir=dir)
os.close(handle)
return filename
-
-
More information about the Zodb-checkins
mailing list