[Zodb-checkins] CVS: Zope/lib/python/ZODB - FileStorage.py:1.76.16.3
Jeremy Hylton
jeremy@zope.com
Thu, 11 Apr 2002 16:40:36 -0400
Update of /cvs-repository/Zope/lib/python/ZODB
In directory cvs.zope.org:/tmp/cvs-serv20065
Modified Files:
Tag: Zope-2_5-branch
FileStorage.py
Log Message:
Backport fix of transaction_id generation from the trunk.
The transaction_id generated by undoLog() is now meaningful across
packs or across multiple Standby storages. It's possible that the
transaction won't be present, but the storage will find it if it is
regardless of its exact location in the file.
=== Zope/lib/python/ZODB/FileStorage.py 1.76.16.2 => 1.76.16.3 ===
def load(self, oid, version, _stuff=None):
self._lock_acquire()
- try: return self._load(oid, version, self._index, self._file)
- finally: self._lock_release()
+ try:
+ return self._load(oid, version, self._index, self._file)
+ finally:
+ self._lock_release()
def loadSerial(self, oid, serial):
self._lock_acquire()
try:
- _index=self._index
file=self._file
seek=file.seek
read=file.read
try:
- pos=_index[oid]
+ pos = self._index[oid]
except KeyError:
raise POSKeyError(oid)
while 1:
@@ -619,7 +620,10 @@
def modifiedInVersion(self, oid):
self._lock_acquire()
try:
- pos=self._index[oid]
+ try:
+ pos = self._index[oid]
+ except KeyError:
+ raise POSKeyError(oid)
file=self._file
seek=file.seek
seek(pos)
@@ -873,7 +877,7 @@
self._lock_acquire()
try:
self._clear_index()
- transaction_id=base64.decodestring(transaction_id+'==\n')
+ transaction_id=base64.decodestring(transaction_id + '\n')
tid, tpos = transaction_id[:8], U64(transaction_id[8:])
packt=self._packt
if packt is None or packt > tid:
@@ -967,7 +971,6 @@
self._file.seek(pos+8)
return self._file.read(8)
-
def _transactionalUndoRecord(self, oid, pos, serial, pre, version):
"""Get the indo information for a data record
@@ -975,7 +978,6 @@
version, packed non-version data pointer, and current
position. If the pickle is true, then the data pointer must
be 0, but the pickle can be empty *and* the pointer 0.
-
"""
copy=1 # Can we just copy a data pointer
@@ -1031,6 +1033,85 @@
raise UndoError('Some data were modified by a later transaction')
+ # undoLog() returns a description dict that includes an id entry.
+ # The id is opaque to the client, but encodes information that
+ # uniquely identifies a transaction in the storage. The id is a
+ # base64 encoded string, where the components of the string are:
+ # - the transaction id
+ # - the packed file position of the transaction record
+ # - the oid of an object modified by the transaction
+
+ # The file position is sufficient in most cases, but doesn't work
+ # if the id is used after a pack and may not work if used with
+ # replicated storages. If the file position is incorrect, the oid
+ # can be used for a relatively efficient search for the
+ # transaction record. FileStorage keeps an index mapping oids to
+ # file positions, but do notes have a transaction id to file
+ # offset index. The oid index maps to the most recent revision of
+ # the object. Transactional undo must follow back pointers until
+ # it finds the correct transaction record,
+
+ # This approach fails if the transaction record has no data
+ # records. It's not clear if that is possible, but it may be for
+ # commitVersion and abortVersion.
+
+ # The file offset also supports non-transactional undo, which
+ # won't work after a pack and isn't supported by replicated
+ # storages.
+
+ def undoLog(self, first=0, last=-20, filter=None):
+ if last < 0:
+ last = first - last + 1
+ self._lock_acquire()
+ try:
+ if self._packt is None:
+ raise UndoError(
+ 'Undo is currently disabled for database maintenance.<p>')
+ pos = self._pos
+ r = []
+ i = 0
+ # BAW: Why 39 please? This makes no sense (see also below).
+ while i < last and pos > 39:
+ self._file.seek(pos - 8)
+ pos = pos - U64(self._file.read(8)) - 8
+ self._file.seek(pos)
+ h = self._file.read(TRANS_HDR_LEN)
+ tid, tl, status, ul, dl, el = struct.unpack(">8s8scHHH", h)
+ if tid < self._packt:
+ break
+ if status != ' ':
+ continue
+ d = u = ''
+ if ul:
+ u = self._file.read(ul)
+ if dl:
+ d = self._file.read(dl)
+ e = {}
+ if el:
+ try:
+ e = loads(read(el))
+ except:
+ pass
+ next = self._file.read(8)
+ # next is either the redundant txn length - 8, or an oid
+ if next == tl:
+ # There were no objects in this txn
+ id = tid + p64(pos)
+ else:
+ id = tid + p64(pos) + next
+ d = {'id': base64.encodestring(id).rstrip(),
+ 'time': TimeStamp(tid).timeTime(),
+ 'user_name': u,
+ 'description': d}
+ d.update(e)
+ if filter is None or filter(d):
+ if i >= first:
+ r.append(d)
+ i += 1
+ return r
+ finally:
+ self._lock_release()
+
def transactionalUndo(self, transaction_id, transaction):
"""Undo a transaction, given by transaction_id.
@@ -1050,124 +1131,136 @@
self._lock_acquire()
try:
- transaction_id = base64.decodestring(transaction_id + '==\n')
- tid, tpos = transaction_id[:8], U64(transaction_id[8:])
-
- ostloc = p64(self._pos)
- here = self._pos + (self._tfile.tell() + self._thl)
+ return self._txn_undo(transaction_id)
+ finally:
+ self._lock_release()
- self._file.seek(tpos)
- h = self._file.read(TRANS_HDR_LEN)
- if len(h) != TRANS_HDR_LEN or h[:8] != tid:
- raise UndoError, 'Invalid undo transaction id'
- if h[16] == 'u':
- return
- if h[16] != ' ':
- raise UndoError, 'non-undoable transaction'
- tl = U64(h[8:16])
- ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
- tend = tpos + tl
- pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
- tindex = {}
- failures = {} # keep track of failures, cause we may succeed later
- failed = failures.has_key
- # Read the data records for this transaction
- while pos < tend:
+ def _txn_undo(self, transaction_id):
+ # Find the right transaction to undo and call _txn_undo_write().
+ transaction_id = base64.decodestring(transaction_id + '\n')
+ tid = transaction_id[:8]
+ tpos = U64(transaction_id[8:16])
+ if not self._check_txn_pos(tpos, tid):
+ # If the pos and tid don't match, we must use the oid to
+ # find the transaction record. Find the file position for
+ # the current revision of this object, and search back for
+ # the beginning of its transaction record
+ oid = transaction_id[16:]
+ if oid == '' or not self._index.has_key(oid):
+ # XXX Is this the right error message?
+ raise UndoError('Undoing a non-object affecting transaction')
+ pos = self._index[oid]
+ while 1:
self._file.seek(pos)
h = self._file.read(DATA_HDR_LEN)
- oid, serial, sprev, stloc, vlen, splen = \
- struct.unpack(">8s8s8s8sH8s", h)
- if failed(oid):
- del failures[oid] # second chance!
- plen = U64(splen)
- prev = U64(sprev)
- if vlen:
- dlen = DATA_VERSION_HDR_LEN + vlen + (plen or 8)
- self._file.seek(16, 1)
- version = self._file.read(vlen)
- else:
- dlen = DATA_HDR_LEN + (plen or 8)
- version = ''
+ doid, serial, prev, tpos, vlen, plen = \
+ unpack('>8s8s8s8sH8s', h)
+ tpos = U64(tpos)
+ self._file.seek(tpos)
+ # Read transaction id to see if we've got a match
+ thistid = self._file.read(8)
+ if thistid == tid:
+ break # Yeee ha!
+ # Keep looking
+ pos = U64(prev)
+ if not pos:
+ # We never found the right transaction
+ raise UndoError('Invalid undo transaction id')
+ tindex = self._txn_undo_write(tpos, tid)
+ self._tindex.update(tindex)
+ return tindex.keys()
- try:
- p, prev, v, snv, ipos = self._transactionalUndoRecord(
- oid, pos, serial, prev, version)
- except UndoError, v:
- # Don't fail right away. We may be redeemed later!
- failures[oid] = v
+ def _check_txn_pos(self, pos, tid):
+ "Return true if pos is location of the transaction record for tid."
+ self._file.seek(pos)
+ this_tid = self._file.read(8)
+ if this_tid != tid:
+ return 0
+ # be extra cautious: Check the record length makes sense, to
+ # guard against a random file location that happens to have
+ # the right 8-byte pattern.
+ stlen = self._file.read(8)
+ tlen = U64(stlen)
+ self._file.seek(tlen, 1)
+ redundant_stlen = self._file.read(8)
+ if len(redundant_stlen) != 8:
+ return 0
+ if redundant_stlen != stlen:
+ return 0
+ return 1
+
+ def _txn_undo_write(self, tpos, tid):
+ # a helper function to write the data records for transactional undo
+
+ ostloc = p64(self._pos)
+ here = self._pos + (self._tfile.tell() + self._thl)
+ # Let's move the file pointer back to the start of the txn record.
+ self._file.seek(tpos)
+ h = self._file.read(TRANS_HDR_LEN)
+ if h[16] == 'u':
+ return
+ if h[16] != ' ':
+ raise UndoError('non-undoable transaction')
+ tl = U64(h[8:16])
+ ul, dl, el = struct.unpack(">HHH", h[17:TRANS_HDR_LEN])
+ tend = tpos + tl
+ pos = tpos + (TRANS_HDR_LEN + ul + dl + el)
+ tindex = {}
+ failures = {} # keep track of failures, cause we may succeed later
+ failed = failures.has_key
+ # Read the data records for this transaction
+ while pos < tend:
+ self._file.seek(pos)
+ h = self._file.read(DATA_HDR_LEN)
+ oid, serial, sprev, stloc, vlen, splen = \
+ struct.unpack(">8s8s8s8sH8s", h)
+ if failed(oid):
+ del failures[oid] # second chance!
+ plen = U64(splen)
+ prev = U64(sprev)
+ if vlen:
+ dlen = DATA_VERSION_HDR_LEN + vlen + (plen or 8)
+ self._file.seek(16, 1)
+ version = self._file.read(vlen)
+ else:
+ dlen = DATA_HDR_LEN + (plen or 8)
+ version = ''
+
+ try:
+ p, prev, v, snv, ipos = self._transactionalUndoRecord(
+ oid, pos, serial, prev, version)
+ except UndoError, v:
+ # Don't fail right away. We may be redeemed later!
+ failures[oid] = v
+ else:
+ plen = len(p)
+ self._tfile.write(pack(">8s8s8s8sH8s",
+ oid, self._serial, p64(ipos),
+ ostloc, len(v), p64(plen)))
+ if v:
+ vprev=self._tvindex.get(v, 0) or self._vindex.get(v, 0)
+ self._tfile.write(snv + p64(vprev) + v)
+ self._tvindex[v] = here
+ odlen = DATA_VERSION_HDR_LEN + len(v)+(plen or 8)
else:
- plen =len(p)
- self._tfile.write(pack(">8s8s8s8sH8s",
- oid, self._serial, p64(ipos),
- ostloc, len(v), p64(plen)))
- if v:
- vprev=self._tvindex.get(v, 0) or self._vindex.get(v, 0)
- self._tfile.write(snv + p64(vprev) + v)
- self._tvindex[v]=here
- odlen = DATA_VERSION_HDR_LEN + len(v)+(plen or 8)
- else:
- odlen = DATA_HDR_LEN+(plen or 8)
-
- if p:
- self._tfile.write(p)
- else:
- self._tfile.write(p64(prev))
- tindex[oid]=here
- here=here+odlen
+ odlen = DATA_HDR_LEN+(plen or 8)
- pos=pos+dlen
- if pos > tend:
- raise UndoError, 'non-undoable transaction'
+ if p:
+ self._tfile.write(p)
+ else:
+ self._tfile.write(p64(prev))
+ tindex[oid] = here
+ here += odlen
- if failures: raise UndoError(failures)
- self._tindex.update(tindex)
- return tindex.keys()
+ pos=pos+dlen
+ if pos > tend:
+ raise UndoError, 'non-undoable transaction'
- finally: self._lock_release()
+ if failures:
+ raise UndoError(failures)
- def undoLog(self, first=0, last=-20, filter=None):
- if last < 0: last=first-last+1
- self._lock_acquire()
- try:
- packt=self._packt
- if packt is None:
- raise UndoError(
- 'Undo is currently disabled for database maintenance.<p>')
- pos=self._pos
- if pos < 39: return []
- file=self._file
- seek=file.seek
- read=file.read
- unpack=struct.unpack
- strip=string.strip
- encode=base64.encodestring
- r=[]
- append=r.append
- i=0
- while i < last and pos > 39:
- seek(pos-8)
- pos=pos-U64(read(8))-8
- seek(pos)
- h=read(TRANS_HDR_LEN)
- tid, tl, status, ul, dl, el = unpack(">8s8scHHH", h)
- if tid < packt: break
- if status != ' ': continue
- u=ul and read(ul) or ''
- d=dl and read(dl) or ''
- d={'id': encode(tid+p64(pos))[:22],
- 'time': TimeStamp(tid).timeTime(),
- 'user_name': u, 'description': d}
- if el:
- try:
- e=loads(read(el))
- d.update(e)
- except: pass
- if filter is None or filter(d):
- if i >= first: append(d)
- i=i+1
-
- return r
- finally: self._lock_release()
+ return tindex
+
def versionEmpty(self, version):
if not version: