[Zope3-checkins] SVN: Zope3/trunk/ Added the HTTP request recorder,
which lets you inspect raw HTTP requests and
Marius Gedminas
marius at pov.lt
Fri Mar 11 22:45:36 EST 2005
Log message for revision 29455:
Added the HTTP request recorder, which lets you inspect raw HTTP requests and
responses. It can be used to create functional doctests without requiring
third-party tools such as TCPWatch. See the README.txt file in
zope.app.recorder for instructions. You can also look at the screenshot at
http://mg.pov.lt/blog/zope3-request-recorder.html
Changed:
U Zope3/trunk/doc/CHANGES.txt
A Zope3/trunk/package-includes/zope.app.recorder-configure.zcml
A Zope3/trunk/src/zope/app/recorder/
A Zope3/trunk/src/zope/app/recorder/README.txt
A Zope3/trunk/src/zope/app/recorder/__init__.py
A Zope3/trunk/src/zope/app/recorder/browser.py
A Zope3/trunk/src/zope/app/recorder/configure.zcml
A Zope3/trunk/src/zope/app/recorder/sessions.pt
A Zope3/trunk/src/zope/app/recorder/tests.py
-=-
Modified: Zope3/trunk/doc/CHANGES.txt
===================================================================
--- Zope3/trunk/doc/CHANGES.txt 2005-03-12 01:29:49 UTC (rev 29454)
+++ Zope3/trunk/doc/CHANGES.txt 2005-03-12 03:45:35 UTC (rev 29455)
@@ -9,7 +9,12 @@
Some future release (Zope X3 3.1.0)
New features
-
+
+ - Added the HTTP request recorder, which lets you inspect raw HTTP
+ requests and responses. It can be used to create functional doctests
+ without requiring third-party tools such as TCPWatch. See the
+ README.txt file in zope.app.recorder for instructions.
+
- zope.app.form.utility setUpEditWidgets, when given a source
that is security proxied and asked to set up edit widgets
for fields that will raise Unauthorized when set, now raises
Added: Zope3/trunk/package-includes/zope.app.recorder-configure.zcml
===================================================================
--- Zope3/trunk/package-includes/zope.app.recorder-configure.zcml 2005-03-12 01:29:49 UTC (rev 29454)
+++ Zope3/trunk/package-includes/zope.app.recorder-configure.zcml 2005-03-12 03:45:35 UTC (rev 29455)
@@ -0,0 +1 @@
+<include package="zope.app.recorder" />
Property changes on: Zope3/trunk/package-includes/zope.app.recorder-configure.zcml
___________________________________________________________________
Name: svn:eol-style
+ native
Added: Zope3/trunk/src/zope/app/recorder/README.txt
===================================================================
--- Zope3/trunk/src/zope/app/recorder/README.txt 2005-03-12 01:29:49 UTC (rev 29454)
+++ Zope3/trunk/src/zope/app/recorder/README.txt 2005-03-12 03:45:35 UTC (rev 29455)
@@ -0,0 +1,39 @@
+HTTP request/response recorder
+==============================
+
+zope.app.recorder lets you create functional doctests without relying on
+third-party tools such as tcpwatch.
+
+Quick Start
+-----------
+
+Add the following section to your zope.conf:
+
+ <server>
+ type RecordingHTTP
+ port 8081
+ </server>
+
+Now go to http://localhost:8081/ and do whatever needs to be recorded. When
+done, go to http://localhost:8081/++etc++process/RecordedSessions.html and
+download your recorded session as a ready-to-run functional doctest.
+
+This tool can also be useful for other purposes, not just for creating
+functional doctests.
+
+To Do
+-----
+
+- Remove the unsuccessful attempt to make RecordedSessions use MappingStorage,
+ unless someone has an idea how to make it work.
+
+Ideas for Further Development
+-----------------------------
+
+- Refactor zope.app.testing.dochttp for easier reuse
+- List recorded requests in batches
+- Let users clear only selected requests
+- Show the remote IP for each request, allow filtering by IP
+- Show the authenticated user for each request
+- See how zope.app.recorder breaks with HTTP pipelining/chunked transfer
+ encoding, then fix it
Property changes on: Zope3/trunk/src/zope/app/recorder/README.txt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: Zope3/trunk/src/zope/app/recorder/__init__.py
===================================================================
--- Zope3/trunk/src/zope/app/recorder/__init__.py 2005-03-12 01:29:49 UTC (rev 29454)
+++ Zope3/trunk/src/zope/app/recorder/__init__.py 2005-03-12 03:45:35 UTC (rev 29455)
@@ -0,0 +1,265 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+HTTP session recorder.
+
+$Id$
+"""
+__docformat__ = 'restructuredtext'
+
+import thread
+import threading
+import transaction
+import ZODB.MappingStorage
+from ZODB.POSException import ConflictError
+from BTrees.IOBTree import IOBTree
+from zope.app.publication.httpfactory import HTTPPublicationRequestFactory
+from zope.app.server.servertype import ServerType
+from zope.server.http.commonaccesslogger import CommonAccessLogger
+from zope.server.http.publisherhttpserver import PublisherHTTPServer
+from zope.server.http.httpserverchannel import HTTPServerChannel
+from zope.server.http.httprequestparser import HTTPRequestParser
+from zope.server.http.httptask import HTTPTask
+from zope.publisher.publish import publish
+
+
+class RecordingHTTPTask(HTTPTask):
+ """An HTTPTask that remembers the response as a string."""
+
+ def __init__(self, *args, **kw):
+ self._response_data = []
+ HTTPTask.__init__(self, *args, **kw)
+
+ def write(self, data):
+ """Send data to the client.
+
+ Wraps HTTPTask.write and records the response.
+ """
+ if not self.wrote_header:
+ # HTTPTask.write will call self.buildResponseHeader() and send the
+ # result before sending 'data'. This code assumes that
+ # buildResponseHeader will return the same string when called the
+ # second time.
+ self._response_data.append(self.buildResponseHeader())
+ HTTPTask.write(self, data)
+ self._response_data.append(data)
+
+ def getRawResponse(self):
+ """Return the full HTTP response as a string."""
+ return ''.join(self._response_data)
+
+
+class RecordingHTTPRequestParser(HTTPRequestParser):
+ """An HTTPRequestParser that remembers the raw request as a string."""
+
+ def __init__(self, *args, **kw):
+ self._request_data = []
+ HTTPRequestParser.__init__(self, *args, **kw)
+
+ def received(self, data):
+ """Process data received from the client.
+
+ Wraps HTTPRequestParser.write and records the request.
+ """
+ consumed = HTTPRequestParser.received(self, data)
+ self._request_data.append(data[:consumed])
+ return consumed
+
+ def getRawRequest(self):
+ """Return the full HTTP request as a string."""
+ return ''.join(self._request_data)
+
+
+class RecordingHTTPServerChannel(HTTPServerChannel):
+ """An HTTPServerChannel that records request and response."""
+
+ task_class = RecordingHTTPTask
+ parser_class = RecordingHTTPRequestParser
+
+
+class RecordingHTTPServer(PublisherHTTPServer):
+ """Zope Publisher-specific HTTP server that can record requests."""
+
+ channel_class = RecordingHTTPServerChannel
+ num_retries = 10
+
+ def executeRequest(self, task):
+ """Process a request.
+
+ Wraps PublisherHTTPServer.executeRequest().
+ """
+ PublisherHTTPServer.executeRequest(self, task)
+ # PublisherHTTPServer either committed or aborted a transaction,
+ # so we need a new one.
+ # TODO: Actually, we only need a transaction if we use
+ # ZODBBasedRequestStorage, which we don't since it has problems
+ # keeping data fresh enough. This loop will go away soon, unless
+ # I manage to fix ZODBBasedRequestStorage.
+ for n in range(self.num_retries):
+ try:
+ txn = transaction.begin()
+ txn.note("request recorder")
+ requestStorage.add(RecordedRequest.fromHTTPTask(task))
+ transaction.commit()
+ except ConflictError:
+ transaction.abort()
+ if n == self.num_retries - 1:
+ raise
+ else:
+ break
+
+
+class RecordedRequest(object):
+ """A single recorded request and response."""
+
+ def __init__(self, timestamp, request_string, response_string,
+ method=None, path=None, status=None, reason=None):
+ self.timestamp = timestamp # float value, as returned by time.time()
+ self.request_string = request_string
+ self.response_string = response_string
+ # The following attributes could be extracted from request_string and
+ # response_string, but it is simpler to just take readily-available
+ # values from RecordingHTTPTask.
+ self.method = method
+ self.path = path
+ self.status = status
+ self.reason = reason
+
+ def fromHTTPTask(cls, task):
+ """Create a RecordedRequest with data extracted from RecordingHTTPTask.
+ """
+ rq = cls(timestamp=task.start_time,
+ request_string=task.request_data.getRawRequest(),
+ response_string=task.getRawResponse(),
+ method=task.request_data.command.upper(),
+ path=task.request_data.path,
+ status=task.status,
+ reason=task.reason)
+ return rq
+
+ fromHTTPTask = classmethod(fromHTTPTask)
+
+
+class RequestStorage(object):
+ """A collection of recorded requests.
+
+ This class is thread-safe, that is, its methods can be called from multiple
+ threads simultaneously.
+
+ Most of thread safety comes from Python's global interpreter lock, but
+ 'add' needs extra locking.
+ """
+
+ _requests = {}
+ _lock = threading.Lock()
+
+ def add(self, rr):
+ """Add a RecordedRequest to the list."""
+ self._lock.acquire()
+ try:
+ rr.id = len(self._requests) + 1
+ self._requests[rr.id] = rr
+ finally:
+ self._lock.release()
+
+ def __len__(self):
+ """Return the number of recorded requests."""
+ return len(self._requests)
+
+ def __iter__(self):
+ """List all recorded requests."""
+ # Iterate over a new list object instead of calling itervalues, so that
+ # we don't have to worry about other threads modifying the dict while
+ # this thread is iterating over it.
+ return iter(self._requests.values())
+
+ def get(self, id):
+ """Return the request with a given id, or None."""
+ return self._requests.get(id)
+
+ def clear(self):
+ """Clear all recorded requests."""
+ self._requests.clear()
+
+
+class ZODBBasedRequestStorage(object):
+ """A collection of recorded requests.
+
+ This class is thread-safe, that is, its methods can be called from multiple
+ threads simultaneously.
+
+ In addition, it is transactional.
+
+ TODO: The simple ID allocation strategy used by RequestStorage.add will
+ cause frequent conflict errors. Something should be done about that.
+
+ TODO: _getData() tends to return stale data, and you need to refresh the
+ ++etc++process/RecordedSessions.html page two or three times until
+ it becomes up to date.
+
+ TODO: This class is not used because of the previous problem. Either fix
+ the problem, or remove this class.
+ """
+
+ _ram_storage = ZODB.MappingStorage.MappingStorage()
+ _ram_db = ZODB.DB(_ram_storage)
+ _conns = {}
+ _key = 'RequestStorage'
+
+ def _getData(self):
+ """Get the shared data container from the mapping storage."""
+ # This method closely mimics RAMSessionDataContainer._getData
+ # from zope.app.session.session
+ tid = thread.get_ident()
+ if tid not in self._conns:
+ self._conns[tid] = self._ram_db.open()
+ root = self._conns[tid].root()
+ if self._key not in root:
+ root[self._key] = IOBTree()
+ return root[self._key]
+
+ def add(self, rr):
+ """Add a RecordedRequest to the list."""
+ requests = self._getData()
+ rr.id = len(requests) + 1
+ requests[rr.id] = rr
+
+ def __len__(self):
+ """Return the number of recorded requests."""
+ return len(self._getData())
+
+ def __iter__(self):
+ """List all recorded requests."""
+ return iter(self._getData().values())
+
+ def get(self, id):
+ """Return the request with a given id, or None."""
+ requests = self._getData()
+ return requests.get(id)
+
+ def clear(self):
+ """Clear all recorded requests."""
+ self._getData().clear()
+
+
+#
+# Globals
+#
+
+requestStorage = RequestStorage()
+
+recordinghttp = ServerType(RecordingHTTPServer,
+ HTTPPublicationRequestFactory,
+ CommonAccessLogger,
+ 8081, True)
Property changes on: Zope3/trunk/src/zope/app/recorder/__init__.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: Zope3/trunk/src/zope/app/recorder/browser.py
===================================================================
--- Zope3/trunk/src/zope/app/recorder/browser.py 2005-03-12 01:29:49 UTC (rev 29454)
+++ Zope3/trunk/src/zope/app/recorder/browser.py 2005-03-12 03:45:35 UTC (rev 29455)
@@ -0,0 +1,176 @@
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+Browser views for the HTTP session recorder.
+
+$Id$
+"""
+__docformat__ = 'restructuredtext'
+
+import re
+import sys
+import urllib
+import datetime
+from cStringIO import StringIO
+from zope.interface import Interface
+from zope.schema import Text
+from zope.app.form.utility import setUpWidgets, getWidgetsData
+from zope.app.form.interfaces import IInputWidget, WidgetsError
+from zope.app.publisher.browser import BrowserView
+from zope.app import recorder
+from zope.app.testing import dochttp
+from zope.app.i18n import ZopeMessageIDFactory as _
+from zope.publisher.interfaces import NotFound
+
+
+class IRecorderSessionsFilterForm(Interface):
+ """Form for filtering recorded requests."""
+
+ skip_urls = Text(title=_(u"URLs to ignore"),
+ description=_(u"""A list of regular expressions.
+
+ Requests whose paths match any of the expressions listed
+ here will not be shown."""),
+ required=False,
+ default=u'')
+
+
+class RecordedSessionsView(BrowserView):
+ """View for /++etc++process/RecordedSessions.html"""
+
+ error = None
+
+ def __init__(self, context, request):
+ BrowserView.__init__(self, context, request)
+ setUpWidgets(self, IRecorderSessionsFilterForm, IInputWidget)
+
+ def _skip_urls(self):
+ """Extract URL regexes from the request.
+
+ Returns a multi-line string.
+ """
+ try:
+ data = getWidgetsData(self, IRecorderSessionsFilterForm)
+ except WidgetsError:
+ return ''
+ return data.get('skip_urls') or ''
+
+ def _skip_urls_as_regexes(self):
+ """Extract URL regexes from the request.
+
+ Returns an iterator of compiled regex objects.
+
+ Skips invalid regexes and sets self.error.
+ """
+ for pattern in self._skip_urls().splitlines():
+ if pattern.strip():
+ try:
+ yield re.compile(pattern)
+ except re.error:
+ self.error = _('Invalid regex: %s') % pattern
+
+ def _requests(self):
+ """List all requests that should be shown on the page.
+
+ Performs filtering by URL regexps.
+
+ Returns an iterator of dicts packed with information.
+ """
+ skip_urls = list(self._skip_urls_as_regexes())
+ formatter = self.request.locale.dates.getFormatter('dateTime', 'short')
+ requests = [(rq.timestamp, rq) for rq in recorder.requestStorage]
+ requests.sort()
+ for timestamp, rq in requests:
+ for skip_url in skip_urls:
+ if skip_url.search(rq.path):
+ break
+ else:
+ info = {}
+ info['object'] = rq
+ dt = datetime.datetime.fromtimestamp(rq.timestamp)
+ info['time'] = formatter.format(dt)
+ info['method'] = rq.method
+ info['path'] = rq.path
+ info['request_length'] = len(rq.request_string)
+ info['response_length'] = len(rq.response_string)
+ info['status'] = rq.status
+ info['id'] = rq.id
+ yield info
+
+ requests = property(lambda self: list(self._requests()))
+
+ def recordedRequest(self, id):
+ """Return a request string as text/plain."""
+ rq = recorder.requestStorage.get(id)
+ if rq is None:
+ raise NotFound(self.context, id)
+ self.request.response.setHeader('Content-Type', 'text/plain')
+ return rq.request_string
+
+ def recordedResponse(self, id):
+ """Return a response string as text/plain."""
+ rq = recorder.requestStorage.get(id)
+ if rq is None:
+ raise NotFound(self.context, id)
+ self.request.response.setHeader('Content-Type', 'text/plain')
+ return rq.response_string
+
+ def __call__(self):
+ """Render the page and process forms."""
+ if 'CLEAR' in self.request:
+ return self.clear()
+ if 'FTEST' in self.request:
+ return self.makeFTest()
+ return self.index()
+
+ def clear(self):
+ """Clear all stored requests."""
+ recorder.requestStorage.clear()
+ url = str(self.request.URL)
+ skip_urls = self._skip_urls()
+ if skip_urls:
+ url += '?field.skip_urls=' + urllib.quote(skip_urls)
+ self.request.response.redirect(url)
+ return ''
+
+ def makeFTest(self):
+ """Create a functional doctest from selected requests."""
+ requests = map(recorder.requestStorage.get, self.request.get('id', []))
+ requests = filter(None, requests)
+ self.request.response.setHeader('Content-Type', 'text/plain')
+ self.request.response.setHeader('Content-Disposition',
+ 'attachment; filename="ftest.txt"')
+ return make_doctest(requests)
+
+
+def make_doctest(requests):
+ """Convert a list of RecordedRequest objects into a doctest."""
+ options, args = dochttp.parser.parse_args(dochttp.default_options)
+ skip_rq_headers = [name.lower()
+ for name in (options.skip_request_header or ())]
+ skip_rs_headers = [name.lower()
+ for name in (options.skip_response_header or ())]
+ old_stdout = sys.stdout
+ try:
+ sys.stdout = StringIO()
+ for rq in requests:
+ request = dochttp.Message(StringIO(rq.request_string),
+ skip_rq_headers)
+ response = dochttp.Message(StringIO(rq.response_string),
+ skip_rs_headers)
+ dochttp.output_test(request, response, options.clean_redirects)
+ return sys.stdout.getvalue()
+ finally:
+ sys.stdout = old_stdout
+
Property changes on: Zope3/trunk/src/zope/app/recorder/browser.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
Added: Zope3/trunk/src/zope/app/recorder/configure.zcml
===================================================================
--- Zope3/trunk/src/zope/app/recorder/configure.zcml 2005-03-12 01:29:49 UTC (rev 29454)
+++ Zope3/trunk/src/zope/app/recorder/configure.zcml 2005-03-12 03:45:35 UTC (rev 29455)
@@ -0,0 +1,36 @@
+<configure xmlns="http://namespaces.zope.org/zope"
+ xmlns:browser="http://namespaces.zope.org/browser"
+ i18n_domain="zope">
+
+ <utility
+ name="RecordingHTTP"
+ component=".recordinghttp"
+ provides="zope.app.server.servertype.IServerType"
+ />
+
+ <browser:page
+ for="zope.app.applicationcontrol.interfaces.IApplicationControl"
+ name="RecordedSessions.html"
+ permission="zope.ManageApplication"
+ class=".browser.RecordedSessionsView"
+ template="sessions.pt"
+ menu="zmi_views" title="Recorded Sessions"
+ />
+
+ <browser:page
+ for="zope.app.applicationcontrol.interfaces.IApplicationControl"
+ name="RecordedRequest.html"
+ permission="zope.ManageApplication"
+ class=".browser.RecordedSessionsView"
+ attribute="recordedRequest"
+ />
+
+ <browser:page
+ for="zope.app.applicationcontrol.interfaces.IApplicationControl"
+ name="RecordedResponse.html"
+ permission="zope.ManageApplication"
+ class=".browser.RecordedSessionsView"
+ attribute="recordedResponse"
+ />
+
+</configure>
Property changes on: Zope3/trunk/src/zope/app/recorder/configure.zcml
___________________________________________________________________
Name: svn:eol-style
+ native
Added: Zope3/trunk/src/zope/app/recorder/sessions.pt
===================================================================
--- Zope3/trunk/src/zope/app/recorder/sessions.pt 2005-03-12 01:29:49 UTC (rev 29454)
+++ Zope3/trunk/src/zope/app/recorder/sessions.pt 2005-03-12 03:45:35 UTC (rev 29455)
@@ -0,0 +1,70 @@
+<html metal:use-macro="context/@@standard_macros/view">
+<head>
+ <title i18n:translate="">Recorded HTTP Sessions</title>
+ <style type="text/css" metal:fill-slot="style_slot">
+ td { border-bottom: 1px dotted #eee; }
+ </style>
+</head>
+<body>
+<metal:block fill-slot="body" tal:define="requests view/requests">
+
+ <p tal:condition="not:requests" i18n:translate="">
+ There are no recorded requests that pass your filter.
+ </p>
+ <form method="POST" tal:condition="requests"
+ tal:attributes="action request/URL">
+ <input tal:replace="structure view/skip_urls_widget/hidden" />
+ <table>
+ <tr>
+ <th> </th>
+ <th i18n:translate="">Time</th>
+ <th i18n:translate="">Method</th>
+ <th i18n:translate="">Path</th>
+ <th i18n:translate="">Status</th>
+ </tr>
+ <tr tal:repeat="rq requests">
+ <td><input type="checkbox" name="id:int:list" checked="checked"
+ tal:attributes="value rq/id; id string:chk${rq/id}" /></td>
+ <td><label tal:attributes="for string:chk${rq/id}"
+ tal:content="rq/time">5 minutes ago</label></td>
+ <td><a tal:attributes="href string:RecordedRequest.html?id:int=${rq/id}"
+ tal:content="rq/method">GET</a></td>
+ <td><label tal:attributes="for string:chk${rq/id}"
+ tal:content="rq/path">/some/path</label></td>
+ <td><a tal:attributes="href string:RecordedResponse.html?id:int=${rq/id}"
+ tal:content="rq/status">200</a></td>
+ </tr>
+ </table>
+
+ <div class="row">
+ <div class="control">
+ <input type="submit" name="FTEST" value="Create Functional Doctest"
+ i18n:attributes="value" />
+ <input type="submit" name="CLEAR" value="Clear All"
+ i18n:attributes="value" />
+ </div>
+ </div>
+ </form>
+
+ <form method="GET"
+ tal:attributes="action request/URL">
+ <hr />
+
+ <p class="error" tal:condition="view/error" tal:content="view/error">
+ Something's wrong.
+ </p>
+
+ <div class="row" tal:define="widget nocall:view/skip_urls_widget">
+ <metal:block use-macro="context/@@form_macros/widget_row" />
+ </div>
+
+ <div class="row">
+ <div class="control">
+ <input type="submit" value="Filter" i18n:attributes="value" />
+ </div>
+ </div>
+ </form>
+
+</metal:block>
+</body>
+</html>
Property changes on: Zope3/trunk/src/zope/app/recorder/sessions.pt
___________________________________________________________________
Name: svn:eol-style
+ native
Added: Zope3/trunk/src/zope/app/recorder/tests.py
===================================================================
--- Zope3/trunk/src/zope/app/recorder/tests.py 2005-03-12 01:29:49 UTC (rev 29454)
+++ Zope3/trunk/src/zope/app/recorder/tests.py 2005-03-12 03:45:35 UTC (rev 29455)
@@ -0,0 +1,700 @@
+#!/usr/bin/env python2.3
+##############################################################################
+#
+# Copyright (c) 2004 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""
+Unit tests for zope.app.recorder.
+
+$Id$
+"""
+__docformat__ = 'restructuredtext'
+
+import unittest
+import transaction
+from zope.testing import doctest
+from zope.publisher.browser import TestRequest
+from zope.app.testing import setup, ztapi
+from zope.app.publisher.browser import BrowserView
+
+
+def doctest_RecordingHTTPServer():
+ r"""Unit tests for RecordingHTTPServer.
+
+ We will use stubs instead of real channel and request parser objects, to
+ keep the test fixture small.
+
+ >>> from zope.app.recorder import RecordingHTTPTask
+ >>> channel = ChannelStub()
+ >>> request_data = RequestDataStub()
+ >>> task = RecordingHTTPTask(channel, request_data)
+
+ RecordingHTTPTask is a thin wrapper around HTTPTask. It records all data
+ written through task.write, plus the response header, of course.
+
+ >>> task.write('request body\n')
+ >>> task.write('goes in here')
+
+ We need to strip CR characters, as they confuse doctest.
+
+ >>> print task.getRawResponse().replace('\r', '')
+ HTTP/1.1 200 Ok
+ Connection: close
+ Server: Stub Server
+ <BLANKLINE>
+ request body
+ goes in here
+
+ """
+
+
+def doctest_RecordingHTTPRequestParser():
+ r"""Unit tests for RecordingHTTPRequestParser.
+
+ >>> from zope.app.recorder import RecordingHTTPRequestParser
+ >>> from zope.server.adjustments import default_adj
+ >>> parser = RecordingHTTPRequestParser(default_adj)
+
+ RecordingHTTPRequestParser is a thin wrapper around HTTPRequestParser. It
+ records all data consumed by parser.received.
+
+ >>> parser.received('GET / HTTP/1.1\r\n')
+ 16
+ >>> parser.received('Content-Length: 3\r\n')
+ 19
+ >>> parser.received('\r\n')
+ 2
+ >>> parser.received('abc plus some junk')
+ 3
+
+ We need to strip CR characters, as they confuse doctest.
+
+ >>> print parser.getRawRequest().replace('\r', '')
+ GET / HTTP/1.1
+ Content-Length: 3
+ <BLANKLINE>
+ abc
+
+ """
+
+
+def doctest_RecordingHTTPServer():
+ r"""Unit tests for RecordingHTTPServer.
+
+ RecordingHTTPServer is a very thin wrapper over PublisherHTTPServer. To
+ keep things simple, we will override the constructor and prevent it from
+ listening on sockets.
+
+ >>> from zope.app.recorder import RecordingHTTPServer
+ >>> class RecordingHTTPServerForTests(RecordingHTTPServer):
+ ... def __init__(self):
+ ... self.request_factory = TestRequest
+ ... self.request_factory.publication = PublicationStub()
+ >>> server = RecordingHTTPServerForTests()
+
+ We will need a request parser
+
+ >>> from zope.app.recorder import RecordingHTTPRequestParser
+ >>> from zope.server.adjustments import default_adj
+ >>> parser = RecordingHTTPRequestParser(default_adj)
+ >>> parser.received('GET / HTTP/1.1\r\n\r\n')
+ 18
+
+ We will also need a task
+
+ >>> from zope.app.recorder import RecordingHTTPTask
+ >>> channel = ChannelStub()
+ >>> task = RecordingHTTPTask(channel, parser)
+ >>> task.start_time = 42
+
+ Go!
+
+ >>> server.executeRequest(task)
+
+ Let's see what we got:
+
+ >>> from zope.app import recorder
+ >>> len(recorder.requestStorage)
+ 1
+ >>> rq = iter(recorder.requestStorage).next()
+ >>> rq.timestamp
+ 42
+ >>> rq.request_string
+ 'GET / HTTP/1.1\r\n\r\n'
+ >>> rq.method
+ 'GET'
+ >>> rq.path
+ '/'
+ >>> print rq.response_string.replace('\r', '')
+ HTTP/1.1 599 No status set
+ Content-Length: 0
+ X-Powered-By: Zope (www.zope.org), Python (www.python.org)
+ Server: Stub Server
+ <BLANKLINE>
+ <BLANKLINE>
+ >>> rq.status
+ 599
+ >>> rq.reason
+ 'No status set'
+
+ Clean up:
+
+ >>> recorder.requestStorage.clear()
+
+ """
+
+
+def doctest_RequestStorage():
+ r"""Unit tests for RequestStorage.
+
+ RequestStorage uses MappingStorage for transactional data storage, shared
+ between threads. Initially the storage is empty
+
+ >>> from zope.app.recorder import RequestStorage
+ >>> storage = RequestStorage()
+ >>> len(storage)
+ 0
+ >>> list(storage)
+ []
+
+ Request IDs are allocated sequentially
+
+ >>> from zope.app.recorder import RecordedRequest
+ >>> storage.add(RecordedRequest(42, 'request', 'response'))
+ >>> storage.add(RecordedRequest(43, 'request', 'response'))
+ >>> len(storage)
+ 2
+ >>> [(r.id, r.timestamp) for r in storage]
+ [(1, 42), (2, 43)]
+
+ >>> storage.get(1).timestamp
+ 42
+ >>> storage.get(2).timestamp
+ 43
+ >>> storage.get(3) is None
+ True
+
+ You can clear the storage
+
+ >>> storage.clear()
+ >>> len(storage)
+ 0
+ >>> list(storage)
+ []
+
+ """
+
+
+def doctest_make_doctest():
+ r'''Unit tests for make_doctest.
+
+ >>> from zope.app.recorder.browser import make_doctest
+ >>> from zope.app.recorder import RecordedRequest
+ >>> rq1 = RecordedRequest(0, 'GET / HTTP/1.1\r\n\r\n',
+ ... 'HTTP/1.1 200 OK\r\n'
+ ... 'Content-Length: 13\r\n\r\n'
+ ... 'Hello, world!')
+ >>> rq2 = RecordedRequest(0, 'GET /bye.html HTTP/1.1\r\n\r\n',
+ ... 'HTTP/1.1 200 OK\r\n'
+ ... 'Content-Length: 15\r\n\r\n'
+ ... 'Goodbye, world!')
+ >>> s = make_doctest([rq1, rq2])
+ >>> print '|' + s.replace('\n', '\n|')
+ |
+ |
+ | >>> print http(r"""
+ | ... GET / HTTP/1.1
+ | ... """)
+ | HTTP/1.1 200 OK
+ | Content-Length: 13
+ | <BLANKLINE>
+ | Hello, world!
+ |
+ |
+ | >>> print http(r"""
+ | ... GET /bye.html HTTP/1.1
+ | ... """)
+ | HTTP/1.1 200 OK
+ | Content-Length: 15
+ | <BLANKLINE>
+ | Goodbye, world!
+ |
+
+ '''
+
+
+def doctest_RecordedSessionsView_skip_urls():
+ """Unit test for RecordedSessionsView._skip_urls
+
+ >>> setUpBrowser()
+
+ >>> from zope.app.recorder.browser import RecordedSessionsView
+ >>> context = None
+ >>> request = TestRequest()
+ >>> view = RecordedSessionsView(context, request)
+
+ No skip_urls in the request.
+
+ >>> view._skip_urls()
+ ''
+
+ Empty skip_urls field
+
+ >>> request.form['field.skip_urls'] = u''
+ >>> view._skip_urls()
+ ''
+
+ Non-empty skip_urls
+
+ >>> request.form['field.skip_urls'] = u'/@@/'
+ >>> view._skip_urls()
+ u'/@@/'
+
+ >>> tearDownBrowser()
+
+ """
+
+
+def doctest_RecordedSessionsView_skip_urls_as_regexes():
+ r"""Unit test for RecordedSessionsView._skip_urls_as_regexes
+
+ >>> setUpBrowser()
+
+ >>> from zope.app.recorder.browser import RecordedSessionsView
+ >>> context = None
+ >>> request = TestRequest()
+ >>> view = RecordedSessionsView(context, request)
+
+ No skip_urls in the request.
+
+ >>> list(view._skip_urls_as_regexes())
+ []
+
+ Valid regexes (note that empty lines are skipped)
+
+ >>> request.form['field.skip_urls'] = u'/@@/\n \nxyzzy'
+ >>> list(view._skip_urls_as_regexes()) # doctest: +ELLIPSIS
+ [<_sre.SRE_Pattern object...>, <_sre.SRE_Pattern object...>]
+ >>> r1, r2 = view._skip_urls_as_regexes()
+ >>> r1.search('/@@/icon.png') # doctest: +ELLIPSIS
+ <...Match object...>
+ >>> r2.search('/foo/xyzzy.html') # doctest: +ELLIPSIS
+ <...Match object...>
+
+ An invalid regexp
+
+ >>> request.form['field.skip_urls'] = u'/@@/\n++etc++\n'
+ >>> list(view._skip_urls_as_regexes()) # doctest: +ELLIPSIS
+ [<_sre.SRE_Pattern object...>]
+ >>> r1, = view._skip_urls_as_regexes()
+ >>> r1.search('/@@/icon.png') # doctest: +ELLIPSIS
+ <...Match object...>
+ >>> view.error
+ u'Invalid regex: ++etc++'
+
+ >>> tearDownBrowser()
+
+ """
+
+
+def doctest_RecordedSessionsView_requests():
+ r"""Unit test for RecordedSessionsView._requests
+
+ >>> setUpBrowser()
+
+ >>> from zope.app.recorder.browser import RecordedSessionsView
+ >>> context = None
+ >>> request = TestRequest()
+ >>> view = RecordedSessionsView(context, request)
+
+ No recorded requests
+
+ >>> view.requests
+ []
+
+ Let's add a couple
+
+ >>> from zope.app import recorder
+ >>> recorder.requestStorage.add(recorder.RecordedRequest(timestamp=0,
+ ... request_string='GET /something HTTP/1.1\r\n\r\n',
+ ... response_string='HTTP/1.1 404 Not Found\r\n\r\n',
+ ... method='GET', path='/something', status=404,
+ ... ))
+ >>> recorder.requestStorage.add(recorder.RecordedRequest(timestamp=1,
+ ... request_string='GET /something_else HTTP/1.1\r\n\r\n',
+ ... response_string='HTTP/1.1 200 OK\r\n\r\n',
+ ... method='GET', path='/something_else', status=200,
+ ... ))
+
+ (Note that although the timestamps are constant (0 is 1st Jan 1970,
+ midnight UTC), the returned value depends on the time zone of the system on
+ which you run the tests. I hope that at least the time format is
+ constant.)
+
+ >>> from zope.testing.doctestunit import pprint
+ >>> pprint(view.requests) # doctest: +ELLIPSIS
+ [{'id': 1,
+ 'method': 'GET',
+ 'object': <zope.app.recorder.RecordedRequest object at ...>,
+ 'path': '/something',
+ 'request_length': 27,
+ 'response_length': 26,
+ 'status': 404,
+ 'time': u'.../.../... ...:...'},
+ {'id': 2,
+ 'method': 'GET',
+ 'object': <zope.app.recorder.RecordedRequest object at ...>,
+ 'path': '/something_else',
+ 'request_length': 32,
+ 'response_length': 19,
+ 'status': 200,
+ 'time': u'.../.../... ...:...'}]
+
+ >>> tearDownBrowser()
+ >>> recorder.requestStorage.clear()
+
+ """
+
+
+def doctest_RecordedSessionsView_recordedRequest():
+ r"""Unit test for RecordedSessionsView.recordedRequest
+
+ >>> setUpBrowser()
+
+ >>> from zope.app.recorder.browser import RecordedSessionsView
+ >>> context = None
+ >>> request = TestRequest()
+ >>> view = RecordedSessionsView(context, request)
+
+ >>> from zope.app import recorder
+ >>> recorder.requestStorage.add(recorder.RecordedRequest(timestamp=0,
+ ... request_string='GET /something HTTP/1.1\r\n\r\n',
+ ... response_string='HTTP/1.1 404 Not Found\r\n\r\n',
+ ... method='GET', path='/something', status=404,
+ ... ))
+
+ >>> view.recordedRequest(1)
+ 'GET /something HTTP/1.1\r\n\r\n'
+ >>> request.response.getHeader('Content-Type')
+ 'text/plain'
+
+ >>> view.recordedRequest(42)
+ Traceback (most recent call last):
+ ...
+ NotFound: Object: None, name: 42
+
+ >>> recorder.requestStorage.clear()
+ >>> tearDownBrowser()
+
+ """
+
+
+def doctest_RecordedSessionsView_recordedResponse():
+ r"""Unit test for RecordedSessionsView.recordedResponse
+
+ >>> setUpBrowser()
+
+ >>> from zope.app.recorder.browser import RecordedSessionsView
+ >>> context = None
+ >>> request = TestRequest()
+ >>> view = RecordedSessionsView(context, request)
+
+ >>> from zope.app import recorder
+ >>> recorder.requestStorage.add(recorder.RecordedRequest(timestamp=0,
+ ... request_string='GET /something HTTP/1.1\r\n\r\n',
+ ... response_string='HTTP/1.1 404 Not Found\r\n\r\n',
+ ... method='GET', path='/something', status=404,
+ ... ))
+
+ >>> view.recordedResponse(1)
+ 'HTTP/1.1 404 Not Found\r\n\r\n'
+ >>> request.response.getHeader('Content-Type')
+ 'text/plain'
+
+ >>> view.recordedResponse(42)
+ Traceback (most recent call last):
+ ...
+ NotFound: Object: None, name: 42
+
+ >>> recorder.requestStorage.clear()
+ >>> tearDownBrowser()
+
+ """
+
+
+def doctest_RecordedSessionsView_clear():
+ r"""Unit test for RecordedSessionsView.clear
+
+ >>> setUpBrowser()
+
+ >>> from zope.app.recorder.browser import RecordedSessionsView
+ >>> context = None
+ >>> request = TestRequest()
+ >>> view = RecordedSessionsView(context, request)
+
+ >>> from zope.app import recorder
+ >>> recorder.requestStorage.add(recorder.RecordedRequest(timestamp=0,
+ ... request_string='GET /something HTTP/1.1\r\n\r\n',
+ ... response_string='HTTP/1.1 404 Not Found\r\n\r\n',
+ ... method='GET', path='/something', status=404,
+ ... ))
+
+ The 'clear' method clears all stored requests and redirects back to the
+ page.
+
+ >>> view.clear()
+ ''
+
+ >>> list(recorder.requestStorage)
+ []
+
+ >>> request.response.getStatus()
+ 302
+ >>> request.response.getHeader('Location')
+ 'http://127.0.0.1'
+
+ The value of skip_urls is not lost.
+
+ >>> request.form['field.skip_urls'] = u'/@@/\n+'
+ >>> view.clear()
+ ''
+ >>> request.response.getHeader('Location')
+ 'http://127.0.0.1?field.skip_urls=/%40%40/%0A%2B'
+
+ >>> recorder.requestStorage.clear()
+ >>> tearDownBrowser()
+
+ """
+
+
+def doctest_RecordedSessionsView_makeFTest():
+ r"""Unit test for RecordedSessionsView.makeFTest
+
+ >>> setUpBrowser()
+
+ >>> from zope.app.recorder.browser import RecordedSessionsView
+ >>> context = None
+ >>> request = TestRequest()
+ >>> view = RecordedSessionsView(context, request)
+
+ >>> from zope.app import recorder
+ >>> recorder.requestStorage.add(recorder.RecordedRequest(timestamp=0,
+ ... request_string='GET /something HTTP/1.1\r\n\r\n',
+ ... response_string='HTTP/1.1 404 Not Found\r\n\r\n',
+ ... method='GET', path='/something', status=404,
+ ... ))
+
+ The 'makeFTest' method creates a doctest and returns it. You need to
+ specify a list of request IDs. IDs of nonexistent requests are silently
+ ignored.
+
+ >>> request.form['id'] = [1, 42]
+
+ >>> print view.makeFTest() # doctest: +ELLIPSIS
+ <BLANKLINE>
+ ...>>> print http(...
+ ...404 Not Found...
+
+ >>> request.response.getHeader('Content-Type')
+ 'text/plain'
+ >>> request.response.getHeader('Content-Disposition')
+ 'attachment; filename="ftest.txt"'
+
+ >>> recorder.requestStorage.clear()
+ >>> tearDownBrowser()
+
+ """
+
+
+def doctest_RecordedSessionsView_call():
+ r"""Unit test for RecordedSessionsView.__call__
+
+ >>> setUpBrowser()
+
+ >>> from zope.app.recorder.browser import RecordedSessionsView
+ >>> context = None
+ >>> request = TestRequest()
+ >>> request.setPrincipal(PrincipalStub)
+ >>> view = RecordedSessionsView(context, request)
+
+ view.index is a page template that appears thanks to ZCML magic.
+
+ >>> from zope.app.pagetemplate.viewpagetemplatefile \
+ ... import ViewPageTemplateFile, BoundPageTemplate
+ >>> view.index = BoundPageTemplate(ViewPageTemplateFile('sessions.pt'),
+ ... view)
+
+ >>> from zope.app import recorder
+ >>> recorder.requestStorage.add(recorder.RecordedRequest(timestamp=0,
+ ... request_string='GET /something HTTP/1.1\r\n\r\n',
+ ... response_string='HTTP/1.1 404 Not Found\r\n\r\n',
+ ... method='GET', path='/something', status=404,
+ ... ))
+
+ >>>
+
+ Simple rendering:
+
+ >>> print view() # doctest: +ELLIPSIS,+NORMALIZE_WHITESPACE
+ <html>
+ ...
+ <form method="POST" action="http://127.0.0.1">
+ <input class="hiddenType" id="field.skip_urls" name="field.skip_urls" type="hidden" value="" />
+ <table>
+ <tr>
+ <th> </th>
+ <th>Time</th>
+ <th>Method</th>
+ <th>Path</th>
+ <th>Status</th>
+ </tr>
+ <tr>
+ <td><input type="checkbox" name="id:int:list"
+ checked="checked" id="chk1" value="1" /></td>
+ <td><label for="chk1">...</label></td>
+ <td><a href="RecordedRequest.html?id:int=1">GET</a></td>
+ <td><label for="chk1">/something</label></td>
+ <td><a href="RecordedResponse.html?id:int=1">404</a></td>
+ </tr>
+ </table>
+ <BLANKLINE>
+ <div class="row">
+ <div class="control">
+ <input type="submit" name="FTEST"
+ value="Create Functional Doctest" />
+ <input type="submit" name="CLEAR" value="Clear All" />
+ </div>
+ </div>
+ </form>
+ ...
+
+ 'FTEST' button:
+
+ >>> request.form['FTEST'] = u"Create Functional Doctest"
+ >>> request.form['id'] = [1]
+ >>> print view() # doctest: +ELLIPSIS,+NORMALIZE_WHITESPACE
+ <BLANKLINE>
+ ...>>> print http(...
+
+ 'CLEAR' button:
+
+ >>> del request.form['FTEST']
+ >>> request.form['CLEAR'] = u"Clear All"
+ >>> print view()
+ <BLANKLINE>
+ >>> request.response.getStatus()
+ 302
+ >>> request.response.getHeader('Location')
+ 'http://127.0.0.1'
+
+ >>> recorder.requestStorage.clear()
+ >>> tearDownBrowser()
+
+ """
+
+
+
+class ServerStub(object):
+ """Stub for HTTPServer."""
+
+ SERVER_IDENT = 'Stub Server'
+ server_name = 'RecordingHTTPServer'
+ port = 8081
+
+
+class ChannelStub(object):
+ """Stub for HTTPServerChannel."""
+
+ server = ServerStub()
+ creation_time = 42
+ addr = ('addr', )
+
+ def write(self, data):
+ pass
+
+
+class RequestDataStub(object):
+ """Stub for HTTPRequestParser."""
+
+ version = "1.1"
+ headers = {}
+
+
+class PublicationStub(object):
+ """Stub for Publication."""
+
+ def handleException(self, *args):
+ pass
+
+ def endRequest(self, request, object):
+ pass
+
+
+class PrincipalStub(object):
+ """Stub for request.principal."""
+
+ title = 'Random user'
+
+
+class ViewGetMenuStub(BrowserView):
+ """Stub for @@view_get_menu"""
+
+ def __getitem__(self, name):
+ return []
+
+
+def setUpBrowser(test=None):
+ """Set up for zope.app.recorder.browser doctests"""
+ setup.placelessSetUp()
+ setup.setUpTraversal()
+
+ # Widgets need some setup
+ from zope.schema.interfaces import IText
+ from zope.app.form.browser.textwidgets import TextAreaWidget
+ from zope.app.form.interfaces import IInputWidget
+ ztapi.browserViewProviding(IText, TextAreaWidget, IInputWidget)
+
+ # ++view++ namespace
+ from zope.app.traversing.interfaces import ITraversable
+ import zope.app.traversing.namespace
+ ztapi.provideView(None, None, ITraversable, 'view',
+ zope.app.traversing.namespace.view)
+
+ # Macros
+ from zope.app.basicskin.standardmacros import StandardMacros
+ from zope.app.form.browser.macros import FormMacros
+ from zope.app.pagetemplate.simpleviewclass import SimpleViewClass
+ ztapi.browserView(None, 'standard_macros', StandardMacros)
+ ztapi.browserView(None, 'view_macros',
+ SimpleViewClass("../basicskin/view_macros.pt"))
+ ztapi.browserView(None, 'form_macros', FormMacros)
+ ztapi.browserView(None, 'widget_macros',
+ SimpleViewClass('../form/browser/widget_macros.pt'))
+ ztapi.browserView(None, 'view_get_menu', ViewGetMenuStub)
+
+
+def tearDownBrowser(test=None):
+ """Tear down for zope.app.recorder.browser doctests"""
+ setup.placelessTearDown()
+
+
+def tearDown(test=None):
+ """Tear down for zope.app.recorder doctests."""
+ transaction.abort()
+
+
+def test_suite():
+ return unittest.TestSuite([doctest.DocTestSuite(tearDown=tearDown)])
+
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
Property changes on: Zope3/trunk/src/zope/app/recorder/tests.py
___________________________________________________________________
Name: svn:keywords
+ Id
Name: svn:eol-style
+ native
More information about the Zope3-Checkins
mailing list