[CMF-checkins]
SVN: CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/
- Move all tests in test_Template304Handling into
test_CachingPolicyManager
Jens Vagelpohl
jens at dataflake.org
Sat Sep 10 11:09:54 EDT 2005
Log message for revision 38439:
- Move all tests in test_Template304Handling into test_CachingPolicyManager
- Rewrite tests formerly in test_Template304Handling so they do not rely
on ZopeTestCase.PortalTestCase anymore
- Remove a few overly long lines
Changed:
U CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/CachingPolicyManager.py
U CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/base/dummy.py
D CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/framework.py
U CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/test_CachingPolicyManager.py
D CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/test_Template304Handling.py
U CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/utils.py
-=-
Modified: CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/CachingPolicyManager.py
===================================================================
--- CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/CachingPolicyManager.py 2005-09-10 14:40:30 UTC (rev 38438)
+++ CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/CachingPolicyManager.py 2005-09-10 15:09:54 UTC (rev 38439)
@@ -295,7 +295,8 @@
if self.getNoCache():
control.append( 'no-cache' )
- headers.append(('Pragma', 'no-cache')) # tell HTTP 1.0 clients not to cache
+ # The following is for HTTP 1.0 clients
+ headers.append(('Pragma', 'no-cache'))
if self.getNoStore():
control.append( 'no-store' )
@@ -661,7 +662,6 @@
return ()
- # 304 handling helper
security.declareProtected( View, 'getHTTPCachingHeaders' )
def getModTimeAndETag( self, content, view_method, keywords, time=None):
""" Return the modification time and ETag for the content object,
Modified: CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/base/dummy.py
===================================================================
--- CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/base/dummy.py 2005-09-10 14:40:30 UTC (rev 38438)
+++ CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/base/dummy.py 2005-09-10 15:09:54 UTC (rev 38439)
@@ -49,6 +49,32 @@
return self._id
+class DummyType(DummyObject):
+ """ A Dummy Type object """
+
+ def __init__(self, id='Dummy Content', title='Dummy Content', actions=()):
+ """ To fake out some actions, pass in a sequence of tuples where the
+ first element represents the ID or alias of the action and the
+ second element is the path to the object to be invoked, such as
+ a page template.
+ """
+
+ self.id = id
+ self.title = title
+ self._actions = {}
+
+ self._setActions(actions)
+
+ def _setActions(self, actions=()):
+ for action_id, action_path in actions:
+ self._actions[action_id] = action_path
+
+ def Title(self):
+ return self.title
+
+ def queryMethodID(self, alias, default=None, context=None):
+ return self._actions.get(alias, default)
+
class DummyContent( PortalContent, Item ):
"""
A Dummy piece of PortalContent
@@ -277,12 +303,14 @@
Action Provider
"""
- _actions = ( DummyObject()
- , DummyObject()
- )
-
root = 'DummyTool'
+ view_actions = ( ('', 'dummy_view')
+ , ('view', 'dummy_view')
+ , ('(Default)', 'dummy_view')
+ )
+
+
def __init__(self, anon=1):
self.anon = anon
@@ -309,10 +337,12 @@
# TypesTool
def listTypeInfo(self, container=None):
- return ( DummyObject('Dummy Content'), )
+ typ = 'Dummy Content'
+ return ( DummyType(typ, title=typ, actions=self.view_actions), )
def getTypeInfo(self, contentType):
- return ( DummyObject('Dummy Content'), )
+ typ = 'Dummy Content'
+ return DummyType(typ, title=typ, actions=self.view_actions)
# WorkflowTool
test_notified = None
Deleted: CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/framework.py
===================================================================
--- CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/framework.py 2005-09-10 14:40:30 UTC (rev 38438)
+++ CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/framework.py 2005-09-10 15:09:54 UTC (rev 38439)
@@ -1,116 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2005 Zope Corporation and Contributors. All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-"""ZopeTestCase framework
-
-COPY THIS FILE TO YOUR 'tests' DIRECTORY.
-
-This version of framework.py will use the SOFTWARE_HOME
-environment variable to locate Zope and the Testing package.
-
-If the tests are run in an INSTANCE_HOME installation of Zope,
-Products.__path__ and sys.path with be adjusted to include the
-instance's Products and lib/python directories respectively.
-
-If you explicitly set INSTANCE_HOME prior to running the tests,
-auto-detection is disabled and the specified path will be used
-instead.
-
-If the 'tests' directory contains a custom_zodb.py file, INSTANCE_HOME
-will be adjusted to use it.
-
-If you set the ZEO_INSTANCE_HOME environment variable a ZEO setup
-is assumed, and you can attach to a running ZEO server (via the
-instance's custom_zodb.py).
-
-The following code should be at the top of every test module:
-
- import os, sys
- if __name__ == '__main__':
- execfile(os.path.join(sys.path[0], 'framework.py'))
-
-...and the following at the bottom:
-
- if __name__ == '__main__':
- framework()
-
-$Id: framework.py 30245 2005-05-05 09:50:09Z shh $
-"""
-
-__version__ = '0.2.4'
-
-# Save start state
-#
-__SOFTWARE_HOME = os.environ.get('SOFTWARE_HOME', '')
-__INSTANCE_HOME = os.environ.get('INSTANCE_HOME', '')
-
-if __SOFTWARE_HOME.endswith(os.sep):
- __SOFTWARE_HOME = os.path.dirname(__SOFTWARE_HOME)
-
-if __INSTANCE_HOME.endswith(os.sep):
- __INSTANCE_HOME = os.path.dirname(__INSTANCE_HOME)
-
-# Find and import the Testing package
-#
-if not sys.modules.has_key('Testing'):
- p0 = sys.path[0]
- if p0 and __name__ == '__main__':
- os.chdir(p0)
- p0 = ''
- s = __SOFTWARE_HOME
- p = d = s and s or os.getcwd()
- while d:
- if os.path.isdir(os.path.join(p, 'Testing')):
- zope_home = os.path.dirname(os.path.dirname(p))
- sys.path[:1] = [p0, p, zope_home]
- break
- p, d = s and ('','') or os.path.split(p)
- else:
- print 'Unable to locate Testing package.',
- print 'You might need to set SOFTWARE_HOME.'
- sys.exit(1)
-
-import Testing, unittest
-execfile(os.path.join(os.path.dirname(Testing.__file__), 'common.py'))
-
-# Include ZopeTestCase support
-#
-if 1: # Create a new scope
-
- p = os.path.join(os.path.dirname(Testing.__file__), 'ZopeTestCase')
-
- if not os.path.isdir(p):
- print 'Unable to locate ZopeTestCase package.',
- print 'You might need to install ZopeTestCase.'
- sys.exit(1)
-
- ztc_common = 'ztc_common.py'
- ztc_common_global = os.path.join(p, ztc_common)
-
- f = 0
- if os.path.exists(ztc_common_global):
- execfile(ztc_common_global)
- f = 1
- if os.path.exists(ztc_common):
- execfile(ztc_common)
- f = 1
-
- if not f:
- print 'Unable to locate %s.' % ztc_common
- sys.exit(1)
-
-# Debug
-#
-print 'SOFTWARE_HOME: %s' % os.environ.get('SOFTWARE_HOME', 'Not set')
-print 'INSTANCE_HOME: %s' % os.environ.get('INSTANCE_HOME', 'Not set')
-sys.stdout.flush()
-
Modified: CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/test_CachingPolicyManager.py
===================================================================
--- CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/test_CachingPolicyManager.py 2005-09-10 14:40:30 UTC (rev 38438)
+++ CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/test_CachingPolicyManager.py 2005-09-10 15:09:54 UTC (rev 38439)
@@ -14,6 +14,8 @@
$Id$
"""
+import base64
+import os
from unittest import TestCase, TestSuite, makeSuite, main
import Testing
@@ -25,11 +27,24 @@
from App.Common import rfc1123_date
from DateTime.DateTime import DateTime
+from AccessControl.SecurityManagement import newSecurityManager
+from Products.CMFCore.tests.base.testcase import RequestTest
+from Products.CMFCore.tests.base.testcase import FSDVTest
+from Products.CMFCore.tests.test_FSPageTemplate import FSPTMaker
+from Products.CMFCore.tests.base.dummy import DummySite
+from Products.CMFCore.tests.base.dummy import DummyUserFolder
+from Products.CMFCore.tests.base.dummy import DummyContent
+from Products.CMFCore.tests.base.dummy import DummyTool
+from Products.CMFCore.FSPageTemplate import FSPageTemplate
+from Products.CMFCore import CachingPolicyManager
+
+
ACCLARK = DateTime( '2001/01/01' )
+portal_owner = 'portal_owner'
-class DummyContent:
+class DummyContent2:
__allow_access_to_unprotected_subobjects__ = 1
@@ -55,7 +70,7 @@
def _makeContext( self, **kw ):
from Products.CMFCore.CachingPolicyManager import createCPContext
- return createCPContext( DummyContent(self._epoch)
+ return createCPContext( DummyContent2(self._epoch)
, 'foo_view', kw, self._epoch )
def test_empty( self ):
@@ -369,7 +384,7 @@
mgr = self._makeOne()
self.assertEqual( len( mgr.listPolicies() ), 0 )
- headers = mgr.getHTTPCachingHeaders( content=DummyContent(self._epoch)
+ headers = mgr.getHTTPCachingHeaders( content=DummyContent2(self._epoch)
, view_method='foo_view'
, keywords={}
, time=self._epoch
@@ -457,7 +472,7 @@
def test_lookupNoMatch( self ):
mgr = self._makeOneWithPolicies()
- headers = mgr.getHTTPCachingHeaders( content=DummyContent(self._epoch)
+ headers = mgr.getHTTPCachingHeaders( content=DummyContent2(self._epoch)
, view_method='foo_view'
, keywords={}
, time=self._epoch
@@ -467,7 +482,7 @@
def test_lookupMatchFoo( self ):
mgr = self._makeOneWithPolicies()
- headers = mgr.getHTTPCachingHeaders( content=DummyContent(self._epoch)
+ headers = mgr.getHTTPCachingHeaders( content=DummyContent2(self._epoch)
, view_method='foo_view'
, keywords={ 'foo' : 1 }
, time=self._epoch
@@ -480,7 +495,7 @@
def test_lookupMatchBar( self ):
mgr = self._makeOneWithPolicies()
- headers = mgr.getHTTPCachingHeaders( content=DummyContent(self._epoch)
+ headers = mgr.getHTTPCachingHeaders( content=DummyContent2(self._epoch)
, view_method='foo_view'
, keywords={ 'bar' : 1 }
, time=self._epoch
@@ -498,7 +513,7 @@
def test_lookupMatchBaz( self ):
mgr = self._makeOneWithPolicies()
- headers = mgr.getHTTPCachingHeaders( content=DummyContent(self._epoch)
+ headers = mgr.getHTTPCachingHeaders( content=DummyContent2(self._epoch)
, view_method='foo_view'
, keywords={ 'baz' : 1 }
, time=self._epoch
@@ -519,7 +534,7 @@
def test_lookupMatchQux( self ):
mgr = self._makeOneWithPolicies()
- headers = mgr.getHTTPCachingHeaders( content=DummyContent(self._epoch)
+ headers = mgr.getHTTPCachingHeaders( content=DummyContent2(self._epoch)
, view_method='foo_view'
, keywords={ 'qux' : 1 }
, time=self._epoch
@@ -538,11 +553,255 @@
self.assertEqual( headers[2][1] , 'max-age=86400' )
+class CachingPolicyManager304Tests(RequestTest, FSDVTest):
+
+ def setUp(self):
+ RequestTest.setUp(self)
+ FSDVTest.setUp(self)
+
+ now = DateTime()
+
+ # Create a fake portal and the tools we need
+ self.portal = DummySite(id='portal').__of__(self.root)
+ self.portal._setObject('portal_types', DummyTool())
+
+ # This is a FSPageTemplate that will be used as the View for
+ # our content objects. It doesn't matter what it returns.
+ path = os.path.join(self.skin_path_name, 'testPT2.pt')
+ self.portal._setObject('dummy_view', FSPageTemplate('dummy_view', path))
+
+ uf = self.root.acl_users
+ password = 'secret'
+ uf.userFolderAddUser(portal_owner, password, ['Manager'], [])
+ user = uf.getUserById(portal_owner)
+ if not hasattr(user, 'aq_base'):
+ user = user.__of__(uf)
+ newSecurityManager(None, user)
+ owner_auth = '%s:%s' % (portal_owner, password)
+ self.auth_header = "Basic %s" % base64.encodestring(owner_auth)
+
+ self.portal._setObject('doc1', DummyContent('doc1'))
+ self.portal._setObject('doc2', DummyContent('doc2'))
+ self.portal._setObject('doc3', DummyContent('doc3'))
+ self.portal.doc1.modified_date = now
+ self.portal.doc2.modified_date = now
+ self.portal.doc3.modified_date = now
+
+ CachingPolicyManager.manage_addCachingPolicyManager(self.portal)
+ cpm = self.portal.caching_policy_manager
+
+ # This policy only applies to doc1. It will not emit any ETag header
+ # but it enables If-modified-since handling.
+ cpm.addPolicy(policy_id = 'policy_no_etag',
+ predicate = 'python:object.getId()=="doc1"',
+ mtime_func = '',
+ max_age_secs = 0,
+ no_cache = 0,
+ no_store = 0,
+ must_revalidate = 0,
+ vary = '',
+ etag_func = '',
+ enable_304s = 1)
+
+ # This policy only applies to doc2. It will emit an ETag with
+ # the constant value "abc" and also enable if-modified-since handling.
+ cpm.addPolicy(policy_id = 'policy_etag',
+ predicate = 'python:object.getId()=="doc2"',
+ mtime_func = '',
+ max_age_secs = 0,
+ no_cache = 0,
+ no_store = 0,
+ must_revalidate = 0,
+ vary = '',
+ etag_func = 'string:abc',
+ enable_304s = 1)
+
+ # This policy only applies to doc3. Etags with constant values of
+ # "abc" are emitted, but if-modified-since handling is turned off.
+ cpm.addPolicy(policy_id = 'policy_disabled',
+ predicate = 'python:object.getId()=="doc3"',
+ mtime_func = '',
+ max_age_secs = 0,
+ no_cache = 0,
+ no_store = 0,
+ must_revalidate = 0,
+ vary = '',
+ etag_func = 'string:abc',
+ enable_304s = 0)
+
+
+ def tearDown(self):
+ RequestTest.tearDown(self)
+ FSDVTest.tearDown(self)
+
+
+ def _cleanup(self):
+ # Clean up request and response
+ req = self.portal.REQUEST
+
+ for header in ( 'IF_MODIFIED_SINCE'
+ , 'HTTP_AUTHORIZATION'
+ , 'IF_NONE_MATCH'
+ ):
+ if req.environ.get(header, None) is not None:
+ del req.environ[header]
+
+ req.RESPONSE.setStatus(200)
+
+
+ def testUnconditionalGET(self):
+ # In this case the Request does not specify any if-modified-since
+ # value to take into account, thereby completely circumventing any
+ # if-modified-since handling. This must not produce a response status
+ # of 304, regardless of any other headers.
+ self.portal.doc1()
+ response = self.portal.REQUEST.RESPONSE
+ self.assertEqual(response.getStatus(), 200)
+
+
+ def testConditionalGETNoETag(self):
+ yesterday = DateTime() - 1
+ doc1 = self.portal.doc1
+ request = doc1.REQUEST
+ response = request.RESPONSE
+
+ # If doc1 has been modified since yesterday (which it has), we want
+ # the full rendering.
+ request.environ['IF_MODIFIED_SINCE'] = rfc1123_date(yesterday)
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc1()
+ self.assertEqual(response.getStatus(), 200)
+ self._cleanup()
+
+ # If doc1 had been modified since its creation (which it hasn't), we
+ # would want the full rendering. It hasn't, so this must return a 304.
+ request.environ['IF_MODIFIED_SINCE'] = rfc1123_date(doc1.modified_date)
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc1()
+ self.assertEqual(response.getStatus(), 304)
+ self._cleanup()
+
+ # ETag handling is not enabled in the policy for doc1, so asking for
+ # one will not produce any matches. We get the full rendering.
+ request.environ['IF_NONE_MATCH'] = '"123"'
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc1()
+ self.assertEqual(response.getStatus(), 200)
+ self._cleanup()
+
+ # We are asking for an ETag as well as modifications after doc1 has
+ # been created. Both won't match and we get the full rendering.
+ request.environ['IF_NONE_MATCH'] = '"123"'
+ request.environ['IF_MODIFIED_SINCE'] = rfc1123_date(doc1.modified_date)
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc1()
+ self.assertEqual(response.getStatus(), 200)
+ self._cleanup()
+
+
+ def testConditionalGETETag(self):
+ yesterday = DateTime() - 1
+ doc2 = self.portal.doc2
+ request = doc2.REQUEST
+ response = request.RESPONSE
+
+ # Has doc2 been modified since yesterday? Yes it has, so we get the
+ # full rendering.
+ request.environ['IF_MODIFIED_SINCE'] = rfc1123_date(yesterday)
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc2()
+ self.assertEqual(response.getStatus(), 200)
+ self._cleanup()
+
+ # If doc2 has not been modified since its creation (which it hasn't),
+ # we would get a 304 here. However, the policy for doc2 also expects
+ # to get an ETag in the request, which we are not setting here. So
+ # the policy fails and we get a full rendering.
+ request.environ['IF_MODIFIED_SINCE'] = rfc1123_date(doc2.modified_date)
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc2()
+ self.assertEqual(response.getStatus(), 200)
+ self._cleanup()
+
+ # Now we are setting an ETag in our request, but an ETag that does not
+ # match the policy's expected value. The policy fails and we get the
+ # full rendering.
+ request.environ['IF_NONE_MATCH'] = '"123"'
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc2()
+ self.assertEqual(response.getStatus(), 200)
+ self._cleanup()
+
+ # Here we provide the correct and matching ETag value, and we don't
+ # specify any if-modified-since condition. This is enough for our
+ # policy to trigger 304.
+ request.environ['IF_NONE_MATCH'] = '"abc"'
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc2()
+ self.assertEqual(response.getStatus(), 304)
+ self._cleanup()
+
+ # We specify an ETag and a modification time condition that does not
+ # match, so we get the full rendering
+ request.environ['IF_MODIFIED_SINCE'] = rfc1123_date(doc2.modified_date)
+ request.environ['IF_NONE_MATCH'] = '"123"'
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc2()
+ self.assertEqual(response.getStatus(), 200)
+ self._cleanup()
+
+ # We hand in a matching modified time condition which is supposed to
+ # trigger full rendering. This will lead the ETag condition to be
+ # overridden.
+ request.environ['IF_MODIFIED_SINCE'] = rfc1123_date(yesterday)
+ request.environ['IF_NONE_MATCH'] = '"abc"'
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc2()
+ self.assertEqual(response.getStatus(), 200)
+ self._cleanup()
+
+ # Now we pass an ETag that matches the policy and a modified time
+ # condition that is not fulfilled. It is safe to serve a 304.
+ request.environ['IF_MODIFIED_SINCE'] = rfc1123_date(doc2.modified_date)
+ request.environ['IF_NONE_MATCH'] = '"abc"'
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc2()
+ self.assertEqual(response.getStatus(), 304)
+ self._cleanup()
+
+
+ def testConditionalGETDisabled(self):
+ yesterday = DateTime() - 1
+ doc3 = self.portal.doc3
+ request = doc3.REQUEST
+ response = request.RESPONSE
+
+ # Our policy disables any 304-handling, so even though the ETag matches
+ # the policy, we will get the full rendering.
+ request.environ['IF_NONE_MATCH'] = '"abc"'
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc3()
+ self.assertEqual(response.getStatus(), 200)
+ self._cleanup()
+
+ # Now both the ETag and the modified condition would trigger a 304
+ # response *if* 304-handling was enabled. It is not in our policy, so
+ # we get the full rendering again.
+ request.environ['IF_MODIFIED_SINCE'] = rfc1123_date(doc3.modified_date)
+ request.environ['IF_NONE_MATCH'] = '"abc"'
+ request.environ['HTTP_AUTHORIZATION'] = self.auth_header
+ doc3()
+ self.assertEqual(response.getStatus(), 200)
+ self._cleanup()
+
+
def test_suite():
return TestSuite((
makeSuite(CachingPolicyTests),
makeSuite(CachingPolicyManagerTests),
+ makeSuite(CachingPolicyManager304Tests),
))
if __name__ == '__main__':
main(defaultTest='test_suite')
+
Deleted: CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/test_Template304Handling.py
===================================================================
--- CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/test_Template304Handling.py 2005-09-10 14:40:30 UTC (rev 38438)
+++ CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/tests/test_Template304Handling.py 2005-09-10 15:09:54 UTC (rev 38439)
@@ -1,207 +0,0 @@
-##############################################################################
-#
-# Copyright (c) 2002 Zope Corporation and Contributors. All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-""" Unit tests for status 304 handling by FSPageTemplate.
-
-$Id: $
-"""
-
-import os, sys
-if __name__ == '__main__':
- execfile(os.path.join(sys.path[0], 'framework.py'))
-
-try:
- ZTC_available = True
- from Testing import ZopeTestCase
- from Testing.ZopeTestCase import Functional
- from Testing.ZopeTestCase import PortalTestCase
- from Testing.ZopeTestCase.PortalTestCase import portal_name
-except:
- ZTC_available = False
- class Functional:
- pass
- class PortalTestCase:
- pass
- portal_name = None
-
-if ZTC_available:
- # clean up any junk left over from test_ActionProviderBase
- from Products.CMFSetup import profile_registry
- profile_registry.clear()
-
- # set up the CMF
- ZopeTestCase.installProduct('CMFSetup')
- ZopeTestCase.installProduct('CMFCore')
- ZopeTestCase.installProduct('CMFDefault')
- ZopeTestCase.installProduct('ZCTextIndex')
- ZopeTestCase.installProduct('CMFCalendar')
- ZopeTestCase.installProduct('CMFTopic')
- ZopeTestCase.installProduct('DCWorkflow')
- ZopeTestCase.installProduct('MailHost')
- ZopeTestCase.installProduct('PageTemplates')
- ZopeTestCase.installProduct('PythonScripts')
- ZopeTestCase.installProduct('ExternalMethod')
-
-from App.Common import rfc1123_date
-from DateTime import DateTime
-from AccessControl.SecurityManagement import newSecurityManager
-
-portal_owner = 'portal_owner'
-
-from Products.CMFCore import CachingPolicyManager
-
-
-class TestTemplate304Handling(Functional, PortalTestCase):
-
- def setUp(self):
- ZopeTestCase.PortalTestCase.setUp(self)
-
- def getPortal(self):
- # this is used by the framework to set things up
- # do not call it directly -- use self.portal instead
- if getattr(self.app, portal_name, None) is None:
- self.app.manage_addProduct['CMFDefault'].manage_addCMFSite(portal_name)
- return getattr(self.app, portal_name)
-
- def afterSetUp(self):
- uf = self.app.acl_users
- password = 'secret'
- uf.userFolderAddUser(portal_owner, password, ['Manager'], [])
- user = uf.getUserById(portal_owner)
- if not hasattr(user, 'aq_base'):
- user = user.__of__(uf)
- newSecurityManager(None, user)
- self.owner_auth = '%s:%s' % (portal_owner, password)
-
- self.portal.invokeFactory('Document', 'doc1')
- self.portal.invokeFactory('Document', 'doc2')
- self.portal.invokeFactory('Document', 'doc3')
-
- CachingPolicyManager.manage_addCachingPolicyManager(self.portal)
- cpm = self.portal.caching_policy_manager
-
- cpm.addPolicy(policy_id = 'policy_no_etag',
- predicate = 'python:object.getId()=="doc1"',
- mtime_func = '',
- max_age_secs = 0,
- no_cache = 0,
- no_store = 0,
- must_revalidate = 0,
- vary = '',
- etag_func = '',
- enable_304s = 1)
-
- cpm.addPolicy(policy_id = 'policy_etag',
- predicate = 'python:object.getId()=="doc2"',
- mtime_func = '',
- max_age_secs = 0,
- no_cache = 0,
- no_store = 0,
- must_revalidate = 0,
- vary = '',
- etag_func = 'string:abc',
- enable_304s = 1)
-
- cpm.addPolicy(policy_id = 'policy_disabled',
- predicate = 'python:object.getId()=="doc3"',
- mtime_func = '',
- max_age_secs = 0,
- no_cache = 0,
- no_store = 0,
- must_revalidate = 0,
- vary = '',
- etag_func = 'string:abc',
- enable_304s = 0)
-
-
- def testUnconditionalGET(self):
- content_path = '/' + self.portal.doc1.absolute_url(1) + '/document_view'
- request = self.portal.REQUEST
- response = self.publish(content_path, self.owner_auth)
- self.assertEqual(response.getStatus(), 200)
-
-
- def testConditionalGETNoETag(self):
- yesterday = DateTime() - 1
-
- doc1 = self.portal.doc1
-
- content_path = '/' + doc1.absolute_url(1) + '/document_view'
-
- response = self.publish(content_path, env={'IF_MODIFIED_SINCE': rfc1123_date(yesterday)}, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 200)
-
- response = self.publish(content_path, env={'IF_MODIFIED_SINCE': rfc1123_date(doc1.modified())}, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 304)
-
- response = self.publish(content_path, env={'IF_NONE_MATCH': '"123"' }, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 200)
-
- response = self.publish(content_path, env={'IF_MODIFIED_SINCE': rfc1123_date(doc1.modified()), 'IF_NONE_MATCH': '"123"'}, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 200)
-
-
- def testConditionalGETETag(self):
- yesterday = DateTime() - 1
-
- doc2 = self.portal.doc2
-
- content_path = '/' + doc2.absolute_url(1) + '/document_view'
- response = self.publish(content_path, env={'IF_MODIFIED_SINCE': rfc1123_date(yesterday)}, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 200)
-
- response = self.publish(content_path, env={'IF_MODIFIED_SINCE': rfc1123_date(doc2.modified())}, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 200) # should be 200 because we also have an etag
-
- response = self.publish(content_path, env={'IF_NONE_MATCH': '"123"' }, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 200)
-
- # etag matches, no modification time in request
- response = self.publish(content_path, env={'IF_NONE_MATCH': '"abc"' }, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 304)
-
- response = self.publish(content_path, env={'IF_MODIFIED_SINCE': rfc1123_date(doc2.modified()), 'IF_NONE_MATCH': '"123"'}, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 200)
-
- response = self.publish(content_path, env={'IF_MODIFIED_SINCE': rfc1123_date(yesterday), 'IF_NONE_MATCH': '"abc"'}, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 200)
-
- # etag matches, valid modification time in request
- response = self.publish(content_path, env={'IF_MODIFIED_SINCE': rfc1123_date(doc2.modified()), 'IF_NONE_MATCH': '"abc"'}, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 304)
-
-
- def testConditionalGETDisabled(self):
- yesterday = DateTime() - 1
-
- doc3 = self.portal.doc3
- content_path = '/' + doc3.absolute_url(1) + '/document_view'
-
- response = self.publish(content_path, env={'IF_NONE_MATCH': '"abc"' }, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 200)
-
- response = self.publish(content_path, env={'IF_MODIFIED_SINCE': rfc1123_date(doc3.modified()), 'IF_NONE_MATCH': '"abc"'}, basic=self.owner_auth)
- self.assertEqual(response.getStatus(), 200)
-
-
-def test_suite():
- if ZTC_available:
- from unittest import TestSuite, makeSuite
- suite = TestSuite()
- suite.addTest(makeSuite(TestTemplate304Handling))
- return suite
- else:
- print 'Warning: test_Template304Handling.py requires ZopeTestCase to run.'
- print 'ZopeTestCase is available from http://www.zope.org/Members/shh/ZopeTestCase and ships with Zope 2.8+'
-
-if __name__ == '__main__':
- framework()
Modified: CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/utils.py
===================================================================
--- CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/utils.py 2005-09-10 14:40:30 UTC (rev 38438)
+++ CMF/branches/geoffd-cachingpolicymanager-branch/CMFCore/utils.py 2005-09-10 15:09:54 UTC (rev 38439)
@@ -329,13 +329,15 @@
# Parse a string of etags from an If-None-Match header
# Code follows ZPublisher.HTTPRequest.parse_cookie
parse_etags_lock=allocate_lock()
-def parse_etags(text,
- result=None,
- etagre_quote = re.compile('(\s*\"([^\"]*)\"\s*,{0,1})'), # quoted etags (assumed separated by whitespace + a comma)
- etagre_noquote = re.compile('(\s*([^,]*)\s*,{0,1})'), # non-quoted etags (assumed separated by whitespace + a comma)
- acquire=parse_etags_lock.acquire,
- release=parse_etags_lock.release,
- ):
+def parse_etags( text
+ , result=None
+ # quoted etags (assumed separated by whitespace + a comma)
+ , etagre_quote = re.compile('(\s*\"([^\"]*)\"\s*,{0,1})')
+ # non-quoted etags (assumed separated by whitespace + a comma)
+ , etagre_noquote = re.compile('(\s*([^,]*)\s*,{0,1})')
+ , acquire=parse_etags_lock.acquire
+ , release=parse_etags_lock.release
+ ):
if result is None: result=[]
if not len(text):
@@ -431,15 +433,20 @@
return False
if if_modified_since:
- if not content_mod_time or mod_time_secs < 0 or mod_time_secs > if_modified_since:
+ if ( not content_mod_time or
+ mod_time_secs < 0 or
+ mod_time_secs > if_modified_since ):
return False
if client_etags:
- if not content_etag or (content_etag not in client_etags and '*' not in client_etags):
+ if ( not content_etag or
+ (content_etag not in client_etags and '*' not in client_etags) ):
return False
else:
- # If we generate an ETag, don't validate the conditional GET unless the client supplies an ETag
- # This may be more conservative than the spec requires, but we are already _way_ more conservative.
+ # If we generate an ETag, don't validate the conditional GET unless
+ # the client supplies an ETag
+ # This may be more conservative than the spec requires, but we are
+ # already _way_ more conservative.
if content_etag:
return False
More information about the CMF-checkins
mailing list