[Zodb-checkins]
SVN: ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py
Inlined poll function in preparation for adding a heartbeat.
Jim Fulton
jim at zope.com
Mon Jul 17 10:50:27 EDT 2006
Log message for revision 69153:
Inlined poll function in preparation for adding a heartbeat.
Changed:
U ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py
-=-
Modified: ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py 2006-07-17 14:22:33 UTC (rev 69152)
+++ ZODB/branches/jim-async-client/src/ZEO/zrpc/connection.py 2006-07-17 14:50:26 UTC (rev 69153)
@@ -19,6 +19,8 @@
import types
import logging
+import traceback, time
+
import ThreadedAsync
from ZEO.zrpc import smac
from ZEO.zrpc.error import ZRPCError, DisconnectedError
@@ -34,17 +36,116 @@
# Dedicated Client select loop:
client_map = {}
client_trigger = trigger(client_map)
+client_timeout = 30.0
def client_loop():
map = client_map
- poll_fun = asyncore.poll
logger = logging.getLogger('ZEO.zrpc.client_loop')
+ logger.addHandler(logging.StreamHandler())
+
+ read = asyncore.read
+ write = asyncore.write
+ _exception = asyncore._exception
+
while map:
try:
- poll_fun(30.0, map)
+ r = []; w = []; e = []
+ for fd, obj in map.items():
+ is_r = obj.readable()
+ is_w = obj.writable()
+ if is_r:
+ r.append(fd)
+ if is_w:
+ w.append(fd)
+ if is_r or is_w:
+ e.append(fd)
+
+ try:
+ r, w, e = select.select(r, w, e, client_timeout)
+ except select.error, err:
+ if err[0] != errno.EINTR:
+ if err[0] == errno.EBADF:
+
+ # If a connection is closed while we are
+ # calling select on it, we can get a bad
+ # file-descriptor error. We'll check for this
+ # case by looking for entries in r and w that
+ # are not in the socket map.
+
+ if [fd for fd in r if fd not in client_map]:
+ continue
+ if [fd for fd in w if fd not in client_map]:
+ continue
+
+# print 'BADF', list(client_map), r, w, e
+ raise
+ else:
+ continue
+
+ for fd in r:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ read(obj)
+
+ for fd in w:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ write(obj)
+
+ for fd in e:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ _exception(obj)
+
except:
+# print 'poll failure', sys.exc_info()[1], time.time()
logger.exception('poll failure')
+ raise
+#import time
+def poll(timeout, map):
+ if map:
+ r = []; w = []; e = []
+ for fd, obj in map.items():
+ is_r = obj.readable()
+ is_w = obj.writable()
+ if is_r:
+ r.append(fd)
+ if is_w:
+ w.append(fd)
+ if is_r or is_w:
+ e.append(fd)
+
+ try:
+ r, w, e = select.select(r, w, e, timeout)
+ except select.error, err:
+ if err[0] != errno.EINTR:
+ raise
+ else:
+ return
+
+ for fd in r:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ asyncore.read(obj)
+
+ for fd in w:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ asyncore.write(obj)
+
+ for fd in e:
+ obj = map.get(fd)
+ if obj is None:
+ continue
+ asyncore._exception(obj)
+
+
client_thread = threading.Thread(target=client_loop)
client_thread.setDaemon(True)
client_thread.start()
More information about the Zodb-checkins
mailing list