[Checkins] SVN: zope.publisher/trunk/ Compatibility with python-3.3 added, support for pypy removed
Andrey Lebedev
cvs-admin at zope.org
Thu Feb 21 14:06:48 UTC 2013
Log message for revision 129566:
Compatibility with python-3.3 added, support for pypy removed
py3-attempt2 branch merged
Changed:
A zope.publisher/trunk/MANIFEST.in
U zope.publisher/trunk/bootstrap.py
U zope.publisher/trunk/setup.py
A zope.publisher/trunk/src/zope/publisher/_compat.py
U zope.publisher/trunk/src/zope/publisher/base.py
U zope.publisher/trunk/src/zope/publisher/browser.py
U zope.publisher/trunk/src/zope/publisher/configure.zcml
U zope.publisher/trunk/src/zope/publisher/ftp.py
U zope.publisher/trunk/src/zope/publisher/http.py
U zope.publisher/trunk/src/zope/publisher/httpresults.txt
U zope.publisher/trunk/src/zope/publisher/interfaces/__init__.py
U zope.publisher/trunk/src/zope/publisher/paste.txt
U zope.publisher/trunk/src/zope/publisher/principallogging.py
U zope.publisher/trunk/src/zope/publisher/publish.py
U zope.publisher/trunk/src/zope/publisher/skinnable.txt
U zope.publisher/trunk/src/zope/publisher/testing.py
U zope.publisher/trunk/src/zope/publisher/tests/basetestiapplicationrequest.py
U zope.publisher/trunk/src/zope/publisher/tests/basetestipublicationrequest.py
U zope.publisher/trunk/src/zope/publisher/tests/test_baserequest.py
U zope.publisher/trunk/src/zope/publisher/tests/test_baseresponse.py
U zope.publisher/trunk/src/zope/publisher/tests/test_browser.py
U zope.publisher/trunk/src/zope/publisher/tests/test_browserlanguages.py
U zope.publisher/trunk/src/zope/publisher/tests/test_browserrequest.py
U zope.publisher/trunk/src/zope/publisher/tests/test_browserresponse.py
U zope.publisher/trunk/src/zope/publisher/tests/test_ftp.py
U zope.publisher/trunk/src/zope/publisher/tests/test_http.py
U zope.publisher/trunk/src/zope/publisher/tests/test_ipublication.py
U zope.publisher/trunk/src/zope/publisher/tests/test_mapply.py
U zope.publisher/trunk/src/zope/publisher/tests/test_principallogging.py
U zope.publisher/trunk/src/zope/publisher/tests/test_publisher.py
U zope.publisher/trunk/src/zope/publisher/tests/test_skinnable.py
U zope.publisher/trunk/src/zope/publisher/tests/test_xmlrpc.py
U zope.publisher/trunk/src/zope/publisher/tests/test_xmlrpcrequest.py
U zope.publisher/trunk/src/zope/publisher/tests/test_zcml.py
U zope.publisher/trunk/src/zope/publisher/xmlrpc.py
U zope.publisher/trunk/src/zope/publisher/xmlrpc.txt
A zope.publisher/trunk/tox.ini
-=-
Copied: zope.publisher/trunk/MANIFEST.in (from rev 129564, zope.publisher/branches/py3-attempt2/MANIFEST.in)
===================================================================
--- zope.publisher/trunk/MANIFEST.in (rev 0)
+++ zope.publisher/trunk/MANIFEST.in 2013-02-21 14:06:47 UTC (rev 129566)
@@ -0,0 +1,9 @@
+include *.rst
+include *.txt
+include tox.ini
+include bootstrap.py
+include buildout.cfg
+
+recursive-include src *
+
+global-exclude *.pyc
Modified: zope.publisher/trunk/bootstrap.py
===================================================================
--- zope.publisher/trunk/bootstrap.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/bootstrap.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -18,33 +18,148 @@
use the -c option to specify an alternate configuration file.
"""
-import os, shutil, sys, tempfile, urllib2
+import os, shutil, sys, tempfile
+from optparse import OptionParser
tmpeggs = tempfile.mkdtemp()
-ez = {}
-exec urllib2.urlopen('http://peak.telecommunity.com/dist/ez_setup.py'
- ).read() in ez
-ez['use_setuptools'](to_dir=tmpeggs, download_delay=0)
+usage = '''\
+[DESIRED PYTHON FOR BUILDOUT] bootstrap.py [options]
-import pkg_resources
+Bootstraps a buildout-based project.
-cmd = 'from setuptools.command.easy_install import main; main()'
-if sys.platform == 'win32':
- cmd = '"%s"' % cmd # work around spawn lamosity on windows
+Simply run this script in a directory containing a buildout.cfg, using the
+Python that you want bin/buildout to use.
-ws = pkg_resources.working_set
-assert os.spawnle(
- os.P_WAIT, sys.executable, sys.executable,
- '-c', cmd, '-mqNxd', tmpeggs, 'zc.buildout',
- dict(os.environ,
- PYTHONPATH=
- ws.find(pkg_resources.Requirement.parse('setuptools')).location
- ),
- ) == 0
+Note that by using --setup-source and --download-base to point to
+local resources, you can keep this script from going over the network.
+'''
+parser = OptionParser(usage=usage)
+parser.add_option("-v", "--version", help="use a specific zc.buildout version")
+
+parser.add_option("-t", "--accept-buildout-test-releases",
+ dest='accept_buildout_test_releases',
+ action="store_true", default=False,
+ help=("Normally, if you do not specify a --version, the "
+ "bootstrap script and buildout gets the newest "
+ "*final* versions of zc.buildout and its recipes and "
+ "extensions for you. If you use this flag, "
+ "bootstrap and buildout will get the newest releases "
+ "even if they are alphas or betas."))
+parser.add_option("-c", "--config-file",
+ help=("Specify the path to the buildout configuration "
+ "file to be used."))
+parser.add_option("-f", "--find-links",
+ help=("Specify a URL to search for buildout releases"))
+
+
+options, args = parser.parse_args()
+
+######################################################################
+# load/install distribute
+
+to_reload = False
+try:
+ import pkg_resources, setuptools
+ if not hasattr(pkg_resources, '_distribute'):
+ to_reload = True
+ raise ImportError
+except ImportError:
+ ez = {}
+
+ try:
+ from urllib.request import urlopen
+ except ImportError:
+ from urllib2 import urlopen
+
+ exec(urlopen('http://python-distribute.org/distribute_setup.py').read(), ez)
+ setup_args = dict(to_dir=tmpeggs, download_delay=0, no_fake=True)
+ ez['use_setuptools'](**setup_args)
+
+ if to_reload:
+ reload(pkg_resources)
+ import pkg_resources
+ # This does not (always?) update the default working set. We will
+ # do it.
+ for path in sys.path:
+ if path not in pkg_resources.working_set.entries:
+ pkg_resources.working_set.add_entry(path)
+
+######################################################################
+# Install buildout
+
+ws = pkg_resources.working_set
+
+cmd = [sys.executable, '-c',
+ 'from setuptools.command.easy_install import main; main()',
+ '-mZqNxd', tmpeggs]
+
+find_links = os.environ.get(
+ 'bootstrap-testing-find-links',
+ options.find_links or
+ ('http://downloads.buildout.org/'
+ if options.accept_buildout_test_releases else None)
+ )
+if find_links:
+ cmd.extend(['-f', find_links])
+
+distribute_path = ws.find(
+ pkg_resources.Requirement.parse('distribute')).location
+
+requirement = 'zc.buildout'
+version = options.version
+if version is None and not options.accept_buildout_test_releases:
+ # Figure out the most recent final version of zc.buildout.
+ import setuptools.package_index
+ _final_parts = '*final-', '*final'
+ def _final_version(parsed_version):
+ for part in parsed_version:
+ if (part[:1] == '*') and (part not in _final_parts):
+ return False
+ return True
+ index = setuptools.package_index.PackageIndex(
+ search_path=[distribute_path])
+ if find_links:
+ index.add_find_links((find_links,))
+ req = pkg_resources.Requirement.parse(requirement)
+ if index.obtain(req) is not None:
+ best = []
+ bestv = None
+ for dist in index[req.project_name]:
+ distv = dist.parsed_version
+ if _final_version(distv):
+ if bestv is None or distv > bestv:
+ best = [dist]
+ bestv = distv
+ elif distv == bestv:
+ best.append(dist)
+ if best:
+ best.sort()
+ version = best[-1].version
+if version:
+ requirement = '=='.join((requirement, version))
+cmd.append(requirement)
+
+import subprocess
+if subprocess.call(cmd, env=dict(os.environ, PYTHONPATH=distribute_path)) != 0:
+ raise Exception(
+ "Failed to execute command:\n%s",
+ repr(cmd)[1:-1])
+
+######################################################################
+# Import and run buildout
+
ws.add_entry(tmpeggs)
-ws.require('zc.buildout')
+ws.require(requirement)
import zc.buildout.buildout
-zc.buildout.buildout.main(sys.argv[1:] + ['bootstrap'])
+
+if not [a for a in args if '=' not in a]:
+ args.append('bootstrap')
+
+# if -c was provided, we push it back into args for buildout's main function
+if options.config_file is not None:
+ args[0:0] = ['-c', options.config_file]
+
+zc.buildout.buildout.main(args)
shutil.rmtree(tmpeggs)
Modified: zope.publisher/trunk/setup.py
===================================================================
--- zope.publisher/trunk/setup.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/setup.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -16,16 +16,30 @@
# When developing and releasing this package, please follow the documented
# Zope Toolkit policies as described by this documentation.
##############################################################################
-
from setuptools import setup, find_packages
-entry_points = """
+def alltests():
+ import os
+ import sys
+ import unittest
+ # use the zope.testrunner machinery to find all the
+ # test suites we've put under ourselves
+ import zope.testrunner.find
+ import zope.testrunner.options
+ here = os.path.abspath(os.path.join(os.path.dirname(__file__), 'src'))
+ args = sys.argv[:]
+ defaults = ["--test-path", here]
+ options = zope.testrunner.options.get_options(args, defaults)
+ suites = list(zope.testrunner.find.find_suites(options))
+ return unittest.TestSuite(suites)
+
+entry_points = '''
[paste.app_factory]
main = zope.publisher.paste:Application
[zope.publisher.publication_factory]
sample = zope.publisher.tests.test_paste:SamplePublication
-"""
+'''
setup(name='zope.publisher',
version='4.0.0dev',
@@ -33,7 +47,7 @@
license='ZPL 2.1',
author='Zope Foundation and Contributors',
author_email='zope-dev at zope.org',
- description="The Zope publisher publishes Python objects on the web.",
+ description='The Zope publisher publishes Python objects on the web.',
long_description=(open('README.txt').read()
+ '\n\n'
+ open('CHANGES.txt').read()),
@@ -42,16 +56,16 @@
'Intended Audience :: Developers',
'License :: OSI Approved :: Zope Public License',
'Programming Language :: Python',
- "Programming Language :: Python :: 2",
- "Programming Language :: Python :: 2.6",
- "Programming Language :: Python :: 2.7",
+ 'Programming Language :: Python :: 2',
+ 'Programming Language :: Python :: 2.6',
+ 'Programming Language :: Python :: 2.7',
+ 'Programming Language :: Python :: 3',
+ 'Programming Language :: Python :: 3.3',
+ 'Programming Language :: Python :: Implementation :: CPython',
'Natural Language :: English',
'Operating System :: OS Independent',
'Topic :: Internet :: WWW/HTTP',
],
-
- entry_points=entry_points,
-
packages=find_packages('src'),
package_dir={'': 'src'},
namespace_packages=['zope',],
@@ -62,14 +76,16 @@
'zope.contenttype >= 3.5',
'zope.event',
'zope.exceptions',
- 'zope.i18n',
- 'zope.interface',
+ 'zope.i18n >= 4.0.0a3',
+ 'zope.interface >= 3.8.0',
'zope.location',
'zope.proxy',
- 'zope.security',
+ 'zope.security >= 4.0.0a1',
],
extras_require={'test': ['zope.testing']},
+ tests_require = ['zope.testing', 'zope.testrunner'],
+ test_suite = '__main__.alltests',
+ entry_points=entry_points,
include_package_data=True,
-
zip_safe=False,
)
Copied: zope.publisher/trunk/src/zope/publisher/_compat.py (from rev 129564, zope.publisher/branches/py3-attempt2/src/zope/publisher/_compat.py)
===================================================================
--- zope.publisher/trunk/src/zope/publisher/_compat.py (rev 0)
+++ zope.publisher/trunk/src/zope/publisher/_compat.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -0,0 +1,34 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Compatibility module for xmlrpclib
+
+This module unifies the namespace for xmlrpclib, which changed its name in
+python-3.x (it became xmlrpc.client).
+
+The intention is to let xmlrpclib names be importable from zcml.
+"""
+import sys
+PYTHON2 = sys.version_info[0] == 2
+PYTHON3 = sys.version_info[0] == 3
+
+if PYTHON2:
+ _u = unicode
+ from xmlrpclib import *
+ import types
+ CLASS_TYPES = (type, types.ClassType)
+else:
+ _u = str
+ CLASS_TYPES = (type,)
+ from xmlrpc.client import *
+
Modified: zope.publisher/trunk/src/zope/publisher/base.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/base.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/base.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -16,7 +16,7 @@
Specifically, 'BaseRequest', 'BaseResponse', and 'DefaultPublication' are
specified here.
"""
-from cStringIO import StringIO
+from io import BytesIO, StringIO
from zope.interface import implementer
from zope.interface.common.mapping import IReadMapping, IEnumerableMapping
@@ -28,6 +28,8 @@
from zope.publisher.interfaces import IRequest, IResponse, IDebugFlags
from zope.publisher.publish import mapply
+from zope.publisher._compat import PYTHON2
+
_marker = object()
@implementer(IResponse)
@@ -50,7 +52,9 @@
def handleException(self, exc_info):
'See IPublisherResponse'
- f = StringIO()
+ # We want the exception to be formatted as a native string. Pick
+ # the respective io class depending on the python version.
+ f = BytesIO() if PYTHON2 else StringIO()
print_exception(
exc_info[0], exc_info[1], exc_info[2], 100, f)
self.setResult(f.getvalue())
@@ -398,7 +402,7 @@
environ['PATH_INFO'] = path
if body_instream is None:
- body_instream = StringIO('')
+ body_instream = BytesIO(b'')
super(TestRequest, self).__init__(body_instream, environ)
Modified: zope.publisher/trunk/src/zope/publisher/browser.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/browser.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/browser.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -21,7 +21,6 @@
__docformat__ = 'restructuredtext'
import re
-from types import ListType, TupleType, StringType
from cgi import FieldStorage
import tempfile
@@ -42,7 +41,7 @@
from zope.publisher.interfaces.browser import IBrowserPage
from zope.publisher.interfaces.browser import IBrowserSkinType
from zope.publisher.interfaces.http import IHTTPRequest
-from zope.publisher.http import HTTPRequest, HTTPResponse
+from zope.publisher.http import HTTPRequest, HTTPResponse, getCharsetUsingRequest
# BBB imports, this compoennts get moved from this module
from zope.publisher.interfaces import ISkinType #BBB import
@@ -52,11 +51,13 @@
from zope.publisher.skinnable import applySkin #BBB import
from zope.publisher.skinnable import SkinChangedEvent #BBB import
+from zope.publisher._compat import PYTHON2, _u
-__ArrayTypes = (ListType, TupleType)
-start_of_header_search=re.compile('(<head[^>]*>)', re.I).search
-base_re_search=re.compile('(<base.*?>)',re.I).search
+__ArrayTypes = (list, tuple)
+
+start_of_header_search=re.compile(b'(<head[^>]*>)', re.I).search
+base_re_search=re.compile(b'(<base.*?>)',re.I).search
isRelative = re.compile("[-_.!~*a-zA-z0-9'()@&=+$,]+(/|$)").match
newlines = re.compile('\r\n|\n\r|\r')
@@ -89,7 +90,7 @@
def field2int(v):
if isinstance(v, __ArrayTypes):
- return map(field2int, v)
+ return list(map(field2int, v))
v = field2string(v)
if not v:
raise ValueError('Empty entry when <strong>integer</strong> expected')
@@ -100,7 +101,7 @@
def field2float(v):
if isinstance(v, __ArrayTypes):
- return map(field2float, v)
+ return list(map(field2float, v))
v = field2string(v)
if not v:
raise ValueError(
@@ -113,7 +114,7 @@
def field2long(v):
if isinstance(v, __ArrayTypes):
- return map(field2long, v)
+ return list(map(field2long, v))
v = field2string(v)
# handle trailing 'L' if present.
@@ -122,7 +123,7 @@
if not v:
raise ValueError('Empty entry when <strong>integer</strong> expected')
try:
- return long(v)
+ return int(v)
except ValueError:
raise ValueError("A long integer was expected in the value '%s'" % v)
@@ -165,7 +166,7 @@
type_converters[field_type] = converter
-isCGI_NAME = {
+isCGI_NAME = lambda key: key in {
# These fields are placed in request.environ instead of request.form.
'SERVER_SOFTWARE' : 1,
'SERVER_NAME' : 1,
@@ -185,12 +186,12 @@
'CONTENT_TYPE' : 1,
'CONTENT_LENGTH' : 1,
'SERVER_URL': 1,
- }.has_key
+ }
-hide_key={
+hide_key=lambda key: key in {
'HTTP_AUTHORIZATION':1,
'HTTP_CGI_AUTHORIZATION': 1,
- }.has_key
+ }
class Record(object):
@@ -206,12 +207,12 @@
return self.__dict__[key]
def __str__(self):
- items = self.__dict__.items()
+ items = list(self.__dict__.items())
items.sort()
return "{" + ", ".join(["%s: %s" % item for item in items]) + "}"
def __repr__(self):
- items = self.__dict__.items()
+ items = list(self.__dict__.items())
items.sort()
return ("{"
+ ", ".join(["%s: %s" % (key, repr(value))
@@ -246,13 +247,19 @@
def _decode(self, text):
"""Try to decode the text using one of the available charsets."""
+ # According to PEP-3333, in python-3, QUERY_STRING is a string,
+ # representing 'latin-1' encoded byte array. So, if we are in python-3
+ # context, encode text as 'latin-1' first, to try to decode
+ # resulting byte array using user-supplied charset.
+ if not isinstance(text, bytes):
+ text = text.encode('latin-1')
if self.charsets is None:
envadapter = IUserPreferredCharsets(self)
self.charsets = envadapter.getPreferredCharsets() or ['utf-8']
self.charsets = [c for c in self.charsets if c != '*']
for charset in self.charsets:
try:
- text = unicode(text, charset)
+ text = _u(text, charset)
break
except UnicodeError:
pass
@@ -292,8 +299,9 @@
del env['QUERY_STRING']
+ args = {'encoding': 'utf-8'} if not PYTHON2 else {}
fs = ZopeFieldStorage(fp=fp, environ=env,
- keep_blank_values=1)
+ keep_blank_values=1, **args)
fslist = getattr(fs, 'list', None)
if fslist is not None:
@@ -389,11 +397,10 @@
# skip over empty fields
return
- # Make it unicode if not None
if key is not None:
key = self._decode(key)
- if type(item) == StringType:
+ if isinstance(item, (str, bytes)):
item = self._decode(item)
if flags:
@@ -516,7 +523,7 @@
"""Insert defaults into form dictionary."""
form = self.form
- for keys, values in self.__defaults.iteritems():
+ for keys, values in self.__defaults.items():
if not keys in form:
form[keys] = values
else:
@@ -583,7 +590,7 @@
d.update(self._environ)
d.update(self._cookies)
d.update(self.form)
- return d.keys()
+ return list(d.keys())
def get(self, key, default=None):
@@ -630,7 +637,9 @@
d[m] = getattr(file,m)
self.headers = aFieldStorage.headers
- filename = unicode(aFieldStorage.filename, 'UTF-8')
+ filename = aFieldStorage.filename
+ if isinstance(aFieldStorage.filename, bytes):
+ filename = _u(aFieldStorage.filename, 'UTF-8')
# fix for IE full paths
filename = filename[filename.rfind('\\')+1:].strip()
self.filename = filename
@@ -661,8 +670,8 @@
if kw:
_testEnv.update(kw)
if body_instream is None:
- from StringIO import StringIO
- body_instream = StringIO('')
+ from io import BytesIO
+ body_instream = BytesIO()
super(TestRequest, self).__init__(body_instream, _testEnv)
if form:
@@ -728,9 +737,17 @@
ibase = base_re_search(body)
if ibase is None:
# Make sure the base URL is not a unicode string.
- base = str(self.getBase())
- body = ('%s\n<base href="%s" />\n%s' %
- (body[:index], base, body[index:]))
+ base = self.getBase()
+ if not isinstance(base, bytes):
+ encoding = getCharsetUsingRequest(self._request) or 'utf-8'
+ base = self.getBase().encode(encoding)
+ #body = (b'%s\n<base href="%s" />\n%s' %
+ # (body[:index], base, body[index:]))
+ body = b''.join([body[:index],
+ b'\n<base href="',
+ base,
+ b'" />\n',
+ body[index:]])
return body
def getBase(self):
Modified: zope.publisher/trunk/src/zope/publisher/configure.zcml
===================================================================
--- zope.publisher/trunk/src/zope/publisher/configure.zcml 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/configure.zcml 2013-02-21 14:06:47 UTC (rev 129566)
@@ -61,15 +61,15 @@
provides="zope.i18n.interfaces.IModifiableUserPreferredLanguages"
/>
- <class class="xmlrpclib.Binary">
+ <class class="._compat.Binary">
<allow attributes="data encode decode" />
</class>
- <class class="xmlrpclib.Fault">
+ <class class="._compat.Fault">
<allow attributes="faultCode faultString" />
</class>
- <class class="xmlrpclib.DateTime">
+ <class class="._compat.DateTime">
<allow attributes="value" />
</class>
Modified: zope.publisher/trunk/src/zope/publisher/ftp.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/ftp.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/ftp.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -13,6 +13,7 @@
##############################################################################
"""FTP Publisher
"""
+import six
from zope.interface import implementer
from zope.publisher.interfaces.ftp import IFTPCredentials, IFTPRequest
from zope.publisher.base import BaseResponse, BaseRequest
@@ -27,7 +28,7 @@
def getResult(self):
if getattr(self, '_exc', None) is not None:
- raise self._exc[0], self._exc[1], self._exc[2]
+ six.reraise(self._exc[0], self._exc[1], self._exc[2])
return self._result
def handleException(self, exc_info):
Modified: zope.publisher/trunk/src/zope/publisher/http.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/http.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/http.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -13,7 +13,9 @@
##############################################################################
"""HTTP Publisher
"""
-from cStringIO import StringIO
+import sys
+import base64
+from io import BytesIO
from zope.i18n.interfaces import IUserPreferredCharsets
from zope.i18n.interfaces import IUserPreferredLanguages
from zope.i18n.locales import locales, LoadLocaleError
@@ -31,19 +33,28 @@
from zope.publisher.interfaces.http import IResult
from zope.publisher.interfaces.logginginfo import ILoggingInfo
from zope.publisher.skinnable import setDefaultSkin
-import Cookie
-import cgi
import logging
import tempfile
import types
-import urllib
-import urlparse
import zope.component
import zope.contenttype.parse
import zope.event
import zope.interface
+from zope.publisher._compat import PYTHON2, CLASS_TYPES, _u
+if PYTHON2:
+ import Cookie as cookies
+ from urllib import splitport, quote
+ from urlparse import urlsplit
+ from cgi import escape
+else:
+ import http.cookies as cookies
+ from urllib.parse import splitport, quote, urlsplit
+ from html import escape
+ unicode = str
+ basestring = (str, bytes)
+
# Default Encoding
ENCODING = 'UTF-8'
@@ -72,7 +83,10 @@
if 'HTTP_CGI_AUTHORIZATION' in dict:
dict['HTTP_AUTHORIZATION'] = dict.pop('HTTP_CGI_AUTHORIZATION')
if 'PATH_INFO' in dict:
- dict['PATH_INFO'] = dict['PATH_INFO'].decode('utf-8')
+ # Recode PATH_INFO to UTF-8 from original latin1
+ pi = dict['PATH_INFO']
+ pi = pi if isinstance(pi, bytes) else pi.encode('latin1')
+ dict['PATH_INFO'] = pi.decode(ENCODING)
return dict
@zope.interface.implementer(IHTTPVirtualHostChangedEvent)
@@ -176,8 +190,8 @@
return self.__request.getURL(i)
else:
return self.__request.getApplicationURL(i)
- except IndexError, v:
- if v[0] == i:
+ except IndexError as v:
+ if v.args[0] == i:
return default
raise
@@ -195,7 +209,7 @@
if not size:
size = environment.get('HTTP_CONTENT_LENGTH')
if not size or int(size) < 65536:
- self.cacheStream = StringIO()
+ self.cacheStream = BytesIO()
else:
self.cacheStream = tempfile.TemporaryFile()
self.size = size and int(size) or -1
@@ -225,7 +239,7 @@
def readlines(self, hint=0):
data = self.stream.readlines(hint)
- self.cacheStream.write(''.join(data))
+ self.cacheStream.write(b''.join(data))
return data
@@ -352,7 +366,7 @@
script = get_env('SCRIPT_NAME', '').strip()
# _script and the other _names are meant for URL construction
- self._app_names = filter(None, script.split('/'))
+ self._app_names = [f for f in script.split('/') if f]
# get server URL and store it too, since we are already looking it up
server_url = get_env('SERVER_URL', None)
@@ -379,9 +393,9 @@
else:
protocol = 'http'
- if environ.has_key('HTTP_HOST'):
+ if 'HTTP_HOST' in environ:
host = environ['HTTP_HOST'].strip()
- hostname, port = urllib.splitport(host)
+ hostname, port = splitport(host)
else:
hostname = environ.get('SERVER_NAME', '').strip()
port = environ.get('SERVER_PORT', '')
@@ -401,13 +415,18 @@
# ignore cookies on a CookieError
try:
- c = Cookie.SimpleCookie(text)
- except Cookie.CookieError, e:
- eventlog.warn(e)
+ c = cookies.SimpleCookie(text)
+ except cookies.CookieError as e:
+ eventlog.warning(e)
return result
for k,v in c.items():
- result[unicode(k, ENCODING)] = unicode(v.value, ENCODING)
+ # recode cookie value to ENCODING (UTF-8)
+ rk = _u(k if type(k) == bytes
+ else k.encode('latin1'), ENCODING)
+ rv = _u(v.value if type(v.value) == bytes
+ else v.value.encode('latin1'), ENCODING)
+ result[rk] = rv
return result
@@ -491,7 +510,9 @@
'See IHTTPCredentials'
if self._auth and self._auth.lower().startswith('basic '):
encoded = self._auth.split(None, 1)[-1]
- name, password = encoded.decode("base64").split(':', 1)
+ decoded = base64.b64decode(encoded.encode('iso-8859-1'))
+ name, password = bytes.split(decoded, b':', 1)
+ #name, password = base64.b64decode(encoded.encode('ascii')).split(':', 1)
return name, password
def unauthorized(self, challenge):
@@ -521,8 +542,7 @@
raise IndexError(level)
names = names[:-level]
# See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5
- names = [urllib.quote(name.encode("utf-8"), safe='/+@')
- for name in names]
+ names = [quote(name.encode("utf-8"), safe='/+@') for name in names]
if path_only:
if not names:
@@ -544,8 +564,7 @@
names = self._app_names
# See: http://www.ietf.org/rfc/rfc2718.txt, Section 2.2.5
- names = [urllib.quote(name.encode("utf-8"), safe='/+@')
- for name in names]
+ names = [quote(name.encode("utf-8"), safe='/+@') for name in names]
if path_only:
return names and ('/' + '/'.join(names)) or '/'
@@ -784,7 +803,7 @@
def consumeBody(self):
'See IHTTPResponse'
- return ''.join(self._result)
+ return b''.join(self._result)
def consumeBodyIter(self):
@@ -797,12 +816,13 @@
content_type = self.getHeader('content-type')
if isinstance(body, unicode):
- if not unicode_mimetypes_re.match(content_type):
+ ct = content_type
+ if not unicode_mimetypes_re.match(ct):
raise ValueError(
'Unicode results must have a text, RFC 3023, or '
'+xml content type.')
- major, minor, params = zope.contenttype.parse.parse(content_type)
+ major, minor, params = zope.contenttype.parse.parse(ct)
if 'charset' in params:
encoding = params['charset']
@@ -833,13 +853,13 @@
Calls self.setBody() with an error response.
"""
t, v = exc_info[:2]
- if isinstance(t, (types.ClassType, type)):
+ if isinstance(t, CLASS_TYPES):
if issubclass(t, Redirect):
self.redirect(v.getLocation())
return
title = tname = t.__name__
else:
- title = tname = unicode(t)
+ title = tname = _u(t)
# Throwing non-protocol-specific exceptions is a good way
# for apps to control the status code.
@@ -854,7 +874,7 @@
self.setStatus(500, u"The engines can't take any more, Jim!")
def _html(self, title, content):
- t = cgi.escape(title)
+ t = escape(title)
return (
u"<html><head><title>%s</title></head>\n"
u"<body><h2>%s</h2>\n"
@@ -900,13 +920,22 @@
def _cookie_list(self):
try:
- c = Cookie.SimpleCookie()
- except Cookie.CookieError, e:
- eventlog.warn(e)
+ c = cookies.SimpleCookie()
+ except cookies.CookieError as e:
+ eventlog.warning(e)
return []
for name, attrs in self._cookies.items():
name = str(name)
- c[name] = attrs['value'].encode(ENCODING)
+
+ # In python-2.x, Cookie module expects plain bytes (not unicode).
+ # However, in python-3.x, latin-1 unicode string is expected (not
+ # bytes). We make this distinction clear here.
+ cookieval = attrs['value'].encode(ENCODING)
+ if PYTHON2:
+ c[name] = cookieval
+ else:
+ c[name] = cookieval.decode('latin-1')
+
for k,v in attrs.items():
if k == 'value':
continue
@@ -918,7 +947,7 @@
k = 'max-age'
elif k == 'comment':
# Encode rather than throw an exception
- v = urllib.quote(v.encode('utf-8'), safe="/?:@&+")
+ v = quote(v.encode('utf-8'), safe="/?:@&+")
c[name][k] = str(v)
return str(c).splitlines()
@@ -929,16 +958,15 @@
"for more information."
)
-def sort_charsets(x, y):
- if y[1] == 'utf-8':
- return 1
- if x[1] == 'utf-8':
- return -1
- return cmp(y, x)
+def sort_charsets(charset):
+ # Rank utf-8 above every other charset (the caller sorts with reverse=True, so it comes first)
+ if charset[1] == 'utf-8':
+ return (1, charset)
+ # Otherwise, sort by charset
+ return (0, charset)
-
def extract_host(url):
- scheme, host, path, query, fragment = urlparse.urlsplit(url)
+ scheme, host, path, query, fragment = urlsplit(url)
if ':' not in host:
port = DEFAULT_PORTS.get(scheme)
if port:
@@ -996,7 +1024,7 @@
# range , unlike many other encodings. Since Zope can easily use very
# different ranges, like providing a French-Chinese dictionary, it is
# always good to use UTF-8.
- charsets.sort(sort_charsets)
+ charsets.sort(key=sort_charsets, reverse=True)
charsets = [charset for quality, charset in charsets]
if sawstar and 'utf-8' not in charsets:
charsets.insert(0, 'utf-8')
@@ -1034,6 +1062,8 @@
self.body = body
def __iter__(self):
+ if isinstance(self.body, bytes):
+ return iter([self.body])
return iter(self.body)
Modified: zope.publisher/trunk/src/zope/publisher/httpresults.txt
===================================================================
--- zope.publisher/trunk/src/zope/publisher/httpresults.txt 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/httpresults.txt 2013-02-21 14:06:47 UTC (rev 129566)
@@ -67,17 +67,27 @@
To close, we'll build a quick example so you can see it working.
+To make the code work in both python-2.x and python-3.x, define the ``unicode`` name for
+python-3.x:
+
+ >>> import sys
+ >>> if sys.version_info[0] > 2:
+ ... unicode = str
+
>>> import zope.interface
>>> import zope.component
>>> from zope.publisher.browser import TestRequest
>>> from zope.publisher.interfaces.http import IResult, IHTTPRequest
- >>> import cgi
+ >>> try:
+ ... from html import escape
+ ... except ImportError:
+ ... from cgi import escape
>>> @zope.interface.implementer(IResult)
... @zope.component.adapter(unicode, IHTTPRequest)
... def do_something_silly_to_unicode_results(val, request):
... request.response.setHeader('X-Silly', 'Yes')
... return (u'<html>\n<head>\n<title>raw</title>\n</head>\n<body>\n' +
- ... cgi.escape(val) + '\n</body>\n</html>')
+ ... escape(val) + '\n</body>\n</html>')
...
>>> zope.component.provideAdapter(do_something_silly_to_unicode_results)
@@ -93,7 +103,7 @@
'text/html;charset=utf-8'
>>> res = tuple(request.response.consumeBodyIter())
>>> res
- ('<html>\n<head>\n<title>raw</title>\n</head>\n<body>\n<h1>Foo!</h1>\n</body>\n</html>',)
+ (b'<html>\n<head>\n<title>raw</title>\n</head>\n<body>\n<h1>Foo!</h1>\n</body>\n</html>',)
>>> len(res[0]) == int(request.response.getHeader('content-length'))
True
Modified: zope.publisher/trunk/src/zope/publisher/interfaces/__init__.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/interfaces/__init__.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/interfaces/__init__.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -62,10 +62,10 @@
def __str__(self):
try:
- ob = `self.ob`
+ ob = repr(self.ob)
except:
ob = 'unprintable object'
- return 'Object: %s, name: %s' % (ob, `self.name`)
+ return 'Object: %s, name: %s' % (ob, repr(self.name))
class IDebugError(ITraversalException):
def getObject():
Modified: zope.publisher/trunk/src/zope/publisher/paste.txt
===================================================================
--- zope.publisher/trunk/src/zope/publisher/paste.txt 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/paste.txt 2013-02-21 14:06:47 UTC (rev 129566)
@@ -29,6 +29,11 @@
Detailed example
================
+We will use the python3 compatible print function, so we import it from the
+future.
+
+ >>> from __future__ import print_function
+
There's a sample publication class in
zope.publisher.tests.test_paste.SamplePublication. It is a class that
takes a global_config positional argument and arbitrary keyword
@@ -55,13 +60,13 @@
environment dictionary and a start-response function:
>>> def start_response(status, headers):
- ... print status
- >>> import cStringIO
+ ... print(status)
+ >>> import io
>>> env = {'CONTENT_TYPE': 'text/plain', 'PATH_INFO': '/a/b',
- ... 'REQUEST_METHOD': 'GET', 'wsgi.input': cStringIO.StringIO('')}
+ ... 'REQUEST_METHOD': 'GET', 'wsgi.input': io.BytesIO(b'')}
>>> for data in app(env, start_response):
- ... print data
+ ... print(data.decode('latin-1'))
200 Ok
<html><body>Thanks for your request:<br />
<h1>BrowserRequest</h1>
@@ -70,7 +75,7 @@
PATH_INFO: /a/b
QUERY_STRING:
REQUEST_METHOD: GET
- wsgi.input: <cStringIO.StringI object at ...>
+ wsgi.input: <...io.BytesIO object at ...>
</pre>
<h1>Publication arguments:</h1>
Globals: {'global_option': 42}<br />
Modified: zope.publisher/trunk/src/zope/publisher/principallogging.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/principallogging.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/principallogging.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -30,4 +30,5 @@
self.principal = principal
def getLogMessage(self):
- return self.principal.id.encode('ascii', 'backslashreplace')
+ pid = self.principal.id
+ return pid.encode('ascii', 'backslashreplace').decode('latin1')
Modified: zope.publisher/trunk/src/zope/publisher/publish.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/publish.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/publish.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -16,6 +16,8 @@
Provide an apply-like facility that works with any mapping object
"""
import sys
+
+import six
from zope import component
from zope.interface import implementer
from zope.publisher.interfaces import Retry, IReRaiseException
@@ -39,11 +41,11 @@
if bases is not None:
raise TypeError("mapply() can not call class constructors")
- im_func = getattr(unwrapped, 'im_func', None)
+ im_func = getattr(unwrapped, '__func__', None)
if im_func is not None:
unwrapped = im_func
wrapperCount += 1
- elif getattr(unwrapped, 'func_code', None) is not None:
+ elif getattr(unwrapped, '__code__', None) is not None:
break
else:
unwrapped = getattr(unwrapped, '__call__' , None)
@@ -64,8 +66,8 @@
unwrapped, wrapperCount = unwrapMethod(unwrapped)
- code = unwrapped.func_code
- defaults = unwrapped.func_defaults
+ code = unwrapped.__code__
+ defaults = unwrapped.__defaults__
names = code.co_varnames[wrapperCount:code.co_argcount]
nargs = len(names)
@@ -154,7 +156,7 @@
break # Successful.
- except Retry, retryException:
+ except Retry as retryException:
if request.supportsRetry():
# Create a copy of the request and use it.
newrequest = request.retry()
@@ -187,7 +189,7 @@
response = request.response
if to_raise is not None:
- raise to_raise[0], to_raise[1], to_raise[2]
+ six.reraise(to_raise[0], to_raise[1], to_raise[2])
finally:
to_raise = None # Avoid circ. ref.
Modified: zope.publisher/trunk/src/zope/publisher/skinnable.txt
===================================================================
--- zope.publisher/trunk/src/zope/publisher/skinnable.txt 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/skinnable.txt 2013-02-21 14:06:47 UTC (rev 129566)
@@ -88,8 +88,8 @@
Now our request provides IJSONRequest because it implement that interface:
- >>> from StringIO import StringIO
- >>> request = JSONRequest(StringIO(''), {})
+ >>> from io import BytesIO
+ >>> request = JSONRequest(BytesIO(b''), {})
>>> IJSONRequest.providedBy(request)
True
@@ -160,7 +160,7 @@
>>> sm = zope.component.getSiteManager()
>>> sm.registerAdapter(
... IJSONDefaultLayer, (IJSONRequest,), IDefaultSkin, name='default')
- >>> request = JSONRequest(StringIO(''), {})
+ >>> request = JSONRequest(BytesIO(b''), {})
>>> IJSONDefaultLayer.providedBy(request)
False
>>> setDefaultSkin(request)
@@ -210,7 +210,7 @@
setDefaultSkin uses the custom layer interface instead of IJSONDefaultLayer:
- >>> request = JSONRequest(StringIO(''), {})
+ >>> request = JSONRequest(BytesIO(b''), {})
>>> IMySkin.providedBy(request)
False
@@ -229,7 +229,7 @@
method are replaced by the applied layer/skin interface. This is important
for our retry pattern which will ensure that we start with a clean request:
- >>> request = JSONRequest(StringIO(''), {})
+ >>> request = JSONRequest(BytesIO(b''), {})
>>> class IFoo(Interface):
... pass
@@ -261,7 +261,7 @@
Let's start with a fresh request:
- >>> request = JSONRequest(StringIO(''), {})
+ >>> request = JSONRequest(BytesIO(b''), {})
Now we can apply the SkinA:
@@ -287,7 +287,7 @@
If we set a default skin and later apply a custom skin, the default skin get
removed at the time the applySkin get called within a new ISkinType:
- >>> request = JSONRequest(StringIO(''), {})
+ >>> request = JSONRequest(BytesIO(b''), {})
Note, that our IMySkin is the default skin for IJSONRequest. We can aprove that
by lookup an IDefaultSkin interface for our request:
@@ -319,12 +319,17 @@
SkinChangedEvent
----------------
+We will use the python-3 style print function, so we import it from the
+future:
+
+ >>> from __future__ import print_function
+
Changing the skin on a request triggers the ISkinChangedEvent event:
>>> import zope.component
>>> from zope.publisher.interfaces import ISkinChangedEvent
>>> def receiveSkinEvent(event):
- ... print "Notified SkinEvent for:", event.request.__class__.__name__
+ ... print("Notified SkinEvent for: %s" % event.request.__class__.__name__)
>>> zope.component.provideHandler(receiveSkinEvent, (ISkinChangedEvent,))
>>> applySkin(request, ISkinA)
Notified SkinEvent for: JSONRequest
Modified: zope.publisher/trunk/src/zope/publisher/testing.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/testing.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/testing.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -12,12 +12,29 @@
#
##############################################################################
+import re
import contextlib
import zope.publisher.browser
import zope.security.management
import zope.security.testing
+from zope.testing import renormalizing
+from zope.publisher._compat import PYTHON2
+if PYTHON2:
+ rules = [(re.compile("b('.*?')"), r"\1"),
+ (re.compile('b(".*?")'), r"\1"),
+ ]
+ output_checker = renormalizing.RENormalizing(rules)
+else:
+ rules = [(re.compile("u('.*?')"), r"\1"),
+ (re.compile('u(".*?")'), r"\1"),
+ (re.compile("b('.*?')"), r"\1"),
+ (re.compile('b(".*?")'), r"\1"),
+ ]
+ output_checker = renormalizing.RENormalizing(rules)
+
+
# These are enhanced versions of the ones in zope.security.testing,
# they use a TestRequest instead of a TestParticipation.
Modified: zope.publisher/trunk/src/zope/publisher/tests/basetestiapplicationrequest.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/basetestiapplicationrequest.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/basetestiapplicationrequest.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -53,4 +53,4 @@
# Now make sure that request.get can actually deal with return
# self back to us correctly:
- self.assert_(request.get('REQUEST') is request)
+ self.assertTrue(request.get('REQUEST') is request)
Modified: zope.publisher/trunk/src/zope/publisher/tests/basetestipublicationrequest.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/basetestipublicationrequest.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/basetestipublicationrequest.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -58,15 +58,15 @@
rcresource2 = sys.getrefcount(resource2)
request.hold(resource2)
- self.failUnless(sys.getrefcount(resource) > rcresource)
- self.failUnless(sys.getrefcount(resource2) > rcresource2)
- self.failIf(resource2.released)
+ self.assertTrue(sys.getrefcount(resource) > rcresource)
+ self.assertTrue(sys.getrefcount(resource2) > rcresource2)
+ self.assertFalse(resource2.released)
request.close()
- self.failUnless(resource2.released)
+ self.assertTrue(resource2.released)
# Responses are not unreferenced during close()
- self.failUnless(sys.getrefcount(response) >= rcresponse)
+ self.assertTrue(sys.getrefcount(response) >= rcresponse)
self.assertEqual(sys.getrefcount(resource), rcresource)
self.assertEqual(sys.getrefcount(resource2), rcresource2)
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_baserequest.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_baserequest.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_baserequest.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -24,7 +24,7 @@
from zope.publisher.tests.basetestiapplicationrequest \
import BaseTestIApplicationRequest
-from StringIO import StringIO
+from io import BytesIO
from zope.interface import Interface, providedBy, alsoProvides
from zope.component import provideAdapter
@@ -35,7 +35,7 @@
def _Test__new(self, **kw):
from zope.publisher.base import BaseRequest
- return BaseRequest(StringIO(''), kw)
+ return BaseRequest(BytesIO(), kw)
def _Test__expectedViewType(self):
return None # we don't expect
@@ -43,8 +43,8 @@
def test_IApplicationRequest_bodyStream(self):
from zope.publisher.base import BaseRequest
- request = BaseRequest(StringIO('spam'), {})
- self.assertEqual(request.bodyStream.read(), 'spam')
+ request = BaseRequest(BytesIO(b'spam'), {})
+ self.assertEqual(request.bodyStream.read(), b'spam')
def test_IPublicationRequest_getPositionalArguments(self):
self.assertEqual(self._Test__new().getPositionalArguments(), ())
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_baseresponse.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_baseresponse.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_baseresponse.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -18,9 +18,7 @@
from zope.publisher.base import BaseResponse
from zope.publisher.interfaces import IResponse
from zope.interface.verify import verifyObject
-from StringIO import StringIO
-
class TestBaseResponse(TestCase):
def test_interface(self):
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_browser.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_browser.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_browser.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -13,14 +13,23 @@
##############################################################################
"""Test zope.publisher.browser doctests
"""
+import re
import unittest
from doctest import DocTestSuite
+from zope.testing.renormalizing import RENormalizing
+
__docformat__ = "reStructuredText"
def test_suite():
+
+ checker = RENormalizing([
+ # Python 3 includes module name in exceptions
+ (re.compile(r"zope.publisher.interfaces.NotFound"), "NotFound"),
+ ])
+
return unittest.TestSuite((
- DocTestSuite('zope.publisher.browser'),
+ DocTestSuite('zope.publisher.browser', checker=checker),
))
if __name__ == '__main__':
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_browserlanguages.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_browserlanguages.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_browserlanguages.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -60,7 +60,7 @@
return CacheableBrowserLanguages(request)
def test_cached_languages(self):
- eq = self.failUnlessEqual
+ eq = self.assertEqual
request = TestRequest("da, en, pt")
browser_languages = self.factory(request)
eq(list(browser_languages.getPreferredLanguages()), ["da", "en", "pt"])
@@ -73,12 +73,12 @@
return ModifiableBrowserLanguages(request)
def test_setPreferredLanguages(self):
- eq = self.failUnlessEqual
+ eq = self.assertEqual
request = TestRequest("da, en, pt")
browser_languages = self.factory(request)
eq(list(browser_languages.getPreferredLanguages()), ["da", "en", "pt"])
browser_languages.setPreferredLanguages(["ru", "en"])
- self.failUnless(request.localized)
+ self.assertTrue(request.localized)
eq(list(browser_languages.getPreferredLanguages()), ["ru", "en"])
def test_conflicting_adapters(self):
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_browserrequest.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_browserrequest.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_browserrequest.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -14,7 +14,7 @@
import sys
import unittest
-from StringIO import StringIO
+from io import BytesIO
from zope.interface import implementer, directlyProvides, Interface
from zope.interface.verify import verifyObject
@@ -29,6 +29,7 @@
from zope.publisher.interfaces.browser import IBrowserPublication
from zope.publisher.base import DefaultPublication
+from zope.publisher._compat import PYTHON2
from zope.publisher.tests.test_http import HTTPTests
from zope.publisher.tests.publication import TestPublication
@@ -39,15 +40,15 @@
from zope.publisher.tests.basetestiapplicationrequest \
import BaseTestIApplicationRequest
-LARGE_FILE_BODY = """-----------------------------1
+LARGE_FILE_BODY = b''.join([b"""-----------------------------1
Content-Disposition: form-data; name="upload"; filename="test"
Content-Type: text/plain
-Here comes some text! %s
+Here comes some text! """, (b'test'*1000), b"""
-----------------------------1--
-""" % ('test' * 1000)
+"""])
-IE_FILE_BODY = """-----------------------------1
+IE_FILE_BODY = b"""-----------------------------1
Content-Disposition: form-data; name="upload"; filename="C:\\Windows\\notepad.exe"
Content-Type: text/plain
@@ -55,7 +56,10 @@
-----------------------------1--
"""
+if not PYTHON2:
+ unicode = str
+
def publish(request):
publish_(request, handle_errors=0)
@@ -101,7 +105,7 @@
class Item(object):
"""Required docstring for the publisher."""
def __call__(self, a, b):
- return u"%s, %s" % (`a`, `b`)
+ return u"%s, %s" % (repr(a).lstrip('u'), repr(b).lstrip('u'))
class Item3(object):
"""Required docstring for the publisher."""
@@ -115,7 +119,7 @@
def index(self, a, b):
"""Required docstring for the publisher."""
- return u"%s, %s" % (`a`, `b`)
+ return u"%s, %s" % (repr(a).lstrip('u'), repr(b).lstrip('u'))
class Item2(object):
"""Required docstring for the publisher."""
@@ -131,35 +135,35 @@
self.app.folder.item2 = Item2()
self.app.folder.item3 = Item3()
- def _createRequest(self, extra_env={}, body=""):
+ def _createRequest(self, extra_env={}, body=b""):
env = self._testEnv.copy()
env.update(extra_env)
if len(body):
env['CONTENT_LENGTH'] = str(len(body))
publication = Publication(self.app)
- instream = StringIO(body)
+ instream = BytesIO(body)
request = TestBrowserRequest(instream, env)
request.setPublication(publication)
return request
def testTraversalToItem(self):
res = self._publisherResults()
- self.failUnlessEqual(
+ self.assertEqual(
res,
"Status: 200 Ok\r\n"
- "Content-Length: 7\r\n"
+ "Content-Length: 6\r\n"
"Content-Type: text/plain;charset=utf-8\r\n"
"X-Content-Type-Warning: guessed from content\r\n"
"X-Powered-By: Zope (www.zope.org), Python (www.python.org)\r\n"
"\r\n"
- "u'5', 6")
+ "'5', 6")
def testNoDefault(self):
request = self._createRequest()
response = request.response
publish(request)
- self.failIf(response.getBase())
+ self.assertFalse(response.getBase())
def testDefault(self):
extra = {'PATH_INFO': '/folder/item2'}
@@ -171,7 +175,7 @@
def testDefaultPOST(self):
extra = {'PATH_INFO': '/folder/item2', "REQUEST_METHOD": "POST"}
- request = self._createRequest(extra, body='a=5&b:int=6')
+ request = self._createRequest(extra, body=b'a=5&b:int=6')
response = request.response
publish(request)
self.assertEqual(response.getBase(),
@@ -187,7 +191,7 @@
'CONTENT_TYPE': 'multipart/form-data;\
boundary=---------------------------1'}
- body = """-----------------------------1
+ body = b"""-----------------------------1
Content-Disposition: form-data; name="field.contentType"
...
application/octet-stream
@@ -207,11 +211,11 @@
request = self._createRequest(extra, body=LARGE_FILE_BODY)
request.processInputs()
- self.assert_(request.form['upload'].name)
+ self.assertTrue(request.form['upload'].name)
request = self._createRequest(extra, body=IE_FILE_BODY)
request.processInputs()
- self.assertEquals(request.form['upload'].filename, 'notepad.exe')
+ self.assertEqual(request.form['upload'].filename, 'notepad.exe')
def testDefault2(self):
@@ -227,14 +231,14 @@
request = self._createRequest(extra)
response = request.response
publish(request)
- self.failIf(response.getBase())
+ self.assertFalse(response.getBase())
def testDefault4(self):
extra = {'PATH_INFO': '/folder/item2/view/'}
request = self._createRequest(extra)
response = request.response
publish(request)
- self.failIf(response.getBase())
+ self.assertFalse(response.getBase())
def testDefault6(self):
extra = {'PATH_INFO': '/folder/item2/'}
@@ -261,7 +265,7 @@
{u'a':u'5', u'b':6})
def testFormNoEncodingUsesUTF8(self):
- encoded = 'K\xc3\x83\xc2\xb6hlerstra\xc3\x83\xc2\x9fe'
+ encoded = 'K\xc3\xb6hlerstra\xc3\x9fe'
extra = {
# if nothing else is specified, form data should be
# interpreted as UTF-8, as this stub query string is
@@ -271,8 +275,8 @@
# many mainstream browsers do not send HTTP_ACCEPT_CHARSET
del request._environ['HTTP_ACCEPT_CHARSET']
publish(request)
- self.assert_(isinstance(request.form[u'street'], unicode))
- self.assertEqual(unicode(encoded, 'utf-8'), request.form['street'])
+ self.assertTrue(isinstance(request.form[u'street'], unicode))
+ self.assertEqual(u'K\xf6hlerstra\xdfe', request.form['street'])
def testFormAcceptsStarButNotUTF8(self):
extra = {
@@ -292,7 +296,7 @@
def testQueryStringIgnoredForPOST(self):
request = self._createRequest(
{"REQUEST_METHOD": "POST",
- 'PATH_INFO': '/folder/item3'}, body='c=5&d:int=6')
+ 'PATH_INFO': '/folder/item3'}, body=b'c=5&d:int=6')
publish(request)
self.assertEqual(request.form, {u'c': u'5', u'd': 6})
self.assertEqual(request.get('QUERY_STRING'), 'a=5&b:int=6')
@@ -307,22 +311,22 @@
extra = {'QUERY_STRING':'a.x:tuple:record=5&a.x:tuple:record=6&b=1'}
request = self._createRequest(extra)
publish(request)
- keys = request.form.keys()
- keys.sort()
+ keys = sorted(request.form.keys())
self.assertEqual(keys, [u'a',u'b'])
self.assertEqual(request.form[u'b'], u'1')
- self.assertEqual(request.form[u'a'].keys(), [u'x'])
+ self.assertEqual(list(request.form[u'a'].keys()), [u'x'])
self.assertEqual(request.form[u'a'][u'x'], (u'5',u'6'))
self.assertEqual(request.form[u'a'].x, (u'5',u'6'))
- self.assertEqual(str(request.form[u'a']), "{x: (u'5', u'6')}")
- self.assertEqual(repr(request.form[u'a']), "{x: (u'5', u'6')}")
+ self.assertEqual(str(request.form[u'a']).replace("u'", "'"),
+ "{x: ('5', '6')}")
+ self.assertEqual(repr(request.form[u'a']).replace("u'", "'"),
+ "{x: ('5', '6')}")
def testFormRecordsTypes(self):
extra = {'QUERY_STRING':'a.x:records=5&a.x:records=6&b=1'}
request = self._createRequest(extra)
publish(request)
- keys = request.form.keys()
- keys.sort()
+ keys = sorted(request.form.keys())
self.assertEqual(keys, [u'a',u'b'])
self.assertEqual(request.form[u'b'], u'1')
self.assertEqual(len(request.form[u'a']), 2)
@@ -330,16 +334,17 @@
self.assertEqual(request.form[u'a'][0].x, u'5')
self.assertEqual(request.form[u'a'][1][u'x'], u'6')
self.assertEqual(request.form[u'a'][1].x, u'6')
- self.assertEqual(str(request.form[u'a']), "[{x: u'5'}, {x: u'6'}]")
- self.assertEqual(repr(request.form[u'a']), "[{x: u'5'}, {x: u'6'}]")
+ self.assertEqual(str(request.form[u'a']).replace("u'", "'"),
+ "[{x: '5'}, {x: '6'}]")
+ self.assertEqual(repr(request.form[u'a']).replace("u'", "'"),
+ "[{x: '5'}, {x: '6'}]")
def testFormMultipleRecordsTypes(self):
extra = {'QUERY_STRING':'a.x:records:int=5&a.y:records:int=51'
'&a.x:records:int=6&a.y:records:int=61&b=1'}
request = self._createRequest(extra)
publish(request)
- keys = request.form.keys()
- keys.sort()
+ keys = sorted(request.form.keys())
self.assertEqual(keys, [u'a',u'b'])
self.assertEqual(request.form[u'b'], u'1')
self.assertEqual(len(request.form[u'a']), 2)
@@ -360,15 +365,16 @@
extra = {'QUERY_STRING':'a.x:list:record=5&a.x:list:record=6&b=1'}
request = self._createRequest(extra)
publish(request)
- keys = request.form.keys()
- keys.sort()
+ keys = sorted(request.form.keys())
self.assertEqual(keys, [u'a',u'b'])
self.assertEqual(request.form[u'b'], u'1')
- self.assertEqual(request.form[u'a'].keys(), [u'x'])
+ self.assertEqual(list(request.form[u'a'].keys()), [u'x'])
self.assertEqual(request.form[u'a'][u'x'], [u'5',u'6'])
self.assertEqual(request.form[u'a'].x, [u'5',u'6'])
- self.assertEqual(str(request.form[u'a']), "{x: [u'5', u'6']}")
- self.assertEqual(repr(request.form[u'a']), "{x: [u'5', u'6']}")
+ self.assertEqual(str(request.form[u'a']).replace("u'", "'"),
+ "{x: ['5', '6']}")
+ self.assertEqual(repr(request.form[u'a']).replace("u'", "'"),
+ "{x: ['5', '6']}")
def testFormListTypes2(self):
extra = {'QUERY_STRING':'a=5&a=6&b=1'}
@@ -514,9 +520,9 @@
dict(REQUEST_METHOD='POST',
CONTENT_TYPE='application/x-foo',
),
- 'test body')
+ b'test body')
request.processInputs()
- self.assertEqual(request.bodyStream.read(), 'test body')
+ self.assertEqual(request.bodyStream.read(), b'test body')
def test_post_body_not_necessarily(self):
request = self._createRequest(
@@ -524,9 +530,9 @@
CONTENT_TYPE='application/x-www-form-urlencoded',
QUERY_STRING='',
),
- 'x=1&y=2')
+ b'x=1&y=2')
request.processInputs()
- self.assertEqual(request.bodyStream.read(), '')
+ self.assertEqual(request.bodyStream.read(), b'')
self.assertEqual(dict(request.form), dict(x='1', y='2'))
request = self._createRequest(
@@ -535,9 +541,9 @@
'; charset=UTF-8'),
QUERY_STRING='',
),
- 'x=1&y=2')
+ b'x=1&y=2')
request.processInputs()
- self.assertEqual(request.bodyStream.read(), '')
+ self.assertEqual(request.bodyStream.read(), b'')
self.assertEqual(dict(request.form), dict(x='1', y='2'))
@implementer(IBrowserPublication)
@@ -554,11 +560,11 @@
def _Test__new(self, environ=None, **kw):
if environ is None:
environ = kw
- return BrowserRequest(StringIO(''), environ)
+ return BrowserRequest(BytesIO(b''), environ)
def test_IApplicationRequest_bodyStream(self):
- request = BrowserRequest(StringIO('spam'), {})
- self.assertEqual(request.bodyStream.read(), 'spam')
+ request = BrowserRequest(BytesIO(b'spam'), {})
+ self.assertEqual(request.bodyStream.read(), b'spam')
# Needed by BaseTestIEnumerableMapping tests:
def _IEnumerableMapping__stateDict(self):
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_browserresponse.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_browserresponse.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_browserresponse.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -35,7 +35,7 @@
<blah>
</html>
""")
- self.assert_(response.getHeader('content-type').startswith("text/html")
+ self.assertTrue(response.getHeader('content-type').startswith("text/html")
)
response = BrowserResponse()
@@ -45,7 +45,7 @@
<blah>
</html>
""")
- self.assert_(response.getHeader('content-type').startswith("text/html")
+ self.assertTrue(response.getHeader('content-type').startswith("text/html")
)
response = BrowserResponse()
@@ -55,7 +55,7 @@
<blah>
</html>
""")
- self.assert_(response.getHeader('content-type').startswith("text/html")
+ self.assertTrue(response.getHeader('content-type').startswith("text/html")
)
response = BrowserResponse()
@@ -66,14 +66,14 @@
<blah>
</html>
""")
- self.assert_(response.getHeader('content-type').startswith("text/html")
+ self.assertTrue(response.getHeader('content-type').startswith("text/html")
)
response = BrowserResponse()
response.setResult(
"""Hello world
""")
- self.assert_(response.getHeader('content-type').startswith(
+ self.assertTrue(response.getHeader('content-type').startswith(
"text/plain")
)
@@ -81,7 +81,7 @@
response.setResult(
"""<p>Hello world
""")
- self.assert_(
+ self.assertTrue(
response.getHeader('content-type').startswith("text/plain")
)
@@ -95,7 +95,7 @@
# the result :(
response = BrowserResponse()
response.setStatus(304)
- response.setResult('')
+ response.setResult(b'')
self.assertEqual(response.getHeader('content-type'), None)
def testInsertBase(self):
@@ -106,22 +106,22 @@
# Make sure that bases are inserted
response.setBase('http://localhost/folder/')
- self.assert_(
- '<base href="http://localhost/folder/" />' in
- insertBase('<html><head></head><body>Page</body></html>'))
+ self.assertTrue(
+ b'<base href="http://localhost/folder/" />' in
+ insertBase(b'<html><head></head><body>Page</body></html>'))
# Ensure that unicode bases work as well
response.setBase(u'http://localhost/folder/')
- body = insertBase('<html><head></head><body>Page</body></html>')
- self.assert_(isinstance(body, str))
- self.assert_('<base href="http://localhost/folder/" />' in body)
+ body = insertBase(b'<html><head></head><body>Page</body></html>')
+ self.assertTrue(isinstance(body, bytes))
+ self.assertTrue(b'<base href="http://localhost/folder/" />' in body)
# Ensure that encoded bodies work, when a base is inserted.
response.setBase('http://localhost/folder')
result = insertBase(
- '<html><head></head><body>\xc3\x9bung</body></html>')
- self.assert_(isinstance(body, str))
- self.assert_('<base href="http://localhost/folder" />' in result)
+ b'<html><head></head><body>\xc3\x9bung</body></html>')
+ self.assertTrue(isinstance(body, bytes))
+ self.assertTrue(b'<base href="http://localhost/folder" />' in result)
def testInsertBaseInSetResultUpdatesContentLength(self):
# Make sure that the Content-Length header is updated to account
@@ -131,13 +131,13 @@
base = 'http://localhost/folder/'
response.setBase(base)
inserted_text = '\n<base href="%s" />\n' % base
- html_page = """<html>
+ html_page = b"""<html>
<head></head>
<blah>
</html>
"""
response.setResult(html_page)
- self.assertEquals(
+ self.assertEqual(
int(response.getHeader('content-length')),
len(html_page) + len(inserted_text))
@@ -155,19 +155,19 @@
exc_info = sys.exc_info()
response.handleException(exc_info)
- self.assertEquals(response.getHeader("content-type"),
+ self.assertEqual(response.getHeader("content-type"),
"text/html;charset=utf-8")
- self.assertEquals(response.getStatus(), 500)
- self.assert_(response.consumeBody() in
- ["<html><head><title><type 'exceptions.ValueError'></title></head>\n"
- "<body><h2><type 'exceptions.ValueError'></h2>\n"
- "A server error occurred.\n"
- "</body></html>\n",
- "<html><head><title>ValueError</title></head>\n"
- "<body><h2>ValueError</h2>\n"
- "A server error occurred.\n"
- "</body></html>\n"]
- )
+ self.assertEqual(response.getStatus(), 500)
+ self.assertTrue(response.consumeBody() in
+ [b"<html><head><title><type 'exceptions.ValueError'></title></head>\n"
+ b"<body><h2><type 'exceptions.ValueError'></h2>\n"
+ b"A server error occurred.\n"
+ b"</body></html>\n",
+ b"<html><head><title>ValueError</title></head>\n"
+ b"<body><h2>ValueError</h2>\n"
+ b"A server error occurred.\n"
+ b"</body></html>\n"]
+ )
def test_suite():
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_ftp.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_ftp.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_ftp.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -14,14 +14,14 @@
"""FTP Publisher Tests
"""
import sys
-from cStringIO import StringIO
+from io import BytesIO
from unittest import TestCase, TestSuite, main, makeSuite
import zope.publisher.ftp
class Test(TestCase):
def setUp(self):
- self.__input = StringIO('')
+ self.__input = BytesIO(b'')
env = {'credentials': ('bob', '123'),
'path': '/a/b/c',
'command': 'foo',
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_http.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_http.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_http.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -17,8 +17,7 @@
import sys
import tempfile
import unittest
-from cStringIO import StringIO
-from Cookie import CookieError
+from io import BytesIO
from doctest import DocFileSuite
import zope.event
@@ -47,7 +46,12 @@
import BaseTestIPublisherRequest
from zope.publisher.tests.basetestiapplicationrequest \
import BaseTestIApplicationRequest
+from zope.publisher.testing import output_checker
+if sys.version_info[0] > 2:
+ from http.cookies import CookieError
+else:
+ from Cookie import CookieError
@implementer(ILoggingInfo)
@@ -63,7 +67,7 @@
return self._id
-data = '''\
+data = b'''\
line 1
line 2
line 3'''
@@ -86,8 +90,8 @@
return result
def testRead(self):
- stream = HTTPInputStream(StringIO(data), {})
- output = ''
+ stream = HTTPInputStream(BytesIO(data), {})
+ output = b''
self.assertEqual(output, self.getCacheStreamValue(stream))
output += stream.read(5)
self.assertEqual(output, self.getCacheStreamValue(stream))
@@ -96,7 +100,7 @@
self.assertEqual(data, self.getCacheStreamValue(stream))
def testReadLine(self):
- stream = HTTPInputStream(StringIO(data), {})
+ stream = HTTPInputStream(BytesIO(data), {})
output = stream.readline()
self.assertEqual(output, self.getCacheStreamValue(stream))
output += stream.readline()
@@ -108,15 +112,15 @@
self.assertEqual(data, self.getCacheStreamValue(stream))
def testReadLines(self):
- stream = HTTPInputStream(StringIO(data), {})
- output = ''.join(stream.readlines(4))
+ stream = HTTPInputStream(BytesIO(data), {})
+ output = b''.join(stream.readlines(4))
self.assertEqual(output, self.getCacheStreamValue(stream))
- output += ''.join(stream.readlines())
+ output += b''.join(stream.readlines())
self.assertEqual(output, self.getCacheStreamValue(stream))
self.assertEqual(data, self.getCacheStreamValue(stream))
def testGetCacheStream(self):
- stream = HTTPInputStream(StringIO(data), {})
+ stream = HTTPInputStream(BytesIO(data), {})
stream.read(5)
self.assertEqual(data, stream.getCacheStream().read())
@@ -127,24 +131,24 @@
# definitely over that).
# HTTPInputStream understands both CONTENT_LENGTH...
- stream = HTTPInputStream(StringIO(data), {'CONTENT_LENGTH': '100000'})
- self.assert_(isinstance(stream.getCacheStream(), TempFileType))
+ stream1 = HTTPInputStream(BytesIO(data), {'CONTENT_LENGTH': '100000'})
+ self.assertTrue(isinstance(stream1.getCacheStream(), TempFileType))
# ... and HTTP_CONTENT_LENGTH.
- stream = HTTPInputStream(StringIO(data), {'HTTP_CONTENT_LENGTH':
+ stream2 = HTTPInputStream(BytesIO(data), {'HTTP_CONTENT_LENGTH':
'100000'})
- self.assert_(isinstance(stream.getCacheStream(), TempFileType))
+ self.assertTrue(isinstance(stream2.getCacheStream(), TempFileType))
# If CONTENT_LENGTH is absent or empty, it takes the value
# given in HTTP_CONTENT_LENGTH:
- stream = HTTPInputStream(StringIO(data),
+ stream3 = HTTPInputStream(BytesIO(data),
{'CONTENT_LENGTH': '',
'HTTP_CONTENT_LENGTH': '100000'})
- self.assert_(isinstance(stream.getCacheStream(), TempFileType))
+ self.assertTrue(isinstance(stream3.getCacheStream(), TempFileType))
# In fact, HTTPInputStream can be instantiated with both an
# empty CONTENT_LENGTH and an empty HTTP_CONTENT_LENGTH:
- stream = HTTPInputStream(StringIO(data),
+ stream4 = HTTPInputStream(BytesIO(data),
{'CONTENT_LENGTH': '',
'HTTP_CONTENT_LENGTH': ''})
@@ -162,10 +166,10 @@
def read(self, size=-1):
if size == -1:
raise ServerHung
- return 'a'*size
+ return b'a'*size
stream = HTTPInputStream(NonClosingStream(), {'CONTENT_LENGTH': '10'})
- self.assertEquals(stream.getCacheStream().read(), 'aaaaaaaaaa')
+ self.assertEqual(stream.getCacheStream().read(), b'aaaaaaaaaa')
stream = HTTPInputStream(NonClosingStream(), {})
self.assertRaises(ServerHung, stream.getCacheStream)
@@ -194,26 +198,26 @@
class Item(object):
"""Required docstring for the publisher."""
def __call__(self, a, b):
- return "%s, %s" % (`a`, `b`)
+ return ("%s, %s" % (repr(a), repr(b))).encode('latin1')
self.app = AppRoot()
self.app.folder = Folder()
self.app.folder.item = Item()
self.app.xxx = Item()
- def _createRequest(self, extra_env={}, body=""):
+ def _createRequest(self, extra_env={}, body=b""):
env = self._testEnv.copy()
env.update(extra_env)
if len(body):
env['CONTENT_LENGTH'] = str(len(body))
publication = DefaultPublication(self.app)
- instream = StringIO(body)
+ instream = BytesIO(body)
request = HTTPRequest(instream, env)
request.setPublication(publication)
return request
- def _publisherResults(self, extra_env={}, body=""):
+ def _publisherResults(self, extra_env={}, body=b""):
request = self._createRequest(extra_env, body)
response = request.response
publish(request, handle_errors=False)
@@ -224,7 +228,7 @@
+
"\r\n".join([("%s: %s" % h) for h in headers]) + "\r\n\r\n"
+
- ''.join(response.consumeBody())
+ response.consumeBody().decode('utf8')
)
def test_double_dots(self):
@@ -252,7 +256,7 @@
def testTraversalToItem(self):
res = self._publisherResults()
- self.failUnlessEqual(
+ self.assertEqual(
res,
"Status: 200 Ok\r\n"
"Content-Length: 6\r\n"
@@ -264,35 +268,35 @@
# test HTTP/1.0
env = {'SERVER_PROTOCOL':'HTTP/1.0'}
- request = self._createRequest(env, '')
+ request = self._createRequest(env, b'')
location = request.response.redirect('http://foobar.com/redirected')
- self.assertEquals(location, 'http://foobar.com/redirected')
- self.assertEquals(request.response.getStatus(), 302)
- self.assertEquals(request.response.getHeader('location'), location)
+ self.assertEqual(location, 'http://foobar.com/redirected')
+ self.assertEqual(request.response.getStatus(), 302)
+ self.assertEqual(request.response.getHeader('location'), location)
# test HTTP/1.1
env = {'SERVER_PROTOCOL':'HTTP/1.1'}
- request = self._createRequest(env, '')
+ request = self._createRequest(env, b'')
location = request.response.redirect('http://foobar.com/redirected')
- self.assertEquals(request.response.getStatus(), 303)
+ self.assertEqual(request.response.getStatus(), 303)
# test explicit status
- request = self._createRequest(env, '')
+ request = self._createRequest(env, b'')
request.response.redirect('http://foobar.com/explicit', 304)
- self.assertEquals(request.response.getStatus(), 304)
+ self.assertEqual(request.response.getStatus(), 304)
# test non-string location, like URLGetter
- request = self._createRequest(env, '')
+ request = self._createRequest(env, b'')
request.response.redirect(request.URL)
- self.assertEquals(request.response.getStatus(), 303)
- self.assertEquals(request.response.getHeader('location'),
+ self.assertEqual(request.response.getStatus(), 303)
+ self.assertEqual(request.response.getHeader('location'),
str(request.URL))
def testUntrustedRedirect(self):
# Redirects are by default only allowed to target the same host as the
# request was directed to. This is to counter phishing.
- request = self._createRequest({}, '')
+ request = self._createRequest({}, b'')
self.assertRaises(
ValueError,
request.response.redirect, 'http://phishing-inc.com')
@@ -301,81 +305,81 @@
# host. They aren't really allowed per RFC but the response object
# supports them and people are probably using them.
location = request.response.redirect('/foo', trusted=False)
- self.assertEquals('/foo', location)
+ self.assertEqual('/foo', location)
# If we pass `trusted` for the redirect, we can redirect the browser
# anywhere we want, though.
location = request.response.redirect(
'http://my-friends.com', trusted=True)
- self.assertEquals('http://my-friends.com', location)
+ self.assertEqual('http://my-friends.com', location)
# We can redirect to our own full server URL, with or without a port
# being specified. Let's explicitly set a host name to test this, as
# this is how virtual hosting works:
request.setApplicationServer('example.com')
location = request.response.redirect('http://example.com')
- self.assertEquals('http://example.com', location)
+ self.assertEqual('http://example.com', location)
request.setApplicationServer('example.com', port=8080)
location = request.response.redirect('http://example.com:8080')
- self.assertEquals('http://example.com:8080', location)
+ self.assertEqual('http://example.com:8080', location)
# The default port for HTTP and HTTPS may be omitted:
request.setApplicationServer('example.com')
location = request.response.redirect('http://example.com:80')
- self.assertEquals('http://example.com:80', location)
+ self.assertEqual('http://example.com:80', location)
request.setApplicationServer('example.com', port=80)
location = request.response.redirect('http://example.com')
- self.assertEquals('http://example.com', location)
+ self.assertEqual('http://example.com', location)
request.setApplicationServer('example.com', 'https')
location = request.response.redirect('https://example.com:443')
- self.assertEquals('https://example.com:443', location)
+ self.assertEqual('https://example.com:443', location)
request.setApplicationServer('example.com', 'https', 443)
location = request.response.redirect('https://example.com')
- self.assertEquals('https://example.com', location)
+ self.assertEqual('https://example.com', location)
def testUnregisteredStatus(self):
# verify we can set the status to an unregistered int value
- request = self._createRequest({}, '')
+ request = self._createRequest({}, b'')
request.response.setStatus(289)
- self.assertEquals(request.response.getStatus(), 289)
+ self.assertEqual(request.response.getStatus(), 289)
def testRequestEnvironment(self):
req = self._createRequest()
publish(req, handle_errors=0) # Force expansion of URL variables
- self.assertEquals(str(req.URL), 'http://foobar.com/folder/item')
- self.assertEquals(req.URL['-1'], 'http://foobar.com/folder')
- self.assertEquals(req.URL['-2'], 'http://foobar.com')
+ self.assertEqual(str(req.URL), 'http://foobar.com/folder/item')
+ self.assertEqual(req.URL['-1'], 'http://foobar.com/folder')
+ self.assertEqual(req.URL['-2'], 'http://foobar.com')
self.assertRaises(KeyError, req.URL.__getitem__, '-3')
- self.assertEquals(req.URL['0'], 'http://foobar.com')
- self.assertEquals(req.URL['1'], 'http://foobar.com/folder')
- self.assertEquals(req.URL['2'], 'http://foobar.com/folder/item')
+ self.assertEqual(req.URL['0'], 'http://foobar.com')
+ self.assertEqual(req.URL['1'], 'http://foobar.com/folder')
+ self.assertEqual(req.URL['2'], 'http://foobar.com/folder/item')
self.assertRaises(KeyError, req.URL.__getitem__, '3')
- self.assertEquals(req.URL.get('0'), 'http://foobar.com')
- self.assertEquals(req.URL.get('1'), 'http://foobar.com/folder')
- self.assertEquals(req.URL.get('2'), 'http://foobar.com/folder/item')
- self.assertEquals(req.URL.get('3', 'none'), 'none')
+ self.assertEqual(req.URL.get('0'), 'http://foobar.com')
+ self.assertEqual(req.URL.get('1'), 'http://foobar.com/folder')
+ self.assertEqual(req.URL.get('2'), 'http://foobar.com/folder/item')
+ self.assertEqual(req.URL.get('3', 'none'), 'none')
- self.assertEquals(req['SERVER_URL'], 'http://foobar.com')
- self.assertEquals(req['HTTP_HOST'], 'foobar.com')
- self.assertEquals(req['PATH_INFO'], '/folder/item')
- self.assertEquals(req['CONTENT_LENGTH'], '0')
+ self.assertEqual(req['SERVER_URL'], 'http://foobar.com')
+ self.assertEqual(req['HTTP_HOST'], 'foobar.com')
+ self.assertEqual(req['PATH_INFO'], '/folder/item')
+ self.assertEqual(req['CONTENT_LENGTH'], '0')
self.assertRaises(KeyError, req.__getitem__, 'HTTP_AUTHORIZATION')
- self.assertEquals(req['GATEWAY_INTERFACE'], 'TestFooInterface/1.0')
- self.assertEquals(req['HTTP_OFF_THE_WALL'], "Spam 'n eggs")
+ self.assertEqual(req['GATEWAY_INTERFACE'], 'TestFooInterface/1.0')
+ self.assertEqual(req['HTTP_OFF_THE_WALL'], "Spam 'n eggs")
self.assertRaises(KeyError, req.__getitem__,
'HTTP_WE_DID_NOT_PROVIDE_THIS')
def testRequestLocale(self):
eq = self.assertEqual
- unless = self.failUnless
+ unless = self.assertTrue
from zope.publisher.browser import BrowserLanguages
from zope.publisher.interfaces.http import IHTTPRequest
@@ -448,17 +452,17 @@
}
req = self._createRequest(extra_env=cookies)
- self.assertEquals(req.cookies[u'foo'], u'bar')
- self.assertEquals(req[u'foo'], u'bar')
+ self.assertEqual(req.cookies[u'foo'], u'bar')
+ self.assertEqual(req[u'foo'], u'bar')
- self.assertEquals(req.cookies[u'spam'], u'eggs')
- self.assertEquals(req[u'spam'], u'eggs')
+ self.assertEqual(req.cookies[u'spam'], u'eggs')
+ self.assertEqual(req[u'spam'], u'eggs')
- self.assertEquals(req.cookies[u'this'], u'Should be accepted')
- self.assertEquals(req[u'this'], u'Should be accepted')
+ self.assertEqual(req.cookies[u'this'], u'Should be accepted')
+ self.assertEqual(req[u'this'], u'Should be accepted')
# Reserved key
- self.failIf(req.cookies.has_key('path'))
+ self.assertFalse(req.cookies.has_key('path'))
def testCookieErrorToLog(self):
cookies = {
@@ -467,23 +471,23 @@
}
req = self._createRequest(extra_env=cookies)
- self.failIf(req.cookies.has_key('foo'))
- self.failIf(req.has_key('foo'))
+ self.assertFalse(req.cookies.has_key('foo'))
+ self.assertFalse(req.has_key('foo'))
- self.failIf(req.cookies.has_key('spam'))
- self.failIf(req.has_key('spam'))
+ self.assertFalse(req.cookies.has_key('spam'))
+ self.assertFalse(req.has_key('spam'))
- self.failIf(req.cookies.has_key('ldap/OU'))
- self.failIf(req.has_key('ldap/OU'))
+ self.assertFalse(req.cookies.has_key('ldap/OU'))
+ self.assertFalse(req.has_key('ldap/OU'))
# Reserved key
- self.failIf(req.cookies.has_key('path'))
+ self.assertFalse(req.cookies.has_key('path'))
def testCookiesUnicode(self):
# Cookie values are assumed to be UTF-8 encoded
cookies = {'HTTP_COOKIE': r'key="\342\230\243";'}
req = self._createRequest(extra_env=cookies)
- self.assertEquals(req.cookies[u'key'], u'\N{BIOHAZARD SIGN}')
+ self.assertEqual(req.cookies[u'key'], u'\N{BIOHAZARD SIGN}')
def testHeaders(self):
headers = {
@@ -491,33 +495,34 @@
'Another-Test': 'another',
}
req = self._createRequest(extra_env=headers)
- self.assertEquals(req.headers[u'TEST_HEADER'], u'test')
- self.assertEquals(req.headers[u'TEST-HEADER'], u'test')
- self.assertEquals(req.headers[u'test_header'], u'test')
- self.assertEquals(req.getHeader('TEST_HEADER', literal=True), u'test')
- self.assertEquals(req.getHeader('TEST-HEADER', literal=True), None)
- self.assertEquals(req.getHeader('test_header', literal=True), None)
- self.assertEquals(req.getHeader('Another-Test', literal=True),
+ self.assertEqual(req.headers[u'TEST_HEADER'], u'test')
+ self.assertEqual(req.headers[u'TEST-HEADER'], u'test')
+ self.assertEqual(req.headers[u'test_header'], u'test')
+ self.assertEqual(req.getHeader('TEST_HEADER', literal=True), u'test')
+ self.assertEqual(req.getHeader('TEST-HEADER', literal=True), None)
+ self.assertEqual(req.getHeader('test_header', literal=True), None)
+ self.assertEqual(req.getHeader('Another-Test', literal=True),
'another')
def testBasicAuth(self):
from zope.publisher.interfaces.http import IHTTPCredentials
+ import base64
req = self._createRequest()
verifyObject(IHTTPCredentials, req)
lpq = req._authUserPW()
- self.assertEquals(lpq, None)
+ self.assertEqual(lpq, None)
env = {}
- login, password = ("tim", "123:456")
- s = ("%s:%s" % (login, password)).encode("base64").rstrip()
- env['HTTP_AUTHORIZATION'] = "Basic %s" % s
+ login, password = (b"tim", b"123:456")
+ s = base64.b64encode(b':'.join((login, password)))
+ env['HTTP_AUTHORIZATION'] = "Basic %s" % s.decode('ascii')
req = self._createRequest(env)
lpw = req._authUserPW()
- self.assertEquals(lpw, (login, password))
+ self.assertEqual(lpw, (login, password))
def testSetPrincipal(self):
req = self._createRequest()
req.setPrincipal(UserStub("jim"))
- self.assertEquals(req.response.authUser, 'jim')
+ self.assertEqual(req.response.authUser, 'jim')
def test_method(self):
r = self._createRequest(extra_env={'REQUEST_METHOD':'SPAM'})
@@ -530,25 +535,25 @@
zope.event.subscribers.append(events.append)
req = self._createRequest()
req.setApplicationServer('foo')
- self.assertEquals(req._app_server, 'http://foo')
+ self.assertEqual(req._app_server, 'http://foo')
req.setApplicationServer('foo', proto='https')
- self.assertEquals(req._app_server, 'https://foo')
+ self.assertEqual(req._app_server, 'https://foo')
req.setApplicationServer('foo', proto='https', port=8080)
- self.assertEquals(req._app_server, 'https://foo:8080')
+ self.assertEqual(req._app_server, 'https://foo:8080')
req.setApplicationServer('foo', proto='http', port='9673')
- self.assertEquals(req._app_server, 'http://foo:9673')
+ self.assertEqual(req._app_server, 'http://foo:9673')
req.setApplicationServer('foo', proto='https', port=443)
- self.assertEquals(req._app_server, 'https://foo')
+ self.assertEqual(req._app_server, 'https://foo')
req.setApplicationServer('foo', proto='https', port='443')
- self.assertEquals(req._app_server, 'https://foo')
+ self.assertEqual(req._app_server, 'https://foo')
req.setApplicationServer('foo', port=80)
- self.assertEquals(req._app_server, 'http://foo')
+ self.assertEqual(req._app_server, 'http://foo')
req.setApplicationServer('foo', proto='telnet', port=80)
- self.assertEquals(req._app_server, 'telnet://foo:80')
+ self.assertEqual(req._app_server, 'telnet://foo:80')
zope.event.subscribers.pop()
- self.assertEquals(len(events), 8)
+ self.assertEqual(len(events), 8)
for event in events:
- self.assertEquals(event.request, req)
+ self.assertEqual(event.request, req)
def test_setApplicationNames(self):
events = []
@@ -556,12 +561,12 @@
req = self._createRequest()
names = ['x', 'y', 'z']
req.setVirtualHostRoot(names)
- self.assertEquals(req._app_names, ['x', 'y', 'z'])
+ self.assertEqual(req._app_names, ['x', 'y', 'z'])
names[0] = 'muahahahaha'
- self.assertEquals(req._app_names, ['x', 'y', 'z'])
+ self.assertEqual(req._app_names, ['x', 'y', 'z'])
zope.event.subscribers.pop()
- self.assertEquals(len(events), 1)
- self.assertEquals(events[0].request, req)
+ self.assertEqual(len(events), 1)
+ self.assertEqual(events[0].request, req)
def test_setVirtualHostRoot(self):
events = []
@@ -570,22 +575,22 @@
req._traversed_names = ['x', 'y']
req._last_obj_traversed = object()
req.setVirtualHostRoot()
- self.failIf(req._traversed_names)
- self.assertEquals(req._vh_root, req._last_obj_traversed)
+ self.assertFalse(req._traversed_names)
+ self.assertEqual(req._vh_root, req._last_obj_traversed)
zope.event.subscribers.pop()
- self.assertEquals(len(events), 1)
- self.assertEquals(events[0].request, req)
+ self.assertEqual(len(events), 1)
+ self.assertEqual(events[0].request, req)
def test_getVirtualHostRoot(self):
req = self._createRequest()
- self.assertEquals(req.getVirtualHostRoot(), None)
+ self.assertEqual(req.getVirtualHostRoot(), None)
req._vh_root = object()
- self.assertEquals(req.getVirtualHostRoot(), req._vh_root)
+ self.assertEqual(req.getVirtualHostRoot(), req._vh_root)
def test_traverse(self):
req = self._createRequest()
req.traverse(self.app)
- self.assertEquals(req._traversed_names, ['folder', 'item'])
+ self.assertEqual(req._traversed_names, ['folder', 'item'])
# setting it during traversal matters
req = self._createRequest()
@@ -594,8 +599,8 @@
req.setVirtualHostRoot()
req.publication.callTraversalHooks = hook
req.traverse(self.app)
- self.assertEquals(req._traversed_names, ['item'])
- self.assertEquals(req._vh_root, self.app.folder)
+ self.assertEqual(req._traversed_names, ['item'])
+ self.assertEqual(req._vh_root, self.app.folder)
def test_traverseDuplicateHooks(self):
"""
@@ -620,7 +625,7 @@
req.setPublication(publication)
req.setTraversalStack(req.getTraversalStack() + ["vh"])
req.traverse(self.app)
- self.assertEquals(len(hooks), 3)
+ self.assertEqual(len(hooks), 3)
def testInterface(self):
from zope.publisher.interfaces.http import IHTTPCredentials
@@ -634,21 +639,21 @@
req = self._createRequest()
deduceServerURL = req._HTTPRequest__deduceServerURL
req._environ = {'HTTP_HOST': 'example.com:80'}
- self.assertEquals(deduceServerURL(), 'http://example.com')
+ self.assertEqual(deduceServerURL(), 'http://example.com')
req._environ = {'HTTP_HOST': 'example.com:8080'}
- self.assertEquals(deduceServerURL(), 'http://example.com:8080')
+ self.assertEqual(deduceServerURL(), 'http://example.com:8080')
req._environ = {'HTTP_HOST': 'example.com:443', 'HTTPS': 'on'}
- self.assertEquals(deduceServerURL(), 'https://example.com')
+ self.assertEqual(deduceServerURL(), 'https://example.com')
req._environ = {'HTTP_HOST': 'example.com:80', 'HTTPS': 'ON'}
- self.assertEquals(deduceServerURL(), 'https://example.com:80')
+ self.assertEqual(deduceServerURL(), 'https://example.com:80')
req._environ = {'HTTP_HOST': 'example.com:8080',
'SERVER_PORT_SECURE': '1'}
- self.assertEquals(deduceServerURL(), 'https://example.com:8080')
+ self.assertEqual(deduceServerURL(), 'https://example.com:8080')
req._environ = {'SERVER_NAME': 'example.com', 'SERVER_PORT':'8080',
'SERVER_PORT_SECURE': '0'}
- self.assertEquals(deduceServerURL(), 'http://example.com:8080')
+ self.assertEqual(deduceServerURL(), 'http://example.com:8080')
req._environ = {'SERVER_NAME': 'example.com'}
- self.assertEquals(deduceServerURL(), 'http://example.com')
+ self.assertEqual(deduceServerURL(), 'http://example.com')
def testUnicodeURLs(self):
# The request expects PATH_INFO to be utf-8 encoded when it gets it.
@@ -682,11 +687,11 @@
request.response.setResult(result)
body = request.response.consumeBody()
- self.assertEquals(request.response.getStatus(), 200)
- self.assertEquals(request.response.getHeader('Content-Type'),
+ self.assertEqual(request.response.getStatus(), 200)
+ self.assertEqual(request.response.getHeader('Content-Type'),
'text/plain;charset=utf-8')
- self.assertEquals(body,
- 'Latin a with ogonek\xc4\x85 Cyrillic ya \xd1\x8f')
+ self.assertEqual(body,
+ b'Latin a with ogonek\xc4\x85 Cyrillic ya \xd1\x8f')
class ConcreteHTTPTests(HTTPTests):
"""Tests that we don't have to worry about subclasses inheriting and
@@ -704,7 +709,7 @@
r = self._createRequest(extra_env={"PATH_INFO": "/xxx"})
publish(r, handle_errors=0)
r.shiftNameToApplication()
- self.assertEquals(r.getApplicationURL(), appurl+"/xxx")
+ self.assertEqual(r.getApplicationURL(), appurl+"/xxx")
# Verify that we can only shift if we've traversed only a single name
r = self._createRequest(extra_env={"PATH_INFO": "/folder/item"})
@@ -726,13 +731,13 @@
return response
def _parseResult(self, response):
- return dict(response.getHeaders()), ''.join(response.consumeBody())
+ return dict(response.getHeaders()), response.consumeBody()
def _getResultFromResponse(self, body, charset='utf-8', headers=None):
response = self._createResponse()
assert(charset == 'utf-8')
if headers is not None:
- for hdr, val in headers.iteritems():
+ for hdr, val in headers.items():
response.setHeader(hdr, val)
response.setResult(body)
return self._parseResult(response)
@@ -744,7 +749,7 @@
response.setHeader('Content-Type', 'text/plain;charset=us-ascii')
# Output the data
- data = 'a'*10
+ data = b'a'*10
response.setResult(DirectResult(data))
headers, body = self._parseResult(response)
@@ -755,43 +760,43 @@
self.assertEqual(body, data)
# Make sure that no Content-Length header was added
- self.assert_('Content-Length' not in headers)
+ self.assertTrue('Content-Length' not in headers)
def testContentLength(self):
- eq = self.failUnlessEqual
+ eq = self.assertEqual
headers, body = self._getResultFromResponse("test", "utf-8",
{"content-type": "text/plain"})
eq("4", headers["Content-Length"])
- eq("test", body)
+ eq(b"test", body)
headers, body = self._getResultFromResponse(
u'\u0442\u0435\u0441\u0442', "utf-8",
{"content-type": "text/plain"})
eq("8", headers["Content-Length"])
- eq('\xd1\x82\xd0\xb5\xd1\x81\xd1\x82', body)
+ eq(b'\xd1\x82\xd0\xb5\xd1\x81\xd1\x82', body)
def testContentType(self):
- eq = self.failUnlessEqual
+ eq = self.assertEqual
- headers, body = self._getResultFromResponse("test", "utf-8")
+ headers, body = self._getResultFromResponse(b"test", "utf-8")
eq("", headers.get("Content-Type", ""))
- eq("test", body)
+ eq(b"test", body)
headers, body = self._getResultFromResponse(u"test",
headers={"content-type": "text/plain"})
eq("text/plain;charset=utf-8", headers["Content-Type"])
- eq("test", body)
+ eq(b"test", body)
headers, body = self._getResultFromResponse(u"test", "utf-8",
{"content-type": "text/html"})
eq("text/html;charset=utf-8", headers["Content-Type"])
- eq("test", body)
+ eq(b"test", body)
headers, body = self._getResultFromResponse(u"test", "utf-8",
{"content-type": "text/plain;charset=cp1251"})
eq("text/plain;charset=cp1251", headers["Content-Type"])
- eq("test", body)
+ eq(b"test", body)
# see https://bugs.launchpad.net/zope.publisher/+bug/98395
# RFC 3023 types and */*+xml output as unicode
@@ -799,37 +804,37 @@
headers, body = self._getResultFromResponse(u"test", "utf-8",
{"content-type": "text/xml"})
eq("text/xml;charset=utf-8", headers["Content-Type"])
- eq("test", body)
+ eq(b"test", body)
headers, body = self._getResultFromResponse(u"test", "utf-8",
{"content-type": "application/xml"})
eq("application/xml;charset=utf-8", headers["Content-Type"])
- eq("test", body)
+ eq(b"test", body)
headers, body = self._getResultFromResponse(u"test", "utf-8",
{"content-type": "text/xml-external-parsed-entity"})
eq("text/xml-external-parsed-entity;charset=utf-8",
headers["Content-Type"])
- eq("test", body)
+ eq(b"test", body)
headers, body = self._getResultFromResponse(u"test", "utf-8",
{"content-type": "application/xml-external-parsed-entity"})
eq("application/xml-external-parsed-entity;charset=utf-8",
headers["Content-Type"])
- eq("test", body)
+ eq(b"test", body)
# Mozilla XUL
headers, body = self._getResultFromResponse(u"test", "utf-8",
{"content-type": "application/vnd+xml"})
eq("application/vnd+xml;charset=utf-8", headers["Content-Type"])
- eq("test", body)
+ eq(b"test", body)
# end RFC 3023 / xml as unicode
- headers, body = self._getResultFromResponse("test", "utf-8",
+ headers, body = self._getResultFromResponse(b"test", "utf-8",
{"content-type": "image/gif"})
eq("image/gif", headers["Content-Type"])
- eq("test", body)
+ eq(b"test", body)
def _getCookieFromResponse(self, cookies):
# Shove the cookies through request, parse the Set-Cookie header
@@ -837,7 +842,7 @@
response = self._createResponse()
for name, value, kw in cookies:
response.setCookie(name, value, **kw)
- response.setResult('test')
+ response.setResult(b'test')
return [header[1]
for header in response.getHeaders()
if header[0] == "Set-Cookie"]
@@ -846,20 +851,20 @@
c = self._getCookieFromResponse([
('foo', 'bar', {}),
])
- self.failUnless('foo=bar;' in c or 'foo=bar' in c,
+ self.assertTrue('foo=bar;' in c or 'foo=bar' in c,
'foo=bar; not in %r' % c)
c = self._getCookieFromResponse([
('foo', 'bar', {}),
('alpha', 'beta', {}),
])
- self.failUnless('foo=bar;' in c or 'foo=bar' in c)
- self.failUnless('alpha=beta;' in c or 'alpha=beta' in c)
+ self.assertTrue('foo=bar;' in c or 'foo=bar' in c)
+ self.assertTrue('alpha=beta;' in c or 'alpha=beta' in c)
c = self._getCookieFromResponse([
('sign', u'\N{BIOHAZARD SIGN}', {}),
])
- self.failUnless((r'sign="\342\230\243";' in c) or
+ self.assertTrue((r'sign="\342\230\243";' in c) or
(r'sign="\342\230\243"' in c))
self.assertRaises(
@@ -878,17 +883,17 @@
'seCure': True,
}),
])[0]
- self.failUnless('foo=bar;' in c or 'foo=bar' in c)
- self.failUnless('expires=Sat, 12 Jul 2014 23:26:28 GMT;' in c, repr(c))
- self.failUnless('Domain=example.com;' in c)
- self.failUnless('Path=/froboz;' in c)
- self.failUnless('Max-Age=3600;' in c)
- self.failUnless('Comment=blah%3B%E2%98%A3?;' in c, repr(c))
- self.failUnless('secure;' in c or 'secure' in c)
+ self.assertTrue('foo=bar;' in c or 'foo=bar' in c)
+ self.assertTrue('expires=Sat, 12 Jul 2014 23:26:28 GMT;' in c, repr(c))
+ self.assertTrue('Domain=example.com;' in c)
+ self.assertTrue('Path=/froboz;' in c)
+ self.assertTrue('Max-Age=3600;' in c)
+ self.assertTrue('Comment=blah%3B%E2%98%A3?;' in c, repr(c))
+ self.assertTrue('secure;' in c or 'secure' in c)
c = self._getCookieFromResponse([('foo', 'bar', {'secure': False})])[0]
- self.failUnless('foo=bar;' in c or 'foo=bar' in c)
- self.failIf('secure' in c)
+ self.assertTrue('foo=bar;' in c or 'foo=bar' in c)
+ self.assertFalse('secure' in c)
def test_handleException(self):
response = HTTPResponse()
@@ -898,20 +903,20 @@
exc_info = sys.exc_info()
response.handleException(exc_info)
- self.assertEquals(response.getHeader("content-type"),
+ self.assertEqual(response.getHeader("content-type"),
"text/html;charset=utf-8")
- self.assertEquals(response.getStatus(), 500)
- self.assert_(response.consumeBody() in
- ["<html><head>"
- "<title><type 'exceptions.ValueError'></title></head>\n"
- "<body><h2><type 'exceptions.ValueError'></h2>\n"
- "A server error occurred.\n"
- "</body></html>\n",
- "<html><head><title>ValueError</title></head>\n"
- "<body><h2>ValueError</h2>\n"
- "A server error occurred.\n"
- "</body></html>\n"]
- )
+ self.assertEqual(response.getStatus(), 500)
+ self.assertTrue(response.consumeBody() in
+ [b"<html><head>"
+ b"<title><type 'exceptions.ValueError'></title></head>\n"
+ b"<body><h2><type 'exceptions.ValueError'></h2>\n"
+ b"A server error occurred.\n"
+ b"</body></html>\n",
+ b"<html><head><title>ValueError</title></head>\n"
+ b"<body><h2>ValueError</h2>\n"
+ b"A server error occurred.\n"
+ b"</body></html>\n"]
+ )
class APITests(BaseTestIPublicationRequest,
@@ -922,11 +927,11 @@
def _Test__new(self, environ=None, **kw):
if environ is None:
environ = kw
- return HTTPRequest(StringIO(''), environ)
+ return HTTPRequest(BytesIO(b''), environ)
def test_IApplicationRequest_bodyStream(self):
- request = HTTPRequest(StringIO('spam'), {})
- self.assertEqual(request.bodyStream.read(), 'spam')
+ request = HTTPRequest(BytesIO(b'spam'), {})
+ self.assertEqual(request.bodyStream.read(), b'spam')
# Needed by BaseTestIEnumerableMapping tests:
def _IEnumerableMapping__stateDict(self):
@@ -973,7 +978,8 @@
suite.addTest(unittest.makeSuite(TestHTTPResponse))
suite.addTest(unittest.makeSuite(HTTPInputStreamTests))
suite.addTest(DocFileSuite(
- '../httpresults.txt', setUp=cleanUp, tearDown=cleanUp))
+ '../httpresults.txt', setUp=cleanUp, tearDown=cleanUp,
+ checker=output_checker))
suite.addTest(unittest.makeSuite(APITests))
return suite
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_ipublication.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_ipublication.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_ipublication.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -16,7 +16,7 @@
import sys
from unittest import TestCase, main, makeSuite
-from StringIO import StringIO
+from io import BytesIO
from zope.interface.verify import verifyObject
from zope.publisher.interfaces import IPublication
@@ -46,7 +46,7 @@
def _Test__request(self):
from zope.publisher.base import BaseRequest
- request = BaseRequest(StringIO(''), {})
+ request = BaseRequest(BytesIO(b''), {})
request.setTraversalStack(['Engineering', 'ZopeCorp'])
publication = self._Test__new()
request.setPublication(publication)
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_mapply.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_mapply.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_mapply.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -16,6 +16,7 @@
import unittest
from zope.publisher.publish import mapply
+from zope.publisher._compat import PYTHON2
class MapplyTests(unittest.TestCase):
@@ -24,10 +25,10 @@
return '%d%d%d' % (a, b, c)
values = {'a':2, 'b':3, 'c':5}
v = mapply(compute, (), values)
- self.failUnlessEqual(v, '235')
+ self.assertEqual(v, '235')
v = mapply(compute, (7,), values)
- self.failUnlessEqual(v, '735')
+ self.assertEqual(v, '735')
def testClass(self):
values = {'a':2, 'b':3, 'c':5}
@@ -38,19 +39,32 @@
compute = __call__
cc = c()
v = mapply(cc, (), values)
- self.failUnlessEqual(v, '335')
+ self.assertEqual(v, '335')
del values['c']
v = mapply(cc.compute, (), values)
- self.failUnlessEqual(v, '334')
+ self.assertEqual(v, '334')
+ def testClassicClass(self):
+ if not PYTHON2:
+ # Classic classes are only available in Python 2
+ return
+
+ values = {'a':2, 'b':3}
+ class c(object):
+ a = 3
+ def __call__(self, b, c=4):
+ return '%d%d%d' % (self.a, b, c)
+ compute = __call__
+ cc = c()
+
class c2:
"""Must be a classic class."""
-
+
c2inst = c2()
c2inst.__call__ = cc
v = mapply(c2inst, (), values)
- self.failUnlessEqual(v, '334')
+ self.assertEqual(v, '334')
def test_suite():
loader = unittest.TestLoader()
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_principallogging.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_principallogging.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_principallogging.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -35,7 +35,7 @@
from zope.publisher.principallogging import PrincipalLogging
principal = PrincipalStub()
pl = PrincipalLogging(principal)
- self.assertEquals(pl.getLogMessage(), '\\xfc principal')
+ self.assertEqual(pl.getLogMessage(), '\\xfc principal')
def test_suite():
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_publisher.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_publisher.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_publisher.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -26,7 +26,7 @@
from zope.interface.verify import verifyClass
from zope.interface import implementedBy
-from StringIO import StringIO
+from io import BytesIO
class ErrorToRetry(Exception):
"""A sample exception that should be retried."""
@@ -65,7 +65,7 @@
publication = DefaultPublication(self.app)
path = path.split('/')
path.reverse()
- request = TestRequest(StringIO(''), **kw)
+ request = TestRequest(BytesIO(b''), **kw)
request.setTraversalStack(path)
request.setPublication(publication)
return request
@@ -86,7 +86,7 @@
provided=IReRaiseException)
def testImplementsIPublication(self):
- self.failUnless(IPublication.providedBy(
+ self.assertTrue(IPublication.providedBy(
DefaultPublication(self.app)))
def testInterfacesVerify(self):
@@ -95,11 +95,11 @@
def testTraversalToItem(self):
res = self._publisherResults('/folder/item')
- self.failUnlessEqual(res, 'item')
+ self.assertEqual(res, 'item')
res = self._publisherResults('/folder/item/')
- self.failUnlessEqual(res, 'item')
+ self.assertEqual(res, 'item')
res = self._publisherResults('folder/item')
- self.failUnlessEqual(res, 'item')
+ self.assertEqual(res, 'item')
def testUnderscoreUnauthorizedException(self):
self.assertRaises(Unauthorized, self._publisherResults, '/_item')
@@ -134,7 +134,7 @@
except:
pass
self._unregisterExcAdapter(doReRaiseAdapter)
- self.failUnlessEqual(raised, True)
+ self.assertEqual(raised, True)
def testRetryErrorIsUnwrapped(self):
test = self
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_skinnable.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_skinnable.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_skinnable.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -14,9 +14,12 @@
##############################################################################
"""HTTP Publisher Tests
"""
+import re
import unittest
import doctest
+
import zope.testing
+from zope.testing.renormalizing import RENormalizing
def cleanUp(test):
@@ -24,9 +27,14 @@
def test_suite():
+ checker = RENormalizing([
+ # Python 3 includes module name in exceptions
+ (re.compile(r"__builtin__"), "builtins"),
+ ])
+
return unittest.TestSuite(
doctest.DocFileSuite('../skinnable.txt',
- setUp=cleanUp, tearDown=cleanUp))
+ setUp=cleanUp, tearDown=cleanUp, checker=checker))
if __name__ == '__main__':
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_xmlrpc.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_xmlrpc.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_xmlrpc.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -13,12 +13,17 @@
##############################################################################
"""Testing the XML-RPC Publisher code.
"""
+import sys
import doctest
-import xmlrpclib
import zope.component.testing
from zope.publisher import xmlrpc
from zope.security.checker import defineChecker, Checker, CheckerPublic
+if sys.version_info[0] == 2:
+ import xmlrpclib
+else:
+ import xmlrpc.client as xmlrpclib
+
def setUp(test):
zope.component.testing.setUp(test)
zope.component.provideAdapter(xmlrpc.ListPreMarshaller)
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_xmlrpcrequest.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_xmlrpcrequest.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_xmlrpcrequest.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -14,7 +14,7 @@
"""XML-RPC Request Tests
"""
import unittest
-from StringIO import StringIO
+from io import BytesIO
from zope.publisher.base import DefaultPublication
from zope.publisher.http import HTTPCharsets
@@ -39,7 +39,7 @@
XMLRPCRequest.__init__(self, *args, **kw)
-xmlrpc_call = u'''<?xml version='1.0'?>
+xmlrpc_call = b'''<?xml version='1.0'?>
<methodCall>
<methodName>action</methodName>
<params>
@@ -81,7 +81,7 @@
class Item(object):
def __call__(self, a, b):
- return "%s, %s" % (`a`, `b`)
+ return "%s, %s" % (repr(a), repr(b))
def doit(self, a, b):
return 'do something %s %s' % (a, b)
@@ -90,7 +90,7 @@
def action(self, a):
return "Parameter[type: %s; value: %s" %(
- type(a).__name__, `a`)
+ type(a).__name__, repr(a))
class Item2(object):
view = View()
@@ -109,7 +109,7 @@
env['CONTENT_LENGTH'] = str(len(body))
publication = Publication(self.app)
- instream = StringIO(body)
+ instream = BytesIO(body)
request = TestXMLRPCRequest(instream, env)
request.setPublication(publication)
return request
@@ -118,15 +118,15 @@
def testProcessInput(self):
req = self._createRequest({}, xmlrpc_call)
req.processInputs()
- self.failUnlessEqual(req.getPositionalArguments(), (1,))
- self.failUnlessEqual(tuple(req._path_suffix), ('action',))
+ self.assertEqual(req.getPositionalArguments(), (1,))
+ self.assertEqual(tuple(req._path_suffix), ('action',))
def testTraversal(self):
req = self._createRequest({}, xmlrpc_call)
req.processInputs()
action = req.traverse(self.app)
- self.failUnlessEqual(action(*req.getPositionalArguments()),
+ self.assertEqual(action(*req.getPositionalArguments()),
"Parameter[type: int; value: 1")
Modified: zope.publisher/trunk/src/zope/publisher/tests/test_zcml.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/tests/test_zcml.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/tests/test_zcml.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -13,7 +13,7 @@
##############################################################################
"""Tests for browser:defaultSkin and browser:defaultView directives
"""
-from cStringIO import StringIO
+from io import BytesIO
import doctest
import unittest
@@ -59,6 +59,10 @@
%s
</configure>"""
+def templated(contents):
+ body = template % contents
+ return BytesIO(body.encode('latin-1'))
+
class Test(cleanup.CleanUp, unittest.TestCase):
def setUp(self):
@@ -68,13 +72,13 @@
def testDefaultView(self):
self.assertTrue(
component.queryMultiAdapter((ob, request), IDefaultViewName) is None)
- xmlconfig(StringIO(template % (
+ xmlconfig(templated(
'''
<browser:defaultView
name="test"
for="zope.publisher.tests.test_zcml.IOb" />
'''
- )))
+ ))
self.assertEqual(getDefaultViewName(ob, request), 'test')
@@ -88,7 +92,7 @@
component.queryMultiAdapter((ob, request2), IDefaultViewName),
None)
- xmlconfig(StringIO(template % (
+ xmlconfig(templated(
'''
<browser:defaultView
for="zope.publisher.tests.test_zcml.IOb"
@@ -100,7 +104,7 @@
name="test2"
/>
'''
- )))
+ ))
self.assertEqual(
zope.publisher.defaultview.getDefaultViewName(ob, request2),
@@ -114,14 +118,14 @@
component.queryMultiAdapter((ob, request), IDefaultViewName),
None)
- xmlconfig(StringIO(template % (
+ xmlconfig(templated(
'''
<browser:defaultView
for="zope.publisher.tests.test_zcml.Ob"
name="test"
/>
'''
- )))
+ ))
self.assertEqual(
zope.publisher.defaultview.getDefaultViewName(ob, request),
@@ -134,7 +138,7 @@
None)
XMLConfig('meta.zcml', component)()
- xmlconfig(StringIO(template % (
+ xmlconfig(templated(
'''
<interface
interface="
@@ -156,7 +160,7 @@
factory="zope.publisher.tests.test_zcml.V2"
/>
'''
- )))
+ ))
# Simulate Zope Publication behavior in beforeTraversal()
adapters = component.getSiteManager().adapters
Modified: zope.publisher/trunk/src/zope/publisher/xmlrpc.py
===================================================================
--- zope.publisher/trunk/src/zope/publisher/xmlrpc.py 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/xmlrpc.py 2013-02-21 14:06:47 UTC (rev 129566)
@@ -18,9 +18,8 @@
__docformat__ = 'restructuredtext'
import sys
-import xmlrpclib
import datetime
-from StringIO import StringIO
+from io import StringIO
import zope.component
import zope.interface
@@ -31,6 +30,11 @@
from zope.publisher.http import HTTPRequest, HTTPResponse, DirectResult
from zope.security.proxy import isinstance
+if sys.version_info[0] == 2:
+ import xmlrpclib
+else:
+ import xmlrpc.client as xmlrpclib
+
@implementer(IXMLRPCRequest)
class XMLRPCRequest(HTTPRequest):
@@ -47,7 +51,7 @@
# Using lines() does not work as Twisted's BufferedStream sends back
# an empty stream here for read() (bug). Using readlines() does not
# work with paster.httpserver. However, readline() works fine.
- lines = ''
+ lines = b''
while True:
line = self._body_instream.readline()
if not line:
@@ -183,7 +187,7 @@
self.data = data
def __call__(self):
- raise Exception, "Not implemented"
+ raise Exception("Not implemented")
@zope.component.adapter(dict)
class DictPreMarshaller(PreMarshallerBase):
@@ -198,7 +202,7 @@
"""Pre-marshaller for list"""
def __call__(self):
- return map(premarshal, self.data)
+ return [premarshal(x) for x in self.data]
@zope.component.adapter(tuple)
class TuplePreMarshaller(ListPreMarshaller):
Modified: zope.publisher/trunk/src/zope/publisher/xmlrpc.txt
===================================================================
--- zope.publisher/trunk/src/zope/publisher/xmlrpc.txt 2013-02-21 13:58:05 UTC (rev 129565)
+++ zope.publisher/trunk/src/zope/publisher/xmlrpc.txt 2013-02-21 14:06:47 UTC (rev 129566)
@@ -38,10 +38,18 @@
So xmlrpclib will be happy. :)
+Import xmlrpclib depending on current python interpreter version
+
+ >>> import sys
+ >>> if sys.version_info[0] == 2:
+ ... import xmlrpclib
+ ... else:
+ ... import xmlrpc.client as xmlrpclib
+
+
We can also use premarshal to strip proxies off of Fault objects.
We have to make a security declaration first though:
- >>> import xmlrpclib
>>> fault = xmlrpclib.Fault(1, 'waaa')
>>> proxied_fault = ProxyFactory(fault)
>>> stripped_fault = premarshal(proxied_fault)
@@ -51,7 +59,7 @@
Standard python datetime objects are also handled:
>>> import datetime
- >>> sample = datetime.datetime(2006,06,17,21,41,00)
+ >>> sample = datetime.datetime(2006,6,17,21,41,00)
>>> stripped_date = premarshal(sample)
>>> isinstance(stripped_date, datetime.datetime)
False
@@ -61,8 +69,7 @@
We can also use premarshal to strip proxies off of Binary objects.
We have to make a security declaration first though:
- >>> import xmlrpclib
- >>> binary = xmlrpclib.Binary('foobar')
+ >>> binary = xmlrpclib.Binary(b'foobar')
>>> proxied_binary = ProxyFactory(binary)
>>> stripped_binary = premarshal(proxied_binary)
>>> type(stripped_binary) is Proxy
Copied: zope.publisher/trunk/tox.ini (from rev 129564, zope.publisher/branches/py3-attempt2/tox.ini)
===================================================================
--- zope.publisher/trunk/tox.ini (rev 0)
+++ zope.publisher/trunk/tox.ini 2013-02-21 14:06:47 UTC (rev 129566)
@@ -0,0 +1,45 @@
+[tox]
+envlist = py26,py27,py33
+
+[testenv]
+commands =
+ python setup.py test -q
+# without explicit deps, setup.py test will download a bunch of eggs into $PWD
+deps =
+ zope.browser
+ zope.component
+ zope.configuration
+ zope.contenttype >= 3.5
+ zope.event
+ zope.exceptions
+ zope.i18n
+ zope.interface
+ zope.location
+ zope.proxy
+ zope.security
+
+[testenv:coverage]
+basepython =
+ python2.7
+commands =
+# The installed version messes up nose's test discovery / coverage reporting
+# So, we uninstall that from the environment, and then install the editable
+# version, before running nosetests.
+ pip uninstall -y zope.publisher
+ pip install -e .
+ nosetests --with-xunit --with-xcoverage
+deps =
+ nose
+ coverage
+ nosexcover
+ zope.browser
+ zope.component
+ zope.configuration
+ zope.contenttype >= 3.5
+ zope.event
+ zope.exceptions
+ zope.i18n
+ zope.interface
+ zope.location
+ zope.proxy
+ zope.security
More information about the checkins
mailing list