[Zope-Checkins] CVS: ZODB3/ZEO - TransactionBuffer.py:1.8
Jeremy Hylton
jeremy@zope.com
Thu, 3 Oct 2002 12:57:55 -0400
Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv24818
Modified Files:
TransactionBuffer.py
Log Message:
Add a lock to the TransactionBuffer to guard against a close while
it is in use by another thread.
=== ZODB3/ZEO/TransactionBuffer.py 1.7 => 1.8 ===
--- ZODB3/ZEO/TransactionBuffer.py:1.7 Thu Sep 5 22:45:07 2002
+++ ZODB3/ZEO/TransactionBuffer.py Thu Oct 3 12:57:55 2002
@@ -21,8 +21,9 @@
# A faster implementation might store trans data in memory until it
# reaches a certain size.
-import tempfile
import cPickle
+import tempfile
+from threading import Lock
class TransactionBuffer:
@@ -32,8 +33,30 @@
#
# get_size can be called any time
+ # The TransactionBuffer is used by client storage to hold update
+ # data until the tpc_finish(). It is normally used by a single
+ # thread, because only one thread can be in the two-phase commit
+ # at one time.
+
+ # It is possible, however, for one thread to close the storage
+ # while another thread is in the two-phase commit. We must use
+ # a lock to guard against this race, because unpredictable things
+ # can happen in Python if one thread closes a file that another
+ # thread is reading. In a debug build, an assert() can fail.
+
+ # XXX If an operation is performed on a closed TransactionBuffer,
+ # it has no effect and does not raise an exception. The only time
+ # this should occur is when a ClientStorage is closed in one
+ # thread while another thread is in its tpc_finish(). It's not
+ # clear what should happen in this case. If the tpc_finish()
+ # completes without error, the Connection using it could have
+ # inconsistent data. This should have minimal effect, though,
+ # because the Connection is connected to a closed storage.
+
def __init__(self):
self.file = tempfile.TemporaryFile(suffix=".tbuf")
+ self.lock = Lock()
+ self.closed = 0
self.count = 0
self.size = 0
# It's safe to use a fast pickler because the only objects
@@ -42,14 +65,27 @@
self.pickler.fast = 1
def close(self):
+ self.lock.acquire()
try:
- self.file.close()
- except OSError:
- pass
-
+ self.closed = 1
+ try:
+ self.file.close()
+ except OSError:
+ pass
+ finally:
+ self.lock.release()
def store(self, oid, version, data):
+ self.lock.acquire()
+ try:
+ self._store(oid, version, data)
+ finally:
+ self.lock.release()
+
+ def _store(self, oid, version, data):
"""Store oid, version, data for later retrieval"""
+ if self.closed:
+ return
self.pickler.dump((oid, version, data))
self.count += 1
# Estimate per-record cache size
@@ -59,14 +95,26 @@
self.size = self.size + len(version) + len(data) + 12
def invalidate(self, oid, version):
- self.pickler.dump((oid, version, None))
- self.count += 1
+ self.lock.acquire()
+ try:
+ if self.closed:
+ return
+ self.pickler.dump((oid, version, None))
+ self.count += 1
+ finally:
+ self.lock.release()
def clear(self):
"""Mark the buffer as empty"""
- self.file.seek(0)
- self.count = 0
- self.size = 0
+ self.lock.acquire()
+ try:
+ if self.closed:
+ return
+ self.file.seek(0)
+ self.count = 0
+ self.size = 0
+ finally:
+ self.lock.release()
# unchecked constraints:
# 1. can't call store() after begin_iterate()
@@ -74,12 +122,27 @@
def begin_iterate(self):
"""Move the file pointer in advance of iteration"""
- self.file.flush()
- self.file.seek(0)
- self.unpickler = cPickle.Unpickler(self.file)
+ self.lock.acquire()
+ try:
+ if self.closed:
+ return
+ self.file.flush()
+ self.file.seek(0)
+ self.unpickler = cPickle.Unpickler(self.file)
+ finally:
+ self.lock.release()
def next(self):
+ self.lock.acquire()
+ try:
+ return self._next()
+ finally:
+ self.lock.release()
+
+ def _next(self):
"""Return next tuple of data or None if EOF"""
+ if self.closed:
+ return None
if self.count == 0:
del self.unpickler
return None