[Zodb-checkins] CVS: ZODB3/ZODB - FileStorage.py:1.139.2.1
DemoStorage.py:1.21.2.1 DB.py:1.55.2.1
Connection.py:1.100.2.1 BaseStorage.py:1.36.2.1
Jeremy Hylton
jeremy at zope.com
Tue Oct 7 01:11:04 EDT 2003
Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv28710/ZODB
Modified Files:
Tag: ZODB3-mvcc-2-branch
FileStorage.py DemoStorage.py DB.py Connection.py
BaseStorage.py
Log Message:
Merge changes from the ZODB3-mvcc-branch.
This new branch is relative to the head, instead of the
Zope-2_7-branch.
=== ZODB3/ZODB/FileStorage.py 1.139 => 1.139.2.1 ===
--- ZODB3/ZODB/FileStorage.py:1.139 Thu Oct 2 18:14:04 2003
+++ ZODB3/ZODB/FileStorage.py Tue Oct 7 01:10:31 2003
@@ -643,7 +643,7 @@
spos = h[-8:]
srcpos = u64(spos)
self._toid2serial_delete.update(current_oids)
- return oids
+ return self._serial, oids
def getSize(self):
return self._pos
@@ -1315,7 +1315,7 @@
# It's too painful to try to update them to correct current
# values instead.
self._toid2serial_delete.update(tindex)
- return tindex.keys()
+ return self._serial, tindex.keys()
def _txn_find(self, tid, stop_at_pack):
pos = self._pos
@@ -1460,9 +1460,6 @@
self._lock_acquire()
try:
r=[]
- file=self._file
- seek=file.seek
- read=file.read
try:
pos=self._index[oid]
except KeyError:
@@ -1473,14 +1470,14 @@
while 1:
if len(r) >= size: return r
- seek(pos)
- h=read(DATA_HDR_LEN)
+ self._file.seek(pos)
+ h=self._file.read(DATA_HDR_LEN)
doid,serial,prev,tloc,vlen,plen = unpack(DATA_HDR, h)
prev=u64(prev)
if vlen:
- read(16)
- version=read(vlen)
+ self._file.read(16)
+ version = self._file.read(vlen)
if wantver is not None and version != wantver:
if prev:
pos=prev
@@ -1491,13 +1488,15 @@
version=''
wantver=None
- seek(u64(tloc))
- h=read(TRANS_HDR_LEN)
+ self._file.seek(u64(tloc))
+ h = self._file.read(TRANS_HDR_LEN)
tid, stl, status, ul, dl, el = unpack(TRANS_HDR,h)
- user_name=read(ul)
- description=read(dl)
- if el: d=loads(read(el))
- else: d={}
+ user_name = self._file.read(ul)
+ description = self._file.read(dl)
+ if el:
+ d=loads(self._file.read(el))
+ else:
+ d={}
d['time']=TimeStamp(serial).timeTime()
d['user_name']=user_name
=== ZODB3/ZODB/DemoStorage.py 1.21 => 1.21.2.1 ===
--- ZODB3/ZODB/DemoStorage.py:1.21 Thu Oct 2 14:17:19 2003
+++ ZODB3/ZODB/DemoStorage.py Tue Oct 7 01:10:31 2003
@@ -148,7 +148,7 @@
# effectively, delete the thing
self._tindex.append([oid, None, r, None, None])
- return oids
+ return self._serial, oids
finally: self._lock_release()
@@ -181,7 +181,7 @@
tindex.append([oid, newserial, r, new_vdata, p])
- return oids
+ return self._serial, oids
finally: self._lock_release()
=== ZODB3/ZODB/DB.py 1.55 => 1.55.2.1 ===
--- ZODB3/ZODB/DB.py:1.55 Thu Oct 2 14:17:19 2003
+++ ZODB3/ZODB/DB.py Tue Oct 7 01:10:32 2003
@@ -74,7 +74,7 @@
self._version_cache_size=version_cache_size
self._version_cache_deactivate_after = version_cache_deactivate_after
- self._miv_cache={}
+ self._miv_cache = {}
# Setup storage
self._storage=storage
@@ -303,8 +303,7 @@
def importFile(self, file):
raise 'Not yet implemented'
- def invalidate(self, oids, connection=None, version='',
- rc=sys.getrefcount):
+ def invalidate(self, tid, oids, connection=None, version=''):
"""Invalidate references to a given oid.
This is used to indicate that one of the connections has committed a
@@ -326,21 +325,21 @@
for cc in allocated:
if (cc is not connection and
(not version or cc._version==version)):
- if rc(cc) <= 3:
+ if sys.getrefcount(cc) <= 3:
cc.close()
- cc.invalidate(oids)
+ cc.invalidate(tid, oids)
- temps=self._temps
- if temps:
+ if self._temps:
t=[]
- for cc in temps:
- if rc(cc) > 3:
+ for cc in self._temps:
+ if sys.getrefcount(cc) > 3:
if (cc is not connection and
- (not version or cc._version==version)):
- cc.invalidate(oids)
+ (not version or cc._version == version)):
+ cc.invalidate(tid, oids)
t.append(cc)
- else: cc.close()
- self._temps=t
+ else:
+ cc.close()
+ self._temps = t
def modifiedInVersion(self, oid):
h=hash(oid)%131
@@ -356,7 +355,7 @@
return len(self._storage)
def open(self, version='', transaction=None, temporary=0, force=None,
- waitflag=1):
+ waitflag=1, mvcc=True):
"""Return a object space (AKA connection) to work in
The optional version argument can be used to specify that a
@@ -374,25 +373,25 @@
try:
if transaction is not None:
- connections=transaction._connections
+ connections = transaction._connections
if connections:
if connections.has_key(version) and not temporary:
return connections[version]
else:
- transaction._connections=connections={}
- transaction=transaction._connections
-
+ transaction._connections = connections = {}
+ transaction = transaction._connections
if temporary:
# This is a temporary connection.
# We won't bother with the pools. This will be
# a one-use connection.
- c=self.klass(
- version=version,
- cache_size=self._version_cache_size)
+ c = self.klass(version=version,
+ cache_size=self._version_cache_size,
+ mvcc=mvcc)
c._setDB(self)
self._temps.append(c)
- if transaction is not None: transaction[id(c)]=c
+ if transaction is not None:
+ transaction[id(c)] = c
return c
@@ -433,18 +432,18 @@
if not pool:
- c=None
+ c = None
if version:
if self._version_pool_size > len(allocated) or force:
- c=self.klass(
- version=version,
- cache_size=self._version_cache_size)
+ c = self.klass(version=version,
+ cache_size=self._version_cache_size,
+ mvcc=mvcc)
allocated.append(c)
pool.append(c)
elif self._pool_size > len(allocated) or force:
- c=self.klass(
- version=version,
- cache_size=self._cache_size)
+ c = self.klass(version=version,
+ cache_size=self._cache_size,
+ mvcc=mvcc)
allocated.append(c)
pool.append(c)
@@ -459,7 +458,7 @@
pool_lock.release()
else: return
- elif len(pool)==1:
+ elif len(pool) == 1:
# Taking last one, lock the pool
# Note that another thread might grab the lock
# before us, so we might actually block, however,
@@ -473,14 +472,15 @@
# but it could be higher due to a race condition.
pool_lock.release()
- c=pool[-1]
+ c = pool[-1]
del pool[-1]
c._setDB(self)
for pool, allocated in pooll:
for cc in pool:
cc._incrgc()
- if transaction is not None: transaction[version]=c
+ if transaction is not None:
+ transaction[version] = c
return c
finally: self._r()
@@ -591,7 +591,8 @@
d = {}
for oid in storage.undo(id):
d[oid] = 1
- self.invalidate(d)
+ # XXX I think we need to remove old undo to use mvcc
+ self.invalidate(None, d)
def versionEmpty(self, version):
return self._storage.versionEmpty(version)
@@ -619,13 +620,13 @@
def commit(self, reallyme, t):
dest=self._dest
- oids = self._db._storage.commitVersion(self._version, dest, t)
+ # Commit self._version into dest. commitVersion() now returns
+ # (tid, oids); tid is forwarded to DB.invalidate() so connections
+ # can record it as the bound on the revisions they may read.
+ tid, oids = self._db._storage.commitVersion(self._version, dest, t)
oids = list2dict(oids)
- self._db.invalidate(oids, version=dest)
+ self._db.invalidate(tid, oids, version=dest)
if dest:
# the code above just invalidated the dest version.
# now we need to invalidate the source!
- self._db.invalidate(oids, version=self._version)
+ self._db.invalidate(tid, oids, version=self._version)
class AbortVersion(CommitVersion):
"""An object that will see to version abortion
@@ -634,9 +635,9 @@
"""
def commit(self, reallyme, t):
- version=self._version
- oids = self._db._storage.abortVersion(version, t)
- self._db.invalidate(list2dict(oids), version=version)
+ # Abort self._version. abortVersion() now returns (tid, oids); the
+ # affected oids are invalidated in that version, passing tid along.
+ version = self._version
+ tid, oids = self._db._storage.abortVersion(version, t)
+ self._db.invalidate(tid, list2dict(oids), version=version)
class TransactionalUndo(CommitVersion):
@@ -650,5 +651,5 @@
# similarity of rhythm that I think it's justified.
def commit(self, reallyme, t):
- oids = self._db._storage.transactionalUndo(self._version, t)
- self._db.invalidate(list2dict(oids))
+ # transactionalUndo() now returns (tid, oids) instead of a bare list
+ # of oids; the undone oids are invalidated with that tid.
+ tid, oids = self._db._storage.transactionalUndo(self._version, t)
+ self._db.invalidate(tid, list2dict(oids))
=== ZODB3/ZODB/Connection.py 1.100 => 1.100.2.1 ===
--- ZODB3/ZODB/Connection.py:1.100 Thu Oct 2 14:17:19 2003
+++ ZODB3/ZODB/Connection.py Tue Oct 7 01:10:32 2003
@@ -21,7 +21,6 @@
from POSException import ConflictError, ReadConflictError, TransactionError
from ExtensionClass import Base
import ExportImport, TmpStore
-from zLOG import LOG, ERROR, BLATHER, WARNING
from coptimizations import new_persistent_id
from ConflictResolution import ResolvedSerial
from Transaction import Transaction, get_transaction
@@ -29,10 +28,17 @@
from cPickle import Unpickler, Pickler
from cStringIO import StringIO
+import logging
import sys
import threading
from time import time
-from types import StringType, ClassType
+from types import ClassType
+
+_marker = object()
+
+def myhasattr(obj, attr):
+    """Return True if obj has an attribute named attr.
+
+    In Python 2 the builtin hasattr() swallows any exception raised while
+    computing the attribute.  getattr() with a default only absorbs
+    AttributeError, so every other error reaches the caller.
+    """
+    missing = _marker
+    found = getattr(obj, attr, missing)
+    return found is not missing
global_code_timestamp = 0
@@ -55,9 +61,9 @@
The Connection manages movement of objects in and out of object storage.
"""
- _tmp=None
- _debug_info=()
- _opened=None
+ _tmp = None
+ _debug_info = ()
+ _opened = None
_code_timestamp = 0
_transaction = None
@@ -65,9 +71,12 @@
# when we close by putting something here.
def __init__(self, version='', cache_size=400,
- cache_deactivate_after=60):
+ cache_deactivate_after=60, mvcc=True):
"""Create a new Connection"""
- self._version=version
+
+ self._log = logging.getLogger("zodb.conn")
+
+ self._version = version
self._cache = cache = PickleCache(self, cache_size)
if version:
# Caches for versions end up empty if the version
@@ -99,6 +108,14 @@
self._invalidated = d = {}
self._invalid = d.has_key
self._conflicts = {}
+ self._noncurrent = {}
+
+ # If MVCC is enabled, then _mvcc is True and _txn_time stores
+ # the upper bound on transactions visible to this connection.
+ # That is, all object revisions must be written before _txn_time.
+ # If it is None, then the current revisions are acceptable.
+ self._mvcc = mvcc
+ self._txn_time = None
def getTransaction(self):
t = self._transaction
@@ -141,7 +158,7 @@
self._incrgc = None
self.cacheGC = None
- def __getitem__(self, oid, tt=type(())):
+ def __getitem__(self, oid):
obj = self._cache.get(oid, None)
if obj is not None:
return obj
@@ -157,9 +174,9 @@
klass, args = object
- if type(klass) is tt:
+ if isinstance(klass, tuple):
module, name = klass
- klass=self._db._classFactory(self, module, name)
+ klass = self._db._classFactory(self, module, name)
if (args is None or
not args and not hasattr(klass,'__getinitargs__')):
@@ -177,12 +194,10 @@
self._cache[oid] = object
return object
- def _persistent_load(self,oid,
- tt=type(())):
-
+ def _persistent_load(self, oid):
__traceback_info__=oid
- if type(oid) is tt:
+ if isinstance(oid, tuple):
# Quick instance reference. We know all we need to know
# to create the instance wo hitting the db, so go for it!
oid, klass = oid
@@ -190,9 +205,10 @@
if obj is not None:
return obj
- if type(klass) is tt:
+ if isinstance(klass, tuple):
module, name = klass
- try: klass=self._db._classFactory(self, module, name)
+ try:
+ klass=self._db._classFactory(self, module, name)
except:
# Eek, we couldn't get the class. Hm.
# Maybe their's more current data in the
@@ -282,11 +298,12 @@
# Call the close callbacks.
if self.__onCloseCallbacks is not None:
for f in self.__onCloseCallbacks:
- try: f()
- except:
- f=getattr(f, 'im_self', f)
- LOG('ZODB',ERROR, 'Close callback failed for %s' % f,
- error=sys.exc_info())
+ try:
+ f()
+ except: # except what?
+ f = getattr(f, 'im_self', f)
+ self._log.error("Close callback failed for %s", f,
+ sys.exc_info())
self.__onCloseCallbacks = None
self._storage = self._tmp = self.new_oid = self._opened = None
self._debug_info = ()
@@ -438,8 +455,8 @@
if tmp is None: return
src=self._storage
- LOG('ZODB', BLATHER,
- 'Commiting subtransaction of size %s' % src.getSize())
+ self._log.debug("Commiting subtransaction of size %s",
+ src.getSize())
self._storage=tmp
self._tmp=None
@@ -490,8 +507,6 @@
del o._p_jar
del o._p_oid
- #XXX
-
def db(self): return self._db
def getVersion(self): return self._version
@@ -499,7 +514,7 @@
def isReadOnly(self):
return self._storage.isReadOnly()
- def invalidate(self, oids):
+ def invalidate(self, tid, oids):
"""Invalidate a set of oids.
This marks the oid as invalid, but doesn't actually invalidate
@@ -508,6 +523,8 @@
"""
self._inv_lock.acquire()
try:
+ if self._txn_time is None:
+ self._txn_time = tid
self._invalidated.update(oids)
finally:
self._inv_lock.release()
@@ -517,13 +534,15 @@
try:
self._cache.invalidate(self._invalidated)
self._invalidated.clear()
+ self._txn_time = None
finally:
self._inv_lock.release()
# Now is a good time to collect some garbage
self._cache.incrgc()
def modifiedInVersion(self, oid):
- try: return self._db.modifiedInVersion(oid)
+ # Ask the DB which version oid was modified in; fall back to this
+ # connection's own version when the DB raises KeyError.
+ try:
+ return self._db.modifiedInVersion(oid)
except KeyError:
return self._version
@@ -547,55 +566,85 @@
if self._storage is None:
msg = ("Shouldn't load state for %s "
"when the connection is closed" % oid_repr(oid))
- LOG('ZODB', ERROR, msg)
+ self._log.error(msg)
raise RuntimeError(msg)
try:
- # Avoid reading data from a transaction that committed
- # after the current transaction started, as that might
- # lead to mixing of cached data from earlier transactions
- # and new inconsistent data.
- #
- # Wait for check until after data is loaded from storage
- # to avoid time-of-check to time-of-use race.
- p, serial = self._storage.load(oid, self._version)
- self._load_count = self._load_count + 1
- invalid = self._is_invalidated(obj)
- self._set_ghost_state(obj, p)
- obj._p_serial = serial
- if invalid:
- self._handle_independent(obj)
+ self._setstate(obj)
except ConflictError:
raise
except:
- LOG('ZODB', ERROR,
- "Couldn't load state for %s" % oid_repr(oid),
- error=sys.exc_info())
+ self._log.error("Couldn't load state for %s", oid_repr(oid),
+ exc_info=sys.exc_info())
raise
- def _is_invalidated(self, obj):
- # Helper method for setstate() covers three cases:
- # returns false if obj is valid
- # returns true if obj was invalidation, but is independent
- # otherwise, raises ConflictError for invalidated objects
+    def _setstate(self, obj):
+        # Helper for setstate(), which provides logging of failures.
+
+        # The control flow is complicated here to avoid loading an
+        # object revision that we are sure we aren't going to use.  As
+        # a result, invalidation tests occur before and after the
+        # load.  We can only be sure about invalidations after the
+        # load.
+
+        # If an object has been invalidated, there are several cases
+        # to consider:
+        # 1. Check _p_independent()
+        # 2. Try MVCC
+        # 3. Raise ConflictError.
+
+        # Does anything actually use _p_independent()?  It would simplify
+        # the code if we could drop support for it.
+
+        # There is a harmless data race with self._invalidated.  A
+        # dict update could go on in another thread, but we don't care
+        # because we have to check again after the load anyway.
+        if (obj._p_oid in self._invalidated
+            and not myhasattr(obj, "_p_independent")):
+            # If the object has _p_independent(), we will handle it below.
+            # Either non-current state is installed or ReadConflictError
+            # is raised; in both cases the current revision must not be
+            # loaded or used, so return immediately.
+            self._load_noncurrent_or_conflict(obj)
+            return
+
+        p, serial = self._storage.load(obj._p_oid, self._version)
+        self._load_count += 1
+
         self._inv_lock.acquire()
         try:
-            if self._invalidated.has_key(obj._p_oid):
-                # Defer _p_independent() call until state is loaded.
-                ind = getattr(obj, "_p_independent", None)
-                if ind is not None:
-                    # Defer _p_independent() call until state is loaded.
-                    return 1
-                else:
-                    self.getTransaction().register(obj)
-                    self._conflicts[obj._p_oid] = 1
-                    raise ReadConflictError(object=obj)
-            else:
-                return 0
+            invalid = obj._p_oid in self._invalidated
         finally:
             self._inv_lock.release()
+
+        if invalid:
+            if myhasattr(obj, "_p_independent"):
+                # This call will raise a ReadConflictError if something
+                # goes wrong
+                self._handle_independent(obj)
+            else:
+                # The revision in p may be newer than this connection is
+                # allowed to see.  Install non-current state instead (or
+                # raise) and return so p does not overwrite it.
+                self._load_noncurrent_or_conflict(obj)
+                return
+
+        self._set_ghost_state(obj, p, serial)
+
+    def _load_noncurrent_or_conflict(self, obj):
+        """Set obj's state from non-current data or raise ReadConflictError.
+
+        Non-current data is tried only when MVCC is enabled.  If it is not
+        available, obj is registered with the current transaction, recorded
+        in self._conflicts, and ReadConflictError is raised.
+        """
+        if not (self._mvcc and self._setstate_noncurrent(obj)):
+            self.getTransaction().register(obj)
+            self._conflicts[obj._p_oid] = 1
+            raise ReadConflictError(object=obj)
+
+    def _setstate_noncurrent(self, obj):
+        """Set state using non-current data.
+
+        Return True if state was available, False if not.
+        """
+        if self._txn_time is None:
+            # invalidate() may have been called with tid None (DB does
+            # this after storage.undo()), so there is no upper bound to
+            # search below.
+            return False
+        try:
+            t = self._storage.loadNonCurrent(obj._p_oid, self._txn_time)
+        except KeyError:
+            return False
+        if t is None:
+            return False
+        data, serial, start, end = t
+        # The revision must be written before _txn_time and replaced at or
+        # after it.  end need not equal _txn_time: _txn_time is the tid of
+        # the first invalidation received since invalidations were last
+        # flushed, while this object may have been changed by a later one.
+        assert end is not None, (start, self._txn_time)
+        assert start < self._txn_time <= end, (start, end, self._txn_time)
+        self._noncurrent[obj._p_oid] = True
+        self._set_ghost_state(obj, data, serial)
+        return True
- def _set_ghost_state(self, obj, p):
+ def _set_ghost_state(self, obj, p, serial):
file = StringIO(p)
unpickler = Unpickler(file)
unpickler.persistent_load = self._persistent_load
@@ -607,6 +656,7 @@
obj.update(state)
else:
setstate(state)
+ obj._p_serial = serial
def _handle_independent(self, obj):
# Helper method for setstate() handles possibly independent objects
@@ -649,9 +699,9 @@
klass, args = copy
if klass is not ExtensionKlass:
- LOG('ZODB',ERROR,
- "Unexpected klass when setting class state on %s"
- % getattr(object,'__name__','(?)'))
+ self._log.error(
+ "Unexpected klass when setting class state on %s",
+ getattr(object, "__name__", "(?)"))
return
copy = klass(*args)
@@ -663,7 +713,7 @@
object._p_changed=0
object._p_serial=serial
except:
- LOG('ZODB',ERROR, 'setklassstate failed', error=sys.exc_info())
+ self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
def tpc_abort(self, transaction):
@@ -720,7 +770,7 @@
if not store_return:
return
- if isinstance(store_return, StringType):
+ if isinstance(store_return, str):
assert oid is not None
self._handle_one_serial(oid, store_return, change)
else:
@@ -728,7 +778,7 @@
self._handle_one_serial(oid, serial, change)
def _handle_one_serial(self, oid, serial, change):
- if not isinstance(serial, StringType):
+ if not isinstance(serial, str):
raise serial
obj = self._cache.get(oid, None)
if obj is None:
@@ -754,11 +804,11 @@
self._storage._creating[:0]=self._creating
del self._creating[:]
else:
- def callback():
+ def callback(tid):
+ # Passed to the storage's tpc_finish() and called with the committed
+ # tid; invalidates every oid this connection modified in the other
+ # connections (DB.invalidate() skips the connection passed to it).
d = {}
for oid in self._modified:
d[oid] = 1
- self._db.invalidate(d, self)
+ self._db.invalidate(tid, d, self)
self._storage.tpc_finish(transaction, callback)
self._conflicts.clear()
=== ZODB3/ZODB/BaseStorage.py 1.36 => 1.36.2.1 ===
--- ZODB3/ZODB/BaseStorage.py:1.36 Thu Oct 2 14:17:19 2003
+++ ZODB3/ZODB/BaseStorage.py Tue Oct 7 01:10:32 2003
@@ -56,12 +56,12 @@
def abortVersion(self, src, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
- return []
+ # Return (tid, oids) like the other storages: self._serial serves as
+ # the transaction id, and the base implementation affects no oids.
+ return self._serial, []
def commitVersion(self, src, dest, transaction):
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
- return []
+ # Return (tid, oids) like the other storages: self._serial serves as
+ # the transaction id, and the base implementation affects no oids.
+ return self._serial, []
def close(self):
pass
@@ -199,7 +199,7 @@
return
try:
if f is not None:
- f()
+ f(self._serial)
u, d, e = self._ude
self._finish(self._serial, u, d, e)
self._clear_temp()
@@ -245,6 +245,33 @@
def loadSerial(self, oid, serial):
raise POSException.Unsupported, (
"Retrieval of historical revisions is not supported")
+
+    def loadNonCurrent(self, oid, tid):
+        """Return most recent revision of oid before tid committed.
+
+        Returns a 4-tuple (data, serial, start, end).  serial and start are
+        both the serial of the revision found, and end is the serial of the
+        next newer record returned by history() (None if the revision found
+        is the newest one).  Returns None if history() yields no
+        non-version record older than tid.
+        """
+
+        n = 2
+        start_time = None
+        end_time = None
+        while start_time is None:
+            # The history() approach is a hack, because the dict
+            # returned by history() doesn't contain a tid.  It
+            # contains a serialno, which is often the same, but isn't
+            # required to be.  We'll pretend it is for now.
+
+            # A second problem is that history() doesn't say anything
+            # about that transaction's status.  If it falls before
+            # the pack time, we can't honor the MVCC request.
+
+            # Note: history() returns the most recent record first.
+            # Ask for up to n non-version records; n doubles each pass.
+            L = self.history(oid, "", n, lambda d: not d["version"])
+            for d in L:
+                if d["serial"] < tid:
+                    start_time = d["serial"]
+                    break
+                # A newer record ends the lifetime of whichever older
+                # revision is eventually found.
+                end_time = d["serial"]
+            if start_time is None and len(L) < n:
+                # Fewer than n records back means there is no more
+                # history to examine, and nothing predates tid.
+                return None
+            n *= 2
+        data = self.loadSerial(oid, start_time)
+        return data, start_time, start_time, end_time
def getExtensionMethods(self):
"""getExtensionMethods
More information about the Zodb-checkins
mailing list