[Zope-Checkins] CVS: Zope/lib/python/ZEO/zrpc - __init__.py:1.4.2.1 client.py:1.20.2.1 connection.py:1.38.4.1 error.py:1.4.4.1 log.py:1.6.4.1 marshal.py:1.11.4.1 server.py:1.5.8.1 smac.py:1.35.2.1 trigger.py:1.8.4.1
Chris McDonough
chrism@zope.com
Tue, 8 Oct 2002 20:41:43 -0400
Update of /cvs-repository/Zope/lib/python/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv15249/ZEO/zrpc
Added Files:
Tag: chrism-install-branch
__init__.py client.py connection.py error.py log.py marshal.py
server.py smac.py trigger.py
Log Message:
Committing ZEO to chrism-install-branch.
=== Added File Zope/lib/python/ZEO/zrpc/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# zrpc is a package with the following modules
# client -- manages connection creation to remote server
# connection -- object dispatcher
# log -- logging helper
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
# server -- manages incoming connections from remote clients
# smac -- sized message async connections
# trigger -- medusa's trigger
# zrpc is not an advertised subpackage of ZEO; its interfaces are internal
=== Added File Zope/lib/python/ZEO/zrpc/client.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import errno
import select
import socket
import sys
import threading
import time
import types
import ThreadedAsync
import zLOG
from ZODB.POSException import ReadOnlyError
from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection
class ConnectionManager:
    """Keeps a connection up over time.

    The manager owns at most one current connection (self.connection)
    and at most one helper ConnectThread (self.thread); both attributes
    are protected by self.cond.  The client object is told about
    disconnects via notifyDisconnected().
    """

    def __init__(self, addrs, client, tmin=1, tmax=180):
        """Store the configuration and register with ThreadedAsync.

        addrs -- a single address or a list of addresses (see
                 _parse_addrs())
        client -- object notified about connection state changes
        tmin, tmax -- initial and maximum retry delay (seconds) handed
                 to the ConnectThread
        """
        self.addrlist = self._parse_addrs(addrs)
        self.client = client
        self.tmin = tmin
        self.tmax = tmax
        self.cond = threading.Condition(threading.Lock())
        self.connection = None # Protected by self.cond
        self.closed = 0
        # If thread is not None, then there is a helper thread
        # attempting to connect.
        self.thread = None # Protected by self.cond
        self.trigger = None
        self.thr_async = 0
        ThreadedAsync.register_loop_callback(self.set_async)

    def __repr__(self):
        return "<%s for %s>" % (self.__class__.__name__, self.addrlist)

    def _parse_addrs(self, addrs):
        """Return a list of (addr_type, addr) pairs.

        For backwards compatibility (and simplicity?) the constructor
        accepts a single address in the addrs argument -- a string for
        a Unix domain socket or a 2-tuple with a hostname and port.
        It can also accept a list of such addresses.

        Raises ValueError if an element of the list is not a
        recognizable address.
        """
        addr_type = self._guess_type(addrs)
        if addr_type is not None:
            return [(addr_type, addrs)]
        addrlist = []
        for addr in addrs:
            addr_type = self._guess_type(addr)
            if addr_type is None:
                raise ValueError(
                    "unknown address in list: %s" % repr(addr))
            addrlist.append((addr_type, addr))
        return addrlist

    def _guess_type(self, addr):
        """Return the socket family for addr, or None if unrecognized."""
        if isinstance(addr, types.StringType):
            return socket.AF_UNIX
        try:
            is_pair = len(addr) == 2
        except TypeError:
            # Objects without a length (numbers, None, ...) are not
            # addresses; report "unknown" so that _parse_addrs() can
            # raise its ValueError instead of leaking a TypeError.
            return None
        if (is_pair
            and isinstance(addr[0], types.StringType)
            and isinstance(addr[1], types.IntType)):
            return socket.AF_INET
        # not anything I know about
        return None

    def close(self):
        """Prevent ConnectionManager from opening new connections"""
        self.closed = 1
        self.cond.acquire()
        try:
            t = self.thread
            self.thread = None
            conn = self.connection
        finally:
            self.cond.release()
        if t is not None:
            log("CM.close(): stopping and joining thread")
            t.stop()
            t.join(30)
            if t.isAlive():
                log("CM.close(): self.thread.join() timed out",
                    level=zLOG.WARNING)
        if conn is not None:
            # This will call close_conn() below which clears self.connection
            conn.close()
        if self.trigger is not None:
            self.trigger.close()
            self.trigger = None

    def set_async(self, map):
        """Callback registered with ThreadedAsync.

        The callback might be called multiple times, so it shouldn't
        create a trigger every time and should never do anything
        after it's closed.

        It may be that the only case where it is called multiple
        times is in the test suite, where ThreadedAsync's loop can
        be started in a child process after a fork.  Regardless,
        it's good to be defensive.
        """
        # XXX need each connection started with async==0 to have a
        # callback
        log("CM.set_async(%s)" % repr(map))
        if not self.closed and self.trigger is None:
            log("CM.set_async(): first call")
            self.trigger = trigger()
            self.thr_async = 1 # XXX needs to be set on the Connection

    def attempt_connect(self):
        """Attempt a connection to the server without blocking too long.

        There isn't a crisp definition for too long.  When a
        ClientStorage is created, it attempts to connect to the
        server.  If the server isn't immediately available, it can
        operate from the cache.  This method will start the background
        connection thread and wait a little while to see if it
        finishes quickly.

        Returns true if a connection exists when the method returns.
        """
        # XXX Will a single attempt take too long?
        # XXX Answer: it depends -- normally, you'll connect or get a
        # connection refused error very quickly.  Packet-eating
        # firewalls and other mishaps may cause the connect to take a
        # long time to time out though.  It's also possible that you
        # connect quickly to a slow server, and the attempt includes
        # at least one roundtrip to the server (the register() call).
        # But that's as fast as you can expect it to be.
        self.connect()
        self.cond.acquire()
        try:
            t = self.thread
            conn = self.connection
        finally:
            self.cond.release()
        if t is not None and conn is None:
            # Wait for the thread's first try_connecting() pass to finish
            event = t.one_attempt
            event.wait()
            self.cond.acquire()
            try:
                conn = self.connection
            finally:
                self.cond.release()
        return conn is not None

    def connect(self, sync=0):
        """Start a ConnectThread unless connected or already connecting.

        If sync is true, block until a connection has been handed over
        by connect_done().
        """
        self.cond.acquire()
        try:
            if self.connection is not None:
                return
            t = self.thread
            if t is None:
                log("CM.connect(): starting ConnectThread")
                self.thread = t = ConnectThread(self, self.client,
                                                self.addrlist,
                                                self.tmin, self.tmax)
                t.start()
            if sync:
                while self.connection is None:
                    self.cond.wait(30)
                    if self.connection is None:
                        log("CM.connect(sync=1): still waiting...")
                # Checked while the lock is still held: once it is
                # released, close_conn() may clear self.connection again.
                assert self.connection is not None
        finally:
            self.cond.release()

    def connect_done(self, conn, preferred):
        """Record conn as the current connection.

        Called by ConnectWrapper.notify_client() after notifying the
        client.  For a preferred connection the helper thread reference
        is dropped as well.
        """
        log("CM.connect_done(preferred=%s)" % preferred)
        self.cond.acquire()
        try:
            self.connection = conn
            if preferred:
                self.thread = None
            self.cond.notifyAll() # Wake up connect(sync=1)
        finally:
            self.cond.release()

    def close_conn(self, conn):
        """Called by the connection when it is closed.

        If conn is the current connection, forget it, notify the client
        and (unless the manager is closed) start reconnecting.
        """
        self.cond.acquire()
        try:
            if conn is not self.connection:
                # Closing a non-current connection
                log("CM.close_conn() non-current", level=zLOG.BLATHER)
                return
            log("CM.close_conn()")
            self.connection = None
        finally:
            self.cond.release()
        self.client.notifyDisconnected()
        if not self.closed:
            self.connect()

    def is_connected(self):
        """Return true if there currently is a connection."""
        self.cond.acquire()
        try:
            return self.connection is not None
        finally:
            self.cond.release()
# When trying to do a connect on a non-blocking socket, some outcomes
# are expected. Set _CONNECT_IN_PROGRESS to the errno value(s) expected
# when an initial connect can't complete immediately. Set _CONNECT_OK
# to the errno value(s) expected if the connect succeeds *or* if it's
# already connected (our code can attempt redundant connects).
if not hasattr(errno, "WSAEWOULDBLOCK"):
    # Unix
    _CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
    _CONNECT_OK = (0, errno.EISCONN)
else:
    # Windows
    # XXX The official Winsock docs claim that WSAEALREADY should be
    # treated as yet another "in progress" indicator, but we've never
    # seen this.
    _CONNECT_IN_PROGRESS = (errno.WSAEWOULDBLOCK,)
    # Win98: WSAEISCONN; Win2K: WSAEINVAL
    _CONNECT_OK = (0, errno.WSAEISCONN, errno.WSAEINVAL)
class ConnectThread(threading.Thread):
"""Thread that tries to connect to server given one or more addresses.
The thread is passed a ConnectionManager and the manager's client
as arguments. It calls testConnection() on the client when a
socket connects; that should return 1 or 0 indicating whether this
is a preferred or a fallback connection. It may also raise an
exception, in which case the connection is abandoned.
The thread will continue to run, attempting connections, until a
preferred connection is seen and successfully handed over to the
manager and client.
As soon as testConnection() finds a preferred connection, or after
all sockets have been tried and at least one fallback connection
has been seen, notifyConnected(connection) is called on the client
and connect_done() on the manager. If this was a preferred
connection, the thread then exits; otherwise, it keeps trying
until it gets a preferred connection, and then reconnects the
client using that connection.
"""
__super_init = threading.Thread.__init__
# We don't expect clients to call any methods of this Thread other
# than close() and those defined by the Thread API.
def __init__(self, mgr, client, addrlist, tmin, tmax):
self.__super_init(name="Connect(%s)" % addrlist)
self.mgr = mgr
self.client = client
self.addrlist = addrlist
self.tmin = tmin
self.tmax = tmax
self.stopped = 0
self.one_attempt = threading.Event()
# A ConnectThread keeps track of whether it has finished a
# call to try_connecting(). This allows the ConnectionManager
# to make an attempt to connect right away, but not block for
# too long if the server isn't immediately available.
def stop(self):
self.stopped = 1
def run(self):
delay = self.tmin
success = 0
while not self.stopped:
success = self.try_connecting()
if not self.one_attempt.isSet():
self.one_attempt.set()
if success > 0:
break
time.sleep(delay)
delay = min(delay*2, self.tmax)
log("CT: exiting thread: %s" % self.getName())
def try_connecting(self):
"""Try connecting to all self.addrlist addresses.
Return 1 if a preferred connection was found; 0 if no
connection was found; and -1 if a fallback connection was
found.
"""
log("CT: attempting to connect on %d sockets" % len(self.addrlist))
# Create socket wrappers
wrappers = {} # keys are active wrappers
for domain, addr in self.addrlist:
wrap = ConnectWrapper(domain, addr, self.mgr, self.client)
wrap.connect_procedure()
if wrap.state == "notified":
for wrap in wrappers.keys():
wrap.close()
return 1
if wrap.state != "closed":
wrappers[wrap] = wrap
# Next wait until they all actually connect (or fail)
# XXX If a sockets never connects, nor fails, we'd wait forever!
while wrappers:
if self.stopped:
for wrap in wrappers.keys():
wrap.close()
return 0
# Select connecting wrappers
connecting = [wrap
for wrap in wrappers.keys()
if wrap.state == "connecting"]
if not connecting:
break
try:
r, w, x = select.select([], connecting, connecting, 1.0)
except select.error, msg:
log("CT: select failed; msg=%s" % str(msg),
level=zLOG.WARNING) # XXX Is this the right level?
continue
# Exceptable wrappers are in trouble; close these suckers
for wrap in x:
log("CT: closing troubled socket %s" % str(wrap.addr))
del wrappers[wrap]
wrap.close()
# Writable sockets are connected
for wrap in w:
wrap.connect_procedure()
if wrap.state == "notified":
del wrappers[wrap] # Don't close this one
for wrap in wrappers.keys():
wrap.close()
return 1
if wrap.state == "closed":
del wrappers[wrap]
# If we've got wrappers left at this point, they're fallback
# connections. Try notifying them until one succeeds.
for wrap in wrappers.keys():
assert wrap.state == "tested" and wrap.preferred == 0
if self.mgr.is_connected():
wrap.close()
else:
wrap.notify_client()
if wrap.state == "notified":
del wrappers[wrap] # Don't close this one
for wrap in wrappers.keys():
wrap.close()
return -1
assert wrap.state == "closed"
del wrappers[wrap]
# Alas, no luck.
assert not wrappers
return 0
class ConnectWrapper:
"""An object that handles the connection procedure for one socket.
This is a little state machine with states:
closed
opened
connecting
connected
tested
notified
"""
def __init__(self, domain, addr, mgr, client):
"""Store arguments and create non-blocking socket."""
self.domain = domain
self.addr = addr
self.mgr = mgr
self.client = client
# These attributes are part of the interface
self.state = "closed"
self.sock = None
self.conn = None
self.preferred = 0
log("CW: attempt to connect to %s" % repr(addr))
try:
self.sock = socket.socket(domain, socket.SOCK_STREAM)
except socket.error, err:
log("CW: can't create socket, domain=%s: %s" % (domain, err),
level=zLOG.ERROR)
self.close()
return
self.sock.setblocking(0)
self.state = "opened"
def connect_procedure(self):
"""Call sock.connect_ex(addr) and interpret result."""
if self.state in ("opened", "connecting"):
try:
err = self.sock.connect_ex(self.addr)
except socket.error, msg:
log("CW: connect_ex(%r) failed: %s" % (self.addr, msg),
level=zLOG.ERROR)
self.close()
return
log("CW: connect_ex(%s) returned %s" %
(self.addr, errno.errorcode.get(err) or str(err)))
if err in _CONNECT_IN_PROGRESS:
self.state = "connecting"
return
if err not in _CONNECT_OK:
log("CW: error connecting to %s: %s" %
(self.addr, errno.errorcode.get(err) or str(err)),
level=zLOG.WARNING)
self.close()
return
self.state = "connected"
if self.state == "connected":
self.test_connection()
def test_connection(self):
"""Establish and test a connection at the zrpc level.
Call the client's testConnection(), giving the client a chance
to do app-level check of the connection.
"""
self.conn = ManagedConnection(self.sock, self.addr,
self.client, self.mgr)
self.sock = None # The socket is now owned by the connection
try:
self.preferred = self.client.testConnection(self.conn)
self.state = "tested"
except ReadOnlyError:
log("CW: ReadOnlyError in testConnection (%s)" % repr(self.addr))
self.close()
return
except:
log("CW: error in testConnection (%s)" % repr(self.addr),
level=zLOG.ERROR, error=sys.exc_info())
self.close()
return
if self.preferred:
self.notify_client()
def notify_client(self):
"""Call the client's notifyConnected().
If this succeeds, call the manager's connect_done().
If the client is already connected, we assume it's a fallback
connection, and the new connection must be a preferred
connection. The client will close the old connection.
"""
try:
self.client.notifyConnected(self.conn)
except:
log("CW: error in notifyConnected (%s)" % repr(self.addr),
level=zLOG.ERROR, error=sys.exc_info())
self.close()
return
self.state = "notified"
self.mgr.connect_done(self.conn, self.preferred)
def close(self):
"""Close the socket and reset everything."""
self.state = "closed"
self.mgr = self.client = None
self.preferred = 0
if self.conn is not None:
# Closing the ZRPC connection will eventually close the
# socket, somewhere in asyncore.
# XXX Why do we care? --Guido
self.conn.close()
self.conn = None
if self.sock is not None:
self.sock.close()
self.sock = None
def fileno(self):
return self.sock.fileno()
=== Added File Zope/lib/python/ZEO/zrpc/connection.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import errno
import select
import sys
import threading
import types
import ThreadedAsync
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
from ZEO.zrpc.log import log, short_repr
from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.trigger import trigger
import zLOG
from ZODB import POSException
REPLY = ".reply" # message name used for replies
# Flag bit carried in the flags field of a message.  callAsync() sets it;
# handle_request() sends no reply for a call that has it set.
ASYNC = 1
class Delay:
    """Used to delay response to client for synchronous calls

    When a synchronous call is made and the original handler returns
    without handling the call, it returns a Delay object that prevents
    the mainloop from sending a response.  The response is sent later
    through reply() or error().
    """

    def set_sender(self, msgid, send_reply, return_error):
        """Remember the message id and the callables used to answer it."""
        self.return_error = return_error
        self.send_reply = send_reply
        self.msgid = msgid

    def reply(self, obj):
        """Send obj as the return value of the delayed call."""
        self.send_reply(self.msgid, obj)

    def error(self, exc_info):
        """Log exc_info and report its type and value to the caller."""
        log("Error raised in delayed method", zLOG.ERROR, error=exc_info)
        type_and_value = exc_info[:2]
        self.return_error(self.msgid, 0, *type_and_value)
class MTDelay(Delay):
    """Delay whose reply() and error() wait until set_sender() has run.

    The wait is done on a threading.Event that set_sender() sets.
    """

    def __init__(self):
        self.ready = threading.Event()

    def set_sender(self, msgid, send_reply, return_error):
        """Store the sender information, then release any waiters."""
        Delay.set_sender(self, msgid, send_reply, return_error)
        self.ready.set()

    def reply(self, obj):
        """Block until the sender is known, then send obj."""
        self.ready.wait()
        Delay.reply(self, obj)

    def error(self, exc_info):
        """Block until the sender is known, then report the error."""
        self.ready.wait()
        Delay.error(self, exc_info)
class Connection(smac.SizedMessageAsyncConnection):
"""Dispatcher for RPC on object on both sides of socket.
The connection supports synchronous calls, which expect a return,
and asynchronous calls, which do not.
It uses the Marshaller class to handle encoding and decoding of
method calls and arguments. Marshaller uses pickle to encode
arbitrary Python objects. The code here doesn't ever see the wire
format.
A Connection is designed for use in a multithreaded application,
where a synchronous call must block until a response is ready.
A socket connection between a client and a server allows either
side to invoke methods on the other side. The processes on each
end of the socket use a Connection object to manage communication.
The Connection deals with decoded RPC messages. They are
represented as four-tuples containing: msgid, flags, method name,
and a tuple of method arguments.
The msgid starts at zero and is incremented by one each time a
method call message is sent. Each side of the connection has a
separate msgid state.
When one side of the connection (the client) calls a method, it
sends a message with a new msgid. The other side (the server),
replies with a message that has the same msgid, the string
".reply" (the global variable REPLY) as the method name, and the
actual return value in the args position. Note that each side of
the Connection can initiate a call, in which case it will be the
client for that particular call.
The protocol also supports asynchronous calls. The client does
not wait for a return value for an asynchronous call. The only
defined flag is ASYNC. If a method call message has the ASYNC
flag set, the server will raise an exception.
If a method call raises an Exception, the exception is propagated
back to the client via the REPLY message. The client side will
raise any exception it receives instead of returning the value to
the caller.
"""
__super_init = smac.SizedMessageAsyncConnection.__init__
__super_close = smac.SizedMessageAsyncConnection.close
protocol_version = "Z200"
def __init__(self, sock, addr, obj=None):
self.obj = None
self.marshal = Marshaller()
self.closed = 0
self.msgid = 0
self.__super_init(sock, addr)
# A Connection either uses asyncore directly or relies on an
# asyncore mainloop running in a separate thread. If
# thr_async is true, then the mainloop is running in a
# separate thread. If thr_async is true, then the asyncore
# trigger (self.trigger) is used to notify that thread of
# activity on the current thread.
self.thr_async = 0
self.trigger = None
self._prepare_async()
self._map = {self._fileno: self}
# __msgid_lock guards access to msgid
self.msgid_lock = threading.Lock()
# __replies_cond is used to block when a synchronous call is
# waiting for a response
self.replies_cond = threading.Condition()
self.replies = {}
self.register_object(obj)
self.handshake()
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.addr)
__str__ = __repr__ # Defeat asyncore's dreaded __getattr__
def close(self):
if self.closed:
return
self._map.clear()
self.closed = 1
self.close_trigger()
self.__super_close()
def close_trigger(self):
# overridden by ManagedConnection
if self.trigger is not None:
self.trigger.close()
def register_object(self, obj):
"""Register obj as the true object to invoke methods on"""
self.obj = obj
def handshake(self):
# When a connection is created the first message sent is a
# 4-byte protocol version. This mechanism should allow the
# protocol to evolve over time, and let servers handle clients
# using multiple versions of the protocol.
# The mechanism replaces the message_input() method for the
# first message received.
# The client sends the protocol version it is using.
self._message_input = self.message_input
self.message_input = self.recv_handshake
self.message_output(self.protocol_version)
def recv_handshake(self, message):
if message == self.protocol_version:
self.message_input = self._message_input
else:
log("recv_handshake: bad handshake %s" % short_repr(message),
level=zLOG.ERROR)
# otherwise do something else...
def message_input(self, message):
"""Decoding an incoming message and dispatch it"""
# If something goes wrong during decoding, the marshaller
# will raise an exception. The exception will ultimately
# result in asycnore calling handle_error(), which will
# close the connection.
msgid, flags, name, args = self.marshal.decode(message)
if __debug__:
log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
short_repr(args)),
level=zLOG.TRACE)
if name == REPLY:
self.handle_reply(msgid, flags, args)
else:
self.handle_request(msgid, flags, name, args)
def handle_reply(self, msgid, flags, args):
if __debug__:
log("recv reply: %s, %s, %s" % (msgid, flags, short_repr(args)),
level=zLOG.DEBUG)
self.replies_cond.acquire()
try:
self.replies[msgid] = flags, args
self.replies_cond.notifyAll()
finally:
self.replies_cond.release()
def handle_request(self, msgid, flags, name, args):
if not self.check_method(name):
msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
raise ZRPCError(msg)
if __debug__:
log("calling %s%s" % (name, short_repr(args)), level=zLOG.BLATHER)
meth = getattr(self.obj, name)
try:
ret = meth(*args)
except (SystemExit, KeyboardInterrupt):
raise
except Exception, msg:
error = sys.exc_info()
log("%s() raised exception: %s" % (name, msg), zLOG.INFO,
error=error)
error = error[:2]
return self.return_error(msgid, flags, *error)
if flags & ASYNC:
if ret is not None:
raise ZRPCError("async method %s returned value %s" %
(name, short_repr(ret)))
else:
if __debug__:
log("%s returns %s" % (name, short_repr(ret)), zLOG.DEBUG)
if isinstance(ret, Delay):
ret.set_sender(msgid, self.send_reply, self.return_error)
else:
self.send_reply(msgid, ret)
def handle_error(self):
if sys.exc_info()[0] == SystemExit:
raise sys.exc_info()
self.log_error("Error caught in asyncore")
self.close()
def log_error(self, msg="No error message supplied"):
log(msg, zLOG.ERROR, error=sys.exc_info())
def check_method(self, name):
# XXX Is this sufficient "security" for now?
if name.startswith('_'):
return None
return hasattr(self.obj, name)
def send_reply(self, msgid, ret):
try:
msg = self.marshal.encode(msgid, 0, REPLY, ret)
except self.marshal.errors:
try:
r = short_repr(ret)
except:
r = "<unreprable>"
err = ZRPCError("Couldn't pickle return %.100s" % r)
msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
self.message_output(msg)
self.poll()
def return_error(self, msgid, flags, err_type, err_value):
if flags & ASYNC:
self.log_error("Asynchronous call raised exception: %s" % self)
return
if type(err_value) is not types.InstanceType:
err_value = err_type, err_value
try:
msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
except self.marshal.errors:
try:
r = short_repr(err_value)
except:
r = "<unreprable>"
err = ZRPCError("Couldn't pickle error %.100s" % r)
msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
self.message_output(msg)
self.poll()
# The next two public methods (call and callAsync) are used by
# clients to invoke methods on remote objects
def send_call(self, method, args, flags):
# send a message and return its msgid
self.msgid_lock.acquire()
try:
msgid = self.msgid
self.msgid = self.msgid + 1
finally:
self.msgid_lock.release()
if __debug__:
log("send msg: %d, %d, %s, ..." % (msgid, flags, method),
zLOG.TRACE)
buf = self.marshal.encode(msgid, flags, method, args)
self.message_output(buf)
return msgid
def call(self, method, *args):
if self.closed:
raise DisconnectedError()
msgid = self.send_call(method, args, 0)
r_flags, r_args = self.wait(msgid)
if (isinstance(r_args, types.TupleType)
and type(r_args[0]) == types.ClassType
and issubclass(r_args[0], Exception)):
inst = r_args[1]
raise inst # error raised by server
else:
return r_args
def callAsync(self, method, *args):
if self.closed:
raise DisconnectedError()
self.send_call(method, args, ASYNC)
self.poll()
# handle IO, possibly in async mode
def _prepare_async(self):
self.thr_async = 0
ThreadedAsync.register_loop_callback(self.set_async)
# XXX If we are not in async mode, this will cause dead
# Connections to be leaked.
def set_async(self, map):
self.trigger = trigger()
self.thr_async = 1
def is_async(self):
# overridden for ManagedConnection
if self.thr_async:
return 1
else:
return 0
def wait(self, msgid):
"""Invoke asyncore mainloop and wait for reply."""
if __debug__:
log("wait(%d), async=%d" % (msgid, self.is_async()),
level=zLOG.TRACE)
if self.is_async():
self.trigger.pull_trigger()
# Delay used when we call asyncore.poll() directly.
# Start with a 1 msec delay, double until 1 sec.
delay = 0.001
self.replies_cond.acquire()
try:
while 1:
if self.closed:
raise DisconnectedError()
reply = self.replies.get(msgid)
if reply is not None:
del self.replies[msgid]
if __debug__:
log("wait(%d): reply=%s" % (msgid, short_repr(reply)),
level=zLOG.DEBUG)
return reply
if self.is_async():
self.replies_cond.wait(10.0)
else:
self.replies_cond.release()
try:
try:
if __debug__:
log("wait(%d): asyncore.poll(%s)" %
(msgid, delay), level=zLOG.TRACE)
asyncore.poll(delay, self._map)
if delay < 1.0:
delay += delay
except select.error, err:
log("Closing. asyncore.poll() raised %s." % err,
level=zLOG.BLATHER)
self.close()
finally:
self.replies_cond.acquire()
finally:
self.replies_cond.release()
def poll(self):
"""Invoke asyncore mainloop to get pending message out."""
if __debug__:
log("poll(), async=%d" % self.is_async(), level=zLOG.TRACE)
if self.is_async():
self.trigger.pull_trigger()
else:
asyncore.poll(0.0, self._map)
def pending(self):
"""Invoke mainloop until any pending messages are handled."""
if __debug__:
log("pending(), async=%d" % self.is_async(), level=zLOG.TRACE)
if self.is_async():
return
# Inline the asyncore poll() function to know whether any input
# was actually read. Repeat until no input is ready.
# XXX This only does reads.
r_in = [self._fileno]
w_in = []
x_in = []
while 1:
try:
r, w, x = select.select(r_in, w_in, x_in, 0)
except select.error, err:
if err[0] == errno.EINTR:
continue
else:
raise
if not r:
break
try:
self.handle_read_event()
except asyncore.ExitNow:
raise
except:
self.handle_error()
class ManagedServerConnection(Connection):
    """Server-side Connection subclass."""

    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        """Set up the connection, then tell obj that it is connected."""
        # mgr is stored first, before the base class initializer runs
        self.mgr = mgr
        self.__super_init(sock, addr, obj)
        self.obj.notifyConnected(self)

    def close(self):
        """Notify obj and the manager, then close the connection."""
        self.obj.notifyDisconnected()
        self.mgr.close_conn(self)
        self.__super_close()
class ManagedConnection(Connection):
    """Client-side Connection subclass."""

    __super_init = Connection.__init__
    __super_close = Connection.close

    def __init__(self, sock, addr, obj, mgr):
        """Set up the connection and pick up the manager's async state."""
        self.mgr = mgr
        self.__super_init(sock, addr, obj)
        self.check_mgr_async()

    # Defer the ThreadedAsync work to the manager.

    def close_trigger(self):
        # the manager should actually close the trigger
        del self.trigger

    def set_async(self, map):
        pass

    def _prepare_async(self):
        # Don't do the register_loop_callback that the superclass does
        pass

    def check_mgr_async(self):
        """Adopt the manager's trigger once the manager is in async mode.

        Returns 1 if this call switched the connection to async mode,
        and 0 otherwise.
        """
        if self.thr_async or not self.mgr.thr_async:
            return 0
        assert self.mgr.trigger is not None, \
               "manager (%s) has no trigger" % self.mgr
        self.thr_async = 1
        self.trigger = self.mgr.trigger
        return 1

    def is_async(self):
        # XXX could the check_mgr_async() be avoided on each test?
        if not self.thr_async:
            return self.check_mgr_async()
        return 1

    def close(self):
        """Tell the manager, then close the connection."""
        self.mgr.close_conn(self)
        self.__super_close()
=== Added File Zope/lib/python/ZEO/zrpc/error.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from ZODB import POSException
from ZEO.Exceptions import Disconnected
# Base class of the zrpc exceptions defined in this module; it derives
# from POSException.StorageError.
class ZRPCError(POSException.StorageError):
    pass
class DisconnectedError(ZRPCError, Disconnected):
    """The database storage is disconnected from the storage server."""
    # Derives from both ZRPCError and ZEO.Exceptions.Disconnected, so an
    # except clause naming either base class catches it.
=== Added File Zope/lib/python/ZEO/zrpc/log.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import types
import zLOG
import threading
# When true, log() appends the current thread's name to the label.
LOG_THREAD_ID = 0 # Set this to 1 during heavy debugging
# Default label for log(); it embeds the pid of the process that
# computed it.  new_label() recomputes it.
_label = "zrpc:%s" % os.getpid()
def new_label():
    """Recompute the default log label from the current process id."""
    global _label
    pid = os.getpid()
    _label = "zrpc:%s" % pid
def log(message, level=zLOG.BLATHER, label=None, error=None):
    """Write message to zLOG.

    label defaults to the module's per-process label.  If
    LOG_THREAD_ID is set, the name of the current thread is appended
    to the label.  error is passed through to zLOG.LOG().
    """
    if not label:
        label = _label
    if LOG_THREAD_ID:
        thread_name = threading.currentThread().getName()
        label = "%s:%s" % (label, thread_name)
    zLOG.LOG(label, level, message, error=error)
# Target length, in characters, of the strings returned by short_repr()
REPR_LIMIT = 40

def short_repr(obj):
    """Return an object repr limited to about REPR_LIMIT bytes.

    Strings come back at most REPR_LIMIT characters long.  Any other
    result that exceeds the limit is cut at REPR_LIMIT characters and
    gets '...' appended.
    """

    # Some of the objects being repr'd are large strings.  It wastes
    # a lot of memory to repr them and then truncate, so special case
    # them in this function.
    # Also handle short repr of a tuple containing a long string.

    # This strategy works well for arguments to StorageServer methods.
    # The oid is usually first and will get included in its entirety.
    # The pickle is near the beginning, too, and you can often fit the
    # module name in the pickle.

    if isinstance(obj, types.StringType):
        if len(obj) > REPR_LIMIT:
            r = repr(obj[:REPR_LIMIT])
        else:
            r = repr(obj)
        if len(r) > REPR_LIMIT:
            # Keep the closing quote after the ellipsis
            r = r[:REPR_LIMIT-4] + '...' + r[-1]
        return r
    elif isinstance(obj, types.TupleType):
        elts = []
        size = 0
        for elt in obj:
            # Abbreviate each element as well, so that a huge string
            # inside the tuple is never repr'd in full.
            r = short_repr(elt)
            elts.append(r)
            size += len(r)
            if size > REPR_LIMIT:
                break
        r = "(%s)" % (", ".join(elts))
    else:
        r = repr(obj)
    if len(r) > REPR_LIMIT:
        return r[:REPR_LIMIT] + '...'
    else:
        return r
=== Added File Zope/lib/python/ZEO/zrpc/marshal.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import cPickle
from cStringIO import StringIO
import types
import zLOG
from ZEO.zrpc.error import ZRPCError
from ZEO.zrpc.log import log, short_repr
class Marshaller:
    """Marshal requests and replies to send across the network."""

    def encode(self, msgid, flags, name, args):
        """Return the (msgid, flags, name, args) tuple as an encoded message."""
        # A new pickler is created for every call.  (We used to have a
        # global pickler, but that's not thread-safe. :-( )
        payload = (msgid, flags, name, args)
        pickler = cPickle.Pickler()
        pickler.fast = 1
        return pickler.dump(payload, 1)

    def decode(self, msg):
        """Decode msg and return its parts: msgid, flags, name, args.

        Globals named in the pickle are resolved through the module's
        find_global() rather than the unpickler's default lookup.  If
        loading fails for any reason, a shortened repr of msg is logged
        at ERROR level and the original exception is re-raised.
        """
        loader = cPickle.Unpickler(StringIO(msg))
        loader.find_global = find_global
        try:
            parts = loader.load()
        except:
            log("can't decode message: %s" % short_repr(msg), level=zLOG.ERROR)
            raise
        return parts
_globals = globals()
_silly = ('__doc__',)
def find_global(module, name):
"""Helper for message unpickler"""
try:
m = __import__(module, _globals, _globals, _silly)
except ImportError, msg:
raise ZRPCError("import error %s: %s" % (module, msg))
try:
r = getattr(m, name)
except AttributeError:
raise ZRPCError("module %s has no global %s" % (module, name))
safe = getattr(r, '__no_side_effects__', 0)
if safe:
return r
# XXX what's a better way to do this? esp w/ 2.1 & 2.2
if type(r) == types.ClassType and issubclass(r, Exception):
return r
raise ZRPCError("Unsafe global: %s.%s" % (module, name))
=== Added File Zope/lib/python/ZEO/zrpc/server.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import socket
import types
from ZEO.zrpc.connection import Connection, Delay
from ZEO.zrpc.log import log
# Export the main asyncore loop: asyncore.loop is re-bound under this
# module's namespace as `loop`.
loop = asyncore.loop
class Dispatcher(asyncore.dispatcher):
"""A server that accepts incoming RPC connections"""
__super_init = asyncore.dispatcher.__init__
reuse_addr = 1
def __init__(self, addr, factory=Connection, reuse_addr=None):
self.__super_init()
self.addr = addr
self.factory = factory
self.clients = []
if reuse_addr is not None:
self.reuse_addr = reuse_addr
self._open_socket()
def _open_socket(self):
if type(self.addr) == types.TupleType:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
log("listening on %s" % str(self.addr))
self.bind(self.addr)
self.listen(5)
def writable(self):
return 0
def readable(self):
return 1
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error, msg:
log("accepted failed: %s" % msg)
return
c = self.factory(sock, addr)
log("connect from %s: %s" % (repr(addr), c))
self.clients.append(c)
=== Added File Zope/lib/python/ZEO/zrpc/smac.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Sized Message Async Connections."""
import asyncore, struct
import threading
from ZEO.Exceptions import Disconnected
import zLOG
from types import StringType
from ZEO.zrpc.log import log, short_repr
import socket, errno
def _unique_errnos(*codes):
    """Return the given errno values as a tuple without duplicates."""
    # Use a dictionary to make sure we get the minimum number of errno
    # entries.  We expect that EWOULDBLOCK == EAGAIN on most systems --
    # or that only one is actually used.
    seen = {}
    for code in codes:
        seen[code] = 0
    return tuple(seen.keys())


# socket.error codes that handle_read() treats as "nothing to read yet".
expected_socket_read_errors = _unique_errnos(
    errno.EWOULDBLOCK,
    errno.EAGAIN,
    errno.EINTR,
    )

# socket.error codes that handle_write() treats as "cannot write now".
expected_socket_write_errors = _unique_errnos(
    errno.EAGAIN,
    errno.EWOULDBLOCK,
    errno.ENOBUFS,
    errno.EINTR,
    )

# The helper is only needed while the module is being set up.
del _unique_errnos

# We chose 60000 as the socket limit by looking at the largest strings
# that we could pass to send() without blocking.
SEND_SIZE = 60000
class SizedMessageAsyncConnection(asyncore.dispatcher):
__super_init = asyncore.dispatcher.__init__
__super_close = asyncore.dispatcher.close
__closed = 1 # Marker indicating that we're closed
socket = None # to outwit Sam's getattr
def __init__(self, sock, addr, map=None, debug=None):
self.addr = addr
if debug is not None:
self._debug = debug
elif not hasattr(self, '_debug'):
self._debug = __debug__
# __input_lock protects __inp, __input_len, __state, __msg_size
self.__input_lock = threading.Lock()
self.__inp = None # None, a single String, or a list
self.__input_len = 0
# Instance variables __state and __msg_size work together:
# when __state == 0:
# __msg_size == 4, and the next thing read is a message size;
# when __state == 1:
# __msg_size is variable, and the next thing read is a message.
# The next thing read is always of length __msg_size.
# The state alternates between 0 and 1.
self.__state = 0
self.__msg_size = 4
self.__output_lock = threading.Lock() # Protects __output
self.__output = []
self.__closed = 0
self.__super_init(sock, map)
# XXX avoid expensive getattr calls? Can't remember exactly what
# this comment was supposed to mean, but it has something to do
# with the way asyncore uses getattr and uses if sock:
def __nonzero__(self):
return 1
def handle_read(self):
self.__input_lock.acquire()
try:
# Use a single __inp buffer and integer indexes to make this fast.
try:
d = self.recv(8192)
except socket.error, err:
if err[0] in expected_socket_read_errors:
return
raise
if not d:
return
input_len = self.__input_len + len(d)
msg_size = self.__msg_size
state = self.__state
inp = self.__inp
if msg_size > input_len:
if inp is None:
self.__inp = d
elif type(self.__inp) is StringType:
self.__inp = [self.__inp, d]
else:
self.__inp.append(d)
self.__input_len = input_len
return # keep waiting for more input
# load all previous input and d into single string inp
if isinstance(inp, StringType):
inp = inp + d
elif inp is None:
inp = d
else:
inp.append(d)
inp = "".join(inp)
offset = 0
while (offset + msg_size) <= input_len:
msg = inp[offset:offset + msg_size]
offset = offset + msg_size
if not state:
# waiting for message
msg_size = struct.unpack(">i", msg)[0]
state = 1
else:
msg_size = 4
state = 0
# XXX We call message_input() with __input_lock
# held!!! And message_input() may end up calling
# message_output(), which has its own lock. But
# message_output() cannot call message_input(), so
# the locking order is always consistent, which
# prevents deadlock. Also, message_input() may
# take a long time, because it can cause an
# incoming call to be handled. During all this
# time, the __input_lock is held. That's a good
# thing, because it serializes incoming calls.
self.message_input(msg)
self.__state = state
self.__msg_size = msg_size
self.__inp = inp[offset:]
self.__input_len = input_len - offset
finally:
self.__input_lock.release()
def readable(self):
return 1
def writable(self):
if len(self.__output) == 0:
return 0
else:
return 1
def handle_write(self):
self.__output_lock.acquire()
try:
output = self.__output
while output:
# Accumulate output into a single string so that we avoid
# multiple send() calls, but avoid accumulating too much
# data. If we send a very small string and have more data
# to send, we will likely incur delays caused by the
# unfortunate interaction between the Nagle algorithm and
# delayed acks. If we send a very large string, only a
# portion of it will actually be delivered at a time.
l = 0
for i in range(len(output)):
l += len(output[i])
if l > SEND_SIZE:
break
i += 1
# It is very unlikely that i will be 1.
v = "".join(output[:i])
del output[:i]
try:
n = self.send(v)
except socket.error, err:
if err[0] in expected_socket_write_errors:
break # we couldn't write anything
raise
if n < len(v):
output.insert(0, v[n:])
break # we can't write any more
finally:
self.__output_lock.release()
def handle_close(self):
self.close()
def message_output(self, message):
if __debug__:
if self._debug:
log('message_output %d bytes: %s' %
(len(message), short_repr(message)),
level=zLOG.TRACE)
if self.__closed:
raise Disconnected, (
"This action is temporarily unavailable."
"<p>"
)
self.__output_lock.acquire()
try:
# do two separate appends to avoid copying the message string
self.__output.append(struct.pack(">i", len(message)))
if len(message) <= SEND_SIZE:
self.__output.append(message)
else:
for i in range(0, len(message), SEND_SIZE):
self.__output.append(message[i:i+SEND_SIZE])
finally:
self.__output_lock.release()
def close(self):
if not self.__closed:
self.__closed = 1
self.__super_close()
=== Added File Zope/lib/python/ZEO/zrpc/trigger.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import os
import socket
import string
import thread
class _triggerbase:
    """Platform-independent part of the trigger classes.

    A concrete trigger class mixes this in ahead of an asyncore
    dispatcher class.  It must set `kind`, create self.lock (a lock
    from the thread module) and self.thunks (a list) in its __init__,
    and implement _physical_pull() to make the read side readable.
    """

    kind = None # the subclass sets "pipe" or "loopback"; used by __repr__

    def __repr__(self):
        return '<select-trigger (%s) at %x>' % (self.kind, id(self))

    def readable(self):
        return 1

    def writable(self):
        return 0

    def handle_connect(self):
        pass

    def pull_trigger(self, thunk=None):
        """Wake up the select() loop; optionally queue thunk to run in it."""
        if thunk:
            self.lock.acquire()
            try:
                self.thunks.append(thunk)
            finally:
                self.lock.release()
        self._physical_pull()

    def handle_read(self):
        """Drain the wake-up data and run all queued thunks.

        Note that self.lock is held while the thunks run, and
        pull_trigger(thunk) acquires the same lock, so a thunk must not
        call pull_trigger() with a thunk argument on this trigger.
        """
        self.recv(8192)
        self.lock.acquire()
        try:
            for thunk in self.thunks:
                try:
                    thunk()
                except:
                    # Deliberately broad: one failing thunk must not
                    # prevent the others from running.
                    nil, t, v, tbinfo = asyncore.compact_traceback()
                    print ('exception in trigger thunk:'
                           ' (%s:%s %s)' % (t, v, tbinfo))
            self.thunks = []
        finally:
            self.lock.release()


if os.name == 'posix':

    class trigger(_triggerbase, asyncore.file_dispatcher):
        "Wake up a call to select() running in the main thread"

        # This is useful in a context where you are using Medusa's I/O
        # subsystem to deliver data, but the data is generated by another
        # thread.  Normally, if Medusa is in the middle of a call to
        # select(), new output data generated by another thread will have
        # to sit until the call to select() either times out or returns.
        # If the trigger is 'pulled' by another thread, it should immediately
        # generate a READ event on the trigger object, which will force the
        # select() invocation to return.

        # A common use for this facility: letting Medusa manage I/O for a
        # large number of connections; but routing each request through a
        # thread chosen from a fixed-size thread pool.  When a thread is
        # acquired, a transaction is performed, but output data is
        # accumulated into buffers that will be emptied more efficiently
        # by Medusa. [picture a server that can process database queries
        # rapidly, but doesn't want to tie up threads waiting to send data
        # to low-bandwidth connections]

        # The other major feature provided by this class is the ability to
        # move work back into the main thread: if you call pull_trigger()
        # with a thunk argument, when select() wakes up and receives the
        # event it will call your thunk from within that thread.  The main
        # purpose of this is to remove the need to wrap thread locks around
        # Medusa's data structures, which normally do not need them.  [To see
        # why this is true, imagine this scenario: A thread tries to push some
        # new data onto a channel's outgoing data queue at the same time that
        # the main thread is trying to remove some]

        kind = "pipe"

        def __init__(self):
            r, w = self._fds = os.pipe()
            self.trigger = w
            asyncore.file_dispatcher.__init__(self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._closed = 0

        # Override the asyncore close() method, because it seems that
        # it would only close the r file descriptor and not w.  The
        # constructor calls file_dispatcher.__init__ and passes r,
        # which would get stored in a file_wrapper and get closed by
        # the default close.  But that would leave w open...
        def close(self):
            if not self._closed:
                self._closed = 1
                self.del_channel()
                for fd in self._fds:
                    os.close(fd)

        def _physical_pull(self):
            # One byte on the pipe makes the read end readable.
            os.write(self.trigger, 'x')

else:

    # win32-safe version

    HOST = '127.0.0.1'
    MINPORT = 19950
    NPORTS = 50

    class trigger(_triggerbase, asyncore.dispatcher):
        "Wake up a call to select() running in the main thread"

        # Same facility as the pipe-based trigger, but built from a
        # pair of connected loopback TCP sockets instead of a pipe.

        kind = "loopback"

        portoffset = 0

        def __init__(self):
            a = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            w = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            # set TCP_NODELAY to true to avoid buffering
            w.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            # tricky: get a pair of connected sockets
            for i in range(NPORTS):
                trigger.portoffset = (trigger.portoffset + 1) % NPORTS
                port = MINPORT + trigger.portoffset
                address = (HOST, port)
                try:
                    a.bind(address)
                except socket.error:
                    continue
                else:
                    break
            else:
                # Don't leave the two sockets open when giving up.
                a.close()
                w.close()
                raise RuntimeError('Cannot bind trigger!')

            a.listen(1)
            w.setblocking(0)
            try:
                w.connect(address)
            except socket.error:
                # Expected: a non-blocking connect reports that it is
                # still in progress; accept() below completes the pair.
                pass
            r, addr = a.accept()
            a.close()
            w.setblocking(1)
            self.trigger = w

            asyncore.dispatcher.__init__(self, r)
            self.lock = thread.allocate_lock()
            self.thunks = []
            self._trigger_connected = 0
            self._closed = 0

        # The default close() only deals with the dispatcher's own
        # socket (the read side); the write side kept in self.trigger
        # has to be closed here as well.
        def close(self):
            if not self._closed:
                self._closed = 1
                asyncore.dispatcher.close(self)
                self.trigger.close()

        def _physical_pull(self):
            # One byte on the loopback connection makes r readable.
            self.trigger.send('x')