[Zope-Checkins] CVS: ZODB3/ZODB - Connection.py:1.88 DB.py:1.48 cPickleCache.c:1.81
Jeremy Hylton
jeremy@zope.com
Tue, 8 Apr 2003 11:56:15 -0400
Update of /cvs-repository/ZODB3/ZODB
In directory cvs.zope.org:/tmp/cvs-serv13071/ZODB
Modified Files:
Connection.py DB.py cPickleCache.c
Log Message:
Backport atomic invalidations code from Zope3.
The DB's invalidate() method now takes a set of oids (a dict whose keys
are the oids) instead of a single oid. The set corresponds to all the
changes a data manager made in one transaction, and all of those objects
are invalidated at once.
Add a few tests of the new code to testZODB. The tests cover only
corner cases, because I can't think of a sensible way to test the
atomicity itself. When it has failed in the past, the cause has been
data races that are nearly impossible to reproduce.
This fix needs to be backported to Zope 2.6, but only after assessing
how significant an impact the API change will have.
=== ZODB3/ZODB/Connection.py 1.87 => 1.88 ===
--- ZODB3/ZODB/Connection.py:1.87 Tue Mar 4 16:00:23 2003
+++ ZODB3/ZODB/Connection.py Tue Apr 8 11:55:44 2003
@@ -14,9 +14,8 @@
"""Database connection support
$Id$"""
-
from cPickleCache import PickleCache
-from POSException import ConflictError, ReadConflictError
+from POSException import ConflictError, ReadConflictError, TransactionError
from ExtensionClass import Base
import ExportImport, TmpStore
from zLOG import LOG, ERROR, BLATHER, WARNING
@@ -27,6 +26,7 @@
from cPickle import Unpickler, Pickler
from cStringIO import StringIO
import sys
+import threading
from time import time
from types import StringType, ClassType
@@ -73,14 +73,29 @@
# XXX Why do we want version caches to behave this way?
self._cache.cache_drain_resistance = 100
- self._incrgc=self.cacheGC=cache.incrgc
- self._invalidated=d={}
- self._invalid=d.has_key
- self._committed=[]
+ self._incrgc = self.cacheGC = cache.incrgc
+ self._committed = []
self._code_timestamp = global_code_timestamp
self._load_count = 0 # Number of objects unghosted
self._store_count = 0 # Number of objects stored
+ # _invalidated queues invalidate messages delivered from the DB
+ # _inv_lock prevents one thread from modifying the set while
+ # another is processing invalidations. All the invalidations
+ # from a single transaction should be applied atomically, so
+ # the lock must be held when reading _invalidated.
+
+ # XXX It sucks that we have to hold the lock to read
+ # _invalidated. Normally, _invalidated is written by call
+ # dict.update, which will execute atomically by virtue of the
+ # GIL. But some storage might generate oids where hash or
+ # compare invokes Python code. In that case, the GIL can't
+ # save us.
+ self._inv_lock = threading.Lock()
+ self._invalidated = d = {}
+ self._invalid = d.has_key
+ self._conflicts = {}
+
def getTransaction(self):
t = self._transaction
if t is None:
@@ -91,8 +106,6 @@
def setLocalTransaction(self):
"""Use a transaction bound to the connection rather than the thread"""
if self._transaction is None:
- # XXX The connection may already be registered with a
- # transaction. I guess we should abort that transaction.
self._transaction = Transaction()
return self._transaction
@@ -150,7 +163,7 @@
not args and not hasattr(klass,'__getinitargs__')):
object=klass.__basicnew__()
else:
- object=apply(klass,args)
+ object = klass(*args)
if klass is not ExtensionKlass:
object.__dict__.clear()
@@ -221,7 +234,7 @@
# New code is in place. Start a new cache.
self._resetCache()
else:
- self._cache.invalidate(self._invalidated)
+ self._flush_invalidations()
self._opened=time()
return self
@@ -242,7 +255,7 @@
This just deactivates the thing.
"""
if object is self:
- self._cache.invalidate(self._invalidated)
+ self._flush_invalidations()
else:
assert object._p_oid is not None
self._cache.invalidate(object._p_oid)
@@ -263,7 +276,6 @@
def close(self):
self._incrgc() # This is a good time to do some GC
- db=self._db
# Call the close callbacks.
if self.__onCloseCallbacks is not None:
@@ -274,10 +286,10 @@
LOG('ZODB',ERROR, 'Close callback failed for %s' % f,
error=sys.exc_info())
self.__onCloseCallbacks = None
- self._db=self._storage=self._tmp=self.new_oid=self._opened=None
- self._debug_info=()
+ self._storage = self._tmp = self.new_oid = self._opened = None
+ self._debug_info = ()
# Return the connection to the pool.
- db._closeConnection(self)
+ self._db._closeConnection(self)
__onCommitActions = None
@@ -292,10 +304,13 @@
# We registered ourself. Execute a commit action, if any.
if self.__onCommitActions is not None:
method_name, args, kw = self.__onCommitActions.pop(0)
- apply(getattr(self, method_name), (transaction,) + args, kw)
+ getattr(self, method_name)(transaction, *args, **kw)
return
oid = object._p_oid
+ if self._conflicts.has_key(oid):
+ raise ReadConflictError(oid)
+
invalid = self._invalid
if oid is None or object._p_jar is not self:
# new object
@@ -305,9 +320,11 @@
self._creating.append(oid)
elif object._p_changed:
- if invalid(oid) and not hasattr(object, '_p_resolveConflict'):
- raise ConflictError(object=object)
- self._invalidating.append(oid)
+ if invalid(oid):
+ resolve = getattr(object, "_p_resolveConflict", None)
+ if resolve is None:
+ raise ConflictError(object=object)
+ self._modified.append(oid)
else:
# Nothing to do
@@ -369,7 +386,7 @@
#XXX We should never get here
if invalid(oid) and not hasattr(object, '_p_resolveConflict'):
raise ConflictError(object=object)
- self._invalidating.append(oid)
+ self._modified.append(oid)
klass = object.__class__
@@ -433,9 +450,9 @@
oids=src._index.keys()
# Copy invalidating and creating info from temporary storage:
- invalidating=self._invalidating
- invalidating[len(invalidating):]=oids
- creating=self._creating
+ modified = self._modified
+ modified[len(modified):] = oids
+ creating = self._creating
creating[len(creating):]=src._creating
for oid in oids:
@@ -479,15 +496,28 @@
def isReadOnly(self):
return self._storage.isReadOnly()
- def invalidate(self, oid):
- """Invalidate a particular oid
+ def invalidate(self, oids):
+ """Invalidate a set of oids.
This marks the oid as invalid, but doesn't actually invalidate
it. The object data will be actually invalidated at certain
transaction boundaries.
"""
- assert oid is not None
- self._invalidated[oid] = 1
+ self._inv_lock.acquire()
+ try:
+ self._invalidated.update(oids)
+ finally:
+ self._inv_lock.release()
+
+ def _flush_invalidations(self):
+ self._inv_lock.acquire()
+ try:
+ self._cache.invalidate(self._invalidated)
+ self._invalidated.clear()
+ finally:
+ self._inv_lock.release()
+ # Now is a good time to collect some garbage
+ self._cache.incrgc()
def modifiedInVersion(self, oid):
try: return self._db.modifiedInVersion(oid)
@@ -508,8 +538,8 @@
def root(self):
return self['\0\0\0\0\0\0\0\0']
- def setstate(self, object):
- oid = object._p_oid
+ def setstate(self, obj):
+ oid = obj._p_oid
if self._storage is None:
msg = ("Shouldn't load state for %s "
@@ -518,54 +548,20 @@
raise RuntimeError(msg)
try:
+ # Avoid reading data from a transaction that committed
+ # after the current transaction started, as that might
+ # lead to mixing of cached data from earlier transactions
+ # and new inconsistent data.
+ #
+ # Wait for check until after data is loaded from storage
+ # to avoid time-of-check to time-of-use race.
p, serial = self._storage.load(oid, self._version)
self._load_count = self._load_count + 1
-
- # XXX this is quite conservative!
- # We need, however, to avoid reading data from a transaction
- # that committed after the current "session" started, as
- # that might lead to mixing of cached data from earlier
- # transactions and new inconsistent data.
- #
- # Note that we (carefully) wait until after we call the
- # storage to make sure that we don't miss an invaildation
- # notifications between the time we check and the time we
- # read.
-
- # XXX Need unit tests for _p_independent.
- if self._invalid(oid):
- if not hasattr(object.__class__, '_p_independent'):
- self.getTransaction().register(self)
- raise ReadConflictError(object=object)
- invalid = 1
- else:
- invalid = 0
-
- file = StringIO(p)
- unpickler = Unpickler(file)
- unpickler.persistent_load = self._persistent_load
- unpickler.load()
- state = unpickler.load()
-
- if hasattr(object, '__setstate__'):
- object.__setstate__(state)
- else:
- d = object.__dict__
- for k, v in state.items():
- d[k] = v
-
- object._p_serial = serial
-
+ invalid = self._is_invalidated(obj)
+ self._set_ghost_state(obj, p)
+ obj._p_serial = serial
if invalid:
- if object._p_independent():
- try:
- del self._invalidated[oid]
- except KeyError:
- pass
- else:
- self.getTransaction().register(self)
- raise ConflictError(object=object)
-
+ self._handle_independent(obj)
except ConflictError:
raise
except:
@@ -573,6 +569,59 @@
error=sys.exc_info())
raise
+ def _is_invalidated(self, obj):
+ # Helper method for setstate() covers three cases:
+ # returns false if obj is valid
+ # returns true if obj was invalidation, but is independent
+ # otherwise, raises ConflictError for invalidated objects
+ self._inv_lock.acquire()
+ try:
+ if self._invalidated.has_key(obj._p_oid):
+ # Defer _p_independent() call until state is loaded.
+ ind = getattr(obj, "_p_independent", None)
+ if ind is not None:
+ # Defer _p_independent() call until state is loaded.
+ return 1
+ else:
+ self.getTransaction().register(self)
+ self._conflicts[obj._p_oid] = 1
+ raise ReadConflictError(object=obj)
+ else:
+ return 0
+ finally:
+ self._inv_lock.release()
+
+ def _set_ghost_state(self, obj, p):
+ file = StringIO(p)
+ unpickler = Unpickler(file)
+ unpickler.persistent_load = self._persistent_load
+ unpickler.load()
+ state = unpickler.load()
+
+ setstate = getattr(obj, "__setstate__", None)
+ if setstate is None:
+ obj.update(state)
+ else:
+ setstate(state)
+
+ def _handle_independent(self, obj):
+ # Helper method for setstate() handles possibly independent objects
+ # Call _p_independent(), if it returns True, setstate() wins.
+ # Otherwise, raise a ConflictError.
+
+ if obj._p_independent():
+ self._inv_lock.acquire()
+ try:
+ try:
+ del self._invalidated[obj._p_oid]
+ except KeyError:
+ pass
+ finally:
+ self._inv_lock.release()
+ else:
+ self.getTransaction().register(obj)
+ raise ReadConflictError(object=obj)
+
def oldstate(self, object, serial):
oid=object._p_oid
p = self._storage.loadSerial(oid, serial)
@@ -601,7 +650,7 @@
% getattr(object,'__name__','(?)'))
return
- copy=apply(klass,args)
+ copy = klass(*args)
object.__dict__.clear()
object.__dict__.update(copy.__dict__)
@@ -617,12 +666,13 @@
if self.__onCommitActions is not None:
del self.__onCommitActions
self._storage.tpc_abort(transaction)
- self._cache.invalidate(self._invalidated)
- self._cache.invalidate(self._invalidating)
+ self._cache.invalidate(self._modified)
+ self._flush_invalidations()
+ self._conflicts.clear()
self._invalidate_creating()
def tpc_begin(self, transaction, sub=None):
- self._invalidating = []
+ self._modified = []
self._creating = []
if sub:
# Sub-transaction!
@@ -688,10 +738,10 @@
def tpc_finish(self, transaction):
# It's important that the storage call the function we pass
- # (self._invalidate_invalidating) while it still has it's
- # lock. We don't want another thread to be able to read any
- # updated data until we've had a chance to send an
- # invalidation message to all of the other connections!
+ # while it still has it's lock. We don't want another thread
+ # to be able to read any updated data until we've had a chance
+ # to send an invalidation message to all of the other
+ # connections!
if self._tmp is not None:
# Commiting a subtransaction!
@@ -700,25 +750,21 @@
self._storage._creating[:0]=self._creating
del self._creating[:]
else:
- self._db.begin_invalidation()
- self._storage.tpc_finish(transaction,
- self._invalidate_invalidating)
+ def callback():
+ d = {}
+ for oid in self._modified:
+ d[oid] = 1
+ self._db.invalidate(d, self)
+ self._storage.tpc_finish(transaction, callback)
- self._cache.invalidate(self._invalidated)
- self._incrgc() # This is a good time to do some GC
-
- def _invalidate_invalidating(self):
- for oid in self._invalidating:
- assert oid is not None
- self._db.invalidate(oid, self)
- self._db.finish_invalidation()
+ self._conflicts.clear()
+ self._flush_invalidations()
def sync(self):
self.getTransaction().abort()
sync=getattr(self._storage, 'sync', 0)
if sync != 0: sync()
- self._cache.invalidate(self._invalidated)
- self._incrgc() # This is a good time to do some GC
+ self._flush_invalidations()
def getDebugInfo(self):
return self._debug_info
=== ZODB3/ZODB/DB.py 1.47 => 1.48 ===
--- ZODB3/ZODB/DB.py:1.47 Fri Jan 17 12:23:14 2003
+++ ZODB3/ZODB/DB.py Tue Apr 8 11:55:44 2003
@@ -26,6 +26,12 @@
from types import StringType
+def list2dict(L):
+ d = {}
+ for elt in L:
+ d[elt] = 1
+ return d
+
class DB(UndoLogCompatible.UndoLogCompatible):
"""The Object Database
@@ -282,17 +288,7 @@
def importFile(self, file):
raise 'Not yet implemented'
- def begin_invalidation(self):
- # Must be called before first call to invalidate and before
- # the storage lock is held.
- self._a()
-
- def finish_invalidation(self):
- # Must be called after begin_invalidation() and after final
- # invalidate() call.
- self._r()
-
- def invalidate(self, oid, connection=None, version='',
+ def invalidate(self, oids, connection=None, version='',
rc=sys.getrefcount):
"""Invalidate references to a given oid.
@@ -304,9 +300,11 @@
if connection is not None:
version=connection._version
# Update modified in version cache
- h=hash(oid)%131
- o=self._miv_cache.get(h, None)
- if o is not None and o[0]==oid: del self._miv_cache[h]
+ # XXX must make this work with list or dict to backport to 2.6
+ for oid in oids:
+ h=hash(oid)%131
+ o=self._miv_cache.get(h, None)
+ if o is not None and o[0]==oid: del self._miv_cache[h]
# Notify connections
for pool, allocated in self._pools[1]:
@@ -315,7 +313,7 @@
(not version or cc._version==version)):
if rc(cc) <= 3:
cc.close()
- cc.invalidate(oid)
+ cc.invalidate(oids)
temps=self._temps
if temps:
@@ -324,7 +322,7 @@
if rc(cc) > 3:
if (cc is not connection and
(not version or cc._version==version)):
- cc.invalidate(oid)
+ cc.invalidate(oids)
t.append(cc)
else: cc.close()
self._temps=t
@@ -561,8 +559,10 @@
transaction.register(TransactionalUndo(self, id))
else:
# fall back to old undo
+ d = {}
for oid in storage.undo(id):
- self.invalidate(oid)
+ d[oid] = 1
+ self.invalidate(d)
def versionEmpty(self, version):
return self._storage.versionEmpty(version)
@@ -589,14 +589,14 @@
def abort(self, reallyme, t): pass
def commit(self, reallyme, t):
- db=self._db
dest=self._dest
- oids=db._storage.commitVersion(self._version, dest, t)
- for oid in oids: db.invalidate(oid, version=dest)
+ oids = self._db._storage.commitVersion(self._version, dest, t)
+ oids = list2dict(oids)
+ self._db.invalidate(oids, version=dest)
if dest:
# the code above just invalidated the dest version.
# now we need to invalidate the source!
- for oid in oids: db.invalidate(oid, version=self._version)
+ self._db.invalidate(oids, version=self._version)
class AbortVersion(CommitVersion):
"""An object that will see to version abortion
@@ -605,11 +605,9 @@
"""
def commit(self, reallyme, t):
- db=self._db
version=self._version
- oids = db._storage.abortVersion(version, t)
- for oid in oids:
- db.invalidate(oid, version=version)
+ oids = self._db._storage.abortVersion(version, t)
+ self._db.invalidate(list2dict(oids), version=version)
class TransactionalUndo(CommitVersion):
@@ -623,7 +621,5 @@
# similarity of rhythm that I think it's justified.
def commit(self, reallyme, t):
- db=self._db
- oids=db._storage.transactionalUndo(self._version, t)
- for oid in oids:
- db.invalidate(oid)
+ oids = self._db._storage.transactionalUndo(self._version, t)
+ self._db.invalidate(list2dict(oids))
=== ZODB3/ZODB/cPickleCache.c 1.80 => 1.81 ===
--- ZODB3/ZODB/cPickleCache.c:1.80 Wed Apr 2 11:50:49 2003
+++ ZODB3/ZODB/cPickleCache.c Tue Apr 8 11:55:44 2003
@@ -352,6 +352,7 @@
_invalidate(self, key);
Py_DECREF(key);
}
+ /* XXX Do we really want to modify the input? */
PySequence_DelSlice(inv, 0, l);
}
}