[Checkins] SVN: relstorage/branches/packless/ Created a packless PostgreSQL adapter in a branch.

Shane Hathaway shane at hathawaymix.org
Thu May 7 05:46:44 EDT 2009


Log message for revision 99796:
  Created a packless PostgreSQL adapter in a branch.
  

Changed:
  A   relstorage/branches/packless/
  U   relstorage/branches/packless/relstorage/adapters/common.py
  U   relstorage/branches/packless/relstorage/adapters/mysql.py
  U   relstorage/branches/packless/relstorage/adapters/oracle.py
  A   relstorage/branches/packless/relstorage/adapters/packless/
  A   relstorage/branches/packless/relstorage/adapters/packless/__init__.py
  A   relstorage/branches/packless/relstorage/adapters/packless/common.py
  A   relstorage/branches/packless/relstorage/adapters/packless/postgresql.py
  U   relstorage/branches/packless/relstorage/adapters/postgresql.py
  U   relstorage/branches/packless/relstorage/relstorage.py
  U   relstorage/branches/packless/relstorage/tests/alltests.py
  U   relstorage/branches/packless/relstorage/tests/comparison.ods
  A   relstorage/branches/packless/relstorage/tests/packlesstestbase.py
  U   relstorage/branches/packless/relstorage/tests/reltestbase.py
  A   relstorage/branches/packless/relstorage/tests/testpacklesspostgresql.py

-=-
Modified: relstorage/branches/packless/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py	2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/adapters/common.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -18,6 +18,12 @@
 import logging
 import time
 
+try:
+    from hashlib import md5
+except ImportError:
+    from md5 import new as md5
+
+
 log = logging.getLogger("relstorage.adapters.common")
 
 verify_sane_database = False
@@ -929,18 +935,30 @@
             return None, new_polled_tid
 
         # Get the list of changed OIDs and return it.
-        stmt = """
-        SELECT zoid
-        FROM current_object
-        WHERE tid > %(tid)s
-        """
         if ignore_tid is None:
+            stmt = """
+            SELECT zoid
+            FROM current_object
+            WHERE tid > %(tid)s
+            """
             cursor.execute(intern(stmt % self._script_vars),
                 {'tid': prev_polled_tid})
         else:
-            stmt += " AND tid != %(self_tid)s"
+            stmt = """
+            SELECT zoid
+            FROM current_object
+            WHERE tid > %(tid)s
+                AND tid != %(self_tid)s
+            """
             cursor.execute(intern(stmt % self._script_vars),
                 {'tid': prev_polled_tid, 'self_tid': ignore_tid})
         oids = [oid for (oid,) in cursor]
 
         return oids, new_polled_tid
+
+    def md5sum(self, data):
+        if data is not None:
+            return md5(data).hexdigest()
+        else:
+            # George Bailey object
+            return None

Modified: relstorage/branches/packless/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py	2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/adapters/mysql.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -467,16 +467,18 @@
         except disconnected_exceptions, e:
             raise StorageError(e)
 
-    def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+    def store_temp(self, cursor, oid, prev_tid, data):
         """Store an object in the temporary table."""
+        md5sum = self.md5sum(data)
         stmt = """
         REPLACE INTO temp_store (zoid, prev_tid, md5, state)
         VALUES (%s, %s, %s, %s)
         """
         cursor.execute(stmt, (oid, prev_tid, md5sum, MySQLdb.Binary(data)))
 
-    def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+    def replace_temp(self, cursor, oid, prev_tid, data):
         """Replace an object in the temporary table."""
+        md5sum = self.md5sum(data)
         stmt = """
         UPDATE temp_store SET
             prev_tid = %s,
@@ -486,11 +488,12 @@
         """
         cursor.execute(stmt, (prev_tid, md5sum, MySQLdb.Binary(data), oid))
 
-    def restore(self, cursor, oid, tid, md5sum, data):
+    def restore(self, cursor, oid, tid, data):
         """Store an object directly, without conflict detection.
 
         Used for copying transactions into this database.
         """
+        md5sum = self.md5sum(data)
         stmt = """
         INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
         VALUES (%s, %s,

Modified: relstorage/branches/packless/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py	2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/adapters/oracle.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -538,8 +538,9 @@
         except disconnected_exceptions, e:
             raise StorageError(e)
 
-    def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+    def store_temp(self, cursor, oid, prev_tid, data):
         """Store an object in the temporary table."""
+        md5sum = self.md5sum(data)
         cursor.execute("DELETE FROM temp_store WHERE zoid = :oid", oid=oid)
         if len(data) <= 2000:
             # Send data inline for speed.  Oracle docs say maximum size
@@ -561,8 +562,9 @@
             cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
                 md5sum=md5sum, blobdata=data)
 
-    def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+    def replace_temp(self, cursor, oid, prev_tid, data):
         """Replace an object in the temporary table."""
+        md5sum = self.md5sum(data)
         cursor.setinputsizes(data=cx_Oracle.BLOB)
         stmt = """
         UPDATE temp_store SET
@@ -574,11 +576,12 @@
         cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
             md5sum=md5sum, data=cx_Oracle.Binary(data))
 
-    def restore(self, cursor, oid, tid, md5sum, data):
+    def restore(self, cursor, oid, tid, data):
         """Store an object directly, without conflict detection.
 
         Used for copying transactions into this database.
         """
+        md5sum = self.md5sum(data)
         cursor.setinputsizes(data=cx_Oracle.BLOB)
         stmt = """
         INSERT INTO object_state (zoid, tid, prev_tid, md5, state)

Added: relstorage/branches/packless/relstorage/adapters/packless/__init__.py
===================================================================
--- relstorage/branches/packless/relstorage/adapters/packless/__init__.py	                        (rev 0)
+++ relstorage/branches/packless/relstorage/adapters/packless/__init__.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -0,0 +1 @@
+

Copied: relstorage/branches/packless/relstorage/adapters/packless/common.py (from rev 99794, relstorage/trunk/relstorage/adapters/common.py)
===================================================================
--- relstorage/branches/packless/relstorage/adapters/packless/common.py	                        (rev 0)
+++ relstorage/branches/packless/relstorage/adapters/packless/common.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -0,0 +1,563 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Code common to most adapters."""
+
+from ZODB.POSException import UndoError
+
+import logging
+import time
+
+log = logging.getLogger("relstorage.adapters.packless.common")
+
+verify_sane_database = False
+
+
+# Notes about adapters:
+#
+# An adapter must not hold a connection, cursor, or database state, because
+# RelStorage opens multiple concurrent connections using a single adapter
+# instance.
+# Within the context of an adapter, all OID and TID values are integers,
+# not binary strings, except as noted.
+
+class PacklessAdapter(object):
+    """Common code for a packless database adapter.
+
+    This is an abstract class; a lot of methods are expected to be
+    provided by subclasses.
+    """
+
+    # _script_vars contains replacements for statements in scripts.
+    # These are correct for PostgreSQL and MySQL but not for Oracle.
+    _script_vars = {
+        'TRUE':         'TRUE',
+        'FALSE':        'FALSE',
+        'OCTET_LENGTH': 'OCTET_LENGTH',
+        'TRUNCATE':     'TRUNCATE',
+        'oid':          '%(oid)s',
+        'tid':          '%(tid)s',
+        'self_tid':     '%(self_tid)s',
+        'min_tid':      '%(min_tid)s',
+        'max_tid':      '%(max_tid)s',
+    }
+
+    _scripts = {
+        'create_temp_gc_visit': """
+            CREATE TEMPORARY TABLE temp_gc_visit (
+                zoid BIGINT NOT NULL
+            );
+            CREATE UNIQUE INDEX temp_gc_visit_zoid ON temp_gc_visit (zoid)
+            """,
+
+        'pre_gc_follow_child_refs': """
+            UPDATE gc_object SET keep = %(TRUE)s
+            WHERE keep = %(FALSE)s
+                AND zoid IN (
+                    SELECT DISTINCT to_zoid
+                    FROM object_ref
+                        JOIN temp_gc_visit USING (zoid)
+                )
+            """,
+
+        'gc': """
+            DELETE FROM object_state
+            WHERE zoid IN (
+                SELECT zoid
+                FROM gc_object
+                WHERE keep = %(FALSE)s
+            );
+
+            DELETE FROM object_refs_added
+            WHERE tid NOT IN (
+                SELECT DISTINCT tid
+                FROM object_state
+            );
+
+            DELETE FROM object_ref
+            WHERE zoid IN (
+                SELECT zoid
+                FROM gc_object
+                WHERE keep = %(FALSE)s
+            );
+
+            %(TRUNCATE)s gc_object
+            """,
+    }
+
+    def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+        """Execute a statement from a script with the given parameters.
+
+        Subclasses may override this.
+        The input statement is generic and needs to be transformed
+        into a database-specific statement.
+        """
+        stmt = generic_stmt % self._script_vars
+        try:
+            cursor.execute(stmt, generic_params)
+        except:
+            log.warning("script statement failed: %r; parameters: %r",
+                stmt, generic_params)
+            raise
+
+
+    def _run_script(self, cursor, script, params=()):
+        """Execute a series of statements in the database.
+
+        The statements are transformed by _run_script_stmt
+        before execution.
+        """
+        lines = []
+        for line in script.split('\n'):
+            line = line.strip()
+            if not line or line.startswith('--'):
+                continue
+            if line.endswith(';'):
+                line = line[:-1]
+                lines.append(line)
+                stmt = '\n'.join(lines)
+                self._run_script_stmt(cursor, stmt, params)
+                lines = []
+            else:
+                lines.append(line)
+        if lines:
+            stmt = '\n'.join(lines)
+            self._run_script_stmt(cursor, stmt, params)
+
+    def _open_and_call(self, callback):
+        """Call a function with an open connection and cursor.
+
+        If the function returns, commits the transaction and returns the
+        result returned by the function.
+        If the function raises an exception, aborts the transaction
+        then propagates the exception.
+        """
+        conn, cursor = self.open()
+        try:
+            try:
+                res = callback(conn, cursor)
+            except:
+                conn.rollback()
+                raise
+            else:
+                conn.commit()
+                return res
+        finally:
+            self.close(conn, cursor)
+
+
+    def iter_transactions(self, cursor):
+        """Iterate over the transaction log, newest first.
+
+        Skips packed transactions.
+        Yields (tid, username, description, extension) for each transaction.
+        """
+        stmt = """
+        SELECT DISTINCT tid
+        FROM object_state
+        ORDER BY tid DESC
+        """
+        self._run_script_stmt(cursor, stmt)
+        return ((tid, '', '', '') for (tid,) in cursor)
+
+
+    def iter_transactions_range(self, cursor, start=None, stop=None):
+        """Iterate over the transactions in the given range, oldest first.
+
+        Includes packed transactions.
+        Yields (tid, username, description, extension, packed)
+        for each transaction.
+        """
+        stmt = """
+        SELECT DISTINCT tid
+        FROM object_state
+        WHERE tid > 0
+        """
+        if start is not None:
+            stmt += " AND tid >= %(min_tid)s"
+        if stop is not None:
+            stmt += " AND tid <= %(max_tid)s"
+        stmt += " ORDER BY tid"
+        self._run_script_stmt(cursor, stmt,
+            {'min_tid': start, 'max_tid': stop})
+        return ((tid, '', '', '', True) for (tid,) in cursor)
+
+
+    def iter_object_history(self, cursor, oid):
+        """Iterate over an object's history.
+
+        Raises KeyError if the object does not exist.
+        Yields (tid, username, description, extension, pickle_size)
+        for each modification.
+        """
+        stmt = """
+        SELECT tid, %(OCTET_LENGTH)s(state)
+        FROM object_state
+        WHERE zoid = %(oid)s
+        """
+        self._run_script_stmt(cursor, stmt, {'oid': oid})
+        return ((tid, '', '', '', size) for (tid, size) in cursor)
+
+
+    def iter_objects(self, cursor, tid):
+        """Iterate over object states in a transaction.
+
+        Yields (oid, state) for each object state.
+        """
+        stmt = """
+        SELECT zoid, state
+        FROM object_state
+        WHERE tid = %(tid)s
+        ORDER BY zoid
+        """
+        self._run_script_stmt(cursor, stmt, {'tid': tid})
+        for oid, state in cursor:
+            if hasattr(state, 'read'):
+                # Oracle
+                state = state.read()
+            yield oid, state
+
+
+    def verify_undoable(self, cursor, undo_tid):
+        """Raise UndoError if it is not safe to undo the specified txn."""
+        raise UndoError("Undo is not supported by this storage")
+
+
+    def undo(self, cursor, undo_tid, self_tid):
+        """Undo a transaction.
+
+        Parameters: "undo_tid", the integer tid of the transaction to undo,
+        and "self_tid", the integer tid of the current transaction.
+
+        Returns the list of OIDs undone.
+        """
+        raise UndoError("Undo is not supported by this storage")
+
+
+    def choose_pack_transaction(self, pack_point):
+        """Return the transaction before or at the specified pack time.
+
+        This packless implementation ignores pack_point and always returns 1.
+        """
+        return 1
+
+
+    def open_for_pre_pack(self):
+        """Open a connection to be used for the pre-pack phase.
+        Returns (conn, cursor).
+
+        Subclasses may override this.
+        """
+        return self.open()
+
+
+    def pre_pack(self, pack_tid, get_references, options):
+        """Decide what the garbage collector should delete.
+
+        pack_tid is ignored.
+
+        get_references is a function that accepts a pickled state and
+        returns a set of OIDs that state refers to.
+
+        options is an instance of relstorage.Options.
+        The options.pack_gc flag indicates whether to run garbage collection.
+        If pack_gc is false, this method does nothing.
+        """
+        if not options.pack_gc:
+            log.warning("pre_gc: garbage collection is disabled")
+            return
+
+        conn, cursor = self.open_for_pre_pack()
+        try:
+            try:
+                self._pre_gc(conn, cursor, get_references)
+                conn.commit()
+
+                stmt = """
+                SELECT COUNT(1)
+                FROM gc_object
+                WHERE keep = %(FALSE)s
+                """
+                self._run_script_stmt(cursor, stmt)
+                to_remove = cursor.fetchone()[0]
+
+                log.info("pre_gc: will remove %d object(s)",
+                    to_remove)
+
+            except:
+                log.exception("pre_gc: failed")
+                conn.rollback()
+                raise
+            else:
+                log.info("pre_gc: finished successfully")
+                conn.commit()
+        finally:
+            self.close(conn, cursor)
+
+
+    def _pre_gc(self, conn, cursor, get_references):
+        """Determine what to garbage collect.
+        """
+        stmt = self._scripts['create_temp_gc_visit']
+        if stmt:
+            self._run_script(cursor, stmt)
+
+        self.fill_object_refs(conn, cursor, get_references)
+
+        log.info("pre_gc: filling the gc_object table")
+        # Fill the gc_object table with all known OIDs.
+        stmt = """
+        %(TRUNCATE)s gc_object;
+
+        INSERT INTO gc_object (zoid)
+        SELECT zoid
+        FROM object_state;
+
+        -- Keep the root object
+        UPDATE gc_object SET keep = %(TRUE)s
+        WHERE zoid = 0;
+        """
+        self._run_script(cursor, stmt)
+
+        # Each of the objects to be kept might
+        # refer to other objects.  If some of those references
+        # include objects currently set to be removed, mark
+        # the referenced objects to be kept as well.  Do this
+        # repeatedly until all references have been satisfied.
+        pass_num = 1
+        while True:
+            log.info("pre_gc: following references, pass %d", pass_num)
+
+            # Make a list of all parent objects that still need
+            # to be visited.  Then set gc_object.visited for all gc_object
+            # rows with keep = true.
+            stmt = """
+            %(TRUNCATE)s temp_gc_visit;
+
+            INSERT INTO temp_gc_visit (zoid)
+            SELECT zoid
+            FROM gc_object
+            WHERE keep = %(TRUE)s
+                AND visited = %(FALSE)s;
+
+            UPDATE gc_object SET visited = %(TRUE)s
+            WHERE keep = %(TRUE)s
+                AND visited = %(FALSE)s
+            """
+            self._run_script(cursor, stmt)
+            visit_count = cursor.rowcount
+
+            if verify_sane_database:
+                # Verify the update actually worked.
+                # MySQL 5.1.23 fails this test; 5.1.24 passes.
+                stmt = """
+                SELECT 1
+                FROM gc_object
+                WHERE keep = %(TRUE)s AND visited = %(FALSE)s
+                """
+                self._run_script_stmt(cursor, stmt)
+                if list(cursor):
+                    raise AssertionError(
+                        "database failed to update gc_object")
+
+            log.debug("pre_gc: checking references from %d object(s)",
+                visit_count)
+
+            # Visit the children of all parent objects that were
+            # just visited.
+            stmt = self._scripts['pre_gc_follow_child_refs']
+            self._run_script(cursor, stmt)
+            found_count = cursor.rowcount
+
+            log.debug("pre_gc: found %d more referenced object(s) in "
+                "pass %d", found_count, pass_num)
+            if not found_count:
+                # No new references detected.
+                break
+            else:
+                pass_num += 1
+
+
+    def _add_object_ref_rows(self, cursor, add_rows):
+        """Add rows to object_ref.
+
+        The input rows are tuples containing (from_zoid, to_zoid).
+
+        Subclasses can override this.
+        """
+        stmt = """
+        INSERT INTO object_ref (zoid, to_zoid)
+        VALUES (%s, %s)
+        """
+        cursor.executemany(stmt, add_rows)
+
+
+    def _add_refs_for_tid(self, cursor, tid, get_references):
+        """Fill object_ref with the references from all states in a transaction.
+
+        Returns the number of references added.
+        """
+        log.debug("pre_gc: transaction %d: computing references ", tid)
+        from_count = 0
+
+        stmt = """
+        SELECT zoid, state
+        FROM object_state
+        WHERE tid = %(tid)s
+        """
+        self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+        add_rows = []  # [(from_oid, to_oid)]
+        for from_oid, state in cursor:
+            if hasattr(state, 'read'):
+                # Oracle
+                state = state.read()
+            if state:
+                from_count += 1
+                to_oids = get_references(str(state))
+                for to_oid in to_oids:
+                    add_rows.append((from_oid, to_oid))
+
+        if add_rows:
+            stmt = """
+            DELETE FROM object_ref
+            WHERE zoid in (
+                SELECT zoid
+                FROM object_state
+                WHERE tid = %(tid)s
+                )
+            """
+            self._run_script_stmt(cursor, stmt, {'tid': tid})
+            self._add_object_ref_rows(cursor, add_rows)
+
+        # The references have been computed for this transaction.
+        stmt = """
+        INSERT INTO object_refs_added (tid)
+        VALUES (%(tid)s)
+        """
+        self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+        to_count = len(add_rows)
+        log.debug("pre_gc: transaction %d: has %d reference(s) "
+            "from %d object(s)", tid, to_count, from_count)
+        return to_count
+
+
+    def fill_object_refs(self, conn, cursor, get_references):
+        """Update the object_ref table by analyzing new transactions."""
+        stmt = """
+        SELECT transaction.tid
+        FROM (SELECT DISTINCT tid FROM object_state) AS transaction
+            LEFT JOIN object_refs_added
+                ON (transaction.tid = object_refs_added.tid)
+        WHERE object_refs_added.tid IS NULL
+        ORDER BY transaction.tid
+        """
+        cursor.execute(stmt)
+        tids = [tid for (tid,) in cursor]
+        if tids:
+            added = 0
+            log.info("discovering references from objects in %d "
+                "transaction(s)" % len(tids))
+            for tid in tids:
+                added += self._add_refs_for_tid(cursor, tid, get_references)
+                if added >= 10000:
+                    # save the work done so far
+                    conn.commit()
+                    added = 0
+            if added:
+                conn.commit()
+
+
+    def _hold_commit_lock(self, cursor):
+        """Hold the commit lock for gc"""
+        cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
+
+    def _release_commit_lock(self, cursor):
+        """Release the commit lock during gc"""
+        # no action needed
+        pass
+
+
+    def pack(self, pack_tid, options, sleep=time.sleep):
+        """Run garbage collection.
+
+        Requires the information provided by _pre_gc.
+        """
+
+        # Read committed mode is sufficient.
+        conn, cursor = self.open()
+        try:
+            try:
+                log.info("gc: running")
+                stmt = self._scripts['gc']
+                self._run_script(cursor, stmt)
+
+            except:
+                log.exception("gc: failed")
+                conn.rollback()
+                raise
+
+            else:
+                log.info("gc: finished successfully")
+                conn.commit()
+
+        finally:
+            self.close(conn, cursor)
+
+
+    def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+        """Polls for new transactions.
+
+        conn and cursor must have been created previously by open_for_load().
+        prev_polled_tid is the tid returned at the last poll, or None
+        if this is the first poll.  If ignore_tid is not None, changes
+        committed in that transaction will not be included in the list
+        of changed OIDs.
+
+        Returns (changed_oids, new_polled_tid).
+        """
+        # find out the tid of the most recent transaction.
+        cursor.execute(self._poll_query)
+        new_polled_tid = cursor.fetchone()[0]
+
+        if prev_polled_tid is None:
+            # This is the first time the connection has polled.
+            return None, new_polled_tid
+
+        if new_polled_tid == prev_polled_tid:
+            # No transactions have been committed since prev_polled_tid.
+            return (), new_polled_tid
+
+        # Get the list of changed OIDs and return it.
+        if ignore_tid is None:
+            stmt = """
+            SELECT zoid
+            FROM object_state
+            WHERE tid > %(tid)s
+            """
+            cursor.execute(intern(stmt % self._script_vars),
+                {'tid': prev_polled_tid})
+        else:
+            stmt = """
+            SELECT zoid
+            FROM object_state
+            WHERE tid > %(tid)s
+                AND tid != %(self_tid)s
+            """
+            cursor.execute(intern(stmt % self._script_vars),
+                {'tid': prev_polled_tid, 'self_tid': ignore_tid})
+        oids = [oid for (oid,) in cursor]
+
+        return oids, new_polled_tid

Copied: relstorage/branches/packless/relstorage/adapters/packless/postgresql.py (from rev 99794, relstorage/trunk/relstorage/adapters/postgresql.py)
===================================================================
--- relstorage/branches/packless/relstorage/adapters/packless/postgresql.py	                        (rev 0)
+++ relstorage/branches/packless/relstorage/adapters/packless/postgresql.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -0,0 +1,503 @@
+##############################################################################
+#
+# Copyright (c) 2008-2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""PostgreSQL adapter for RelStorage."""
+
+from base64 import decodestring, encodestring
+import logging
+import psycopg2, psycopg2.extensions
+import re
+from ZODB.POSException import StorageError
+
+from relstorage.adapters.packless.common import PacklessAdapter
+
+log = logging.getLogger("relstorage.adapters.postgresql")
+
+# disconnected_exceptions contains the exception types that might be
+# raised when the connection to the database has been broken.
+disconnected_exceptions = (psycopg2.OperationalError, psycopg2.InterfaceError)
+
+
+class PacklessPostgreSQLAdapter(PacklessAdapter):
+    """Packless PostgreSQL adapter for RelStorage."""
+
+    def __init__(self, dsn=''):
+        self._dsn = dsn  # passed unchanged to psycopg2.connect() by open()
+
+    def create_schema(self, cursor):
+        """Create the tables, indexes and zoid_seq sequence of this schema."""
+        stmt = """
+        CREATE TABLE commit_lock ();
+
+        CREATE SEQUENCE zoid_seq;
+
+        -- All object states in all transactions.
+        CREATE TABLE object_state (
+            zoid        BIGINT NOT NULL PRIMARY KEY,
+            tid         BIGINT NOT NULL CHECK (tid > 0),
+            state       BYTEA
+        );
+        CREATE INDEX object_state_tid ON object_state (tid);
+
+        -- A list of referenced OIDs from each object_state.
+        -- This table is populated as needed during garbage collection.
+        CREATE TABLE object_ref (
+            zoid        BIGINT NOT NULL,
+            to_zoid     BIGINT NOT NULL,
+            PRIMARY KEY (zoid, to_zoid)
+        );
+
+        -- The object_refs_added table tracks whether object_refs has
+        -- been populated for all states in a given transaction.
+        -- An entry is added only when the work is finished.
+        CREATE TABLE object_refs_added (
+            tid         BIGINT NOT NULL PRIMARY KEY
+        );
+
+        -- Temporary state during garbage collection:
+        -- The list of all objects, a flag signifying whether
+        -- the object should be kept, and a flag signifying whether
+        -- the object's references have been visited.
+        CREATE TABLE gc_object (
+            zoid        BIGINT NOT NULL PRIMARY KEY,
+            keep        BOOLEAN NOT NULL DEFAULT FALSE,
+            visited     BOOLEAN NOT NULL DEFAULT FALSE
+        );
+        CREATE INDEX gc_object_keep_false ON gc_object (zoid)
+            WHERE keep = false;
+        CREATE INDEX gc_object_keep_true ON gc_object (visited)
+            WHERE keep = true;
+        """
+        cursor.execute(stmt)
+        # Without advisory locks, hold_pack_lock() locks this table instead.
+        if not self._pg_has_advisory_locks(cursor):
+            cursor.execute("CREATE TABLE gc_lock ()")
+
+
+    def prepare_schema(self):
+        """Create the database schema if it does not already exist."""
+        def callback(conn, cursor):
+            cursor.execute("""
+            SELECT tablename
+            FROM pg_tables
+            WHERE tablename = 'object_state'
+            """)
+            if not cursor.rowcount:  # no object_state table: schema missing
+                self.create_schema(cursor)
+        self._open_and_call(callback)
+
+    def zap_all(self):
+        """Clear all data out of the database."""
+        def callback(conn, cursor):
+            cursor.execute("""
+            DELETE FROM object_refs_added;
+            DELETE FROM object_ref;
+            DELETE FROM object_state;
+            ALTER SEQUENCE zoid_seq START WITH 1;
+            """)
+        self._open_and_call(callback)
+
+    def drop_all(self):
+        """Drop all tables and sequences."""
+        def callback(conn, cursor):
+            cursor.execute("SELECT tablename FROM pg_tables")
+            existent = set([name for (name,) in cursor])
+            for tablename in ('gc_object', 'object_refs_added',
+                    'object_ref', 'object_state', 'commit_lock', 'pack_lock'):
+                if tablename in existent:
+                    cursor.execute("DROP TABLE %s" % tablename)
+            cursor.execute("DROP SEQUENCE zoid_seq")
+        self._open_and_call(callback)
+
+    def open(self,
+            isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
+        """Open a connection at `isolation` level; return (conn, cursor)."""
+        try:
+            conn = psycopg2.connect(self._dsn)
+            conn.set_isolation_level(isolation)
+            cursor = conn.cursor()
+            cursor.arraysize = 64
+        except psycopg2.OperationalError, e:
+            log.warning("Unable to connect: %s", e)
+            raise  # logged above; the caller still sees the original error
+        return conn, cursor
+
+    def close(self, conn, cursor):
+        """Close cursor, then conn; None is skipped and the errors in
+        disconnected_exceptions are ignored."""
+        for obj in (cursor, conn):
+            if obj is not None:
+                try:
+                    obj.close()
+                except disconnected_exceptions:
+                    pass  # connection already broken; nothing more to do
+
+    def _pg_version(self, cursor):
+        """Return the (major, minor) version of PostgreSQL"""
+        cursor.execute("SELECT version()")
+        v = cursor.fetchone()[0]
+        m = re.search(r"([0-9]+)[.]([0-9]+)", v)
+        if m is None:
+            raise AssertionError("Unable to detect PostgreSQL version: " + v)
+        else:
+            return int(m.group(1)), int(m.group(2))
+
+    def _pg_has_advisory_locks(self, cursor):
+        """Return true if this version of PostgreSQL supports advisory locks"""
+        return self._pg_version(cursor) >= (8, 2)  # tuple compare: 8.2+
+
+    def open_for_load(self):
+        """Open and initialize a connection for loading objects.
+
+        Returns (conn, cursor).
+        """
+        conn, cursor = self.open(
+            psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+        stmt = """
+        PREPARE get_latest_tid AS
+        SELECT tid
+        FROM object_state
+        ORDER BY tid DESC
+        LIMIT 1
+        """
+        cursor.execute(stmt)
+        return conn, cursor
+
+    def restart_load(self, cursor):
+        """Reinitialize a load connection by rolling back its transaction."""
+        try:
+            cursor.connection.rollback()
+        except disconnected_exceptions, e:
+            raise StorageError(e)  # broken connection becomes StorageError
+
+    def get_object_count(self):
+        """Returns the number of objects in the database"""
+        # TODO: not implemented yet; 0 is a placeholder, not a real count.
+        return 0
+
+    def get_db_size(self):
+        """Returns the approximate size of the database in bytes"""
+        def callback(conn, cursor):
+            cursor.execute("SELECT pg_database_size(current_database())")
+            return cursor.fetchone()[0]
+        return self._open_and_call(callback)
+
+    def get_current_tid(self, cursor, oid):
+        """Returns the current integer tid for an object.
+
+        oid is an integer.  Returns None if object does not exist.
+        """
+        cursor.execute("""
+        SELECT tid
+        FROM object_state
+        WHERE zoid = %s
+        """, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        return None
+
+    def load_current(self, cursor, oid):
+        """Returns the current pickle and integer tid for an object.
+
+        oid is an integer.  Returns (None, None) if object does not exist.
+        """
+        cursor.execute("""
+        SELECT encode(state, 'base64'), tid
+        FROM object_state
+        WHERE zoid = %s
+        """, (oid,))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            state64, tid = cursor.fetchone()
+            if state64 is not None:
+                state = decodestring(state64)
+            else:
+                # This object's creation has been undone
+                state = None
+            return state, tid
+        else:
+            return None, None
+
+    def load_revision(self, cursor, oid, tid):
+        """Returns the pickle stored for an object at exactly transaction tid.
+
+        Returns None if no such state exists.
+        """
+        cursor.execute("""
+        SELECT encode(state, 'base64')
+        FROM object_state
+        WHERE zoid = %s
+            AND tid = %s
+        """, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            (state64,) = cursor.fetchone()
+            if state64 is not None:
+                return decodestring(state64)
+        return None  # no matching row, or the row's state is NULL
+
+    def exists(self, cursor, oid):
+        """Returns a true value if the given object exists."""
+        cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
+        return cursor.rowcount
+
+    def load_before(self, cursor, oid, tid):
+        """Returns the pickle and tid of an object before transaction tid.
+
+        Returns (None, None) if no earlier state exists.
+        """
+        cursor.execute("""
+        SELECT encode(state, 'base64'), tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid < %s
+        ORDER BY tid DESC
+        LIMIT 1
+        """, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            state64, tid = cursor.fetchone()
+            if state64 is not None:
+                state = decodestring(state64)
+            else:
+                # The object's creation has been undone
+                state = None
+            return state, tid
+        else:
+            return None, None
+
+    def get_object_tid_after(self, cursor, oid, tid):
+        """Returns the tid of the next change after an object revision.
+
+        Returns None if no later state exists.
+        """
+        stmt = """
+        SELECT tid
+        FROM object_state
+        WHERE zoid = %s
+            AND tid > %s
+        ORDER BY tid
+        LIMIT 1
+        """
+        cursor.execute(stmt, (oid, tid))
+        if cursor.rowcount:
+            assert cursor.rowcount == 1
+            return cursor.fetchone()[0]
+        else:
+            return None
+
+    def _make_temp_table(self, cursor):
+        """Create temp_store, the staging table; it is dropped on commit."""
+        stmt = """
+        CREATE TEMPORARY TABLE temp_store (
+            zoid        BIGINT NOT NULL,
+            prev_tid    BIGINT NOT NULL,
+            state       BYTEA
+        ) ON COMMIT DROP;
+        CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+        """
+        cursor.execute(stmt)
+
+    def open_for_store(self):
+        """Open and initialize a connection for storing objects.
+
+        Returns (conn, cursor).
+        """
+        conn, cursor = self.open()
+        try:
+            self._make_temp_table(cursor)
+            return conn, cursor
+        except:  # any failure: close the new connection, then re-raise
+            self.close(conn, cursor)
+            raise
+
+    def restart_store(self, cursor):
+        """Reuse a store connection: roll back, then recreate temp_store."""
+        try:
+            cursor.connection.rollback()
+            self._make_temp_table(cursor)
+        except disconnected_exceptions, e:
+            raise StorageError(e)  # broken connection becomes StorageError
+
+    def store_temp(self, cursor, oid, prev_tid, data):
+        """Store an object in the temporary table."""
+        stmt = """
+        DELETE FROM temp_store WHERE zoid = %s;
+        INSERT INTO temp_store (zoid, prev_tid, state)
+        VALUES (%s, %s, decode(%s, 'base64'))
+        """
+        cursor.execute(stmt, (oid, oid, prev_tid, encodestring(data)))
+
+    def replace_temp(self, cursor, oid, prev_tid, data):
+        """Replace an object in the temporary table."""
+        stmt = """
+        UPDATE temp_store SET
+            prev_tid = %s,
+            state = decode(%s, 'base64')
+        WHERE zoid = %s
+        """
+        cursor.execute(stmt, (prev_tid, encodestring(data), oid))
+
+    def restore(self, cursor, oid, tid, data):
+        """Store an object directly, without conflict detection.
+
+        Used for copying transactions into this database.
+        """
+        stmt = """
+        DELETE FROM object_state WHERE zoid = %s;
+        INSERT INTO object_state (zoid, tid, state)
+        VALUES (%s, %s, decode(%s, 'base64'))
+        """
+        if data is not None:  # a None state is handed to the query as-is
+            data = encodestring(data)
+        cursor.execute(stmt, (oid, oid, tid, data))
+
+    def start_commit(self, cursor):
+        """Prepare to commit by locking commit_lock and object_state."""
+        # Hold commit_lock to prevent concurrent commits
+        # (for as short a time as possible).
+        # Lock object_state in share mode to ensure
+        # conflict detection has the most current data.
+        cursor.execute("""
+        LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+        LOCK TABLE object_state IN SHARE MODE
+        """)
+
+    def get_tid_and_time(self, cursor):
+        """Returns the most recent tid and the current database time.
+
+        The database time is the number of seconds since the epoch.
+        """
+        cursor.execute("""
+        SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
+        FROM object_state
+        ORDER BY tid DESC
+        LIMIT 1
+        """)
+        if cursor.rowcount:
+            return cursor.fetchone()
+        else:  # object_state is empty
+            return 0, 0  # NOTE(review): time is 0 here, not the DB clock
+
+    def add_transaction(self, cursor, tid, username, description, extension,
+            packed=False):
+        """Add a transaction; a no-op here, every argument is ignored."""
+        pass
+
+    def detect_conflict(self, cursor):
+        """Find one conflict in the data about to be committed.
+
+        If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+        attempted_data).  If there is no conflict, returns None.
+        """
+        stmt = """
+        SELECT temp_store.zoid, object_state.tid, temp_store.prev_tid,
+            encode(temp_store.state, 'base64')
+        FROM temp_store
+            JOIN object_state ON (temp_store.zoid = object_state.zoid)
+        WHERE temp_store.prev_tid != object_state.tid
+        LIMIT 1
+        """
+        cursor.execute(stmt)
+        if cursor.rowcount:
+            oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
+            return oid, prev_tid, attempted_prev_tid, decodestring(data)
+        return None
+
+    def move_from_temp(self, cursor, tid):
+        """Move the temporarily stored objects to permanent storage.
+
+        Returns the list of oids stored.
+        """
+        stmt = """
+        DELETE FROM object_state
+        WHERE zoid IN (SELECT zoid FROM temp_store);
+
+        INSERT INTO object_state (zoid, tid, state)
+        SELECT zoid, %s, state
+        FROM temp_store
+        """
+        cursor.execute(stmt, (tid,))
+        # Report which oids were just written.
+        stmt = """
+        SELECT zoid FROM temp_store
+        """
+        cursor.execute(stmt)
+        return [oid for (oid,) in cursor]
+
+    def update_current(self, cursor, tid):
+        """Update the current object pointers.
+
+        tid is the integer tid of the transaction being committed.
+        """
+        pass  # no-op: create_schema() defines no current-object table
+
+    def set_min_oid(self, cursor, oid):
+        """Ensure the next OID is at least the given OID."""
+        cursor.execute("""
+        SELECT CASE WHEN %s > nextval('zoid_seq')
+            THEN setval('zoid_seq', %s)
+            ELSE 0
+            END
+        """, (oid, oid))  # note: nextval() itself advances zoid_seq
+
+    def commit_phase1(self, cursor, tid):
+        """Begin a commit.  Returns the transaction name.
+
+        This method should guarantee that commit_phase2() will succeed,
+        meaning that if commit_phase2() would raise any error, the error
+        should be raised in commit_phase1() instead.
+        """
+        return '-'  # constant placeholder; no database work is done here
+
+    def commit_phase2(self, cursor, txn):
+        """Final transaction commit."""
+        cursor.connection.commit()  # txn is not used
+
+    def abort(self, cursor, txn=None):
+        """Abort the commit.  If txn is not None, phase 1 is also aborted."""
+        cursor.connection.rollback()  # same rollback whether or not txn is set
+
+    def new_oid(self, cursor):
+        """Return a new, unused OID."""
+        stmt = "SELECT NEXTVAL('zoid_seq')"
+        cursor.execute(stmt)
+        return cursor.fetchone()[0]
+
+    def hold_pack_lock(self, cursor):
+        """Try to acquire the garbage collection lock.
+
+        Raises StorageError if the lock cannot be acquired.
+        """
+        if self._pg_has_advisory_locks(cursor):
+            cursor.execute("SELECT pg_try_advisory_lock(1)")
+            locked = cursor.fetchone()[0]
+            if not locked:
+                raise StorageError(
+                    'A garbage collection operation is in progress')
+        else:
+            # b/w compat with PostgreSQL 8.1: lock the gc_lock table instead
+            try:
+                cursor.execute("LOCK gc_lock IN EXCLUSIVE MODE NOWAIT")
+            except psycopg2.DatabaseError:  # any database error lands here
+                raise StorageError(
+                    'A garbage collection operation is in progress')
+
+    def release_pack_lock(self, cursor):
+        """Release the garbage collection lock taken by hold_pack_lock()."""
+        if self._pg_has_advisory_locks(cursor):
+            cursor.execute("SELECT pg_advisory_unlock(1)")
+        # else no action needed since the lock will be released at txn commit
+
+    _poll_query = "EXECUTE get_latest_tid"  # prepared in open_for_load()

Modified: relstorage/branches/packless/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py	2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/adapters/postgresql.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -398,8 +398,9 @@
         except disconnected_exceptions, e:
             raise StorageError(e)
 
-    def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+    def store_temp(self, cursor, oid, prev_tid, data):
         """Store an object in the temporary table."""
+        md5sum = self.md5sum(data)
         stmt = """
         DELETE FROM temp_store WHERE zoid = %s;
         INSERT INTO temp_store (zoid, prev_tid, md5, state)
@@ -407,8 +408,9 @@
         """
         cursor.execute(stmt, (oid, oid, prev_tid, md5sum, encodestring(data)))
 
-    def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+    def replace_temp(self, cursor, oid, prev_tid, data):
         """Replace an object in the temporary table."""
+        md5sum = self.md5sum(data)
         stmt = """
         UPDATE temp_store SET
             prev_tid = %s,
@@ -418,11 +420,12 @@
         """
         cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
 
-    def restore(self, cursor, oid, tid, md5sum, data):
+    def restore(self, cursor, oid, tid, data):
         """Store an object directly, without conflict detection.
 
         Used for copying transactions into this database.
         """
+        md5sum = self.md5sum(data)
         stmt = """
         INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
         VALUES (%s, %s,

Modified: relstorage/branches/packless/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py	2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/relstorage.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -44,12 +44,6 @@
         """Stub for versions of ZODB that do not define IMVCCStorage.
         """
 
-try:
-    from hashlib import md5
-except ImportError:
-    from md5 import new as md5
-
-
 log = logging.getLogger("relstorage")
 
 # Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
@@ -382,15 +376,15 @@
 
         self._lock_acquire()
         try:
-            if self._store_cursor is not None:
+            if not self._load_transaction_open:
+                self._restart_load()
+            state = self._adapter.load_revision(
+                self._load_cursor, oid_int, tid_int)
+            if state is None and self._store_cursor is not None:
                 # Allow loading data from later transactions
                 # for conflict resolution.
-                cursor = self._store_cursor
-            else:
-                if not self._load_transaction_open:
-                    self._restart_load()
-                cursor = self._load_cursor
-            state = self._adapter.load_revision(cursor, oid_int, tid_int)
+                state = self._adapter.load_revision(
+                    self._store_cursor, oid_int, tid_int)
         finally:
             self._lock_release()
 
@@ -451,7 +445,6 @@
         # attempting to store objects after the vote phase has finished.
         # That should not happen, should it?
         assert self._prepared_txn is None
-        md5sum = md5(data).hexdigest()
 
         adapter = self._adapter
         cursor = self._store_cursor
@@ -466,7 +459,7 @@
         try:
             self._max_stored_oid = max(self._max_stored_oid, oid_int)
             # save the data in a temporary table
-            adapter.store_temp(cursor, oid_int, prev_tid_int, md5sum, data)
+            adapter.store_temp(cursor, oid_int, prev_tid_int, data)
             return None
         finally:
             self._lock_release()
@@ -485,11 +478,6 @@
 
         assert self._tid is not None
         assert self._prepared_txn is None
-        if data is not None:
-            md5sum = md5(data).hexdigest()
-        else:
-            # George Bailey object
-            md5sum = None
 
         adapter = self._adapter
         cursor = self._store_cursor
@@ -500,8 +488,8 @@
         self._lock_acquire()
         try:
             self._max_stored_oid = max(self._max_stored_oid, oid_int)
-            # save the data.  Note that md5sum and data can be None.
-            adapter.restore(cursor, oid_int, tid_int, md5sum, data)
+            # save the data.  Note that data can be None.
+            adapter.restore(cursor, oid_int, tid_int, data)
         finally:
             self._lock_release()
 
@@ -618,9 +606,8 @@
             else:
                 # resolved
                 data = rdata
-                md5sum = md5(data).hexdigest()
                 self._adapter.replace_temp(
-                    cursor, oid_int, prev_tid_int, md5sum, data)
+                    cursor, oid_int, prev_tid_int, data)
                 resolved.add(oid)
 
         # Move the new states into the permanent table
@@ -910,6 +897,7 @@
         finally:
             lock_conn.rollback()
             adapter.close(lock_conn, lock_cursor)
+        self.sync()
 
 
     def iterator(self, start=None, stop=None):

Modified: relstorage/branches/packless/relstorage/tests/alltests.py
===================================================================
--- relstorage/trunk/relstorage/tests/alltests.py	2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/tests/alltests.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -18,10 +18,12 @@
 from testpostgresql import test_suite as postgresql_test_suite
 from testmysql import test_suite as mysql_test_suite
 from testoracle import test_suite as oracle_test_suite
+from testpacklesspostgresql import test_suite as packless_postgresql_test_suite
 
 def make_suite():
     suite = unittest.TestSuite()
     suite.addTest(postgresql_test_suite())
     suite.addTest(mysql_test_suite())
     suite.addTest(oracle_test_suite())
+    suite.addTest(packless_postgresql_test_suite())
     return suite

Modified: relstorage/branches/packless/relstorage/tests/comparison.ods
===================================================================
(Binary files differ)

Added: relstorage/branches/packless/relstorage/tests/packlesstestbase.py
===================================================================
--- relstorage/branches/packless/relstorage/tests/packlesstestbase.py	                        (rev 0)
+++ relstorage/branches/packless/relstorage/tests/packlesstestbase.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -0,0 +1,246 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""A foundation for packless relstorage adapter tests"""
+
+import cPickle
+import time
+
+from ZODB.tests.ConflictResolution import PCounter
+from ZODB.tests.PackableStorage import Root, dumps, pdumps, ZERO
+from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
+from ZODB.serialize import referencesf
+
+from relstorage.tests.reltestbase import GenericRelStorageTests
+from relstorage.tests.reltestbase import ToFileStorage
+from relstorage.tests.reltestbase import FromFileStorage
+
+
+class PacklessRelStorageTests(GenericRelStorageTests):
+    # override certain tests with a version that expects
+    # garbage collection, but not packing.
+    def checkPackAllRevisions(self):
+        self._initroot()
+        eq = self.assertEqual
+        raises = self.assertRaises
+        # Create a `persistent' object
+        obj = self._newobj()
+        oid = obj.getoid()
+        obj.value = 1
+        # Commit three different revisions
+        revid1 = self._dostoreNP(oid, data=pdumps(obj))
+        obj.value = 2
+        revid2 = self._dostoreNP(oid, revid=revid1, data=pdumps(obj))
+        obj.value = 3
+        revid3 = self._dostoreNP(oid, revid=revid2, data=pdumps(obj))
+        # Now make sure only the latest revision can be extracted
+        raises(KeyError, self._storage.loadSerial, oid, revid1)
+        raises(KeyError, self._storage.loadSerial, oid, revid2)
+        data = self._storage.loadSerial(oid, revid3)
+        pobj = cPickle.loads(data)
+        eq(pobj.getoid(), oid)
+        eq(pobj.value, 3)
+        # Now pack all transactions; spin until time.time() advances so
+        # that the pack time is greater than the last commit time.
+        now = packtime = time.time()
+        while packtime <= now:
+            packtime = time.time()
+        self._storage.pack(packtime, referencesf)
+        self._storage.sync()
+        # All revisions of the object should be gone, since there is no
+        # reference from the root object to this object.
+        raises(KeyError, self._storage.loadSerial, oid, revid1)
+        raises(KeyError, self._storage.loadSerial, oid, revid2)
+        raises(KeyError, self._storage.loadSerial, oid, revid3)
+        raises(KeyError, self._storage.load, oid, '')
+
+    def checkPackJustOldRevisions(self):
+        eq = self.assertEqual
+        raises = self.assertRaises
+        loads = self._makeloader()
+        # Create a root object.  This can't be an instance of Object,
+        # otherwise the pickling machinery will serialize it as a persistent
+        # id and not as an object that contains references (persistent ids) to
+        # other objects.
+        root = Root()
+        # Create a persistent object, with some initial state
+        obj = self._newobj()
+        oid = obj.getoid()
+        # Link the root object to the persistent object, in order to keep the
+        # persistent object alive.  Store the root object.
+        root.obj = obj
+        root.value = 0
+        revid0 = self._dostoreNP(ZERO, data=dumps(root))
+        # Make sure the root can be retrieved
+        data, revid = self._storage.load(ZERO, '')
+        eq(revid, revid0)
+        eq(loads(data).value, 0)
+        # Commit three different revisions of the other object
+        obj.value = 1
+        revid1 = self._dostoreNP(oid, data=pdumps(obj))
+        obj.value = 2
+        revid2 = self._dostoreNP(oid, revid=revid1, data=pdumps(obj))
+        obj.value = 3
+        revid3 = self._dostoreNP(oid, revid=revid2, data=pdumps(obj))
+        # Now make sure only the latest revision can be extracted
+        raises(KeyError, self._storage.loadSerial, oid, revid1)
+        raises(KeyError, self._storage.loadSerial, oid, revid2)
+        data = self._storage.loadSerial(oid, revid3)
+        pobj = cPickle.loads(data)
+        eq(pobj.getoid(), oid)
+        eq(pobj.value, 3)
+        # Now pack, after spinning until time.time() moves past `now`.  The
+        # object should stay alive because it's pointed to by the root.
+        now = packtime = time.time()
+        while packtime <= now:
+            packtime = time.time()
+        self._storage.pack(packtime, referencesf)
+        # Make sure the revisions are gone, but that object zero and revision
+        # 3 are still there and correct
+        data, revid = self._storage.load(ZERO, '')
+        eq(revid, revid0)
+        eq(loads(data).value, 0)
+        raises(KeyError, self._storage.loadSerial, oid, revid1)
+        raises(KeyError, self._storage.loadSerial, oid, revid2)
+        data = self._storage.loadSerial(oid, revid3)
+        pobj = cPickle.loads(data)
+        eq(pobj.getoid(), oid)
+        eq(pobj.value, 3)
+        data, revid = self._storage.load(oid, '')
+        eq(revid, revid3)
+        pobj = cPickle.loads(data)
+        eq(pobj.getoid(), oid)
+        eq(pobj.value, 3)
+
+    def checkPackOnlyOneObject(self):
+        eq = self.assertEqual
+        raises = self.assertRaises
+        loads = self._makeloader()
+        # Create a root object.  This can't be an instance of Object,
+        # otherwise the pickling machinery will serialize it as a persistent
+        # id and not as an object that contains references (persistent ids) to
+        # other objects.
+        root = Root()
+        # Create a persistent object, with some initial state
+        obj1 = self._newobj()
+        oid1 = obj1.getoid()
+        # Create another persistent object, with some initial state.
+        obj2 = self._newobj()
+        oid2 = obj2.getoid()
+        # Link the root object to the persistent objects, in order to keep
+        # them alive.  Store the root object.
+        root.obj1 = obj1
+        root.obj2 = obj2
+        root.value = 0
+        revid0 = self._dostoreNP(ZERO, data=dumps(root))
+        # Make sure the root can be retrieved
+        data, revid = self._storage.load(ZERO, '')
+        eq(revid, revid0)
+        eq(loads(data).value, 0)
+        # Commit three different revisions of the first object
+        obj1.value = 1
+        revid1 = self._dostoreNP(oid1, data=pdumps(obj1))
+        obj1.value = 2
+        revid2 = self._dostoreNP(oid1, revid=revid1, data=pdumps(obj1))
+        obj1.value = 3
+        revid3 = self._dostoreNP(oid1, revid=revid2, data=pdumps(obj1))
+        # Now make sure only the latest revision can be extracted
+        raises(KeyError, self._storage.loadSerial, oid1, revid1)
+        raises(KeyError, self._storage.loadSerial, oid1, revid2)
+        data = self._storage.loadSerial(oid1, revid3)
+        pobj = cPickle.loads(data)
+        eq(pobj.getoid(), oid1)
+        eq(pobj.value, 3)
+        # Now commit a revision of the second object
+        obj2.value = 11
+        revid4 = self._dostoreNP(oid2, data=pdumps(obj2))
+        # And make sure the revision can be extracted
+        data = self._storage.loadSerial(oid2, revid4)
+        pobj = cPickle.loads(data)
+        eq(pobj.getoid(), oid2)
+        eq(pobj.value, 11)
+        # Now pack (revisions 1 and 2 of object1 are already gone).  Object1's
+        # current revision should stay alive because it's pointed to by the
+        # root, as should Object2's current revision.
+        now = packtime = time.time()
+        while packtime <= now:
+            packtime = time.time()
+        self._storage.pack(packtime, referencesf)
+        # Make sure the revisions are gone, but that object zero, object2, and
+        # revision 3 of object1 are still there and correct.
+        data, revid = self._storage.load(ZERO, '')
+        eq(revid, revid0)
+        eq(loads(data).value, 0)
+        raises(KeyError, self._storage.loadSerial, oid1, revid1)
+        raises(KeyError, self._storage.loadSerial, oid1, revid2)
+        data = self._storage.loadSerial(oid1, revid3)
+        pobj = cPickle.loads(data)
+        eq(pobj.getoid(), oid1)
+        eq(pobj.value, 3)
+        data, revid = self._storage.load(oid1, '')
+        eq(revid, revid3)
+        pobj = cPickle.loads(data)
+        eq(pobj.getoid(), oid1)
+        eq(pobj.value, 3)
+        data, revid = self._storage.load(oid2, '')
+        eq(revid, revid4)
+        eq(loads(data).value, 11)
+        data = self._storage.loadSerial(oid2, revid4)
+        pobj = cPickle.loads(data)
+        eq(pobj.getoid(), oid2)
+        eq(pobj.value, 11)
+
+    def checkResolve(self):
+        obj = PCounter()
+        obj.inc()
+
+        oid = self._storage.new_oid()
+
+        revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))
+
+        obj.inc()
+        obj.inc()
+
+        # The effect of committing two transactions with the same
+        # pickle is to commit two different transactions relative to
+        # revid1 that add two to _value.
+
+        # open s1
+        s1 = self._storage.new_instance()
+        # start a load transaction in s1
+        s1.poll_invalidations()
+
+        # commit a change
+        revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
+
+        # commit a conflicting change using s1
+        main_storage = self._storage
+        self._storage = s1
+        try:
+            # we can resolve this conflict because s1 has an open
+            # transaction that can read the old state of the object.
+            revid3 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
+            s1.release()
+        finally:
+            self._storage = main_storage
+
+        data, serialno = self._storage.load(oid, '')
+        inst = zodb_unpickle(data)
+        self.assertEqual(inst._value, 5)
+
+
+class PacklessToFileStorage(ToFileStorage):
+    pass  # nothing overridden: behaves exactly like ToFileStorage
+
+class PacklessFromFileStorage(FromFileStorage):
+    pass  # nothing overridden: behaves exactly like FromFileStorage

Modified: relstorage/branches/packless/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2009-05-06 18:37:42 UTC (rev 99794)
+++ relstorage/branches/packless/relstorage/tests/reltestbase.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -36,7 +36,7 @@
 from ZODB.serialize import referencesf
 
 
-class BaseRelStorageTests(StorageTestBase.StorageTestBase):
+class RelStorageTestBase(StorageTestBase.StorageTestBase):
 
     def make_adapter(self):
         # abstract method
@@ -56,21 +56,15 @@
         self._storage.cleanup()
 
 
-class RelStorageTests(
-    BaseRelStorageTests,
+class GenericRelStorageTests(
+    RelStorageTestBase,
     BasicStorage.BasicStorage,
-    TransactionalUndoStorage.TransactionalUndoStorage,
-    RevisionStorage.RevisionStorage,
     PackableStorage.PackableStorage,
-    PackableStorage.PackableUndoStorage,
     Synchronization.SynchronizedStorage,
     ConflictResolution.ConflictResolvingStorage,
-    HistoryStorage.HistoryStorage,
-    IteratorStorage.IteratorStorage,
-    IteratorStorage.ExtendedIteratorStorage,
     PersistentStorage.PersistentStorage,
     MTStorage.MTStorage,
-    ReadOnlyStorage.ReadOnlyStorage
+    ReadOnlyStorage.ReadOnlyStorage,
     ):
 
     def checkDropAndPrepare(self):
@@ -360,7 +354,36 @@
         fakecache.data.clear()
         self.checkPollInterval(using_cache=True)
 
+    def checkDoubleCommitter(self):
+        # Verify we can store an object that gets committed twice in
+        # a single transaction.
+        db = DB(self._storage)
+        try:
+            conn = db.open()
+            try:
+                conn.root()['dc'] = DoubleCommitter()
+                transaction.commit()
+                conn2 = db.open()  # read back via a second connection
+                self.assertEqual(conn2.root()['dc'].new_attribute, 1)
+                conn2.close()
+            finally:
+                transaction.abort()  # end the transaction before closing
+                conn.close()
+        finally:
+            db.close()
 
+
+class RelStorageTests(
+    GenericRelStorageTests,
+    TransactionalUndoStorage.TransactionalUndoStorage,
+    IteratorStorage.IteratorStorage,
+    IteratorStorage.ExtendedIteratorStorage,
+    RevisionStorage.RevisionStorage,
+    PackableStorage.PackableUndoStorage,
+    HistoryStorage.HistoryStorage,
+    ConflictResolution.ConflictResolvingTransUndoStorage,
+    ):
+
     def checkTransactionalUndoIterator(self):
         # this test overrides the broken version in TransactionalUndoStorage.
 
@@ -567,24 +590,6 @@
         finally:
             db.close()
 
-    def checkDoubleCommitter(self):
-        # Verify we can store an object that gets committed twice in
-        # a single transaction.
-        db = DB(self._storage)
-        try:
-            conn = db.open()
-            try:
-                conn.root()['dc'] = DoubleCommitter()
-                transaction.commit()
-                conn2 = db.open()
-                self.assertEquals(conn2.root()['dc'].new_attribute, 1)
-                conn2.close()
-            finally:
-                transaction.abort()
-                conn.close()
-        finally:
-            db.close()
-        
 
 class DoubleCommitter(Persistent):
     """A crazy persistent class that changes self in __getstate__"""
@@ -647,7 +652,7 @@
         setattr(RecoveryStorageSubset, name, attr)
 
 
-class ToFileStorage(BaseRelStorageTests, RecoveryStorageSubset):
+class ToFileStorage(RelStorageTestBase, RecoveryStorageSubset):
     def setUp(self):
         self.open(create=1)
         self._storage.zap_all()
@@ -663,7 +668,7 @@
         return FileStorage('Dest.fs')
 
 
-class FromFileStorage(BaseRelStorageTests, RecoveryStorageSubset):
+class FromFileStorage(RelStorageTestBase, RecoveryStorageSubset):
     def setUp(self):
         self.open(create=1)
         self._storage.zap_all()

Added: relstorage/branches/packless/relstorage/tests/testpacklesspostgresql.py
===================================================================
--- relstorage/branches/packless/relstorage/tests/testpacklesspostgresql.py	                        (rev 0)
+++ relstorage/branches/packless/relstorage/tests/testpacklesspostgresql.py	2009-05-07 09:46:43 UTC (rev 99796)
@@ -0,0 +1,52 @@
+##############################################################################
+#
+# Copyright (c) 2008-2009 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Tests of relstorage.adapters.packless.postgresql"""
+
+import logging
+import unittest
+
+from relstorage.tests import reltestbase
+from relstorage.tests import packlesstestbase
+from relstorage.adapters.packless.postgresql import PacklessPostgreSQLAdapter
+
+
+class UsePacklessPostgreSQLAdapter:  # mixin for the test classes below
+    def make_adapter(self):  # hook left abstract by RelStorageTestBase
+        return PacklessPostgreSQLAdapter(
+        'dbname=relstoragetestp user=relstoragetest password=relstoragetest')
+
+class PacklessPostgreSQLTests(
+        UsePacklessPostgreSQLAdapter,
+        packlesstestbase.PacklessRelStorageTests):
+    """PacklessRelStorageTests run against the packless PostgreSQL adapter."""
+
+class PacklessPGToFile(
+        UsePacklessPostgreSQLAdapter, reltestbase.ToFileStorage):
+    """reltestbase.ToFileStorage run with the packless PostgreSQL adapter."""
+
+class PacklessFileToPG(
+        UsePacklessPostgreSQLAdapter, reltestbase.FromFileStorage):
+    """reltestbase.FromFileStorage run with the packless PostgreSQL adapter."""
+
+
+def test_suite():
+    # Gather the "check"-prefixed test methods of each test case class.
+    classes = (PacklessPostgreSQLTests, PacklessPGToFile, PacklessFileToPG)
+    suites = [unittest.makeSuite(klass, "check") for klass in classes]
+    return unittest.TestSuite(suites)
+
+if __name__=='__main__':
+    logging.basicConfig()  # install a default handler on the root logger
+    unittest.main(defaultTest="test_suite")  # run what test_suite() returns
+



More information about the Checkins mailing list