[Zodb-checkins] CVS: Packages/ZEO - zrpc2.py:1.1.2.6
jeremy@digicool.com
jeremy@digicool.com
Wed, 18 Apr 2001 16:57:28 -0400 (EDT)
Update of /cvs-repository/Packages/ZEO
In directory korak:/tmp/cvs-serv8684
Modified Files:
Tag: ZEO-ZRPC-Dev
zrpc2.py
Log Message:
Sundry fixes and updates
Make sure all places where sockets are created check whether the
address family is AF_INET or AF_UNIX.
Rename closed() method to notify_closed() and add closed attribute.
Update managed connection to reflect the new names and args here and in
ClientStorage.
--- Updated File zrpc2.py in package Packages/ZEO --
--- zrpc2.py 2001/03/30 21:19:36 1.1.2.5
+++ zrpc2.py 2001/04/18 20:57:28 1.1.2.6
@@ -10,6 +10,14 @@
method is a string specifying the method to invoke.
For a reply, the method is ".reply".
args is a tuple of the argument to pass to method.
+
+XXX need to specify a version number that describes the protocol.
+allow for future revision.
+
+XXX support multiple outstanding calls
+
+XXX factor out common pattern of deciding what protocol to use based
+on whether address is tuple or string
"""
import asyncore
@@ -50,7 +58,10 @@
loop = asyncore.loop
def connect(addr, client=None):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ if type(addr) == types.TupleType:
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ else:
+ s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(addr)
c = Connection(s, addr, client)
return c
@@ -171,8 +182,8 @@
# waiting for a response
self.__super_init(sock, addr)
self._prepare_async()
- self.__reply_lock = DebugLock()
-## self.__reply_lock = thread.allocate_lock()
+## self.__reply_lock = DebugLock()
+ self.__reply_lock = thread.allocate_lock()
self.__reply_lock.acquire()
if isinstance(obj, Handler):
self.set_caller = 1
@@ -191,6 +202,9 @@
def message_input(self, message):
"""Decoding an incoming message and dispatch it"""
+ # XXX Not sure what to do with errors that reach this level.
+ # Need to catch ZRPCErrors in handle_reply() and
+ # handle_request() so that they get back to the client.
try:
msgid, flags, name, args = self.marshal.decode(message)
except DecodingError, msg:
@@ -227,7 +241,7 @@
except (POSException.UndoError,
POSException.VersionCommitError), msg:
return self.return_error(msgid, flags, sys.exc_info()[0],
- sys.exc_info()[1])
+ sys.exc_info()[1])
except Exception, msg:
return self.return_error(msgid, flags, sys.exc_info()[0],
sys.exc_info()[1])
@@ -244,7 +258,10 @@
def handle_error(self):
t, v, tb = sys.exc_info()
- print t, v
+ if type(v) == types.StringType:
+ print t, repr(v)
+ else:
+ print t, v
traceback.print_tb(tb)
def check_method(self, name):
@@ -405,8 +422,13 @@
self._connect_lock = threading.Lock()
self.trigger = None
self.async = 0
+ self.closed = 0
ThreadedAsync.register_loop_callback(self.set_async)
+ def close(self):
+ """Prevent ConnectionManager from opening new connections"""
+ self.closed = 1
+
def register_object(self, obj):
self.obj = obj
@@ -467,10 +489,11 @@
t = self.tmax
return t
- def closed(self, conn):
+ def notify_closed(self, conn):
self.connected = 0
self.obj.notifyDisconnected(None)
- self.connect()
+ if not self.closed:
+ self.connect()
class ManagedServerConnection(ServerConnection):
"""A connection that notifies its ConnectionManager of closing"""
@@ -483,7 +506,8 @@
def close(self):
self.__super_close()
- self.__mgr.closed(self)
+ log("self.__mgr = %s" % repr(self.__mgr))
+ self.__mgr.close(self)
class ManagedConnection(Connection):
"""A connection that notifies its ConnectionManager of closing.
@@ -518,7 +542,7 @@
def close(self):
self.__super_close()
- self.__mgr.closed(self)
+ self.__mgr.notify_closed(self)
class Dispatcher(asyncore.dispatcher):
"""A server that accepts incoming RPC connections"""
@@ -537,7 +561,10 @@
self._open_socket()
def _open_socket(self):
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ if type(self.addr) == types.TupleType:
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ else:
+ self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind(self.addr)
self.listen(5)