[Checkins] SVN: relstorage/trunk/ - Fixed a bug in packing history-free databases. If changes were

Shane Hathaway shane at hathawaymix.org
Fri Jan 28 21:49:42 EST 2011


Log message for revision 119996:
  - Fixed a bug in packing history-free databases.  If changes were
    made to the database during the pack, the pack code could delete
    too many objects.  Thanks to Chris Withers for writing test code
    that revealed the bug.
  

Changed:
  U   relstorage/trunk/CHANGES.txt
  U   relstorage/trunk/relstorage/adapters/packundo.py
  U   relstorage/trunk/relstorage/adapters/schema.py
  U   relstorage/trunk/relstorage/tests/reltestbase.py

-=-
Modified: relstorage/trunk/CHANGES.txt
===================================================================
--- relstorage/trunk/CHANGES.txt	2011-01-29 01:13:49 UTC (rev 119995)
+++ relstorage/trunk/CHANGES.txt	2011-01-29 02:49:42 UTC (rev 119996)
@@ -2,6 +2,11 @@
 Next Release
 ------------
 
+- Fixed a bug in packing history-free databases.  If changes were
+  made to the database during the pack, the pack code could delete
+  too many objects.  Thanks to Chris Withers for writing test code
+  that revealed the bug.
+
 - Added more logging during zodbconvert to show that something is
   happening and give an indication of how far along the process is.
 

Modified: relstorage/trunk/relstorage/adapters/packundo.py
===================================================================
--- relstorage/trunk/relstorage/adapters/packundo.py	2011-01-29 01:13:49 UTC (rev 119995)
+++ relstorage/trunk/relstorage/adapters/packundo.py	2011-01-29 02:49:42 UTC (rev 119996)
@@ -50,103 +50,6 @@
         finally:
             self.connmanager.close(conn, cursor)
 
-    def on_filling_object_refs(self):
-        """Test injection point"""
-
-    def fill_object_refs(self, conn, cursor, get_references):
-        """Update the object_refs table by analyzing new transactions."""
-        if self.keep_history:
-            stmt = """
-            SELECT transaction.tid
-            FROM transaction
-                LEFT JOIN object_refs_added
-                    ON (transaction.tid = object_refs_added.tid)
-            WHERE object_refs_added.tid IS NULL
-            ORDER BY transaction.tid
-            """
-        else:
-            stmt = """
-            SELECT transaction.tid
-            FROM (SELECT DISTINCT tid FROM object_state) transaction
-                LEFT JOIN object_refs_added
-                    ON (transaction.tid = object_refs_added.tid)
-            WHERE object_refs_added.tid IS NULL
-            ORDER BY transaction.tid
-            """
-
-        self.runner.run_script_stmt(cursor, stmt)
-        tids = [tid for (tid,) in cursor]
-        if tids:
-            self.on_filling_object_refs()
-            added = 0
-            log.info("discovering references from objects in %d "
-                "transaction(s)" % len(tids))
-            for tid in tids:
-                added += self._add_refs_for_tid(cursor, tid, get_references)
-                if added >= 10000:
-                    # save the work done so far
-                    conn.commit()
-                    added = 0
-            if added:
-                conn.commit()
-
-    def _add_refs_for_tid(self, cursor, tid, get_references):
-        """Fill object_refs with all states for a transaction.
-
-        Returns the number of references added.
-        """
-        log.debug("pre_pack: transaction %d: computing references ", tid)
-        from_count = 0
-
-        stmt = """
-        SELECT zoid, state
-        FROM object_state
-        WHERE tid = %(tid)s
-        """
-        self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
-
-        replace_rows = []
-        add_rows = []  # [(from_oid, tid, to_oid)]
-        for from_oid, state in cursor:
-            replace_rows.append((from_oid,))
-            if hasattr(state, 'read'):
-                # Oracle
-                state = state.read()
-            if state:
-                from_count += 1
-                try:
-                    to_oids = get_references(str(state))
-                except:
-                    log.error("pre_pack: can't unpickle "
-                        "object %d in transaction %d; state length = %d" % (
-                        from_oid, tid, len(state)))
-                    raise
-                for to_oid in to_oids:
-                    add_rows.append((from_oid, tid, to_oid))
-
-        if not self.keep_history:
-            stmt = "DELETE FROM object_ref WHERE zoid = %s"
-            self.runner.run_many(cursor, stmt, replace_rows)
-
-        stmt = """
-        INSERT INTO object_ref (zoid, tid, to_zoid)
-        VALUES (%s, %s, %s)
-        """
-        self.runner.run_many(cursor, stmt, add_rows)
-
-        # The references have been computed for this transaction.
-        stmt = """
-        INSERT INTO object_refs_added (tid)
-        VALUES (%(tid)s)
-        """
-        self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
-
-        to_count = len(add_rows)
-        log.debug("pre_pack: transaction %d: has %d reference(s) "
-            "from %d object(s)", tid, to_count, from_count)
-        return to_count
-
-
     def _visit_all_references(self, cursor):
         """Visit all references in pack_object and set the keep flags.
         """
@@ -428,6 +331,91 @@
 
         return res
 
+    def on_filling_object_refs(self):
+        """Test hook: runs after the transactions to analyze are listed."""
+
+    def fill_object_refs(self, conn, cursor, get_references):
+        """Update object_ref from transactions not yet in object_refs_added."""
+        stmt = """
+        SELECT transaction.tid
+        FROM transaction
+            LEFT JOIN object_refs_added
+                ON (transaction.tid = object_refs_added.tid)
+        WHERE object_refs_added.tid IS NULL
+        ORDER BY transaction.tid
+        """
+        self.runner.run_script_stmt(cursor, stmt)
+        tids = [tid for (tid,) in cursor]
+        if tids:
+            self.on_filling_object_refs()
+            added = 0
+            log.info("discovering references from objects in %d "
+                "transaction(s)", len(tids))
+            for tid in tids:
+                added += self._add_refs_for_tid(cursor, tid, get_references)
+                if added >= 10000:
+                    # Commit periodically to save the work done so far.
+                    conn.commit()
+                    added = 0
+            if added:
+                conn.commit()
+
+    def _add_refs_for_tid(self, cursor, tid, get_references):
+        """Fill object_ref with the references of all states in a transaction.
+
+        Returns the references added; also marks tid in object_refs_added.
+        """
+        log.debug("pre_pack: transaction %d: computing references ", tid)
+        from_count = 0
+
+        stmt = """
+        SELECT zoid, state
+        FROM object_state
+        WHERE tid = %(tid)s
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+
+        replace_rows = []
+        add_rows = []  # [(from_oid, tid, to_oid)]
+        for from_oid, state in cursor:
+            replace_rows.append((from_oid,))
+            if hasattr(state, 'read'):
+                # Oracle returns a file-like object; read() the pickle.
+                state = state.read()
+            if state:
+                from_count += 1
+                try:
+                    to_oids = get_references(str(state))
+                except:
+                    log.error("pre_pack: can't unpickle "
+                        "object %d in transaction %d; state length = %d" % (
+                        from_oid, tid, len(state)))
+                    raise
+                for to_oid in to_oids:
+                    add_rows.append((from_oid, tid, to_oid))
+
+        if not self.keep_history:
+            stmt = "DELETE FROM object_ref WHERE zoid = %s"
+            self.runner.run_many(cursor, stmt, replace_rows)
+
+        stmt = """
+        INSERT INTO object_ref (zoid, tid, to_zoid)
+        VALUES (%s, %s, %s)
+        """
+        self.runner.run_many(cursor, stmt, add_rows)
+
+        # The references have been computed for this transaction.
+        stmt = """
+        INSERT INTO object_refs_added (tid)
+        VALUES (%(tid)s)
+        """
+        self.runner.run_script_stmt(cursor, stmt, {'tid': tid})
+
+        to_count = len(add_rows)
+        log.debug("pre_pack: transaction %d: has %d reference(s) "
+            "from %d object(s)", tid, to_count, from_count)
+        return to_count
+
     def pre_pack(self, pack_tid, get_references, options):
         """Decide what to pack.
 
@@ -881,6 +869,88 @@
         """
         raise UndoError("Undo is not supported by this storage")
 
+    def on_filling_object_refs(self):
+        """Test hook: runs after the objects to analyze are listed."""
+
+    def fill_object_refs(self, conn, cursor, get_references):
+        """Update object_ref for object states not yet analyzed."""
+        stmt = """
+        SELECT object_state.zoid FROM object_state
+            LEFT JOIN object_refs_added USING (zoid)
+        WHERE object_refs_added.tid IS NULL
+            OR object_refs_added.tid != object_state.tid
+        ORDER BY object_state.zoid
+        """
+        self.runner.run_script_stmt(cursor, stmt)
+        oids = [oid for (oid,) in cursor]
+        if oids:
+            self.on_filling_object_refs()
+            added = 0
+            log.info("discovering references from %d objects", len(oids))
+            # Analyze in batches of 100 OIDs; every OID must be visited.
+            for start in range(0, len(oids), 100):
+                batch = oids[start:start + 100]
+                added += self._add_refs_for_oids(cursor, batch, get_references)
+                if added >= 10000:
+                    # Commit periodically to save the work done so far.
+                    conn.commit()
+                    added = 0
+            if added:
+                conn.commit()
+
+    def _add_refs_for_oids(self, cursor, oids, get_references):
+        """Re-analyze the current states of the objects in `oids`.
+
+        Returns the number of object_ref rows added (0 if no rows found).
+        """
+        # NOTE(review): oids are interpolated into SQL; assumes int zoids.
+        oid_list = ','.join(str(oid) for oid in oids)
+
+        stmt = """
+        SELECT zoid, tid, state
+        FROM object_state
+        WHERE zoid IN (%s)
+        """ % oid_list
+        self.runner.run_script_stmt(cursor, stmt)
+        rows = list(cursor)
+        if not rows:
+            return 0
+
+        add_objects = []
+        add_refs = []
+        for from_oid, tid, state in rows:
+            if hasattr(state, 'read'):
+                # Oracle returns a file-like object; read() the pickle.
+                state = state.read()
+            add_objects.append((from_oid, tid))
+            if state:
+                try:
+                    to_oids = get_references(str(state))
+                except:
+                    log.error("pre_pack: can't unpickle "
+                        "object %d in transaction %d; state length = %d" % (
+                        from_oid, tid, len(state)))
+                    raise
+                for to_oid in to_oids:
+                    add_refs.append((from_oid, tid, to_oid))
+
+        stmt = "DELETE FROM object_refs_added WHERE zoid IN (%s)" % oid_list
+        self.runner.run_script_stmt(cursor, stmt)
+        stmt = "DELETE FROM object_ref WHERE zoid IN (%s)" % oid_list
+        self.runner.run_script_stmt(cursor, stmt)
+
+        stmt = """
+        INSERT INTO object_ref (zoid, tid, to_zoid) VALUES (%s, %s, %s)
+        """
+        self.runner.run_many(cursor, stmt, add_refs)
+
+        stmt = """
+        INSERT INTO object_refs_added (zoid, tid) VALUES (%s, %s)
+        """
+        self.runner.run_many(cursor, stmt, add_objects)
+
+        return len(add_refs)
+
     def pre_pack(self, pack_tid, get_references, options):
         """Decide what the garbage collector should delete.
 
@@ -1022,9 +1092,10 @@
 
         stmt = """
         DELETE FROM object_refs_added
-        WHERE tid NOT IN (
-            SELECT DISTINCT tid
-            FROM object_state
+        WHERE zoid IN (
+            SELECT zoid
+            FROM pack_object
+            WHERE keep = %(FALSE)s
         );
 
         DELETE FROM object_ref

Modified: relstorage/trunk/relstorage/adapters/schema.py
===================================================================
--- relstorage/trunk/relstorage/adapters/schema.py	2011-01-29 01:13:49 UTC (rev 119995)
+++ relstorage/trunk/relstorage/adapters/schema.py	2011-01-29 02:49:42 UTC (rev 119996)
@@ -567,23 +567,25 @@
             PRIMARY KEY (zoid, to_zoid)
         );
 
-# The object_refs_added table tracks whether object_refs has been
-# populated for all states in a given transaction. An entry is added
-# only when the work is finished.
+# The object_refs_added table tracks which state of each object
+# has been analyzed for references to other objects.
 
     postgresql:
         CREATE TABLE object_refs_added (
-            tid         BIGINT NOT NULL PRIMARY KEY
+            zoid        BIGINT NOT NULL PRIMARY KEY,
+            tid         BIGINT NOT NULL
         );
 
     mysql:
         CREATE TABLE object_refs_added (
-            tid         BIGINT NOT NULL PRIMARY KEY
+            zoid        BIGINT NOT NULL PRIMARY KEY,
+            tid         BIGINT NOT NULL
         ) ENGINE = MyISAM;
 
     oracle:
         CREATE TABLE object_refs_added (
-            tid         NUMBER(20) NOT NULL PRIMARY KEY
+            zoid        NUMBER(20) NOT NULL PRIMARY KEY,
+            tid         NUMBER(20) NOT NULL
         );
 
 # pack_object contains temporary state during garbage collection: The

Modified: relstorage/trunk/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py	2011-01-29 01:13:49 UTC (rev 119995)
+++ relstorage/trunk/relstorage/tests/reltestbase.py	2011-01-29 02:49:42 UTC (rev 119996)
@@ -558,6 +558,8 @@
             expect_oids = [child._p_oid]
 
             def inject_changes():
+                # Change the database just after the list of objects
+                # to analyze has been determined.
                 child2 = PersistentMapping()
                 root['child2'] = child2
                 transaction.commit()
@@ -569,7 +571,7 @@
             self._storage.pack(packtime, referencesf)
 
             self.assertEqual(len(expect_oids), 2,
-                "The on_filling_object_refs hook was never called")
+                "The on_filling_object_refs hook should have been called once")
             # Both children should still exist.
             self._storage.load(expect_oids[0], '')
             self._storage.load(expect_oids[1], '')



More information about the checkins mailing list