[Checkins] SVN: relstorage/trunk/relstorage/adapters/ Merged hpmover.py and hfmover.py into mover.py.
Shane Hathaway
shane at hathawaymix.org
Thu Sep 24 18:00:06 EDT 2009
Log message for revision 104514:
Merged hpmover.py and hfmover.py into mover.py.
Changed:
D relstorage/trunk/relstorage/adapters/hfmover.py
D relstorage/trunk/relstorage/adapters/hpmover.py
U relstorage/trunk/relstorage/adapters/locker.py
A relstorage/trunk/relstorage/adapters/mover.py
U relstorage/trunk/relstorage/adapters/mysql.py
U relstorage/trunk/relstorage/adapters/oracle.py
U relstorage/trunk/relstorage/adapters/packundo.py
U relstorage/trunk/relstorage/adapters/postgresql.py
U relstorage/trunk/relstorage/adapters/schema.py
U relstorage/trunk/relstorage/adapters/txncontrol.py
-=-
Deleted: relstorage/trunk/relstorage/adapters/hfmover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/hfmover.py 2009-09-24 21:14:15 UTC (rev 104513)
+++ relstorage/trunk/relstorage/adapters/hfmover.py 2009-09-24 22:00:06 UTC (rev 104514)
@@ -1,55 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2009 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""History-free IObjectMover implementation.
-"""
-
-from base64 import decodestring
-from base64 import encodestring
-from relstorage.adapters.interfaces import IObjectMover
-from ZODB.POSException import StorageError
-from zope.interface import implements
-
-
-class HistoryFreeObjectMover(object):
- implements(IObjectMover)
-
- _method_names = (
- 'get_current_tid',
- 'load_current',
- 'load_revision',
- 'exists',
- 'load_before',
- 'get_object_tid_after',
- 'on_store_opened',
- 'store_temp',
- 'replace_temp',
- 'restore',
- 'detect_conflict',
- 'move_from_temp',
- 'update_current',
- )
-
- def __init__(self, database_name, runner=None,
- Binary=None, inputsize_BLOB=None, inputsize_BINARY=None):
- # The inputsize parameters are for Oracle only.
- self.database_name = database_name
- self.runner = runner
- self.Binary = Binary
- self.inputsize_BLOB = inputsize_BLOB
- self.inputsize_BINARY = inputsize_BINARY
-
- for method_name in self._method_names:
- method = getattr(self, '%s_%s' % (database_name, method_name))
- setattr(self, method_name, method)
-
Deleted: relstorage/trunk/relstorage/adapters/hpmover.py
===================================================================
--- relstorage/trunk/relstorage/adapters/hpmover.py 2009-09-24 21:14:15 UTC (rev 104513)
+++ relstorage/trunk/relstorage/adapters/hpmover.py 2009-09-24 22:00:06 UTC (rev 104514)
@@ -1,714 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2009 Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""History preserving IObjectMover implementation.
-"""
-
-from base64 import decodestring
-from base64 import encodestring
-from relstorage.adapters.interfaces import IObjectMover
-from ZODB.POSException import StorageError
-from zope.interface import implements
-
-try:
- from hashlib import md5
-except ImportError:
- from md5 import new as md5
-
-
-def compute_md5sum(data):
- if data is not None:
- return md5(data).hexdigest()
- else:
- # George Bailey object
- return None
-
-
-class HistoryPreservingObjectMover(object):
- implements(IObjectMover)
-
- _method_names = (
- 'get_current_tid',
- 'load_current',
- 'load_revision',
- 'exists',
- 'load_before',
- 'get_object_tid_after',
- 'on_store_opened',
- 'store_temp',
- 'replace_temp',
- 'restore',
- 'detect_conflict',
- 'move_from_temp',
- 'update_current',
- )
-
- def __init__(self, database_name, runner=None,
- Binary=None, inputsize_BLOB=None, inputsize_BINARY=None):
- # The inputsize parameters are for Oracle only.
- self.database_name = database_name
- self.runner = runner
- self.Binary = Binary
- self.inputsize_BLOB = inputsize_BLOB
- self.inputsize_BINARY = inputsize_BINARY
-
- for method_name in self._method_names:
- method = getattr(self, '%s_%s' % (database_name, method_name))
- setattr(self, method_name, method)
-
-
-
-
- def generic_get_current_tid(self, cursor, oid):
- """Returns the current integer tid for an object.
-
- oid is an integer. Returns None if object does not exist.
- """
- stmt = """
- SELECT tid
- FROM current_object
- WHERE zoid = %s
- """
- cursor.execute(stmt, (oid,))
- for (tid,) in cursor:
- return tid
- return None
-
- postgresql_get_current_tid = generic_get_current_tid
- mysql_get_current_tid = generic_get_current_tid
-
- def oracle_get_current_tid(self, cursor, oid):
- """Returns the current integer tid for an object.
-
- oid is an integer. Returns None if object does not exist.
- """
- stmt = """
- SELECT tid
- FROM current_object
- WHERE zoid = :1
- """
- cursor.execute(stmt, (oid,))
- for (tid,) in cursor:
- return tid
- return None
-
-
-
-
- def postgresql_load_current(self, cursor, oid):
- """Returns the current pickle and integer tid for an object.
-
- oid is an integer. Returns (None, None) if object does not exist.
- """
- stmt = """
- SELECT encode(state, 'base64'), tid
- FROM current_object
- JOIN object_state USING(zoid, tid)
- WHERE zoid = %s
- """
- cursor.execute(stmt, (oid,))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- state64, tid = cursor.fetchone()
- if state64 is not None:
- state = decodestring(state64)
- else:
- # This object's creation has been undone
- state = None
- return state, tid
- else:
- return None, None
-
- def mysql_load_current(self, cursor, oid):
- """Returns the current pickle and integer tid for an object.
-
- oid is an integer. Returns (None, None) if object does not exist.
- """
- stmt = """
- SELECT state, tid
- FROM current_object
- JOIN object_state USING(zoid, tid)
- WHERE zoid = %s
- """
- cursor.execute(stmt, (oid,))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()
- else:
- return None, None
-
- def oracle_load_current(self, cursor, oid):
- """Returns the current pickle and integer tid for an object.
-
- oid is an integer. Returns (None, None) if object does not exist.
- """
- stmt = """
- SELECT state, tid
- FROM current_object
- JOIN object_state USING(zoid, tid)
- WHERE zoid = :1
- """
- return self.runner.run_lob_stmt(
- cursor, stmt, (oid,), default=(None, None))
-
-
-
-
- def postgresql_load_revision(self, cursor, oid, tid):
- """Returns the pickle for an object on a particular transaction.
-
- Returns None if no such state exists.
- """
- stmt = """
- SELECT encode(state, 'base64')
- FROM object_state
- WHERE zoid = %s
- AND tid = %s
- """
- cursor.execute(stmt, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- (state64,) = cursor.fetchone()
- if state64 is not None:
- return decodestring(state64)
- return None
-
- def mysql_load_revision(self, cursor, oid, tid):
- """Returns the pickle for an object on a particular transaction.
-
- Returns None if no such state exists.
- """
- stmt = """
- SELECT state
- FROM object_state
- WHERE zoid = %s
- AND tid = %s
- """
- cursor.execute(stmt, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- (state,) = cursor.fetchone()
- return state
- return None
-
- def oracle_load_revision(self, cursor, oid, tid):
- """Returns the pickle for an object on a particular transaction.
-
- Returns None if no such state exists.
- """
- stmt = """
- SELECT state
- FROM object_state
- WHERE zoid = :1
- AND tid = :2
- """
- (state,) = self.runner.run_lob_stmt(
- cursor, stmt, (oid, tid), default=(None,))
- return state
-
-
-
-
- def generic_exists(self, cursor, oid):
- """Returns a true value if the given object exists."""
- stmt = "SELECT 1 FROM current_object WHERE zoid = %s"
- cursor.execute(stmt, (oid,))
- for row in cursor:
- return True
- return False
-
- postgresql_exists = generic_exists
- mysql_exists = generic_exists
-
- def oracle_exists(self, cursor, oid):
- """Returns a true value if the given object exists."""
- stmt = "SELECT 1 FROM current_object WHERE zoid = :1"
- cursor.execute(stmt, (oid,))
- for row in cursor:
- return True
- return False
-
-
-
-
- def postgresql_load_before(self, cursor, oid, tid):
- """Returns the pickle and tid of an object before transaction tid.
-
- Returns (None, None) if no earlier state exists.
- """
- stmt = """
- SELECT encode(state, 'base64'), tid
- FROM object_state
- WHERE zoid = %s
- AND tid < %s
- ORDER BY tid DESC
- LIMIT 1
- """
- cursor.execute(stmt, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- state64, tid = cursor.fetchone()
- if state64 is not None:
- state = decodestring(state64)
- else:
- # The object's creation has been undone
- state = None
- return state, tid
- else:
- return None, None
-
- def mysql_load_before(self, cursor, oid, tid):
- """Returns the pickle and tid of an object before transaction tid.
-
- Returns (None, None) if no earlier state exists.
- """
- stmt = """
- SELECT state, tid
- FROM object_state
- WHERE zoid = %s
- AND tid < %s
- ORDER BY tid DESC
- LIMIT 1
- """
- cursor.execute(stmt, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()
- else:
- return None, None
-
- def oracle_load_before(self, cursor, oid, tid):
- """Returns the pickle and tid of an object before transaction tid.
-
- Returns (None, None) if no earlier state exists.
- """
- stmt = """
- SELECT state, tid
- FROM object_state
- WHERE zoid = :oid
- AND tid = (
- SELECT MAX(tid)
- FROM object_state
- WHERE zoid = :oid
- AND tid < :tid
- )
- """
- return self.runner.run_lob_stmt(
- cursor, stmt, {'oid': oid, 'tid': tid}, default=(None, None))
-
-
-
-
- def generic_get_object_tid_after(self, cursor, oid, tid):
- """Returns the tid of the next change after an object revision.
-
- Returns None if no later state exists.
- """
- stmt = """
- SELECT tid
- FROM object_state
- WHERE zoid = %s
- AND tid > %s
- ORDER BY tid
- LIMIT 1
- """
- cursor.execute(stmt, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
- else:
- return None
-
- postgresql_get_object_tid_after = generic_get_object_tid_after
- mysql_get_object_tid_after = generic_get_object_tid_after
-
- def oracle_get_object_tid_after(self, cursor, oid, tid):
- """Returns the tid of the next change after an object revision.
-
- Returns None if no later state exists.
- """
- stmt = """
- SELECT MIN(tid)
- FROM object_state
- WHERE zoid = :1
- AND tid > :2
- """
- cursor.execute(stmt, (oid, tid))
- rows = cursor.fetchall()
- if rows:
- assert len(rows) == 1
- return rows[0][0]
- else:
- return None
-
-
-
-
- def postgresql_on_store_opened(self, cursor, restart=False):
- """Create the temporary table for storing objects"""
- stmt = """
- CREATE TEMPORARY TABLE temp_store (
- zoid BIGINT NOT NULL,
- prev_tid BIGINT NOT NULL,
- md5 CHAR(32),
- state BYTEA
- ) ON COMMIT DROP;
- CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
- """
- cursor.execute(stmt)
-
- def mysql_on_store_opened(self, cursor, restart=False):
- """Create the temporary table for storing objects"""
- if restart:
- stmt = """
- DROP TEMPORARY TABLE IF EXISTS temp_store
- """
- cursor.execute(stmt)
-
- stmt = """
- CREATE TEMPORARY TABLE temp_store (
- zoid BIGINT NOT NULL PRIMARY KEY,
- prev_tid BIGINT NOT NULL,
- md5 CHAR(32),
- state LONGBLOB
- ) ENGINE MyISAM
- """
- cursor.execute(stmt)
-
- # no store connection initialization needed for Oracle
- oracle_on_store_opened = None
-
-
-
-
- def postgresql_store_temp(self, cursor, oid, prev_tid, data):
- """Store an object in the temporary table."""
- md5sum = compute_md5sum(data)
- stmt = """
- DELETE FROM temp_store WHERE zoid = %s;
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (%s, %s, %s, decode(%s, 'base64'))
- """
- cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
-
- def mysql_store_temp(self, cursor, oid, prev_tid, data):
- """Store an object in the temporary table."""
- md5sum = compute_md5sum(data)
- stmt = """
- REPLACE INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (%s, %s, %s, %s)
- """
- cursor.execute(stmt, (oid, prev_tid, md5sum, self.Binary(data)))
-
- def oracle_store_temp(self, cursor, oid, prev_tid, data):
- """Store an object in the temporary table."""
- md5sum = compute_md5sum(data)
- cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
- if len(data) <= 2000:
- # Send data inline for speed. Oracle docs say maximum size
- # of a RAW is 2000 bytes. inputsize_BINARY corresponds with RAW.
- cursor.setinputsizes(rawdata=self.inputsize_BINARY)
- stmt = """
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (:oid, :prev_tid, :md5sum, :rawdata)
- """
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, rawdata=data)
- else:
- # Send data as a BLOB
- cursor.setinputsizes(blobdata=self.inputsize_BLOB)
- stmt = """
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (:oid, :prev_tid, :md5sum, :blobdata)
- """
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, blobdata=data)
-
-
-
-
- def postgresql_replace_temp(self, cursor, oid, prev_tid, data):
- """Replace an object in the temporary table.
-
- This happens after conflict resolution.
- """
- md5sum = compute_md5sum(data)
- stmt = """
- UPDATE temp_store SET
- prev_tid = %s,
- md5 = %s,
- state = decode(%s, 'base64')
- WHERE zoid = %s
- """
- cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
-
- def mysql_replace_temp(self, cursor, oid, prev_tid, data):
- """Replace an object in the temporary table.
-
- This happens after conflict resolution.
- """
- md5sum = compute_md5sum(data)
- stmt = """
- UPDATE temp_store SET
- prev_tid = %s,
- md5 = %s,
- state = %s
- WHERE zoid = %s
- """
- cursor.execute(stmt, (prev_tid, md5sum, self.Binary(data), oid))
-
- def oracle_replace_temp(self, cursor, oid, prev_tid, data):
- """Replace an object in the temporary table.
-
- This happens after conflict resolution.
- """
- md5sum = compute_md5sum(data)
- cursor.setinputsizes(data=self.inputsize_BLOB)
- stmt = """
- UPDATE temp_store SET
- prev_tid = :prev_tid,
- md5 = :md5sum,
- state = :data
- WHERE zoid = :oid
- """
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, data=self.Binary(data))
-
-
-
-
- def postgresql_restore(self, cursor, oid, tid, data):
- """Store an object directly, without conflict detection.
-
- Used for copying transactions into this database.
- """
- md5sum = compute_md5sum(data)
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (%s, %s,
- COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
- %s, decode(%s, 'base64'))
- """
- if data is not None:
- data = encodestring(data)
- cursor.execute(stmt, (oid, tid, oid, md5sum, data))
-
- def mysql_restore(self, cursor, oid, tid, data):
- """Store an object directly, without conflict detection.
-
- Used for copying transactions into this database.
- """
- md5sum = compute_md5sum(data)
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (%s, %s,
- COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
- %s, %s)
- """
- if data is not None:
- data = self.Binary(data)
- cursor.execute(stmt, (oid, tid, oid, md5sum, data))
-
- def oracle_restore(self, cursor, oid, tid, data):
- """Store an object directly, without conflict detection.
-
- Used for copying transactions into this database.
- """
- md5sum = compute_md5sum(data)
- if not data or len(data) <= 2000:
- # Send data inline for speed. Oracle docs say maximum size
- # of a RAW is 2000 bytes. inputsize_BINARY corresponds with RAW.
- cursor.setinputsizes(rawdata=self.inputsize_BINARY)
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (:oid, :tid,
- COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
- :md5sum, :rawdata)
- """
- cursor.execute(stmt, oid=oid, tid=tid,
- md5sum=md5sum, rawdata=data)
- else:
- # Send data as a BLOB
- cursor.setinputsizes(blobdata=self.inputsize_BLOB)
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (:oid, :tid,
- COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
- :md5sum, :blobdata)
- """
- cursor.execute(stmt, oid=oid, tid=tid,
- md5sum=md5sum, blobdata=data)
-
-
-
-
- def postgresql_detect_conflict(self, cursor):
- """Find one conflict in the data about to be committed.
-
- If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
- attempted_data). If there is no conflict, returns None.
- """
- stmt = """
- SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
- encode(temp_store.state, 'base64')
- FROM temp_store
- JOIN current_object ON (temp_store.zoid = current_object.zoid)
- WHERE temp_store.prev_tid != current_object.tid
- LIMIT 1
- """
- cursor.execute(stmt)
- if cursor.rowcount:
- oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
- return oid, prev_tid, attempted_prev_tid, decodestring(data)
- return None
-
- def mysql_detect_conflict(self, cursor):
- """Find one conflict in the data about to be committed.
-
- If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
- attempted_data). If there is no conflict, returns None.
- """
- # Lock in share mode to ensure the data being read is up to date.
- stmt = """
- SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
- temp_store.state
- FROM temp_store
- JOIN current_object ON (temp_store.zoid = current_object.zoid)
- WHERE temp_store.prev_tid != current_object.tid
- LIMIT 1
- LOCK IN SHARE MODE
- """
- cursor.execute(stmt)
- if cursor.rowcount:
- return cursor.fetchone()
- return None
-
- def oracle_detect_conflict(self, cursor):
- """Find one conflict in the data about to be committed.
-
- If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
- attempted_data). If there is no conflict, returns None.
- """
- stmt = """
- SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
- temp_store.state
- FROM temp_store
- JOIN current_object ON (temp_store.zoid = current_object.zoid)
- WHERE temp_store.prev_tid != current_object.tid
- """
- return self.runner.run_lob_stmt(cursor, stmt)
-
-
-
-
- def generic_move_from_temp(self, cursor, tid):
- """Moved the temporarily stored objects to permanent storage.
-
- Returns the list of oids stored.
- """
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT zoid, %s, prev_tid, md5, state
- FROM temp_store
- """
- cursor.execute(stmt, (tid,))
-
- stmt = """
- SELECT zoid FROM temp_store
- """
- cursor.execute(stmt)
- return [oid for (oid,) in cursor]
-
- postgresql_move_from_temp = generic_move_from_temp
- mysql_move_from_temp = generic_move_from_temp
-
- def oracle_move_from_temp(self, cursor, tid):
- """Move the temporarily stored objects to permanent storage.
-
- Returns the list of oids stored.
- """
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT zoid, :tid, prev_tid, md5, state
- FROM temp_store
- """
- cursor.execute(stmt, tid=tid)
-
- stmt = """
- SELECT zoid FROM temp_store
- """
- cursor.execute(stmt)
- return [oid for (oid,) in cursor]
-
-
-
-
- def postgresql_update_current(self, cursor, tid):
- """Update the current object pointers.
-
- tid is the integer tid of the transaction being committed.
- """
- cursor.execute("""
- -- Insert objects created in this transaction into current_object.
- INSERT INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
- WHERE tid = %(tid)s
- AND prev_tid = 0;
-
- -- Change existing objects. To avoid deadlocks,
- -- update in OID order.
- UPDATE current_object SET tid = %(tid)s
- WHERE zoid IN (
- SELECT zoid FROM object_state
- WHERE tid = %(tid)s
- AND prev_tid != 0
- ORDER BY zoid
- )
- """, {'tid': tid})
-
- def mysql_update_current(self, cursor, tid):
- """Update the current object pointers.
-
- tid is the integer tid of the transaction being committed.
- """
- cursor.execute("""
- REPLACE INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
- WHERE tid = %s
- """, (tid,))
-
- def oracle_update_current(self, cursor, tid):
- """Update the current object pointers.
-
- tid is the integer tid of the transaction being committed.
- """
- # Insert objects created in this transaction into current_object.
- stmt = """
- INSERT INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
- WHERE tid = :1
- AND prev_tid = 0
- """
- cursor.execute(stmt, (tid,))
-
- # Change existing objects.
- stmt = """
- UPDATE current_object SET tid = :1
- WHERE zoid IN (
- SELECT zoid FROM object_state
- WHERE tid = :1
- AND prev_tid != 0
- )
- """
- cursor.execute(stmt, (tid,))
-
Modified: relstorage/trunk/relstorage/adapters/locker.py
===================================================================
--- relstorage/trunk/relstorage/adapters/locker.py 2009-09-24 21:14:15 UTC (rev 104513)
+++ relstorage/trunk/relstorage/adapters/locker.py 2009-09-24 22:00:06 UTC (rev 104514)
@@ -24,8 +24,9 @@
class Locker(object):
- def __init__(self, database_errors):
- self.database_errors = database_errors
+ def __init__(self, keep_history, lock_exceptions):
+ self.keep_history = keep_history
+ self.lock_exceptions = lock_exceptions
class PostgreSQLLocker(Locker):
@@ -37,11 +38,18 @@
# (for as short a time as possible).
# Lock transaction and current_object in share mode to ensure
# conflict detection has the most current data.
- cursor.execute("""
- LOCK TABLE commit_lock IN EXCLUSIVE MODE;
- LOCK TABLE transaction IN SHARE MODE;
- LOCK TABLE current_object IN SHARE MODE
- """)
+ if self.keep_history:
+ stmt = """
+ LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+ LOCK TABLE transaction IN SHARE MODE;
+ LOCK TABLE current_object IN SHARE MODE
+ """
+ else:
+ stmt = """
+ LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+ LOCK TABLE object_state IN SHARE MODE
+ """
+ cursor.execute(stmt)
else:
cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
@@ -81,7 +89,7 @@
# b/w compat
try:
cursor.execute("LOCK pack_lock IN EXCLUSIVE MODE NOWAIT")
- except self.database_errors: # psycopg2.DatabaseError:
+ except self.lock_exceptions: # psycopg2.DatabaseError:
raise StorageError('A pack or undo operation is in progress')
def release_pack_lock(self, cursor):
@@ -129,10 +137,13 @@
# (for as short a time as possible).
cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
if ensure_current:
- # Lock transaction and current_object in share mode to ensure
- # conflict detection has the most current data.
- cursor.execute("LOCK TABLE transaction IN SHARE MODE")
- cursor.execute("LOCK TABLE current_object IN SHARE MODE")
+ if self.keep_history:
+ # Lock transaction and current_object in share mode to ensure
+ # conflict detection has the most current data.
+ cursor.execute("LOCK TABLE transaction IN SHARE MODE")
+ cursor.execute("LOCK TABLE current_object IN SHARE MODE")
+ else:
+ cursor.execute("LOCK TABLE object_state IN SHARE MODE")
def release_commit_lock(self, cursor):
# no action needed
@@ -148,7 +159,7 @@
"""
try:
cursor.execute(stmt)
- except self.database_errors: # cx_Oracle.DatabaseError:
+ except self.lock_exceptions: # cx_Oracle.DatabaseError:
raise StorageError('A pack or undo operation is in progress')
def release_pack_lock(self, cursor):
Copied: relstorage/trunk/relstorage/adapters/mover.py (from rev 104507, relstorage/trunk/relstorage/adapters/hpmover.py)
===================================================================
--- relstorage/trunk/relstorage/adapters/mover.py (rev 0)
+++ relstorage/trunk/relstorage/adapters/mover.py 2009-09-24 22:00:06 UTC (rev 104514)
@@ -0,0 +1,877 @@
+##############################################################################
+#
+# Copyright (c) 2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""IObjectMover implementation for history-preserving and history-free schemas.
+"""
+
+from base64 import decodestring
+from base64 import encodestring
+from relstorage.adapters.interfaces import IObjectMover
+from ZODB.POSException import StorageError
+from zope.interface import implements
+
+try:
+ from hashlib import md5
+except ImportError:
+ from md5 import new as md5
+
+
+def compute_md5sum(data):
+ if data is not None:
+ return md5(data).hexdigest()
+ else:
+ # George Bailey object
+ return None
+
+
+class ObjectMover(object):
+ implements(IObjectMover)
+
+ _method_names = (
+ 'get_current_tid',
+ 'load_current',
+ 'load_revision',
+ 'exists',
+ 'load_before',
+ 'get_object_tid_after',
+ 'on_store_opened',
+ 'store_temp',
+ 'replace_temp',
+ 'restore',
+ 'detect_conflict',
+ 'move_from_temp',
+ 'update_current',
+ )
+
+ def __init__(self, database_name, keep_history, runner=None,
+ Binary=None, inputsize_BLOB=None, inputsize_BINARY=None):
+ # The inputsize parameters are for Oracle only.
+ self.database_name = database_name
+ self.keep_history = keep_history
+ self.runner = runner
+ self.Binary = Binary
+ self.inputsize_BLOB = inputsize_BLOB
+ self.inputsize_BINARY = inputsize_BINARY
+
+ for method_name in self._method_names:
+ method = getattr(self, '%s_%s' % (database_name, method_name))
+ setattr(self, method_name, method)
+
+
+
+
+ def generic_get_current_tid(self, cursor, oid):
+ """Returns the current integer tid for an object.
+
+ oid is an integer. Returns None if object does not exist.
+ """
+ if self.keep_history:
+ stmt = """
+ SELECT tid
+ FROM current_object
+ WHERE zoid = %s
+ """
+ else:
+ stmt = """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (oid,))
+ for (tid,) in cursor:
+ return tid
+ return None
+
+ postgresql_get_current_tid = generic_get_current_tid
+ mysql_get_current_tid = generic_get_current_tid
+
+ def oracle_get_current_tid(self, cursor, oid):
+ """Returns the current integer tid for an object.
+
+ oid is an integer. Returns None if object does not exist.
+ """
+ if self.keep_history:
+ stmt = """
+ SELECT tid
+ FROM current_object
+ WHERE zoid = :1
+ """
+ else:
+ stmt = """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = :1
+ """
+ cursor.execute(stmt, (oid,))
+ for (tid,) in cursor:
+ return tid
+ return None
+
+
+
+
+ def postgresql_load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+ oid is an integer. Returns (None, None) if object does not exist.
+ """
+ if self.keep_history:
+ stmt = """
+ SELECT encode(state, 'base64'), tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = %s
+ """
+ else:
+ stmt = """
+ SELECT encode(state, 'base64'), tid
+ FROM object_state
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ state64, tid = cursor.fetchone()
+ if state64 is not None:
+ state = decodestring(state64)
+ else:
+ # This object's creation has been undone
+ state = None
+ return state, tid
+ else:
+ return None, None
+
+ def mysql_load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+ oid is an integer. Returns (None, None) if object does not exist.
+ """
+ if self.keep_history:
+ stmt = """
+ SELECT state, tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = %s
+ """
+ else:
+ stmt = """
+ SELECT state, tid
+ FROM object_state
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()
+ else:
+ return None, None
+
+ def oracle_load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+ oid is an integer. Returns (None, None) if object does not exist.
+ """
+ if self.keep_history:
+ stmt = """
+ SELECT state, tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = :1
+ """
+ else:
+ stmt = """
+ SELECT state, tid
+ FROM object_state
+ WHERE zoid = :1
+ """
+ return self.runner.run_lob_stmt(
+ cursor, stmt, (oid,), default=(None, None))
+
+
+
+
+ def postgresql_load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ stmt = """
+ SELECT encode(state, 'base64')
+ FROM object_state
+ WHERE zoid = %s
+ AND tid = %s
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ (state64,) = cursor.fetchone()
+ if state64 is not None:
+ return decodestring(state64)
+ return None
+
+ def mysql_load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ stmt = """
+ SELECT state
+ FROM object_state
+ WHERE zoid = %s
+ AND tid = %s
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ (state,) = cursor.fetchone()
+ return state
+ return None
+
+ def oracle_load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ stmt = """
+ SELECT state
+ FROM object_state
+ WHERE zoid = :1
+ AND tid = :2
+ """
+ (state,) = self.runner.run_lob_stmt(
+ cursor, stmt, (oid, tid), default=(None,))
+ return state
+
+
+
+
+ def generic_exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ if self.keep_history:
+ stmt = "SELECT 1 FROM current_object WHERE zoid = %s"
+ else:
+ stmt = "SELECT 1 FROM object_state WHERE zoid = %s"
+ cursor.execute(stmt, (oid,))
+ return cursor.rowcount
+
+ postgresql_exists = generic_exists
+ mysql_exists = generic_exists
+
+ def oracle_exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ if self.keep_history:
+ stmt = "SELECT 1 FROM current_object WHERE zoid = :1"
+ else:
+ stmt = "SELECT 1 FROM object_state WHERE zoid = :1"
+ cursor.execute(stmt, (oid,))
+ for row in cursor:
+ return True
+ return False
+
+
+
+
+ def postgresql_load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ stmt = """
+ SELECT encode(state, 'base64'), tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid < %s
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ state64, tid = cursor.fetchone()
+ if state64 is not None:
+ state = decodestring(state64)
+ else:
+ # The object's creation has been undone
+ state = None
+ return state, tid
+ else:
+ return None, None
+
+ def mysql_load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ stmt = """
+ SELECT state, tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid < %s
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()
+ else:
+ return None, None
+
+ def oracle_load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ stmt = """
+ SELECT state, tid
+ FROM object_state
+ WHERE zoid = :oid
+ AND tid = (
+ SELECT MAX(tid)
+ FROM object_state
+ WHERE zoid = :oid
+ AND tid < :tid
+ )
+ """
+ return self.runner.run_lob_stmt(
+ cursor, stmt, {'oid': oid, 'tid': tid}, default=(None, None))
+
+
+
+
+ def generic_get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid > %s
+ ORDER BY tid
+ LIMIT 1
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ else:
+ return None
+
+ postgresql_get_object_tid_after = generic_get_object_tid_after
+ mysql_get_object_tid_after = generic_get_object_tid_after
+
+ def oracle_get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT MIN(tid)
+ FROM object_state
+ WHERE zoid = :1
+ AND tid > :2
+ """
+ cursor.execute(stmt, (oid, tid))
+ rows = cursor.fetchall()
+ if rows:
+ assert len(rows) == 1
+ return rows[0][0]
+ else:
+ return None
+
+
+
+
+ def postgresql_on_store_opened(self, cursor, restart=False):
+ """Create the temporary table for storing objects"""
+ # note that the md5 column is not used if self.keep_history == False.
+ stmt = """
+ CREATE TEMPORARY TABLE temp_store (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state BYTEA
+ ) ON COMMIT DROP;
+ CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+ """
+ cursor.execute(stmt)
+
+ def mysql_on_store_opened(self, cursor, restart=False):
+ """Create the temporary table for storing objects"""
+ if restart:
+ stmt = """
+ DROP TEMPORARY TABLE IF EXISTS temp_store
+ """
+ cursor.execute(stmt)
+
+ # note that the md5 column is not used if self.keep_history == False.
+ stmt = """
+ CREATE TEMPORARY TABLE temp_store (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state LONGBLOB
+ ) ENGINE MyISAM
+ """
+ cursor.execute(stmt)
+
+ # no store connection initialization needed for Oracle
+ oracle_on_store_opened = None
+
+
+
+
+ def postgresql_store_temp(self, cursor, oid, prev_tid, data):
+ """Store an object in the temporary table."""
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+ stmt = """
+ DELETE FROM temp_store WHERE zoid = %s;
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (%s, %s, %s, decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
+
+ def mysql_store_temp(self, cursor, oid, prev_tid, data):
+ """Store an object in the temporary table."""
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+ stmt = """
+ REPLACE INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (%s, %s, %s, %s)
+ """
+ cursor.execute(stmt, (oid, prev_tid, md5sum, self.Binary(data)))
+
+ def oracle_store_temp(self, cursor, oid, prev_tid, data):
+ """Store an object in the temporary table."""
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+ cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
+ if len(data) <= 2000:
+ # Send data inline for speed. Oracle docs say maximum size
+ # of a RAW is 2000 bytes. inputsize_BINARY corresponds with RAW.
+ cursor.setinputsizes(rawdata=self.inputsize_BINARY)
+ stmt = """
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (:oid, :prev_tid, :md5sum, :rawdata)
+ """
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, rawdata=data)
+ else:
+ # Send data as a BLOB
+ cursor.setinputsizes(blobdata=self.inputsize_BLOB)
+ stmt = """
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (:oid, :prev_tid, :md5sum, :blobdata)
+ """
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, blobdata=data)
+
+
+
+
+ def postgresql_replace_temp(self, cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table.
+
+ This happens after conflict resolution.
+ """
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = %s,
+ md5 = %s,
+ state = decode(%s, 'base64')
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
+
+ def mysql_replace_temp(self, cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table.
+
+ This happens after conflict resolution.
+ """
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = %s,
+ md5 = %s,
+ state = %s
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (prev_tid, md5sum, self.Binary(data), oid))
+
+ def oracle_replace_temp(self, cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table.
+
+ This happens after conflict resolution.
+ """
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+ cursor.setinputsizes(data=self.inputsize_BLOB)
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = :prev_tid,
+ md5 = :md5sum,
+ state = :data
+ WHERE zoid = :oid
+ """
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, data=self.Binary(data))
+
+
+
+
+ def postgresql_restore(self, cursor, oid, tid, data):
+ """Store an object directly, without conflict detection.
+
+ Used for copying transactions into this database.
+ """
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+
+ if data is not None:
+ encoded = encodestring(data)
+ else:
+ encoded = None
+
+ if self.keep_history:
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (%s, %s,
+ COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
+ %s, decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (oid, tid, oid, md5sum, encoded))
+ else:
+ stmt = """
+ INSERT INTO object_state (zoid, tid, state)
+ VALUES (%s, %s, decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (oid, tid, encoded))
+
+ def mysql_restore(self, cursor, oid, tid, data):
+ """Store an object directly, without conflict detection.
+
+ Used for copying transactions into this database.
+ """
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+
+ if data is not None:
+ encoded = self.Binary(data)
+ else:
+ encoded = None
+
+ if self.keep_history:
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (%s, %s,
+ COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
+ %s, %s)
+ """
+ cursor.execute(stmt, (oid, tid, oid, md5sum, encoded))
+ else:
+ stmt = """
+ INSERT INTO object_state (zoid, tid, state)
+ VALUES (%s, %s, %s)
+ """
+ cursor.execute(stmt, (oid, tid, encoded))
+
+ def oracle_restore(self, cursor, oid, tid, data):
+ """Store an object directly, without conflict detection.
+
+ Used for copying transactions into this database.
+ """
+ if self.keep_history:
+ md5sum = compute_md5sum(data)
+ else:
+ md5sum = None
+
+ if not data or len(data) <= 2000:
+ # Send data inline for speed. Oracle docs say maximum size
+ # of a RAW is 2000 bytes. inputsize_BINARY corresponds with RAW.
+ cursor.setinputsizes(rawdata=self.inputsize_BINARY)
+ if self.keep_history:
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (:oid, :tid,
+ COALESCE(
+ (SELECT tid FROM current_object WHERE zoid = :oid), 0),
+ :md5sum, :rawdata)
+ """
+ cursor.execute(stmt, oid=oid, tid=tid,
+ md5sum=md5sum, rawdata=data)
+ else:
+ stmt = """
+ INSERT INTO object_state (zoid, tid, state)
+ VALUES (:oid, :tid, :rawdata)
+ """
+ cursor.execute(stmt, oid=oid, tid=tid, rawdata=data)
+ else:
+ # Send data as a BLOB
+ cursor.setinputsizes(blobdata=self.inputsize_BLOB)
+ if self.keep_history:
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (:oid, :tid,
+ COALESCE(
+ (SELECT tid FROM current_object WHERE zoid = :oid), 0),
+ :md5sum, :blobdata)
+ """
+ cursor.execute(stmt, oid=oid, tid=tid,
+ md5sum=md5sum, blobdata=data)
+ else:
+ stmt = """
+ INSERT INTO object_state (zoid, tid, state)
+ VALUES (:oid, :tid, :blobdata)
+ """
+ cursor.execute(stmt, oid=oid, tid=tid, blobdata=data)
+
+
+
+ def postgresql_detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ if self.keep_history:
+ stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ encode(temp_store.state, 'base64')
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ LIMIT 1
+ """
+ else:
+ stmt = """
+ SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid,
+ encode(temp_store.state, 'base64')
+ FROM temp_store
+ JOIN object_state ON (temp_store.zoid = object_state.zoid)
+ WHERE temp_store.prev_tid != object_state.tid
+ LIMIT 1
+ """
+ cursor.execute(stmt)
+ if cursor.rowcount:
+ oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
+ return oid, prev_tid, attempted_prev_tid, decodestring(data)
+ return None
+
+ def mysql_detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ # Lock in share mode to ensure the data being read is up to date.
+ if self.keep_history:
+ stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ temp_store.state
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ LIMIT 1
+ LOCK IN SHARE MODE
+ """
+ else:
+ stmt = """
+ SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid,
+ temp_store.state
+ FROM temp_store
+ JOIN object_state ON (temp_store.zoid = object_state.zoid)
+ WHERE temp_store.prev_tid != object_state.tid
+ LIMIT 1
+ LOCK IN SHARE MODE
+ """
+ cursor.execute(stmt)
+ if cursor.rowcount:
+ return cursor.fetchone()
+ return None
+
+ def oracle_detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ if self.keep_history:
+ stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ temp_store.state
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ """
+ else:
+ stmt = """
+ SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid,
+ temp_store.state
+ FROM temp_store
+ JOIN object_state ON (temp_store.zoid = object_state.zoid)
+ WHERE temp_store.prev_tid != object_state.tid
+ """
+ return self.runner.run_lob_stmt(cursor, stmt)
+
+
+
+
+ def generic_move_from_temp(self, cursor, tid):
+ """Moved the temporarily stored objects to permanent storage.
+
+ Returns the list of oids stored.
+ """
+ if self.keep_history:
+ if self.database_name == 'oracle':
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT zoid, :1, prev_tid, md5, state
+ FROM temp_store
+ """
+ else:
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT zoid, %s, prev_tid, md5, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, (tid,))
+
+ else:
+ if self.database_name == 'mysql':
+ stmt = """
+ REPLACE INTO object_state (zoid, tid, state)
+ SELECT zoid, %s, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, (tid,))
+
+ else:
+ stmt = """
+ DELETE FROM object_state
+ WHERE zoid IN (SELECT zoid FROM temp_store)
+ """
+ cursor.execute(stmt)
+
+ stmt = """
+ INSERT INTO object_state (zoid, tid, state)
+ SELECT zoid, %s, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, (tid,))
+
+ stmt = """
+ SELECT zoid FROM temp_store
+ """
+ cursor.execute(stmt)
+ return [oid for (oid,) in cursor]
+
+ postgresql_move_from_temp = generic_move_from_temp
+ mysql_move_from_temp = generic_move_from_temp
+ oracle_move_from_temp = generic_move_from_temp
+
+
+
+
+ def postgresql_update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ if not self.keep_history:
+ # nothing needs to be updated
+ return
+
+ cursor.execute("""
+ -- Insert objects created in this transaction into current_object.
+ INSERT INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = %(tid)s
+ AND prev_tid = 0;
+
+ -- Change existing objects. To avoid deadlocks,
+ -- update in OID order.
+ UPDATE current_object SET tid = %(tid)s
+ WHERE zoid IN (
+ SELECT zoid FROM object_state
+ WHERE tid = %(tid)s
+ AND prev_tid != 0
+ ORDER BY zoid
+ )
+ """, {'tid': tid})
+
+ def mysql_update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ if not self.keep_history:
+ # nothing needs to be updated
+ return
+
+ cursor.execute("""
+ REPLACE INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = %s
+ """, (tid,))
+
+ def oracle_update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ if not self.keep_history:
+ # nothing needs to be updated
+ return
+
+ # Insert objects created in this transaction into current_object.
+ stmt = """
+ INSERT INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = :1
+ AND prev_tid = 0
+ """
+ cursor.execute(stmt, (tid,))
+
+ # Change existing objects.
+ stmt = """
+ UPDATE current_object SET tid = :1
+ WHERE zoid IN (
+ SELECT zoid FROM object_state
+ WHERE tid = :1
+ AND prev_tid != 0
+ )
+ """
+ cursor.execute(stmt, (tid,))
+
Property changes on: relstorage/trunk/relstorage/adapters/mover.py
___________________________________________________________________
Added: svn:mergeinfo
+
Modified: relstorage/trunk/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2009-09-24 21:14:15 UTC (rev 104513)
+++ relstorage/trunk/relstorage/adapters/mysql.py 2009-09-24 22:00:06 UTC (rev 104514)
@@ -55,10 +55,9 @@
from relstorage.adapters.connmanager import AbstractConnectionManager
from relstorage.adapters.dbiter import HistoryFreeDatabaseIterator
from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
-from relstorage.adapters.hfmover import HistoryFreeObjectMover
-from relstorage.adapters.hpmover import HistoryPreservingObjectMover
from relstorage.adapters.interfaces import IRelStorageAdapter
from relstorage.adapters.locker import MySQLLocker
+from relstorage.adapters.mover import ObjectMover
from relstorage.adapters.oidallocator import MySQLOIDAllocator
from relstorage.adapters.packundo import HistoryFreePackUndo
from relstorage.adapters.packundo import MySQLHistoryPreservingPackUndo
@@ -87,34 +86,37 @@
self.keep_history = keep_history
self.connmanager = MySQLdbConnectionManager(params)
self.runner = ScriptRunner()
- self.locker = MySQLLocker((MySQLdb.DatabaseError,))
+ self.locker = MySQLLocker(
+ keep_history=self.keep_history,
+ lock_exceptions=(MySQLdb.DatabaseError,),
+ )
self.schema = MySQLSchemaInstaller(
connmanager=self.connmanager,
runner=self.runner,
keep_history=self.keep_history,
)
- if self.keep_history:
- self.mover = HistoryPreservingObjectMover(
- database_name='mysql',
- runner=self.runner,
- Binary=MySQLdb.Binary,
- )
- else:
- self.mover = HistoryFreeObjectMover(
- database_name='mysql',
- runner=self.runner,
- Binary=MySQLdb.Binary,
- )
+ self.mover = ObjectMover(
+ database_name='mysql',
+ keep_history=self.keep_history,
+ Binary=MySQLdb.Binary,
+ )
self.connmanager.set_on_store_opened(self.mover.on_store_opened)
self.oidallocator = MySQLOIDAllocator()
self.txncontrol = MySQLTransactionControl(
+ keep_history=self.keep_history,
Binary=MySQLdb.Binary,
)
+
+ if self.keep_history:
+ poll_query="SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"
+ else:
+ poll_query="SELECT tid FROM object_state ORDER BY tid DESC LIMIT 1"
self.poller = Poller(
- poll_query="SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1",
+ poll_query=poll_query,
keep_history=self.keep_history,
runner=self.runner,
)
+
if self.keep_history:
self.packundo = MySQLHistoryPreservingPackUndo(
connmanager=self.connmanager,
@@ -133,6 +135,7 @@
self.dbiter = HistoryFreeDatabaseIterator(
runner=self.runner,
)
+
self.stats = MySQLStats(
connmanager=self.connmanager,
)
Modified: relstorage/trunk/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2009-09-24 21:14:15 UTC (rev 104513)
+++ relstorage/trunk/relstorage/adapters/oracle.py 2009-09-24 22:00:06 UTC (rev 104514)
@@ -19,11 +19,13 @@
from zope.interface import implements
from relstorage.adapters.connmanager import AbstractConnectionManager
+from relstorage.adapters.dbiter import HistoryFreeDatabaseIterator
from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
-from relstorage.adapters.hpmover import HistoryPreservingObjectMover
from relstorage.adapters.interfaces import IRelStorageAdapter
from relstorage.adapters.locker import OracleLocker
+from relstorage.adapters.mover import ObjectMover
from relstorage.adapters.oidallocator import OracleOIDAllocator
+from relstorage.adapters.packundo import HistoryFreePackUndo
from relstorage.adapters.packundo import OracleHistoryPreservingPackUndo
from relstorage.adapters.poller import Poller
from relstorage.adapters.schema import OracleSchemaInstaller
@@ -49,10 +51,8 @@
"""Oracle adapter for RelStorage."""
implements(IRelStorageAdapter)
- keep_history = True
-
def __init__(self, user, password, dsn, twophase=False, arraysize=64,
- use_inline_lobs=None):
+ use_inline_lobs=None, keep_history=True):
"""Create an Oracle adapter.
The user, password, and dsn parameters are provided to cx_Oracle
@@ -71,21 +71,28 @@
"""
if use_inline_lobs is None:
use_inline_lobs = (cx_Oracle.version >= '5.0')
+ self.keep_history = keep_history
self.connmanager = CXOracleConnectionManager(
params=(user, password, dsn),
arraysize=arraysize,
- twophase=bool(twophase),
+ twophase=twophase,
)
- self.runner = CXOracleScriptRunner(bool(use_inline_lobs))
- self.locker = OracleLocker((cx_Oracle.DatabaseError,))
+ self.runner = CXOracleScriptRunner(
+ use_inline_lobs=use_inline_lobs,
+ )
+ self.locker = OracleLocker(
+ keep_history=self.keep_history,
+ lock_exceptions=(cx_Oracle.DatabaseError,),
+ )
self.schema = OracleSchemaInstaller(
connmanager=self.connmanager,
runner=self.runner,
keep_history=self.keep_history,
)
- self.mover = HistoryPreservingObjectMover(
+ self.mover = ObjectMover(
database_name='oracle',
+ keep_history=self.keep_history,
runner=self.runner,
Binary=cx_Oracle.Binary,
inputsize_BLOB=cx_Oracle.BLOB,
@@ -96,22 +103,40 @@
connmanager=self.connmanager,
)
self.txncontrol = OracleTransactionControl(
+ keep_history=self.keep_history,
Binary=cx_Oracle.Binary,
- twophase=bool(twophase),
+ twophase=twophase,
)
+
+ if self.keep_history:
+ poll_query="SELECT MAX(tid) FROM transaction"
+ else:
+ poll_query="SELECT MAX(tid) FROM object_state"
self.poller = Poller(
- poll_query="SELECT MAX(tid) FROM transaction",
+ poll_query=poll_query,
keep_history=self.keep_history,
runner=self.runner,
)
- self.packundo = OracleHistoryPreservingPackUndo(
- connmanager=self.connmanager,
- runner=self.runner,
- locker=self.locker,
- )
- self.dbiter = HistoryPreservingDatabaseIterator(
- runner=self.runner,
- )
+
+ if self.keep_history:
+ self.packundo = OracleHistoryPreservingPackUndo(
+ connmanager=self.connmanager,
+ runner=self.runner,
+ locker=self.locker,
+ )
+ self.dbiter = HistoryPreservingDatabaseIterator(
+ runner=self.runner,
+ )
+ else:
+ self.packundo = HistoryFreePackUndo(
+ connmanager=self.connmanager,
+ runner=self.runner,
+ locker=self.locker,
+ )
+ self.dbiter = HistoryFreeDatabaseIterator(
+ runner=self.runner,
+ )
+
self.stats = OracleStats(
connmanager=self.connmanager,
)
Modified: relstorage/trunk/relstorage/adapters/packundo.py
===================================================================
--- relstorage/trunk/relstorage/adapters/packundo.py 2009-09-24 21:14:15 UTC (rev 104513)
+++ relstorage/trunk/relstorage/adapters/packundo.py 2009-09-24 22:00:06 UTC (rev 104514)
@@ -933,8 +933,8 @@
stmt = """
%(TRUNCATE)s pack_object;
- INSERT INTO pack_object (zoid, keep_tid)
- SELECT zoid, tid
+ INSERT INTO pack_object (zoid, keep, keep_tid)
+ SELECT zoid, %(FALSE)s, tid
FROM object_state;
-- Keep the root object
Modified: relstorage/trunk/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2009-09-24 21:14:15 UTC (rev 104513)
+++ relstorage/trunk/relstorage/adapters/postgresql.py 2009-09-24 22:00:06 UTC (rev 104514)
@@ -19,11 +19,13 @@
from zope.interface import implements
from relstorage.adapters.connmanager import AbstractConnectionManager
+from relstorage.adapters.dbiter import HistoryFreeDatabaseIterator
from relstorage.adapters.dbiter import HistoryPreservingDatabaseIterator
-from relstorage.adapters.hpmover import HistoryPreservingObjectMover
from relstorage.adapters.interfaces import IRelStorageAdapter
from relstorage.adapters.locker import PostgreSQLLocker
+from relstorage.adapters.mover import ObjectMover
from relstorage.adapters.oidallocator import PostgreSQLOIDAllocator
+from relstorage.adapters.packundo import HistoryFreePackUndo
from relstorage.adapters.packundo import HistoryPreservingPackUndo
from relstorage.adapters.poller import Poller
from relstorage.adapters.schema import PostgreSQLSchemaInstaller
@@ -48,38 +50,59 @@
"""PostgreSQL adapter for RelStorage."""
implements(IRelStorageAdapter)
- keep_history = True
-
- def __init__(self, dsn=''):
- self.connmanager = Psycopg2ConnectionManager(dsn)
+ def __init__(self, dsn='', keep_history=True):
+ self.keep_history = keep_history
+ self.connmanager = Psycopg2ConnectionManager(
+ dsn=dsn,
+ keep_history=self.keep_history,
+ )
self.runner = ScriptRunner()
- self.locker = PostgreSQLLocker((psycopg2.DatabaseError,))
+ self.locker = PostgreSQLLocker(
+ keep_history=self.keep_history,
+ lock_exceptions=(psycopg2.DatabaseError,),
+ )
self.schema = PostgreSQLSchemaInstaller(
connmanager=self.connmanager,
runner=self.runner,
locker=self.locker,
keep_history=self.keep_history,
)
- self.mover = HistoryPreservingObjectMover(
+ self.mover = ObjectMover(
database_name='postgresql',
+ keep_history=self.keep_history,
runner=self.runner,
)
self.connmanager.set_on_store_opened(self.mover.on_store_opened)
self.oidallocator = PostgreSQLOIDAllocator()
- self.txncontrol = PostgreSQLTransactionControl()
+ self.txncontrol = PostgreSQLTransactionControl(
+ keep_history=self.keep_history,
+ )
+
self.poller = Poller(
poll_query="EXECUTE get_latest_tid",
keep_history=self.keep_history,
runner=self.runner,
)
- self.packundo = HistoryPreservingPackUndo(
- connmanager=self.connmanager,
- runner=self.runner,
- locker=self.locker,
- )
- self.dbiter = HistoryPreservingDatabaseIterator(
- runner=self.runner,
- )
+
+ if self.keep_history:
+ self.packundo = HistoryPreservingPackUndo(
+ connmanager=self.connmanager,
+ runner=self.runner,
+ locker=self.locker,
+ )
+ self.dbiter = HistoryPreservingDatabaseIterator(
+ runner=self.runner,
+ )
+ else:
+ self.packundo = HistoryFreePackUndo(
+ connmanager=self.connmanager,
+ runner=self.runner,
+ locker=self.locker,
+ )
+ self.dbiter = HistoryFreeDatabaseIterator(
+ runner=self.runner,
+ )
+
self.stats = PostgreSQLStats(
connmanager=self.connmanager,
)
@@ -95,8 +118,9 @@
disconnected_exceptions = disconnected_exceptions
close_exceptions = close_exceptions
- def __init__(self, dsn):
+ def __init__(self, dsn, keep_history):
self._dsn = dsn
+ self.keep_history = keep_history
def open(self,
isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
@@ -117,13 +141,22 @@
Returns (conn, cursor).
"""
conn, cursor = self.open(self.isolation_serializable)
- stmt = """
- PREPARE get_latest_tid AS
- SELECT tid
- FROM transaction
- ORDER BY tid DESC
- LIMIT 1
- """
+ if self.keep_history:
+ stmt = """
+ PREPARE get_latest_tid AS
+ SELECT tid
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ else:
+ stmt = """
+ PREPARE get_latest_tid AS
+ SELECT tid
+ FROM object_state
+ ORDER BY tid DESC
+ LIMIT 1
+ """
cursor.execute(stmt)
return conn, cursor
Modified: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py 2009-09-24 21:14:15 UTC (rev 104513)
+++ relstorage/trunk/relstorage/adapters/schema.py 2009-09-24 22:00:06 UTC (rev 104514)
@@ -338,11 +338,180 @@
CREATE SEQUENCE zoid_seq;
"""
-history_free_schema = "TODO"
+history_free_schema = """
-history_free_init = "TODO"
+# commit_lock: Held during commit. Another kind of lock is used for MySQL.
+ postgresql:
+ CREATE TABLE commit_lock ();
+ oracle:
+ CREATE TABLE commit_lock (dummy CHAR);
+
+# pack_lock: Held during pack. Another kind of lock is used for MySQL.
+# Another kind of lock is used for PostgreSQL >= 8.2.
+
+ oracle:
+ CREATE TABLE pack_lock (dummy CHAR);
+
+# OID allocation
+
+ postgresql:
+ CREATE SEQUENCE zoid_seq;
+
+ mysql:
+ CREATE TABLE new_oid (
+ zoid BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT
+ ) ENGINE = InnoDB;
+
+ oracle:
+ CREATE SEQUENCE zoid_seq;
+
+# object_state: The current state of each object (one row per zoid).
+
+ postgresql:
+ CREATE TABLE object_state (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ tid BIGINT NOT NULL CHECK (tid > 0),
+ state BYTEA
+ );
+ CREATE INDEX object_state_tid ON object_state (tid);
+
+ mysql:
+ CREATE TABLE object_state (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ tid BIGINT NOT NULL,
+ state LONGBLOB,
+ CHECK (tid > 0)
+ ) ENGINE = InnoDB;
+ CREATE INDEX object_state_tid ON object_state (tid);
+
+ oracle:
+ CREATE TABLE object_state (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ tid NUMBER(20) NOT NULL,
+ state BLOB
+ );
+ CREATE INDEX object_state_tid ON object_state (tid);
+
+# object_ref: A list of referenced OIDs from each object_state. This
+# table is populated as needed during packing.
+
+ postgresql:
+ CREATE TABLE object_ref (
+ zoid BIGINT NOT NULL,
+ to_zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL,
+ PRIMARY KEY (zoid, to_zoid)
+ );
+
+ mysql:
+ CREATE TABLE object_ref (
+ zoid BIGINT NOT NULL,
+ to_zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL,
+ PRIMARY KEY (zoid, to_zoid)
+ ) ENGINE = MyISAM;
+
+ oracle:
+ CREATE TABLE object_ref (
+ zoid NUMBER(20) NOT NULL,
+ to_zoid NUMBER(20) NOT NULL,
+ tid NUMBER(20) NOT NULL,
+ PRIMARY KEY (zoid, to_zoid)
+ );
+
+# The object_refs_added table tracks whether object_ref has been
+# populated for all states in a given transaction. An entry is added
+# only when the work is finished.
+
+ postgresql:
+ CREATE TABLE object_refs_added (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
+
+ mysql:
+ CREATE TABLE object_refs_added (
+ tid BIGINT NOT NULL PRIMARY KEY
+ ) ENGINE = MyISAM;
+
+ oracle:
+ CREATE TABLE object_refs_added (
+ tid NUMBER(20) NOT NULL PRIMARY KEY
+ );
+
+# pack_object contains temporary state during garbage collection: The
+# list of all objects, a flag signifying whether the object should be
+# kept, and a flag signifying whether the object's references have been
+# visited. The keep_tid field specifies the current revision of the
+# object.
+
+ postgresql:
+ CREATE TABLE pack_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ keep BOOLEAN NOT NULL,
+ keep_tid BIGINT NOT NULL,
+ visited BOOLEAN NOT NULL DEFAULT FALSE
+ );
+ CREATE INDEX pack_object_keep_false ON pack_object (zoid)
+ WHERE keep = false;
+ CREATE INDEX pack_object_keep_true ON pack_object (visited)
+ WHERE keep = true;
+
+ mysql:
+ CREATE TABLE pack_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ keep BOOLEAN NOT NULL,
+ keep_tid BIGINT NOT NULL,
+ visited BOOLEAN NOT NULL DEFAULT FALSE
+ ) ENGINE = MyISAM;
+ CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+
+ oracle:
+ CREATE TABLE pack_object (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ keep CHAR NOT NULL CHECK (keep IN ('N', 'Y')),
+ keep_tid NUMBER(20) NOT NULL,
+ visited CHAR DEFAULT 'N' NOT NULL CHECK (visited IN ('N', 'Y'))
+ );
+ CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+
+# Oracle expects temporary tables to be created ahead of time, while
+# MySQL and PostgreSQL expect them to be created in the session.
+# Note that the md5 column is not used in a history-free storage.
+
+ oracle:
+ # States that will soon be stored
+ CREATE GLOBAL TEMPORARY TABLE temp_store (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ prev_tid NUMBER(20) NOT NULL,
+ md5 CHAR(32),
+ state BLOB
+ ) ON COMMIT DELETE ROWS;
+
+ # Temporary state during packing: a list of objects
+ # whose references need to be examined.
+ CREATE GLOBAL TEMPORARY TABLE temp_pack_visit (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ keep_tid NUMBER(20)
+ );
+"""
+
+history_free_init = """
+# Reset the OID counter.
+
+ postgresql:
+ ALTER SEQUENCE zoid_seq RESTART WITH 1;
+
+ mysql:
+ TRUNCATE new_oid;
+
+ oracle:
+ DROP SEQUENCE zoid_seq;
+ CREATE SEQUENCE zoid_seq;
+"""
+
+
def filter_script(script, database_name):
res = []
match = False
@@ -360,7 +529,7 @@
class AbstractSchemaInstaller(object):
- # Keep this list in the same order as the schema script
+ # Keep this list in the same order as the schema scripts
all_tables = (
'commit_lock',
'pack_lock',
Modified: relstorage/trunk/relstorage/adapters/txncontrol.py
===================================================================
--- relstorage/trunk/relstorage/adapters/txncontrol.py 2009-09-24 21:14:15 UTC (rev 104513)
+++ relstorage/trunk/relstorage/adapters/txncontrol.py 2009-09-24 22:00:06 UTC (rev 104514)
@@ -52,38 +52,53 @@
class PostgreSQLTransactionControl(TransactionControl):
implements(ITransactionControl)
+ def __init__(self, keep_history):
+ self.keep_history = keep_history
+
def get_tid_and_time(self, cursor):
"""Returns the most recent tid and the current database time.
The database time is the number of seconds since the epoch.
"""
- cursor.execute("""
- SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
- FROM transaction
- ORDER BY tid DESC
- LIMIT 1
- """)
+ if self.keep_history:
+ stmt = """
+ SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ else:
+ stmt = """
+ SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
+ FROM object_state
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ cursor.execute(stmt)
assert cursor.rowcount == 1
return cursor.fetchone()
def add_transaction(self, cursor, tid, username, description, extension,
packed=False):
"""Add a transaction."""
- stmt = """
- INSERT INTO transaction
- (tid, packed, username, description, extension)
- VALUES (%s, %s,
- decode(%s, 'base64'), decode(%s, 'base64'), decode(%s, 'base64'))
- """
- cursor.execute(stmt, (tid, packed,
- encodestring(username), encodestring(description),
- encodestring(extension)))
+ if self.keep_history:
+ stmt = """
+ INSERT INTO transaction
+ (tid, packed, username, description, extension)
+ VALUES (%s, %s,
+ decode(%s, 'base64'), decode(%s, 'base64'),
+ decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (tid, packed,
+ encodestring(username), encodestring(description),
+ encodestring(extension)))
class MySQLTransactionControl(TransactionControl):
implements(ITransactionControl)
- def __init__(self, Binary):
+ def __init__(self, keep_history, Binary):
+ self.keep_history = keep_history
self.Binary = Binary
def get_tid_and_time(self, cursor):
@@ -92,13 +107,23 @@
The database time is the number of seconds since the epoch.
"""
# Lock in share mode to ensure the data being read is up to date.
- cursor.execute("""
- SELECT tid, UNIX_TIMESTAMP()
- FROM transaction
- ORDER BY tid DESC
- LIMIT 1
- LOCK IN SHARE MODE
- """)
+ if self.keep_history:
+ stmt = """
+ SELECT tid, UNIX_TIMESTAMP()
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ LOCK IN SHARE MODE
+ """
+ else:
+ stmt = """
+ SELECT tid, UNIX_TIMESTAMP()
+ FROM object_state
+ ORDER BY tid DESC
+ LIMIT 1
+ LOCK IN SHARE MODE
+ """
+ cursor.execute(stmt)
assert cursor.rowcount == 1
tid, timestamp = cursor.fetchone()
# MySQL does not provide timestamps with more than one second
@@ -112,20 +137,22 @@
def add_transaction(self, cursor, tid, username, description, extension,
packed=False):
"""Add a transaction."""
- stmt = """
- INSERT INTO transaction
- (tid, packed, username, description, extension)
- VALUES (%s, %s, %s, %s, %s)
- """
- cursor.execute(stmt, (
- tid, packed, self.Binary(username),
- self.Binary(description), self.Binary(extension)))
+ if self.keep_history:
+ stmt = """
+ INSERT INTO transaction
+ (tid, packed, username, description, extension)
+ VALUES (%s, %s, %s, %s, %s)
+ """
+ cursor.execute(stmt, (
+ tid, packed, self.Binary(username),
+ self.Binary(description), self.Binary(extension)))
class OracleTransactionControl(TransactionControl):
implements(ITransactionControl)
- def __init__(self, Binary, twophase):
+ def __init__(self, keep_history, Binary, twophase):
+ self.keep_history = keep_history
self.Binary = Binary
self.twophase = twophase
@@ -155,28 +182,39 @@
The database time is the number of seconds since the epoch.
"""
- cursor.execute("""
- SELECT MAX(tid), TO_CHAR(TO_DSINTERVAL(SYSTIMESTAMP - TO_TIMESTAMP_TZ(
- '1970-01-01 00:00:00 +00:00','YYYY-MM-DD HH24:MI:SS TZH:TZM')))
- FROM transaction
- """)
+ if self.keep_history:
+ stmt = """
+ SELECT MAX(tid),
+ TO_CHAR(TO_DSINTERVAL(SYSTIMESTAMP - TO_TIMESTAMP_TZ(
+ '1970-01-01 00:00:00 +00:00','YYYY-MM-DD HH24:MI:SS TZH:TZM')))
+ FROM transaction
+ """
+ else:
+ stmt = """
+ SELECT MAX(tid),
+ TO_CHAR(TO_DSINTERVAL(SYSTIMESTAMP - TO_TIMESTAMP_TZ(
+ '1970-01-01 00:00:00 +00:00','YYYY-MM-DD HH24:MI:SS TZH:TZM')))
+ FROM object_state
+ """
+ cursor.execute(stmt)
tid, now = cursor.fetchone()
return tid, self._parse_dsinterval(now)
def add_transaction(self, cursor, tid, username, description, extension,
packed=False):
"""Add a transaction."""
- stmt = """
- INSERT INTO transaction
- (tid, packed, username, description, extension)
- VALUES (:1, :2, :3, :4, :5)
- """
- max_desc_len = 2000
- if len(description) > max_desc_len:
- log.warning('Trimming description of transaction %s '
- 'to %d characters', tid, max_desc_len)
- description = description[:max_desc_len]
- cursor.execute(stmt, (
- tid, packed and 'Y' or 'N', self.Binary(username),
- self.Binary(description), self.Binary(extension)))
+ if self.keep_history:
+ stmt = """
+ INSERT INTO transaction
+ (tid, packed, username, description, extension)
+ VALUES (:1, :2, :3, :4, :5)
+ """
+ max_desc_len = 2000
+ if len(description) > max_desc_len:
+ log.warning('Trimming description of transaction %s '
+ 'to %d characters', tid, max_desc_len)
+ description = description[:max_desc_len]
+ cursor.execute(stmt, (
+ tid, packed and 'Y' or 'N', self.Binary(username),
+ self.Binary(description), self.Binary(extension)))
More information about the checkins mailing list