[Zope-Checkins] CVS: ZODB3/ZEO - StorageServer.py:1.88

Jeremy Hylton jeremy@zope.com
Thu, 9 Jan 2003 13:45:11 -0500


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv28132

Modified Files:
	StorageServer.py 
Log Message:
Swap order of StorageServer and ZEOStorage to avoid patchup at the
end.


=== ZODB3/ZEO/StorageServer.py 1.87 => 1.88 ===
--- ZODB3/ZEO/StorageServer.py:1.87	Thu Jan  9 13:18:54 2003
+++ ZODB3/ZEO/StorageServer.py	Thu Jan  9 13:45:08 2003
@@ -56,208 +56,6 @@
 class StorageServerError(StorageError):
     """Error reported when an unpickleable exception is raised."""
 
-class StorageServer:
-
-    """The server side implementation of ZEO.
-
-    The StorageServer is the 'manager' for incoming connections.  Each
-    connection is associated with its own ZEOStorage instance (defined
-    below).  The StorageServer may handle multiple storages; each
-    ZEOStorage instance only handles a single storage.
-    """
-
-    # Classes we instantiate.  A subclass might override.
-
-    DispatcherClass = Dispatcher
-    ZEOStorageClass = None # patched up later
-    ManagedServerConnectionClass = ManagedServerConnection
-
-    def __init__(self, addr, storages, read_only=0,
-                 invalidation_queue_size=100,
-                 transaction_timeout=None):
-        """StorageServer constructor.
-
-        This is typically invoked from the start.py script.
-
-        Arguments (the first two are required and positional):
-
-        addr -- the address at which the server should listen.  This
-            can be a tuple (host, port) to signify a TCP/IP connection
-            or a pathname string to signify a Unix domain socket
-            connection.  A hostname may be a DNS name or a dotted IP
-            address.
-
-        storages -- a dictionary giving the storage(s) to handle.  The
-            keys are the storage names, the values are the storage
-            instances, typically FileStorage or Berkeley storage
-            instances.  By convention, storage names are typically
-            strings representing small integers starting at '1'.
-
-        read_only -- an optional flag saying whether the server should
-            operate in read-only mode.  Defaults to false.  Note that
-            even if the server is operating in writable mode,
-            individual storages may still be read-only.  But if the
-            server is in read-only mode, no write operations are
-            allowed, even if the storages are writable.  Note that
-            pack() is considered a read-only operation.
-
-        invalidation_queue_size -- The storage server keeps a queue
-            of the objects modified by the last N transactions, where
-            N == invalidation_queue_size.  This queue is used to
-            speed client cache verification when a client disconnects
-            for a short period of time.
-
-        transaction_timout -- The maximum amount of time to wait for
-            a transaction to commit after acquiring the storage lock.
-            If the transaction takes too long, the client connection
-            will be closed and the transaction aborted.
-        """
-
-        self.addr = addr
-        self.storages = storages
-        set_label()
-        msg = ", ".join(
-            ["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
-                           storage.getName())
-             for name, storage in storages.items()])
-        log("%s created %s with storages: %s" %
-            (self.__class__.__name__, read_only and "RO" or "RW", msg))
-        for s in storages.values():
-            s._waiting = []
-        self.read_only = read_only
-        # A list of at most invalidation_queue_size invalidations
-        self.invq = []
-        self.invq_bound = invalidation_queue_size
-        self.connections = {}
-        self.dispatcher = self.DispatcherClass(addr,
-                                               factory=self.new_connection,
-                                               reuse_addr=1)
-        self.timeouts = {}
-        for name in self.storages.keys():
-            if transaction_timeout is None:
-                # An object with no-op methods
-                timeout = StubTimeoutThread()
-            else:
-                timeout = TimeoutThread(transaction_timeout)
-                timeout.start()
-            self.timeouts[name] = timeout
-
-    def new_connection(self, sock, addr):
-        """Internal: factory to create a new connection.
-
-        This is called by the Dispatcher class in ZEO.zrpc.server
-        whenever accept() returns a socket for a new incoming
-        connection.
-        """
-        z = self.ZEOStorageClass(self, self.read_only)
-        c = self.ManagedServerConnectionClass(sock, addr, z, self)
-        log("new connection %s: %s" % (addr, `c`))
-        return c
-
-    def register_connection(self, storage_id, conn):
-        """Internal: register a connection with a particular storage.
-
-        This is called by ZEOStorage.register().
-
-        The dictionary self.connections maps each storage name to a
-        list of current connections for that storage; this information
-        is needed to handle invalidation.  This function updates this
-        dictionary.
-
-        Returns the timeout object for the appropriate storage.
-        """
-        l = self.connections.get(storage_id)
-        if l is None:
-            l = self.connections[storage_id] = []
-        l.append(conn)
-        return self.timeouts[storage_id]
-
-    def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
-        """Internal: broadcast info and invalidations to clients.
-
-        This is called from several ZEOStorage methods.
-
-        This can do three different things:
-
-        - If the invalidated argument is non-empty, it broadcasts
-          invalidateTransaction() messages to all clients of the given
-          storage except the current client (the conn argument).
-
-        - If the invalidated argument is empty and the info argument
-          is a non-empty dictionary, it broadcasts info() messages to
-          all clients of the given storage, including the current
-          client.
-
-        - If both the invalidated argument and the info argument are
-          non-empty, it broadcasts invalidateTransaction() messages to all
-          clients except the current, and sends an info() message to
-          the current client.
-
-        """
-        if invalidated:
-            if len(self.invq) >= self.invq_bound:
-                del self.invq[0]
-            self.invq.append((tid, invalidated))
-        for p in self.connections.get(storage_id, ()):
-            if invalidated and p is not conn:
-                p.client.invalidateTransaction(tid, invalidated)
-            elif info is not None:
-                p.client.info(info)
-
-    def get_invalidations(self, tid):
-        """Return a tid and list of all objects invalidation since tid.
-
-        The tid is the most recent transaction id committed by the server.
-
-        Returns None if it is unable to provide a complete list
-        of invalidations for tid.  In this case, client should
-        do full cache verification.
-        """
-
-        if not self.invq:
-            log("invq empty")
-            return None, []
-        
-        earliest_tid = self.invq[0][0]
-        if earliest_tid > tid:
-            log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
-            return None, []
-        
-        oids = {}
-        for tid, L in self.invq:
-            for key in L:
-                oids[key] = 1
-        latest_tid = self.invq[-1][0]
-        return latest_tid, oids.keys()
-
-    def close_server(self):
-        """Close the dispatcher so that there are no new connections.
-
-        This is only called from the test suite, AFAICT.
-        """
-        for timeout in self.timeouts.values():
-            timeout.stop()
-        self.dispatcher.close()
-        for storage in self.storages.values():
-            storage.close()
-        # Force the asyncore mainloop to exit by hackery, i.e. close
-        # every socket in the map.  loop() will return when the map is
-        # empty.
-        for s in asyncore.socket_map.values():
-            try:
-                s.close()
-            except:
-                pass
-
-    def close_conn(self, conn):
-        """Internal: remove the given connection from self.connections.
-
-        This is the inverse of register_connection().
-        """
-        for cl in self.connections.values():
-            if conn.obj in cl:
-                cl.remove(conn.obj)
-
 class ZEOStorage:
     """Proxy to underlying storage for a single remote client."""
 
@@ -727,6 +525,208 @@
         else:
             return 1
 
+class StorageServer:
+
+    """The server side implementation of ZEO.
+
+    The StorageServer is the 'manager' for incoming connections.  Each
+    connection is associated with its own ZEOStorage instance (defined
+    above).  The StorageServer may handle multiple storages; each
+    ZEOStorage instance only handles a single storage.
+    """
+
+    # Classes we instantiate.  A subclass might override.
+
+    DispatcherClass = Dispatcher
+    ZEOStorageClass = ZEOStorage
+    ManagedServerConnectionClass = ManagedServerConnection
+
+    def __init__(self, addr, storages, read_only=0,
+                 invalidation_queue_size=100,
+                 transaction_timeout=None):
+        """StorageServer constructor.
+
+        This is typically invoked from the start.py script.
+
+        Arguments (the first two are required and positional):
+
+        addr -- the address at which the server should listen.  This
+            can be a tuple (host, port) to signify a TCP/IP connection
+            or a pathname string to signify a Unix domain socket
+            connection.  A hostname may be a DNS name or a dotted IP
+            address.
+
+        storages -- a dictionary giving the storage(s) to handle.  The
+            keys are the storage names, the values are the storage
+            instances, typically FileStorage or Berkeley storage
+            instances.  By convention, storage names are typically
+            strings representing small integers starting at '1'.
+
+        read_only -- an optional flag saying whether the server should
+            operate in read-only mode.  Defaults to false.  Note that
+            even if the server is operating in writable mode,
+            individual storages may still be read-only.  But if the
+            server is in read-only mode, no write operations are
+            allowed, even if the storages are writable.  Note that
+            pack() is considered a read-only operation.
+
+        invalidation_queue_size -- The storage server keeps a queue
+            of the objects modified by the last N transactions, where
+            N == invalidation_queue_size.  This queue is used to
+            speed client cache verification when a client disconnects
+            for a short period of time.
+
+        transaction_timeout -- The maximum amount of time to wait for
+            a transaction to commit after acquiring the storage lock.
+            If the transaction takes too long, the client connection
+            will be closed and the transaction aborted.
+        """
+
+        self.addr = addr
+        self.storages = storages
+        set_label()
+        msg = ", ".join(
+            ["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
+                           storage.getName())
+             for name, storage in storages.items()])
+        log("%s created %s with storages: %s" %
+            (self.__class__.__name__, read_only and "RO" or "RW", msg))
+        for s in storages.values():
+            s._waiting = []
+        self.read_only = read_only
+        # A list of at most invalidation_queue_size invalidations
+        self.invq = []
+        self.invq_bound = invalidation_queue_size
+        self.connections = {}
+        self.dispatcher = self.DispatcherClass(addr,
+                                               factory=self.new_connection,
+                                               reuse_addr=1)
+        self.timeouts = {}
+        for name in self.storages.keys():
+            if transaction_timeout is None:
+                # An object with no-op methods
+                timeout = StubTimeoutThread()
+            else:
+                timeout = TimeoutThread(transaction_timeout)
+                timeout.start()
+            self.timeouts[name] = timeout
+
+    def new_connection(self, sock, addr):
+        """Internal: factory to create a new connection.
+
+        This is called by the Dispatcher class in ZEO.zrpc.server
+        whenever accept() returns a socket for a new incoming
+        connection.
+        """
+        z = self.ZEOStorageClass(self, self.read_only)
+        c = self.ManagedServerConnectionClass(sock, addr, z, self)
+        log("new connection %s: %s" % (addr, `c`))
+        return c
+
+    def register_connection(self, storage_id, conn):
+        """Internal: register a connection with a particular storage.
+
+        This is called by ZEOStorage.register().
+
+        The dictionary self.connections maps each storage name to a
+        list of current connections for that storage; this information
+        is needed to handle invalidation.  This function updates this
+        dictionary.
+
+        Returns the timeout object for the appropriate storage.
+        """
+        l = self.connections.get(storage_id)
+        if l is None:
+            l = self.connections[storage_id] = []
+        l.append(conn)
+        return self.timeouts[storage_id]
+
+    def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
+        """Internal: broadcast info and invalidations to clients.
+
+        This is called from several ZEOStorage methods.
+
+        This can do three different things:
+
+        - If the invalidated argument is non-empty, it broadcasts
+          invalidateTransaction() messages to all clients of the given
+          storage except the current client (the conn argument).
+
+        - If the invalidated argument is empty and the info argument
+          is a non-empty dictionary, it broadcasts info() messages to
+          all clients of the given storage, including the current
+          client.
+
+        - If both the invalidated argument and the info argument are
+          non-empty, it broadcasts invalidateTransaction() messages to all
+          clients except the current, and sends an info() message to
+          the current client.
+
+        """
+        if invalidated:
+            if len(self.invq) >= self.invq_bound:
+                del self.invq[0]
+            self.invq.append((tid, invalidated))
+        for p in self.connections.get(storage_id, ()):
+            if invalidated and p is not conn:
+                p.client.invalidateTransaction(tid, invalidated)
+            elif info is not None:
+                p.client.info(info)
+
+    def get_invalidations(self, tid):
+        """Return a tid and list of all objects invalidated since tid.
+
+        The tid is the most recent transaction id committed by the server.
+
+        Returns (None, []) if it is unable to provide a complete list
+        of invalidations for tid.  In this case, client should
+        do full cache verification.
+        """
+
+        if not self.invq:
+            log("invq empty")
+            return None, []
+        
+        earliest_tid = self.invq[0][0]
+        if earliest_tid > tid:
+            log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
+            return None, []
+        
+        oids = {}
+        for tid, L in self.invq:
+            for key in L:
+                oids[key] = 1
+        latest_tid = self.invq[-1][0]
+        return latest_tid, oids.keys()
+
+    def close_server(self):
+        """Close the dispatcher so that there are no new connections.
+
+        This is only called from the test suite, AFAICT.
+        """
+        for timeout in self.timeouts.values():
+            timeout.stop()
+        self.dispatcher.close()
+        for storage in self.storages.values():
+            storage.close()
+        # Force the asyncore mainloop to exit by hackery, i.e. close
+        # every socket in the map.  loop() will return when the map is
+        # empty.
+        for s in asyncore.socket_map.values():
+            try:
+                s.close()
+            except:
+                pass
+
+    def close_conn(self, conn):
+        """Internal: remove the given connection from self.connections.
+
+        This is the inverse of register_connection().
+        """
+        for cl in self.connections.values():
+            if conn.obj in cl:
+                cl.remove(conn.obj)
+
 class StubTimeoutThread:
 
     def begin(self, client):
@@ -844,6 +844,3 @@
             self.delay.error(sys.exc_info())
         else:
             self.delay.reply(result)
-
-# Patch up class references
-StorageServer.ZEOStorageClass = ZEOStorage