[Zope-Checkins] CVS: Zope/lib/python/Products/CoreSessionTracking/tests - makerequest.py:1.1.2.1 oneoff.py:1.1.2.1 testAutoExpireMapping.py:1.1.2.1 testSessionDataManager.py:1.1.2.1 testSessionIdManager.py:1.1.2.1
Matthew T. Kromer
matt@zope.com
Thu, 4 Oct 2001 15:34:29 -0400
Update of /cvs-repository/Zope/lib/python/Products/CoreSessionTracking/tests
In directory cvs.zope.org:/tmp/cvs-serv7585/tests
Added Files:
Tag: matt-CoreSessionTracking-branch
makerequest.py oneoff.py testAutoExpireMapping.py
testSessionDataManager.py testSessionIdManager.py
Log Message:
Adding more files from CoreSessionTracking
=== Added File Zope/lib/python/Products/CoreSessionTracking/tests/makerequest.py ===
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""
Facilitates unit tests which requires an acquirable REQUEST from
ZODB objects
Usage:
import makerequest
app = makerequest.makerequest(Zope.app())
$Id: makerequest.py,v 1.1.2.1 2001/10/04 19:34:28 matt Exp $
"""
import os
from os import environ
from sys import stdin
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.BaseRequest import RequestContainer
def makerequest(app):
resp = HTTPResponse()
environ['SERVER_NAME']='foo'
environ['SERVER_PORT']='80'
environ['REQUEST_METHOD'] = 'GET'
req = HTTPRequest(stdin, environ, resp)
return app.__of__(RequestContainer(REQUEST = req))
=== Added File Zope/lib/python/Products/CoreSessionTracking/tests/oneoff.py ===
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
import sys
sys.path.insert(0, '../../..')
from sys import stdin
from os import environ
import os
import time
os.chdir('../../..')
import Zope
from Products.CoreSessionTracking.SessionIdManager import SessionIdManager
from Products.CoreSessionTracking.SessionDataManager import SessionDataManager
from Products.CoreSessionTracking.SessionDataManager import SessionData
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from ZPublisher.BaseRequest import RequestContainer
app = Zope.app()
sm=SessionDataManager(id='smtoo',
path='/nonundo_db/session_data_container',
timeout=20,
gc=0,
title='SessionThingToo')
app._setObject('smtoo', sm)
resp = HTTPResponse()
environ['SERVER_NAME']='fred'
environ['SERVER_PORT']='80'
req = HTTPRequest(stdin, environ, resp)
app = app.__of__(RequestContainer(REQUEST = req))
sd = SessionData('a')
assert sm.id == 'smtoo'
get_transaction().abort()
app._p_jar.close()
del app
=== Added File Zope/lib/python/Products/CoreSessionTracking/tests/testAutoExpireMapping.py ===
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
import sys, os, time, whrandom
sys.path.insert(0, '../../..')
os.chdir('../../..')
import ZODB
from Products.CoreSessionTracking.AutoExpireMapping import AutoExpireMapping
from Products.CoreSessionTracking.SessionFanout import SessionFanout
from ExtensionClass import Base
from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
class TestAutoExpireMapping(TestCase):
def setUp(self):
self.errmargin = .20
self.timeout = 60
self.t = AutoExpireMapping(timeout_mins=self.timeout/60)
def tearDown(self):
self.t = None
del self.t
def testGetItemFails(self):
self.assertRaises(KeyError, self._getitemfail)
def _getitemfail(self):
return self.t[10]
def testGetReturnsDefault(self):
assert self.t.get(10) == None
assert self.t.get(10, 'foo') == 'foo'
def testSetItemGetItemWorks(self):
self.t[10] = 1
a = self.t[10]
assert a == 1, `a`
def testReplaceWorks(self):
self.t[10] = 1
assert self.t[10] == 1
self.t[10] = 2
assert self.t[10] == 2
def testHasKeyWorks(self):
self.t[10] = 1
assert self.t.has_key(10)
def testValuesWorks(self):
for x in range(10, 110):
self.t[x] = x
v = self.t.values()
v.sort()
assert len(v) == 100
i = 10
for x in v:
assert x == i
i = i + 1
def testKeysWorks(self):
for x in range(10, 110):
self.t[x] = x
v = self.t.keys()
v.sort()
assert len(v) == 100
i = 10
for x in v:
assert x == i
i = i + 1
def testItemsWorks(self):
for x in range(10, 110):
self.t[x] = x
v = self.t.items()
v.sort()
assert len(v) == 100
i = 10
for x in v:
assert x[0] == i
assert x[1] == i
i = i + 1
def testDeleteInvalidKeyRaisesKeyError(self):
self.assertRaises(KeyError, self._deletefail)
def _deletefail(self):
del self.t[10]
def donttestDeleteNoChildrenWorks(self):
self.t[5] = 6
self.t[2] = 10
self.t[6] = 12
self.t[1] = 100
self.t[3] = 200
self.t[10] = 500
self.t[4] = 99
del self.t[4]
assert lsubtract(self.t.keys(), [1,2,3,5,6,10]) == [], `self.t.keys()`
def donttestDeleteOneChildWorks(self):
self.t[5] = 6
self.t[2] = 10
self.t[6] = 12
self.t[1] = 100
self.t[3] = 200
self.t[10] = 500
self.t[4] = 99
del self.t[3]
assert lsubtract(self.t.keys(), [1,2,4,5,6,10]) == [], `self.t.keys()`
def donttestDeleteTwoChildrenNoInorderSuccessorWorks(self):
self.t[5] = 6
self.t[2] = 10
self.t[6] = 12
self.t[1] = 100
self.t[3] = 200
self.t[10] = 500
self.t[4] = 99
del self.t[2]
assert lsubtract(self.t.keys(),[1,3,4,5,6,10])==[], `self.t.keys()`
def donttestDeleteTwoChildrenInorderSuccessorWorks(self):
self.t[5] = 6
self.t[2] = 10
self.t[6] = 12
self.t[1] = 100
self.t[3] = 200
self.t[10] = 500
self.t[4] = 99
self.t[2.5] = 150
del self.t[2]
assert lsubtract(self.t.keys(),[1,2.5,3,4,5,6,10])==[], `self.t.keys()`
def donttestDeleteRootWorks(self):
self.t[5] = 6
self.t[2] = 10
self.t[6] = 12
self.t[1] = 100
self.t[3] = 200
self.t[10] = 500
self.t[4] = 99
self.t[2.5] = 150
del self.t[5]
assert lsubtract(self.t.keys(),[1,2,2.5,3,4,6,10])==[], `self.t.keys()`
def testRandomNonOverlappingInserts(self):
added = {}
r = range(10, 110)
for x in r:
k = whrandom.choice(r)
if not added.has_key(k):
self.t[k] = x
added[k] = 1
addl = added.keys()
addl.sort()
assert lsubtract(self.t.keys(),addl)==[], `self.t.keys()`
def testRandomOverlappingInserts(self):
added = {}
r = range(10, 110)
for x in r:
k = whrandom.choice(r)
self.t[k] = x
added[k] = 1
addl = added.keys()
addl.sort()
assert lsubtract(self.t.keys(), addl) ==[]
def testRandomDeletes(self):
r = range(10, 1010)
added = []
for x in r:
k = whrandom.choice(r)
self.t[k] = x
added.append(k)
deleted = []
for x in r:
k = whrandom.choice(r)
if self.t.has_key(k):
del self.t[k]
deleted.append(k)
if self.t.has_key(k):
print "had problems deleting %s" % k
badones = []
for x in deleted:
if self.t.has_key(x):
badones.append(x)
assert badones == [], (badones, added, deleted)
def testTargetedDeletes(self):
r = range(10, 1010)
for x in r:
k = whrandom.choice(r)
self.t[k] = x
for x in r:
try:
del self.t[x]
except KeyError:
pass
assert self.t.keys() == [], `self.t.keys()`
def testPathologicalRightBranching(self):
r = range(10, 1010)
for x in r:
self.t[x] = 1
assert lsubtract(self.t.keys(), r) == []
for x in r:
del self.t[x]
assert lsubtract(self.t.keys(), []) == [], self.t.keys()
def testPathologicalLeftBranching(self):
r = range(10, 1010)
revr = r[:]
revr.reverse()
for x in revr:
self.t[x] = 1
assert lsubtract(self.t.keys(),r) == []
for x in revr:
del self.t[x]
assert lsubtract(self.t.keys(),[]) == [], self.t.keys()
def donttestSuccessorChildParentRewriteExerciseCase(self):
add_order = [
85, 73, 165, 273, 215, 142, 233, 67, 86, 166, 235, 225, 255,
73, 175, 171, 285, 162, 108, 28, 283, 258, 232, 199, 260,
298, 275, 44, 261, 291, 4, 181, 285, 289, 216, 212, 129,
243, 97, 48, 48, 159, 22, 285, 92, 110, 27, 55, 202, 294,
113, 251, 193, 290, 55, 58, 239, 71, 4, 75, 129, 91, 111,
271, 101, 289, 194, 218, 77, 142, 94, 100, 115, 101, 226,
17, 94, 56, 18, 163, 93, 199, 286, 213, 126, 240, 245, 190,
195, 204, 100, 199, 161, 292, 202, 48, 165, 6, 173, 40, 218,
271, 228, 7, 166, 173, 138, 93, 22, 140, 41, 234, 17, 249,
215, 12, 292, 246, 272, 260, 140, 58, 2, 91, 246, 189, 116,
72, 259, 34, 120, 263, 168, 298, 118, 18, 28, 299, 192, 252,
112, 60, 277, 273, 286, 15, 263, 141, 241, 172, 255, 52, 89,
127, 119, 255, 184, 213, 44, 116, 231, 173, 298, 178, 196,
89, 184, 289, 98, 216, 115, 35, 132, 278, 238, 20, 241, 128,
179, 159, 107, 206, 194, 31, 260, 122, 56, 144, 118, 283,
183, 215, 214, 87, 33, 205, 183, 212, 221, 216, 296, 40,
108, 45, 188, 139, 38, 256, 276, 114, 270, 112, 214, 191,
147, 111, 299, 107, 101, 43, 84, 127, 67, 205, 251, 38, 91,
297, 26, 165, 187, 19, 6, 73, 4, 176, 195, 90, 71, 30, 82,
139, 210, 8, 41, 253, 127, 190, 102, 280, 26, 233, 32, 257,
194, 263, 203, 190, 111, 218, 199, 29, 81, 207, 18, 180,
157, 172, 192, 135, 163, 275, 74, 296, 298, 265, 105, 191,
282, 277, 83, 188, 144, 259, 6, 173, 81, 107, 292, 231,
129, 65, 161, 113, 103, 136, 255, 285, 289, 1
]
delete_order = [
276, 273, 12, 275, 2, 286, 127, 83, 92, 33, 101, 195,
299, 191, 22, 232, 291, 226, 110, 94, 257, 233, 215, 184,
35, 178, 18, 74, 296, 210, 298, 81, 265, 175, 116, 261,
212, 277, 260, 234, 6, 129, 31, 4, 235, 249, 34, 289, 105,
259, 91, 93, 119, 7, 183, 240, 41, 253, 290, 136, 75, 292,
67, 112, 111, 256, 163, 38, 126, 139, 98, 56, 282, 60, 26,
55, 245, 225, 32, 52, 40, 271, 29, 252, 239, 89, 87, 205,
213, 180, 97, 108, 120, 218, 44, 187, 196, 251, 202, 203,
172, 28, 188, 77, 90, 199, 297, 282, 141, 100, 161, 216,
73, 19, 17, 189, 30, 258
]
for x in add_order:
self.t[x] = 1
for x in delete_order:
try: del self.t[x]
except KeyError:
if self.t.has_key(x): assert 1==2,"failed to delete %s" % x
def testItemsGetExpired(self):
for x in range(10, 110):
self.t[x] = x
# these items will time out while we sleep
time.sleep(self.timeout * (self.errmargin+1))
for x in range(110, 210):
self.t[x] = x
assert len(self.t.keys()) == 100, len(self.t.keys())
# we should still have 100 - 199
for x in range(110, 210):
assert self.t[x] == x
# but we shouldn't have 0 - 100
for x in range(10, 110):
try: self.t[x]
except KeyError: pass
else: assert 1 == 2, x
def testChangingTimeoutWorks(self):
# 1 minute
for x in range(10, 110):
self.t[x] = x
time.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
# 2 minutes
self.t._setTimeout(self.timeout/60*2)
self.t._reset()
for x in range(10, 110):
self.t[x] = x
time.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
time.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
# 3 minutes
self.t._setTimeout(self.timeout/60*3)
self.t._reset()
for x in range(10, 110):
self.t[x] = x
time.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
time.sleep(self.timeout)
assert len(self.t.keys()) == 100, len(self.t.keys())
time.sleep(self.timeout * (self.errmargin+1))
assert len(self.t.keys()) == 0, len(self.t.keys())
def testGetItemDelaysTimeout(self):
for x in range(10, 110):
self.t[x] = x
# current bucket will become old after we sleep for a while.
time.sleep(self.timeout/2)
# these items will be added to the new current bucket by getitem
for x in range(10, 110):
self.t[x]
time.sleep(self.timeout/2)
assert len(self.t.keys()) == 100, len(self.t.keys())
for x in range(10, 110):
assert self.t[x] == x
def testSetItemDelaysTimeout(self):
for x in range(10, 110):
self.t[x] = x
# current bucket will become old after we sleep for a while.
time.sleep(self.timeout/2)
# these items will be added to the new current bucket by getitem
for x in range(10, 110):
self.t[x] = x + 1
time.sleep(self.timeout/2)
assert len(self.t.keys()) == 100, len(self.t.keys())
for x in range(10, 110):
assert self.t[x] == x + 1
def testGetDelaysTimeout(self):
for x in range(10, 110):
self.t[x] = x
# current bucket will become old after we sleep for a while.
time.sleep(self.timeout/2)
# these items will be added to the new current bucket by getitem
for x in range(10, 110):
self.t.get(x)
time.sleep(self.timeout/2)
assert len(self.t.keys()) == 100, len(self.t.keys())
for x in range(10, 110):
assert self.t[x] == x
def testLen(self):
added = {}
r = range(10, 1010)
for x in r:
k = whrandom.choice(r)
self.t[k] = x
added[k] = x
addl = added.keys()
addl.sort()
assert len(self.t) == len(addl), len(self.t)
def testResetWorks(self):
self.t[10] = 1
self.t._reset()
assert not self.t.get(10)
def testGetTimeoutMinutesWorks(self):
assert self.t.getTimeoutMinutes() == self.timeout / 60
self.t._setTimeout(10)
assert self.t.getTimeoutMinutes() == 10
class TestSessionFanout(TestAutoExpireMapping):
def setUp(self):
self.errmargin = .20
self.timeout = 60
class ExWrappedSessionFanout(Base, SessionFanout):
pass
self.t = ExWrappedSessionFanout(timeout_mins=self.timeout/60)
def lsubtract(l1, l2):
l1=list(l1)
l2=list(l2)
l = filter(lambda x, l1=l1: x not in l1, l2)
l = l + filter(lambda x, l2=l2: x not in l2, l1)
return l
def test_suite():
print "AutoExpireMapping tests take just about forever (10+ mins)"
testsuite = makeSuite(TestAutoExpireMapping, 'test')
testsuite2 = makeSuite(TestSessionFanout, 'test')
alltests = TestSuite((testsuite2, testsuite))
return alltests
if __name__ == '__main__':
runner = TextTestRunner()
runner.run(test_suite())
=== Added File Zope/lib/python/Products/CoreSessionTracking/tests/testSessionDataManager.py ===
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
import sys, os, time
sys.path.insert(0, '../../..')
import makerequest
os.chdir('../../..')
import ZODB # in order to get Persistence.Persistent working
from Products.CoreSessionTracking.SessionIdManager import SessionIdManager
from Products.CoreSessionTracking.SessionDataManager import SessionDataManager
from Products.CoreSessionTracking.SessionData import SessionData
from Products.CoreSessionTracking.SessionDataManager import SessionDataManagerErr, WRITE_EVERY
from ZODB.POSException import InvalidObjectReference
from DateTime import DateTime
from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
sessionidmgr = 'session_id_mgr'
def f(sdo):
pass
class TestBase(TestCase):
def setUp(self):
import Zope
self.app = makerequest.makerequest(Zope.app())
del Zope
timeout = self.timeout = 1
sidmgr = SessionIdManager(sessionidmgr)
sm=SessionDataManager(
id='sm', path=None, timeout_mins=timeout, title='SessionThing',
onstart='/onstartf', onend='/onendf')
try: self.app._delObject(sessionidmgr)
except AttributeError: pass
try: self.app._delObject('sm')
except AttributeError: pass
try: self.app._delObject('byonend')
except AttributeError: pass
self.app._setObject(sessionidmgr, sidmgr)
self.app._setObject('sm', sm)
admin = self.app.acl_users.getUser('admin')
if admin is None:
raise "Need to define an 'admin' user before running these tests"
admin = admin.__of__(self.app.acl_users)
self.app.sm.changeOwnership(admin)
def tearDown(self):
get_transaction().abort()
self.app._p_jar.close()
self.app = None
del self.app
class TestSessionManager(TestBase):
def testHasId(self):
assert self.app.sm.id == 'sm'
def testHasTitle(self):
assert self.app.sm.title == 'SessionThing'
def testGetSessionDataNoCreate(self):
sd = self.app.sm.getSessionData(0)
assert sd is None, repr(sd)
def testGetSessionDataCreate(self):
sd = self.app.sm.getSessionData(1)
assert sd.__class__ is SessionData, repr(sd)
def testHasSessionData(self):
sd = self.app.sm.getSessionData()
assert self.app.sm.hasSessionData()
def testNewSessionDataObjectIsValid(self):
sdType = type(SessionData(1))
sd = self.app.sm.getSessionData()
assert type(getattr(sd, 'aq_base', sd)) is sdType
assert not hasattr(sd, '_invalid')
def testInvalidateSessionDataObject(self):
sd = self.app.sm.getSessionData()
sd.invalidate()
assert hasattr(sd, '_invalid')
def testSessionTokenIsSet(self):
sd = self.app.sm.getSessionData()
mgr = getattr(self.app, sessionidmgr)
assert mgr.hasToken()
def testBadExternalSDCPath(self):
self.app.sm.REQUEST['REQUEST_METHOD'] = 'GET' # fake out webdav
self.app.sm.setSessionDataContainerPath('/fudgeffoloo')
try:
self.app.sm.getSessionData()
except SessionDataManagerErr:
pass
else:
assert 1 == 2, self.app.sm.getSessionDataContainerPath()
def testInvalidateSessionDataObject(self):
sd = self.app.sm.getSessionData()
sd.invalidate()
assert not self.app.sm.hasSessionData()
def testGhostUnghostSessionManager(self):
get_transaction().commit()
sd = self.app.sm.getSessionData()
sd.set('foo', 'bar')
self.app.sm._p_changed = None
get_transaction().commit()
assert self.app.sm.getSessionData().get('foo') == 'bar'
def testSubcommit(self):
sd = self.app.sm.getSessionData()
sd.set('foo', 'bar')
assert get_transaction().commit(1) == None
def testForeignObject(self):
self.assertRaises(InvalidObjectReference, self._foreignAdd)
def _foreignAdd(self):
ob = self.app.sm
sd = self.app.sm.getSessionData()
sd.set('foo', ob)
get_transaction().commit()
class TestTimeoutRelated(TestBase):
def testLastAccessed(self):
sdo = self.app.sm.getSessionData()
la1 = sdo.getLastAccessed()
time.sleep(WRITE_EVERY + 1)
sdo = self.app.sm.getSessionData()
assert sdo.getLastAccessed() > la1
def testOnStart(self):
self.app.sm.setOnStartPath('/onstartf')
sdo = self.app.sm.getSessionData()
now = DateTime()
k = sdo.get('a')
assert type(k) == type(now)
assert k <= now
def testOnEnd(self):
self.app.sm.setOnEndPath('/onendf')
sdo = self.app.sm.getSessionData()
timeout = self.timeout * 60
time.sleep(timeout + (timeout * .33))
sdo = self.app.sm.getSessionData()
assert hasattr(self.app, 'byonend')
if __name__ == '__main__':
suite = makeSuite(TestTimeoutRelated, 'test')
suite2 = makeSuite(TestSessionManager, 'test')
runner = TextTestRunner()
runner.run(suite2)
=== Added File Zope/lib/python/Products/CoreSessionTracking/tests/testSessionIdManager.py ===
##############################################################################
#
# Zope Public License (ZPL) Version 1.0
# -------------------------------------
#
# Copyright (c) Digital Creations. All rights reserved.
#
# This license has been certified as Open Source(tm).
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions in source code must retain the above copyright
# notice, this list of conditions, and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions, and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# 3. Digital Creations requests that attribution be given to Zope
# in any manner possible. Zope includes a "Powered by Zope"
# button that is installed by default. While it is not a license
# violation to remove this button, it is requested that the
# attribution remain. A significant investment has been put
# into Zope, and this effort will continue if the Zope community
# continues to grow. This is one way to assure that growth.
#
# 4. All advertising materials and documentation mentioning
# features derived from or use of this software must display
# the following acknowledgement:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# In the event that the product being advertised includes an
# intact Zope distribution (with copyright and license included)
# then this clause is waived.
#
# 5. Names associated with Zope or Digital Creations must not be used to
# endorse or promote products derived from this software without
# prior written permission from Digital Creations.
#
# 6. Modified redistributions of any form whatsoever must retain
# the following acknowledgment:
#
# "This product includes software developed by Digital Creations
# for use in the Z Object Publishing Environment
# (http://www.zope.org/)."
#
# Intact (re-)distributions of any official Zope release do not
# require an external acknowledgement.
#
# 7. Modifications are encouraged but must be packaged separately as
# patches to official Zope releases. Distributions that do not
# clearly separate the patches from the original work must be clearly
# labeled as unofficial distributions. Modifications which do not
# carry the name Zope may be packaged in any form, as long as they
# conform to all of the clauses above.
#
#
# Disclaimer
#
# THIS SOFTWARE IS PROVIDED BY DIGITAL CREATIONS ``AS IS'' AND ANY
# EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DIGITAL CREATIONS OR ITS
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
#
# This software consists of contributions made by Digital Creations and
# many individuals on behalf of Digital Creations. Specific
# attributions are listed in the accompanying credits file.
#
##############################################################################
"""
Test suite for session id manager.
$Id: testSessionIdManager.py,v 1.1.2.1 2001/10/04 19:34:28 matt Exp $
"""
__version__ = "$Revision: 1.1.2.1 $"[11:-2]
import sys
sys.path.insert(0, '../../..')
sys.path.insert(0, '..')
import ZODB
from SessionIdManager import SessionIdManager, SessionIdManagerErr
from unittest import TestCase, TestSuite, TextTestRunner, makeSuite
from ZPublisher.HTTPRequest import HTTPRequest
from ZPublisher.HTTPResponse import HTTPResponse
from sys import stdin
from os import environ
class TestSessionIdManager(TestCase):
def setUp(self):
self.m = SessionIdManager('foo')
resp = HTTPResponse()
environ['SERVER_NAME']='fred'
environ['SERVER_PORT']='80'
req = HTTPRequest(stdin, environ, resp)
self.m.REQUEST = req
def tearDown(self):
del self.m
def testSetTokenKey(self):
self.m.setTokenKey('foo')
assert self.m.getTokenKey()== 'foo'
def testSetBadKeyString(self):
try:
self.m.setTokenKey('')
except SessionIdManagerErr:
pass
else:
assert 1 == 2
try:
self.m.setTokenKey(1)
except SessionIdManagerErr:
pass
else:
assert 1 == 2
def testSetBadNamespaces(self):
d = {1:'gummy', 2:'froopy'}
try:
self.m.setTokenKeyNamespaces(d)
except SessionIdManagerErr:
pass
else:
assert 1 == 2
def testSetGoodNamespaces(self):
d = {1:'cookies', 2:'form'}
self.m.setTokenKeyNamespaces(d)
assert self.m.getTokenKeyNamespaces() == d
def testSetBadCookiePath(self):
path = '/;'
try:
self.m.setCookiePath(path)
except SessionIdManagerErr:
pass
else:
assert 1 == 2
def testSetGoodCookiePath(self):
self.m.setCookiePath('/foo')
assert self.m.getCookiePath() == '/foo'
def testSetBadCookieLifeDays(self):
life = ''
try:
self.m.setCookieLifeDays('')
except SessionIdManagerErr:
pass
else:
assert 1 == 2
def testSetGoodCookieLifeDays(self):
self.m.setCookieLifeDays(1)
assert self.m.getCookieLifeDays() == 1
def testSetBadCookieDomain(self):
life = ''
try:
self.m.setCookieDomain('gubble')
except SessionIdManagerErr:
pass
else:
assert 1 == 2
def testSetGoodCookieLifeDays(self):
self.m.setCookieLifeDays(1)
assert self.m.getCookieLifeDays() == 1
def testSetNoCookieDomain(self):
domain = ''
self.m.setCookieDomain(domain)
setdomain = self.m.getCookieDomain()
assert setdomain == domain, "%s" % setdomain
def testSetBadCookieDomain(self):
domain = 'zope.org' # not enough dots
try:
self.m.setCookieDomain(domain)
except SessionIdManagerErr:
pass
else:
assert 1 == 2
domain = {1:1} # must be stringtype
try:
self.m.setCookieDomain(domain)
except SessionIdManagerErr:
pass
else:
assert 1 == 2
domain = '.zope.org;' # semicolon follows
try:
self.m.setCookieDomain(domain)
except SessionIdManagerErr:
pass
else:
assert 1 == 2
def testSetGoodCookieDomain(self):
self.m.setCookieDomain('.zope.org')
setdomain = self.m.getCookieDomain()
assert setdomain == '.zope.org', "%s" % setdomain
def testSetCookieSecure(self):
self.m.setCookieSecure(1)
assert self.m.getCookieSecure() == 1
def testDelegateToParent(self):
self.m.turnOff()
try:
a = self.m.hasToken()
except SessionIdManagerErr:
pass
else:
assert 1==2
def testGetTokenCookie(self):
token = self.m.getToken()
self.m.REQUEST.session_token_ = token
self.m.REQUEST.session_token_ns_ = 'cookies'
tokenkey = self.m.getTokenKey()
self.m.REQUEST.cookies[tokenkey] = token
a = self.m.getToken()
assert a == token, repr(a)
assert self.m.isTokenFromCookie()
def testSetSessionTokenDontCreate(self):
a = self.m.getToken(0)
assert a == None
def testSetSessionTokenCreate(self):
a = self.m.getToken(1)
tokenkey = self.m.getTokenKey()
b = self.m.REQUEST.RESPONSE.cookies[tokenkey]
assert a == b['value'], (a, b)
def testHasToken(self):
assert not self.m.hasToken()
a = self.m.getToken()
assert self.m.hasToken()
def testTokenIsNew(self):
a = self.m.getToken()
assert self.m.isTokenNew()
def testIsTokenFromCookieFirst(self):
token = self.m.getToken()
self.m.REQUEST.session_token_ = token
self.m.REQUEST.session_token_ns_ = 'cookies'
tokenkey = self.m.getTokenKey()
self.m.REQUEST.cookies[tokenkey] = token
self.m.setTokenKeyNamespaces({1:'cookies', 2:'form'})
a = self.m.getToken()
assert self.m.isTokenFromCookie()
def testIsTokenFromFormFirst(self):
token = self.m.getToken()
self.m.REQUEST.session_token_ = token
self.m.REQUEST.session_token_ns_ = 'form'
tokenkey = self.m.getTokenKey()
self.m.REQUEST.form[tokenkey] = token
self.m.setTokenKeyNamespaces({1:'form', 2:'cookies'})
a = self.m.getToken()
assert self.m.isTokenFromForm()
def testIsTokenFromCookieOnly(self):
token = self.m.getToken()
self.m.REQUEST.session_token_ = token
self.m.REQUEST.session_token_ns_ = 'cookies'
tokenkey = self.m.getTokenKey()
self.m.REQUEST.cookies[tokenkey] = token
self.m.setTokenKeyNamespaces({1:'cookies'})
a = self.m.getToken()
assert self.m.isTokenFromCookie()
def testIsTokenFromFormOnly(self):
token = self.m.getToken()
self.m.REQUEST.session_token_ = token
self.m.REQUEST.session_token_ns_ = 'form'
tokenkey = self.m.getTokenKey()
self.m.REQUEST.form[tokenkey] = token
self.m.setTokenKeyNamespaces({1:'form'})
a = self.m.getToken()
assert self.m.isTokenFromForm()
def testDelegateToParentFail(self):
self.m.turnOff()
try:
self.m.getToken()
except SessionIdManagerErr:
pass
else:
assert 1==2
def testDelegateToParentSucceed(self):
self.m.turnOff()
class foo:
pass
class bar:
def getToken(unself, create=1):
return 'worked'
fooi = foo()
bari = bar()
setattr(fooi, self.m.id, bari)
self.m.aq_parent = fooi
assert self.m.getToken() == 'worked'
def testEncodeUrl(self):
keystring = self.m.getTokenKey()
key = self.m.getToken()
u = '/home/chrism/foo'
r = self.m.encodeUrl(u)
assert r == '%s?%s=%s' % (u, keystring, key)
u = 'http://www.zope.org/Members/mcdonc?foo=bar&spam=eggs'
r = self.m.encodeUrl(u)
assert r == '%s&%s=%s' % (u, keystring, key)
if __name__ == '__main__':
testsuite = makeSuite(TestSessionIdManager, 'test')
runner = TextTestRunner()
runner.run(testsuite)