[Zodb-checkins] SVN: ZODB/branches/gocept-iteration/s Implement
copyTransactionsFrom() and restore() for ClientStorages. Remove
ThreadedAsync reference.
Christian Theune
ct at gocept.com
Tue Feb 26 01:31:51 EST 2008
Log message for revision 84254:
Implement copyTransactionsFrom() and restore() for ClientStorages. Remove
ThreadedAsync reference.
Changed:
U ZODB/branches/gocept-iteration/setup.py
U ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py
U ZODB/branches/gocept-iteration/src/ZEO/CommitLog.py
U ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py
U ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py
U ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py
U ZODB/branches/gocept-iteration/src/ZODB/BaseStorage.py
U ZODB/branches/gocept-iteration/src/ZODB/interfaces.py
U ZODB/branches/gocept-iteration/src/ZODB/tests/IteratorStorage.py
-=-
Modified: ZODB/branches/gocept-iteration/setup.py
===================================================================
--- ZODB/branches/gocept-iteration/setup.py 2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/setup.py 2008-02-26 06:31:49 UTC (rev 84254)
@@ -149,7 +149,6 @@
"ZODB", "ZODB.FileStorage", "ZODB.tests",
"ZODB.scripts",
"persistent", "persistent.tests",
- "ThreadedAsync",
"ZopeUndo", "ZopeUndo.tests",
]
Modified: ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py 2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZEO/ClientStorage.py 2008-02-26 06:31:49 UTC (rev 84254)
@@ -1174,6 +1174,25 @@
return []
return self._server.undoLog(first, last)
+ # Recovery support
+
+ def copyTransactionsFrom(self, other, verbose=0):
+ """Copy transactions from another storage.
+
+ This is typically used for converting data from one storage to
+ another. `other` must have an .iterator() method.
+ """
+ ZODB.BaseStorage.copy(other, self, verbose)
+
+ def restore(self, oid, serial, data, version, prev_txn, transaction):
+ """Write data already committed in a separate database."""
+ assert not version
+ self._check_trans(transaction)
+ self._server.restorea(oid, serial, data, prev_txn, id(transaction))
+ # XXX I'm not updating the transaction buffer here because I can't
+ # exactly predict how invalidation should work with restore. :/
+ return self._check_serials()
+
# Below are methods invoked by the StorageServer
def serialnos(self, args):
Modified: ZODB/branches/gocept-iteration/src/ZEO/CommitLog.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/CommitLog.py 2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZEO/CommitLog.py 2008-02-26 06:31:49 UTC (rev 84254)
@@ -35,9 +35,13 @@
return self.file.tell()
def store(self, oid, serial, data):
- self.pickler.dump((oid, serial, data))
+ self.pickler.dump(('store', oid, serial, data))
self.stores += 1
+ def restore(self, oid, serial, data, prev_txn):
+ self.pickler.dump(('restore', oid, serial, data, prev_txn))
+ self.stores += 1
+
def get_loader(self):
self.read = 1
self.file.seek(0)
Modified: ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py 2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZEO/ServerStub.py 2008-02-26 06:31:49 UTC (rev 84254)
@@ -209,6 +209,10 @@
def storea(self, oid, serial, data, id):
self.rpc.callAsync('storea', oid, serial, data, '', id)
+ def restorea(self, oid, serial, data, prev_txn, id):
+ self.rpc.callAsync('restorea', oid, serial, data, prev_txn, id)
+
+
def storeBlob(self, oid, serial, data, blobfilename, txn):
# Store a blob to the server. We don't want to read all of
Modified: ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py 2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZEO/StorageServer.py 2008-02-26 06:31:49 UTC (rev 84254)
@@ -479,6 +479,11 @@
self.stats.stores += 1
self.txnlog.store(oid, serial, data)
+ def restorea(self, oid, serial, data, prev_txn, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ self.stats.stores += 1
+ self.txnlog.restore(oid, serial, data, prev_txn)
+
def storeBlobStart(self):
assert self.blob_tempfile is None
self.blob_tempfile = tempfile.mkstemp(
@@ -577,6 +582,33 @@
return err is None
+ def _restore(self, oid, serial, data, prev_txn):
+ err = None
+ try:
+ self.storage.restore(oid, serial, data, '', prev_txn, self.transaction)
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except Exception, err:
+ self.store_failed = 1
+ if not isinstance(err, TransactionError):
+ # Unexpected errors are logged and passed to the client
+ self.log("store error: %s, %s" % sys.exc_info()[:2],
+ logging.ERROR, exc_info=True)
+ # Try to pickle the exception. If it can't be pickled,
+ # the RPC response would fail, so use something else.
+ pickler = cPickle.Pickler()
+ pickler.fast = 1
+ try:
+ pickler.dump(err, 1)
+ except:
+ msg = "Couldn't pickle storage exception: %s" % repr(err)
+ self.log(msg, logging.ERROR)
+ err = StorageServerError(msg)
+ # The exception is reported back as newserial for this oid
+ self.serials.append((oid, err))
+
+ return err is None
+
def _vote(self):
if not self.store_failed:
# Only call tpc_vote if no store call failed, otherwise
@@ -629,8 +661,16 @@
self._tpc_begin(self.transaction, self.tid, self.status)
loads, loader = self.txnlog.get_loader()
for i in range(loads):
- # load oid, serial, data, version
- if not self._store(*loader.load()):
+ store = loader.load()
+ store_type = store[0]
+ store_args = store[1:]
+
+ if store_type == 'store':
+ do_store = self._store
+ elif store_type == 'restore':
+ do_store = self._restore
+
+ if not do_store(*store_args):
break
# Blob support
Modified: ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py 2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZEO/tests/testZEO.py 2008-02-26 06:31:49 UTC (rev 84254)
@@ -41,7 +41,7 @@
from ZODB.tests import StorageTestBase, BasicStorage, \
TransactionalUndoStorage, \
PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
- MTStorage, ReadOnlyStorage, IteratorStorage
+ MTStorage, ReadOnlyStorage, IteratorStorage, RecoveryStorage
from ZODB.tests.testDemoStorage import DemoStorageWrappedBase
@@ -254,6 +254,70 @@
):
"""Extend GenericTests with tests that MappingStorage can't pass."""
+class FileStorageRecoveryTests(StorageTestBase.StorageTestBase,
+ RecoveryStorage.RecoveryStorage):
+
+ level = 2
+
+ def setUp(self):
+ self._storage = ZODB.FileStorage.FileStorage("Source.fs", create=True)
+ self._dst = ZODB.FileStorage.FileStorage("Dest.fs", create=True)
+
+ def getConfig(self):
+ filename = self.__fs_base = tempfile.mktemp()
+ return """\
+ <filestorage 1>
+ path %s
+ </filestorage>
+ """ % filename
+
+ def _new_storage(self):
+ port = get_port()
+ zconf = forker.ZEOConfig(('', port))
+ zport, adminaddr, pid, path = forker.start_zeo_server(self.getConfig(),
+ zconf, port)
+ blob_cache_dir = tempfile.mkdtemp()
+
+ self._pids.append(pid)
+ self._servers.append(adminaddr)
+ self._conf_paths.append(path)
+ self.blob_cache_dirs.append(blob_cache_dir)
+
+ storage = ClientStorage(
+ zport, '1', cache_size=20000000,
+ min_disconnect_poll=0.5, wait=1,
+ wait_timeout=60, blob_dir=blob_cache_dir)
+ storage.registerDB(DummyDB())
+ return storage
+
+ def setUp(self):
+ self._pids = []
+ self._servers = []
+ self._conf_paths = []
+ self.blob_cache_dirs = []
+
+ self._storage = self._new_storage()
+ self._dst = self._new_storage()
+
+ def tearDown(self):
+ self._storage.close()
+ self._dst.close()
+
+ for p in self._conf_paths:
+ os.remove(p)
+ for p in self.blob_cache_dirs:
+ ZODB.blob.remove_committed_dir(p)
+ for server in self._servers:
+ forker.shutdown_zeo_server(server)
+ if hasattr(os, 'waitpid'):
+ # Not in Windows Python until 2.3
+ for pid in self._pids:
+ os.waitpid(pid, 0)
+
+ def new_dest(self):
+ return self._new_storage()
+
+
class FileStorageTests(FullGenericTests):
"""Test ZEO backed by a FileStorage."""
level = 2
@@ -889,7 +953,8 @@
"""
-test_classes = [FileStorageTests, MappingStorageTests, DemoStorageTests,
+test_classes = [FileStorageTests, FileStorageRecoveryTests,
+ MappingStorageTests, DemoStorageTests,
BlobAdaptedFileStorageTests, BlobWritableCacheTests]
def test_suite():
Modified: ZODB/branches/gocept-iteration/src/ZODB/BaseStorage.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZODB/BaseStorage.py 2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZODB/BaseStorage.py 2008-02-26 06:31:49 UTC (rev 84254)
@@ -339,6 +339,7 @@
if verbose:
print oid_repr(oid), r.version, len(r.data)
if restoring:
+ print r.data_txn
dest.restore(oid, r.tid, r.data, r.version,
r.data_txn, transaction)
else:
@@ -349,7 +350,10 @@
dest.tpc_vote(transaction)
dest.tpc_finish(transaction)
- fiter.close()
+ if hasattr(fiter, 'close'):
+ # XXX close is not part of the iterator interface but FileStorage's
+ # iterator has this method to get rid of a file handle.
+ fiter.close()
class TransactionRecord(object):
Modified: ZODB/branches/gocept-iteration/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZODB/interfaces.py 2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZODB/interfaces.py 2008-02-26 06:31:49 UTC (rev 84254)
@@ -744,7 +744,7 @@
# failed to take into account records after the pack time.
- def restore(oid, serial, data, prev_txn, transaction):
+ def restore(oid, serial, data, version, prev_txn, transaction):
"""Write data already committed in a separate database
The restore method is used when copying data from one database
Modified: ZODB/branches/gocept-iteration/src/ZODB/tests/IteratorStorage.py
===================================================================
--- ZODB/branches/gocept-iteration/src/ZODB/tests/IteratorStorage.py 2008-02-26 02:23:14 UTC (rev 84253)
+++ ZODB/branches/gocept-iteration/src/ZODB/tests/IteratorStorage.py 2008-02-26 06:31:49 UTC (rev 84254)
@@ -225,5 +225,7 @@
# they were the same length
self.assertRaises(StopIteration, iter1.next)
self.assertRaises(StopIteration, iter2.next)
- iter1.close()
- iter2.close()
+ if hasattr(iter1, 'close'):
+ iter1.close()
+ if hasattr(iter2, 'close'):
+ iter2.close()
More information about the Zodb-checkins
mailing list