[Zodb-checkins] CVS: ZODB3/ZEO - TransactionBuffer.py:1.8

Jeremy Hylton jeremy@zope.com
Thu, 3 Oct 2002 12:57:56 -0400


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv24818

Modified Files:
	TransactionBuffer.py 
Log Message:
Add a lock to the TransactionBuffer to guard against a close while
it is in use by another thread.


=== ZODB3/ZEO/TransactionBuffer.py 1.7 => 1.8 ===
--- ZODB3/ZEO/TransactionBuffer.py:1.7	Thu Sep  5 22:45:07 2002
+++ ZODB3/ZEO/TransactionBuffer.py	Thu Oct  3 12:57:55 2002
@@ -21,8 +21,9 @@
 # A faster implementation might store trans data in memory until it
 # reaches a certain size.
 
-import tempfile
 import cPickle
+import tempfile
+from threading import Lock
 
 class TransactionBuffer:
 
@@ -32,8 +33,30 @@
     #
     # get_size can be called any time
 
+    # The TransactionBuffer is used by client storage to hold update
+    # data until the tpc_finish().  It is normally used by a single
+    # thread, because only one thread can be in the two-phase commit
+    # at one time.
+
+    # It is possible, however, for one thread to close the storage
+    # while another thread is in the two-phase commit.  We must use
+    # a lock to guard against this race, because unpredictable things
+    # can happen in Python if one thread closes a file that another
+    # thread is reading.  In a debug build, an assert() can fail.
+
+    # XXX If an operation is performed on a closed TransactionBuffer,
+    # it has no effect and does not raise an exception.  The only time
+    # this should occur is when a ClientStorage is closed in one
+    # thread while another thread is in its tpc_finish().  It's not
+    # clear what should happen in this case.  If the tpc_finish()
+    # completes without error, the Connection using it could have
+    # inconsistent data.  This should have minimal effect, though,
+    # because the Connection is connected to a closed storage.
+
     def __init__(self):
         self.file = tempfile.TemporaryFile(suffix=".tbuf")
+        self.lock = Lock()
+        self.closed = 0
         self.count = 0
         self.size = 0
         # It's safe to use a fast pickler because the only objects
@@ -42,14 +65,27 @@
         self.pickler.fast = 1
 
     def close(self):
+        self.lock.acquire()
         try:
-            self.file.close()
-        except OSError:
-            pass
-
+            self.closed = 1
+            try:
+                self.file.close()
+            except OSError:
+                pass
+        finally:
+            self.lock.release()
 
     def store(self, oid, version, data):
+        self.lock.acquire()
+        try:
+            self._store(oid, version, data)
+        finally:
+            self.lock.release()
+
+    def _store(self, oid, version, data):
         """Store oid, version, data for later retrieval"""
+        if self.closed:
+            return
         self.pickler.dump((oid, version, data))
         self.count += 1
         # Estimate per-record cache size
@@ -59,14 +95,26 @@
             self.size = self.size + len(version) + len(data) + 12
 
     def invalidate(self, oid, version):
-        self.pickler.dump((oid, version, None))
-        self.count += 1
+        self.lock.acquire()
+        try:
+            if self.closed:
+                return
+            self.pickler.dump((oid, version, None))
+            self.count += 1
+        finally:
+            self.lock.release()
 
     def clear(self):
         """Mark the buffer as empty"""
-        self.file.seek(0)
-        self.count = 0
-        self.size = 0
+        self.lock.acquire()
+        try:
+            if self.closed:
+                return
+            self.file.seek(0)
+            self.count = 0
+            self.size = 0
+        finally:
+            self.lock.release()
 
     # unchecked constraints:
     # 1. can't call store() after begin_iterate()
@@ -74,12 +122,27 @@
 
     def begin_iterate(self):
         """Move the file pointer in advance of iteration"""
-        self.file.flush()
-        self.file.seek(0)
-        self.unpickler = cPickle.Unpickler(self.file)
+        self.lock.acquire()
+        try:
+            if self.closed:
+                return
+            self.file.flush()
+            self.file.seek(0)
+            self.unpickler = cPickle.Unpickler(self.file)
+        finally:
+            self.lock.release()
 
     def next(self):
+        self.lock.acquire()
+        try:
+            return self._next()
+        finally:
+            self.lock.release()
+
+    def _next(self):
         """Return next tuple of data or None if EOF"""
+        if self.closed:
+            return None
         if self.count == 0:
             del self.unpickler
             return None