[Zope3-checkins] SVN: Zope3/trunk/src/zope/app/sqlexpr/ Updated
package.
Stephan Richter
srichter at cosmos.phy.tufts.edu
Tue Aug 24 17:18:55 EDT 2004
Log message for revision 27255:
Updated package.
Made tests package a module.
Handle many more user input errors than before.
Added tests to check for all failure cases.
Changed:
U Zope3/trunk/src/zope/app/sqlexpr/sqlexpr.py
D Zope3/trunk/src/zope/app/sqlexpr/tests/
A Zope3/trunk/src/zope/app/sqlexpr/tests.py
-=-
Modified: Zope3/trunk/src/zope/app/sqlexpr/sqlexpr.py
===================================================================
--- Zope3/trunk/src/zope/app/sqlexpr/sqlexpr.py 2004-08-24 20:33:03 UTC (rev 27254)
+++ Zope3/trunk/src/zope/app/sqlexpr/sqlexpr.py 2004-08-24 21:18:55 UTC (rev 27255)
@@ -15,71 +15,60 @@
$Id$
"""
-import re
+from zope.component.exceptions import ComponentLookupError
from zope.interface import implements
-from zope.component import getService, createObject
-from zope.app.rdb import queryForResults
from zope.tales.interfaces import ITALESExpression
-from zope.tales.expressions import NAME_RE
+from zope.tales.expressions import StringExpr
+from zope.app import zapi
+from zope.app.exception.interfaces import UserError
+from zope.app.rdb import queryForResults
+from zope.app.rdb.interfaces import IZopeDatabaseAdapter, IZopeConnection
-_interp = re.compile(r'\$(%(n)s)|\${(%(n)s(?:/[^}]*)*)}' % {'n': NAME_RE})
+class ConnectionError(UserError):
+ """This exception is raised when the user did not specify an RDB
+ connection."""
-class NoConnectionSpecified(Exception):
- pass
-
-class SQLExpr(object):
+class SQLExpr(StringExpr):
"""SQL Expression Handler class"""
- implements(ITALESExpression)
- def __init__(self, name, expr, engine):
- # Completely taken from StringExpr
- self._s = expr
- if '%' in expr:
- expr = expr.replace('%', '%%')
- self._vars = vars = []
- if '$' in expr:
- # Use whatever expr type is registered as "path".
- path_type = engine.getTypes()['path']
- parts = []
- for exp in expr.split('$$'):
- if parts: parts.append('$')
- m = _interp.search(exp)
- while m is not None:
- parts.append(exp[:m.start()])
- parts.append('%s')
- vars.append(path_type(
- 'path', m.group(1) or m.group(2), engine))
- exp = exp[m.end():]
- m = _interp.search(exp)
- if '$' in exp:
- raise CompilerError, (
- '$ must be doubled or followed by a simple path')
- parts.append(exp)
- expr = ''.join(parts)
- self._expr = expr
-
def __call__(self, econtext):
- vvals = []
- for var in self._vars:
- v = var(econtext)
- if isinstance(v, (str, unicode)):
- v = sql_quote(v)
- vvals.append(v)
-
if econtext.vars.has_key('sql_conn'):
# TODO: It is hard-coded that the connection name variable is called
# 'sql_conn'. We should find a better solution.
conn_name = econtext.vars['sql_conn']
- connection_service = getService("SQLDatabaseConnections",
- econtext.context)
- connection = connection_service.getConnection(conn_name)
+ adapter = zapi.queryUtility(IZopeDatabaseAdapter, conn_name)
+ if adapter is None:
+ raise ConnectionError, \
+ ("The RDB DA name, '%s' you specified is not "
+ "valid." %conn_name)
+ connection = adapter()
elif econtext.vars.has_key('rdb') and econtext.vars.has_key('dsn'):
rdb = econtext.vars['rdb']
dsn = econtext.vars['dsn']
- connection = createObject(None, rdb, dsn)()
+ try:
+ connection = zapi.createObject(None, rdb, dsn)()
+ except ComponentLookupError:
+ raise ConnectionError, \
+ ("The factory id, '%s', you specified in the `rdb` "
+ "attribute did not match any registered factory." %rdb)
+ except TypeError:
+ raise ConnectionError, \
+ ("The factory id, '%s', you specifed did not create a "
+ "Zope Database Adapter component." %rdb)
+ if not IZopeConnection.providedBy(connection):
+ raise ConnectionError, \
+ ("The factory id, '%s', you specifed did not create a "
+ "Zope Database Adapter component." %rdb)
else:
- raise NoConnectionSpecified
+ raise ConnectionError, \
+ 'You did not specify a RDB connection.'
+ vvals = []
+ for var in self._vars:
+ v = var(econtext)
+ if isinstance(v, (str, unicode)):
+ v = sql_quote(v)
+ vvals.append(v)
query = self._expr % tuple(vvals)
return queryForResults(connection, query)
Added: Zope3/trunk/src/zope/app/sqlexpr/tests.py
===================================================================
--- Zope3/trunk/src/zope/app/sqlexpr/tests.py 2004-08-24 20:33:03 UTC (rev 27254)
+++ Zope3/trunk/src/zope/app/sqlexpr/tests.py 2004-08-24 21:18:55 UTC (rev 27255)
@@ -0,0 +1,142 @@
+##############################################################################
+#
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""SQL Expression Type Tests
+
+$Id: test_sqlexpr.py 26878 2004-08-03 16:25:34Z jim $
+"""
+import unittest
+
+from zope.interface import implements
+from zope.component.factory import Factory
+from zope.component.interfaces import IFactory
+from zope.component.tests.placelesssetup import PlacelessSetup
+from zope.tales.tests.test_expressions import Data
+from zope.tales.engine import Engine
+
+from zope.app.tests import ztapi
+from zope.app.rdb.interfaces import IZopeDatabaseAdapter, IZopeConnection
+from zope.app.rdb.tests.stubs import ConnectionStub
+from zope.app.sqlexpr.sqlexpr import SQLExpr, ConnectionError
+
+
+class AdapterStub(object):
+ implements(IZopeDatabaseAdapter)
+
+ def __init__(self, dsn):
+ return
+
+ def __call__(self):
+ return ConnectionStub()
+
+class ConnectionStub(object):
+ implements(IZopeConnection)
+
+ def __init__(self):
+ self._called = {}
+
+ def cursor(self):
+ return CursorStub()
+
+ def answer(self):
+ return 42
+
+ def commit(self, *ignored):
+ v = self._called.setdefault('commit',0)
+ v += 1
+ self._called['commit'] = v
+
+ def rollback(self, *ignored):
+ v = self._called.setdefault('rollback',0)
+ v += 1
+ self._called['rollback'] = v
+
+class CursorStub(object):
+
+ description = (('id', 0, 0, 0, 0, 0, 0),
+ ('name', 0, 0, 0, 0, 0, 0),
+ ('email', 0, 0, 0, 0, 0, 0))
+
+
+ def fetchall(self, *args, **kw):
+ return ((1, 'Stephan', 'srichter'),
+ (2, 'Foo Bar', 'foobar'))
+
+ def execute(self, operation, *args, **kw):
+ if operation != 'SELECT num FROM hitchhike':
+ raise AssertionError(operation, 'SELECT num FROM hitchhike')
+
+
+class TypeInfoStub(object):
+ paramstyle = 'pyformat'
+ threadsafety = 0
+ def getConverter(self, type):
+ return lambda x: x
+
+
+class SQLExprTest(PlacelessSetup, unittest.TestCase):
+
+ def setUp(self):
+ super(SQLExprTest, self).setUp()
+ ztapi.provideUtility(IFactory, Factory(AdapterStub),
+ 'zope.da.Stub')
+ ztapi.provideUtility(IFactory, Factory(lambda x: None),
+ 'zope.Fake')
+ ztapi.provideUtility(IZopeDatabaseAdapter, AdapterStub(''),
+ 'test')
+
+ def test_exprUsingRDBAndDSN(self):
+ context = Data(vars = {'rdb': 'zope.da.Stub', 'dsn': 'dbi://test'})
+ expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+ result = expr(context)
+ self.assertEqual(1, result[0].id)
+ self.assertEqual('Stephan', result[0].name)
+ self.assertEqual('srichter', result[0].email)
+ self.assertEqual('Foo Bar', result[1].name)
+
+ def test_exprUsingSQLConn(self):
+ context = Data(vars = {'sql_conn': 'test'})
+ expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+ result = expr(context)
+ self.assertEqual(1, result[0].id)
+ self.assertEqual('Stephan', result[0].name)
+ self.assertEqual('srichter', result[0].email)
+ self.assertEqual('Foo Bar', result[1].name)
+
+ def test_exprUsingRDBAndDSN_InvalidFactoryId(self):
+ context = Data(vars = {'rdb': 'zope.da.Stub1', 'dsn': 'dbi://test'})
+ expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+ self.assertRaises(ConnectionError, expr, context)
+
+ def test_exprUsingRDBAndDSN_WrongFactory(self):
+ context = Data(vars = {'rdb': 'zope.Fake', 'dsn': 'dbi://test'})
+ expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+ self.assertRaises(ConnectionError, expr, context)
+
+ def test_exprUsingSQLConn_WrongId(self):
+ context = Data(vars = {'sql_conn': 'test1'})
+ expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+ self.assertRaises(ConnectionError, expr, context)
+
+ def test_noRDBSpecs(self):
+ expr = SQLExpr('name', 'SELECT num FROM hitchhike', Engine)
+ self.assertRaises(ConnectionError, expr, Data(vars={}))
+
+
+def test_suite():
+ return unittest.TestSuite((
+ unittest.makeSuite(SQLExprTest),
+ ))
+
+if __name__ == '__main__':
+ unittest.main(defaultTest='test_suite')
More information about the Zope3-Checkins
mailing list