[Checkins] SVN: zope.sqlalchemy/branches/elro-retry-support/ retry support working under postgres, still some oracle problems
Laurence Rowe
l at lrowe.co.uk
Fri Jul 23 14:42:39 EDT 2010
Log message for revision 114967:
retry support working under postgres, still some oracle problems
Changed:
A zope.sqlalchemy/branches/elro-retry-support/oracle.cfg
U zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/datamanager.py
U zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/tests.py
-=-
Added: zope.sqlalchemy/branches/elro-retry-support/oracle.cfg
===================================================================
--- zope.sqlalchemy/branches/elro-retry-support/oracle.cfg (rev 0)
+++ zope.sqlalchemy/branches/elro-retry-support/oracle.cfg 2010-07-23 18:42:39 UTC (rev 114967)
@@ -0,0 +1,24 @@
+[buildout]
+extends = buildout.cfg
+parts += python-oracle cx_Oracle test
+python = python-oracle
+allow-hosts += *.sourceforge.net
+
+[python-oracle]
+recipe = gocept.cxoracle
+instant-client = ${buildout:directory}/instantclient-basiclite-10.2.0.4.0-macosx-x64.zip
+instant-sdk = instantclient-sdk-10.2.0.4.0-macosx-x64.zip
+
+[cx_Oracle]
+recipe = zc.recipe.egg:custom
+egg = cx_Oracle
+
+[test]
+eggs += cx_Oracle
+environment = testenv
+
+[scripts]
+eggs += cx_Oracle
+
+[testenv]
+TEST_DSN = oracle://system:oracle@192.168.56.101/orcl
Modified: zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/datamanager.py
===================================================================
--- zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/datamanager.py 2010-07-23 18:41:59 UTC (rev 114966)
+++ zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/datamanager.py 2010-07-23 18:42:39 UTC (rev 114967)
@@ -16,9 +16,27 @@
from zope.interface import implements
from transaction.interfaces import ISavepointDataManager, IDataManagerSavepoint
from transaction._transaction import Status as ZopeStatus
+from sqlalchemy.orm.exc import ConcurrentModificationError
+from sqlalchemy.exc import DBAPIError
from sqlalchemy.orm.session import SessionExtension
from sqlalchemy.engine.base import Engine
# Registry of DBAPI-level errors that indicate a retryable conflict.
# Each entry is a ``(exception_type, predicate)`` pair; a predicate of
# ``None`` means every instance of the type is retryable, otherwise the
# predicate is called with the exception and must return a true value.
_retryable_errors = []

try:
    import psycopg2.extensions
except ImportError:
    # psycopg2 is optional; without it no PostgreSQL entry is registered.
    pass
else:
    _retryable_errors.append((psycopg2.extensions.TransactionRollbackError, None))


def _is_ora_08177(error):
    """Return True if *error* carries Oracle error code 8177.

    ORA-08177: can't serialize access for this transaction

    The code is read from ``error.args[0].code``.  Errors with no
    arguments, or whose first argument has no ``code`` attribute (for
    example a plain message string), are reported as not retryable
    instead of raising from inside the retry check.
    """
    args = getattr(error, 'args', ())
    if not args:
        return False
    return getattr(args[0], 'code', None) == 8177


try:
    import cx_Oracle
except ImportError:
    # cx_Oracle is optional; without it no Oracle entry is registered.
    pass
else:
    _retryable_errors.append((cx_Oracle.DatabaseError, _is_ora_08177))
+
# The status of the session is stored on the connection info
STATUS_ACTIVE = 'active' # session joined to transaction, writes allowed.
STATUS_CHANGED = 'changed' # data has been written
@@ -110,6 +128,18 @@
def _savepoint(self):
return SessionSavepoint(self.session)
+
def should_retry(self, error):
    """Tell whether *error* represents a conflict worth retrying.

    Returns True when *error* is a ``ConcurrentModificationError``, or
    when it is a ``DBAPIError`` whose wrapped driver exception
    (``error.orig``) is an instance of a type registered in
    ``_retryable_errors`` and that entry's predicate is ``None`` or
    returns a true value for the driver exception.  Returns False
    otherwise.
    """
    if isinstance(error, ConcurrentModificationError):
        return True
    if isinstance(error, DBAPIError):
        orig = error.orig
        for error_type, test in _retryable_errors:
            if not isinstance(orig, error_type):
                continue
            # A missing predicate means the whole exception type is retryable.
            if test is None or test(orig):
                return True
    # Explicit negative answer rather than falling off the end with None.
    return False
class TwoPhaseSessionDataManager(SessionDataManager):
Modified: zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/tests.py
===================================================================
--- zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/tests.py 2010-07-23 18:41:59 UTC (rev 114966)
+++ zope.sqlalchemy/branches/elro-retry-support/src/zope/sqlalchemy/tests.py 2010-07-23 18:42:39 UTC (rev 114967)
@@ -29,8 +29,9 @@
import unittest
import transaction
import threading
+import time
import sqlalchemy as sa
-from sqlalchemy import orm, sql
+from sqlalchemy import orm, sql, exc
from zope.sqlalchemy import datamanager as tx
from zope.sqlalchemy import mark_changed
@@ -85,6 +86,7 @@
class TestTwo(SimpleModel): pass
def setup_mappers():
+ orm.clear_mappers()
# Other tests can clear mappers by calling clear_mappers(),
# be more robust by setting up mappers in the test setup.
m1 = orm.mapper(User, test_users,
@@ -152,8 +154,7 @@
def tearDown(self):
transaction.abort()
metadata.drop_all(engine)
- for m in self.mappers:
- m.dispose()
+ orm.clear_mappers()
def testAbortBeforeCommit(self):
# Simulate what happens in a conflict error
@@ -172,7 +173,7 @@
transaction.begin()
session = Session()
conn = session.connection()
- conn.execute("SELECT 1")
+ conn.execute("SELECT 1 FROM test_users")
mark_changed(session)
transaction.commit()
@@ -198,7 +199,7 @@
transaction.begin()
session = Session()
conn = session.connection()
- conn.execute("SELECT 1")
+ conn.execute("SELECT 1 FROM test_users")
mark_changed(session)
transaction.commit()
@@ -461,7 +462,83 @@
results = engine.connect().execute(test_users.select(test_users.c.lastname=="smith"))
self.assertEqual(len(results.fetchall()), 2)
class RetryTests(unittest.TestCase):
    """Check that conflicts between two concurrent sessions are retryable.

    Each test uses two sessions, each bound to its own engine created
    with ``isolation_level='SERIALIZABLE'`` and joined to its own
    transaction manager, operating on a single pre-inserted user row.
    """

    def setUp(self):
        self.mappers = setup_mappers()
        metadata.drop_all(engine)
        metadata.create_all(engine)
        self.tm1 = transaction.TransactionManager()
        self.tm2 = transaction.TransactionManager()
        # Keep the engines on the instance so tearDown can dispose them.
        self.e1 = sa.create_engine(TEST_DSN, isolation_level='SERIALIZABLE', echo=True)
        self.e2 = sa.create_engine(TEST_DSN, isolation_level='SERIALIZABLE', echo=True)
        self.s1 = orm.sessionmaker(
            bind=self.e1,
            extension=tx.ZopeTransactionExtension(transaction_manager=self.tm1),
            twophase=TEST_TWOPHASE,
        )()
        self.s2 = orm.sessionmaker(
            bind=self.e2,
            extension=tx.ZopeTransactionExtension(transaction_manager=self.tm2),
            twophase=TEST_TWOPHASE,
        )()
        # Seed the single row both sessions will compete for.
        self.tm1.begin()
        self.s1.add(User(id=1, firstname='udo', lastname='juergens'))
        self.tm1.commit()

    def tearDown(self):
        self.tm1.abort()
        self.tm2.abort()
        # Release the per-test engines' pooled connections before the
        # tables are dropped through the shared engine.
        self.e1.dispose()
        self.e2.dispose()
        metadata.drop_all(engine)
        orm.clear_mappers()

    def testRetry(self):
        # sys.exc_info() is used instead of binding the exception in the
        # except clause so the syntax is valid on every Python version.
        import sys
        tm1, tm2, s1, s2 = self.tm1, self.tm2, self.s1, self.s2
        # make sure we actually start a session.
        tm1.begin()
        self.failUnless(len(s1.query(User).all())==1, "Users table should have one row")
        tm2.begin()
        self.failUnless(len(s2.query(User).all())==1, "Users table should have one row")
        # Session 1 deletes the row that session 2 is about to update.
        s1.query(User).delete()
        user = s2.query(User).get(1)
        user.lastname = u'smith'
        tm1.commit()
        error = None
        try:
            s2.flush()
        except orm.exc.ConcurrentModificationError:
            # This error is thrown when the number of updated rows is not as expected
            error = sys.exc_info()[1]
        self.failUnless(error is not None, "Did not raise expected error")
        self.failUnless(tm2._retryable(type(error), error), "Error should be retryable")

    def testRetryThread(self):
        # sys.exc_info() is used instead of binding the exception in the
        # except clause so the syntax is valid on every Python version.
        import sys
        tm1, tm2, s1, s2 = self.tm1, self.tm2, self.s1, self.s2
        # make sure we actually start a session.
        tm1.begin()
        self.failUnless(len(s1.query(User).all())==1, "Users table should have one row")
        tm2.begin()
        self.failUnless(len(s2.query(User).all())==1, "Users table should have one row")
        s1.query(User).delete()

        def target():
            # Commit session 1's delete shortly after the main thread has
            # issued its locking read.
            time.sleep(0.2)
            tm1.commit()

        thread = threading.Thread(target=target)
        thread.start()
        try:
            error = None
            try:
                # NOTE(review): presumably this blocks on the row lock until
                # the worker thread commits -- confirm per backend.
                s2.query(User).with_lockmode('update').get(1)
            except exc.DBAPIError:
                # This error wraps the underlying DBAPI module error, some of which are retryable
                error = sys.exc_info()[1]
            self.failUnless(error is not None, "Did not raise expected error")
            self.failUnless(tm2._retryable(type(error), error), "Error should be retryable")
        finally:
            # Always wait for the worker, even when an assertion fails, so
            # its commit cannot overlap with tearDown.
            thread.join()
+
+
class MultipleEngineTests(unittest.TestCase):
def setUp(self):
@@ -475,8 +552,7 @@
transaction.abort()
bound_metadata1.drop_all()
bound_metadata2.drop_all()
- for m in self.mappers:
- m.dispose()
+ orm.clear_mappers()
def testTwoEngines(self):
session = UnboundSession()
@@ -500,7 +576,7 @@
import doctest
optionflags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS
suite = TestSuite()
- for cls in (ZopeSQLAlchemyTests, MultipleEngineTests):
+ for cls in (ZopeSQLAlchemyTests, MultipleEngineTests, RetryTests):
suite.addTest(makeSuite(cls))
suite.addTest(doctest.DocFileSuite('README.txt', optionflags=optionflags, tearDown=tearDownReadMe,
globs={'TEST_DSN': TEST_DSN, 'TEST_TWOPHASE': TEST_TWOPHASE}))
More information about the checkins
mailing list