[Zope3-checkins] CVS: Zope3/src/zope/app/session - __init__.py:1.1
browser.zcml:1.1 configure.zcml:1.1 interfaces.py:1.1
session.stx:1.1 tests.py:1.1
Stephan Richter
srichter at cosmos.phy.tufts.edu
Wed Mar 10 07:11:21 EST 2004
Update of /cvs-repository/Zope3/src/zope/app/session
In directory cvs.zope.org:/tmp/cvs-serv21016/src/zope/app/session
Added Files:
__init__.py browser.zcml configure.zcml interfaces.py
session.stx tests.py
Log Message:
Moved session code to zope.app.session. This ends the era of
zope.app.utilities. Added necessary module aliases for ZODBs to be preserved,
since SessionDataManagers are created in the bootstrap and every ZODB has one.
=== Added File Zope3/src/zope/app/session/__init__.py ===
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Simplistic session service implemented using cookies.
This is more of a demonstration than a full implementation, but it should
work.
$Id: __init__.py,v 1.1 2004/03/10 12:11:19 srichter Exp $
"""
import sha, time, string, random, hmac, logging
from UserDict import IterableUserDict
from heapq import heapify, heappop
from persistence import Persistent
from zope.server.http.http_date import build_http_date
from zope.interface import implements
from zope.interface.common.mapping import IMapping
from zope.app import zapi
from BTrees.OOBTree import OOBTree
from zope.app.interfaces.services.utility import ILocalUtility
from zope.app.interfaces.annotation import IAttributeAnnotatable
from interfaces import IBrowserIdManager, IBrowserId, ICookieBrowserIdManager, \
ISessionDataContainer, ISession
from zope.app.container.interfaces import IContained
cookieSafeTrans = string.maketrans("+/", "-.")
def digestEncode(s):
"""Encode SHA digest for cookie."""
return s.encode("base64")[:-2].translate(cookieSafeTrans)
class BrowserId(str):
"""A browser id"""
implements(IBrowserId)
class CookieBrowserIdManager(Persistent):
"""Session service implemented using cookies."""
implements(IBrowserIdManager, ICookieBrowserIdManager,
ILocalUtility, IAttributeAnnotatable,
)
__parent__ = __name__ = None
def __init__(self):
self.namespace = "zope3_cs_%x" % (int(time.time()) - 1000000000)
self.secret = "%.20f" % random.random()
self.cookieLifetime = None
def generateUniqueId(self):
"""Generate a new, random, unique id."""
data = "%.20f%.20f%.20f" % (random.random(), time.time(), time.clock())
digest = sha.sha(data).digest()
s = digestEncode(digest)
# we store a HMAC of the random value together with it, which makes
# our session ids unforgeable.
mac = hmac.new(s, self.secret, digestmod=sha).digest()
return BrowserId(s + digestEncode(mac))
def getRequestId(self, request):
"""Return the IBrowserId encoded in request or None if it's
non-existent."""
# If there is an id set on the response, use that but don't trust it.
# We need to check the response in case there has already been a new
# session created during the course of this request.
response_cookie = request.response.getCookie(self.namespace)
if response_cookie:
sid = response_cookie['value']
else:
sid = request.cookies.get(self.namespace)
if sid is None or len(sid) != 54:
return None
s, mac = sid[:27], sid[27:]
if (digestEncode(hmac.new(s, self.secret, digestmod=sha).digest())
!= mac):
return None
else:
return BrowserId(sid)
def setRequestId(self, request, id):
"""Set cookie with id on request."""
# XXX Currently, the path is the ApplicationURL. This is reasonable,
# and will be adequate for most purposes.
# A better path to use would be that of the folder that contains
# the service-manager this service is registered within. However,
# that would be expensive to look up on each request, and would
# have to be altered to take virtual hosting into account.
# Seeing as this service instance has a unique namespace for its
# cookie, using ApplicationURL shouldn't be a problem.
if self.cookieLifetime is not None:
if self.cookieLifetime:
expires = build_http_date(time.time() + self.cookieLifetime)
else:
expires = 'Tue, 19 Jan 2038 00:00:00 GMT'
request.response.setCookie(
self.namespace, id, expires=expires,
path=request.getApplicationURL(path_only=True)
)
else:
request.response.setCookie(
self.namespace, id,
path=request.getApplicationURL(path_only=True)
)
def getBrowserId(self, request):
''' See zope.app.interfaces.utilities.session.IBrowserIdManager '''
sid = self.getRequestId(request)
if sid is None:
sid = self.generateUniqueId()
self.setRequestId(request, sid)
return sid
class PersistentSessionDataContainer(Persistent, IterableUserDict):
''' A SessionDataContainer that stores data in the ZODB '''
__parent__ = __name__ = None
implements(ISessionDataContainer, ILocalUtility, IAttributeAnnotatable)
def __init__(self):
self.data = OOBTree()
self.sweepInterval = 5*60
def __getitem__(self, key):
rv = IterableUserDict.__getitem__(self, key)
now = time.time()
# Only update lastAccessTime once every few minutes, rather than
# every hit, to avoid ZODB bloat since this is being stored
# persistently
if rv.lastAccessTime + self.sweepInterval < now:
rv.lastAccessTime = now
# XXX: When scheduler exists, this method should just schedule
# a sweep later since we are currently busy handling a request
# and may end up doing simultaneous sweeps
self.sweep()
return rv
def sweep(self):
''' Clean out stale data '''
expire_time = time.time() - self.sweepInterval
heap = [(v.lastAccessTime, k) for k,v in self.data.items()]
heapify(heap)
while heap:
lastAccessTime, key = heappop(heap)
if lastAccessTime < expire_time:
del self.data[key]
else:
return
class SessionData(Persistent, IterableUserDict):
''' Mapping nodes in the ISessionDataContainer tree '''
implements(IMapping)
def __init__(self):
self.data = OOBTree()
self.lastAccessTime = time.time()
class Session(IterableUserDict):
implements(ISession)
def __init__(self, data_manager, browser_id, product_id):
''' See zope.app.interfaces.utilities.session.ISession '''
browser_id = str(browser_id)
product_id = str(product_id)
try:
data = data_manager[browser_id]
except KeyError:
data_manager[browser_id] = SessionData()
data_manager[browser_id][product_id] = SessionData()
self.data = data_manager[browser_id][product_id]
else:
try:
self.data = data[product_id]
except KeyError:
data[product_id] = SessionData()
self.data = data[product_id]
def getSession(context, request, product_id, session_data_container=None):
''' Retrieve an ISession. session_data_container defaults to
an ISessionDataContainer utility registered with the name product_id
XXX: This method will probably be changed when we have an
Interaction or other object that combines context & request
into a single object.
'''
if session_data_container is None:
dc = zapi.getUtility(context, ISessionDataContainer, product_id)
elif ISessionDataContainer.providedBy(session_data_container):
dc = session_data_container
else:
dc = zapi.getUtility(
context, ISessionDataContainer, session_data_container
)
bim = zapi.getUtility(context, IBrowserIdManager)
browser_id = bim.getBrowserId(request)
return Session(dc, browser_id, product_id)
=== Added File Zope3/src/zope/app/session/browser.zcml ===
<configure
xmlns:zope="http://namespaces.zope.org/zope"
xmlns="http://namespaces.zope.org/browser">
<!-- Cookie Browser Id Manager -->
<addMenuItem
title="Cookie Browser Id Manager"
description="Uses a cookie to uniquely identify a browser, allowing
state to be maintained between requests"
class=".CookieBrowserIdManager"
permission="zope.ManageContent" />
<!-- XXX: We want an add form, but namespace needs to default to a unique
cookie name
<addform
schema=".interfaces.ICookieBrowserIdManager"
label="Add a Cookie Browser ID Manager"
content_factory=".CookieBrowserIdManager"
name="zope.app.interfaces.utilities.session"
permission="zope.ManageContent" />
-->
<editform
schema=".interfaces.ICookieBrowserIdManager"
label="Cookie Browser ID Manager Properties"
name="edit.html" menu="zmi_views" title="Edit"
permission="zope.ManageContent" />
<!-- PersistentSessionDataContainer -->
<addMenuItem
title="Persistent Session Data Container"
description="Stores session data persistently in the ZODB"
class=".PersistentSessionDataContainer"
permission="zope.ManageContent" />
<editform
schema=".interfaces.ISessionDataContainer"
label="Persistent Session Data Container Properties"
name="edit.html" menu="zmi_views" title="Edit"
permission="zope.ManageContent" />
</configure>
=== Added File Zope3/src/zope/app/session/configure.zcml ===
<configure
xmlns="http://namespaces.zope.org/zope"
xmlns:browser="http://namespaces.zope.org/browser">
<!-- For backward compatibility -->
<modulealias
module="zope.app.session"
alias="zope.app.utilities.session"
/>
<modulealias
module="zope.app.session"
alias="zope.app.interfaces.utilities"
/>
<modulealias
module=".interfaces"
alias="zope.app.interfaces.utilities.session"
/>
<!-- Session machinery -->
<content class=".CookieBrowserIdManager">
<require
interface=".interfaces.ICookieBrowserIdManager"
permission="zope.Public" />
<require
set_schema=".interfaces.ICookieBrowserIdManager"
permission="zope.ManageContent" />
</content>
<content class=".SessionData">
<allow interface="zope.interface.common.mapping.IMapping" />
</content>
<content class=".PersistentSessionDataContainer">
<implements
interface=".interfaces.ISessionDataContainer"/>
<require
interface=".interfaces.ISessionDataContainer"
permission="zope.Public" />
<require
set_schema=".interfaces.ISessionDataContainer"
permission="zope.ManageContent" />
</content>
<include file="browser.zcml" />
</configure>
=== Added File Zope3/src/zope/app/session/interfaces.py ===
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Interfaces for session service.
$Id: interfaces.py,v 1.1 2004/03/10 12:11:19 srichter Exp $
"""
import re
from zope.interface import Interface
from zope.interface.common.mapping import IMapping, IReadMapping, IWriteMapping
from zope import schema
from zope.app.container.interfaces import IContainer
from zope.app.i18n import ZopeMessageIDFactory as _
class IBrowserIdManager(Interface):
"""Manages sessions - fake state over multiple browser requests."""
def getBrowserId(request):
"""Return the IBrowserId for the given request.
If the request doesn't have an attached sessionId a new one will be
generated.
This will do whatever is possible to do the HTTP request to ensure the
session id will be preserved. Depending on the specific method,
further action might be necessary on the part of the user. See the
documentation for the specific implementation and its interfaces.
"""
""" XXX: Want this
def invalidate(browser_id):
''' Expire the browser_id, and remove any matching ISessionData data
'''
"""
class ICookieBrowserIdManager(IBrowserIdManager):
"""Manages sessions using a cookie"""
namespace = schema.TextLine(
title=_('Cookie Name'),
description=_(
"Name of cookie used to maintain state. "
"Must be unique to the site domain name, and only contain "
"ASCII letters, digits and '_'"
),
required=True,
min_length=1,
max_length=30,
constraint=re.compile("^[\d\w_]+$").search,
)
cookieLifetime = schema.Int(
title=_('Cookie Lifetime'),
description=_(
"Number of seconds until the browser expires the cookie. "
"Leave blank expire the cookie when the browser is quit. "
"Set to 0 to never expire. "
),
min=0,
required=False,
default=None,
missing_value=None,
)
class IBrowserId(Interface):
"""A unique ID representing a session"""
def __str__():
"""As a unique ASCII string"""
class ISessionDataContainer(IReadMapping, IWriteMapping):
"""Stores data objects for sessions.
The object implementing this interface is responsible for expiring data as
it feels appropriate.
Used like::
session_data_container[browser_id][product_id][key] = value
Attempting to access a key that does not exist will raise a KeyError.
"""
timeout = schema.Int(
title=_(u"Timeout"),
description=_(
"Number of seconds before data becomes stale and may "
"be removed"),
default=3600,
required=True,
min=1,
)
sweepInterval = schema.Int(
title=_(u"Purge Interval"),
description=_(
"How often stale data is purged in seconds. "
"Higer values improve performance."
),
default=5*60,
required=True,
min=1,
)
class ISession(IMapping):
"""A session object that keeps the state of the user.
To access bits of data within an ISessionDataContainer, we
need to know the browser_id, the product_id, and the actual key.
An ISession is a wrapper around an ISessionDataContainer that
simplifies this by storing the browser_id and product_id enabling
access using just the key.
"""
def __init__(session_data_container, browser_id, product_id):
"""Construct an ISession"""
=== Added File Zope3/src/zope/app/session/session.stx ===
Session Support
---------------
Sessions allow us to fake state over a stateless protocol - HTTP. We do this
by having a unique identifier stored across multiple HTTP requests, be it
a cookie or some id mangled into the URL.
The IBrowserIdManager Utility provides this unique id. It is responsible
for propagating this id so that future requests from the browser get
the same id (eg. by setting an HTTP cookie)
ISessionDataContainer Utilities provide a mapping interface to store
session data. The ISessionDataContainer is responsible for expiring
data.
Python example::
>>> browser_id = getAdapter(request, IBrowserId))
>>> explicit_dm = getUtility(context, ISessionDataContainer,
... 'zopeproducts.fooprod')
>>> session = Session(explicit_dm, browser_id, 'zopeproducts.foorprod')
>>> session['color'] = 'red'
or....
>>> session = zapi.getSession(context, request, 'zopeproducts.fooprod')
>>> session['color'] = 'red'
Page Template example::
<tal:x condition="exists:session/zopeproducts.fooprod/count">
<tal:x condition="python:
session['zopeproducts.fooprod']['count'] += 1" />
</tal:x>
<tal:x condition="not:exists:session/zopeprodicts.fooprod/count">
<tal:x condition="python:
session['zopeproducts.fooprod']['count'] = 1 />
</tal:x>
<span content="session/zopeproducts.fooprod/count">6</span>
TODO
----
Do we want to provide one or more 'default' ISessionDataContainer's out of the
box (eg. 'persistant' and 'transient')?
=== Added File Zope3/src/zope/app/session/tests.py ===
# -*- coding: ascii -*-
##############################################################################
#
# Copyright (c) 2004 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
'''
$Id: tests.py,v 1.1 2004/03/10 12:11:19 srichter Exp $
'''
import unittest, doctest, time, rfc822
from zope.app import zapi
from zope.app.tests import ztapi
from zope.app.tests import setup
import zope.interface
from zope.app.interfaces.services.utility import ILocalUtility
from zope.app.services.utility import LocalUtilityService
from zope.app.services.servicenames import Utilities
from zope.app.interfaces.annotation import IAttributeAnnotatable
from zope.app.session.interfaces import \
IBrowserId, IBrowserIdManager, ISession, ISessionDataContainer
from zope.app.session import \
CookieBrowserIdManager, Session, SessionData, getSession, \
PersistentSessionDataContainer
from zope.publisher.interfaces.http import IHTTPRequest
from zope.publisher.http import HTTPRequest
def test_CookieBrowserIdManager():
"""
CookieBrowserIdManager.generateUniqueId should generate a unique
IBrowserId each time it is called
>>> bim = CookieBrowserIdManager()
>>> id1 = bim.generateUniqueId()
>>> id2 = bim.generateUniqueId()
>>> id1 != id2
True
>>> IBrowserId.providedBy(id1)
True
>>> IBrowserId.providedBy(id2)
True
CookieBrowserIdManager.getRequestId pulls the IBrowserId from an
IHTTPRequest, or returns None if there isn't one stored in it.
Because cookies cannnot be trusted, we confirm that they are not forged,
returning None if we have a corrupt or forged browser id.
>>> request = HTTPRequest(None, None, {}, None)
>>> bim.getRequestId(request) is None
True
>>> bim.setRequestId(request, id1)
>>> bim.getRequestId(request) == id1
True
>>> bim.setRequestId(request, 'invalid_id')
>>> bim.getRequestId(request) is None
True
Make sure that the same browser id is extracted from a cookie in
request (sent from the browser) and a cookie in request.response
(set during this transaction)
>>> request2 = HTTPRequest(None, None, {}, None)
>>> request2._cookies = request.response._cookies
>>> bim.getRequestId(request) == bim.getRequestId(request2)
True
CookieBrowserIdManager.getBrowserId pulls the IBrowserId from an
IHTTPRequest, or generates a new one and returns it after storing
it in the request.
>>> id3 = bim.getBrowserId(request)
>>> id4 = bim.getBrowserId(request)
>>> str(id3) == str(id4)
True
>>> id3 == id4
True
Confirm the path of the cookie is correct. The value being tested
for here will eventually change - it should be the URL to the
site containing the CookieBrowserIdManager
>>> cookie = request.response.getCookie(bim.namespace)
>>> cookie['path'] == request.getApplicationURL(path_only=True)
True
Confirm the expiry time of the cookie is correct.
Default is no expires.
>>> cookie.has_key('expires')
False
Expiry time of 0 means never (well - close enough)
>>> bim.cookieLifetime = 0
>>> request = HTTPRequest(None, None, {}, None)
>>> bid = bim.getBrowserId(request)
>>> cookie = request.response.getCookie(bim.namespace)
>>> cookie['expires']
'Tue, 19 Jan 2038 00:00:00 GMT'
>>> bim.cookieLifetime = 3600
>>> request = HTTPRequest(None, None, {}, None)
>>> bid = bim.getBrowserId(request)
>>> cookie = request.response.getCookie(bim.namespace)
>>> expires = time.mktime(rfc822.parsedate(cookie['expires']))
>>> expires > time.mktime(time.gmtime()) + 55*60
True
"""
def test_PersistentSessionIdContainer():
"""
Ensure mapping interface is working as expected
>>> sdc = PersistentSessionDataContainer()
>>> sdc['a']
Traceback (most recent call last):
File "<stdin>", line 1, in ?
File "/usr/python-2.3.3/lib/python2.3/UserDict.py", line 19, in __getitem__
def __getitem__(self, key): return self.data[key]
KeyError: 'a'
>>> sdc['a'] = SessionData()
>>> pdict = SessionData()
>>> sdc['a'] = pdict
>>> id(pdict) == id(sdc['a'])
True
>>> del sdc['a']
>>> sdc['a']
Traceback (most recent call last):
File "<stdin>", line 1, in ?
File "/usr/python-2.3.3/lib/python2.3/UserDict.py", line 19, in __getitem__
def __getitem__(self, key): return self.data[key]
KeyError: 'a'
>>> del sdc['a']
Traceback (most recent call last):
File "<stdin>", line 1, in ?
File "/usr/python-2.3.3/lib/python2.3/UserDict.py", line 21, in __delitem__
def __delitem__(self, key): del self.data[key]
KeyError: 'a'
Make sure stale data is removed
>>> sdc.sweepInterval = 60
>>> sdc[1], sdc[2] = sd1, sd2 = SessionData(), SessionData()
>>> ignore = sdc[1], sdc[2]
>>> sd1.lastAccessTime = sd1.lastAccessTime - 62
>>> sd2.lastAccessTime = sd2.lastAccessTime - 62
>>> ignore = sdc[1]
>>> sdc.get(2, 'stale')
'stale'
"""
def test_Session():
"""
>>> data_container = PersistentSessionDataContainer()
>>> session = Session(data_container, 'browser id', 'zopeproducts.foo')
>>> session['color']
Traceback (most recent call last):
File "<stdin>", line 1, in ?
File "zope/app/utilities/session.py", line 157, in __getitem__
return self._data[key]
KeyError: 'color'
>>> session['color'] = 'red'
>>> session['color']
'red'
And make sure no namespace conflicts...
>>> session2 = Session(data_container, 'browser id', 'zopeproducts.bar')
>>> session2['color'] = 'blue'
>>> session['color']
'red'
>>> session2['color']
'blue'
Test the rest of the dictionary interface...
>>> 'foo' in session
False
>>> 'color' in session
True
>>> session.get('size', 'missing')
'missing'
>>> session.get('color', 'missing')
'red'
>>> list(session.keys())
['color']
>>> list(session.values())
['red']
>>> list(session.items())
[('color', 'red')]
>>> len(session)
1
>>> [x for x in session]
['color']
>>> del session['color']
>>> session.get('color') is None
True
"""
def test_localutilities():
"""
Setup a placeful environment with a IBrowserIdManager
and ISessionDataContainer
>>> root = setup.placefulSetUp(site=True)
>>> setup.createStandardServices(root)
>>> sm = setup.createServiceManager(root)
>>> us = setup.addService(sm, Utilities, LocalUtilityService())
>>> idmanager = CookieBrowserIdManager()
>>> zope.interface.directlyProvides(idmanager,
... IAttributeAnnotatable, ILocalUtility)
>>> bim = setup.addUtility(
... sm, '', IBrowserIdManager, idmanager, 'test')
>>> pdc = PersistentSessionDataContainer()
>>> zope.interface.directlyProvides(pdc,
... IAttributeAnnotatable, ILocalUtility)
>>> sdc = setup.addUtility(sm, 'persistent', ISessionDataContainer, pdc)
>>> sdc = setup.addUtility(sm, 'products.foo',ISessionDataContainer, pdc)
>>> sdc = setup.addUtility(sm, 'products.bar', ISessionDataContainer, pdc)
>>> request = HTTPRequest(None, None, {}, None)
Make sure we can access utilities
>>> sdc = zapi.getUtility(root, ISessionDataContainer, 'persistent')
>>> bim = zapi.getUtility(root, IBrowserIdManager)
Make sure getSession works
>>> session1 = getSession(root, request, 'products.foo')
>>> session2 = getSession(root, request, 'products.bar', 'persistent')
>>> session3 = getSession(root, request, 'products.baz', pdc)
Make sure it returned sane values
>>> ISession.providedBy(session1)
True
>>> ISession.providedBy(session2)
True
>>> ISession.providedBy(session3)
True
Make sure that product_ids don't share a namespace
>>> session1['color'] = 'red'
>>> session2['color'] = 'blue'
>>> session1['color']
'red'
>>> session2['color']
'blue'
>>> setup.placefulTearDown()
>>> 'Thats all folks!'
'Thats all folks!'
"""
def test_suite():
return unittest.TestSuite((
doctest.DocTestSuite(),
))
if __name__ == '__main__':
unittest.main()
# vim: set filetype=python ts=4 sw=4 et si
More information about the Zope3-Checkins
mailing list