[Zope3-checkins]
SVN: Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/re
use the name "real" instead of "remote"
Benji York
benji at zope.com
Sat Jun 3 14:12:09 EDT 2006
Log message for revision 68478:
use the name "real" instead of "remote"
Changed:
A Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/real.py
A Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/realproxy.py
D Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py
D Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remoteproxy.py
-=-
Copied: Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/real.py (from rev 68477, Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py)
===================================================================
--- Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py 2006-06-03 18:00:48 UTC (rev 68477)
+++ Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/real.py 2006-06-03 18:12:09 UTC (rev 68478)
@@ -0,0 +1,216 @@
+##############################################################################
+#
+# Copyright (c) 2005 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Browser-based Functional Doctests
+
+$Id$
+"""
+__docformat__ = "reStructuredText"
+from BeautifulSoup import BeautifulSoup
+from zope.testbrowser import interfaces
+from zope.testbrowser.realproxy import ServerManager, PROXY_PORT
+from zope.testbrowser.utilities import disambiguate, zeroOrOne, \
+ SetattrErrorsMixin, PystoneTimer
+import re
+import urlparse
+
+
+try:
+ from zope import interface
+except ImportError:
+ from dummymodules import interface
+
+def getTagText(soup):
+ text = str(soup)
+ text = re.sub('<[^>]*>', '', text)
+ text = re.sub(' +', ' ', text)
+ return text
+
+class Browser(SetattrErrorsMixin):
+ """A web user agent."""
+ interface.implements(interfaces.IBrowser)
+
+ _contents = None
+ _counter = 0
+
+ def __init__(self, url=None):
+ self.serverManager = ServerManager()
+ self.serverManager.start()
+ self.timer = PystoneTimer()
+ self._enable_setattr_errors = True
+
+ if url is not None:
+ self.open(url)
+
+ def close(self):
+ self.serverManager.stop()
+
+ @property
+ def url(self):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ return self.executeCommand('getUrl')
+
+ @property
+ def isHtml(self):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ raise NotImplemented
+
+ @property
+ def title(self):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ raise NotImplemented
+
+ @property
+ def contents(self):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ # XXX see note in commands.js
+ return self.executeCommand('getContents')
+
+ @property
+ def headers(self):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ raise NotImplemented
+
+ def handleErrors():
+ """See zope.testbrowser.interfaces.IBrowser"""
+ raise NotImplemented
+
+ def open(self, url, data=None):
+ """See zope.testbrowser.interfaces.IBrowser"""
+
+ (scheme, netloc, path, params, query, frag) = urlparse.urlparse(url)
+
+ if scheme != 'http':
+ self.send_error(400, "unknown scheme %r" % scheme)
+
+ url = urlparse.urlunparse(
+ (scheme, 'localhost:%s' % PROXY_PORT, path, params, query, frag))
+
+ self._start_timer()
+ self.executeCommand('open', url, data)
+ self._stop_timer()
+ self._changed()
+
+ def executeCommand(self, command, *args):
+ """Execute a JavaScript routine on the client (not an official API)"""
+ return self.serverManager.executeCommand(command, *args)
+
+ def _start_timer(self):
+ self.timer.start()
+
+ def _stop_timer(self):
+ self.timer.stop()
+
+ @property
+ def lastRequestPystones(self):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ return self.timer.elapsedPystones
+
+ @property
+ def lastRequestSeconds(self):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ return self.timer.elapsedSeconds
+
+ def reload(self):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ self._start_timer()
+ self.executeCommand('reload')
+ self._stop_timer()
+ self._changed()
+
+ def goBack(self, count=1):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ self._start_timer()
+ self.executeCommand('goBack')
+ self._stop_timer()
+ self._changed()
+
+ def addHeader(self, key, value):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ raise NotImplemented
+
+ def getLink(self, text=None, url=None, id=None, index=None):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ soup = BeautifulSoup(self.contents)('a')
+ links = []
+
+ # "msg" holds the disambiguation message
+ # the Link instance below needs to know the index of the a tag (n)
+ if text is not None:
+ msg = 'text %r' % text
+ links = []
+ for n, a in enumerate(soup):
+ # remove all tags from the text in order to search it
+ if text in getTagText(a):
+ links.append((a, n))
+ elif url is not None:
+ msg = 'url %r' % text
+ for n, a in enumerate(soup):
+ if a['href'] == url:
+ links.append((a, n))
+ elif id is not None:
+ msg = 'id %r' % id
+ for n, a in enumerate(soup):
+ if a['id'] == id:
+ links.append((a, n))
+
+ link, n = disambiguate(links, msg, index)
+ return Link(link, n, self)
+
+ def getControl(self, label=None, name=None, index=None):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ raise NotImplemented
+
+ def getForm(self, id=None, name=None, action=None, index=None):
+ """See zope.testbrowser.interfaces.IBrowser"""
+ raise NotImplemented
+
+ def _changed(self):
+ self._counter += 1
+ self._contents = None
+
+
+class Link(SetattrErrorsMixin):
+ interface.implements(interfaces.ILink)
+
+ def __init__(self, link, index, browser):
+ self.link = link
+ self.browser = browser
+ self.url = urlparse.urljoin(self.browser.url, link['href'])
+ self._remembered_id = browser.executeCommand('rememberLinkN', index)
+ self._browser_counter = self.browser._counter
+ self._enable_setattr_errors = True
+
+ def click(self):
+ if self._browser_counter != self.browser._counter:
+ raise interfaces.ExpiredError
+ self.browser._start_timer()
+ self.browser.executeCommand('clickRememberedLink', self._remembered_id)
+ self.browser._stop_timer()
+ self.browser._changed()
+
+ @property
+ def text(self):
+ return getTagText(self.link)
+
+ @property
+ def tag(self):
+ return self.link.name
+
+ @property
+ def attrs(self):
+ return dict(self.link.attrs)
+
+ def __repr__(self):
+ return "<%s text=%r url=%r>" % (
+ self.__class__.__name__, self.text, self.url)
Copied: Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/realproxy.py (from rev 68475, Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remoteproxy.py)
Deleted: Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py
===================================================================
--- Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py 2006-06-03 18:00:48 UTC (rev 68477)
+++ Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remote.py 2006-06-03 18:12:09 UTC (rev 68478)
@@ -1,216 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2005 Zope Corporation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""Browser-based Functional Doctests
-
-$Id$
-"""
-__docformat__ = "reStructuredText"
-from BeautifulSoup import BeautifulSoup
-from zope.testbrowser import interfaces
-from zope.testbrowser.remoteproxy import ServerManager, PROXY_PORT
-from zope.testbrowser.utilities import disambiguate, zeroOrOne, \
- SetattrErrorsMixin, PystoneTimer
-import re
-import urlparse
-
-
-try:
- from zope import interface
-except ImportError:
- from dummymodules import interface
-
-def getTagText(soup):
- text = str(soup)
- text = re.sub('<[^>]*>', '', text)
- text = re.sub(' +', ' ', text)
- return text
-
-class Browser(SetattrErrorsMixin):
- """A web user agent."""
- interface.implements(interfaces.IBrowser)
-
- _contents = None
- _counter = 0
-
- def __init__(self, url=None):
- self.serverManager = ServerManager()
- self.serverManager.start()
- self.timer = PystoneTimer()
- self._enable_setattr_errors = True
-
- if url is not None:
- self.open(url)
-
- def close(self):
- self.serverManager.stop()
-
- @property
- def url(self):
- """See zope.testbrowser.interfaces.IBrowser"""
- return self.executeCommand('getUrl')
-
- @property
- def isHtml(self):
- """See zope.testbrowser.interfaces.IBrowser"""
- raise NotImplemented
-
- @property
- def title(self):
- """See zope.testbrowser.interfaces.IBrowser"""
- raise NotImplemented
-
- @property
- def contents(self):
- """See zope.testbrowser.interfaces.IBrowser"""
- # XXX see note in commands.js
- return self.executeCommand('getContents')
-
- @property
- def headers(self):
- """See zope.testbrowser.interfaces.IBrowser"""
- raise NotImplemented
-
- def handleErrors():
- """See zope.testbrowser.interfaces.IBrowser"""
- raise NotImplemented
-
- def open(self, url, data=None):
- """See zope.testbrowser.interfaces.IBrowser"""
-
- (scheme, netloc, path, params, query, frag) = urlparse.urlparse(url)
-
- if scheme != 'http':
- self.send_error(400, "unknown scheme %r" % scheme)
-
- url = urlparse.urlunparse(
- (scheme, 'localhost:%s' % PROXY_PORT, path, params, query, frag))
-
- self._start_timer()
- self.executeCommand('open', url, data)
- self._stop_timer()
- self._changed()
-
- def executeCommand(self, command, *args):
- """Execute a JavaScript routine on the client (not an official API)"""
- return self.serverManager.executeCommand(command, *args)
-
- def _start_timer(self):
- self.timer.start()
-
- def _stop_timer(self):
- self.timer.stop()
-
- @property
- def lastRequestPystones(self):
- """See zope.testbrowser.interfaces.IBrowser"""
- return self.timer.elapsedPystones
-
- @property
- def lastRequestSeconds(self):
- """See zope.testbrowser.interfaces.IBrowser"""
- return self.timer.elapsedSeconds
-
- def reload(self):
- """See zope.testbrowser.interfaces.IBrowser"""
- self._start_timer()
- self.executeCommand('reload')
- self._stop_timer()
- self._changed()
-
- def goBack(self, count=1):
- """See zope.testbrowser.interfaces.IBrowser"""
- self._start_timer()
- self.executeCommand('goBack')
- self._stop_timer()
- self._changed()
-
- def addHeader(self, key, value):
- """See zope.testbrowser.interfaces.IBrowser"""
- raise NotImplemented
-
- def getLink(self, text=None, url=None, id=None, index=None):
- """See zope.testbrowser.interfaces.IBrowser"""
- soup = BeautifulSoup(self.contents)('a')
- links = []
-
- # "msg" holds the disambiguation message
- # the Link instance below needs to know the index of the a tag (n)
- if text is not None:
- msg = 'text %r' % text
- links = []
- for n, a in enumerate(soup):
- # remove all tags from the text in order to search it
- if text in getTagText(a):
- links.append((a, n))
- elif url is not None:
- msg = 'url %r' % text
- for n, a in enumerate(soup):
- if a['href'] == url:
- links.append((a, n))
- elif id is not None:
- msg = 'id %r' % id
- for n, a in enumerate(soup):
- if a['id'] == id:
- links.append((a, n))
-
- link, n = disambiguate(links, msg, index)
- return Link(link, n, self)
-
- def getControl(self, label=None, name=None, index=None):
- """See zope.testbrowser.interfaces.IBrowser"""
- raise NotImplemented
-
- def getForm(self, id=None, name=None, action=None, index=None):
- """See zope.testbrowser.interfaces.IBrowser"""
- raise NotImplemented
-
- def _changed(self):
- self._counter += 1
- self._contents = None
-
-
-class Link(SetattrErrorsMixin):
- interface.implements(interfaces.ILink)
-
- def __init__(self, link, index, browser):
- self.link = link
- self.browser = browser
- self.url = urlparse.urljoin(self.browser.url, link['href'])
- self._remembered_id = browser.executeCommand('rememberLinkN', index)
- self._browser_counter = self.browser._counter
- self._enable_setattr_errors = True
-
- def click(self):
- if self._browser_counter != self.browser._counter:
- raise interfaces.ExpiredError
- self.browser._start_timer()
- self.browser.executeCommand('clickRememberedLink', self._remembered_id)
- self.browser._stop_timer()
- self.browser._changed()
-
- @property
- def text(self):
- return getTagText(self.link)
-
- @property
- def tag(self):
- return self.link.name
-
- @property
- def attrs(self):
- return dict(self.link.attrs)
-
- def __repr__(self):
- return "<%s text=%r url=%r>" % (
- self.__class__.__name__, self.text, self.url)
Deleted: Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remoteproxy.py
===================================================================
--- Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remoteproxy.py 2006-06-03 18:00:48 UTC (rev 68477)
+++ Zope3/branches/benji-testbrowser-with-real-browsers/src/zope/testbrowser/remoteproxy.py 2006-06-03 18:12:09 UTC (rev 68478)
@@ -1,208 +0,0 @@
-import BaseHTTPServer
-import Queue
-import SocketServer
-import cgi
-import httplib
-import os
-import select
-import simplejson
-import socket
-import threading
-import urlparse
-
-base_dir = '/home/benji/workspace/testbrowser'
-allowed_resources = ['MochiKit', 'shim.js', 'commands.js', 'start.html']
-PROXY_PORT = 8000
-
-class Constant(object):
- def __init__(self, name):
- self.name = name
-
- def __str__(self):
- return self.name
-
-UNDEFINED = Constant('undefined')
-
-class RequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
-# server_version = "TinyHTTPProxy/" + __version__
- rbufsize = 0
- remote_host = 'localhost:8080' # TODO needs to be configurable
-
- def __init__(self, request, client_address, server):
- self.command_queue = server.command_queue
- self.result_queue = server.result_queue
- BaseHTTPServer.BaseHTTPRequestHandler.__init__(
- self, request, client_address, server)
-
- def _connect(self, netloc, soc):
- if ':' in netloc:
- i = netloc.index(':')
- host_and_port = netloc[:i], int(netloc[i+1:])
- else:
- host_and_port = netloc, 80
- try:
- soc.connect(host_and_port)
- except socket.error, arg:
- try:
- msg = arg[1]
- except:
- msg = arg
- self.send_error(404, msg)
- return False
- return True
-
-# def do_CONNECT(self):
-# soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-# try:
-# if self._connect(self.path, soc):
-# self.log_request(200)
-# self.wfile.write(self.protocol_version +
-# " 200 Connection established\r\n")
-# self.wfile.write("\r\n")
-# self._read_write(soc, 300)
-# finally:
-# soc.close()
-# self.connection.close()
-
- def sendFile(self, path):
- assert path.startswith('/')
- path = path[1:]
- assert path.split('/')[1] in allowed_resources
- # XXX might use too much memory
- self.wfile.write(open(os.path.join(base_dir, path)).read())
-
- def handleRequest(self):
- (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(
- self.path, 'http')
- assert netloc == ''
- print self.path
- if scheme != 'http':
- self.send_error(400, "unknown scheme %r" % scheme)
- return
- self.log_request()
-
- if self.command in ('GET', 'POST'):
- if path.startswith('/__resources__/'):
- self.sendFile(path)
- return
- elif path.startswith('/__api__/'):
- operation = urlparse.urlparse(self.path)[2].split('/')[-1]
- raw_json = self.rfile.next()
-
- if raw_json.strip() == 'undefined':
- last_result = UNDEFINED
- else:
- last_result = simplejson.loads(raw_json)
-
- if operation == 'next':
- # if no command has been processed yet, the last_result
- # value will be a placeholder
- if last_result != '__testbrowser__no_result_yet':
- self.result_queue.put(last_result)
-
- response = self.command_queue.get()
- if response[0] == '_tb_stop':
- self.server.stop = True
- self.result_queue.put('the server has stopped')
- else:
- self.send_response(404)
- self.send_header('Content-Type', 'text/plain')
- self.end_headers()
- msg = 'unknown operation: %r' % operation
- self.wfile.write(msg)
- raise RuntimeError(msg)
-
- self.send_response(200)
- self.send_header('Content-Type', 'text/plain')
- self.end_headers()
- self.wfile.write(simplejson.dumps(response))
- return
-
- soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- print 'sending', self.remote_host
- if self._connect(self.remote_host, soc):
- soc.send("%s %s %s\r\n" % (
- self.command,
- urlparse.urlunparse(('', '', path, params, query, '')),
- self.request_version))
- self.headers['Connection'] = 'close'
-
- for key_val in self.headers.items():
- soc.send("%s: %s\r\n" % key_val)
- soc.send("\r\n")
- self._read_write(soc)
- print 'done with', self.path
- finally:
- soc.close()
- self.connection.close()
-
- def _read_write(self, soc, max_idling=20):
- iw = [self.connection, soc]
- count = 0
- while 1:
- count += 1
- (iwtd, _, ewtd) = select.select(iw, [], iw, 3)
- if ewtd:
- break
- if iwtd:
- for i in iwtd:
- if i is soc:
- out = self.connection
- else:
- out = soc
- data = i.recv(8192)
-
- if data:
- out.send(data)
- count = 0
-
- if count == max_idling:
- break
-
- do_HEAD = do_POST = do_PUT = do_DELETE = do_GET = handleRequest
-
- def do_NOOP(self):
- self.send_response(200)
- self.end_headers()
-
- def log_message(self, format, *args):
- pass
-
-
-class HttpServer(SocketServer.ThreadingMixIn, BaseHTTPServer.HTTPServer):
-
- stop = False
-
- def __init__(self, *args, **kws):
- self.command_queue = Queue.Queue()
- self.result_queue = Queue.Queue()
- BaseHTTPServer.HTTPServer.__init__(self, *args, **kws)
-
- def serve_forever(self):
- """Handle one request at a time until stopped."""
- while not self.stop:
- self.handle_request()
-
-
-class ServerManager(object):
- def __init__(self):
- self.port = PROXY_PORT
- self.server = HttpServer(('0.0.0.0', self.port), RequestHandler)
-
- def start(self):
- self.server_thread = threading.Thread(
- target=self.server.serve_forever)
- self.server_thread.setDaemon(True)
- self.server_thread.start()
-
- def stop(self):
- self.executeCommand('stop')
- conn = httplib.HTTPConnection('localhost:%d' % self.port)
- conn.request('NOOP', '/')
- conn.getresponse()
- self.server_thread.join()
-
- def executeCommand(self, command, *args):
- self.server.command_queue.put( ('_tb_'+command, simplejson.dumps(args)) )
- return self.server.result_queue.get()
More information about the Zope3-Checkins
mailing list