[Zodb-checkins] CVS: Packages/ZEO - ClientStorage.py:1.26.4.7
jeremy@digicool.com
jeremy@digicool.com
Wed, 18 Apr 2001 16:51:08 -0400 (EDT)
Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv8237
Modified Files:
Tag: ZEO-ZRPC-Dev
ClientStorage.py
Log Message:
Sundry changes to ClientStorage
Add is_connected() method that returns true if the storage is
currently connected to the server.
Simply cache protocol by allowing cache itself to call verify().
Fix some stupid typos:
self._serverabortVersion -> self._server.abortVersion
Modify close() method to close the rpc mgr (so it doesn't open a new
connection) and then the current connection.
Get rid of unused variable in _check_serials().
Augment logging for now.
--- Updated File ClientStorage.py in package Packages/ZEO --
--- ClientStorage.py 2001/04/02 23:16:12 1.26.4.6
+++ ClientStorage.py 2001/04/18 20:51:08 1.26.4.7
@@ -83,6 +83,8 @@
#
##############################################################################
"""Network ZODB storage client
+
+XXX support multiple outstanding requests up until the vote
"""
__version__='$Revision$'[11:-2]
@@ -100,7 +102,7 @@
from zLOG import LOG, PROBLEM, INFO, BLATHER
import sys
-from types import TupleType
+from types import TupleType, StringType
class ClientStorageError(POSException.StorageError):
"""An error occured in the ZEO Client Storage"""
@@ -144,7 +146,7 @@
self._db = None
self._cache = ClientCache.ClientCache(storage, cache_size,
- client=client, var=var)
+ client=client, var=var)
self._cache.open() # XXX
self._rpc_mgr = zrpc2.ConnectionManager(addr, self,
@@ -160,6 +162,16 @@
"""Register that the storage is controlled by the given DB."""
self._db = db
+ def is_connected(self):
+ self._lock_acquire()
+ try:
+ if self._server:
+ return 1
+ else:
+ return 0
+ finally:
+ self._lock_release()
+
def notifyConnected(self, c):
LOG("ClientStorage", INFO, "Connected to storage")
self._lock_acquire()
@@ -175,13 +187,9 @@
self._lock_release()
def verify_cache(self):
- cached = self._cache.open() # XXX
- ### This is a little expensive for large caches
- if cached:
- self._server.beginZeoVerify()
- for oid, (s, vs) in cached:
- self._server.zeoVerify(oid, s, vs)
- self._server.endZeoVerify()
+ self._server.beginZeoVerify()
+ self._cache.verify(self._server.zeoVerify)
+ self._server.endZeoVerify()
### Is there a race condition between notifyConnected and
### notifyDisconnected? In Particular, what if we get
@@ -211,7 +219,7 @@
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
try:
- oids = self._serverabortVersion(src, self._serial)
+ oids = self._server.abortVersion(src, self._serial)
invalidate = self._cache.invalidate
for oid in oids:
invalidate(oid, src)
@@ -222,8 +230,10 @@
def close(self):
self._lock_acquire()
try:
- self_.call.closeIntensionally() # XXX this can't work
- self._rpc_mgr.close_nicely() # XXX or self._server
+ # Close the manager first, so that it doesn't attempt to
+ # re-open the connection.
+ self._rpc_mgr.close()
+ self._server.rpc.close()
finally:
self._lock_release()
@@ -248,8 +258,6 @@
def getName(self):
return "%s (%s)" % (self.__name__, "XXX")
-## # XXX self._connected is gone
-## self._connected and 'connected' or 'disconnected')
def getSize(self):
return self._info['size']
@@ -257,7 +265,7 @@
def history(self, oid, version, length=1):
self._lock_acquire()
try:
- return self._serverhistory(oid, version, length)
+ return self._server.history(oid, version, length)
finally:
self._lock_release()
@@ -324,8 +332,9 @@
l = len(self._serials)
r = self._serials[:l]
del self._serials[:l]
- d = self._seriald
for oid, s in r:
+ LOG("ClientStorage", BLATHER,
+ "serial: %s %s" % (repr(oid), repr(s)))
self._seriald[oid] = s
return r
@@ -408,8 +417,6 @@
# We have *BOTH* the local and distributed commit
# lock, now we can actually get ready to get started.
self._serial = id
-## # _tbuf should always be in the clear state
-## self._tfile.seek(0)
self._seriald.clear()
del self._serials[:]
@@ -431,11 +438,14 @@
transaction.description,
transaction._extension)
- seriald=self._seriald
r = self._check_serials()
+ assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
self._cache.checkSize(self._tbuf.get_size())
+ LOG("ClientStorage", BLATHER, "tpc_finish()")
+ LOG("ClientStorage", BLATHER, "serials: %s" % repr(self._seriald))
+
self._tbuf.begin_iterate()
while 1:
try:
@@ -449,7 +459,11 @@
oid, v, p = t
LOG("tbuf", BLATHER, "oid=%s v=%s len(p)=%d" % (
repr(oid), repr(v), len(p)))
- s = seriald[oid]
+ s = self._seriald[oid]
+ if type(s) != StringType:
+ LOG("ClientStorage", INFO, "bad serialno: %s for %s" % \
+ (repr(s), repr(oid)))
+ assert type(s) == StringType, "bad serialno: %s" % s
self._cache.update(oid, s, v, p)
self._tbuf.clear()
@@ -505,6 +519,7 @@
self.commit_lock_release()
def serialno(self, arg):
+ LOG("ClientStorage", BLATHER, "serialno(%s)" % repr(arg))
self._serials.append(arg)
def info(self, dict):