[Zope-Checkins] SVN: Zope/trunk/lib/python/ZServer/ I have so made a hash out of this merge from the chrism-clockserver-merge branch. I have no idea what I'm doing, clearly. But this checkin gets us to the place I intended originally.

Chris McDonough chrism at plope.com
Wed Dec 21 12:33:31 EST 2005


Log message for revision 40964:
  I have so made a hash out of this merge from the chrism-clockserver-merge branch.  I have no idea what I'm doing, clearly.  But this checkin gets us to the place I intended originally.
  

Changed:
  U   Zope/trunk/lib/python/ZServer/ClockServer.py
  U   Zope/trunk/lib/python/ZServer/tests/test_clockserver.py

-=-
Modified: Zope/trunk/lib/python/ZServer/ClockServer.py
===================================================================
--- Zope/trunk/lib/python/ZServer/ClockServer.py	2005-12-21 16:36:46 UTC (rev 40963)
+++ Zope/trunk/lib/python/ZServer/ClockServer.py	2005-12-21 17:33:31 UTC (rev 40964)
@@ -18,7 +18,6 @@
 import socket
 import time
 import StringIO
-import base64
 import asyncore
 
 from ZServer.medusa.http_server import http_request
@@ -27,7 +26,9 @@
 from ZServer.HTTPResponse import make_response
 from ZPublisher.HTTPRequest import HTTPRequest
 
-def timeslice(period, when=0):
+def timeslice(period, when=None, t=time.time):
+    if when is None:
+        when =  t()
     return when - (when % period)
 
 class LogHelper:
@@ -66,7 +67,7 @@
     SERVER_IDENT = 'Zope Clock' 
 
     def __init__ (self, method, period=60, user=None, password=None,
-                  host=None, logger=None):
+                  host=None, logger=None, handler=None):
         self.period = period
         self.method = method
 
@@ -80,8 +81,7 @@
         h.append('Host: %s' % host)
         auth = False
         if user and password:
-            encoded = base64.encodestring('%s:%s' % (user, password))
-            encoded = encoded.replace('\012', '')
+            encoded = ('%s:%s' % (user, password)).encode('base64')
             h.append('Authorization: Basic %s' % encoded)
             auth = True
 
@@ -90,6 +90,10 @@
         self.logger = LogHelper(logger)
         self.log_info('Clock server for "%s" started (user: %s, period: %s)'
                       % (method, auth and user or 'Anonymous', self.period))
+        if handler is None:
+            # for unit testing
+            handler = handle
+        self.zhandler = handler
 
     def get_requests_and_response(self):
         out = StringIO.StringIO()
@@ -120,10 +124,10 @@
             env['QUERY_STRING'] = query
         env['channel.creation_time']=time.time()
         for header in req.header:
-            key,value=header.split(":",1)
-            key=key.upper()
-            value=value.strip()
-            key='HTTP_%s' % ("_".join(key.split( "-")))
+            key,value = header.split(":",1)
+            key = key.upper()
+            value = value.strip()
+            key = 'HTTP_%s' % ("_".join(key.split( "-")))
             if value:
                 env[key]=value
         return env
@@ -135,17 +139,18 @@
             # no need for threadsafety here, as we're only ever in one thread
             self.last_slice = slice
             req, zreq, resp = self.get_requests_and_response()
-            handle('Zope', zreq, resp)
-        return 0
+            self.zhandler('Zope2', zreq, resp)
+        return False
 
     def handle_read(self):
-        pass
+        return True
 
     def handle_write (self):
-        self.log_info ('unexpected write event', 'warning')
+        self.log_info('unexpected write event', 'warning')
+        return True
 
     def writable(self):
-        return 0
+        return False
 
     def handle_error (self):      # don't close the socket on error
         (file,fun,line), t, v, tbinfo = asyncore.compact_traceback()

Modified: Zope/trunk/lib/python/ZServer/tests/test_clockserver.py
===================================================================
--- Zope/trunk/lib/python/ZServer/tests/test_clockserver.py	2005-12-21 16:36:46 UTC (rev 40963)
+++ Zope/trunk/lib/python/ZServer/tests/test_clockserver.py	2005-12-21 17:33:31 UTC (rev 40964)
@@ -1,23 +1,32 @@
 import unittest
+import time
 from StringIO import StringIO
 
 from ZServer import ClockServer
 
+class DummyLogger:
+    def __init__(self):
+        self.L = []
+        
+    def log(self, *arg, **kw):
+        self.L.extend(arg)
+
+    def read(self):
+        return ' '.join(self.L)
+
 class LogHelperTests(unittest.TestCase):
     def _getTargetClass(self):
         return ClockServer.LogHelper
 
     def _makeOne(self, *arg, **kw):
-        return self._getTargetClass()(*arg **kw)
+        return self._getTargetClass()(*arg, **kw)
 
     def test_helper(self):
         from StringIO import StringIO
-        logger = StringIO()
-        logger.log = logger.write
+        logger = DummyLogger()
         helper = self._makeOne(logger)
         self.assertEqual(helper.logger, logger)
         logger.log('ip', 'msg', foo=1, bar=2)
-        logger.seek(0)
         self.assertEqual(logger.read(), 'ip msg')
 
 class ClockServerTests(unittest.TestCase):
@@ -25,11 +34,10 @@
         return ClockServer.ClockServer
 
     def _makeOne(self, *arg, **kw):
-        return self._getTargetClass()(*arg **kw)
+        return self._getTargetClass()(*arg, **kw)
 
     def test_ctor(self):
-        logger = StringIO()
-        logger.log = logger.write
+        logger = DummyLogger()
         server = self._makeOne(method='a', period=60, user='charlie',
                                password='brown', host='localhost',
                                logger=logger)
@@ -39,14 +47,9 @@
                           'Accept: text/html,text/plain',
                           'Host: localhost',
                           'Authorization: Basic %s' % auth])
-        logger.seek(0)
-        self.assertEqual(
-            logger.read(),
-            'Clock server for "a" started (user: charlie, period: 60)')
                           
     def test_get_requests_and_response(self):
-        logger = StringIO()
-        logger.log = logger.write
+        logger = DummyLogger()
         server = self._makeOne(method='a', period=60, user='charlie',
                                password='brown', host='localhost',
                                logger=logger)
@@ -55,13 +58,12 @@
         from ZServer.medusa.http_server import http_request
         from ZServer.HTTPResponse import HTTPResponse
         from ZPublisher.HTTPRequest import HTTPRequest
-        self.failUnless(issubclass(req, http_request))
-        self.failUnless(issubclass(resp, HTTPResponse))
-        self.failUnlesss(issubclass(zreq, HTTPRequest))
+        self.failUnless(isinstance(req, http_request))
+        self.failUnless(isinstance(resp, HTTPResponse))
+        self.failUnless(isinstance(zreq, HTTPRequest))
 
     def test_get_env(self):
-        logger = StringIO()
-        logger.log = logger.write
+        logger = DummyLogger()
         server = self._makeOne(method='a', period=60, user='charlie',
                                password='brown', host='localhost',
                                logger=logger)
@@ -69,7 +71,7 @@
             def split_uri(self):
                 return '/a%20', '/b', '?foo=bar', ''
 
-            header = ['BAR']
+            header = ['BAR:baz']
         env = server.get_env(dummy_request())
         _ENV = dict(REQUEST_METHOD = 'GET',
                     SERVER_PORT = 'Clock',
@@ -81,31 +83,75 @@
                     REMOTE_ADDR = '0')
         for k, v in _ENV.items():
             self.assertEqual(env[k], v)
-        self.assertEqual(env['PATH_INFO'], '')
-        self.assertEqual(env['PATH_TRANSLATED'], '')
+        self.assertEqual(env['PATH_INFO'], '/a /b')
+        self.assertEqual(env['PATH_TRANSLATED'], '/a /b')
         self.assertEqual(env['QUERY_STRING'], 'foo=bar')
         self.assert_(env['channel.creation_time'])
 
     def test_handle_write(self):
-        logger = StringIO()
-        logger.log = logger.write
+        logger = DummyLogger()
         server = self._makeOne(method='a', period=60, user='charlie',
                                password='brown', host='localhost',
                                logger=logger)
-        server.handle_write()
-        logger.seek(0)
-        self.assertEqual(logger.read(), 'unexpected write event')
+        self.assertEqual(server.handle_write(), True)
 
     def test_handle_error(self):
-        logger = StringIO()
-        logger.log = logger.write
+        logger = DummyLogger()
         server = self._makeOne(method='a', period=60, user='charlie',
                                password='brown', host='localhost',
                                logger=logger)
-        server.handle_error()
-        logger.seek(0)
-        self.assertEqual(logger.read, 'foo')
+        self.assertRaises(AssertionError, server.handle_error)
 
+    def test_readable(self):
+        logger = DummyLogger()
+        class DummyHandler:
+            def __init__(self):
+                self.arg = []
+            def __call__(self, *arg):
+                self.arg = arg
+        handler = DummyHandler()
+        server = self._makeOne(method='a', period=1, user='charlie',
+                               password='brown', host='localhost',
+                               logger=logger, handler=handler)
+        self.assertEqual(server.readable(), False)
+        self.assertEqual(handler.arg, [])
+        time.sleep(1.1) # allow timeslice to switch
+        self.assertEqual(server.readable(), False)
+        self.assertEqual(handler.arg[0], 'Zope2')
+        from ZServer.HTTPResponse import HTTPResponse
+        from ZPublisher.HTTPRequest import HTTPRequest
+        self.assert_(isinstance(handler.arg[1], HTTPRequest))
+        self.assert_(isinstance(handler.arg[2], HTTPResponse))
+
+    def test_timeslice(self):
+        from ZServer.ClockServer import timeslice
+        aslice = timeslice(3, 6)
+        self.assertEqual(aslice, 6)
+        aslice = timeslice(3, 7)
+        self.assertEqual(aslice, 6)
+        aslice = timeslice(3, 8)
+        self.assertEqual(aslice, 6)
+        aslice = timeslice(3, 9)
+        self.assertEqual(aslice, 9)
+        aslice = timeslice(3, 10)
+        self.assertEqual(aslice, 9)
+        aslice = timeslice(3, 11)
+        self.assertEqual(aslice, 9)
+        aslice = timeslice(3, 12)
+        self.assertEqual(aslice, 12)
+        aslice = timeslice(3, 13)
+        self.assertEqual(aslice, 12)
+        aslice = timeslice(3, 14)
+        self.assertEqual(aslice, 12)
+        aslice = timeslice(3, 15)
+        self.assertEqual(aslice, 15)
+        aslice = timeslice(3, 16)
+        self.assertEqual(aslice, 15)
+        aslice = timeslice(3, 17)
+        self.assertEqual(aslice, 15)
+        aslice = timeslice(3, 18)
+        self.assertEqual(aslice, 18)
+
 def test_suite():
     suite = unittest.makeSuite(ClockServerTests)
     suite.addTest(unittest.makeSuite(LogHelperTests))



More information about the Zope-Checkins mailing list