[Zope-Checkins] CVS: Zope/lib/python/ZServer - FCGIServer.py:1.22.18.1 FTPServer.py:1.28.2.1 HTTPServer.py:1.46.12.1 PCGIServer.py:1.26.18.1 component.xml:1.3.18.1 datatypes.py:1.2.18.1

Chris McDonough chrism at zope.com
Wed Oct 1 00:34:06 EDT 2003


Update of /cvs-repository/Zope/lib/python/ZServer
In directory cvs.zope.org:/tmp/cvs-serv21030

Modified Files:
      Tag: chrism-zserver-connection-policies-branch
	FCGIServer.py FTPServer.py HTTPServer.py PCGIServer.py 
	component.xml datatypes.py 
Log Message:
Experimental branch which allows ZServer servers to specify a different 
thread pool than the default thread pool, and a ZODB 'connection policy'.

The advantages of this include:

- Thread resource exhaustion will not affect servers which specify
  a different thread pool.

- The connection policy can be used for many purposes (including,
  for instance, specifying, in the future, whether MVCC should be used for a
  particular connection), but its main purpose in this context is
  to allow us to define a 'temporary' connection policy which obtains
  a connection outside of a (possibly exhausted) database connection
  pool.

HTTP, WebDAV, FTP, PCGI, and FCGI servers can make use of the new features
by using the 'thread-pool' and 'zodb-connection-policy' keys within
the zope.conf file within their respective server sections.  For instance:

<http-server>
  address 8080
  thread-pool emergency
  zodb-connection-policy temporary
</http-server>



=== Zope/lib/python/ZServer/FCGIServer.py 1.22 => 1.22.18.1 ===
--- Zope/lib/python/ZServer/FCGIServer.py:1.22	Tue Mar 18 16:15:14 2003
+++ Zope/lib/python/ZServer/FCGIServer.py	Wed Oct  1 00:33:35 2003
@@ -34,7 +34,7 @@
 
 from ZServer import CONNECTION_LIMIT, requestCloseOnExec
 
-from PubCore import handle
+from PubCore import get_handler, DEFAULT_POOLNAME, DEFAULT_CONNPOLICY
 from PubCore.ZEvent import Wakeup
 from ZPublisher.HTTPResponse import HTTPResponse
 from ZPublisher.HTTPRequest import HTTPRequest
@@ -272,7 +272,8 @@
     closed=0
     using_temp_stdin=None
 
-    def __init__(self, server, sock, addr):
+    def __init__(self, server, sock, addr, pool_name=DEFAULT_POOLNAME,
+                 connection_policy=DEFAULT_CONNPOLICY):
         self.server = server
         self.addr = addr
         asynchat.async_chat.__init__(self, sock)
@@ -283,7 +284,8 @@
         self.stdin = StringIO()
         self.filterData = StringIO()  # not currently used, but maybe someday
         self.requestId = 0
-
+        self.handle = get_handler(pool_name)
+        self.connection_policy = connection_policy
 
     def setInitialState(self):
         self.data = StringIO()
@@ -431,7 +433,8 @@
                                 stderr = StringIO())
         response.setChannel(self)
         request  = HTTPRequest(self.stdin, self.env, response)
-        handle(self.server.module, request, response)
+        request._connection_policy = self.connection_policy
+        self.handle(self.server.module, request, response)
 
 
     def log_request(self, bytes):
@@ -587,10 +590,14 @@
                  port=None,
                  socket_file=None,
                  resolver=None,
-                 logger_object=None):
+                 logger_object=None,
+                 pool_name=DEFAULT_POOLNAME,
+                 connection_policy=DEFAULT_CONNPOLICY):
 
         self.ip = ip
         self.count=counter()
+        self.pool_name = pool_name
+        self.connection_policy = connection_policy
         asyncore.dispatcher.__init__(self)
         if not logger_object:
             logger_object = logger.file_logger(sys.stdout)
@@ -638,7 +645,8 @@
         except socket.error:
             self.log_info('Server accept() threw an exception', 'warning')
             return
-        self.channel_class(self, conn, addr)
+        self.channel_class(self, conn, addr, self.pool_name,
+                           self.connection_policy)
 
 
     def readable(self):


=== Zope/lib/python/ZServer/FTPServer.py 1.28 => 1.28.2.1 ===
--- Zope/lib/python/ZServer/FTPServer.py:1.28	Fri Sep 26 12:13:59 2003
+++ Zope/lib/python/ZServer/FTPServer.py	Wed Oct  1 00:33:35 2003
@@ -64,10 +64,10 @@
 
 """
 
-from PubCore import handle
+from PubCore import get_handler, DEFAULT_POOLNAME, DEFAULT_CONNPOLICY
 from medusa.ftp_server import ftp_channel, ftp_server, recv_channel
 import asyncore, asynchat
-from medusa import filesys
+from medusa import filesys, logger
 
 from FTPResponse import make_response
 from FTPRequest import FTPRequest
@@ -76,6 +76,7 @@
 
 from cStringIO import StringIO
 import os
+import sys
 from mimetypes import guess_type
 import marshal
 import stat
@@ -88,7 +89,9 @@
     read_only=0
     anonymous=1
 
-    def __init__ (self, server, conn, addr, module):
+    def __init__ (self, server, conn, addr, module,
+                  pool_name=DEFAULT_POOLNAME,
+                  connection_policy=DEFAULT_CONNPOLICY):
         ftp_channel.__init__(self,server,conn,addr)
         requestCloseOnExec(conn)
         self.module=module
@@ -96,6 +99,11 @@
         self.password=''
         self.path='/'
         self.cookies={}
+        self.handle = get_handler(pool_name)
+        self.policy = connection_policy
+
+    def set_policy(self, request):
+        request._connection_policy = self.policy
 
     def _join_paths(self,*args):
         path=os.path.join(*args)
@@ -169,8 +177,10 @@
 
     def listdir (self, path, long=0):
         response=make_response(self, self.listdir_completion, long)
-        request=FTPRequest(path, 'LST', self, response,globbing=self.globbing,recursive=self.recursive)
-        handle(self.module, request, response)
+        request=FTPRequest(path, 'LST', self, response,
+                           globbing=self.globbing,recursive=self.recursive)
+        self.set_policy(request)
+        self.handle(self.module, request, response)
 
     def listdir_completion(self, long, response):
         status=response.getStatus()
@@ -205,7 +215,8 @@
         response=make_response(self, self.cwd_completion,
                 self._join_paths(self.path,line[1]))
         request=FTPRequest(line[1],'CWD',self,response)
-        handle(self.module,request,response)
+        self.set_policy(request)
+        self.handle(self.module,request,response)
 
     def cwd_completion(self,path,response):
         'cwd completion callback'
@@ -249,7 +260,8 @@
             return
         response=make_response(self, self.mdtm_completion)
         request=FTPRequest(line[1],'MDTM',self,response)
-        handle(self.module,request,response)
+        self.set_policy(request)
+        self.handle(self.module,request,response)
 
     def mdtm_completion(self, response):
         status=response.getStatus()
@@ -276,7 +288,8 @@
             return
         response=make_response(self, self.size_completion)
         request=FTPRequest(line[1],'SIZE',self,response)
-        handle(self.module,request,response)
+        self.set_policy(request)
+        self.handle(self.module,request,response)
 
     def size_completion(self,response):
         status=response.getStatus()
@@ -301,7 +314,8 @@
         # Support download restarts if possible.
         if self.restart_position > 0:
             request.environ['HTTP_RANGE'] = 'bytes=%d-' % self.restart_position
-        handle(self.module,request,response)
+        self.set_policy(request)
+        self.handle(self.module,request,response)
 
     def retr_completion(self, file, response):
         status=response.getStatus()
@@ -351,7 +365,8 @@
         'callback to do the STOR, after we have the input'
         response=make_response(self, self.stor_completion)
         request=FTPRequest(path,'STOR',self,response,stdin=data)
-        handle(self.module,request,response)
+        self.set_policy(request)
+        self.handle(self.module,request,response)
 
     def stor_completion(self,response):
         status=response.getStatus()
@@ -380,7 +395,8 @@
         patht,idt=os.path.split(line[1])
         response=make_response(self, self.rnto_completion)
         request=FTPRequest(pathf,('RNTO',idf,idt),self,response)
-        handle(self.module,request,response)
+        self.set_policy(request)
+        self.handle(self.module,request,response)
 
     def rnto_completion(self,response):
         status=response.getStatus()
@@ -396,7 +412,8 @@
         path,id=os.path.split(line[1])
         response=make_response(self, self.dele_completion)
         request=FTPRequest(path,('DELE',id),self,response)
-        handle(self.module,request,response)
+        self.set_policy(request)
+        self.handle(self.module,request,response)
 
     def dele_completion(self,response):
         status=response.getStatus()
@@ -414,7 +431,8 @@
         path,id=os.path.split(line[1])
         response=make_response(self, self.mkd_completion)
         request=FTPRequest(path,('MKD',id),self,response)
-        handle(self.module,request,response)
+        self.set_policy(request)
+        self.handle(self.module,request,response)
 
     cmd_xmkd=cmd_mkd
 
@@ -436,7 +454,8 @@
         path,id=os.path.split(line[1])
         response=make_response(self, self.rmd_completion)
         request=FTPRequest(path,('RMD',id),self,response)
-        handle(self.module,request,response)
+        self.set_policy(request)
+        self.handle(self.module,request,response)
 
     cmd_xrmd=cmd_rmd
 
@@ -481,7 +500,8 @@
             response=make_response(self, self.pass_completion,
                     self._join_paths('/',path))
             request=FTPRequest(path,'PASS',self,response)
-            handle(self.module,request,response)
+            self.set_policy(request)
+            self.handle(self.module,request,response)
 
 
     def pass_completion(self,path,response):
@@ -595,11 +615,23 @@
     limiter=FTPLimiter(10,1)
     shutup=0
 
-    def __init__(self,module,*args,**kw):
+    def __init__(self,
+                 module,
+                 hostname  = None,
+                 ip ='',
+                 port = 21,
+                 resolver = None,
+                 logger_object = logger.file_logger (sys.stdout),
+                 pool_name=DEFAULT_POOLNAME,
+                 connection_policy=DEFAULT_CONNPOLICY,
+                 ):
         self.shutup=1
-        ftp_server.__init__(self, None, *args, **kw)
+        ftp_server.__init__(self, None, hostname, ip, port, resolver,
+                            logger_object)
         self.shutup=0
         self.module=module
+        self.pool_name = pool_name
+        self.connection_policy = connection_policy
         self.log_info('FTP server started at %s\n'
                       '\tHostname: %s\n\tPort: %d' % (
                         time.ctime(time.time()),
@@ -629,7 +661,8 @@
             return
         self.total_sessions.increment()
         self.log_info('Incoming connection from %s:%d' % (addr[0], addr[1]))
-        self.ftp_channel_class (self, conn, addr, self.module)
+        self.ftp_channel_class (self, conn, addr, self.module, self.pool_name,
+                                self.connection_policy)
 
     def readable(self):
         return len(asyncore.socket_map) < CONNECTION_LIMIT


=== Zope/lib/python/ZServer/HTTPServer.py 1.46 => 1.46.12.1 ===
--- Zope/lib/python/ZServer/HTTPServer.py:1.46	Wed Apr  9 16:26:23 2003
+++ Zope/lib/python/ZServer/HTTPServer.py	Wed Oct  1 00:33:35 2003
@@ -41,7 +41,7 @@
 import socket
 from cStringIO import StringIO
 
-from PubCore import handle
+from PubCore import get_handler, DEFAULT_POOLNAME, DEFAULT_CONNPOLICY
 from HTTPResponse import make_response
 from ZPublisher.HTTPRequest import HTTPRequest
 
@@ -289,11 +289,14 @@
     zombie_timeout=100*60 # 100 minutes
     max_header_len = 8196
 
-    def __init__(self, server, conn, addr):
+    def __init__(self, server, conn, addr, pool_name=DEFAULT_POOLNAME,
+                 connection_policy=DEFAULT_CONNPOLICY):
         http_channel.__init__(self, server, conn, addr)
         requestCloseOnExec(conn)
         self.queue=[]
         self.working=0
+        self.handle = get_handler(pool_name)
+        self.connection_policy = connection_policy
 
     def push(self, producer, send=1):
         # this is thread-safe when send is false
@@ -326,7 +329,8 @@
                 self.working=1
                 try: module_name, request, response=self.queue.pop(0)
                 except: return
-                handle(module_name, request, response)
+                request._connection_policy = self.connection_policy
+                self.handle(module_name, request, response)
 
     def close(self):
         self.closed=1
@@ -375,7 +379,9 @@
     channel_class = zhttp_channel
     shutup=0
 
-    def __init__ (self, ip, port, resolver=None, logger_object=None):
+    def __init__ (self, ip, port, resolver=None, logger_object=None,
+                  pool_name=DEFAULT_POOLNAME,
+                  connection_policy=DEFAULT_CONNPOLICY):
         self.shutup=1
         http_server.__init__(self, ip, port, resolver, logger_object)
         self.shutup=0
@@ -385,6 +391,8 @@
                         self.server_name,
                         self.server_port
                         ))
+        self.pool_name = pool_name
+        self.connection_policy = connection_policy
 
     def clean_shutdown_control(self,phase,time_in_this_phase):
         if phase==2:
@@ -407,3 +415,32 @@
         # override asyncore limits for nt's listen queue size
         self.accepting = 1
         return self.socket.listen (num)
+
+    def handle_accept (self):
+        # overridden to support thread pool name
+        self.total_clients.increment()
+        try:
+            tup = self.accept()
+        except socket.error:
+                # linux: on rare occasions we get a bogus socket back from
+                # accept.  socketmodule.c:makesockaddr complains that the
+                # address family is unknown.  We don't want the whole server
+                # to shut down because of this.
+            self.log_info ('warning: server accept() threw an exception',
+                           'warning')
+            self.total_clients.decrement()
+            return
+        try:
+            conn, addr = tup
+        except TypeError:
+            # unpack non-sequence.  this can happen when a read event
+            # fires on a listening socket, but when we call accept()
+            # we get EWOULDBLOCK, so dispatcher.accept() returns None.
+            # Seen on FreeBSD3 and Linux.
+            #self.log_info ('warning: server accept() returned %s '
+            #               '(EWOULDBLOCK?)' % tup, 'warning')
+            self.total_clients.decrement()
+            return
+
+        self.channel_class(self, conn, addr, self.pool_name,
+                           self.connection_policy)


=== Zope/lib/python/ZServer/PCGIServer.py 1.26 => 1.26.18.1 ===
--- Zope/lib/python/ZServer/PCGIServer.py:1.26	Tue Mar 18 16:15:14 2003
+++ Zope/lib/python/ZServer/PCGIServer.py	Wed Oct  1 00:33:35 2003
@@ -37,7 +37,7 @@
 import ZServer
 from ZServer import CONNECTION_LIMIT, requestCloseOnExec
 
-from PubCore import handle
+from PubCore import get_handler, DEFAULT_POOLNAME, DEFAULT_CONNPOLICY
 from PubCore.ZEvent import Wakeup
 from ZPublisher.HTTPResponse import HTTPResponse
 from ZPublisher.HTTPRequest import HTTPRequest
@@ -58,7 +58,8 @@
 
     closed=0
 
-    def __init__(self,server,sock,addr):
+    def __init__(self, server, sock, addr, pool_name=DEFAULT_POOLNAME,
+                 connection_policy=DEFAULT_CONNPOLICY):
         self.server = server
         self.addr = addr
         asynchat.async_chat.__init__ (self, sock)
@@ -68,6 +69,8 @@
         self.set_terminator(10)
         self.size=None
         self.done=None
+        self.handle = get_handler(pool_name)
+        self.connection_policy = connection_policy
 
     def found_terminator(self):
         if self.size is None:
@@ -132,7 +135,8 @@
         self.done=1
         response=PCGIResponse(stdout=PCGIPipe(self), stderr=StringIO())
         request=HTTPRequest(self.data, self.env, response)
-        handle(self.server.module, request, response)
+        request._connection_policy = self.connection_policy
+        self.handle(self.server.module, request, response)
 
     def collect_incoming_data(self, data):
         self.data.write(data)
@@ -222,14 +226,16 @@
     channel_class=PCGIChannel
 
     def __init__ (self,
-            module='Main',
-            ip='127.0.0.1',
-            port=None,
-            socket_file=None,
-            pid_file=None,
-            pcgi_file=None,
-            resolver=None,
-            logger_object=None):
+                  module='Main',
+                  ip='127.0.0.1',
+                  port=None,
+                  socket_file=None,
+                  pid_file=None,
+                  pcgi_file=None,
+                  resolver=None,
+                  logger_object=None,
+                  pool_name=DEFAULT_POOLNAME,
+                  connection_policy=DEFAULT_CONNPOLICY):
 
         self.ip = ip
         asyncore.dispatcher.__init__(self)
@@ -246,6 +252,7 @@
         self.port=port
         self.pid_file=pid_file
         self.socket_file=socket_file
+        self.connection_policy = connection_policy
         if pcgi_file is not None:
             self.read_info(pcgi_file)
 
@@ -321,7 +328,8 @@
         except socket.error:
             self.log_info('Server accept() threw an exception', 'warning')
             return
-        self.channel_class(self, conn, addr)
+        self.channel_class(self, conn, addr, self.pool_name,
+                           self.connection_policy)
 
     def readable(self):
         return len(asyncore.socket_map) < CONNECTION_LIMIT


=== Zope/lib/python/ZServer/component.xml 1.3 => 1.3.18.1 ===
--- Zope/lib/python/ZServer/component.xml:1.3	Mon Mar 24 17:32:39 2003
+++ Zope/lib/python/ZServer/component.xml	Wed Oct  1 00:33:35 2003
@@ -19,6 +19,8 @@
          receive WebDAV source responses to GET requests.
        </description>
      </key>
+     <key name="thread-pool" default="default"/>
+     <key name="zodb-connection-policy" default="default"/>
   </sectiontype>
 
   <sectiontype name="webdav-source-server"
@@ -26,24 +28,32 @@
                implements="ZServer.server">
      <key name="address" datatype="inet-address"/>
      <key name="force-connection-close" datatype="boolean" default="off"/>
+     <key name="thread-pool" default="default"/>
+     <key name="zodb-connection-policy" default="default"/>
   </sectiontype>
 
   <sectiontype name="persistent-cgi"
                datatype=".PCGIServerFactory"
                implements="ZServer.server">
     <key name="path" datatype="existing-file"/>
+    <key name="thread-pool" default="default"/>
+    <key name="zodb-connection-policy" default="default"/>
   </sectiontype>
 
   <sectiontype name="fast-cgi"
                datatype=".FCGIServerFactory"
                implements="ZServer.server">
     <key name="address" datatype="socket-address"/>
+    <key name="thread-pool" default="default"/>
+    <key name="zodb-connection-policy" default="default"/>
   </sectiontype>
 
   <sectiontype name="ftp-server"
                datatype=".FTPServerFactory"
                implements="ZServer.server">
      <key name="address" datatype="inet-address"/>
+     <key name="thread-pool" default="default"/>
+    <key name="zodb-connection-policy" default="default"/>
   </sectiontype>
 
   <sectiontype name="monitor-server"


=== Zope/lib/python/ZServer/datatypes.py 1.2 => 1.2.18.1 ===
--- Zope/lib/python/ZServer/datatypes.py:1.2	Tue Mar 18 16:15:14 2003
+++ Zope/lib/python/ZServer/datatypes.py	Wed Oct  1 00:33:35 2003
@@ -56,6 +56,8 @@
         # webdav-source-server sections won't have webdav_source_clients:
         webdav_clients = getattr(section, "webdav_source_clients", None)
         self.webdav_source_clients = webdav_clients
+        self.thread_pool = getattr(section, 'thread_pool', 'default')
+        self.policy = getattr(section, 'zodb_connection_policy', 'default')
 
     def create(self):
         from ZServer import HTTPServer
@@ -66,7 +68,9 @@
             handler.set_webdav_source_clients(self.webdav_source_clients)
         server = HTTPServer.zhttp_server(ip=self.host, port=self.port,
                                          resolver=self.dnsresolver,
-                                         logger_object=access_logger)
+                                         logger_object=access_logger,
+                                         pool_name=self.thread_pool,
+                                         connection_policy=self.policy)
         server.install_handler(handler)
         return server
 




More information about the Zope-Checkins mailing list