[Checkins] SVN: relstorage/tags/ Tagged 1.0-beta1
Shane Hathaway
shane at hathawaymix.org
Fri Feb 22 04:40:07 EST 2008
Log message for revision 84133:
Tagged 1.0-beta1
Changed:
A relstorage/tags/
A relstorage/tags/1.0-beta1/
D relstorage/tags/1.0-beta1/CHANGELOG.txt
A relstorage/tags/1.0-beta1/CHANGELOG.txt
A relstorage/tags/1.0-beta1/MANIFEST
A relstorage/tags/1.0-beta1/MANIFEST.in
D relstorage/tags/1.0-beta1/README.txt
A relstorage/tags/1.0-beta1/README.txt
D relstorage/tags/1.0-beta1/notes/oracle_notes.txt
A relstorage/tags/1.0-beta1/notes/oracle_notes.txt
D relstorage/tags/1.0-beta1/relstorage/adapters/common.py
A relstorage/tags/1.0-beta1/relstorage/adapters/common.py
D relstorage/tags/1.0-beta1/relstorage/adapters/mysql.py
A relstorage/tags/1.0-beta1/relstorage/adapters/mysql.py
D relstorage/tags/1.0-beta1/relstorage/adapters/oracle.py
A relstorage/tags/1.0-beta1/relstorage/adapters/oracle.py
D relstorage/tags/1.0-beta1/relstorage/adapters/postgresql.py
A relstorage/tags/1.0-beta1/relstorage/adapters/postgresql.py
D relstorage/tags/1.0-beta1/relstorage/component.xml
A relstorage/tags/1.0-beta1/relstorage/component.xml
D relstorage/tags/1.0-beta1/relstorage/config.py
A relstorage/tags/1.0-beta1/relstorage/config.py
D relstorage/tags/1.0-beta1/relstorage/relstorage.py
A relstorage/tags/1.0-beta1/relstorage/relstorage.py
D relstorage/tags/1.0-beta1/relstorage/tests/reltestbase.py
A relstorage/tags/1.0-beta1/relstorage/tests/reltestbase.py
D relstorage/tags/1.0-beta1/relstorage/tests/speedtest.py
A relstorage/tags/1.0-beta1/relstorage/tests/speedtest.py
A relstorage/tags/1.0-beta1/setup.py
-=-
Copied: relstorage/tags/1.0-beta1 (from rev 84068, relstorage/trunk)
Deleted: relstorage/tags/1.0-beta1/CHANGELOG.txt
===================================================================
--- relstorage/trunk/CHANGELOG.txt 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/CHANGELOG.txt 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,105 +0,0 @@
-
-relstorage 0.9
-
-- Renamed to reflect expanding database support.
-
-- Support for Oracle added.
-
-- Major overhaul with many scalability and reliability improvements,
- particularly in the area of packing.
-
-- Moved to svn.zope.org and switched to ZPL 2.1 (required for projects
- on svn.zope.org.)
-
-- Made two-phase commit optional in both Oracle and PostgreSQL. They
- both use commit_lock in such a way that the commit is not likely to
- fail in the second phase.
-
-- Switched most database transaction isolation levels from serializable
- to read committed. It turns out that commit_lock already provides
- the serializability guarantees we need, so it is safe to take advantage
- of the potential speed gains. The one major exception is the load
- connection, which requires an unchanging view of the database.
-
-- Stored objects are now buffered in a database table rather than a file.
-
-- Stopped using the LISTEN and NOTIFY statements in PostgreSQL since
- they are not strictly transactional in the sense we require.
-
-- Started using a prepared statement in PostgreSQL for getting the
- newest transaction ID quickly.
-
-- Removed the code in the Oracle adapter for retrying connection attempts.
- (It is better to just reconfigure Oracle.)
-
-- Added support for MySQL. (Version 5.0 is probably the minimum.)
-
-- Added the poll_interval option. It reduces the frequency of database
- polls, but it also increases the potential for conflict errors on
- servers with high write volume.
-
-- Implemented the storage iterator protocol, making it possible to copy
- transactions to and from FileStorage and other RelStorage instances.
-
-
-PGStorage 0.4
-
-- Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut
- for invalidation polling.
-
-- Removed the commit_order code. The commit_order idea was intended to
- allow concurrent commits, but that idea is a little too ambitious while
- other more important ideas are being tested. Something like it may
- come later.
-
-- Improved connection management: only one database connection is
- held continuously open per storage instance.
-
-- Reconnect to the database automatically.
-
-- Removed test mode.
-
-- Switched from using a ZODB.Connection subclass to a ZODB patch. The
- Connection class changes in subtle ways too often to subclass reliably;
- a patch is much safer.
-
-- PostgreSQL 8.1 is now a dependency because PGStorage uses two phase commit.
-
-- Fixed an undo bug. Symptom: attempting to examine the undo log revealed
- broken pickles. Cause: the extension field was not being wrapped in
- psycopg2.Binary upon insert. Solution: used psycopg2.Binary.
- Unfortunately, this doesn't fix existing transactions people have
- committed. If anyone has any data to keep, fixing the old transactions
- should be easy.
-
-- Moved from a private CVS repository to Sourceforge.
- See http://pgstorage.sourceforge.net . Also switched to the MIT license.
-
-- David Pratt added a basic getSize() implementation so that the Zope
- management interface displays an estimate of the size of the database.
-
-- Turned PGStorage into a top-level package. Python generally makes
- top-level packages easier to install.
-
-
-PGStorage 0.3
-
-- Made compatible with Zope 3, although an undo bug apparently remains.
-
-
-PGStorage 0.2
-
-- Fixed concurrent commits, which were generating deadlocks. Fixed by
- adding a special table, "commit_lock", which is used for
- synchronizing increments of commit_seq (but only at final commit.)
- If you are upgrading from version 0.1, you need to change your
- database using the 'psql' prompt:
-
- create table commit_lock ();
-
-- Added speed tests and an OpenDocument spreadsheet comparing
- FileStorage / ZEO with PGStorage. PGStorage wins at reading objects
- and writing a lot of small transactions, while FileStorage / ZEO
- wins at writing big transactions. Interestingly, they tie when
- writing a RAM disk.
-
Copied: relstorage/tags/1.0-beta1/CHANGELOG.txt (from rev 84130, relstorage/trunk/CHANGELOG.txt)
===================================================================
--- relstorage/tags/1.0-beta1/CHANGELOG.txt (rev 0)
+++ relstorage/tags/1.0-beta1/CHANGELOG.txt 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,113 @@
+
+RelStorage 1.0 beta
+
+- Renamed to reflect expanding database support.
+
+- Added support for Oracle 10g.
+
+- Major overhaul with many scalability and reliability improvements,
+ particularly in the area of packing.
+
+- Moved to svn.zope.org and switched to ZPL 2.1.
+
+- Made two-phase commit optional in both Oracle and PostgreSQL. They
+ both use commit_lock in such a way that the commit is not likely to
+ fail in the second phase.
+
+- Switched most database transaction isolation levels from serializable
+ to read committed. It turns out that commit_lock already provides
+ the serializability guarantees we need, so it is safe to take advantage
+ of the potential speed gains. The one major exception is the load
+ connection, which requires an unchanging view of the database.
+
+- Stored objects are now buffered in a database table rather than a file.
+
+- Stopped using the LISTEN and NOTIFY statements in PostgreSQL since
+ they are not strictly transactional in the sense we require.
+
+- Started using a prepared statement in PostgreSQL for getting the
+ newest transaction ID quickly.
+
+- Removed the code in the Oracle adapter for retrying connection attempts.
+ (It is better to just reconfigure Oracle.)
+
+- Added support for MySQL 5.0.
+
+- Added the poll_interval option. It reduces the frequency of database
+ polls, but it also increases the potential for conflict errors on
+ servers with high write volume.
+
+- Implemented the storage iterator protocol, making it possible to copy
+ transactions to and from FileStorage and other RelStorage instances.
+
+- Fixed a bug that caused OIDs to be reused after importing transactions.
+ Added a corresponding test.
+
+- Made it possible to disable garbage collection during packing.
+ Exposed the option in zope.conf.
+
+- Valery Suhomlinov discovered a problem with non-ASCII data in transaction
+ metadata. The problem has been fixed for all supported databases.
+
+
+PGStorage 0.4
+
+- Began using the PostgreSQL LISTEN and NOTIFY statements as a shortcut
+ for invalidation polling.
+
+- Removed the commit_order code. The commit_order idea was intended to
+ allow concurrent commits, but that idea is a little too ambitious while
+ other more important ideas are being tested. Something like it may
+ come later.
+
+- Improved connection management: only one database connection is
+ held continuously open per storage instance.
+
+- Reconnect to the database automatically.
+
+- Removed test mode.
+
+- Switched from using a ZODB.Connection subclass to a ZODB patch. The
+ Connection class changes in subtle ways too often to subclass reliably;
+ a patch is much safer.
+
+- PostgreSQL 8.1 is now a dependency because PGStorage uses two phase commit.
+
+- Fixed an undo bug. Symptom: attempting to examine the undo log revealed
+ broken pickles. Cause: the extension field was not being wrapped in
+ psycopg2.Binary upon insert. Solution: used psycopg2.Binary.
+ Unfortunately, this doesn't fix existing transactions people have
+ committed. If anyone has any data to keep, fixing the old transactions
+ should be easy.
+
+- Moved from a private CVS repository to Sourceforge.
+ See http://pgstorage.sourceforge.net . Also switched to the MIT license.
+
+- David Pratt added a basic getSize() implementation so that the Zope
+ management interface displays an estimate of the size of the database.
+
+- Turned PGStorage into a top-level package. Python generally makes
+ top-level packages easier to install.
+
+
+PGStorage 0.3
+
+- Made compatible with Zope 3, although an undo bug apparently remains.
+
+
+PGStorage 0.2
+
+- Fixed concurrent commits, which were generating deadlocks. Fixed by
+ adding a special table, "commit_lock", which is used for
+ synchronizing increments of commit_seq (but only at final commit.)
+ If you are upgrading from version 0.1, you need to change your
+ database using the 'psql' prompt:
+
+ create table commit_lock ();
+
+- Added speed tests and an OpenDocument spreadsheet comparing
+ FileStorage / ZEO with PGStorage. PGStorage wins at reading objects
+ and writing a lot of small transactions, while FileStorage / ZEO
+ wins at writing big transactions. Interestingly, they tie when
+ writing a RAM disk.
+
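The changelog above notes that the poll_interval option "reduces the frequency of database polls, but it also increases the potential for conflict errors." The trade-off can be sketched as a simple time-based throttle. This is an illustrative sketch only, not RelStorage's actual polling code; the class and method names here are hypothetical:

```python
import time

class PollThrottle:
    """Skip invalidation polls that arrive sooner than poll_interval
    seconds after the previous poll. Illustrative names only; this is
    not the RelStorage implementation."""

    def __init__(self, poll_interval):
        self.poll_interval = poll_interval
        self._last_poll = 0.0

    def should_poll(self, now=None):
        if now is None:
            now = time.time()
        if now - self._last_poll >= self.poll_interval:
            self._last_poll = now
            return True
        return False

throttle = PollThrottle(poll_interval=5)
print(throttle.should_poll(now=100.0))  # first poll always proceeds: True
print(throttle.should_poll(now=102.0))  # too soon, skipped: False
print(throttle.should_poll(now=106.0))  # interval elapsed: True
```

Every skipped poll widens the window in which a connection works from stale data, which is why a larger interval raises the chance of conflict errors on write-heavy servers.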
Copied: relstorage/tags/1.0-beta1/MANIFEST (from rev 84130, relstorage/trunk/MANIFEST)
===================================================================
--- relstorage/tags/1.0-beta1/MANIFEST (rev 0)
+++ relstorage/tags/1.0-beta1/MANIFEST 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,21 @@
+CHANGELOG.txt
+MANIFEST.in
+README.txt
+poll-invalidation-1-zodb-3-7-1.patch
+poll-invalidation-1-zodb-3-8-0.patch
+setup.py
+relstorage/__init__.py
+relstorage/component.xml
+relstorage/config.py
+relstorage/relstorage.py
+relstorage/adapters/__init__.py
+relstorage/adapters/common.py
+relstorage/adapters/mysql.py
+relstorage/adapters/oracle.py
+relstorage/adapters/postgresql.py
+relstorage/tests/__init__.py
+relstorage/tests/reltestbase.py
+relstorage/tests/speedtest.py
+relstorage/tests/testmysql.py
+relstorage/tests/testoracle.py
+relstorage/tests/testpostgresql.py
Copied: relstorage/tags/1.0-beta1/MANIFEST.in (from rev 84130, relstorage/trunk/MANIFEST.in)
===================================================================
--- relstorage/tags/1.0-beta1/MANIFEST.in (rev 0)
+++ relstorage/tags/1.0-beta1/MANIFEST.in 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,2 @@
+include MANIFEST.in *.txt *.py *.patch
+recursive-include relstorage *.py *.xml
Deleted: relstorage/tags/1.0-beta1/README.txt
===================================================================
--- relstorage/trunk/README.txt 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/README.txt 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,43 +0,0 @@
-
-To make Zope store in RelStorage, first patch ZODB/Connection.py using the
-provided patch. The patch is for Zope 2.10.5. Then modify etc/zope.conf.
-
-
-For PostgreSQL, use this in etc/zope.conf:
-
-%import relstorage
-<zodb_db main>
- mount-point /
- <relstorage>
- <postgresql>
- </postgresql>
- </relstorage>
-</zodb_db>
-
-
-For Oracle, use this in etc/zope.conf:
-
-%import relstorage
-<zodb_db main>
- mount-point /
- <relstorage>
- <oracle>
- user johndoe
- password opensesame
- dsn XE
- </oracle>
- </relstorage>
-</zodb_db>
-
-
-For MySQL, use this in etc/zope.conf:
-
-%import relstorage
-<zodb_db main>
- <relstorage>
- <mysql>
- db zodb
- </mysql>
- </relstorage>
- mount-point /
-</zodb_db>
Copied: relstorage/tags/1.0-beta1/README.txt (from rev 84130, relstorage/trunk/README.txt)
===================================================================
--- relstorage/tags/1.0-beta1/README.txt (rev 0)
+++ relstorage/tags/1.0-beta1/README.txt 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,45 @@
+
+See http://wiki.zope.org/ZODB/RelStorage .
+
+To make Zope store in RelStorage, first patch ZODB using the provided
+patch. The patch is for Zope 2.10.5. Then modify etc/zope.conf.
+
+
+For PostgreSQL, use this in etc/zope.conf:
+
+%import relstorage
+<zodb_db main>
+ mount-point /
+ <relstorage>
+ <postgresql>
+ </postgresql>
+ </relstorage>
+</zodb_db>
+
+
+For Oracle, use this in etc/zope.conf:
+
+%import relstorage
+<zodb_db main>
+ mount-point /
+ <relstorage>
+ <oracle>
+ user johndoe
+ password opensesame
+ dsn XE
+ </oracle>
+ </relstorage>
+</zodb_db>
+
+
+For MySQL, use this in etc/zope.conf:
+
+%import relstorage
+<zodb_db main>
+ <relstorage>
+ <mysql>
+ db zodb
+ </mysql>
+ </relstorage>
+ mount-point /
+</zodb_db>
Deleted: relstorage/tags/1.0-beta1/notes/oracle_notes.txt
===================================================================
--- relstorage/trunk/notes/oracle_notes.txt 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/notes/oracle_notes.txt 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,16 +0,0 @@
-
-Docs:
- http://www.oracle.com/pls/db102/homepage
-
-Excellent setup instructions:
- http://www.davidpashley.com/articles/oracle-install.html
-
-Work around session limit (fixes ORA-12520):
- ALTER SYSTEM SET PROCESSES=150 SCOPE=SPFILE
- ALTER SYSTEM SET SESSIONS=150 SCOPE=SPFILE
- (then restart Oracle)
-
-Manually rollback an in-dispute transaction:
- select local_tran_id, state from DBA_2PC_PENDING;
- rollback force '$local_tran_id';
-
Copied: relstorage/tags/1.0-beta1/notes/oracle_notes.txt (from rev 84129, relstorage/trunk/notes/oracle_notes.txt)
===================================================================
--- relstorage/tags/1.0-beta1/notes/oracle_notes.txt (rev 0)
+++ relstorage/tags/1.0-beta1/notes/oracle_notes.txt 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,27 @@
+
+If you are using Oracle 10g XE, use the "universal" version,
+since the smaller Western European version has minimal support
+for Unicode and will not pass all of the RelStorage tests.
+
+Docs:
+ http://www.oracle.com/pls/db102/homepage
+
+Excellent setup instructions:
+ http://www.davidpashley.com/articles/oracle-install.html
+
+Work around session limit (fixes ORA-12520):
+ ALTER SYSTEM SET PROCESSES=150 SCOPE=SPFILE;
+ ALTER SYSTEM SET SESSIONS=150 SCOPE=SPFILE;
+ (then restart Oracle)
+
+Manually rollback an in-dispute transaction:
+ select local_tran_id, state from DBA_2PC_PENDING;
+ rollback force '$local_tran_id';
+
+It might be necessary to add the following lines to adapters/oracle.py,
+before all imports, to solve Oracle encoding issues. (Let me know
+if you have to do this!)
+
+ import os
+ os.environ["NLS_LANG"] = ".AL32UTF8"
+
Deleted: relstorage/tags/1.0-beta1/relstorage/adapters/common.py
===================================================================
--- relstorage/trunk/relstorage/adapters/common.py 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/relstorage/adapters/common.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,655 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Code common to most adapters."""
-
-from ZODB.POSException import UndoError
-
-import logging
-
-log = logging.getLogger("relstorage.adapters.common")
-
-
-class Adapter(object):
- """Common code for a database adapter.
-
- This is an abstract class; a lot of methods are expected to be
- provided by subclasses.
- """
-
- # _script_vars contains replacements for statements in scripts.
- # These are correct for PostgreSQL and MySQL but not for Oracle.
- _script_vars = {
- 'TRUE': 'TRUE',
- 'FALSE': 'FALSE',
- 'OCTET_LENGTH': 'OCTET_LENGTH',
- 'oid': '%(oid)s',
- 'tid': '%(tid)s',
- 'pack_tid': '%(pack_tid)s',
- 'undo_tid': '%(undo_tid)s',
- 'self_tid': '%(self_tid)s',
- 'min_tid': '%(min_tid)s',
- 'max_tid': '%(max_tid)s',
- }
-
- _scripts = {
- 'select_keep_tid': """
- SELECT tid
- FROM object_state
- WHERE zoid = pack_object.zoid
- AND tid > 0
- AND tid <= %(pack_tid)s
- ORDER BY tid DESC
- LIMIT 1
- """,
-
- 'choose_pack_transaction': """
- SELECT tid
- FROM transaction
- WHERE tid > 0
- AND tid <= %(tid)s
- AND packed = FALSE
- ORDER BY tid DESC
- LIMIT 1
- """,
-
- 'create_temp_pack_visit': """
- CREATE TEMPORARY TABLE temp_pack_visit (
- zoid BIGINT NOT NULL
- );
- CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
- """,
-
- 'create_temp_undo': """
- CREATE TEMPORARY TABLE temp_undo (
- zoid BIGINT NOT NULL,
- prev_tid BIGINT NOT NULL
- );
- CREATE UNIQUE INDEX temp_undo_zoid ON temp_undo (zoid)
- """,
-
- 'reset_temp_undo': "DROP TABLE temp_undo",
- }
-
-
- def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
- """Execute a statement from a script with the given parameters.
-
- Subclasses may override this.
- The input statement is generic and needs to be transformed
- into a database-specific statement.
- """
- stmt = generic_stmt % self._script_vars
- try:
- cursor.execute(stmt, generic_params)
- except:
- log.warning("script statement failed: %r; parameters: %r",
- stmt, generic_params)
- raise
-
-
- def _run_script(self, cursor, script, params=()):
- """Execute a series of statements in the database.
-
- The statements are transformed by _run_script_stmt
- before execution.
- """
- lines = []
- for line in script.split('\n'):
- line = line.strip()
- if not line or line.startswith('--'):
- continue
- if line.endswith(';'):
- line = line[:-1]
- lines.append(line)
- stmt = '\n'.join(lines)
- self._run_script_stmt(cursor, stmt, params)
- lines = []
- else:
- lines.append(line)
- if lines:
- stmt = '\n'.join(lines)
- self._run_script_stmt(cursor, stmt, params)
-
-
- def iter_transactions(self, cursor):
- """Iterate over the transaction log, newest first.
-
- Skips packed transactions.
- Yields (tid, username, description, extension) for each transaction.
- """
- stmt = """
- SELECT tid, username, description, extension
- FROM transaction
- WHERE packed = %(FALSE)s
- AND tid != 0
- ORDER BY tid DESC
- """
- self._run_script_stmt(cursor, stmt)
- return iter(cursor)
-
-
- def iter_transactions_range(self, cursor, start=None, stop=None):
- """Iterate over the transactions in the given range, oldest first.
-
- Includes packed transactions.
- Yields (tid, packed, username, description, extension)
- for each transaction.
- """
- stmt = """
- SELECT tid,
- CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END,
- username, description, extension
- FROM transaction
- WHERE tid >= 0
- """
- if start is not None:
- stmt += " AND tid >= %(min_tid)s"
- if stop is not None:
- stmt += " AND tid <= %(max_tid)s"
- stmt += " ORDER BY tid"
- self._run_script_stmt(cursor, stmt,
- {'min_tid': start, 'max_tid': stop})
- return iter(cursor)
-
-
- def iter_object_history(self, cursor, oid):
- """Iterate over an object's history.
-
- Raises KeyError if the object does not exist.
- Yields (tid, username, description, extension, pickle_size)
- for each modification.
- """
- stmt = """
- SELECT 1 FROM current_object WHERE zoid = %(oid)s
- """
- self._run_script_stmt(cursor, stmt, {'oid': oid})
- if not cursor.fetchall():
- raise KeyError(oid)
-
- stmt = """
- SELECT tid, username, description, extension, %(OCTET_LENGTH)s(state)
- FROM transaction
- JOIN object_state USING (tid)
- WHERE zoid = %(oid)s
- AND packed = %(FALSE)s
- ORDER BY tid DESC
- """
- self._run_script_stmt(cursor, stmt, {'oid': oid})
- return iter(cursor)
-
-
- def iter_objects(self, cursor, tid):
- """Iterate over object states in a transaction.
-
- Yields (oid, prev_tid, state) for each object state.
- """
- stmt = """
- SELECT zoid, state
- FROM object_state
- WHERE tid = %(tid)s
- ORDER BY zoid
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
- for oid, state in cursor:
- if hasattr(state, 'read'):
- # Oracle
- state = state.read()
- yield oid, state
-
-
- def verify_undoable(self, cursor, undo_tid):
- """Raise UndoError if it is not safe to undo the specified txn."""
- stmt = """
- SELECT 1 FROM transaction
- WHERE tid = %(undo_tid)s
- AND packed = %(FALSE)s
- """
- self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
- if not cursor.fetchall():
- raise UndoError("Transaction not found or packed")
-
- # Rule: we can undo an object if the object's state in the
- # transaction to undo matches the object's current state.
- # If any object in the transaction does not fit that rule,
- # refuse to undo.
- stmt = """
- SELECT prev_os.zoid, current_object.tid
- FROM object_state prev_os
- JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
- JOIN current_object ON (cur_os.zoid = current_object.zoid
- AND cur_os.tid = current_object.tid)
- WHERE prev_os.tid = %(undo_tid)s
- AND cur_os.md5 != prev_os.md5
- """
- self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
- if cursor.fetchmany():
- raise UndoError(
- "Some data were modified by a later transaction")
-
- # Rule: don't allow the creation of the root object to
- # be undone. It's hard to get it back.
- stmt = """
- SELECT 1
- FROM object_state
- WHERE tid = %(undo_tid)s
- AND zoid = 0
- AND prev_tid = 0
- """
- self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
- if cursor.fetchall():
- raise UndoError("Can't undo the creation of the root object")
-
-
- def undo(self, cursor, undo_tid, self_tid):
- """Undo a transaction.
-
- Parameters: "undo_tid", the integer tid of the transaction to undo,
- and "self_tid", the integer tid of the current transaction.
-
- Returns the list of OIDs undone.
- """
- stmt = self._scripts['create_temp_undo']
- if stmt:
- self._run_script(cursor, stmt)
-
- stmt = """
- DELETE FROM temp_undo;
-
- -- Put into temp_undo the list of objects to be undone and
- -- the tid of the transaction that has the undone state.
- INSERT INTO temp_undo (zoid, prev_tid)
- SELECT zoid, prev_tid
- FROM object_state
- WHERE tid = %(undo_tid)s;
-
- -- Override previous undo operations within this transaction
- -- by resetting the current_object pointer and deleting
- -- copied states from object_state.
- UPDATE current_object
- SET tid = (
- SELECT prev_tid
- FROM object_state
- WHERE zoid = current_object.zoid
- AND tid = %(self_tid)s
- )
- WHERE zoid IN (SELECT zoid FROM temp_undo)
- AND tid = %(self_tid)s;
-
- DELETE FROM object_state
- WHERE zoid IN (SELECT zoid FROM temp_undo)
- AND tid = %(self_tid)s;
-
- -- Add new undo records.
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
- prev.md5, prev.state
- FROM temp_undo
- JOIN current_object ON (temp_undo.zoid = current_object.zoid)
- LEFT JOIN object_state prev
- ON (prev.zoid = temp_undo.zoid
- AND prev.tid = temp_undo.prev_tid);
-
- -- List the changed OIDs.
- SELECT zoid FROM temp_undo
- """
- self._run_script(cursor, stmt,
- {'undo_tid': undo_tid, 'self_tid': self_tid})
- res = [oid for (oid,) in cursor]
-
- stmt = self._scripts['reset_temp_undo']
- if stmt:
- self._run_script(cursor, stmt)
-
- return res
-
-
- def choose_pack_transaction(self, pack_point):
- """Return the transaction before or at the specified pack time.
-
- Returns None if there is nothing to pack.
- """
- conn, cursor = self.open()
- try:
- stmt = self._scripts['choose_pack_transaction']
- self._run_script(cursor, stmt, {'tid': pack_point})
- rows = cursor.fetchall()
- if not rows:
- # Nothing needs to be packed.
- return None
- return rows[0][0]
- finally:
- self.close(conn, cursor)
-
-
- def pre_pack(self, pack_tid, get_references, gc=True):
- """Decide what to pack.
-
- Subclasses may override this.
-
- tid specifies the most recent transaction to pack.
-
- get_references is a function that accepts a pickled state and
- returns a set of OIDs that state refers to.
-
- gc is a boolean indicating whether to run garbage collection.
- If gc is false, at least one revision of every object is kept,
- even if nothing refers to it. Packing with gc disabled can be
- much faster.
- """
- conn, cursor = self.open()
- try:
- try:
- if gc:
- self._pre_pack_with_gc(cursor, pack_tid, get_references)
- else:
- self._pre_pack_without_gc(cursor, pack_tid)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def _pre_pack_without_gc(self, cursor, pack_tid):
- """Determine what to pack, without garbage collection.
-
- With garbage collection disabled, there is no need to follow
- object references.
- """
- # Fill the pack_object table with OIDs, but configure them
- # all to be kept by setting keep and keep_tid.
- stmt = """
- DELETE FROM pack_object;
-
- INSERT INTO pack_object (zoid, keep)
- SELECT DISTINCT zoid, %(TRUE)s
- FROM object_state
- WHERE tid <= %(pack_tid)s;
-
- UPDATE pack_object SET keep_tid = (@select_keep_tid@)
- """
- stmt = stmt.replace(
- '@select_keep_tid@', self._scripts['select_keep_tid'])
- self._run_script(cursor, stmt, {'pack_tid': pack_tid})
-
-
- def _pre_pack_with_gc(self, cursor, pack_tid, get_references):
- """Determine what to pack, with garbage collection.
- """
- # Fill object_ref with references from object states
- # in transactions that will not be packed.
- self._fill_nonpacked_refs(cursor, pack_tid, get_references)
-
- # Fill the pack_object table with OIDs that either will be
- # removed (if nothing references the OID) or whose history will
- # be cut.
- stmt = """
- DELETE FROM pack_object;
-
- INSERT INTO pack_object (zoid, keep)
- SELECT DISTINCT zoid, %(FALSE)s
- FROM object_state
- WHERE tid <= %(pack_tid)s;
-
- -- If the root object is in pack_object, keep it.
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE zoid = 0;
-
- -- Keep objects that have been revised since pack_tid.
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE keep = %(FALSE)s
- AND zoid IN (
- SELECT zoid
- FROM current_object
- WHERE tid > %(pack_tid)s
- );
-
- -- Keep objects that are still referenced by object states in
- -- transactions that will not be packed.
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE keep = %(FALSE)s
- AND zoid IN (
- SELECT to_zoid
- FROM object_ref
- WHERE tid > %(pack_tid)s
- );
- """
- self._run_script(cursor, stmt, {'pack_tid': pack_tid})
-
- stmt = self._scripts['create_temp_pack_visit']
- if stmt:
- self._run_script(cursor, stmt)
-
- # Each of the packable objects to be kept might
- # refer to other objects. If some of those references
- # include objects currently set to be removed, keep
- # those objects as well. Do this
- # repeatedly until all references have been satisfied.
- while True:
-
- # Make a list of all parent objects that still need
- # to be visited. Then set keep_tid for all pack_object
- # rows with keep = true.
- # keep_tid must be set before _fill_pack_object_refs examines
- # references.
- stmt = """
- DELETE FROM temp_pack_visit;
-
- INSERT INTO temp_pack_visit (zoid)
- SELECT zoid
- FROM pack_object
- WHERE keep = %(TRUE)s
- AND keep_tid IS NULL;
-
- UPDATE pack_object SET keep_tid = (@select_keep_tid@)
- WHERE keep = %(TRUE)s AND keep_tid IS NULL
- """
- stmt = stmt.replace(
- '@select_keep_tid@', self._scripts['select_keep_tid'])
- self._run_script(cursor, stmt, {'pack_tid': pack_tid})
-
- self._fill_pack_object_refs(cursor, get_references)
-
- # Visit the children of all parent objects that were
- # just visited.
- stmt = """
- UPDATE pack_object SET keep = %(TRUE)s
- WHERE keep = %(FALSE)s
- AND zoid IN (
- SELECT DISTINCT to_zoid
- FROM object_ref
- JOIN temp_pack_visit USING (zoid)
- )
- """
- self._run_script_stmt(cursor, stmt)
- if not cursor.rowcount:
- # No new references detected.
- break
-
-
- def _fill_nonpacked_refs(self, cursor, pack_tid, get_references):
- """Fill object_ref for all transactions that will not be packed."""
- stmt = """
- SELECT DISTINCT tid
- FROM object_state
- WHERE tid > %(pack_tid)s
- AND NOT EXISTS (
- SELECT 1
- FROM object_refs_added
- WHERE tid = object_state.tid
- )
- """
- self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
- for (tid,) in cursor.fetchall():
- self._add_refs_for_tid(cursor, tid, get_references)
-
-
- def _fill_pack_object_refs(self, cursor, get_references):
- """Fill object_ref for all pack_object rows that have keep_tid."""
- stmt = """
- SELECT DISTINCT keep_tid
- FROM pack_object
- WHERE keep_tid IS NOT NULL
- AND NOT EXISTS (
- SELECT 1
- FROM object_refs_added
- WHERE tid = keep_tid
- )
- """
- cursor.execute(stmt)
- for (tid,) in cursor.fetchall():
- self._add_refs_for_tid(cursor, tid, get_references)
-
-
- def _add_object_ref_rows(self, cursor, add_rows):
- """Add rows to object_ref.
-
- The input rows are tuples containing (from_zoid, tid, to_zoid).
-
- Subclasses can override this.
- """
- stmt = """
- INSERT INTO object_ref (zoid, tid, to_zoid)
- VALUES (%s, %s, %s)
- """
- cursor.executemany(stmt, add_rows)
-
-
- def _add_refs_for_tid(self, cursor, tid, get_references):
- """Fill object_refs with all states for a transaction.
- """
- stmt = """
- SELECT zoid, state
- FROM object_state
- WHERE tid = %(tid)s
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
-
- add_rows = [] # [(from_oid, tid, to_oid)]
- for from_oid, state in cursor:
- if hasattr(state, 'read'):
- # Oracle
- state = state.read()
- if state:
- to_oids = get_references(str(state))
- for to_oid in to_oids:
- add_rows.append((from_oid, tid, to_oid))
-
- if add_rows:
- self._add_object_ref_rows(cursor, add_rows)
-
- # The references have been computed for this transaction.
- stmt = """
- INSERT INTO object_refs_added (tid)
- VALUES (%(tid)s)
- """
- self._run_script_stmt(cursor, stmt, {'tid': tid})
-
-
- def _hold_commit_lock(self, cursor):
- """Hold the commit lock for packing"""
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
-
-
- def pack(self, pack_tid):
- """Pack. Requires populated pack tables."""
-
- # Read committed mode is sufficient.
- conn, cursor = self.open()
- try:
- try:
- # hold the commit lock for a moment to prevent deadlocks.
- self._hold_commit_lock(cursor)
-
- for table in ('object_ref', 'current_object', 'object_state'):
-
- # Remove objects that are in pack_object and have keep
- # set to false.
- stmt = """
- DELETE FROM %s
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = %%(FALSE)s
- )
- """ % table
- self._run_script_stmt(cursor, stmt)
-
- if table != 'current_object':
- # Cut the history of objects in pack_object that
- # have keep set to true.
- stmt = """
- DELETE FROM %s
- WHERE zoid IN (
- SELECT zoid
- FROM pack_object
- WHERE keep = %%(TRUE)s
- )
- AND tid < (
- SELECT keep_tid
- FROM pack_object
- WHERE zoid = %s.zoid
- )
- """ % (table, table)
- self._run_script_stmt(cursor, stmt)
-
- stmt = """
- -- Terminate prev_tid chains
- UPDATE object_state SET prev_tid = 0
- WHERE tid <= %(pack_tid)s
- AND prev_tid != 0;
-
- -- For each tid to be removed, delete the corresponding row in
- -- object_refs_added.
- DELETE FROM object_refs_added
- WHERE tid > 0
- AND tid <= %(pack_tid)s
- AND NOT EXISTS (
- SELECT 1
- FROM object_state
- WHERE tid = object_refs_added.tid
- );
-
- -- Delete transactions no longer used.
- DELETE FROM transaction
- WHERE tid > 0
- AND tid <= %(pack_tid)s
- AND NOT EXISTS (
- SELECT 1
- FROM object_state
- WHERE tid = transaction.tid
- );
-
- -- Mark the remaining packable transactions as packed
- UPDATE transaction SET packed = %(TRUE)s
- WHERE tid > 0
- AND tid <= %(pack_tid)s
- AND packed = %(FALSE)s;
-
- -- Clean up.
- DELETE FROM pack_object;
- """
- self._run_script(cursor, stmt, {'pack_tid': pack_tid})
-
- except:
- conn.rollback()
- raise
-
- else:
- conn.commit()
-
- finally:
- self.close(conn, cursor)
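The pack() loop that ends above interpolates the table name into each DELETE with `%` formatting before handing the statement to `_run_script_stmt`, which then substitutes the generic script variables; that is why the script variables appear escaped as `%%(FALSE)s`. A standalone sketch of the two-stage substitution (the variable map is the PostgreSQL/MySQL spelling used by this adapter; names are illustrative):

```python
# Stage 1 interpolates the table name, collapsing %%(FALSE)s to %(FALSE)s;
# stage 2 replaces the generic variable with a database-specific spelling.
script_vars = {'TRUE': 'TRUE', 'FALSE': 'FALSE'}

template = """
DELETE FROM %s
WHERE zoid IN (
    SELECT zoid
    FROM pack_object
    WHERE keep = %%(FALSE)s
)
"""

statements = []
for table in ('object_ref', 'current_object', 'object_state'):
    generic = template % table              # stage 1: table name
    statements.append(generic % script_vars)  # stage 2: script variables

assert 'DELETE FROM object_ref' in statements[0]
assert 'keep = FALSE' in statements[-1]
```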
Copied: relstorage/tags/1.0-beta1/relstorage/adapters/common.py (from rev 84129, relstorage/trunk/relstorage/adapters/common.py)
===================================================================
--- relstorage/tags/1.0-beta1/relstorage/adapters/common.py (rev 0)
+++ relstorage/tags/1.0-beta1/relstorage/adapters/common.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,674 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Code common to most adapters."""
+
+from ZODB.POSException import UndoError
+
+import logging
+
+log = logging.getLogger("relstorage.adapters.common")
+
+
+# Notes about adapters:
+#
+# An adapter must not hold a connection, cursor, or database state, because
+# RelStorage opens multiple concurrent connections using a single adapter
+# instance.
+# Within the context of an adapter, all OID and TID values are integers,
+# not binary strings, except as noted.
+
+class Adapter(object):
+ """Common code for a database adapter.
+
+ This is an abstract class; many of its methods are expected to be
+ provided by subclasses.
+ """
+
+ # _script_vars contains replacements for statements in scripts.
+ # These are correct for PostgreSQL and MySQL but not for Oracle.
+ _script_vars = {
+ 'TRUE': 'TRUE',
+ 'FALSE': 'FALSE',
+ 'OCTET_LENGTH': 'OCTET_LENGTH',
+ 'oid': '%(oid)s',
+ 'tid': '%(tid)s',
+ 'pack_tid': '%(pack_tid)s',
+ 'undo_tid': '%(undo_tid)s',
+ 'self_tid': '%(self_tid)s',
+ 'min_tid': '%(min_tid)s',
+ 'max_tid': '%(max_tid)s',
+ }
+
+ _scripts = {
+ 'select_keep_tid': """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = pack_object.zoid
+ AND tid > 0
+ AND tid <= %(pack_tid)s
+ ORDER BY tid DESC
+ LIMIT 1
+ """,
+
+ 'choose_pack_transaction': """
+ SELECT tid
+ FROM transaction
+ WHERE tid > 0
+ AND tid <= %(tid)s
+ AND packed = FALSE
+ ORDER BY tid DESC
+ LIMIT 1
+ """,
+
+ 'create_temp_pack_visit': """
+ CREATE TEMPORARY TABLE temp_pack_visit (
+ zoid BIGINT NOT NULL
+ );
+ CREATE UNIQUE INDEX temp_pack_visit_zoid ON temp_pack_visit (zoid)
+ """,
+
+ 'create_temp_undo': """
+ CREATE TEMPORARY TABLE temp_undo (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL
+ );
+ CREATE UNIQUE INDEX temp_undo_zoid ON temp_undo (zoid)
+ """,
+
+ 'reset_temp_undo': "DROP TABLE temp_undo",
+ }
+
+
+ def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+ """Execute a statement from a script with the given parameters.
+
+ Subclasses may override this.
+ The input statement is generic and needs to be transformed
+ into a database-specific statement.
+ """
+ stmt = generic_stmt % self._script_vars
+ try:
+ cursor.execute(stmt, generic_params)
+ except:
+ log.warning("script statement failed: %r; parameters: %r",
+ stmt, generic_params)
+ raise
+
+
+ def _run_script(self, cursor, script, params=()):
+ """Execute a series of statements in the database.
+
+ The statements are transformed by _run_script_stmt
+ before execution.
+ """
+ lines = []
+ for line in script.split('\n'):
+ line = line.strip()
+ if not line or line.startswith('--'):
+ continue
+ if line.endswith(';'):
+ line = line[:-1]
+ lines.append(line)
+ stmt = '\n'.join(lines)
+ self._run_script_stmt(cursor, stmt, params)
+ lines = []
+ else:
+ lines.append(line)
+ if lines:
+ stmt = '\n'.join(lines)
+ self._run_script_stmt(cursor, stmt, params)
+
+
+ def _transaction_iterator(self, cursor):
+ """Iterate over a list of transactions returned from the database.
+
+ Each row begins with (tid, username, description, extension)
+ and may have other columns.
+
+ The default implementation returns the cursor unmodified.
+ Subclasses can override this.
+ """
+ return cursor
+
+
+ def iter_transactions(self, cursor):
+ """Iterate over the transaction log, newest first.
+
+ Skips packed transactions.
+ Yields (tid, username, description, extension) for each transaction.
+ """
+ stmt = """
+ SELECT tid, username, description, extension
+ FROM transaction
+ WHERE packed = %(FALSE)s
+ AND tid != 0
+ ORDER BY tid DESC
+ """
+ self._run_script_stmt(cursor, stmt)
+ return self._transaction_iterator(cursor)
+
+
+ def iter_transactions_range(self, cursor, start=None, stop=None):
+ """Iterate over the transactions in the given range, oldest first.
+
+ Includes packed transactions.
+ Yields (tid, username, description, extension, packed)
+ for each transaction.
+ """
+ stmt = """
+ SELECT tid, username, description, extension,
+ CASE WHEN packed = %(TRUE)s THEN 1 ELSE 0 END
+ FROM transaction
+ WHERE tid >= 0
+ """
+ if start is not None:
+ stmt += " AND tid >= %(min_tid)s"
+ if stop is not None:
+ stmt += " AND tid <= %(max_tid)s"
+ stmt += " ORDER BY tid"
+ self._run_script_stmt(cursor, stmt,
+ {'min_tid': start, 'max_tid': stop})
+ return self._transaction_iterator(cursor)
+
+
+ def iter_object_history(self, cursor, oid):
+ """Iterate over an object's history.
+
+ Raises KeyError if the object does not exist.
+ Yields (tid, username, description, extension, pickle_size)
+ for each modification.
+ """
+ stmt = """
+ SELECT 1 FROM current_object WHERE zoid = %(oid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'oid': oid})
+ if not cursor.fetchall():
+ raise KeyError(oid)
+
+ stmt = """
+ SELECT tid, username, description, extension, %(OCTET_LENGTH)s(state)
+ FROM transaction
+ JOIN object_state USING (tid)
+ WHERE zoid = %(oid)s
+ AND packed = %(FALSE)s
+ ORDER BY tid DESC
+ """
+ self._run_script_stmt(cursor, stmt, {'oid': oid})
+ return self._transaction_iterator(cursor)
+
+
+ def iter_objects(self, cursor, tid):
+ """Iterate over object states in a transaction.
+
+ Yields (oid, state) for each object state.
+ """
+ stmt = """
+ SELECT zoid, state
+ FROM object_state
+ WHERE tid = %(tid)s
+ ORDER BY zoid
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+ for oid, state in cursor:
+ if hasattr(state, 'read'):
+ # Oracle
+ state = state.read()
+ yield oid, state
+
+
+ def verify_undoable(self, cursor, undo_tid):
+ """Raise UndoError if it is not safe to undo the specified txn."""
+ stmt = """
+ SELECT 1 FROM transaction
+ WHERE tid = %(undo_tid)s
+ AND packed = %(FALSE)s
+ """
+ self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+ if not cursor.fetchall():
+ raise UndoError("Transaction not found or packed")
+
+ # Rule: we can undo an object if the object's state in the
+ # transaction to undo matches the object's current state.
+ # If any object in the transaction does not fit that rule,
+ # refuse to undo.
+ stmt = """
+ SELECT prev_os.zoid, current_object.tid
+ FROM object_state prev_os
+ JOIN object_state cur_os ON (prev_os.zoid = cur_os.zoid)
+ JOIN current_object ON (cur_os.zoid = current_object.zoid
+ AND cur_os.tid = current_object.tid)
+ WHERE prev_os.tid = %(undo_tid)s
+ AND cur_os.md5 != prev_os.md5
+ """
+ self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+ if cursor.fetchmany():
+ raise UndoError(
+ "Some data were modified by a later transaction")
+
+ # Rule: don't allow the creation of the root object to
+ # be undone. It's hard to get it back.
+ stmt = """
+ SELECT 1
+ FROM object_state
+ WHERE tid = %(undo_tid)s
+ AND zoid = 0
+ AND prev_tid = 0
+ """
+ self._run_script_stmt(cursor, stmt, {'undo_tid': undo_tid})
+ if cursor.fetchall():
+ raise UndoError("Can't undo the creation of the root object")
+
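The rule enforced by the md5 comparison above can be restated in isolation: a transaction is undoable only if every object it wrote still has the state that transaction gave it. A minimal in-memory sketch of that check (the dict-based representation is illustrative, not the adapter's API):

```python
def safe_to_undo(undone_states, current_states):
    """True when each object written by the transaction to undo
    still matches the object's current state (compared by md5 here,
    as the adapter's SQL does)."""
    return all(
        current_states.get(oid) == md5
        for oid, md5 in undone_states.items()
    )

# Object 1 is unchanged since the transaction to undo: safe.
assert safe_to_undo({1: 'md5-a'}, {1: 'md5-a'})
# Object 1 was modified by a later transaction: refuse to undo.
assert not safe_to_undo({1: 'md5-a'}, {1: 'md5-b'})
```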
+
+ def undo(self, cursor, undo_tid, self_tid):
+ """Undo a transaction.
+
+ Parameters: "undo_tid", the integer tid of the transaction to undo,
+ and "self_tid", the integer tid of the current transaction.
+
+ Returns the list of OIDs undone.
+ """
+ stmt = self._scripts['create_temp_undo']
+ if stmt:
+ self._run_script(cursor, stmt)
+
+ stmt = """
+ DELETE FROM temp_undo;
+
+ -- Put into temp_undo the list of objects to be undone and
+ -- the tid of the transaction that has the undone state.
+ INSERT INTO temp_undo (zoid, prev_tid)
+ SELECT zoid, prev_tid
+ FROM object_state
+ WHERE tid = %(undo_tid)s;
+
+ -- Override previous undo operations within this transaction
+ -- by resetting the current_object pointer and deleting
+ -- copied states from object_state.
+ UPDATE current_object
+ SET tid = (
+ SELECT prev_tid
+ FROM object_state
+ WHERE zoid = current_object.zoid
+ AND tid = %(self_tid)s
+ )
+ WHERE zoid IN (SELECT zoid FROM temp_undo)
+ AND tid = %(self_tid)s;
+
+ DELETE FROM object_state
+ WHERE zoid IN (SELECT zoid FROM temp_undo)
+ AND tid = %(self_tid)s;
+
+ -- Add new undo records.
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT temp_undo.zoid, %(self_tid)s, current_object.tid,
+ prev.md5, prev.state
+ FROM temp_undo
+ JOIN current_object ON (temp_undo.zoid = current_object.zoid)
+ LEFT JOIN object_state prev
+ ON (prev.zoid = temp_undo.zoid
+ AND prev.tid = temp_undo.prev_tid);
+
+ -- List the changed OIDs.
+ SELECT zoid FROM temp_undo
+ """
+ self._run_script(cursor, stmt,
+ {'undo_tid': undo_tid, 'self_tid': self_tid})
+ res = [oid for (oid,) in cursor]
+
+ stmt = self._scripts['reset_temp_undo']
+ if stmt:
+ self._run_script(cursor, stmt)
+
+ return res
+
+
+ def choose_pack_transaction(self, pack_point):
+ """Return the transaction before or at the specified pack time.
+
+ Returns None if there is nothing to pack.
+ """
+ conn, cursor = self.open()
+ try:
+ stmt = self._scripts['choose_pack_transaction']
+ self._run_script(cursor, stmt, {'tid': pack_point})
+ rows = cursor.fetchall()
+ if not rows:
+ # Nothing needs to be packed.
+ return None
+ return rows[0][0]
+ finally:
+ self.close(conn, cursor)
+
+
+ def pre_pack(self, pack_tid, get_references, gc):
+ """Decide what to pack.
+
+ Subclasses may override this.
+
+ pack_tid specifies the most recent transaction to pack.
+
+ get_references is a function that accepts a pickled state and
+ returns a set of OIDs that state refers to.
+
+ gc is a boolean indicating whether to run garbage collection.
+ If gc is false, at least one revision of every object is kept,
+ even if nothing refers to it. Packing with gc disabled can be
+ much faster.
+ """
+ conn, cursor = self.open()
+ try:
+ try:
+ if gc:
+ self._pre_pack_with_gc(cursor, pack_tid, get_references)
+ else:
+ self._pre_pack_without_gc(cursor, pack_tid)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def _pre_pack_without_gc(self, cursor, pack_tid):
+ """Determine what to pack, without garbage collection.
+
+ With garbage collection disabled, there is no need to follow
+ object references.
+ """
+ # Fill the pack_object table with OIDs, but configure them
+ # all to be kept by setting keep and keep_tid.
+ stmt = """
+ DELETE FROM pack_object;
+
+ INSERT INTO pack_object (zoid, keep)
+ SELECT DISTINCT zoid, %(TRUE)s
+ FROM object_state
+ WHERE tid <= %(pack_tid)s;
+
+ UPDATE pack_object SET keep_tid = (@select_keep_tid@)
+ """
+ stmt = stmt.replace(
+ '@select_keep_tid@', self._scripts['select_keep_tid'])
+ self._run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+
+ def _pre_pack_with_gc(self, cursor, pack_tid, get_references):
+ """Determine what to pack, with garbage collection.
+ """
+ # Fill object_ref with references from object states
+ # in transactions that will not be packed.
+ self._fill_nonpacked_refs(cursor, pack_tid, get_references)
+
+ # Fill the pack_object table with OIDs that either will be
+ # removed (if nothing references the OID) or whose history will
+ # be cut.
+ stmt = """
+ DELETE FROM pack_object;
+
+ INSERT INTO pack_object (zoid, keep)
+ SELECT DISTINCT zoid, %(FALSE)s
+ FROM object_state
+ WHERE tid <= %(pack_tid)s;
+
+ -- If the root object is in pack_object, keep it.
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE zoid = 0;
+
+ -- Keep objects that have been revised since pack_tid.
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT zoid
+ FROM current_object
+ WHERE tid > %(pack_tid)s
+ );
+
+ -- Keep objects that are still referenced by object states in
+ -- transactions that will not be packed.
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT to_zoid
+ FROM object_ref
+ WHERE tid > %(pack_tid)s
+ );
+ """
+ self._run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+ stmt = self._scripts['create_temp_pack_visit']
+ if stmt:
+ self._run_script(cursor, stmt)
+
+ # Each of the packable objects to be kept might
+ # refer to other objects. If some of those references
+ # include objects currently set to be removed, keep
+ # those objects as well. Do this
+ # repeatedly until all references have been satisfied.
+ while True:
+
+ # Make a list of all parent objects that still need
+ # to be visited. Then set keep_tid for all pack_object
+ # rows with keep = true.
+ # keep_tid must be set before _fill_pack_object_refs examines
+ # references.
+ stmt = """
+ DELETE FROM temp_pack_visit;
+
+ INSERT INTO temp_pack_visit (zoid)
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %(TRUE)s
+ AND keep_tid IS NULL;
+
+ UPDATE pack_object SET keep_tid = (@select_keep_tid@)
+ WHERE keep = %(TRUE)s AND keep_tid IS NULL
+ """
+ stmt = stmt.replace(
+ '@select_keep_tid@', self._scripts['select_keep_tid'])
+ self._run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+ self._fill_pack_object_refs(cursor, get_references)
+
+ # Visit the children of all parent objects that were
+ # just visited.
+ stmt = """
+ UPDATE pack_object SET keep = %(TRUE)s
+ WHERE keep = %(FALSE)s
+ AND zoid IN (
+ SELECT DISTINCT to_zoid
+ FROM object_ref
+ JOIN temp_pack_visit USING (zoid)
+ )
+ """
+ self._run_script_stmt(cursor, stmt)
+ if not cursor.rowcount:
+ # No new references detected.
+ break
+
+
+ def _fill_nonpacked_refs(self, cursor, pack_tid, get_references):
+ """Fill object_ref for all transactions that will not be packed."""
+ stmt = """
+ SELECT DISTINCT tid
+ FROM object_state
+ WHERE tid > %(pack_tid)s
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_refs_added
+ WHERE tid = object_state.tid
+ )
+ """
+ self._run_script_stmt(cursor, stmt, {'pack_tid': pack_tid})
+ for (tid,) in cursor.fetchall():
+ self._add_refs_for_tid(cursor, tid, get_references)
+
+
+ def _fill_pack_object_refs(self, cursor, get_references):
+ """Fill object_ref for all pack_object rows that have keep_tid."""
+ stmt = """
+ SELECT DISTINCT keep_tid
+ FROM pack_object
+ WHERE keep_tid IS NOT NULL
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_refs_added
+ WHERE tid = keep_tid
+ )
+ """
+ cursor.execute(stmt)
+ for (tid,) in cursor.fetchall():
+ self._add_refs_for_tid(cursor, tid, get_references)
+
+
+ def _add_object_ref_rows(self, cursor, add_rows):
+ """Add rows to object_ref.
+
+ The input rows are tuples containing (from_zoid, tid, to_zoid).
+
+ Subclasses can override this.
+ """
+ stmt = """
+ INSERT INTO object_ref (zoid, tid, to_zoid)
+ VALUES (%s, %s, %s)
+ """
+ cursor.executemany(stmt, add_rows)
+
+
+ def _add_refs_for_tid(self, cursor, tid, get_references):
+ """Fill object_refs with all states for a transaction.
+ """
+ stmt = """
+ SELECT zoid, state
+ FROM object_state
+ WHERE tid = %(tid)s
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+ add_rows = [] # [(from_oid, tid, to_oid)]
+ for from_oid, state in cursor:
+ if hasattr(state, 'read'):
+ # Oracle
+ state = state.read()
+ if state:
+ to_oids = get_references(str(state))
+ for to_oid in to_oids:
+ add_rows.append((from_oid, tid, to_oid))
+
+ if add_rows:
+ self._add_object_ref_rows(cursor, add_rows)
+
+ # The references have been computed for this transaction.
+ stmt = """
+ INSERT INTO object_refs_added (tid)
+ VALUES (%(tid)s)
+ """
+ self._run_script_stmt(cursor, stmt, {'tid': tid})
+
+
+ def _hold_commit_lock(self, cursor):
+ """Hold the commit lock for packing"""
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+
+
+ def pack(self, pack_tid):
+ """Pack. Requires populated pack tables."""
+
+ # Read committed mode is sufficient.
+ conn, cursor = self.open()
+ try:
+ try:
+ # hold the commit lock for a moment to prevent deadlocks.
+ self._hold_commit_lock(cursor)
+
+ for table in ('object_ref', 'current_object', 'object_state'):
+
+ # Remove objects that are in pack_object and have keep
+ # set to false.
+ stmt = """
+ DELETE FROM %s
+ WHERE zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %%(FALSE)s
+ )
+ """ % table
+ self._run_script_stmt(cursor, stmt)
+
+ if table != 'current_object':
+ # Cut the history of objects in pack_object that
+ # have keep set to true.
+ stmt = """
+ DELETE FROM %s
+ WHERE zoid IN (
+ SELECT zoid
+ FROM pack_object
+ WHERE keep = %%(TRUE)s
+ )
+ AND tid < (
+ SELECT keep_tid
+ FROM pack_object
+ WHERE zoid = %s.zoid
+ )
+ """ % (table, table)
+ self._run_script_stmt(cursor, stmt)
+
+ stmt = """
+ -- Terminate prev_tid chains
+ UPDATE object_state SET prev_tid = 0
+ WHERE tid <= %(pack_tid)s
+ AND prev_tid != 0;
+
+ -- For each tid to be removed, delete the corresponding row in
+ -- object_refs_added.
+ DELETE FROM object_refs_added
+ WHERE tid > 0
+ AND tid <= %(pack_tid)s
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_state
+ WHERE tid = object_refs_added.tid
+ );
+
+ -- Delete transactions no longer used.
+ DELETE FROM transaction
+ WHERE tid > 0
+ AND tid <= %(pack_tid)s
+ AND NOT EXISTS (
+ SELECT 1
+ FROM object_state
+ WHERE tid = transaction.tid
+ );
+
+ -- Mark the remaining packable transactions as packed
+ UPDATE transaction SET packed = %(TRUE)s
+ WHERE tid > 0
+ AND tid <= %(pack_tid)s
+ AND packed = %(FALSE)s;
+
+ -- Clean up.
+ DELETE FROM pack_object;
+ """
+ self._run_script(cursor, stmt, {'pack_tid': pack_tid})
+
+ except:
+ conn.rollback()
+ raise
+
+ else:
+ conn.commit()
+
+ finally:
+ self.close(conn, cursor)
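Within `_run_script` in the file above, a multi-statement script is cut into individual statements at trailing semicolons, after blank lines and `--` comments are dropped. The splitting logic in isolation (a sketch that collects statements into a list instead of executing them):

```python
def split_script(script):
    """Split a SQL script into statements, dropping blank lines
    and -- comments, as the adapter's _run_script does."""
    statements = []
    lines = []
    for line in script.split('\n'):
        line = line.strip()
        if not line or line.startswith('--'):
            continue
        if line.endswith(';'):
            lines.append(line[:-1])       # statement boundary
            statements.append('\n'.join(lines))
            lines = []
        else:
            lines.append(line)
    if lines:                             # trailing unterminated statement
        statements.append('\n'.join(lines))
    return statements

script = """
-- clean up
DELETE FROM pack_object;
UPDATE transaction SET packed = TRUE
    WHERE tid <= 5
"""
assert split_script(script) == [
    'DELETE FROM pack_object',
    'UPDATE transaction SET packed = TRUE\nWHERE tid <= 5',
]
```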
Deleted: relstorage/tags/1.0-beta1/relstorage/adapters/mysql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/mysql.py 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/relstorage/adapters/mysql.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,620 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""MySQL adapter for RelStorage.
-
-Connection parameters supported by MySQLdb:
-
-host
- string, host to connect
-user
- string, user to connect as
-passwd
- string, password to use
-db
- string, database to use
-port
- integer, TCP/IP port to connect to
-unix_socket
- string, location of unix_socket (UNIX-ish only)
-conv
- mapping, maps MySQL FIELD_TYPE.* to Python functions which convert a
- string to the appropriate Python type
-connect_timeout
- number of seconds to wait before the connection attempt fails.
-compress
- if set, gzip compression is enabled
-named_pipe
- if set, connect to server via named pipe (Windows only)
-init_command
- command which is run once the connection is created
-read_default_file
- see the MySQL documentation for mysql_options()
-read_default_group
- see the MySQL documentation for mysql_options()
-client_flag
- client flags from MySQLdb.constants.CLIENT
-load_infile
- int, non-zero enables LOAD LOCAL INFILE, zero disables
-"""
-
-import logging
-import MySQLdb
-import time
-from ZODB.POSException import StorageError
-
-from common import Adapter
-
-log = logging.getLogger("relstorage.adapters.mysql")
-
-commit_lock_timeout = 30
-
-
-class MySQLAdapter(Adapter):
- """MySQL adapter for RelStorage."""
-
- def __init__(self, **params):
- self._params = params
-
- def create_schema(self, cursor):
- """Create the database tables."""
- stmt = """
- -- The list of all transactions in the database
- CREATE TABLE transaction (
- tid BIGINT NOT NULL PRIMARY KEY,
- packed BOOLEAN NOT NULL DEFAULT FALSE,
- username VARCHAR(255) NOT NULL,
- description TEXT NOT NULL,
- extension BLOB
- ) ENGINE = InnoDB;
-
- -- Create a special transaction to represent object creation. This
- -- row is often referenced by object_state.prev_tid, but never by
- -- object_state.tid.
- INSERT INTO transaction (tid, username, description)
- VALUES (0, 'system', 'special transaction for object creation');
-
- -- All OIDs allocated in the database. Note that this table
- -- is purposely non-transactional.
- CREATE TABLE new_oid (
- zoid BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT
- ) ENGINE = MyISAM;
-
- -- All object states in all transactions. Note that md5 and state
- -- can be null to represent object uncreation.
- CREATE TABLE object_state (
- zoid BIGINT NOT NULL,
- tid BIGINT NOT NULL REFERENCES transaction,
- PRIMARY KEY (zoid, tid),
- prev_tid BIGINT NOT NULL REFERENCES transaction,
- md5 CHAR(32),
- state LONGBLOB,
- CHECK (tid > 0)
- ) ENGINE = InnoDB;
- CREATE INDEX object_state_tid ON object_state (tid);
-
- -- Pointers to the current object state
- CREATE TABLE current_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- tid BIGINT NOT NULL,
- FOREIGN KEY (zoid, tid) REFERENCES object_state (zoid, tid)
- ) ENGINE = InnoDB;
-
- -- A list of referenced OIDs from each object_state.
- -- This table is populated as needed during packing.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because rows in object_state
- -- are never modified once committed, and rows are removed
- -- from object_state only by packing.
- CREATE TABLE object_ref (
- zoid BIGINT NOT NULL,
- tid BIGINT NOT NULL,
- to_zoid BIGINT NOT NULL
- ) ENGINE = MyISAM;
- CREATE INDEX object_ref_from ON object_ref (zoid);
- CREATE INDEX object_ref_tid ON object_ref (tid);
- CREATE INDEX object_ref_to ON object_ref (to_zoid);
-
- -- The object_refs_added table tracks whether object_refs has
- -- been populated for all states in a given transaction.
- -- An entry is added only when the work is finished.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because object states
- -- are never added to a transaction once committed, and
- -- rows are removed from the transaction table only by
- -- packing.
- CREATE TABLE object_refs_added (
- tid BIGINT NOT NULL PRIMARY KEY
- ) ENGINE = MyISAM;
-
- -- Temporary state during packing:
- -- The list of objects to pack. If keep is 'N',
- -- the object and all its revisions will be removed.
- -- If keep is 'Y', instead of removing the object,
- -- the pack operation will cut the object's history.
- -- If keep is 'Y' then the keep_tid field must also be set.
- -- The keep_tid field specifies which revision to keep within
- -- the list of packable transactions.
- CREATE TABLE pack_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- keep BOOLEAN NOT NULL,
- keep_tid BIGINT
- ) ENGINE = MyISAM;
- CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
- """
- self._run_script(cursor, stmt)
-
-
- def prepare_schema(self):
- """Create the database schema if it does not already exist."""
- conn, cursor = self.open()
- try:
- try:
- cursor.execute("SHOW TABLES LIKE 'object_state'")
- if not cursor.rowcount:
- self.create_schema(cursor)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def zap(self):
- """Clear all data out of the database.
-
- Used by the test suite.
- """
- conn, cursor = self.open()
- try:
- try:
- stmt = """
- DELETE FROM object_refs_added;
- DELETE FROM object_ref;
- DELETE FROM current_object;
- DELETE FROM object_state;
- TRUNCATE new_oid;
- DELETE FROM transaction;
- -- Create a transaction to represent object creation.
- INSERT INTO transaction (tid, username, description) VALUES
- (0, 'system', 'special transaction for object creation');
- """
- self._run_script(cursor, stmt)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED"):
- """Open a database connection and return (conn, cursor)."""
- try:
- conn = MySQLdb.connect(**self._params)
- cursor = conn.cursor()
- cursor.arraysize = 64
- if transaction_mode:
- conn.autocommit(True)
- cursor.execute("SET SESSION TRANSACTION %s" % transaction_mode)
- conn.autocommit(False)
- return conn, cursor
- except MySQLdb.OperationalError:
- log.warning("Unable to connect in %s", repr(self))
- raise
-
- def close(self, conn, cursor):
- """Close a connection and cursor, ignoring certain errors.
- """
- for obj in (cursor, conn):
- if obj is not None:
- try:
- obj.close()
- except (MySQLdb.InterfaceError,
- MySQLdb.OperationalError):
- pass
-
- def open_for_load(self):
- """Open and initialize a connection for loading objects.
-
- Returns (conn, cursor).
- """
- return self.open("ISOLATION LEVEL REPEATABLE READ")
-
- def restart_load(self, cursor):
- """After a rollback, reinitialize a connection for loading objects."""
- # No re-init necessary
- pass
-
- def get_object_count(self):
- """Returns the number of objects in the database"""
- # do later
- return 0
-
- def get_db_size(self):
- """Returns the approximate size of the database in bytes"""
- # do later
- return 0
-
- def load_current(self, cursor, oid):
- """Returns the current pickle and integer tid for an object.
-
- oid is an integer. Returns (None, None) if the object does not exist.
- """
- cursor.execute("""
- SELECT state, tid
- FROM current_object
- JOIN object_state USING(zoid, tid)
- WHERE zoid = %s
- """, (oid,))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()
- else:
- return None, None
-
- def load_revision(self, cursor, oid, tid):
- """Returns the pickle for an object on a particular transaction.
-
- Returns None if no such state exists.
- """
- cursor.execute("""
- SELECT state
- FROM object_state
- WHERE zoid = %s
- AND tid = %s
- """, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- (state,) = cursor.fetchone()
- return state
- return None
-
- def exists(self, cursor, oid):
- """Returns a true value if the given object exists."""
- cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
- return cursor.rowcount
-
- def load_before(self, cursor, oid, tid):
- """Returns the pickle and tid of an object before transaction tid.
-
- Returns (None, None) if no earlier state exists.
- """
- cursor.execute("""
- SELECT state, tid
- FROM object_state
- WHERE zoid = %s
- AND tid < %s
- ORDER BY tid DESC
- LIMIT 1
- """, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()
- else:
- return None, None
-
- def get_object_tid_after(self, cursor, oid, tid):
- """Returns the tid of the next change after an object revision.
-
- Returns None if no later state exists.
- """
- stmt = """
- SELECT tid
- FROM object_state
- WHERE zoid = %s
- AND tid > %s
- ORDER BY tid
- LIMIT 1
- """
- cursor.execute(stmt, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
- else:
- return None
-
- def _make_temp_table(self, cursor):
- """Create the temporary table for storing objects"""
- stmt = """
- CREATE TEMPORARY TABLE temp_store (
- zoid BIGINT NOT NULL PRIMARY KEY,
- prev_tid BIGINT NOT NULL,
- md5 CHAR(32),
- state LONGBLOB
- ) ENGINE MyISAM
- """
- cursor.execute(stmt)
-
- def open_for_store(self):
- """Open and initialize a connection for storing objects.
-
- Returns (conn, cursor).
- """
- conn, cursor = self.open()
- try:
- self._make_temp_table(cursor)
- return conn, cursor
- except:
- self.close(conn, cursor)
- raise
-
- def restart_store(self, cursor):
- """Reuse a store connection."""
- try:
- cursor.connection.rollback()
- cursor.execute("TRUNCATE temp_store")
- except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
- raise StorageError("database disconnected")
-
- def store_temp(self, cursor, oid, prev_tid, md5sum, data):
- """Store an object in the temporary table."""
- stmt = """
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (%s, %s, %s, %s)
- """
- cursor.execute(stmt, (oid, prev_tid, md5sum, MySQLdb.Binary(data)))
-
- def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
- """Replace an object in the temporary table."""
- stmt = """
- UPDATE temp_store SET
- prev_tid = %s,
- md5 = %s,
- state = %s
- WHERE zoid = %s
- """
- cursor.execute(stmt, (prev_tid, md5sum, MySQLdb.Binary(data), oid))
-
- def restore(self, cursor, oid, tid, md5sum, data):
- """Store an object directly, without conflict detection.
-
- Used for copying transactions into this database.
- """
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (%s, %s,
- COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
- %s, %s)
- """
- if data is not None:
- data = MySQLdb.Binary(data)
- cursor.execute(stmt, (oid, tid, oid, md5sum, data))
-
- def start_commit(self, cursor):
- """Prepare to commit."""
- # Hold commit_lock to prevent concurrent commits.
- cursor.execute("SELECT GET_LOCK('relstorage.commit', %s)",
- (commit_lock_timeout,))
- locked = cursor.fetchone()[0]
- if not locked:
- raise StorageError("Unable to acquire commit lock")
-
- def get_tid_and_time(self, cursor):
- """Returns the most recent tid and the current database time.
-
- The database time is the number of seconds since the epoch.
- """
- # Lock in share mode to ensure the data being read is up to date.
- cursor.execute("""
- SELECT tid, UNIX_TIMESTAMP()
- FROM transaction
- ORDER BY tid DESC
- LIMIT 1
- LOCK IN SHARE MODE
- """)
- assert cursor.rowcount == 1
- tid, timestamp = cursor.fetchone()
- # MySQL does not provide timestamps with more than one second
- # precision. To provide more precision, if the system time is
- # within one minute of the MySQL time, use the system time instead.
- now = time.time()
- if abs(now - timestamp) <= 60.0:
- timestamp = now
- return tid, timestamp
-
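get_tid_and_time above works around the one-second resolution of MySQL's UNIX_TIMESTAMP() by preferring the client clock whenever the two clocks roughly agree. That heuristic on its own (the sixty-second window matches the code above; the function name is ours):

```python
import time

def refine_timestamp(db_timestamp, clock=time.time, window=60.0):
    """Prefer the higher-resolution client clock when it roughly
    agrees with the database's whole-second timestamp."""
    now = clock()
    if abs(now - db_timestamp) <= window:
        return now       # sub-second precision from the client
    return db_timestamp  # clocks disagree; trust the database

# A database time 0.4s behind the client clock is replaced:
assert refine_timestamp(100.0, clock=lambda: 100.4) == 100.4
# A wildly different database time is kept as-is:
assert refine_timestamp(100.0, clock=lambda: 500.0) == 100.0
```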
- def add_transaction(self, cursor, tid, username, description, extension,
- packed=False):
- """Add a transaction."""
- stmt = """
- INSERT INTO transaction
- (tid, packed, username, description, extension)
- VALUES (%s, %s, %s, %s, %s)
- """
- cursor.execute(stmt, (
- tid, packed, username, description, MySQLdb.Binary(extension)))
-
- def detect_conflict(self, cursor):
- """Find one conflict in the data about to be committed.
-
- If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
- attempted_data). If there is no conflict, returns None.
- """
- # Lock in share mode to ensure the data being read is up to date.
- stmt = """
- SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
- temp_store.state
- FROM temp_store
- JOIN current_object ON (temp_store.zoid = current_object.zoid)
- WHERE temp_store.prev_tid != current_object.tid
- LIMIT 1
- LOCK IN SHARE MODE
- """
- cursor.execute(stmt)
- if cursor.rowcount:
- return cursor.fetchone()
- return None
-
- def move_from_temp(self, cursor, tid):
-        """Move the temporarily stored objects to permanent storage.
-
- Returns the list of oids stored.
- """
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT zoid, %s, prev_tid, md5, state
- FROM temp_store
- """
- cursor.execute(stmt, (tid,))
-
- stmt = """
- SELECT zoid FROM temp_store
- """
- cursor.execute(stmt)
- return [oid for (oid,) in cursor]
-
- def update_current(self, cursor, tid):
- """Update the current object pointers.
-
- tid is the integer tid of the transaction being committed.
- """
- cursor.execute("""
- REPLACE INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
- WHERE tid = %s
- """, (tid,))
-
- def commit_phase1(self, cursor, tid):
- """Begin a commit. Returns the transaction name.
-
- This method should guarantee that commit_phase2() will succeed,
- meaning that if commit_phase2() would raise any error, the error
- should be raised in commit_phase1() instead.
- """
- return '-'
-
- def commit_phase2(self, cursor, txn):
- """Final transaction commit."""
- cursor.connection.commit()
- cursor.execute("SELECT RELEASE_LOCK('relstorage.commit')")
-
- def abort(self, cursor, txn=None):
- """Abort the commit. If txn is not None, phase 1 is also aborted."""
- cursor.connection.rollback()
- cursor.execute("SELECT RELEASE_LOCK('relstorage.commit')")
-
- def new_oid(self, cursor):
- """Return a new, unused OID."""
- stmt = "INSERT INTO new_oid VALUES ()"
- cursor.execute(stmt)
- oid = cursor.connection.insert_id()
- if oid % 100 == 0:
- # Clean out previously generated OIDs.
- stmt = "DELETE FROM new_oid WHERE zoid < %s"
- cursor.execute(stmt, (oid,))
- return oid
-
-
- def hold_pack_lock(self, cursor):
- """Try to acquire the pack lock.
-
- Raise an exception if packing or undo is already in progress.
- """
- stmt = "SELECT GET_LOCK('relstorage.pack', 0)"
- cursor.execute(stmt)
- res = cursor.fetchone()[0]
- if not res:
- raise StorageError('A pack or undo operation is in progress')
-
-
- def release_pack_lock(self, cursor):
- """Release the pack lock."""
- stmt = "SELECT RELEASE_LOCK('relstorage.pack')"
- cursor.execute(stmt)
-
-
- def pre_pack(self, pack_tid, get_references, gc=True):
- """Decide what to pack.
-
- This overrides the method by the same name in common.Adapter.
- """
- conn, cursor = self.open(transaction_mode=None)
- try:
- # This phase of packing works best with transactions
- # disabled. It changes no user-facing data.
- conn.autocommit(True)
- if gc:
- self._pre_pack_with_gc(cursor, pack_tid, get_references)
- else:
- self._pre_pack_without_gc(cursor, pack_tid)
- finally:
- self.close(conn, cursor)
-
-
- def _hold_commit_lock(self, cursor):
- """Hold the commit lock for packing.
-
- This overrides the method by the same name in common.Adapter.
- """
- cursor.execute("SELECT GET_LOCK('relstorage.commit', %s)",
- (commit_lock_timeout,))
- locked = cursor.fetchone()[0]
- if not locked:
- raise StorageError("Unable to acquire commit lock")
-
-
- def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
- """Polls for new transactions.
-
- conn and cursor must have been created previously by open_for_load().
- prev_polled_tid is the tid returned at the last poll, or None
- if this is the first poll. If ignore_tid is not None, changes
- committed in that transaction will not be included in the list
- of changed OIDs.
-
- Returns (changed_oids, new_polled_tid). Raises StorageError
- if the database has disconnected.
- """
- try:
- # find out the tid of the most recent transaction.
- stmt = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"
- cursor.execute(stmt)
- # Expect the transaction table to always have at least one row.
- assert cursor.rowcount == 1
- new_polled_tid = cursor.fetchone()[0]
-
- if prev_polled_tid is None:
- # This is the first time the connection has polled.
- return None, new_polled_tid
-
- if new_polled_tid == prev_polled_tid:
- # No transactions have been committed since prev_polled_tid.
- return (), new_polled_tid
-
- stmt = "SELECT 1 FROM transaction WHERE tid = %s"
- cursor.execute(stmt, (prev_polled_tid,))
- if not cursor.rowcount:
- # Transaction not found; perhaps it has been packed.
- # The connection cache needs to be cleared.
- return None, new_polled_tid
-
- # Get the list of changed OIDs and return it.
- stmt = """
- SELECT DISTINCT zoid
- FROM object_state
- JOIN transaction USING (tid)
- WHERE tid > %s
- """
- if ignore_tid is not None:
- stmt += " AND tid != %d" % ignore_tid
- cursor.execute(stmt, (prev_polled_tid,))
- oids = [oid for (oid,) in cursor]
-
- return oids, new_polled_tid
-
- except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
- raise StorageError("database disconnected")
-
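The get_tid_and_time() method above works around MySQL's one-second UNIX_TIMESTAMP() resolution by substituting the local system clock whenever it roughly agrees with the database clock. A minimal standalone sketch of that decision rule (the function name choose_timestamp is illustrative, not part of RelStorage):

```python
import time

def choose_timestamp(db_timestamp, now=None, tolerance=60.0):
    """Prefer the sub-second system clock when it roughly agrees
    with the whole-second database timestamp.

    Mirrors the logic in get_tid_and_time(): if the local clock is
    within `tolerance` seconds of the MySQL clock, use the local
    clock for extra precision; otherwise trust the database.
    """
    if now is None:
        now = time.time()
    if abs(now - db_timestamp) <= tolerance:
        return now
    return db_timestamp
```

For example, choose_timestamp(1000.0, now=1000.4) keeps the higher-precision 1000.4, while choose_timestamp(1000.0, now=2000.0) falls back to the database's 1000.0 because the clocks disagree by more than a minute.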
Copied: relstorage/tags/1.0-beta1/relstorage/adapters/mysql.py (from rev 84129, relstorage/trunk/relstorage/adapters/mysql.py)
===================================================================
--- relstorage/tags/1.0-beta1/relstorage/adapters/mysql.py (rev 0)
+++ relstorage/tags/1.0-beta1/relstorage/adapters/mysql.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,623 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""MySQL adapter for RelStorage.
+
+Connection parameters supported by MySQLdb:
+
+host
+ string, host to connect
+user
+ string, user to connect as
+passwd
+ string, password to use
+db
+ string, database to use
+port
+ integer, TCP/IP port to connect to
+unix_socket
+ string, location of unix_socket (UNIX-ish only)
+conv
+ mapping, maps MySQL FIELD_TYPE.* to Python functions which convert a
+ string to the appropriate Python type
+connect_timeout
+ number of seconds to wait before the connection attempt fails.
+compress
+ if set, gzip compression is enabled
+named_pipe
+ if set, connect to server via named pipe (Windows only)
+init_command
+ command which is run once the connection is created
+read_default_file
+ see the MySQL documentation for mysql_options()
+read_default_group
+ see the MySQL documentation for mysql_options()
+client_flag
+ client flags from MySQLdb.constants.CLIENT
+load_infile
+ int, non-zero enables LOAD LOCAL INFILE, zero disables
+"""
+
+import logging
+import MySQLdb
+import time
+from ZODB.POSException import StorageError
+
+from common import Adapter
+
+log = logging.getLogger("relstorage.adapters.mysql")
+
+commit_lock_timeout = 30
+
+
+class MySQLAdapter(Adapter):
+ """MySQL adapter for RelStorage."""
+
+ def __init__(self, **params):
+ self._params = params.copy()
+ self._params['use_unicode'] = True
+ self._params['charset'] = 'utf8'
+
+ def create_schema(self, cursor):
+ """Create the database tables."""
+ stmt = """
+ -- The list of all transactions in the database
+ CREATE TABLE transaction (
+ tid BIGINT NOT NULL PRIMARY KEY,
+ packed BOOLEAN NOT NULL DEFAULT FALSE,
+ username VARCHAR(255) NOT NULL,
+ description TEXT NOT NULL,
+ extension BLOB
+ ) ENGINE = InnoDB CHARACTER SET utf8;
+
+ -- Create a special transaction to represent object creation. This
+ -- row is often referenced by object_state.prev_tid, but never by
+ -- object_state.tid.
+ INSERT INTO transaction (tid, username, description)
+ VALUES (0, 'system', 'special transaction for object creation');
+
+ -- All OIDs allocated in the database. Note that this table
+ -- is purposely non-transactional.
+ CREATE TABLE new_oid (
+ zoid BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT
+ ) ENGINE = MyISAM;
+
+ -- All object states in all transactions. Note that md5 and state
+ -- can be null to represent object uncreation.
+ CREATE TABLE object_state (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL REFERENCES transaction,
+ PRIMARY KEY (zoid, tid),
+ prev_tid BIGINT NOT NULL REFERENCES transaction,
+ md5 CHAR(32) CHARACTER SET ascii,
+ state LONGBLOB,
+ CHECK (tid > 0)
+ ) ENGINE = InnoDB;
+ CREATE INDEX object_state_tid ON object_state (tid);
+
+ -- Pointers to the current object state
+ CREATE TABLE current_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ tid BIGINT NOT NULL,
+ FOREIGN KEY (zoid, tid) REFERENCES object_state (zoid, tid)
+ ) ENGINE = InnoDB;
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during packing.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because rows in object_state
+ -- are never modified once committed, and rows are removed
+ -- from object_state only by packing.
+ CREATE TABLE object_ref (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL,
+ to_zoid BIGINT NOT NULL
+ ) ENGINE = MyISAM;
+ CREATE INDEX object_ref_from ON object_ref (zoid);
+ CREATE INDEX object_ref_tid ON object_ref (tid);
+ CREATE INDEX object_ref_to ON object_ref (to_zoid);
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because object states
+ -- are never added to a transaction once committed, and
+ -- rows are removed from the transaction table only by
+ -- packing.
+ CREATE TABLE object_refs_added (
+ tid BIGINT NOT NULL PRIMARY KEY
+ ) ENGINE = MyISAM;
+
+ -- Temporary state during packing:
+ -- The list of objects to pack. If keep is 'N',
+ -- the object and all its revisions will be removed.
+ -- If keep is 'Y', instead of removing the object,
+ -- the pack operation will cut the object's history.
+ -- If keep is 'Y' then the keep_tid field must also be set.
+ -- The keep_tid field specifies which revision to keep within
+ -- the list of packable transactions.
+ CREATE TABLE pack_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ keep BOOLEAN NOT NULL,
+ keep_tid BIGINT
+ ) ENGINE = MyISAM;
+ CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+ """
+ self._run_script(cursor, stmt)
+
+
+ def prepare_schema(self):
+ """Create the database schema if it does not already exist."""
+ conn, cursor = self.open()
+ try:
+ try:
+ cursor.execute("SHOW TABLES LIKE 'object_state'")
+ if not cursor.rowcount:
+ self.create_schema(cursor)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def zap_all(self):
+ """Clear all data out of the database."""
+ conn, cursor = self.open()
+ try:
+ try:
+ stmt = """
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ TRUNCATE new_oid;
+ DELETE FROM transaction;
+ -- Create a transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, 'system', 'special transaction for object creation');
+ """
+ self._run_script(cursor, stmt)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED"):
+ """Open a database connection and return (conn, cursor)."""
+ try:
+ conn = MySQLdb.connect(**self._params)
+ cursor = conn.cursor()
+ cursor.arraysize = 64
+ if transaction_mode:
+ conn.autocommit(True)
+ cursor.execute("SET SESSION TRANSACTION %s" % transaction_mode)
+ conn.autocommit(False)
+ return conn, cursor
+ except MySQLdb.OperationalError:
+ log.warning("Unable to connect in %s", repr(self))
+ raise
+
+ def close(self, conn, cursor):
+ """Close a connection and cursor, ignoring certain errors.
+ """
+ for obj in (cursor, conn):
+ if obj is not None:
+ try:
+ obj.close()
+ except (MySQLdb.InterfaceError,
+ MySQLdb.OperationalError):
+ pass
+
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ return self.open("ISOLATION LEVEL REPEATABLE READ")
+
+ def restart_load(self, cursor):
+ """After a rollback, reinitialize a connection for loading objects."""
+ # No re-init necessary
+ pass
+
+ def get_object_count(self):
+ """Returns the number of objects in the database"""
+ # do later
+ return 0
+
+ def get_db_size(self):
+ """Returns the approximate size of the database in bytes"""
+ # do later
+ return 0
+
+ def load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+ oid is an integer. Returns (None, None) if object does not exist.
+ """
+ cursor.execute("""
+ SELECT state, tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = %s
+ """, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()
+ else:
+ return None, None
+
+ def load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ cursor.execute("""
+ SELECT state
+ FROM object_state
+ WHERE zoid = %s
+ AND tid = %s
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ (state,) = cursor.fetchone()
+ return state
+ return None
+
+ def exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
+ return cursor.rowcount
+
+ def load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ cursor.execute("""
+ SELECT state, tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid < %s
+ ORDER BY tid DESC
+ LIMIT 1
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()
+ else:
+ return None, None
+
+ def get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid > %s
+ ORDER BY tid
+ LIMIT 1
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ else:
+ return None
+
+ def _make_temp_table(self, cursor):
+ """Create the temporary table for storing objects"""
+ stmt = """
+ CREATE TEMPORARY TABLE temp_store (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state LONGBLOB
+ ) ENGINE MyISAM
+ """
+ cursor.execute(stmt)
+
+ def open_for_store(self):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.open()
+ try:
+ self._make_temp_table(cursor)
+ return conn, cursor
+ except:
+ self.close(conn, cursor)
+ raise
+
+ def restart_store(self, cursor):
+ """Reuse a store connection."""
+ try:
+ cursor.connection.rollback()
+ cursor.execute("TRUNCATE temp_store")
+ except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
+ raise StorageError("database disconnected")
+
+ def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+ """Store an object in the temporary table."""
+ stmt = """
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (%s, %s, %s, %s)
+ """
+ cursor.execute(stmt, (oid, prev_tid, md5sum, MySQLdb.Binary(data)))
+
+ def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+ """Replace an object in the temporary table."""
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = %s,
+ md5 = %s,
+ state = %s
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (prev_tid, md5sum, MySQLdb.Binary(data), oid))
+
+ def restore(self, cursor, oid, tid, md5sum, data):
+ """Store an object directly, without conflict detection.
+
+ Used for copying transactions into this database.
+ """
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (%s, %s,
+ COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
+ %s, %s)
+ """
+ if data is not None:
+ data = MySQLdb.Binary(data)
+ cursor.execute(stmt, (oid, tid, oid, md5sum, data))
+
+ def start_commit(self, cursor):
+ """Prepare to commit."""
+ # Hold commit_lock to prevent concurrent commits.
+ cursor.execute("SELECT GET_LOCK('relstorage.commit', %s)",
+ (commit_lock_timeout,))
+ locked = cursor.fetchone()[0]
+ if not locked:
+ raise StorageError("Unable to acquire commit lock")
+
+ def get_tid_and_time(self, cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+ # Lock in share mode to ensure the data being read is up to date.
+ cursor.execute("""
+ SELECT tid, UNIX_TIMESTAMP()
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ LOCK IN SHARE MODE
+ """)
+ assert cursor.rowcount == 1
+ tid, timestamp = cursor.fetchone()
+ # MySQL does not provide timestamps with more than one second
+ # precision. To provide more precision, if the system time is
+ # within one minute of the MySQL time, use the system time instead.
+ now = time.time()
+ if abs(now - timestamp) <= 60.0:
+ timestamp = now
+ return tid, timestamp
+
+ def add_transaction(self, cursor, tid, username, description, extension,
+ packed=False):
+ """Add a transaction."""
+ stmt = """
+ INSERT INTO transaction
+ (tid, packed, username, description, extension)
+ VALUES (%s, %s, %s, %s, %s)
+ """
+ cursor.execute(stmt, (
+ tid, packed, username, description, MySQLdb.Binary(extension)))
+
+ def detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ # Lock in share mode to ensure the data being read is up to date.
+ stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ temp_store.state
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ LIMIT 1
+ LOCK IN SHARE MODE
+ """
+ cursor.execute(stmt)
+ if cursor.rowcount:
+ return cursor.fetchone()
+ return None
+
+ def move_from_temp(self, cursor, tid):
+        """Move the temporarily stored objects to permanent storage.
+
+ Returns the list of oids stored.
+ """
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT zoid, %s, prev_tid, md5, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, (tid,))
+
+ stmt = """
+ SELECT zoid FROM temp_store
+ """
+ cursor.execute(stmt)
+ return [oid for (oid,) in cursor]
+
+ def update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ cursor.execute("""
+ REPLACE INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = %s
+ """, (tid,))
+
+ def set_min_oid(self, cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+ cursor.execute("REPLACE INTO new_oid VALUES(%s)", (oid,))
+
+ def commit_phase1(self, cursor, tid):
+ """Begin a commit. Returns the transaction name.
+
+ This method should guarantee that commit_phase2() will succeed,
+ meaning that if commit_phase2() would raise any error, the error
+ should be raised in commit_phase1() instead.
+ """
+ return '-'
+
+ def commit_phase2(self, cursor, txn):
+ """Final transaction commit."""
+ cursor.connection.commit()
+ cursor.execute("SELECT RELEASE_LOCK('relstorage.commit')")
+
+ def abort(self, cursor, txn=None):
+ """Abort the commit. If txn is not None, phase 1 is also aborted."""
+ cursor.connection.rollback()
+ cursor.execute("SELECT RELEASE_LOCK('relstorage.commit')")
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "INSERT INTO new_oid VALUES ()"
+ cursor.execute(stmt)
+ oid = cursor.connection.insert_id()
+ if oid % 100 == 0:
+ # Clean out previously generated OIDs.
+ stmt = "DELETE FROM new_oid WHERE zoid < %s"
+ cursor.execute(stmt, (oid,))
+ return oid
+
+
+ def hold_pack_lock(self, cursor):
+ """Try to acquire the pack lock.
+
+ Raise an exception if packing or undo is already in progress.
+ """
+ stmt = "SELECT GET_LOCK('relstorage.pack', 0)"
+ cursor.execute(stmt)
+ res = cursor.fetchone()[0]
+ if not res:
+ raise StorageError('A pack or undo operation is in progress')
+
+
+ def release_pack_lock(self, cursor):
+ """Release the pack lock."""
+ stmt = "SELECT RELEASE_LOCK('relstorage.pack')"
+ cursor.execute(stmt)
+
+
+ def pre_pack(self, pack_tid, get_references, gc):
+ """Decide what to pack.
+
+ This overrides the method by the same name in common.Adapter.
+ """
+ conn, cursor = self.open(transaction_mode=None)
+ try:
+ # This phase of packing works best with transactions
+ # disabled. It changes no user-facing data.
+ conn.autocommit(True)
+ if gc:
+ self._pre_pack_with_gc(cursor, pack_tid, get_references)
+ else:
+ self._pre_pack_without_gc(cursor, pack_tid)
+ finally:
+ self.close(conn, cursor)
+
+
+ def _hold_commit_lock(self, cursor):
+ """Hold the commit lock for packing.
+
+ This overrides the method by the same name in common.Adapter.
+ """
+ cursor.execute("SELECT GET_LOCK('relstorage.commit', %s)",
+ (commit_lock_timeout,))
+ locked = cursor.fetchone()[0]
+ if not locked:
+ raise StorageError("Unable to acquire commit lock")
+
+
+ def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+ """Polls for new transactions.
+
+ conn and cursor must have been created previously by open_for_load().
+ prev_polled_tid is the tid returned at the last poll, or None
+ if this is the first poll. If ignore_tid is not None, changes
+ committed in that transaction will not be included in the list
+ of changed OIDs.
+
+ Returns (changed_oids, new_polled_tid). Raises StorageError
+ if the database has disconnected.
+ """
+ try:
+ # find out the tid of the most recent transaction.
+ stmt = "SELECT tid FROM transaction ORDER BY tid DESC LIMIT 1"
+ cursor.execute(stmt)
+ # Expect the transaction table to always have at least one row.
+ assert cursor.rowcount == 1
+ new_polled_tid = cursor.fetchone()[0]
+
+ if prev_polled_tid is None:
+ # This is the first time the connection has polled.
+ return None, new_polled_tid
+
+ if new_polled_tid == prev_polled_tid:
+ # No transactions have been committed since prev_polled_tid.
+ return (), new_polled_tid
+
+ stmt = "SELECT 1 FROM transaction WHERE tid = %s"
+ cursor.execute(stmt, (prev_polled_tid,))
+ if not cursor.rowcount:
+ # Transaction not found; perhaps it has been packed.
+ # The connection cache needs to be cleared.
+ return None, new_polled_tid
+
+ # Get the list of changed OIDs and return it.
+ stmt = """
+ SELECT DISTINCT zoid
+ FROM object_state
+ JOIN transaction USING (tid)
+ WHERE tid > %s
+ """
+ if ignore_tid is not None:
+ stmt += " AND tid != %d" % ignore_tid
+ cursor.execute(stmt, (prev_polled_tid,))
+ oids = [oid for (oid,) in cursor]
+
+ return oids, new_polled_tid
+
+ except (MySQLdb.OperationalError, MySQLdb.InterfaceError):
+ raise StorageError("database disconnected")
+
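The poll_invalidations() method in the file above distinguishes three outcomes: None (the caller must clear its entire connection cache), an empty sequence (no changes since the last poll), and a non-empty sequence of changed OIDs to invalidate. A hypothetical helper sketching how a caller might dispatch on that protocol (interpret_poll_result is not part of RelStorage):

```python
def interpret_poll_result(changed_oids):
    """Map a poll_invalidations() result to a cache action.

    None       -> the prev_polled_tid was packed away or this is the
                  first poll; the whole cache must be cleared.
    empty      -> no transactions committed since the last poll.
    non-empty  -> invalidate exactly these OIDs.
    """
    if changed_oids is None:
        return 'clear-cache'
    if not changed_oids:
        return 'no-op'
    return 'invalidate'
```

This three-way return is why callers must test for None explicitly rather than just truthiness: an empty tuple and None demand very different cache behavior.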
Deleted: relstorage/tags/1.0-beta1/relstorage/adapters/oracle.py
===================================================================
--- relstorage/trunk/relstorage/adapters/oracle.py 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/relstorage/adapters/oracle.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,704 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Oracle adapter for RelStorage."""
-
-import logging
-import re
-import thread
-import time
-import cx_Oracle
-from ZODB.POSException import StorageError
-
-from common import Adapter
-
-log = logging.getLogger("relstorage.adapters.oracle")
-
-
-class OracleAdapter(Adapter):
- """Oracle adapter for RelStorage."""
-
- _script_vars = {
- 'TRUE': "'Y'",
- 'FALSE': "'N'",
- 'OCTET_LENGTH': 'LENGTH',
- 'oid': ':oid',
- 'tid': ':tid',
- 'pack_tid': ':pack_tid',
- 'undo_tid': ':undo_tid',
- 'self_tid': ':self_tid',
- 'min_tid': ':min_tid',
- 'max_tid': ':max_tid',
- }
-
- _scripts = {
- 'select_keep_tid': """
- SELECT MAX(tid)
- FROM object_state
- WHERE zoid = pack_object.zoid
- AND tid > 0
- AND tid <= %(pack_tid)s
- """,
-
- 'choose_pack_transaction': """
- SELECT MAX(tid)
- FROM transaction
- WHERE tid > 0
- AND tid <= %(tid)s
- AND packed = 'N'
- """,
-
- 'create_temp_pack_visit': None,
- 'create_temp_undo': None,
- 'reset_temp_undo': "DELETE FROM temp_undo",
- }
-
- def __init__(self, user, password, dsn, twophase=False, arraysize=64):
- self._params = (user, password, dsn)
- self._twophase = twophase
- self._arraysize = arraysize
-
-
- def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
- """Execute a statement from a script with the given parameters.
-
- This overrides the method by the same name in common.Adapter.
- """
- if generic_params:
- # Oracle raises ORA-01036 if the parameter map contains extra keys,
- # so filter out any unused parameters.
- tracker = TrackingMap(self._script_vars)
- stmt = generic_stmt % tracker
- used = tracker.used
- params = {}
- for k, v in generic_params.iteritems():
- if k in used:
- params[k] = v
- else:
- stmt = generic_stmt % self._script_vars
- params = ()
-
- try:
- cursor.execute(stmt, params)
- except:
- log.warning("script statement failed: %r; parameters: %r",
- stmt, params)
- raise
-
-
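The _run_script_stmt() method above formats the SQL through a TrackingMap so that only parameters actually referenced by the statement are passed to cx_Oracle, avoiding ORA-01036. The TrackingMap class itself is defined outside this hunk; a minimal sketch of what such a map could look like, under the assumption that it only needs to record consumed keys:

```python
class TrackingMap:
    """A mapping that records which keys a %-format operation used.

    Formatting a statement through this map both substitutes the
    bind-variable placeholders and collects the set of parameter
    names the statement actually references, so unused parameters
    can be filtered out before execution.
    """
    def __init__(self, source):
        self.source = source
        self.used = set()

    def __getitem__(self, key):
        self.used.add(key)
        return self.source[key]
```

For example, formatting "WHERE zoid = %(oid)s" through TrackingMap({'oid': ':oid', 'tid': ':tid'}) yields "WHERE zoid = :oid" and leaves used == {'oid'}, so the 'tid' parameter can be dropped from the bind map.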
- def create_schema(self, cursor):
- """Create the database tables."""
- stmt = """
- CREATE TABLE commit_lock (dummy CHAR);
-
- -- The list of all transactions in the database
- CREATE TABLE transaction (
- tid NUMBER(20) NOT NULL PRIMARY KEY,
- packed CHAR DEFAULT 'N' CHECK (packed IN ('N', 'Y')),
- username VARCHAR2(255),
- description VARCHAR2(4000),
- extension RAW(2000)
- );
-
- -- Create a special transaction to represent object creation. This
- -- row is often referenced by object_state.prev_tid, but never by
- -- object_state.tid.
- INSERT INTO transaction (tid, username, description)
- VALUES (0, 'system', 'special transaction for object creation');
-
- CREATE SEQUENCE zoid_seq;
-
- -- All object states in all transactions.
- -- md5 and state can be null to represent object uncreation.
- CREATE TABLE object_state (
- zoid NUMBER(20) NOT NULL,
- tid NUMBER(20) NOT NULL REFERENCES transaction
- CHECK (tid > 0),
- PRIMARY KEY (zoid, tid),
- prev_tid NUMBER(20) NOT NULL REFERENCES transaction,
- md5 CHAR(32),
- state BLOB
- );
- CREATE INDEX object_state_tid ON object_state (tid);
-
- -- Pointers to the current object state
- CREATE TABLE current_object (
- zoid NUMBER(20) NOT NULL PRIMARY KEY,
- tid NUMBER(20) NOT NULL,
- FOREIGN KEY (zoid, tid) REFERENCES object_state
- );
-
- -- States that will soon be stored
- CREATE GLOBAL TEMPORARY TABLE temp_store (
- zoid NUMBER(20) NOT NULL PRIMARY KEY,
- prev_tid NUMBER(20) NOT NULL,
- md5 CHAR(32),
- state BLOB
- ) ON COMMIT DELETE ROWS;
-
- -- During packing, an exclusive lock is held on pack_lock.
- CREATE TABLE pack_lock (dummy CHAR);
-
- -- A list of referenced OIDs from each object_state.
- -- This table is populated as needed during packing.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because rows in object_state
- -- are never modified once committed, and rows are removed
- -- from object_state only by packing.
- CREATE TABLE object_ref (
- zoid NUMBER(20) NOT NULL,
- tid NUMBER(20) NOT NULL,
- to_zoid NUMBER(20) NOT NULL
- );
- CREATE INDEX object_ref_from ON object_ref (zoid);
- CREATE INDEX object_ref_tid ON object_ref (tid);
- CREATE INDEX object_ref_to ON object_ref (to_zoid);
-
- -- The object_refs_added table tracks whether object_refs has
- -- been populated for all states in a given transaction.
- -- An entry is added only when the work is finished.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because object states
- -- are never added to a transaction once committed, and
- -- rows are removed from the transaction table only by
- -- packing.
- CREATE TABLE object_refs_added (
- tid NUMBER(20) NOT NULL PRIMARY KEY
- );
-
- -- Temporary state during packing:
- -- The list of objects to pack. If keep is 'N',
- -- the object and all its revisions will be removed.
- -- If keep is 'Y', instead of removing the object,
- -- the pack operation will cut the object's history.
- -- If keep is 'Y' then the keep_tid field must also be set.
- -- The keep_tid field specifies which revision to keep within
- -- the list of packable transactions.
- CREATE TABLE pack_object (
- zoid NUMBER(20) NOT NULL PRIMARY KEY,
- keep CHAR NOT NULL CHECK (keep IN ('N', 'Y')),
- keep_tid NUMBER(20)
- );
- CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
-
- -- Temporary state during packing: a list of objects
- -- whose references need to be examined.
- CREATE GLOBAL TEMPORARY TABLE temp_pack_visit (
- zoid NUMBER(20) NOT NULL PRIMARY KEY
- );
-
- -- Temporary state during undo: a list of objects
- -- to be undone and the tid of the undone state.
- CREATE GLOBAL TEMPORARY TABLE temp_undo (
- zoid NUMBER(20) NOT NULL PRIMARY KEY,
- prev_tid NUMBER(20) NOT NULL
- );
- """
- self._run_script(cursor, stmt)
-
-
- def prepare_schema(self):
- """Create the database schema if it does not already exist."""
- conn, cursor = self.open()
- try:
- try:
- cursor.execute("""
- SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
- """)
- if not cursor.fetchall():
- self.create_schema(cursor)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def zap(self):
- """Clear all data out of the database.
-
- Used by the test suite.
- """
- conn, cursor = self.open()
- try:
- try:
- stmt = """
- DELETE FROM object_refs_added;
- DELETE FROM object_ref;
- DELETE FROM current_object;
- DELETE FROM object_state;
- DELETE FROM transaction;
- -- Create a transaction to represent object creation.
- INSERT INTO transaction (tid, username, description) VALUES
- (0, 'system', 'special transaction for object creation');
- DROP SEQUENCE zoid_seq;
- CREATE SEQUENCE zoid_seq;
- """
- self._run_script(cursor, stmt)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
- twophase=False):
- """Open a database connection and return (conn, cursor)."""
- try:
- kw = {'twophase': twophase} #, 'threaded': True}
- conn = cx_Oracle.connect(*self._params, **kw)
- cursor = conn.cursor()
- cursor.arraysize = self._arraysize
- if transaction_mode:
- cursor.execute("SET TRANSACTION %s" % transaction_mode)
- return conn, cursor
-
- except cx_Oracle.OperationalError:
- log.warning("Unable to connect to DSN %s", self._params[2])
- raise
-
- def close(self, conn, cursor):
- """Close both a cursor and connection, ignoring certain errors."""
- for obj in (cursor, conn):
- if obj is not None:
- try:
- obj.close()
- except (cx_Oracle.InterfaceError,
- cx_Oracle.OperationalError):
- pass
-
- def open_for_load(self):
- """Open and initialize a connection for loading objects.
-
- Returns (conn, cursor).
- """
- return self.open('READ ONLY')
-
- def restart_load(self, cursor):
- """After a rollback, reinitialize a connection for loading objects."""
- cursor.execute("SET TRANSACTION READ ONLY")
-
- def get_object_count(self):
- """Returns the number of objects in the database"""
- # The tests expect an exact number, but the code below generates
- # an estimate, so this is disabled for now.
- if True:
- return 0
- else:
- conn, cursor = self.open('READ ONLY')
- try:
- cursor.execute("""
- SELECT NUM_ROWS
- FROM USER_TABLES
- WHERE TABLE_NAME = 'CURRENT_OBJECT'
- """)
- res = cursor.fetchone()[0]
- if res is None:
- res = 0
- else:
- res = int(res)
- return res
- finally:
- self.close(conn, cursor)
-
- def get_db_size(self):
- """Returns the approximate size of the database in bytes"""
- # May not be possible without access to the dba_* objects
- return 0
-
- def load_current(self, cursor, oid):
- """Returns the current pickle and integer tid for an object.
-
- oid is an integer. Returns (None, None) if object does not exist.
- """
- cursor.execute("""
- SELECT state, tid
- FROM current_object
- JOIN object_state USING(zoid, tid)
- WHERE zoid = :1
- """, (oid,))
- for state, tid in cursor:
- if state is not None:
- state = state.read()
- # else this object's creation has been undone
- return state, tid
- return None, None
-
- def load_revision(self, cursor, oid, tid):
- """Returns the pickle for an object on a particular transaction.
-
- Returns None if no such state exists.
- """
- cursor.execute("""
- SELECT state
- FROM object_state
- WHERE zoid = :1
- AND tid = :2
- """, (oid, tid))
- for (state,) in cursor:
- if state is not None:
- return state.read()
- return None
-
- def exists(self, cursor, oid):
- """Returns a true value if the given object exists."""
- cursor.execute("SELECT 1 FROM current_object WHERE zoid = :1", (oid,))
- return len(list(cursor))
-
- def load_before(self, cursor, oid, tid):
- """Returns the pickle and tid of an object before transaction tid.
-
- Returns (None, None) if no earlier state exists.
- """
- stmt = """
- SELECT state, tid
- FROM object_state
- WHERE zoid = :oid
- AND tid = (
- SELECT MAX(tid)
- FROM object_state
- WHERE zoid = :oid
- AND tid < :tid
- )
- """
- cursor.execute(stmt, {'oid': oid, 'tid': tid})
- for state, tid in cursor:
- if state is not None:
- state = state.read()
- # else this object's creation has been undone
- return state, tid
- return None, None
-
- def get_object_tid_after(self, cursor, oid, tid):
- """Returns the tid of the next change after an object revision.
-
- Returns None if no later state exists.
- """
- stmt = """
- SELECT MIN(tid)
- FROM object_state
- WHERE zoid = :1
- AND tid > :2
- """
- cursor.execute(stmt, (oid, tid))
- rows = cursor.fetchall()
- if rows:
- assert len(rows) == 1
- return rows[0][0]
- else:
- return None
-
- def _set_xid(self, cursor):
- """Set up a distributed transaction"""
- stmt = """
- SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
- """
- cursor.execute(stmt)
- xid = str(cursor.fetchone()[0])
- cursor.connection.begin(0, xid, '0')
-
- def open_for_store(self):
- """Open and initialize a connection for storing objects.
-
- Returns (conn, cursor).
- """
- if self._twophase:
- conn, cursor = self.open(transaction_mode=None, twophase=True)
- try:
- self._set_xid(cursor)
- except:
- self.close(conn, cursor)
- raise
- else:
- conn, cursor = self.open()
- return conn, cursor
-
- def restart_store(self, cursor):
- """Reuse a store connection."""
- try:
- cursor.connection.rollback()
- if self._twophase:
- self._set_xid(cursor)
- except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
- raise StorageError("database disconnected")
-
- def store_temp(self, cursor, oid, prev_tid, md5sum, data):
- """Store an object in the temporary table."""
- cursor.setinputsizes(data=cx_Oracle.BLOB)
- stmt = """
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (:oid, :prev_tid, :md5sum, :data)
- """
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, data=cx_Oracle.Binary(data))
-
- def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
- """Replace an object in the temporary table."""
- cursor.setinputsizes(data=cx_Oracle.BLOB)
- stmt = """
- UPDATE temp_store SET
- prev_tid = :prev_tid,
- md5 = :md5sum,
- state = :data
- WHERE zoid = :oid
- """
- cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
- md5sum=md5sum, data=cx_Oracle.Binary(data))
-
- def restore(self, cursor, oid, tid, md5sum, data):
- """Store an object directly, without conflict detection.
-
- Used for copying transactions into this database.
- """
- cursor.setinputsizes(data=cx_Oracle.BLOB)
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (:oid, :tid,
- COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
- :md5sum, :data)
- """
- if data is not None:
- data = cx_Oracle.Binary(data)
- cursor.execute(stmt, oid=oid, tid=tid, md5sum=md5sum, data=data)
-
- def start_commit(self, cursor):
- """Prepare to commit."""
- # Hold commit_lock to prevent concurrent commits
- # (for as short a time as possible).
- # Lock transaction and current_object in share mode to ensure
- # conflict detection has the most current data.
- cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
- cursor.execute("LOCK TABLE transaction IN SHARE MODE")
- cursor.execute("LOCK TABLE current_object IN SHARE MODE")
-
- def _parse_dsinterval(self, s):
- """Convert an Oracle dsinterval (as a string) to a float."""
- mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
- if not mo:
- raise ValueError(s)
- day, hour, min, sec = [float(v) for v in mo.groups()]
- return day * 86400 + hour * 3600 + min * 60 + sec
-
- def get_tid_and_time(self, cursor):
- """Returns the most recent tid and the current database time.
-
- The database time is the number of seconds since the epoch.
- """
- cursor.execute("""
- SELECT MAX(tid), TO_CHAR(TO_DSINTERVAL(SYSTIMESTAMP - TO_TIMESTAMP_TZ(
- '1970-01-01 00:00:00 +00:00','YYYY-MM-DD HH24:MI:SS TZH:TZM')))
- FROM transaction
- """)
- tid, now = cursor.fetchone()
- return tid, self._parse_dsinterval(now)
-
- def add_transaction(self, cursor, tid, username, description, extension,
- packed=False):
- """Add a transaction."""
- stmt = """
- INSERT INTO transaction
- (tid, packed, username, description, extension)
- VALUES (:1, :2, :3, :4, :5)
- """
- cursor.execute(stmt, (
- tid, packed and 'Y' or 'N', username, description,
- cx_Oracle.Binary(extension)))
-
- def detect_conflict(self, cursor):
- """Find one conflict in the data about to be committed.
-
- If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
- attempted_data). If there is no conflict, returns None.
- """
- stmt = """
- SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
- temp_store.state
- FROM temp_store
- JOIN current_object ON (temp_store.zoid = current_object.zoid)
- WHERE temp_store.prev_tid != current_object.tid
- """
- cursor.execute(stmt)
- for oid, prev_tid, attempted_prev_tid, data in cursor:
- return oid, prev_tid, attempted_prev_tid, data.read()
- return None
-
- def move_from_temp(self, cursor, tid):
- """Moved the temporarily stored objects to permanent storage.
-
- Returns the list of oids stored.
- """
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT zoid, :tid, prev_tid, md5, state
- FROM temp_store
- """
- cursor.execute(stmt, tid=tid)
-
- stmt = """
- SELECT zoid FROM temp_store
- """
- cursor.execute(stmt)
- return [oid for (oid,) in cursor]
-
- def update_current(self, cursor, tid):
- """Update the current object pointers.
-
- tid is the integer tid of the transaction being committed.
- """
- # Insert objects created in this transaction into current_object.
- stmt = """
- INSERT INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
- WHERE tid = :1
- AND prev_tid = 0
- """
- cursor.execute(stmt, (tid,))
-
- # Change existing objects.
- stmt = """
- UPDATE current_object SET tid = :1
- WHERE zoid IN (
- SELECT zoid FROM object_state
- WHERE tid = :1
- AND prev_tid != 0
- )
- """
- cursor.execute(stmt, (tid,))
-
- def commit_phase1(self, cursor, tid):
- """Begin a commit. Returns the transaction name.
-
- This method should guarantee that commit_phase2() will succeed,
- meaning that if commit_phase2() would raise any error, the error
- should be raised in commit_phase1() instead.
- """
- if self._twophase:
- cursor.connection.prepare()
- return '-'
-
- def commit_phase2(self, cursor, txn):
- """Final transaction commit."""
- cursor.connection.commit()
-
- def abort(self, cursor, txn=None):
- """Abort the commit. If txn is not None, phase 1 is also aborted."""
- cursor.connection.rollback()
-
- def new_oid(self, cursor):
- """Return a new, unused OID."""
- stmt = "SELECT zoid_seq.nextval FROM DUAL"
- cursor.execute(stmt)
- return cursor.fetchone()[0]
-
-
- def hold_pack_lock(self, cursor):
- """Try to acquire the pack lock.
-
- Raise an exception if packing or undo is already in progress.
- """
- stmt = """
- LOCK TABLE pack_lock IN EXCLUSIVE MODE NOWAIT
- """
- try:
- cursor.execute(stmt)
- except cx_Oracle.DatabaseError:
- raise StorageError('A pack or undo operation is in progress')
-
- def release_pack_lock(self, cursor):
- """Release the pack lock."""
- # No action needed
- pass
-
-
- def _add_object_ref_rows(self, cursor, add_rows):
- """Add rows to object_ref.
-
- The input rows are tuples containing (from_zoid, tid, to_zoid).
-
- This overrides the method by the same name in common.Adapter.
- """
- stmt = """
- INSERT INTO object_ref (zoid, tid, to_zoid)
- VALUES (:1, :2, :3)
- """
- cursor.executemany(stmt, add_rows)
-
-
- def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
- """Polls for new transactions.
-
- conn and cursor must have been created previously by open_for_load().
- prev_polled_tid is the tid returned at the last poll, or None
- if this is the first poll. If ignore_tid is not None, changes
- committed in that transaction will not be included in the list
- of changed OIDs.
-
- Returns (changed_oids, new_polled_tid). Raises StorageError
- if the database has disconnected.
- """
- try:
- # find out the tid of the most recent transaction.
- stmt = "SELECT MAX(tid) FROM transaction"
- cursor.execute(stmt)
- new_polled_tid = list(cursor)[0][0]
-
- if prev_polled_tid is None:
- # This is the first time the connection has polled.
- return None, new_polled_tid
-
- if new_polled_tid == prev_polled_tid:
- # No transactions have been committed since prev_polled_tid.
- return (), new_polled_tid
-
- stmt = "SELECT 1 FROM transaction WHERE tid = :1"
- cursor.execute(stmt, (prev_polled_tid,))
- rows = cursor.fetchall()
- if not rows:
- # Transaction not found; perhaps it has been packed.
- # The connection cache needs to be cleared.
- return None, new_polled_tid
-
- # Get the list of changed OIDs and return it.
- stmt = """
- SELECT DISTINCT zoid
- FROM object_state
- JOIN transaction USING (tid)
- WHERE tid > :1
- """
- if ignore_tid is not None:
- stmt += " AND tid != %d" % ignore_tid
- cursor.execute(stmt, (prev_polled_tid,))
- oids = [oid for (oid,) in cursor]
-
- return oids, new_polled_tid
-
- except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
- raise StorageError("database disconnected")
-
-
-class TrackingMap:
- """Provides values for keys while tracking which keys are accessed."""
-
- def __init__(self, source):
- self.source = source
- self.used = set()
-
- def __getitem__(self, key):
- self.used.add(key)
- return self.source[key]
-
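(For reference: the `_parse_dsinterval` helper above converts Oracle's day-to-second interval string into a float number of seconds. A standalone sketch of the same conversion, written as a module-level function in modern Python purely for illustration — the adapter itself defines it as a method:)

```python
import re

def parse_dsinterval(s):
    """Convert an Oracle day-to-second interval string to seconds.

    Mirrors Adapter._parse_dsinterval: a string like '+00 01:02:03.5'
    ('+DD HH:MI:SS.FF') becomes a float number of seconds.
    """
    mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
    if not mo:
        raise ValueError(s)
    day, hour, minute, sec = [float(v) for v in mo.groups()]
    return day * 86400 + hour * 3600 + minute * 60 + sec

# 1 hour + 2 minutes + 3.5 seconds = 3723.5 seconds
print(parse_dsinterval('+00 01:02:03.5'))  # 3723.5
```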
Copied: relstorage/tags/1.0-beta1/relstorage/adapters/oracle.py (from rev 84129, relstorage/trunk/relstorage/adapters/oracle.py)
===================================================================
--- relstorage/tags/1.0-beta1/relstorage/adapters/oracle.py (rev 0)
+++ relstorage/tags/1.0-beta1/relstorage/adapters/oracle.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,742 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Oracle adapter for RelStorage."""
+
+import logging
+import re
+import thread
+import time
+import cx_Oracle
+from ZODB.POSException import StorageError
+
+from common import Adapter
+
+log = logging.getLogger("relstorage.adapters.oracle")
+
+
+class OracleAdapter(Adapter):
+ """Oracle adapter for RelStorage."""
+
+ _script_vars = {
+ 'TRUE': "'Y'",
+ 'FALSE': "'N'",
+ 'OCTET_LENGTH': 'LENGTH',
+ 'oid': ':oid',
+ 'tid': ':tid',
+ 'pack_tid': ':pack_tid',
+ 'undo_tid': ':undo_tid',
+ 'self_tid': ':self_tid',
+ 'min_tid': ':min_tid',
+ 'max_tid': ':max_tid',
+ }
+
+ _scripts = {
+ 'select_keep_tid': """
+ SELECT MAX(tid)
+ FROM object_state
+ WHERE zoid = pack_object.zoid
+ AND tid > 0
+ AND tid <= %(pack_tid)s
+ """,
+
+ 'choose_pack_transaction': """
+ SELECT MAX(tid)
+ FROM transaction
+ WHERE tid > 0
+ AND tid <= %(tid)s
+ AND packed = 'N'
+ """,
+
+ 'create_temp_pack_visit': None,
+ 'create_temp_undo': None,
+ 'reset_temp_undo': "DELETE FROM temp_undo",
+ }
+
+ def __init__(self, user, password, dsn, twophase=False, arraysize=64):
+ self._params = (user, password, dsn)
+ self._twophase = twophase
+ self._arraysize = arraysize
+
+
+ def _run_script_stmt(self, cursor, generic_stmt, generic_params=()):
+ """Execute a statement from a script with the given parameters.
+
+ This overrides the method by the same name in common.Adapter.
+ """
+ if generic_params:
+ # Oracle raises ORA-01036 if the parameter map contains extra keys,
+ # so filter out any unused parameters.
+ tracker = TrackingMap(self._script_vars)
+ stmt = generic_stmt % tracker
+ used = tracker.used
+ params = {}
+ for k, v in generic_params.iteritems():
+ if k in used:
+ params[k] = v
+ else:
+ stmt = generic_stmt % self._script_vars
+ params = ()
+
+ try:
+ cursor.execute(stmt, params)
+ except:
+ log.warning("script statement failed: %r; parameters: %r",
+ stmt, params)
+ raise
+
+
+ def create_schema(self, cursor):
+ """Create the database tables."""
+ stmt = """
+ CREATE TABLE commit_lock (dummy CHAR);
+
+ -- The list of all transactions in the database
+ CREATE TABLE transaction (
+ tid NUMBER(20) NOT NULL PRIMARY KEY,
+ packed CHAR DEFAULT 'N' CHECK (packed IN ('N', 'Y')),
+ username NVARCHAR2(255),
+ description NVARCHAR2(2000),
+ extension RAW(2000)
+ );
+
+ -- Create a special transaction to represent object creation. This
+ -- row is often referenced by object_state.prev_tid, but never by
+ -- object_state.tid.
+ INSERT INTO transaction (tid, username, description)
+ VALUES (0, 'system', 'special transaction for object creation');
+
+ CREATE SEQUENCE zoid_seq;
+
+ -- All object states in all transactions.
+ -- md5 and state can be null to represent object uncreation.
+ CREATE TABLE object_state (
+ zoid NUMBER(20) NOT NULL,
+ tid NUMBER(20) NOT NULL REFERENCES transaction
+ CHECK (tid > 0),
+ PRIMARY KEY (zoid, tid),
+ prev_tid NUMBER(20) NOT NULL REFERENCES transaction,
+ md5 CHAR(32),
+ state BLOB
+ );
+ CREATE INDEX object_state_tid ON object_state (tid);
+
+ -- Pointers to the current object state
+ CREATE TABLE current_object (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ tid NUMBER(20) NOT NULL,
+ FOREIGN KEY (zoid, tid) REFERENCES object_state
+ );
+
+ -- States that will soon be stored
+ CREATE GLOBAL TEMPORARY TABLE temp_store (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ prev_tid NUMBER(20) NOT NULL,
+ md5 CHAR(32),
+ state BLOB
+ ) ON COMMIT DELETE ROWS;
+
+ -- During packing, an exclusive lock is held on pack_lock.
+ CREATE TABLE pack_lock (dummy CHAR);
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during packing.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because rows in object_state
+ -- are never modified once committed, and rows are removed
+ -- from object_state only by packing.
+ CREATE TABLE object_ref (
+ zoid NUMBER(20) NOT NULL,
+ tid NUMBER(20) NOT NULL,
+ to_zoid NUMBER(20) NOT NULL
+ );
+ CREATE INDEX object_ref_from ON object_ref (zoid);
+ CREATE INDEX object_ref_tid ON object_ref (tid);
+ CREATE INDEX object_ref_to ON object_ref (to_zoid);
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because object states
+ -- are never added to a transaction once committed, and
+ -- rows are removed from the transaction table only by
+ -- packing.
+ CREATE TABLE object_refs_added (
+ tid NUMBER(20) NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during packing:
+ -- The list of objects to pack. If keep is 'N',
+ -- the object and all its revisions will be removed.
+ -- If keep is 'Y', instead of removing the object,
+ -- the pack operation will cut the object's history.
+ -- If keep is 'Y' then the keep_tid field must also be set.
+ -- The keep_tid field specifies which revision to keep within
+ -- the list of packable transactions.
+ CREATE TABLE pack_object (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ keep CHAR NOT NULL CHECK (keep IN ('N', 'Y')),
+ keep_tid NUMBER(20)
+ );
+ CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+
+ -- Temporary state during packing: a list of objects
+ -- whose references need to be examined.
+ CREATE GLOBAL TEMPORARY TABLE temp_pack_visit (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during undo: a list of objects
+ -- to be undone and the tid of the undone state.
+ CREATE GLOBAL TEMPORARY TABLE temp_undo (
+ zoid NUMBER(20) NOT NULL PRIMARY KEY,
+ prev_tid NUMBER(20) NOT NULL
+ );
+ """
+ self._run_script(cursor, stmt)
+
+
+ def prepare_schema(self):
+ """Create the database schema if it does not already exist."""
+ conn, cursor = self.open()
+ try:
+ try:
+ cursor.execute("""
+ SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
+ """)
+ if not cursor.fetchall():
+ self.create_schema(cursor)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def zap_all(self):
+ """Clear all data out of the database."""
+ conn, cursor = self.open()
+ try:
+ try:
+ stmt = """
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ DELETE FROM transaction;
+ -- Create a transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, 'system', 'special transaction for object creation');
+ DROP SEQUENCE zoid_seq;
+ CREATE SEQUENCE zoid_seq;
+ """
+ self._run_script(cursor, stmt)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
+ twophase=False):
+ """Open a database connection and return (conn, cursor)."""
+ try:
+ kw = {'twophase': twophase} #, 'threaded': True}
+ conn = cx_Oracle.connect(*self._params, **kw)
+ cursor = conn.cursor()
+ cursor.arraysize = self._arraysize
+ if transaction_mode:
+ cursor.execute("SET TRANSACTION %s" % transaction_mode)
+ return conn, cursor
+
+ except cx_Oracle.OperationalError:
+ log.warning("Unable to connect to DSN %s", self._params[2])
+ raise
+
+ def close(self, conn, cursor):
+ """Close both a cursor and connection, ignoring certain errors."""
+ for obj in (cursor, conn):
+ if obj is not None:
+ try:
+ obj.close()
+ except (cx_Oracle.InterfaceError,
+ cx_Oracle.OperationalError):
+ pass
+
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ return self.open('READ ONLY')
+
+ def restart_load(self, cursor):
+ """After a rollback, reinitialize a connection for loading objects."""
+ cursor.execute("SET TRANSACTION READ ONLY")
+
+ def get_object_count(self):
+ """Returns the number of objects in the database"""
+ # The tests expect an exact number, but the code below generates
+ # an estimate, so this is disabled for now.
+ if True:
+ return 0
+ else:
+ conn, cursor = self.open('READ ONLY')
+ try:
+ cursor.execute("""
+ SELECT NUM_ROWS
+ FROM USER_TABLES
+ WHERE TABLE_NAME = 'CURRENT_OBJECT'
+ """)
+ res = cursor.fetchone()[0]
+ if res is None:
+ res = 0
+ else:
+ res = int(res)
+ return res
+ finally:
+ self.close(conn, cursor)
+
+ def get_db_size(self):
+ """Returns the approximate size of the database in bytes"""
+ # May not be possible without access to the dba_* objects
+ return 0
+
+ def load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+        oid is an integer. Returns (None, None) if the object does not exist.
+ """
+ cursor.execute("""
+ SELECT state, tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = :1
+ """, (oid,))
+ for state, tid in cursor:
+ if state is not None:
+ state = state.read()
+ # else this object's creation has been undone
+ return state, tid
+ return None, None
+
+ def load_revision(self, cursor, oid, tid):
+        """Returns the pickle for an object in a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ cursor.execute("""
+ SELECT state
+ FROM object_state
+ WHERE zoid = :1
+ AND tid = :2
+ """, (oid, tid))
+ for (state,) in cursor:
+ if state is not None:
+ return state.read()
+ return None
+
+ def exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ cursor.execute("SELECT 1 FROM current_object WHERE zoid = :1", (oid,))
+ return len(list(cursor))
+
+ def load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ stmt = """
+ SELECT state, tid
+ FROM object_state
+ WHERE zoid = :oid
+ AND tid = (
+ SELECT MAX(tid)
+ FROM object_state
+ WHERE zoid = :oid
+ AND tid < :tid
+ )
+ """
+ cursor.execute(stmt, {'oid': oid, 'tid': tid})
+ for state, tid in cursor:
+ if state is not None:
+ state = state.read()
+ # else this object's creation has been undone
+ return state, tid
+ return None, None
+
+ def get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT MIN(tid)
+ FROM object_state
+ WHERE zoid = :1
+ AND tid > :2
+ """
+ cursor.execute(stmt, (oid, tid))
+ rows = cursor.fetchall()
+ if rows:
+ assert len(rows) == 1
+ return rows[0][0]
+ else:
+ return None
+
+ def _set_xid(self, cursor):
+ """Set up a distributed transaction"""
+ stmt = """
+ SELECT SYS_CONTEXT('USERENV', 'SID') FROM DUAL
+ """
+ cursor.execute(stmt)
+ xid = str(cursor.fetchone()[0])
+ cursor.connection.begin(0, xid, '0')
+
+ def open_for_store(self):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ if self._twophase:
+ conn, cursor = self.open(transaction_mode=None, twophase=True)
+ try:
+ self._set_xid(cursor)
+ except:
+ self.close(conn, cursor)
+ raise
+ else:
+ conn, cursor = self.open()
+ return conn, cursor
+
+ def restart_store(self, cursor):
+ """Reuse a store connection."""
+ try:
+ cursor.connection.rollback()
+ if self._twophase:
+ self._set_xid(cursor)
+ except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
+ raise StorageError("database disconnected")
+
+ def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+ """Store an object in the temporary table."""
+ cursor.setinputsizes(data=cx_Oracle.BLOB)
+ stmt = """
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (:oid, :prev_tid, :md5sum, :data)
+ """
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, data=cx_Oracle.Binary(data))
+
+ def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+ """Replace an object in the temporary table."""
+ cursor.setinputsizes(data=cx_Oracle.BLOB)
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = :prev_tid,
+ md5 = :md5sum,
+ state = :data
+ WHERE zoid = :oid
+ """
+ cursor.execute(stmt, oid=oid, prev_tid=prev_tid,
+ md5sum=md5sum, data=cx_Oracle.Binary(data))
+
+ def restore(self, cursor, oid, tid, md5sum, data):
+ """Store an object directly, without conflict detection.
+
+ Used for copying transactions into this database.
+ """
+ cursor.setinputsizes(data=cx_Oracle.BLOB)
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (:oid, :tid,
+ COALESCE((SELECT tid FROM current_object WHERE zoid = :oid), 0),
+ :md5sum, :data)
+ """
+ if data is not None:
+ data = cx_Oracle.Binary(data)
+ cursor.execute(stmt, oid=oid, tid=tid, md5sum=md5sum, data=data)
+
+ def start_commit(self, cursor):
+ """Prepare to commit."""
+ # Hold commit_lock to prevent concurrent commits
+ # (for as short a time as possible).
+ # Lock transaction and current_object in share mode to ensure
+ # conflict detection has the most current data.
+ cursor.execute("LOCK TABLE commit_lock IN EXCLUSIVE MODE")
+ cursor.execute("LOCK TABLE transaction IN SHARE MODE")
+ cursor.execute("LOCK TABLE current_object IN SHARE MODE")
+
+ def _parse_dsinterval(self, s):
+ """Convert an Oracle dsinterval (as a string) to a float."""
+ mo = re.match(r'([+-]\d+) (\d+):(\d+):([0-9.]+)', s)
+ if not mo:
+ raise ValueError(s)
+ day, hour, min, sec = [float(v) for v in mo.groups()]
+ return day * 86400 + hour * 3600 + min * 60 + sec
+
+ def get_tid_and_time(self, cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+ cursor.execute("""
+ SELECT MAX(tid), TO_CHAR(TO_DSINTERVAL(SYSTIMESTAMP - TO_TIMESTAMP_TZ(
+ '1970-01-01 00:00:00 +00:00','YYYY-MM-DD HH24:MI:SS TZH:TZM')))
+ FROM transaction
+ """)
+ tid, now = cursor.fetchone()
+ return tid, self._parse_dsinterval(now)
+
+ def add_transaction(self, cursor, tid, username, description, extension,
+ packed=False):
+ """Add a transaction."""
+ stmt = """
+ INSERT INTO transaction
+ (tid, packed, username, description, extension)
+ VALUES (:1, :2, :3, :4, :5)
+ """
+ encoding = cursor.connection.encoding
+ cursor.execute(stmt, (
+ tid, packed and 'Y' or 'N', username.encode(encoding),
+ description.encode(encoding), cx_Oracle.Binary(extension)))
+
+ def detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ temp_store.state
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ """
+ cursor.execute(stmt)
+ for oid, prev_tid, attempted_prev_tid, data in cursor:
+ return oid, prev_tid, attempted_prev_tid, data.read()
+ return None
+
+ def move_from_temp(self, cursor, tid):
+        """Move the temporarily stored objects to permanent storage.
+
+ Returns the list of oids stored.
+ """
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT zoid, :tid, prev_tid, md5, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, tid=tid)
+
+ stmt = """
+ SELECT zoid FROM temp_store
+ """
+ cursor.execute(stmt)
+ return [oid for (oid,) in cursor]
+
+ def update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ # Insert objects created in this transaction into current_object.
+ stmt = """
+ INSERT INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = :1
+ AND prev_tid = 0
+ """
+ cursor.execute(stmt, (tid,))
+
+ # Change existing objects.
+ stmt = """
+ UPDATE current_object SET tid = :1
+ WHERE zoid IN (
+ SELECT zoid FROM object_state
+ WHERE tid = :1
+ AND prev_tid != 0
+ )
+ """
+ cursor.execute(stmt, (tid,))
+
+ def set_min_oid(self, cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+ next_oid = self.new_oid(cursor)
+ if next_oid < oid:
+            # Oracle provides no way to modify the sequence value
+ # except through alter sequence or drop/create sequence,
+ # but either statement kills the current transaction.
+ # Therefore, open a temporary connection to make the
+ # alteration.
+ conn2, cursor2 = self.open()
+ try:
+ # Change the sequence by altering the increment.
+ # (this is safer than dropping and re-creating the sequence)
+ diff = oid - next_oid
+ cursor2.execute(
+ "ALTER SEQUENCE zoid_seq INCREMENT BY %d" % diff)
+ cursor2.execute("SELECT zoid_seq.nextval FROM DUAL")
+ cursor2.execute("ALTER SEQUENCE zoid_seq INCREMENT BY 1")
+ conn2.commit()
+ finally:
+ self.close(conn2, cursor2)
+
+ def commit_phase1(self, cursor, tid):
+ """Begin a commit. Returns the transaction name.
+
+ This method should guarantee that commit_phase2() will succeed,
+ meaning that if commit_phase2() would raise any error, the error
+ should be raised in commit_phase1() instead.
+ """
+ if self._twophase:
+ cursor.connection.prepare()
+ return '-'
+
+ def commit_phase2(self, cursor, txn):
+ """Final transaction commit."""
+ cursor.connection.commit()
+
+ def abort(self, cursor, txn=None):
+ """Abort the commit. If txn is not None, phase 1 is also aborted."""
+ cursor.connection.rollback()
+
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "SELECT zoid_seq.nextval FROM DUAL"
+ cursor.execute(stmt)
+ return cursor.fetchone()[0]
+
+
+ def _transaction_iterator(self, cursor):
+ """Iterate over a list of transactions returned from the database.
+
+ Each row begins with (tid, username, description, extension)
+ and may have other columns.
+
+ This overrides the default implementation.
+ """
+ encoding = cursor.connection.encoding
+ for row in cursor:
+ tid, username, description = row[:3]
+ if username is not None:
+ username = username.decode(encoding)
+ if description is not None:
+ description = description.decode(encoding)
+ yield (tid, username, description) + tuple(row[3:])
+
+
+ def hold_pack_lock(self, cursor):
+ """Try to acquire the pack lock.
+
+ Raise an exception if packing or undo is already in progress.
+ """
+ stmt = """
+ LOCK TABLE pack_lock IN EXCLUSIVE MODE NOWAIT
+ """
+ try:
+ cursor.execute(stmt)
+ except cx_Oracle.DatabaseError:
+ raise StorageError('A pack or undo operation is in progress')
+
+ def release_pack_lock(self, cursor):
+ """Release the pack lock."""
+ # No action needed
+ pass
+
+
+ def _add_object_ref_rows(self, cursor, add_rows):
+ """Add rows to object_ref.
+
+ The input rows are tuples containing (from_zoid, tid, to_zoid).
+
+ This overrides the method by the same name in common.Adapter.
+ """
+ stmt = """
+ INSERT INTO object_ref (zoid, tid, to_zoid)
+ VALUES (:1, :2, :3)
+ """
+ cursor.executemany(stmt, add_rows)
+
+
+ def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+ """Polls for new transactions.
+
+ conn and cursor must have been created previously by open_for_load().
+ prev_polled_tid is the tid returned at the last poll, or None
+ if this is the first poll. If ignore_tid is not None, changes
+ committed in that transaction will not be included in the list
+ of changed OIDs.
+
+ Returns (changed_oids, new_polled_tid). Raises StorageError
+ if the database has disconnected.
+ """
+ try:
+            # Find the tid of the most recent transaction.
+ stmt = "SELECT MAX(tid) FROM transaction"
+ cursor.execute(stmt)
+ new_polled_tid = list(cursor)[0][0]
+
+ if prev_polled_tid is None:
+ # This is the first time the connection has polled.
+ return None, new_polled_tid
+
+ if new_polled_tid == prev_polled_tid:
+ # No transactions have been committed since prev_polled_tid.
+ return (), new_polled_tid
+
+ stmt = "SELECT 1 FROM transaction WHERE tid = :1"
+ cursor.execute(stmt, (prev_polled_tid,))
+ rows = cursor.fetchall()
+ if not rows:
+ # Transaction not found; perhaps it has been packed.
+ # The connection cache needs to be cleared.
+ return None, new_polled_tid
+
+ # Get the list of changed OIDs and return it.
+ stmt = """
+ SELECT DISTINCT zoid
+ FROM object_state
+ JOIN transaction USING (tid)
+ WHERE tid > :1
+ """
+ if ignore_tid is not None:
+ stmt += " AND tid != %d" % ignore_tid
+ cursor.execute(stmt, (prev_polled_tid,))
+ oids = [oid for (oid,) in cursor]
+
+ return oids, new_polled_tid
+
+ except (cx_Oracle.OperationalError, cx_Oracle.InterfaceError):
+ raise StorageError("database disconnected")
+
+
+class TrackingMap:
+ """Provides values for keys while tracking which keys are accessed."""
+
+ def __init__(self, source):
+ self.source = source
+ self.used = set()
+
+ def __getitem__(self, key):
+ self.used.add(key)
+ return self.source[key]
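The `TrackingMap` class at the end of the Oracle adapter is self-contained, so its behavior can be sketched outside the diff. Below is a minimal, hypothetical usage (the `params` dict and its keys are invented for illustration) showing how it records which keys a consumer actually touched:

```python
# Standalone copy of the TrackingMap pattern from the diff above: it proxies
# item lookups to a source mapping while remembering every key accessed.
class TrackingMap:
    """Provides values for keys while tracking which keys are accessed."""

    def __init__(self, source):
        self.source = source
        self.used = set()

    def __getitem__(self, key):
        self.used.add(key)
        return self.source[key]


params = TrackingMap({'tid': 84133, 'zoid': 7, 'unused': None})
values = (params['tid'], params['zoid'])  # touch two of the three keys
print(sorted(params.used))                # -> ['tid', 'zoid']
```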
Deleted: relstorage/tags/1.0-beta1/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/trunk/relstorage/adapters/postgresql.py 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/relstorage/adapters/postgresql.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,624 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""PostgreSQL adapter for RelStorage."""
-
-from base64 import decodestring, encodestring
-import logging
-import psycopg2, psycopg2.extensions
-from ZODB.POSException import StorageError
-
-from common import Adapter
-
-log = logging.getLogger("relstorage.adapters.postgresql")
-
-
-# Notes about adapters:
-#
-# An adapter must not hold a connection, cursor, or database state, because
-# RelStorage opens multiple concurrent connections using a single adapter
-# instance.
-# All OID and TID values are integers, not binary strings, except as noted.
-
-
-class PostgreSQLAdapter(Adapter):
- """PostgreSQL adapter for RelStorage."""
-
- def __init__(self, dsn='', twophase=False):
- self._dsn = dsn
- self._twophase = twophase
-
- def create_schema(self, cursor):
- """Create the database tables."""
- stmt = """
- CREATE TABLE commit_lock ();
-
- -- The list of all transactions in the database
- CREATE TABLE transaction (
- tid BIGINT NOT NULL PRIMARY KEY,
- packed BOOLEAN NOT NULL DEFAULT FALSE,
- username VARCHAR(255) NOT NULL,
- description TEXT NOT NULL,
- extension BYTEA
- );
-
- -- Create a special transaction to represent object creation. This
- -- row is often referenced by object_state.prev_tid, but never by
- -- object_state.tid.
- INSERT INTO transaction (tid, username, description)
- VALUES (0, 'system', 'special transaction for object creation');
-
- CREATE SEQUENCE zoid_seq;
-
- -- All object states in all transactions. Note that md5 and state
- -- can be null to represent object uncreation.
- CREATE TABLE object_state (
- zoid BIGINT NOT NULL,
- tid BIGINT NOT NULL REFERENCES transaction
- CHECK (tid > 0),
- PRIMARY KEY (zoid, tid),
- prev_tid BIGINT NOT NULL REFERENCES transaction,
- md5 CHAR(32),
- state BYTEA
- );
- CREATE INDEX object_state_tid ON object_state (tid);
-
- -- Pointers to the current object state
- CREATE TABLE current_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- tid BIGINT NOT NULL,
- FOREIGN KEY (zoid, tid) REFERENCES object_state
- );
-
- -- During packing, an exclusive lock is held on pack_lock.
- CREATE TABLE pack_lock ();
-
- -- A list of referenced OIDs from each object_state.
- -- This table is populated as needed during packing.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because rows in object_state
- -- are never modified once committed, and rows are removed
- -- from object_state only by packing.
- CREATE TABLE object_ref (
- zoid BIGINT NOT NULL,
- tid BIGINT NOT NULL,
- to_zoid BIGINT NOT NULL
- );
- CREATE INDEX object_ref_from ON object_ref (zoid);
- CREATE INDEX object_ref_tid ON object_ref (tid);
- CREATE INDEX object_ref_to ON object_ref (to_zoid);
-
- -- The object_refs_added table tracks whether object_refs has
- -- been populated for all states in a given transaction.
- -- An entry is added only when the work is finished.
- -- To prevent unnecessary table locking, it does not use
- -- foreign keys, which is safe because object states
- -- are never added to a transaction once committed, and
- -- rows are removed from the transaction table only by
- -- packing.
- CREATE TABLE object_refs_added (
- tid BIGINT NOT NULL PRIMARY KEY
- );
-
- -- Temporary state during packing:
- -- The list of objects to pack. If keep is 'N',
- -- the object and all its revisions will be removed.
- -- If keep is 'Y', instead of removing the object,
- -- the pack operation will cut the object's history.
- -- If keep is 'Y' then the keep_tid field must also be set.
- -- The keep_tid field specifies which revision to keep within
- -- the list of packable transactions.
- CREATE TABLE pack_object (
- zoid BIGINT NOT NULL PRIMARY KEY,
- keep BOOLEAN NOT NULL,
- keep_tid BIGINT
- );
- CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
- """
- cursor.execute(stmt)
-
-
- def prepare_schema(self):
- """Create the database schema if it does not already exist."""
- conn, cursor = self.open()
- try:
- try:
- cursor.execute("""
- SELECT tablename
- FROM pg_tables
- WHERE tablename = 'object_state'
- """)
- if not cursor.rowcount:
- self.create_schema(cursor)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def zap(self):
- """Clear all data out of the database.
-
- Used by the test suite.
- """
- conn, cursor = self.open()
- try:
- try:
- cursor.execute("""
- DELETE FROM object_refs_added;
- DELETE FROM object_ref;
- DELETE FROM current_object;
- DELETE FROM object_state;
- DELETE FROM transaction;
- -- Create a special transaction to represent object creation.
- INSERT INTO transaction (tid, username, description) VALUES
- (0, 'system', 'special transaction for object creation');
- ALTER SEQUENCE zoid_seq START WITH 1;
- """)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
-
-
- def open(self,
- isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
- """Open a database connection and return (conn, cursor)."""
- try:
- conn = psycopg2.connect(self._dsn)
- conn.set_isolation_level(isolation)
- cursor = conn.cursor()
- cursor.arraysize = 64
- except psycopg2.OperationalError:
- log.warning("Unable to connect in %s", repr(self))
- raise
- return conn, cursor
-
- def close(self, conn, cursor):
- """Close a connection and cursor, ignoring certain errors.
- """
- for obj in (cursor, conn):
- if obj is not None:
- try:
- obj.close()
- except (psycopg2.InterfaceError,
- psycopg2.OperationalError):
- pass
-
- def open_for_load(self):
- """Open and initialize a connection for loading objects.
-
- Returns (conn, cursor).
- """
- conn, cursor = self.open(
- psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
- stmt = """
- PREPARE get_latest_tid AS
- SELECT tid
- FROM transaction
- ORDER BY tid DESC
- LIMIT 1
- """
- cursor.execute(stmt)
- return conn, cursor
-
- def restart_load(self, cursor):
- """After a rollback, reinitialize a connection for loading objects."""
- # No re-init necessary
- pass
-
- def get_object_count(self):
-        """Returns the number of objects in the database."""
-        # TODO: implement later
- return 0
-
- def get_db_size(self):
- """Returns the approximate size of the database in bytes"""
- conn, cursor = self.open()
- try:
- cursor.execute("SELECT pg_database_size(current_database())")
- return cursor.fetchone()[0]
- finally:
- self.close(conn, cursor)
-
- def load_current(self, cursor, oid):
- """Returns the current pickle and integer tid for an object.
-
-        oid is an integer.  Returns (None, None) if the object does not exist.
- """
- cursor.execute("""
- SELECT encode(state, 'base64'), tid
- FROM current_object
- JOIN object_state USING(zoid, tid)
- WHERE zoid = %s
- """, (oid,))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- state64, tid = cursor.fetchone()
- if state64 is not None:
- state = decodestring(state64)
- else:
- # This object's creation has been undone
- state = None
- return state, tid
- else:
- return None, None
-
- def load_revision(self, cursor, oid, tid):
- """Returns the pickle for an object on a particular transaction.
-
- Returns None if no such state exists.
- """
- cursor.execute("""
- SELECT encode(state, 'base64')
- FROM object_state
- WHERE zoid = %s
- AND tid = %s
- """, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- (state64,) = cursor.fetchone()
- if state64 is not None:
- return decodestring(state64)
- return None
-
- def exists(self, cursor, oid):
- """Returns a true value if the given object exists."""
- cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
- return cursor.rowcount
-
- def load_before(self, cursor, oid, tid):
- """Returns the pickle and tid of an object before transaction tid.
-
- Returns (None, None) if no earlier state exists.
- """
- cursor.execute("""
- SELECT encode(state, 'base64'), tid
- FROM object_state
- WHERE zoid = %s
- AND tid < %s
- ORDER BY tid DESC
- LIMIT 1
- """, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- state64, tid = cursor.fetchone()
- if state64 is not None:
- state = decodestring(state64)
- else:
- # The object's creation has been undone
- state = None
- return state, tid
- else:
- return None, None
-
- def get_object_tid_after(self, cursor, oid, tid):
- """Returns the tid of the next change after an object revision.
-
- Returns None if no later state exists.
- """
- stmt = """
- SELECT tid
- FROM object_state
- WHERE zoid = %s
- AND tid > %s
- ORDER BY tid
- LIMIT 1
- """
- cursor.execute(stmt, (oid, tid))
- if cursor.rowcount:
- assert cursor.rowcount == 1
- return cursor.fetchone()[0]
- else:
- return None
-
- def _make_temp_table(self, cursor):
- """Create the temporary table for storing objects"""
- if self._twophase:
-            # PostgreSQL does not allow two-phase transactions
- # to use temporary tables. :-(
- stmt = """
- CREATE TABLE temp_store (
- zoid BIGINT NOT NULL,
- prev_tid BIGINT NOT NULL,
- md5 CHAR(32),
- state BYTEA
- );
- CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
- """
- else:
- stmt = """
- CREATE TEMPORARY TABLE temp_store (
- zoid BIGINT NOT NULL,
- prev_tid BIGINT NOT NULL,
- md5 CHAR(32),
- state BYTEA
- ) ON COMMIT DROP;
- CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
- """
- cursor.execute(stmt)
-
- def open_for_store(self):
- """Open and initialize a connection for storing objects.
-
- Returns (conn, cursor).
- """
- conn, cursor = self.open()
- try:
- self._make_temp_table(cursor)
- return conn, cursor
- except:
- self.close(conn, cursor)
- raise
-
- def restart_store(self, cursor):
- """Reuse a store connection."""
- try:
- cursor.connection.rollback()
- if self._twophase:
- cursor.connection.set_isolation_level(
- psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
- self._make_temp_table(cursor)
- except (psycopg2.OperationalError, psycopg2.InterfaceError):
- raise StorageError("database disconnected")
-
- def store_temp(self, cursor, oid, prev_tid, md5sum, data):
- """Store an object in the temporary table."""
- stmt = """
- INSERT INTO temp_store (zoid, prev_tid, md5, state)
- VALUES (%s, %s, %s, decode(%s, 'base64'))
- """
- cursor.execute(stmt, (oid, prev_tid, md5sum, encodestring(data)))
-
- def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
- """Replace an object in the temporary table."""
- stmt = """
- UPDATE temp_store SET
- prev_tid = %s,
- md5 = %s,
- state = decode(%s, 'base64')
- WHERE zoid = %s
- """
- cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
-
- def restore(self, cursor, oid, tid, md5sum, data):
- """Store an object directly, without conflict detection.
-
- Used for copying transactions into this database.
- """
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- VALUES (%s, %s,
- COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
- %s, decode(%s, 'base64'))
- """
- if data is not None:
- data = encodestring(data)
- cursor.execute(stmt, (oid, tid, oid, md5sum, data))
-
- def start_commit(self, cursor):
- """Prepare to commit."""
- # Hold commit_lock to prevent concurrent commits
- # (for as short a time as possible).
- # Lock transaction and current_object in share mode to ensure
- # conflict detection has the most current data.
- cursor.execute("""
- LOCK TABLE commit_lock IN EXCLUSIVE MODE;
- LOCK TABLE transaction IN SHARE MODE;
- LOCK TABLE current_object IN SHARE MODE
- """)
-
- def get_tid_and_time(self, cursor):
- """Returns the most recent tid and the current database time.
-
- The database time is the number of seconds since the epoch.
- """
- cursor.execute("""
- SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
- FROM transaction
- ORDER BY tid DESC
- LIMIT 1
- """)
- assert cursor.rowcount == 1
- return cursor.fetchone()
-
- def add_transaction(self, cursor, tid, username, description, extension,
- packed=False):
- """Add a transaction."""
- stmt = """
- INSERT INTO transaction
- (tid, packed, username, description, extension)
- VALUES (%s, %s, %s, %s, decode(%s, 'base64'))
- """
- cursor.execute(stmt, (
- tid, packed, username, description, encodestring(extension)))
-
- def detect_conflict(self, cursor):
- """Find one conflict in the data about to be committed.
-
- If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
- attempted_data). If there is no conflict, returns None.
- """
- stmt = """
- SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
- encode(temp_store.state, 'base64')
- FROM temp_store
- JOIN current_object ON (temp_store.zoid = current_object.zoid)
- WHERE temp_store.prev_tid != current_object.tid
- LIMIT 1
- """
- cursor.execute(stmt)
- if cursor.rowcount:
- oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
- return oid, prev_tid, attempted_prev_tid, decodestring(data)
- return None
-
- def move_from_temp(self, cursor, tid):
-        """Move the temporarily stored objects to permanent storage.
-
- Returns the list of oids stored.
- """
- stmt = """
- INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
- SELECT zoid, %s, prev_tid, md5, state
- FROM temp_store
- """
- cursor.execute(stmt, (tid,))
-
- stmt = """
- SELECT zoid FROM temp_store
- """
- cursor.execute(stmt)
- return [oid for (oid,) in cursor]
-
- def update_current(self, cursor, tid):
- """Update the current object pointers.
-
- tid is the integer tid of the transaction being committed.
- """
- cursor.execute("""
- -- Insert objects created in this transaction into current_object.
- INSERT INTO current_object (zoid, tid)
- SELECT zoid, tid FROM object_state
- WHERE tid = %(tid)s
- AND prev_tid = 0;
-
- -- Change existing objects. To avoid deadlocks,
- -- update in OID order.
- UPDATE current_object SET tid = %(tid)s
- WHERE zoid IN (
- SELECT zoid FROM object_state
- WHERE tid = %(tid)s
- AND prev_tid != 0
- ORDER BY zoid
- )
- """, {'tid': tid})
-
- def commit_phase1(self, cursor, tid):
- """Begin a commit. Returns the transaction name.
-
- This method should guarantee that commit_phase2() will succeed,
- meaning that if commit_phase2() would raise any error, the error
- should be raised in commit_phase1() instead.
- """
- if self._twophase:
- txn = 'T%d' % tid
- stmt = """
- DROP TABLE temp_store;
- PREPARE TRANSACTION %s
- """
- cursor.execute(stmt, (txn,))
- cursor.connection.set_isolation_level(
- psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
- return txn
- else:
- return '-'
-
- def commit_phase2(self, cursor, txn):
- """Final transaction commit."""
- if self._twophase:
- cursor.execute('COMMIT PREPARED %s', (txn,))
- else:
- cursor.connection.commit()
-
- def abort(self, cursor, txn=None):
- """Abort the commit. If txn is not None, phase 1 is also aborted."""
- if self._twophase:
- if txn is not None:
- cursor.execute('ROLLBACK PREPARED %s', (txn,))
- else:
- cursor.connection.rollback()
-
- def new_oid(self, cursor):
- """Return a new, unused OID."""
- stmt = "SELECT NEXTVAL('zoid_seq')"
- cursor.execute(stmt)
- return cursor.fetchone()[0]
-
-
- def hold_pack_lock(self, cursor):
- """Try to acquire the pack lock.
-
- Raise an exception if packing or undo is already in progress.
- """
- stmt = """
- LOCK pack_lock IN EXCLUSIVE MODE NOWAIT
- """
- try:
- cursor.execute(stmt)
- except psycopg2.DatabaseError:
- raise StorageError('A pack or undo operation is in progress')
-
- def release_pack_lock(self, cursor):
- """Release the pack lock."""
- # No action needed
- pass
-
-
- def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
- """Polls for new transactions.
-
- conn and cursor must have been created previously by open_for_load().
- prev_polled_tid is the tid returned at the last poll, or None
- if this is the first poll. If ignore_tid is not None, changes
- committed in that transaction will not be included in the list
- of changed OIDs.
-
- Returns (changed_oids, new_polled_tid). Raises StorageError
- if the database has disconnected.
- """
- try:
-            # Find the tid of the most recent transaction.
- cursor.execute("EXECUTE get_latest_tid")
- # Expect the transaction table to always have at least one row.
- assert cursor.rowcount == 1
- new_polled_tid = cursor.fetchone()[0]
-
- if prev_polled_tid is None:
- # This is the first time the connection has polled.
- return None, new_polled_tid
-
- if new_polled_tid == prev_polled_tid:
- # No transactions have been committed since prev_polled_tid.
- return (), new_polled_tid
-
- stmt = "SELECT 1 FROM transaction WHERE tid = %s"
- cursor.execute(stmt, (prev_polled_tid,))
- if not cursor.rowcount:
- # Transaction not found; perhaps it has been packed.
- # The connection cache needs to be cleared.
- return None, new_polled_tid
-
- # Get the list of changed OIDs and return it.
- stmt = """
- SELECT DISTINCT zoid
- FROM object_state
- JOIN transaction USING (tid)
- WHERE tid > %s
- """
- if ignore_tid is not None:
- stmt += " AND tid != %d" % ignore_tid
- cursor.execute(stmt, (prev_polled_tid,))
- oids = [oid for (oid,) in cursor]
-
- return oids, new_polled_tid
-
- except (psycopg2.OperationalError, psycopg2.InterfaceError):
- raise StorageError("database disconnected")
-
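Both adapters move binary pickle data through base64 text (`decode(%s, 'base64')` on the way in, `encode(state, 'base64')` on the way out) to sidestep driver escaping of raw bytes. The following is a minimal sketch of the Python side of that round-trip; it uses the modern `base64.encodebytes`/`decodebytes` names, which replaced the `encodestring`/`decodestring` aliases this 2008 code imports, and the payload bytes are an arbitrary stand-in for a real pickle:

```python
import base64

# store_temp() direction: raw pickle bytes -> base64 text for the driver,
# matching what encodestring(data) produced under Python 2.
data = b'\x80\x02}q\x00.'        # arbitrary stand-in for a pickled state
wire = base64.encodebytes(data)

# load_current() direction: encode(state, 'base64') comes back as text,
# and decodestring() restored the original bytes.
restored = base64.decodebytes(wire)
assert restored == data
```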
Copied: relstorage/tags/1.0-beta1/relstorage/adapters/postgresql.py (from rev 84129, relstorage/trunk/relstorage/adapters/postgresql.py)
===================================================================
--- relstorage/tags/1.0-beta1/relstorage/adapters/postgresql.py (rev 0)
+++ relstorage/tags/1.0-beta1/relstorage/adapters/postgresql.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,625 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""PostgreSQL adapter for RelStorage."""
+
+from base64 import decodestring, encodestring
+import logging
+import psycopg2, psycopg2.extensions
+from ZODB.POSException import StorageError
+
+from common import Adapter
+
+log = logging.getLogger("relstorage.adapters.postgresql")
+
+psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
+
+
+class PostgreSQLAdapter(Adapter):
+ """PostgreSQL adapter for RelStorage."""
+
+ def __init__(self, dsn='', twophase=False):
+ self._dsn = dsn
+ self._twophase = twophase
+
+ def create_schema(self, cursor):
+ """Create the database tables."""
+ stmt = """
+ CREATE TABLE commit_lock ();
+
+ -- The list of all transactions in the database
+ CREATE TABLE transaction (
+ tid BIGINT NOT NULL PRIMARY KEY,
+ packed BOOLEAN NOT NULL DEFAULT FALSE,
+ username VARCHAR(255) NOT NULL,
+ description TEXT NOT NULL,
+ extension BYTEA
+ );
+
+ -- Create a special transaction to represent object creation. This
+ -- row is often referenced by object_state.prev_tid, but never by
+ -- object_state.tid.
+ INSERT INTO transaction (tid, username, description)
+ VALUES (0, 'system', 'special transaction for object creation');
+
+ CREATE SEQUENCE zoid_seq;
+
+ -- All object states in all transactions. Note that md5 and state
+ -- can be null to represent object uncreation.
+ CREATE TABLE object_state (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL REFERENCES transaction
+ CHECK (tid > 0),
+ PRIMARY KEY (zoid, tid),
+ prev_tid BIGINT NOT NULL REFERENCES transaction,
+ md5 CHAR(32),
+ state BYTEA
+ );
+ CREATE INDEX object_state_tid ON object_state (tid);
+
+ -- Pointers to the current object state
+ CREATE TABLE current_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ tid BIGINT NOT NULL,
+ FOREIGN KEY (zoid, tid) REFERENCES object_state
+ );
+
+ -- During packing, an exclusive lock is held on pack_lock.
+ CREATE TABLE pack_lock ();
+
+ -- A list of referenced OIDs from each object_state.
+ -- This table is populated as needed during packing.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because rows in object_state
+ -- are never modified once committed, and rows are removed
+ -- from object_state only by packing.
+ CREATE TABLE object_ref (
+ zoid BIGINT NOT NULL,
+ tid BIGINT NOT NULL,
+ to_zoid BIGINT NOT NULL
+ );
+ CREATE INDEX object_ref_from ON object_ref (zoid);
+ CREATE INDEX object_ref_tid ON object_ref (tid);
+ CREATE INDEX object_ref_to ON object_ref (to_zoid);
+
+ -- The object_refs_added table tracks whether object_refs has
+ -- been populated for all states in a given transaction.
+ -- An entry is added only when the work is finished.
+ -- To prevent unnecessary table locking, it does not use
+ -- foreign keys, which is safe because object states
+ -- are never added to a transaction once committed, and
+ -- rows are removed from the transaction table only by
+ -- packing.
+ CREATE TABLE object_refs_added (
+ tid BIGINT NOT NULL PRIMARY KEY
+ );
+
+ -- Temporary state during packing:
+ -- The list of objects to pack. If keep is 'N',
+ -- the object and all its revisions will be removed.
+ -- If keep is 'Y', instead of removing the object,
+ -- the pack operation will cut the object's history.
+ -- If keep is 'Y' then the keep_tid field must also be set.
+ -- The keep_tid field specifies which revision to keep within
+ -- the list of packable transactions.
+ CREATE TABLE pack_object (
+ zoid BIGINT NOT NULL PRIMARY KEY,
+ keep BOOLEAN NOT NULL,
+ keep_tid BIGINT
+ );
+ CREATE INDEX pack_object_keep_zoid ON pack_object (keep, zoid);
+ """
+ cursor.execute(stmt)
+
+
+ def prepare_schema(self):
+ """Create the database schema if it does not already exist."""
+ conn, cursor = self.open()
+ try:
+ try:
+ cursor.execute("""
+ SELECT tablename
+ FROM pg_tables
+ WHERE tablename = 'object_state'
+ """)
+ if not cursor.rowcount:
+ self.create_schema(cursor)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def zap_all(self):
+ """Clear all data out of the database."""
+ conn, cursor = self.open()
+ try:
+ try:
+ cursor.execute("""
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ DELETE FROM transaction;
+ -- Create a special transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, 'system', 'special transaction for object creation');
+ ALTER SEQUENCE zoid_seq START WITH 1;
+ """)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ finally:
+ self.close(conn, cursor)
+
+
+ def open(self,
+ isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
+ """Open a database connection and return (conn, cursor)."""
+ try:
+ conn = psycopg2.connect(self._dsn)
+ conn.set_client_encoding('UNICODE')
+ conn.set_isolation_level(isolation)
+ cursor = conn.cursor()
+ cursor.arraysize = 64
+ except psycopg2.OperationalError:
+ log.warning("Unable to connect in %s", repr(self))
+ raise
+ return conn, cursor
+
+ def close(self, conn, cursor):
+ """Close a connection and cursor, ignoring certain errors.
+ """
+ for obj in (cursor, conn):
+ if obj is not None:
+ try:
+ obj.close()
+ except (psycopg2.InterfaceError,
+ psycopg2.OperationalError):
+ pass
+
+ def open_for_load(self):
+ """Open and initialize a connection for loading objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.open(
+ psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+ stmt = """
+ PREPARE get_latest_tid AS
+ SELECT tid
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ """
+ cursor.execute(stmt)
+ return conn, cursor
+
+ def restart_load(self, cursor):
+ """After a rollback, reinitialize a connection for loading objects."""
+ # No re-init necessary
+ pass
+
+ def get_object_count(self):
+        """Returns the number of objects in the database."""
+        # TODO: implement later
+ return 0
+
+ def get_db_size(self):
+ """Returns the approximate size of the database in bytes"""
+ conn, cursor = self.open()
+ try:
+ cursor.execute("SELECT pg_database_size(current_database())")
+ return cursor.fetchone()[0]
+ finally:
+ self.close(conn, cursor)
+
+ def load_current(self, cursor, oid):
+ """Returns the current pickle and integer tid for an object.
+
+        oid is an integer.  Returns (None, None) if the object does not exist.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64'), tid
+ FROM current_object
+ JOIN object_state USING(zoid, tid)
+ WHERE zoid = %s
+ """, (oid,))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ state64, tid = cursor.fetchone()
+ if state64 is not None:
+ state = decodestring(state64)
+ else:
+ # This object's creation has been undone
+ state = None
+ return state, tid
+ else:
+ return None, None
+
+ def load_revision(self, cursor, oid, tid):
+ """Returns the pickle for an object on a particular transaction.
+
+ Returns None if no such state exists.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64')
+ FROM object_state
+ WHERE zoid = %s
+ AND tid = %s
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ (state64,) = cursor.fetchone()
+ if state64 is not None:
+ return decodestring(state64)
+ return None
+
+ def exists(self, cursor, oid):
+ """Returns a true value if the given object exists."""
+ cursor.execute("SELECT 1 FROM current_object WHERE zoid = %s", (oid,))
+ return cursor.rowcount
+
+ def load_before(self, cursor, oid, tid):
+ """Returns the pickle and tid of an object before transaction tid.
+
+ Returns (None, None) if no earlier state exists.
+ """
+ cursor.execute("""
+ SELECT encode(state, 'base64'), tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid < %s
+ ORDER BY tid DESC
+ LIMIT 1
+ """, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ state64, tid = cursor.fetchone()
+ if state64 is not None:
+ state = decodestring(state64)
+ else:
+ # The object's creation has been undone
+ state = None
+ return state, tid
+ else:
+ return None, None
+
+ def get_object_tid_after(self, cursor, oid, tid):
+ """Returns the tid of the next change after an object revision.
+
+ Returns None if no later state exists.
+ """
+ stmt = """
+ SELECT tid
+ FROM object_state
+ WHERE zoid = %s
+ AND tid > %s
+ ORDER BY tid
+ LIMIT 1
+ """
+ cursor.execute(stmt, (oid, tid))
+ if cursor.rowcount:
+ assert cursor.rowcount == 1
+ return cursor.fetchone()[0]
+ else:
+ return None
+
+ def _make_temp_table(self, cursor):
+ """Create the temporary table for storing objects"""
+ if self._twophase:
+            # PostgreSQL does not allow two-phase transactions
+ # to use temporary tables. :-(
+ stmt = """
+ CREATE TABLE temp_store (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state BYTEA
+ );
+ CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+ """
+ else:
+ stmt = """
+ CREATE TEMPORARY TABLE temp_store (
+ zoid BIGINT NOT NULL,
+ prev_tid BIGINT NOT NULL,
+ md5 CHAR(32),
+ state BYTEA
+ ) ON COMMIT DROP;
+ CREATE UNIQUE INDEX temp_store_zoid ON temp_store (zoid)
+ """
+ cursor.execute(stmt)
+
+ def open_for_store(self):
+ """Open and initialize a connection for storing objects.
+
+ Returns (conn, cursor).
+ """
+ conn, cursor = self.open()
+ try:
+ self._make_temp_table(cursor)
+ return conn, cursor
+ except:
+ self.close(conn, cursor)
+ raise
+
+ def restart_store(self, cursor):
+ """Reuse a store connection."""
+ try:
+ cursor.connection.rollback()
+ if self._twophase:
+ cursor.connection.set_isolation_level(
+ psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
+ self._make_temp_table(cursor)
+ except (psycopg2.OperationalError, psycopg2.InterfaceError):
+ raise StorageError("database disconnected")
+
+ def store_temp(self, cursor, oid, prev_tid, md5sum, data):
+ """Store an object in the temporary table."""
+ stmt = """
+ INSERT INTO temp_store (zoid, prev_tid, md5, state)
+ VALUES (%s, %s, %s, decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (oid, prev_tid, md5sum, encodestring(data)))
+
+ def replace_temp(self, cursor, oid, prev_tid, md5sum, data):
+ """Replace an object in the temporary table."""
+ stmt = """
+ UPDATE temp_store SET
+ prev_tid = %s,
+ md5 = %s,
+ state = decode(%s, 'base64')
+ WHERE zoid = %s
+ """
+ cursor.execute(stmt, (prev_tid, md5sum, encodestring(data), oid))
+
+ def restore(self, cursor, oid, tid, md5sum, data):
+ """Store an object directly, without conflict detection.
+
+ Used for copying transactions into this database.
+ """
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ VALUES (%s, %s,
+ COALESCE((SELECT tid FROM current_object WHERE zoid = %s), 0),
+ %s, decode(%s, 'base64'))
+ """
+ if data is not None:
+ data = encodestring(data)
+ cursor.execute(stmt, (oid, tid, oid, md5sum, data))
+
+ def start_commit(self, cursor):
+ """Prepare to commit."""
+ # Hold commit_lock to prevent concurrent commits
+ # (for as short a time as possible).
+ # Lock transaction and current_object in share mode to ensure
+ # conflict detection has the most current data.
+ cursor.execute("""
+ LOCK TABLE commit_lock IN EXCLUSIVE MODE;
+ LOCK TABLE transaction IN SHARE MODE;
+ LOCK TABLE current_object IN SHARE MODE
+ """)
+
+ def get_tid_and_time(self, cursor):
+ """Returns the most recent tid and the current database time.
+
+ The database time is the number of seconds since the epoch.
+ """
+ cursor.execute("""
+ SELECT tid, EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
+ FROM transaction
+ ORDER BY tid DESC
+ LIMIT 1
+ """)
+ assert cursor.rowcount == 1
+ return cursor.fetchone()
+
+ def add_transaction(self, cursor, tid, username, description, extension,
+ packed=False):
+ """Add a transaction."""
+ stmt = """
+ INSERT INTO transaction
+ (tid, packed, username, description, extension)
+ VALUES (%s, %s, %s, %s, decode(%s, 'base64'))
+ """
+ cursor.execute(stmt, (
+ tid, packed, username, description, encodestring(extension)))
+
+ def detect_conflict(self, cursor):
+ """Find one conflict in the data about to be committed.
+
+ If there is a conflict, returns (oid, prev_tid, attempted_prev_tid,
+ attempted_data). If there is no conflict, returns None.
+ """
+ stmt = """
+ SELECT temp_store.zoid, current_object.tid, temp_store.prev_tid,
+ encode(temp_store.state, 'base64')
+ FROM temp_store
+ JOIN current_object ON (temp_store.zoid = current_object.zoid)
+ WHERE temp_store.prev_tid != current_object.tid
+ LIMIT 1
+ """
+ cursor.execute(stmt)
+ if cursor.rowcount:
+ oid, prev_tid, attempted_prev_tid, data = cursor.fetchone()
+ return oid, prev_tid, attempted_prev_tid, decodestring(data)
+ return None
+
+ def move_from_temp(self, cursor, tid):
+ """Move the temporarily stored objects to permanent storage.
+
+ Returns the list of oids stored.
+ """
+ stmt = """
+ INSERT INTO object_state (zoid, tid, prev_tid, md5, state)
+ SELECT zoid, %s, prev_tid, md5, state
+ FROM temp_store
+ """
+ cursor.execute(stmt, (tid,))
+
+ stmt = """
+ SELECT zoid FROM temp_store
+ """
+ cursor.execute(stmt)
+ return [oid for (oid,) in cursor]
+
+ def update_current(self, cursor, tid):
+ """Update the current object pointers.
+
+ tid is the integer tid of the transaction being committed.
+ """
+ cursor.execute("""
+ -- Insert objects created in this transaction into current_object.
+ INSERT INTO current_object (zoid, tid)
+ SELECT zoid, tid FROM object_state
+ WHERE tid = %(tid)s
+ AND prev_tid = 0;
+
+ -- Change existing objects. To avoid deadlocks,
+ -- update in OID order.
+ UPDATE current_object SET tid = %(tid)s
+ WHERE zoid IN (
+ SELECT zoid FROM object_state
+ WHERE tid = %(tid)s
+ AND prev_tid != 0
+ ORDER BY zoid
+ )
+ """, {'tid': tid})
+
+ def set_min_oid(self, cursor, oid):
+ """Ensure the next OID is at least the given OID."""
+ cursor.execute("""
+ SELECT CASE WHEN %s > nextval('zoid_seq')
+ THEN setval('zoid_seq', %s)
+ ELSE 0
+ END
+ """, (oid, oid))
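[Editor's note: the CASE expression above advances `zoid_seq` only when the requested minimum is ahead of it. A pure-Python sketch of that logic, with `FakeSequence` standing in for the database sequence; note it mirrors the SQL's side effect that `nextval` consumes a value even when no `setval` is needed:]

```python
# FakeSequence emulates a PostgreSQL sequence's nextval/setval.
class FakeSequence:
    def __init__(self, value=0):
        self.value = value
    def nextval(self):
        self.value += 1
        return self.value
    def setval(self, value):
        self.value = value

def set_min_oid(seq, oid):
    # Advance the sequence only if it is behind the requested minimum,
    # as the SQL CASE WHEN %s > nextval(...) THEN setval(...) does.
    if oid > seq.nextval():
        seq.setval(oid)

seq = FakeSequence()
set_min_oid(seq, 10)
assert seq.value == 10
set_min_oid(seq, 3)      # already past 3, but nextval() consumed a value
assert seq.value == 11
```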
+
+ def commit_phase1(self, cursor, tid):
+ """Begin a commit. Returns the transaction name.
+
+ This method should guarantee that commit_phase2() will succeed,
+ meaning that if commit_phase2() would raise any error, the error
+ should be raised in commit_phase1() instead.
+ """
+ if self._twophase:
+ txn = 'T%d' % tid
+ stmt = """
+ DROP TABLE temp_store;
+ PREPARE TRANSACTION %s
+ """
+ cursor.execute(stmt, (txn,))
+ cursor.connection.set_isolation_level(
+ psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+ return txn
+ else:
+ return '-'
+
+ def commit_phase2(self, cursor, txn):
+ """Final transaction commit."""
+ if self._twophase:
+ cursor.execute('COMMIT PREPARED %s', (txn,))
+ else:
+ cursor.connection.commit()
+
+ def abort(self, cursor, txn=None):
+ """Abort the commit. If txn is not None, phase 1 is also aborted."""
+ if self._twophase:
+ if txn is not None:
+ cursor.execute('ROLLBACK PREPARED %s', (txn,))
+ else:
+ cursor.connection.rollback()
+
+ def new_oid(self, cursor):
+ """Return a new, unused OID."""
+ stmt = "SELECT NEXTVAL('zoid_seq')"
+ cursor.execute(stmt)
+ return cursor.fetchone()[0]
+
+
+ def hold_pack_lock(self, cursor):
+ """Try to acquire the pack lock.
+
+ Raise an exception if packing or undo is already in progress.
+ """
+ stmt = """
+ LOCK pack_lock IN EXCLUSIVE MODE NOWAIT
+ """
+ try:
+ cursor.execute(stmt)
+ except psycopg2.DatabaseError:
+ raise StorageError('A pack or undo operation is in progress')
+
+ def release_pack_lock(self, cursor):
+ """Release the pack lock."""
+ # No action needed
+ pass
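[Editor's note: `hold_pack_lock` relies on `LOCK ... NOWAIT` failing immediately instead of blocking, and `release_pack_lock` is a no-op because PostgreSQL drops table locks at transaction end. A sketch of the same non-blocking pattern using a `threading.Lock` in place of the table lock (`StorageError` is redefined locally here):]

```python
import threading

class StorageError(Exception):
    pass

pack_lock = threading.Lock()

def hold_pack_lock():
    # acquire(False) is non-blocking, like LOCK ... IN EXCLUSIVE MODE NOWAIT:
    # it fails at once if another pack or undo holds the lock.
    if not pack_lock.acquire(False):
        raise StorageError('A pack or undo operation is in progress')

hold_pack_lock()          # first caller acquires the lock
try:
    hold_pack_lock()      # a concurrent caller fails immediately
except StorageError as e:
    print(e)
```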
+
+
+ def poll_invalidations(self, conn, cursor, prev_polled_tid, ignore_tid):
+ """Polls for new transactions.
+
+ conn and cursor must have been created previously by open_for_load().
+ prev_polled_tid is the tid returned at the last poll, or None
+ if this is the first poll. If ignore_tid is not None, changes
+ committed in that transaction will not be included in the list
+ of changed OIDs.
+
+ Returns (changed_oids, new_polled_tid). Raises StorageError
+ if the database has disconnected.
+ """
+ try:
+ # find out the tid of the most recent transaction.
+ cursor.execute("EXECUTE get_latest_tid")
+ # Expect the transaction table to always have at least one row.
+ assert cursor.rowcount == 1
+ new_polled_tid = cursor.fetchone()[0]
+
+ if prev_polled_tid is None:
+ # This is the first time the connection has polled.
+ return None, new_polled_tid
+
+ if new_polled_tid == prev_polled_tid:
+ # No transactions have been committed since prev_polled_tid.
+ return (), new_polled_tid
+
+ stmt = "SELECT 1 FROM transaction WHERE tid = %s"
+ cursor.execute(stmt, (prev_polled_tid,))
+ if not cursor.rowcount:
+ # Transaction not found; perhaps it has been packed.
+ # The connection cache needs to be cleared.
+ return None, new_polled_tid
+
+ # Get the list of changed OIDs and return it.
+ stmt = """
+ SELECT DISTINCT zoid
+ FROM object_state
+ JOIN transaction USING (tid)
+ WHERE tid > %s
+ """
+ if ignore_tid is not None:
+ stmt += " AND tid != %d" % ignore_tid
+ cursor.execute(stmt, (prev_polled_tid,))
+ oids = [oid for (oid,) in cursor]
+
+ return oids, new_polled_tid
+
+ except (psycopg2.OperationalError, psycopg2.InterfaceError):
+ raise StorageError("database disconnected")
+
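[Editor's note: `poll_invalidations` distinguishes three outcomes — first poll (return None to clear the connection cache), no new commits (return an empty tuple), and a normal delta of changed OIDs; a missing `prev_polled_tid` row means packing removed history, which also forces a cache clear. A pure-Python sketch of that decision table, with `known_tids` and `changes` standing in for the transaction and object_state tables:]

```python
def poll(prev_polled_tid, new_polled_tid, known_tids, changes):
    """changes maps tid -> list of changed oids; known_tids is the set
    of tids still present in the transaction table."""
    if prev_polled_tid is None:
        return None, new_polled_tid        # first poll: clear the cache
    if new_polled_tid == prev_polled_tid:
        return (), new_polled_tid          # nothing committed since
    if prev_polled_tid not in known_tids:
        return None, new_polled_tid        # packed away: clear the cache
    oids = sorted({oid for tid, os in changes.items()
                   if tid > prev_polled_tid for oid in os})
    return oids, new_polled_tid

assert poll(None, 5, {5}, {}) == (None, 5)
assert poll(5, 5, {5}, {}) == ((), 5)
assert poll(5, 7, {5, 6, 7}, {6: [1, 2], 7: [2, 3]}) == ([1, 2, 3], 7)
```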
Deleted: relstorage/tags/1.0-beta1/relstorage/component.xml
===================================================================
--- relstorage/trunk/relstorage/component.xml 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/relstorage/component.xml 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,151 +0,0 @@
-<?xml version="1.0"?>
-
-<!-- RelStorage configuration via ZConfig -->
-
-<component prefix="relstorage.config">
-
- <import package="ZODB"/>
- <abstracttype name="relstorage.adapter"/>
-
- <sectiontype name="relstorage" implements="ZODB.storage"
- datatype=".RelStorageFactory">
- <section type="relstorage.adapter" name="*" attribute="adapter"/>
- <key name="name" datatype="string" required="no"/>
- <key name="create" datatype="boolean" default="true">
- <description>
- Flag that indicates whether the storage should be initialized if
- it does not already exist.
- </description>
- </key>
- <key name="read-only" datatype="boolean" default="false">
- <description>
- If true, only reads may be executed against the storage. Note
- that the "pack" operation is not considered a write operation
- and is still allowed on a read-only filestorage.
- </description>
- </key>
- <key name="poll-interval" datatype="float" required="no">
- <description>
- Defer polling the database for the specified maximum time interval.
- Set to 0 (the default) to always poll. Fractional seconds are
- allowed.
-
- Use this to lighten the database load on servers with high read
- volume and low write volume. A setting of 1-5 seconds is sufficient
- for most systems.
-
- While this setting should not affect database integrity,
- it increases the probability of basing transactions on stale data,
- leading to conflicts. Thus a nonzero setting can hurt
- the performance of servers with high write volume.
- </description>
- </key>
- </sectiontype>
-
- <sectiontype name="postgresql" implements="relstorage.adapter"
- datatype=".PostgreSQLAdapterFactory">
- <key name="dsn" datatype="string" required="no" default="">
- <description>
- The PostgreSQL data source name. For example:
-
- dsn dbname='template1' user='user' host='localhost' password='pass'
-
- If dsn is omitted, the adapter will connect to a local database with
- no password. Both the user and database name will match the
- name of the owner of the current process.
- </description>
- </key>
- </sectiontype>
-
- <sectiontype name="oracle" implements="relstorage.adapter"
- datatype=".OracleAdapterFactory">
- <key name="user" datatype="string" required="yes">
- <description>
- The Oracle account name
- </description>
- </key>
- <key name="password" datatype="string" required="yes">
- <description>
- The Oracle account password
- </description>
- </key>
- <key name="dsn" datatype="string" required="yes">
- <description>
- The Oracle data source name. The Oracle client library will
- normally expect to find the DSN in /etc/oratab.
- </description>
- </key>
- </sectiontype>
-
- <sectiontype name="mysql" implements="relstorage.adapter"
- datatype=".MySQLAdapterFactory">
-
- <key name="host" datatype="string" required="no">
- <description>
- host to connect
- </description>
- </key>
-
- <key name="user" datatype="string" required="no">
- <description>
- user to connect as
- </description>
- </key>
-
- <key name="passwd" datatype="string" required="no">
- <description>
- password to use
- </description>
- </key>
-
- <key name="db" datatype="string" required="no">
- <description>
- database to use
- </description>
- </key>
-
- <key name="port" datatype="integer" required="no">
- <description>
- TCP/IP port to connect to
- </description>
- </key>
-
- <key name="unix_socket" datatype="string" required="no">
- <description>
- location of unix_socket (UNIX-ish only)
- </description>
- </key>
-
- <key name="connect_timeout" datatype="integer" required="no">
- <description>
- number of seconds to wait before the connection attempt fails.
- </description>
- </key>
-
- <key name="compress" datatype="boolean" required="no">
- <description>
- if set, gzip compression is enabled
- </description>
- </key>
-
- <key name="named_pipe" datatype="boolean" required="no">
- <description>
- if set, connect to server via named pipe (Windows only)
- </description>
- </key>
-
- <key name="read_default_file" datatype="string" required="no">
- <description>
- see the MySQL documentation for mysql_options()
- </description>
- </key>
-
- <key name="read_default_group" datatype="string" required="no">
- <description>
- see the MySQL documentation for mysql_options()
- </description>
- </key>
-
- </sectiontype>
-
-</component>
Copied: relstorage/tags/1.0-beta1/relstorage/component.xml (from rev 84129, relstorage/trunk/relstorage/component.xml)
===================================================================
--- relstorage/tags/1.0-beta1/relstorage/component.xml (rev 0)
+++ relstorage/tags/1.0-beta1/relstorage/component.xml 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,166 @@
+<?xml version="1.0"?>
+
+<!-- RelStorage configuration via ZConfig -->
+
+<component prefix="relstorage.config">
+
+ <import package="ZODB"/>
+ <abstracttype name="relstorage.adapter"/>
+
+ <sectiontype name="relstorage" implements="ZODB.storage"
+ datatype=".RelStorageFactory">
+ <section type="relstorage.adapter" name="*" attribute="adapter"/>
+ <key name="name" datatype="string" required="no"/>
+ <key name="create" datatype="boolean" default="true">
+ <description>
+ Flag that indicates whether the storage should be initialized if
+ it does not already exist.
+ </description>
+ </key>
+ <key name="read-only" datatype="boolean" default="false">
+ <description>
+ If true, only reads may be executed against the storage. Note
+ that the "pack" operation is not considered a write operation
+ and is still allowed on a read-only storage.
+ </description>
+ </key>
+ <key name="poll-interval" datatype="float" required="no">
+ <description>
+ Defer polling the database for the specified maximum time interval.
+ Set to 0 (the default) to always poll. Fractional seconds are
+ allowed.
+
+ Use this to lighten the database load on servers with high read
+ volume and low write volume. A setting of 1-5 seconds is sufficient
+ for most systems.
+
+ While this setting should not affect database integrity,
+ it increases the probability of basing transactions on stale data,
+ leading to conflicts. Thus a nonzero setting can hurt
+ the performance of servers with high write volume.
+ </description>
+ </key>
+ <key name="pack-gc" datatype="boolean" default="true">
+ <description>
+ If pack-gc is false, pack operations do not perform
+ garbage collection. Garbage collection is enabled by default.
+
+ If garbage collection is disabled, pack operations keep at least one
+ revision of every object. With garbage collection disabled, the
+ pack code does not need to follow object references, making
+ packing conceivably much faster. However, some of that benefit
+ may be lost due to an ever-increasing number of unused objects.
+
+ Disabling garbage collection is also a hack that ensures
+ inter-database references never break.
+ </description>
+ </key>
+ </sectiontype>
+
+ <sectiontype name="postgresql" implements="relstorage.adapter"
+ datatype=".PostgreSQLAdapterFactory">
+ <key name="dsn" datatype="string" required="no" default="">
+ <description>
+ The PostgreSQL data source name. For example:
+
+ dsn dbname='template1' user='user' host='localhost' password='pass'
+
+ If dsn is omitted, the adapter will connect to a local database with
+ no password. Both the user and database name will match the
+ name of the owner of the current process.
+ </description>
+ </key>
+ </sectiontype>
+
+ <sectiontype name="oracle" implements="relstorage.adapter"
+ datatype=".OracleAdapterFactory">
+ <key name="user" datatype="string" required="yes">
+ <description>
+ The Oracle account name
+ </description>
+ </key>
+ <key name="password" datatype="string" required="yes">
+ <description>
+ The Oracle account password
+ </description>
+ </key>
+ <key name="dsn" datatype="string" required="yes">
+ <description>
+ The Oracle data source name. The Oracle client library will
+ normally expect to find the DSN in /etc/oratab.
+ </description>
+ </key>
+ </sectiontype>
+
+ <sectiontype name="mysql" implements="relstorage.adapter"
+ datatype=".MySQLAdapterFactory">
+
+ <key name="host" datatype="string" required="no">
+ <description>
+ host to connect
+ </description>
+ </key>
+
+ <key name="user" datatype="string" required="no">
+ <description>
+ user to connect as
+ </description>
+ </key>
+
+ <key name="passwd" datatype="string" required="no">
+ <description>
+ password to use
+ </description>
+ </key>
+
+ <key name="db" datatype="string" required="no">
+ <description>
+ database to use
+ </description>
+ </key>
+
+ <key name="port" datatype="integer" required="no">
+ <description>
+ TCP/IP port to connect to
+ </description>
+ </key>
+
+ <key name="unix_socket" datatype="string" required="no">
+ <description>
+ location of unix_socket (UNIX-ish only)
+ </description>
+ </key>
+
+ <key name="connect_timeout" datatype="integer" required="no">
+ <description>
+ number of seconds to wait before the connection attempt fails.
+ </description>
+ </key>
+
+ <key name="compress" datatype="boolean" required="no">
+ <description>
+ if set, gzip compression is enabled
+ </description>
+ </key>
+
+ <key name="named_pipe" datatype="boolean" required="no">
+ <description>
+ if set, connect to server via named pipe (Windows only)
+ </description>
+ </key>
+
+ <key name="read_default_file" datatype="string" required="no">
+ <description>
+ see the MySQL documentation for mysql_options()
+ </description>
+ </key>
+
+ <key name="read_default_group" datatype="string" required="no">
+ <description>
+ see the MySQL documentation for mysql_options()
+ </description>
+ </key>
+
+ </sectiontype>
+
+</component>
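[Editor's note: a zope.conf or zodbconvert-style section wired to this schema might look like the following sketch; the database name, user, and password are illustrative, not taken from the repository:]

```
%import relstorage
<relstorage>
  name my-relstorage
  poll-interval 2
  <postgresql>
    dsn dbname='zodb' user='zope' password='secret'
  </postgresql>
</relstorage>
```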
Deleted: relstorage/tags/1.0-beta1/relstorage/config.py
===================================================================
--- relstorage/trunk/relstorage/config.py 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/relstorage/config.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,52 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""ZConfig directive implementations for binding RelStorage to Zope"""
-
-from ZODB.config import BaseConfig
-
-from relstorage import RelStorage
-
-
-class RelStorageFactory(BaseConfig):
- """Open a storage configured via ZConfig"""
- def open(self):
- config = self.config
- adapter = config.adapter.open()
- return RelStorage(adapter, name=config.name, create=config.create,
- read_only=config.read_only, poll_interval=config.poll_interval)
-
-
-class PostgreSQLAdapterFactory(BaseConfig):
- def open(self):
- from adapters.postgresql import PostgreSQLAdapter
- return PostgreSQLAdapter(self.config.dsn)
-
-
-class OracleAdapterFactory(BaseConfig):
- def open(self):
- from adapters.oracle import OracleAdapter
- config = self.config
- return OracleAdapter(config.user, config.password, config.dsn)
-
-
-class MySQLAdapterFactory(BaseConfig):
- def open(self):
- from adapters.mysql import MySQLAdapter
- options = {}
- for key in self.config.getSectionAttributes():
- value = getattr(self.config, key)
- if value is not None:
- options[key] = value
- return MySQLAdapter(**options)
-
Copied: relstorage/tags/1.0-beta1/relstorage/config.py (from rev 84129, relstorage/trunk/relstorage/config.py)
===================================================================
--- relstorage/tags/1.0-beta1/relstorage/config.py (rev 0)
+++ relstorage/tags/1.0-beta1/relstorage/config.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,53 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""ZConfig directive implementations for binding RelStorage to Zope"""
+
+from ZODB.config import BaseConfig
+
+from relstorage import RelStorage
+
+
+class RelStorageFactory(BaseConfig):
+ """Open a storage configured via ZConfig"""
+ def open(self):
+ config = self.config
+ adapter = config.adapter.open()
+ return RelStorage(adapter, name=config.name, create=config.create,
+ read_only=config.read_only, poll_interval=config.poll_interval,
+ pack_gc=config.pack_gc)
+
+
+class PostgreSQLAdapterFactory(BaseConfig):
+ def open(self):
+ from adapters.postgresql import PostgreSQLAdapter
+ return PostgreSQLAdapter(self.config.dsn)
+
+
+class OracleAdapterFactory(BaseConfig):
+ def open(self):
+ from adapters.oracle import OracleAdapter
+ config = self.config
+ return OracleAdapter(config.user, config.password, config.dsn)
+
+
+class MySQLAdapterFactory(BaseConfig):
+ def open(self):
+ from adapters.mysql import MySQLAdapter
+ options = {}
+ for key in self.config.getSectionAttributes():
+ value = getattr(self.config, key)
+ if value is not None:
+ options[key] = value
+ return MySQLAdapter(**options)
+
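[Editor's note: `MySQLAdapterFactory` copies only the section attributes the user actually set, so MySQLdb's own defaults apply for the rest. A self-contained sketch of that option-gathering loop, with `FakeSection` standing in for the parsed ZConfig section object:]

```python
# FakeSection mimics a parsed ZConfig section: unset keys are None.
class FakeSection:
    host = 'localhost'
    user = None
    passwd = None
    db = 'zodb'
    def getSectionAttributes(self):
        return ['host', 'user', 'passwd', 'db']

def gather_options(config):
    # Keep only the keys that were explicitly configured.
    options = {}
    for key in config.getSectionAttributes():
        value = getattr(config, key)
        if value is not None:
            options[key] = value
    return options

assert gather_options(FakeSection()) == {'host': 'localhost', 'db': 'zodb'}
```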
Deleted: relstorage/tags/1.0-beta1/relstorage/relstorage.py
===================================================================
--- relstorage/trunk/relstorage/relstorage.py 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/relstorage/relstorage.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,899 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""The core of RelStorage, a ZODB storage for relational databases.
-
-Stores pickles in the database.
-"""
-
-import base64
-import cPickle
-import logging
-import md5
-import os
-import time
-import weakref
-from ZODB.utils import p64, u64, z64
-from ZODB.BaseStorage import BaseStorage
-from ZODB import ConflictResolution, POSException
-from persistent.TimeStamp import TimeStamp
-
-log = logging.getLogger("relstorage")
-
-# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
-# a failure revealed by the ZODB test suite. The test suite often fails
-# to call tpc_abort in the event of an error, leading to deadlocks.
-# The variable causes RelStorage to abort failed transactions
-# early rather than wait for an explicit abort.
-abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
-
-
-class RelStorage(BaseStorage,
- ConflictResolution.ConflictResolvingStorage):
- """Storage to a relational database, based on invalidation polling"""
-
- def __init__(self, adapter, name=None, create=True,
- read_only=False, poll_interval=0):
- if name is None:
- name = 'RelStorage on %s' % adapter.__class__.__name__
-
- self._adapter = adapter
- self._name = name
- self._is_read_only = read_only
- self._poll_interval = poll_interval
-
- if create:
- self._adapter.prepare_schema()
-
- # load_conn and load_cursor are open most of the time.
- self._load_conn = None
- self._load_cursor = None
- self._load_transaction_open = False
- self._open_load_connection()
- # store_conn and store_cursor are open during commit,
- # but not necessarily open at other times.
- self._store_conn = None
- self._store_cursor = None
-
- BaseStorage.__init__(self, name)
-
- self._tid = None
- self._ltid = None
-
- # _prepared_txn is the name of the transaction to commit in the
- # second phase.
- self._prepared_txn = None
-
- # _instances is a list of weak references to storage instances bound
- # to the same database.
- self._instances = []
-
- # _closed is True after self.close() is called. Since close()
- # can be called from another thread, access to self._closed should
- # be inside a _lock_acquire()/_lock_release() block.
- self._closed = False
-
- def _open_load_connection(self):
- """Open the load connection to the database. Return nothing."""
- conn, cursor = self._adapter.open_for_load()
- self._drop_load_connection()
- self._load_conn, self._load_cursor = conn, cursor
- self._load_transaction_open = True
-
- def _drop_load_connection(self):
- conn, cursor = self._load_conn, self._load_cursor
- self._load_conn, self._load_cursor = None, None
- self._adapter.close(conn, cursor)
-
- def _drop_store_connection(self):
- conn, cursor = self._store_conn, self._store_cursor
- self._store_conn, self._store_cursor = None, None
- self._adapter.close(conn, cursor)
-
- def _rollback_load_connection(self):
- if self._load_conn is not None:
- self._load_conn.rollback()
- self._load_transaction_open = False
-
- def _start_load(self):
- if self._load_cursor is None:
- self._open_load_connection()
- else:
- self._adapter.restart_load(self._load_cursor)
- self._load_transaction_open = True
-
- def _zap(self):
- """Clear all objects out of the database.
-
- Used by the test suite.
- """
- self._adapter.zap()
- self._rollback_load_connection()
-
- def close(self):
- """Close the connections to the database."""
- self._lock_acquire()
- try:
- self._closed = True
- self._drop_load_connection()
- self._drop_store_connection()
- for wref in self._instances:
- instance = wref()
- if instance is not None:
- instance.close()
- finally:
- self._lock_release()
-
- def bind_connection(self, zodb_conn):
- """Get a connection-bound storage instance.
-
- Connections have their own storage instances so that
- the database can provide the MVCC semantics rather than ZODB.
- """
- res = BoundRelStorage(self, zodb_conn)
- self._instances.append(weakref.ref(res))
- return res
-
- def connection_closing(self):
- """Release resources."""
- self._rollback_load_connection()
-
- def __len__(self):
- return self._adapter.get_object_count()
-
- def getSize(self):
- """Return database size in bytes"""
- return self._adapter.get_db_size()
-
- def load(self, oid, version):
- self._lock_acquire()
- try:
- if not self._load_transaction_open:
- self._start_load()
- cursor = self._load_cursor
- state, tid_int = self._adapter.load_current(cursor, u64(oid))
- finally:
- self._lock_release()
- if tid_int is not None:
- if state:
- state = str(state)
- if not state:
- # This can happen if something attempts to load
- # an object whose creation has been undone.
- raise KeyError(oid)
- return state, p64(tid_int)
- else:
- raise KeyError(oid)
-
- def loadEx(self, oid, version):
- # Since we don't support versions, just tack the empty version
- # string onto load's result.
- return self.load(oid, version) + ("",)
-
- def loadSerial(self, oid, serial):
- """Load a specific revision of an object"""
- self._lock_acquire()
- try:
- if self._store_cursor is not None:
- # Allow loading data from later transactions
- # for conflict resolution.
- cursor = self._store_cursor
- else:
- if not self._load_transaction_open:
- self._start_load()
- cursor = self._load_cursor
- state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
- if state is not None:
- state = str(state)
- if not state:
- raise POSException.POSKeyError(oid)
- return state
- else:
- raise KeyError(oid)
- finally:
- self._lock_release()
-
- def loadBefore(self, oid, tid):
- """Return the most recent revision of oid before tid committed."""
- oid_int = u64(oid)
-
- self._lock_acquire()
- try:
- if self._store_cursor is not None:
- # Allow loading data from later transactions
- # for conflict resolution.
- cursor = self._store_cursor
- else:
- if not self._load_transaction_open:
- self._start_load()
- cursor = self._load_cursor
- if not self._adapter.exists(cursor, u64(oid)):
- raise KeyError(oid)
-
- state, start_tid = self._adapter.load_before(
- cursor, oid_int, u64(tid))
- if start_tid is not None:
- end_int = self._adapter.get_object_tid_after(
- cursor, oid_int, start_tid)
- if end_int is not None:
- end = p64(end_int)
- else:
- end = None
- if state is not None:
- state = str(state)
- return state, p64(start_tid), end
- else:
- return None
- finally:
- self._lock_release()
-
-
- def store(self, oid, serial, data, version, transaction):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
- if version:
- raise POSException.Unsupported("Versions aren't supported")
-
- # If self._prepared_txn is not None, that means something is
- # attempting to store objects after the vote phase has finished.
- # That should not happen, should it?
- assert self._prepared_txn is None
- md5sum = md5.new(data).hexdigest()
-
- adapter = self._adapter
- cursor = self._store_cursor
- assert cursor is not None
- oid_int = u64(oid)
- if serial:
- prev_tid_int = u64(serial)
- else:
- prev_tid_int = 0
-
- self._lock_acquire()
- try:
- # save the data in a temporary table
- adapter.store_temp(cursor, oid_int, prev_tid_int, md5sum, data)
- return None
- finally:
- self._lock_release()
-
-
- def restore(self, oid, serial, data, version, prev_txn, transaction):
- # Like store(), but used for importing transactions. See the
- # comments in FileStorage.restore(). The prev_txn optimization
- # is not used.
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
- if version:
- raise POSException.Unsupported("Versions aren't supported")
-
- assert self._tid is not None
- assert self._prepared_txn is None
- if data is not None:
- md5sum = md5.new(data).hexdigest()
- else:
- # George Bailey object
- md5sum = None
-
- adapter = self._adapter
- cursor = self._store_cursor
- assert cursor is not None
- oid_int = u64(oid)
- tid_int = u64(serial)
-
- self._lock_acquire()
- try:
- # save the data. Note that md5sum and data can be None.
- adapter.restore(cursor, oid_int, tid_int, md5sum, data)
- finally:
- self._lock_release()
-
-
- def tpc_begin(self, transaction, tid=None, status=' '):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- self._lock_acquire()
- try:
- if self._transaction is transaction:
- return
- self._lock_release()
- self._commit_lock_acquire()
- self._lock_acquire()
- self._transaction = transaction
- self._clear_temp()
-
- user = transaction.user
- desc = transaction.description
- ext = transaction._extension
- if ext:
- ext = cPickle.dumps(ext, 1)
- else:
- ext = ""
- self._ude = user, desc, ext
- self._tstatus = status
-
- adapter = self._adapter
- cursor = self._store_cursor
- if cursor is not None:
- # Store cursor is still open, so try to use it again.
- try:
- adapter.restart_store(cursor)
- except POSException.StorageError:
- cursor = None
- log.exception("Store connection failed; retrying")
- self._drop_store_connection()
- if cursor is None:
- conn, cursor = adapter.open_for_store()
- self._store_conn, self._store_cursor = conn, cursor
-
- if tid is not None:
- # get the commit lock and add the transaction now
- packed = (status == 'p')
- adapter.start_commit(cursor)
- tid_int = u64(tid)
- try:
- adapter.add_transaction(
- cursor, tid_int, user, desc, ext, packed)
- except:
- self._drop_store_connection()
- raise
- # else choose the tid later
- self._tid = tid
-
- finally:
- self._lock_release()
-
- def _prepare_tid(self):
- """Choose a tid for the current transaction.
-
- This should be done as late in the commit as possible, since
- it must hold an exclusive commit lock.
- """
- if self._tid is not None:
- return
- if self._transaction is None:
- raise POSException.StorageError("No transaction in progress")
-
- adapter = self._adapter
- cursor = self._store_cursor
- adapter.start_commit(cursor)
- user, desc, ext = self._ude
-
- # Choose a transaction ID.
- # Base the transaction ID on the database time,
- # while ensuring that the tid of this transaction
- # is greater than any existing tid.
- last_tid, now = adapter.get_tid_and_time(cursor)
- stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
- stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
- tid = repr(stamp)
-
- tid_int = u64(tid)
- adapter.add_transaction(cursor, tid_int, user, desc, ext)
- self._tid = tid
-
-
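[Editor's note: `_prepare_tid` bases the new tid on the database clock but uses `TimeStamp.laterThan` to guarantee it exceeds any existing tid. Real tids are 8-byte TimeStamps; this sketch shows only the ordering rule, with plain integers standing in:]

```python
def choose_tid(last_tid, now):
    # Prefer the time-derived candidate, but never reuse or go below
    # an existing tid (the role of TimeStamp.laterThan above).
    candidate = int(now)
    return max(candidate, last_tid + 1)

assert choose_tid(100, 250.0) == 250   # clock is ahead: use it
assert choose_tid(300, 250.0) == 301   # clock is behind: go just past last
```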
- def _clear_temp(self):
- # It is assumed that self._lock_acquire was called before this
- # method was called.
- self._prepared_txn = None
-
-
- def _finish_store(self):
- """Move stored objects from the temporary table to final storage.
-
- Returns a list of (oid, tid) to be received by
- Connection._handle_serial().
- """
- assert self._tid is not None
- cursor = self._store_cursor
- adapter = self._adapter
-
- # Detect conflicting changes.
- # Try to resolve the conflicts.
- resolved = set() # a set of OIDs
- while True:
- conflict = adapter.detect_conflict(cursor)
- if conflict is None:
- break
-
- oid_int, prev_tid_int, serial_int, data = conflict
- oid = p64(oid_int)
- prev_tid = p64(prev_tid_int)
- serial = p64(serial_int)
-
- rdata = self.tryToResolveConflict(oid, prev_tid, serial, data)
- if rdata is None:
- # unresolvable; kill the whole transaction
- raise POSException.ConflictError(
- oid=oid, serials=(prev_tid, serial), data=data)
- else:
- # resolved
- data = rdata
- md5sum = md5.new(data).hexdigest()
- self._adapter.replace_temp(
- cursor, oid_int, prev_tid_int, md5sum, data)
- resolved.add(oid)
-
- # Move the new states into the permanent table
- tid_int = u64(self._tid)
- serials = []
- oid_ints = adapter.move_from_temp(cursor, tid_int)
- for oid_int in oid_ints:
- oid = p64(oid_int)
- if oid in resolved:
- serial = ConflictResolution.ResolvedSerial
- else:
- serial = self._tid
- serials.append((oid, serial))
-
- return serials
-
-
- def _vote(self):
- """Prepare the transaction for final commit."""
- # This method initiates a two-phase commit process,
- # saving the name of the prepared transaction in self._prepared_txn.
-
- # It is assumed that self._lock_acquire was called before this
- # method was called.
-
- if self._prepared_txn is not None:
- # the vote phase has already completed
- return
-
- self._prepare_tid()
- tid_int = u64(self._tid)
- cursor = self._store_cursor
- assert cursor is not None
-
- serials = self._finish_store()
- self._adapter.update_current(cursor, tid_int)
- self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
-
- return serials
-
-
- def tpc_vote(self, transaction):
- self._lock_acquire()
- try:
- if transaction is not self._transaction:
- return
- try:
- return self._vote()
- except:
- if abort_early:
- # abort early to avoid lockups while running the
- # somewhat brittle ZODB test suite
- self.tpc_abort(transaction)
- raise
- finally:
- self._lock_release()
-
-
- def _finish(self, tid, user, desc, ext):
- """Commit the transaction."""
- # It is assumed that self._lock_acquire was called before this
- # method was called.
- assert self._tid is not None
- self._rollback_load_connection()
- txn = self._prepared_txn
- assert txn is not None
- self._adapter.commit_phase2(self._store_cursor, txn)
- self._prepared_txn = None
- self._ltid = self._tid
- self._tid = None
-
- def _abort(self):
- # the lock is held here
- self._rollback_load_connection()
- if self._store_cursor is not None:
- self._adapter.abort(self._store_cursor, self._prepared_txn)
- self._prepared_txn = None
- self._tid = None
-
- def lastTransaction(self):
- return self._ltid
-
- def new_oid(self):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- self._lock_acquire()
- try:
- cursor = self._load_cursor
- if cursor is None:
- self._open_load_connection()
- cursor = self._load_cursor
- oid_int = self._adapter.new_oid(cursor)
- return p64(oid_int)
- finally:
- self._lock_release()
-
- def cleanup(self):
- pass
-
- def supportsVersions(self):
- return False
-
- def modifiedInVersion(self, oid):
- return ''
-
- def supportsUndo(self):
- return True
-
- def supportsTransactionalUndo(self):
- return True
-
- def undoLog(self, first=0, last=-20, filter=None):
- if last < 0:
- last = first - last
-
- # use a private connection to ensure the most current results
- adapter = self._adapter
- conn, cursor = adapter.open()
- try:
- rows = adapter.iter_transactions(cursor)
- i = 0
- res = []
- for tid_int, user, desc, ext in rows:
- tid = p64(tid_int)
- d = {'id': base64.encodestring(tid)[:-1],
- 'time': TimeStamp(tid).timeTime(),
- 'user_name': user or '',
- 'description': desc or ''}
- if ext:
- ext = str(ext)
- if ext:
- d.update(cPickle.loads(ext))
- if filter is None or filter(d):
- if i >= first:
- res.append(d)
- i += 1
- if i >= last:
- break
- return res
-
- finally:
- adapter.close(conn, cursor)
-
- def history(self, oid, version=None, size=1, filter=None):
- self._lock_acquire()
- try:
- cursor = self._load_cursor
- oid_int = u64(oid)
- try:
- rows = self._adapter.iter_object_history(cursor, oid_int)
- except KeyError:
- raise KeyError(oid)
-
- res = []
- for tid_int, username, description, extension, length in rows:
- tid = p64(tid_int)
- if extension:
- d = cPickle.loads(extension)
- else:
- d = {}
- d.update({"time": TimeStamp(tid).timeTime(),
- "user_name": username or '',
- "description": description or '',
- "tid": tid,
- "version": '',
- "size": length,
- })
- if filter is None or filter(d):
- res.append(d)
- if size is not None and len(res) >= size:
- break
- return res
- finally:
- self._lock_release()
-
-
- def undo(self, transaction_id, transaction):
- """Undo a transaction identified by transaction_id.
-
- transaction_id is the base 64 encoding of an 8 byte tid.
- Undo by writing new data that reverses the action taken by
- the transaction.
- """
-
- if self._is_read_only:
- raise POSException.ReadOnlyError()
- if transaction is not self._transaction:
- raise POSException.StorageTransactionError(self, transaction)
-
- undo_tid = base64.decodestring(transaction_id + '\n')
- assert len(undo_tid) == 8
- undo_tid_int = u64(undo_tid)
-
- self._lock_acquire()
- try:
- adapter = self._adapter
- cursor = self._store_cursor
- assert cursor is not None
-
- adapter.hold_pack_lock(cursor)
- try:
- # Note that _prepare_tid acquires the commit lock.
- # The commit lock must be acquired after the pack lock
- # because the database adapters also acquire in that
- # order during packing.
- self._prepare_tid()
- adapter.verify_undoable(cursor, undo_tid_int)
-
- self_tid_int = u64(self._tid)
- oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
- oids = [p64(oid_int) for oid_int in oid_ints]
-
- # Update the current object pointers immediately, so that
- # subsequent undo operations within this transaction will see
- # the new current objects.
- adapter.update_current(cursor, self_tid_int)
-
- return self._tid, oids
- finally:
- adapter.release_pack_lock(cursor)
- finally:
- self._lock_release()
-
-
- def pack(self, t, referencesf):
- if self._is_read_only:
- raise POSException.ReadOnlyError()
-
- pack_point = repr(TimeStamp(*time.gmtime(t)[:5]+(t%60,)))
- pack_point_int = u64(pack_point)
-
- def get_references(state):
- """Return the set of OIDs the given state refers to."""
- refs = set()
- if state:
- for oid in referencesf(str(state)):
- refs.add(u64(oid))
- return refs
-
- # Use a private connection (lock_conn and lock_cursor) to
- # hold the pack lock. Have the adapter open temporary
- # connections to do the actual work, allowing the adapter
- # to use special transaction modes for packing.
- adapter = self._adapter
- lock_conn, lock_cursor = adapter.open()
- try:
- adapter.hold_pack_lock(lock_cursor)
- try:
- # Find the latest commit before or at the pack time.
- tid_int = adapter.choose_pack_transaction(pack_point_int)
- if tid_int is None:
- # Nothing needs to be packed.
- return
-
- # In pre_pack, the adapter fills tables with
- # information about what to pack. The adapter
- # should not actually pack anything yet.
- adapter.pre_pack(tid_int, get_references)
-
- # Now pack.
- adapter.pack(tid_int)
- self._after_pack()
- finally:
- adapter.release_pack_lock(lock_cursor)
- finally:
- lock_conn.rollback()
- adapter.close(lock_conn, lock_cursor)
-
-
- def _after_pack(self):
- """Reset the transaction state after packing."""
- # The tests depend on this.
- self._rollback_load_connection()
-
- def iterator(self, start=None, stop=None):
- return TransactionIterator(self._adapter, start, stop)
-
-
-class BoundRelStorage(RelStorage):
- """Storage to a database, bound to a particular ZODB.Connection."""
-
- # The propagate_invalidations flag, set to a false value, tells
- # the Connection not to propagate object invalidations across
- # connections, since that ZODB feature is detrimental when the
- # storage provides its own MVCC.
- propagate_invalidations = False
-
- def __init__(self, parent, zodb_conn):
- # self._zodb_conn = zodb_conn
- RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
- create=False, read_only=parent._is_read_only,
- poll_interval=parent._poll_interval)
- # _prev_polled_tid contains the tid at the previous poll
- self._prev_polled_tid = None
- self._showed_disconnect = False
- self._poll_at = 0
-
- def connection_closing(self):
- """Release resources."""
- if not self._poll_interval:
- self._rollback_load_connection()
- # else keep the load transaction open so that it's possible
- # to ignore the next poll.
-
- def sync(self):
- """Process pending invalidations regardless of poll interval"""
- self._lock_acquire()
- try:
- if self._load_transaction_open:
- self._rollback_load_connection()
- finally:
- self._lock_release()
-
- def poll_invalidations(self, retry=True):
- """Looks for OIDs of objects that changed since _prev_polled_tid
-
- Returns {oid: 1}, or None if all objects need to be invalidated
- because prev_polled_tid is not in the database (presumably it
- has been packed).
- """
- self._lock_acquire()
- try:
- if self._closed:
- return {}
-
- if self._poll_interval:
- now = time.time()
- if self._load_transaction_open and now < self._poll_at:
- # It's not yet time to poll again. The previous load
- # transaction is still open, so it's safe to
- # ignore this poll.
- return {}
- # else poll now after resetting the timeout
- self._poll_at = now + self._poll_interval
-
- try:
- self._rollback_load_connection()
- self._start_load()
- conn = self._load_conn
- cursor = self._load_cursor
-
- # Ignore changes made by the last transaction committed
- # by this connection.
- if self._ltid is not None:
- ignore_tid = u64(self._ltid)
- else:
- ignore_tid = None
-
- # get a list of changed OIDs and the most recent tid
- oid_ints, new_polled_tid = self._adapter.poll_invalidations(
- conn, cursor, self._prev_polled_tid, ignore_tid)
- self._prev_polled_tid = new_polled_tid
-
- if oid_ints is None:
- oids = None
- else:
- oids = {}
- for oid_int in oid_ints:
- oids[p64(oid_int)] = 1
- return oids
- except POSException.StorageError:
- # disconnected
- self._poll_at = 0
- if not retry:
- raise
- if not self._showed_disconnect:
- log.warning("Lost connection in %s", repr(self))
- self._showed_disconnect = True
- self._open_load_connection()
- log.info("Reconnected in %s", repr(self))
- self._showed_disconnect = False
- return self.poll_invalidations(retry=False)
- finally:
- self._lock_release()
-
- def _after_pack(self):
- # Override transaction reset after packing. If the connection
- # wants to see the new state, it should call sync().
- pass
-
-
-class TransactionIterator(object):
- """Iterate over the transactions in a RelStorage instance."""
-
- def __init__(self, adapter, start, stop):
- self._adapter = adapter
- self._conn, self._cursor = self._adapter.open_for_load()
- self._closed = False
-
- if start is not None:
- start_int = u64(start)
- else:
- start_int = 1
- if stop is not None:
- stop_int = u64(stop)
- else:
- stop_int = None
-
- # _transactions: [(tid, packed, username, description, extension)]
- self._transactions = list(adapter.iter_transactions_range(
- self._cursor, start_int, stop_int))
- self._index = 0
-
- def close(self):
- self._adapter.close(self._conn, self._cursor)
- self._closed = True
-
- def iterator(self):
- return self
-
- def __len__(self):
- return len(self._transactions)
-
- def __getitem__(self, n):
- self._index = n
- return self.next()
-
- def next(self):
- if self._closed:
- raise IOError("TransactionIterator already closed")
- params = self._transactions[self._index]
- res = RecordIterator(self, *params)
- self._index += 1
- return res
-
-
-class RecordIterator(object):
- """Iterate over the objects in a transaction."""
- def __init__(self, trans_iter, tid_int, packed, user, desc, ext):
- self.tid = p64(tid_int)
- self.status = packed and 'p' or ' '
- self.user = user or ''
- self.description = desc or ''
- if ext:
- self._extension = cPickle.loads(ext)
- else:
- self._extension = {}
-
- cursor = trans_iter._cursor
- adapter = trans_iter._adapter
- self._records = list(adapter.iter_objects(cursor, tid_int))
- self._index = 0
-
- def __len__(self):
- return len(self._records)
-
- def __getitem__(self, n):
- self._index = n
- return self.next()
-
- def next(self):
- params = self._records[self._index]
- res = Record(self.tid, *params)
- self._index += 1
- return res
-
-
-class Record(object):
- """An object state in a transaction"""
- version = ''
- data_txn = None
-
- def __init__(self, tid, oid_int, data):
- self.tid = tid
- self.oid = p64(oid_int)
- if data is not None:
- self.data = str(data)
- else:
- self.data = None
-
Copied: relstorage/tags/1.0-beta1/relstorage/relstorage.py (from rev 84129, relstorage/trunk/relstorage/relstorage.py)
===================================================================
--- relstorage/tags/1.0-beta1/relstorage/relstorage.py (rev 0)
+++ relstorage/tags/1.0-beta1/relstorage/relstorage.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,932 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""The core of RelStorage, a ZODB storage for relational databases.
+
+Stores pickles in the database.
+"""
+
+import base64
+import cPickle
+import logging
+import md5
+import os
+import time
+import weakref
+from ZODB.utils import p64, u64, z64
+from ZODB.BaseStorage import BaseStorage
+from ZODB import ConflictResolution, POSException
+from persistent.TimeStamp import TimeStamp
+
+log = logging.getLogger("relstorage")
+
+# Set the RELSTORAGE_ABORT_EARLY environment variable when debugging
+# a failure revealed by the ZODB test suite. The test suite often fails
+# to call tpc_abort in the event of an error, leading to deadlocks.
+# The variable causes RelStorage to abort failed transactions
+# early rather than wait for an explicit abort.
+abort_early = os.environ.get('RELSTORAGE_ABORT_EARLY')
+
+
+class RelStorage(BaseStorage,
+ ConflictResolution.ConflictResolvingStorage):
+ """Storage to a relational database, based on invalidation polling"""
+
+ def __init__(self, adapter, name=None, create=True,
+ read_only=False, poll_interval=0, pack_gc=True):
+ if name is None:
+ name = 'RelStorage on %s' % adapter.__class__.__name__
+
+ self._adapter = adapter
+ self._name = name
+ self._is_read_only = read_only
+ self._poll_interval = poll_interval
+ self._pack_gc = pack_gc
+
+ if create:
+ self._adapter.prepare_schema()
+
+ # load_conn and load_cursor are open most of the time.
+ self._load_conn = None
+ self._load_cursor = None
+ self._load_transaction_open = False
+ self._open_load_connection()
+ # store_conn and store_cursor are open during commit,
+ # but not necessarily open at other times.
+ self._store_conn = None
+ self._store_cursor = None
+
+ BaseStorage.__init__(self, name)
+
+ self._tid = None
+ self._ltid = None
+
+ # _prepared_txn is the name of the transaction to commit in the
+ # second phase.
+ self._prepared_txn = None
+
+ # _instances is a list of weak references to storage instances bound
+ # to the same database.
+ self._instances = []
+
+ # _closed is True after self.close() is called. Since close()
+ # can be called from another thread, access to self._closed should
+ # be inside a _lock_acquire()/_lock_release() block.
+ self._closed = False
+
+ # _max_stored_oid is the highest OID stored by the current
+ # transaction
+ self._max_stored_oid = 0
+
+ # _max_new_oid is the highest OID provided by new_oid()
+ self._max_new_oid = 0
+
+
+ def _open_load_connection(self):
+ """Open the load connection to the database. Return nothing."""
+ conn, cursor = self._adapter.open_for_load()
+ self._drop_load_connection()
+ self._load_conn, self._load_cursor = conn, cursor
+ self._load_transaction_open = True
+
+ def _drop_load_connection(self):
+ conn, cursor = self._load_conn, self._load_cursor
+ self._load_conn, self._load_cursor = None, None
+ self._adapter.close(conn, cursor)
+
+ def _drop_store_connection(self):
+ conn, cursor = self._store_conn, self._store_cursor
+ self._store_conn, self._store_cursor = None, None
+ self._adapter.close(conn, cursor)
+
+ def _rollback_load_connection(self):
+ if self._load_conn is not None:
+ self._load_conn.rollback()
+ self._load_transaction_open = False
+
+ def _start_load(self):
+ if self._load_cursor is None:
+ self._open_load_connection()
+ else:
+ self._adapter.restart_load(self._load_cursor)
+ self._load_transaction_open = True
+
+ def zap_all(self):
+ """Clear all objects and transactions out of the database.
+
+ Used by the test suite and migration scripts.
+ """
+ self._adapter.zap_all()
+ self._rollback_load_connection()
+
+ def close(self):
+ """Close the connections to the database."""
+ self._lock_acquire()
+ try:
+ self._closed = True
+ self._drop_load_connection()
+ self._drop_store_connection()
+ for wref in self._instances:
+ instance = wref()
+ if instance is not None:
+ instance.close()
+ finally:
+ self._lock_release()
+
+ def bind_connection(self, zodb_conn):
+ """Get a connection-bound storage instance.
+
+ Connections have their own storage instances so that
+ the database can provide the MVCC semantics rather than ZODB.
+ """
+ res = BoundRelStorage(self, zodb_conn)
+ self._instances.append(weakref.ref(res))
+ return res
+
+ def connection_closing(self):
+ """Release resources."""
+ self._rollback_load_connection()
+
+ def __len__(self):
+ return self._adapter.get_object_count()
+
+ def getSize(self):
+ """Return database size in bytes"""
+ return self._adapter.get_db_size()
+
+ def load(self, oid, version):
+ self._lock_acquire()
+ try:
+ if not self._load_transaction_open:
+ self._start_load()
+ cursor = self._load_cursor
+ state, tid_int = self._adapter.load_current(cursor, u64(oid))
+ finally:
+ self._lock_release()
+ if tid_int is not None:
+ if state:
+ state = str(state)
+ if not state:
+ # This can happen if something attempts to load
+ # an object whose creation has been undone.
+ raise KeyError(oid)
+ return state, p64(tid_int)
+ else:
+ raise KeyError(oid)
+
+ def loadEx(self, oid, version):
+ # Since we don't support versions, just tack the empty version
+ # string onto load's result.
+ return self.load(oid, version) + ("",)
+
+ def loadSerial(self, oid, serial):
+ """Load a specific revision of an object"""
+ self._lock_acquire()
+ try:
+ if self._store_cursor is not None:
+ # Allow loading data from later transactions
+ # for conflict resolution.
+ cursor = self._store_cursor
+ else:
+ if not self._load_transaction_open:
+ self._start_load()
+ cursor = self._load_cursor
+ state = self._adapter.load_revision(cursor, u64(oid), u64(serial))
+ if state is not None:
+ state = str(state)
+ if not state:
+ raise POSException.POSKeyError(oid)
+ return state
+ else:
+ raise KeyError(oid)
+ finally:
+ self._lock_release()
+
+ def loadBefore(self, oid, tid):
+ """Return the most recent revision of oid before tid committed."""
+ oid_int = u64(oid)
+
+ self._lock_acquire()
+ try:
+ if self._store_cursor is not None:
+ # Allow loading data from later transactions
+ # for conflict resolution.
+ cursor = self._store_cursor
+ else:
+ if not self._load_transaction_open:
+ self._start_load()
+ cursor = self._load_cursor
+ if not self._adapter.exists(cursor, u64(oid)):
+ raise KeyError(oid)
+
+ state, start_tid = self._adapter.load_before(
+ cursor, oid_int, u64(tid))
+ if start_tid is not None:
+ end_int = self._adapter.get_object_tid_after(
+ cursor, oid_int, start_tid)
+ if end_int is not None:
+ end = p64(end_int)
+ else:
+ end = None
+ if state is not None:
+ state = str(state)
+ return state, p64(start_tid), end
+ else:
+ return None
+ finally:
+ self._lock_release()
+
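The loadBefore() logic above can be sketched independently of the adapter: given an object's revision history, return the newest state committed strictly before `tid`, plus the tid of the following revision (or None if that state is still current). A minimal in-memory sketch (the `load_before` helper here is illustrative, not part of RelStorage):

```python
def load_before(history, tid):
    """history: list of (tid_int, state) sorted by tid.
    Mirrors the shape of loadBefore's result:
    (state, start_tid, end_tid) or None."""
    prior = [(t, s) for (t, s) in history if t < tid]
    if not prior:
        return None
    start_tid, state = prior[-1]
    # end_tid is the tid of the next revision after start_tid, if any.
    later = [t for (t, _) in history if t > start_tid]
    end_tid = later[0] if later else None
    return state, start_tid, end_tid
```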
+
+ def store(self, oid, serial, data, version, transaction):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+ if version:
+ raise POSException.Unsupported("Versions aren't supported")
+
+ # If self._prepared_txn is not None, that means something is
+ # attempting to store objects after the vote phase has finished.
+ # That should not happen, should it?
+ assert self._prepared_txn is None
+ md5sum = md5.new(data).hexdigest()
+
+ adapter = self._adapter
+ cursor = self._store_cursor
+ assert cursor is not None
+ oid_int = u64(oid)
+ if serial:
+ prev_tid_int = u64(serial)
+ else:
+ prev_tid_int = 0
+
+ self._lock_acquire()
+ try:
+ self._max_stored_oid = max(self._max_stored_oid, oid_int)
+ # save the data in a temporary table
+ adapter.store_temp(cursor, oid_int, prev_tid_int, md5sum, data)
+ return None
+ finally:
+ self._lock_release()
+
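store() records an MD5 hex digest of every pickle so the adapter can compare object states cheaply. The `md5` module used above was current in 2008; with `hashlib` (its modern replacement) the same digest looks like:

```python
import hashlib

def checksum(data):
    # Hex digest of the pickle bytes, as store() computes with md5.new(data).
    return hashlib.md5(data).hexdigest()
```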
+
+ def restore(self, oid, serial, data, version, prev_txn, transaction):
+ # Like store(), but used for importing transactions. See the
+ # comments in FileStorage.restore(). The prev_txn optimization
+ # is not used.
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+ if version:
+ raise POSException.Unsupported("Versions aren't supported")
+
+ assert self._tid is not None
+ assert self._prepared_txn is None
+ if data is not None:
+ md5sum = md5.new(data).hexdigest()
+ else:
+ # George Bailey object
+ md5sum = None
+
+ adapter = self._adapter
+ cursor = self._store_cursor
+ assert cursor is not None
+ oid_int = u64(oid)
+ tid_int = u64(serial)
+
+ self._lock_acquire()
+ try:
+ self._max_stored_oid = max(self._max_stored_oid, oid_int)
+ # save the data. Note that md5sum and data can be None.
+ adapter.restore(cursor, oid_int, tid_int, md5sum, data)
+ finally:
+ self._lock_release()
+
+
+ def tpc_begin(self, transaction, tid=None, status=' '):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ if self._transaction is transaction:
+ return
+ self._lock_release()
+ self._commit_lock_acquire()
+ self._lock_acquire()
+ self._transaction = transaction
+ self._clear_temp()
+
+ user = transaction.user
+ desc = transaction.description
+ ext = transaction._extension
+ if ext:
+ ext = cPickle.dumps(ext, 1)
+ else:
+ ext = ""
+ self._ude = user, desc, ext
+ self._tstatus = status
+
+ adapter = self._adapter
+ cursor = self._store_cursor
+ if cursor is not None:
+ # Store cursor is still open, so try to use it again.
+ try:
+ adapter.restart_store(cursor)
+ except POSException.StorageError:
+ cursor = None
+ log.exception("Store connection failed; retrying")
+ self._drop_store_connection()
+ if cursor is None:
+ conn, cursor = adapter.open_for_store()
+ self._store_conn, self._store_cursor = conn, cursor
+
+ if tid is not None:
+ # get the commit lock and add the transaction now
+ packed = (status == 'p')
+ adapter.start_commit(cursor)
+ tid_int = u64(tid)
+ try:
+ adapter.add_transaction(
+ cursor, tid_int, user, desc, ext, packed)
+ except:
+ self._drop_store_connection()
+ raise
+ # else choose the tid later
+ self._tid = tid
+
+ finally:
+ self._lock_release()
+
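The store-connection handling in tpc_begin() is a reuse-or-reopen pattern: try to restart the cached cursor, and fall back to a fresh connection if the restart fails. A standalone sketch of that control flow (the adapter and exception names here are stand-ins, not the real classes):

```python
class StorageError(Exception):
    """Stand-in for POSException.StorageError."""

def ensure_store_cursor(adapter, cursor):
    # Reuse the open cursor if the adapter can restart it; otherwise
    # discard it and open a new store connection.
    if cursor is not None:
        try:
            adapter.restart_store(cursor)
            return cursor
        except StorageError:
            cursor = None
    conn, cursor = adapter.open_for_store()
    return cursor
```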
+ def _prepare_tid(self):
+ """Choose a tid for the current transaction.
+
+ This should be done as late in the commit as possible, since
+ it must hold an exclusive commit lock.
+ """
+ if self._tid is not None:
+ return
+ if self._transaction is None:
+ raise POSException.StorageError("No transaction in progress")
+
+ adapter = self._adapter
+ cursor = self._store_cursor
+ adapter.start_commit(cursor)
+ user, desc, ext = self._ude
+
+ # Choose a transaction ID.
+ # Base the transaction ID on the database time,
+ # while ensuring that the tid of this transaction
+ # is greater than any existing tid.
+ last_tid, now = adapter.get_tid_and_time(cursor)
+ stamp = TimeStamp(*(time.gmtime(now)[:5] + (now % 60,)))
+ stamp = stamp.laterThan(TimeStamp(p64(last_tid)))
+ tid = repr(stamp)
+
+ tid_int = u64(tid)
+ adapter.add_transaction(cursor, tid_int, user, desc, ext)
+ self._tid = tid
+
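The tid chosen in _prepare_tid() is time-based but forced strictly past the last committed tid via TimeStamp.laterThan(). The invariant can be sketched without the persistent package; the bit layout below is a hypothetical stand-in, not the real TimeStamp encoding, but p64/u64 match the ZODB convention:

```python
import struct

def p64(v):
    # ZODB packs tids and oids as 8-byte big-endian strings.
    return struct.pack(">Q", v)

def u64(s):
    return struct.unpack(">Q", s)[0]

def choose_tid(last_tid_int, now):
    # Base the tid on the clock, but never reuse or go below an existing tid.
    candidate = int(now) << 32          # hypothetical time encoding
    return max(candidate, last_tid_int + 1)
```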
+
+ def _clear_temp(self):
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+ self._prepared_txn = None
+ self._max_stored_oid = 0
+
+
+ def _finish_store(self):
+ """Move stored objects from the temporary table to final storage.
+
+ Returns a list of (oid, tid) to be received by
+ Connection._handle_serial().
+ """
+ assert self._tid is not None
+ cursor = self._store_cursor
+ adapter = self._adapter
+
+ # Detect conflicting changes.
+ # Try to resolve the conflicts.
+ resolved = set() # a set of OIDs
+ while True:
+ conflict = adapter.detect_conflict(cursor)
+ if conflict is None:
+ break
+
+ oid_int, prev_tid_int, serial_int, data = conflict
+ oid = p64(oid_int)
+ prev_tid = p64(prev_tid_int)
+ serial = p64(serial_int)
+
+ rdata = self.tryToResolveConflict(oid, prev_tid, serial, data)
+ if rdata is None:
+ # unresolvable; kill the whole transaction
+ raise POSException.ConflictError(
+ oid=oid, serials=(prev_tid, serial), data=data)
+ else:
+ # resolved
+ data = rdata
+ md5sum = md5.new(data).hexdigest()
+ self._adapter.replace_temp(
+ cursor, oid_int, prev_tid_int, md5sum, data)
+ resolved.add(oid)
+
+ # Move the new states into the permanent table
+ tid_int = u64(self._tid)
+ serials = []
+ oid_ints = adapter.move_from_temp(cursor, tid_int)
+ for oid_int in oid_ints:
+ oid = p64(oid_int)
+ if oid in resolved:
+ serial = ConflictResolution.ResolvedSerial
+ else:
+ serial = self._tid
+ serials.append((oid, serial))
+
+ return serials
+
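The loop in _finish_store() pulls conflicts from the adapter one at a time, tries to resolve each, and rewrites the temp row until none remain; an unresolvable conflict kills the whole transaction. The control flow, reduced to plain callables (names are illustrative):

```python
def resolve_conflicts(detect, resolve, replace):
    """Mirror _finish_store()'s loop: detect() returns (oid, data) or None,
    resolve(data) returns resolved data or None, replace(oid, data) rewrites
    the temp row. Returns the set of resolved oids."""
    resolved = set()
    while True:
        conflict = detect()
        if conflict is None:
            break
        oid, data = conflict
        rdata = resolve(data)
        if rdata is None:
            # The real code raises POSException.ConflictError here.
            raise ValueError("unresolvable conflict on %r" % (oid,))
        replace(oid, rdata)   # the real code also recomputes the md5 checksum
        resolved.add(oid)
    return resolved
```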
+
+ def _vote(self):
+ """Prepare the transaction for final commit."""
+ # This method initiates a two-phase commit process,
+ # saving the name of the prepared transaction in self._prepared_txn.
+
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+
+ if self._prepared_txn is not None:
+ # the vote phase has already completed
+ return
+
+ cursor = self._store_cursor
+ assert cursor is not None
+
+ if self._max_stored_oid > self._max_new_oid:
+ self._adapter.set_min_oid(cursor, self._max_stored_oid + 1)
+
+ self._prepare_tid()
+ tid_int = u64(self._tid)
+
+ serials = self._finish_store()
+ self._adapter.update_current(cursor, tid_int)
+ self._prepared_txn = self._adapter.commit_phase1(cursor, tid_int)
+
+ return serials
+
+
+ def tpc_vote(self, transaction):
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ return self._vote()
+ except:
+ if abort_early:
+ # abort early to avoid lockups while running the
+ # somewhat brittle ZODB test suite
+ self.tpc_abort(transaction)
+ raise
+ finally:
+ self._lock_release()
+
+
+ def _finish(self, tid, user, desc, ext):
+ """Commit the transaction."""
+ # It is assumed that self._lock_acquire was called before this
+ # method was called.
+ assert self._tid is not None
+ self._rollback_load_connection()
+ txn = self._prepared_txn
+ assert txn is not None
+ self._adapter.commit_phase2(self._store_cursor, txn)
+ self._prepared_txn = None
+ self._ltid = self._tid
+ self._tid = None
+
+ def _abort(self):
+ # the lock is held here
+ self._rollback_load_connection()
+ if self._store_cursor is not None:
+ self._adapter.abort(self._store_cursor, self._prepared_txn)
+ self._prepared_txn = None
+ self._tid = None
+
+ def lastTransaction(self):
+ return self._ltid
+
+ def new_oid(self):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ self._lock_acquire()
+ try:
+ cursor = self._load_cursor
+ if cursor is None:
+ self._open_load_connection()
+ cursor = self._load_cursor
+ oid_int = self._adapter.new_oid(cursor)
+ self._max_new_oid = max(self._max_new_oid, oid_int)
+ return p64(oid_int)
+ finally:
+ self._lock_release()
+
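new_oid() tracks the highest OID it has handed out (_max_new_oid), and _vote() calls set_min_oid whenever store()/restore() wrote a higher OID directly, so the allocator can never hand out an OID that is already in use. The invariant as a toy allocator (hypothetical class, for illustration only):

```python
class OidAllocator:
    """Sketch of the interplay between new_oid() and set_min_oid()."""
    def __init__(self):
        self.next_oid = 1
    def new_oid(self):
        oid = self.next_oid
        self.next_oid += 1
        return oid
    def set_min_oid(self, minimum):
        # Never move backwards; only ensure future OIDs start at `minimum`.
        self.next_oid = max(self.next_oid, minimum)
```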
+ def cleanup(self):
+ pass
+
+ def supportsVersions(self):
+ return False
+
+ def modifiedInVersion(self, oid):
+ return ''
+
+ def supportsUndo(self):
+ return True
+
+ def supportsTransactionalUndo(self):
+ return True
+
+ def undoLog(self, first=0, last=-20, filter=None):
+ if last < 0:
+ last = first - last
+
+ # use a private connection to ensure the most current results
+ adapter = self._adapter
+ conn, cursor = adapter.open()
+ try:
+ rows = adapter.iter_transactions(cursor)
+ i = 0
+ res = []
+ for tid_int, user, desc, ext in rows:
+ tid = p64(tid_int)
+ d = {'id': base64.encodestring(tid)[:-1],
+ 'time': TimeStamp(tid).timeTime(),
+ 'user_name': user or '',
+ 'description': desc or ''}
+ if ext:
+ ext = str(ext)
+ if ext:
+ d.update(cPickle.loads(ext))
+ if filter is None or filter(d):
+ if i >= first:
+ res.append(d)
+ i += 1
+ if i >= last:
+ break
+ return res
+
+ finally:
+ adapter.close(conn, cursor)
+
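undoLog() follows ZODB's convention that a negative `last` is a count relative to `first`, so the defaults first=0, last=-20 mean "the 20 most recent undoable transactions". The normalization in isolation:

```python
def normalize_undo_range(first=0, last=-20):
    # A negative `last` means "this many entries starting at `first`".
    if last < 0:
        last = first - last
    return first, last
```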
+ def history(self, oid, version=None, size=1, filter=None):
+ self._lock_acquire()
+ try:
+ cursor = self._load_cursor
+ oid_int = u64(oid)
+ try:
+ rows = self._adapter.iter_object_history(cursor, oid_int)
+ except KeyError:
+ raise KeyError(oid)
+
+ res = []
+ for tid_int, username, description, extension, length in rows:
+ tid = p64(tid_int)
+ if extension:
+ d = cPickle.loads(extension)
+ else:
+ d = {}
+ d.update({"time": TimeStamp(tid).timeTime(),
+ "user_name": username or '',
+ "description": description or '',
+ "tid": tid,
+ "version": '',
+ "size": length,
+ })
+ if filter is None or filter(d):
+ res.append(d)
+ if size is not None and len(res) >= size:
+ break
+ return res
+ finally:
+ self._lock_release()
+
+
+ def undo(self, transaction_id, transaction):
+ """Undo a transaction identified by transaction_id.
+
+ transaction_id is the base 64 encoding of an 8 byte tid.
+ Undo by writing new data that reverses the action taken by
+ the transaction.
+ """
+
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(self, transaction)
+
+ undo_tid = base64.decodestring(transaction_id + '\n')
+ assert len(undo_tid) == 8
+ undo_tid_int = u64(undo_tid)
+
+ self._lock_acquire()
+ try:
+ adapter = self._adapter
+ cursor = self._store_cursor
+ assert cursor is not None
+
+ adapter.hold_pack_lock(cursor)
+ try:
+ # Note that _prepare_tid acquires the commit lock.
+ # The commit lock must be acquired after the pack lock
+ # because the database adapters also acquire in that
+ # order during packing.
+ self._prepare_tid()
+ adapter.verify_undoable(cursor, undo_tid_int)
+
+ self_tid_int = u64(self._tid)
+ oid_ints = adapter.undo(cursor, undo_tid_int, self_tid_int)
+ oids = [p64(oid_int) for oid_int in oid_ints]
+
+ # Update the current object pointers immediately, so that
+ # subsequent undo operations within this transaction will see
+ # the new current objects.
+ adapter.update_current(cursor, self_tid_int)
+
+ return self._tid, oids
+ finally:
+ adapter.release_pack_lock(cursor)
+ finally:
+ self._lock_release()
+
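The comment in undo() stresses that the pack lock must be taken before the commit lock because the pack code acquires them in that order too. Acquiring multiple locks in one fixed global order is the standard deadlock-avoidance rule; a minimal sketch:

```python
import threading

pack_lock = threading.Lock()
commit_lock = threading.Lock()

def undo_like_operation():
    # Always pack lock first, then commit lock, in every code path.
    # If another path took them in the opposite order, two threads could
    # each hold one lock and wait forever for the other.
    with pack_lock:
        with commit_lock:
            return "committed"
```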
+
+ def set_pack_gc(self, pack_gc):
+ """Configures whether garbage collection during packing is enabled.
+
+ Garbage collection is enabled by default. If GC is disabled,
+ packing keeps at least one revision of every object.
+ With GC disabled, the pack code does not need to follow object
+ references, making packing conceivably much faster.
+ However, some of that benefit may be lost due to an ever
+ increasing number of unused objects.
+
+ Disabling garbage collection is also a hack that ensures
+ inter-database references never break.
+ """
+ self._pack_gc = pack_gc
+
+
+ def pack(self, t, referencesf):
+ if self._is_read_only:
+ raise POSException.ReadOnlyError()
+
+ pack_point = repr(TimeStamp(*time.gmtime(t)[:5]+(t%60,)))
+ pack_point_int = u64(pack_point)
+
+ def get_references(state):
+ """Return the set of OIDs the given state refers to."""
+ refs = set()
+ if state:
+ for oid in referencesf(str(state)):
+ refs.add(u64(oid))
+ return refs
+
+ # Use a private connection (lock_conn and lock_cursor) to
+ # hold the pack lock. Have the adapter open temporary
+ # connections to do the actual work, allowing the adapter
+ # to use special transaction modes for packing.
+ adapter = self._adapter
+ lock_conn, lock_cursor = adapter.open()
+ try:
+ adapter.hold_pack_lock(lock_cursor)
+ try:
+ # Find the latest commit before or at the pack time.
+ tid_int = adapter.choose_pack_transaction(pack_point_int)
+ if tid_int is None:
+ # Nothing needs to be packed.
+ return
+
+ # In pre_pack, the adapter fills tables with
+ # information about what to pack. The adapter
+ # should not actually pack anything yet.
+ adapter.pre_pack(tid_int, get_references, self._pack_gc)
+
+ # Now pack.
+ adapter.pack(tid_int)
+ self._after_pack()
+ finally:
+ adapter.release_pack_lock(lock_cursor)
+ finally:
+ lock_conn.rollback()
+ adapter.close(lock_conn, lock_cursor)
+
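The code above converts constantly between 8-byte tid/oid strings and integers via `u64`/`p64` from `ZODB.utils`. For readers without ZODB at hand, a minimal equivalent under the assumed semantics (big-endian unsigned 64-bit; the values are bytes under Python 3 rather than the str of this era's Python 2):

```python
import struct

def p64(n):
    # Pack an integer into an 8-byte big-endian string, ZODB-style.
    return struct.pack('>Q', n)

def u64(s):
    # Unpack an 8-byte big-endian string back into an integer.
    return struct.unpack('>Q', s)[0]
```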
+
+ def _after_pack(self):
+ """Reset the transaction state after packing."""
+ # The tests depend on this.
+ self._rollback_load_connection()
+
+ def iterator(self, start=None, stop=None):
+ return TransactionIterator(self._adapter, start, stop)
+
+
+class BoundRelStorage(RelStorage):
+ """Storage to a database, bound to a particular ZODB.Connection."""
+
+ # Setting propagate_invalidations to a false value tells the
+ # Connection not to propagate object invalidations across
+ # connections; that ZODB feature is detrimental when the
+ # storage provides its own MVCC.
+ propagate_invalidations = False
+
+ def __init__(self, parent, zodb_conn):
+ # self._zodb_conn = zodb_conn
+ RelStorage.__init__(self, adapter=parent._adapter, name=parent._name,
+ create=False, read_only=parent._is_read_only,
+ poll_interval=parent._poll_interval, pack_gc=parent._pack_gc)
+ # _prev_polled_tid contains the tid at the previous poll
+ self._prev_polled_tid = None
+ self._showed_disconnect = False
+ self._poll_at = 0
+
+ def connection_closing(self):
+ """Release resources."""
+ if not self._poll_interval:
+ self._rollback_load_connection()
+ # else keep the load transaction open so that it's possible
+ # to ignore the next poll.
+
+ def sync(self):
+ """Process pending invalidations regardless of the poll interval."""
+ self._lock_acquire()
+ try:
+ if self._load_transaction_open:
+ self._rollback_load_connection()
+ finally:
+ self._lock_release()
+
+ def poll_invalidations(self, retry=True):
+ """Looks for OIDs of objects that changed since _prev_polled_tid
+
+ Returns {oid: 1}, or None if all objects need to be invalidated
+ because prev_polled_tid is not in the database (presumably it
+ has been packed).
+ """
+ self._lock_acquire()
+ try:
+ if self._closed:
+ return {}
+
+ if self._poll_interval:
+ now = time.time()
+ if self._load_transaction_open and now < self._poll_at:
+ # It's not yet time to poll again. The previous load
+ # transaction is still open, so it's safe to
+ # ignore this poll.
+ return {}
+ # else poll now after resetting the timeout
+ self._poll_at = now + self._poll_interval
+
+ try:
+ self._rollback_load_connection()
+ self._start_load()
+ conn = self._load_conn
+ cursor = self._load_cursor
+
+ # Ignore changes made by the last transaction committed
+ # by this connection.
+ if self._ltid is not None:
+ ignore_tid = u64(self._ltid)
+ else:
+ ignore_tid = None
+
+ # get a list of changed OIDs and the most recent tid
+ oid_ints, new_polled_tid = self._adapter.poll_invalidations(
+ conn, cursor, self._prev_polled_tid, ignore_tid)
+ self._prev_polled_tid = new_polled_tid
+
+ if oid_ints is None:
+ oids = None
+ else:
+ oids = {}
+ for oid_int in oid_ints:
+ oids[p64(oid_int)] = 1
+ return oids
+ except POSException.StorageError:
+ # disconnected
+ self._poll_at = 0
+ if not retry:
+ raise
+ if not self._showed_disconnect:
+ log.warning("Lost connection in %s", repr(self))
+ self._showed_disconnect = True
+ self._open_load_connection()
+ log.info("Reconnected in %s", repr(self))
+ self._showed_disconnect = False
+ return self.poll_invalidations(retry=False)
+ finally:
+ self._lock_release()
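The poll_interval handling above reduces to a deadline check: while the previous load transaction is still open and the deadline has not passed, the poll is skipped; otherwise the code polls and resets the deadline. Sketched in isolation as a hypothetical stand-alone throttle (not the actual class, which also coordinates with the load connection):

```python
import time

class PollThrottle(object):
    # Skip polls until the deadline expires, then reset it;
    # mirrors the _poll_at bookkeeping described above.
    def __init__(self, interval):
        self.interval = interval
        self.poll_at = 0

    def should_poll(self, now=None):
        now = time.time() if now is None else now
        if now < self.poll_at:
            return False  # too soon; safe to skip this poll
        self.poll_at = now + self.interval
        return True
```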
+
+ def _after_pack(self):
+ # Override transaction reset after packing. If the connection
+ # wants to see the new state, it should call sync().
+ pass
+
+
+class TransactionIterator(object):
+ """Iterate over the transactions in a RelStorage instance."""
+
+ def __init__(self, adapter, start, stop):
+ self._adapter = adapter
+ self._conn, self._cursor = self._adapter.open_for_load()
+ self._closed = False
+
+ if start is not None:
+ start_int = u64(start)
+ else:
+ start_int = 1
+ if stop is not None:
+ stop_int = u64(stop)
+ else:
+ stop_int = None
+
+ # _transactions: [(tid, username, description, extension, packed)]
+ self._transactions = list(adapter.iter_transactions_range(
+ self._cursor, start_int, stop_int))
+ self._index = 0
+
+ def close(self):
+ self._adapter.close(self._conn, self._cursor)
+ self._closed = True
+
+ def iterator(self):
+ return self
+
+ def __len__(self):
+ return len(self._transactions)
+
+ def __getitem__(self, n):
+ self._index = n
+ return self.next()
+
+ def next(self):
+ if self._closed:
+ raise IOError("TransactionIterator already closed")
+ params = self._transactions[self._index]
+ res = RecordIterator(self, *params)
+ self._index += 1
+ return res
+
+
+class RecordIterator(object):
+ """Iterate over the objects in a transaction."""
+ def __init__(self, trans_iter, tid_int, user, desc, ext, packed):
+ self.tid = p64(tid_int)
+ self.status = packed and 'p' or ' '
+ self.user = user or ''
+ self.description = desc or ''
+ if ext:
+ self._extension = cPickle.loads(ext)
+ else:
+ self._extension = {}
+
+ cursor = trans_iter._cursor
+ adapter = trans_iter._adapter
+ self._records = list(adapter.iter_objects(cursor, tid_int))
+ self._index = 0
+
+ def __len__(self):
+ return len(self._records)
+
+ def __getitem__(self, n):
+ self._index = n
+ return self.next()
+
+ def next(self):
+ params = self._records[self._index]
+ res = Record(self.tid, *params)
+ self._index += 1
+ return res
+
+
+class Record(object):
+ """An object state in a transaction"""
+ version = ''
+ data_txn = None
+
+ def __init__(self, tid, oid_int, data):
+ self.tid = tid
+ self.oid = p64(oid_int)
+ if data is not None:
+ self.data = str(data)
+ else:
+ self.data = None
+
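TransactionIterator and RecordIterator above follow the old-style iteration protocol: `__len__` plus `__getitem__`/`next()`, with IndexError signalling exhaustion, which is why the tests in this tag end iteration with `assertRaises(IndexError, ...)`. A self-contained sketch of that protocol (a simplified illustration, not either class verbatim):

```python
class IndexableIterator(object):
    # __getitem__ seeks to an index and delegates to next(); running
    # past the end raises IndexError, which also terminates a plain
    # for-loop over the object (Python's legacy iteration fallback).
    def __init__(self, items):
        self._items = items
        self._index = 0

    def __len__(self):
        return len(self._items)

    def __getitem__(self, n):
        self._index = n
        return self.next()

    def next(self):
        item = self._items[self._index]  # IndexError past the end
        self._index += 1
        return item
```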
Deleted: relstorage/tags/1.0-beta1/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/trunk/relstorage/tests/reltestbase.py 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/relstorage/tests/reltestbase.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,442 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""A foundation for relstorage adapter tests"""
-
-import unittest
-from relstorage.relstorage import RelStorage
-
-from ZODB.DB import DB
-from ZODB.utils import p64
-from ZODB.FileStorage import FileStorage
-from persistent.mapping import PersistentMapping
-import transaction
-
-from ZODB.tests import StorageTestBase, BasicStorage, \
- TransactionalUndoStorage, PackableStorage, \
- Synchronization, ConflictResolution, HistoryStorage, \
- IteratorStorage, RevisionStorage, PersistentStorage, \
- MTStorage, ReadOnlyStorage, RecoveryStorage
-
-from ZODB.tests.MinPO import MinPO
-from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
-
-
-class BaseRelStorageTests(StorageTestBase.StorageTestBase):
-
- def make_adapter(self):
- # abstract method
- raise NotImplementedError
-
- def open(self, **kwargs):
- adapter = self.make_adapter()
- self._storage = RelStorage(adapter, **kwargs)
-
- def setUp(self):
- self.open(create=1)
- self._storage._zap()
-
- def tearDown(self):
- self._storage.close()
- self._storage.cleanup()
-
-
-class RelStorageTests(
- BaseRelStorageTests,
- BasicStorage.BasicStorage,
- TransactionalUndoStorage.TransactionalUndoStorage,
- RevisionStorage.RevisionStorage,
- PackableStorage.PackableStorage,
- PackableStorage.PackableUndoStorage,
- Synchronization.SynchronizedStorage,
- ConflictResolution.ConflictResolvingStorage,
- HistoryStorage.HistoryStorage,
- IteratorStorage.IteratorStorage,
- IteratorStorage.ExtendedIteratorStorage,
- PersistentStorage.PersistentStorage,
- MTStorage.MTStorage,
- ReadOnlyStorage.ReadOnlyStorage
- ):
-
- def checkCrossConnectionInvalidation(self):
- # Verify connections see updated state at txn boundaries
- db = DB(self._storage)
- try:
- c1 = db.open()
- r1 = c1.root()
- r1['myobj'] = 'yes'
- c2 = db.open()
- r2 = c2.root()
- self.assert_('myobj' not in r2)
-
- storage = c1._storage
- t = transaction.Transaction()
- t.description = 'invalidation test'
- storage.tpc_begin(t)
- c1.commit(t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-
- self.assert_('myobj' not in r2)
- c2.sync()
- self.assert_('myobj' in r2)
- self.assert_(r2['myobj'] == 'yes')
- finally:
- db.close()
-
- def checkCrossConnectionIsolation(self):
- # Verify MVCC isolates connections
- db = DB(self._storage)
- try:
- c1 = db.open()
- r1 = c1.root()
- r1['alpha'] = PersistentMapping()
- r1['gamma'] = PersistentMapping()
- transaction.commit()
-
- # Open a second connection but don't load root['alpha'] yet
- c2 = db.open()
- r2 = c2.root()
-
- r1['alpha']['beta'] = 'yes'
-
- storage = c1._storage
- t = transaction.Transaction()
- t.description = 'isolation test 1'
- storage.tpc_begin(t)
- c1.commit(t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-
- # The second connection will now load root['alpha'], but due to
- # MVCC, it should continue to see the old state.
- self.assert_(r2['alpha']._p_changed is None) # A ghost
- self.assert_(not r2['alpha'])
- self.assert_(r2['alpha']._p_changed == 0)
-
- # make root['alpha'] visible to the second connection
- c2.sync()
-
- # Now it should be in sync
- self.assert_(r2['alpha']._p_changed is None) # A ghost
- self.assert_(r2['alpha'])
- self.assert_(r2['alpha']._p_changed == 0)
- self.assert_(r2['alpha']['beta'] == 'yes')
-
- # Repeat the test with root['gamma']
- r1['gamma']['delta'] = 'yes'
-
- storage = c1._storage
- t = transaction.Transaction()
- t.description = 'isolation test 2'
- storage.tpc_begin(t)
- c1.commit(t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-
- # The second connection will now load root['gamma'], but due to MVCC,
- # it should continue to see the old state.
- self.assert_(r2['gamma']._p_changed is None) # A ghost
- self.assert_(not r2['gamma'])
- self.assert_(r2['gamma']._p_changed == 0)
-
- # make root['gamma'] visible to the second connection
- c2.sync()
-
- # Now it should be in sync
- self.assert_(r2['gamma']._p_changed is None) # A ghost
- self.assert_(r2['gamma'])
- self.assert_(r2['gamma']._p_changed == 0)
- self.assert_(r2['gamma']['delta'] == 'yes')
- finally:
- db.close()
-
- def checkResolveConflictBetweenConnections(self):
- # Verify that conflict resolution works between storage instances
- # bound to connections.
- obj = ConflictResolution.PCounter()
- obj.inc()
-
- oid = self._storage.new_oid()
-
- revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))
-
- storage1 = self._storage.bind_connection(None)
- storage1.load(oid, '')
- storage2 = self._storage.bind_connection(None)
- storage2.load(oid, '')
-
- obj.inc()
- obj.inc()
- # The effect of committing two transactions with the same
- # pickle is to commit two different transactions relative to
- # revid1 that add two to _value.
- root_storage = self._storage
- try:
- self._storage = storage1
- revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
- self._storage = storage2
- revid3 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
-
- data, serialno = self._storage.load(oid, '')
- inst = zodb_unpickle(data)
- self.assertEqual(inst._value, 5)
- finally:
- self._storage = root_storage
-
- def check16KObject(self):
- # Store 16 * 1024 bytes in an object, then retrieve it
- data = 'a 16 byte string' * 1024
- oid = self._storage.new_oid()
- self._dostoreNP(oid, data=data)
- got, serialno = self._storage.load(oid, '')
- self.assertEqual(len(got), len(data))
- self.assertEqual(got, data)
-
- def check16MObject(self):
- # Store 16 * 1024 * 1024 bytes in an object, then retrieve it
- data = 'a 16 byte string' * (1024 * 1024)
- oid = self._storage.new_oid()
- self._dostoreNP(oid, data=data)
- got, serialno = self._storage.load(oid, '')
- self.assertEqual(len(got), len(data))
- self.assertEqual(got, data)
-
- def checkMultipleStores(self):
- # Verify a connection can commit multiple transactions
- db = DB(self._storage)
- try:
- c1 = db.open()
- r1 = c1.root()
- r1['alpha'] = 1
- transaction.commit()
- r1['alpha'] = 2
- transaction.commit()
- finally:
- db.close()
-
- def checkPollInterval(self):
- # Verify the poll_interval parameter causes RelStorage to
- # delay invalidation polling.
- self._storage._poll_interval = 3600
- db = DB(self._storage)
- try:
- c1 = db.open()
- r1 = c1.root()
- r1['alpha'] = 1
- transaction.commit()
-
- c2 = db.open()
- r2 = c2.root()
- self.assertEqual(r2['alpha'], 1)
-
- r1['alpha'] = 2
- # commit c1 without triggering c2.afterCompletion().
- storage = c1._storage
- t = transaction.Transaction()
- storage.tpc_begin(t)
- c1.commit(t)
- storage.tpc_vote(t)
- storage.tpc_finish(t)
-
- # c2 should not see the change yet
- r2 = c2.root()
- self.assertEqual(r2['alpha'], 1)
-
- # expire the poll timer and verify c2 sees the change
- c2._storage._poll_at -= 3601
- c2._flush_invalidations()
- r2 = c2.root()
- self.assertEqual(r2['alpha'], 2)
-
- transaction.abort()
- c2.close()
- c1.close()
-
- finally:
- db.close()
-
-
- def checkTransactionalUndoIterator(self):
- # this test overrides the broken version in TransactionalUndoStorage.
-
- s = self._storage
-
- BATCHES = 4
- OBJECTS = 4
-
- orig = []
- for i in range(BATCHES):
- t = transaction.Transaction()
- tid = p64(i + 1)
- s.tpc_begin(t, tid)
- txn_orig = []
- for j in range(OBJECTS):
- oid = s.new_oid()
- obj = MinPO(i * OBJECTS + j)
- revid = s.store(oid, None, zodb_pickle(obj), '', t)
- txn_orig.append((tid, oid, revid))
- serials = s.tpc_vote(t)
- if not serials:
- orig.extend(txn_orig)
- else:
- # The storage provided revision IDs after the vote
- serials = dict(serials)
- for tid, oid, revid in txn_orig:
- self.assertEqual(revid, None)
- orig.append((tid, oid, serials[oid]))
- s.tpc_finish(t)
-
- i = 0
- for tid, oid, revid in orig:
- self._dostore(oid, revid=revid, data=MinPO(revid),
- description="update %s" % i)
-
- # Undo the OBJECTS transactions that modified objects created
- # in the ith original transaction.
-
- def undo(i):
- info = s.undoInfo()
- t = transaction.Transaction()
- s.tpc_begin(t)
- base = i * OBJECTS + i
- for j in range(OBJECTS):
- tid = info[base + j]['id']
- s.undo(tid, t)
- s.tpc_vote(t)
- s.tpc_finish(t)
-
- for i in range(BATCHES):
- undo(i)
-
- # There are now (2 + OBJECTS) * BATCHES transactions:
- # BATCHES original transactions, followed by
- # OBJECTS * BATCHES modifications, followed by
- # BATCHES undos
-
- iter = s.iterator()
- offset = 0
-
- eq = self.assertEqual
-
- for i in range(BATCHES):
- txn = iter[offset]
- offset += 1
-
- tid = p64(i + 1)
- eq(txn.tid, tid)
-
- L1 = [(rec.oid, rec.tid, rec.data_txn) for rec in txn]
- L2 = [(oid, revid, None) for _tid, oid, revid in orig
- if _tid == tid]
-
- eq(L1, L2)
-
- for i in range(BATCHES * OBJECTS):
- txn = iter[offset]
- offset += 1
- eq(len([rec for rec in txn if rec.data_txn is None]), 1)
-
- for i in range(BATCHES):
- txn = iter[offset]
- offset += 1
-
- # The undos are performed in reverse order.
- otid = p64(BATCHES - i)
- L1 = [rec.oid for rec in txn]
- L2 = [oid for _tid, oid, revid in orig if _tid == otid]
- L1.sort()
- L2.sort()
- eq(L1, L2)
-
- self.assertRaises(IndexError, iter.__getitem__, offset)
-
-
-class IteratorDeepCompareUnordered:
- # Like IteratorDeepCompare, but compensates for OID order
- # differences in transactions.
- def compare(self, storage1, storage2):
- eq = self.assertEqual
- iter1 = storage1.iterator()
- iter2 = storage2.iterator()
- for txn1, txn2 in zip(iter1, iter2):
- eq(txn1.tid, txn2.tid)
- eq(txn1.status, txn2.status)
- eq(txn1.user, txn2.user)
- eq(txn1.description, txn2.description)
- eq(txn1._extension, txn2._extension)
- recs1 = [(r.oid, r) for r in txn1]
- recs1.sort()
- recs2 = [(r.oid, r) for r in txn2]
- recs2.sort()
- for (oid1, rec1), (oid2, rec2) in zip(recs1, recs2):
- eq(rec1.oid, rec2.oid)
- eq(rec1.tid, rec2.tid)
- eq(rec1.version, rec2.version)
- eq(rec1.data, rec2.data)
- # Make sure there are no more records left in txn1 and txn2,
- # meaning they were the same length.
- self.assertRaises(IndexError, txn1.next)
- self.assertRaises(IndexError, txn2.next)
- # Make sure there are no more transactions left in iter1 and iter2,
- # meaning they were the same length.
- self.assertRaises(IndexError, iter1.next)
- self.assertRaises(IndexError, iter2.next)
- iter1.close()
- iter2.close()
-
-
-
-class RecoveryStorageSubset(IteratorDeepCompareUnordered):
- # The subset of RecoveryStorage tests that do not rely on version
- # support.
- pass
-
-for name, attr in RecoveryStorage.RecoveryStorage.__dict__.items():
- if 'check' in name and 'Version' not in name:
- setattr(RecoveryStorageSubset, name, attr)
-
-
-class ToFileStorage(BaseRelStorageTests, RecoveryStorageSubset):
- def setUp(self):
- self.open(create=1)
- self._storage._zap()
- self._dst = FileStorage("Dest.fs", create=True)
-
- def tearDown(self):
- self._storage.close()
- self._dst.close()
- self._storage.cleanup()
- self._dst.cleanup()
-
- def new_dest(self):
- return FileStorage('Dest.fs')
-
-
-class FromFileStorage(BaseRelStorageTests, RecoveryStorageSubset):
- def setUp(self):
- self.open(create=1)
- self._storage._zap()
- self._dst = self._storage
- self._storage = FileStorage("Source.fs", create=True)
-
- def tearDown(self):
- self._storage.close()
- self._dst.close()
- self._storage.cleanup()
- self._dst.cleanup()
-
- def new_dest(self):
- return self._dst
-
-
Copied: relstorage/tags/1.0-beta1/relstorage/tests/reltestbase.py (from rev 84129, relstorage/trunk/relstorage/tests/reltestbase.py)
===================================================================
--- relstorage/tags/1.0-beta1/relstorage/tests/reltestbase.py (rev 0)
+++ relstorage/tags/1.0-beta1/relstorage/tests/reltestbase.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,510 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""A foundation for relstorage adapter tests"""
+
+import time
+import unittest
+from relstorage.relstorage import RelStorage
+
+from ZODB.DB import DB
+from ZODB.utils import p64
+from ZODB.FileStorage import FileStorage
+from persistent.mapping import PersistentMapping
+import transaction
+
+from ZODB.tests import StorageTestBase, BasicStorage, \
+ TransactionalUndoStorage, PackableStorage, \
+ Synchronization, ConflictResolution, HistoryStorage, \
+ IteratorStorage, RevisionStorage, PersistentStorage, \
+ MTStorage, ReadOnlyStorage, RecoveryStorage
+
+from ZODB.tests.MinPO import MinPO
+from ZODB.tests.StorageTestBase import zodb_unpickle, zodb_pickle
+from ZODB.serialize import referencesf
+
+
+class BaseRelStorageTests(StorageTestBase.StorageTestBase):
+
+ def make_adapter(self):
+ # abstract method
+ raise NotImplementedError
+
+ def open(self, **kwargs):
+ adapter = self.make_adapter()
+ self._storage = RelStorage(adapter, **kwargs)
+
+ def setUp(self):
+ self.open(create=1)
+ self._storage.zap_all()
+
+ def tearDown(self):
+ self._storage.close()
+ self._storage.cleanup()
+
+
+class RelStorageTests(
+ BaseRelStorageTests,
+ BasicStorage.BasicStorage,
+ TransactionalUndoStorage.TransactionalUndoStorage,
+ RevisionStorage.RevisionStorage,
+ PackableStorage.PackableStorage,
+ PackableStorage.PackableUndoStorage,
+ Synchronization.SynchronizedStorage,
+ ConflictResolution.ConflictResolvingStorage,
+ HistoryStorage.HistoryStorage,
+ IteratorStorage.IteratorStorage,
+ IteratorStorage.ExtendedIteratorStorage,
+ PersistentStorage.PersistentStorage,
+ MTStorage.MTStorage,
+ ReadOnlyStorage.ReadOnlyStorage
+ ):
+
+ def checkCrossConnectionInvalidation(self):
+ # Verify connections see updated state at txn boundaries
+ db = DB(self._storage)
+ try:
+ c1 = db.open()
+ r1 = c1.root()
+ r1['myobj'] = 'yes'
+ c2 = db.open()
+ r2 = c2.root()
+ self.assert_('myobj' not in r2)
+
+ storage = c1._storage
+ t = transaction.Transaction()
+ t.description = 'invalidation test'
+ storage.tpc_begin(t)
+ c1.commit(t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
+
+ self.assert_('myobj' not in r2)
+ c2.sync()
+ self.assert_('myobj' in r2)
+ self.assert_(r2['myobj'] == 'yes')
+ finally:
+ db.close()
+
+ def checkCrossConnectionIsolation(self):
+ # Verify MVCC isolates connections
+ db = DB(self._storage)
+ try:
+ c1 = db.open()
+ r1 = c1.root()
+ r1['alpha'] = PersistentMapping()
+ r1['gamma'] = PersistentMapping()
+ transaction.commit()
+
+ # Open a second connection but don't load root['alpha'] yet
+ c2 = db.open()
+ r2 = c2.root()
+
+ r1['alpha']['beta'] = 'yes'
+
+ storage = c1._storage
+ t = transaction.Transaction()
+ t.description = 'isolation test 1'
+ storage.tpc_begin(t)
+ c1.commit(t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
+
+ # The second connection will now load root['alpha'], but due to
+ # MVCC, it should continue to see the old state.
+ self.assert_(r2['alpha']._p_changed is None) # A ghost
+ self.assert_(not r2['alpha'])
+ self.assert_(r2['alpha']._p_changed == 0)
+
+ # make root['alpha'] visible to the second connection
+ c2.sync()
+
+ # Now it should be in sync
+ self.assert_(r2['alpha']._p_changed is None) # A ghost
+ self.assert_(r2['alpha'])
+ self.assert_(r2['alpha']._p_changed == 0)
+ self.assert_(r2['alpha']['beta'] == 'yes')
+
+ # Repeat the test with root['gamma']
+ r1['gamma']['delta'] = 'yes'
+
+ storage = c1._storage
+ t = transaction.Transaction()
+ t.description = 'isolation test 2'
+ storage.tpc_begin(t)
+ c1.commit(t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
+
+ # The second connection will now load root['gamma'], but due to MVCC,
+ # it should continue to see the old state.
+ self.assert_(r2['gamma']._p_changed is None) # A ghost
+ self.assert_(not r2['gamma'])
+ self.assert_(r2['gamma']._p_changed == 0)
+
+ # make root['gamma'] visible to the second connection
+ c2.sync()
+
+ # Now it should be in sync
+ self.assert_(r2['gamma']._p_changed is None) # A ghost
+ self.assert_(r2['gamma'])
+ self.assert_(r2['gamma']._p_changed == 0)
+ self.assert_(r2['gamma']['delta'] == 'yes')
+ finally:
+ db.close()
+
+ def checkResolveConflictBetweenConnections(self):
+ # Verify that conflict resolution works between storage instances
+ # bound to connections.
+ obj = ConflictResolution.PCounter()
+ obj.inc()
+
+ oid = self._storage.new_oid()
+
+ revid1 = self._dostoreNP(oid, data=zodb_pickle(obj))
+
+ storage1 = self._storage.bind_connection(None)
+ storage1.load(oid, '')
+ storage2 = self._storage.bind_connection(None)
+ storage2.load(oid, '')
+
+ obj.inc()
+ obj.inc()
+ # The effect of committing two transactions with the same
+ # pickle is to commit two different transactions relative to
+ # revid1 that add two to _value.
+ root_storage = self._storage
+ try:
+ self._storage = storage1
+ revid2 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
+ self._storage = storage2
+ revid3 = self._dostoreNP(oid, revid=revid1, data=zodb_pickle(obj))
+
+ data, serialno = self._storage.load(oid, '')
+ inst = zodb_unpickle(data)
+ self.assertEqual(inst._value, 5)
+ finally:
+ self._storage = root_storage
+
+ def check16KObject(self):
+ # Store 16 * 1024 bytes in an object, then retrieve it
+ data = 'a 16 byte string' * 1024
+ oid = self._storage.new_oid()
+ self._dostoreNP(oid, data=data)
+ got, serialno = self._storage.load(oid, '')
+ self.assertEqual(len(got), len(data))
+ self.assertEqual(got, data)
+
+ def checkPreventOIDOverlap(self):
+ # Store an object with a particular OID, then verify that
+ # OID is not reused.
+ data = 'mydata'
+ oid1 = '\0' * 7 + '\x0f'
+ self._dostoreNP(oid1, data=data)
+ oid2 = self._storage.new_oid()
+ self.assert_(oid1 < oid2, 'old OID %r should be less than new OID %r'
+ % (oid1, oid2))
+
+ def check16MObject(self):
+ # Store 16 * 1024 * 1024 bytes in an object, then retrieve it
+ data = 'a 16 byte string' * (1024 * 1024)
+ oid = self._storage.new_oid()
+ self._dostoreNP(oid, data=data)
+ got, serialno = self._storage.load(oid, '')
+ self.assertEqual(len(got), len(data))
+ self.assertEqual(got, data)
+
+ def checkMultipleStores(self):
+ # Verify a connection can commit multiple transactions
+ db = DB(self._storage)
+ try:
+ c1 = db.open()
+ r1 = c1.root()
+ r1['alpha'] = 1
+ transaction.commit()
+ r1['alpha'] = 2
+ transaction.commit()
+ finally:
+ db.close()
+
+ def checkPollInterval(self):
+ # Verify the poll_interval parameter causes RelStorage to
+ # delay invalidation polling.
+ self._storage._poll_interval = 3600
+ db = DB(self._storage)
+ try:
+ c1 = db.open()
+ r1 = c1.root()
+ r1['alpha'] = 1
+ transaction.commit()
+
+ c2 = db.open()
+ r2 = c2.root()
+ self.assertEqual(r2['alpha'], 1)
+
+ r1['alpha'] = 2
+ # commit c1 without triggering c2.afterCompletion().
+ storage = c1._storage
+ t = transaction.Transaction()
+ storage.tpc_begin(t)
+ c1.commit(t)
+ storage.tpc_vote(t)
+ storage.tpc_finish(t)
+
+ # c2 should not see the change yet
+ r2 = c2.root()
+ self.assertEqual(r2['alpha'], 1)
+
+ # expire the poll timer and verify c2 sees the change
+ c2._storage._poll_at -= 3601
+ c2._flush_invalidations()
+ r2 = c2.root()
+ self.assertEqual(r2['alpha'], 2)
+
+ transaction.abort()
+ c2.close()
+ c1.close()
+
+ finally:
+ db.close()
+
+
+ def checkTransactionalUndoIterator(self):
+ # this test overrides the broken version in TransactionalUndoStorage.
+
+ s = self._storage
+
+ BATCHES = 4
+ OBJECTS = 4
+
+ orig = []
+ for i in range(BATCHES):
+ t = transaction.Transaction()
+ tid = p64(i + 1)
+ s.tpc_begin(t, tid)
+ txn_orig = []
+ for j in range(OBJECTS):
+ oid = s.new_oid()
+ obj = MinPO(i * OBJECTS + j)
+ revid = s.store(oid, None, zodb_pickle(obj), '', t)
+ txn_orig.append((tid, oid, revid))
+ serials = s.tpc_vote(t)
+ if not serials:
+ orig.extend(txn_orig)
+ else:
+ # The storage provided revision IDs after the vote
+ serials = dict(serials)
+ for tid, oid, revid in txn_orig:
+ self.assertEqual(revid, None)
+ orig.append((tid, oid, serials[oid]))
+ s.tpc_finish(t)
+
+ i = 0
+ for tid, oid, revid in orig:
+ self._dostore(oid, revid=revid, data=MinPO(revid),
+ description="update %s" % i)
+
+ # Undo the OBJECTS transactions that modified objects created
+ # in the ith original transaction.
+
+ def undo(i):
+ info = s.undoInfo()
+ t = transaction.Transaction()
+ s.tpc_begin(t)
+ base = i * OBJECTS + i
+ for j in range(OBJECTS):
+ tid = info[base + j]['id']
+ s.undo(tid, t)
+ s.tpc_vote(t)
+ s.tpc_finish(t)
+
+ for i in range(BATCHES):
+ undo(i)
+
+ # There are now (2 + OBJECTS) * BATCHES transactions:
+ # BATCHES original transactions, followed by
+ # OBJECTS * BATCHES modifications, followed by
+ # BATCHES undos
+
+ iter = s.iterator()
+ offset = 0
+
+ eq = self.assertEqual
+
+ for i in range(BATCHES):
+ txn = iter[offset]
+ offset += 1
+
+ tid = p64(i + 1)
+ eq(txn.tid, tid)
+
+ L1 = [(rec.oid, rec.tid, rec.data_txn) for rec in txn]
+ L2 = [(oid, revid, None) for _tid, oid, revid in orig
+ if _tid == tid]
+
+ eq(L1, L2)
+
+ for i in range(BATCHES * OBJECTS):
+ txn = iter[offset]
+ offset += 1
+ eq(len([rec for rec in txn if rec.data_txn is None]), 1)
+
+ for i in range(BATCHES):
+ txn = iter[offset]
+ offset += 1
+
+ # The undos are performed in reverse order.
+ otid = p64(BATCHES - i)
+ L1 = [rec.oid for rec in txn]
+ L2 = [oid for _tid, oid, revid in orig if _tid == otid]
+ L1.sort()
+ L2.sort()
+ eq(L1, L2)
+
+ self.assertRaises(IndexError, iter.__getitem__, offset)
+
+ def checkNonASCIITransactionMetadata(self):
+ # Verify the database stores and retrieves non-ASCII text
+ # in transaction metadata.
+ db = DB(self._storage)
+ try:
+ c1 = db.open()
+ r1 = c1.root()
+ r1['alpha'] = 1
+ user = u"\u65e5\u672c\u8e86"
+ transaction.get().setUser(user)
+ transaction.commit()
+ r1['alpha'] = 2
+ desc = u"Japanese: \u65e5\u672c\u8e86"
+ transaction.get().note(desc)
+ transaction.commit()
+
+ info = self._storage.undoInfo()
+ self.assertEqual(info[0]['description'], desc)
+ self.assertEqual(info[1]['user_name'], '/ ' + user)
+ finally:
+ db.close()
+
+ def checkPackGC(self, gc_enabled=True):
+ db = DB(self._storage)
+ try:
+ c1 = db.open()
+ r1 = c1.root()
+ r1['alpha'] = PersistentMapping()
+ transaction.commit()
+
+ oid = r1['alpha']._p_oid
+ r1['alpha'] = None
+ transaction.commit()
+
+ # The object should still exist
+ self._storage.load(oid, '')
+
+ # Pack
+ now = packtime = time.time()
+ while packtime <= now:
+ packtime = time.time()
+ self._storage.pack(packtime, referencesf)
+
+ if gc_enabled:
+ # The object should now be gone
+ self.assertRaises(KeyError, self._storage.load, oid, '')
+ else:
+ # The object should still exist
+ self._storage.load(oid, '')
+ finally:
+ db.close()
+
+ def checkPackGCDisabled(self):
+ self._storage.set_pack_gc(False)
+ self.checkPackGC(gc_enabled=False)
+
+
+class IteratorDeepCompareUnordered:
+ # Like IteratorDeepCompare, but compensates for OID order
+ # differences in transactions.
+ def compare(self, storage1, storage2):
+ eq = self.assertEqual
+ iter1 = storage1.iterator()
+ iter2 = storage2.iterator()
+ for txn1, txn2 in zip(iter1, iter2):
+ eq(txn1.tid, txn2.tid)
+ eq(txn1.status, txn2.status)
+ eq(txn1.user, txn2.user)
+ eq(txn1.description, txn2.description)
+ eq(txn1._extension, txn2._extension)
+ recs1 = [(r.oid, r) for r in txn1]
+ recs1.sort()
+ recs2 = [(r.oid, r) for r in txn2]
+ recs2.sort()
+ for (oid1, rec1), (oid2, rec2) in zip(recs1, recs2):
+ eq(rec1.oid, rec2.oid)
+ eq(rec1.tid, rec2.tid)
+ eq(rec1.version, rec2.version)
+ eq(rec1.data, rec2.data)
+ # Make sure there are no more records left in txn1 and txn2,
+ # meaning they were the same length.
+ self.assertRaises(IndexError, txn1.next)
+ self.assertRaises(IndexError, txn2.next)
+ # Make sure there are no more transactions left in iter1 and iter2,
+ # meaning they were the same length.
+ self.assertRaises(IndexError, iter1.next)
+ self.assertRaises(IndexError, iter2.next)
+ iter1.close()
+ iter2.close()
+
+
+
+class RecoveryStorageSubset(IteratorDeepCompareUnordered):
+ # The subset of RecoveryStorage tests that do not rely on version
+ # support.
+ pass
+
+for name, attr in RecoveryStorage.RecoveryStorage.__dict__.items():
+ if 'check' in name and 'Version' not in name:
+ setattr(RecoveryStorageSubset, name, attr)
+
+
+class ToFileStorage(BaseRelStorageTests, RecoveryStorageSubset):
+ def setUp(self):
+ self.open(create=1)
+ self._storage.zap_all()
+ self._dst = FileStorage("Dest.fs", create=True)
+
+ def tearDown(self):
+ self._storage.close()
+ self._dst.close()
+ self._storage.cleanup()
+ self._dst.cleanup()
+
+ def new_dest(self):
+ return FileStorage('Dest.fs')
+
+
+class FromFileStorage(BaseRelStorageTests, RecoveryStorageSubset):
+ def setUp(self):
+ self.open(create=1)
+ self._storage.zap_all()
+ self._dst = self._storage
+ self._storage = FileStorage("Source.fs", create=True)
+
+ def tearDown(self):
+ self._storage.close()
+ self._dst.close()
+ self._storage.cleanup()
+ self._dst.cleanup()
+
+ def new_dest(self):
+ return self._dst
+
+
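The sorting trick used by IteratorDeepCompareUnordered above can be shown in isolation. A minimal sketch (hypothetical helper, not part of the test suite): sort each transaction's records by OID before comparing, so two storages that return the same records in different OID orders still compare equal.

```python
def records_equal_unordered(recs1, recs2):
    """Compare two lists of (oid, data) records, ignoring OID order."""
    if len(recs1) != len(recs2):
        return False
    # Sorting by oid compensates for order differences, just as
    # compare() above sorts (r.oid, r) pairs before zipping them.
    return sorted(recs1) == sorted(recs2)

# Records stored in different orders still match.
print(records_equal_unordered([(2, b'b'), (1, b'a')],
                              [(1, b'a'), (2, b'b')]))  # True
```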
Deleted: relstorage/tags/1.0-beta1/relstorage/tests/speedtest.py
===================================================================
--- relstorage/trunk/relstorage/tests/speedtest.py 2008-02-20 03:08:20 UTC (rev 84068)
+++ relstorage/tags/1.0-beta1/relstorage/tests/speedtest.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -1,378 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2008 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Compare the speed of RelStorage with FileStorage + ZEO.
-
-Splits into many processes to avoid contention over the global
-interpreter lock.
-"""
-
-import cPickle
-import logging
-import os
-import shutil
-import signal
-import sys
-import tempfile
-import time
-import traceback
-
-import transaction
-from BTrees.IOBTree import IOBTree
-from ZODB.DB import DB
-from ZODB.Connection import Connection
-
-from relstorage.relstorage import RelStorage
-
-debug = False
-txn_count = 10
-object_counts = [10000] # [1, 100, 10000]
-concurrency_levels = range(1, 16, 2)
-contenders = [
- ('ZEO + FileStorage', 'zeofs_test'),
- ('PostgreSQLAdapter', 'postgres_test'),
- ('MySQLAdapter', 'mysql_test'),
- ('OracleAdapter', 'oracle_test'),
- ]
-repetitions = 3
-max_attempts = 20
-
-
-class ChildProcessError(Exception):
- """A child process failed"""
-
-
-def run_in_child(wait, func, *args, **kw):
- pid = os.fork()
- if pid == 0:
- # child
- try:
- try:
- logging.basicConfig()
- if debug:
- logging.getLogger().setLevel(logging.DEBUG)
-
- func(*args, **kw)
- except:
- traceback.print_exc()
- os._exit(1)
- finally:
- os._exit(0)
- elif wait:
- pid_again, code = os.waitpid(pid, 0)
- if code:
- raise ChildProcessError(
- "process running %r failed with exit code %d" % (func, code))
- return pid
-
-
-class ZEOServerRunner(object):
-
- def __init__(self):
- self.dir = tempfile.mkdtemp()
- self.store_fn = os.path.join(self.dir, 'storage')
- self.sock_fn = os.path.join(self.dir, 'sock')
- self.pid = None
-
- def run(self):
- from ZODB.FileStorage import FileStorage
- from ZEO.StorageServer import StorageServer
-
- fs = FileStorage(self.store_fn, create=True)
- ss = StorageServer(self.sock_fn, {'1': fs})
-
- import ThreadedAsync.LoopCallback
- ThreadedAsync.LoopCallback.loop()
-
- def start(self):
- self.pid = run_in_child(False, self.run)
- # parent
- sys.stderr.write('Waiting for ZEO server to start...')
- while not os.path.exists(self.sock_fn):
- sys.stderr.write('.')
- sys.stderr.flush()
- time.sleep(0.1)
- sys.stderr.write(' started.\n')
- sys.stderr.flush()
-
- def stop(self):
- os.kill(self.pid, signal.SIGTERM)
- shutil.rmtree(self.dir)
-
-
-class SpeedTest:
-
- def __init__(self, concurrency, objects_per_txn, zeo_runner):
- self.concurrency = concurrency
- self.data_to_store = dict((n, 1) for n in range(objects_per_txn))
- self.zeo_runner = zeo_runner
-
- def populate(self, make_storage):
- # initialize the database
- storage = make_storage()
- db = DB(storage)
- conn = db.open()
- root = conn.root()
-
- # clear the database
- root['speedtest'] = None
- transaction.commit()
- db.pack()
-
- # put a tree in the database
- root['speedtest'] = t = IOBTree()
- for i in range(self.concurrency):
- t[i] = IOBTree()
- transaction.commit()
- conn.close()
- db.close()
- if debug:
- print >> sys.stderr, 'Populated storage.'
-
- def write_test(self, storage, n):
- db = DB(storage)
- start = time.time()
- for i in range(txn_count):
- conn = db.open()
- root = conn.root()
- myobj = root['speedtest'][n]
- myobj[i] = IOBTree(self.data_to_store)
- transaction.commit()
- conn.close()
- end = time.time()
- db.close()
- return end - start
-
- def read_test(self, storage, n):
- db = DB(storage)
- start = time.time()
- for i in range(txn_count):
- conn = db.open()
- root = conn.root()
- got = len(list(root['speedtest'][n][i]))
- expect = len(self.data_to_store)
- if got != expect:
- raise AssertionError
- conn.close()
- end = time.time()
- db.close()
- return end - start
-
- def run_tests(self, make_storage):
- """Run a write and read test.
-
- Returns the mean time per write transaction and
- the mean time per read transaction.
- """
- run_in_child(True, self.populate, make_storage)
- r = range(self.concurrency)
-
- def write(n):
- return self.write_test(make_storage(), n)
- def read(n):
- return self.read_test(make_storage(), n)
-
- write_times = distribute(write, r)
- read_times = distribute(read, r)
- count = float(self.concurrency * txn_count)
- return (sum(write_times) / count, sum(read_times) / count)
-
- def zeofs_test(self):
- def make_storage():
- from ZEO.ClientStorage import ClientStorage
- return ClientStorage(self.zeo_runner.sock_fn)
- return self.run_tests(make_storage)
-
- def postgres_test(self):
- from relstorage.adapters.postgresql import PostgreSQLAdapter
- adapter = PostgreSQLAdapter('dbname=relstoragetest')
- adapter.zap()
- def make_storage():
- return RelStorage(adapter)
- return self.run_tests(make_storage)
-
- def oracle_test(self):
- from relstorage.adapters.oracle import OracleAdapter
- from relstorage.tests.testoracle import getOracleParams
- user, password, dsn = getOracleParams()
- adapter = OracleAdapter(user, password, dsn)
- adapter.zap()
- def make_storage():
- return RelStorage(adapter)
- return self.run_tests(make_storage)
-
- def mysql_test(self):
- from relstorage.adapters.mysql import MySQLAdapter
- adapter = MySQLAdapter(db='relstoragetest')
- adapter.zap()
- def make_storage():
- return RelStorage(adapter)
- return self.run_tests(make_storage)
-
-
-def distribute(func, param_iter):
- """Call a function in separate processes concurrently.
-
- param_iter is an iterator that provides the parameter for each
- function call. The parameter is passed as the single argument.
- The results of calling the function are appended to a list, which
- is returned once all functions have returned. If any function
- raises an error, the error is re-raised in the caller.
- """
- dir = tempfile.mkdtemp()
- try:
- waiting = set() # set of child process IDs
- for param in param_iter:
- pid = os.fork()
- if pid == 0:
- # child
- try:
- logging.basicConfig()
- if debug:
- logging.getLogger().setLevel(logging.DEBUG)
-
- fn = os.path.join(dir, str(os.getpid()))
- try:
- res = 1, func(param)
- except:
- traceback.print_exc()
- res = 0, sys.exc_info()[:2]
- f = open(fn, 'wb')
- try:
- cPickle.dump(res, f)
- finally:
- f.close()
- finally:
- os._exit(0)
- else:
- # parent
- waiting.add(pid)
- results = []
- try:
- while waiting:
- for pid in list(waiting):
- pid_again, code = os.waitpid(pid, os.WNOHANG)
- if not pid_again:
- continue
- waiting.remove(pid)
- if code:
- raise ChildProcessError(
- "A process failed with exit code %d" % code)
- else:
- fn = os.path.join(dir, str(pid))
- f = open(fn, 'rb')
- try:
- ok, value = cPickle.load(f)
- if ok:
- results.append(value)
- else:
- raise ChildProcessError(
- "a child process raised an error: "
- "%s: %s" % tuple(value))
- finally:
- f.close()
- time.sleep(0.1)
- return results
- finally:
- # kill the remaining processes
- for pid in waiting:
- try:
- os.kill(pid, signal.SIGTERM)
- except OSError:
- pass
- finally:
- shutil.rmtree(dir)
-
-
-def main():
- zeo_runner = ZEOServerRunner()
- zeo_runner.start()
-
- # results: {(objects_per_txn, concurrency, contender, direction): [time]}
- results = {}
- for objects_per_txn in object_counts:
- for concurrency in concurrency_levels:
- for contender_name, method_name in contenders:
- for direction in (0, 1):
- key = (objects_per_txn, concurrency,
- contender_name, direction)
- results[key] = []
-
- try:
- for objects_per_txn in object_counts:
- for concurrency in concurrency_levels:
- test = SpeedTest(concurrency, objects_per_txn, zeo_runner)
- for contender_name, method_name in contenders:
- print >> sys.stderr, (
- 'Testing %s with objects_per_txn=%d and concurrency=%d'
- % (contender_name, objects_per_txn, concurrency))
- method = getattr(test, method_name)
- key = (objects_per_txn, concurrency, contender_name)
-
- for rep in range(repetitions):
- for attempt in range(max_attempts):
- msg = ' Running %d/%d...' % (rep + 1, repetitions)
- if attempt > 0:
- msg += ' (attempt %d)' % (attempt + 1)
- print >> sys.stderr, msg,
- try:
- w, r = method()
- except ChildProcessError:
- if attempt >= max_attempts - 1:
- raise
- else:
- break
- msg = 'write %5.3fs, read %5.3fs' % (w, r)
- print >> sys.stderr, msg
- results[key + (0,)].append(w)
- results[key + (1,)].append(r)
-
- # The finally clause causes test results to print even if the tests
- # stop early.
- finally:
- zeo_runner.stop()
-
- # show the results in CSV format
- print >> sys.stderr, (
- 'Average time per transaction in seconds. Best of 3.')
-
- for objects_per_txn in object_counts:
- print '** Results with objects_per_txn=%d **' % objects_per_txn
-
- line = ['"Concurrency"']
- for contender_name, func in contenders:
- for direction in (0, 1):
- dir_text = ['write', 'read'][direction]
- line.append('%s - %s' % (contender_name, dir_text))
- print ', '.join(line)
-
- for concurrency in concurrency_levels:
- line = [str(concurrency)]
-
- for contender_name, method_name in contenders:
- for direction in (0, 1):
- key = (objects_per_txn, concurrency,
- contender_name, direction)
- lst = results[key]
- if lst:
- t = min(lst)
- line.append('%5.3f' % t)
- else:
- line.append('?')
-
- print ', '.join(line)
- print
-
-
-if __name__ == '__main__':
- main()
Copied: relstorage/tags/1.0-beta1/relstorage/tests/speedtest.py (from rev 84069, relstorage/trunk/relstorage/tests/speedtest.py)
===================================================================
--- relstorage/tags/1.0-beta1/relstorage/tests/speedtest.py (rev 0)
+++ relstorage/tags/1.0-beta1/relstorage/tests/speedtest.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,378 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Compare the speed of RelStorage with FileStorage + ZEO.
+
+Splits into many processes to avoid contention over the global
+interpreter lock.
+"""
+
+import cPickle
+import logging
+import os
+import shutil
+import signal
+import sys
+import tempfile
+import time
+import traceback
+
+import transaction
+from BTrees.IOBTree import IOBTree
+from ZODB.DB import DB
+from ZODB.Connection import Connection
+
+from relstorage.relstorage import RelStorage
+
+debug = False
+txn_count = 10
+object_counts = [10000] # [1, 100, 10000]
+concurrency_levels = range(1, 16, 2)
+contenders = [
+ ('ZEO + FileStorage', 'zeofs_test'),
+ ('PostgreSQLAdapter', 'postgres_test'),
+ ('MySQLAdapter', 'mysql_test'),
+ ('OracleAdapter', 'oracle_test'),
+ ]
+repetitions = 3
+max_attempts = 20
+
+
+class ChildProcessError(Exception):
+ """A child process failed"""
+
+
+def run_in_child(wait, func, *args, **kw):
+ pid = os.fork()
+ if pid == 0:
+ # child
+ try:
+ try:
+ logging.basicConfig()
+ if debug:
+ logging.getLogger().setLevel(logging.DEBUG)
+
+ func(*args, **kw)
+ except:
+ traceback.print_exc()
+ os._exit(1)
+ finally:
+ os._exit(0)
+ elif wait:
+ pid_again, code = os.waitpid(pid, 0)
+ if code:
+ raise ChildProcessError(
+ "process running %r failed with exit code %d" % (func, code))
+ return pid
+
+
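run_in_child above wraps the classic POSIX fork/waitpid idiom: fork, run the callable in the child, and (optionally) block in the parent until the child exits, raising on a nonzero status. A rough portable analogue can be sketched with subprocess — note this runs a command rather than an in-process callable, and RuntimeError stands in for the module's ChildProcessError; it is an approximation, not the original API:

```python
import subprocess

def run_child(wait, args):
    # Hypothetical analogue of run_in_child(): start a child process,
    # and if wait is true, block and raise on a nonzero exit status.
    proc = subprocess.Popen(args)
    if wait:
        code = proc.wait()
        if code:
            raise RuntimeError(
                "process running %r failed with exit code %d" % (args, code))
    return proc
```

As in the original, a non-waiting caller is responsible for reaping the returned process itself.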
+class ZEOServerRunner(object):
+
+ def __init__(self):
+ self.dir = tempfile.mkdtemp()
+ self.store_fn = os.path.join(self.dir, 'storage')
+ self.sock_fn = os.path.join(self.dir, 'sock')
+ self.pid = None
+
+ def run(self):
+ from ZODB.FileStorage import FileStorage
+ from ZEO.StorageServer import StorageServer
+
+ fs = FileStorage(self.store_fn, create=True)
+ ss = StorageServer(self.sock_fn, {'1': fs})
+
+ import ThreadedAsync.LoopCallback
+ ThreadedAsync.LoopCallback.loop()
+
+ def start(self):
+ self.pid = run_in_child(False, self.run)
+ # parent
+ sys.stderr.write('Waiting for ZEO server to start...')
+ while not os.path.exists(self.sock_fn):
+ sys.stderr.write('.')
+ sys.stderr.flush()
+ time.sleep(0.1)
+ sys.stderr.write(' started.\n')
+ sys.stderr.flush()
+
+ def stop(self):
+ os.kill(self.pid, signal.SIGTERM)
+ shutil.rmtree(self.dir)
+
+
+class SpeedTest:
+
+ def __init__(self, concurrency, objects_per_txn, zeo_runner):
+ self.concurrency = concurrency
+ self.data_to_store = dict((n, 1) for n in range(objects_per_txn))
+ self.zeo_runner = zeo_runner
+
+ def populate(self, make_storage):
+ # initialize the database
+ storage = make_storage()
+ db = DB(storage)
+ conn = db.open()
+ root = conn.root()
+
+ # clear the database
+ root['speedtest'] = None
+ transaction.commit()
+ db.pack()
+
+ # put a tree in the database
+ root['speedtest'] = t = IOBTree()
+ for i in range(self.concurrency):
+ t[i] = IOBTree()
+ transaction.commit()
+ conn.close()
+ db.close()
+ if debug:
+ print >> sys.stderr, 'Populated storage.'
+
+ def write_test(self, storage, n):
+ db = DB(storage)
+ start = time.time()
+ for i in range(txn_count):
+ conn = db.open()
+ root = conn.root()
+ myobj = root['speedtest'][n]
+ myobj[i] = IOBTree(self.data_to_store)
+ transaction.commit()
+ conn.close()
+ end = time.time()
+ db.close()
+ return end - start
+
+ def read_test(self, storage, n):
+ db = DB(storage)
+ start = time.time()
+ for i in range(txn_count):
+ conn = db.open()
+ root = conn.root()
+ got = len(list(root['speedtest'][n][i]))
+ expect = len(self.data_to_store)
+ if got != expect:
+ raise AssertionError
+ conn.close()
+ end = time.time()
+ db.close()
+ return end - start
+
+ def run_tests(self, make_storage):
+ """Run a write and read test.
+
+ Returns the mean time per write transaction and
+ the mean time per read transaction.
+ """
+ run_in_child(True, self.populate, make_storage)
+ r = range(self.concurrency)
+
+ def write(n):
+ return self.write_test(make_storage(), n)
+ def read(n):
+ return self.read_test(make_storage(), n)
+
+ write_times = distribute(write, r)
+ read_times = distribute(read, r)
+ count = float(self.concurrency * txn_count)
+ return (sum(write_times) / count, sum(read_times) / count)
+
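The averaging in run_tests is simple but easy to misread: each child reports one elapsed time covering all txn_count of its transactions, so the per-transaction mean divides the summed child times by concurrency * txn_count. A quick numeric sketch with illustrative values (not measured data):

```python
concurrency = 2            # two child processes
txn_count = 10             # transactions per child
write_times = [1.5, 2.5]   # elapsed seconds reported by each child

count = float(concurrency * txn_count)  # 20 transactions in total
mean_write = sum(write_times) / count   # 4.0 / 20
print(mean_write)  # 0.2
```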
+ def zeofs_test(self):
+ def make_storage():
+ from ZEO.ClientStorage import ClientStorage
+ return ClientStorage(self.zeo_runner.sock_fn)
+ return self.run_tests(make_storage)
+
+ def postgres_test(self):
+ from relstorage.adapters.postgresql import PostgreSQLAdapter
+ adapter = PostgreSQLAdapter('dbname=relstoragetest')
+ adapter.zap_all()
+ def make_storage():
+ return RelStorage(adapter)
+ return self.run_tests(make_storage)
+
+ def oracle_test(self):
+ from relstorage.adapters.oracle import OracleAdapter
+ from relstorage.tests.testoracle import getOracleParams
+ user, password, dsn = getOracleParams()
+ adapter = OracleAdapter(user, password, dsn)
+ adapter.zap_all()
+ def make_storage():
+ return RelStorage(adapter)
+ return self.run_tests(make_storage)
+
+ def mysql_test(self):
+ from relstorage.adapters.mysql import MySQLAdapter
+ adapter = MySQLAdapter(db='relstoragetest')
+ adapter.zap_all()
+ def make_storage():
+ return RelStorage(adapter)
+ return self.run_tests(make_storage)
+
+
+def distribute(func, param_iter):
+ """Call a function in separate processes concurrently.
+
+ param_iter is an iterator that provides the parameter for each
+ function call. The parameter is passed as the single argument.
+ The results of calling the function are appended to a list, which
+ is returned once all functions have returned. If any function
+ raises an error, the error is re-raised in the caller.
+ """
+ dir = tempfile.mkdtemp()
+ try:
+ waiting = set() # set of child process IDs
+ for param in param_iter:
+ pid = os.fork()
+ if pid == 0:
+ # child
+ try:
+ logging.basicConfig()
+ if debug:
+ logging.getLogger().setLevel(logging.DEBUG)
+
+ fn = os.path.join(dir, str(os.getpid()))
+ try:
+ res = 1, func(param)
+ except:
+ traceback.print_exc()
+ res = 0, sys.exc_info()[:2]
+ f = open(fn, 'wb')
+ try:
+ cPickle.dump(res, f)
+ finally:
+ f.close()
+ finally:
+ os._exit(0)
+ else:
+ # parent
+ waiting.add(pid)
+ results = []
+ try:
+ while waiting:
+ for pid in list(waiting):
+ pid_again, code = os.waitpid(pid, os.WNOHANG)
+ if not pid_again:
+ continue
+ waiting.remove(pid)
+ if code:
+ raise ChildProcessError(
+ "A process failed with exit code %d" % code)
+ else:
+ fn = os.path.join(dir, str(pid))
+ f = open(fn, 'rb')
+ try:
+ ok, value = cPickle.load(f)
+ if ok:
+ results.append(value)
+ else:
+ raise ChildProcessError(
+ "a child process raised an error: "
+ "%s: %s" % tuple(value))
+ finally:
+ f.close()
+ time.sleep(0.1)
+ return results
+ finally:
+ # kill the remaining processes
+ for pid in waiting:
+ try:
+ os.kill(pid, signal.SIGTERM)
+ except OSError:
+ pass
+ finally:
+ shutil.rmtree(dir)
+
+
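distribute() hand-rolls a process pool with fork(), per-child pickle files, and a waitpid() polling loop. On modern Python the same contract — call a function once per parameter in separate processes, collect the results, re-raise worker errors in the caller — is covered by the stdlib; a hedged sketch (an approximation, not the original implementation):

```python
import multiprocessing

def distribute_pooled(func, param_iter):
    # Run func(param) for each parameter in worker processes and
    # return the results in order; an exception raised in a worker
    # is re-raised here, matching distribute()'s error behavior.
    with multiprocessing.Pool() as pool:
        return pool.map(func, list(param_iter))
```

One behavioral difference: Pool reuses a fixed number of workers, whereas distribute() forks one child per parameter, which is what the speed test relies on to get true concurrency per contender.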
+def main():
+ zeo_runner = ZEOServerRunner()
+ zeo_runner.start()
+
+ # results: {(objects_per_txn, concurrency, contender, direction): [time]}
+ results = {}
+ for objects_per_txn in object_counts:
+ for concurrency in concurrency_levels:
+ for contender_name, method_name in contenders:
+ for direction in (0, 1):
+ key = (objects_per_txn, concurrency,
+ contender_name, direction)
+ results[key] = []
+
+ try:
+ for objects_per_txn in object_counts:
+ for concurrency in concurrency_levels:
+ test = SpeedTest(concurrency, objects_per_txn, zeo_runner)
+ for contender_name, method_name in contenders:
+ print >> sys.stderr, (
+ 'Testing %s with objects_per_txn=%d and concurrency=%d'
+ % (contender_name, objects_per_txn, concurrency))
+ method = getattr(test, method_name)
+ key = (objects_per_txn, concurrency, contender_name)
+
+ for rep in range(repetitions):
+ for attempt in range(max_attempts):
+ msg = ' Running %d/%d...' % (rep + 1, repetitions)
+ if attempt > 0:
+ msg += ' (attempt %d)' % (attempt + 1)
+ print >> sys.stderr, msg,
+ try:
+ w, r = method()
+ except ChildProcessError:
+ if attempt >= max_attempts - 1:
+ raise
+ else:
+ break
+ msg = 'write %5.3fs, read %5.3fs' % (w, r)
+ print >> sys.stderr, msg
+ results[key + (0,)].append(w)
+ results[key + (1,)].append(r)
+
+ # The finally clause causes test results to print even if the tests
+ # stop early.
+ finally:
+ zeo_runner.stop()
+
+ # show the results in CSV format
+ print >> sys.stderr, (
+ 'Average time per transaction in seconds. Best of 3.')
+
+ for objects_per_txn in object_counts:
+ print '** Results with objects_per_txn=%d **' % objects_per_txn
+
+ line = ['"Concurrency"']
+ for contender_name, func in contenders:
+ for direction in (0, 1):
+ dir_text = ['write', 'read'][direction]
+ line.append('%s - %s' % (contender_name, dir_text))
+ print ', '.join(line)
+
+ for concurrency in concurrency_levels:
+ line = [str(concurrency)]
+
+ for contender_name, method_name in contenders:
+ for direction in (0, 1):
+ key = (objects_per_txn, concurrency,
+ contender_name, direction)
+ lst = results[key]
+ if lst:
+ t = min(lst)
+ line.append('%5.3f' % t)
+ else:
+ line.append('?')
+
+ print ', '.join(line)
+ print
+
+
+if __name__ == '__main__':
+ main()
Copied: relstorage/tags/1.0-beta1/setup.py (from rev 84130, relstorage/trunk/setup.py)
===================================================================
--- relstorage/tags/1.0-beta1/setup.py (rev 0)
+++ relstorage/tags/1.0-beta1/setup.py 2008-02-22 09:40:07 UTC (rev 84133)
@@ -0,0 +1,63 @@
+##############################################################################
+#
+# Copyright (c) 2008 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""A backend for ZODB that stores pickles in a relational database.
+
+This is designed to be a drop-in replacement for the standard ZODB
+combination of FileStorage and ZEO. Multiple ZODB clients can
+share the same database without any additional configuration.
+Supports undo, historical database views, packing, and lossless
+migration between FileStorage and RelStorage instances.
+
+The supported relational databases are PostgreSQL 8.1 and above
+(using the psycopg2 Python module), MySQL 5.0 and above (using the
+MySQLdb 1.2.2 Python module), and Oracle 10g (using cx_Oracle 4.3).
+
+A small patch to ZODB is required. See the patch files distributed
+with RelStorage.
+"""
+
+VERSION = "1.0-beta1"
+
+classifiers = """\
+Development Status :: 4 - Beta
+Intended Audience :: Developers
+License :: OSI Approved :: Zope Public License
+Programming Language :: Python
+Topic :: Database
+Topic :: Software Development :: Libraries :: Python Modules
+Operating System :: Microsoft :: Windows
+Operating System :: Unix
+"""
+
+
+from distutils.core import setup
+
+doclines = __doc__.split("\n")
+
+setup(
+ name="RelStorage",
+ version=VERSION,
+ maintainer="Shane Hathaway",
+ maintainer_email="shane at hathawaymix.org",
+ url="http://wiki.zope.org/ZODB/RelStorage",
+ packages=['relstorage', 'relstorage.adapters', 'relstorage.tests'],
+ package_data={
+ 'relstorage': ['component.xml'],
+ },
+ license="ZPL 2.1",
+ platforms=["any"],
+ description=doclines[0],
+ classifiers=filter(None, classifiers.split("\n")),
+ long_description = "\n".join(doclines[2:]),
+ )