[Zodb-checkins] CVS: ZEO/ZEO/zrpc - __init__.py:1.1.2.1.2.1 client.py:1.1.2.2.2.1 connection.py:1.1.2.2.2.1 error.py:1.1.2.1.2.1 log.py:1.1.2.2.2.1 marshal.py:1.1.2.1.2.1 server.py:1.1.2.1.2.1 trigger.py:1.1.2.1.2.1

Jeremy Hylton jeremy@zope.com
Thu, 25 Apr 2002 16:19:56 -0400


Update of /cvs-repository/ZEO/ZEO/zrpc
In directory cvs.zope.org:/tmp/cvs-serv5401/ZEO/zrpc

Modified Files:
      Tag: ZEO2-branch
	__init__.py client.py connection.py error.py log.py marshal.py 
	server.py trigger.py 
Log Message:
Merge the Standby-branch into the ZEO2-branch.

The Standby-branch is history.



=== ZEO/ZEO/zrpc/__init__.py 1.1.2.1 => 1.1.2.1.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
 # zrpc is a package with the following modules
 # error -- exceptions raised by zrpc
 # marshal -- internal, handles basic protocol issues


=== ZEO/ZEO/zrpc/client.py 1.1.2.2 => 1.1.2.2.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
 import errno
 import select
 import socket
@@ -23,13 +36,13 @@
         self.tmax = tmax
         self.connected = 0
         self.connection = None
+        self.closed = 0
         # If _thread is not None, then there is a helper thread
         # attempting to connect.  _thread is protected by _connect_lock.
         self._thread = None
         self._connect_lock = threading.Lock()
         self.trigger = None
         self.thr_async = 0
-        self.closed = 0
         ThreadedAsync.register_loop_callback(self.set_async)
 
     def __repr__(self):
@@ -79,18 +92,45 @@
             self._connect_lock.release()
         if self.connection:
             self.connection.close()
+        if self.trigger is not None:
+            self.trigger.close()
 
     def set_async(self, map):
-        # XXX need each connection started with async==0 to have a callback
-        self.trigger = trigger()
-        self.thr_async = 1 # XXX needs to be set on the Connection
+        # This is the callback registered with ThreadedAsync.  The
+        # callback might be called multiple times, so it shouldn't
+        # create a trigger every time and should never do anything
+        # after it's closed.
+
+        # It may be that the only case where it is called multiple
+        # times is in the test suite, where ThreadedAsync's loop can
+        # be started in a child process after a fork.  Regardless,
+        # it's good to be defensive.
+
+        # XXX need each connection started with async==0 to have a
+        # callback
+        if not self.closed and self.trigger is None:
+            self.trigger = trigger()
+            self.thr_async = 1 # XXX needs to be set on the Connection
 
     def attempt_connect(self):
+        """Attempt a connection to the server without blocking too long.
+
+        There isn't a crisp definition for too long.  When a
+        ClientStorage is created, it attempts to connect to the
+        server.  If the server isn't immediately available, it can
+        operate from the cache.  This method will start the background
+        connection thread and wait a little while to see if it
+        finishes quickly.
+        """
+
         # XXX will a single attempt take too long?
         self.connect()
         try:
             event = self._thread.one_attempt
         except AttributeError:
+            # An AttributeError means that (1) _thread is None and (2)
+            # as a consequence of (1) that the connect thread has
+            # already exited.
             pass
         else:
             event.wait()
@@ -132,77 +172,122 @@
     # helper for non-local exit
     def __init__(self, sock):
         self.sock = sock
-            
+
+# When trying to do a connect on a non-blocking socket, some outcomes
+# are expected.  Set _CONNECT_IN_PROGRESS to the errno value(s) expected
+# when an initial connect can't complete immediately.  Set _CONNECT_OK
+# to the errno value(s) expected if the connect succeeds *or* if it's
+# already connected (our code can attempt redundant connects).
+if hasattr(errno, "WSAEWOULDBLOCK"):    # Windows
+    _CONNECT_IN_PROGRESS = (errno.WSAEWOULDBLOCK,)
+    _CONNECT_OK          = (0, errno.WSAEISCONN)
+else:                                   # Unix
+    _CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
+    _CONNECT_OK          = (0, errno.EISCONN)
+
 class ConnectThread(threading.Thread):
+    """Thread that tries to connect to server given one or more addresses.
+    The thread is passed a ConnectionManager and the manager's client
+    as arguments.  It calls notifyConnected() on the client when a
+    socket connects.  If notifyConnected() returns without raising an
+    exception, the thread is done; it calls connect_done() on the
+    manager and exits.
+
+    The thread will continue to run, attempting connections, until a
+    successful notifyConnected() or stop() is called.
+    """
 
     __super_init = threading.Thread.__init__
 
-    def __init__(self, mgr, client, addr, tmin, tmax):
-        self.__super_init(name="Connect(%s)" % addr)
+    # We don't expect clients to call any methods of this Thread other
+    # than stop() and those defined by the Thread API.
+
+    def __init__(self, mgr, client, addrs, tmin, tmax):
+        self.__super_init(name="Connect(%s)" % addrs)
         self.mgr = mgr
         self.client = client
-        self.addr = addr
+        self.addrs = addrs
         self.tmin = tmin
         self.tmax = tmax
         self.stopped = 0
         self.one_attempt = threading.Event()
+        # A ConnectThread keeps track of whether it has finished a
+        # call to attempt_connects().  This allows the
+        # ConnectionManager to make an attempt to connect right away,
+        # but not block for too long if the server isn't immediately
+        # available.
 
     def stop(self):
         self.stopped = 1
 
+    # Every method from run() to the end is used internally by the Thread.
+
     def run(self):
         delay = self.tmin
-        while not (self.stopped or self.attempt_connects()):
+        while not self.stopped:
+            success = self.attempt_connects()
             if not self.one_attempt.isSet():
                 self.one_attempt.set()
+            if success:
+                break
             time.sleep(delay)
             delay *= 2
             if delay > self.tmax:
                 delay = self.tmax
         log("thread exiting: %s" % self.getName())
-                
+
+    def close_sockets(self):
+        for s in self.sockets.keys():
+            s.close()
+
     def attempt_connects(self):
-        "Return true if any connect attempt succeeds."
-        sockets = {}
+        """Try connecting to all self.addrs addresses.
+
+        If at least one succeeds, pick a success arbitrarily, close all other
+        successes (if any), and return true.  If none succeed, return false.
+        """
+
+        self.sockets = {}  # {open socket:  connection address}
 
-        log("attempting connection on %d sockets" % len(self.addr))
+        log("attempting connection on %d sockets" % len(self.addrs))
         try:
-            for domain, addr in self.addr:
+            for domain, addr in self.addrs:
                 if __debug__:
                     log("attempt connection to %s" % repr(addr),
                         level=zLOG.DEBUG)
-                s = socket.socket(domain, socket.SOCK_STREAM)
+                try:
+                    s = socket.socket(domain, socket.SOCK_STREAM)
+                except socket.error, err:
+                    log("Failed to create socket with domain=%s: %s" % (
+                        domain, err), level=zLOG.ERROR)
+                    continue
                 s.setblocking(0)
+                self.sockets[s] = addr
+                # connect() raises Connected iff it succeeds
                 # XXX can still block for a while if addr requires DNS
-                e = self.connect(s, addr)
-                if e is not None:
-                    sockets[s] = addr
+                self.connect(s)
 
             # next wait until they actually connect
-            while sockets:
+            while self.sockets:
                 if self.stopped:
-                    for s in sockets.keys():
-                        s.close()
+                    self.close_sockets()
                     return 0
                 try:
-                    r, w, x = select.select([], sockets.keys(), [], 1.0)
+                    r, w, x = select.select([], self.sockets.keys(), [], 1.0)
                 except select.error:
                     continue
                 for s in w:
-                    e = self.connect(s, sockets[s])
-                    if e is None:
-                        del sockets[s]
+                    # connect() raises Connected iff it succeeds
+                    self.connect(s)
         except Connected, container:
             s = container.sock
-            del sockets[s]
-            # close all the other sockets
-            for s in sockets.keys():
-                s.close()
+            del self.sockets[s] # don't close the newly connected socket
+            self.close_sockets()
             return 1
         return 0
 
-    def connect(self, s, addr):
-        """Call s.connect_ex(addr) and return true if loop should continue.
+    def connect(self, s):
+        """Call s.connect_ex(addr); raise Connected iff connection succeeds.
 
         We have to handle several possible return values from
         connect_ex().  If the socket is connected and the initial ZEO
@@ -211,27 +296,42 @@
         select() loop in the caller and an exception is a principled
         way to do the abort.
 
-        If the socket sonnects and the initial ZEO setup fails or the
-        connect_ex() returns an error, we close the socket and ignore it.
+        If the socket connects and the initial ZEO setup
+        (notifyConnected()) fails or the connect_ex() returns an
+        error, we close the socket, remove it from self.sockets, and
+        proceed with the other sockets.
 
         If connect_ex() returns EINPROGRESS, we need to try again later.
         """
-        
-        e = s.connect_ex(addr)
-        if e == errno.EINPROGRESS:
-            return 1
-        elif e == 0:
-            c = self.test_connection(s, addr)
-            log("connected to %s" % repr(addr), level=zLOG.DEBUG)
-            if c:
-                raise Connected(s)
+        addr = self.sockets[s]
+        try:
+            e = s.connect_ex(addr)
+        except socket.error, msg:
+            log("failed to connect to %s: %s" % (addr, msg),
+                level=zLOG.ERROR)
         else:
-            if __debug__:
+            if e in _CONNECT_IN_PROGRESS:
+                return
+            elif e in _CONNECT_OK:
+                c = self.test_connection(s, addr)
+                if c:
+                    log("connected to %s" % repr(addr), level=zLOG.DEBUG)
+                    raise Connected(s)
+            else:
                 log("error connecting to %s: %s" % (addr, errno.errorcode[e]),
                     level=zLOG.DEBUG)
-            s.close()
+        # Any execution that doesn't raise Connected() or return
+        # because of CONNECT_IN_PROGRESS is an error.  Make sure the
+        # socket is closed and remove it from the dict of pending
+        # sockets.
+        s.close()
+        del self.sockets[s]
 
     def test_connection(self, s, addr):
+        # Establish a connection at the zrpc level and call the
+        # client's notifyConnected(), giving the zrpc application a
+        # chance to do app-level check of whether the connection is
+        # okay.
         c = ManagedConnection(s, addr, self.client, self.mgr)
         try:
             self.client.notifyConnected(c)
@@ -239,6 +339,8 @@
             log("error connecting to server: %s" % str(addr),
                 level=zLOG.ERROR, error=sys.exc_info())
             c.close()
+            # Closing the ZRPC connection will eventually close the
+            # socket, somewhere in asyncore.
             return 0
         self.mgr.connect_done(c)
         return 1


=== ZEO/ZEO/zrpc/connection.py 1.1.2.2 => 1.1.2.2.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
 import asyncore
 import sys
 import threading
@@ -137,7 +150,7 @@
         if flags & ASYNC:
             if ret is not None:
                 raise ZRPCError("async method %s returned value %s" %
-                                (name, repr(ret))
+                                (name, repr(ret)))
         else:
             if __debug__:
                 log("%s return %s" % (name, short_repr(ret)), zLOG.DEBUG)
@@ -162,7 +175,7 @@
     def send_reply(self, msgid, ret):
         msg = self.marshal.encode(msgid, 0, REPLY, ret)
         self.message_output(msg)
-    
+
     def return_error(self, msgid, flags, err_type, err_value):
         if flags is None:
             self.log_error("Exception raised during decoding")
@@ -234,7 +247,7 @@
         # XXX The message won't go out right away in this case.  It
         # will wait for the asyncore loop to get control again.  Seems
         # okay to comment our for now, but need to understand better.
-##        self._do_async_poll()
+        self._do_async_poll()
 
     # handle IO, possibly in async mode
 
@@ -271,7 +284,7 @@
                 if self.closed:
                     raise DisconnectedError()
         self.__reply_lock.release()
-            
+
     def _do_async_poll(self, wait_for_reply=0):
         "Invoke asyncore mainloop to get pending message out."
 
@@ -284,13 +297,9 @@
             asyncore.poll(0.0, self._map)
 
 class ServerConnection(Connection):
+    """Connection on the server side"""
+    # XXX Do we need this class anymore?
 
-    def _do_async_poll(self, wait=0):
-        """If this is a server, there is no explicit IO to do"""
-        pass
-
-    # XXX _do_async_loop is never called.  Should it be defined as
-    # above anyway?
 
 class ManagedServerConnection(ServerConnection):
     """A connection that notifies its ConnectionManager of closing"""
@@ -310,7 +319,7 @@
     """A connection that notifies its ConnectionManager of closing.
 
     A managed connection also defers the ThreadedAsync work to its
-    manager. 
+    manager.
     """
     __super_init = Connection.__init__
     __super_close = Connection.close
@@ -324,6 +333,9 @@
         # the manager should actually close the trigger
         del self.trigger
 
+    def set_async(self, map):
+        pass
+
     def _prepare_async(self):
         # Don't do the register_loop_callback that the superclass does
         pass
@@ -345,4 +357,3 @@
     def close(self):
         self.__super_close()
         self.__mgr.notify_closed(self)
-


=== ZEO/ZEO/zrpc/error.py 1.1.2.1 => 1.1.2.1.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
 from ZODB import POSException
 from ZEO.Exceptions import Disconnected
 
@@ -9,4 +22,3 @@
 
 class DisconnectedError(ZRPCError, Disconnected):
     """The database storage is disconnected from the storage server."""
-


=== ZEO/ZEO/zrpc/log.py 1.1.2.2 => 1.1.2.2.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
 import os
 import types
 import zLOG


=== ZEO/ZEO/zrpc/marshal.py 1.1.2.1 => 1.1.2.1.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
 import cPickle
 from cStringIO import StringIO
 import types
@@ -7,7 +20,7 @@
 
     # It's okay to share a single Pickler as long as it's in fast
     # mode, which means that it doesn't have a memo.
-    
+
     pickler = cPickle.Pickler()
     pickler.fast = 1
     pickle = pickler.dump
@@ -46,7 +59,7 @@
         r = getattr(m, name)
     except AttributeError:
         raise ZRPCError("module %s has no global %s" % (module, name))
-        
+
     safe = getattr(r, '__no_side_effects__', 0)
     if safe:
         return r


=== ZEO/ZEO/zrpc/server.py 1.1.2.1 => 1.1.2.1.2.1 ===
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
+# 
+##############################################################################
 import asyncore
 import socket
 import types
@@ -14,7 +27,7 @@
 
     reuse_addr = 1
 
-    def __init__(self, addr, factory=Connection, reuse_addr=None):  
+    def __init__(self, addr, factory=Connection, reuse_addr=None):
         self.__super_init()
         self.addr = addr
         self.factory = factory


=== ZEO/ZEO/zrpc/trigger.py 1.1.2.1 => 1.1.2.1.2.1 ===
-# 
-# Zope Public License (ZPL) Version 1.0
-# -------------------------------------
-# 
-# Copyright (c) Digital Creations.  All rights reserved.
-# 
-# This license has been certified as Open Source(tm).
-# 
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-# 
-# 1. Redistributions in source code must retain the above copyright
-#    notice, this list of conditions, and the following disclaimer.
-# 
-# 2. Redistributions in binary form must reproduce the above copyright
-#    notice, this list of conditions, and the following disclaimer in
-#    the documentation and/or other materials provided with the
-#    distribution.
-# 
-# 3. Digital Creations requests that attribution be given to Zope
-#    in any manner possible. Zope includes a "Powered by Zope"
-#    button that is installed by default. While it is not a license
-#    violation to remove this button, it is requested that the
-#    attribution remain. A significant investment has been put
-#    into Zope, and this effort will continue if the Zope community
-#    continues to grow. This is one way to assure that growth.
-# 
-# 4. All advertising materials and documentation mentioning
-#    features derived from or use of this software must display
-#    the following acknowledgement:
-# 
-#      "This product includes software developed by Digital Creations
-#      for use in the Z Object Publishing Environment
-#      (http://www.zope.org/)."
-# 
-#    In the event that the product being advertised includes an
-#    intact Zope distribution (with copyright and license included)
-#    then this clause is waived.
-# 
-# 5. Names associated with Zope or Digital Creations must not be used to
-#    endorse or promote products derived from this software without
-#    prior written permission from Digital Creations.
-# 
-# 6. Modified redistributions of any form whatsoever must retain
-#    the following acknowledgment:
-# 
-#      "This product includes software developed by Digital Creations
-#      for use in the Z Object Publishing Environment
-#      (http://www.zope.org/)."
-# 
-#    Intact (re-)distributions of any official Zope release do not
-#    require an external acknowledgement.
-# 
-# 7. Modifications are encouraged but must be packaged separately as
-#    patches to official Zope releases.  Distributions that do not
-#    clearly separate the patches from the original work must be clearly
-#    labeled as unofficial distributions.  Modifications which do not
-#    carry the name Zope may be packaged in any form, as long as they
-#    conform to all of the clauses above.
-# 
-# 
-# Disclaimer
-# 
-#   THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
-#   EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-#   IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
-#   PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
-#   CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-#   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-#   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
-#   USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
-#   ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-#   OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
-#   OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
-#   SUCH DAMAGE.
-# 
-# 
-# This software consists of contributions made by Digital Creations and
-# many individuals on behalf of Digital Creations.  Specific
-# attributions are listed in the accompanying credits file.
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+# 
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE
 # 
 ##############################################################################
-
 # This module is a simplified version of the select_trigger module
 # from Sam Rushing's Medusa server.
 
-
 import asyncore
-#import asynchat
 
 import os
 import socket
-import string
 import thread
-    
+
 if os.name == 'posix':
 
     class trigger (asyncore.file_dispatcher):
@@ -136,6 +61,11 @@
             self.lock = thread.allocate_lock()
             self.thunks = []
 
+        def close(self):
+            self.del_channel()
+            self.socket.close() # the read side of the pipe
+            os.close(self.trigger) # the write side of the pipe
+
         def __repr__ (self):
             return '<select-trigger (pipe) at %x>' % id(self)
 
@@ -199,7 +129,7 @@
                     if port <= 19950:
                         raise 'Bind Error', 'Cannot bind trigger!'
                     port=port - 1
-            
+
             a.listen (1)
             w.setblocking (0)
             try:
@@ -250,7 +180,3 @@
                 self.thunks = []
             finally:
                 self.lock.release()
-
-
-
-