[Zodb-checkins] CVS: Packages/ZEO - ClientStorage.py:1.26.4.3
jeremy@digicool.com
Thu, 29 Mar 2001 16:36:24 -0500 (EST)
Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv19392
Modified Files:
Tag: ZEO-ZRPC-Dev
ClientStorage.py
Log Message:
Mondo refactoring based on a workday with Jim.
Push more startup work and all ThreadedAsync work into ConnectionManager (a toy sketch of the new flow follows this log message).
Open cache before registerDB() call.
Remove __begin hackery.
Merge Invalidator into ClientStorage (this code still needs revision,
and Invalidator itself still needs to be removed, but that comes
later); a sketch of the merged invalidation batching appears after
the diff. This leads to the removal of the ClientProxy object.
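
For orientation, here is a toy sketch (plain Python 2, hypothetical
stand-in classes; the real zrpc2.ConnectionManager API differs) of the
division of labor this change moves to: the manager owns all
connect/retry work and calls back into the storage, and the cache is
opened before any server traffic happens.

    # Hypothetical stand-ins only -- not the zrpc2 API.
    class ToyConnectionManager:
        def __init__(self, addr, client):
            self.addr = addr
            self.client = client          # the ClientStorage instance
        def connect(self, callback=None):
            conn = 'connection to %s' % (self.addr,)   # stand-in object
            if callback is not None:
                callback(conn)            # -> ClientStorage.notifyConnected

    class ToyClientStorage:
        def __init__(self, addr):
            self._cache_open = 1          # cache opened before registerDB()
            self._rpc_mgr = ToyConnectionManager(addr, self)
            self._rpc_mgr.connect(callback=self.notifyConnected)
        def notifyConnected(self, conn):
            # in the real code: self._server = ServerStub.StorageServer(conn)
            self._server = conn
            print 'connected via', repr(self._server)

    ToyClientStorage(('localhost', 9999))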
--- Updated File ClientStorage.py in package Packages/ZEO ---
--- ClientStorage.py 2001/03/29 13:29:01 1.26.4.2
+++ ClientStorage.py 2001/03/29 21:36:23 1.26.4.3
@@ -111,8 +111,8 @@
class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
- _connected = _async = 0
- __begin = 'tpc_begin_sync'
+ _connected = 0 # XXX is this needed? can probably use self._server
+ _server = None
def __init__(self, addr, storage='1', cache_size=20000000,
name='', client='', debug=0, var=None,
@@ -130,10 +130,6 @@
self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0}
- self._rpc_mgr = zrpc2.ConnectionManager(addr, #debug=debug,
- tmin=min_disconnect_poll,
- tmax=max_disconnect_poll)
-
name = name or str(addr)
self._tfile = tempfile.TemporaryFile()
@@ -145,79 +141,40 @@
ClientStorage.inheritedAttribute('__init__')(self, name)
self.__lock_acquire = self._lock_acquire # XXX
-
- self._cache=ClientCache.ClientCache(storage, cache_size,
- client=client, var=var)
- # XXX This stuff doesn't work right yet.
- ThreadedAsync.register_loop_callback(self.becomeAsync)
+ self._cache = ClientCache.ClientCache(storage, cache_size,
+ client=client, var=var)
+ self._cache.open() # XXX
- # IMPORTANT: Note that we aren't fully "there" yet.
- # In particular, we don't actually connect to the server
- # until we have a controlling database set with registerDB
- # below.
+ self._rpc_mgr = zrpc2.ConnectionManager(addr, self,
+ #debug=debug,
+ tmin=min_disconnect_poll,
+ tmax=max_disconnect_poll)
+ # XXX make this method call the default CnMgr behavior
+ self._rpc_mgr.connect(callback=self.notifyConnected)
def registerDB(self, db, limit):
"""Register that the storage is controlled by the given DB.
"""
+ self._db = db
- # Among other things, we know that our data methods won't get
- # called until after this call.
-
- # XXX Don't need to create separate invalidator object and
- # proxy. JF: The separate object exists only to avoid a
- # circular reference.
- invalidator = Invalidator.Invalidator(db.invalidate,
- self._cache.invalidate)
- self._rpc_mgr.register_object(ClientProxy(invalidator, self))
- # Now that we have our callback system in place, we can
- # try to connect
- self._startup()
-
- def _startup(self):
- if not self._rpc_mgr.attempt_connect(self.notifyConnected):
- # If we can't connect right away, go ahead and open the cache
- # and start a separate thread to try and reconnect.
-
- LOG("ClientStorage", PROBLEM, "Failed to connect to storage")
- self._cache.open()
- self._rpc_mgr.connect(callback=self.notifyConnected)
- # If the connect succeeds then this work will be done by
- # notifyConnected
- else:
- LOG("ClientStorage", INFO, "connected to storage")
-
def notifyConnected(self, c):
LOG("ClientStorage", INFO, "Connected to storage")
self._lock_acquire()
try:
- # We let the connection keep coming up now that
- # we have the storage lock. This way, we know no calls
- # will be made while in the process of coming up.
self._server = ServerStub.StorageServer(c)
self._connected = 1
self._oids = []
- # we do synchronous commits until we are sure that
- # we have and are ready for a main loop.
-
- # Hm. This is a little silly. If self._async, then
- # we will really never do a synchronous commit.
- # See below.
- self.__begin='tpc_begin_sync'
self._server.register(str(self._storage))
- self.open_cache()
+ self.verify_cache()
finally:
self._lock_release()
- if self._async:
- import asyncore
- self.becomeAsync(asyncore.socket_map)
-
- def open_cache(self):
- cached = self._cache.open()
+ def verify_cache(self):
+ cached = self._cache.open() # XXX
### This is a little expensive for large caches
if cached:
self._server.beginZeoVerify()
@@ -239,25 +196,17 @@
def notifyDisconnected(self, ignored):
LOG("ClientStorage", PROBLEM, "Disconnected from storage")
+ # XXX does this need to be called by ConnectionManager?
+ # oh yes! need to release the lock
self._connected = 0
self._transaction = None
- thread.start_new_thread(self._call.connect, (0,))
try:
self._commit_lock_release()
except:
pass
-
- def becomeAsync(self, map):
- self._lock_acquire()
- try:
- self._async = 1
- if self._connected:
- self._call.setLoop(map, getWakeup())
- self.__begin='tpc_begin'
- finally:
- self._lock_release()
- def __len__(self): return self._info['length']
+ def __len__(self):
+ return self._info['length']
def abortVersion(self, src, transaction):
if transaction is not self._transaction:
@@ -275,7 +224,8 @@
def close(self):
self._lock_acquire()
try:
- self._call.closeIntensionally() # XXX
+ self._call.closeIntensionally() # XXX this can't work
+ self._rpc_mgr.close_nicely() # XXX or self._server
finally:
self._lock_release()
@@ -454,10 +404,7 @@
if not self._connected:
raise ClientDisconnected("This action is temporarily "
"unavailable.")
- if self.__begin == "tpc_begin_sync":
- r = self._server.tpc_begin_sync(id, user, desc, ext)
- else:
- r = self._server.tpc_begin(id, user, desc, ext)
+ r = self._server.tpc_begin(id, user, desc, ext)
except:
self._commit_lock_release()
raise
@@ -481,6 +428,8 @@
if transaction is not self._transaction: return
if f is not None: f()
+ # XXX what happens if the network dies RIGHT NOW?
+
self._server.tpc_finish(self._serial,
transaction.user,
transaction.description,
@@ -566,41 +515,36 @@
finally:
self._lock_release()
-class ClientProxy:
- """Define methods called by the StorageServer"""
-
- def __init__(self, invalidator, storage):
- self._inv = invalidator
- self._sto = storage
-
def begin(self):
- self._inv.begin()
+ self._tfile=tempfile.TemporaryFile()
+ pickler=cPickle.Pickler(self._tfile, 1)
+ pickler.fast=1 # Don't use the memo
+ self._d=pickler.dump
- def end(self):
- self._inv.end()
-
def invalidate(self, args):
- self._inv.invalidate(args)
-
- def Invalidate(self, args):
- self._inv.Invalidate(args)
-
- def unlock(self):
- self._sto.commit_lock_release()
-
- def serialno(self, arg):
- self._sto._serials.append(arg)
-
- def info(self, dict):
- self._sto._info.update(dict)
+ if self._d is None: return
+ self._d(args)
-class Wakeup:
- def __init__(self):
- self.trigger = None
-
- def __call__(self):
- if self.trigger is None:
- self.trigger = trigger.trigger()
- return self.trigger.pull_trigger
+ def end(self):
+ if self._d is None: return
+ self._d((0,0))
+ self._d=None
+ self._tfile.seek(0)
+ load=cPickle.Unpickler(self._tfile).load
+ self._tfile=None
+
+ cinvalidate=self._cache.invalidate
+ dinvalidate=self._db.invalidate
+
+ while 1:
+ oid, version = load()
+ if not oid: break
+ cinvalidate(oid, version=version)
+ dinvalidate(oid, version=version)
-getWakeup = Wakeup()
+ def Invalidate(self, args):
+ cinvalidate=self._cache.invalidate
+ dinvalidate=self._db.invalidate
+ for oid, version in args:
+ cinvalidate(oid, version=version)
+ dinvalidate(oid, version=version)
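
For reference, the invalidation batching merged in from Invalidator
works as follows. This is a runnable Python 2 sketch; InvalidationQueue
and its callback parameter are hypothetical names for illustration (the
real methods update self._cache and self._db directly). begin() opens a
temp-file pickle stream, invalidate() appends (oid, version) pairs as
they arrive from the server, and end() writes a (0, 0) sentinel,
rewinds, and replays the whole batch.

    import cPickle, tempfile

    class InvalidationQueue:
        _d = None

        def begin(self):
            self._tfile = tempfile.TemporaryFile()
            pickler = cPickle.Pickler(self._tfile, 1)
            pickler.fast = 1              # skip the memo; entries are independent
            self._d = pickler.dump

        def invalidate(self, args):
            if self._d is None:
                return                    # no batch in progress
            self._d(args)                 # args is an (oid, version) pair

        def end(self, callback):
            if self._d is None:
                return
            self._d((0, 0))               # sentinel terminates the stream
            self._d = None
            self._tfile.seek(0)
            load = cPickle.Unpickler(self._tfile).load
            self._tfile = None
            while 1:
                oid, version = load()
                if not oid:               # hit the (0, 0) sentinel
                    break
                callback(oid, version)    # real code: cache and db invalidate

    q = InvalidationQueue()
    q.begin()
    q.invalidate(('\0\0\0\0\0\0\0\x01', ''))
    def report(oid, version):
        print 'invalidate %r in version %r' % (oid, version)
    q.end(report)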