[Zodb-checkins] CVS: ZODB3/BDBStorage - BDBFullStorage.py:1.71.2.1
BDBMinimalStorage.py:1.29.14.1 BerkeleyBase.py:1.41.14.2
Jeremy Hylton
jeremy at zope.com
Tue Sep 9 19:36:08 EDT 2003
Update of /cvs-repository/ZODB3/BDBStorage
In directory cvs.zope.org:/tmp/cvs-serv21141/BDBStorage
Modified Files:
Tag: ZODB3-3_2-branch
BDBFullStorage.py BDBMinimalStorage.py BerkeleyBase.py
Log Message:
Commit merge of Berkeley code from ZODB3-3_1-branch.
Two tests fail --
testEarlierRaceCondition and
testRaceCondition.
Will fix that next.
=== ZODB3/BDBStorage/BDBFullStorage.py 1.71 => 1.71.2.1 ===
--- ZODB3/BDBStorage/BDBFullStorage.py:1.71 Fri May 30 13:36:07 2003
+++ ZODB3/BDBStorage/BDBFullStorage.py Tue Sep 9 18:35:37 2003
@@ -235,7 +235,7 @@
# Tables to support packing.
self._objrevs = self._setupDB('objrevs', db.DB_DUP)
self._packmark = self._setupDB('packmark')
- self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 8)
+ self._oidqueue = self._setupDB('oidqueue', 0, db.DB_QUEUE, 16)
self._delqueue = self._setupDB('delqueue', 0, db.DB_QUEUE, 8)
# Do recovery and consistency checks
self._withlock(self._dorecovery)
@@ -247,14 +247,6 @@
elif version <> BDBFULL_SCHEMA_VERSION:
raise StorageSystemError, 'incompatible storage version'
- def _make_autopacker(self, event):
- config = self._config
- lastpacktime = U64(self._last_packtime())
- return _Autopack(
- self, event,
- config.frequency, config.packtime, config.classicpack,
- lastpacktime)
-
def _dorecovery(self):
# If these tables are non-empty, it means we crashed during a pack
# operation. I think we can safely throw out this data since the next
@@ -282,8 +274,10 @@
flag = self._pending.get(tid)
assert flag in (ABORT, COMMIT)
if flag == ABORT:
+ self.log('aborting pending transaction %r', tid)
self._withtxn(self._doabort, tid)
else:
+ self.log('recovering pending transaction %r', tid)
self._withtxn(self._docommit, tid)
# Initialize our cache of the next available version id.
c = self._versions.cursor()
@@ -295,9 +289,9 @@
# Convert to a Python long integer. Note that cursor.last()
# returns key/value, and we want the key (which for the
# versions table is the vid).
- self.__nextvid = U64(rec[0])
+ self._nextvid = U64(rec[0])
else:
- self.__nextvid = 0L
+ self._nextvid = 0L
# Initialize the last transaction
c = self._txnoids.cursor()
try:
@@ -305,13 +299,18 @@
finally:
c.close()
if rec:
- self.__ltid = rec[0]
+ self._ltid = rec[0]
else:
- self.__ltid = ZERO
+ self._ltid = ZERO
+
+ def _make_autopacker(self, event):
+ config = self._config
+ return _Autopack(self, event,
+ config.frequency, config.packtime, config.gcpack)
def _doabort(self, txn, tid):
# First clean up the oid indexed (or oid+tid indexed) tables.
- co = cs = ct = cv = None
+ co = cs = ct = cv = cr = None
try:
co = self._oids.cursor(txn=txn)
cs = self._serials.cursor(txn=txn)
@@ -338,10 +337,7 @@
revid = oid+tid
vid = self._metadata[revid][:8]
self._metadata.delete(revid, txn=txn)
- try:
- self._pickles.delete(revid, txn=txn)
- except db.DBNotFoundError:
- pass
+ self._pickles.delete(revid, txn=txn)
# Clean up the object revisions table
try:
cr.set(oid+tid)
@@ -442,7 +438,7 @@
# if co.close() were to fail. In practice this shouldn't happen.
if co: co.close()
if cs: cs.close()
- # Now incref all the object refcounts
+ # Now incref all references
for oid, delta in deltas.items():
refcount = self._refcounts.get(oid, ZERO, txn=txn)
self._refcounts.put(oid, incr(refcount, delta), txn=txn)
@@ -450,21 +446,9 @@
self._pvids.truncate(txn)
self._prevrevids.truncate(txn)
self._pending.truncate(txn)
- # If we're in the middle of a pack, we need to add to the packmark
- # table any objects that were modified in this transaction.
- # Otherwise, there's a race condition where mark might have happened,
- # then the object is added, then sweep runs, deleting the object
- # created in the interrim.
- if self._packing:
- for oid in self._oids.keys():
- self._packmark.put(oid, PRESENT, txn=txn)
self._oids.truncate(txn)
def _dobegin(self, txn, tid, u, d, e):
- # When a transaction begins, we set the pending flag to ABORT,
- # meaning, if we crash between now and the time we vote, all changes
- # will be aborted.
- #
# It's more convenient to store the transaction metadata now, rather
# than in the _finish() call. Doesn't matter because if the ZODB
# transaction were to abort, we'd clean this up anyway.
@@ -472,6 +456,9 @@
desclen = len(d)
lengths = pack('>II', userlen, desclen)
data = lengths + u + d + e
+ # When a transaction begins, we set the pending flag to ABORT,
+ # meaning, if we crash between now and the time we vote, all changes
+ # will be aborted.
self._pending.put(tid, ABORT, txn=txn)
self._txnMetadata.put(tid, data, txn=txn)
@@ -480,7 +467,7 @@
def _finish(self, tid, u, d, e):
self._withtxn(self._docommit, self._serial)
- self.__ltid = tid
+ self._ltid = tid
#
# Storing an object revision in a transaction
@@ -539,7 +526,7 @@
# The non-version revid is the same as for the previous
# revision of the object.
nvrevid = onvrevid
- # Now store optimistically data to all the tables
+ # Now optimistically store data to all the tables
newserial = self._serial
revid = oid + newserial
self._serials.put(oid, newserial, txn=txn)
@@ -562,6 +549,8 @@
def store(self, oid, serial, data, version, transaction):
# Lock and transaction wrapper
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
self._lock_acquire()
@@ -683,9 +672,9 @@
# progress gets aborted.
vid = self._vids.get(version)
if vid is None:
- self.__nextvid += 1
+ self._nextvid += 1
# Convert the version id into an 8-byte string
- vid = p64(self.__nextvid)
+ vid = p64(self._nextvid)
# Now update the vids/versions tables, along with the log table
self._vids.put(version, vid, txn=txn)
self._versions.put(vid, version, txn=txn)
@@ -718,7 +707,7 @@
rec = c.next()
continue
# This object was modified
- rtnoids[oid] = 1
+ rtnoids[oid] = True
# Calculate the values for the new transaction metadata
serial, tid = self._getSerialAndTid(oid)
meta = self._metadata[oid+tid]
@@ -792,7 +781,7 @@
if not dest:
dvid = ZERO
else:
- # Find the vid for the dest version, or create on eif necessary.
+ # Find the vid for the dest version, or create one if necessary.
dvid = self._findcreatevid(dest, txn)
c = self._currentVersions.cursor(txn)
try:
@@ -812,14 +801,14 @@
rec = c.next()
continue
# This object was modified
- rtnoids[oid] = 1
+ rtnoids[oid] = True
# Calculate the values for the new transaction metadata
serial, tid = self._getSerialAndTid(oid)
meta = self._metadata[oid+tid]
curvid, nvrevid, lrevid = unpack('>8s8s8s', meta[:24])
assert curvid == svid
# If we're committing to the non-version, then the nvrevid
- # ougt to be ZERO too, regardless of what it was for the
+ # ought to be ZERO too, regardless of what it was for the
# source version.
if not dest:
nvrevid = ZERO
@@ -893,10 +882,7 @@
vid = self._vids.get(version, missing)
if vid is missing:
return True
- if self._currentVersions.has_key(vid):
- return False
- else:
- return True
+ return not self._currentVersions.has_key(vid)
finally:
self._lock_release()
@@ -949,7 +935,8 @@
if vid == ZERO or self._versions.get(vid) == version:
return self._pickles[oid+lrevid], serial
# The object was living in a version, but not the one requested.
- # Semantics here are to return the non-version revision.
+ # Semantics here are to return the non-version revision. Allow
+ # KeyErrors to percolate up (meaning there's no non-version rev).
lrevid = self._metadata[oid+nvrevid][16:24]
return self._pickles[oid+lrevid], nvrevid
finally:
@@ -1066,13 +1053,16 @@
def lastTransaction(self):
"""Return transaction id for last committed transaction"""
- return self.__ltid
+ return self._ltid
#
# Transactional undo
#
def _undo_current_tid(self, oid, ctid):
+ # Returns (oid, metadata record, None). The last represents the data
+ # which will always be None because there's no conflict resolution
+ # necessary.
vid, nvrevid, lrevid, prevrevid = unpack(
'>8s8s8s8s', self._metadata[oid+ctid])
# We can always undo the last transaction. The prevrevid pointer
@@ -1092,9 +1082,8 @@
return oid, vid+nvrevid+DNE+ctid, None
# BAW: If the serial number of this object record is the same as
# the serial we're being asked to undo, then I think we have a
- # problem (since the storage invariant is that it doesn't retain
- # metadata records for multiple modifications of the object in the
- # same transaction).
+ # problem (since the storage invariant is that it retains only one
+ # metadata record per object revision).
assert mrec[0][8:] <> ctid, 'storage invariant violated'
# All is good, so just restore this metadata record
return oid, mrec[1], None
@@ -1102,6 +1091,9 @@
mdc.close()
def _undo_to_same_pickle(self, oid, tid, ctid):
+ # Returns (oid, metadata record, data). Data always be None unless
+ # conflict resolution was necessary and succeeded.
+ #
# We need to compare the lrevid (pickle pointers) of the transaction
# previous to the current one, and the transaction previous to the one
# we want to undo. If their lrevids are the same, it's undoable
@@ -1115,9 +1107,9 @@
vid, nvrevid = unpack('>8s8s', self._metadata[oid+tid][:16])
return oid, vid+nvrevid+DNE+ctid, None
elif target_prevrevid == ZERO or last_prevrevid == ZERO:
- # The object's revision is in it's initial creation state but
- # we're asking for an undo of something other than the initial
- # creation state. No, no.
+ # The object's revision is in its initial creation state but we're
+ # asking for an undo of something other than the initial creation
+ # state. No, no.
raise POSException.UndoError(
'Undoing mismatched zombification', oid)
last_lrevid = self._metadata[oid+last_prevrevid][16:24]
@@ -1125,21 +1117,26 @@
target_lrevid = target_metadata[16:24]
# If the pickle pointers of the object's last revision and the
# undo-target revision are the same, then the transaction can be
- # undone. Note that we take a short cut here, since we really want to
- # test pickle equality, but this is good enough for now.
+ # undone. Note that we cannot test for pickle equality here because
+ # that would allow us to undo to an arbitrary object history. Imagine
+ # a boolean object -- if undo tested for equality and not identity,
+ # then half the time we could undo to an arbitrary point in the
+ # object's history.
if target_lrevid == last_lrevid:
return oid, target_metadata, None
- # Check previous transactionalUndos done in this transaction
+ # Check previous transactional undos done in this transaction
elif target_lrevid == self._prevrevids.get(oid):
return oid, target_metadata, None
else:
# Attempt application level conflict resolution
- data = self.tryToResolveConflict(
- oid, ctid, tid, self._pickles[oid+target_lrevid])
+ try:
+ data = self.tryToResolveConflict(
+ oid, ctid, tid, self._pickles[oid+target_lrevid])
+ except ConflictError:
+ raise POSException.UndoError, 'Cannot undo transaction'
if data:
return oid, target_metadata, data
- else:
- raise POSException.UndoError('Cannot undo transaction', oid)
+ raise POSException.UndoError('Cannot undo transaction', oid)
def _dotxnundo(self, txn, tid):
# First, make sure the transaction isn't protected by a pack.
@@ -1184,10 +1181,10 @@
self._pickles.put(revid, data, txn=txn)
metadata = vid+nvrevid+newserial+prevrevid
# We need to write all the new records for an object changing in
- # this transaction. Note that we only write to th serials table
+ # this transaction. Note that we only write to the serials table
# if prevrevids hasn't already seen this object, otherwise we'll
# end up with multiple entries in the serials table for the same
- # tid.
+ # object revision.
if not self._prevrevids.has_key(oid):
self._serials.put(oid, newserial, txn=txn)
self._metadata.put(revid, metadata, txn=txn)
@@ -1198,7 +1195,7 @@
if vid <> ZERO:
self._currentVersions.put(vid, revid, txn=txn)
self._oids.put(oid, PRESENT, txn=txn)
- rtnoids[oid] = 1
+ rtnoids[oid] = True
# Add this object revision to the autopack table
self._objrevs.put(newserial+oid, prevrevid, txn=txn)
return rtnoids.keys()
@@ -1212,6 +1209,21 @@
finally:
self._lock_release()
+ def _unpack_txnmeta(self, txnmeta):
+ userlen, desclen = unpack('>2I', txnmeta[:8])
+ user = txnmeta[8:8+userlen]
+ desc = txnmeta[8+userlen:8+userlen+desclen]
+ extdata = txnmeta[8+userlen+desclen:]
+ # ext is a pickled mapping. Any exceptions are ignored, but XXX can
+ # we (and FileStorage :) do better?
+ ext = {}
+ if extdata:
+ try:
+ ext = pickle.loads(extdata)
+ except Exception, e:
+ self.log('Error unpickling extension data: %s', e)
+ return user, desc, ext
+
def _doundolog(self, first, last, filter):
# Get the last packtime
packtime = self._last_packtime()
@@ -1229,26 +1241,14 @@
rec = c.prev()
if tid <= packtime:
break
- userlen, desclen = unpack('>II', txnmeta[:8])
- user = txnmeta[8:8+userlen]
- desc = txnmeta[8+userlen:8+userlen+desclen]
- ext = txnmeta[8+userlen+desclen:]
+ user, desc, ext = self._unpack_txnmeta(txnmeta)
# Create a dictionary for the TransactionDescription
txndesc = {'id' : tid,
'time' : TimeStamp(tid).timeTime(),
'user_name' : user,
'description': desc,
}
- # The extension stuff is a picklable mapping, so if we can
- # unpickle it, we update the TransactionDescription dictionary
- # with that data. BAW: The bare except is disgusting, but I'm
- # too lazy to figure out what exceptions could actually be
- # raised here...
- if ext:
- try:
- txndesc.update(pickle.loads(ext))
- except:
- pass
+ txndesc.update(ext)
# Now call the filter to see if this transaction should be
# added to the return list...
if filter is None or filter(txndesc):
@@ -1359,6 +1359,20 @@
# First, the public API for classic pack
def pack(self, t, zreferencesf):
+ """Perform a pack on the storage.
+
+ There are two forms of packing: incremental and full gc. In an
+ incremental pack, only old object revisions are removed. In a full gc
+ pack, cyclic garbage detection and removal is also performed.
+
+ t is the pack time. All non-current object revisions older than t
+ will be removed in an incremental pack.
+
+ pack() always performs an incremental pack. If the gc flag is True,
+ then pack() will also perform a garbage collection. Some storages
+ (e.g. FileStorage) always do both phases in a pack() call. Such
+ storages should simply ignore the gc flag.
+ """
# For all intents and purposes, referencesf here is always going to be
# the same as ZODB.referencesf.referencesf. It's too much of a PITA
# to pass that around to the helper methods, so just assert they're
@@ -1381,14 +1395,36 @@
self.log('classic pack finished')
def _dopack(self, t, gc=True):
+ # BAW: should a pack time in the future be a ValueError? When ZEO is
+ # involved, t could come from a remote machine with a skewed clock.
+ # Jim wants us to believe t if it's "close", but our algorithm
+ # requires synchronicity between the calculation of the pack time and
+ # the timestamps used in serial numbers.
+ #
+ # If a transaction is currently in progress, wait for it to finish
+ # before calculating the pack time, by acquiring the commit lock.
+ # This guarantees that the next transaction begins after the pack
+ # time so that any objects added in that transaction will have a
+ # serial number greater than the pack time. Such objects will be
+ # completely ignored for packing purposes.
+ #
+ # If we don't do this, then it would be possible for some of the
+ # current transaction's objects to have been stored with a serial
+ # number earlier than the pack time, but not yet linked to the root.
+ # Say that thread 1 starts a transaction, and then thread 2 starts a
+ # pack. Thread 2 then marks the root-reachable objects, but before
+ # sweeping, object B is stored by thread 1. If the object linking B
+ # to the root hasn't been stored by the time of the sweep, B will be
+ # collected as garbage.
+ #
# t is a TimeTime, or time float, convert this to a TimeStamp object,
# using an algorithm similar to what's used in FileStorage. We know
# that our transaction ids, a.k.a. revision ids, are timestamps.
- #
- # BAW: should a pack time in the future be a ValueError? We'd have to
- # worry about clock skew, so for now, we just set the pack time to the
- # minimum of t and now.
- packtime = min(t, time.time())
+ self._commit_lock_acquire()
+ try:
+ packtime = min(t, time.time())
+ finally:
+ self._commit_lock_release()
t0 = TimeStamp(*(time.gmtime(packtime)[:5] + (packtime % 60,)))
packtid = `t0`
# Collect all revisions of all objects earlier than the pack time.
@@ -1473,14 +1509,12 @@
# with it again. Otherwise, we can remove the metadata record
# for this revision and decref the corresponding pickle.
if oldserial <> ZERO:
+ orevid = oid+oldserial
# It's possible this object revision has already been
# deleted, if the oid points to a decref'd away object
- try:
- metadata = self._metadata[oid+oldserial]
- except KeyError:
- pass
- else:
- self._metadata.delete(oid+oldserial, txn=txn)
+ if self._metadata.has_key(orevid):
+ metadata = self._metadata[orevid]
+ self._metadata.delete(orevid, txn=txn)
# Decref the pickle
self._decrefPickle(oid, metadata[16:24], txn)
try:
@@ -1505,19 +1539,19 @@
if lrevid == DNE:
# There is no pickle data
return
- key = oid + lrevid
- refcount = U64(self._pickleRefcounts.get(key, ZERO)) - 1
+ revid = oid + lrevid
+ refcount = U64(self._pickleRefcounts.get(revid, ZERO)) - 1
assert refcount >= 0
if refcount == 0:
# We can collect this pickle
- self._pickleRefcounts.delete(key, txn=txn)
- data = self._pickles[key]
- self._pickles.delete(key, txn=txn)
+ self._pickleRefcounts.delete(revid, txn=txn)
+ data = self._pickles[revid]
+ self._pickles.delete(revid, txn=txn)
deltas = {}
self._update(deltas, data, -1)
self._decref(deltas, txn)
else:
- self._pickleRefcounts.put(key, p64(refcount), txn=txn)
+ self._pickleRefcounts.put(revid, p64(refcount), txn=txn)
def _decref(self, deltas, txn):
for oid, delta in deltas.items():
@@ -1605,7 +1639,7 @@
# BAW: Maybe this could probably be more efficient by not doing so
# much searching, but it would also be more complicated, so the
# tradeoff should be measured.
- serial = None
+ serial, tid = self._getSerialAndTid(oid)
c = self._metadata.cursor(txn=txn)
try:
rec = c.set_range(oid)
@@ -1623,9 +1657,52 @@
c.close()
return serial
+ def _rootset(self, packtid, txn):
+ # Find the root set for reachability purposes. A root set is a tuple
+ # of oid and tid. First, the current root object as of the pack time
+ # is always in the root set. Second, any object revision after the
+ # pack time that has a back pointer (lrevid) to before the pack time
+ # serves as another root because some future undo could then revive
+ # any referenced objects. The root set ends up in the oidqueue.
+ try:
+ zerorev = self._findrev(ZERO, packtid, txn)
+ except KeyError:
+ # There's no root object
+ return
+ self._oidqueue.append(ZERO+zerorev, txn)
+ c = self._txnoids.cursor(txn)
+ try:
+ try:
+ rec = c.set_range(packtid)
+ except db.DBNotFoundError:
+ rec = None
+ while rec:
+ tid, oid = rec
+ revid = oid + tid
+ rec = c.next()
+ lrevid = self._metadata[revid][16:24]
+ if lrevid < packtid:
+ self._oidqueue.append(revid, txn)
+ finally:
+ c.close()
+
+ # tid is None if all we care about is that any object revision is present.
+ def _packmark_has(self, oid, tid, txn):
+ if tid is None:
+ return self._packmark.has_key(oid)
+ c = self._packmark.cursor(txn)
+ try:
+ try:
+ c.set_both(oid, tid)
+ return True
+ except db.DBNotFoundError:
+ return False
+ finally:
+ c.close()
+
def _mark(self, txn, packtid):
# Find the oids for all the objects reachable from the root, as of the
- # pack time. To reduce the amount of in-core memory we need do do a
+ # pack time. To reduce the amount of in-core memory we need to do a
# pack operation, we'll save the mark data in the packmark table. The
# oidqueue is a BerkeleyDB Queue that holds the list of object ids to
# look at next, and by using this we don't need to keep an in-memory
@@ -1634,36 +1711,41 @@
# Quick exit for empty storages
if not self._serials:
return
- # The oid of the object we're looking at, starting at the root
- oid = ZERO
- # Start at the root, find all the objects the current revision of the
- # root references, and then for each of those, find all the objects it
- # references, and so on until we've traversed the entire object graph.
- while oid:
+ # Start with the root set, iterating over all reachable objects until
+ # we've traversed the entire object tree.
+ self._rootset(packtid, txn)
+ rec = self._oidqueue.consume(txn)
+ while rec:
if self._stop:
raise PackStop, 'stopped in _mark()'
- if not self._packmark.has_key(oid):
- # We haven't seen this object yet
- self._packmark.put(oid, PRESENT, txn=txn)
- # Get the pickle data for the most current revision of this
- # object as of the pack time.
- tid = self._findrev(oid, packtid, txn)
+ revid = rec[1]
+ oid = revid[:8]
+ tid = revid[8:]
+ # See if this revision is already in the packmark
+ if not self._packmark_has(oid, tid, txn):
+ # BAW: We are more conservative than FileStorage here, since
+ # any reference to an object keeps all the object references
+ # alive. FileStorage will collect individual object
+ # revisions. I think our way is fine since we'll eventually
+ # collect everything incrementally anyway, and for Berkeley,
+ # all object revisions add to the refcount total.
+ self._packmark.put(oid, tid, txn=txn)
# Say there's no root object (as is the case in some of the
# unit tests), and we're looking up oid ZERO. Then serial
# will be None.
if tid is not None:
lrevid = self._metadata[oid+tid][16:24]
- data = self._pickles[oid+lrevid]
# Now get the oids of all the objects referenced by this
- # pickle
+ # object revision
+ data = self._pickles[oid+lrevid]
refdoids = []
referencesf(data, refdoids)
# And append them to the queue for later
- for oid in refdoids:
- self._oidqueue.append(oid, txn)
+ for roid in refdoids:
+ rtid = self._findrev(roid, tid, txn)
+ self._oidqueue.append(roid+rtid, txn)
# Pop the next oid off the queue and do it all again
rec = self._oidqueue.consume(txn)
- oid = rec and rec[1]
assert len(self._oidqueue) == 0
def _sweep(self, txn, packtid):
@@ -1684,7 +1766,7 @@
# Otherwise, if packmark (which knows about all the root
# reachable objects) doesn't have a record for this guy, then
# we can zap it. Do so by appending to oidqueue.
- if not self._packmark.has_key(oid):
+ if not self._packmark_has(oid, None, txn):
self._delqueue.append(oid, txn)
finally:
c.close()
@@ -1722,7 +1804,7 @@
rec = c.next()
if rec is None:
raise IndexError
- tid, data = rec
+ tid, txnmeta = rec
# Now unpack the necessary information. Don't impedence match the
# status flag (that's done by the caller).
packtime = self._last_packtime()
@@ -1730,38 +1812,13 @@
packedp = True
else:
packedp = False
- userlen, desclen = unpack('>II', data[:8])
- user = data[8:8+userlen]
- desc = data[8+userlen:8+userlen+desclen]
- ext = data[8+userlen+desclen:]
+ user, desc, ext = self._unpack_txnmeta(txnmeta)
return tid, packedp, user, desc, ext
finally:
if c:
c.close()
self._lock_release()
- def _alltxnoids(self, tid):
- self._lock_acquire()
- c = self._txnoids.cursor()
- try:
- oids = []
- oidkeys = {}
- try:
- rec = c.set(tid)
- except db.DBNotFoundError:
- rec = None
- while rec:
- # Ignore the key
- oid = rec[1]
- if not oidkeys.has_key(oid):
- oids.append(oid)
- oidkeys[oid] = 1
- rec = c.next_dup()
- return oids
- finally:
- c.close()
- self._lock_release()
-
# Other interface assertions
def supportsTransactionalUndo(self):
return True
@@ -1793,6 +1850,7 @@
self._stop = stop
self._closed = False
self._first = True
+ self._iters = []
def __len__(self):
# This is a lie. It's here only for Python 2.1 support for
@@ -1823,9 +1881,13 @@
if self._stop is not None and tid > self._stop:
raise IndexError
self._tid = tid
- return _RecordsIterator(self._storage, tid, packedp, user, desc, ext)
+ it = _RecordsIterator(self._storage, tid, packedp, user, desc, ext)
+ self._iters.append(it)
+ return it
def close(self):
+ for it in self._iters:
+ it.close()
self._closed = True
@@ -1863,14 +1925,11 @@
self.status = ' '
self.user = user
self.description = desc
- try:
- self._extension = pickle.loads(ext)
- except EOFError:
- self._extension = {}
- # Internal pointer
- self._oids = self._storage._alltxnoids(self.tid)
- # To make .pop() more efficient
- self._oids.reverse()
+ self._extension = ext
+ # BAW: touching the storage's private parts!
+ self._table = self._storage._txnoids
+ self._cursor = None
+ self._rec = None
def next(self):
"""Return the ith item in the sequence of record data.
@@ -1879,10 +1938,42 @@
IndexError will be raised after all of the items have been
returned.
"""
- # Let IndexError percolate up
- oid = self._oids.pop()
- data, version, lrevid = self._storage._loadSerialEx(oid, self.tid)
- return _Record(oid, self.tid, version, data, lrevid)
+ if self._table is None:
+ # We already exhausted this iterator
+ raise IndexError
+ # Initialize a txnoids cursor and set it to the start of the oids
+ # touched by this transaction. We do this here to ensure the cursor
+ # is closed if there are any problems. A hole in this approach is if
+ # the client never exhausts the iterator. Then I think we have a
+ # problem because I don't think the environment can be closed if
+ # there's an open cursor, but you also cannot close the cursor if the
+ # environment is already closed (core dumps), so an __del__ doesn't
+ # help a whole lot.
+ try:
+ if self._cursor is None:
+ self._cursor = self._table.cursor()
+ try:
+ self._rec = self._cursor.set(self.tid)
+ except db.DBNotFoundError:
+ pass
+ # Cursor exhausted?
+ if self._rec is None:
+ self.close()
+ raise IndexError
+ oid = self._rec[1]
+ self._rec = self._cursor.next_dup()
+ data, version, lrevid = self._storage._loadSerialEx(oid, self.tid)
+ return _Record(oid, self.tid, version, data, lrevid)
+ except:
+ self.close()
+ raise
+
+ def close(self):
+ if self._cursor:
+ self._cursor.close()
+ self._cursor = None
+ # _table == None means the iterator has been exhausted
+ self._table = None
@@ -1910,22 +2001,20 @@
class _Autopack(_WorkThread):
NAME = 'autopacking'
- def __init__(self, storage, event,
- frequency, packtime, classicpack,
- lastpacktime):
+ def __init__(self, storage, event, frequency, packtime, gcpack):
_WorkThread.__init__(self, storage, event, frequency)
self._packtime = packtime
- self._classicpack = classicpack
+ self._gcpack = gcpack
# Bookkeeping
- self._lastclassic = 0
+ self._lastgc = 0
def _dowork(self):
- # Should we do a classic pack this time?
- if self._classicpack <= 0:
- classicp = False
+ # Should we do a full gc pack this time?
+ if self._gcpack <= 0:
+ dofullgc = False
else:
- v = (self._lastclassic + 1) % self._classicpack
- self._lastclassic = v
- classicp = not v
- # Run the autopack phase
- self._storage.autopack(time.time() - self._packtime, classicp)
+ v = (self._lastgc + 1) % self._gcpack
+ self._lastgc = v
+ dofullgc = not v
+ # Run the full gc phase
+ self._storage.autopack(time.time() - self._packtime, dofullgc)
=== ZODB3/BDBStorage/BDBMinimalStorage.py 1.29 => 1.29.14.1 ===
--- ZODB3/BDBStorage/BDBMinimalStorage.py:1.29 Thu Jan 30 18:31:24 2003
+++ ZODB3/BDBStorage/BDBMinimalStorage.py Tue Sep 9 18:35:37 2003
@@ -15,8 +15,6 @@
"""Berkeley storage without undo or versioning.
"""
-__version__ = '$Revision$'[-2:][0]
-
from __future__ import nested_scopes
from ZODB import POSException
@@ -292,6 +290,8 @@
return newserial
def store(self, oid, serial, data, version, transaction):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
if transaction is not self._transaction:
raise POSException.StorageTransactionError(self, transaction)
# We don't support versions
=== ZODB3/BDBStorage/BerkeleyBase.py 1.41.14.1 => 1.41.14.2 ===
--- ZODB3/BDBStorage/BerkeleyBase.py:1.41.14.1 Mon Jul 28 18:06:30 2003
+++ ZODB3/BDBStorage/BerkeleyBase.py Tue Sep 9 18:35:37 2003
@@ -28,7 +28,7 @@
# BaseStorage provides primitives for lock acquisition and release, and a host
# of other methods, some of which are overridden here, some of which are not.
-from ZODB.lock_file import LockFile
+from ZODB.lock_file import lock_file
from ZODB.BaseStorage import BaseStorage
from ZODB.referencesf import referencesf
import ThreadLock
@@ -56,15 +56,26 @@
class BerkeleyConfig:
- """Bag of bits for describing various underlying configuration options.
+ """Bag of attributes for configuring Berkeley based storages.
Berkeley databases are wildly configurable, and this class exposes some of
that. To customize these options, instantiate one of these classes and
set the attributes below to the desired value. Then pass this instance to
the Berkeley storage constructor, using the `config' keyword argument.
+ BerkeleyDB stores all its information in an `environment directory'
+ (modulo log files, which can be in a different directory, see below). By
+ default, the `name' argument given to the storage constructor names this
+ directory, but you can set this option to explicitly point to a different
+ location:
+
+ - envdir if not None, names the BerkeleyDB environment directory. The
+ directory will be created if necessary, but its parent directory must
+ exist. Additional configuration is available through the BerkeleyDB
+ DB_CONFIG mechanism.
+
Berkeley storages need to be checkpointed occasionally, otherwise
- automatic recover can take a huge amount of time. You should set up a
+ automatic recovery can take a huge amount of time. You should set up a
checkpointing policy which trades off the amount of work done periodically
against the recovery time. Note that the Berkeley environment is
automatically, and forcefully, checkpointed twice when it is closed.
@@ -80,7 +91,7 @@
- min is passed directly to txn_checkpoint()
- You can acheive one of the biggest performance wins by moving the Berkeley
+ You can achieve one of the biggest performance wins by moving the Berkeley
log files to a different disk than the data files. We saw between 2.5 and
7 x better performance this way. Here are attributes which control the
log files.
@@ -111,17 +122,18 @@
to autopack to. E.g. if packtime is 14400, autopack will pack to 4
hours in the past. For Minimal storage, this value is ignored.
- - classicpack is an integer indicating how often an autopack phase should
- do a full classic pack. E.g. if classicpack is 24 and frequence is
- 3600, a classic pack will be performed once per day. Set to zero to
- never automatically do classic packs. For Minimal storage, this value
- is ignored -- all packs are classic packs.
+ - gcpack is an integer indicating how often an autopack phase should do a
+ full garbage collecting pack. E.g. if gcpack is 24 and frequence is
+ 3600, a gc pack will be performed once per day. Set to zero to never
+ automatically do gc packs. For Minimal storage, this value is ignored;
+ all packs are gc packs.
Here are some other miscellaneous configuration variables:
- read_only causes ReadOnlyError's to be raised whenever any operation
(except pack!) might modify the underlying database.
"""
+ envdir = None
interval = 120
kbyte = 0
min = 0
@@ -129,13 +141,14 @@
cachesize = 128 * 1024 * 1024
frequency = 0
packtime = 4 * 60 * 60
- classicpack = 0
- read_only = 0
+ gcpack = 0
+ read_only = False
def __repr__(self):
d = self.__class__.__dict__.copy()
d.update(self.__dict__)
return """<BerkeleyConfig (read_only=%(read_only)s):
+\tenvironment dir:: %(envdir)s
\tcheckpoint interval: %(interval)s seconds
\tcheckpoint kbytes: %(kbyte)s
\tcheckpoint minutes: %(min)s
@@ -145,7 +158,7 @@
\t----------------------
\tautopack frequency: %(frequency)s seconds
\tpack to %(packtime)s seconds in the past
-\tclassic pack every %(classicpack)s autopacks
+\tclassic pack every %(gcpack)s autopacks
\t>""" % d
@@ -238,7 +251,7 @@
else:
self._autopacker = None
self.log('ready')
-
+
def _version_check(self, txn):
raise NotImplementedError
@@ -260,12 +273,12 @@
# Our storage is based on the underlying BSDDB btree database type.
if reclen is not None:
d.set_re_len(reclen)
+ # DB 4.1 requires that operations happening in a transaction must be
+ # performed on a database that was opened in a transaction. Since we
+ # do the former, we must do the latter. However, earlier DB versions
+ # don't transactionally protect database open, so this is the most
+ # portable way to write the code.
openflags = db.DB_CREATE
- # DB 4.1.24 requires that operations happening in a transaction must
- # be performed on a database that was opened in a transaction. Since
- # we do the former, we must do the latter. However, earlier DB
- # versions don't transactionally protect database open, so this is the
- # most portable way to write the code.
try:
openflags |= db.DB_AUTO_COMMIT
except AttributeError:
@@ -312,10 +325,11 @@
If last is provided, the new oid will be one greater than that.
"""
# BAW: the last parameter is undocumented in the UML model
+ newoid = BaseStorage.new_oid(self, last)
if self._len is not None:
# Increment the cached length
self._len += 1
- return BaseStorage.new_oid(self, last)
+ return newoid
def getSize(self):
"""Return the size of the database."""
@@ -401,8 +415,10 @@
# can't hurt and is more robust.
self._env.txn_checkpoint(0, 0, db.DB_FORCE)
self._env.txn_checkpoint(0, 0, db.DB_FORCE)
+ lockfile = os.path.join(self._env.db_home, '.lock')
self._lockfile.close()
self._env.close()
+ os.unlink(lockfile)
# A couple of convenience methods
def _update(self, deltas, data, incdec):
@@ -466,7 +482,15 @@
# This is required in order to work around the Berkeley lock
# exhaustion problem (i.e. we do our own application level locks
# rather than rely on Berkeley's finite page locks).
- lockfile = LockFile(os.path.join(envname, '.lock'))
+ lockpath = os.path.join(envname, '.lock')
+ try:
+ lockfile = open(lockpath, 'r+')
+ except IOError, e:
+ if e.errno <> errno.ENOENT: raise
+ lockfile = open(lockpath, 'w+')
+ lock_file(lockfile)
+ lockfile.write(str(os.getpid()))
+ lockfile.flush()
try:
# Create, initialize, and open the environment
env = db.DBEnv()
More information about the Zodb-checkins
mailing list