[Zodb-checkins] CVS: Zope/lib/python/ZODB - Connection.py:1.149
ExportImport.py:1.24
Tim Peters
tim.one at comcast.net
Thu Apr 15 21:08:43 EDT 2004
Update of /cvs-repository/Zope/lib/python/ZODB
In directory cvs.zope.org:/tmp/cvs-serv10213/lib/python/ZODB
Modified Files:
Connection.py ExportImport.py
Log Message:
Finally changed Connection to use the new transaction join API. This
essentially means a connection keeps track of which objects from
the connection are modified, instead of transactions keeping track of that.
A good consequence hasn't yet been implemented: if a connection is
closed with modifications still pending, we can detect that efficiently
now, and complain.
=== Zope/lib/python/ZODB/Connection.py 1.148 => 1.149 ===
--- Zope/lib/python/ZODB/Connection.py:1.148 Thu Apr 15 12:22:38 2004
+++ Zope/lib/python/ZODB/Connection.py Thu Apr 15 21:08:12 2004
@@ -158,7 +158,7 @@
A Connection instance should by instantiated by the DB
instance that it is connected to.
-
+
:Parameters:
- `version`: the "version" that all changes will be made
in, defaults to no version.
@@ -192,8 +192,17 @@
self._reset_counter = global_reset_counter
self._load_count = 0 # Number of objects unghosted
self._store_count = 0 # Number of objects stored
+
+ # List of oids of modified objects (to be invalidated on an abort).
self._modified = []
+ # List of all objects (not oids) registered as modified by the
+ # persistence machinery.
+ self._registered_objects = []
+
+ # Do we need to join a txn manager?
+ self._needs_to_join = True
+
# If a transaction manager is passed to the constructor, use
# it instead of the global transaction manager. The instance
# variable will hold a TM instance.
@@ -373,7 +382,7 @@
obj._p_jar = self
if self._added_during_commit is not None:
self._added_during_commit.append(obj)
- self._txn_mgr.get().register(obj)
+ self._register(obj)
# Add to _added after calling register(), so that _added
# can be used as a test for whether the object has been
# registered with the transaction.
@@ -440,22 +449,23 @@
cache_size = self._cache.cache_size
self._cache = cache = PickleCache(self, cache_size)
- def abort(self, object, transaction):
+ def abort(self, transaction):
"""Abort the object in the transaction.
This just deactivates the thing.
"""
- if object is self:
- self._flush_invalidations()
- else:
- oid = object._p_oid
+
+ for obj in self._registered_objects:
+ oid = obj._p_oid
assert oid is not None
if oid in self._added:
del self._added[oid]
- del object._p_jar
- del object._p_oid
+ del obj._p_jar
+ del obj._p_oid
else:
- self._cache.invalidate(object._p_oid)
+ self._cache.invalidate(oid)
+
+ self._tpc_cleanup()
# XXX should there be a way to call incrgc directly?
# perhaps "full sweep" should do that?
@@ -546,37 +556,35 @@
# assert that here, because self may have been reused (by
# another thread) by the time we get back here.
- def commit(self, obj, transaction):
- if obj is self:
- # We registered ourself. Execute a commit action, if any.
- if self._import:
- self._importDuringCommit(transaction, *self._import)
- self._import = None
- return
-
- oid = obj._p_oid
- if oid in self._conflicts:
- raise ReadConflictError(object=obj)
+ def commit(self, transaction):
+ if self._import:
+ # XXX eh?
+ self._importDuringCommit(transaction, *self._import)
+ self._import = None
- if oid is None or obj._p_jar is not self:
- # new object
- oid = self.new_oid()
- obj._p_jar = self
- obj._p_oid = oid
- assert obj._p_serial == z64
- elif oid in self._added:
- assert obj._p_serial == z64
- elif obj._p_changed:
- if oid in self._invalidated:
- resolve = getattr(obj, "_p_resolveConflict", None)
- if resolve is None:
- raise ConflictError(object=obj)
- self._modified.append(oid)
- else:
- # Nothing to do
- return
+ for obj in self._registered_objects:
+ oid = obj._p_oid
+ assert oid
+ if oid in self._conflicts:
+ raise ReadConflictError(object=obj)
+
+ if obj._p_jar is not self:
+ raise InvalidObjectReference(obj, obj._p_jar)
+ elif oid in self._added:
+ assert obj._p_serial == z64
+ elif obj._p_changed:
+ if oid in self._invalidated:
+ resolve = getattr(obj, "_p_resolveConflict", None)
+ if resolve is None:
+ raise ConflictError(object=obj)
+ self._modified.append(oid)
+ else:
+ # Nothing to do. It's been said that it's legal, e.g., for
+ # an object to set _p_changed to false after it's been
+ # changed and registered.
+ continue
- self._store_objects(ObjectWriter(obj), transaction)
+ self._store_objects(ObjectWriter(obj), transaction)
def _store_objects(self, writer, transaction):
self._added_during_commit = []
@@ -626,8 +634,8 @@
self._storage.tpc_begin(t)
# Copy invalidating and creating info from temporary storage:
- self._modified[len(self._modified):] = oids
- self._creating[len(self._creating):] = src._creating
+ self._modified.extend(oids)
+ self._creating.extend(src._creating)
for oid in oids:
data, serial = src.load(oid, src)
@@ -745,7 +753,14 @@
elif obj._p_oid in self._added:
# It was registered before it was added to _added.
return
- self._txn_mgr.get().register(obj)
+ self._register(obj)
+
+ def _register(self, obj=None):
+ if obj is not None:
+ self._registered_objects.append(obj)
+ if self._needs_to_join:
+ self._txn_mgr.get().join(self)
+ self._needs_to_join = False
def root(self):
"""Return the database root object.
@@ -825,7 +840,7 @@
"""Load non-current state for obj or raise ReadConflictError."""
if not (self._mvcc and self._setstate_noncurrent(obj)):
- self._txn_mgr.get().register(obj)
+ self._register(obj)
self._conflicts[obj._p_oid] = True
raise ReadConflictError(object=obj)
@@ -867,7 +882,7 @@
finally:
self._inv_lock.release()
else:
- self._txn_mgr.get().register(obj)
+ self._register(obj)
raise ReadConflictError(object=obj)
def oldstate(self, obj, tid):
@@ -912,20 +927,6 @@
self._log.error("setklassstate failed", exc_info=sys.exc_info())
raise
- def tpc_abort(self, transaction):
- if self._import:
- self._import = None
- self._storage.tpc_abort(transaction)
- self._cache.invalidate(self._modified)
- self._conflicts.clear()
- if not self._synch:
- self._flush_invalidations()
- self._invalidate_creating()
- while self._added:
- oid, obj = self._added.popitem()
- del obj._p_oid
- del obj._p_jar
-
def tpc_begin(self, transaction, sub=False):
self._modified = []
@@ -1009,10 +1010,28 @@
d[oid] = 1
self._db.invalidate(tid, d, self)
self._storage.tpc_finish(transaction, callback)
+ self._tpc_cleanup()
+ def tpc_abort(self, transaction):
+ if self._import:
+ self._import = None
+ self._storage.tpc_abort(transaction)
+ self._cache.invalidate(self._modified)
+ self._invalidate_creating()
+ while self._added:
+ oid, obj = self._added.popitem()
+ del obj._p_oid
+ del obj._p_jar
+ self._tpc_cleanup()
+
+ # Common cleanup actions after tpc_finish/tpc_abort.
+ def _tpc_cleanup(self):
self._conflicts.clear()
if not self._synch:
self._flush_invalidations()
+ self._needs_to_join = True
+ self._registered_objects = []
+
def sync(self):
self._txn_mgr.get().abort()
@@ -1044,5 +1063,5 @@
new._p_oid = oid
new._p_jar = self
new._p_changed = 1
- self._txn_mgr.get().register(new)
+ self._register(new)
self._cache[oid] = new
=== Zope/lib/python/ZODB/ExportImport.py 1.23 => 1.24 ===
--- Zope/lib/python/ZODB/ExportImport.py:1.23 Sun Mar 21 11:13:31 2004
+++ Zope/lib/python/ZODB/ExportImport.py Thu Apr 15 21:08:12 2004
@@ -57,7 +57,7 @@
if isinstance(f, str):
f = open(f,'rb')
-
+
magic = f.read(4)
if magic != 'ZEXP':
if customImporters and customImporters.has_key(magic):
@@ -65,13 +65,13 @@
return customImporters[magic](self, f, clue)
raise ExportError("Invalid export header")
- t = self.getTransaction()
+ t = self._txn_mgr.get()
if clue:
t.note(clue)
return_oid_list = []
self._import = f, return_oid_list
- self.getTransaction().register(self)
+ self._register()
t.commit(1)
# Return the root imported object.
if return_oid_list:
More information about the Zodb-checkins
mailing list