[Zope3-checkins] SVN: Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/re use the name "real" instead of "remote"

Benji York benji at zope.com
Sat Jun 3 14:12:09 EDT 2006


Log message for revision 68478:
  use the name "real" instead of "remote"
  

Changed:
  A   Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/real.py
  A   Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/realproxy.py
  D   Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py
  D   Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remoteproxy.py

-=-
Copied: Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/real.py (from rev 68477, Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py)
===================================================================
--- Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py	2006-06-03 18:00:48 UTC (rev 68477)
+++ Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/real.py	2006-06-03 18:12:09 UTC (rev 68478)
@@ -0,0 +1,216 @@
+##############################################################################
+#
+# Copyright (c) 2005 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Browser-based Functional Doctests
+
+$Id$
+"""
+__docformat__ = "reStructuredText"
+from BeautifulSoup import BeautifulSoup
+from zope.testbrowser import interfaces
+from zope.testbrowser.realproxy import ServerManager, PROXY_PORT
+from zope.testbrowser.utilities import disambiguate, zeroOrOne, \
+    SetattrErrorsMixin, PystoneTimer
+import re
+import urlparse
+
+
+try:
+    from zope import interface
+except ImportError:
+    from dummymodules import interface
+
+def getTagText(soup):
+    """Return str(soup) with tags removed and runs of spaces collapsed."""
+    without_tags = re.sub('<[^>]*>', '', str(soup))
+    collapsed = re.sub(' +', ' ', without_tags)
+    return collapsed
+
+class Browser(SetattrErrorsMixin):
+    """A web user agent."""
+    interface.implements(interfaces.IBrowser)
+
+    _contents = None
+    _counter = 0
+
+    def __init__(self, url=None):
+        self.serverManager = ServerManager()
+        self.serverManager.start()
+        self.timer = PystoneTimer()
+        self._enable_setattr_errors = True
+
+        if url is not None:
+            self.open(url)
+
+    def close(self):
+        self.serverManager.stop()
+
+    @property
+    def url(self):
+        """See zope.testbrowser.interfaces.IBrowser"""
+        return self.executeCommand('getUrl')
+
+    @property
+    def isHtml(self):
+        """See zope.testbrowser.interfaces.IBrowser (not implemented here)"""
+        raise NotImplementedError  # NotImplemented is not an exception
+
+    @property
+    def title(self):
+        """See zope.testbrowser.interfaces.IBrowser (not implemented here)"""
+        raise NotImplementedError  # NotImplemented is not an exception
+
+    @property
+    def contents(self):
+        """See zope.testbrowser.interfaces.IBrowser"""
+        # XXX see note in commands.js
+        return self.executeCommand('getContents')
+
+    @property
+    def headers(self):
+        """See zope.testbrowser.interfaces.IBrowser (not implemented here)"""
+        raise NotImplementedError  # NotImplemented is not an exception
+
+    def handleErrors(self):
+        """See zope.testbrowser.interfaces.IBrowser (not implemented here)"""
+        raise NotImplementedError  # NotImplemented is not an exception
+
+    def open(self, url, data=None):
+        """See zope.testbrowser.interfaces.IBrowser"""
+        (scheme, netloc, path, params, query, frag) = urlparse.urlparse(url)
+        if scheme != 'http':
+            # only plain http is supported; refuse anything else up front
+            raise ValueError("unknown scheme %r" % scheme)
+
+        # always go through the local proxy; the URL's own netloc is dropped
+        url = urlparse.urlunparse(
+            (scheme, 'localhost:%s' % PROXY_PORT, path, params, query, frag))
+
+        self._start_timer()
+        self.executeCommand('open', url, data)
+        self._stop_timer()
+        self._changed()
+
+    def executeCommand(self, command, *args):
+        """Execute a JavaScript routine on the client (not an official API)"""
+        return self.serverManager.executeCommand(command, *args)
+
+    def _start_timer(self):
+        self.timer.start()
+
+    def _stop_timer(self):
+        self.timer.stop()
+
+    @property
+    def lastRequestPystones(self):
+        """See zope.testbrowser.interfaces.IBrowser"""
+        return self.timer.elapsedPystones
+
+    @property
+    def lastRequestSeconds(self):
+        """See zope.testbrowser.interfaces.IBrowser"""
+        return self.timer.elapsedSeconds
+
+    def reload(self):
+        """See zope.testbrowser.interfaces.IBrowser"""
+        self._start_timer()
+        self.executeCommand('reload')
+        self._stop_timer()
+        self._changed()
+
+    def goBack(self, count=1):
+        """See zope.testbrowser.interfaces.IBrowser"""
+        self._start_timer()
+        self.executeCommand('goBack')  # NOTE(review): ``count`` is not passed on
+        self._stop_timer()
+        self._changed()
+
+    def addHeader(self, key, value):
+        """See zope.testbrowser.interfaces.IBrowser (not implemented here)"""
+        raise NotImplementedError  # NotImplemented is not an exception
+
+    def getLink(self, text=None, url=None, id=None, index=None):
+        """See zope.testbrowser.interfaces.IBrowser"""
+        soup = BeautifulSoup(self.contents)('a')
+        links = []
+
+        # msg: disambiguation message; n: index of the a tag (Link needs it)
+        if text is not None:
+            msg = 'text %r' % text
+            for n, a in enumerate(soup):
+                # remove all tags from the text in order to search it
+                if text in getTagText(a):
+                    links.append((a, n))
+        elif url is not None:
+            msg = 'url %r' % url
+            for n, a in enumerate(soup):
+                if a.get('href') == url:  # not every <a> has an href
+                    links.append((a, n))
+        elif id is not None:
+            msg = 'id %r' % id
+            for n, a in enumerate(soup):
+                if a.get('id') == id:  # most <a> tags carry no id
+                    links.append((a, n))
+        else:
+            raise ValueError('one of text, url or id must be given')
+
+        link, n = disambiguate(links, msg, index)
+        return Link(link, n, self)
+
+    def getControl(self, label=None, name=None, index=None):
+        """See zope.testbrowser.interfaces.IBrowser (not implemented here)"""
+        raise NotImplementedError  # NotImplemented is not an exception
+
+    def getForm(self, id=None, name=None, action=None, index=None):
+        """See zope.testbrowser.interfaces.IBrowser (not implemented here)"""
+        raise NotImplementedError  # NotImplemented is not an exception
+
+    def _changed(self):
+        self._counter += 1
+        self._contents = None
+
+
+class Link(SetattrErrorsMixin):
+    interface.implements(interfaces.ILink)
+
+    def __init__(self, link, index, browser):
+        self.link = link
+        self.browser = browser
+        self.url = urlparse.urljoin(self.browser.url, link['href'])
+        self._remembered_id = browser.executeCommand('rememberLinkN', index)
+        self._browser_counter = self.browser._counter
+        self._enable_setattr_errors = True
+
+    def click(self):
+        if self._browser_counter != self.browser._counter:
+            raise interfaces.ExpiredError  # browser changed since lookup
+        self.browser._start_timer()
+        self.browser.executeCommand('clickRememberedLink', self._remembered_id)
+        self.browser._stop_timer()
+        self.browser._changed()
+
+    @property
+    def text(self):
+        return getTagText(self.link)
+
+    @property
+    def tag(self):
+        return self.link.name
+
+    @property
+    def attrs(self):
+        return dict(self.link.attrs)
+
+    def __repr__(self):
+        return "<%s text=%r url=%r>" % (
+            self.__class__.__name__, self.text, self.url)

Copied: Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/realproxy.py (from rev 68475, Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remoteproxy.py)

Deleted: Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py
===================================================================
--- Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py	2006-06-03 18:00:48 UTC (rev 68477)
+++ Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py	2006-06-03 18:12:09 UTC (rev 68478)
@@ -1,216 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2005 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL).  A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Browser-based Functional Doctests
-
-$Id$
-"""
-__docformat__ = "reStructuredText"
-from BeautifulSoup import BeautifulSoup
-from zope.testbrowser import interfaces
-from zope.testbrowser.remoteproxy import ServerManager, PROXY_PORT
-from zope.testbrowser.utilities import disambiguate, zeroOrOne, \
-    SetattrErrorsMixin, PystoneTimer
-import re
-import urlparse
-
-
-try:
-    from zope import interface
-except ImportError:
-    from dummymodules import interface
-
-def getTagText(soup):
-    text = str(soup)
-    text = re.sub('<[^>]*>', '', text)
-    text = re.sub(' +', ' ', text)
-    return text
-
-class Browser(SetattrErrorsMixin):
-    """A web user agent."""
-    interface.implements(interfaces.IBrowser)
-
-    _contents = None
-    _counter = 0
-
-    def __init__(self, url=None):
-        self.serverManager = ServerManager()
-        self.serverManager.start()
-        self.timer = PystoneTimer()
-        self._enable_setattr_errors = True
-
-        if url is not None:
-            self.open(url)
-
-    def close(self):
-        self.serverManager.stop()
-
-    @property
-    def url(self):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        return self.executeCommand('getUrl')
-
-    @property
-    def isHtml(self):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        raise NotImplemented
-
-    @property
-    def title(self):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        raise NotImplemented
-
-    @property
-    def contents(self):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        # XXX see note in commands.js
-        return self.executeCommand('getContents')
-
-    @property
-    def headers(self):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        raise NotImplemented
-
-    def handleErrors():
-        """See zope.testbrowser.interfaces.IBrowser"""
-        raise NotImplemented
-
-    def open(self, url, data=None):
-        """See zope.testbrowser.interfaces.IBrowser"""
-
-        (scheme, netloc, path, params, query, frag) = urlparse.urlparse(url)
-
-        if scheme != 'http':
-            self.send_error(400, "unknown scheme %r" % scheme)
-
-        url = urlparse.urlunparse(
-            (scheme, 'localhost:%s' % PROXY_PORT, path, params, query, frag))
-
-        self._start_timer()
-        self.executeCommand('open', url, data)
-        self._stop_timer()
-        self._changed()
-
-    def executeCommand(self, command, *args):
-        """Execute a JavaScript routine on the client (not an official API)"""
-        return self.serverManager.executeCommand(command, *args)
-
-    def _start_timer(self):
-        self.timer.start()
-
-    def _stop_timer(self):
-        self.timer.stop()
-
-    @property
-    def lastRequestPystones(self):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        return self.timer.elapsedPystones
-
-    @property
-    def lastRequestSeconds(self):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        return self.timer.elapsedSeconds
-
-    def reload(self):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        self._start_timer()
-        self.executeCommand('reload')
-        self._stop_timer()
-        self._changed()
-
-    def goBack(self, count=1):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        self._start_timer()
-        self.executeCommand('goBack')
-        self._stop_timer()
-        self._changed()
-
-    def addHeader(self, key, value):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        raise NotImplemented
-
-    def getLink(self, text=None, url=None, id=None, index=None):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        soup = BeautifulSoup(self.contents)('a')
-        links = []
-
-        # "msg" holds the disambiguation message
-        # the Link instance below needs to know the index of the a tag (n)
-        if text is not None:
-            msg = 'text %r' % text
-            links = []
-            for n, a in enumerate(soup):
-                # remove all tags from the text in order to search it
-                if text in getTagText(a):
-                    links.append((a, n))
-        elif url is not None:
-            msg = 'url %r' % text
-            for n, a in enumerate(soup):
-                if a['href'] == url:
-                    links.append((a, n))
-        elif id is not None:
-            msg = 'id %r' % id
-            for n, a in enumerate(soup):
-                if a['id'] == id:
-                    links.append((a, n))
-
-        link, n = disambiguate(links, msg, index)
-        return Link(link, n, self)
-
-    def getControl(self, label=None, name=None, index=None):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        raise NotImplemented
-
-    def getForm(self, id=None, name=None, action=None, index=None):
-        """See zope.testbrowser.interfaces.IBrowser"""
-        raise NotImplemented
-
-    def _changed(self):
-        self._counter += 1
-        self._contents = None
-
-
-class Link(SetattrErrorsMixin):
-    interface.implements(interfaces.ILink)
-
-    def __init__(self, link, index, browser):
-        self.link = link
-        self.browser = browser
-        self.url = urlparse.urljoin(self.browser.url, link['href'])
-        self._remembered_id = browser.executeCommand('rememberLinkN', index)
-        self._browser_counter = self.browser._counter
-        self._enable_setattr_errors = True
-
-    def click(self):
-        if self._browser_counter != self.browser._counter:
-            raise interfaces.ExpiredError
-        self.browser._start_timer()
-        self.browser.executeCommand('clickRememberedLink', self._remembered_id)
-        self.browser._stop_timer()
-        self.browser._changed()
-
-    @property
-    def text(self):
-        return getTagText(self.link)
-
-    @property
-    def tag(self):
-        return self.link.name
-
-    @property
-    def attrs(self):
-        return dict(self.link.attrs)
-
-    def __repr__(self):
-        return "<%s text=%r url=%r>" % (
-            self.__class__.__name__, self.text, self.url)

Deleted: Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remoteproxy.py
===================================================================
--- Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remoteproxy.py	2006-06-03 18:00:48 UTC (rev 68477)
+++ Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remoteproxy.py	2006-06-03 18:12:09 UTC (rev 68478)
@@ -1,208 +0,0 @@
-import BaseHTTPServer
-import Queue
-import SocketServer
-import cgi
-import httplib
-import os
-import select
-import simplejson
-import socket
-import threading
-import urlparse
-
-base_dir = '/home/benji/workspace/testbrowser'
-allowed_resources = ['MochiKit', 'shim.js', 'commands.js', 'start.html']
-PROXY_PORT = 8000
-
-class Constant(object):
-    def __init__(self, name):
-        self.name = name
-
-    def __str__(self):
-        return self.name
-
-UNDEFINED = Constant('undefined')
-
-class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
-#    server_version = "TinyHTTPProxy/" + __version__
-    rbufsize = 0
-    remote_host = 'localhost:8080' # TODO needs to be configurable
-
-    def __init__(self, request, client_address, server):
-        self.command_queue = server.command_queue
-        self.result_queue = server.result_queue
-        BaseHTTPServer.BaseHTTPRequestHandler.__init__(
-            self, request, client_address, server)
-
-    def _connect(self, netloc, soc):
-        if ':' in netloc:
-            i = netloc.index(':')
-            host_and_port = netloc[:i], int(netloc[i+1:])
-        else:
-            host_and_port = netloc, 80
-        try:
-            soc.connect(host_and_port)
-        except socket.error, arg:
-            try:
-                msg = arg[1]
-            except:
-                msg = arg
-            self.send_error(404, msg)
-            return False
-        return True
-
-#    def do_CONNECT(self):
-#        soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-#        try:
-#            if self._connect(self.path, soc):
-#                self.log_request(200)
-#                self.wfile.write(self.protocol_version +
-#                                 " 200 Connection established\r\n")
-#                self.wfile.write("\r\n")
-#                self._read_write(soc, 300)
-#        finally:
-#            soc.close()
-#            self.connection.close()
-
-    def sendFile(self, path):
-        assert path.startswith('/')
-        path = path[1:]
-        assert path.split('/')[1] in allowed_resources
-        # XXX might use too much memory
-        self.wfile.write(open(os.path.join(base_dir, path)).read())
-
-    def handleRequest(self):
-        (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(
-            self.path, 'http')
-        assert netloc == ''
-        print self.path
-        if scheme != 'http':
-            self.send_error(400, "unknown scheme %r" % scheme)
-            return
-        self.log_request()
-
-        if self.command in ('GET', 'POST'):
-            if path.startswith('/__resources__/'):
-                self.sendFile(path)
-                return
-            elif path.startswith('/__api__/'):
-                operation = urlparse.urlparse(self.path)[2].split('/')[-1]
-                raw_json = self.rfile.next()
-
-                if raw_json.strip() == 'undefined':
-                    last_result = UNDEFINED
-                else:
-                    last_result = simplejson.loads(raw_json)
-
-                if operation == 'next':
-                    # if no command has been processed yet, the last_result
-                    # value will be a placeholder
-                    if last_result != '__testbrowser__no_result_yet':
-                        self.result_queue.put(last_result)
-
-                    response = self.command_queue.get()
-                    if response[0] == '_tb_stop':
-                        self.server.stop = True
-                        self.result_queue.put('the server has stopped')
-                else:
-                    self.send_response(404)
-                    self.send_header('Content-Type', 'text/plain')
-                    self.end_headers()
-                    msg = 'unknown operation: %r' % operation
-                    self.wfile.write(msg)
-                    raise RuntimeError(msg)
-
-                self.send_response(200)
-                self.send_header('Content-Type', 'text/plain')
-                self.end_headers()
-                self.wfile.write(simplejson.dumps(response))
-                return
-
-        soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        try:
-            print 'sending', self.remote_host
-            if self._connect(self.remote_host, soc):
-                soc.send("%s %s %s\r\n" % (
-                    self.command,
-                    urlparse.urlunparse(('', '', path, params, query, '')),
-                    self.request_version))
-                self.headers['Connection'] = 'close'
-
-                for key_val in self.headers.items():
-                    soc.send("%s: %s\r\n" % key_val)
-                soc.send("\r\n")
-                self._read_write(soc)
-                print 'done with', self.path
-        finally:
-            soc.close()
-            self.connection.close()
-
-    def _read_write(self, soc, max_idling=20):
-        iw = [self.connection, soc]
-        count = 0
-        while 1:
-            count += 1
-            (iwtd, _, ewtd) = select.select(iw, [], iw, 3)
-            if ewtd:
-                break
-            if iwtd:
-                for i in iwtd:
-                    if i is soc:
-                        out = self.connection
-                    else:
-                        out = soc
-                    data = i.recv(8192)
-
-                    if data:
-                        out.send(data)
-                        count = 0
-
-            if count == max_idling:
-                break
-
-    do_HEAD = do_POST = do_PUT = do_DELETE = do_GET = handleRequest
-
-    def do_NOOP(self):
-        self.send_response(200)
-        self.end_headers()
-
-    def log_message(self, format, *args):
-        pass
-
-
-class HttpServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
-
-    stop = False
-
-    def __init__(self, *args, **kws):
-        self.command_queue = Queue.Queue()
-        self.result_queue = Queue.Queue()
-        BaseHTTPServer.HTTPServer.__init__(self, *args, **kws)
-
-    def serve_forever(self):
-        """Handle one request at a time until stopped."""
-        while not self.stop:
-            self.handle_request()
-
-
-class ServerManager(object):
-    def __init__(self):
-        self.port = PROXY_PORT
-        self.server = HttpServer(('0.0.0.0', self.port), RequestHandler)
-
-    def start(self):
-        self.server_thread = threading.Thread(
-                                target=self.server.serve_forever)
-        self.server_thread.setDaemon(True)
-        self.server_thread.start()
-
-    def stop(self):
-        self.executeCommand('stop')
-        conn = httplib.HTTPConnection('localhost:%d' % self.port)
-        conn.request('NOOP', '/')
-        conn.getresponse()
-        self.server_thread.join()
-
-    def executeCommand(self, command, *args):
-        self.server.command_queue.put( ('_tb_'+command, simplejson.dumps(args)) )
-        return self.server.result_queue.get()



More information about the Zope3-Checkins mailing list