[Zodb-checkins] CVS: ZODB3/ZODB - fspack.py:1.8.8.3
config.py:1.13.4.1 FileStorage.py:1.135.6.2 DB.py:1.53.2.1
Connection.py:1.98.4.1 BaseStorage.py:1.34.4.1
Jeremy Hylton
jeremy at zope.com
Mon Sep 15 14:03:44 EDT 2003
Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv13599/ZODB
Modified Files:
Tag: Zope-2_7-branch
fspack.py config.py FileStorage.py DB.py Connection.py
BaseStorage.py
Log Message:
Take two: Merge changes from ZODB3-3_2-branch to Zope-2_7-branch.
Please make all future changes on the Zope-2_7-branch instead of the
ZODB3-3_2-branch.
The previous attempt used "cvs up -j ZODB3-3_2-branch", but it appeared
to pick up only a small fraction of the changes. This attempt was made by
copying a checkout of ZODB3-3_2-branch on top of a checkout of
Zope-2_7-branch.
=== ZODB3/ZODB/fspack.py 1.8.8.2 => 1.8.8.3 ===
--- ZODB3/ZODB/fspack.py:1.8.8.2 Wed Sep 3 16:03:41 2003
+++ ZODB3/ZODB/fspack.py Mon Sep 15 14:02:57 2003
@@ -33,7 +33,7 @@
from types import StringType
from ZODB.referencesf import referencesf
-from ZODB.utils import p64, u64, z64
+from ZODB.utils import p64, u64, z64, oid_repr
from zLOG import LOG, BLATHER, WARNING, ERROR, PANIC
try:
@@ -54,7 +54,7 @@
def __str__(self):
if self.oid:
- msg = "Error reading oid %s. Found %r" % (_fmt_oid(self.oid),
+ msg = "Error reading oid %s. Found %r" % (oid_repr(self.oid),
self.buf)
else:
msg = "Error reading unknown oid. Found %r" % self.buf
@@ -166,7 +166,7 @@
def checkTxn(self, th, pos):
if th.tid <= self.ltid:
self.fail(pos, "time-stamp reduction: %s <= %s",
- _fmt_oid(th.tid), _fmt_oid(self.ltid))
+ oid_repr(th.tid), oid_repr(self.ltid))
self.ltid = th.tid
if th.status == "c":
self.fail(pos, "transaction with checkpoint flag set")
@@ -647,11 +647,15 @@
# vindex: version -> pos of XXX
# tindex: oid -> pos, for current txn
# tvindex: version -> pos of XXX, for current txn
+ # oid2serial: not used by the packer
self.index = fsIndex()
self.vindex = {}
self.tindex = {}
self.tvindex = {}
+ self.oid2serial = {}
+ self.toid2serial = {}
+ self.toid2serial_delete = {}
# Index for non-version data. This is a temporary structure
# to reduce I/O during packing
=== ZODB3/ZODB/config.py 1.13 => 1.13.4.1 ===
--- ZODB3/ZODB/config.py:1.13 Mon Jun 16 10:51:49 2003
+++ ZODB3/ZODB/config.py Mon Sep 15 14:02:58 2003
@@ -157,7 +157,7 @@
if name.startswith('_'):
continue
setattr(bconf, name, getattr(self.config, name))
- return storageclass(self.config.name, config=bconf)
+ return storageclass(self.config.envdir, config=bconf)
class BDBMinimalStorage(BDBStorage):
=== ZODB3/ZODB/FileStorage.py 1.135.6.1 => 1.135.6.2 ===
--- ZODB3/ZODB/FileStorage.py:1.135.6.1 Mon Jul 21 12:37:18 2003
+++ ZODB3/ZODB/FileStorage.py Mon Sep 15 14:02:58 2003
@@ -157,17 +157,21 @@
assert struct.calcsize(TRANS_HDR) == TRANS_HDR_LEN
assert struct.calcsize(DATA_HDR) == DATA_HDR_LEN
+def blather(message, *data):
+ LOG('ZODB FS', BLATHER, "%s blather: %s\n" % (packed_version,
+ message % data))
+
def warn(message, *data):
LOG('ZODB FS', WARNING, "%s warn: %s\n" % (packed_version,
- (message % data)))
+ message % data))
def error(message, *data):
LOG('ZODB FS', ERROR, "%s ERROR: %s\n" % (packed_version,
- (message % data)))
+ message % data))
def nearPanic(message, *data):
LOG('ZODB FS', PANIC, "%s ERROR: %s\n" % (packed_version,
- (message % data)))
+ message % data))
def panic(message, *data):
message = message % data
@@ -234,8 +238,10 @@
BaseStorage.BaseStorage.__init__(self, file_name)
- index, vindex, tindex, tvindex = self._newIndexes()
- self._initIndex(index, vindex, tindex, tvindex)
+ (index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete) = self._newIndexes()
+ self._initIndex(index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete)
# Now open the file
@@ -269,7 +275,8 @@
self._used_index = 1 # Marker for testing
index, vindex, start, maxoid, ltid = r
- self._initIndex(index, vindex, tindex, tvindex)
+ self._initIndex(index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete)
self._pos, self._oid, tid = read_index(
self._file, file_name, index, vindex, tindex, stop,
ltid=ltid, start=start, maxoid=maxoid,
@@ -302,7 +309,11 @@
self._quota = quota
- def _initIndex(self, index, vindex, tindex, tvindex):
+ # Serialno cache statistics.
+ self._oid2serial_nlookups = self._oid2serial_nhits = 0
+
+ def _initIndex(self, index, vindex, tindex, tvindex,
+ oid2serial, toid2serial, toid2serial_delete):
self._index=index
self._vindex=vindex
self._tindex=tindex
@@ -310,12 +321,33 @@
self._index_get=index.get
self._vindex_get=vindex.get
+ # .store() needs to compare the passed-in serial to the current
+ # serial in the database. _oid2serial caches the oid -> current
+ # serial mapping for non-version data (if the current record for
+ # oid is version data, the oid is not a key in _oid2serial).
+ # The point is that otherwise seeking into the storage is needed
+ # to extract the current serial, and that's an expensive operation.
+ # For example, if a transaction stores 4000 objects, and each
+ # random seek + read takes 7ms (that was approximately true on
+ # Linux and Windows tests in mid-2003), that's 28 seconds just to
+ # find the old serials.
+ # XXX Probably better to junk this and redefine _index as mapping
+ # XXX oid to (offset, serialno) pair, via a new memory-efficient
+ # XXX BTree type.
+ self._oid2serial = oid2serial
+ # oid->serialno map to transactionally add to _oid2serial.
+ self._toid2serial = toid2serial
+ # Set of oids to transactionally delete from _oid2serial (e.g.,
+ # oids reverted by undo, or for which the most recent record
+ # becomes version data).
+ self._toid2serial_delete = toid2serial_delete
+
def __len__(self):
return len(self._index)
def _newIndexes(self):
# hook to use something other than builtin dict
- return fsIndex(), {}, {}, {}
+ return fsIndex(), {}, {}, {}, {}, {}, {}
_saved = 0
def _save_index(self):
@@ -483,6 +515,31 @@
# XXX should log the error, though
pass # We don't care if this fails.
+ # Return serial number of most recent record for oid if that's in
+ # the _oid2serial cache. Else return None. It's important to use
+ # this instead of indexing _oid2serial directly so that cache
+ # statistics can be logged.
+ def _get_cached_serial(self, oid):
+ self._oid2serial_nlookups += 1
+ result = self._oid2serial.get(oid)
+ if result is not None:
+ self._oid2serial_nhits += 1
+
+ # Log a msg every ~8000 tries, and prevent overflow.
+ if self._oid2serial_nlookups & 0x1fff == 0:
+ if self._oid2serial_nlookups >> 30:
+ # In older Pythons, we may overflow if we keep it an int.
+ self._oid2serial_nlookups = long(self._oid2serial_nlookups)
+ self._oid2serial_nhits = long(self._oid2serial_nhits)
+ blather("_oid2serial size %s lookups %s hits %s rate %.1f%%",
+ len(self._oid2serial),
+ self._oid2serial_nlookups,
+ self._oid2serial_nhits,
+ 100.0 * self._oid2serial_nhits /
+ self._oid2serial_nlookups)
+
+ return result
+
def abortVersion(self, src, transaction):
return self.commitVersion(src, '', transaction, abort=1)
@@ -585,33 +642,11 @@
spos = h[-8:]
srcpos = u64(spos)
+ self._toid2serial_delete.update(current_oids)
return oids
- def getSize(self): return self._pos
-
- def _loada(self, oid, _index, file):
- "Read any version and return the version"
- try:
- pos=_index[oid]
- except KeyError:
- raise POSKeyError(oid)
- except TypeError:
- raise TypeError, 'invalid oid %r' % (oid,)
- file.seek(pos)
- read=file.read
- h=read(DATA_HDR_LEN)
- doid,serial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
- if vlen:
- nv = u64(read(8))
- read(8) # Skip previous version record pointer
- version = read(vlen)
- else:
- version = ''
- nv = 0
-
- if plen != z64:
- return read(u64(plen)), version, nv
- return _loadBack(file, oid, read(8))[0], version, nv
+ def getSize(self):
+ return self._pos
def _load(self, oid, version, _index, file):
try:
@@ -632,6 +667,10 @@
(read(8) # skip past version link
and version != read(vlen))):
return _loadBack(file, oid, pnv)
+ else:
+ # The most recent record is for non-version data -- cache
+ # the serialno.
+ self._oid2serial[oid] = serial
# If we get here, then either this was not a version record,
# or we've already read past the version data!
@@ -713,20 +752,25 @@
self._lock_acquire()
try:
- old=self._index_get(oid, 0)
- pnv=None
+ old = self._index_get(oid, 0)
+ cached_serial = None
+ pnv = None
if old:
- self._file.seek(old)
- h=self._file.read(DATA_HDR_LEN)
- doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
- if doid != oid: raise CorruptedDataError(h)
- if vlen:
- pnv=self._file.read(8) # non-version data pointer
- self._file.read(8) # skip past version link
- locked_version=self._file.read(vlen)
- if version != locked_version:
- raise POSException.VersionLockError, (
- `oid`, locked_version)
+ cached_serial = self._get_cached_serial(oid)
+ if cached_serial is None:
+ self._file.seek(old)
+ h=self._file.read(DATA_HDR_LEN)
+ doid,oserial,sprev,stloc,vlen,splen = unpack(DATA_HDR, h)
+ if doid != oid: raise CorruptedDataError(h)
+ if vlen:
+ pnv=self._file.read(8) # non-version data pointer
+ self._file.read(8) # skip past version link
+ locked_version=self._file.read(vlen)
+ if version != locked_version:
+ raise POSException.VersionLockError, (
+ `oid`, locked_version)
+ else:
+ oserial = cached_serial
if serial != oserial:
data = self.tryToResolveConflict(oid, oserial, serial,
@@ -749,14 +793,19 @@
)
)
if version:
- if pnv: write(pnv)
- else: write(p64(old))
+ if pnv:
+ write(pnv)
+ else:
+ write(p64(old))
# Link to last record for this version:
tvindex=self._tvindex
pv=tvindex.get(version, 0) or self._vindex_get(version, 0)
write(p64(pv))
tvindex[version]=here
write(version)
+ self._toid2serial_delete[oid] = 1
+ else:
+ self._toid2serial[oid] = newserial
write(data)
@@ -875,7 +924,11 @@
self._tfile.write(p64(pv))
self._tvindex[version] = here
self._tfile.write(version)
- # And finally, write the data or a backpointer
+ self._toid2serial_delete[oid] = 1
+ else:
+ self._toid2serial[oid] = serial
+
+ # Finally, write the data or a backpointer.
if data is None:
if prev_pos:
self._tfile.write(p64(prev_pos))
@@ -940,6 +993,8 @@
def _clear_temp(self):
self._tindex.clear()
self._tvindex.clear()
+ self._toid2serial.clear()
+ self._toid2serial_delete.clear()
if self._tfile is not None:
self._tfile.seek(0)
@@ -1023,6 +1078,12 @@
self._index.update(self._tindex)
self._vindex.update(self._tvindex)
+ self._oid2serial.update(self._toid2serial)
+ for oid in self._toid2serial_delete.keys():
+ try:
+ del self._oid2serial[oid]
+ except KeyError:
+ pass
# Update the number of records that we've written
# +1 for the transaction record
@@ -1090,21 +1151,28 @@
def getSerial(self, oid):
self._lock_acquire()
try:
- try:
- return self._getSerial(oid, self._index[oid])
- except KeyError:
- raise POSKeyError(oid)
- except TypeError:
- raise TypeError, 'invalid oid %r' % (oid,)
+ result = self._get_cached_serial(oid)
+ if result is None:
+ try:
+ result = self._getSerial(oid, self._index[oid])
+ except KeyError:
+ raise POSKeyError(oid)
+ except TypeError:
+ raise TypeError, 'invalid oid %r' % (oid,)
+ return result
finally:
self._lock_release()
def _getSerial(self, oid, pos):
self._file.seek(pos)
- h = self._file.read(DATA_HDR_LEN)
+ h = self._file.read(16)
+ if len(h) < 16:
+ raise CorruptedDataError(h)
+ h += self._file.read(26) # get rest of header
+ if h[:8] != oid:
+ raise CorruptedDataError(h)
oid2, serial, sprev, stloc, vlen, splen = unpack(DATA_HDR, h)
- assert oid == oid2
- if splen==z64:
+ if splen == z64:
# a back pointer
bp = self._file.read(8)
if bp == z64:
@@ -1243,6 +1311,10 @@
tpos = self._txn_find(tid, 1)
tindex = self._txn_undo_write(tpos)
self._tindex.update(tindex)
+ # Arrange to clear the affected oids from the oid2serial cache.
+ # It's too painful to try to update them to correct current
+ # values instead.
+ self._toid2serial_delete.update(tindex)
return tindex.keys()
def _txn_find(self, tid, stop_at_pack):
@@ -1500,7 +1572,9 @@
# OK, we're beyond the point of no return
os.rename(self._file_name + '.pack', self._file_name)
self._file = open(self._file_name, 'r+b')
- self._initIndex(p.index, p.vindex, p.tindex, p.tvindex)
+ self._initIndex(p.index, p.vindex, p.tindex, p.tvindex,
+ p.oid2serial, p.toid2serial,
+ p.toid2serial_delete)
self._pos = opos
self._save_index()
finally:
@@ -1526,20 +1600,9 @@
if it is a new object -- return None.
"""
try:
- pos = self._index[oid]
+ return self.getSerial(oid)
except KeyError:
return None
- except TypeError:
- raise TypeError, 'invalid oid %r' % (oid,)
- self._file.seek(pos)
- # first 8 bytes are oid, second 8 bytes are serialno
- h = self._file.read(16)
- if len(h) < 16:
- raise CorruptedDataError(h)
- if h[:8] != oid:
- h = h + self._file.read(26) # get rest of header
- raise CorruptedDataError(h)
- return h[8:]
def cleanup(self):
"""Remove all files created by this storage."""
=== ZODB3/ZODB/DB.py 1.53 => 1.53.2.1 ===
--- ZODB3/ZODB/DB.py:1.53 Tue Jun 24 17:50:18 2003
+++ ZODB3/ZODB/DB.py Mon Sep 15 14:02:58 2003
@@ -32,7 +32,7 @@
d[elt] = 1
return d
-class DB(UndoLogCompatible.UndoLogCompatible):
+class DB(UndoLogCompatible.UndoLogCompatible, object):
"""The Object Database
The Object database coordinates access to and interaction of one
=== ZODB3/ZODB/Connection.py 1.98 => 1.98.4.1 ===
--- ZODB3/ZODB/Connection.py:1.98 Fri Jun 13 17:53:08 2003
+++ ZODB3/ZODB/Connection.py Mon Sep 15 14:02:58 2003
@@ -47,7 +47,7 @@
ExtensionKlass = Base.__class__
-class Connection(ExportImport.ExportImport):
+class Connection(ExportImport.ExportImport, object):
"""Object managers for individual object space.
An object space is a version of collection of objects. In a
@@ -136,11 +136,10 @@
# Explicitly remove references from the connection to its
# cache and to the root object, because they refer back to the
# connection.
- self._cache.clear()
- self._cache = None
+ if self._cache is not None:
+ self._cache.clear()
self._incrgc = None
self.cacheGC = None
- self._root_ = None
def __getitem__(self, oid, tt=type(())):
obj = self._cache.get(oid, None)
@@ -176,8 +175,6 @@
object._p_serial=serial
self._cache[oid] = object
- if oid=='\0\0\0\0\0\0\0\0':
- self._root_=object # keep a ref
return object
def _persistent_load(self,oid,
@@ -279,7 +276,8 @@
self.__onCloseCallbacks.append(f)
def close(self):
- self._incrgc() # This is a good time to do some GC
+ if self._incrgc is not None:
+ self._incrgc() # This is a good time to do some GC
# Call the close callbacks.
if self.__onCloseCallbacks is not None:
=== ZODB3/ZODB/BaseStorage.py 1.34 => 1.34.4.1 ===
--- ZODB3/ZODB/BaseStorage.py:1.34 Tue Jun 10 11:46:31 2003
+++ ZODB3/ZODB/BaseStorage.py Mon Sep 15 14:02:58 2003
@@ -16,21 +16,26 @@
$Id$
"""
import cPickle
-import ThreadLock, bpthread
-import time, UndoLogCompatible
-import POSException
-from TimeStamp import TimeStamp
-z64='\0'*8
+import time
-class BaseStorage(UndoLogCompatible.UndoLogCompatible):
+import ThreadLock
+import zLOG
+from ZODB import bpthread
+from ZODB import POSException
+from ZODB.TimeStamp import TimeStamp
+from ZODB.UndoLogCompatible import UndoLogCompatible
+from ZODB.utils import z64
+
+class BaseStorage(UndoLogCompatible):
_transaction=None # Transaction that is being committed
_serial=z64 # Transaction serial number
_tstatus=' ' # Transaction status, used for copying data
_is_read_only = 0
def __init__(self, name, base=None):
-
- self.__name__=name
+ self.__name__= name
+ zLOG.LOG(self.__class__.__name__, zLOG.DEBUG,
+ "create storage %s" % self.__name__)
# Allocate locks:
l=ThreadLock.allocate_lock()
More information about the Zodb-checkins mailing list