[Zope3-checkins] CVS: Zope3/src/zodb - connection.py:1.9.2.1
Jeremy Hylton
jeremy@zope.com
Sat, 1 Mar 2003 21:36:57 -0500
Update of /cvs-repository/Zope3/src/zodb
In directory cvs.zope.org:/tmp/cvs-serv19408/src/zodb
Modified Files:
Tag: jeremy-atomic-invalidation-branch
connection.py
Log Message:
New invalidation API and better handling of read conflicts.
The begin_ and finish_invalidation() methods are no longer needed.
Change invalidate() to take a set of oids instead of just one.
If a read conflict is raised, add the oid to connections' _conflicts
variable. If a commit() occurs for a txn that got a read conflict,
raise ReadConflictError again.
=== Zope3/src/zodb/connection.py 1.9 => 1.9.2.1 ===
--- Zope3/src/zodb/connection.py:1.9 Thu Feb 27 15:16:45 2003
+++ Zope3/src/zodb/connection.py Sat Mar 1 21:36:54 2003
@@ -55,7 +55,7 @@
from zodb import interfaces
from zodb.conflict import ResolvedSerial
from zodb.export import ExportImport
-from zodb.interfaces import IConnection, ConflictError, IAppConnection
+from zodb.interfaces import *
from zodb.serialize import ConnectionObjectReader, ObjectWriter
from zodb.utils import p64, u64, Set, z64
@@ -102,6 +102,11 @@
self._modified = Set()
self._created = Set()
+ # _conflicts: set of objects that failed to load because
+ # of read conflicts. We must track these explicitly
+ # because they occur outside the two-phase commit and
+ # we must not allow the transaction they occur in to commit.
+ self._conflicts = Set()
# new_oid is used by serialize
self.newObjectId = self._storage.newObjectId
@@ -152,11 +157,11 @@
# IPersistentDataManager requires the next three methods:
# setstate(), register(), mtime()
- def setstate(self, object):
+ def setstate(self, obj):
oid = None
# XXX Is it possible to reorganize the method-level try/except?
try:
- oid = object._p_oid
+ oid = obj._p_oid
# XXX this is quite conservative!
@@ -170,19 +175,20 @@
p, serial = self._storage.load(oid, self._version)
if oid in self._invalidated:
- if not (hasattr(object, '_p_independent')
- and object._p_independent()):
+ if not (hasattr(obj, '_p_independent')
+ and obj._p_independent()):
get_transaction().join(self)
- raise ConflictError(object=object)
+ self._conflicts.add(obj._p_oid)
+ raise ReadConflictError(object=obj)
invalid = 1
else:
invalid = 0
- self._reader.setGhostState(object, p)
- object._p_serial = serial
+ self._reader.setGhostState(obj, p)
+ obj._p_serial = serial
if invalid:
- if object._p_independent():
+ if obj._p_independent():
self._inv_lock.acquire()
try:
try:
@@ -193,7 +199,7 @@
self._inv_lock.release()
else:
get_transaction().join(self)
- raise ConflictError(object=object)
+ raise ConflictError(object=obj)
except ConflictError:
raise
@@ -202,7 +208,7 @@
raise
else:
# Add the object to the cache active list
- self._cache.setstate(oid, object)
+ self._cache.setstate(oid, obj)
def register(self, object):
txn = get_transaction()
@@ -239,17 +245,16 @@
def cacheGC(self):
self._cache.incrgc()
- def invalidate(self, oid):
+ def invalidate(self, oids):
"""Invalidate a particular oid
This marks the oid as invalid, but doesn't actually invalidate
it. The object data will be actually invalidated at certain
transaction boundaries.
"""
- assert oid is not None
self._inv_lock.acquire()
try:
- self._invalidated.add(oid)
+ self._invalidated.update(oids)
finally:
self._inv_lock.release()
@@ -264,6 +269,12 @@
# prepare(), abort(), commit(), savepoint()
def prepare(self, txn):
+ if self._conflicts:
+ # XXX should raise all of the conflicting oids, but
+ # haven't gotten around to changing the exception
+ # to store them.
+ oid = list(self._conflicts)[0]
+ raise ReadConflictError(oid)
self._modified.clear()
self._created.clear()
if self._tmp is not None:
@@ -295,24 +306,24 @@
self._invalidate_created(self._created)
self._created = Set()
self._modified.clear()
+ self._conflicts.clear()
def commit(self, txn):
# It's important that the storage call the function we pass
- # (self._invalidate_modified) while it still has its
- # lock. We don't want another thread to be able to read any
+ # (self._invalidate_modified) while it still has its lock.
+ # We don't want another thread to be able to read any
# updated data until we've had a chance to send an
# invalidation message to all of the other connections!
- self._db.begin_invalidation()
- # XXX We should really have a try/finally because the begin
- # call acquired a lock that will only be released in
- # _invalidate_modified().
+ # If another thread could read the newly committed data
+ # before the invalidation is delivered, the connection would
+ # not be able to detect a read conflict.
self._storage.tpcFinish(txn, self._invalidate_modified)
try:
del self._txns[txn]
except KeyError:
pass
-
+ self._conflicts.clear()
self._flush_invalidations()
def savepoint(self, txn):
@@ -335,6 +346,18 @@
self._created = Set()
return Rollback(self, undo)
+ def _invalidate_created(self, created):
+ # Dis-own new objects from uncommitted transaction.
+ for oid in created:
+ o = self._cache.get(oid)
+ if o is not None:
+ del o._p_jar
+ del o._p_oid
+ del self._cache[oid]
+
+ def _invalidate_modified(self):
+ self._db.invalidate(self._modified, self)
+
def _flush_invalidations(self):
self._inv_lock.acquire()
try:
@@ -457,24 +480,6 @@
self._cache.invalidateMany(tmp._index)
self._invalidate_created(tmp._created)
-
- def _invalidate_created(self, created):
- # Dis-own new objects from uncommitted transaction.
- for oid in created:
- o = self._cache.get(oid)
- if o is not None:
- del o._p_jar
- del o._p_oid
- del self._cache[oid]
-
- def _invalidate_modified(self):
- # Called from the storage's tpc_finish() method after
- # self._db.begin_invalidation() is called. The begin_
- # and finish_invalidation() methods acquire and release
- # a lock.
- for oid in self._modified:
- self._db.invalidate(oid, self)
- self._db.finish_invalidation()
class Rollback:
"""Rollback changes associated with savepoint"""