[Checkins] SVN: relstorage/branches/1.4.0-fastimport/ Initial changes to add a `--single-transaction` parameter to zodbconvert.
David Blewett
davidb at sixfeetup.com
Mon Jul 19 14:52:13 EDT 2010
Log message for revision 114854:
Initial changes to add a `--single-transaction` parameter to zodbconvert.
Changed:
U relstorage/branches/1.4.0-fastimport/README.txt
U relstorage/branches/1.4.0-fastimport/relstorage/storage.py
U relstorage/branches/1.4.0-fastimport/relstorage/zodbconvert.py
-=-
Modified: relstorage/branches/1.4.0-fastimport/README.txt
===================================================================
--- relstorage/branches/1.4.0-fastimport/README.txt 2010-07-19 18:20:38 UTC (rev 114853)
+++ relstorage/branches/1.4.0-fastimport/README.txt 2010-07-19 18:52:13 UTC (rev 114854)
@@ -302,7 +302,12 @@
Opens both storages and analyzes what would be copied, but does not
actually copy.
+ ``--single-transaction``
+ Import into the destination in a single transaction, instead of
+ committing one destination transaction for each source transaction.
+ This option can significantly speed up conversion on PostgreSQL.
+
Migrating to a new version of RelStorage
----------------------------------------
Modified: relstorage/branches/1.4.0-fastimport/relstorage/storage.py
===================================================================
--- relstorage/branches/1.4.0-fastimport/relstorage/storage.py 2010-07-19 18:20:38 UTC (rev 114853)
+++ relstorage/branches/1.4.0-fastimport/relstorage/storage.py 2010-07-19 18:52:13 UTC (rev 114854)
@@ -38,6 +38,7 @@
import tempfile
import threading
import time
+import transaction
import weakref
import ZODB.interfaces
@@ -79,8 +80,10 @@
"""Storage to a relational database, based on invalidation polling"""
implements(*_relstorage_interfaces)
- _transaction=None # Transaction that is being committed
- _tstatus=' ' # Transaction status, used for copying data
+ _transaction=None # Transaction that is being committed
+ _transactions=None # List of pending sub-transactions when running
+ # a single-transaction import
+ _tstatus=' ' # Transaction status, used for copying data
_is_read_only = False
# load_conn and load_cursor are open most of the time.
@@ -578,14 +581,20 @@
self._lock_release()
- def restore(self, oid, serial, data, version, prev_txn, transaction):
+ def restore(self, oid, serial, data, version, prev_txn, transaction,
+ transaction_position=None):
# Like store(), but used for importing transactions. See the
# comments in FileStorage.restore(). The prev_txn optimization
# is not used.
if self._is_read_only:
raise POSException.ReadOnlyError()
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
+ if self._transactions:
+ if transaction_position is None or \
+ transaction.tid != self._transactions[transaction_position]:
+ raise POSException.StorageTransactionError(self, transaction)
+ else:
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
if version:
raise POSException.Unsupported("Versions aren't supported")
@@ -624,14 +633,6 @@
self._clear_temp()
self._transaction = transaction
- user = str(transaction.user)
- desc = str(transaction.description)
- ext = transaction._extension
- if ext:
- ext = cPickle.dumps(ext, 1)
- else:
- ext = ""
- self._ude = user, desc, ext
self._tstatus = status
self._restart_store()
@@ -642,19 +643,11 @@
if tid is not None:
# hold the commit lock and add the transaction now
- cursor = self._store_cursor
- packed = (status == 'p')
- adapter.locker.hold_commit_lock(cursor, ensure_current=True)
- tid_int = u64(tid)
- try:
- adapter.txncontrol.add_transaction(
- cursor, tid_int, user, desc, ext, packed)
- except:
- self._drop_store_connection()
- raise
+ self._ude = self._add_transaction(transaction)
+ else:
+ self._ude = self._get_ude(transaction)
# else choose the tid later
self._tid = tid
-
finally:
self._lock_release()
@@ -1361,10 +1354,63 @@
ZODB.blob.remove_committed(old_filename)
self._txn_blobs[oid] = filename
- def copyTransactionsFrom(self, other):
def _get_ude(self, txn):
    """Return the ``(user, description, extension)`` triple for *txn*.

    ``user`` and ``description`` are converted with ``str()``.  A truthy
    ``txn._extension`` is pickled with ``cPickle.dumps(..., 1)``; a falsy
    one (e.g. an empty mapping or ``None``) becomes the empty string.
    """
    user_text = str(txn.user)
    description_text = str(txn.description)
    extension = txn._extension
    # Only pay for pickling when there is extension data to store.
    pickled_extension = cPickle.dumps(extension, 1) if extension else ""
    return user_text, description_text, pickled_extension
+
def _add_transaction(self, txn, hold_lock=True, tid=None, status=None):
    """Insert the transaction row for *txn* through the store cursor.

    Used by tpc_begin() and, with ``hold_lock=False``, when several source
    transactions are restored inside one destination transaction.

    :param txn: transaction (or transaction-iterator record) supplying
        ``user``, ``description`` and ``_extension`` -- and, unless
        overridden below, ``tid`` and ``status``.
    :param hold_lock: when true, this function itself acquires the commit
        lock via ``adapter.locker.hold_commit_lock(cursor,
        ensure_current=True)`` and drops the store connection if the insert
        fails.  When false, the caller is responsible for the lock.
    :param tid: 8-byte transaction id to record; defaults to ``txn.tid``.
    :param status: transaction status character; defaults to
        ``txn.status``.  A status of ``'p'`` marks the row as packed.
    :returns: the ``(user, description, extension)`` triple that was
        passed to ``add_transaction``.
    """
    user, desc, ext = self._get_ude(txn)
    # NOTE(review): empty user/description are written as the two-character
    # string '""' rather than ''.  The reason is not visible here; confirm
    # this substitution is intended, since it changes stored metadata.
    if not user:
        user = '""'
    if not desc:
        desc = '""'
    if tid is None:
        tid = txn.tid
    if status is None:
        status = txn.status
    adapter = self._adapter
    cursor = self._store_cursor
    packed = (status == 'p')
    # Resolve the tid before taking the commit lock, so a missing or
    # malformed tid cannot leave the lock held without any cleanup.
    tid_int = u64(tid)
    if hold_lock:
        adapter.locker.hold_commit_lock(cursor, ensure_current=True)
    try:
        adapter.txncontrol.add_transaction(
            cursor, tid_int, user, desc, ext, packed)
    except:
        # Bare except on purpose: clean up on any failure, then re-raise.
        if hold_lock:
            # Same recovery tpc_begin used before this refactoring;
            # presumably this also releases the commit lock taken above.
            self._drop_store_connection()
        raise
    return user, desc, ext
+
+ def copyTransactionsFrom(self, other, single_transaction=False):
+ begin_time = time.time()
+ if single_transaction:
+ master_txn = transaction.Transaction()
+ master_txn.tid = p64(1)
+ master_txn.description = 'zodbconvert run on: %s' % time.strftime('%Y-%m-%d.%H:%M:%S')
+ self.tpc_begin(master_txn, master_txn.tid)
+ txnum = 0
+ tx_size = 0
+ self._transactions = []
+ for trans in other.iterator():
+ self._transactions.append(trans.tid)
+ if single_transaction:
+ self._add_transaction(trans, hold_lock=False)
+
+ num_txns = len(self._transactions)
# adapted from ZODB.blob.BlobStorageMixin
for trans in other.iterator():
- self.tpc_begin(trans, trans.tid, trans.status)
+ if not single_transaction:
+ self.tpc_begin(trans, trans.tid, trans.status)
+ num_txn_records = 0
for record in trans:
blobfilename = None
if self.fshelper is not None:
@@ -1383,11 +1429,33 @@
name, record.data_txn, trans)
else:
self.restore(record.oid, record.tid, record.data,
- '', record.data_txn, trans)
+ '', record.data_txn, trans, txnum)
+ num_txn_records += 1
+ tx_size += len(record.data)
+ txnum += 1
+ tx_end = time.time()
+ pct_complete = (txnum/float(num_txns))*100
+ if pct_complete < 10:
+ pct_complete = ' %1.2f%%' % pct_complete
+ elif pct_complete < 100:
+ pct_complete = ' %1.2f%%' % pct_complete
+ rate = (tx_size/float(1024*1024)) / (tx_end - begin_time)
+ #if single_transaction:
+ # self._batcher.flush()
+ #else:
+ if not single_transaction:
+ self.tpc_vote(trans)
+ self.tpc_finish(trans)
+ #write("Restored tid %d,%5d records | %1.3fmB/s (%6d/%6d, %.2f%%)\n" %
+ # (u64(trans.tid), num_txn_records, rate, txnum, num_txns, pct_complete))
+ log.info("Restored tid %d,%5d records | %1.3f MB/s (%6d/%6d,%s)",
+ u64(trans.tid), num_txn_records, rate, txnum, num_txns, pct_complete)
+ if single_transaction:
+ self.tpc_vote(master_txn)
+ self.tpc_finish(master_txn)
+ self._transactions = None
+ return txnum, tx_size, tx_end - begin_time
- self.tpc_vote(trans)
- self.tpc_finish(trans)
-
# The propagate_invalidations flag implements the old
# invalidation polling API and is not otherwise used. Set to a
# false value, it tells the Connection not to propagate object
Modified: relstorage/branches/1.4.0-fastimport/relstorage/zodbconvert.py
===================================================================
--- relstorage/branches/1.4.0-fastimport/relstorage/zodbconvert.py 2010-07-19 18:20:38 UTC (rev 114853)
+++ relstorage/branches/1.4.0-fastimport/relstorage/zodbconvert.py 2010-07-19 18:52:13 UTC (rev 114854)
@@ -17,8 +17,10 @@
See README.txt for details.
"""
+import logging
import optparse
from persistent.TimeStamp import TimeStamp
+from relstorage.storage import RelStorage
from StringIO import StringIO
import sys
import ZConfig
@@ -35,6 +37,8 @@
</schema>
"""
+log = logging.getLogger("relstorage.zodbconvert")
+logging.basicConfig(level=logging.INFO)
def storage_has_data(storage):
i = storage.iterator()
@@ -59,29 +63,41 @@
parser.add_option(
"--clear", dest="clear", action="store_true",
help="Clear the contents of the destination storage before copying")
- parser.set_defaults(dry_run=False, clear=False)
+ parser.add_option(
+ "--single-transaction", dest="single_transaction", action="store_true",
+ help="Convert the source into the destination in a single transaction")
+ parser.add_option(
+ "--batch-size", dest="batch_size", type="int", action="store",
+ help="Batch size to use when converting")
+ parser.set_defaults(dry_run=False, clear=False, single_transaction=False, batch_size=250)
options, args = parser.parse_args(argv[1:])
if len(args) != 1:
parser.error("The name of one configuration file is required.")
+
schema = ZConfig.loadSchemaFile(StringIO(schema_xml))
config, handler = ZConfig.loadConfig(schema, args[0])
source = config.source.open()
destination = config.destination.open()
+ destination._batcher_row_limit = options.batch_size
- write("Storages opened successfully.\n")
+ #write("Storages opened successfully.\n")
+ log.info("Storages opened successfully.")
if options.dry_run:
- write("Dry run mode: not changing the destination.\n")
+ #write("Dry run mode: not changing the destination.\n")
+ log.info("Dry run mode: not changing the destination.")
if storage_has_data(destination):
- write("Warning: the destination storage has data\n")
+ #write("Warning: the destination storage has data\n")
+ log.warning("The destination storage has data.")
count = 0
for txn in source.iterator():
write('%s user=%s description=%s\n' % (
TimeStamp(txn.tid), txn.user, txn.description))
count += 1
- write("Would copy %d transactions.\n" % count)
+ #write("Would copy %d transactions.\n" % count)
+ log.info("Would copy %d transactions.", count)
else:
if options.clear:
@@ -96,12 +112,25 @@
msg = "Error: the destination storage has data. Try --clear."
sys.exit(msg)
- destination.copyTransactionsFrom(source)
+ copy_args = [source]
+ if issubclass(destination.__class__, RelStorage):
+ copy_args.append(options.single_transaction)
+ num_txns, size, elapsed = destination.copyTransactionsFrom(*copy_args)
- source.close()
+ try:
+ source.close()
+ except IOError:
+ #We don't mind if the source throws errors like:
+ #ERROR:ZODB.FileStorage:Error saving index on close()
+ #IOError: [Errno 13] Permission denied: '/path/to/db'
+ pass
destination.close()
- write('All transactions copied successfully.\n')
+ rate = (size/float(1024*1024)) / elapsed
+ #write('All %d transactions copied successfully in %4.1f minutes at %1.3fmB/s.\n' %
+ # (num_txns, elapsed/60, rate))
+ log.info('All %d transactions copied successfully in %4.1f minutes at %1.3fmB/s.',
+ num_txns, elapsed/60, rate)
if __name__ == '__main__':
More information about the checkins
mailing list