[Zodb-checkins] SVN: ZODB/trunk/src/ Refactored storage server to support multiple client threads.
Jim Fulton
jim at zope.com
Sun Jan 31 14:59:54 EST 2010
Log message for revision 108679:
Refactored storage server to support multiple client threads.
Changed ZEO undo protocol. (Undo is disabled with older clients.)
Now use one-way undoa. Undone oids are now returned by (tpc_)vote for
ZEO. Undo no-longer gets commit lock.
Changed:
U ZODB/trunk/src/CHANGES.txt
U ZODB/trunk/src/ZEO/ClientStorage.py
U ZODB/trunk/src/ZEO/ServerStub.py
U ZODB/trunk/src/ZEO/StorageServer.py
U ZODB/trunk/src/ZEO/tests/Cache.py
U ZODB/trunk/src/ZEO/tests/CommitLockTests.py
U ZODB/trunk/src/ZEO/tests/InvalidationTests.py
U ZODB/trunk/src/ZEO/tests/servertesting.py
U ZODB/trunk/src/ZEO/tests/testZEO.py
U ZODB/trunk/src/ZEO/tests/testZEO2.py
U ZODB/trunk/src/ZEO/zrpc/connection.py
U ZODB/trunk/src/ZODB/Connection.py
U ZODB/trunk/src/ZODB/tests/ConflictResolution.py
U ZODB/trunk/src/ZODB/tests/RevisionStorage.py
U ZODB/trunk/src/ZODB/tests/StorageTestBase.py
U ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py
-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/CHANGES.txt 2010-01-31 19:59:54 UTC (rev 108679)
@@ -14,6 +14,10 @@
database's undo method multiple times in the same transaction now
raises an exception.
+- The ZEO protocol for undo has changed. The only user-visible
+ consequence of this is that when ZODB 3.10 ZEO servers won't support
+ undo for older clients.
+
- The storage API (IStorage) has been tightened. Now, storages should
raise a StorageTransactionError when invalid transactions are passed
to tpc_begin, tpc_vote, or tpc_finish.
Modified: ZODB/trunk/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/trunk/src/ZEO/ClientStorage.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/ClientStorage.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -1198,14 +1198,19 @@
if self._cache is None:
return
+ for oid, _ in self._seriald.iteritems():
+ self._cache.invalidate(oid, tid, False)
+
for oid, data in self._tbuf:
- self._cache.invalidate(oid, tid, False)
# If data is None, we just invalidate.
if data is not None:
s = self._seriald[oid]
if s != ResolvedSerial:
assert s == tid, (s, tid)
self._cache.store(oid, s, None, data)
+ else:
+ # object deletion
+ self._cache.invalidate(oid, tid, False)
if self.fshelper is not None:
blobs = self._tbuf.blobs
@@ -1241,10 +1246,7 @@
"""
self._check_trans(txn)
- tid, oids = self._server.undo(trans_id, id(txn))
- for oid in oids:
- self._tbuf.invalidate(oid)
- return tid, oids
+ self._server.undoa(trans_id, id(txn))
def undoInfo(self, first=0, last=-20, specification=None):
"""Storage API: return undo information."""
Modified: ZODB/trunk/src/ZEO/ServerStub.py
===================================================================
--- ZODB/trunk/src/ZEO/ServerStub.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/ServerStub.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -272,8 +272,8 @@
def new_oid(self):
return self.rpc.call('new_oid')
- def undo(self, trans_id, trans):
- return self.rpc.call('undo', trans_id, trans)
+ def undoa(self, trans_id, trans):
+ self.rpc.callAsync('undoa', trans_id, trans)
def undoLog(self, first, last):
return self.rpc.call('undoLog', first, last)
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -20,6 +20,8 @@
exported for invocation by the server.
"""
+from __future__ import with_statement
+
import asyncore
import cPickle
import logging
@@ -32,6 +34,7 @@
import transaction
+import ZODB.blob
import ZODB.serialize
import ZODB.TimeStamp
import ZEO.zrpc.error
@@ -40,7 +43,7 @@
from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer
from ZEO.zrpc.server import Dispatcher
-from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay
+from ZEO.zrpc.connection import ManagedServerConnection, Delay, MTDelay, Result
from ZEO.zrpc.trigger import trigger
from ZEO.Exceptions import AuthError
@@ -48,7 +51,7 @@
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
-from ZODB.utils import u64, p64, oid_repr, mktemp
+from ZODB.utils import oid_repr, p64, u64, z64
from ZODB.loglevels import BLATHER
@@ -87,7 +90,6 @@
def __init__(self, server, read_only=0, auth_realm=None):
self.server = server
# timeout and stats will be initialized in register()
- self.timeout = None
self.stats = None
self.connection = None
self.client = None
@@ -95,14 +97,13 @@
self.storage_id = "uninitialized"
self.transaction = None
self.read_only = read_only
- self.locked = 0
+ self.locked = False # Don't have storage lock
self.verifying = 0
self.store_failed = 0
self.log_label = _label
self.authenticated = 0
self.auth_realm = auth_realm
self.blob_tempfile = None
- self.blob_log = []
# The authentication protocol may define extra methods.
self._extensions = {}
for func in self.extensions:
@@ -139,24 +140,13 @@
self.log_label = _label + "/" + label
def notifyDisconnected(self):
+ self.connection = None
+
# When this storage closes, we must ensure that it aborts
# any pending transaction.
if self.transaction is not None:
self.log("disconnected during transaction %s" % self.transaction)
- if not self.locked:
- # Delete (d, zeo_storage) from the _waiting list, if found.
- waiting = self.storage._waiting
- for i in range(len(waiting)):
- d, z = waiting[i]
- if z is self:
- del waiting[i]
- self.log("Closed connection removed from waiting list."
- " Clients waiting: %d." % len(waiting))
- break
-
- if self.transaction:
- self.tpc_abort(self.transaction.id)
-
+ self.tpc_abort(self.transaction.id)
else:
self.log("disconnected")
@@ -176,6 +166,7 @@
def setup_delegation(self):
"""Delegate several methods to the storage
"""
+ # Called from register
storage = self.storage
@@ -183,9 +174,6 @@
if not info['supportsUndo']:
self.undoLog = self.undoInfo = lambda *a,**k: ()
- def undo(*a, **k):
- raise NotImplementedError
- self.undo = undo
self.getTid = storage.getTid
self.load = storage.load
@@ -268,6 +256,7 @@
if self.storage is not None:
self.log("duplicate register() call")
raise ValueError("duplicate register() call")
+
storage = self.server.storages.get(storage_id)
if storage is None:
self.log("unknown storage_id: %s" % storage_id)
@@ -280,19 +269,15 @@
self.storage_id = storage_id
self.storage = storage
self.setup_delegation()
- self.timeout, self.stats = self.server.register_connection(storage_id,
- self)
+ self.stats = self.server.register_connection(storage_id, self)
def get_info(self):
storage = self.storage
- try:
- supportsUndo = storage.supportsUndo
- except AttributeError:
- supportsUndo = False
- else:
- supportsUndo = supportsUndo()
+ supportsUndo = (getattr(storage, 'supportsUndo', lambda : False)()
+ and self.connection.peer_protocol_version >= 'Z310')
+
# Communicate the backend storage interfaces to the client
storage_provides = zope.interface.providedBy(storage)
interfaces = []
@@ -419,6 +404,7 @@
self.serials = []
self.invalidated = []
self.txnlog = CommitLog()
+ self.blob_log = []
self.tid = tid
self.status = status
self.store_failed = 0
@@ -437,19 +423,23 @@
def tpc_finish(self, id):
if not self._check_tid(id):
return
- assert self.locked
+ assert self.locked, "finished called wo lock"
+
self.stats.commits += 1
- self.storage.tpc_finish(self.transaction)
+ self.storage.tpc_finish(self.transaction, self._invalidate)
+ # Note that the tid is still current because we still hold the
+ # commit lock. We'll relinquish it in _clear_transaction.
tid = self.storage.lastTransaction()
+ # Return the tid, for cache invalidation optimization
+ return Result(tid, self._clear_transaction)
+
+ def _invalidate(self, tid):
if self.invalidated:
self.server.invalidate(self, self.storage_id, tid,
self.invalidated, self.get_size_info())
- self._clear_transaction()
- # Return the tid, for cache invalidation optimization
- return tid
- def tpc_abort(self, id):
- if not self._check_tid(id):
+ def tpc_abort(self, tid):
+ if not self._check_tid(tid):
return
self.stats.aborts += 1
if self.locked:
@@ -458,111 +448,68 @@
def _clear_transaction(self):
# Common code at end of tpc_finish() and tpc_abort()
- self.stats.active_txns -= 1
- self.transaction = None
- self.txnlog.close()
if self.locked:
+ self.server.unlock_storage(self)
self.locked = 0
- self.timeout.end(self)
- self.stats.lock_time = None
- self.log("Transaction released storage lock", BLATHER)
+ self.transaction = None
+ self.stats.active_txns -= 1
+ if self.txnlog is not None:
+ self.txnlog.close()
+ self.txnlog = None
+ for oid, oldserial, data, blobfilename in self.blob_log:
+ ZODB.blob.remove_committed(blobfilename)
+ del self.blob_log
- # Restart any client waiting for the storage lock.
- while self.storage._waiting:
- delay, zeo_storage = self.storage._waiting.pop(0)
- try:
- zeo_storage._restart(delay)
- except:
- self.log("Unexpected error handling waiting transaction",
- level=logging.WARNING, exc_info=True)
- zeo_storage.connection.close()
- continue
+ def vote(self, tid):
+ self._check_tid(tid, exc=StorageTransactionError)
+ return self._try_to_vote()
- if self.storage._waiting:
- n = len(self.storage._waiting)
- self.log("Blocked transaction restarted. "
- "Clients waiting: %d" % n)
+ def _try_to_vote(self, delay=None):
+ if self.connection is None:
+ return # We're disconnected
+ self.locked = self.server.lock_storage(self)
+ if self.locked:
+ try:
+ self._vote()
+ except Exception:
+ if delay is not None:
+ delay.error()
else:
- self.log("Blocked transaction restarted.")
-
- break
-
- # The following two methods return values, so they must acquire
- # the storage lock and begin the transaction before returning.
-
- # It's a bit vile that undo can cause us to get the lock before vote.
-
- def undo(self, trans_id, id):
- self._check_tid(id, exc=StorageTransactionError)
- if self.locked:
- return self._undo(trans_id)
+ raise
+ else:
+ if delay is not None:
+ delay.reply(None)
else:
- return self._wait(lambda: self._undo(trans_id))
+ if delay == None:
+ self.log("(%r) queue lock: transactions waiting: %s"
+ % (self.storage_id, self.server.waiting(self)+1))
+ delay = Delay()
+ self.server.unlock_callback(self, delay)
+ return delay
- def vote(self, id):
- self._check_tid(id, exc=StorageTransactionError)
- if self.locked:
- return self._vote()
- else:
- return self._wait(lambda: self._vote())
+ def _unlock_callback(self, delay):
+ connection = self.connection
+ if connection is not None:
+ connection.call_from_thread(self._try_to_vote, delay)
- # When a delayed transaction is restarted, the dance is
- # complicated. The restart occurs when one ZEOStorage instance
- # finishes as a transaction and finds another instance is in the
- # _waiting list.
+ def _vote(self):
- # It might be better to have a mechanism to explicitly send
- # the finishing transaction's reply before restarting the waiting
- # transaction. If the restart takes a long time, the previous
- # client will be blocked until it finishes.
-
- def _wait(self, thunk):
- # Wait for the storage lock to be acquired.
- self._thunk = thunk
- if self.tpc_transaction():
- d = Delay()
- self.storage._waiting.append((d, self))
- self.log("Transaction blocked waiting for storage. "
- "Clients waiting: %d." % len(self.storage._waiting))
- return d
- else:
- self.log("Transaction acquired storage lock.", BLATHER)
- return self._restart()
-
- def _restart(self, delay=None):
- # Restart when the storage lock is available.
if self.txnlog.stores == 1:
template = "Preparing to commit transaction: %d object, %d bytes"
else:
template = "Preparing to commit transaction: %d objects, %d bytes"
+
self.log(template % (self.txnlog.stores, self.txnlog.size()),
level=BLATHER)
- self.locked = 1
- self.timeout.begin(self)
- self.stats.lock_time = time.time()
if (self.tid is not None) or (self.status != ' '):
self.storage.tpc_begin(self.transaction, self.tid, self.status)
else:
self.storage.tpc_begin(self.transaction)
try:
- loads, loader = self.txnlog.get_loader()
- for i in range(loads):
- store = loader.load()
- store_type = store[0]
- store_args = store[1:]
-
- if store_type == 'd':
- do_store = self._delete
- elif store_type == 's':
- do_store = self._store
- elif store_type == 'r':
- do_store = self._restore
- else:
- raise ValueError('Invalid store type: %r' % store_type)
-
- if not do_store(*store_args):
+ for op, args in self.txnlog:
+ if not getattr(self, op)(*args):
break
# Blob support
@@ -575,12 +522,17 @@
self._clear_transaction()
raise
- resp = self._thunk()
- if delay is not None:
- delay.reply(resp)
- else:
- return resp
+ if not self.store_failed:
+ # Only call tpc_vote of no store call failed, otherwise
+ # the serialnos() call will deliver an exception that will be
+ # handled by the client in its tpc_vote() method.
+ serials = self.storage.tpc_vote(self.transaction)
+ if serials:
+ self.serials.extend(serials)
+
+ self.client.serialnos(self.serials)
+
# The public methods of the ZEO client API do not do the real work.
# They defer work until after the storage lock has been acquired.
# Most of the real implementations are in methods beginning with
@@ -610,14 +562,18 @@
os.write(self.blob_tempfile[0], chunk)
def storeBlobEnd(self, oid, serial, data, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ assert self.txnlog is not None # effectively not allowed after undo
fd, tempname = self.blob_tempfile
self.blob_tempfile = None
os.close(fd)
self.blob_log.append((oid, serial, data, tempname))
def storeBlobShared(self, oid, serial, data, filename, id):
+ self._check_tid(id, exc=StorageTransactionError)
+ assert self.txnlog is not None # effectively not allowed after undo
+
# Reconstruct the full path from the filename in the OID directory
-
if (os.path.sep in filename
or not (filename.endswith('.tmp')
or filename[:-1].endswith('.tmp')
@@ -635,6 +591,13 @@
def sendBlob(self, oid, serial):
self.client.storeBlob(oid, serial, self.storage.loadBlob(oid, serial))
+ def undo(*a, **k):
+ raise NotImplementedError
+
+ def undoa(self, trans_id, tid):
+ self._check_tid(tid, exc=StorageTransactionError)
+ self.txnlog.undo(trans_id)
+
def _delete(self, oid, serial):
err = None
try:
@@ -721,6 +684,27 @@
return err is None
+ def _undo(self, trans_id):
+ err = None
+ try:
+ tid, oids = self.storage.undo(trans_id, self.transaction)
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except Exception, err:
+ self.store_failed = 1
+ if not isinstance(err, TransactionError):
+ # Unexpected errors are logged and passed to the client
+ self.log("store error: %s, %s" % sys.exc_info()[:2],
+ logging.ERROR, exc_info=True)
+ err = self._marshal_error(err)
+ # The exception is reported back as newserial for this oid
+ self.serials.append((z64, err))
+ else:
+ self.invalidated.extend(oids)
+ self.serials.extend((oid, ResolvedSerial) for oid in oids)
+
+ return err is None
+
def _marshal_error(self, error):
# Try to pickle the exception. If it can't be pickled,
# the RPC response would fail, so use something that can be pickled.
@@ -734,23 +718,6 @@
error = StorageServerError(msg)
return error
- def _vote(self):
- if not self.store_failed:
- # Only call tpc_vote of no store call failed, otherwise
- # the serialnos() call will deliver an exception that will be
- # handled by the client in its tpc_vote() method.
- serials = self.storage.tpc_vote(self.transaction)
- if serials:
- self.serials.extend(serials)
-
- self.client.serialnos(self.serials)
- return
-
- def _undo(self, trans_id):
- tid, oids = self.storage.undo(trans_id, self.transaction)
- self.invalidated.extend(oids)
- return tid, oids
-
# IStorageIteration support
def iterator_start(self, start, stop):
@@ -929,8 +896,12 @@
for name, storage in storages.items()])
log("%s created %s with storages: %s" %
(self.__class__.__name__, read_only and "RO" or "RW", msg))
- for s in storages.values():
- s._waiting = []
+
+
+ self._lock = threading.Lock()
+ self._commit_locks = {}
+ self._unlock_callbacks = dict((name, []) for name in storages)
+
self.read_only = read_only
self.auth_protocol = auth_protocol
self.auth_database = auth_database
@@ -1044,7 +1015,7 @@
Returns the timeout and stats objects for the appropriate storage.
"""
self.connections[storage_id].append(conn)
- return self.timeouts[storage_id], self.stats[storage_id]
+ return self.stats[storage_id]
def _invalidateCache(self, storage_id):
"""We need to invalidate any caches we have.
@@ -1195,8 +1166,6 @@
self.dispatcher.close()
if self.monitor is not None:
self.monitor.close()
- for storage in self.storages.values():
- storage.close()
# Force the asyncore mainloop to exit by hackery, i.e. close
# every socket in the map. loop() will return when the map is
# empty.
@@ -1206,6 +1175,8 @@
except:
pass
asyncore.socket_map.clear()
+ for storage in self.storages.values():
+ storage.close()
def close_conn(self, conn):
"""Internal: remove the given connection from self.connections.
@@ -1216,7 +1187,46 @@
if conn.obj in cl:
cl.remove(conn.obj)
+ def lock_storage(self, zeostore):
+ storage_id = zeostore.storage_id
+ with self._lock:
+ if storage_id in self._commit_locks:
+ return False
+ self._commit_locks[storage_id] = zeostore
+ self.timeouts[storage_id].begin(zeostore)
+ self.stats[storage_id].lock_time = time.time()
+ return True
+ def unlock_storage(self, zeostore):
+ storage_id = zeostore.storage_id
+ with self._lock:
+ assert self._commit_locks[storage_id] is zeostore
+ del self._commit_locks[storage_id]
+ self.timeouts[storage_id].end(zeostore)
+ self.stats[storage_id].lock_time = None
+ callbacks = self._unlock_callbacks[storage_id][:]
+ del self._unlock_callbacks[storage_id][:]
+
+ if callbacks:
+ zeostore.log("(%r) unlock: transactions waiting: %s"
+ % (storage_id, len(callbacks)-1))
+
+ for zeostore, delay in callbacks:
+ try:
+ zeostore._unlock_callback(delay)
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except Exception:
+ logger.exception("Calling unlock callback")
+
+ def unlock_callback(self, zeostore, delay):
+ storage_id = zeostore.storage_id
+ with self._lock:
+ self._unlock_callbacks[storage_id].append((zeostore, delay))
+
+ def waiting(self, zeostore):
+ return len(self._unlock_callbacks[zeostore.storage_id])
+
class StubTimeoutThread:
def begin(self, client):
@@ -1238,7 +1248,6 @@
self._client = None
self._deadline = None
self._cond = threading.Condition() # Protects _client and _deadline
- self._trigger = trigger()
def begin(self, client):
# Called from the restart code the "main" thread, whenever the
@@ -1281,7 +1290,7 @@
if howlong <= 0:
client.log("Transaction timeout after %s seconds" %
self._timeout)
- self._trigger.pull_trigger(lambda: client.connection.close())
+ client.connection.trigger.pull_trigger(client.connection.close)
else:
time.sleep(howlong)
Modified: ZODB/trunk/src/ZEO/tests/Cache.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/Cache.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/Cache.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -37,14 +37,11 @@
# Now start an undo transaction
t = Transaction()
t.note('undo1')
- self._storage.tpc_begin(t)
+ oids = self._begin_undos_vote(t, tid)
- tid, oids = self._storage.undo(tid, t)
-
# Make sure this doesn't load invalid data into the cache
self._storage.load(oid, '')
- self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
assert len(oids) == 1
Modified: ZODB/trunk/src/ZEO/tests/CommitLockTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/CommitLockTests.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/CommitLockTests.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -181,64 +181,3 @@
self._finish_threads()
self._cleanup()
-
-class CommitLockUndoTests(CommitLockTests):
-
- def _get_trans_id(self):
- self._dostore()
- L = self._storage.undoInfo()
- return L[0]['id']
-
- def _begin_undo(self, trans_id, txn):
- rpc = self._storage._server.rpc
- return rpc._deferred_call('undo', trans_id, id(txn))
-
- def _finish_undo(self, msgid):
- return self._storage._server.rpc._deferred_wait(msgid)
-
- def checkCommitLockUndoFinish(self):
- trans_id = self._get_trans_id()
- oid, txn = self._start_txn()
- msgid = self._begin_undo(trans_id, txn)
-
- self._begin_threads()
-
- self._finish_undo(msgid)
- self._storage.tpc_vote(txn)
- self._storage.tpc_finish(txn)
- self._storage.load(oid, '')
-
- self._finish_threads()
-
- self._dostore()
- self._cleanup()
-
- def checkCommitLockUndoAbort(self):
- trans_id = self._get_trans_id()
- oid, txn = self._start_txn()
- msgid = self._begin_undo(trans_id, txn)
-
- self._begin_threads()
-
- self._finish_undo(msgid)
- self._storage.tpc_vote(txn)
- self._storage.tpc_abort(txn)
-
- self._finish_threads()
-
- self._dostore()
- self._cleanup()
-
- def checkCommitLockUndoClose(self):
- trans_id = self._get_trans_id()
- oid, txn = self._start_txn()
- msgid = self._begin_undo(trans_id, txn)
- self._begin_threads()
-
- self._finish_undo(msgid)
- self._storage.tpc_vote(txn)
- self._storage.close()
-
- self._finish_threads()
-
- self._cleanup()
Modified: ZODB/trunk/src/ZEO/tests/InvalidationTests.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/InvalidationTests.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/InvalidationTests.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -318,9 +318,9 @@
# tearDown then immediately, but if other threads are still
# running that can lead to a cascade of spurious exceptions.
for t in threads:
- t.join(10)
+ t.join(30)
for t in threads:
- t.cleanup()
+ t.cleanup(10)
def checkConcurrentUpdates2Storages_emulated(self):
self._storage = storage1 = self.openClientStorage()
@@ -378,6 +378,34 @@
db1.close()
db2.close()
+ def checkConcurrentUpdates19Storages(self):
+ n = 19
+ dbs = [DB(self.openClientStorage()) for i in range(n)]
+ self._storage = dbs[0].storage
+ stop = threading.Event()
+
+ cn = dbs[0].open()
+ tree = cn.root()["tree"] = OOBTree()
+ transaction.commit()
+ cn.close()
+
+ # Run threads that update the BTree
+ cd = {}
+ threads = [self.StressThread(dbs[i], stop, i, cd, i, n)
+ for i in range(n)]
+ self.go(stop, cd, *threads)
+
+ while len(set(db.lastTransaction() for db in dbs)) > 1:
+ _ = [db._storage.sync() for db in dbs]
+
+ cn = dbs[0].open()
+ tree = cn.root()["tree"]
+ self._check_tree(cn, tree)
+ self._check_threads(tree, *threads)
+
+ cn.close()
+ _ = [db.close() for db in dbs]
+
def checkConcurrentUpdates1Storage(self):
self._storage = storage1 = self.openClientStorage()
db1 = DB(storage1)
Modified: ZODB/trunk/src/ZEO/tests/servertesting.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/servertesting.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/servertesting.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -58,3 +58,7 @@
print self.name, 'callAsync', meth, repr(args)
callAsyncNoPoll = callAsync
+
+ def call_from_thread(self, *args):
+ if args:
+ args[0](*args[1:])
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -25,7 +25,6 @@
from ZODB.tests.MinPO import MinPO
from ZODB.tests.StorageTestBase import zodb_unpickle
-import asyncore
import doctest
import logging
import os
@@ -244,7 +243,6 @@
class FullGenericTests(
GenericTests,
Cache.TransUndoStorageWithCache,
- CommitLockTests.CommitLockUndoTests,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage,
PackableStorage.PackableUndoStorage,
@@ -727,6 +725,10 @@
blob_cache_dir = 'blobs'
shared_blob_dir = True
+class FauxConn:
+ addr = 'x'
+ peer_protocol_version = ZEO.zrpc.connection.Connection.current_protocol
+
class StorageServerClientWrapper:
def __init__(self):
@@ -743,8 +745,8 @@
def __init__(self, server, storage_id):
self.storage_id = storage_id
self.server = ZEO.StorageServer.ZEOStorage(server, server.read_only)
+ self.server.notifyConnected(FauxConn())
self.server.register(storage_id, False)
- self.server._thunk = lambda : None
self.server.client = StorageServerClientWrapper()
def sortKey(self):
@@ -766,8 +768,7 @@
self.server.tpc_begin(id(transaction), '', '', {}, None, ' ')
def tpc_vote(self, transaction):
- self.server._restart()
- self.server.vote(id(transaction))
+ assert self.server.vote(id(transaction)) is None
result = self.server.client.serials[:]
del self.server.client.serials[:]
return result
@@ -775,8 +776,11 @@
def store(self, oid, serial, data, version_ignored, transaction):
self.server.storea(oid, serial, data, id(transaction))
+ def send_reply(self, *args): # Masquerade as conn
+ pass
+
def tpc_finish(self, transaction, func = lambda: None):
- self.server.tpc_finish(id(transaction))
+ self.server.tpc_finish(id(transaction)).set_sender(0, self)
def multiple_storages_invalidation_queue_is_not_insane():
@@ -849,6 +853,7 @@
>>> fs = FileStorage('t.fs')
>>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> s = ZEOStorage(sv, sv.read_only)
+ >>> s.notifyConnected(FauxConn())
>>> s.register('fs', False)
If we ask for the last transaction, we should get the last transaction
@@ -941,7 +946,7 @@
... def close(self):
... print 'connection closed'
... trigger = property(lambda self: self)
- ... pull_trigger = lambda self, func: func()
+ ... pull_trigger = lambda self, func, *args: func(*args)
>>> class ConnectionManager:
... def __init__(self, addr, client, tmin, tmax):
@@ -1251,6 +1256,8 @@
>>> thread.join(1)
"""
+
+
if sys.version_info >= (2, 6):
import multiprocessing
@@ -1259,28 +1266,32 @@
q.put((name, conn.root.x))
conn.close()
- def work_with_multiprocessing():
- """Client storage should work with multi-processing.
+ class MultiprocessingTests(unittest.TestCase):
- >>> import StringIO
- >>> sys.stdin = StringIO.StringIO()
- >>> addr, _ = start_server()
- >>> conn = ZEO.connection(addr)
- >>> conn.root.x = 1
- >>> transaction.commit()
- >>> q = multiprocessing.Queue()
- >>> processes = [multiprocessing.Process(
- ... target=work_with_multiprocessing_process,
- ... args=(i, addr, q))
- ... for i in range(3)]
- >>> _ = [p.start() for p in processes]
- >>> sorted(q.get(timeout=60) for p in processes)
- [(0, 1), (1, 1), (2, 1)]
+ def test_work_with_multiprocessing(self):
+ "Client storage should work with multi-processing."
- >>> _ = [p.join(30) for p in processes]
- >>> conn.close()
- """
+ self.globs = {}
+ forker.setUp(self)
+ addr, adminaddr = self.globs['start_server']()
+ conn = ZEO.connection(addr)
+ conn.root.x = 1
+ transaction.commit()
+ q = multiprocessing.Queue()
+ processes = [multiprocessing.Process(
+ target=work_with_multiprocessing_process,
+ args=(i, addr, q))
+ for i in range(3)]
+ _ = [p.start() for p in processes]
+ self.assertEqual(sorted(q.get(timeout=300) for p in processes),
+ [(0, 1), (1, 1), (2, 1)])
+ _ = [p.join(30) for p in processes]
+ conn.close()
+ zope.testing.setupstack.tearDown(self)
+else:
+ class MultiprocessingTests(unittest.TestCase):
+ pass
slow_test_classes = [
BlobAdaptedFileStorageTests, BlobWritableCacheTests,
@@ -1353,6 +1364,7 @@
# unit test layer
zeo = unittest.TestSuite()
zeo.addTest(unittest.makeSuite(ZODB.tests.util.AAAA_Test_Runner_Hack))
+ zeo.addTest(unittest.makeSuite(MultiprocessingTests))
zeo.addTest(doctest.DocTestSuite(
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown))
zeo.addTest(doctest.DocTestSuite(ZEO.tests.IterationTests,
Modified: ZODB/trunk/src/ZEO/tests/testZEO2.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO2.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/tests/testZEO2.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -93,9 +93,9 @@
handled correctly:
>>> zs1.tpc_abort('0') # doctest: +ELLIPSIS
+ (511/test-addr) ('1') unlock: transactions waiting: 0
2 callAsync serialnos ...
reply 1 None
- (511/test-addr) Blocked transaction restarted.
>>> fs.tpc_transaction() is not None
True
Modified: ZODB/trunk/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/trunk/src/ZEO/zrpc/connection.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZEO/zrpc/connection.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -55,6 +55,16 @@
log("Error raised in delayed method", logging.ERROR, exc_info=True)
self.conn.return_error(self.msgid, *exc_info[:2])
+class Result(Delay):
+
+ def __init__(self, *args):
+ self.args = args
+
+ def set_sender(self, msgid, conn):
+ reply, callback = self.args
+ conn.send_reply(msgid, reply, False)
+ callback()
+
class MTDelay(Delay):
def __init__(self):
@@ -218,18 +228,25 @@
# restorea, iterator_start, iterator_next,
# iterator_record_start, iterator_record_next,
# iterator_gc
+ #
+ # Z310 -- named after the ZODB release 3.10
+ # New server methods:
+ # undoa
+ # Doesn't support undo for older clients.
+ # Undone oid info returned by vote.
# Protocol variables:
# Our preferred protocol.
- current_protocol = "Z309"
+ current_protocol = "Z310"
# If we're a client, an exhaustive list of the server protocols we
# can accept.
- servers_we_can_talk_to = ["Z308", current_protocol]
+ servers_we_can_talk_to = ["Z308", "Z309", current_protocol]
# If we're a server, an exhaustive list of the client protocols we
# can accept.
- clients_we_can_talk_to = ["Z200", "Z201", "Z303", "Z308", current_protocol]
+ clients_we_can_talk_to = [
+ "Z200", "Z201", "Z303", "Z308", "Z309", current_protocol]
# This is pretty excruciating. Details:
#
Modified: ZODB/trunk/src/ZODB/Connection.py
===================================================================
--- ZODB/trunk/src/ZODB/Connection.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZODB/Connection.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -666,32 +666,11 @@
self._cache.update_object_size_estimation(oid, len(p))
obj._p_estimated_size = len(p)
- self._handle_serial(s, oid)
+ self._handle_serial(oid, s)
- def _handle_serial(self, store_return, oid=None, change=1):
- """Handle the returns from store() and tpc_vote() calls."""
-
- # These calls can return different types depending on whether
- # ZEO is used. ZEO uses asynchronous returns that may be
- # returned in batches by the ClientStorage. ZEO1 can also
- # return an exception object and expect that the Connection
- # will raise the exception.
-
- # When conflict resolution occurs, the object state held by
- # the connection does not match what is written to the
- # database. Invalidate the object here to guarantee that
- # the new state is read the next time the object is used.
-
- if not store_return:
+ def _handle_serial(self, oid, serial, change=True):
+ if not serial:
return
- if isinstance(store_return, str):
- assert oid is not None
- self._handle_one_serial(oid, store_return, change)
- else:
- for oid, serial in store_return:
- self._handle_one_serial(oid, serial, change)
-
- def _handle_one_serial(self, oid, serial, change):
if not isinstance(serial, str):
raise serial
obj = self._cache.get(oid, None)
@@ -757,7 +736,9 @@
except AttributeError:
return
s = vote(transaction)
- self._handle_serial(s)
+ if s:
+ for oid, serial in s:
+ self._handle_serial(oid, serial)
def tpc_finish(self, transaction):
"""Indicate confirmation that the transaction is done."""
@@ -1171,7 +1152,7 @@
s = self._storage.store(oid, serial, data,
'', transaction)
- self._handle_serial(s, oid, change=False)
+ self._handle_serial(oid, s, change=False)
src.close()
def _abort_savepoint(self):
Modified: ZODB/trunk/src/ZODB/tests/ConflictResolution.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/ConflictResolution.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZODB/tests/ConflictResolution.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -158,6 +158,7 @@
t = Transaction()
self._storage.tpc_begin(t)
self._storage.undo(tid, t)
+ self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
def checkUndoUnresolvable(self):
@@ -177,7 +178,5 @@
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
- self._storage.tpc_begin(t)
- self.assertRaises(UndoError, self._storage.undo,
- tid, t)
+ self.assertRaises(UndoError, self._begin_undos_vote, t, tid)
self._storage.tpc_abort(t)
Modified: ZODB/trunk/src/ZODB/tests/RevisionStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/RevisionStorage.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZODB/tests/RevisionStorage.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -122,7 +122,7 @@
tid = info[0]["id"]
# Always undo the most recent txn, so the value will
# alternate between 3 and 4.
- self._undo(tid, [oid], note="undo %d" % i)
+ self._undo(tid, note="undo %d" % i)
revs.append(self._storage.load(oid, ""))
prev_tid = None
Modified: ZODB/trunk/src/ZODB/tests/StorageTestBase.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/StorageTestBase.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZODB/tests/StorageTestBase.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -209,10 +209,12 @@
t = transaction.Transaction()
t.note(note or "undo")
self._storage.tpc_begin(t)
- tid, oids = self._storage.undo(tid, t)
- self._storage.tpc_vote(t)
+ undo_result = self._storage.undo(tid, t)
+ vote_result = self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
if expected_oids is not None:
+ oids = undo_result and undo_result[1] or []
+ oids.extend(oid for (oid, _) in vote_result or ())
self.assertEqual(len(oids), len(expected_oids), repr(oids))
for oid in expected_oids:
self.assert_(oid in oids)
Modified: ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py 2010-01-31 19:59:52 UTC (rev 108678)
+++ ZODB/trunk/src/ZODB/tests/TransactionalUndoStorage.py 2010-01-31 19:59:54 UTC (rev 108679)
@@ -101,12 +101,20 @@
for rec in txn:
pass
+ def _begin_undos_vote(self, t, *tids):
+ self._storage.tpc_begin(t)
+ oids = []
+ for tid in tids:
+ undo_result = self._storage.undo(tid, t)
+ if undo_result:
+ oids.extend(undo_result[1])
+ oids.extend(oid for (oid, _) in self._storage.tpc_vote(t) or ())
+ return oids
+
def undo(self, tid, note):
t = Transaction()
t.note(note)
- self._storage.tpc_begin(t)
- oids = self._storage.undo(tid, t)
- self._storage.tpc_vote(t)
+ oids = self._begin_undos_vote(t, tid)
self._storage.tpc_finish(t)
return oids
@@ -152,9 +160,7 @@
tid = info[0]['id']
t = Transaction()
t.note('undo1')
- self._storage.tpc_begin(t)
- self._storage.undo(tid, t)
- self._storage.tpc_vote(t)
+ self._begin_undos_vote(t, tid)
self._storage.tpc_finish(t)
# Check that calling getTid on an uncreated object raises a KeyError
# The current version of FileStorage fails this test
@@ -281,14 +287,10 @@
tid = info[0]['id']
tid1 = info[1]['id']
t = Transaction()
- self._storage.tpc_begin(t)
- tid, oids = self._storage.undo(tid, t)
- tid, oids1 = self._storage.undo(tid1, t)
- self._storage.tpc_vote(t)
+ oids = self._begin_undos_vote(t, tid, tid1)
self._storage.tpc_finish(t)
# We get the finalization stuff called an extra time:
- eq(len(oids), 2)
- eq(len(oids1), 2)
+ eq(len(oids), 4)
unless(oid1 in oids)
unless(oid2 in oids)
data, revid1 = self._storage.load(oid1, '')
@@ -355,9 +357,7 @@
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
- self._storage.tpc_begin(t)
- tid, oids = self._storage.undo(tid, t)
- self._storage.tpc_vote(t)
+ oids = self._begin_undos_vote(t, tid)
self._storage.tpc_finish(t)
eq(len(oids), 1)
self.failUnless(oid1 in oids)
@@ -368,7 +368,6 @@
eq(zodb_unpickle(data), MinPO(54))
self._iterate()
-
def checkNotUndoable(self):
eq = self.assertEqual
# Set things up so we've got a transaction that can't be undone
@@ -380,10 +379,7 @@
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
- self._storage.tpc_begin(t)
- self.assertRaises(POSException.UndoError,
- self._storage.undo,
- tid, t)
+ self.assertRaises(POSException.UndoError, self._begin_undos_vote, t, tid)
self._storage.tpc_abort(t)
# Now have more fun: object1 and object2 are in the same transaction,
# which we'll try to undo to, but one of them has since modified in
@@ -419,10 +415,7 @@
info = self._storage.undoInfo()
tid = info[1]['id']
t = Transaction()
- self._storage.tpc_begin(t)
- self.assertRaises(POSException.UndoError,
- self._storage.undo,
- tid, t)
+ self.assertRaises(POSException.UndoError, self._begin_undos_vote, t, tid)
self._storage.tpc_abort(t)
self._iterate()
@@ -439,7 +432,7 @@
# So, basically, this makes sure that undo info doesn't depend
# on file positions. We change the file positions in an undo
# record by packing.
-
+
# Add a few object revisions
oid = '\0'*8
revid0 = self._dostore(oid, data=MinPO(50))
@@ -462,9 +455,7 @@
self.assertEqual(len(info2), 2)
# And now attempt to undo the last transaction
t = Transaction()
- self._storage.tpc_begin(t)
- tid, oids = self._storage.undo(tid, t)
- self._storage.tpc_vote(t)
+ oids = self._begin_undos_vote(t, tid)
self._storage.tpc_finish(t)
self.assertEqual(len(oids), 1)
self.assertEqual(oids[0], oid)
More information about the Zodb-checkins
mailing list