[Checkins] SVN: zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py - Ignore duplicate data records (caused by an old blob bug).
Jim Fulton
jim at zope.com
Thu Dec 17 14:06:31 EST 2009
Log message for revision 106705:
- Ignore duplicate data records (caused by an old blob bug).
- Handle deleted records in recover.
Two changes to speed up writing:
- Use an in-memory transaction log when practical. (The log is used to
save up data until tpc_vote to minimize the amount of time we hold
the bdb locks.)
- Combine the transactions and transaction_oids databases to
reduce the number of database records and tables, and thus the
number of database pages that have to be updated.
Changed:
U zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py
-=-
Modified: zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py
===================================================================
--- zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py 2009-12-17 16:15:52 UTC (rev 106704)
+++ zc.bsddbstorage/branches/dev/src/zc/bsddbstorage/__init__.py 2009-12-17 19:06:31 UTC (rev 106705)
@@ -15,9 +15,10 @@
from bsddb3 import db
from ZODB.utils import p64, u64, z64
import cPickle
+import cStringIO
import logging
-import marshal
import os
+import struct
import tempfile
import threading
import time
@@ -33,9 +34,6 @@
def n64(tid):
return p64(868082074056920076L-u64(tid))
-# XXX Still need checkpoint strategy.
-# Maybe initial config file when creating env.
-
def retry_on_deadlock(f):
def func(*args, **kw):
@@ -132,16 +130,7 @@
self._len_lock = threading.Lock()
- # transaction_oids: {tid->[oids]}
- self.transaction_oids = db.DB(self.env)
- self.transaction_oids.set_flags(db.DB_DUPSORT)
- self.transaction_oids.open('transaction_oids', dbtype=db.DB_BTREE,
- flags=(db.DB_CREATE | db.DB_THREAD |
- db.DB_AUTO_COMMIT |
- db.DB_MULTIVERSION),
- )
-
- # transactions: {tid ->status+transaction_pickle}
+ # transactions: {tid ->pickle((status,ext,oids))}
self.transactions = db.DB(self.env)
self.transactions.open('transactions', dbtype=db.DB_BTREE,
flags=(db.DB_CREATE | db.DB_THREAD |
@@ -198,7 +187,6 @@
self.finish_packing()
self.finish_checkpointing()
self.data.close()
- self.transaction_oids.close()
self.transactions.close()
self.env.close()
if not self._read_only:
@@ -216,7 +204,7 @@
def _history_entry(self, record, txn):
tid = n64(record[:8])
- transaction = cPickle.loads(self.transactions.get(tid, txn=txn)[1:])
+ transaction = cPickle.loads(self.transactions.get(tid, txn=txn))[1]
transaction.update(
size = len(record)-8,
tid = tid,
@@ -259,10 +247,10 @@
while 1:
if not kv:
return
- tid, ext = kv
+ tid, info = kv
if tid > stop:
return
- yield Records(self, tid, ext[0], ext[1:])
+ yield Records(self, tid, *cPickle.loads(info))
kv = transactions.get(tid, flags=db.DB_NEXT)
n += 1
if n >= 1000:
@@ -386,32 +374,19 @@
return None
with self.cursor(self.transactions, txn) as transactions:
- kv = transactions.get(
- tid, flags=db.DB_SET_RANGE, doff=0, dlen=0)
+ kv = transactions.get(tid, flags=db.DB_SET_RANGE)
if kv is None:
return None
tid = kv[0]
if tid > pack_tid:
return None
- # Set the status flag to indicate that the transaction
- # was packed.
- transactions.put(tid, 'p', db.DB_CURRENT, doff=0, dlen=1)
+ ntid = n64(tid)
- ntid = n64(tid)
+ ext, oids = cPickle.loads(kv[1])[1:]
+ new_oids = []
+ for oid in oids:
- with self.cursor(self.transaction_oids, txn) as transaction_oids:
- # Find the smallest tid >= the one we picked
- ttid, oid = transaction_oids.get(tid, flags=db.DB_SET_RANGE)
- assert ttid == tid
-
- # Iterate over the oids for this tid and pack each one.
- # Note that we treat the tid we're looking at as the
- # pack time. That is, as we look at each transaction,
- # we pack to that time. This way, we can pack
- # *very* incrementally.
- while 1:
-
# Find the first record for the oid whos tid is <=
# the pack time. (we use negative tids, so >=)
# This is the current record as of the pack time
@@ -420,7 +395,8 @@
if not kr:
kr = data.get(oid, flags=db.DB_SET)
if kr[1][:8] < ntid:
- # the one record found is after the pack time
+ # the one record found is after
+ # the pack time
continue
doid, record = kr
@@ -436,6 +412,7 @@
removed_oids += 1
else:
deleted_oid = False
+ new_oids.append(oid)
# OK, we have the current record as of the tid,
# we can remove later ones
@@ -463,12 +440,13 @@
# maybe transactions
self._pack_remove_oid_tid(dtid, oid, txn)
- # continue iterating over the oids for this tid
- kv = transaction_oids.get(tid, flags=db.DB_NEXT_DUP)
- if kv is None:
- break
- assert kv[0] == tid
- oid = kv[1]
+ if new_oids:
+ # Update the status flag and oids.
+ transactions.put(
+ tid, cPickle.dumps(('p', ext, new_oids)), db.DB_CURRENT)
+ else:
+ # transaction is empty. Delete it
+ transactions.delete()
self.misc.put('pack', tid, txn=txn)
@@ -481,21 +459,10 @@
return tid
def _pack_remove_oid_tid(self, tid, oid, txn):
- with self.cursor(self.transaction_oids, txn) as transaction_oids:
- kr = transaction_oids.get(tid, oid, flags=db.DB_GET_BOTH_RANGE)
- if not kr:
- kr = transaction_oids.get(tid, flags=db.DB_SET)
- ttid, toid = kr
- if toid != oid or ttid != tid:
- raise AssertionError("Bad oid+tid lookup",
- oid, tid, toid, ttid)
- transaction_oids.delete()
- # OK, we deleted the record. Maybe it was the last one. Try to get
- # the first, and, if we can't, then delete the transaction record.
- kv = transaction_oids.get(tid, flags=db.DB_SET)
- if kv is None:
- # OK, no more oids for this tid, remive it from transactions
- self.transactions.delete(tid, txn)
+ ext, oids = cPickle.loads(
+ self.transactions.get(tid, txn=txn, flags=db.DB_RMW))[1:]
+ oids.remove(oid)
+ self.transactions.put(tid, cPickle.dumps(('p', ext, oids)), txn)
def _remove_blob_files_tagged_for_removal_during_pack(self, removed):
for oid in removed:
@@ -562,7 +529,7 @@
data = rdata
result = ZODB.ConflictResolution.ResolvedSerial
- marshal.dump((oid, n64(self._tid)+data), self._log_file)
+ self._log(oid, n64(self._tid)+data)
return result
def restore(self, oid, serial, data, version, prev_txn, transaction):
@@ -570,7 +537,7 @@
if transaction is not self._transaction:
raise ZODB.POSException.StorageTransactionError(self, transaction)
- marshal.dump((oid, n64(serial)+data), self._log_file)
+ self._log(oid, n64(serial)+(data or ''))
def deleteObject(self, oid, oldserial, transaction):
if transaction is not self._transaction:
@@ -580,7 +547,7 @@
raise ZODB.POSException.ConflictError(
oid=oid, serials=(n64(committed_tid), oldserial))
- marshal.dump((oid, n64(self._tid)), self._log_file)
+ self._log(oid, n64(self._tid))
def tpc_begin(self, transaction, tid=None, status=' '):
if self._read_only:
@@ -595,7 +562,6 @@
ext = transaction._extension.copy()
ext['user_name'] = transaction.user
ext['description'] = transaction.description
- ext = status+cPickle.dumps(ext, 1)
if tid is None:
now = time.time()
@@ -607,16 +573,13 @@
self._ts = ZODB.TimeStamp.TimeStamp(tid)
self._tid = tid
- fd, self._log_path = tempfile.mkstemp('bsddb')
- self._log_file = open(self._log_path, 'r+b')
- os.close(fd)
- marshal.dump((tid, ext), self._log_file)
+ self._log = ObjectLog()
+ self._log(tid, status, ext)
self._new_obs = 0
def _tpc_cleanup(self):
self._transaction = self._txn = None
- self._log_file.close()
- os.remove(self._log_path)
+ self._log.close()
self._commit_lock.release()
def tpc_abort(self, transaction):
@@ -641,13 +604,21 @@
_transaction_id_suffix = 'x' * (db.DB_GID_SIZE - 8)
def tpc_vote(self, transaction):
+ log = iter(self._log)
+ tid, status, ext = log.next()
+ oids = []
self._txn = txn = self.env.txn_begin()
- self._log_file.seek(0)
- tid, ext = marshal.load(self._log_file)
- self.transactions.put(tid, ext, txn=txn)
- for oid, record in marshal_iterate(self._log_file):
- self.data.put(oid, record, txn=txn)
- self.transaction_oids.put(tid, oid, txn=txn)
+ for oid, record in log:
+ try:
+ self.data.put(oid, record, txn=txn)
+ except db.DBKeyExistError:
+ # If the entire records are dups, we
+ # don't want to write them again. That
+ # would be silly.
+ pass
+ else:
+ oids.append(oid)
+ self.transactions.put(tid, cPickle.dumps((status, ext, oids)), txn=txn)
txn.prepare(self._tid+self._transaction_id_suffix)
##############################################################
@@ -676,13 +647,6 @@
Storage = BSDDBStorage # easier to type alias :)
-def marshal_iterate(f):
- while 1:
- try:
- yield marshal.load(f)
- except EOFError:
- break
-
class TransactionContext(object):
def __init__(self, txn):
@@ -730,14 +694,14 @@
class Records(object):
- def __init__(self, storage, tid, status, ext):
+ def __init__(self, storage, tid, status, ext, oids):
self.storage = storage
self.tid = tid
- ext = cPickle.loads(ext)
self.user = ext.pop('user_name', '')
self.description = ext.pop('description', '')
self.status = status
self.extension = ext
+ self.oids = oids
@apply
def _extension():
@@ -751,24 +715,17 @@
def _iter(self):
tid = self.tid
ntid = n64(tid)
- with self.storage.txn(db.DB_TXN_SNAPSHOT) as txn:
- with self.storage.cursor(self.storage.transaction_oids, txn
- ) as transaction_oids:
- kv = transaction_oids.get(tid, flags=db.DB_SET)
- while kv:
- ttid, oid = kv
- assert ttid == tid
- with self.storage.cursor(self.storage.data, txn
- ) as data:
- kr = data.get(oid, ntid, flags=db.DB_GET_BOTH_RANGE)
- if kr is None:
- kr = data.get(oid, flags=db.DB_SET)
- doid, rec = kr
- assert doid == oid
- dntid = rec[:8]
- assert dntid == ntid
- yield Record(oid, tid, rec[8:])
- kv = transaction_oids.get(tid, flags=db.DB_NEXT_DUP)
+ for oid in self.oids:
+ with self.storage.txn(db.DB_TXN_SNAPSHOT) as txn:
+ with self.storage.cursor(self.storage.data, txn) as data:
+ kr = data.get(oid, ntid, flags=db.DB_GET_BOTH_RANGE)
+ if kr is None:
+ kr = data.get(oid, flags=db.DB_SET)
+ doid, rec = kr
+ assert doid == oid
+ dntid = rec[:8]
+ assert dntid == ntid, (tid, ntid, dntid)
+ yield Record(oid, tid, rec[8:])
class Record:
def __init__(self, oid, tid, data):
@@ -898,3 +855,40 @@
thread.join(*args)
return join
+
+
+class ObjectLog:
+ # Log of pickleable object data.
+ # In memory if possible
+
+ max_mem = 1<<20 # Most transactions are a few K or less
+ in_memory = True
+
+ def __init__(self):
+ self._file = cStringIO.StringIO()
+ self.close = self._file.close
+ self._size = 0
+
+ def __call__(self, *data):
+ data = cPickle.dumps(data, 1)
+ ldata = len(data)
+ size = self._size + len(data) + 4
+ if self.in_memory and size > self.max_mem:
+ newfile = tempfile.TemporaryFile()
+ self._file.seek(0)
+ newfile.write(self._file.read())
+ self._file = newfile
+ self.close = self._file.close
+ self.in_memory = False
+ self._file.write(struct.pack(">I", ldata))
+ self._file.write(data)
+
+ def __iter__(self):
+ file = self._file
+ file.seek(0)
+ while 1:
+ l = file.read(4)
+ if not l:
+ break
+ l = struct.unpack(">I", l)[0]
+ yield cPickle.loads(file.read(l))
More information about the checkins
mailing list