[Zodb-checkins] CVS: Packages/ZEO - ClientStorage.py:1.26.4.12
jeremy@digicool.com
jeremy@digicool.com
Wed, 25 Apr 2001 17:02:59 -0400 (EDT)
Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv30247
Modified Files:
Tag: ZEO-ZRPC-Dev
ClientStorage.py
Log Message:
Remove dependence on BaseStorage, which only defines some locks.
XXX What is the difference between thread.allocate_lock() and
ThreadLock.allocate_lock() and bpthread.allocate_lock()?
Add log2() helper that includes the pid of the ClientStorage process in the log subsystem name.
Replace call to BaseStorage.__init__ with _basic_init() method. XXX
This should probably be refactored along with __init__.
Revise load logic for versions so that it is in positive form.
Simplify tpc_begin(): It is always synchronous, so there's no need
for a while loop.
Work around the possibility that we can get an invalidation message before the
DB is registered.
--- Updated File ClientStorage.py in package Packages/ZEO --
--- ClientStorage.py 2001/04/20 19:14:08 1.26.4.11
+++ ClientStorage.py 2001/04/25 21:02:58 1.26.4.12
@@ -85,6 +85,7 @@
"""Network ZODB storage client
XXX support multiple outstanding requests up until the vote
+XXX is_connected() vis ClientDisconnected error
"""
__version__='$Revision$'[11:-2]
@@ -100,22 +101,24 @@
from types import TupleType, StringType
from struct import pack, unpack
-import ExtensionClass, Sync
+import ExtensionClass, Sync, ThreadLock
import ClientCache
import zrpc2
import ServerStub
from TransactionBuffer import TransactionBuffer
-from ZODB import POSException, BaseStorage
+from ZODB import POSException
from ZODB.TimeStamp import TimeStamp
from zeolog import LOG, PROBLEM, INFO, BLATHER
+def log2(type, msg, subsys="ClientStorage %d" % os.getpid()):
+ LOG(subsys, type, msg)
+
try:
from ZODB.ConflictResolution import ResolvedSerial
except ImportError:
ResolvedSerial = 'rs'
-
class ClientStorageError(POSException.StorageError):
"""An error occured in the ZEO Client Storage"""
@@ -131,8 +134,7 @@
t = t.laterThan(prev_ts)
return t
-class ClientStorage(BaseStorage.BaseStorage):
- __super_init = BaseStorage.BaseStorage.__init__
+class ClientStorage:
_server = None
@@ -160,7 +162,7 @@
self._serials = []
self._seriald = {}
- self.__super_init(name)
+ self._basic_init(name)
self._db = None
self._cache = ClientCache.ClientCache(storage, cache_size,
@@ -176,8 +178,32 @@
if not self._rpc_mgr.attempt_connect():
self._rpc_mgr.connect()
+ def _basic_init(self, name):
+ """Handle initialization activites of BaseStorage"""
+
+ self.__name__ = name
+
+ # Allocate locks:
+## import debuglock
+## commit_lock = debuglock.DebugLock()
+ commit_lock = thread.allocate_lock()
+ self._commit_lock_acquire = commit_lock.acquire
+ self._commit_lock_release = commit_lock.release
+
+ # What's the difference between thread and ThreadLock?
+ l = ThreadLock.allocate_lock()
+ self._lock_acquire = l.acquire
+ self._lock_release = l.release
+
+ t = time.time()
+ t = self._ts = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
+ self._serial = `t`
+ self._oid='\0\0\0\0\0\0\0\0'
+ self._transaction = None
+
def registerDB(self, db, limit):
"""Register that the storage is controlled by the given DB."""
+ log2(INFO, "registerDB(%s, %s)" % (repr(db), repr(limit)))
self._db = db
def is_connected(self):
@@ -191,7 +217,7 @@
self._lock_release()
def notifyConnected(self, c):
- LOG("ClientStorage", INFO, "Connected to storage")
+ log2(INFO, "Connected to storage")
self._lock_acquire()
try:
self._server = ServerStub.StorageServer(c)
@@ -222,7 +248,7 @@
### responsible for starting the thread that makes the connection.
def notifyDisconnected(self, ignored):
- LOG("ClientStorage", PROBLEM, "Disconnected from storage")
+ log2(PROBLEM, "Disconnected from storage")
self._transaction = None
try:
self._commit_lock_release()
@@ -303,11 +329,12 @@
p, s, v, pv, sv = self._server.zeoLoad(oid)
self._cache.checkSize(0)
self._cache.store(oid, p, s, v, pv, sv)
- if not v or not version or version != v:
+ if v and version and v == version:
+ return pv, sv
+ else:
if s:
return p, s
raise KeyError, oid # no non-version data for this
- return pv, sv
finally:
self._lock_release()
@@ -400,27 +427,24 @@
try:
if self._transaction is transaction:
return # can start the same transaction many times
+ self._lock_release()
+ self._commit_lock_acquire()
+ self._lock_acquire()
- while 1:
- self._lock_release()
- self._commit_lock_acquire()
- self._lock_acquire()
-
- self._ts = get_timestamp(self._ts)
- id = `self._ts`
-
- try:
- r = self._server.tpc_begin(id,
- transaction.user,
- transaction.description,
- transaction._extension)
- except:
- self._commit_lock_release()
- raise
-
- if r is None:
- break
+ self._ts = get_timestamp(self._ts)
+ id = `self._ts`
+
+ try:
+ r = self._server.tpc_begin(id,
+ transaction.user,
+ transaction.description,
+ transaction._extension)
+ except:
+ self._commit_lock_release()
+ raise
+ assert r is None
+
# We have *BOTH* the local and distributed commit
# lock, now we can actually get ready to get started.
self._serial = id
@@ -462,7 +486,7 @@
oid, v, p = t
s = self._seriald[oid]
if type(s) != StringType:
- LOG("ClientStorage", INFO, "bad serialno: %s for %s" % \
+ log2(INFO, "bad serialno: %s for %s" % \
(repr(s), repr(oid)))
assert type(s) == StringType, "bad serialno: %s" % repr(s)
if s == ResolvedSerial:
@@ -520,7 +544,9 @@
# below are methods invoked by the StorageServer
def unlock(self):
- self.commit_lock_release()
+ # XXX Don't believe this is used anymore...
+ log2(INFO, "unlock()")
+ self._commit_lock_release()
def serialno(self, arg):
self._serials.append(arg)
@@ -555,6 +581,14 @@
self._db.invalidate(oid, version=version)
def Invalidate(self, args):
+ # XXX _db could be None
for oid, version in args:
self._cache.invalidate(oid, version=version)
- self._db.invalidate(oid, version=version)
+ try:
+ self._db.invalidate(oid, version=version)
+ except AttributeError, msg:
+ log2(PROBLEM,
+ "Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
+ repr(version),
+ msg))
+