[Zodb-checkins] CVS: Packages/ZEO - ClientStorage.py:1.26.4.12

jeremy@digicool.com jeremy@digicool.com
Wed, 25 Apr 2001 17:02:59 -0400 (EDT)


Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv30247

Modified Files:
      Tag: ZEO-ZRPC-Dev
	ClientStorage.py 
Log Message:
Remove dependence on BaseStorage, which only defines some locks.

XXX What is the difference between thread.allocate_lock() and
ThreadLock.allocate_lock() and bpthread.allocate_lock()?

Add log2() helper that includes the pid of the ClientStorage process in
the log subsystem name.

Replace call to BaseStorage.__init__ with _basic_init() method.  XXX
This should probably be refactored along with __init__.

Revise load logic for versions so that it is in positive form.

Simplify tpc_begin():  It is always synchronous, so there's no need
for a while loop.

Work around the possibility that we can get an invalidation message
before the DB is registered.





--- Updated File ClientStorage.py in package Packages/ZEO --
--- ClientStorage.py	2001/04/20 19:14:08	1.26.4.11
+++ ClientStorage.py	2001/04/25 21:02:58	1.26.4.12
@@ -85,6 +85,7 @@
 """Network ZODB storage client
 
 XXX support multiple outstanding requests up until the vote
+XXX is_connected() vis ClientDisconnected error
 """
 __version__='$Revision$'[11:-2]
 
@@ -100,22 +101,24 @@
 from types import TupleType, StringType
 from struct import pack, unpack
 
-import ExtensionClass, Sync
+import ExtensionClass, Sync, ThreadLock
 import ClientCache
 import zrpc2
 import ServerStub
 from TransactionBuffer import TransactionBuffer
 
-from ZODB import POSException, BaseStorage
+from ZODB import POSException
 from ZODB.TimeStamp import TimeStamp
 from zeolog import LOG, PROBLEM, INFO, BLATHER
 
+def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
+    LOG(subsys, type, msg)
+
 try:
     from ZODB.ConflictResolution import ResolvedSerial
 except ImportError:
     ResolvedSerial = 'rs'
 
-
 class ClientStorageError(POSException.StorageError):
     """An error occured in the ZEO Client Storage"""
 
@@ -131,8 +134,7 @@
     t = t.laterThan(prev_ts)
     return t
 
-class ClientStorage(BaseStorage.BaseStorage):
-    __super_init = BaseStorage.BaseStorage.__init__
+class ClientStorage:
 
     _server = None
 
@@ -160,7 +162,7 @@
         self._serials = []
         self._seriald = {}
 
-        self.__super_init(name)
+        self._basic_init(name)
 
         self._db = None
         self._cache = ClientCache.ClientCache(storage, cache_size,
@@ -176,8 +178,32 @@
         if not self._rpc_mgr.attempt_connect():
             self._rpc_mgr.connect()
 
+    def _basic_init(self, name):
+        """Handle initialization activites of BaseStorage"""
+
+        self.__name__ = name
+
+        # Allocate locks:
+##        import debuglock
+##        commit_lock = debuglock.DebugLock()
+        commit_lock = thread.allocate_lock()
+        self._commit_lock_acquire = commit_lock.acquire
+        self._commit_lock_release = commit_lock.release
+
+        # What's the difference between thread and ThreadLock?
+        l = ThreadLock.allocate_lock()
+        self._lock_acquire = l.acquire
+        self._lock_release = l.release
+
+        t = time.time()
+        t = self._ts = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
+        self._serial = `t`
+        self._oid='\0\0\0\0\0\0\0\0'
+        self._transaction = None
+
     def registerDB(self, db, limit):
         """Register that the storage is controlled by the given DB."""
+        log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit)))
         self._db = db
 
     def is_connected(self):
@@ -191,7 +217,7 @@
             self._lock_release()
 
     def notifyConnected(self, c):
-        LOG("ClientStorage", INFO, "Connected to storage")
+        log2(INFO, "Connected to storage")
         self._lock_acquire()
         try:
             self._server = ServerStub.StorageServer(c)
@@ -222,7 +248,7 @@
     ### responsible for starting the thread that makes the connection.
 
     def notifyDisconnected(self, ignored):
-        LOG("ClientStorage", PROBLEM, "Disconnected from storage")
+        log2(PROBLEM, "Disconnected from storage")
         self._transaction = None
         try:
             self._commit_lock_release()
@@ -303,11 +329,12 @@
             p, s, v, pv, sv = self._server.zeoLoad(oid)
             self._cache.checkSize(0)
             self._cache.store(oid, p, s, v, pv, sv)
-            if not v or not version or version != v:
+            if v and version and v == version:
+                return pv, sv
+            else:
                 if s:
                     return p, s
                 raise KeyError, oid # no non-version data for this
-            return pv, sv
         finally:
             self._lock_release()
                     
@@ -400,27 +427,24 @@
         try:
             if self._transaction is transaction:
                 return # can start the same transaction many times
+            self._lock_release()
+            self._commit_lock_acquire()
+            self._lock_acquire()
 
-            while 1:
-                self._lock_release()
-                self._commit_lock_acquire()
-                self._lock_acquire()
-
-                self._ts = get_timestamp(self._ts)
-                id = `self._ts`
-                
-                try:
-                    r = self._server.tpc_begin(id,
-                                               transaction.user,
-                                               transaction.description,
-                                               transaction._extension)
-                except:
-                    self._commit_lock_release()
-                    raise
-                
-                if r is None:
-                    break
+            self._ts = get_timestamp(self._ts)
+            id = `self._ts`
+
+            try:
+                r = self._server.tpc_begin(id,
+                                           transaction.user,
+                                           transaction.description,
+                                           transaction._extension)
+            except:
+                self._commit_lock_release()
+                raise
 
+            assert r is None
+
             # We have *BOTH* the local and distributed commit
             # lock, now we can actually get ready to get started.
             self._serial = id
@@ -462,7 +486,7 @@
                 oid, v, p = t
                 s = self._seriald[oid]
                 if type(s) != StringType:
-                    LOG("ClientStorage", INFO, "bad serialno: %s for %s" % \
+                    log2(INFO, "bad serialno: %s for %s" % \
                         (repr(s), repr(oid)))
                 assert type(s) == StringType, "bad serialno: %s" % repr(s)
                 if s == ResolvedSerial:
@@ -520,7 +544,9 @@
     # below are methods invoked by the StorageServer
 
     def unlock(self):
-        self.commit_lock_release()
+        # XXX Don't believe this is used anymore...
+        log2(INFO, "unlock()")
+        self._commit_lock_release()
 
     def serialno(self, arg):
         self._serials.append(arg)
@@ -555,6 +581,14 @@
             self._db.invalidate(oid, version=version)
 
     def Invalidate(self, args):
+        # XXX _db could be None
         for oid, version in args:
             self._cache.invalidate(oid, version=version)
-            self._db.invalidate(oid, version=version)
+            try:
+                self._db.invalidate(oid, version=version)
+            except AttributeError, msg:
+                log2(PROBLEM,
+                    "Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
+                                                               repr(version),
+                                                               msg))
+