[Zope-Checkins] CVS: ZODB3/ZEO - StorageServer.py:1.88
Jeremy Hylton
jeremy@zope.com
Thu, 9 Jan 2003 13:45:11 -0500
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv28132
Modified Files:
StorageServer.py
Log Message:
Swap order of StorageServer and ZEOStorage to avoid patchup at the
end.
=== ZODB3/ZEO/StorageServer.py 1.87 => 1.88 ===
--- ZODB3/ZEO/StorageServer.py:1.87 Thu Jan 9 13:18:54 2003
+++ ZODB3/ZEO/StorageServer.py Thu Jan 9 13:45:08 2003
@@ -56,208 +56,6 @@
class StorageServerError(StorageError):
"""Error reported when an unpickleable exception is raised."""
-class StorageServer:
-
- """The server side implementation of ZEO.
-
- The StorageServer is the 'manager' for incoming connections. Each
- connection is associated with its own ZEOStorage instance (defined
- below). The StorageServer may handle multiple storages; each
- ZEOStorage instance only handles a single storage.
- """
-
- # Classes we instantiate. A subclass might override.
-
- DispatcherClass = Dispatcher
- ZEOStorageClass = None # patched up later
- ManagedServerConnectionClass = ManagedServerConnection
-
- def __init__(self, addr, storages, read_only=0,
- invalidation_queue_size=100,
- transaction_timeout=None):
- """StorageServer constructor.
-
- This is typically invoked from the start.py script.
-
- Arguments (the first two are required and positional):
-
- addr -- the address at which the server should listen. This
- can be a tuple (host, port) to signify a TCP/IP connection
- or a pathname string to signify a Unix domain socket
- connection. A hostname may be a DNS name or a dotted IP
- address.
-
- storages -- a dictionary giving the storage(s) to handle. The
- keys are the storage names, the values are the storage
- instances, typically FileStorage or Berkeley storage
- instances. By convention, storage names are typically
- strings representing small integers starting at '1'.
-
- read_only -- an optional flag saying whether the server should
- operate in read-only mode. Defaults to false. Note that
- even if the server is operating in writable mode,
- individual storages may still be read-only. But if the
- server is in read-only mode, no write operations are
- allowed, even if the storages are writable. Note that
- pack() is considered a read-only operation.
-
- invalidation_queue_size -- The storage server keeps a queue
- of the objects modified by the last N transactions, where
- N == invalidation_queue_size. This queue is used to
- speed client cache verification when a client disconnects
- for a short period of time.
-
- transaction_timout -- The maximum amount of time to wait for
- a transaction to commit after acquiring the storage lock.
- If the transaction takes too long, the client connection
- will be closed and the transaction aborted.
- """
-
- self.addr = addr
- self.storages = storages
- set_label()
- msg = ", ".join(
- ["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
- storage.getName())
- for name, storage in storages.items()])
- log("%s created %s with storages: %s" %
- (self.__class__.__name__, read_only and "RO" or "RW", msg))
- for s in storages.values():
- s._waiting = []
- self.read_only = read_only
- # A list of at most invalidation_queue_size invalidations
- self.invq = []
- self.invq_bound = invalidation_queue_size
- self.connections = {}
- self.dispatcher = self.DispatcherClass(addr,
- factory=self.new_connection,
- reuse_addr=1)
- self.timeouts = {}
- for name in self.storages.keys():
- if transaction_timeout is None:
- # An object with no-op methods
- timeout = StubTimeoutThread()
- else:
- timeout = TimeoutThread(transaction_timeout)
- timeout.start()
- self.timeouts[name] = timeout
-
- def new_connection(self, sock, addr):
- """Internal: factory to create a new connection.
-
- This is called by the Dispatcher class in ZEO.zrpc.server
- whenever accept() returns a socket for a new incoming
- connection.
- """
- z = self.ZEOStorageClass(self, self.read_only)
- c = self.ManagedServerConnectionClass(sock, addr, z, self)
- log("new connection %s: %s" % (addr, `c`))
- return c
-
- def register_connection(self, storage_id, conn):
- """Internal: register a connection with a particular storage.
-
- This is called by ZEOStorage.register().
-
- The dictionary self.connections maps each storage name to a
- list of current connections for that storage; this information
- is needed to handle invalidation. This function updates this
- dictionary.
-
- Returns the timeout object for the appropriate storage.
- """
- l = self.connections.get(storage_id)
- if l is None:
- l = self.connections[storage_id] = []
- l.append(conn)
- return self.timeouts[storage_id]
-
- def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
- """Internal: broadcast info and invalidations to clients.
-
- This is called from several ZEOStorage methods.
-
- This can do three different things:
-
- - If the invalidated argument is non-empty, it broadcasts
- invalidateTransaction() messages to all clients of the given
- storage except the current client (the conn argument).
-
- - If the invalidated argument is empty and the info argument
- is a non-empty dictionary, it broadcasts info() messages to
- all clients of the given storage, including the current
- client.
-
- - If both the invalidated argument and the info argument are
- non-empty, it broadcasts invalidateTransaction() messages to all
- clients except the current, and sends an info() message to
- the current client.
-
- """
- if invalidated:
- if len(self.invq) >= self.invq_bound:
- del self.invq[0]
- self.invq.append((tid, invalidated))
- for p in self.connections.get(storage_id, ()):
- if invalidated and p is not conn:
- p.client.invalidateTransaction(tid, invalidated)
- elif info is not None:
- p.client.info(info)
-
- def get_invalidations(self, tid):
- """Return a tid and list of all objects invalidation since tid.
-
- The tid is the most recent transaction id committed by the server.
-
- Returns None if it is unable to provide a complete list
- of invalidations for tid. In this case, client should
- do full cache verification.
- """
-
- if not self.invq:
- log("invq empty")
- return None, []
-
- earliest_tid = self.invq[0][0]
- if earliest_tid > tid:
- log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
- return None, []
-
- oids = {}
- for tid, L in self.invq:
- for key in L:
- oids[key] = 1
- latest_tid = self.invq[-1][0]
- return latest_tid, oids.keys()
-
- def close_server(self):
- """Close the dispatcher so that there are no new connections.
-
- This is only called from the test suite, AFAICT.
- """
- for timeout in self.timeouts.values():
- timeout.stop()
- self.dispatcher.close()
- for storage in self.storages.values():
- storage.close()
- # Force the asyncore mainloop to exit by hackery, i.e. close
- # every socket in the map. loop() will return when the map is
- # empty.
- for s in asyncore.socket_map.values():
- try:
- s.close()
- except:
- pass
-
- def close_conn(self, conn):
- """Internal: remove the given connection from self.connections.
-
- This is the inverse of register_connection().
- """
- for cl in self.connections.values():
- if conn.obj in cl:
- cl.remove(conn.obj)
-
class ZEOStorage:
"""Proxy to underlying storage for a single remote client."""
@@ -727,6 +525,208 @@
else:
return 1
+class StorageServer:
+
+ """The server side implementation of ZEO.
+
+ The StorageServer is the 'manager' for incoming connections. Each
+ connection is associated with its own ZEOStorage instance (defined
+ below). The StorageServer may handle multiple storages; each
+ ZEOStorage instance only handles a single storage.
+ """
+
+ # Classes we instantiate. A subclass might override.
+
+ DispatcherClass = Dispatcher
+ ZEOStorageClass = ZEOStorage
+ ManagedServerConnectionClass = ManagedServerConnection
+
+ def __init__(self, addr, storages, read_only=0,
+ invalidation_queue_size=100,
+ transaction_timeout=None):
+ """StorageServer constructor.
+
+ This is typically invoked from the start.py script.
+
+ Arguments (the first two are required and positional):
+
+ addr -- the address at which the server should listen. This
+ can be a tuple (host, port) to signify a TCP/IP connection
+ or a pathname string to signify a Unix domain socket
+ connection. A hostname may be a DNS name or a dotted IP
+ address.
+
+ storages -- a dictionary giving the storage(s) to handle. The
+ keys are the storage names, the values are the storage
+ instances, typically FileStorage or Berkeley storage
+ instances. By convention, storage names are typically
+ strings representing small integers starting at '1'.
+
+ read_only -- an optional flag saying whether the server should
+ operate in read-only mode. Defaults to false. Note that
+ even if the server is operating in writable mode,
+ individual storages may still be read-only. But if the
+ server is in read-only mode, no write operations are
+ allowed, even if the storages are writable. Note that
+ pack() is considered a read-only operation.
+
+ invalidation_queue_size -- The storage server keeps a queue
+ of the objects modified by the last N transactions, where
+ N == invalidation_queue_size. This queue is used to
+ speed client cache verification when a client disconnects
+ for a short period of time.
+
+ transaction_timout -- The maximum amount of time to wait for
+ a transaction to commit after acquiring the storage lock.
+ If the transaction takes too long, the client connection
+ will be closed and the transaction aborted.
+ """
+
+ self.addr = addr
+ self.storages = storages
+ set_label()
+ msg = ", ".join(
+ ["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
+ storage.getName())
+ for name, storage in storages.items()])
+ log("%s created %s with storages: %s" %
+ (self.__class__.__name__, read_only and "RO" or "RW", msg))
+ for s in storages.values():
+ s._waiting = []
+ self.read_only = read_only
+ # A list of at most invalidation_queue_size invalidations
+ self.invq = []
+ self.invq_bound = invalidation_queue_size
+ self.connections = {}
+ self.dispatcher = self.DispatcherClass(addr,
+ factory=self.new_connection,
+ reuse_addr=1)
+ self.timeouts = {}
+ for name in self.storages.keys():
+ if transaction_timeout is None:
+ # An object with no-op methods
+ timeout = StubTimeoutThread()
+ else:
+ timeout = TimeoutThread(transaction_timeout)
+ timeout.start()
+ self.timeouts[name] = timeout
+
+ def new_connection(self, sock, addr):
+ """Internal: factory to create a new connection.
+
+ This is called by the Dispatcher class in ZEO.zrpc.server
+ whenever accept() returns a socket for a new incoming
+ connection.
+ """
+ z = self.ZEOStorageClass(self, self.read_only)
+ c = self.ManagedServerConnectionClass(sock, addr, z, self)
+ log("new connection %s: %s" % (addr, `c`))
+ return c
+
+ def register_connection(self, storage_id, conn):
+ """Internal: register a connection with a particular storage.
+
+ This is called by ZEOStorage.register().
+
+ The dictionary self.connections maps each storage name to a
+ list of current connections for that storage; this information
+ is needed to handle invalidation. This function updates this
+ dictionary.
+
+ Returns the timeout object for the appropriate storage.
+ """
+ l = self.connections.get(storage_id)
+ if l is None:
+ l = self.connections[storage_id] = []
+ l.append(conn)
+ return self.timeouts[storage_id]
+
+ def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
+ """Internal: broadcast info and invalidations to clients.
+
+ This is called from several ZEOStorage methods.
+
+ This can do three different things:
+
+ - If the invalidated argument is non-empty, it broadcasts
+ invalidateTransaction() messages to all clients of the given
+ storage except the current client (the conn argument).
+
+ - If the invalidated argument is empty and the info argument
+ is a non-empty dictionary, it broadcasts info() messages to
+ all clients of the given storage, including the current
+ client.
+
+ - If both the invalidated argument and the info argument are
+ non-empty, it broadcasts invalidateTransaction() messages to all
+ clients except the current, and sends an info() message to
+ the current client.
+
+ """
+ if invalidated:
+ if len(self.invq) >= self.invq_bound:
+ del self.invq[0]
+ self.invq.append((tid, invalidated))
+ for p in self.connections.get(storage_id, ()):
+ if invalidated and p is not conn:
+ p.client.invalidateTransaction(tid, invalidated)
+ elif info is not None:
+ p.client.info(info)
+
+ def get_invalidations(self, tid):
+ """Return a tid and list of all objects invalidation since tid.
+
+ The tid is the most recent transaction id committed by the server.
+
+ Returns None if it is unable to provide a complete list
+ of invalidations for tid. In this case, client should
+ do full cache verification.
+ """
+
+ if not self.invq:
+ log("invq empty")
+ return None, []
+
+ earliest_tid = self.invq[0][0]
+ if earliest_tid > tid:
+ log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
+ return None, []
+
+ oids = {}
+ for tid, L in self.invq:
+ for key in L:
+ oids[key] = 1
+ latest_tid = self.invq[-1][0]
+ return latest_tid, oids.keys()
+
+ def close_server(self):
+ """Close the dispatcher so that there are no new connections.
+
+ This is only called from the test suite, AFAICT.
+ """
+ for timeout in self.timeouts.values():
+ timeout.stop()
+ self.dispatcher.close()
+ for storage in self.storages.values():
+ storage.close()
+ # Force the asyncore mainloop to exit by hackery, i.e. close
+ # every socket in the map. loop() will return when the map is
+ # empty.
+ for s in asyncore.socket_map.values():
+ try:
+ s.close()
+ except:
+ pass
+
+ def close_conn(self, conn):
+ """Internal: remove the given connection from self.connections.
+
+ This is the inverse of register_connection().
+ """
+ for cl in self.connections.values():
+ if conn.obj in cl:
+ cl.remove(conn.obj)
+
class StubTimeoutThread:
def begin(self, client):
@@ -844,6 +844,3 @@
self.delay.error(sys.exc_info())
else:
self.delay.reply(result)
-
-# Patch up class references
-StorageServer.ZEOStorageClass = ZEOStorage