[Zope3-checkins] CVS: Zope3/src/transaction - txn.py:1.2.12.1 manager.py:1.2.12.1 interfaces.py:1.2.12.1 __init__.py:1.2.12.1
Jeremy Hylton
jeremy@zope.com
Sat, 1 Mar 2003 21:32:54 -0500
Update of /cvs-repository/Zope3/src/transaction
In directory cvs.zope.org:/tmp/cvs-serv18356/src/transaction
Modified Files:
Tag: jeremy-atomic-invalidation-branch
txn.py manager.py interfaces.py __init__.py
Log Message:
Add suspend() and resume() methods to the transaction interface.
There are no tests for these methods yet. (Looks like there are
hardly any tests of the txn api.)
=== Zope3/src/transaction/txn.py 1.2 => 1.2.12.1 ===
--- Zope3/src/transaction/txn.py:1.2 Wed Dec 25 09:12:14 2002
+++ Zope3/src/transaction/txn.py Sat Mar 1 21:32:52 2003
@@ -4,6 +4,7 @@
__metaclass__ = type
from transaction.interfaces import ITransaction, TransactionError
+from threading import Lock
class Set(dict):
@@ -19,6 +20,7 @@
COMMITTED = "Committed"
ABORTING = "Aborting"
ABORTED = "Aborted"
+ SUSPENDED = "Suspended"
class Transaction:
@@ -28,7 +30,9 @@
self._manager = manager
self._parent = parent
self._status = Status.ACTIVE
+ self._suspend = None
self._resources = Set()
+ self._lock = Lock()
def __repr__(self):
return "<%s %X %s>" % (self.__class__.__name__, id(self), self._status)
@@ -69,3 +73,25 @@
def status(self):
"""Return the status of the transaction."""
return self._status
+
+ def suspend(self):
+ self._lock.acquire()
+ try:
+ if self._status == Status.SUSPENDED:
+ raise TransactionError("Already suspended")
+ self._manager.suspend(self)
+ self._suspend = self._status
+ self._status = Status.SUSPENDED
+ finally:
+ self._lock.release()
+
+ def resume(self):
+ self._lock.acquire()
+ try:
+ if self._status != Status.SUSPENDED:
+ raise TransactionError("Can only resume suspended transaction")
+ self._manager.resume(self)
+ self._status = self._suspend
+ self._suspend = None
+ finally:
+ self._lock.release()
=== Zope3/src/transaction/manager.py 1.2 => 1.2.12.1 ===
--- Zope3/src/transaction/manager.py:1.2 Wed Dec 25 09:12:14 2002
+++ Zope3/src/transaction/manager.py Sat Mar 1 21:32:52 2003
@@ -1,26 +1,19 @@
import logging
-from transaction.interfaces import IRollback
-from transaction.txn import Transaction, Status
+from transaction.interfaces import *
+from transaction.txn import Transaction, Status, Set
# XXX need to change asserts of transaction status into explicit checks
# that raise some exception
# XXX need lots of error checking
-class TransactionManager(object):
-
- txn_factory = Transaction
-
- def __init__(self):
- self.logger = logging.getLogger("txn")
-
- def new(self):
- txn = self.txn_factory(self)
- self.logger.debug("%s: begin", txn)
- return txn
-
+class AbstractTransactionManager(object):
+ # base class to provide commit logic
+ # concrete class must provide logger attribute
+
def commit(self, txn):
+ # commit calls _finishCommit() or abort()
assert txn._status is Status.ACTIVE
txn._status = Status.PREPARING
prepare_ok = True
@@ -35,11 +28,11 @@
txn._status = Status.PREPARED
# XXX An error below is intolerable. What state to use?
if prepare_ok:
- self._commit(txn)
+ self._finishCommit(txn)
else:
self.abort(txn)
- def _commit(self, txn):
+ def _finishCommit(self, txn):
self.logger.debug("%s: commit", txn)
# finish the two-phase commit
for r in txn._resources:
@@ -58,6 +51,36 @@
self.logger.debug("%s: savepoint", txn)
return Rollback([r.savepoint(txn) for r in txn._resources])
+class TransactionManager(AbstractTransactionManager):
+
+ txn_factory = Transaction
+
+ __implements__ = ITransactionManager
+
+ def __init__(self):
+ self.logger = logging.getLogger("txn")
+ self._current = None
+
+ def get(self):
+ if self._current is None:
+ self._current = self.begin()
+ return self._current
+
+ def begin(self):
+ txn = self.txn_factory(self)
+ self.logger.debug("%s: begin", txn)
+ return txn
+
+ def commit(self, txn):
+ super(TransactionManager, self).commit(txn)
+ self._current = None
+
+ def abort(self, txn):
+ super(TransactionManager, self).abort(txn)
+ self._current = None
+
+ # XXX need suspend and resume
+
class Rollback(object):
__implements__ = IRollback
@@ -72,24 +95,46 @@
# make the transaction manager visible to client code
import thread
-class ThreadedTransactionManager(TransactionManager):
+class ThreadedTransactionManager(AbstractTransactionManager):
+
+ # XXX Do we need locking on _pool or _suspend?
+
+ # Most methods read and write pool based on the id of the current
+ # thread, so they should never interfere with each other.
+
+ # The suspend() and resume() methods modify the _suspend set,
+ # but suspend() only adds a new thread. The resume() method
+ # does need a lock to prevent two different threads from resuming
+ # the same transaction.
+
+ __implements__ = ITransactionManager
def __init__(self):
- TransactionManager.__init__(self)
+ self.logger = logging.getLogger("txn")
self._pool = {}
+ self._suspend = Set()
+ self._lock = thread.allocate_lock()
- def new(self):
+ def get(self):
tid = thread.get_ident()
txn = self._pool.get(tid)
if txn is None:
- txn = super(ThreadedTransactionManager, self).new()
- self._pool[tid] = txn
+ txn = self.begin()
+ return txn
+
+ def begin(self):
+ tid = thread.get_ident()
+ txn = self._pool.get(tid)
+ if txn is not None:
+ txn.abort()
+ txn = self.txn_factory(self)
+ self._pool[tid] = txn
return txn
- def _commit(self, txn):
+ def _finishCommit(self, txn):
tid = thread.get_ident()
assert self._pool[tid] is txn
- super(ThreadedTransactionManager, self)._commit(txn)
+ super(ThreadedTransactionManager, self)._finishCommit(txn)
del self._pool[tid]
def abort(self, txn):
@@ -97,3 +142,27 @@
assert self._pool[tid] is txn
super(ThreadedTransactionManager, self).abort(txn)
del self._pool[tid]
+
+ # XXX should we require that the transaction calling suspend()
+ # be the one that is using the transaction?
+
+ # XXX need to add locking to suspend() and resume()
+
+ def suspend(self, txn):
+ tid = thread.get_ident()
+ if self._pool[tid] is txn:
+ self._suspend.add(txn)
+ del self._pool[tid]
+ else:
+ raise TransactionError("txn %s not owned by thread %s" %
+ (txn, tid))
+
+ def resume(self, txn):
+ tid = thread.get_ident()
+ if self._pool.get(tid) is not None:
+ raise TransactionError("thread %s already has transaction" %
+ tid)
+ if txn not in self._suspend:
+ raise TransactionError("unknown transaction: %s" % txn)
+ del self._suspend[txn]
+ self._pool[tid] = txn
=== Zope3/src/transaction/interfaces.py 1.2 => 1.2.12.1 ===
--- Zope3/src/transaction/interfaces.py:1.2 Wed Dec 25 09:12:14 2002
+++ Zope3/src/transaction/interfaces.py Sat Mar 1 21:32:52 2003
@@ -75,3 +75,43 @@
def status():
"""Return status of the current transaction."""
+
+ def suspend():
+ """Suspend the current transaction.
+
+ If a transaction is suspended, the transaction manager no
+ longer treats it as active. The resume() method must be
+ called before the transaction can be used.
+ """
+
+ def resume():
+ """Resume the current transaction.
+
+ If another transaction is active, it must be suspended before
+ resume() is called.
+ """
+
+class ITransactionManager(Interface):
+ """Coordinates application use of transactional resources."""
+
+ def get():
+ """Return the curren transaction.
+
+ Calls new() to start a new transaction if one does not exist.
+ """
+
+ def begin():
+ """Return a new transaction.
+
+ If a transaction is currently active for the calling thread,
+ it is aborted.
+ """
+
+ def commit(txn):
+ """Commit txn."""
+
+ def abort(txn):
+ """Abort txn."""
+
+ def savepoint(txn):
+ """Return rollback object that can restore txn to current state."""
=== Zope3/src/transaction/__init__.py 1.2 => 1.2.12.1 ===
--- Zope3/src/transaction/__init__.py:1.2 Wed Dec 25 09:12:14 2002
+++ Zope3/src/transaction/__init__.py Sat Mar 1 21:32:52 2003
@@ -15,7 +15,7 @@
from transaction.manager import ThreadedTransactionManager
_manager = ThreadedTransactionManager()
-get_transaction = _manager.new
+get_transaction = _manager.get
def set_factory(factory):
_manager.txn_factory = factory