[Zope-Checkins] CVS: ZODB3/ZEO - StorageServer.py:1.74.2.7

Jeremy Hylton jeremy@zope.com
Wed, 18 Dec 2002 16:11:23 -0500


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv2358/ZEO

Modified Files:
      Tag: ZODB3-3_1-branch
	StorageServer.py 
Log Message:
Backport zeoVerify() improvement from trunk.
Merge two fixes from ZODB3-fast-restart-branch.

Use cheap getSerial() instead of costly zeoLoad().

Logging cleanup. Use the connection address as part of the log label,
so log messages pertaining to different clients can be distinguished.
Jeremy is making a similar change to zrpc.

Don't swallow SystemExit and KeyboardInterrupt and send them to the
client; re-raise them instead.


=== ZODB3/ZEO/StorageServer.py 1.74.2.6 => 1.74.2.7 ===
--- ZODB3/ZEO/StorageServer.py:1.74.2.6	Tue Nov 12 15:18:09 2002
+++ ZODB3/ZEO/StorageServer.py	Wed Dec 18 16:11:22 2002
@@ -211,10 +211,18 @@
         self.storage_id = "uninitialized"
         self.transaction = None
         self.read_only = read_only
+        self.log_label = _label
 
     def notifyConnected(self, conn):
         self.connection = conn # For restart_other() below
         self.client = self.ClientStorageStubClass(conn)
+        addr = conn.addr
+        if isinstance(addr, type("")):
+            label = addr
+        else:
+            host, port = addr
+            label = str(host) + ":" + str(port)
+        self.log_label = _label + "/" + label
 
     def notifyDisconnected(self):
         # When this storage closes, we must ensure that it aborts
@@ -236,7 +244,7 @@
         return "<%s %X trans=%s s_trans=%s>" % (name, id(self), tid, stid)
 
     def log(self, msg, level=zLOG.INFO, error=None):
-        zLOG.LOG("%s:%s" % (_label, self.storage_id), level, msg, error=error)
+        zLOG.LOG(self.log_label, level, msg, error=error)
 
     def setup_delegation(self):
         """Delegate several methods to the storage"""
@@ -273,7 +281,7 @@
         This method must be the first one called by the client.
         """
         if self.storage is not None:
-            log("duplicate register() call")
+            self.log("duplicate register() call")
             raise ValueError, "duplicate register() call"
         storage = self.server.storages.get(storage_id)
         if storage is None:
@@ -325,13 +333,33 @@
 
     def zeoVerify(self, oid, s, sv):
         try:
-            p, os, v, pv, osv = self.zeoLoad(oid)
-        except: # except what?
-            return None
-        if os != s:
+            os = self.storage.getSerial(oid)
+        except KeyError:
             self.client.invalidateVerify((oid, ''))
-        elif osv != sv:
-            self.client.invalidateVerify((oid, v))
+            # XXX It's not clear what we should do now.  The KeyError
+            # could be caused by an object uncreation, in which case
+            # invalidation is right.  It could be an application bug
+            # that left a dangling reference, in which case it's bad.
+        else:
+            # If the client has version data, the logic is a bit more
+            # complicated.  If the current serial number matches the
+            # client serial number, then the non-version data must
+            # also be valid.  If the current serialno is for a
+            # version, then the non-version data can't change.
+
+            # If the version serialno isn't valid, then the
+            # non-version serialno may or may not be valid.  Rather
+            # than trying to figure out whether it is valid, we just
+            # invalidate it.  Sending an invalidation for the
+            # non-version data implies invalidating the version data
+            # too, since an update to non-version data can only occur
+            # after the version is aborted or committed.
+            if sv:
+                if sv != os:
+                    self.client.invalidateVerify((oid, ''))
+            else:
+                if s != os:
+                    self.client.invalidateVerify((oid, ''))
 
     def endZeoVerify(self):
         self.client.endVerify()
@@ -504,7 +532,8 @@
         old_strategy = self.strategy
         assert isinstance(old_strategy, DelayedCommitStrategy)
         self.strategy = ImmediateCommitStrategy(self.storage,
-                                                self.client)
+                                                self.client,
+                                                self.log)
         resp = old_strategy.restart(self.strategy)
         if delay is not None:
             delay.reply(resp)
@@ -560,11 +589,12 @@
 class ImmediateCommitStrategy:
     """The storage is available so do a normal commit."""
 
-    def __init__(self, storage, client):
+    def __init__(self, storage, client, logmethod):
         self.storage = storage
         self.client = client
         self.invalidated = []
         self.serials = []
+        self.log = logmethod
 
     def tpc_begin(self, txn, tid, status):
         self.txn = txn
@@ -586,12 +616,14 @@
         try:
             newserial = self.storage.store(oid, serial, data, version,
                                            self.txn)
+        except (SystemExit, KeyboardInterrupt):
+            raise
         except Exception, err:
             if not isinstance(err, TransactionError):
                 # Unexpected errors are logged and passed to the client
                 exc_info = sys.exc_info()
-                log("store error: %s, %s" % exc_info[:2],
-                    zLOG.ERROR, error=exc_info)
+                self.log("store error: %s, %s" % exc_info[:2],
+                         zLOG.ERROR, error=exc_info)
                 del exc_info
             # Try to pickle the exception.  If it can't be pickled,
             # the RPC response would fail, so use something else.
@@ -601,7 +633,7 @@
                 pickler.dump(err, 1)
             except:
                 msg = "Couldn't pickle storage exception: %s" % repr(err)
-                log(msg, zLOG.ERROR)
+                self.log(msg, zLOG.ERROR)
                 err = StorageServerError(msg)
             # The exception is reported back as newserial for this oid
             newserial = err
@@ -734,6 +766,8 @@
     def run(self):
         try:
             result = self._method(*self._args)
+        except (SystemExit, KeyboardInterrupt):
+            raise
         except Exception:
             self.delay.error(sys.exc_info())
         else: