[Zope-Checkins] CVS: ZODB3/ZODB - MappingStorage.py:1.11
DemoStorage.py:1.23 DB.py:1.58 Connection.py:1.105
BaseStorage.py:1.39
Jeremy Hylton
jeremy at zope.com
Wed Dec 24 11:02:32 EST 2003
Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv27465/ZODB
Modified Files:
MappingStorage.py DemoStorage.py DB.py Connection.py
BaseStorage.py
Log Message:
Merge MVCC branch to the HEAD.
=== ZODB3/ZODB/MappingStorage.py 1.10 => 1.11 ===
--- ZODB3/ZODB/MappingStorage.py:1.10 Fri Nov 28 11:44:49 2003
+++ ZODB3/ZODB/MappingStorage.py Wed Dec 24 11:02:00 2003
@@ -58,6 +58,16 @@
finally:
self._lock_release()
+ def loadEx(self, oid, version):
+ self._lock_acquire()
+ try:
+ # Since this storage doesn't support versions, tid and
+ # serial will always be the same.
+ p = self._index[oid]
+ return p[8:], p[:8], "" # pickle, serial, tid
+ finally:
+ self._lock_release()
+
def store(self, oid, serial, data, version, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
@@ -75,11 +85,10 @@
serials=(oserial, serial),
data=data)
- serial = self._serial
- self._tindex.append((oid, serial+data))
+ self._tindex.append((oid, self._tid + data))
finally:
self._lock_release()
- return serial
+ return self._tid
def _clear_temp(self):
self._tindex = []
@@ -87,7 +96,7 @@
def _finish(self, tid, user, desc, ext):
for oid, p in self._tindex:
self._index[oid] = p
- self._ltid = self._serial
+ self._ltid = self._tid
def lastTransaction(self):
return self._ltid
@@ -95,6 +104,8 @@
def pack(self, t, referencesf):
self._lock_acquire()
try:
+ if not self._index:
+ return
# Build an index of *only* those objects reachable from the root.
rootl = ['\0\0\0\0\0\0\0\0']
pindex = {}
=== ZODB3/ZODB/DemoStorage.py 1.22 => 1.23 ===
--- ZODB3/ZODB/DemoStorage.py:1.22 Fri Nov 28 11:44:49 2003
+++ ZODB3/ZODB/DemoStorage.py Wed Dec 24 11:02:00 2003
@@ -45,14 +45,12 @@
A record is a tuple:
- oid, serial, pre, vdata, p,
+ oid, pre, vdata, p, tid
where:
oid -- object id
- serial -- object serial number
-
pre -- The previous record for this object (or None)
vdata -- version data
@@ -62,6 +60,8 @@
p -- the pickle data or None
+ tid -- the transaction id that wrote the record
+
The pickle data will be None for a record for an object created in
an aborted version.
@@ -93,12 +93,13 @@
BaseStorage.BaseStorage.__init__(self, name, base)
# We use a BTree because the items are sorted!
- self._data=OOBTree.OOBTree()
- self._index={}
- self._vindex={}
- self._base=base
- self._size=0
- self._quota=quota
+ self._data = OOBTree.OOBTree()
+ self._index = {}
+ self._vindex = {}
+ self._base = base
+ self._size = 0
+ self._quota = quota
+ self._ltid = None
self._clear_temp()
if base is not None and base.versions():
raise POSException.StorageError, (
@@ -113,7 +114,7 @@
s=100
for tid, (p, u, d, e, t) in self._data.items():
s=s+16+24+12+4+16+len(u)+16+len(d)+16+len(e)+16
- for oid, serial, pre, vdata, p in t:
+ for oid, pre, vdata, p, tid in t:
s=s+16+24+24+4+4+(p and (16+len(p)) or 4)
if vdata: s=s+12+16+len(vdata[0])+4
@@ -139,16 +140,16 @@
oids = []
for r in v.values():
- oid, serial, pre, (version, nv), p = r
+ oid, pre, (version, nv), p, tid = r
oids.append(oid)
if nv:
- oid, serial, pre, vdata, p = nv
- self._tindex.append([oid, serial, r, None, p])
+ oid, pre, vdata, p, tid = nv
+ self._tindex.append([oid, r, None, p, self._tid])
else:
# effectively, delete the thing
- self._tindex.append([oid, None, r, None, None])
+ self._tindex.append([oid, r, None, None, self._tid])
- return oids
+ return self._tid, oids
finally: self._lock_release()
@@ -168,53 +169,60 @@
if v is None:
return
- newserial = self._serial
+ newserial = self._tid
tindex = self._tindex
oids = []
for r in v.values():
- oid, serial, pre, vdata, p = r
+ oid, pre, vdata, p, tid = r
assert vdata is not None
oids.append(oid)
if dest:
new_vdata = dest, vdata[1]
else:
new_vdata = None
- tindex.append([oid, newserial, r, new_vdata, p])
+ tindex.append([oid, r, new_vdata, p, self._tid])
- return oids
+ return self._tid, oids
finally:
self._lock_release()
- def load(self, oid, version):
+ def loadEx(self, oid, version):
self._lock_acquire()
try:
try:
- oid, serial, pre, vdata, p = self._index[oid]
+ oid, pre, vdata, p, tid = self._index[oid]
except KeyError:
if self._base:
return self._base.load(oid, '')
raise KeyError, oid
+ ver = ""
if vdata:
oversion, nv = vdata
if oversion != version:
if nv:
- oid, serial, pre, vdata, p = nv
+ # Return the current txn's tid with the non-version
+ # data.
+ oid, pre, vdata, p, skiptid = nv
else:
raise KeyError, oid
+ ver = oversion
if p is None:
raise KeyError, oid
- return p, serial
+ return p, tid, ver
finally: self._lock_release()
+ def load(self, oid, version):
+ return self.loadEx(oid, version)[:2]
+
def modifiedInVersion(self, oid):
self._lock_acquire()
try:
try:
- oid, serial, pre, vdata, p = self._index[oid]
+ oid, pre, vdata, p, tid = self._index[oid]
if vdata: return vdata[0]
return ''
except: return ''
@@ -231,15 +239,15 @@
# Hm, nothing here, check the base version:
if self._base:
try:
- p, oserial = self._base.load(oid, '')
+ p, tid = self._base.load(oid, '')
except KeyError:
pass
else:
- old = oid, oserial, None, None, p
+ old = oid, None, None, p, tid
nv=None
if old:
- oid, oserial, pre, vdata, p = old
+ oid, pre, vdata, p, tid = old
if vdata:
if vdata[0] != version:
@@ -249,12 +257,11 @@
else:
nv=old
- if serial != oserial:
+ if serial != tid:
raise POSException.ConflictError(
- oid=oid, serials=(oserial, serial), data=data)
+ oid=oid, serials=(tid, serial), data=data)
- serial=self._serial
- r=[oid, serial, old, version and (version, nv) or None, data]
+ r = [oid, old, version and (version, nv) or None, data, self._tid]
self._tindex.append(r)
s=self._tsize
@@ -268,15 +275,21 @@
has been exceeded.<br>Have a nice day.''')
finally: self._lock_release()
- return serial
+ return self._tid
- def supportsUndo(self): return 1
- def supportsVersions(self): return 1
+ def supportsUndo(self):
+ return 1
+
+ def supportsVersions(self):
+ return 1
def _clear_temp(self):
self._tindex = []
self._tsize = self._size + 160
+ def lastTransaction(self):
+ return self._ltid
+
def _begin(self, tid, u, d, e):
self._tsize = self._size + 120 + len(u) + len(d) + len(e)
@@ -285,11 +298,11 @@
self._data[tid] = None, user, desc, ext, tuple(self._tindex)
for r in self._tindex:
- oid, serial, pre, vdata, p = r
+ oid, pre, vdata, p, tid = r
old = self._index.get(oid)
# If the object had version data, remove the version data.
if old is not None:
- oldvdata = old[3]
+ oldvdata = old[2]
if oldvdata:
v = self._vindex[oldvdata[0]]
del v[oid]
@@ -306,6 +319,7 @@
if v is None:
v = self._vindex[version] = {}
v[oid] = r
+ self._ltid = self._tid
def undo(self, transaction_id):
self._lock_acquire()
@@ -324,7 +338,7 @@
oids=[]
for r in t:
- oid, serial, pre, vdata, p = r
+ oid, pre, vdata, p, tid = r
if pre:
index[oid] = pre
@@ -337,7 +351,7 @@
if v: del v[oid]
# Add new version data (from pre):
- oid, serial, prepre, vdata, p = pre
+ oid, prepre, vdata, p, tid = pre
if vdata:
version=vdata[0]
v=vindex.get(version, None)
@@ -404,17 +418,17 @@
def _build_indexes(self, stop='\377\377\377\377\377\377\377\377'):
# Rebuild index structures from transaction data
- index={}
- vindex={}
- _data=self._data
- for tid, (p, u, d, e, t) in _data.items():
- if tid >= stop: break
+ index = {}
+ vindex = {}
+ for tid, (p, u, d, e, t) in self._data.items():
+ if tid >= stop:
+ break
for r in t:
- oid, serial, pre, vdata, p = r
+ oid, pre, vdata, p, tid = r
old=index.get(oid, None)
if old is not None:
- oldvdata=old[3]
+ oldvdata=old[2]
if oldvdata:
v=vindex[oldvdata[0]]
del v[oid]
@@ -439,54 +453,56 @@
try:
stop=`TimeStamp(*time.gmtime(t)[:5]+(t%60,))`
- _data=self._data
# Build indexes up to the pack time:
index, vindex = self._build_indexes(stop)
# Now build an index of *only* those objects reachable
# from the root.
- rootl=['\0\0\0\0\0\0\0\0']
- pop=rootl.pop
- pindex={}
- referenced=pindex.has_key
+ rootl = ['\0\0\0\0\0\0\0\0']
+ pindex = {}
while rootl:
- oid=pop()
- if referenced(oid): continue
+ oid = rootl.pop()
+ if oid in pindex:
+ continue
# Scan non-version pickle for references
- r=index.get(oid, None)
+ r = index.get(oid, None)
if r is None:
if self._base:
p, s = self._base.load(oid, '')
referencesf(p, rootl)
else:
- pindex[oid]=r
- oid, serial, pre, vdata, p = r
+ pindex[oid] = r
+ oid, pre, vdata, p, tid = r
referencesf(p, rootl)
if vdata:
- nv=vdata[1]
+ nv = vdata[1]
if nv:
- oid, serial, pre, vdata, p = nv
+ oid, pre, vdata, p, tid = nv
referencesf(p, rootl)
# Now we're ready to do the actual packing.
# We'll simply edit the transaction data in place.
# We'll defer deleting transactions till the end
# to avoid messing up the BTree items.
- deleted=[]
- for tid, (p, u, d, e, t) in _data.items():
- if tid >= stop: break
- o=[]
- for r in t:
- c=pindex.get(r[0])
+ deleted = []
+ for tid, (p, u, d, e, records) in self._data.items():
+ if tid >= stop:
+ break
+ o = []
+ for r in records:
+ c = pindex.get(r[0])
if c is None:
# GC this record, no longer referenced
continue
- elif c is not r:
+ if c == r:
+ # This is the most recent revision.
+ o.append(r)
+ else:
# This record is not the indexed record,
# so it may not be current. Let's see.
- oid, serial, pre, vdata, p = r
+ vdata = r[3]
if vdata:
# Version record are current *only* if they
# are indexed
@@ -494,7 +510,7 @@
else:
# OK, this isn't a version record, so it may be the
# non-version record for the indexed record.
- oid, serial, pre, vdata, p = c
+ vdata = c[3]
if vdata:
if vdata[1] != r:
# This record is not the non-version
@@ -505,25 +521,25 @@
# so this record can not be the non-version
# record for it.
continue
- o.append(r)
+ o.append(r)
if o:
- if len(o) != len(t):
- _data[tid] = 1, u, d, e, tuple(o) # Reset data
+ if len(o) != len(records):
+ self._data[tid] = 1, u, d, e, tuple(o) # Reset data
else:
deleted.append(tid)
# Now delete empty transactions
for tid in deleted:
- del _data[tid]
+ del self._data[tid]
# Now reset previous pointers for "current" records:
for r in pindex.values():
- r[2] = None # Previous record
- if r[3] and r[3][1]: # vdata
+ r[1] = None # Previous record
+ if r[2] and r[2][1]: # vdata
# If this record contains version data and
# non-version data, then clear it out.
- r[3][1][2] = None
+ r[2][1][2] = None
# Finally, rebuild indexes from transaction data:
self._index, self._vindex = self._build_indexes()
@@ -541,21 +557,22 @@
for tid, (p, u, d, e, t) in self._data.items():
o.append(" %s %s" % (TimeStamp(tid), p))
for r in t:
- oid, serial, pre, vdata, p = r
- oid=utils.u64(oid)
- if serial is not None: serial=str(TimeStamp(serial))
+ oid, pre, vdata, p, tid = r
+ oid = utils.oid_repr(oid)
+ tid = utils.oid_repr(tid)
+## if serial is not None: serial=str(TimeStamp(serial))
pre=id(pre)
if vdata and vdata[1]: vdata=vdata[0], id(vdata[1])
if p: p=''
o.append(' %s: %s' %
- (id(r), `(oid, serial, pre, vdata, p)`))
+ (id(r), `(oid, pre, vdata, p, tid)`))
o.append('\nIndex:')
items=self._index.items()
items.sort()
for oid, r in items:
if r: r=id(r)
- o.append(' %s: %s' % (utils.u64(oid), r))
+ o.append(' %s: %s' % (utils.oid_repr(oid), r))
o.append('\nVersion Index:')
items=self._vindex.items()
@@ -566,7 +583,6 @@
vitems.sort()
for oid, r in vitems:
if r: r=id(r)
- o.append(' %s: %s' % (utils.u64(oid), r))
-
+ o.append(' %s: %s' % (utils.oid_repr(oid), r))
return string.join(o,'\n')
=== ZODB3/ZODB/DB.py 1.57 => 1.58 ===
--- ZODB3/ZODB/DB.py:1.57 Fri Nov 28 11:44:49 2003
+++ ZODB3/ZODB/DB.py Wed Dec 24 11:02:00 2003
@@ -74,7 +74,7 @@
self._version_cache_size=version_cache_size
self._version_cache_deactivate_after = version_cache_deactivate_after
- self._miv_cache={}
+ self._miv_cache = {}
# Setup storage
self._storage=storage
@@ -300,8 +300,7 @@
def importFile(self, file):
raise NotImplementedError
- def invalidate(self, oids, connection=None, version='',
- rc=sys.getrefcount):
+ def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
@@ -323,21 +322,21 @@
for cc in allocated:
if (cc is not connection and
(not version or cc._version==version)):
- if rc(cc) <= 3:
+ if sys.getrefcount(cc) <= 3:
cc.close()
- cc.invalidate(oids)
+ cc.invalidate(tid, oids)
- temps=self._temps
- if temps:
+ if self._temps:
t=[]
- for cc in temps:
- if rc(cc) > 3:
+ for cc in self._temps:
+ if sys.getrefcount(cc) > 3:
if (cc is not connection and
- (not version or cc._version==version)):
- cc.invalidate(oids)
+ (not version or cc._version == version)):
+ cc.invalidate(tid, oids)
t.append(cc)
- else: cc.close()
- self._temps=t
+ else:
+ cc.close()
+ self._temps = t
def modifiedInVersion(self, oid):
h=hash(oid)%131
@@ -353,7 +352,7 @@
return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None,
- waitflag=1):
+ waitflag=1, mvcc=True):
"""Return a object space (AKA connection) to work in
The optional version argument can be used to specify that a
@@ -371,25 +370,25 @@
try:
if transaction is not None:
- connections=transaction._connections
+ connections = transaction._connections
if connections:
if connections.has_key(version) and not temporary:
return connections[version]
else:
- transaction._connections=connections={}
- transaction=transaction._connections
-
+ transaction._connections = connections = {}
+ transaction = transaction._connections
if temporary:
# This is a temporary connection.
# We won't bother with the pools. This will be
# a one-use connection.
- c=self.klass(
- version=version,
- cache_size=self._version_cache_size)
+ c = self.klass(version=version,
+ cache_size=self._version_cache_size,
+ mvcc=mvcc)
c._setDB(self)
self._temps.append(c)
- if transaction is not None: transaction[id(c)]=c
+ if transaction is not None:
+ transaction[id(c)] = c
return c
@@ -430,18 +429,18 @@
if not pool:
- c=None
+ c = None
if version:
if self._version_pool_size > len(allocated) or force:
- c=self.klass(
- version=version,
- cache_size=self._version_cache_size)
+ c = self.klass(version=version,
+ cache_size=self._version_cache_size,
+ mvcc=mvcc)
allocated.append(c)
pool.append(c)
elif self._pool_size > len(allocated) or force:
- c=self.klass(
- version=version,
- cache_size=self._cache_size)
+ c = self.klass(version=version,
+ cache_size=self._cache_size,
+ mvcc=mvcc)
allocated.append(c)
pool.append(c)
@@ -456,7 +455,7 @@
pool_lock.release()
else: return
- elif len(pool)==1:
+ elif len(pool) == 1:
# Taking last one, lock the pool
# Note that another thread might grab the lock
# before us, so we might actually block, however,
@@ -470,14 +469,15 @@
# but it could be higher due to a race condition.
pool_lock.release()
- c=pool[-1]
+ c = pool[-1]
del pool[-1]
c._setDB(self)
for pool, allocated in pooll:
for cc in pool:
cc._incrgc()
- if transaction is not None: transaction[version]=c
+ if transaction is not None:
+ transaction[version] = c
return c
finally: self._r()
@@ -588,7 +588,8 @@
d = {}
for oid in storage.undo(id):
d[oid] = 1
- self.invalidate(d)
+ # XXX I think we need to remove old undo to use mvcc
+ self.invalidate(None, d)
def versionEmpty(self, version):
return self._storage.versionEmpty(version)
@@ -616,13 +617,13 @@
def commit(self, reallyme, t):
dest=self._dest
- oids = self._db._storage.commitVersion(self._version, dest, t)
+ tid, oids = self._db._storage.commitVersion(self._version, dest, t)
oids = list2dict(oids)
- self._db.invalidate(oids, version=dest)
+ self._db.invalidate(tid, oids, version=dest)
if dest:
# the code above just invalidated the dest version.
# now we need to invalidate the source!
- self._db.invalidate(oids, version=self._version)
+ self._db.invalidate(tid, oids, version=self._version)
class AbortVersion(CommitVersion):
"""An object that will see to version abortion
@@ -631,9 +632,9 @@
"""
def commit(self, reallyme, t):
- version=self._version
- oids = self._db._storage.abortVersion(version, t)
- self._db.invalidate(list2dict(oids), version=version)
+ version = self._version
+ tid, oids = self._db._storage.abortVersion(version, t)
+ self._db.invalidate(tid, list2dict(oids), version=version)
class TransactionalUndo(CommitVersion):
@@ -647,5 +648,5 @@
# similarity of rhythm that I think it's justified.
def commit(self, reallyme, t):
- oids = self._db._storage.transactionalUndo(self._version, t)
- self._db.invalidate(list2dict(oids))
+ tid, oids = self._db._storage.transactionalUndo(self._version, t)
+ self._db.invalidate(tid, list2dict(oids))
=== ZODB3/ZODB/Connection.py 1.104 => 1.105 ===
--- ZODB3/ZODB/Connection.py:1.104 Wed Dec 10 15:02:15 2003
+++ ZODB3/ZODB/Connection.py Wed Dec 24 11:02:00 2003
@@ -15,9 +15,17 @@
$Id$"""
+import logging
import sys
import threading
from time import time
+from types import ClassType
+
+_marker = object()
+
+def myhasattr(obj, attr):
+ # builtin hasattr() swallows exceptions
+ return getattr(obj, attr, _marker) is not _marker
from persistent import PickleCache
from zLOG import LOG, ERROR, BLATHER, WARNING
@@ -56,16 +64,19 @@
The Connection manages movement of objects in and out of object storage.
"""
- _tmp=None
- _debug_info=()
- _opened=None
- _reset_counter = 0
+ _tmp = None
+ _debug_info = ()
+ _opened = None
+ _code_timestamp = 0
_transaction = None
def __init__(self, version='', cache_size=400,
- cache_deactivate_after=60):
+ cache_deactivate_after=60, mvcc=True):
"""Create a new Connection"""
- self._version=version
+
+ self._log = logging.getLogger("zodb.conn")
+
+ self._version = version
self._cache = cache = PickleCache(self, cache_size)
if version:
# Caches for versions end up empty if the version
@@ -97,6 +108,16 @@
self._invalidated = d = {}
self._invalid = d.has_key
self._conflicts = {}
+ self._noncurrent = {}
+
+ # If MVCC is enabled, then _mvcc is True and _txn_time stores
+ # the upper bound on transactions visible to this connection.
+ # That is, all object revisions must be written before _txn_time.
+ # If it is None, then the current revisions are acceptable.
+ # If the connection is in a version, mvcc will be disabled, because
+ # loadBefore() only returns non-version data.
+ self._mvcc = mvcc and not version
+ self._txn_time = None
def getTransaction(self):
t = self._transaction
@@ -216,11 +237,12 @@
# Call the close callbacks.
if self.__onCloseCallbacks is not None:
for f in self.__onCloseCallbacks:
- try: f()
- except:
- f=getattr(f, 'im_self', f)
- LOG('ZODB',ERROR, 'Close callback failed for %s' % f,
- error=sys.exc_info())
+ try:
+ f()
+ except: # except what?
+ f = getattr(f, 'im_self', f)
+ self._log.error("Close callback failed for %s", f,
+ sys.exc_info())
self.__onCloseCallbacks = None
self._storage = self._tmp = self.new_oid = self._opened = None
self._debug_info = ()
@@ -303,8 +325,8 @@
if tmp is None: return
src=self._storage
- LOG('ZODB', BLATHER,
- 'Commiting subtransaction of size %s' % src.getSize())
+ self._log.debug("Commiting subtransaction of size %s",
+ src.getSize())
self._storage=tmp
self._tmp=None
@@ -363,7 +385,7 @@
def isReadOnly(self):
return self._storage.isReadOnly()
- def invalidate(self, oids):
+ def invalidate(self, tid, oids):
"""Invalidate a set of oids.
This marks the oid as invalid, but doesn't actually invalidate
@@ -372,6 +394,8 @@
"""
self._inv_lock.acquire()
try:
+ if self._txn_time is None:
+ self._txn_time = tid
self._invalidated.update(oids)
finally:
self._inv_lock.release()
@@ -381,13 +405,15 @@
try:
self._cache.invalidate(self._invalidated)
self._invalidated.clear()
+ self._txn_time = None
finally:
self._inv_lock.release()
# Now is a good time to collect some garbage
self._cache.incrgc()
def modifiedInVersion(self, oid):
- try: return self._db.modifiedInVersion(oid)
+ try:
+ return self._db.modifiedInVersion(oid)
except KeyError:
return self._version
@@ -411,54 +437,94 @@
if self._storage is None:
msg = ("Shouldn't load state for %s "
"when the connection is closed" % oid_repr(oid))
- LOG('ZODB', ERROR, msg)
+ self._log.error(msg)
raise RuntimeError(msg)
try:
- # Avoid reading data from a transaction that committed
- # after the current transaction started, as that might
- # lead to mixing of cached data from earlier transactions
- # and new inconsistent data.
- #
- # Wait for check until after data is loaded from storage
- # to avoid time-of-check to time-of-use race.
- p, serial = self._storage.load(oid, self._version)
- self._load_count = self._load_count + 1
- invalid = self._is_invalidated(obj)
- self._reader.setGhostState(obj, p)
- obj._p_serial = serial
- if invalid:
- self._handle_independent(obj)
+ self._setstate(obj)
except ConflictError:
raise
except:
- LOG('ZODB', ERROR,
- "Couldn't load state for %s" % oid_repr(oid),
- error=sys.exc_info())
+ self._log.error("Couldn't load state for %s", oid_repr(oid),
+ exc_info=sys.exc_info())
raise
- def _is_invalidated(self, obj):
- # Helper method for setstate() covers three cases:
- # returns false if obj is valid
- # returns true if obj was invalidation, but is independent
- # otherwise, raises ConflictError for invalidated objects
+ def _setstate(self, obj):
+ # Helper for setstate(), which provides logging of failures.
+
+ # The control flow is complicated here to avoid loading an
+ # object revision that we are sure we aren't going to use. As
+ # a result, invalidation tests occur before and after the
+ # load. We can only be sure about invalidations after the
+ # load.
+
+ # If an object has been invalidated, there are several cases
+ # to consider:
+ # 1. Check _p_independent()
+ # 2. Try MVCC
+ # 3. Raise ConflictError.
+
+ # Does anything actually use _p_independent()? It would simplify
+ # the code if we could drop support for it.
+
+ # There is a harmless data race with self._invalidated. A
+ # dict update could go on in another thread, but we don't care
+ # because we have to check again after the load anyway.
+ if (obj._p_oid in self._invalidated
+ and not myhasattr(obj, "_p_independent")):
+ # If the object has _p_independent(), we will handle it below.
+ if not (self._mvcc and self._setstate_noncurrent(obj)):
+ self.getTransaction().register(obj)
+ self._conflicts[obj._p_oid] = 1
+ raise ReadConflictError(object=obj)
+
+ p, serial = self._storage.load(obj._p_oid, self._version)
+ self._load_count += 1
+
self._inv_lock.acquire()
try:
- if self._invalidated.has_key(obj._p_oid):
- # Defer _p_independent() call until state is loaded.
- ind = getattr(obj, "_p_independent", None)
- if ind is not None:
- # Defer _p_independent() call until state is loaded.
- return 1
- else:
- self.getTransaction().register(obj)
- self._conflicts[obj._p_oid] = 1
- raise ReadConflictError(object=obj)
- else:
- return 0
+ invalid = obj._p_oid in self._invalidated
finally:
self._inv_lock.release()
+ if invalid:
+ if myhasattr(obj, "_p_independent"):
+ # This call will raise a ReadConflictError if something
+ # goes wrong
+ self._handle_independent(obj)
+ elif not (self._mvcc and self._setstate_noncurrent(obj)):
+ self.getTransaction().register(obj)
+ self._conflicts[obj._p_oid] = 1
+ raise ReadConflictError(object=obj)
+
+ self._reader.setGhostState(obj, p)
+ obj._p_serial = serial
+
+ def _setstate_noncurrent(self, obj):
+ """Set state using non-current data.
+
+ Return True if state was available, False if not.
+ """
+ try:
+ # Load data that was current before the commit at txn_time.
+ t = self._storage.loadBefore(obj._p_oid, self._txn_time)
+ except KeyError:
+ return False
+ if t is None:
+ return False
+ data, start, end = t
+ # The non-current transaction must have been written before
+ # txn_time. It must be current at txn_time, but could have
+ # been modified at txn_time.
+
+ # It's possible that end is None, if, e.g., the most recent
+ # invalidation was for version data.
+ assert start < self._txn_time <= end, \
+ (U64(start), U64(self._txn_time), U64(end))
+ self._noncurrent[obj._p_oid] = True
+ self._reader.setGhostState(obj, data)
+ obj._p_serial = start
+
def _handle_independent(self, obj):
# Helper method for setstate() handles possibly independent objects
# Call _p_independent(), if it returns True, setstate() wins.
@@ -499,7 +565,7 @@
obj._p_changed = 0
obj._p_serial = serial
except:
- LOG('ZODB',ERROR, 'setklassstate failed', error=sys.exc_info())
+ self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def tpc_abort(self, transaction):
@@ -590,11 +656,11 @@
self._storage._creating[:0]=self._creating
del self._creating[:]
else:
- def callback():
+ def callback(tid):
d = {}
for oid in self._modified:
d[oid] = 1
- self._db.invalidate(d, self)
+ self._db.invalidate(tid, d, self)
self._storage.tpc_finish(transaction, callback)
self._conflicts.clear()
=== ZODB3/ZODB/BaseStorage.py 1.38 => 1.39 ===
--- ZODB3/ZODB/BaseStorage.py:1.38 Tue Dec 23 09:37:13 2003
+++ ZODB3/ZODB/BaseStorage.py Wed Dec 24 11:02:00 2003
@@ -32,7 +32,6 @@
class BaseStorage(UndoLogCompatible):
_transaction=None # Transaction that is being committed
- _serial=z64 # Transaction serial number
_tstatus=' ' # Transaction status, used for copying data
_is_read_only = 0
@@ -51,7 +50,7 @@
t=time.time()
t=self._ts=apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
- self._serial=`t`
+ self._tid = `t`
if base is None:
self._oid='\0\0\0\0\0\0\0\0'
else:
@@ -60,16 +59,19 @@
def abortVersion(self, src, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
- return []
+ return self._tid, []
def commitVersion(self, src, dest, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
- return []
+ return self._tid, []
def close(self):
pass
+ def cleanup(self):
+ pass
+
def sortKey(self):
"""Return a string that can be used to sort storage instances.
@@ -85,7 +87,7 @@
def getSize(self):
return len(self)*300 # WAG!
- def history(self, oid, version, length=1):
+ def history(self, oid, version, length=1, filter=None):
pass
def modifiedInVersion(self, oid):
@@ -167,13 +169,13 @@
now = time.time()
t = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
self._ts = t = t.laterThan(self._ts)
- self._serial = `t`
+ self._tid = `t`
else:
self._ts = TimeStamp(tid)
- self._serial = tid
+ self._tid = tid
self._tstatus = status
- self._begin(self._serial, user, desc, ext)
+ self._begin(self._tid, user, desc, ext)
finally:
self._lock_release()
@@ -203,10 +205,11 @@
return
try:
if f is not None:
- f()
+ f(self._tid)
u, d, e = self._ude
- self._finish(self._serial, u, d, e)
+ self._finish(self._tid, u, d, e)
self._clear_temp()
+ return self._tid
finally:
self._ude = None
self._transaction = None
@@ -250,6 +253,48 @@
raise POSException.Unsupported, (
"Retrieval of historical revisions is not supported")
+ def loadBefore(self, oid, tid):
+ """Return most recent revision of oid before tid committed."""
+
+ # XXX Is it okay for loadBefore() to return current data?
+ # There doesn't seem to be a good reason to forbid it, even
+ # though the typical use of this method will never find
+ # current data. But maybe we should call it loadByTid()?
+
+ n = 2
+ start_time = None
+ end_time = None
+ while start_time is None:
+ # The history() approach is a hack, because the dict
+ # returned by history() doesn't contain a tid. It
+ # contains a serialno, which is often the same, but isn't
+ # required to be. We'll pretend it is for now.
+
+ # A second problem is that history() doesn't say anything
+ # about whether the transaction status. If it falls before
+ # the pack time, we can't honor the MVCC request.
+
+ # Note: history() returns the most recent record first.
+
+ # XXX The filter argument to history() only appears to be
+ # supported by FileStorage. Perhaps it shouldn't be used.
+ L = self.history(oid, "", n, lambda d: not d["version"])
+ if not L:
+ return
+ for d in L:
+ if d["serial"] < tid:
+ start_time = d["serial"]
+ break
+ else:
+ end_time = d["serial"]
+ if len(L) < n:
+ break
+ n *= 2
+ if start_time is None:
+ return None
+ data = self.loadSerial(oid, start_time)
+ return data, start_time, end_time
+
def getExtensionMethods(self):
"""getExtensionMethods
@@ -314,7 +359,7 @@
oid=r.oid
if verbose: print oid_repr(oid), r.version, len(r.data)
if restoring:
- self.restore(oid, r.serial, r.data, r.version,
+ self.restore(oid, r.tid, r.data, r.version,
r.data_txn, transaction)
else:
pre=preget(oid, None)
More information about the Zope-Checkins
mailing list