[Zodb-checkins] CVS: StandaloneZODB/ZEO/zrpc - NOTES:1.1.2.1 __init__.py:1.1.2.1 client.py:1.1.2.1 connection.py:1.1.2.1 error.py:1.1.2.1 log.py:1.1.2.1 marshal.py:1.1.2.1 server.py:1.1.2.1 trigger.py:1.1.2.1
Jeremy Hylton
jeremy@zope.com
Wed, 16 Jan 2002 09:52:39 -0500
Update of /cvs-repository/StandaloneZODB/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv22529/zrpc
Added Files:
Tag: Standby-branch
NOTES __init__.py client.py connection.py error.py log.py
marshal.py server.py trigger.py
Log Message:
Convert zrpc from module zrpc2 to zrpc package.
=== Added File StandaloneZODB/ZEO/zrpc/NOTES ===
The Connection object should be extended to support more flexible
handling for outstanding calls. In particular, it should be possible
to have multiple calls with return values outstanding.
The mechanism described here is based on the promises mechanism in
Argus, which was influenced by futures in Multilisp.
Promises: Linguistic Support for Efficient Asynchronous Procedure
Calls in Distributed Systems. Barbara Liskov and Liuba Shrira.
Proc. of Conf. on Programming Language Design and Implementation
(PLDI), June 1988.
We want to support two different kinds of calls:
- send : invoke a method that returns no value
- call : invoke a method that returns a value
On the client, a call immediately returns a promise. A promise is an
object that can be used to claim the return value when it becomes
available.
- ready(): returns true if the return value is ready or an exception
occurred
- claim(): returns the call's return value or raises an exception,
blocking if necessary
The server side of a zrpc connection can be implemented using
asyncore. In that case, a method call blocks other RPC activity until
it returns. If a call needs to return a value, but can't return
immediately, it returns a delay object (ZEO.zrpc.server.Delay).
When the zrpc connection receives a Delay object, it does not
immediately return to the caller. Instead, it returns when the
reply() method is called. A Delay has two methods:
- set_sender()
- reply(obj): returns obj to the sender
-----------------------------------------
Open issues:
Delayed exception
There is currently no mechanism to raise an exception from a delayed
pcall.
Synchronization
The following item is part of Argus, but the motivation isn't entirely
clear.
For any two calls, C1 and C2, C1 always starts on the server
first. For the promises, C2 is ready() iff C1 is also ready().
The promises can be claimed in any order.
A related notion:
The connection should also support a synch() method that returns
only when all outstanding calls have completed. If any of these
calls raised an exception, the synch() call raises an exception.
XXX synch() sounds potentially useful, but it's not clear if it would
be useful for ZEO. In ZEO a single connection object handles multiple
threads, each thread is going to make independent calls. When a
particular tpc_begin() returns and a thread commits its transaction,
it makes more calls. These calls will before any of the other
tpc_begin() calls.
I think the Argus approach would be to use separate handlers for each
thread (not sure Argus had threads), so that a single thread could
rely on ordering guarantees.
Multithreaded server
There are lots of issues to work out here.
Delays may not be necessary if the connecftion handler runs in a
different thread than the object the handles the calls.
=== Added File StandaloneZODB/ZEO/zrpc/__init__.py ===
# zrpc is a package with the following modules
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
# connection -- object dispatcher
# client -- manages connection creation to remote server
# server -- manages incoming connections from remote clients
# trigger -- medusa's trigger
=== Added File StandaloneZODB/ZEO/zrpc/client.py ===
import errno
import select
import socket
import sys
import threading
import time
import types
import ThreadedAsync
import zLOG
from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection
class ConnectionManager:
"""Keeps a connection up over time"""
def __init__(self, addr, client, tmin=1, tmax=180):
self.set_addr(addr)
self.client = client
self.tmin = tmin
self.tmax = tmax
self.connected = 0
self.connection = None
# If _thread is not None, then there is a helper thread
# attempting to connect. _thread is protected by _connect_lock.
self._thread = None
self._connect_lock = threading.Lock()
self.trigger = None
self.thr_async = 0
self.closed = 0
ThreadedAsync.register_loop_callback(self.set_async)
def __repr__(self):
return "<%s for %s>" % (self.__class__.__name__, self.addr)
def set_addr(self, addr):
"Set one or more addresses to use for server."
# For backwards compatibility (and simplicity?) the
# constructor accepts a single address in the addr argument --
# a string for a Unix domain socket or a 2-tuple with a
# hostname and port. It can also accept a list of such addresses.
addr_type = self._guess_type(addr)
if addr_type is not None:
self.addr = [(addr_type, addr)]
else:
self.addr = []
for a in addr:
addr_type = self._guess_type(a)
if addr_type is None:
raise ValueError, "unknown address in list: %s" % repr(a)
self.addr.append((addr_type, a))
def _guess_type(self, addr):
if isinstance(addr, types.StringType):
return socket.AF_UNIX
if (len(addr) == 2
and isinstance(addr[0], types.StringType)
and isinstance(addr[1], types.IntType)):
return socket.AF_INET
# not anything I know about
return None
def close(self):
"""Prevent ConnectionManager from opening new connections"""
self.closed = 1
self._connect_lock.acquire()
try:
if self._thread is not None:
# XXX race on _thread
self._thread.stop()
self._thread.join()
finally:
self._connect_lock.release()
if self.connection:
self.connection.close()
# XXX get rid of this?
def register_client(self, client):
self.client = client
def set_async(self, map):
# XXX need each connection started with async==0 to have a callback
self.trigger = trigger()
self.thr_async = 1 # XXX needs to be set on the Connection
def attempt_connect(self):
# XXX will a single attempt take too long?
self.connect()
try:
event = self._thread.one_attempt
except AttributeError:
pass
else:
event.wait()
return self.connected
def connect(self, sync=0):
if self.connected == 1:
return
self._connect_lock.acquire()
try:
if self._thread is None:
log("starting thread to connect to server")
self._thread = ConnectThread(self, self.client, self.addr,
self.tmin, self.tmax)
self._thread.start()
if sync:
try:
self._thread.join()
except AttributeError:
# probably means the thread exited quickly
pass
finally:
self._connect_lock.release()
def connect_done(self, c):
log("connect_done()")
self.connected = 1
self.connection = c
self._thread = None
def notify_closed(self, conn):
self.connected = 0
self.connection = None
self.client.notifyDisconnected()
if not self.closed:
self.connect()
class Connected(Exception):
# helper for non-local exit
def __init__(self, sock):
self.sock = sock
class ConnectThread(threading.Thread):
__super_init = threading.Thread.__init__
def __init__(self, mgr, client, addr, tmin, tmax):
self.__super_init(name="Connect(%s)" % addr)
self.mgr = mgr
self.client = client
self.addr = addr
self.tmin = tmin
self.tmax = tmax
self.stopped = 0
self.one_attempt = threading.Event()
def stop(self):
self.stopped = 1
def run(self):
delay = self.tmin
while not (self.stopped or self.attempt_connects()):
if not self.one_attempt.isSet():
self.one_attempt.set()
time.sleep(delay)
delay *= 2
if delay > self.tmax:
delay = self.tmax
log("thread exiting: %s" % self.getName())
def attempt_connects(self):
"Return true if any connect attempt succeeds."
sockets = {}
log("attempting connection on %d sockets" % len(self.addr))
try:
for domain, addr in self.addr:
if __debug__:
log("attempt connection to %s" % repr(addr),
level=zLOG.DEBUG)
s = socket.socket(domain, socket.SOCK_STREAM)
s.setblocking(0)
# XXX can still block for a while if addr requires DNS
e = self.connect(s, addr)
if e is not None:
sockets[s] = addr
# next wait until they actually connect
while sockets:
if self.stopped:
for s in sockets.keys():
s.close()
return 0
try:
r, w, x = select.select([], sockets.keys(), [], 1.0)
except select.error:
continue
for s in w:
e = self.connect(s, sockets[s])
if e is None:
del sockets[s]
except Connected, container:
s = container.sock
del sockets[s]
# close all the other sockets
for s in sockets.keys():
s.close()
return 1
return 0
def connect(self, s, addr):
"""Call s.connect_ex(addr) and return true if loop should continue.
We have to handle several possible return values from
connect_ex(). If the socket is connected and the initial ZEO
setup works, we're done. Report success by raising an
exception. Yes, the is odd, but we need to bail out of the
select() loop in the caller and an exception is a principled
way to do the abort.
If the socket sonnects and the initial ZEO setup fails or the
connect_ex() returns an error, we close the socket and ignore it.
If connect_ex() returns EINPROGRESS, we need to try again later.
"""
e = s.connect_ex(addr)
if e == errno.EINPROGRESS:
return 1
elif e == 0:
c = self.test_connection(s, addr)
log("connected to %s" % repr(addr), level=zLOG.DEBUG)
if c:
raise Connected(s)
else:
if __debug__:
log("error connecting to %s: %s" % (addr, errno.errorcode[e]),
level=zLOG.DEBUG)
s.close()
def test_connection(self, s, addr):
c = ManagedConnection(s, addr, self.client, self.mgr)
try:
self.client.notifyConnected(c)
except:
log("error connecting to server: %s" % str(addr),
level=zLOG.ERROR, error=sys.exc_info())
c.close()
return 0
self.mgr.connect_done(c)
return 1
=== Added File StandaloneZODB/ZEO/zrpc/connection.py ===
import asyncore
import sys
import threading
import types
import ThreadedAsync
from ZEO import smac # XXX put smac in zrpc?
from ZEO.zrpc.error import ZRPCError, DisconnectedError, DecodingError
from ZEO.zrpc.log import log
from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.trigger import trigger
import zLOG
from ZODB import POSException
REPLY = ".reply" # message name used for replies
ASYNC = 1
# XXX get rid of this class and use hasattr()
class Handler:
"""Base class used to handle RPC caller discovery"""
def set_caller(self, addr):
self.__caller = addr
def get_caller(self):
return self.__caller
def clear_caller(self):
self.__caller = None
class Delay:
"""Used to delay response to client for synchronous calls
When a synchronous call is made and the original handler returns
without handling the call, it returns a Delay object that prevents
the mainloop from sending a response.
"""
def set_sender(self, msgid, send_reply):
self.msgid = msgid
self.send_reply = send_reply
def reply(self, obj):
self.send_reply(self.msgid, obj)
class Connection(smac.SizedMessageAsyncConnection):
"""Dispatcher for RPC on object
The connection supports synchronous calls, which expect a return,
and asynchronous calls that do not.
It uses the Marshaller class to handle encoding and decoding of
method calls are arguments.
A Connection is designed for use in a multithreaded application,
where a synchronous call must block until a response is ready.
The current design only allows a single synchronous call to be
outstanding.
"""
__super_init = smac.SizedMessageAsyncConnection.__init__
__super_close = smac.SizedMessageAsyncConnection.close
__super_writable = smac.SizedMessageAsyncConnection.writable
def __init__(self, sock, addr, obj=None):
self.obj = obj
self.marshal = Marshaller()
self.closed = 0
self.msgid = 0
self.__super_init(sock, addr)
# A Connection either uses asyncore directly or relies on an
# asyncore mainloop running in a separate thread. If
# thr_async is true, then the mainloop is running in a
# separate thread. If thr_async is true, then the asyncore
# trigger (self.trigger) is used to notify that thread of
# activity on the current thread.
self.thr_async = 0
self.trigger = None
self._prepare_async()
self._map = {self._fileno: self}
self.__call_lock = threading.Lock()
# The reply lock is used to block when a synchronous call is
# waiting for a response
self.__reply_lock = threading.Lock()
self.__reply_lock.acquire()
# If the object implements the Handler interface (XXX checked
# by isinstance), it wants to know who the caller is.
if isinstance(obj, Handler):
self.set_caller = 1
else:
self.set_caller = 0
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.addr)
def close(self):
caller = sys._getframe(1).f_code.co_name
log("close() caller=%s" % caller)
if self.closed:
return
self.closed = 1
self.close_trigger()
self.__super_close()
def close_trigger(self):
if self.trigger is not None:
self.trigger.close()
def register_object(self, obj):
"""Register obj as the true object to invoke methods on"""
self.obj = obj
def message_input(self, message):
"""Decoding an incoming message and dispatch it"""
# XXX Not sure what to do with errors that reach this level.
# Need to catch ZRPCErrors in handle_reply() and
# handle_request() so that they get back to the client.
try:
msgid, flags, name, args = self.marshal.decode(message)
except DecodingError, msg:
return self.return_error(None, None, DecodingError, msg)
if __debug__:
log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
repr(args)[:40]),
level=zLOG.DEBUG)
if name == REPLY:
self.handle_reply(msgid, flags, args)
else:
self.handle_request(msgid, flags, name, args)
def handle_reply(self, msgid, flags, args):
if __debug__:
log("recv reply: %s, %s, %s" % (msgid, flags, str(args)[:40]),
level=zLOG.DEBUG)
self.__reply = msgid, flags, args
self.__reply_lock.release() # will fail if lock is unlocked
def handle_request(self, msgid, flags, name, args):
if __debug__:
log("call %s%s on %s" % (name, repr(args)[:40], repr(self.obj)),
zLOG.DEBUG)
if not self.check_method(name):
raise ZRPCError("Invalid method name: %s on %s" % (name,
`self.obj`))
meth = getattr(self.obj, name)
try:
if self.set_caller:
self.obj.set_caller(self)
try:
ret = meth(*args)
finally:
self.obj.clear_caller()
else:
ret = meth(*args)
except (POSException.UndoError,
POSException.VersionCommitError), msg:
error = sys.exc_info()[:2]
log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
return self.return_error(msgid, flags, error[0], error[1])
except Exception, msg:
error = sys.exc_info()[:2]
log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
return self.return_error(msgid, flags, error[0], error[1])
if flags & ASYNC:
if ret is not None:
log("async method %s returned value %s" % (name, repr(ret)),
zLOG.ERROR)
raise ZRPCError("async method returned value")
else:
if __debug__:
log("%s return %s" % (name, repr(ret)[:40]), zLOG.DEBUG)
if isinstance(ret, Delay):
ret.set_sender(msgid, self.send_reply)
else:
self.send_reply(msgid, ret)
def handle_error(self):
self.log_error()
self.close()
def log_error(self, msg="No error message supplied"):
error = sys.exc_info()
log(msg, zLOG.ERROR, error=error)
del error
def check_method(self, name):
# XXX minimal security check should go here: Is name exported?
return hasattr(self.obj, name)
def send_reply(self, msgid, ret):
msg = self.marshal.encode(msgid, 0, REPLY, ret)
self.message_output(msg)
def return_error(self, msgid, flags, err_type, err_value):
if flags is None:
self.log_error("Exception raised during decoding")
return
if flags & ASYNC:
self.log_error("Asynchronous call raised exception: %s" % self)
return
if type(err_value) is not types.InstanceType:
err_value = err_type, err_value
try:
msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
except self.marshal.errors:
err = ZRPCError("Couldn't pickle error %s" % `err_value`)
msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
self.message_output(msg)
self._do_io()
# The next two methods are used by clients to invoke methods on
# remote objects
# XXX Should revise design to allow multiple outstanding
# synchronous calls
def call(self, method, *args):
self.__call_lock.acquire()
try:
return self._call(method, args)
finally:
self.__call_lock.release()
def _call(self, method, args):
if self.closed:
raise DisconnectedError("This action is temporarily unavailable")
msgid = self.msgid
self.msgid = self.msgid + 1
if __debug__:
log("send msg: %d, 0, %s, ..." % (msgid, method))
self.message_output(self.marshal.encode(msgid, 0, method, args))
self.__reply = None
# lock is currently held
self._do_io(wait=1)
# lock is held again...
r_msgid, r_flags, r_args = self.__reply
self.__reply_lock.acquire()
assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
if type(r_args) == types.TupleType \
and type(r_args[0]) == types.ClassType \
and issubclass(r_args[0], Exception):
raise r_args[1] # error raised by server
return r_args
def callAsync(self, method, *args):
self.__call_lock.acquire()
try:
self._callAsync(method, args)
finally:
self.__call_lock.release()
def _callAsync(self, method, args):
if self.closed:
raise DisconnectedError("This action is temporarily unavailable")
msgid = self.msgid
self.msgid += 1
if __debug__:
log("send msg: %d, %d, %s, ..." % (msgid, ASYNC, method))
self.message_output(self.marshal.encode(msgid, ASYNC, method, args))
self._do_io()
# handle IO, possibly in async mode
def _prepare_async(self):
self.thr_async = 0
ThreadedAsync.register_loop_callback(self.set_async)
# XXX If we are not in async mode, this will cause dead
# Connections to be leaked.
def set_async(self, map):
# XXX do we need a lock around this? I'm not sure there is
# any harm to a race with _do_io().
self.trigger = trigger()
self.thr_async = 1
def is_async(self):
if self.thr_async:
return 1
else:
return 0
def _do_io(self, wait=0): # XXX need better name
# XXX invariant? lock must be held when calling with wait==1
# otherwise, in non-async mode, there will be no poll
if __debug__:
log("_do_io(wait=%d), async=%d" % (wait, self.is_async()),
level=zLOG.DEBUG)
if self.is_async():
self.trigger.pull_trigger()
if wait:
self.__reply_lock.acquire()
# wait until reply...
self.__reply_lock.release()
else:
if wait:
# do loop only if lock is already acquired
while not self.__reply_lock.acquire(0):
asyncore.poll(10.0, self._map)
if self.closed:
raise DisconnectedError()
self.__reply_lock.release()
else:
asyncore.poll(0.0, self._map)
# XXX it seems that we need to release before returning if
# called with wait==1. perhaps the caller need not acquire
# upon return...
class ServerConnection(Connection):
# XXX this is a hack
def _do_io(self, wait=0):
"""If this is a server, there is no explicit IO to do"""
pass
class ManagedServerConnection(ServerConnection):
"""A connection that notifies its ConnectionManager of closing"""
__super_init = Connection.__init__
__super_close = Connection.close
def __init__(self, sock, addr, obj, mgr):
self.__mgr = mgr
self.__super_init(sock, addr, obj)
def close(self):
self.__super_close()
self.__mgr.close(self)
class ManagedConnection(Connection):
"""A connection that notifies its ConnectionManager of closing.
A managed connection also defers the ThreadedAsync work to its
manager.
"""
__super_init = Connection.__init__
__super_close = Connection.close
def __init__(self, sock, addr, obj, mgr):
self.__mgr = mgr
self.__super_init(sock, addr, obj)
self.check_mgr_async()
def close_trigger(self):
# the manager should actually close the trigger
del self.trigger
def _prepare_async(self):
# Don't do the register_loop_callback that the superclass does
pass
def check_mgr_async(self):
if not self.thr_async and self.__mgr.thr_async:
assert self.__mgr.trigger is not None, \
"manager (%s) has no trigger" % self.__mgr
self.thr_async = 1
self.trigger = self.__mgr.trigger
return 1
return 0
def is_async(self):
if self.thr_async:
return 1
return self.check_mgr_async()
def close(self):
self.__super_close()
self.__mgr.notify_closed(self)
=== Added File StandaloneZODB/ZEO/zrpc/error.py ===
from ZODB import POSException
from ZEO.Exceptions import Disconnected
class ZRPCError(POSException.StorageError):
pass
class DecodingError(ZRPCError):
"""A ZRPC message could not be decoded."""
class DisconnectedError(ZRPCError, Disconnected):
"""The database storage is disconnected from the storage server."""
=== Added File StandaloneZODB/ZEO/zrpc/log.py ===
import os
import zLOG
_label = "zrpc:%s" % os.getpid()
def new_label():
global _label
_label = "zrpc:%s" % os.getpid()
def log(message, level=zLOG.BLATHER, label=None, error=None):
zLOG.LOG(label or _label, level, message, error=error)
=== Added File StandaloneZODB/ZEO/zrpc/marshal.py ===
import cPickle
from cStringIO import StringIO
import types
class Marshaller:
"""Marshal requests and replies to second across network"""
# It's okay to share a single Pickler as long as it's in fast
# mode, which means that it doesn't have a memo.
pickler = cPickle.Pickler()
pickler.fast = 1
pickle = pickler.dump
errors = (cPickle.UnpickleableError,
cPickle.UnpicklingError,
cPickle.PickleError,
cPickle.PicklingError)
def encode(self, msgid, flags, name, args):
"""Returns an encoded message"""
return self.pickle((msgid, flags, name, args), 1)
def decode(self, msg):
"""Decodes msg and returns its parts"""
unpickler = cPickle.Unpickler(StringIO(msg))
unpickler.find_global = find_global
try:
return unpickler.load() # msgid, flags, name, args
except (cPickle.UnpicklingError, IndexError), err_msg:
log("can't decode %s" % repr(msg), level=zLOG.ERROR)
raise DecodingError(msg)
_globals = globals()
_silly = ('__doc__',)
def find_global(module, name):
"""Helper for message unpickler"""
try:
m = __import__(module, _globals, _globals, _silly)
except ImportError, msg:
raise ZRPCError("import error %s: %s" % (module, msg))
try:
r = getattr(m, name)
except AttributeError:
raise ZRPCError("module %s has no global %s" % (module, name))
safe = getattr(r, '__no_side_effects__', 0)
if safe:
return r
# XXX what's a better way to do this? esp w/ 2.1 & 2.2
if type(r) == types.ClassType and issubclass(r, Exception):
return r
raise ZRPCError("Unsafe global: %s.%s" % (module, name))
=== Added File StandaloneZODB/ZEO/zrpc/server.py ===
import asyncore
import socket
import types
from ZEO.zrpc.connection import Connection, Delay
from ZEO.zrpc.log import log
# Export the main asyncore loop
loop = asyncore.loop
class Dispatcher(asyncore.dispatcher):
"""A server that accepts incoming RPC connections"""
__super_init = asyncore.dispatcher.__init__
reuse_addr = 1
def __init__(self, addr, factory=Connection, reuse_addr=None):
self.__super_init()
self.addr = addr
self.factory = factory
self.clients = []
if reuse_addr is not None:
self.reuse_addr = reuse_addr
self._open_socket()
def _open_socket(self):
if type(self.addr) == types.TupleType:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
log("listening on %s" % str(self.addr))
self.bind(self.addr)
self.listen(5)
def writable(self):
return 0
def readable(self):
return 1
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error, msg:
log("accepted failed: %s" % msg)
return
c = self.factory(sock, addr)
log("connect from %s: %s" % (repr(addr), c))
self.clients.append(c)
=== Added File StandaloneZODB/ZEO/zrpc/trigger.py ===
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
# This module is a simplified version of the select_trigger module
# from Sam Rushing's Medusa server.
import asyncore
#import asynchat
import os
import socket
import string
import thread
if os.name == 'posix':
class trigger (asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def __init__ (self):
r, w = os.pipe()
self.trigger = w
asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self)
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
# print 'PULL_TRIGGER: ', len(self.thunks)
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
os.write (self.trigger, 'x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
try:
thunk()
except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
self.thunks = []
finally:
self.lock.release()
else:
# win32-safe version
class trigger (asyncore.dispatcher):
address = ('127.9.9.9', 19999)
def __init__ (self):
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
# tricky: get a pair of connected sockets
host='127.0.0.1'
port=19999
while 1:
try:
self.address=(host, port)
a.bind(self.address)
break
except:
if port <= 19950:
raise 'Bind Error', 'Cannot bind trigger!'
port=port - 1
a.listen (1)
w.setblocking (0)
try:
w.connect (self.address)
except:
pass
r, addr = a.accept()
a.close()
w.setblocking (1)
self.trigger = w
asyncore.dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
def __repr__ (self):
return '<select-trigger (loopback) at %x>' % id(self)
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
self.trigger.send ('x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
try:
thunk()
except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
self.thunks = []
finally:
self.lock.release()