[Zodb-checkins] SVN: ZODB/trunk/src/ZEO/ Updated to support the
registerDB framework which allows storages to
Jim Fulton
jim at zope.com
Fri May 11 16:04:15 EDT 2007
Log message for revision 75688:
Updated to support the registerDB framework which allows storages to
generate their own invalidations. Also updated to honor the storage
APIs more carefully. These changes together allow a ClientStorage to
be served by a storage server.
Changed:
U ZODB/trunk/src/ZEO/StorageServer.py
A ZODB/trunk/src/ZEO/tests/registerDB.test
U ZODB/trunk/src/ZEO/tests/testZEO.py
A ZODB/trunk/src/ZEO/tests/zeo-fan-out.test
-=-
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2007-05-11 20:04:07 UTC (rev 75687)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2007-05-11 20:04:14 UTC (rev 75688)
@@ -31,6 +31,9 @@
import transaction
+import ZODB.serialize
+import ZEO.zrpc.error
+
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer
@@ -625,24 +628,37 @@
self.log(msg, logging.ERROR)
err = StorageServerError(msg)
# The exception is reported back as newserial for this oid
- newserial = err
+ newserial = [(oid, err)]
else:
if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append((oid, version))
- if newserial == ResolvedSerial:
- self.stats.conflicts_resolved += 1
- self.log("conflict resolved oid=%s" % oid_repr(oid), BLATHER)
- self.serials.append((oid, newserial))
+
+ if isinstance(newserial, str):
+ newserial = [(oid, newserial)]
+
+ if newserial:
+ for oid, s in newserial:
+
+ if s == ResolvedSerial:
+ self.stats.conflicts_resolved += 1
+ self.log("conflict resolved oid=%s"
+ % oid_repr(oid), BLATHER)
+
+ self.serials.append((oid, s))
+
return err is None
def _vote(self):
+ if not self.store_failed:
+ # Only call tpc_vote if no store call failed, otherwise
+ # the serialnos() call will deliver an exception that will be
+ # handled by the client in its tpc_vote() method.
+ serials = self.storage.tpc_vote(self.transaction)
+ if serials:
+ self.serials.extend(serials)
+
self.client.serialnos(self.serials)
- # If a store call failed, then return to the client immediately.
- # The serialnos() call will deliver an exception that will be
- # handled by the client in its tpc_vote() method.
- if self.store_failed:
- return
- return self.storage.tpc_vote(self.transaction)
+ return
def _abortVersion(self, src):
tid, oids = self.storage.abortVersion(src, self.transaction)
@@ -741,7 +757,31 @@
else:
return 1
+class StorageServerDB:
+ def __init__(self, server, storage_id):
+ self.server = server
+ self.storage_id = storage_id
+ self.references = ZODB.serialize.referencesf
+
+ def invalidate(self, tid, oids, version=''):
+ storage_id = self.storage_id
+ self.server.invalidate(
+ None, storage_id, tid,
+ [(oid, version) for oid in oids],
+ )
+ for zeo_server in self.server.connections.get(storage_id, ())[:]:
+ try:
+ zeo_server.connection.poll()
+ except ZEO.zrpc.error.DisconnectedError:
+ pass
+ else:
+ break # We only need to poll one :)
+
+ def invalidateCache(self):
+ self.server._invalidateCache(self.storage_id)
+
+
class StorageServer:
"""The server side implementation of ZEO.
@@ -845,17 +885,12 @@
# The list is kept in sorted order with the most recent
# invalidation at the front. The list never has more than
# self.invq_bound elements.
+ self.invq_bound = invalidation_queue_size
self.invq = {}
for name, storage in storages.items():
- lastInvalidations = getattr(storage, 'lastInvalidations', None)
- if lastInvalidations is None:
- self.invq[name] = [(storage.lastTransaction(), None)]
- else:
- self.invq[name] = list(
- lastInvalidations(invalidation_queue_size)
- )
- self.invq[name].reverse()
- self.invq_bound = invalidation_queue_size
+ self._setup_invq(name, storage)
+ storage.registerDB(StorageServerDB(self, name))
+
self.connections = {}
self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection)
@@ -875,6 +910,17 @@
else:
self.monitor = None
+ def _setup_invq(self, name, storage):
+ lastInvalidations = getattr(storage, 'lastInvalidations', None)
+ if lastInvalidations is None:
+ self.invq[name] = [(storage.lastTransaction(), None)]
+ else:
+ self.invq[name] = list(
+ lastInvalidations(self.invq_bound)
+ )
+ self.invq[name].reverse()
+
+
def _setup_auth(self, protocol):
# Can't be done in global scope, because of cyclic references
from ZEO.auth import get_module
@@ -947,6 +993,49 @@
stats.clients += 1
return self.timeouts[storage_id], stats
+ def _invalidateCache(self, storage_id):
+ """We need to invalidate any caches we have.
+
+ This basically means telling our clients to
+ invalidate/revalidate their caches. We do this by closing them
+ and making them reconnect.
+ """
+
+ # This method can be called from foreign threads. We have to
+ # worry about interaction with the main thread.
+
+ # 1. We modify self.invq which is read by get_invalidations
+ # below. This is why get_invalidations makes a copy of
+ # self.invq.
+
+ # 2. We access connections. There are two dangers:
+ #
+ # a. We miss a new connection. This is not a problem because
+ # if a client connects after we get the list of connections,
+ # then it will have to read the invalidation queue, which
+ # has already been reset.
+ #
+ # b. A connection is closed while we are iterating. This
+ # doesn't matter, because we can call should_close on a closed
+ # connection.
+
+ # Rebuild invq
+ self._setup_invq(storage_id, self.storages[storage_id])
+
+ connections = self.connections.get(storage_id, ())
+
+ # Make a copy since we are going to be mutating the
+ # connections indirectly by closing them. We don't care about
+ # later transactions since they will have to validate their
+ # caches anyway.
+ connections = connections[:]
+ for p in connections:
+ try:
+ p.connection.should_close()
+ except ZEO.zrpc.error.DisconnectedError:
+ pass
+
+
def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
"""Internal: broadcast info and invalidations to clients.
@@ -972,6 +1061,27 @@
"""
+ # This method can be called from foreign threads. We have to
+ # worry about interaction with the main thread.
+
+ # 1. We modify self.invq which is read by get_invalidations
+ # below. This is why get_invalidations makes a copy of
+ # self.invq.
+
+ # 2. We access connections. There are two dangers:
+ #
+ # a. We miss a new connection. This is not a problem because
+ # we are called while the storage lock is held. A new
+ # connection that tries to read data won't read committed
+ # data without first receiving an invalidation. Also, if a
+ # client connects after getting the list of connections,
+ # then it will have to read the invalidation queue, which
+ # has been updated to reflect the invalidations.
+ #
+ # b. A connection is closed while we are iterating. We'll need
+ # to catch and ignore Disconnected errors.
+
+
if invalidated:
invq = self.invq[storage_id]
if len(invq) >= self.invq_bound:
@@ -980,7 +1090,11 @@
for p in self.connections.get(storage_id, ()):
if invalidated and p is not conn:
- p.client.invalidateTransaction(tid, invalidated)
+ try:
+ p.client.invalidateTransaction(tid, invalidated)
+ except ZEO.zrpc.error.DisconnectedError:
+ pass
+
elif info is not None:
p.client.info(info)
@@ -994,7 +1108,13 @@
do full cache verification.
"""
+
invq = self.invq[storage_id]
+
+ # We make a copy of invq because it might be modified by a
+ # foreign (other than main) thread calling invalidate above.
+ invq = invq[:]
+
if not invq:
log("invq empty")
return None, []
Copied: ZODB/trunk/src/ZEO/tests/registerDB.test (from rev 75684, ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test)
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2007-05-11 20:04:07 UTC (rev 75687)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2007-05-11 20:04:14 UTC (rev 75688)
@@ -26,6 +26,9 @@
import unittest
import shutil
+import zope.testing.setupstack
+from zope.testing import doctest
+
# ZODB test support
import ZODB
import ZODB.tests.util
@@ -150,11 +153,13 @@
self._servers = [adminaddr]
self._conf_path = path
if not self.blob_cache_dir:
- self.blob_cache_dir = tempfile.mkdtemp() # This is the blob cache for ClientStorage
- self._storage = ClientStorage(zport, '1', cache_size=20000000,
- min_disconnect_poll=0.5, wait=1,
- wait_timeout=60, blob_dir=self.blob_cache_dir,
- blob_cache_writable=self.blob_cache_writable)
+ # This is the blob cache for ClientStorage
+ self.blob_cache_dir = tempfile.mkdtemp()
+ self._storage = ClientStorage(
+ zport, '1', cache_size=20000000,
+ min_disconnect_poll=0.5, wait=1,
+ wait_timeout=60, blob_dir=self.blob_cache_dir,
+ blob_cache_writable=self.blob_cache_writable)
self._storage.registerDB(DummyDB())
def tearDown(self):
@@ -816,10 +821,20 @@
BlobAdaptedFileStorageTests, BlobWritableCacheTests]
+def zeoFanOutSetup(test):
+ zope.testing.setupstack.setUpDirectory(test)
+
def test_suite():
suite = unittest.TestSuite()
suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
tearDown=ZODB.tests.util.tearDown))
+ suite.addTest(doctest.DocFileSuite('registerDB.test'))
+ suite.addTest(
+ doctest.DocFileSuite('zeo-fan-out.test',
+ setUp=zeoFanOutSetup,
+ tearDown=zope.testing.setupstack.tearDown,
+ ),
+ )
for klass in test_classes:
sub = unittest.makeSuite(klass, "check")
suite.addTest(sub)
Copied: ZODB/trunk/src/ZEO/tests/zeo-fan-out.test (from rev 75684, ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test)
More information about the Zodb-checkins
mailing list