[Zodb-checkins] CVS: ZODB3/ZODB - fspack.py:1.8.6.4
FileStorage.py:1.135.4.2
Jeremy Hylton
jeremy at zope.com
Tue Sep 9 00:09:20 EDT 2003
Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv12358/ZODB
Modified Files:
Tag: ZODB3-3_2-branch
fspack.py FileStorage.py
Log Message:
Port the oid cache from ZODB 3.1.
=== ZODB3/ZODB/fspack.py 1.8.6.3 => 1.8.6.4 ===
--- ZODB3/ZODB/fspack.py:1.8.6.3 Mon Sep 8 22:40:35 2003
+++ ZODB3/ZODB/fspack.py Mon Sep 8 23:09:18 2003
@@ -647,11 +647,15 @@
# vindex: version -> pos of XXX
# tindex: oid -> pos, for current txn
# tvindex: version -> pos of XXX, for current txn
+ # oid2serial: not used by the packer
self.index = fsIndex()
self.vindex = {}
self.tindex = {}
self.tvindex = {}
+ self.oid2serial = {}
+ self.toid2serial = {}
+ self.toid2serial_delete = {}
# Index for non-version data. This is a temporary structure
# to reduce I/O during packing
=== ZODB3/ZODB/FileStorage.py 1.135.4.1 => 1.135.4.2 ===
--- ZODB3/ZODB/FileStorage.py:1.135.4.1 Wed Sep 3 16:06:02 2003
+++ ZODB3/ZODB/FileStorage.py Mon Sep 8 23:09:18 2003
@@ -157,17 +157,21 @@
assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
+def blather(message, *data):
+ LOG('ZODB FS', BLATHER, "%s blather: %s\n" % (packed_version,
+ message % data))
+
def warn(message, *data):
LOG('ZODB FS', WARNING, "%s warn: %s\n" % (packed_version,
- (message % data)))
+ message % data))
def error(message, *data):
LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version,
- (message % data)))
+ message % data))
def nearPanic(message, *data):
LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version,
- (message % data)))
+ message % data))
def panic(message, *data):
message = message % data
@@ -234,8 +238,10 @@
BaseStorage.BaseStorage.__init__(self, file_name)
- index, vindex, tindex, tvindex = self._newIndexes()
- self._initIndex(index, vindex, tindex, tvindex)
+ (index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete) = self._newIndexes()
+ self._initIndex(index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete)
# Now open the file
@@ -269,7 +275,8 @@
self._used_index = 1 # Marker for testing
index, vindex, start, maxoid, ltid = r
- self._initIndex(index, vindex, tindex, tvindex)
+ self._initIndex(index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete)
self._pos, self._oid, tid = read_index(
self._file, file_name, index, vindex, tindex, stop,
ltid=ltid, start=start, maxoid=maxoid,
@@ -302,7 +309,11 @@
self._quota = quota
- def _initIndex(self, index, vindex, tindex, tvindex):
+ # Serialno cache statistics.
+ self._oid2serial_nlookups = self._oid2serial_nhits = 0
+
+ def _initIndex(self, index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete):
self._index=index
self._vindex=vindex
self._tindex=tindex
@@ -310,12 +321,33 @@
self._index_get=index.get
self._vindex_get=vindex.get
+ # .store() needs to compare the passed-in serial to the current
+ # serial in the database. _oid2serial caches the oid -> current
+ # serial mapping for non-version data (if the current record for
+ # oid is version data, the oid is not a key in _oid2serial).
+ # The point is that otherwise seeking into the storage is needed
+ # to extract the current serial, and that's an expensive operation.
+ # For example, if a transaction stores 4000 objects, and each
+ # random seek + read takes 7ms (that was approximately true on
+ # Linux and Windows tests in mid-2003), that's 28 seconds just to
+ # find the old serials.
+ # XXX Probably better to junk this and redefine _index as mapping
+ # XXX oid to (offset, serialno) pair, via a new memory-efficient
+ # XXX BTree type.
+ self._oid2serial = oid2serial
+ # oid->serialno map to transactionally add to _oid2serial.
+ self._toid2serial = toid2serial
+ # Set of oids to transactionally delete from _oid2serial (e.g.,
+ # oids reverted by undo, or for which the most recent record
+ # becomes version data).
+ self._toid2serial_delete = toid2serial_delete
+
def __len__(self):
return len(self._index)
def _newIndexes(self):
# hook to use something other than builtin dict
- return fsIndex(), {}, {}, {}
+ return fsIndex(), {}, {}, {}, {}, {}, {}
_saved = 0
def _save_index(self):
@@ -483,6 +515,31 @@
# XXX should log the error, though
pass # We don't care if this fails.
+ # Return serial number of most recent record for oid if that's in
+ # the _oid2serial cache. Else return None. It's important to use
+ # this instead of indexing _oid2serial directly so that cache
+ # statistics can be logged.
+ def _get_cached_serial(self, oid):
+ self._oid2serial_nlookups += 1
+ result = self._oid2serial.get(oid)
+ if result is not None:
+ self._oid2serial_nhits += 1
+
+ # Log a msg every ~8000 tries, and prevent overflow.
+ if self._oid2serial_nlookups & 0x1fff == 0:
+ if self._oid2serial_nlookups >> 30:
+ # In older Pythons, we may overflow if we keep it an int.
+ self._oid2serial_nlookups = long(self._oid2serial_nlookups)
+ self._oid2serial_nhits = long(self._oid2serial_nhits)
+ blather("_oid2serial size %s lookups %s hits %s rate %.1f%%",
+ len(self._oid2serial),
+ self._oid2serial_nlookups,
+ self._oid2serial_nhits,
+ 100.0 * self._oid2serial_nhits /
+ self._oid2serial_nlookups)
+
+ return result
+
def abortVersion(self, src, transaction):
return self.commitVersion(src, '', transaction, abort=1)
@@ -585,9 +642,11 @@
spos = h[-8:]
srcpos = u64(spos)
+ self._toid2serial_delete.update(current_oids)
return oids
- def getSize(self): return self._pos
+ def getSize(self):
+ return self._pos
def _loada(self, oid, _index, file):
"Read any version and return the version"
@@ -632,6 +691,10 @@
(read(8) # skip past version link
and version != read(vlen))):
return _loadBack(file, oid, pnv)
+ else:
+ # The most recent record is for non-version data -- cache
+ # the serialno.
+ self._oid2serial[oid] = serial
# If we get here, then either this was not a version record,
# or we've already read past the version data!
@@ -713,20 +776,25 @@
self._lock_acquire()
try:
- old=self._index_get(oid, 0)
- pnv=None
+ old = self._index_get(oid, 0)
+ cached_serial = None
+ pnv = None
if old:
- self._file.seek(old)
- h=self._file.read(DATA_HDR_LEN)
- doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
- if doid != oid: raise CorruptedDataError(h)
- if vlen:
- pnv=self._file.read(8) # non-version data pointer
- self._file.read(8) # skip past version link
- locked_version=self._file.read(vlen)
- if version != locked_version:
- raise POSException.VersionLockError, (
- `oid`, locked_version)
+ cached_serial = self._get_cached_serial(oid)
+ if cached_serial is None:
+ self._file.seek(old)
+ h=self._file.read(DATA_HDR_LEN)
+ doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
+ if doid != oid: raise CorruptedDataError(h)
+ if vlen:
+ pnv=self._file.read(8) # non-version data pointer
+ self._file.read(8) # skip past version link
+ locked_version=self._file.read(vlen)
+ if version != locked_version:
+ raise POSException.VersionLockError, (
+ `oid`, locked_version)
+ else:
+ oserial = cached_serial
if serial != oserial:
data = self.tryToResolveConflict(oid, oserial, serial,
@@ -749,14 +817,19 @@
)
)
if version:
- if pnv: write(pnv)
- else: write(p64(old))
+ if pnv:
+ write(pnv)
+ else:
+ write(p64(old))
# Link to last record for this version:
tvindex=self._tvindex
pv=tvindex.get(version, 0) or self._vindex_get(version, 0)
write(p64(pv))
tvindex[version]=here
write(version)
+ self._toid2serial_delete[oid] = 1
+ else:
+ self._toid2serial[oid] = newserial
write(data)
@@ -875,7 +948,11 @@
self._tfile.write(p64(pv))
self._tvindex[version] = here
self._tfile.write(version)
- # And finally, write the data or a backpointer
+ self._toid2serial_delete[oid] = 1
+ else:
+ self._toid2serial[oid] = serial
+
+ # Finally, write the data or a backpointer.
if data is None:
if prev_pos:
self._tfile.write(p64(prev_pos))
@@ -940,6 +1017,8 @@
def _clear_temp(self):
self._tindex.clear()
self._tvindex.clear()
+ self._toid2serial.clear()
+ self._toid2serial_delete.clear()
if self._tfile is not None:
self._tfile.seek(0)
@@ -1023,6 +1102,12 @@
self._index.update(self._tindex)
self._vindex.update(self._tvindex)
+ self._oid2serial.update(self._toid2serial)
+ for oid in self._toid2serial_delete.keys():
+ try:
+ del self._oid2serial[oid]
+ except KeyError:
+ pass
# Update the number of records that we've written
# +1 for the transaction record
@@ -1090,21 +1175,28 @@
def getSerial(self, oid):
self._lock_acquire()
try:
- try:
- return self._getSerial(oid, self._index[oid])
- except KeyError:
- raise POSKeyError(oid)
- except TypeError:
- raise TypeError, 'invalid oid %r' % (oid,)
+ result = self._get_cached_serial(oid)
+ if result is None:
+ try:
+ result = self._getSerial(oid, self._index[oid])
+ except KeyError:
+ raise POSKeyError(oid)
+ except TypeError:
+ raise TypeError, 'invalid oid %r' % (oid,)
+ return result
finally:
self._lock_release()
def _getSerial(self, oid, pos):
self._file.seek(pos)
- h = self._file.read(DATA_HDR_LEN)
+ h = self._file.read(16)
+ if len(h) < 16:
+ raise CorruptedDataError(h)
+ h += self._file.read(26) # get rest of header
+ if h[:8] != oid:
+ raise CorruptedDataError(h)
oid2, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
- assert oid == oid2
- if splen==z64:
+ if splen == z64:
# a back pointer
bp = self._file.read(8)
if bp == z64:
@@ -1243,6 +1335,10 @@
tpos = self._txn_find(tid, 1)
tindex = self._txn_undo_write(tpos)
self._tindex.update(tindex)
+ # Arrange to clear the affected oids from the oid2serial cache.
+ # It's too painful to try to update them to correct current
+ # values instead.
+ self._toid2serial_delete.update(tindex)
return tindex.keys()
def _txn_find(self, tid, stop_at_pack):
@@ -1500,7 +1596,9 @@
# OK, we're beyond the point of no return
os.rename(self._file_name + '.pack', self._file_name)
self._file = open(self._file_name, 'r+b')
- self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+ self._initIndex(p.index, p.vindex, p.tindex, p.tvindex,
+ p.oid2serial, p.toid2serial,
+ p.toid2serial_delete)
self._pos = opos
self._save_index()
finally:
@@ -1526,20 +1624,9 @@
if it is a new object -- return None.
"""
try:
- pos = self._index[oid]
+ return self.getSerial(oid)
except KeyError:
return None
- except TypeError:
- raise TypeError, 'invalid oid %r' % (oid,)
- self._file.seek(pos)
- # first 8 bytes are oid, second 8 bytes are serialno
- h = self._file.read(16)
- if len(h) < 16:
- raise CorruptedDataError(h)
- if h[:8] != oid:
- h = h + self._file.read(26) # get rest of header
- raise CorruptedDataError(h)
- return h[8:]
def cleanup(self):
"""Remove all files created by this storage."""
More information about the Zodb-checkins
mailing list