[Zope-Checkins] CVS: Zope3/src/ZODB - Connection.py:1.139
Jeremy Hylton
jeremy at zope.com
Sat Mar 13 02:48:42 EST 2004
Update of /cvs-repository/Zope3/src/ZODB
In directory cvs.zope.org:/tmp/cvs-serv29469/src/ZODB
Modified Files:
Connection.py
Log Message:
Revise Connection.
Make _added_during_commit a regular instance variable. Don't use
try/finally to reset it; just clear it at the start of a transaction.
XXX There was a test that needed to be removed, but it seemed to be
just a shallow test that try/finally was used. Can't see any feature
that depends on specific of error handling: The txn is going to abort.
Remove unused _opened instance variable.
Split commit() into two smaller parts.
Get rid of extra manipulation of _creating.
Don't look for _p_serial of None; z64 is now required.
Undo local variable aliases in subtransaction methods.
Also, trivial change to pickle cache API -- get() works like dict get().
=== Zope3/src/ZODB/Connection.py 1.138 => 1.139 ===
--- Zope3/src/ZODB/Connection.py:1.138 Fri Mar 12 01:37:23 2004
+++ Zope3/src/ZODB/Connection.py Sat Mar 13 02:48:11 2004
@@ -131,10 +131,9 @@
_tmp = None
_code_timestamp = 0
_transaction = None
- _added_during_commit = None
def __init__(self, version='', cache_size=400,
- cache_deactivate_after=60, mvcc=True):
+ cache_deactivate_after=None, mvcc=True):
"""Create a new Connection.
A Connection instance should by instantiated by the DB
@@ -143,7 +142,6 @@
self._log = logging.getLogger("zodb.conn")
self._storage = None
- self._opened = None
self._debug_info = ()
self._version = version
@@ -158,6 +156,7 @@
self._cache.cache_drain_resistance = 100
self._committed = []
self._added = {}
+ self._added_during_commit = None
self._reset_counter = global_reset_counter
self._load_count = 0 # Number of objects unghosted
self._store_count = 0 # Number of objects stored
@@ -315,12 +314,10 @@
raise InvalidObjectReference(obj, obj._p_jar)
def sortKey(self):
- # XXX will raise an exception if the DB hasn't been set
- storage_key = self._sortKey()
# If two connections use the same storage, give them a
# consistent order using id(). This is unique for the
# lifetime of a connection, which is good enough.
- return "%s:%s" % (storage_key, id(self))
+ return "%s:%s" % (self._sortKey(), id(self))
def _setDB(self, odb):
"""Register odb, the DB that this Connection uses.
@@ -348,7 +345,6 @@
self._flush_invalidations()
self._reader = ConnectionObjectReader(self, self._cache,
self._db.classFactory)
- self._opened = time()
def _resetCache(self):
"""Creates a new cache, discarding the old.
@@ -452,148 +448,123 @@
self._log.error("Close callback failed for %s", f,
sys.exc_info())
self.__onCloseCallbacks = None
- self._storage = self._tmp = self.new_oid = self._opened = None
+ self._storage = self._tmp = self.new_oid = None
self._debug_info = ()
# Return the connection to the pool.
if self._db is not None:
self._db._closeConnection(self)
self._db = None
- def commit(self, object, transaction):
- if object is self:
+ def commit(self, obj, transaction):
+ if obj is self:
# We registered ourself. Execute a commit action, if any.
if self._import:
self._importDuringCommit(transaction, *self._import)
self._import = None
return
- oid = object._p_oid
- if self._conflicts.has_key(oid):
- self.getTransaction().register(object)
- raise ReadConflictError(object=object)
-
- invalid = self._invalid
-
- # XXX In the case of a new object or an object added using add(),
- # the oid is appended to _creating.
- # However, this ought to be unnecessary because the _p_serial
- # of the object will be z64 or None, so it will be appended
- # to _creating about 30 lines down. The removal from _added
- # ought likewise to be unnecessary.
- if oid is None or object._p_jar is not self:
+ oid = obj._p_oid
+ if oid in self._conflicts:
+ self.getTransaction().register(obj)
+ raise ReadConflictError(object=obj)
+
+ if oid is None or obj._p_jar is not self:
# new object
oid = self.new_oid()
- object._p_jar = self
- object._p_oid = oid
- self._creating.append(oid) # maybe don't need this
+ obj._p_jar = self
+ obj._p_oid = oid
+ assert obj._p_serial == z64
elif oid in self._added:
- # maybe don't need these
- self._creating.append(oid)
- del self._added[oid]
- elif object._p_changed:
- if invalid(oid):
- resolve = getattr(object, "_p_resolveConflict", None)
+ assert obj._p_serial == z64
+ elif obj._p_changed:
+ if oid in self._invalidated:
+ resolve = getattr(obj, "_p_resolveConflict", None)
if resolve is None:
- raise ConflictError(object=object)
+ raise ConflictError(object=obj)
self._modified.append(oid)
else:
# Nothing to do
return
- w = ObjectWriter(object)
+ self._store_objects(ObjectWriter(obj), transaction)
+
+ def _store_objects(self, writer, transaction):
self._added_during_commit = []
- try:
- for obj in itertools.chain(w, self._added_during_commit):
- oid = obj._p_oid
- serial = getattr(obj, '_p_serial', z64)
-
- # XXX which one? z64 or None? Why do I have to check both?
- if serial == z64 or serial is None:
- # new object
- self._creating.append(oid)
- # If this object was added, it is now in _creating, so can
- # be removed from _added.
- self._added.pop(oid, None)
+ for obj in itertools.chain(writer, self._added_during_commit):
+ oid = obj._p_oid
+ serial = getattr(obj, "_p_serial", z64)
+
+ if serial == z64:
+ # new object
+ self._creating.append(oid)
+ # If this object was added, it is now in _creating, so can
+ # be removed from _added.
+ self._added.pop(oid, None)
+ else:
+ if (oid in self._invalidated
+ and not hasattr(obj, '_p_resolveConflict')):
+ raise ConflictError(object=obj)
+ self._modified.append(oid)
+ p = writer.serialize(obj) # This calls __getstate__ of obj
+ s = self._storage.store(oid, serial, p, self._version, transaction)
+ self._store_count += 1
+ # Put the object in the cache before handling the
+ # response, just in case the response contains the
+ # serial number for a newly created object
+ try:
+ self._cache[oid] = obj
+ except:
+ # Dang, I bet its wrapped:
+ if hasattr(obj, 'aq_base'):
+ self._cache[oid] = obj.aq_base
else:
- if (invalid(oid)
- and not hasattr(object, '_p_resolveConflict')):
- raise ConflictError(object=obj)
- self._modified.append(oid)
- p = w.serialize(obj) # This calls __getstate__ of obj
-
- s = self._storage.store(oid, serial, p, self._version,
- transaction)
- self._store_count = self._store_count + 1
- # Put the object in the cache before handling the
- # response, just in case the response contains the
- # serial number for a newly created object
- try:
- self._cache[oid] = obj
- except:
- # Dang, I bet its wrapped:
- if hasattr(obj, 'aq_base'):
- self._cache[oid] = obj.aq_base
- else:
- raise
+ raise
- self._handle_serial(s, oid)
- finally:
- del self._added_during_commit
+ self._handle_serial(s, oid)
+ self._added_during_commit = None
def commit_sub(self, t):
"""Commit all work done in all subtransactions for this transaction"""
- tmp=self._tmp
- if tmp is None: return
- src=self._storage
-
- self._log.debug("Commiting subtransaction of size %s",
- src.getSize())
-
- self._storage=tmp
- self._tmp=None
-
- tmp.tpc_begin(t)
-
- load=src.load
- store=tmp.store
- dest=self._version
- oids=src._index.keys()
+ if self._tmp is None:
+ return
+ src = self._storage
+ self._storage = self._tmp
+ self._tmp = None
+
+ self._log.debug("Commiting subtransaction of size %s", src.getSize())
+ oids = src._index.keys()
+ self._storage.tpc_begin(t)
# Copy invalidating and creating info from temporary storage:
- modified = self._modified
- modified[len(modified):] = oids
- creating = self._creating
- creating[len(creating):]=src._creating
+ self._modified[len(self._modified):] = oids
+ self._creating[len(self._creating):] = src._creating
for oid in oids:
- data, serial = load(oid, src)
- s=store(oid, serial, data, dest, t)
- self._handle_serial(s, oid, change=0)
+ data, serial = src.load(oid, src)
+ s = self._storage.store(oid, serial, data, self._version, t)
+ self._handle_serial(s, oid, change=False)
def abort_sub(self, t):
"""Abort work done in all subtransactions for this transaction"""
- tmp=self._tmp
- if tmp is None: return
- src=self._storage
- self._tmp=None
- self._storage=tmp
+ if self._tmp is None:
+ return
+ src = self._storage
+ self._storage = self._tmp
+ self._tmp = None
self._cache.invalidate(src._index.keys())
self._invalidate_creating(src._creating)
def _invalidate_creating(self, creating=None):
- """Dissown any objects newly saved in an uncommitted transaction.
- """
+ """Dissown any objects newly saved in an uncommitted transaction."""
if creating is None:
- creating=self._creating
- self._creating=[]
+ creating = self._creating
+ self._creating = []
- cache=self._cache
- cache_get=cache.get
for oid in creating:
- o=cache_get(oid, None)
+ o = self._cache.get(oid)
if o is not None:
- del cache[oid]
+ del self._cache[oid]
del o._p_jar
del o._p_oid
@@ -844,6 +815,9 @@
def tpc_begin(self, transaction, sub=None):
self._modified = []
+
+ # _creating is a list of oids of new objects, which is used to
+ # remove them from the cache if a transaction aborts.
self._creating = []
if sub:
# Sub-transaction!
More information about the Zope-Checkins
mailing list