[Checkins] SVN: relstorage/trunk/ Divided the implementation of database adapters into many small
Shane Hathaway
shane at hathawaymix.org
Thu Sep 24 14:06:38 EDT 2009
Log message for revision 104499:
Divided the implementation of database adapters into many small
objects, making the adapter code more modular. Added interfaces
that describe the duties of each part.
(Without the new modularity, adding history-free storage would be a
brain-busting combinatorial mess.)
Changed:
U relstorage/trunk/CHANGES.txt
U relstorage/trunk/relstorage/adapters/interfaces.py
U relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/postgresql.py
U relstorage/trunk/relstorage/adapters/txncontrol.py
U relstorage/trunk/relstorage/config.py
U relstorage/trunk/relstorage/relstorage.py
U relstorage/trunk/relstorage/tests/reltestbase.py
U relstorage/trunk/relstorage/tests/speedtest.py
-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt 2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/CHANGES.txt 2009-09-24 18:06:37 UTC (rev 104499)
@@ -2,6 +2,12 @@
Unreleased
----------
+- Divided the implementation of database adapters into many small
+ objects, making the adapter code more modular. Added interfaces
+ that describe the duties of each part.
+
+- Oracle: sped up restore operations by sending short blobs inline
+
- PostgreSQL: use the documented ALTER SEQUENCE RESTART WITH
statement instead of ALTER SEQUENCE START WITH.
Modified: relstorage/trunk/relstorage/adapters/interfaces.py
===================================================================
--- relstorage/trunk/relstorage/adapters/interfaces.py 2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/adapters/interfaces.py 2009-09-24 18:06:37 UTC (rev 104499)
@@ -16,7 +16,25 @@
from zope.interface import Attribute
from zope.interface import Interface
+class IRelStorageAdapter(Interface):
+ """A database adapter for RelStorage"""
+
+ connmanager = Attribute("An IConnectionManager")
+ dbiter = Attribute("An IDatabaseIterator")
+ keep_history = Attribute("True if this adapter supports undo")
+ locker = Attribute("An ILocker")
+ mover = Attribute("An IObjectMover")
+ oidallocator = Attribute("An IOIDAllocator")
+ packundo = Attribute("An IPackUndo")
+ poller = Attribute("An IPoller")
+ runner = Attribute("An IScriptRunner")
+ schema = Attribute("An ISchemaInstaller")
+ stats = Attribute("An IStats")
+ txncontrol = Attribute("An ITransactionControl")
+
+
class IConnectionManager(Interface):
+ """Open and close database connections"""
def open():
"""Open a database connection and return (conn, cursor)."""
@@ -43,6 +61,8 @@
def restart_load(conn, cursor):
"""Reinitialize a connection for loading objects.
+ This gets called when polling the database, so it needs to be quick.
+
Raise StorageError if the database has disconnected.
"""
@@ -60,6 +80,7 @@
class IDatabaseIterator(Interface):
+ """Iterate over the available data in the database"""
def iter_objects(cursor, tid):
"""Iterate over object states in a transaction.
@@ -92,6 +113,7 @@
class ILocker(Interface):
+ """Acquire and release the commit and pack locks."""
def hold_commit_lock(cursor, ensure_current=False):
"""Acquire the commit lock.
@@ -117,7 +139,7 @@
class IObjectMover(Interface):
- """Moves object states to/from the database and within the database"""
+ """Move object states to/from the database and within the database."""
def get_current_tid(cursor, oid):
"""Returns the current integer tid for an object.
@@ -195,15 +217,17 @@
class IOIDAllocator(Interface):
+ """Allocate OIDs and control future allocation"""
+ def new_oid(cursor):
+ """Return a new, unused OID."""
+
def set_min_oid(cursor, oid):
"""Ensure the next OID is at least the given OID."""
- def new_oid(cursor):
- """Return a new, unused OID."""
-
class IPackUndo(Interface):
+ """Perform pack and undo operations"""
def verify_undoable(cursor, undo_tid):
"""Raise UndoError if it is not safe to undo the specified txn.
@@ -264,6 +288,7 @@
class IPoller(Interface):
+ """Poll for new data"""
def poll_invalidations(conn, cursor, prev_polled_tid, ignore_tid):
"""Polls for new transactions.
@@ -279,6 +304,7 @@
class ISchemaInstaller(Interface):
+ """Install the schema in the database, clear it, or uninstall it"""
def create(cursor):
"""Create the database tables, sequences, etc."""
@@ -294,7 +320,13 @@
class IScriptRunner(Interface):
+ """Run database-agnostic SQL scripts.
+ Using an IScriptRunner is appropriate for batch operations and
+ uncommon operations that can be slow, but is not appropriate
+ for performance-critical code.
+ """
+
script_vars = Attribute(
"""A mapping providing replacements for parts of scripts.
@@ -328,9 +360,23 @@
stmt should use '%s' parameter format (not %(name)s).
"""
+ # Note: the Oracle implementation also provides run_lob_stmt, which
+ # is useful for reading LOBs from the database quickly.
+
class ITransactionControl(Interface):
+ """Begin, commit, and abort transactions."""
+ def get_tid_and_time(cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+
+ def add_transaction(cursor, tid, username, description, extension,
+ packed=False):
+ """Add a transaction."""
+
def commit_phase1(conn, cursor, tid):
"""Begin a commit. Returns the transaction name.
@@ -350,13 +396,3 @@
def abort(conn, cursor, txn=None):
"""Abort the commit. If txn is not None, phase 1 is also aborted."""
- def get_tid_and_time(cursor):
- """Returns the most recent tid and the current database time.
-
- The database time is the number of seconds since the epoch.
- """
-
- def add_transaction(cursor, tid, username, description, extension,
- packed=False):
- """Add a transaction."""
-
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2009-09-24 18:06:37 UTC (rev 104499)
@@ -50,10 +50,12 @@
import logging
import MySQLdb
+from zope.interface import implements
from relstorage.adapters.connmanager import AbstractConnectionManager
from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
from relstorage.adapters.hpmover import HistoryPreservingObjectMover
+from relstorage.adapters.interfaces import IRelStorageAdapter
from relstorage.adapters.locker import MySQLLocker
from relstorage.adapters.oidallocator import MySQLOIDAllocator
from relstorage.adapters.packundo import HistoryPreservingPackUndo
@@ -76,6 +78,7 @@
class MySQLAdapter(object):
"""MySQL adapter for RelStorage."""
+ implements(IRelStorageAdapter)
keep_history = True
@@ -115,64 +118,7 @@
connmanager=self.connmanager,
)
- self.open = self.connmanager.open
- self.close = self.connmanager.close
- self.open_for_load = self.connmanager.open_for_load
- self.restart_load = self.connmanager.restart_load
- self.open_for_store = self.connmanager.open_for_store
- self.restart_store = self.connmanager.restart_store
- self.hold_commit_lock = self.locker.hold_commit_lock
- self.release_commit_lock = self.locker.release_commit_lock
- self.hold_pack_lock = self.locker.hold_pack_lock
- self.release_pack_lock = self.locker.release_pack_lock
-
- self.create_schema = self.schema.create
- self.prepare_schema = self.schema.prepare
- self.zap_all = self.schema.zap_all
- self.drop_all = self.schema.drop_all
-
- self.get_current_tid = self.mover.get_current_tid
- self.load_current = self.mover.load_current
- self.load_revision = self.mover.load_revision
- self.exists = self.mover.exists
- self.load_before = self.mover.load_before
- self.get_object_tid_after = self.mover.get_object_tid_after
- self.store_temp = self.mover.store_temp
- self.replace_temp = self.mover.replace_temp
- self.restore = self.mover.restore
- self.detect_conflict = self.mover.detect_conflict
- self.move_from_temp = self.mover.move_from_temp
- self.update_current = self.mover.update_current
-
- self.set_min_oid = self.oidallocator.set_min_oid
- self.new_oid = self.oidallocator.new_oid
-
- self.get_tid_and_time = self.txncontrol.get_tid_and_time
- self.add_transaction = self.txncontrol.add_transaction
- self.commit_phase1 = self.txncontrol.commit_phase1
- self.commit_phase2 = self.txncontrol.commit_phase2
- self.abort = self.txncontrol.abort
-
- self.poll_invalidations = self.poller.poll_invalidations
-
- self.fill_object_refs = self.packundo.fill_object_refs
- self.open_for_pre_pack = self.packundo.open_for_pre_pack
- self.choose_pack_transaction = self.packundo.choose_pack_transaction
- self.pre_pack = self.packundo.pre_pack
- self.pack = self.packundo.pack
- self.verify_undoable = self.packundo.verify_undoable
- self.undo = self.packundo.undo
-
- self.iter_objects = self.dbiter.iter_objects
- self.iter_transactions = self.dbiter.iter_transactions
- self.iter_transactions_range = self.dbiter.iter_transactions_range
- self.iter_object_history = self.dbiter.iter_object_history
-
- self.get_object_count = self.stats.get_object_count
- self.get_db_size = self.stats.get_db_size
-
-
class MySQLdbConnectionManager(AbstractConnectionManager):
isolation_read_committed = "ISOLATION LEVEL READ COMMITTED"
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2009-09-24 18:06:37 UTC (rev 104499)
@@ -16,10 +16,12 @@
import logging
import cx_Oracle
from ZODB.POSException import StorageError
+from zope.interface import implements
from relstorage.adapters.connmanager import AbstractConnectionManager
from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
from relstorage.adapters.hpmover import HistoryPreservingObjectMover
+from relstorage.adapters.interfaces import IRelStorageAdapter
from relstorage.adapters.locker import OracleLocker
from relstorage.adapters.oidallocator import OracleOIDAllocator
from relstorage.adapters.packundo import OracleHistoryPreservingPackUndo
@@ -45,6 +47,7 @@
class OracleAdapter(object):
"""Oracle adapter for RelStorage."""
+ implements(IRelStorageAdapter)
keep_history = True
@@ -94,6 +97,7 @@
)
self.txncontrol = OracleTransactionControl(
Binary=cx_Oracle.Binary,
+ twophase=bool(twophase),
)
self.poller = Poller(
poll_query="SELECT MAX(tid) FROM transaction",
@@ -112,64 +116,7 @@
connmanager=self.connmanager,
)
- self.open = self.connmanager.open
- self.close = self.connmanager.close
- self.open_for_load = self.connmanager.open_for_load
- self.restart_load = self.connmanager.restart_load
- self.open_for_store = self.connmanager.open_for_store
- self.restart_store = self.connmanager.restart_store
- self.hold_commit_lock = self.locker.hold_commit_lock
- self.release_commit_lock = self.locker.release_commit_lock
- self.hold_pack_lock = self.locker.hold_pack_lock
- self.release_pack_lock = self.locker.release_pack_lock
-
- self.create_schema = self.schema.create
- self.prepare_schema = self.schema.prepare
- self.zap_all = self.schema.zap_all
- self.drop_all = self.schema.drop_all
-
- self.get_current_tid = self.mover.get_current_tid
- self.load_current = self.mover.load_current
- self.load_revision = self.mover.load_revision
- self.exists = self.mover.exists
- self.load_before = self.mover.load_before
- self.get_object_tid_after = self.mover.get_object_tid_after
- self.store_temp = self.mover.store_temp
- self.replace_temp = self.mover.replace_temp
- self.restore = self.mover.restore
- self.detect_conflict = self.mover.detect_conflict
- self.move_from_temp = self.mover.move_from_temp
- self.update_current = self.mover.update_current
-
- self.set_min_oid = self.oidallocator.set_min_oid
- self.new_oid = self.oidallocator.new_oid
-
- self.get_tid_and_time = self.txncontrol.get_tid_and_time
- self.add_transaction = self.txncontrol.add_transaction
- self.commit_phase1 = self.txncontrol.commit_phase1
- self.commit_phase2 = self.txncontrol.commit_phase2
- self.abort = self.txncontrol.abort
-
- self.poll_invalidations = self.poller.poll_invalidations
-
- self.fill_object_refs = self.packundo.fill_object_refs
- self.open_for_pre_pack = self.packundo.open_for_pre_pack
- self.choose_pack_transaction = self.packundo.choose_pack_transaction
- self.pre_pack = self.packundo.pre_pack
- self.pack = self.packundo.pack
- self.verify_undoable = self.packundo.verify_undoable
- self.undo = self.packundo.undo
-
- self.iter_objects = self.dbiter.iter_objects
- self.iter_transactions = self.dbiter.iter_transactions
- self.iter_transactions_range = self.dbiter.iter_transactions_range
- self.iter_object_history = self.dbiter.iter_object_history
-
- self.get_object_count = self.stats.get_object_count
- self.get_db_size = self.stats.get_db_size
-
-
class CXOracleScriptRunner(OracleScriptRunner):
def __init__(self, use_inline_lobs):
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2009-09-24 18:06:37 UTC (rev 104499)
@@ -16,10 +16,12 @@
import logging
import psycopg2
import psycopg2.extensions
+from zope.interface import implements
from relstorage.adapters.connmanager import AbstractConnectionManager
from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
from relstorage.adapters.hpmover import HistoryPreservingObjectMover
+from relstorage.adapters.interfaces import IRelStorageAdapter
from relstorage.adapters.locker import PostgreSQLLocker
from relstorage.adapters.oidallocator import PostgreSQLOIDAllocator
from relstorage.adapters.packundo import HistoryPreservingPackUndo
@@ -44,6 +46,7 @@
class PostgreSQLAdapter(object):
"""PostgreSQL adapter for RelStorage."""
+ implements(IRelStorageAdapter)
keep_history = True
@@ -81,64 +84,7 @@
connmanager=self.connmanager,
)
- self.open = self.connmanager.open
- self.close = self.connmanager.close
- self.open_for_load = self.connmanager.open_for_load
- self.restart_load = self.connmanager.restart_load
- self.open_for_store = self.connmanager.open_for_store
- self.restart_store = self.connmanager.restart_store
- self.hold_commit_lock = self.locker.hold_commit_lock
- self.release_commit_lock = self.locker.release_commit_lock
- self.hold_pack_lock = self.locker.hold_pack_lock
- self.release_pack_lock = self.locker.release_pack_lock
-
- self.create_schema = self.schema.create
- self.prepare_schema = self.schema.prepare
- self.zap_all = self.schema.zap_all
- self.drop_all = self.schema.drop_all
-
- self.get_current_tid = self.mover.get_current_tid
- self.load_current = self.mover.load_current
- self.load_revision = self.mover.load_revision
- self.exists = self.mover.exists
- self.load_before = self.mover.load_before
- self.get_object_tid_after = self.mover.get_object_tid_after
- self.store_temp = self.mover.store_temp
- self.replace_temp = self.mover.replace_temp
- self.restore = self.mover.restore
- self.detect_conflict = self.mover.detect_conflict
- self.move_from_temp = self.mover.move_from_temp
- self.update_current = self.mover.update_current
-
- self.set_min_oid = self.oidallocator.set_min_oid
- self.new_oid = self.oidallocator.new_oid
-
- self.get_tid_and_time = self.txncontrol.get_tid_and_time
- self.add_transaction = self.txncontrol.add_transaction
- self.commit_phase1 = self.txncontrol.commit_phase1
- self.commit_phase2 = self.txncontrol.commit_phase2
- self.abort = self.txncontrol.abort
-
- self.poll_invalidations = self.poller.poll_invalidations
-
- self.fill_object_refs = self.packundo.fill_object_refs
- self.open_for_pre_pack = self.packundo.open_for_pre_pack
- self.choose_pack_transaction = self.packundo.choose_pack_transaction
- self.pre_pack = self.packundo.pre_pack
- self.pack = self.packundo.pack
- self.verify_undoable = self.packundo.verify_undoable
- self.undo = self.packundo.undo
-
- self.iter_objects = self.dbiter.iter_objects
- self.iter_transactions = self.dbiter.iter_transactions
- self.iter_transactions_range = self.dbiter.iter_transactions_range
- self.iter_object_history = self.dbiter.iter_object_history
-
- self.get_object_count = self.stats.get_object_count
- self.get_db_size = self.stats.get_db_size
-
-
class Psycopg2ConnectionManager(AbstractConnectionManager):
isolation_read_committed = (
Modified: relstorage/trunk/relstorage/adapters/txncontrol.py
===================================================================
--- relstorage/trunk/relstorage/adapters/txncontrol.py 2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/adapters/txncontrol.py 2009-09-24 18:06:37 UTC (rev 104499)
@@ -125,7 +125,7 @@
class OracleTransactionControl(TransactionControl):
implements(ITransactionControl)
- def __init__(self, Binary, twophase=False):
+ def __init__(self, Binary, twophase):
self.Binary = Binary
self.twophase = twophase
Modified: relstorage/trunk/relstorage/config.py
===================================================================
--- relstorage/trunk/relstorage/config.py 2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/config.py 2009-09-24 18:06:37 UTC (rev 104499)
@@ -22,7 +22,7 @@
"""Open a storage configured via ZConfig"""
def open(self):
config = self.config
- adapter = config.adapter.open()
+ adapter = config.adapter.create()
options = Options()
for key in options.__dict__.keys():
value = getattr(config, key, None)
@@ -33,20 +33,20 @@
class PostgreSQLAdapterFactory(BaseConfig):
- def open(self):
+ def create(self):
from adapters.postgresql import PostgreSQLAdapter
return PostgreSQLAdapter(self.config.dsn)
class OracleAdapterFactory(BaseConfig):
- def open(self):
+ def create(self):
from adapters.oracle import OracleAdapter
config = self.config
return OracleAdapter(config.user, config.password, config.dsn)
class MySQLAdapterFactory(BaseConfig):
- def open(self):
+ def create(self):
from adapters.mysql import MySQLAdapter
options = {}
for key in self.config.getSectionAttributes():
Modified: relstorage/trunk/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/relstorage.py 2009-09-24 18:06:37 UTC (rev 104499)
@@ -95,7 +95,7 @@
self._cache_client = None
if create:
- self._adapter.prepare_schema()
+ self._adapter.schema.prepare()
# load_conn and load_cursor are open most of the time.
self._load_conn = None
@@ -168,7 +168,7 @@
def _open_load_connection(self):
"""Open the load connection to the database. Return nothing."""
- conn, cursor = self._adapter.open_for_load()
+ conn, cursor = self._adapter.connmanager.open_for_load()
self._drop_load_connection()
self._load_conn, self._load_cursor = conn, cursor
self._load_transaction_open = True
@@ -177,7 +177,7 @@
"""Unconditionally drop the load connection"""
conn, cursor = self._load_conn, self._load_cursor
self._load_conn, self._load_cursor = None, None
- self._adapter.close(conn, cursor)
+ self._adapter.connmanager.close(conn, cursor)
self._load_transaction_open = False
def _rollback_load_connection(self):
@@ -195,7 +195,7 @@
self._open_load_connection()
else:
try:
- self._adapter.restart_load(
+ self._adapter.connmanager.restart_load(
self._load_conn, self._load_cursor)
except POSException.StorageError, e:
log.warning("Reconnecting load_conn: %s", e)
@@ -212,7 +212,7 @@
def _open_store_connection(self):
"""Open the store connection to the database. Return nothing."""
- conn, cursor = self._adapter.open_for_store()
+ conn, cursor = self._adapter.connmanager.open_for_store()
self._drop_store_connection()
self._store_conn, self._store_cursor = conn, cursor
@@ -220,7 +220,7 @@
"""Unconditionally drop the store connection"""
conn, cursor = self._store_conn, self._store_cursor
self._store_conn, self._store_cursor = None, None
- self._adapter.close(conn, cursor)
+ self._adapter.connmanager.close(conn, cursor)
def _restart_store(self):
"""Restart the store connection, creating a new connection if needed"""
@@ -228,7 +228,7 @@
self._open_store_connection()
else:
try:
- self._adapter.restart_store(
+ self._adapter.connmanager.restart_store(
self._store_conn, self._store_cursor)
except POSException.StorageError, e:
log.warning("Reconnecting store_conn: %s", e)
@@ -247,7 +247,7 @@
Used by the test suite and the ZODBConvert script.
"""
- self._adapter.zap_all()
+ self._adapter.schema.zap_all()
self._rollback_load_connection()
cache = self._cache_client
if cache is not None:
@@ -289,11 +289,11 @@
return other
def __len__(self):
- return self._adapter.get_object_count()
+ return self._adapter.stats.get_object_count()
def getSize(self):
"""Return database size in bytes"""
- return self._adapter.get_db_size()
+ return self._adapter.stats.get_db_size()
def _log_keyerror(self, oid_int, reason):
"""Log just before raising KeyError in load().
@@ -305,7 +305,7 @@
adapter = self._adapter
logfunc = log.warning
msg = ["Storage KeyError on oid %d: %s" % (oid_int, reason)]
- rows = adapter.iter_transactions(cursor)
+ rows = adapter.dbiter.iter_transactions(cursor)
row = None
for row in rows:
# just get the first row
@@ -320,7 +320,7 @@
tids = []
try:
- rows = adapter.iter_object_history(cursor, oid_int)
+ rows = adapter.dbiter.iter_object_history(cursor, oid_int)
except KeyError:
# The object has no history, at least from the point of view
# of the current database load connection.
@@ -350,14 +350,15 @@
self._restart_load()
cursor = self._load_cursor
if cache is None:
- state, tid_int = self._adapter.load_current(cursor, oid_int)
+ state, tid_int = self._adapter.mover.load_current(
+ cursor, oid_int)
else:
# get tid_int from the cache or the database
cachekey = self._get_oid_cache_key(oid_int)
if cachekey:
tid_int = cache.get(cachekey)
if not cachekey or not tid_int:
- tid_int = self._adapter.get_current_tid(
+ tid_int = self._adapter.mover.get_current_tid(
cursor, oid_int)
if cachekey and tid_int is not None:
cache.set(cachekey, tid_int)
@@ -369,7 +370,7 @@
cachekey = 'state:%d:%d' % (oid_int, tid_int)
state = cache.get(cachekey)
if not state:
- state = self._adapter.load_revision(
+ state = self._adapter.mover.load_revision(
cursor, oid_int, tid_int)
if state:
state = str(state)
@@ -410,12 +411,12 @@
try:
if not self._load_transaction_open:
self._restart_load()
- state = self._adapter.load_revision(
+ state = self._adapter.mover.load_revision(
self._load_cursor, oid_int, tid_int)
if state is None and self._store_cursor is not None:
# Allow loading data from later transactions
# for conflict resolution.
- state = self._adapter.load_revision(
+ state = self._adapter.mover.load_revision(
self._store_cursor, oid_int, tid_int)
finally:
self._lock_release()
@@ -444,13 +445,13 @@
if not self._load_transaction_open:
self._restart_load()
cursor = self._load_cursor
- if not self._adapter.exists(cursor, u64(oid)):
+ if not self._adapter.mover.exists(cursor, u64(oid)):
raise POSKeyError(oid)
- state, start_tid = self._adapter.load_before(
+ state, start_tid = self._adapter.mover.load_before(
cursor, oid_int, u64(tid))
if start_tid is not None:
- end_int = self._adapter.get_object_tid_after(
+ end_int = self._adapter.mover.get_object_tid_after(
cursor, oid_int, start_tid)
if end_int is not None:
end = p64(end_int)
@@ -491,7 +492,7 @@
try:
self._max_stored_oid = max(self._max_stored_oid, oid_int)
# save the data in a temporary table
- adapter.store_temp(cursor, oid_int, prev_tid_int, data)
+ adapter.mover.store_temp(cursor, oid_int, prev_tid_int, data)
return None
finally:
self._lock_release()
@@ -521,7 +522,7 @@
try:
self._max_stored_oid = max(self._max_stored_oid, oid_int)
# save the data. Note that data can be None.
- adapter.restore(cursor, oid_int, tid_int, data)
+ adapter.mover.restore(cursor, oid_int, tid_int, data)
finally:
self._lock_release()
@@ -556,10 +557,10 @@
# get the commit lock and add the transaction now
cursor = self._store_cursor
packed = (status == 'p')
- adapter.hold_commit_lock(cursor, ensure_current=True)
+ adapter.locker.hold_commit_lock(cursor, ensure_current=True)
tid_int = u64(tid)
try:
- adapter.add_transaction(
+ adapter.txncontrol.add_transaction(
cursor, tid_int, user, desc, ext, packed)
except:
self._drop_store_connection()
@@ -583,20 +584,20 @@
adapter = self._adapter
cursor = self._store_cursor
- adapter.hold_commit_lock(cursor, ensure_current=True)
+ adapter.locker.hold_commit_lock(cursor, ensure_current=True)
user, desc, ext = self._ude
# Choose a transaction ID.
# Base the transaction ID on the database time,
# while ensuring that the tid of this transaction
# is greater than any existing tid.
- last_tid, now = adapter.get_tid_and_time(cursor)
+ last_tid, now = adapter.txncontrol.get_tid_and_time(cursor)
stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
tid = repr(stamp)
tid_int = u64(tid)
- adapter.add_transaction(cursor, tid_int, user, desc, ext)
+ adapter.txncontrol.add_transaction(cursor, tid_int, user, desc, ext)
self._tid = tid
@@ -621,7 +622,7 @@
# Try to resolve the conflicts.
resolved = set() # a set of OIDs
while True:
- conflict = adapter.detect_conflict(cursor)
+ conflict = adapter.mover.detect_conflict(cursor)
if conflict is None:
break
@@ -638,14 +639,14 @@
else:
# resolved
data = rdata
- self._adapter.replace_temp(
+ self._adapter.mover.replace_temp(
cursor, oid_int, prev_tid_int, data)
resolved.add(oid)
# Move the new states into the permanent table
tid_int = u64(self._tid)
serials = []
- oid_ints = adapter.move_from_temp(cursor, tid_int)
+ oid_ints = adapter.mover.move_from_temp(cursor, tid_int)
for oid_int in oid_ints:
oid = p64(oid_int)
if oid in resolved:
@@ -674,14 +675,15 @@
conn = self._store_conn
if self._max_stored_oid > self._max_new_oid:
- self._adapter.set_min_oid(cursor, self._max_stored_oid + 1)
+ self._adapter.oidallocator.set_min_oid(
+ cursor, self._max_stored_oid + 1)
self._prepare_tid()
tid_int = u64(self._tid)
serials = self._finish_store()
- self._adapter.update_current(cursor, tid_int)
- self._prepared_txn = self._adapter.commit_phase1(
+ self._adapter.mover.update_current(cursor, tid_int)
+ self._prepared_txn = self._adapter.txncontrol.commit_phase1(
conn, cursor, tid_int)
if self._txn_blobs:
@@ -722,9 +724,9 @@
self._rollback_load_connection()
txn = self._prepared_txn
assert txn is not None
- self._adapter.commit_phase2(
+ self._adapter.txncontrol.commit_phase2(
self._store_conn, self._store_cursor, txn)
- self._adapter.release_commit_lock(self._store_cursor)
+ self._adapter.locker.release_commit_lock(self._store_cursor)
cache = self._cache_client
if cache is not None:
if cache.incr('commit_count') is None:
@@ -746,9 +748,9 @@
try:
self._rollback_load_connection()
if self._store_cursor is not None:
- self._adapter.abort(
+ self._adapter.txncontrol.abort(
self._store_conn, self._store_cursor, self._prepared_txn)
- self._adapter.release_commit_lock(self._store_cursor)
+ self._adapter.locker.release_commit_lock(self._store_cursor)
if self._txn_blobs:
for oid, filename in self._txn_blobs.iteritems():
if os.path.exists(filename):
@@ -774,7 +776,7 @@
if cursor is None:
self._open_load_connection()
cursor = self._load_cursor
- oid_int = self._adapter.new_oid(cursor)
+ oid_int = self._adapter.oidallocator.new_oid(cursor)
self._max_new_oid = max(self._max_new_oid, oid_int)
return p64(oid_int)
finally:
@@ -801,9 +803,9 @@
# use a private connection to ensure the most current results
adapter = self._adapter
- conn, cursor = adapter.open()
+ conn, cursor = adapter.connmanager.open()
try:
- rows = adapter.iter_transactions(cursor)
+ rows = adapter.dbiter.iter_transactions(cursor)
i = 0
res = []
for tid_int, user, desc, ext in rows:
@@ -823,7 +825,7 @@
return res
finally:
- adapter.close(conn, cursor)
+ adapter.connmanager.close(conn, cursor)
def history(self, oid, version=None, size=1, filter=None):
self._lock_acquire()
@@ -831,7 +833,8 @@
cursor = self._load_cursor
oid_int = u64(oid)
try:
- rows = self._adapter.iter_object_history(cursor, oid_int)
+ rows = self._adapter.dbiter.iter_object_history(
+ cursor, oid_int)
except KeyError:
raise POSKeyError(oid)
@@ -881,30 +884,31 @@
cursor = self._store_cursor
assert cursor is not None
- adapter.hold_pack_lock(cursor)
+ adapter.locker.hold_pack_lock(cursor)
try:
# Note that _prepare_tid acquires the commit lock.
# The commit lock must be acquired after the pack lock
# because the database adapters also acquire in that
# order during packing.
self._prepare_tid()
- adapter.verify_undoable(cursor, undo_tid_int)
+ adapter.packundo.verify_undoable(cursor, undo_tid_int)
self_tid_int = u64(self._tid)
- copied = adapter.undo(cursor, undo_tid_int, self_tid_int)
+ copied = adapter.packundo.undo(
+ cursor, undo_tid_int, self_tid_int)
oids = [p64(oid_int) for oid_int, _ in copied]
# Update the current object pointers immediately, so that
# subsequent undo operations within this transaction will see
# the new current objects.
- adapter.update_current(cursor, self_tid_int)
+ adapter.mover.update_current(cursor, self_tid_int)
if self.fshelper is not None:
self._copy_undone_blobs(copied)
return self._tid, oids
finally:
- adapter.release_pack_lock(cursor)
+ adapter.locker.release_pack_lock(cursor)
finally:
self._lock_release()
@@ -930,7 +934,7 @@
self._add_blob_to_transaction(oid, new_fn)
- def pack(self, t, referencesf, sleep=time.sleep):
+ def pack(self, t, referencesf, sleep=None):
if self._is_read_only:
raise POSException.ReadOnlyError()
@@ -950,12 +954,13 @@
# connections to do the actual work, allowing the adapter
# to use special transaction modes for packing.
adapter = self._adapter
- lock_conn, lock_cursor = adapter.open()
+ lock_conn, lock_cursor = adapter.connmanager.open()
try:
- adapter.hold_pack_lock(lock_cursor)
+ adapter.locker.hold_pack_lock(lock_cursor)
try:
# Find the latest commit before or at the pack time.
- tid_int = adapter.choose_pack_transaction(pack_point_int)
+ tid_int = adapter.packundo.choose_pack_transaction(
+ pack_point_int)
if tid_int is None:
log.debug("all transactions before %s have already "
"been packed", time.ctime(t))
@@ -971,7 +976,8 @@
# In pre_pack, the adapter fills tables with
# information about what to pack. The adapter
# must not actually pack anything yet.
- adapter.pre_pack(tid_int, get_references, self._options)
+ adapter.packundo.pre_pack(
+ tid_int, get_references, self._options)
if self._options.pack_dry_run:
log.info("pack: dry run complete")
@@ -981,13 +987,13 @@
packed_func = self._after_pack
else:
packed_func = None
- adapter.pack(tid_int, self._options, sleep=sleep,
+ adapter.packundo.pack(tid_int, self._options, sleep=sleep,
packed_func=packed_func)
finally:
- adapter.release_pack_lock(lock_cursor)
+ adapter.locker.release_pack_lock(lock_cursor)
finally:
lock_conn.rollback()
- adapter.close(lock_conn, lock_cursor)
+ adapter.connmanager.close(lock_conn, lock_cursor)
self.sync()
def _after_pack(self, oid_int, tid_int):
@@ -1083,7 +1089,7 @@
ignore_tid = None
# get a list of changed OIDs and the most recent tid
- oid_ints, new_polled_tid = self._adapter.poll_invalidations(
+ oid_ints, new_polled_tid = self._adapter.poller.poll_invalidations(
conn, cursor, self._prev_polled_tid, ignore_tid)
self._prev_polled_tid = new_polled_tid
@@ -1247,7 +1253,7 @@
def __init__(self, adapter, start, stop):
self._adapter = adapter
- self._conn, self._cursor = self._adapter.open_for_load()
+ self._conn, self._cursor = self._adapter.connmanager.open_for_load()
self._closed = False
if start is not None:
@@ -1260,12 +1266,12 @@
stop_int = None
# _transactions: [(tid, username, description, extension, packed)]
- self._transactions = list(adapter.iter_transactions_range(
+ self._transactions = list(adapter.dbiter.iter_transactions_range(
self._cursor, start_int, stop_int))
self._index = 0
def close(self):
- self._adapter.close(self._conn, self._cursor)
+ self._adapter.connmanager.close(self._conn, self._cursor)
self._closed = True
def iterator(self):
@@ -1326,7 +1332,7 @@
adapter = record._trans_iter._adapter
tid_int = record._tid_int
self.tid = record.tid
- self._records = list(adapter.iter_objects(cursor, tid_int))
+ self._records = list(adapter.dbiter.iter_objects(cursor, tid_int))
self._index = 0
def __iter__(self):
Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/tests/reltestbase.py 2009-09-24 18:06:37 UTC (rev 104499)
@@ -74,8 +74,8 @@
):
def checkDropAndPrepare(self):
- self._storage._adapter.drop_all()
- self._storage._adapter.prepare_schema()
+ self._storage._adapter.schema.drop_all()
+ self._storage._adapter.schema.prepare()
def checkCrossConnectionInvalidation(self):
# Verify connections see updated state at txn boundaries
Modified: relstorage/trunk/relstorage/tests/speedtest.py
===================================================================
--- relstorage/trunk/relstorage/tests/speedtest.py 2009-09-24 18:00:10 UTC (rev 104498)
+++ relstorage/trunk/relstorage/tests/speedtest.py 2009-09-24 18:06:37 UTC (rev 104499)
@@ -196,8 +196,8 @@
def postgres_test(self):
from relstorage.adapters.postgresql import PostgreSQLAdapter
adapter = PostgreSQLAdapter('dbname=relstoragetest')
- adapter.prepare_schema()
- adapter.zap_all()
+ adapter.schema.prepare()
+ adapter.schema.zap_all()
def make_storage():
return RelStorage(adapter)
return self.run_tests(make_storage)
@@ -207,8 +207,8 @@
from relstorage.tests.testoracle import getOracleParams
user, password, dsn = getOracleParams()
adapter = OracleAdapter(user, password, dsn)
- adapter.prepare_schema()
- adapter.zap_all()
+ adapter.schema.prepare()
+ adapter.schema.zap_all()
def make_storage():
return RelStorage(adapter)
return self.run_tests(make_storage)
@@ -216,8 +216,8 @@
def mysql_test(self):
from relstorage.adapters.mysql import MySQLAdapter
adapter = MySQLAdapter(db='relstoragetest')
- adapter.prepare_schema()
- adapter.zap_all()
+ adapter.schema.prepare()
+ adapter.schema.zap_all()
def make_storage():
return RelStorage(adapter)
return self.run_tests(make_storage)
More information about the checkins
mailing list