[Zodb-checkins] CVS: Packages/ZEO - ClientStorage.py:1.26.4.7

jeremy@digicool.com jeremy@digicool.com
Wed, 18 Apr 2001 16:51:08 -0400 (EDT)


Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv8237

Modified Files:
      Tag: ZEO-ZRPC-Dev
	ClientStorage.py 
Log Message:
Sundry changes to ClientStorage

Add is_connected() method that returns true if the storage is
currently connected to the server.

Simplify the cache protocol by allowing the cache itself to call verify().

Fix some stupid typos:
    self._serverabortVersion -> self._server.abortVersion

Modify the close() method to close the rpc mgr first (so that it doesn't
open a new connection) and then close the current connection.

Get rid of unused variable in _check_serials().

Augment logging for now.




--- Updated File ClientStorage.py in package Packages/ZEO --
--- ClientStorage.py	2001/04/02 23:16:12	1.26.4.6
+++ ClientStorage.py	2001/04/18 20:51:08	1.26.4.7
@@ -83,6 +83,8 @@
 # 
 ##############################################################################
 """Network ZODB storage client
+
+XXX support multiple outstanding requests up until the vote
 """
 __version__='$Revision$'[11:-2]
 
@@ -100,7 +102,7 @@
 from zLOG import LOG, PROBLEM, INFO, BLATHER
 
 import sys
-from types import TupleType
+from types import TupleType, StringType
 
 class ClientStorageError(POSException.StorageError):
     """An error occured in the ZEO Client Storage"""
@@ -144,7 +146,7 @@
 
         self._db = None
         self._cache = ClientCache.ClientCache(storage, cache_size,
-                                              client=client, var=var) 
+                                              client=client, var=var)
         self._cache.open() # XXX
 
         self._rpc_mgr = zrpc2.ConnectionManager(addr, self,
@@ -160,6 +162,16 @@
         """Register that the storage is controlled by the given DB."""
         self._db = db
 
+    def is_connected(self):
+        self._lock_acquire()
+        try:
+            if self._server:
+                return 1
+            else:
+                return 0
+        finally:
+            self._lock_release()
+
     def notifyConnected(self, c):
         LOG("ClientStorage", INFO, "Connected to storage")
         self._lock_acquire()
@@ -175,13 +187,9 @@
             self._lock_release()
 
     def verify_cache(self):
-        cached = self._cache.open() # XXX
-        ### This is a little expensive for large caches
-        if cached:
-            self._server.beginZeoVerify()
-            for oid, (s, vs) in cached:
-                self._server.zeoVerify(oid, s, vs)
-            self._server.endZeoVerify()
+        self._server.beginZeoVerify()
+        self._cache.verify(self._server.zeoVerify)
+        self._server.endZeoVerify()
 
     ### Is there a race condition between notifyConnected and
     ### notifyDisconnected? In Particular, what if we get
@@ -211,7 +219,7 @@
             raise POSException.StorageTransactionError(self, transaction)
         self._lock_acquire()
         try:
-            oids = self._serverabortVersion(src, self._serial)
+            oids = self._server.abortVersion(src, self._serial)
             invalidate = self._cache.invalidate
             for oid in oids:
                 invalidate(oid, src)
@@ -222,8 +230,10 @@
     def close(self):
         self._lock_acquire()
         try:
-            self_.call.closeIntensionally() # XXX this can't work
-            self._rpc_mgr.close_nicely() # XXX or self._server
+            # Close the manager first, so that it doesn't attempt to
+            # re-open the connection. 
+            self._rpc_mgr.close()
+            self._server.rpc.close()
         finally:
             self._lock_release()
         
@@ -248,8 +258,6 @@
 
     def getName(self):
         return "%s (%s)" % (self.__name__, "XXX")
-##                            # XXX self._connected is gone
-##                            self._connected and 'connected' or 'disconnected')
 
     def getSize(self):
         return self._info['size']
@@ -257,7 +265,7 @@
     def history(self, oid, version, length=1):
         self._lock_acquire()
         try:
-            return self._serverhistory(oid, version, length)     
+            return self._server.history(oid, version, length)     
         finally:
             self._lock_release()       
                   
@@ -324,8 +332,9 @@
             l = len(self._serials)
             r = self._serials[:l]
             del self._serials[:l]
-            d = self._seriald
             for oid, s in r:
+                LOG("ClientStorage", BLATHER,
+                    "serial: %s %s" % (repr(oid), repr(s)))
                 self._seriald[oid] = s
             return r
 
@@ -408,8 +417,6 @@
             # We have *BOTH* the local and distributed commit
             # lock, now we can actually get ready to get started.
             self._serial = id
-##            # _tbuf should always be in the clear state
-##            self._tfile.seek(0)
             self._seriald.clear()
             del self._serials[:]
 
@@ -431,11 +438,14 @@
                                     transaction.description,
                                     transaction._extension)
 
-            seriald=self._seriald
             r = self._check_serials()
+            assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
 
             self._cache.checkSize(self._tbuf.get_size())
 
+            LOG("ClientStorage", BLATHER, "tpc_finish()")
+            LOG("ClientStorage", BLATHER, "serials: %s" % repr(self._seriald))
+
             self._tbuf.begin_iterate()
             while 1:
                 try:
@@ -449,7 +459,11 @@
                 oid, v, p = t
                 LOG("tbuf", BLATHER, "oid=%s v=%s len(p)=%d" % (
                     repr(oid), repr(v), len(p)))
-                s = seriald[oid]
+                s = self._seriald[oid]
+                if type(s) != StringType:
+                    LOG("ClientStorage", INFO, "bad serialno: %s for %s" % \
+                        (repr(s), repr(oid)))
+                assert type(s) == StringType, "bad serialno: %s" % s
                 self._cache.update(oid, s, v, p)
             self._tbuf.clear()
 
@@ -505,6 +519,7 @@
         self.commit_lock_release()
 
     def serialno(self, arg):
+        LOG("ClientStorage", BLATHER, "serialno(%s)" % repr(arg))
         self._serials.append(arg)
 
     def info(self, dict):