[Zodb-checkins] CVS: StandaloneZODB/ZEO - StorageServer.py:1.28.2.9 start.py:1.24.2.3 trigger.py:1.1.8.5
Jeremy Hylton
jeremy@zope.com
Thu, 4 Apr 2002 17:01:08 -0500
Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv29777
Modified Files:
Tag: zeo-1_0-branch
StorageServer.py start.py trigger.py
Log Message:
Backport fixes from the zeo-1_0-debug-branch.
Summary:
- Fix the SIGHUP handler so that it doesn't cause asyncore to
bail. Fixes in two places. StorageServer must be prepared for
accept() to return None. trigger module must be prepared to
catch os.error on os.read().
- Replace sys.stderr.write() with zLOG call when accept() fails.
- Add log calls when clients are suspended and resumed on the
distributed commit lock.
=== StandaloneZODB/ZEO/StorageServer.py 1.28.2.8 => 1.28.2.9 ===
from ZODB.Transaction import Transaction
import traceback
-from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
+from zLOG import LOG, INFO, ERROR, TRACE, BLATHER, PROBLEM
from ZODB.referencesf import referencesf
from thread import start_new_thread
from cStringIO import StringIO
@@ -126,9 +126,12 @@
def handle_accept(self):
try:
- sock, addr = self.accept()
- except socket.error:
- sys.stderr.write('warning: accept failed\n')
+ x = self.accept()
+ if x is None:
+ return
+ sock, addr = x
+ except socket.error, err:
+ LOG('ZEO Server', PROBLEM, 'accept() failed: %s' % err)
else:
ZEOConnection(self, sock, addr)
@@ -457,11 +460,17 @@
def commitlock_suspend(self, resume, args, onerror):
self.__storage._waiting.append((resume, args, onerror))
+ LOG('ZEO Server', BLATHER,
+ "Client tpc_begin() suspended. "
+ "%d clients queued" % len(self.__storage._waiting))
def commitlock_resume(self):
waiting = self.__storage._waiting
while waiting:
resume, args, onerror = waiting.pop(0)
+ LOG('ZEO Server', BLATHER,
+ "Client tpc_begin() resumed. "
+ "%d clients still queued" % len(self.__storage._waiting))
try:
if apply(resume, args):
break
@@ -515,17 +524,18 @@
self.__invalidated=[]
def tpc_begin_sync(self, id, user, description, ext):
- if self.__closed: return
- t=self._transaction
- if t is not None and id == t.id: return
- storage=self.__storage
+ if self.__closed:
+ return
+ t = self._transaction
+ if t is not None and id == t.id:
+ return
+ storage = self.__storage
if storage._transaction is None:
self.try_again_sync(id, user, description, ext)
else:
self.commitlock_suspend(self.try_again_sync,
(id, user, description, ext),
self.close)
-
return _noreturn
def try_again_sync(self, id, user, description, ext):
@@ -537,6 +547,7 @@
t.description=description
storage.tpc_begin(t)
self.__invalidated=[]
+ LOG("ZEO Server", BLATHER, "try_again_sync return None")
self.message_output('RN.')
return 1
@@ -549,6 +560,8 @@
storage = self.__storage
r = storage.tpc_finish(t)
+ self.commitlock_resume()
+
self._transaction = None
if self.__invalidated:
self.__server.invalidate(self, self.__storage_id,
@@ -556,7 +569,6 @@
self.get_size_info())
self.__invalidated = []
- self.commitlock_resume()
def init_storage(storage):
if not hasattr(storage,'tpc_vote'): storage.tpc_vote=lambda *args: None
=== StandaloneZODB/ZEO/start.py 1.24.2.2 => 1.24.2.3 ===
)
- opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
-
fs=os.path.join(var, 'Data.fs')
usage="""%s [options] [filename]
@@ -120,6 +118,13 @@
if no file name is specified, then %s is used.
""" % (me, fs)
+
+ try:
+ opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
+ except getopt.error, err:
+ print err
+ print usage
+ sys.exit(1)
port=None
debug=detailed=0
=== StandaloneZODB/ZEO/trigger.py 1.1.8.4 => 1.1.8.5 ===
# from Sam Rushing's Medusa server.
-
import asyncore
-#import asynchat
-
+import errno
import os
import socket
import string
@@ -26,7 +24,7 @@
if os.name == 'posix':
- class trigger (asyncore.file_dispatcher):
+ class trigger(asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread"
@@ -58,10 +56,10 @@
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
- def __init__ (self):
+ def __init__(self):
r, w = self._fds = os.pipe()
self.trigger = w
- asyncore.file_dispatcher.__init__ (self, r)
+ asyncore.file_dispatcher.__init__(self, r)
self.lock = thread.allocate_lock()
self.thunks = []
@@ -69,30 +67,35 @@
os.close(self._fds[0])
os.close(self._fds[1])
- def __repr__ (self):
- return '<select-trigger (pipe) at %x>' % id(self)
+ def __repr__(self):
+ return '<select-trigger(pipe) at %x>' % id(self)
- def readable (self):
+ def readable(self):
return 1
- def writable (self):
+ def writable(self):
return 0
- def handle_connect (self):
+ def handle_connect(self):
pass
- def pull_trigger (self, thunk=None):
+ def pull_trigger(self, thunk=None):
# print 'PULL_TRIGGER: ', len(self.thunks)
if thunk:
try:
self.lock.acquire()
- self.thunks.append (thunk)
+ self.thunks.append(thunk)
finally:
self.lock.release()
- os.write (self.trigger, 'x')
+ os.write(self.trigger, 'x')
- def handle_read (self):
- self.recv (8192)
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except os.error, err:
+ if err[0] == errno.EAGAIN: # resource temporarily unavailable
+ return
+ raise
try:
self.lock.acquire()
for thunk in self.thunks:
@@ -101,7 +104,7 @@
except:
nil, t, v, tbinfo = asyncore.compact_traceback()
print ('exception in trigger thunk:'
- ' (%s:%s %s)' % (t, v, tbinfo))
+ '(%s:%s %s)' % (t, v, tbinfo))
self.thunks = []
finally:
self.lock.release()
@@ -113,13 +116,13 @@
# win32-safe version
- class trigger (asyncore.dispatcher):
+ class trigger(asyncore.dispatcher):
address = ('127.9.9.9', 19999)
- def __init__ (self):
- a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
- w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ def __init__(self):
+ a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
@@ -137,45 +140,50 @@
raise 'Bind Error', 'Cannot bind trigger!'
port=port - 1
- a.listen (1)
- w.setblocking (0)
+ a.listen(1)
+ w.setblocking(0)
try:
- w.connect (self.address)
+ w.connect(self.address)
except:
pass
r, addr = a.accept()
a.close()
- w.setblocking (1)
+ w.setblocking(1)
self.trigger = w
- asyncore.dispatcher.__init__ (self, r)
+ asyncore.dispatcher.__init__(self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
- def __repr__ (self):
+ def __repr__(self):
return '<select-trigger (loopback) at %x>' % id(self)
- def readable (self):
+ def readable(self):
return 1
- def writable (self):
+ def writable(self):
return 0
- def handle_connect (self):
+ def handle_connect(self):
pass
- def pull_trigger (self, thunk=None):
+ def pull_trigger(self, thunk=None):
if thunk:
try:
self.lock.acquire()
- self.thunks.append (thunk)
+ self.thunks.append(thunk)
finally:
self.lock.release()
- self.trigger.send ('x')
+ self.trigger.send('x')
- def handle_read (self):
- self.recv (8192)
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except os.error, err:
+ if err[0] == errno.EAGAIN: # resource temporarily unavailable
+ return
+ raise
try:
self.lock.acquire()
for thunk in self.thunks: