[Checkins] SVN: relstorage/branches/packless/ Created a packless PostgreSQL adapter in a branch.
Shane Hathaway
shane at hathawaymix.org
Thu May 7 05:46:44 EDT 2009
Log message for revision 99796:
Created a packless PostgreSQL adapter in a branch.
Changed:
A relstorage/branches/packless/
U relstorage/branches/packless/relstorage/adapters/common.py
U relstorage/branches/packless/relstorage/adapters/mysql.py
U relstorage/branches/packless/relstorage/adapters/oracle.py
A relstorage/branches/packless/relstorage/adapters/packless/
A relstorage/branches/packless/relstorage/adapters/packless/__init__.py
A relstorage/branches/packless/relstorage/adapters/packless/common.py
A relstorage/branches/packless/relstorage/adapters/packless/postgresql.py
U relstorage/branches/packless/relstorage/adapters/postgresql.py
U relstorage/branches/packless/relstorage/relstorage.py
U relstorage/branches/packless/relstorage/tests/alltests.py
U relstorage/branches/packless/relstorage/tests/comparison.ods
A relstorage/branches/packless/relstorage/tests/packlesstestbase.py
U relstorage/branches/packless/relstorage/tests/reltestbase.py
A relstorage/branches/packless/relstorage/tests/testpacklesspostgresql.py
-=-
Modified: relstorage/branches/packless/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py 2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/adapters/common.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -18,6 +18,12 @@
import logging
import time
+try:
+ from hashlib import md5
+except ImportError:
+ from md5 import new as md5
+
+
log = logging.getLogger("relstorage.adapters.common")
verify_sane_database = False
@@ -929,18 +935,30 @@
return None, new_polled_tid
# Get the list of changed OIDs and return it.
- stmt = """
- SELECT zoid
- FROM current_object
- WHERE tid > %(tid)s
- """
if ignore_tid is None:
+ stmt = """
+ SELECT zoid
+ FROM current_object
+ WHERE tid > %(tid)s
+ """
cursor.execute(intern(stmt % self._script_vars),
{'tid': prev_polled_tid})
else:
- stmt += " AND tid != %(self_tid)s"
+ stmt = """
+ SELECT zoid
+ FROM current_object
+ WHERE tid > %(tid)s
+ AND tid != %(self_tid)s
+ """
cursor.execute(intern(stmt % self._script_vars),
{'tid': prev_polled_tid, 'self_tid': ignore_tid})
oids = [oid for (oid,) in cursor]
return oids, new_polled_tid
+
+ def md5sum(self, data):
+ if data is not None:
+ return md5(data).hexdigest()
+ else:
+ # George Bailey object
+ return None
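
This hunk moves MD5 checksumming out of relstorage.py and into the adapter layer through the new md5sum() helper. A minimal standalone sketch of that helper, assuming only the standard library (the try/except mirrors the fallback for Pythons older than 2.5, which lack hashlib):

try:
    from hashlib import md5
except ImportError:
    # Python < 2.5 has no hashlib; fall back to the legacy md5 module.
    from md5 import new as md5

def md5sum(data):
    if data is not None:
        return md5(data).hexdigest()
    # A "George Bailey" object (its creation was undone) has no state.
    return None

print md5sum('some pickled state')  # 32-character hex digest
print md5sum(None)                  # None
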
Modified: relstorage/branches/packless/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/adapters/mysql.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -467,16 +467,18 @@
except disconnected_exceptions, e:
raise StorageError(e)
- def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+ def store_temp(self, cursor, oid, prev_tid, data):
"""Store an object in the temporary table."""
+ md5sum = self.md5sum(data)
stmt = """
REPLACE INTO temp_store (zoid, prev_tid, md5, state)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(stmt, (oid, prev_tid, md5sum, MySQLdb.Binary(data)))
- def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+ def replace_temp(self, cursor, oid, prev_tid, data):
"""Replace an object in the temporary table."""
+ md5sum = self.md5sum(data)
stmt = """
UPDATE temp_store SET
prev_tid = %s,
@@ -486,11 +488,12 @@
"""
cursor.execute(stmt, (prev_tid, md5sum, MySQLdb.Binary(data), oid))
- def restore(self, cursor, oid, tid, md5sum, data):
+ def restore(self, cursor, oid, tid, data):
"""Store an object directly, without conflict detection.
Used for copying transactions into this database.
"""
+ md5sum = self.md5sum(data)
stmt = """
INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
VALUES (%s, %s,
Modified: relstorage/branches/packless/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/adapters/oracle.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -538,8 +538,9 @@
except disconnected_exceptions, e:
raise StorageError(e)
- def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+ def store_temp(self, cursor, oid, prev_tid, data):
"""Store an object in the temporary table."""
+ md5sum = self.md5sum(data)
cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
if len(data) <= 2000:
# Send data inline for speed. Oracle docs say maximum size
@@ -561,8 +562,9 @@
cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
md5sum=md5sum, blobdata=data)
- def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+ def replace_temp(self, cursor, oid, prev_tid, data):
"""Replace an object in the temporary table."""
+ md5sum = self.md5sum(data)
cursor.setinputsizes(data=cx_Oracle.BLOB)
stmt = """
UPDATE temp_store SET
@@ -574,11 +576,12 @@
cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
md5sum=md5sum, data=cx_Oracle.Binary(data))
- def restore(self, cursor, oid, tid, md5sum, data):
+ def restore(self, cursor, oid, tid, data):
"""Store an object directly, without conflict detection.
Used for copying transactions into this database.
"""
+ md5sum = self.md5sum(data)
cursor.setinputsizes(data=cx_Oracle.BLOB)
stmt = """
INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
Added: relstorage/branches/packless/relstorage/adapters/packless/__init__.py
===================================================================
--- relstorage/branches/packless/relstorage/adapters/packless/__init__.py (rev 0)
+++ relstorage/branches/packless/relstorage/adapters/packless/__init__.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -0,0 +1 @@
+
Copied: relstorage/branches/packless/relstorage/adapters/packless/common.py (from rev 99794, relstorage/trunk/relstorage/adapters/common.py)
===================================================================
--- relstorage/branches/packless/relstorage/adapters/packless/common.py (rev 0)
+++ relstorage/branches/packless/relstorage/adapters/packless/common.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -0,0 +1,563 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Code common to most adapters."""
+
+from ZODB.POSException import UndoError
+
+import logging
+import time
+
+log = logging.getLogger("relstorage.adapters.packless.common")
+
+verify_sane_database = False
+
+
+# Notes about adapters:
+#
+# An adapter must not hold a connection, cursor, or database state, because
+# RelStorage opens multiple concurrent connections using a single adapter
+# instance.
+# Within the context of an adapter, all OID and TID values are integers,
+# not binary strings, except as noted.
+
+class PacklessAdapter(object):
+ """Common code for a packless database adapter.
+
+ This is an abstract class; a lot of methods are expected to be
+ provided by subclasses.
+ """
+
+ # _script_vars contains replacements for statements in scripts.
+ # These are correct for PostgreSQL and MySQL but not for Oracle.
+ _script_vars = {
+ 'TRUE': 'TRUE',
+ 'FALSE': 'FALSE',
+ 'OCTET_LENGTH': 'OCTET_LENGTH',
+ 'TRUNCATE': 'TRUNCATE',
+ 'oid': '%(oid)s',
+ 'tid': '%(tid)s',
+ 'self_tid': '%(self_tid)s',
+ 'min_tid': '%(min_tid)s',
+ 'max_tid': '%(max_tid)s',
+ }
+
+ _scripts = {
+ 'create_temp_gc_visit': """
+ CREATE TEMPORARY TABLE temp_gc_visit (
+ zoid BIGINT NOT NULL
+ );
+ CREATE UNIQUE INDEX temp_gc_visit_zoid ON temp_gc_visit (zoid)
+ """,
+
+ 'pre_gc_follow_child_refs': """
+ UPDATE gc_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ JOIN temp_gc_visit USING (zoid)
+ )
+ """,
+
+ 'gc': """
+ DELETE FROM object_state
+ WHERE zoid IN (
+ SELECT zoid
+ FROM gc_object
+ WHERE keep = %(FALSE)s
+ );
+
+ DELETE FROM object_refs_added
+ WHERE tid NOT IN (
+ SELECT DISTINCT tid
+ FROM object_state
+ );
+
+ DELETE FROM object_ref
+ WHERE zoid IN (
+ SELECT zoid
+ FROM gc_object
+ WHERE keep = %(FALSE)s
+ );
+
+ %(TRUNCATE)s gc_object
+ """,
+ }
+
+ def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+ """Execute a statement from a script with the given parameters.
+
+ Subclasses may override this.
+ The input statement is generic and needs to be transformed
+ into a database-specific statement.
+ """
+ stmt = generic_stmt % self._script_vars
+ try:
+ cursor.execute(stmt, generic_params)
+ except:
+ log.warning("script statement failed: %r; parameters: %r",
+ stmt, generic_params)
+ raise
+
+
+ def _run_script(self, cursor, script, params=()):
+ """Execute a series of statements in the database.
+
+ The statements are transformed by _run_script_stmt
+ before execution.
+ """
+ lines = []
+ for line in script.split('\n'):
+ line = line.strip()
+ if not line or line.startswith('--'):
+ continue
+ if line.endswith(';'):
+ line = line[:-1]
+ lines.append(line)
+ stmt = '\n'.join(lines)
+ self._run_script_stmt(cursor, stmt, params)
+ lines = []
+ else:
+ lines.append(line)
+ if lines:
+ stmt = '\n'.join(lines)
+ self._run_script_stmt(cursor, stmt, params)
+
+ def _open_and_call(self, callback):
+ """Call a function with an open connection and cursor.
+
+ If the function returns, commits the transaction and returns the
+ result returned by the function.
+ If the function raises an exception, aborts the transaction
+ then propagates the exception.
+ """
+ conn, cursor = self.open()
+ try:
+ try:
+ res = callback(conn, cursor)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ return res
+ finally:
+ self.close(conn, cursor)
+
+
+ def iter_transactions(self, cursor):
+ """Iterate over the transaction log, newest first.
+
+ Skips packed transactions.
+ Yields (tid, username, description, extension) for each transaction.
+ """
+ stmt = """
+ SELECT DISTINCT tid
+ FROM object_state
+ ORDER BY tid DESC
+ """
+ self._run_script_stmt(cursor, stmt)
+ return ((tid, '', '', '') for (tid,) in cursor)
+
+
+ def iter_transactions_range(self, cursor, start=None, stop=None):
+ """Iterate over the transactions in the given range, oldest first.
+
+ Includes packed transactions.
+ Yields (tid, username, description, extension, packed)
+ for each transaction.
+ """
+ stmt = """
+ SELECT DISTINCT tid
+ FROM object_state
+ WHERE tid > 0
+ """
+ if start is not None:
+ stmt += " AND tid >= %(min_tid)s"
+ if stop is not None:
+ stmt += " AND tid <= %(max_tid)s"
+ stmt += " ORDER BY tid"
+ self._run_script_stmt(cursor, stmt,
+ {'min_tid': start, 'max_tid': stop})
+ return ((tid, '', '', '', True) for (tid,) in cursor)
+
+
+ def iter_object_history(self, cursor, oid):
+ """Iterate over an object's history.
+
+ Raises KeyError if the object does not exist.
+ Yields (tid, username, description, extension, pickle_size)
+ for each modification.
+ """
+ stmt = """
+ SELECT tid, %(OCTET_LENGTH)s(state)
+ FROM object_state
+ WHERE zoid = %(oid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'oid': oid})
+ return ((tid, '', '', '', size) for (tid, size) in cursor)
+
+
+ def iter_objects(self, cursor, tid):
+ """Iterate over object states in a transaction.
+
+ Yields (oid, state) for each object state.
+ """
+ stmt = """
+ SELECT zoid, state
+ FROM object_state
+ WHERE tid = %(tid)s
+ ORDER BY zoid
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+ for oid, state in cursor:
+ if hasattr(state, 'read'):
+ # Oracle
+ state = state.read()
+ yield oid, state
+
+
+ def verify_undoable(self, cursor, undo_tid):
+ """Raise UndoError if it is not safe to undo the specified txn."""
+ raise UndoError("Undo is not supported by this storage")
+
+
+ def undo(self, cursor, undo_tid, self_tid):
+ """Undo a transaction.
+
+ Parameters: "undo_tid", the integer tid of the transaction to undo,
+ and "self_tid", the integer tid of the current transaction.
+
+ Returns the list of OIDs undone.
+ """
+ raise UndoError("Undo is not supported by this storage")
+
+
+ def choose_pack_transaction(self, pack_point):
+ """Return the transaction before or at the specified pack time.
+
+ Returns None if there is nothing to pack.
+ """
+ return 1
+
+
+ def open_for_pre_pack(self):
+ """Open a connection to be used for the pre-pack phase.
+ Returns (conn, cursor).
+
+ Subclasses may override this.
+ """
+ return self.open()
+
+
+ def pre_pack(self, pack_tid, get_references, options):
+ """Decide what the garbage collector should delete.
+
+ pack_tid is ignored.
+
+ get_references is a function that accepts a pickled state and
+ returns a set of OIDs that state refers to.
+
+ options is an instance of relstorage.Options.
+ The options.pack_gc flag indicates whether to run garbage collection.
+ If pack_gc is false, this method does nothing.
+ """
+ if not options.pack_gc:
+ log.warning("pre_gc: garbage collection is disabled")
+ return
+
+ conn, cursor = self.open_for_pre_pack()
+ try:
+ try:
+ self._pre_gc(conn, cursor, get_references)
+ conn.commit()
+
+ stmt = """
+ SELECT COUNT(1)
+ FROM gc_object
+ WHERE keep = %(FALSE)s
+ """
+ self._run_script_stmt(cursor, stmt)
+ to_remove = cursor.fetchone()[0]
+
+ log.info("pre_gc: will remove %d object(s)",
+ to_remove)
+
+ except:
+ log.exception("pre_gc: failed")
+ conn.rollback()
+ raise
+ else:
+ log.info("pre_gc: finished successfully")
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def _pre_gc(self, conn, cursor, get_references):
+ """Determine what to garbage collect.
+ """
+ stmt = self._scripts['create_temp_gc_visit']
+ if stmt:
+ self._run_script(cursor, stmt)
+
+ self.fill_object_refs(conn, cursor, get_references)
+
+ log.info("pre_gc: filling the gc_object table")
+ # Fill the gc_object table with all known OIDs.
+ stmt = """
+ %(TRUNCATE)s gc_object;
+
+ INSERT INTO gc_object (zoid)
+ SELECT zoid
+ FROM object_state;
+
+ -- Keep the root object
+ UPDATE gc_object SET keep = %(TRUE)s
+ WHERE zoid = 0;
+ """
+ self._run_script(cursor, stmt)
+
+ # Each of the objects to be kept might
+ # refer to other objects. If some of those references
+ # include objects currently set to be removed, mark
+ # the referenced objects to be kept as well. Do this
+ # repeatedly until all references have been satisfied.
+ pass_num = 1
+ while True:
+ log.info("pre_gc: following references, pass %d", pass_num)
+
+ # Make a list of all parent objects that still need
+ # to be visited. Then set gc_object.visited for all gc_object
+ # rows with keep = true.
+ stmt = """
+ %(TRUNCATE)s temp_gc_visit;
+
+ INSERT INTO temp_gc_visit (zoid)
+ SELECT zoid
+ FROM gc_object
+ WHERE keep = %(TRUE)s
+ AND visited = %(FALSE)s;
+
+ UPDATE gc_object SET visited = %(TRUE)s
+ WHERE keep = %(TRUE)s
+ AND visited = %(FALSE)s
+ """
+ self._run_script(cursor, stmt)
+ visit_count = cursor.rowcount
+
+ if verify_sane_database:
+ # Verify the update actually worked.
+ # MySQL 5.1.23 fails this test; 5.1.24 passes.
+ stmt = """
+ SELECT 1
+ FROM gc_object
+ WHERE keep = %(TRUE)s AND visited = %(FALSE)s
+ """
+ self._run_script_stmt(cursor, stmt)
+ if list(cursor):
+ raise AssertionError(
+ "database failed to update gc_object")
+
+ log.debug("pre_gc: checking references from %d object(s)",
+ visit_count)
+
+ # Visit the children of all parent objects that were
+ # just visited.
+ stmt = self._scripts['pre_gc_follow_child_refs']
+ self._run_script(cursor, stmt)
+ found_count = cursor.rowcount
+
+ log.debug("pre_gc: found %d more referenced object(s) in "
+ "pass %d", found_count, pass_num)
+ if not found_count:
+ # No new references detected.
+ break
+ else:
+ pass_num += 1
+
+
+ def _add_object_ref_rows(self, cursor, add_rows):
+ """Add rows to object_ref.
+
+ The input rows are tuples containing (from_zoid, to_zoid).
+
+ Subclasses can override this.
+ """
+ stmt = """
+ INSERT INTO object_ref (zoid, to_zoid)
+ VALUES (%s, %s)
+ """
+ cursor.executemany(stmt, add_rows)
+
+
+ def _add_refs_for_tid(self, cursor, tid, get_references):
+ """Fill object_refs with all states for a transaction.
+
+ Returns the number of references added.
+ """
+ log.debug("pre_gc: transaction %d: computing references", tid)
+ from_count = 0
+
+ stmt = """
+ SELECT zoid, state
+ FROM object_state
+ WHERE tid = %(tid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+ add_rows = [] # [(from_oid, to_oid)]
+ for from_oid, state in cursor:
+ if hasattr(state, 'read'):
+ # Oracle
+ state = state.read()
+ if state:
+ from_count += 1
+ to_oids = get_references(str(state))
+ for to_oid in to_oids:
+ add_rows.append((from_oid, to_oid))
+
+ if add_rows:
+ stmt = """
+ DELETE FROM object_ref
+ WHERE zoid in (
+ SELECT zoid
+ FROM object_state
+ WHERE tid = %(tid)s
+ )
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+ self._add_object_ref_rows(cursor, add_rows)
+
+ # The references have been computed for this transaction.
+ stmt = """
+ INSERT INTO object_refs_added (tid)
+ VALUES (%(tid)s)
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+ to_count = len(add_rows)
+ log.debug("pre_gc: transaction %d: has %d reference(s) "
+ "from %d object(s)", tid, to_count, from_count)
+ return to_count
+
+
+ def fill_object_refs(self, conn, cursor, get_references):
+ """Update the object_refs table by analyzing new transactions."""
+ stmt = """
+ SELECT transaction.tid
+ FROM (SELECT DISTINCT tid FROM object_state) AS transaction
+ LEFT JOIN object_refs_added
+ ON (transaction.tid = object_refs_added.tid)
+ WHERE object_refs_added.tid IS NULL
+ ORDER BY transaction.tid
+ """
+ cursor.execute(stmt)
+ tids = [tid for (tid,) in cursor]
+ if tids:
+ added = 0
+ log.info("discovering references from objects in %d "
+ "transaction(s)" % len(tids))
+ for tid in tids:
+ added += self._add_refs_for_tid(cursor, tid, get_references)
+ if added >= 10000:
+ # save the work done so far
+ conn.commit()
+ added = 0
+ if added:
+ conn.commit()
+
+
+ def _hold_commit_lock(self, cursor):
+ """Hold the commit lock for gc"""
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
+
+ def _release_commit_lock(self, cursor):
+ """Release the commit lock during gc"""
+ # no action needed
+ pass
+
+
+ def pack(self, pack_tid, options, sleep=time.sleep):
+ """Run garbage collection.
+
+ Requires the information provided by _pre_gc.
+ """
+
+ # Read committed mode is sufficient.
+ conn, cursor = self.open()
+ try:
+ try:
+ log.info("gc: running")
+ stmt = self._scripts['gc']
+ self._run_script(cursor, stmt)
+
+ except:
+ log.exception("gc: failed")
+ conn.rollback()
+ raise
+
+ else:
+ log.info("gc: finished successfully")
+ conn.commit()
+
+ finally:
+ self.close(conn, cursor)
+
+
+ def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+ """Polls for new transactions.
+
+ conn and cursor must have been created previously by open_for_load().
+ prev_polled_tid is the tid returned at the last poll, or None
+ if this is the first poll. If ignore_tid is not None, changes
+ committed in that transaction will not be included in the list
+ of changed OIDs.
+
+ Returns (changed_oids, new_polled_tid).
+ """
+ # find out the tid of the most recent transaction.
+ cursor.execute(self._poll_query)
+ new_polled_tid = cursor.fetchone()[0]
+
+ if prev_polled_tid is None:
+ # This is the first time the connection has polled.
+ return None, new_polled_tid
+
+ if new_polled_tid == prev_polled_tid:
+ # No transactions have been committed since prev_polled_tid.
+ return (), new_polled_tid
+
+ # Get the list of changed OIDs and return it.
+ if ignore_tid is None:
+ stmt = """
+ SELECT zoid
+ FROM object_state
+ WHERE tid > %(tid)s
+ """
+ cursor.execute(intern(stmt % self._script_vars),
+ {'tid': prev_polled_tid})
+ else:
+ stmt = """
+ SELECT zoid
+ FROM object_state
+ WHERE tid > %(tid)s
+ AND tid != %(self_tid)s
+ """
+ cursor.execute(intern(stmt % self._script_vars),
+ {'tid': prev_polled_tid, 'self_tid': ignore_tid})
+ oids = [oid for (oid,) in cursor]
+
+ return oids, new_polled_tid
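
The _pre_gc() method above implements an iterative mark phase in SQL: keep the root object, then repeatedly mark everything referenced by newly kept objects until a pass finds no new references. An in-memory Python analogue of that loop, assuming refs maps each oid to the oids its state references:

def find_garbage(all_oids, refs, root_oid=0):
    keep = set([root_oid])            # gc_object.keep
    visited = set()                   # gc_object.visited
    while True:
        to_visit = keep - visited     # temp_gc_visit
        if not to_visit:
            break                     # a pass found no new references
        visited |= to_visit
        for oid in to_visit:          # pre_gc_follow_child_refs
            keep |= set(refs.get(oid, ())) & all_oids
    return all_oids - keep            # rows deleted by the 'gc' script

refs = {0: [1, 2], 1: [3]}
print sorted(find_garbage(set([0, 1, 2, 3, 4]), refs))  # prints [4]
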
Copied: relstorage/branches/packless/relstorage/adapters/packless/postgresql.py (from rev 99794, relstorage/trunk/relstorage/adapters/postgresql.py)
===================================================================
--- relstorage/branches/packless/relstorage/adapters/packless/postgresql.py (rev 0)
+++ relstorage/branches/packless/relstorage/adapters/packless/postgresql.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -0,0 +1,503 @@
+##############################################################################
+#
+# Copyright (c) 2008-2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""PostgreSQL adapter for RelStorage."""
+
+from base64 import decodestring, encodestring
+import logging
+import psycopg2, psycopg2.extensions
+import re
+from ZODB.POSException import StorageError
+
+from relstorage.adapters.packless.common import PacklessAdapter
+
+log = logging.getLogger("relstorage.adapters.postgresql")
+
+# disconnected_exceptions contains the exception types that might be
+# raised when the connection to the database has been broken.
+disconnected_exceptions = (psycopg2.OperationalError, psycopg2.InterfaceError)
+
+
+class PacklessPostgreSQLAdapter(PacklessAdapter):
+ """Packless PostgreSQL adapter for RelStorage."""
+
+ def __init__(self, dsn=''):
+ self._dsn = dsn
+
+ def create_schema(self, cursor):
+ """Create the database tables."""
+ stmt = """
+ CREATE TABLE commit_lock ();
+
+ CREATE SEQUENCE zoid_seq;
+
+ -- All object states in all transactions.
+ CREATE TABLE object_state (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ tid BIGINT NOT NULL CHECK (tid > 0),
+ state BYTEA
+ );
+ CREATE INDEX object_state_tid ON object_state (tid);
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during garbage collection.
+ CREATE TABLE object_ref (
+ zoid BIGINT NOT NULL,
+ to_zoid BIGINT NOT NULL,
+ PRIMARY KEY (zoid, to_zoid)
+ );
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ CREATE TABLE object_refs_added (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during garbage collection:
+ -- The list of all objects, a flag signifying whether
+ -- the object should be kept, and a flag signifying whether
+ -- the object's references have been visited.
+ CREATE TABLE gc_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ keep BOOLEAN NOT NULL DEFAULT FALSE,
+ visited BOOLEAN NOT NULL DEFAULT FALSE
+ );
+ CREATE INDEX gc_object_keep_false ON gc_object (zoid)
+ WHERE keep = false;
+ CREATE INDEX gc_object_keep_true ON gc_object (visited)
+ WHERE keep = true;
+ """
+ cursor.execute(stmt)
+
+ if not self._pg_has_advisory_locks(cursor):
+ cursor.execute("CREATE TABLE gc_lock ()")
+
+
+ def prepare_schema(self):
+ """Create the database schema if it does not already exist."""
+ def callback(conn, cursor):
+ cursor.execute("""
+ SELECT tablename
+ FROM pg_tables
+ WHERE tablename = 'object_state'
+ """)
+ if not cursor.rowcount:
+ self.create_schema(cursor)
+ self._open_and_call(callback)
+
+ def zap_all(self):
+ """Clear all data out of the database."""
+ def callback(conn, cursor):
+ cursor.execute("""
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM object_state;
+ ALTER SEQUENCE zoid_seq RESTART WITH 1;
+ """)
+ self._open_and_call(callback)
+
+ def drop_all(self):
+ """Drop all tables and sequences."""
+ def callback(conn, cursor):
+ cursor.execute("SELECT tablename FROM pg_tables")
+ existent = set([name for (name,) in cursor])
+ for tablename in ('gc_object', 'object_refs_added',
+ 'object_ref', 'object_state', 'commit_lock', 'gc_lock'):
+ if tablename in existent:
+ cursor.execute("DROP TABLE %s" % tablename)
+ cursor.execute("DROP SEQUENCE zoid_seq")
+ self._open_and_call(callback)
+
+ def open(self,
+ isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
+ """Open a database connection and return (conn, cursor)."""
+ try:
+ conn = psycopg2.connect(self._dsn)
+ conn.set_isolation_level(isolation)
+ cursor = conn.cursor()
+ cursor.arraysize = 64
+ except psycopg2.OperationalError, e:
+ log.warning("Unable to connect: %s", e)
+ raise
+ return conn, cursor
+
+ def close(self, conn, cursor):
+ """Close a connection and cursor, ignoring certain errors.
+ """
+ for obj in (cursor, conn):
+ if obj is not None:
+ try:
+ obj.close()
+ except disconnected_exceptions:
+ pass
+
+ def _pg_version(self, cursor):
+ """Return the (major, minor) version of PostgreSQL"""
+ cursor.execute("SELECT version()")
+ v = cursor.fetchone()[0]
+ m = re.search(r"([0-9]+)[.]([0-9]+)", v)
+ if m is None:
+ raise AssertionError("Unable to detect PostgreSQL version: " + v)
+ else:
+ return int(m.group(1)), int(m.group(2))
+
+ def _pg_has_advisory_locks(self, cursor):
+ """Return true if this version of PostgreSQL supports advisory locks"""
+ return self._pg_version(cursor) >= (8, 2)
+
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.open(
+ psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ stmt = """
+ PREPARE get_latest_tid AS
+ SELECT tid
+ FROM object_state
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ cursor.execute(stmt)
+ return conn, cursor
+
+ def restart_load(self, cursor):
+ """Reinitialize a connection for loading objects."""
+ try:
+ cursor.connection.rollback()
+ except disconnected_exceptions, e:
+ raise StorageError(e)
+
+ def get_object_count(self):
+ """Returns the number of objects in the database"""
+ # TODO: not yet implemented for the packless adapter
+ return 0
+
+ def get_db_size(self):
+ """Returns the approximate size of the database in bytes"""
+ def callback(conn, cursor):
+ cursor.execute("SELECT pg_database_size(current_database())")
+ return cursor.fetchone()[0]
+ return self._open_and_call(callback)
+
+ def get_current_tid(self, cursor, oid):
+ """Returns the current integer tid for an object.
+
+ oid is an integer. Returns None if object does not exist.
+ """
+ cursor.execute("""
+ SELECT tid
+ FROM object_state
+ WHERE zoid = %s
+ """, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ return None
+
+ def load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+ oid is an integer. Returns (None, None) if object does not exist.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64'), tid
+ FROM object_state
+ WHERE zoid = %s
+ """, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ state64, tid = cursor.fetchone()
+ if state64 is not None:
+ state = decodestring(state64)
+ else:
+ # This object's creation has been undone
+ state = None
+ return state, tid
+ else:
+ return None, None
+
+ def load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64')
+ FROM object_state
+ WHERE zoid = %s
+ AND tid = %s
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ (state64,) = cursor.fetchone()
+ if state64 is not None:
+ return decodestring(state64)
+ return None
+
+ def exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ cursor.execute("SELECT 1 FROM object_state WHERE zoid = %s", (oid,))
+ return cursor.rowcount
+
+ def load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64'), tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid < %s
+ ORDER BY tid DESC
+ LIMIT 1
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ state64, tid = cursor.fetchone()
+ if state64 is not None:
+ state = decodestring(state64)
+ else:
+ # The object's creation has been undone
+ state = None
+ return state, tid
+ else:
+ return None, None
+
+ def get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid > %s
+ ORDER BY tid
+ LIMIT 1
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ else:
+ return None
+
+ def _make_temp_table(self, cursor):
+ """Create the temporary table for storing objects"""
+ stmt = """
+ CREATE TEMPORARY TABLE temp_store (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL,
+ state BYTEA
+ ) ON COMMIT DROP;
+ CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+ """
+ cursor.execute(stmt)
+
+ def open_for_store(self):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.open()
+ try:
+ self._make_temp_table(cursor)
+ return conn, cursor
+ except:
+ self.close(conn, cursor)
+ raise
+
+ def restart_store(self, cursor):
+ """Reuse a store connection."""
+ try:
+ cursor.connection.rollback()
+ self._make_temp_table(cursor)
+ except disconnected_exceptions, e:
+ raise StorageError(e)
+
+ def store_temp(self, cursor, oid, prev_tid, data):
+ """Store an object in the temporary table."""
+ stmt = """
+ DELETE FROM temp_store WHERE zoid = %s;
+ INSERT INTO temp_store (zoid, prev_tid, state)
+ VALUES (%s, %s, decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (oid, oid, prev_tid, encodestring(data)))
+
+ def replace_temp(self, cursor, oid, prev_tid, data):
+ """Replace an object in the temporary table."""
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = %s,
+ state = decode(%s, 'base64')
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (prev_tid, encodestring(data), oid))
+
+ def restore(self, cursor, oid, tid, data):
+ """Store an object directly, without conflict detection.
+
+ Used for copying transactions into this database.
+ """
+ stmt = """
+ DELETE FROM object_state WHERE zoid = %s;
+ INSERT INTO object_state (zoid, tid, state)
+ VALUES (%s, %s, decode(%s, 'base64'))
+ """
+ if data is not None:
+ data = encodestring(data)
+ cursor.execute(stmt, (oid, oid, tid, data))
+
+ def start_commit(self, cursor):
+ """Prepare to commit."""
+ # Hold commit_lock to prevent concurrent commits
+ # (for as short a time as possible).
+ # Lock object_state in share mode to ensure
+ # conflict detection has the most current data.
+ cursor.execute("""
+ LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+ LOCK TABLE object_state IN SHARE MODE
+ """)
+
+ def get_tid_and_time(self, cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+ cursor.execute("""
+ SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
+ FROM object_state
+ ORDER BY tid DESC
+ LIMIT 1
+ """)
+ if cursor.rowcount:
+ return cursor.fetchone()
+ else:
+ return 0, 0
+
+ def add_transaction(self, cursor, tid, username, description, extension,
+ packed=False):
+ """Add a transaction."""
+ pass
+
+ def detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ stmt = """
+ SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid,
+ encode(temp_store.state, 'base64')
+ FROM temp_store
+ JOIN object_state ON (temp_store.zoid = object_state.zoid)
+ WHERE temp_store.prev_tid != object_state.tid
+ LIMIT 1
+ """
+ cursor.execute(stmt)
+ if cursor.rowcount:
+ oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
+ return oid, prev_tid, attempted_prev_tid, decodestring(data)
+ return None
+
+ def move_from_temp(self, cursor, tid):
+ """Moved the temporarily stored objects to permanent storage.
+
+ Returns the list of oids stored.
+ """
+ stmt = """
+ DELETE FROM object_state
+ WHERE zoid IN (SELECT zoid FROM temp_store);
+
+ INSERT INTO object_state (zoid, tid, state)
+ SELECT zoid, %s, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, (tid,))
+
+ stmt = """
+ SELECT zoid FROM temp_store
+ """
+ cursor.execute(stmt)
+ return [oid for (oid,) in cursor]
+
+ def update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ pass
+
+ def set_min_oid(self, cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+ cursor.execute("""
+ SELECT CASE WHEN %s > nextval('zoid_seq')
+ THEN setval('zoid_seq', %s)
+ ELSE 0
+ END
+ """, (oid, oid))
+
+ def commit_phase1(self, cursor, tid):
+ """Begin a commit. Returns the transaction name.
+
+ This method should guarantee that commit_phase2() will succeed,
+ meaning that if commit_phase2() would raise any error, the error
+ should be raised in commit_phase1() instead.
+ """
+ return '-'
+
+ def commit_phase2(self, cursor, txn):
+ """Final transaction commit."""
+ cursor.connection.commit()
+
+ def abort(self, cursor, txn=None):
+ """Abort the commit. If txn is not None, phase 1 is also aborted."""
+ cursor.connection.rollback()
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "SELECT NEXTVAL('zoid_seq')"
+ cursor.execute(stmt)
+ return cursor.fetchone()[0]
+
+ def hold_pack_lock(self, cursor):
+ """Try to acquire the garbage collection lock.
+
+ Raise an exception if gc is already in progress.
+ """
+ if self._pg_has_advisory_locks(cursor):
+ cursor.execute("SELECT pg_try_advisory_lock(1)")
+ locked = cursor.fetchone()[0]
+ if not locked:
+ raise StorageError(
+ 'A garbage collection operation is in progress')
+ else:
+ # b/w compat with PostgreSQL 8.1
+ try:
+ cursor.execute("LOCK gc_lock IN EXCLUSIVE MODE NOWAIT")
+ except psycopg2.DatabaseError:
+ raise StorageError(
+ 'A garbage collection operation is in progress')
+
+ def release_pack_lock(self, cursor):
+ """Release the garbage collection lock."""
+ if self._pg_has_advisory_locks(cursor):
+ cursor.execute("SELECT pg_advisory_unlock(1)")
+ # else no action needed since the lock will be released at txn commit
+
+ _poll_query = "EXECUTE get_latest_tid"
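
hold_pack_lock() above prefers PostgreSQL advisory locks when the server is 8.2 or newer. A minimal sketch of the same try-lock pattern on a bare psycopg2 connection (the DSN is illustrative):

import psycopg2

conn = psycopg2.connect('dbname=relstoragetestp')  # illustrative DSN
cursor = conn.cursor()
cursor.execute("SELECT pg_try_advisory_lock(1)")
if not cursor.fetchone()[0]:
    raise RuntimeError('a garbage collection operation is in progress')
try:
    pass  # ... run garbage collection here ...
finally:
    # Advisory locks are session-scoped, so release explicitly.
    cursor.execute("SELECT pg_advisory_unlock(1)")
    conn.close()
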
Modified: relstorage/branches/packless/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/adapters/postgresql.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -398,8 +398,9 @@
except disconnected_exceptions, e:
raise StorageError(e)
- def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+ def store_temp(self, cursor, oid, prev_tid, data):
"""Store an object in the temporary table."""
+ md5sum = self.md5sum(data)
stmt = """
DELETE FROM temp_store WHERE zoid = %s;
INSERT INTO temp_store (zoid, prev_tid, md5, state)
@@ -407,8 +408,9 @@
"""
cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
- def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+ def replace_temp(self, cursor, oid, prev_tid, data):
"""Replace an object in the temporary table."""
+ md5sum = self.md5sum(data)
stmt = """
UPDATE temp_store SET
prev_tid = %s,
@@ -418,11 +420,12 @@
"""
cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
- def restore(self, cursor, oid, tid, md5sum, data):
+ def restore(self, cursor, oid, tid, data):
"""Store an object directly, without conflict detection.
Used for copying transactions into this database.
"""
+ md5sum = self.md5sum(data)
stmt = """
INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
VALUES (%s, %s,
Modified: relstorage/branches/packless/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/relstorage.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -44,12 +44,6 @@
"""Stub for versions of ZODB that do not define IMVCCStorage.
"""
-try:
- from hashlib import md5
-except ImportError:
- from md5 import new as md5
-
-
log = logging.getLogger("relstorage")
# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
@@ -382,15 +376,15 @@
self._lock_acquire()
try:
- if self._store_cursor is not None:
+ if not self._load_transaction_open:
+ self._restart_load()
+ state = self._adapter.load_revision(
+ self._load_cursor, oid_int, tid_int)
+ if state is None and self._store_cursor is not None:
# Allow loading data from later transactions
# for conflict resolution.
- cursor = self._store_cursor
- else:
- if not self._load_transaction_open:
- self._restart_load()
- cursor = self._load_cursor
- state = self._adapter.load_revision(cursor, oid_int, tid_int)
+ state = self._adapter.load_revision(
+ self._store_cursor, oid_int, tid_int)
finally:
self._lock_release()
@@ -451,7 +445,6 @@
# attempting to store objects after the vote phase has finished.
# That should not happen, should it?
assert self._prepared_txn is None
- md5sum = md5(data).hexdigest()
adapter = self._adapter
cursor = self._store_cursor
@@ -466,7 +459,7 @@
try:
self._max_stored_oid = max(self._max_stored_oid, oid_int)
# save the data in a temporary table
- adapter.store_temp(cursor, oid_int, prev_tid_int, md5sum, data)
+ adapter.store_temp(cursor, oid_int, prev_tid_int, data)
return None
finally:
self._lock_release()
@@ -485,11 +478,6 @@
assert self._tid is not None
assert self._prepared_txn is None
- if data is not None:
- md5sum = md5(data).hexdigest()
- else:
- # George Bailey object
- md5sum = None
adapter = self._adapter
cursor = self._store_cursor
@@ -500,8 +488,8 @@
self._lock_acquire()
try:
self._max_stored_oid = max(self._max_stored_oid, oid_int)
- # save the data. Note that md5sum and data can be None.
- adapter.restore(cursor, oid_int, tid_int, md5sum, data)
+ # save the data. Note that data can be None.
+ adapter.restore(cursor, oid_int, tid_int, data)
finally:
self._lock_release()
@@ -618,9 +606,8 @@
else:
# resolved
data = rdata
- md5sum = md5(data).hexdigest()
self._adapter.replace_temp(
- cursor, oid_int, prev_tid_int, md5sum, data)
+ cursor, oid_int, prev_tid_int, data)
resolved.add(oid)
# Move the new states into the permanent table
@@ -910,6 +897,7 @@
finally:
lock_conn.rollback()
adapter.close(lock_conn, lock_cursor)
+ self.sync()
def iterator(self, start=None, stop=None):
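
The loadSerial() hunk above reverses the lookup order: the load connection, which sees a stable snapshot, is consulted first, and the store connection is used only as a fallback when the state is missing, since data committed later in the current transaction is visible only there during conflict resolution. The new control flow, reduced to a sketch:

def load_serial_state(adapter, load_cursor, store_cursor, oid_int, tid_int):
    # Prefer the load connection's stable snapshot.
    state = adapter.load_revision(load_cursor, oid_int, tid_int)
    if state is None and store_cursor is not None:
        # Fall back to the store connection, which can see states
        # from later transactions, for conflict resolution.
        state = adapter.load_revision(store_cursor, oid_int, tid_int)
    return state
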
Modified: relstorage/branches/packless/relstorage/tests/alltests.py
===================================================================
--- relstorage/trunk/relstorage/tests/alltests.py 2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/tests/alltests.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -18,10 +18,12 @@
from testpostgresql import test_suite as postgresql_test_suite
from testmysql import test_suite as mysql_test_suite
from testoracle import test_suite as oracle_test_suite
+from testpacklesspostgresql import test_suite as packless_postgresql_test_suite
def make_suite():
suite = unittest.TestSuite()
suite.addTest(postgresql_test_suite())
suite.addTest(mysql_test_suite())
suite.addTest(oracle_test_suite())
+ suite.addTest(packless_postgresql_test_suite())
return suite
Modified: relstorage/branches/packless/relstorage/tests/comparison.ods
===================================================================
(Binary files differ)
Added: relstorage/branches/packless/relstorage/tests/packlesstestbase.py
===================================================================
--- relstorage/branches/packless/relstorage/tests/packlesstestbase.py (rev 0)
+++ relstorage/branches/packless/relstorage/tests/packlesstestbase.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -0,0 +1,246 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""A foundation for packless relstorage adapter tests"""
+
+import cPickle
+import time
+
+from ZODB.tests.ConflictResolution import PCounter
+from ZODB.tests.PackableStorage import Root, dumps, pdumps, ZERO
+from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
+from ZODB.serialize import referencesf
+
+from relstorage.tests.reltestbase import GenericRelStorageTests
+from relstorage.tests.reltestbase import ToFileStorage
+from relstorage.tests.reltestbase import FromFileStorage
+
+
+class PacklessRelStorageTests(GenericRelStorageTests):
+ # override certain tests with a version that expects
+ # garbage collection, but not packing.
+ def checkPackAllRevisions(self):
+ self._initroot()
+ eq = self.assertEqual
+ raises = self.assertRaises
+ # Create a `persistent' object
+ obj = self._newobj()
+ oid = obj.getoid()
+ obj.value = 1
+ # Commit three different revisions
+ revid1 = self._dostoreNP(oid, data=pdumps(obj))
+ obj.value = 2
+ revid2 = self._dostoreNP(oid, revid=revid1, data=pdumps(obj))
+ obj.value = 3
+ revid3 = self._dostoreNP(oid, revid=revid2, data=pdumps(obj))
+ # Now make sure only the latest revision can be extracted
+ raises(KeyError, self._storage.loadSerial, oid, revid1)
+ raises(KeyError, self._storage.loadSerial, oid, revid2)
+ data = self._storage.loadSerial(oid, revid3)
+ pobj = cPickle.loads(data)
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 3)
+ # Now pack all transactions; need to sleep a second to make
+ # sure that the pack time is greater than the last commit time.
+ now = packtime = time.time()
+ while packtime <= now:
+ packtime = time.time()
+ self._storage.pack(packtime, referencesf)
+ self._storage.sync()
+ # All revisions of the object should be gone, since there is no
+ # reference from the root object to this object.
+ raises(KeyError, self._storage.loadSerial, oid, revid1)
+ raises(KeyError, self._storage.loadSerial, oid, revid2)
+ raises(KeyError, self._storage.loadSerial, oid, revid3)
+ raises(KeyError, self._storage.load, oid, '')
+
+ def checkPackJustOldRevisions(self):
+ eq = self.assertEqual
+ raises = self.assertRaises
+ loads = self._makeloader()
+ # Create a root object. This can't be an instance of Object,
+ # otherwise the pickling machinery will serialize it as a persistent
+ # id and not as an object that contains references (persistent ids) to
+ # other objects.
+ root = Root()
+ # Create a persistent object, with some initial state
+ obj = self._newobj()
+ oid = obj.getoid()
+ # Link the root object to the persistent object, in order to keep the
+ # persistent object alive. Store the root object.
+ root.obj = obj
+ root.value = 0
+ revid0 = self._dostoreNP(ZERO, data=dumps(root))
+ # Make sure the root can be retrieved
+ data, revid = self._storage.load(ZERO, '')
+ eq(revid, revid0)
+ eq(loads(data).value, 0)
+ # Commit three different revisions of the other object
+ obj.value = 1
+ revid1 = self._dostoreNP(oid, data=pdumps(obj))
+ obj.value = 2
+ revid2 = self._dostoreNP(oid, revid=revid1, data=pdumps(obj))
+ obj.value = 3
+ revid3 = self._dostoreNP(oid, revid=revid2, data=pdumps(obj))
+ # Now make sure only the latest revision can be extracted
+ raises(KeyError, self._storage.loadSerial, oid, revid1)
+ raises(KeyError, self._storage.loadSerial, oid, revid2)
+ data = self._storage.loadSerial(oid, revid3)
+ pobj = cPickle.loads(data)
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 3)
+ # Now pack. The object should stay alive because it's pointed
+ # to by the root.
+ now = packtime = time.time()
+ while packtime <= now:
+ packtime = time.time()
+ self._storage.pack(packtime, referencesf)
+ # Make sure the revisions are gone, but that object zero and revision
+ # 3 are still there and correct
+ data, revid = self._storage.load(ZERO, '')
+ eq(revid, revid0)
+ eq(loads(data).value, 0)
+ raises(KeyError, self._storage.loadSerial, oid, revid1)
+ raises(KeyError, self._storage.loadSerial, oid, revid2)
+ data = self._storage.loadSerial(oid, revid3)
+ pobj = cPickle.loads(data)
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 3)
+ data, revid = self._storage.load(oid, '')
+ eq(revid, revid3)
+ pobj = cPickle.loads(data)
+ eq(pobj.getoid(), oid)
+ eq(pobj.value, 3)
+
+ def checkPackOnlyOneObject(self):
+ eq = self.assertEqual
+ raises = self.assertRaises
+ loads = self._makeloader()
+ # Create a root object. This can't be an instance of Object,
+ # otherwise the pickling machinery will serialize it as a persistent
+ # id and not as an object that contains references (persistent ids) to
+ # other objects.
+ root = Root()
+ # Create a persistent object, with some initial state
+ obj1 = self._newobj()
+ oid1 = obj1.getoid()
+ # Create another persistent object, with some initial state.
+ obj2 = self._newobj()
+ oid2 = obj2.getoid()
+ # Link the root object to the persistent objects, in order to keep
+ # them alive. Store the root object.
+ root.obj1 = obj1
+ root.obj2 = obj2
+ root.value = 0
+ revid0 = self._dostoreNP(ZERO, data=dumps(root))
+ # Make sure the root can be retrieved
+ data, revid = self._storage.load(ZERO, '')
+ eq(revid, revid0)
+ eq(loads(data).value, 0)
+ # Commit three different revisions of the first object
+ obj1.value = 1
+ revid1 = self._dostoreNP(oid1, data=pdumps(obj1))
+ obj1.value = 2
+ revid2 = self._dostoreNP(oid1, revid=revid1, data=pdumps(obj1))
+ obj1.value = 3
+ revid3 = self._dostoreNP(oid1, revid=revid2, data=pdumps(obj1))
+ # Now make sure only the latest revision can be extracted
+ raises(KeyError, self._storage.loadSerial, oid1, revid1)
+ raises(KeyError, self._storage.loadSerial, oid1, revid2)
+ data = self._storage.loadSerial(oid1, revid3)
+ pobj = cPickle.loads(data)
+ eq(pobj.getoid(), oid1)
+ eq(pobj.value, 3)
+ # Now commit a revision of the second object
+ obj2.value = 11
+ revid4 = self._dostoreNP(oid2, data=pdumps(obj2))
+ # And make sure the revision can be extracted
+ data = self._storage.loadSerial(oid2, revid4)
+ pobj = cPickle.loads(data)
+ eq(pobj.getoid(), oid2)
+ eq(pobj.value, 11)
+ # Now pack just revisions 1 and 2 of object1. Object1's current
+ # revision should stay alive because it's pointed to by the root, as
+ # should Object2's current revision.
+ now = packtime = time.time()
+ while packtime <= now:
+ packtime = time.time()
+ self._storage.pack(packtime, referencesf)
+ # Make sure the revisions are gone, but that object zero, object2, and
+ # revision 3 of object1 are still there and correct.
+ data, revid = self._storage.load(ZERO, '')
+ eq(revid, revid0)
+ eq(loads(data).value, 0)
+ raises(KeyError, self._storage.loadSerial, oid1, revid1)
+ raises(KeyError, self._storage.loadSerial, oid1, revid2)
+ data = self._storage.loadSerial(oid1, revid3)
+ pobj = cPickle.loads(data)
+ eq(pobj.getoid(), oid1)
+ eq(pobj.value, 3)
+ data, revid = self._storage.load(oid1, '')
+ eq(revid, revid3)
+ pobj = cPickle.loads(data)
+ eq(pobj.getoid(), oid1)
+ eq(pobj.value, 3)
+ data, revid = self._storage.load(oid2, '')
+ eq(revid, revid4)
+ eq(loads(data).value, 11)
+ data = self._storage.loadSerial(oid2, revid4)
+ pobj = cPickle.loads(data)
+ eq(pobj.getoid(), oid2)
+ eq(pobj.value, 11)
+
+ def checkResolve(self):
+ obj = PCounter()
+ obj.inc()
+
+ oid = self._storage.new_oid()
+
+ revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))
+
+ obj.inc()
+ obj.inc()
+
+ # The effect of committing two transactions with the same
+ # pickle is to commit two different transactions relative to
+ # revid1 that add two to _value.
+
+ # open s1
+ s1 = self._storage.new_instance()
+ # start a load transaction in s1
+ s1.poll_invalidations()
+
+ # commit a change
+ revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
+
+ # commit a conflicting change using s1
+ main_storage = self._storage
+ self._storage = s1
+ try:
+ # we can resolve this conflict because s1 has an open
+ # transaction that can read the old state of the object.
+ revid3 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
+ s1.release()
+ finally:
+ self._storage = main_storage
+
+ data, serialno = self._storage.load(oid, '')
+ inst = zodb_unpickle(data)
+ self.assertEqual(inst._value, 5)
+
+
+class PacklessToFileStorage(ToFileStorage):
+ pass
+
+class PacklessFromFileStorage(FromFileStorage):
+ pass
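
checkResolve() above ends with _value == 5 because conflict resolution replays the losing transaction's delta (two increments) on top of the committed state (three). A sketch of the resolution hook that behavior relies on, written in the spirit of ZODB's PCounter test class (not its exact definition):

from persistent import Persistent

class Counter(Persistent):

    _value = 0

    def inc(self):
        self._value += 1

    def _p_resolveConflict(self, old, committed, new):
        # Replay this transaction's delta on top of the committed state:
        # e.g. old=1, committed=3, new=3 resolves to 5.
        committed['_value'] += new['_value'] - old['_value']
        return committed
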
Modified: relstorage/branches/packless/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/tests/reltestbase.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -36,7 +36,7 @@
from ZODB.serialize import referencesf
-class BaseRelStorageTests(StorageTestBase.StorageTestBase):
+class RelStorageTestBase(StorageTestBase.StorageTestBase):
def make_adapter(self):
# abstract method
@@ -56,21 +56,15 @@
self._storage.cleanup()
-class RelStorageTests(
- BaseRelStorageTests,
+class GenericRelStorageTests(
+ RelStorageTestBase,
BasicStorage.BasicStorage,
- TransactionalUndoStorage.TransactionalUndoStorage,
- RevisionStorage.RevisionStorage,
PackableStorage.PackableStorage,
- PackableStorage.PackableUndoStorage,
Synchronization.SynchronizedStorage,
ConflictResolution.ConflictResolvingStorage,
- HistoryStorage.HistoryStorage,
- IteratorStorage.IteratorStorage,
- IteratorStorage.ExtendedIteratorStorage,
PersistentStorage.PersistentStorage,
MTStorage.MTStorage,
- ReadOnlyStorage.ReadOnlyStorage
+ ReadOnlyStorage.ReadOnlyStorage,
):
def checkDropAndPrepare(self):
@@ -360,7 +354,36 @@
fakecache.data.clear()
self.checkPollInterval(using_cache=True)
+ def checkDoubleCommitter(self):
+ # Verify we can store an object that gets committed twice in
+ # a single transaction.
+ db = DB(self._storage)
+ try:
+ conn = db.open()
+ try:
+ conn.root()['dc'] = DoubleCommitter()
+ transaction.commit()
+ conn2 = db.open()
+ self.assertEquals(conn2.root()['dc'].new_attribute, 1)
+ conn2.close()
+ finally:
+ transaction.abort()
+ conn.close()
+ finally:
+ db.close()
+
+class RelStorageTests(
+ GenericRelStorageTests,
+ TransactionalUndoStorage.TransactionalUndoStorage,
+ IteratorStorage.IteratorStorage,
+ IteratorStorage.ExtendedIteratorStorage,
+ RevisionStorage.RevisionStorage,
+ PackableStorage.PackableUndoStorage,
+ HistoryStorage.HistoryStorage,
+ ConflictResolution.ConflictResolvingTransUndoStorage,
+ ):
+
def checkTransactionalUndoIterator(self):
# this test overrides the broken version in TransactionalUndoStorage.
@@ -567,24 +590,6 @@
finally:
db.close()
- def checkDoubleCommitter(self):
- # Verify we can store an object that gets committed twice in
- # a single transaction.
- db = DB(self._storage)
- try:
- conn = db.open()
- try:
- conn.root()['dc'] = DoubleCommitter()
- transaction.commit()
- conn2 = db.open()
- self.assertEquals(conn2.root()['dc'].new_attribute, 1)
- conn2.close()
- finally:
- transaction.abort()
- conn.close()
- finally:
- db.close()
-
class DoubleCommitter(Persistent):
"""A crazy persistent class that changes self in __getstate__"""
@@ -647,7 +652,7 @@
setattr(RecoveryStorageSubset, name, attr)
-class ToFileStorage(BaseRelStorageTests, RecoveryStorageSubset):
+class ToFileStorage(RelStorageTestBase, RecoveryStorageSubset):
def setUp(self):
self.open(create=1)
self._storage.zap_all()
@@ -663,7 +668,7 @@
return FileStorage('Dest.fs')
-class FromFileStorage(BaseRelStorageTests, RecoveryStorageSubset):
+class FromFileStorage(RelStorageTestBase, RecoveryStorageSubset):
def setUp(self):
self.open(create=1)
self._storage.zap_all()
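
The reltestbase refactoring above splits the suite into mixins so that packless adapters reuse the generic tests while skipping undo, history, and iteration. The layering pattern, reduced to a sketch with hypothetical test names:

import unittest

class GenericTests(object):
    def checkStore(self):
        pass  # behavior every adapter must provide

class UndoTests(object):
    def checkUndo(self):
        pass  # behavior only history-keeping adapters provide

class FullAdapterTests(GenericTests, UndoTests, unittest.TestCase):
    pass  # analogous to RelStorageTests

class PacklessAdapterTests(GenericTests, unittest.TestCase):
    pass  # analogous to PacklessRelStorageTests
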
Added: relstorage/branches/packless/relstorage/tests/testpacklesspostgresql.py
===================================================================
--- relstorage/branches/packless/relstorage/tests/testpacklesspostgresql.py (rev 0)
+++ relstorage/branches/packless/relstorage/tests/testpacklesspostgresql.py 2009-05-07 09:46:43 UTC (rev 99796)
@@ -0,0 +1,52 @@
+##############################################################################
+#
+# Copyright (c) 2008-2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Tests of relstorage.adapters.packless.postgresql"""
+
+import logging
+import unittest
+
+from relstorage.tests import reltestbase
+from relstorage.tests import packlesstestbase
+from relstorage.adapters.packless.postgresql import PacklessPostgreSQLAdapter
+
+
+class UsePacklessPostgreSQLAdapter:
+ def make_adapter(self):
+ return PacklessPostgreSQLAdapter(
+ 'dbname=relstoragetestp user=relstoragetest password=relstoragetest')
+
+class PacklessPostgreSQLTests(
+ UsePacklessPostgreSQLAdapter,
+ packlesstestbase.PacklessRelStorageTests):
+ pass
+
+class PacklessPGToFile(
+ UsePacklessPostgreSQLAdapter, reltestbase.ToFileStorage):
+ pass
+
+class PacklessFileToPG(
+ UsePacklessPostgreSQLAdapter, reltestbase.FromFileStorage):
+ pass
+
+
+def test_suite():
+ suite = unittest.TestSuite()
+ for klass in [PacklessPostgreSQLTests, PacklessPGToFile, PacklessFileToPG]:
+ suite.addTest(unittest.makeSuite(klass, "check"))
+ return suite
+
+if __name__=='__main__':
+ logging.basicConfig()
+ unittest.main(defaultTest="test_suite")
+