[Zope-Checkins] CVS: ZODB3/ZEO - StorageServer.py:1.90

Guido van Rossum guido@python.org
Mon, 20 Jan 2003 14:39:01 -0500


Update of /cvs-repository/ZODB3/ZEO
In directory cvs.zope.org:/tmp/cvs-serv5070

Modified Files:
	StorageServer.py 
Log Message:
More cleanup and refactoring:

- Get rid of _lock() and _unlock(); _lock() is inlined, and so is
  _unlock(), after factoring out some common code at the end of
  tpc_begin() and tpc_end().

- In the refactored _unlock() code, only call self.timeout.end() if
  self.locked was actually set.

In the TimeoutThread class:

- Add some comments.

- Add some assertions.

- Get rid of the stop() method; we're a daemon thread so we'll be
  killed anyway; close_server() is only used from the test suite.

- Switch from using a lock + an event to a condition variable.  Be
  religious about doing stuff only while holding the lock.

- Inline the timeout() function; it shouldn't reacquire the lock
  anyway.

--Guido & Jeremy


=== ZODB3/ZEO/StorageServer.py 1.89 => 1.90 ===
--- ZODB3/ZEO/StorageServer.py:1.89	Thu Jan  9 16:50:18 2003
+++ ZODB3/ZEO/StorageServer.py	Mon Jan 20 14:38:59 2003
@@ -157,18 +157,6 @@
                 return 0
         return 1
 
-    # _lock() and _unlock() control the locked flag
-
-    def _lock(self):
-        self.locked = 1
-        self.timeout.begin(self)
-        self.stats.lock_time = time.time()
-
-    def _unlock(self):
-        self.locked = 0
-        self.timeout.end(self)
-        self.stats.lock_time = None
-
     def register(self, storage_id, read_only):
         """Select the storage that this client will use
 
@@ -360,10 +348,8 @@
         if self.invalidated:
             self.server.invalidate(self, self.storage_id, tid,
                                    self.invalidated, self.get_size_info())
-        self.transaction = None
-        self._unlock()
+        self._clear_transaction()
         # Return the tid, for cache invalidation optimization
-        self._handle_waiting()
         return tid
 
     def tpc_abort(self, id):
@@ -373,8 +359,17 @@
         self.stats.aborts += 1
         if self.locked:
             self.storage.tpc_abort(self.transaction)
+        self._clear_transaction()
+
+    def _clear_transaction(self):
+        # Common code at end of tpc_finish() and tpc_abort()
         self.transaction = None
-        self._unlock()
+        if self.locked:
+            self.locked = 0
+            self.timeout.end(self)
+            self.stats.lock_time = None
+        # _handle_waiting() can start another transaction (by
+        # restarting a waiting one) so must be done last
         self._handle_waiting()
 
     def _abort(self):
@@ -437,7 +432,9 @@
             return self._wait(lambda: self._transactionalUndo(trans_id))
 
     def _tpc_begin(self, txn, tid, status):
-        self._lock()
+        self.locked = 1
+        self.timeout.begin(self)
+        self.stats.lock_time = time.time()
         self.storage.tpc_begin(txn, tid, status)
 
     def _store(self, oid, serial, data, version):
@@ -753,8 +750,6 @@
 
         This is only called from the test suite, AFAICT.
         """
-        for timeout in self.timeouts.values():
-            timeout.stop()
         self.dispatcher.close()
         if self.monitor is not None:
             self.monitor.close()
@@ -786,81 +781,62 @@
     def end(self, client):
         pass
 
-    def stop(self):
-        pass
-
 class TimeoutThread(threading.Thread):
     """Monitors transaction progress and generates timeouts."""
 
+    # There is one TimeoutThread per storage, because there's one
+    # transaction lock per storage.
+
     def __init__(self, timeout):
         threading.Thread.__init__(self)
         self.setDaemon(1)
         self._timeout = timeout
         self._client = None
         self._deadline = None
-        self._stop = 0
-        self._active = threading.Event()
-        self._lock = threading.Lock()
+        self._cond = threading.Condition() # Protects _client and _deadline
         self._trigger = trigger()
 
-    def stop(self):
-        self._stop = 1
-
     def begin(self, client):
-        self._lock.acquire()
+        # Called from the restart code the "main" thread, whenever the
+        # storage lock is being acquired.  (Serialized by asyncore.)
+        self._cond.acquire()
         try:
-            self._active.set()
+            assert self._client is None
             self._client = client
             self._deadline = time.time() + self._timeout
+            self._cond.notify()
         finally:
-            self._lock.release()
+            self._cond.release()
 
     def end(self, client):
-        # The ZEOStorage will call this message for every aborted
-        # transaction, regardless of whether the transaction started
-        # the 2PC.  Ignore here if 2PC never began.
-        if client is not self._client:
-            return
-        self._lock.acquire()
+        # Called from the "main" thread whenever the storage lock is
+        # being released.  (Serialized by asyncore.)
+        self._cond.acquire()
         try:
-            self._active.clear()
+            assert self._client  is not None
             self._client = None
             self._deadline = None
         finally:
-            self._lock.release()
+            self._cond.release()
 
     def run(self):
-        while not self._stop:
-            self._active.wait()
-            self._lock.acquire()
+        # Code running in the thread.
+        while 1:
+            self._cond.acquire()
             try:
-                deadline = self._deadline
-                if deadline is None:
-                    continue
-                howlong = deadline - time.time()
+                while self._client is None:
+                    self._cond.wait()
+                howlong = self._deadline - time.time()
+                client = self._client # For the howlong <= 0 branch below
             finally:
-                self._lock.release()
+                self._cond.release()
             if howlong <= 0:
-                self.timeout()
+                client.log("Transaction timeout after %s seconds" %
+                           self._timeout)
+                self._trigger.pull_trigger(lambda: client.connection.close())
             else:
                 time.sleep(howlong)
         self.trigger.close()
-
-    def timeout(self):
-        self._lock.acquire()
-        try:
-            client = self._client
-            deadline = self._deadline
-            self._active.clear()
-            self._client = None
-            self._deadline = None
-        finally:
-            self._lock.release()
-        if client is None:
-            return
-        elapsed = time.time() - (deadline - self._timeout)
-        client.log("Transaction timeout after %d seconds" % int(elapsed))
-        self._trigger.pull_trigger(lambda: client.connection.close())
 
 def run_in_thread(method, *args):
     t = SlowMethodThread(method, args)