[Zodb-checkins] CVS: ZODB3/ZEO - ClientCache.py:1.41 ClientStorage.py:1.81 ClientStub.py:1.11 ServerStub.py:1.11 StorageServer.py:1.82 simul.py:1.17 stats.py:1.20

Jeremy Hylton jeremy@zope.com
Fri, 3 Jan 2003 17:08:16 -0500


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv21352/ZEO

Modified Files:
	ClientCache.py ClientStorage.py ClientStub.py ServerStub.py 
	StorageServer.py simul.py stats.py 
Log Message:
Merge ZODB3-fast-restart-branch to the trunk


=== ZODB3/ZEO/ClientCache.py 1.40 => 1.41 ===
--- ZODB3/ZEO/ClientCache.py:1.40	Thu Nov 21 13:48:53 2002
+++ ZODB3/ZEO/ClientCache.py	Fri Jan  3 17:07:38 2003
@@ -34,8 +34,16 @@
 The ClientStorage overrides the client name default to the value of
 the environment variable ZEO_CLIENT, if it exists.
 
-Each cache file has a 4-byte magic number followed by a sequence of
-records of the form:
+Each cache file has a 12-byte header followed by a sequence of
+records.  The header format is as follows:
+
+  offset in header: name -- description
+
+  0: magic -- 4-byte magic number, identifying this as a ZEO cache file
+
+  4: lasttid -- 8-byte last transaction id
+
+Each record has the following form:
 
   offset in record: name -- description
 
@@ -111,7 +119,8 @@
 import zLOG
 from ZEO.ICache import ICache
 
-magic='ZEC0'
+magic = 'ZEC1'
+headersize = 12
 
 class ClientCache:
 
@@ -126,6 +135,8 @@
 
         self._storage = storage
         self._limit = size / 2
+        self._client = client
+        self._ltid = None # For getLastTid()
 
         # Allocate locks:
         L = allocate_lock()
@@ -154,9 +165,9 @@
                     fi = open(p[i],'r+b')
                     if fi.read(4) == magic: # Minimal sanity
                         fi.seek(0, 2)
-                        if fi.tell() > 30:
-                            # First serial is at offset 19 + 4 for magic
-                            fi.seek(23)
+                        if fi.tell() > headersize:
+                            # Read serial at offset 19 of first record
+                            fi.seek(headersize + 19)
                             s[i] = fi.read(8)
                     # If we found a non-zero serial, then use the file
                     if s[i] != '\0\0\0\0\0\0\0\0':
@@ -172,14 +183,14 @@
                 if f[0] is None:
                     # We started, open the first cache file
                     f[0] = open(p[0], 'w+b')
-                    f[0].write(magic)
+                    f[0].write(magic + '\0' * (headersize - len(magic)))
                 current = 0
                 f[1] = None
         else:
             self._f = f = [tempfile.TemporaryFile(suffix='.zec'), None]
             # self._p file name 'None' signifies an unnamed temp file.
             self._p = p = [None, None]
-            f[0].write(magic)
+            f[0].write(magic + '\0' * (headersize - len(magic)))
             current = 0
 
         self.log("%s: storage=%r, size=%r; file[%r]=%r" %
@@ -219,6 +230,57 @@
                 except OSError:
                     pass
 
+    def getLastTid(self):
+        """Get the last transaction id stored by setLastTid().
+
+        If the cache is persistent, it is read from the current
+        cache file; otherwise it's an instance variable.
+        """
+        if self._client is None:
+            return self._ltid
+        else:
+            self._acquire()
+            try:
+                return self._getLastTid()
+            finally:
+                self._release()
+
+    def _getLastTid(self):
+        f = self._f[self._current]
+        f.seek(4)
+        tid = f.read(8)
+        if len(tid) < 8 or tid == '\0\0\0\0\0\0\0\0':
+            return None
+        else:
+            return tid
+
+    def setLastTid(self, tid):
+        """Store the last transaction id.
+
+        If the cache is persistent, it is written to the current
+        cache file; otherwise it's an instance variable.
+        """
+        if self._client is None:
+            if tid == '\0\0\0\0\0\0\0\0':
+                tid = None
+            self._ltid = tid
+        else:
+            self._acquire()
+            try:
+                self._setLastTid(tid)
+            finally:
+                self._release()
+
+    def _setLastTid(self, tid):
+        if tid is None:
+            tid = '\0\0\0\0\0\0\0\0'
+        else:
+            tid = str(tid)
+            assert len(tid) == 8
+        f = self._f[self._current]
+        f.seek(4)
+        f.write(tid)
+
     def verify(self, verifyFunc):
         """Call the verifyFunc on every object in the cache.
 
@@ -477,6 +539,7 @@
         self._acquire()
         try:
             if self._pos + size > self._limit:
+                ltid = self._getLastTid()
                 current = not self._current
                 self._current = current
                 self._trace(0x70)
@@ -500,8 +563,12 @@
                 else:
                     # Temporary cache file:
                     self._f[current] = tempfile.TemporaryFile(suffix='.zec')
-                self._f[current].write(magic)
-                self._pos = 4
+                header = magic
+                if ltid:
+                    header += ltid
+                self._f[current].write(header +
+                                       '\0' * (headersize - len(header)))
+                self._pos = headersize
         finally:
             self._release()
 
@@ -593,7 +660,7 @@
         f = self._f[fileindex]
         seek = f.seek
         read = f.read
-        pos = 4
+        pos = headersize
         count = 0
 
         while 1:
@@ -651,7 +718,6 @@
                     # We have a record for this oid, but it was invalidated!
                     del serial[oid]
                     del index[oid]
-
 
             pos = pos + tlen
             count += 1
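
For reference, the new cache-file layout described in the ClientCache.py
docstring above is a 4-byte magic string ('ZEC1') followed by an 8-byte
last-transaction id, 12 bytes in all.  The following standalone sketch
(Python 2 style, with made-up function names; the real logic lives in
ClientCache._getLastTid()/_setLastTid()) shows how that header is read and
updated:

    HEADERSIZE = 12
    MAGIC = 'ZEC1'

    def read_last_tid(path):
        # Read the 12-byte header; 8 null bytes mean "no tid recorded".
        f = open(path, 'rb')
        try:
            header = f.read(HEADERSIZE)
        finally:
            f.close()
        if len(header) < HEADERSIZE or header[:4] != MAGIC:
            raise ValueError("not a ZEC1 cache file: %r" % path)
        tid = header[4:12]
        if tid == '\0' * 8:
            return None
        return tid

    def write_last_tid(path, tid):
        # Store tid (or 8 null bytes for None) right after the magic.
        if tid is None:
            tid = '\0' * 8
        assert len(tid) == 8
        f = open(path, 'r+b')
        try:
            f.seek(4)
            f.write(tid)
        finally:
            f.close()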


=== ZODB3/ZEO/ClientStorage.py 1.80 => 1.81 ===
--- ZODB3/ZEO/ClientStorage.py:1.80	Fri Dec 20 11:11:58 2002
+++ ZODB3/ZEO/ClientStorage.py	Fri Jan  3 17:07:38 2003
@@ -22,7 +22,6 @@
 """
 
 # XXX TO DO
-# get rid of beginVerify, set up _tfile in verify_cache
 # set self._storage = stub later, in endVerify
 # if wait is given, wait until verify is complete
 
@@ -60,6 +59,9 @@
 class ClientDisconnected(ClientStorageError, Disconnected):
     """The database storage is disconnected from the storage."""
 
+def tid2time(tid):
+    return str(TimeStamp(tid))
+
 def get_timestamp(prev_ts=None):
     """Internal helper to return a unique TimeStamp instance.
 
@@ -208,6 +210,8 @@
         self._connection = None
         # _server_addr is used by sortKey()
         self._server_addr = None
+        self._tfile = None
+        self._pickler = None
 
         self._info = {'length': 0, 'size': 0, 'name': 'ZEO Client',
                       'supportsUndo':0, 'supportsVersions': 0,
@@ -337,12 +341,14 @@
         This is called by ConnectionManager after it has decided which
         connection should be used.
         """
+        # XXX would like to report whether we get a read-only connection
         if self._connection is not None:
-            log2(INFO, "Reconnected to storage")
+            reconnect = 1
         else:
-            log2(INFO, "Connected to storage")
+            reconnect = 0
         self.set_server_addr(conn.get_addr())
         stub = self.StorageServerStubClass(conn)
+        stub = self.StorageServerStubClass(conn)
         self._oids = []
         self._info.update(stub.get_info())
         self.verify_cache(stub)
@@ -353,6 +359,11 @@
         self._connection = conn
         self._server = stub
 
+        if reconnect:
+            log2(INFO, "Reconnected to storage: %s" % self._server_addr)
+        else:
+            log2(INFO, "Connected to storage: %s" % self._server_addr)
+
     def set_server_addr(self, addr):
         # Normalize server address and convert to string
         if isinstance(addr, types.StringType):
@@ -381,12 +392,42 @@
             return self._server_addr
 
     def verify_cache(self, server):
-        """Internal routine called to verify the cache."""
-        # XXX beginZeoVerify ends up calling back to beginVerify() below.
-        # That whole exchange is rather unnecessary.
-        server.beginZeoVerify()
+        """Internal routine called to verify the cache.
+
+        The return value (indicating which path we took) is used by
+        the test suite.
+        """
+        last_inval_tid = self._cache.getLastTid()
+        if last_inval_tid is not None:
+            ltid = server.lastTransaction()
+            if ltid == last_inval_tid:
+                log2(INFO, "No verification necessary "
+                     "(last_inval_tid up-to-date)")
+                self._cache.open()
+                return "no verification"
+
+            # log some hints about last transaction
+            log2(INFO, "last inval tid: %r %s"
+                 % (last_inval_tid, tid2time(last_inval_tid)))
+            log2(INFO, "last transaction: %r %s" %
+                 (ltid, ltid and tid2time(ltid)))
+
+            pair = server.getInvalidations(last_inval_tid)
+            if pair is not None:
+                log2(INFO, "Recovering %d invalidations" % len(pair[1]))
+                self._cache.open()
+                self.invalidateTransaction(*pair)
+                return "quick verification"
+            
+        log2(INFO, "Verifying cache")
+        # set up a tempfile to hold zeoVerify results
+        self._tfile = tempfile.TemporaryFile(suffix=".inv")
+        self._pickler = cPickle.Pickler(self._tfile, 1)
+        self._pickler.fast = 1 # Don't use the memo
+
         self._cache.verify(server.zeoVerify)
         server.endZeoVerify()
+        return "full verification"
 
     ### Is there a race condition between notifyConnected and
     ### notifyDisconnected? In Particular, what if we get
@@ -402,7 +443,8 @@
         This is called by ConnectionManager when the connection is
         closed or when certain problems with the connection occur.
         """
-        log2(PROBLEM, "Disconnected from storage")
+        log2(PROBLEM, "Disconnected from storage: %s"
+             % repr(self._server_addr))
         self._connection = None
         self._server = disconnected_stub
 
@@ -644,6 +686,7 @@
         self._serial = id
         self._seriald.clear()
         del self._serials[:]
+        self._tbuf.clear()
 
     def end_transaction(self):
         """Internal helper to end a transaction."""
@@ -678,12 +721,13 @@
             if f is not None:
                 f()
 
-            self._server.tpc_finish(self._serial)
+            tid = self._server.tpc_finish(self._serial)
 
             r = self._check_serials()
             assert r is None or len(r) == 0, "unhandled serialnos: %s" % r
 
             self._update_cache()
+            self._cache.setLastTid(tid)
         finally:
             self.end_transaction()
 
@@ -779,12 +823,6 @@
         """Server callback to update the info dictionary."""
         self._info.update(dict)
 
-    def beginVerify(self):
-        """Server callback to signal start of cache validation."""
-        self._tfile = tempfile.TemporaryFile(suffix=".inv")
-        self._pickler = cPickle.Pickler(self._tfile, 1)
-        self._pickler.fast = 1 # Don't use the memo
-
     def invalidateVerify(self, args):
         """Server callback to invalidate an (oid, version) pair.
 
@@ -802,6 +840,7 @@
         if self._pickler is None:
             return
         self._pickler.dump((0,0))
+        self._pickler = None
         self._tfile.seek(0)
         unpick = cPickle.Unpickler(self._tfile)
         f = self._tfile
@@ -815,29 +854,26 @@
             self._db.invalidate(oid, version=version)
         f.close()
 
-    def invalidateTrans(self, args):
-        """Server callback to invalidate a list of (oid, version) pairs.
-
-        This is called as the result of a transaction.
-        """
+    def invalidateTransaction(self, tid, args):
+        """Invalidate objects modified by tid."""
+        self._cache.setLastTid(tid)
+        if self._pickler is not None:
+            self.log("Transactional invalidation during cache verification",
+                     level=zLOG.BLATHER)
+            for t in args:
+                self._pickler.dump(t)
+            return
+        db = self._db
         for oid, version in args:
             self._cache.invalidate(oid, version=version)
-            try:
-                self._db.invalidate(oid, version=version)
-            except AttributeError, msg:
-                log2(PROBLEM,
-                    "Invalidate(%s, %s) failed for _db: %s" % (repr(oid),
-                                                               repr(version),
-                                                               msg))
-
-    # Unfortunately, the ZEO 2 wire protocol uses different names for
-    # several of the callback methods invoked by the StorageServer.
-    # We can't change the wire protocol at this point because that
-    # would require synchronized updates of clients and servers and we
-    # don't want that.  So here we alias the old names to their new
-    # implementations.
+            if db is not None:
+                db.invalidate(oid, version=version)
+
+    # The following are for compatibility with protocol version 2.0.0
+
+    def invalidateTrans(self, args):
+        return self.invalidateTransaction(None, args)
 
-    begin = beginVerify
     invalidate = invalidateVerify
     end = endVerify
     Invalidate = invalidateTrans
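
The rewritten verify_cache() above chooses one of three startup paths.  A
rough sketch of that decision, with cache and server standing in for the
ClientCache and ServerStub objects (the helper itself is illustrative and
not part of this checkin):

    def choose_verification(cache, server):
        last_inval_tid = cache.getLastTid()
        if last_inval_tid is None:
            # Nothing recorded, e.g. a fresh or non-persistent cache.
            return "full verification"
        if server.lastTransaction() == last_inval_tid:
            # The cache saw every transaction the server committed.
            return "no verification"
        pair = server.getInvalidations(last_inval_tid)
        if pair is not None:
            # The server can still replay the missed invalidations.
            return "quick verification"
        return "full verification"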


=== ZODB3/ZEO/ClientStub.py 1.10 => 1.11 ===
--- ZODB3/ZEO/ClientStub.py:1.10	Tue Oct  1 14:38:22 2002
+++ ZODB3/ZEO/ClientStub.py	Fri Jan  3 17:07:38 2003
@@ -44,16 +44,16 @@
         self.rpc = rpc
 
     def beginVerify(self):
-        self.rpc.callAsync('begin')
+        self.rpc.callAsync('beginVerify')
 
     def invalidateVerify(self, args):
-        self.rpc.callAsync('invalidate', args)
+        self.rpc.callAsync('invalidateVerify', args)
 
     def endVerify(self):
-        self.rpc.callAsync('end')
+        self.rpc.callAsync('endVerify')
 
-    def invalidateTrans(self, args):
-        self.rpc.callAsync('Invalidate', args)
+    def invalidateTransaction(self, tid, args):
+        self.rpc.callAsync('invalidateTransaction', tid, args)
 
     def serialnos(self, arg):
         self.rpc.callAsync('serialnos', arg)
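
The ClientStub changes above rename the wire-level callbacks so that the
message name matches the client-side method name, and pass the tid along
with invalidateTransaction().  As a rough illustration of that name mapping
(FakeConnection is hypothetical, not the zrpc implementation):

    class FakeConnection:
        def __init__(self, client):
            self.client = client

        def callAsync(self, name, *args):
            # Dispatch by name: 'invalidateTransaction' on the wire now
            # calls ClientStorage.invalidateTransaction(tid, args).
            getattr(self.client, name)(*args)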


=== ZODB3/ZEO/ServerStub.py 1.10 => 1.11 ===
--- ZODB3/ZEO/ServerStub.py:1.10	Wed Nov 13 06:24:36 2002
+++ ZODB3/ZEO/ServerStub.py	Fri Jan  3 17:07:38 2003
@@ -32,6 +32,9 @@
         zrpc.connection.Connection class.
         """
         self.rpc = rpc
+        if self.rpc.peer_protocol_version == 'Z200':
+            self.lastTransaction = lambda: None
+            self.getInvalidations = lambda tid: None
 
     def extensionMethod(self, name):
         return ExtensionMethodWrapper(self.rpc, name).call
@@ -51,8 +54,13 @@
     def get_info(self):
         return self.rpc.call('get_info')
 
-    def beginZeoVerify(self):
-        self.rpc.callAsync('beginZeoVerify')
+    def lastTransaction(self):
+        # Not in protocol version 2.0.0; see __init__()
+        return self.rpc.call('lastTransaction')
+
+    def getInvalidations(self, tid):
+        # Not in protocol version 2.0.0; see __init__()
+        return self.rpc.call('getInvalidations', tid)
 
     def zeoVerify(self, oid, s, sv):
         self.rpc.callAsync('zeoVerify', oid, s, sv)
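
The stub constructor above downgrades itself when talking to a 2.0.0
('Z200') server: assigning the lambdas as instance attributes shadows the
class-level lastTransaction()/getInvalidations() methods, so callers simply
get None back and fall back to full verification.  A minimal sketch of the
same pattern (class name and rpc object are stand-ins):

    class StorageServerSketch:
        def __init__(self, rpc):
            self.rpc = rpc
            if rpc.peer_protocol_version == 'Z200':
                # Instance attributes take precedence over class methods.
                self.lastTransaction = lambda: None
                self.getInvalidations = lambda tid: None

        def lastTransaction(self):
            return self.rpc.call('lastTransaction')

        def getInvalidations(self, tid):
            return self.rpc.call('getInvalidations', tid)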


=== ZODB3/ZEO/StorageServer.py 1.81 => 1.82 ===
--- ZODB3/ZEO/StorageServer.py:1.81	Tue Nov 26 17:37:12 2002
+++ ZODB3/ZEO/StorageServer.py	Fri Jan  3 17:07:38 2003
@@ -37,6 +37,7 @@
 from ZODB.POSException import TransactionError, ReadOnlyError
 from ZODB.referencesf import referencesf
 from ZODB.Transaction import Transaction
+from ZODB.utils import u64
 
 _label = "ZSS" # Default label used for logging.
 
@@ -68,8 +69,8 @@
     ZEOStorageClass = None # patched up later
     ManagedServerConnectionClass = ManagedServerConnection
 
-    def __init__(self, addr, storages, read_only=0):
-
+    def __init__(self, addr, storages, read_only=0,
+                 invalidation_queue_size=100):
         """StorageServer constructor.
 
         This is typically invoked from the start.py script.
@@ -102,13 +103,17 @@
         self.storages = storages
         set_label()
         msg = ", ".join(
-            ["%s:%s" % (name, storage.isReadOnly() and "RO" or "RW")
+            ["%s:%s:%s" % (name, storage.isReadOnly() and "RO" or "RW",
+                           storage.getName())
              for name, storage in storages.items()])
         log("%s created %s with storages: %s" %
             (self.__class__.__name__, read_only and "RO" or "RW", msg))
         for s in storages.values():
             s._waiting = []
         self.read_only = read_only
+        # A list of at most invalidation_queue_size invalidations
+        self.invq = []
+        self.invq_bound = invalidation_queue_size
         self.connections = {}
         self.dispatcher = self.DispatcherClass(addr,
                                                factory=self.new_connection,
@@ -141,7 +146,7 @@
             l = self.connections[storage_id] = []
         l.append(conn)
 
-    def invalidate(self, conn, storage_id, invalidated=(), info=None):
+    def invalidate(self, conn, storage_id, tid, invalidated=(), info=None):
         """Internal: broadcast info and invalidations to clients.
 
         This is called from several ZEOStorage methods.
@@ -149,7 +154,7 @@
         This can do three different things:
 
         - If the invalidated argument is non-empty, it broadcasts
-          invalidateTrans() messages to all clients of the given
+          invalidateTransaction() messages to all clients of the given
           storage except the current client (the conn argument).
 
         - If the invalidated argument is empty and the info argument
@@ -158,17 +163,47 @@
           client.
 
         - If both the invalidated argument and the info argument are
-          non-empty, it broadcasts invalidateTrans() messages to all
+          non-empty, it broadcasts invalidateTransaction() messages to all
           clients except the current, and sends an info() message to
           the current client.
 
         """
+        if invalidated:
+            if len(self.invq) >= self.invq_bound:
+                del self.invq[0]
+            self.invq.append((tid, invalidated))
         for p in self.connections.get(storage_id, ()):
             if invalidated and p is not conn:
-                p.client.invalidateTrans(invalidated)
+                p.client.invalidateTransaction(tid, invalidated)
             elif info is not None:
                 p.client.info(info)
 
+    def get_invalidations(self, tid):
+        """Return a tid and list of all objects invalidation since tid.
+
+        The tid is the most recent transaction id committed by the server.
+
+        Returns None if it is unable to provide a complete list
+        of invalidations for tid.  In this case, client should
+        do full cache verification.
+        """
+
+        if not self.invq:
+            log("invq empty")
+            return None, []
+        
+        earliest_tid = self.invq[0][0]
+        if earliest_tid > tid:
+            log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
+            return None, []
+        
+        oids = {}
+        for tid, L in self.invq:
+            for key in L:
+                oids[key] = 1
+        latest_tid = self.invq[-1][0]
+        return latest_tid, oids.keys()
+
     def close_server(self):
         """Close the dispatcher so that there are no new connections.
 
@@ -212,10 +247,18 @@
         self.storage_id = "uninitialized"
         self.transaction = None
         self.read_only = read_only
+        self.log_label = _label
 
     def notifyConnected(self, conn):
         self.connection = conn # For restart_other() below
         self.client = self.ClientStorageStubClass(conn)
+        addr = conn.addr
+        if isinstance(addr, type("")):
+            label = addr
+        else:
+            host, port = addr
+            label = str(host) + ":" + str(port)
+        self.log_label = _label + "/" + label
 
     def notifyDisconnected(self):
         # When this storage closes, we must ensure that it aborts
@@ -237,7 +280,7 @@
         return "<%s %X trans=%s s_trans=%s>" % (name, id(self), tid, stid)
 
     def log(self, msg, level=zLOG.INFO, error=None):
-        zLOG.LOG("%s:%s" % (_label, self.storage_id), level, msg, error=error)
+        zLOG.LOG(self.log_label, level, msg, error=error)
 
     def setup_delegation(self):
         """Delegate several methods to the storage"""
@@ -259,6 +302,7 @@
             for name in fn().keys():
                 if not hasattr(self,name):
                     setattr(self, name, getattr(self.storage, name))
+        self.lastTransaction = self.storage.lastTransaction
 
     def check_tid(self, tid, exc=None):
         if self.read_only:
@@ -286,7 +330,7 @@
         This method must be the first one called by the client.
         """
         if self.storage is not None:
-            log("duplicate register() call")
+            self.log("duplicate register() call")
             raise ValueError, "duplicate register() call"
         storage = self.server.storages.get(storage_id)
         if storage is None:
@@ -342,8 +386,13 @@
                 raise
         return p, s, v, pv, sv
 
-    def beginZeoVerify(self):
-        self.client.beginVerify()
+    def getInvalidations(self, tid):
+        invtid, invlist = self.server.get_invalidations(tid)
+        if invtid is None:
+            return None
+        self.log("Return %d invalidations up to tid %s"
+                 % (len(invlist), u64(invtid)))
+        return invtid, invlist
 
     def zeoVerify(self, oid, s, sv):
         try:
@@ -394,7 +443,8 @@
         self.storage.pack(time, referencesf)
         self.log("pack(time=%s) complete" % repr(time))
         # Broadcast new size statistics
-        self.server.invalidate(0, self.storage_id, (), self.get_size_info())
+        self.server.invalidate(0, self.storage_id, None,
+                               (), self.get_size_info())
 
     def new_oids(self, n=100):
         """Return a sequence of n new oids, where n defaults to 100"""
@@ -409,7 +459,7 @@
             raise ReadOnlyError()
         oids = self.storage.undo(transaction_id)
         if oids:
-            self.server.invalidate(self, self.storage_id,
+            self.server.invalidate(self, self.storage_id, None,
                                    map(lambda oid: (oid, ''), oids))
             return oids
         return ()
@@ -450,12 +500,15 @@
         if not self.check_tid(id):
             return
         invalidated = self.strategy.tpc_finish()
+        tid = self.storage.lastTransaction()
         if invalidated:
-            self.server.invalidate(self, self.storage_id,
+            self.server.invalidate(self, self.storage_id, tid,
                                    invalidated, self.get_size_info())
         self.transaction = None
         self.strategy = None
+        # Return the tid, for cache invalidation optimization
         self.handle_waiting()
+        return tid
 
     def tpc_abort(self, id):
         if not self.check_tid(id):
@@ -546,7 +599,8 @@
         old_strategy = self.strategy
         assert isinstance(old_strategy, DelayedCommitStrategy)
         self.strategy = ImmediateCommitStrategy(self.storage,
-                                                self.client)
+                                                self.client,
+                                                self.log)
         resp = old_strategy.restart(self.strategy)
         if delay is not None:
             delay.reply(resp)
@@ -602,11 +656,12 @@
 class ImmediateCommitStrategy:
     """The storage is available so do a normal commit."""
 
-    def __init__(self, storage, client):
+    def __init__(self, storage, client, logmethod):
         self.storage = storage
         self.client = client
         self.invalidated = []
         self.serials = []
+        self.log = logmethod
 
     def tpc_begin(self, txn, tid, status):
         self.txn = txn
@@ -628,12 +683,14 @@
         try:
             newserial = self.storage.store(oid, serial, data, version,
                                            self.txn)
+        except (SystemExit, KeyboardInterrupt):
+            raise
         except Exception, err:
             if not isinstance(err, TransactionError):
                 # Unexpected errors are logged and passed to the client
                 exc_info = sys.exc_info()
-                log("store error: %s, %s" % exc_info[:2],
-                    zLOG.ERROR, error=exc_info)
+                self.log("store error: %s, %s" % exc_info[:2],
+                         zLOG.ERROR, error=exc_info)
                 del exc_info
             # Try to pickle the exception.  If it can't be pickled,
             # the RPC response would fail, so use something else.
@@ -643,7 +700,7 @@
                 pickler.dump(err, 1)
             except:
                 msg = "Couldn't pickle storage exception: %s" % repr(err)
-                log(msg, zLOG.ERROR)
+                self.log(msg, zLOG.ERROR)
                 err = StorageServerError(msg)
             # The exception is reported back as newserial for this oid
             newserial = err
@@ -776,6 +833,8 @@
     def run(self):
         try:
             result = self._method(*self._args)
+        except (SystemExit, KeyboardInterrupt):
+            raise
         except Exception:
             self.delay.error(sys.exc_info())
         else:
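
The new invalidation queue in StorageServer keeps only the most recent
invalidation_queue_size transactions, and get_invalidations() collapses
everything still in the queue into a single oid list.  A self-contained
sketch of that bookkeeping (class and method names are illustrative only):

    class InvalidationQueue:
        def __init__(self, bound=100):
            self.invq = []
            self.bound = bound

        def add(self, tid, invalidated):
            if len(self.invq) >= self.bound:
                del self.invq[0]          # drop the oldest entry
            self.invq.append((tid, invalidated))

        def since(self, tid):
            # Mirrors StorageServer.get_invalidations(): (None, []) means
            # the queue cannot answer and the client must fully verify.
            if not self.invq or self.invq[0][0] > tid:
                return None, []
            oids = {}
            for qtid, invalidated in self.invq:
                for key in invalidated:
                    oids[key] = 1         # de-duplicate (oid, version) pairs
            return self.invq[-1][0], oids.keys()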


=== ZODB3/ZEO/simul.py 1.16 => 1.17 ===
--- ZODB3/ZEO/simul.py:1.16	Fri Nov 22 11:42:29 2002
+++ ZODB3/ZEO/simul.py	Fri Jan  3 17:07:38 2003
@@ -117,18 +117,12 @@
             # Must be a misaligned record caused by a crash
             ##print "Skipping 8 bytes at offset", offset-8
             continue
-        oid = f_read(8)
-        if len(oid) < 8:
+        r = f_read(16)
+        if len(r) < 16:
             break
-        if heuristic and oid[:4] != '\0\0\0\0':
-            f.seek(-8, 1)
-            continue
-        offset += 8
-        serial = f_read(8)
-        if len(serial) < 8:
-            break
-        offset += 8
+        offset += 16
         records += 1
+        oid, serial = struct_unpack(">8s8s", r)
         # Decode the code
         dlen, version, code, current = (code & 0x7fffff00,
                                         code & 0x80,
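
The simul.py change above (and the matching stats.py change below) reads
the oid and serial of each trace record in one 16-byte chunk instead of two
8-byte reads.  A rough sketch of the new parsing step (read_oid_serial is a
made-up helper name):

    import struct

    def read_oid_serial(f):
        r = f.read(16)
        if len(r) < 16:
            return None                   # truncated record, stop here
        oid, serial = struct.unpack(">8s8s", r)
        return oid, serial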


=== ZODB3/ZEO/stats.py 1.19 => 1.20 ===
--- ZODB3/ZEO/stats.py:1.19	Fri Nov 22 11:42:29 2002
+++ ZODB3/ZEO/stats.py	Fri Jan  3 17:07:38 2003
@@ -153,24 +153,14 @@
             if ts == 0:
                 # Must be a misaligned record caused by a crash
                 if not quiet:
-                    print "Skipping 8 bytes at offset", offset-8,
-                    print repr(r)
+                    print "Skipping 8 bytes at offset", offset-8
                 continue
-            oid = f_read(8)
-            if len(oid) < 8:
+            r = f_read(16)
+            if len(r) < 16:
                 break
-            if heuristic and oid[:4] != '\0\0\0\0':
-                # Heuristic for severe data corruption
-                print "Seeking back over bad oid at offset", offset,
-                print repr(r)
-                f.seek(-8, 1)
-                continue
-            offset += 8
-            serial = f_read(8)
-            if len(serial) < 8:
-                break
-            offset += 8
+            offset += 16
             records += 1
+            oid, serial = struct_unpack(">8s8s", r)
             if t0 is None:
                 t0 = ts
                 thisinterval = t0 / interval