[Zodb-checkins] SVN: ZODB/branches/jim-zeo-registerdb/src/Z
	checkpointing.
    Jim Fulton 
    jim at zope.com
       
    Thu May 10 18:10:52 EDT 2007
    
    
  
Log message for revision 75679:
  checkpointing.
  
Changed:
  U   ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py
  U   ZODB/branches/jim-zeo-registerdb/src/ZEO/interfaces.py
  U   ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test
  U   ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py
  A   ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test
  U   ZODB/branches/jim-zeo-registerdb/src/ZEO/zrpc/smac.py
  U   ZODB/branches/jim-zeo-registerdb/src/ZODB/interfaces.py
  U   ZODB/branches/jim-zeo-registerdb/src/ZODB/tests/testFileStorage.py
-=-
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py	2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py	2007-05-10 22:10:52 UTC (rev 75679)
@@ -32,6 +32,7 @@
 import transaction
 
 import ZODB.serialize
+import ZEO.zrpc.error
 from ZEO import ClientStub
 from ZEO.CommitLog import CommitLog
 from ZEO.monitor import StorageStats, StatsServer
@@ -630,20 +631,33 @@
         else:
             if serial != "\0\0\0\0\0\0\0\0":
                 self.invalidated.append((oid, version))
-        if newserial == ResolvedSerial:
-            self.stats.conflicts_resolved += 1
-            self.log("conflict resolved oid=%s" % oid_repr(oid), BLATHER)
-        self.serials.append((oid, newserial))
+
+        if isinstance(newserial, str):
+            newserial = (oid, newserial)
+
+        if newserial:
+            for oid, s in newserial:
+
+                if s == ResolvedSerial:
+                    self.stats.conflicts_resolved += 1
+                    self.log("conflict resolved oid=%s"
+                             % oid_repr(oid), BLATHER)
+
+                self.serials.append((oid, s))
+
         return err is None
 
     def _vote(self):
+        if not self.store_failed:
+            # Only call tpc_vote of no store call failed, otherwise
+            # the serialnos() call will deliver an exception that will be
+            # handled by the client in its tpc_vote() method.
+            serials = self.storage.tpc_vote(self.transaction)
+            if serials:
+                self.serials.extend(serials)
+
         self.client.serialnos(self.serials)
-        # If a store call failed, then return to the client immediately.
-        # The serialnos() call will deliver an exception that will be
-        # handled by the client in its tpc_vote() method.
-        if self.store_failed:
-            return
-        return self.storage.tpc_vote(self.transaction)
+        return
 
     def _abortVersion(self, src):
         tid, oids = self.storage.abortVersion(src, self.transaction)
@@ -747,7 +761,7 @@
     def __init__(self, server, storage_id):
         self.server = server
         self.storage_id = storage_id
-        self.references = ZODB.serial.referencesf
+        self.references = ZODB.serialize.referencesf
 
     def invalidate(self, tid, oids, version=''):
         self.server.invalidate(
@@ -756,7 +770,8 @@
             )
 
     def invalidateCache(self):
-        pass
+        self.server._invalidateCache(self.storage_id)
+        
 
 class StorageServer:
 
@@ -861,17 +876,12 @@
         # The list is kept in sorted order with the most recent
         # invalidation at the front.  The list never has more than
         # self.invq_bound elements.
+        self.invq_bound = invalidation_queue_size
         self.invq = {}
         for name, storage in storages.items():
-            lastInvalidations = getattr(storage, 'lastInvalidations', None)
-            if lastInvalidations is None:
-                self.invq[name] = [(storage.lastTransaction(), None)]
-            else:
-                self.invq[name] = list(
-                    lastInvalidations(invalidation_queue_size)
-                    )
-                self.invq[name].reverse()
-        self.invq_bound = invalidation_queue_size
+            self._setup_invq(name, storage)
+            storage.registerDB(StorageServerDB(self, name))
+
         self.connections = {}
         self.dispatcher = self.DispatcherClass(addr,
                                                factory=self.new_connection)
@@ -891,6 +901,17 @@
         else:
             self.monitor = None
 
+    def _setup_invq(self, name, storage):
+        lastInvalidations = getattr(storage, 'lastInvalidations', None)
+        if lastInvalidations is None:
+            self.invq[name] = [(storage.lastTransaction(), None)]
+        else:
+            self.invq[name] = list(
+                lastInvalidations(self.invq_bound)
+                )
+            self.invq[name].reverse()
+
+
     def _setup_auth(self, protocol):
         # Can't be done in global scope, because of cyclic references
         from ZEO.auth import get_module
@@ -963,6 +984,49 @@
         stats.clients += 1
         return self.timeouts[storage_id], stats
 
+    def _invalidateCache(self, storage_id):
+        """We need to invalidate any caches we have.
+
+        This basically means telling our clients to
+        invalidate/revalidate their caches. We do this by closing them
+        and making them reconnect.
+        """
+
+        # This method can be called from foreign threads.  We have to
+        # worry about interaction with the main thread.
+
+        # 1. We modify self.invq which is read by get_invalidations
+        #    below. This is why get_invalidations makes a copy of
+        #    self.invq.
+
+        # 2. We access connections.  There are two dangers:
+        #
+        # a. We miss a new connection.  This is not a problem because
+        #    if a client connects after we get the list of connections,
+        #    then it will have to read the invalidation queue, which
+        #    has already been reset.
+        #
+        # b. A connection is closes while we are iterating.  This
+        #    doesn't matter, bacause we can call should_close on a closed
+        #    connection.
+
+        # Rebuild invq
+        self._setup_invq(storage_id, self.storages[storage_id])
+
+        connections = self.connections.get(storage_id, ())
+
+        # Make a copy since we are going to be mutating the
+        # connections indirectoy by closing them.  We don't care about
+        # later transactions since they will have to validate their
+        # caches anyway.
+        connections = connections[:]        
+        for p in connections:
+            try:
+                p.connection.should_close()
+            except ZEO.zrpc.error.DisconnectedError:
+                pass
+        
+
     def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
         """Internal: broadcast info and invalidations to clients.
 
@@ -988,6 +1052,27 @@
 
         """
 
+        # This method can be called from foreign threads.  We have to
+        # worry about interaction with the main thread.
+
+        # 1. We modify self.invq which is read by get_invalidations
+        #    below. This is why get_invalidations makes a copy of
+        #    self.invq.
+
+        # 2. We access connections.  There are two dangers:
+        #
+        # a. We miss a new connection.  This is not a problem because
+        #    we are called while the storage lock is held.  A new
+        #    connection that tries to read data won't read committed
+        #    data without first recieving an invalidation.  Also, if a
+        #    client connects after getting the list of connections,
+        #    then it will have to read the invalidation queue, which
+        #    has been updated to reflect the invalidations.
+        #
+        # b. A connection is closes while we are iterating. We'll need
+        #    to cactch and ignore Disconnected errors.
+        
+
         if invalidated:
             invq = self.invq[storage_id]
             if len(invq) >= self.invq_bound:
@@ -996,7 +1081,11 @@
 
         for p in self.connections.get(storage_id, ()):
             if invalidated and p is not conn:
-                p.client.invalidateTransaction(tid, invalidated)
+                try:
+                    p.client.invalidateTransaction(tid, invalidated)
+                except ZEO.zrpc.error.DisconnectedError:
+                    pass
+
             elif info is not None:
                 p.client.info(info)
 
@@ -1010,7 +1099,13 @@
         do full cache verification.
         """
 
+        
         invq = self.invq[storage_id]
+
+        # We make a copy of invq because it might be modified by a
+        # foreign (other than main thread) calling invalidate above.        
+        invq = invq[:]
+
         if not invq:
             log("invq empty")
             return None, []
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/interfaces.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/interfaces.py	2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/interfaces.py	2007-05-10 22:10:52 UTC (rev 75679)
@@ -43,5 +43,5 @@
         An iterable of up to size entries must be returned, where each
         entry is a transaction id and a sequence of object-id/version
         pairs describing the objects and versions written by the
-        transaction, ordered starting at the most recent.
+        transaction, in chronological order.
         """
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test	2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test	2007-05-10 22:10:52 UTC (rev 75679)
@@ -1,7 +1,7 @@
 Storage Servers should call registerDB on storages to propigate invalidations
 =============================================================================
 
-Storages servers propigate invalidations from their storages. Among
+Storages servers propagate invalidations from their storages. Among
 other things, this allows client storages to be used in storage
 servers, allowing storage-server fan out, spreading read load over
 multiple storage servers.
@@ -23,12 +23,18 @@
     ...     def lastInvalidations(self, size):
     ...         return list(self.invalidations)
 
-We'll create a storage and a storage server using it:
+We dont' want the storage server to try to bind to a socket.  We'll
+subclass it and give it a do-nothing dispatcher "class":
 
-    >>> storage = FauxStorage()
     >>> import ZEO.StorageServer
-    >>> server = ZEO.StorageServer.StorageServer('addr', dict(t=storage))
+    >>> class StorageServer(ZEO.StorageServer.StorageServer):
+    ...     DispatcherClass = lambda *a, **k: None
 
+We'll create a storage instance and a storage server using it:
+
+    >>> storage = FauxStorage()
+    >>> server = StorageServer('addr', dict(t=storage))
+
 Our storage now has a db attribute that provides IStorageDB.  It's
 references method is just the referencesf function from ZODB.Serialize
 
@@ -50,47 +56,50 @@
     ...     def __init__(self, mgr, obj):
     ...         self.mgr = mgr
     ...         self.obj = obj
-    ...     def close(self):
+    ...     def should_close(self):
     ...         print 'closed', self.obj.name
     ...         self.mgr.close_conn(self)
 
     >>> class ZEOStorage:
     ...     def __init__(self, server, name):
+    ...         self.name = name
     ...         self.connection = Connection(server, self)
     ...         self.client = Client(name)
 
 Now, we'll register the client with the storage server:
 
-    >>> server.register_connection('t', ZEOStorage(server, 1))
-    >>> server.register_connection('t', ZEOStorage(server, 2))
+    >>> _ = server.register_connection('t', ZEOStorage(server, 1))
+    >>> _ = server.register_connection('t', ZEOStorage(server, 2))
     
 Now, if we call invalidate, we'll see it propigate to the client:
 
-    >>> storage.db.invalidate('trans2, ['ob1', 'ob2'])
-    invalidateTransaction trans1 1
+    >>> storage.db.invalidate('trans2', ['ob1', 'ob2'])
+    invalidateTransaction trans2 1
     [('ob1', ''), ('ob2', '')]
-    invalidateTransaction trans1 2
+    invalidateTransaction trans2 2
     [('ob1', ''), ('ob2', '')]
 
-    >>> storage.db.invalidate('trans3, ['ob1', 'ob2'], 'v')
-    invalidateTransaction trans2 1
+    >>> storage.db.invalidate('trans3', ['ob1', 'ob2'], 'v')
+    invalidateTransaction trans3 1
     [('ob1', 'v'), ('ob2', 'v')]
-    invalidateTransaction trans2 2
+    invalidateTransaction trans3 2
     [('ob1', 'v'), ('ob2', 'v')]
 
 The storage servers queue will reflect the invalidations:
 
     >>> for tid, invalidated in server.invq['t']:
     ...     print repr(tid), invalidated
-    `trans2` [('ob1', 'v'), ('ob2', 'v')]
-    `trans1` [('ob1', ''), ('ob2', '')]
+    'trans3' [('ob1', 'v'), ('ob2', 'v')]
+    'trans2' [('ob1', ''), ('ob2', '')]
+    'trans1' [('ob0', ''), ('ob1', '')]
+    'trans0' [('ob0', '')]
 
 If we call invalidateCache, the storage server will close each of it's
 connections:
 
     >>> storage.db.invalidateCache()
-    close 1
-    close 2
+    closed 1
+    closed 2
 
 The connections will then reopen and revalidate their caches.
     
@@ -98,3 +107,5 @@
 
     >>> for tid, invalidated in server.invq['t']:
     ...     print repr(tid), invalidated
+    'trans1' [('ob0', ''), ('ob1', '')]
+    'trans0' [('ob0', '')]
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py	2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py	2007-05-10 22:10:52 UTC (rev 75679)
@@ -26,6 +26,8 @@
 import unittest
 import shutil
 
+import zope.testing.setupstack
+
 # ZODB test support
 import ZODB
 import ZODB.tests.util
@@ -150,11 +152,13 @@
         self._servers = [adminaddr]
         self._conf_path = path
         if not self.blob_cache_dir:
-            self.blob_cache_dir = tempfile.mkdtemp()  # This is the blob cache for ClientStorage
-        self._storage = ClientStorage(zport, '1', cache_size=20000000,
-                                      min_disconnect_poll=0.5, wait=1,
-                                      wait_timeout=60, blob_dir=self.blob_cache_dir,
-                                      blob_cache_writable=self.blob_cache_writable)
+            # This is the blob cache for ClientStorage
+            self.blob_cache_dir = tempfile.mkdtemp()
+        self._storage = ClientStorage(
+            zport, '1', cache_size=20000000,
+            min_disconnect_poll=0.5, wait=1,
+            wait_timeout=60, blob_dir=self.blob_cache_dir,
+            blob_cache_writable=self.blob_cache_writable)
         self._storage.registerDB(DummyDB())
 
     def tearDown(self):
@@ -816,11 +820,20 @@
                 BlobAdaptedFileStorageTests, BlobWritableCacheTests]
 
 
+def zeoFanOutSetup(test):
+    zope.testing.setupstack.setUpDirectory(test)
+
 def test_suite():
     suite = unittest.TestSuite()
     suite.addTest(doctest.DocTestSuite(setUp=ZODB.tests.util.setUp,
                                        tearDown=ZODB.tests.util.tearDown))
     suite.addTest(doctest.DocFileSuite('registerDB.test'))
+    suite.addTest(
+        doctest.DocFileSuite('zeo-fan-out.test',
+                             setUp=zeoFanOutSetup,
+                             tearDown=zope.testing.setupstack.tearDown,
+                             ),
+        )
     for klass in test_classes:
         sub = unittest.makeSuite(klass, "check")
         suite.addTest(sub)
Added: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test	2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test	2007-05-10 22:10:52 UTC (rev 75679)
@@ -0,0 +1,133 @@
+ZEO Fan Out
+===========
+
+We should be able to set up ZEO servers with ZEO clients.  Let's see
+if we can make it work.
+
+We'll use some helper functions.  The first is a helpter that starts
+ZEO servers for us and another one that picks ports.
+
+We'll start the first server:
+
+    >>> import ZEO.tests.forker, ZEO.tests.testZEO
+    >>> port0 = ZEO.tests.testZEO.get_port()
+    >>> zconf0 = ZEO.tests.forker.ZEOConfig(('', port0))
+    >>> zport0, adminaddr0, pid0, path0 = ZEO.tests.forker.start_zeo_server(
+    ...    '<filestorage 1>\n  path fs\n</filestorage>\n', zconf0, port0)
+
+
+Then we''ll start 2 others that use this one:
+
+    >>> port1 = ZEO.tests.testZEO.get_port()
+    >>> zconf1 = ZEO.tests.forker.ZEOConfig(('', port1))
+    >>> zport1, adminaddr1, pid1, path1 = ZEO.tests.forker.start_zeo_server(
+    ...    '<zeoclient 1>\n  server %s\n</zeoclient>\n' % port0,
+    ...     zconf1, port1)
+
+    >>> port2 = ZEO.tests.testZEO.get_port()
+    >>> zconf2 = ZEO.tests.forker.ZEOConfig(('', port2))
+    >>> zport2, adminaddr2, pid2, path2 = ZEO.tests.forker.start_zeo_server(
+    ...    '<zeoclient 1>\n  server %s\n</zeoclient>\n' % port0,
+    ...     zconf2, port2)
+
+Now, let's create some client storages that connect to these:
+
+    >>> import ZEO.ClientStorage
+    >>> cs1 = ZEO.ClientStorage.ClientStorage(('', port1), '1')
+    >>> cs2 = ZEO.ClientStorage.ClientStorage(('', port2), '1')
+
+And some databases and connections around these:
+
+    >>> from ZODB.DB import DB
+    >>> import transaction
+
+    >>> db1 = DB(cs1)
+    >>> tm1 = transaction.TransactionManager()
+    >>> c1 = db1.open(transaction_manager=tm1)
+    >>> r1 = c1.root()
+    >>> r1
+    {}
+
+    >>> db2 = DB(cs2)
+    >>> tm2 = transaction.TransactionManager()
+    >>> c2 = db2.open(transaction_manager=tm2)
+    >>> r2 = c2.root()
+    >>> r2
+    {}
+
+If we update c1, we'll eventually see the change in c2:
+
+    >>> import persistent
+    >>> class P(persistent.Persistent):
+    ...     pass
+
+    >>> r1[1] = P()
+    >>> r1[1].v = 1000
+    >>> r1[2] = P()
+    >>> r1[2].v = -1000
+
+    >>> import time
+    >>> for i in range(100):
+    ...     c2.sync()
+    ...     if r2:
+    ...         break
+    ...     time.sleep(0.01)
+    
+    >>> r2[1].v
+    1000
+
+    >>> r2[2].v
+    -1000
+
+Now, let's see if we can break it. :)
+
+    >>> def f():
+    ...     for in in range(100):
+    ...         r1[1] -= 1
+    ...         r1[2] += 1
+    ...         tm1.commit()
+    ...         time.sleep(0.01)
+    >>> import thread
+    >>> thread.start_new_thread(f, ())
+
+    >>> for i in range(1000):
+    ...     c2.sync()
+    ...     if c2[1] + c2[2]:
+    ...         print 'oops', c2[1], c2[2]
+    ...     if not c2[1]:
+    ...         break
+    ...     time.sleep(0.01)
+    
+If we shutdown and restart the source server, the variables will be
+invalidated:
+
+    >>> forker.shutdown_zeo_server(adminaddr0)
+    >>> zport0, adminaddr0, pid0, path0 = ZEO.tests.forker.start_zeo_server(
+    ...    '<filestorage 1>\n  path fs\n</filestorage>\n', zconf0, port0)
+    
+    >>> for i in range(1000):
+    ...     c1.sync()
+    ...     c2.sync()
+    ...     if (
+    ...         (r1[1]._p_changed is None)
+    ...         and
+    ...         (r1[2]._p_changed is None)
+    ...         and
+    ...         (r2[1]._p_changed is None)
+    ...         and
+    ...         (r2[2]._p_changed is None)
+    ...        ):
+    ...        print 'Cool'
+    ...        break
+    ...     time.sleep(0.01)
+    ... else:
+    ...     print 'Dang'
+    Cool
+
+Cleanup:
+
+    >>> db1.close()
+    >>> db2.close()
+    >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr0)
+    >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr1)
+    >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr2)
Property changes on: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test
___________________________________________________________________
Name: svn:eol-style
   + native
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/zrpc/smac.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/zrpc/smac.py	2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/zrpc/smac.py	2007-05-10 22:10:52 UTC (rev 75679)
@@ -68,6 +68,8 @@
 
 MAC_BIT = 0x80000000L
 
+_close_marker = object()
+
 class SizedMessageAsyncConnection(asyncore.dispatcher):
     __super_init = asyncore.dispatcher.__init__
     __super_close = asyncore.dispatcher.close
@@ -235,6 +237,9 @@
         else:
             return True
 
+    def should_close(self):
+        self.__output.append(_close_marker)
+
     def handle_write(self):
         self.__output_lock.acquire()
         try:
@@ -250,7 +255,13 @@
 
                 l = 0
                 for i in range(len(output)):
-                    l += len(output[i])
+                    try:
+                        l += len(output[i])
+                    except TypeError:
+                        # We had an output marker, close the connection
+                        assert output[i] is _close_marker
+                        return self.close()
+                    
                     if l > SEND_SIZE:
                         break
 
Modified: ZODB/branches/jim-zeo-registerdb/src/ZODB/interfaces.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZODB/interfaces.py	2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZODB/interfaces.py	2007-05-10 22:10:52 UTC (rev 75679)
@@ -614,6 +614,12 @@
 
         - An iterable of object-id and serial pairs giving new serials
           for objects.
+
+        A serial, returned as a string or in a sequence of oid/serial
+        pairs, may be the special value
+        ZODB.ConflictResolution.ResolvedSerial to indicate that a
+        conflict occured and that the object should be invalidated.
+        
         """
 
     def tpc_abort(transaction):
@@ -663,8 +669,14 @@
         The return value can be either None or a sequence of object-id
         and serial pairs giving new serials for objects who's ids were
         passed to previous store calls in the same transaction.
-        After the tpc_vote call, bew serials must have been returned,
+        After the tpc_vote call, new serials must have been returned,
         either from tpc_vote or store for objects passed to store.
+
+        A serial returned in a sequence of oid/serial pairs, may be
+        the special value ZODB.ConflictResolution.ResolvedSerial to
+        indicate that a conflict occured and that the object should be
+        invalidated.
+
         """
 
 class IStorageRestoreable(IStorage):
Modified: ZODB/branches/jim-zeo-registerdb/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZODB/tests/testFileStorage.py	2007-05-10 17:39:33 UTC (rev 75678)
+++ ZODB/branches/jim-zeo-registerdb/src/ZODB/tests/testFileStorage.py	2007-05-10 22:10:52 UTC (rev 75679)
@@ -483,7 +483,7 @@
 def lastInvalidations():
     """
 
-The last invalidations method is used by a storage server to pupulate
+The last invalidations method is used by a storage server to populate
 it's data structure of recent invalidations.  The lastInvalidations
 method is passed a count and must return up to count number of the
 most recent transactions.
    
    
More information about the Zodb-checkins
mailing list