[Zope-Checkins] CVS: ZODB3/ZODB - FileStorage.py:1.105.2.20 fspack.py:1.5.2.7

Tim Peters tim.one at comcast.net
Fri Aug 22 17:56:39 EDT 2003


Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv1516/ZODB

Modified Files:
      Tag: ZODB3-3_1-branch
	FileStorage.py fspack.py 
Log Message:
Added an oid->serialno in-memory cache to FileStorage.  The intent is to
save expensive seeks in several common operations (e.g., getSerial()/
lastSerial() obviously, and .store() needs to fetch serialnos too).  With
some luck this will also speed cache verification on reconnects, and
when multiple clients connect via ZEO.

getSerial/lastSerial:  it's unclear why these both exist.  The former
raised KeyError if oid wasn't known, while the latter returned None,
and that appears to be the only plausibly intended difference.  The
latter neglected to acquire the lock before seeking, so that was a bug,
but the latter did better error-checking.  So massively rearranged
these, moving everything good above the latter into the former.

Jim noted that I'm more aggressive than necessary about deleting entries
from the new cache in the presence of versions.  I'll note that deleting
a cache entry is always safe, and that there are few good version tests
in 3.1.  Happy to optimize more later, but there's a lot of time pressure
now, and I don't really understand the version implementation, so "better
safe than sorry" rules.


=== ZODB3/ZODB/FileStorage.py 1.105.2.19 => 1.105.2.20 ===
--- ZODB3/ZODB/FileStorage.py:1.105.2.19	Thu Aug 21 12:26:59 2003
+++ ZODB3/ZODB/FileStorage.py	Fri Aug 22 16:56:08 2003
@@ -158,17 +158,21 @@
 assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
 assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
 
+def blather(message, *data):
+    LOG('ZODB FS', BLATHER, "%s blather: %s\n" % (packed_version,
+                                                  message % data))
+
 def warn(message, *data):
     LOG('ZODB FS', WARNING, "%s  warn: %s\n" % (packed_version,
-                                                (message % data)))
+                                                message % data))
 
 def error(message, *data):
     LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version,
-                                              (message % data)))
+                                              message % data))
 
 def nearPanic(message, *data):
     LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version,
-                                              (message % data)))
+                                              message % data))
 
 def panic(message, *data):
     message = message % data
@@ -238,8 +242,10 @@
 
         BaseStorage.BaseStorage.__init__(self, file_name)
 
-        index, vindex, tindex, tvindex = self._newIndexes()
-        self._initIndex(index, vindex, tindex, tvindex)
+        (index, vindex, tindex, tvindex,
+         oid2serial, toid2serial, toid2serial_delete) = self._newIndexes()
+        self._initIndex(index, vindex, tindex, tvindex,
+                        oid2serial, toid2serial, toid2serial_delete)
 
         # Now open the file
 
@@ -274,7 +280,8 @@
 
             index, vindex, start, maxoid, ltid = r
 
-            self._initIndex(index, vindex, tindex, tvindex)
+            self._initIndex(index, vindex, tindex, tvindex,
+                            oid2serial, toid2serial, toid2serial_delete)
             self._pos, self._oid, tid = read_index(
                 self._file, file_name, index, vindex, tindex, stop,
                 ltid=ltid, start=start, maxoid=maxoid,
@@ -304,7 +311,11 @@
 
         self._quota = quota
 
-    def _initIndex(self, index, vindex, tindex, tvindex):
+        # Serialno cache statistics.
+        self._oid2serial_nlookups = self._oid2serial_nhits = 0
+
+    def _initIndex(self, index, vindex, tindex, tvindex,
+                   oid2serial, toid2serial, toid2serial_delete):
         self._index=index
         self._vindex=vindex
         self._tindex=tindex
@@ -312,12 +323,33 @@
         self._index_get=index.get
         self._vindex_get=vindex.get
 
+        # .store() needs to compare the passed-in serial to the current
+        # serial in the database.  _oid2serial caches the oid -> current
+        # serial mapping for non-version data (if the current record for
+        # oid is version data, the oid is not a key in _oid2serial).
+        # The point is that otherwise seeking into the storage is needed
+        # to extract the current serial, and that's an expensive operation.
+        # For example, if a transaction stores 4000 objects, and each
+        # random seek + read takes 7ms (that was approximately true on
+        # Linux and Windows tests in mid-2003), that's 28 seconds just to
+        # find the old serials.
+        # XXX Probably better to junk this and redefine _index as mapping
+        # XXX oid to (offset, serialno) pair, via a new memory-efficient
+        # XXX BTree type.
+        self._oid2serial = oid2serial
+        # oid->serialno map to transactionally add to _oid2serial.
+        self._toid2serial = toid2serial
+        # Set of oids to transactionally delete from _oid2serial (e.g.,
+        # oids reverted by undo, or for which the most recent record
+        # becomes version data).
+        self._toid2serial_delete = toid2serial_delete
+
     def __len__(self):
         return len(self._index)
 
     def _newIndexes(self):
         # hook to use something other than builtin dict
-        return fsIndex(), {}, {}, {}
+        return fsIndex(), {}, {}, {}, {}, {}, {}
 
     _saved = 0
     def _save_index(self):
@@ -486,6 +518,31 @@
             # XXX should log the error, though
             pass # We don't care if this fails.
 
+    # Return serial number of most recent record for oid if that's in
+    # the _oid2serial cache.  Else return None.  It's important to use
+    # this instead of indexing _oid2serial directly so that cache
+    # statistics can be logged.
+    def _get_cached_serial(self, oid):
+        self._oid2serial_nlookups += 1
+        result = self._oid2serial.get(oid)
+        if result is not None:
+            self._oid2serial_nhits += 1
+
+        # Log a msg every ~8000 tries, and prevent overflow.
+        if self._oid2serial_nlookups & 0x1fff == 0:
+            if self._oid2serial_nlookups >> 30:
+                # In older Pythons, we may overflow if we keep it an int.
+                self._oid2serial_nlookups = long(self._oid2serial_nlookups)
+                self._oid2serial_nhits = long(self._oid2serial_nhits)
+            blather("_oid2serial size %s lookups %s hits %s rate %.1f%%",
+                    len(self._oid2serial),
+                    self._oid2serial_nlookups,
+                    self._oid2serial_nhits,
+                    100.0 * self._oid2serial_nhits /
+                            self._oid2serial_nlookups)
+
+        return result
+
     def abortVersion(self, src, transaction):
         return self.commitVersion(src, '', transaction, abort=1)
 
@@ -587,9 +644,11 @@
 
             spos = h[-8:]
             srcpos = U64(spos)
+        self._toid2serial_delete.update(current_oids)
         return oids
 
-    def getSize(self): return self._pos
+    def getSize(self):
+        return self._pos
 
     def _loada(self, oid, _index, file):
         "Read any version and return the version"
@@ -609,7 +668,8 @@
             version=''
             nv=0
 
-        if plen != z64: return read(U64(plen)), version, nv
+        if plen != z64:
+            return read(U64(plen)), version, nv
         return _loadBack(file, oid, read(8))[0], version, nv
 
     def _load(self, oid, version, _index, file):
@@ -629,6 +689,10 @@
                 (read(8) # skip past version link
                  and version != read(vlen))):
                 return _loadBack(file, oid, pnv)
+        else:
+            # The most recent record is for non-version data -- cache
+            # the serialno.
+            self._oid2serial[oid] = serial
 
         # If we get here, then either this was not a version record,
         # or we've already read past the version data!
@@ -707,19 +771,25 @@
         self._lock_acquire()
         try:
             old=self._index_get(oid, 0)
+            cached_serial = None
             pnv=None
             if old:
-                self._file.seek(old)
-                h=self._file.read(DATA_HDR_LEN)
-                doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
-                if doid != oid: raise CorruptedDataError, h
-                if vlen:
-                    pnv=self._file.read(8) # non-version data pointer
-                    self._file.read(8) # skip past version link
-                    locked_version=self._file.read(vlen)
-                    if version != locked_version:
-                        raise POSException.VersionLockError, (
-                            `oid`, locked_version)
+                cached_serial = self._get_cached_serial(oid)
+                if cached_serial is None:
+                    self._file.seek(old)
+                    h=self._file.read(DATA_HDR_LEN)
+                    doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
+                    if doid != oid:
+                        raise CorruptedDataError, h
+                    if vlen:
+                        pnv=self._file.read(8) # non-version data pointer
+                        self._file.read(8) # skip past version link
+                        locked_version=self._file.read(vlen)
+                        if version != locked_version:
+                            raise POSException.VersionLockError, (
+                                `oid`, locked_version)
+                else:
+                    oserial = cached_serial
 
                 if serial != oserial:
                     data=self.tryToResolveConflict(oid, oserial, serial, data)
@@ -741,14 +811,19 @@
                        )
                   )
             if version:
-                if pnv: write(pnv)
-                else:   write(p64(old))
+                if pnv:
+                    write(pnv)
+                else:
+                    write(p64(old))
                 # Link to last record for this version:
                 tvindex=self._tvindex
                 pv=tvindex.get(version, 0) or self._vindex_get(version, 0)
                 write(p64(pv))
                 tvindex[version]=here
                 write(version)
+                self._toid2serial_delete[oid] = 1
+            else:
+                self._toid2serial[oid] = newserial
 
             write(data)
 
@@ -861,7 +936,11 @@
                 self._tfile.write(p64(pv))
                 self._tvindex[version] = here
                 self._tfile.write(version)
-            # And finally, write the data or a backpointer
+                self._toid2serial_delete[oid] = 1
+            else:
+                self._toid2serial[oid] = serial
+
+            # Finally, write the data or a backpointer.
             if data is None:
                 if prev_pos:
                     self._tfile.write(p64(prev_pos))
@@ -926,6 +1005,8 @@
     def _clear_temp(self):
         self._tindex.clear()
         self._tvindex.clear()
+        self._toid2serial.clear()
+        self._toid2serial_delete.clear()
         if self._tfile is not None:
             self._tfile.seek(0)
 
@@ -1009,6 +1090,12 @@
 
             self._index.update(self._tindex)
             self._vindex.update(self._tvindex)
+            self._oid2serial.update(self._toid2serial)
+            for oid in self._toid2serial_delete.keys():
+                try:
+                    del self._oid2serial[oid]
+                except KeyError:
+                    pass
 
             # Update the number of records that we've written
             # +1 for the transaction record
@@ -1073,8 +1160,13 @@
             file.write('u')
             file.flush()
             self._index.update(t)
-            return t.keys()
-        finally: self._lock_release()
+            keys = t.keys()
+            for oid in keys:
+                if self._oid2serial.has_key(oid):
+                    del self._oid2serial[oid]
+            return keys
+        finally:
+            self._lock_release()
 
     def supportsTransactionalUndo(self):
         return 1
@@ -1125,18 +1217,26 @@
     def getSerial(self, oid):
         self._lock_acquire()
         try:
-            return self._getSerial(oid, self._index[oid])
+            result = self._get_cached_serial(oid)
+            if result is None:
+                result = self._getSerial(oid, self._index[oid])
+            return result
         finally:
             self._lock_release()
 
     def _getSerial(self, oid, pos):
         self._file.seek(pos)
         h = self._file.read(16)
-        assert oid == h[:8]
+        if len(h) < 16:
+            raise CorruptedDataError, h
+        if h[:8] != oid:
+            h = h + self._file.read(26) # get rest of header
+            raise CorruptedDataError, h
         return h[8:]
 
+
     def _transactionalUndoRecord(self, oid, pos, serial, pre, version):
-        """Get the indo information for a data record
+        """Get the undo information for a data record
 
         Return a 5-tuple consisting of a pickle, data pointer,
         version, packed non-version data pointer, and current
@@ -1265,6 +1365,10 @@
         tpos = self._txn_find(tid, 1)
         tindex = self._txn_undo_write(tpos, tid)
         self._tindex.update(tindex)
+        # Arrange to clear the affected oids from the oid2serial cache.
+        # It's too painful to try to update them to correct current
+        # values instead.
+        self._toid2serial_delete.update(tindex)
         return tindex.keys()
 
     def _txn_find(self, tid, stop_at_pack):
@@ -1518,7 +1622,9 @@
                 # OK, we're beyond the point of no return
                 os.rename(self._file_name + '.pack', self._file_name)
                 self._file = open(self._file_name, 'r+b')
-                self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+                self._initIndex(p.index, p.vindex, p.tindex, p.tvindex,
+                                p.oid2serial, p.toid2serial,
+                                p.toid2serial_delete)
                 self._pos = opos
                 self._save_index()
             finally:
@@ -1544,18 +1650,9 @@
         if it is a new object -- return None.
         """
         try:
-            pos = self._index[oid]
+            return self.getSerial(oid)
         except KeyError:
             return None
-        self._file.seek(pos)
-        # first 8 bytes are oid, second 8 bytes are serialno
-        h = self._file.read(16)
-        if len(h) < 16:
-            raise CorruptedDataError, h
-        if h[:8] != oid:
-            h = h + self._file.read(26) # get rest of header
-            raise CorruptedDataError, h
-        return h[8:]
 
     def cleanup(self):
         """Remove all files created by this storage."""


=== ZODB3/ZODB/fspack.py 1.5.2.6 => 1.5.2.7 ===
--- ZODB3/ZODB/fspack.py:1.5.2.6	Thu Jul  3 11:34:51 2003
+++ ZODB3/ZODB/fspack.py	Fri Aug 22 16:56:08 2003
@@ -653,11 +653,15 @@
         # vindex: version -> pos of XXX
         # tindex: oid -> pos, for current txn
         # tvindex: version -> pos of XXX, for current txn
+        # oid2serial: not used by the packer
         
         self.index = fsIndex()
         self.vindex = {}
         self.tindex = {}
         self.tvindex = {}
+        self.oid2serial = {}
+        self.toid2serial = {}
+        self.toid2serial_delete = {}
 
         # Index for non-version data.  This is a temporary structure
         # to reduce I/O during packing




More information about the Zope-Checkins mailing list