[Zope3-checkins] SVN: Zope3/branches/Zope-3.1/src/zope/app/rdb/
Merged revision 37757 from the trunk:
Dmitry Vasiliev
"dima athlabs.spb.ru" at zope.org
Sat Aug 6 08:42:55 EDT 2005
Log message for revision 37758:
Merged revision 37757 from the trunk:
Unicode SQL operation parameters (passed as a tuple, list, or dict) are now encoded with the connection's encoding before the operation is executed.
Changed:
U Zope3/branches/Zope-3.1/src/zope/app/rdb/__init__.py
U Zope3/branches/Zope-3.1/src/zope/app/rdb/tests/stubs.py
U Zope3/branches/Zope-3.1/src/zope/app/rdb/tests/test_zopecursor.py
-=-
Modified: Zope3/branches/Zope-3.1/src/zope/app/rdb/__init__.py
===================================================================
--- Zope3/branches/Zope-3.1/src/zope/app/rdb/__init__.py 2005-08-06 12:34:43 UTC (rev 37757)
+++ Zope3/branches/Zope-3.1/src/zope/app/rdb/__init__.py 2005-08-06 12:42:55 UTC (rev 37758)
@@ -286,15 +286,27 @@
def execute(self, operation, parameters=None):
"""Executes an operation, registering the underlying
connection with the transaction system. """
-
- if isinstance(operation, unicode):
- encoding = self.connection.getTypeInfo().getEncoding()
- operation = operation.encode(encoding)
+ operation, parameters = self._prepareOperation(operation, parameters)
self.connection.registerForTxn()
if parameters is None:
return self.cursor.execute(operation)
return self.cursor.execute(operation, parameters)
+ def _prepareOperation(self, operation, parameters):
+ encoding = self.connection.getTypeInfo().getEncoding()
+ if isinstance(operation, unicode):
+ operation = operation.encode(encoding)
+ if isinstance(parameters, (tuple, list)):
+ parameters = list(parameters)
+ for i, v in enumerate(parameters):
+ if isinstance(v, unicode):
+ parameters[i] = v.encode(encoding)
+ elif isinstance(parameters, dict):
+ for k, v in parameters.items():
+ if isinstance(v, unicode):
+ parameters[k] = v.encode(encoding)
+ return operation, parameters
+
def __getattr__(self, key):
return getattr(self.cursor, key)
Modified: Zope3/branches/Zope-3.1/src/zope/app/rdb/tests/stubs.py
===================================================================
--- Zope3/branches/Zope-3.1/src/zope/app/rdb/tests/stubs.py 2005-08-06 12:34:43 UTC (rev 37757)
+++ Zope3/branches/Zope-3.1/src/zope/app/rdb/tests/stubs.py 2005-08-06 12:42:55 UTC (rev 37758)
@@ -47,5 +47,11 @@
threadsafety = 0
encoding = 'utf-8'
+ def setEncoding(self, encoding):
+ self.encoding = encoding
+
+ def getEncoding(self):
+ return self.encoding
+
def getConverter(self, type):
return lambda x: x
Modified: Zope3/branches/Zope-3.1/src/zope/app/rdb/tests/test_zopecursor.py
===================================================================
--- Zope3/branches/Zope-3.1/src/zope/app/rdb/tests/test_zopecursor.py 2005-08-06 12:34:43 UTC (rev 37757)
+++ Zope3/branches/Zope-3.1/src/zope/app/rdb/tests/test_zopecursor.py 2005-08-06 12:42:55 UTC (rev 37758)
@@ -20,6 +20,7 @@
from zope.app.rdb import ZopeCursor
from zope.app.rdb.tests.stubs import *
+
class MyConnectionStub(ConnectionStub):
def cursor(self):
return MyCursorStub()
@@ -42,6 +43,10 @@
description = ((None, 'string'), (None, 'int'), (None, 'foo'))
+ def execute(self, query, args=None):
+ self.query = query
+ self.args = args
+
def fetchone(self):
if self._raw:
return self._raw[0]
@@ -85,7 +90,8 @@
class ZopeCursorTests(TestCase):
def setUp(self):
- zc = ZopeConnection(MyConnectionStub(), MyTypeInfoStub())
+ self.typeInfo = MyTypeInfoStub()
+ zc = ZopeConnection(MyConnectionStub(), self.typeInfo)
self.cursor = ZopeCursor(zc.conn.cursor(), zc)
def test_cursor_fetchone(self):
@@ -119,7 +125,34 @@
'got %r,\n'
'expected %r' % (results, expected))
+ def test_cursor_query_encoding(self):
+ self.cursor.execute(u'\u0422\u0435\u0441\u0442')
+ self.assertEqual('\xd0\xa2\xd0\xb5\xd1\x81\xd1\x82',
+ self.cursor.cursor.query)
+ self.typeInfo.setEncoding("windows-1251")
+ self.cursor.execute(u'\u0422\u0435\u0441\u0442')
+ self.assertEqual('\xd2\xe5\xf1\xf2', self.cursor.cursor.query)
+
+ def test_cursor_tuple_args_encoding(self):
+ self.typeInfo.setEncoding("windows-1251")
+ self.cursor.execute("SELECT * FROM table",
+ (u'\u0422\u0435\u0441\u0442',))
+ self.assertEqual('\xd2\xe5\xf1\xf2', self.cursor.cursor.args[0])
+
+ def test_cursor_list_args_encoding(self):
+ self.typeInfo.setEncoding("windows-1251")
+ self.cursor.execute("SELECT * FROM table",
+ [u'\u0422\u0435\u0441\u0442'])
+ self.assertEqual('\xd2\xe5\xf1\xf2', self.cursor.cursor.args[0])
+
+ def test_cursor_dict_args_encoding(self):
+ self.typeInfo.setEncoding("windows-1251")
+ self.cursor.execute("SELECT * FROM table",
+ {"value": u'\u0422\u0435\u0441\u0442'})
+ self.assertEqual('\xd2\xe5\xf1\xf2', self.cursor.cursor.args["value"])
+
+
def test_suite():
return makeSuite(ZopeCursorTests)
More information about the Zope3-Checkins mailing list