[Zodb-checkins] CVS: StandaloneZODB/ZEO - ClientStorage.py:1.35.6.3 StorageServer.py:1.32.6.2 zrpc2.py:NONE

Jeremy Hylton jeremy@zope.com
Wed, 16 Jan 2002 09:58:39 -0500


Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv23840

Modified Files:
      Tag: Standby-branch
	ClientStorage.py StorageServer.py 
Removed Files:
      Tag: Standby-branch
	zrpc2.py 
Log Message:
Various cleanups and simplifications.

Convert from zrpc2 module to zrpc package.  This includes some changes
to client code:
    - Dispatcher factory (StorageServer.newConnection) doesn't have an
      unused third argument anymore.

Remove unused imports.

Remove fixup_storage().  I think this backwards compatibility feature
is not needed.  Happy to add it back if someone misses it.

Fix possible leaks of traceback (sys.exc_info() stored in local).

Cleanup comments in ClientStorage and removed unreachable code.

Replace uses of thread module with threading module.



=== StandaloneZODB/ZEO/ClientStorage.py 1.35.6.2 => 1.35.6.3 ===
 ##############################################################################
 """Network ZODB storage client
-
-XXX support multiple outstanding requests up until the vote?
-XXX is_connected() vis ClientDisconnected error
 """
 __version__='$Revision$'[11:-2]
 
 import cPickle
 import os
-import socket
-import string
-import struct
-import sys
 import tempfile
-import thread
 import threading
 import time
-from types import TupleType, StringType
-from struct import pack, unpack
 
-import ExtensionClass, Sync, ThreadLock
-import ClientCache
-import zrpc2
-import ServerStub
-from TransactionBuffer import TransactionBuffer
+from ZEO import ClientCache, ServerStub
+from ZEO.TransactionBuffer import TransactionBuffer
+from ZEO.Exceptions import Disconnected
+from ZEO.zrpc.client import ConnectionManager
 
 from ZODB import POSException
 from ZODB.TimeStamp import TimeStamp
 from zLOG import LOG, PROBLEM, INFO, BLATHER
-from Exceptions import Disconnected
 
 def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
     LOG(subsys, type, msg)
@@ -130,10 +118,11 @@
 class ClientDisconnected(ClientStorageError, Disconnected):
     """The database storage is disconnected from the storage."""
 
-def get_timestamp(prev_ts):
+def get_timestamp(prev_ts=None):
     t = time.time()
     t = apply(TimeStamp, (time.gmtime(t)[:5] + (t % 60,)))
-    t = t.laterThan(prev_ts)
+    if prev_ts is not None:
+        t = t.laterThan(prev_ts)
     return t
 
 class DisconnectedServerStub:
@@ -161,7 +150,9 @@
         self._tbuf = TransactionBuffer()
         self._db = None
         self._oids = []
-        # XXX It's confusing to have _serial, _serials, and _seriald. 
+        # _serials: stores (oid, serialno) as returned by server
+        # _seriald: _check_serials() moves from _serials to _seriald,
+        #           which maps oid to serialno
         self._serials = []
         self._seriald = {}
 
@@ -171,17 +162,15 @@
         client = client or os.environ.get('ZEO_CLIENT', '')
         self._cache = ClientCache.ClientCache(storage, cache_size,
                                               client=client, var=var)
-        self._cache.open() # XXX
+        self._cache.open() # XXX open now?
 
-        self._rpc_mgr = zrpc2.ConnectionManager(addr, self,
-                                                #debug=debug,
-                                                tmin=min_disconnect_poll,
-                                                tmax=max_disconnect_poll)
+        self._rpc_mgr = ConnectionManager(addr, self,
+                                          tmin=min_disconnect_poll,
+                                          tmax=max_disconnect_poll)
 
         # XXX What if we can only get a read-only connection and we
         # want a read-write connection?  Looks like the current code
         # will block forever.
-        
         if wait_for_server_on_startup:
             self._rpc_mgr.connect(sync=1)
         else:
@@ -191,6 +180,7 @@
     def _basic_init(self, name):
         """Handle initialization activites of BaseStorage"""
 
+        # XXX does anything depend on attr being __name__
         self.__name__ = name
 
         # A ClientStorage only allows one client to commit at a time.
@@ -205,15 +195,19 @@
         # oid_cond. 
         self.oid_cond = threading.Condition()
 
-        commit_lock = thread.allocate_lock()
+        commit_lock = threading.Lock()
         self._commit_lock_acquire = commit_lock.acquire
         self._commit_lock_release = commit_lock.release
 
-        t = time.time()
-        t = self._ts = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
+        t = self._ts = get_timestamp()
         self._serial = `t`
         self._oid='\0\0\0\0\0\0\0\0'
 
+    def close(self):
+        self._rpc_mgr.close()
+        if self._cache is not None:
+            self._cache.close()
+        
     def registerDB(self, db, limit):
         """Register that the storage is controlled by the given DB."""
         log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit)))
@@ -253,10 +247,6 @@
     ### notifyDisconnected had to get the instance lock.  There's
     ### nothing to gain by getting the instance lock.
 
-    ### Note that we *don't* have to worry about getting connected
-    ### in the middle of notifyDisconnected, because *it's*
-    ### responsible for starting the thread that makes the connection.
-
     def notifyDisconnected(self):
         log2(PROBLEM, "Disconnected from storage")
         self._server = disconnected_stub
@@ -298,14 +288,6 @@
         return 1
         
     def _check_tid(self, tid, exc=None):
-        # XXX Is all this locking unnecessary?  The only way to
-        # begin a transaction is to call tpc_begin().  If we assume
-        # clients are single-threaded and well-behaved, i.e. they call
-        # tpc_begin() first, then there appears to be no need for
-        # locking.  If _check_tid() is called and self.tpc_tid != tid,
-        # then there is no way it can be come equal during the call.
-        # Thus, there should be no race.
-        
         if self.tpc_tid != tid:
             if exc is None:
                 return 0
@@ -313,19 +295,6 @@
                 raise exc(self.tpc_tid, tid)
         return 1
 
-        # XXX But I'm not sure
-        
-        self.tpc_cond.acquire()
-        try:
-            if self.tpc_tid != tid:
-                if exc is None:
-                    return 0
-                else:
-                    raise exc(self.tpc_tid, tid)
-            return 1
-        finally:
-            self.tpc_cond.release()
-
     def abortVersion(self, src, transaction):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
@@ -388,8 +357,7 @@
     def new_oid(self, last=None):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
-        # We want to avoid a situation where multiple oid requests are
-        # made at the same time.
+        # avoid multiple oid requests to server at the same time
         self.oid_cond.acquire()
         if not self._oids:
             self._oids = self._server.new_oids()
@@ -402,8 +370,7 @@
     def pack(self, t=None, rf=None, wait=0, days=0):
         if self._is_read_only:
             raise POSException.ReadOnlyError()
-        # Note that we ignore the rf argument.  The server
-        # will provide it's own implementation.
+        # rf argument ignored; server will provide it's own implementation
         if t is None:
             t = time.time()
         t = t - (days * 86400)
@@ -467,9 +434,8 @@
                                        transaction.description,
                                        transaction._extension)
         except:
-            # If _server is None, then the client disconnected during
-            # the tpc_begin() and notifyDisconnected() will have
-            # released the lock.
+            # Client may have disconnected during the tpc_begin().
+            # Then notifyDisconnected() will have released the lock.
             if self._server is not disconnected_stub:
                 self.tpc_cond.release()
             raise
@@ -481,7 +447,7 @@
     def tpc_finish(self, transaction, f=None):
         if transaction is not self._transaction:
             return
-        if f is not None: # XXX what is f()?
+        if f is not None:
             f()
 
         self._server.tpc_finish(self._serial)
@@ -575,7 +541,6 @@
         if self._pickler is None:
             return
         self._pickler.dump((0,0))
-##        self._pickler.dump = None
         self._tfile.seek(0)
         unpick = cPickle.Unpickler(self._tfile)
         self._tfile = None
@@ -588,7 +553,6 @@
             self._db.invalidate(oid, version=version)
 
     def Invalidate(self, args):
-        # XXX _db could be None
         for oid, version in args:
             self._cache.invalidate(oid, version=version)
             try:
@@ -598,4 +562,3 @@
                     "Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
                                                                repr(version),
                                                                msg))
-                    


=== StandaloneZODB/ZEO/StorageServer.py 1.32.6.1 => 1.32.6.2 ===
 import sys
 import threading
-import types
 
-import ClientStub
-import zrpc2
-import zLOG
+from ZEO import ClientStub
+from ZEO.zrpc.server import Dispatcher
+from ZEO.zrpc.connection import ManagedServerConnection, Handler, Delay
 
-from zrpc2 import Dispatcher, Handler, ManagedServerConnection, Delay
+import zLOG
 from ZODB.POSException import StorageError, StorageTransactionError, \
      TransactionError, ReadOnlyError
 from ZODB.referencesf import referencesf
@@ -129,14 +128,14 @@
         self.storages = storages
         self.read_only = read_only
         self.connections = {}
-        for name, store in storages.items():
-            fixup_storage(store)
         self.dispatcher = Dispatcher(addr, factory=self.newConnection,
                                      reuse_addr=1)
 
-    def newConnection(self, sock, addr, nil):
+    def newConnection(self, sock, addr):
+        # XXX figure out how to do the connection / proxy dance better
         c = ManagedServerConnection(sock, addr, None, self)
         c.register_object(StorageProxy(self, c))
+        log("new connection %s: %s" % (addr, `c`))
         return c
         
     def register(self, storage_id, proxy):
@@ -187,6 +186,7 @@
         self.server = server
         self.client = ClientStub.ClientStorage(conn)
         self.__storage = None
+        self.__storage_id = "uninitialized"
         self.__invalidated = []
         self._transaction = None
 
@@ -201,7 +201,7 @@
                                                           stid)
 
     def _log(self, msg, level=zLOG.INFO, error=None, pid=os.getpid()):
-        zLOG.LOG("ZEO Server %s %X" % (pid, id(self)),
+        zLOG.LOG("ZEO Server:%s:%s" % (pid, self.__storage_id),
                    level, msg, error=error)
 
     def setup_delegation(self):
@@ -313,6 +313,7 @@
             if wait:
                 raise
         else:
+            # XXX Why doesn't we broadcast on wait?
             if not wait:
                 # Broadcast new size statistics
                 self.server.invalidate(0, self.__storage_id, (),
@@ -338,7 +339,6 @@
         self._check_tid(id, exc=StorageTransactionError)
         try:
             # XXX does this stmt need to be in the try/except?
-        
             newserial = self.__storage.store(oid, serial, data, version,
                                              self._transaction)
         except TransactionError, v:
@@ -354,11 +354,13 @@
             error = sys.exc_info()
             self._log('store error: %s: %s' % (error[0], error[1]),
                       zLOG.ERROR, error=error)
-            newserial = sys.exc_info()[1]
+            newserial = error[1]
+            del error
         else:
             if serial != '\0\0\0\0\0\0\0\0':
                 self.__invalidated.append((oid, version))
 
+        # Is all this error checking necessary?
         try:
             nil = dump(newserial, 1)
         except:
@@ -415,7 +417,7 @@
         t._extension = ext
 
         if self.__storage._transaction is not None:
-            d = zrpc2.Delay()
+            d = Delay()
             self.__storage.__waiting.append((d, self, t))
             return d
 
@@ -468,8 +470,3 @@
         if n < 0:
             n = 1
         return [self.__storage.new_oid() for i in range(n)]
-
-def fixup_storage(storage):
-    # backwards compatibility hack
-    if not hasattr(storage,'tpc_vote'):
-        storage.tpc_vote = lambda *args: None

=== Removed File StandaloneZODB/ZEO/zrpc2.py ===