[Zodb-checkins] CVS: ZODB3/ZEO - ClientStorage.py:1.52 ClientStub.py:1.5 StorageServer.py:1.45

Guido van Rossum guido@python.org
Wed, 28 Aug 2002 12:37:40 -0400


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv32642

Modified Files:
	ClientStorage.py ClientStub.py StorageServer.py 
Log Message:
Another bunch of small cleanups and fixes.

Open the cache later, to avoid scanning it twice when a connection is
made right away (in that case verify_cache() opens it as a side effect).
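
Roughly, the new ordering in ClientStorage.__init__ is (a sketch, not a
verbatim copy of the hunk below; verify_cache() opens the cache itself
once a connection is up):

    self._cache = ClientCache.ClientCache(storage, cache_size,
                                          client=client, var=var)
    # ... set up the ConnectionManager, optionally wait for a connection ...
    if not self.is_connected():
        # No connection yet, so verify_cache() hasn't opened the cache;
        # open it here instead of unconditionally at construction time.
        self._cache.open()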

ClientStorage.close() is now idempotent.
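
Each closable resource is now checked for None and its reference dropped
once it has been closed, so a second close() call is a no-op.  The pattern
(a sketch mirroring the ClientStorage.py hunk below):

    if self._cache is not None:
        self._cache.close()
        self._cache = None   # a repeated close() finds None and skips it
    # ... likewise for self._tbuf and self._rpc_mgr ...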

ClientStorage stores its addr argument as self._addr so the test suite
doesn't have to dig it out of the rpc manager to reopen the storage as
read-only.
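
A test can then reopen the same server read-only along these lines
(hypothetical test code; attribute and variable names are illustrative):

    addr = self._storage._addr      # no need to reach into the rpc manager
    self._storage.close()
    self._storage = ClientStorage(addr, read_only=1)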

Renamed some of the callbacks into the client for clarity (sketched after
the list):
begin -> beginVerify
end -> endVerify
invalidate -> invalidateVerify
Invalidate -> invalidateTrans
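
After the rename the client stub just forwards each call under its new
name (sketch matching the ClientStub.py hunk below):

    def beginVerify(self):
        self.rpc.callAsync('beginVerify')

    def invalidateVerify(self, args):
        self.rpc.callAsync('invalidateVerify', args)

    def endVerify(self):
        self.rpc.callAsync('endVerify')

    def invalidateTrans(self, args):
        self.rpc.callAsync('invalidateTrans', args)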


=== ZODB3/ZEO/ClientStorage.py 1.51 => 1.52 ===
--- ZODB3/ZEO/ClientStorage.py:1.51	Fri Aug 16 18:55:44 2002
+++ ZODB3/ZEO/ClientStorage.py	Wed Aug 28 12:37:09 2002
@@ -70,12 +70,14 @@
                  min_disconnect_poll=5, max_disconnect_poll=300,
                  wait=0, read_only=0):
 
+        self._addr = addr # For tests
         self._server = disconnected_stub
         self._is_read_only = read_only
         self._storage = storage
 
         self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
-                      'supportsUndo':0, 'supportsVersions': 0}
+                      'supportsUndo':0, 'supportsVersions': 0,
+                      'supportsTransactionalUndo': 0}
 
         self._tbuf = TransactionBuffer()
         self._db = None
@@ -92,7 +94,6 @@
         client = client or os.environ.get('ZEO_CLIENT')
         self._cache = ClientCache.ClientCache(storage, cache_size,
                                               client=client, var=var)
-        self._cache.open() # XXX open now? or later?
 
         self._rpc_mgr = ConnectionManager(addr, self,
                                           tmin=min_disconnect_poll,
@@ -107,26 +108,30 @@
             if not self._rpc_mgr.attempt_connect():
                 self._rpc_mgr.connect()
 
+        # If we're connected at this point, the cache is opened as a
+        # side effect of verify_cache().  If not, open it now.
+        if not self.is_connected():
+            self._cache.open()
+
     def _basic_init(self, name):
         """Handle initialization activites of BaseStorage"""
 
-        # XXX does anything depend on attr being __name__
-        self.__name__ = name
+        self.__name__ = name # A standard convention among storages
 
-        # A ClientStorage only allows one client to commit at a time.
-        # Mutual exclusion is achieved using tpc_cond, which
+        # A ClientStorage only allows one thread to commit at a time.
+        # Mutual exclusion is achieved using _tpc_cond, which
         # protects _transaction.  A thread that wants to assign to
-        # self._transaction must acquire tpc_cond first.  A thread
+        # self._transaction must acquire _tpc_cond first.  A thread
         # that decides it's done with a transaction (whether via success
         # or failure) must set _transaction to None and do
-        # tpc_cond.notify() before releasing tpc_cond..
-        self.tpc_cond = threading.Condition()
+        # _tpc_cond.notify() before releasing _tpc_cond.
+        self._tpc_cond = threading.Condition()
         self._transaction = None
 
         # Prevent multiple new_oid calls from going out.  The _oids
         # variable should only be modified while holding the
-        # oid_cond.
-        self.oid_cond = threading.Condition()
+        # _oid_lock.
+        self._oid_lock = threading.Lock()
 
         commit_lock = threading.Lock()
         self._commit_lock_acquire = commit_lock.acquire
@@ -139,12 +144,18 @@
     def close(self):
         if self._tbuf is not None:
             self._tbuf.close()
+            self._tbuf = None
         if self._cache is not None:
             self._cache.close()
-        self._rpc_mgr.close()
+            self._cache = None
+        if self._rpc_mgr is not None:
+            self._rpc_mgr.close()
+            self._rpc_mgr = None
 
     def registerDB(self, db, limit):
         """Register that the storage is controlled by the given DB."""
+        # This is called by ZODB.DB (and by some tests).
+        # The storage isn't really ready to use until after this call.
         log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit)))
         self._db = db
 
@@ -154,26 +165,27 @@
         else:
             return 1
 
-    def notifyConnected(self, c):
-        log2(INFO, "Connected to storage via %s" % repr(c))
+    def notifyConnected(self, conn):
+        log2(INFO, "Connected to storage via %s" % repr(conn))
 
         # check the protocol version here?
 
-        stub = ServerStub.StorageServer(c)
+        stub = ServerStub.StorageServer(conn)
 
         self._oids = []
 
-        # XXX Why is this synchronous?  If it were async, verification
-        # would start faster.
         stub.register(str(self._storage), self._is_read_only)
         self._info.update(stub.get_info())
         self.verify_cache(stub)
 
         # Don't make the server available to clients until after
         # validating the cache
+        # XXX The stub should be saved here and set in end() below.
         self._server = stub
 
     def verify_cache(self, server):
+        # XXX beginZeoVerify ends up calling back to beginVerify() below.
+        # That whole exchange is rather unnecessary.
         server.beginZeoVerify()
         self._cache.verify(server.zeoVerify)
         server.endZeoVerify()
@@ -206,37 +218,27 @@
         return self._info['supportsVersions']
 
     def supportsTransactionalUndo(self):
-        try:
-            return self._info['supportsTransactionalUndo']
-        except KeyError:
-            return 0
+        return self._info['supportsTransactionalUndo']
 
     def isReadOnly(self):
         return self._is_read_only
 
-    def _check_trans(self, trans, exc=None):
+    def _check_trans(self, trans):
+        if self._is_read_only:
+            raise POSException.ReadOnlyError()
         if self._transaction is not trans:
-            if exc is None:
-                return 0
-            else:
-                raise exc(self._transaction, trans)
-        return 1
+            raise POSException.StorageTransactionError(self._transaction,
+                                                       trans)
 
     def abortVersion(self, src, transaction):
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-        self._check_trans(transaction,
-                          POSException.StorageTransactionError)
+        self._check_trans(transaction)
         oids = self._server.abortVersion(src, self._serial)
         for oid in oids:
             self._tbuf.invalidate(oid, src)
         return oids
 
     def commitVersion(self, src, dest, transaction):
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-        self._check_trans(transaction,
-                          POSException.StorageTransactionError)
+        self._check_trans(transaction)
         oids = self._server.commitVersion(src, dest, self._serial)
         if dest:
             # just invalidate our version data
@@ -280,13 +282,12 @@
         if self._is_read_only:
             raise POSException.ReadOnlyError()
         # avoid multiple oid requests to server at the same time
-        self.oid_cond.acquire()
+        self._oid_lock.acquire()
         if not self._oids:
             self._oids = self._server.new_oids()
             self._oids.reverse()
-            self.oid_cond.notifyAll()
         oid = self._oids.pop()
-        self.oid_cond.release()
+        self._oid_lock.release()
         return oid
 
     def pack(self, t=None, rf=None, wait=0, days=0):
@@ -309,9 +310,7 @@
             return r
 
     def store(self, oid, serial, data, version, transaction):
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-        self._check_trans(transaction, POSException.StorageTransactionError)
+        self._check_trans(transaction)
         self._server.storea(oid, serial, data, version, self._serial)
         self._tbuf.store(oid, version, data)
         return self._check_serials()
@@ -323,17 +322,17 @@
         return self._check_serials()
 
     def tpc_begin(self, transaction, tid=None, status=' '):
-        self.tpc_cond.acquire()
+        self._tpc_cond.acquire()
         while self._transaction is not None:
             # It is allowable for a client to call two tpc_begins in a
             # row with the same transaction, and the second of these
             # must be ignored.
             if self._transaction == transaction:
-                self.tpc_cond.release()
+                self._tpc_cond.release()
                 return
-            self.tpc_cond.wait()
+            self._tpc_cond.wait()
         self._transaction = transaction
-        self.tpc_cond.release()
+        self._tpc_cond.release()
 
         if tid is None:
             self._ts = get_timestamp(self._ts)
@@ -360,11 +359,11 @@
 
     def end_transaction(self):
         # the right way to set self._transaction to None
-        # calls notify() on tpc_cond in case there are waiting threads
-        self.tpc_cond.acquire()
+        # calls notify() on _tpc_cond in case there are waiting threads
+        self._tpc_cond.acquire()
         self._transaction = None
-        self.tpc_cond.notify()
-        self.tpc_cond.release()
+        self._tpc_cond.notify()
+        self._tpc_cond.release()
 
     def tpc_abort(self, transaction):
         if transaction is not self._transaction:
@@ -424,9 +423,7 @@
         self._tbuf.clear()
 
     def transactionalUndo(self, trans_id, trans):
-        if self._is_read_only:
-            raise POSException.ReadOnlyError()
-        self._check_trans(trans, POSException.StorageTransactionError)
+        self._check_trans(trans)
         oids = self._server.transactionalUndo(trans_id, self._serial)
         for oid in oids:
             self._tbuf.invalidate(oid, '')
@@ -464,18 +461,20 @@
     def info(self, dict):
         self._info.update(dict)
 
-    def begin(self):
+    def beginVerify(self):
         self._tfile = tempfile.TemporaryFile(suffix=".inv")
         self._pickler = cPickle.Pickler(self._tfile, 1)
         self._pickler.fast = 1 # Don't use the memo
 
-    def invalidate(self, args):
-        # Queue an invalidate for the end the transaction
+    def invalidateVerify(self, args):
+        # Invalidation as result of verify_cache().
+        # Queue an invalidate for the end the verification procedure.
         if self._pickler is None:
+            # XXX This should never happen
             return
         self._pickler.dump(args)
 
-    def end(self):
+    def endVerify(self):
         if self._pickler is None:
             return
         self._pickler.dump((0,0))
@@ -492,7 +491,8 @@
             self._db.invalidate(oid, version=version)
         f.close()
 
-    def Invalidate(self, args):
+    def invalidateTrans(self, args):
+        # Invalidation as a result of a transaction.
         for oid, version in args:
             self._cache.invalidate(oid, version=version)
             try:


=== ZODB3/ZEO/ClientStub.py 1.4 => 1.5 ===
--- ZODB3/ZEO/ClientStub.py:1.4	Tue Jun 11 09:43:06 2002
+++ ZODB3/ZEO/ClientStub.py	Wed Aug 28 12:37:09 2002
@@ -18,19 +18,16 @@
         self.rpc = rpc
 
     def beginVerify(self):
-        self.rpc.callAsync('begin')
+        self.rpc.callAsync('beginVerify')
 
-    # XXX must rename the two invalidate messages.  I can never
-    # remember which is which
-
-    def invalidate(self, args):
-        self.rpc.callAsync('invalidate', args)
-
-    def Invalidate(self, args):
-        self.rpc.callAsync('Invalidate', args)
+    def invalidateVerify(self, args):
+        self.rpc.callAsync('invalidateVerify', args)
 
     def endVerify(self):
-        self.rpc.callAsync('end')
+        self.rpc.callAsync('endVerify')
+
+    def invalidateTrans(self, args):
+        self.rpc.callAsync('invalidateTrans', args)
 
     def serialnos(self, arg):
         self.rpc.callAsync('serialnos', arg)


=== ZODB3/ZEO/StorageServer.py 1.44 => 1.45 ===
--- ZODB3/ZEO/StorageServer.py:1.44	Mon Aug 12 15:42:40 2002
+++ ZODB3/ZEO/StorageServer.py	Wed Aug 28 12:37:09 2002
@@ -86,11 +86,11 @@
             l = self.connections[storage_id] = []
         l.append(proxy)
 
-    def invalidate(self, conn, storage_id, invalidated=(), info=0):
+    def invalidate(self, conn, storage_id, invalidated=(), info=None):
         for p in self.connections.get(storage_id, ()):
             if invalidated and p is not conn:
-                p.client.Invalidate(invalidated)
-            else:
+                p.client.invalidateTrans(invalidated)
+            elif info is not None:
                 p.client.info(info)
 
     def close_server(self):
@@ -108,11 +108,9 @@
                 pass
 
     def close(self, conn):
-        removed = 0
         for sid, cl in self.connections.items():
             if conn.obj in cl:
                 cl.remove(conn.obj)
-                removed = 1
 
 class ZEOStorage:
     """Proxy to underlying storage for a single remote client."""
@@ -130,7 +128,7 @@
         # any pending transaction.  Not sure if this is the clearest way.
         if self._transaction is not None:
             self.__storage.tpc_abort(self._transaction)
-            self._transaction is None
+            self._transaction = None
         self._conn.close()
 
     def notifyConnected(self, conn):
@@ -240,9 +238,9 @@
         except: # except what?
             return None
         if os != s:
-            self.client.invalidate((oid, ''))
+            self.client.invalidateVerify((oid, ''))
         elif osv != sv:
-            self.client.invalidate((oid, v))
+            self.client.invalidateVerify((oid, v))
 
     def endZeoVerify(self):
         self.client.endVerify()
@@ -257,6 +255,8 @@
         t.start()
         if wait is not None:
             return wait
+        else:
+            return None
 
     def _pack(self, t, delay):
         try:
@@ -277,7 +277,8 @@
 
     def new_oids(self, n=100):
         """Return a sequence of n new oids, where n defaults to 100"""
-        if n < 0:
+        if n <= 0:
+            # Always return at least one
             n = 1
         return [self.__storage.new_oid() for i in range(n)]
 
@@ -374,6 +375,7 @@
             return d
         else:
             self.restart()
+            return None
 
     def _handle_waiting(self):
         while self.__storage._waiting:
@@ -495,7 +497,7 @@
                 self.invalidated.append((oid, version))
 
         try:
-            nil = dump(newserial, 1)
+            dump(newserial, 1)
         except:
             msg = "Couldn't pickle storage exception: %s" % repr(newserial)
             slog(self.storage, msg, zLOG.ERROR)