[Zodb-checkins] CVS: Packages/ZEO - ClientStorage.py:1.26.4.3

jeremy@digicool.com jeremy@digicool.com
Thu, 29 Mar 2001 16:36:24 -0500 (EST)


Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv19392

Modified Files:
      Tag: ZEO-ZRPC-Dev
	ClientStorage.py 
Log Message:
Large-scale refactoring based on a workday with Jim.

Push more startup work and all ThreadedAsync work into ConnectionManager.

Open cache before registerDB() call.

Remove __begin hackery.

Merge the Invalidator logic into ClientStorage (this code still needs
to be revised and the Invalidator module removed, but that comes
later).  This makes the ClientProxy object unnecessary, so it is removed.




--- Updated File ClientStorage.py in package Packages/ZEO --
--- ClientStorage.py	2001/03/29 13:29:01	1.26.4.2
+++ ClientStorage.py	2001/03/29 21:36:23	1.26.4.3
@@ -111,8 +111,8 @@
 
 class ClientStorage(ExtensionClass.Base, BaseStorage.BaseStorage):
 
-    _connected = _async = 0
-    __begin = 'tpc_begin_sync'
+    _connected = 0 # XXX is this needed? can probably use self._server
+    _server = None
 
     def __init__(self, addr, storage='1', cache_size=20000000,
                  name='', client='', debug=0, var=None,
@@ -130,10 +130,6 @@
         self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
                       'supportsUndo':0, 'supportsVersions': 0}
 
-        self._rpc_mgr = zrpc2.ConnectionManager(addr, #debug=debug,
-                                                tmin=min_disconnect_poll,
-                                                tmax=max_disconnect_poll)
-
         name = name or str(addr)
 
         self._tfile = tempfile.TemporaryFile()
@@ -145,79 +141,40 @@
         ClientStorage.inheritedAttribute('__init__')(self, name)
 
         self.__lock_acquire = self._lock_acquire # XXX
-
-        self._cache=ClientCache.ClientCache(storage, cache_size,
-                                            client=client, var=var) 
 
-        # XXX This stuff doesn't work right yet.
-        ThreadedAsync.register_loop_callback(self.becomeAsync)
+        self._cache = ClientCache.ClientCache(storage, cache_size,
+                                              client=client, var=var) 
+        self._cache.open() # XXX
 
-        # IMPORTANT: Note that we aren't fully "there" yet.
-        # In particular, we don't actually connect to the server
-        # until we have a controlling database set with registerDB
-        # below.
+        self._rpc_mgr = zrpc2.ConnectionManager(addr, self,
+                                                #debug=debug,
+                                                tmin=min_disconnect_poll,
+                                                tmax=max_disconnect_poll)
+        # XXX make this method call the default CnMgr behavior
+        self._rpc_mgr.connect(callback=self.notifyConnected)
 
     def registerDB(self, db, limit):
         """Register that the storage is controlled by the given DB.
         """
+        self._db = db
 
-        # Among other things, we know that our data methods won't get
-        # called until after this call.
-
-        # XXX Don't need to create separate invalidator object and
-        # proxy.  JF: The separate object exists only to avoid a
-        # circular reference.
-        invalidator = Invalidator.Invalidator(db.invalidate,
-                                              self._cache.invalidate)
-        self._rpc_mgr.register_object(ClientProxy(invalidator, self))
-        # Now that we have our callback system in place, we can
-        # try to connect
-        self._startup()
-
-    def _startup(self):
-        if not self._rpc_mgr.attempt_connect(self.notifyConnected):
-            # If we can't connect right away, go ahead and open the cache
-            # and start a separate thread to try and reconnect.
-
-            LOG("ClientStorage", PROBLEM, "Failed to connect to storage")
-            self._cache.open()
-            self._rpc_mgr.connect(callback=self.notifyConnected)
-            # If the connect succeeds then this work will be done by
-            # notifyConnected
-        else:
-            LOG("ClientStorage", INFO, "connected to storage")
-
     def notifyConnected(self, c):
         LOG("ClientStorage", INFO, "Connected to storage")
         self._lock_acquire()
         try:
-            # We let the connection keep coming up now that
-            # we have the storage lock. This way, we know no calls
-            # will be made while in the process of coming up.
             self._server = ServerStub.StorageServer(c)
 
             self._connected = 1
             self._oids = []
 
-            # we do synchronous commits until we are sure that
-            # we have and are ready for a main loop.
-
-            # Hm. This is a little silly. If self._async, then
-            # we will really never do a synchronous commit.
-            # See below.
-            self.__begin='tpc_begin_sync'
             self._server.register(str(self._storage))
-            self.open_cache()
+            self.verify_cache()
 
         finally:
             self._lock_release()
 
-        if self._async:
-            import asyncore
-            self.becomeAsync(asyncore.socket_map)
-
-    def open_cache(self):
-        cached = self._cache.open()
+    def verify_cache(self):
+        cached = self._cache.open() # XXX
         ### This is a little expensive for large caches
         if cached:
             self._server.beginZeoVerify()
@@ -239,25 +196,17 @@
 
     def notifyDisconnected(self, ignored):
         LOG("ClientStorage", PROBLEM, "Disconnected from storage")
+        # XXX does this need to be called by ConnectionManager?
+        # oh yes!  need to release the lock
         self._connected = 0
         self._transaction = None
-        thread.start_new_thread(self._call.connect, (0,))
         try:
             self._commit_lock_release()
         except:
             pass
-
-    def becomeAsync(self, map):
-        self._lock_acquire()
-        try:
-            self._async = 1
-            if self._connected:
-                self._call.setLoop(map, getWakeup())
-                self.__begin='tpc_begin'
-        finally:
-            self._lock_release()
 
-    def __len__(self): return self._info['length']
+    def __len__(self):
+        return self._info['length']
 
     def abortVersion(self, src, transaction):
         if transaction is not self._transaction:
@@ -275,7 +224,8 @@
     def close(self):
         self._lock_acquire()
         try:
-            self._call.closeIntensionally() # XXX
+            self_.call.closeIntensionally() # XXX this can't work
+            self._rpc_mgr.close_nicely() # XXX or self._server
         finally:
             self._lock_release()
         
@@ -454,10 +404,7 @@
                     if not self._connected:
                         raise ClientDisconnected("This action is temporarily "
                                                  "unavailable.")
-                    if self.__begin == "tpc_begin_sync":
-                        r = self._server.tpc_begin_sync(id, user, desc, ext)
-                    else:
-                        r = self._server.tpc_begin(id, user, desc, ext)
+                    r = self._server.tpc_begin(id, user, desc, ext)
                 except:
                     self._commit_lock_release()
                     raise
@@ -481,6 +428,8 @@
             if transaction is not self._transaction: return
             if f is not None: f()
 
+            # XXX what happens if the network dies RIGHT NOW?
+
             self._server.tpc_finish(self._serial,
                                     transaction.user,
                                     transaction.description,
@@ -566,41 +515,36 @@
         finally:
             self._lock_release()
 
-class ClientProxy:
-    """Define methods called by the StorageServer"""
-
-    def __init__(self, invalidator, storage):
-        self._inv = invalidator
-        self._sto = storage
-
     def begin(self):
-        self._inv.begin()
+        self._tfile=tempfile.TemporaryFile()
+        pickler=cPickle.Pickler(self._tfile, 1)
+        pickler.fast=1 # Don't use the memo
+        self._d=pickler.dump
 
-    def end(self):
-        self._inv.end()
-
     def invalidate(self, args):
-        self._inv.invalidate(args)
-
-    def Invalidate(self, args):
-        self._inv.Invalidate(args)
-
-    def unlock(self):
-        self._sto.commit_lock_release()
-
-    def serialno(self, arg):
-        self._sto._serials.append(arg)
-
-    def info(self, dict):
-        self._sto._info.update(dict)
+        if self._d is None: return
+        self._d(args)
 
-class Wakeup:
-    def __init__(self):
-        self.trigger = None
-
-    def __call__(self):
-        if self.trigger is None:
-            self.trigger = trigger.trigger()
-        return self.trigger.pull_trigger
+    def end(self):
+        if self._d is None: return
+        self._d((0,0))
+        self._d=None
+        self._tfile.seek(0)
+        load=cPickle.Unpickler(self._tfile).load
+        self._tfile=None
+
+        cinvalidate=self._cache.invalidate
+        dinvalidate=self._db.invalidate
+
+        while 1:
+            oid, version = load()
+            if not oid: break
+            cinvalidate(oid, version=version)
+            dinvalidate(oid, version=version)
 
-getWakeup = Wakeup()
+    def Invalidate(self, args):
+        cinvalidate=self._cache.invalidate
+        dinvalidate=self._db.invalidate
+        for oid, version in args:
+            cinvalidate(oid, version=version)
+            dinvalidate(oid, version=version)