[Zodb-checkins] CVS: StandaloneZODB/ZEO - StorageServer.py:1.28.2.9 start.py:1.24.2.3 trigger.py:1.1.8.5

Jeremy Hylton jeremy@zope.com
Thu, 4 Apr 2002 17:01:08 -0500


Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv29777

Modified Files:
      Tag: zeo-1_0-branch
	StorageServer.py start.py trigger.py 
Log Message:
Backport fixes from the zeo-1_0-debug-branch.

Summary:

   - Fix the SIGHUP handler so that it doesn't cause asyncore to
     bail.  Fixes in two places.  StorageServer must be prepared for
     accept() to return None.  The trigger module must be prepared to
     catch os.error on os.read().

   - Replace sys.stderr.write() with zLOG call when accept() fails.

   - Add log calls when clients are suspended and resumed on the
     distributed commit lock.


=== StandaloneZODB/ZEO/StorageServer.py 1.28.2.8 => 1.28.2.9 ===
 from ZODB.Transaction import Transaction
 import traceback
-from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
+from zLOG import LOG, INFO, ERROR, TRACE, BLATHER, PROBLEM
 from ZODB.referencesf import referencesf
 from thread import start_new_thread
 from cStringIO import StringIO
@@ -126,9 +126,12 @@
     
     def handle_accept(self):
         try:
-            sock, addr = self.accept()
-        except socket.error:
-            sys.stderr.write('warning: accept failed\n')
+            x = self.accept()
+            if x is None:
+                return
+            sock, addr = x
+        except socket.error, err:
+            LOG('ZEO Server', PROBLEM, 'accept() failed: %s' % err)
         else:
             ZEOConnection(self, sock, addr)
 
@@ -457,11 +460,17 @@
 
     def commitlock_suspend(self, resume, args, onerror):
         self.__storage._waiting.append((resume, args, onerror))
+        LOG('ZEO Server', BLATHER,
+            "Client tpc_begin() suspended. "
+            "%d clients queued" % len(self.__storage._waiting))
 
     def commitlock_resume(self):
         waiting = self.__storage._waiting
         while waiting:
             resume, args, onerror = waiting.pop(0)
+            LOG('ZEO Server', BLATHER,
+                "Client tpc_begin() resumed. "
+                "%d clients still queued" % len(self.__storage._waiting))
             try:
                 if apply(resume, args):
                     break
@@ -515,17 +524,18 @@
         self.__invalidated=[]
 
     def tpc_begin_sync(self, id, user, description, ext):
-        if self.__closed: return
-        t=self._transaction
-        if t is not None and id == t.id: return
-        storage=self.__storage
+        if self.__closed:
+            return
+        t = self._transaction
+        if t is not None and id == t.id:
+            return
+        storage = self.__storage
         if storage._transaction is None:
             self.try_again_sync(id, user, description, ext)
         else:
             self.commitlock_suspend(self.try_again_sync,
                                     (id, user, description, ext),
                                     self.close)
-
         return _noreturn
         
     def try_again_sync(self, id, user, description, ext):
@@ -537,6 +547,7 @@
             t.description=description
             storage.tpc_begin(t)
             self.__invalidated=[]
+            LOG("ZEO Server", BLATHER, "try_again_sync return None")
             self.message_output('RN.')
             
         return 1
@@ -549,6 +560,8 @@
         storage = self.__storage
         r = storage.tpc_finish(t)
 
+        self.commitlock_resume()
+
         self._transaction = None
         if self.__invalidated:
             self.__server.invalidate(self, self.__storage_id,
@@ -556,7 +569,6 @@
                                      self.get_size_info())
             self.__invalidated = []
             
-        self.commitlock_resume()
 
 def init_storage(storage):
     if not hasattr(storage,'tpc_vote'): storage.tpc_vote=lambda *args: None


=== StandaloneZODB/ZEO/start.py 1.24.2.2 => 1.24.2.3 ===
                            )
 
-    opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
-
     fs=os.path.join(var, 'Data.fs')
 
     usage="""%s [options] [filename]
@@ -120,6 +118,13 @@
 
     if no file name is specified, then %s is used.
     """ % (me, fs)
+
+    try:
+        opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
+    except getopt.error, err:
+        print err
+        print usage
+        sys.exit(1)
 
     port=None
     debug=detailed=0


=== StandaloneZODB/ZEO/trigger.py 1.1.8.4 => 1.1.8.5 ===
 # from Sam Rushing's Medusa server.
 
-
 import asyncore
-#import asynchat
-
+import errno
 import os
 import socket
 import string
@@ -26,7 +24,7 @@
     
 if os.name == 'posix':
 
-    class trigger (asyncore.file_dispatcher):
+    class trigger(asyncore.file_dispatcher):
 
         "Wake up a call to select() running in the main thread"
 
@@ -58,10 +56,10 @@
         # new data onto a channel's outgoing data queue at the same time that
         # the main thread is trying to remove some]
 
-        def __init__ (self):
+        def __init__(self):
             r, w = self._fds = os.pipe()
             self.trigger = w
-            asyncore.file_dispatcher.__init__ (self, r)
+            asyncore.file_dispatcher.__init__(self, r)
             self.lock = thread.allocate_lock()
             self.thunks = []
 
@@ -69,30 +67,35 @@
             os.close(self._fds[0])
             os.close(self._fds[1])
 
-        def __repr__ (self):
-            return '<select-trigger (pipe) at %x>' % id(self)
+        def __repr__(self):
+            return '<select-trigger(pipe) at %x>' % id(self)
 
-        def readable (self):
+        def readable(self):
             return 1
 
-        def writable (self):
+        def writable(self):
             return 0
 
-        def handle_connect (self):
+        def handle_connect(self):
             pass
 
-        def pull_trigger (self, thunk=None):
+        def pull_trigger(self, thunk=None):
             # print 'PULL_TRIGGER: ', len(self.thunks)
             if thunk:
                 try:
                     self.lock.acquire()
-                    self.thunks.append (thunk)
+                    self.thunks.append(thunk)
                 finally:
                     self.lock.release()
-            os.write (self.trigger, 'x')
+            os.write(self.trigger, 'x')
 
-        def handle_read (self):
-            self.recv (8192)
+        def handle_read(self):
+            try:
+                self.recv(8192)
+            except os.error, err:
+                if err[0] == errno.EAGAIN: # resource temporarily unavailable
+                    return
+                raise
             try:
                 self.lock.acquire()
                 for thunk in self.thunks:
@@ -101,7 +104,7 @@
                     except:
                         nil, t, v, tbinfo = asyncore.compact_traceback()
                         print ('exception in trigger thunk:'
-                               ' (%s:%s %s)' % (t, v, tbinfo))
+                               '(%s:%s %s)' % (t, v, tbinfo))
                 self.thunks = []
             finally:
                 self.lock.release()
@@ -113,13 +116,13 @@
 
     # win32-safe version
 
-    class trigger (asyncore.dispatcher):
+    class trigger(asyncore.dispatcher):
 
         address = ('127.9.9.9', 19999)
 
-        def __init__ (self):
-            a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
-            w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+        def __init__(self):
+            a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 
             # set TCP_NODELAY to true to avoid buffering
             w.setsockopt(socket.IPPROTO_TCP, 1, 1)
@@ -137,45 +140,50 @@
                         raise 'Bind Error', 'Cannot bind trigger!'
                     port=port - 1
             
-            a.listen (1)
-            w.setblocking (0)
+            a.listen(1)
+            w.setblocking(0)
             try:
-                w.connect (self.address)
+                w.connect(self.address)
             except:
                 pass
             r, addr = a.accept()
             a.close()
-            w.setblocking (1)
+            w.setblocking(1)
             self.trigger = w
 
-            asyncore.dispatcher.__init__ (self, r)
+            asyncore.dispatcher.__init__(self, r)
             self.lock = thread.allocate_lock()
             self.thunks = []
             self._trigger_connected = 0
 
-        def __repr__ (self):
+        def __repr__(self):
             return '<select-trigger (loopback) at %x>' % id(self)
 
-        def readable (self):
+        def readable(self):
             return 1
 
-        def writable (self):
+        def writable(self):
             return 0
 
-        def handle_connect (self):
+        def handle_connect(self):
             pass
 
-        def pull_trigger (self, thunk=None):
+        def pull_trigger(self, thunk=None):
             if thunk:
                 try:
                     self.lock.acquire()
-                    self.thunks.append (thunk)
+                    self.thunks.append(thunk)
                 finally:
                     self.lock.release()
-            self.trigger.send ('x')
+            self.trigger.send('x')
 
-        def handle_read (self):
-            self.recv (8192)
+        def handle_read(self):
+            try:
+                self.recv(8192)
+            except os.error, err:
+                if err[0] == errno.EAGAIN: # resource temporarily unavailable
+                    return
+                raise
             try:
                 self.lock.acquire()
                 for thunk in self.thunks: