[Zope3-checkins] CVS: Zope3/src/zodb/zeo/zrpc - __init__.py:1.1.2.1 client.py:1.1.2.1 connection.py:1.1.2.1 error.py:1.1.2.1 log.py:1.1.2.1 marshal.py:1.1.2.1 server.py:1.1.2.1 smac.py:1.1.2.1 trigger.py:1.1.2.1
Jim Fulton
jim@zope.com
Mon, 23 Dec 2002 14:30:56 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo/zrpc
In directory cvs.zope.org:/tmp/cvs-serv19908/zodb/zeo/zrpc
Added Files:
Tag: NameGeddon-branch
__init__.py client.py connection.py error.py log.py marshal.py
server.py smac.py trigger.py
Log Message:
Initial renaming before debugging
=== Added File Zope3/src/zodb/zeo/zrpc/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# zrpc is a package with the following modules
# client -- manages connection creation to remote server
# connection -- object dispatcher
# log -- logging helper
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
# server -- manages incoming connections from remote clients
# smac -- sized message async connections
# trigger -- medusa's trigger
# zrpc is not an advertised subpackage of ZEO; its interfaces are internal
=== Added File Zope3/src/zodb/zeo/zrpc/client.py === (421/521 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import errno
import select
import socket
import sys
import threading
import time
import types
import ThreadedAsync
from zodb.interfaces import ReadOnlyError
from zodb.zeo.zrpc import log
from zodb.zeo.zrpc.trigger import trigger
from zodb.zeo.zrpc.connection import ManagedConnection
class ConnectionManager:
"""Keeps a connection up over time"""
def __init__(self, addrs, client, tmin=1, tmax=180):
self.addrlist = self._parse_addrs(addrs)
self.client = client
self.tmin = tmin
self.tmax = tmax
self.cond = threading.Condition(threading.Lock())
self.connection = None # Protected by self.cond
self.closed = 0
# If thread is not None, then there is a helper thread
# attempting to connect.
self.thread = None # Protected by self.cond
self.trigger = None
self.thr_async = 0
ThreadedAsync.register_loop_callback(self.set_async)
def __repr__(self):
return "<%s for %s>" % (self.__class__.__name__, self.addrlist)
[-=- -=- -=- 421 lines omitted -=- -=- -=-]
self.state = "tested"
except ReadOnlyError:
log.info("CW: ReadOnlyError in testConnection (%s)",
repr(self.addr))
self.close()
return
except:
log.error("CW: error in testConnection (%s)", repr(self.addr),
exc_info=True)
self.close()
return
if self.preferred:
self.notify_client()
def notify_client(self):
    """Hand the tested connection to the client, then to the manager.

    The client's notifyConnected() is called with this wrapper's
    connection.  If that raises, the error is logged and the wrapper
    is closed.  Otherwise the state becomes "notified" and the
    manager's connect_done() is called with the connection and the
    preferred flag.

    If the client is already connected, we assume it's a fallback
    connection, and the new connection must be a preferred
    connection.  The client will close the old connection.
    """
    try:
        self.client.notifyConnected(self.conn)
    except:
        log.error("CW: error in notifyConnected (%s)", repr(self.addr),
                  exc_info=True)
        self.close()
    else:
        # Only reached when notifyConnected() returned normally.
        self.state = "notified"
        self.mgr.connect_done(self.conn, self.preferred)
def close(self):
    """Close the socket and reset everything."""
    self.state = "closed"
    self.mgr = None
    self.client = None
    self.preferred = 0
    # The connection is closed before the socket.  Closing the ZRPC
    # connection will eventually close the socket, somewhere in
    # asyncore.
    # XXX Why do we care? --Guido
    for attr in ("conn", "sock"):
        resource = getattr(self, attr)
        if resource is not None:
            resource.close()
            setattr(self, attr, None)
def fileno(self):
    """Return the file descriptor reported by the wrapped socket."""
    sock = self.sock
    return sock.fileno()
=== Added File Zope3/src/zodb/zeo/zrpc/connection.py === (407/507 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import errno
import select
import sys
import threading
import types
import logging
import ThreadedAsync
from zodb.zeo.zrpc import smac
from zodb.zeo.zrpc.error import ZRPCError, DisconnectedError
from zodb.zeo.zrpc import log
from zodb.zeo.zrpc.marshal import Marshaller
from zodb.zeo.zrpc.trigger import trigger
from zodb import POSException
REPLY = ".reply" # message name used for replies
ASYNC = 1
class Delay:
"""Used to delay response to client for synchronous calls
When a synchronous call is made and the original handler returns
without handling the call, it returns a Delay object that prevents
the mainloop from sending a response.
"""
def set_sender(self, msgid, send_reply, return_error):
self.msgid = msgid
self.send_reply = send_reply
self.return_error = return_error
def reply(self, obj):
self.send_reply(self.msgid, obj)
def error(self, exc_info):
log.error("Error raised in delayed method", exc_info=True)
[-=- -=- -=- 407 lines omitted -=- -=- -=-]
self.mgr = mgr
self.__super_init(sock, addr, obj)
self.obj.notifyConnected(self)
def close(self):
    """Close this connection.

    In order: tell the handler object it has been disconnected, ask
    the manager to drop this connection, then run the close method
    saved as __super_close.
    """
    self.obj.notifyDisconnected()
    self.mgr.close_conn(self)
    # NOTE(review): __super_close is presumably the base class's close(),
    # bound in the class body outside this excerpt -- confirm.
    self.__super_close()
class ManagedConnection(Connection):
    """Client-side Connection subclass.

    The ThreadedAsync bookkeeping is deferred to the manager object:
    the connection borrows the manager's trigger instead of owning one.
    """

    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        """Remember the manager, run the base initializer, sync async state."""
        # The manager is stored before the base initializer runs.
        self.mgr = mgr
        self.__super_init(sock, addr, obj)
        self.check_mgr_async()

    # Defer the ThreadedAsync work to the manager.

    def close_trigger(self):
        """Drop this connection's reference to the trigger."""
        # the manager should actually close the trigger
        del self.trigger

    def set_async(self, map):
        """Do nothing; async state is taken from the manager instead."""

    def _prepare_async(self):
        """Do nothing."""
        # Don't do the register_loop_callback that the superclass does

    def check_mgr_async(self):
        """Adopt the manager's async state if it has one and we do not.

        Returns 1 when this call switched the connection to async mode
        (borrowing the manager's trigger), 0 otherwise.
        """
        if self.thr_async or not self.mgr.thr_async:
            return 0
        assert self.mgr.trigger is not None, \
               "manager (%s) has no trigger" % self.mgr
        self.thr_async = 1
        self.trigger = self.mgr.trigger
        return 1

    def is_async(self):
        """Return a true value if the connection is in async mode."""
        # XXX could the check_mgr_async() be avoided on each test?
        if not self.thr_async:
            return self.check_mgr_async()
        return 1

    def close(self):
        """Deregister from the manager, then close via the base class."""
        self.mgr.close_conn(self)
        self.__super_close()
=== Added File Zope3/src/zodb/zeo/zrpc/error.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from zodb import POSException
from zodb.zeo.exceptions import Disconnected
class ZRPCError(POSException.StorageError):
    """Base class for errors raised by the zrpc package."""
    pass
# Inherits from both ZRPCError and Disconnected, so a handler written
# for either base class catches it.
class DisconnectedError(ZRPCError, Disconnected):
    """The database storage is disconnected from the storage server."""
=== Added File Zope3/src/zodb/zeo/zrpc/log.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import types
import threading
import logging
# When true, the logging helpers below append the current thread's name
# to the message label.
LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging

# Label put in front of every message; the process id is looked up once,
# when this module is imported.
_label = "zrpc:%s" % os.getpid()
# The code duplication here is for speed (save a layer of function call).
def critical(msg, *args, **kw):
    """Log msg via logging.critical(), prefixed with the zrpc label.

    The label is _label, extended with the current thread's name when
    LOG_THREAD_ID is true.  args and kw are passed on to logging.
    """
    if LOG_THREAD_ID:
        prefix = "%s:%s" % (_label, threading.currentThread().getName())
    else:
        prefix = _label
    logging.critical("%s: " + msg, prefix, *args, **kw)
def error(msg, *args, **kw):
    """Log msg via logging.error(), prefixed with the zrpc label.

    The label is _label, extended with the current thread's name when
    LOG_THREAD_ID is true.  args and kw are passed on to logging.
    """
    if LOG_THREAD_ID:
        prefix = "%s:%s" % (_label, threading.currentThread().getName())
    else:
        prefix = _label
    logging.error("%s: " + msg, prefix, *args, **kw)
def warn(msg, *args, **kw):
    """Log msg via logging.warn(), prefixed with the zrpc label.

    The label is _label, extended with the current thread's name when
    LOG_THREAD_ID is true.  args and kw are passed on to logging.
    """
    if LOG_THREAD_ID:
        prefix = "%s:%s" % (_label, threading.currentThread().getName())
    else:
        prefix = _label
    logging.warn("%s: " + msg, prefix, *args, **kw)
def info(msg, *args, **kw):
    """Log msg via logging.info(), prefixed with the zrpc label.

    The label is _label, extended with the current thread's name when
    LOG_THREAD_ID is true.  args and kw are passed on to logging.
    """
    if LOG_THREAD_ID:
        prefix = "%s:%s" % (_label, threading.currentThread().getName())
    else:
        prefix = _label
    logging.info("%s: " + msg, prefix, *args, **kw)
def debug(msg, *args, **kw):
    """Log msg via logging.debug(), prefixed with the zrpc label.

    The label is _label, extended with the current thread's name when
    LOG_THREAD_ID is true.  args and kw are passed on to logging.
    """
    if LOG_THREAD_ID:
        prefix = "%s:%s" % (_label, threading.currentThread().getName())
    else:
        prefix = _label
    logging.debug("%s: " + msg, prefix, *args, **kw)
REPR_LIMIT = 40


def short_repr(obj):
    """Return a repr of obj shortened to about REPR_LIMIT characters.

    A string result is at most REPR_LIMIT characters long: when it
    had to be cut it ends in '...' followed by the closing quote.  For
    any other object the repr is cut after REPR_LIMIT characters and
    '...' is appended, so the result can be REPR_LIMIT + 3 long.
    """
    # Some of the objects being repr'd are large strings.  It wastes
    # a lot of memory to repr them and then truncate, so special case
    # them in this function.
    # Also handle short repr of a tuple containing a long string.

    # This strategy works well for arguments to StorageServer methods.
    # The oid is usually first and will get included in its entirety.
    # The pickle is near the beginning, too, and you can often fit the
    # module name in the pickle.

    if isinstance(obj, str):
        # Slicing a short string is a no-op, so one code path serves
        # both short and long strings.
        r = repr(obj[:REPR_LIMIT])
        if len(r) > REPR_LIMIT:
            r = r[:REPR_LIMIT-4] + '...' + r[-1]
        return r
    elif isinstance(obj, tuple):
        elts = []
        size = 0
        for elt in obj:
            if isinstance(elt, str):
                # Only repr the prefix that can end up in the output;
                # a long pickle string is never repr'd in full.  (The
                # quote character is chosen from the prefix alone.)
                r = repr(elt[:REPR_LIMIT])
            else:
                r = repr(elt)
            elts.append(r)
            size += len(r)
            if size > REPR_LIMIT:
                break
        r = "(%s)" % (", ".join(elts))
    else:
        r = repr(obj)
    if len(r) > REPR_LIMIT:
        return r[:REPR_LIMIT] + '...'
    else:
        return r
=== Added File Zope3/src/zodb/zeo/zrpc/marshal.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import cPickle
from cStringIO import StringIO
import types
from zodb.zeo.zrpc.error import ZRPCError
from zodb.zeo.zrpc import log
class Marshaller:
    """Marshal requests and replies to send across the network"""

    def encode(self, msgid, flags, name, args):
        """Returns an encoded message"""
        # A new pickler is created on every call.
        # (We used to have a global pickler, but that's not thread-safe. :-( )
        pickler = cPickle.Pickler()
        # NOTE(review): presumably cPickle's "fast" mode, and dump() on a
        # file-less Pickler returns the pickle string -- confirm against
        # the cPickle documentation.
        pickler.fast = 1
        return pickler.dump((msgid, flags, name, args), 1)

    def decode(self, msg):
        """Decodes msg and returns its parts"""
        unpickler = cPickle.Unpickler(StringIO(msg))
        # Route global lookups through the module-level find_global()
        # helper instead of the unpickler's default lookup.
        unpickler.find_global = find_global

        try:
            return unpickler.load() # msgid, flags, name, args
        except:
            # Log a shortened form of the offending message, then let the
            # original exception propagate.
            log.error("can't decode message: %s", log.short_repr(msg))
            raise
_globals = globals()
_silly = ('__doc__',)


def find_global(module, name):
    """Helper for message unpickler

    Import module and return its attribute name, but only if that
    object is considered safe to hand to the unpickler: either it
    carries a true __no_side_effects__ attribute, or it is an exception
    class (a subclass of Exception).

    Raises ZRPCError if the module cannot be imported, if it has no
    such attribute, or if the object is not considered safe.
    """
    try:
        m = __import__(module, _globals, _globals, _silly)
    except ImportError:
        # Fetch the exception value without version-specific syntax.
        import sys
        msg = sys.exc_info()[1]
        raise ZRPCError("import error %s: %s" % (module, msg))

    try:
        r = getattr(m, name)
    except AttributeError:
        raise ZRPCError("module %s has no global %s" % (module, name))

    if getattr(r, '__no_side_effects__', 0):
        return r

    # type(Exception) is the classic-class type where exceptions are
    # classic classes, and `type` where they are new-style classes.
    # Comparing against types.ClassType only would reject every
    # exception class in the latter case.
    if isinstance(r, type(Exception)) and issubclass(r, Exception):
        return r

    raise ZRPCError("Unsafe global: %s.%s" % (module, name))
=== Added File Zope3/src/zodb/zeo/zrpc/server.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import socket
import types
from zodb.zeo.zrpc.connection import Connection, Delay
from zodb.zeo.zrpc import log
import ThreadedAsync
# Export the main asyncore loop
loop = ThreadedAsync.loop
class Dispatcher(asyncore.dispatcher):
"""A server that accepts incoming RPC connections"""
__super_init = asyncore.dispatcher.__init__
reuse_addr = 1
def __init__(self, addr, factory=Connection, reuse_addr=None):
self.__super_init()
self.addr = addr
self.factory = factory
self.clients = []
if reuse_addr is not None:
self.reuse_addr = reuse_addr
self._open_socket()
def _open_socket(self):
if type(self.addr) == types.TupleType:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
log.info("listening on %s", str(self.addr))
self.bind(self.addr)
self.listen(5)
def writable(self):
return 0
def readable(self):
return 1
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error, msg:
log.info("accepted failed: %s", msg)
return
c = self.factory(sock, addr)
log.info("connect from %s: %s", repr(addr), c)
self.clients.append(c)
=== Added File Zope3/src/zodb/zeo/zrpc/smac.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Sized Message Async Connections."""
import asyncore, struct
import threading
import socket, errno
from types import StringType
from zodb.zeo.exceptions import Disconnected
from zodb.zeo.zrpc import log
# The errno codes are collected as dictionary keys so that each value
# appears only once in the resulting tuple.  We expect that
# EWOULDBLOCK == EAGAIN on most systems -- or that only one is
# actually used.

tmp_dict = {}
for _errno_code in (errno.EWOULDBLOCK, errno.EAGAIN, errno.EINTR):
    tmp_dict[_errno_code] = 0
expected_socket_read_errors = tuple(tmp_dict.keys())

tmp_dict = {}
for _errno_code in (errno.EAGAIN, errno.EWOULDBLOCK, errno.ENOBUFS,
                    errno.EINTR):
    tmp_dict[_errno_code] = 0
expected_socket_write_errors = tuple(tmp_dict.keys())

# Remove the scratch names from the module namespace.
del tmp_dict, _errno_code

# We chose 60000 as the socket limit by looking at the largest strings
# that we could pass to send() without blocking.
SEND_SIZE = 60000
class SizedMessageAsyncConnection(asyncore.dispatcher):
__super_init = asyncore.dispatcher.__init__
__super_close = asyncore.dispatcher.close
__closed = 1 # Marker indicating that we're closed
socket = None # to outwit Sam's getattr
def __init__(self, sock, addr, map=None, debug=None):
self.addr = addr
if debug is not None:
self._debug = debug
elif not hasattr(self, '_debug'):
self._debug = __debug__
# __input_lock protects __inp, __input_len, __state, __msg_size
self.__input_lock = threading.Lock()
self.__inp = None # None, a single String, or a list
self.__input_len = 0
# Instance variables __state and __msg_size work together:
# when __state == 0:
# __msg_size == 4, and the next thing read is a message size;
# when __state == 1:
# __msg_size is variable, and the next thing read is a message.
# The next thing read is always of length __msg_size.
# The state alternates between 0 and 1.
self.__state = 0
self.__msg_size = 4
self.__output_lock = threading.Lock() # Protects __output
self.__output = []
self.__closed = 0
self.__super_init(sock, map)
def get_addr(self):
return self.addr
# XXX avoid expensive getattr calls? Can't remember exactly what
# this comment was supposed to mean, but it has something to do
# with the way asyncore uses getattr and uses if sock:
def __nonzero__(self):
return 1
def handle_read(self):
self.__input_lock.acquire()
try:
# Use a single __inp buffer and integer indexes to make this fast.
try:
d = self.recv(8192)
except socket.error, err:
if err[0] in expected_socket_read_errors:
return
raise
if not d:
return
input_len = self.__input_len + len(d)
msg_size = self.__msg_size
state = self.__state
inp = self.__inp
if msg_size > input_len:
if inp is None:
self.__inp = d
elif type(self.__inp) is StringType:
self.__inp = [self.__inp, d]
else:
self.__inp.append(d)
self.__input_len = input_len
return # keep waiting for more input
# load all previous input and d into single string inp
if isinstance(inp, StringType):
inp = inp + d
elif inp is None:
inp = d
else:
inp.append(d)
inp = "".join(inp)
offset = 0
while (offset + msg_size) <= input_len:
msg = inp[offset:offset + msg_size]
offset = offset + msg_size
if not state:
# waiting for message
msg_size = struct.unpack(">i", msg)[0]
state = 1
else:
msg_size = 4
state = 0
# XXX We call message_input() with __input_lock
# held!!! And message_input() may end up calling
# message_output(), which has its own lock. But
# message_output() cannot call message_input(), so
# the locking order is always consistent, which
# prevents deadlock. Also, message_input() may
# take a long time, because it can cause an
# incoming call to be handled. During all this
# time, the __input_lock is held. That's a good
# thing, because it serializes incoming calls.
self.message_input(msg)
self.__state = state
self.__msg_size = msg_size
self.__inp = inp[offset:]
self.__input_len = input_len - offset
finally:
self.__input_lock.release()
def readable(self):
return 1
def writable(self):
if len(self.__output) == 0:
return 0
else:
return 1
def handle_write(self):
self.__output_lock.acquire()
try:
output = self.__output
while output:
# Accumulate output into a single string so that we avoid
# multiple send() calls, but avoid accumulating too much
# data. If we send a very small string and have more data
# to send, we will likely incur delays caused by the
# unfortunate interaction between the Nagle algorithm and
# delayed acks. If we send a very large string, only a
# portion of it will actually be delivered at a time.
l = 0
for i in range(len(output)):
l += len(output[i])
if l > SEND_SIZE:
break
i += 1
# It is very unlikely that i will be 1.
v = "".join(output[:i])
del output[:i]
try:
n = self.send(v)
except socket.error, err:
if err[0] in expected_socket_write_errors:
break # we couldn't write anything
raise
if n < len(v):
output.insert(0, v[n:])
break # we can't write any more
finally:
self.__output_lock.release()
def handle_close(self):
self.close()
def message_output(self, message):
if __debug__:
if self._debug:
log.debug('message_output %d bytes: %s',
len(message), log.short_repr(message))
if self.__closed:
raise Disconnected("Action is temporarily unavailable")
self.__output_lock.acquire()
try:
# do two separate appends to avoid copying the message string
self.__output.append(struct.pack(">i", len(message)))
if len(message) <= SEND_SIZE:
self.__output.append(message)
else:
for i in range(0, len(message), SEND_SIZE):
self.__output.append(message[i:i+SEND_SIZE])
finally:
self.__output_lock.release()
def close(self):
if not self.__closed:
self.__closed = 1
self.__super_close()
=== Added File Zope3/src/zodb/zeo/zrpc/trigger.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import os
import socket
import sys
import thread
import traceback
if os.name == 'posix':

    class trigger(asyncore.file_dispatcher):

        "Wake up a call to select() running in the main thread"

        # This is useful in a context where you are using Medusa's I/O
        # subsystem to deliver data, but the data is generated by another
        # thread.  Normally, if Medusa is in the middle of a call to
        # select(), new output data generated by another thread will have
        # to sit until the call to select() either times out or returns.
        # If the trigger is 'pulled' by another thread, it should immediately
        # generate a READ event on the trigger object, which will force the
        # select() invocation to return.

        # A common use for this facility: letting Medusa manage I/O for a
        # large number of connections; but routing each request through a
        # thread chosen from a fixed-size thread pool.  When a thread is
        # acquired, a transaction is performed, but output data is
        # accumulated into buffers that will be emptied more efficiently
        # by Medusa. [picture a server that can process database queries
        # rapidly, but doesn't want to tie up threads waiting to send data
        # to low-bandwidth connections]

        # The other major feature provided by this class is the ability to
        # move work back into the main thread: if you call pull_trigger()
        # with a thunk argument, when select() wakes up and receives the
        # event it will call your thunk from within that thread.  The main
        # purpose of this is to remove the need to wrap thread locks around
        # Medusa's data structures, which normally do not need them.  [To see
        # why this is true, imagine this scenario: A thread tries to push some
        # new data onto a channel's outgoing data queue at the same time that
        # the main thread is trying to remove some]

        def __init__(self):
            r, w = self._fds = os.pipe()
            self.trigger = w
            asyncore.file_dispatcher.__init__(self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._closed = 0

        # Override the asyncore close() method, because it seems that
        # it would only close the r file descriptor and not w.  The
        # constructor calls file_dispatcher.__init__ and passes r,
        # which would get stored in a file_wrapper and get closed by
        # the default close.  But that would leave w open...

        def close(self):
            if not self._closed:
                self._closed = 1
                self.del_channel()
                for fd in self._fds:
                    os.close(fd)
                self._fds = []

        def __repr__(self):
            return '<select-trigger (pipe) at %x>' % id(self)

        def readable(self):
            return 1

        def writable(self):
            return 0

        def handle_connect(self):
            pass

        def handle_close(self):
            self.close()

        def pull_trigger(self, thunk=None):
            """Wake up select(); queue thunk, if given, to be called then."""
            if thunk:
                self.lock.acquire()
                try:
                    self.thunks.append(thunk)
                finally:
                    self.lock.release()
            os.write(self.trigger, 'x')

        def handle_read(self):
            """Drain the pipe and call every queued thunk."""
            try:
                self.recv(8192)
            except socket.error:
                return
            self.lock.acquire()
            try:
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        # A failing thunk must not stop the others; report
                        # the traceback and carry on.
                        L = traceback.format_exception(*sys.exc_info())
                        print('exception in trigger thunk:\n%s' % "".join(L))
                self.thunks = []
            finally:
                self.lock.release()

else:

    # XXX Should define a base class that has the common methods and
    # then put the platform-specific in a subclass named trigger.

    # win32-safe version

    HOST = '127.0.0.1'
    MINPORT = 19950
    NPORTS = 50

    class trigger(asyncore.dispatcher):

        "Wake up a call to select() using a loopback socket pair"

        portoffset = 0

        def __init__(self):
            self._closed = 0
            a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            # set TCP_NODELAY to true to avoid buffering
            w.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            # tricky: get a pair of connected sockets
            for i in range(NPORTS):
                trigger.portoffset = (trigger.portoffset + 1) % NPORTS
                port = MINPORT + trigger.portoffset
                address = (HOST, port)
                try:
                    a.bind(address)
                except socket.error:
                    continue
                else:
                    break
            else:
                # Don't leak the two sockets when no port could be bound.
                a.close()
                w.close()
                raise RuntimeError('Cannot bind trigger!')

            a.listen(1)
            w.setblocking(0)
            try:
                w.connect(address)
            except socket.error:
                # A non-blocking connect normally reports that it is
                # still in progress; accept() below completes the pair.
                pass
            r, addr = a.accept()
            a.close()
            w.setblocking(1)
            self.trigger = w

            asyncore.dispatcher.__init__(self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._trigger_connected = 0

        # The default close() only closes the read socket handed to
        # asyncore.dispatcher.__init__; the write socket kept in
        # self.trigger has to be closed here as well.

        def close(self):
            if not self._closed:
                self._closed = 1
                asyncore.dispatcher.close(self)
                self.trigger.close()

        def __repr__(self):
            return '<select-trigger (loopback) at %x>' % id(self)

        def readable(self):
            return 1

        def writable(self):
            return 0

        def handle_connect(self):
            pass

        def handle_close(self):
            self.close()

        def pull_trigger(self, thunk=None):
            """Wake up select(); queue thunk, if given, to be called then."""
            if thunk:
                self.lock.acquire()
                try:
                    self.thunks.append(thunk)
                finally:
                    self.lock.release()
            self.trigger.send('x')

        def handle_read(self):
            """Drain the socket and call every queued thunk."""
            try:
                self.recv(8192)
            except socket.error:
                return
            self.lock.acquire()
            try:
                for thunk in self.thunks:
                    try:
                        thunk()
                    except:
                        # A failing thunk must not stop the others; report
                        # the traceback and carry on.
                        L = traceback.format_exception(*sys.exc_info())
                        print('exception in trigger thunk:\n%s' % "".join(L))
                self.thunks = []
            finally:
                self.lock.release()