[Zodb-checkins] CVS: StandaloneZODB/ZEO - logger.py:1.2 ClientStorage.py:1.40 StorageServer.py:1.36 smac.py:1.16 start.py:1.31 trigger.py:1.5 zrpc.py:1.23
Jeremy Hylton
jeremy@zope.com
Thu, 4 Apr 2002 18:09:31 -0500
Update of /cvs-repository/StandaloneZODB/ZEO
In directory cvs.zope.org:/tmp/cvs-serv14866
Modified Files:
ClientStorage.py StorageServer.py smac.py start.py trigger.py
zrpc.py
Added Files:
logger.py
Log Message:
Commit the zeo-1_0-debug-branch to the trunk.
I expect this code will become ZEO 1.1.
=== StandaloneZODB/ZEO/logger.py 1.1 => 1.2 ===
+from types import StringType
+from zLOG import *
+
+__all__ = ["zLogger", "format_msg"]
+
+_MAX_MSG_SIZE = 120
+
+def format_msg(*args):
+ accum = []
+ total_len = 0
+ for arg in args:
+ if not isinstance(arg, StringType):
+ arg = str(arg)
+ accum.append(arg)
+ total_len = total_len + len(arg)
+ if total_len >= _MAX_MSG_SIZE:
+ break
+ m = string.join(accum)
+ if len(m) > _MAX_MSG_SIZE:
+ m = m[:_MAX_MSG_SIZE] + ' ...'
+ return m
+
+class zLogger:
+
+ def __init__(self, channel):
+ self.channel = channel
+
+ def __str__(self):
+ raise RuntimeError, "don't print me"
+
+ def trace(self, msg):
+ LOG(self.channel, TRACE, msg)
+
+ def debug(self, msg):
+ LOG(self.channel, DEBUG, msg)
+
+ def blather(self, msg):
+ LOG(self.channel, BLATHER, msg)
+
+ def info(self, msg):
+ LOG(self.channel, INFO, msg)
+
+ def problem(self, msg):
+ LOG(self.channel, PROBLEM, msg)
+
+ def warning(self, msg):
+ LOG(self.channel, WARNING, msg)
+
+ def error(self, msg, error=None):
+ LOG(self.channel, ERROR, msg, error=error)
+
+ def panic(self, msg):
+ LOG(self.channel, PANIC, msg)
=== StandaloneZODB/ZEO/ClientStorage.py 1.39 => 1.40 ===
__version__='$Revision$'[11:-2]
-import struct, time, os, socket, string, Sync, zrpc, ClientCache
-import tempfile, Invalidator, ExtensionClass, thread
-import ThreadedAsync
-
-now=time.time
+import struct, time, os, socket, string
+import tempfile, thread
from struct import pack, unpack
+from types import TupleType
+
+import Invalidator, ExtensionClass
+import ThreadedAsync, Sync, zrpc, ClientCache
+
from ZODB import POSException, BaseStorage
from ZODB.TimeStamp import TimeStamp
-from zLOG import LOG, PROBLEM, INFO
-try: from ZODB.ConflictResolution import ResolvedSerial
-except: ResolvedSerial='rs'
+from ZEO.logger import zLogger
+
+log = zLogger("ZEO Client")
-TupleType=type(())
+try:
+ from ZODB.ConflictResolution import ResolvedSerial
+except:
+ ResolvedSerial='rs'
class ClientStorageError(POSException.StorageError):
"""An error occured in the ZEO Client Storage"""
@@ -62,8 +67,12 @@
self._info={'length': 0, 'size': 0, 'name': 'ZEO Client',
'supportsUndo':0, 'supportsVersions': 0,
}
-
- self._call=zrpc.asyncRPC(connection, debug=debug,
+
+ if debug:
+ debug_log = log
+ else:
+ debug_log = None
+ self._call=zrpc.asyncRPC(connection, debug=debug_log,
tmin=min_disconnect_poll,
tmax=max_disconnect_poll)
@@ -132,7 +141,7 @@
# If we can't connect right away, go ahead and open the cache
# and start a separate thread to try and reconnect.
- LOG("ClientStorage", PROBLEM, "Failed to connect to storage")
+ log.problem("Failed to connect to storage")
self._cache.open()
thread.start_new_thread(self._call.connect,(0,))
@@ -140,7 +149,7 @@
# notifyConnected
def notifyConnected(self, s):
- LOG("ClientStorage", INFO, "Connected to storage")
+ log.info("Connected to storage")
self._lock_acquire()
try:
@@ -197,7 +206,7 @@
### responsible for starting the thread that makes the connection.
def notifyDisconnected(self, ignored):
- LOG("ClientStorage", PROBLEM, "Disconnected from storage")
+ log.problem("Disconnected from storage")
self._connected=0
self._transaction=None
thread.start_new_thread(self._call.connect,(0,))
@@ -233,7 +242,7 @@
def close(self):
self._lock_acquire()
try:
- LOG("ClientStorage", INFO, "close")
+ log.info("close")
self._call.closeIntensionally()
try:
self._tfile.close()
@@ -549,6 +558,9 @@
finally: self._lock_release()
def sync(self): self._call.sync()
+
+ def status(self):
+ self._call.sendMessage('status')
def getWakeup(_w=[]):
if _w: return _w[0]
=== StandaloneZODB/ZEO/StorageServer.py 1.35 => 1.36 ===
import asyncore, socket, string, sys, os
-from smac import SizedMessageAsyncConnection
-from ZODB import POSException
import cPickle
from cPickle import Unpickler
+from cStringIO import StringIO
+from thread import start_new_thread
+import time
+from types import StringType
+
+from ZODB import POSException
from ZODB.POSException import TransactionError, UndoError, VersionCommitError
from ZODB.Transaction import Transaction
-import traceback
-from zLOG import LOG, INFO, ERROR, TRACE, BLATHER
from ZODB.referencesf import referencesf
-from thread import start_new_thread
-from cStringIO import StringIO
+from ZODB.utils import U64
+
from ZEO import trigger
from ZEO import asyncwrap
-from ZEO.smac import Disconnected
-from types import StringType
-
-class StorageServerError(POSException.StorageError): pass
-
-max_blather=120
-def blather(*args):
- accum = []
- total_len = 0
- for arg in args:
- if not isinstance(arg, StringType):
- arg = str(arg)
- accum.append(arg)
- total_len = total_len + len(arg)
- if total_len >= max_blather:
- break
- m = string.join(accum)
- if len(m) > max_blather: m = m[:max_blather] + ' ...'
- LOG('ZEO Server', TRACE, m)
+from ZEO.smac import Disconnected, SizedMessageAsyncConnection
+from ZEO.logger import zLogger, format_msg
+class StorageServerError(POSException.StorageError):
+ pass
# We create a special fast pickler! This allows us
# to create slightly more efficient pickles and
@@ -56,6 +43,8 @@
pickler.fast=1 # Don't use the memo
dump=pickler.dump
+log = zLogger("ZEO Server")
+
class StorageServer(asyncore.dispatcher):
def __init__(self, connection, storages):
@@ -80,14 +69,14 @@
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
- LOG('ZEO Server', INFO, 'Listening on %s' % repr(connection))
+ log.info('Listening on %s' % repr(connection))
self.bind(connection)
self.listen(5)
def register_connection(self, connection, storage_id):
storage=self.__storages.get(storage_id, None)
if storage is None:
- LOG('ZEO Server', ERROR, "Unknown storage_id: %s" % storage_id)
+ log.error("Unknown storage_id: %s" % storage_id)
connection.close()
return None, None
@@ -126,18 +115,29 @@
def handle_accept(self):
try:
- sock, addr = self.accept()
- except socket.error:
- sys.stderr.write('warning: accept failed\n')
+ r = self.accept()
+ if r is None:
+ return
+ sock, addr = r
+ except socket.error, err:
+ log.warning("accept() failed: %s" % err)
else:
ZEOConnection(self, sock, addr)
- def log_info(self, message, type='info'):
- if type=='error': type=ERROR
- else: type=INFO
- LOG('ZEO Server', type, message)
+ def status(self):
+ """Log status information about connections and storages"""
- log=log_info
+ lines = []
+ for storage_id, connections in self.__connections.items():
+ s = "Storage %s has %d connections" % (storage_id,
+ len(connections))
+ lines.append(s)
+ for c in connections:
+ lines.append("%s readable=%s writeable=%s" % (
+ c, c.readable(), c.writable()))
+ lines.append("\t" + c.stats())
+ log.info(string.join(lines, "\n"))
+ return _noreturn
storage_methods={}
for n in (
@@ -148,6 +148,7 @@
'tpc_finish', 'undo', 'undoLog', 'undoInfo', 'versionEmpty', 'versions',
'transactionalUndo',
'vote', 'zeoLoad', 'zeoVerify', 'beginZeoVerify', 'endZeoVerify',
+ 'status'
):
storage_methods[n]=1
storage_method=storage_methods.has_key
@@ -159,7 +160,8 @@
raise StorageServerError, (
"Couldn\'t import global module %s" % module)
- try: r=getattr(m, name)
+ try:
+ r=getattr(m, name)
except:
raise StorageServerError, (
"Couldn\'t find global %s in module %s" % (name, module))
@@ -177,12 +179,52 @@
def __init__(self, server, sock, addr):
self.__server=server
+ self.status = server.status
self.__invalidated=[]
self.__closed=None
- if __debug__: debug='ZEO Server'
- else: debug=0
+ if __debug__:
+ debug = log
+ else:
+ debug = None
+
+ if __debug__:
+ # store some detailed statistics about method calls
+ self._last_method = None
+ self._t_begin = None
+ self._t_end = None
+ self._ncalls = 0
+
SizedMessageAsyncConnection.__init__(self, sock, addr, debug=debug)
- LOG('ZEO Server', INFO, 'Connect %s %s' % (id(self), `addr`))
+ self.logaddr = repr(addr) # form of addr suitable for logging
+ log.info('Connect %s %s' % (id(self), self.logaddr))
+
+ def stats(self):
+ # This method is called via the status() command. The stats
+ # are of limited use for the current command, because the
+ # actual invocation of status() will clobber the previous
+ # method's statistics.
+ #
+ # When there are multiple connections active, a new connection
+ # can always get detailed statistics about other connections.
+ if __debug__:
+ if self._last_method == "status":
+ return "method=status begin=%s end=... ncalls=%d" % (
+ self._t_begin, self._ncalls)
+ if self._t_end is not None and self._t_begin is not None:
+ delta = self._t_end - self._t_begin
+ else:
+ delta = -1
+ return "method=%s begin=%s end=%s delta=%.3f ncalls=%d" % (
+ self._last_method, self._t_begin, self._t_end, delta,
+ self._ncalls)
+ else:
+ return ""
+
+ def __repr__(self):
+ return "<ZEOConnection %s%s" % (`self.addr`,
+ # sort of messy way to add tag 'closed' to
+ # connections that are closed
+ (self.__closed is None and '>' or ' closed>'))
def close(self):
t=self._transaction
@@ -196,19 +238,26 @@
self.__server.unregister_connection(self, self.__storage_id)
self.__closed=1
SizedMessageAsyncConnection.close(self)
- LOG('ZEO Server', INFO, 'Close %s' % id(self))
+ log.info('Close %s' % id(self))
def message_input(self, message,
dump=dump, Unpickler=Unpickler, StringIO=StringIO,
None=None):
if __debug__:
- if len(message) > max_blather:
- tmp = `message[:max_blather]`
+
+ self._t_begin = time.time()
+ self._t_end = None
+
+ if len(message) > 120: # XXX need constant from logger
+ tmp = `message[:120]`
else:
tmp = `message`
- blather('message_input', id(self), tmp)
+ log.trace("message_input %s" % tmp)
if self.__storage is None:
+ if __debug__:
+ log.blather("register connection to %s from %s" % (message,
+ self.logaddr))
# This is the first communication from the client
self.__storage, self.__storage_id = (
self.__server.register_connection(self, message))
@@ -226,27 +275,42 @@
name, args = args[0], args[1:]
if __debug__:
- apply(blather,
- ("call", id(self), ":", name,) + args)
+ self._last_method = name
+ self._ncalls = self._ncalls + 1
+ log.debug("call %s%s from %s" % (name, format_msg(args),
+ self.logaddr))
if not storage_method(name):
+ log.warning("Invalid method name: %s" % name)
+ if __debug__:
+ self._t_end = time.time()
raise 'Invalid Method Name', name
if hasattr(self, name):
r=apply(getattr(self, name), args)
else:
r=apply(getattr(self.__storage, name), args)
- if r is _noreturn: return
- except (UndoError, VersionCommitError):
- # These are normal usage errors. No need to leg them
+ if r is _noreturn:
+ if __debug__:
+ log.debug("no return to %s" % self.logaddr)
+ self._t_end = time.time()
+ return
+ except (UndoError, VersionCommitError), err:
+ if __debug__:
+ log.debug("return error %s to %s" % (err, self.logaddr))
+ self._t_end = time.time()
+ # These are normal usage errors. No need to log them.
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
except:
- LOG('ZEO Server', ERROR, 'error', error=sys.exc_info())
+ if __debug__:
+ self._t_end = time.time()
+ log.error("error", error=sys.exc_info())
self.return_error(sys.exc_info()[0], sys.exc_info()[1])
return
if __debug__:
- blather("%s R: %s" % (id(self), `r`))
+ log.debug("return %s to %s" % (format_msg(r), self.logaddr))
+ self._t_end = time.time()
r=dump(r,1)
self.message_output('R'+r)
@@ -256,7 +320,7 @@
err_value = err_type, err_value
if __debug__:
- blather("%s E: %s" % (id(self), `err_value`))
+ log.trace("%s E: %s" % (id(self), `err_value`))
try: r=dump(err_value, 1)
except:
@@ -292,6 +356,8 @@
}
def zeoLoad(self, oid):
+ if __debug__:
+ log.blather("zeoLoad(%s) %s" % (U64(oid), self.logaddr))
storage=self.__storage
v=storage.modifiedInVersion(oid)
if v: pv, sv = storage.load(oid, v)
@@ -308,6 +374,8 @@
def beginZeoVerify(self):
+ if __debug__:
+ log.blather("beginZeoVerify() %s" % self.logaddr)
self.message_output('bN.')
return _noreturn
@@ -324,6 +392,8 @@
return _noreturn
def endZeoVerify(self):
+ if __debug__:
+ log.blather("endZeoVerify() %s" % self.logaddr)
self.message_output('eN.')
return _noreturn
@@ -340,11 +410,11 @@
def _pack(self, t, wait=0):
try:
- LOG('ZEO Server', BLATHER, 'pack begin')
+ log.blather('pack begin')
self.__storage.pack(t, referencesf)
- LOG('ZEO Server', BLATHER, 'pack end')
+ log.blather('pack end')
except:
- LOG('ZEO Server', ERROR,
+ log.error(
'Pack failed for %s' % self.__storage_id,
error=sys.exc_info())
if wait:
@@ -381,6 +451,9 @@
def storea(self, oid, serial, data, version, id,
dump=dump):
+ if __debug__:
+ log.blather("storea(%s, [%d], %s) %s" % (U64(oid), len(data),
+ U64(id), self.logaddr))
try:
t=self._transaction
if t is None or id != t.id:
@@ -396,7 +469,7 @@
# all errors need to be serialized to prevent unexpected
# returns, which would screw up the return handling.
# IOW, Anything that ends up here is evil enough to be logged.
- LOG('ZEO Server', ERROR, 'store error', error=sys.exc_info())
+ log.error('store error', error=sys.exc_info())
newserial=sys.exc_info()[1]
else:
if serial != '\0\0\0\0\0\0\0\0':
@@ -420,12 +493,17 @@
return self.__storage.tpc_vote(t)
def transactionalUndo(self, trans_id, id):
+ if __debug__:
+ log.blather("transactionalUndo(%s, %s) %s" % (trans_id,
+ U64(id), self.logaddr))
t=self._transaction
if t is None or id != t.id:
raise POSException.StorageTransactionError(self, id)
return self.__storage.transactionalUndo(trans_id, self._transaction)
def undo(self, transaction_id):
+ if __debug__:
+ log.blather("undo(%s) %s" % (transaction_id, self.logaddr))
oids=self.__storage.undo(transaction_id)
if oids:
self.__server.invalidate(
@@ -457,11 +535,15 @@
def commitlock_suspend(self, resume, args, onerror):
self.__storage._waiting.append((resume, args, onerror))
+ log.blather("suspend %s. %d queued clients" % (resume.im_self,
+ len(self.__storage._waiting)))
def commitlock_resume(self):
waiting = self.__storage._waiting
while waiting:
resume, args, onerror = waiting.pop(0)
+ log.blather("resuming queued client %s, %d still queued" % (
+ resume.im_self, len(waiting)))
try:
if apply(resume, args):
break
@@ -471,12 +553,18 @@
# disconnect will have generated its own log event.
onerror()
except:
- LOG('ZEO Server', ERROR,
+ log.error(
"Unexpected error handling queued tpc_begin()",
error=sys.exc_info())
onerror()
def tpc_abort(self, id):
+ if __debug__:
+ try:
+ log.blather("tpc_abort(%s) %s" % (U64(id), self.logaddr))
+ except:
+ print repr(id)
+ raise
t = self._transaction
if t is None or id != t.id:
return
@@ -492,6 +580,10 @@
self.message_output('UN.')
def tpc_begin(self, id, user, description, ext):
+ if __debug__:
+ log.blather("tpc_begin(%s, %s, %s) %s" % (U64(id), `user`,
+ `description`,
+ self.logaddr))
t = self._transaction
if t is not None:
if id == t.id:
@@ -505,7 +597,8 @@
if storage._transaction is not None:
self.commitlock_suspend(self.unlock, (), self.close)
return 1 # Return a flag indicating a lock condition.
-
+
+ assert id != 't'
self._transaction=t=Transaction()
t.id=id
t.user=user
@@ -542,6 +635,8 @@
return 1
def tpc_finish(self, id, user, description, ext):
+ if __debug__:
+ log.blather("tpc_finish(%s) %s" % (U64(id), self.logaddr))
t = self._transaction
if id != t.id:
return
@@ -564,7 +659,7 @@
if __name__=='__main__':
import ZODB.FileStorage
name, port = sys.argv[1:3]
- blather(name, port)
+ log.trace(format_msg(name, port))
try:
port='', int(port)
except:
=== StandaloneZODB/ZEO/smac.py 1.15 => 1.16 ===
import asyncore, string, struct, zLOG, sys, Acquisition
import socket, errno
-from zLOG import LOG, TRACE, ERROR, INFO
+from logger import zLogger
# Use the dictionary to make sure we get the minimum number of errno
# entries. We expect that EWOULDBLOCK == EAGAIN on most systems --
@@ -48,10 +48,10 @@
SizedMessageAsyncConnection.inheritedAttribute(
'__init__')(self, sock, map)
self.addr=addr
- if debug is not None:
- self._debug=debug
- elif not hasattr(self, '_debug'):
- self._debug=__debug__ and 'smac'
+ if debug is None and __debug__:
+ self._debug = zLogger("smac")
+ else:
+ self._debug = debug
self.__state=None
self.__inp=None
self.__inpl=0
@@ -132,23 +132,18 @@
def message_output(self, message,
pack=struct.pack, len=len):
- if self._debug:
- if len(message) > 40: m=message[:40]+' ...'
- else: m=message
- LOG(self._debug, TRACE, 'message_output %s' % `m`)
+ if self._debug is not None:
+ if len(message) > 40:
+ m = message[:40]+' ...'
+ else:
+ m = message
+ self._debug.trace('message_output %s' % `m`)
append=self.__append
if append is None:
raise Disconnected("This action is temporarily unavailable.<p>")
append(pack(">i",len(message))+message)
-
- def log_info(self, message, type='info'):
- if type=='error': type=ERROR
- else: type=INFO
- LOG('ZEO', type, message)
-
- log=log_info
def close(self):
if self.__append is not None:
=== StandaloneZODB/ZEO/start.py 1.30 => 1.31 ===
)
- opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
+ fs = os.path.join(var, 'Data.fs')
- fs=os.path.join(var, 'Data.fs')
-
- usage="""%s [options] [filename]
+ usage = """%s [options] [filename]
where options are:
@@ -121,6 +119,13 @@
if no file name is specified, then %s is used.
""" % (me, fs)
+ try:
+ opts, args = getopt.getopt(args, 'p:Ddh:U:sS:u:')
+ except getopt.error, err:
+ print err
+ print usage
+ sys.exit(1)
+
port=None
debug=detailed=0
host=''
@@ -217,15 +222,15 @@
import signal
signal.signal(signal.SIGTERM,
- lambda sig, frame, s=storages: shutdown(s)
- )
+ lambda sig, frame, s=storages: shutdown(s))
signal.signal(signal.SIGINT,
- lambda sig, frame, s=storages: shutdown(s, 0)
- )
- try: signal.signal(signal.SIGHUP, rotate_logs_handler)
- except: pass
-
- except: pass
+ lambda sig, frame, s=storages: shutdown(s, 0))
+ try:
+ signal.signal(signal.SIGHUP, rotate_logs_handler)
+ except:
+ pass
+ except:
+ pass
items=storages.items()
items.sort()
@@ -236,13 +241,16 @@
ZEO.StorageServer.StorageServer(unix, storages)
- try: ppid, pid = os.getppid(), os.getpid()
- except: pass # getpid not supported
- else: open(zeo_pid,'w').write("%s %s" % (ppid, pid))
+ try:
+ ppid, pid = os.getppid(), os.getpid()
+ except:
+ pass # getpid not supported
+ else:
+ open(zeo_pid,'w').write("%s %s" % (ppid, pid))
except:
# Log startup exception and tell zdaemon not to restart us.
- info=sys.exc_info()
+ info = sys.exc_info()
try:
import zLOG
zLOG.LOG("z2", zLOG.PANIC, "Startup exception",
@@ -280,21 +288,29 @@
# unnecessary, since we now use so_reuseaddr.
for ignored in 1,2:
for socket in asyncore.socket_map.values():
- try: socket.close()
- except: pass
+ try:
+ socket.close()
+ except:
+ pass
for storage in storages.values():
- try: storage.close()
- except: pass
+ try:
+ storage.close()
+ except:
+ pass
try:
from zLOG import LOG, INFO
LOG('ZEO Server', INFO,
"Shutting down (%s)" % (die and "shutdown" or "restart")
)
- except: pass
+ except:
+ pass
- if die: sys.exit(0)
- else: sys.exit(1)
+ if die:
+ sys.exit(0)
+ else:
+ sys.exit(1)
-if __name__=='__main__': main(sys.argv)
+if __name__ == '__main__':
+ main(sys.argv)
=== StandaloneZODB/ZEO/trigger.py 1.4 => 1.5 ===
# from Sam Rushing's Medusa server.
-
import asyncore
-#import asynchat
-
+import errno
import os
import socket
import string
@@ -26,7 +24,7 @@
if os.name == 'posix':
- class trigger (asyncore.file_dispatcher):
+ class trigger(asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread"
@@ -58,10 +56,10 @@
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
- def __init__ (self):
+ def __init__(self):
r, w = self._fds = os.pipe()
self.trigger = w
- asyncore.file_dispatcher.__init__ (self, r)
+ asyncore.file_dispatcher.__init__(self, r)
self.lock = thread.allocate_lock()
self.thunks = []
@@ -69,30 +67,35 @@
os.close(self._fds[0])
os.close(self._fds[1])
- def __repr__ (self):
- return '<select-trigger (pipe) at %x>' % id(self)
+ def __repr__(self):
+ return '<select-trigger(pipe) at %x>' % id(self)
- def readable (self):
+ def readable(self):
return 1
- def writable (self):
+ def writable(self):
return 0
- def handle_connect (self):
+ def handle_connect(self):
pass
- def pull_trigger (self, thunk=None):
+ def pull_trigger(self, thunk=None):
# print 'PULL_TRIGGER: ', len(self.thunks)
if thunk:
try:
self.lock.acquire()
- self.thunks.append (thunk)
+ self.thunks.append(thunk)
finally:
self.lock.release()
- os.write (self.trigger, 'x')
+ os.write(self.trigger, 'x')
- def handle_read (self):
- self.recv (8192)
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except os.error, err:
+ if err[0] == errno.EAGAIN: # resource temporarily unavailable
+ return
+ raise
try:
self.lock.acquire()
for thunk in self.thunks:
@@ -101,7 +104,7 @@
except:
nil, t, v, tbinfo = asyncore.compact_traceback()
print ('exception in trigger thunk:'
- ' (%s:%s %s)' % (t, v, tbinfo))
+ '(%s:%s %s)' % (t, v, tbinfo))
self.thunks = []
finally:
self.lock.release()
@@ -113,13 +116,13 @@
# win32-safe version
- class trigger (asyncore.dispatcher):
+ class trigger(asyncore.dispatcher):
address = ('127.9.9.9', 19999)
- def __init__ (self):
- a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
- w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
+ def __init__(self):
+ a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
@@ -137,45 +140,50 @@
raise 'Bind Error', 'Cannot bind trigger!'
port=port - 1
- a.listen (1)
- w.setblocking (0)
+ a.listen(1)
+ w.setblocking(0)
try:
- w.connect (self.address)
+ w.connect(self.address)
except:
pass
r, addr = a.accept()
a.close()
- w.setblocking (1)
+ w.setblocking(1)
self.trigger = w
- asyncore.dispatcher.__init__ (self, r)
+ asyncore.dispatcher.__init__(self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
- def __repr__ (self):
+ def __repr__(self):
return '<select-trigger (loopback) at %x>' % id(self)
- def readable (self):
+ def readable(self):
return 1
- def writable (self):
+ def writable(self):
return 0
- def handle_connect (self):
+ def handle_connect(self):
pass
- def pull_trigger (self, thunk=None):
+ def pull_trigger(self, thunk=None):
if thunk:
try:
self.lock.acquire()
- self.thunks.append (thunk)
+ self.thunks.append(thunk)
finally:
self.lock.release()
- self.trigger.send ('x')
+ self.trigger.send('x')
- def handle_read (self):
- self.recv (8192)
+ def handle_read(self):
+ try:
+ self.recv(8192)
+ except os.error, err:
+ if err[0] == errno.EAGAIN: # resource temporarily unavailable
+ return
+ raise
try:
self.lock.acquire()
for thunk in self.thunks:
=== StandaloneZODB/ZEO/zrpc.py 1.22 => 1.23 ===
self.__call_lr=l.release
- def connect(self, tryonce=1, log_type='client'):
+ def connect(self, tryonce=1):
t=self._tmin
connection = self._connection
debug=self._debug
while self.__closed == 0:
- if log_type: LOG(log_type, INFO,
- 'Trying to connect to server: %s' % `connection`)
+ LOG("client", INFO,
+ 'Trying to connect to server: %s' % `connection`)
try:
if type(connection) is type(''):
s=socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
@@ -75,15 +75,15 @@
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(connection)
except Exception, err:
- if debug:
- LOG(debug, DEBUG, "Failed to connect to server: %s" % err)
+ if debug is not None:
+ debug.blather("Failed to connect to server: %s" % err)
if tryonce: return 0
time.sleep(t)
t=t*2
if t > self._tmax: t=self._tmax
else:
- if debug:
- LOG(debug, DEBUG, "Connected to server")
+ if debug is not None:
+ debug.blather("Connected to server")
# Make sure the result lock is set, se we don't
# get an old result (e.g. the exception that
@@ -199,12 +199,12 @@
self._outOfBand=f
def message_input(self, m):
- if self._debug:
+ if self._debug is not None:
if len(m) > 60:
md = repr(m[:60]) + ' ...'
else:
md = repr(m)
- LOG(self._debug, TRACE, 'message_input %s' % md)
+ self._debug.trace('message_input %s' % md)
c=m[:1]
if c in 'RE':