[Zope3-checkins] CVS: Zope3/src/zope/app/rdb - meta.zcml:1.1 metaconfigure.py:1.1 __init__.py:1.20
Sidnei da Silva
sidnei@x3ng.com.br
Mon, 7 Jul 2003 13:15:32 -0400
Update of /cvs-repository/Zope3/src/zope/app/rdb
In directory cvs.zope.org:/tmp/cvs-serv17331/src/zope/app/rdb
Modified Files:
__init__.py
Added Files:
meta.zcml metaconfigure.py
Log Message:
Adding GlobalConnectionService, with tests (few, but theres not much to test anyway)
=== Added File Zope3/src/zope/app/rdb/meta.zcml ===
<zopeConfigure xmlns='http://namespaces.zope.org/zope'>
<serviceType id='SQLDatabaseConnections'
interface='zope.app.interfaces.rdb.IGlobalConnectionService' />
<service serviceType='SQLDatabaseConnections'
permission='zope.ManageContent'
component='zope.app.rdb.connectionService' />
<directives namespace="http://namespaces.zope.org/rdb">
<directive name="provideConnection" attributes="name component dsn"
handler="zope.app.rdb.metaconfigure.connectionhandler" />
</directives>
</zopeConfigure>
=== Added File Zope3/src/zope/app/rdb/metaconfigure.py ===
from zope.component import getService
from zope.configuration.action import Action
from zope.app.services.servicenames import SQLDatabaseConnections
def connectionhandler(_context, name, component, dsn):
component = _context.resolve(component)
connection = component(dsn)
return [
Action(
discriminator = ('provideConnection', name),
callable = provideConnection,
args = (name, connection),
)
]
def provideConnection(name, connection):
getService(None, SQLDatabaseConnections).provideConnection(name, connection)
=== Zope3/src/zope/app/rdb/__init__.py 1.19 => 1.20 ===
--- Zope3/src/zope/app/rdb/__init__.py:1.19 Fri Jul 4 09:52:27 2003
+++ Zope3/src/zope/app/rdb/__init__.py Mon Jul 7 13:14:55 2003
@@ -36,6 +36,7 @@
from zope.app.interfaces.rdb import IResultSet, ISQLCommand
from zope.app.interfaces.rdb import IZopeConnection, IZopeCursor
from zope.app.interfaces.rdb import IZopeDatabaseAdapter
+from zope.app.interfaces.rdb import IGlobalConnectionService
from zope.app.component.nextservice import getNextService
@@ -411,3 +412,47 @@
klass_namespace['__slots__'] = tuple(columns)
return type('GeneratedRowClass', (Row,), klass_namespace)
+
+class GlobalConnectionService:
+
+ implements(IGlobalConnectionService)
+
+ def __init__(self):
+ self.__registry = {}
+
+ def getConnection(self, name):
+ """Returns a connection object by name."""
+ dbadapter = self.__registry[name]
+ if dbadapter is not None:
+ return dbadapter()
+ raise KeyError, name
+
+ def queryConnection(self, name, default=None):
+ """Returns a connection object by name or default."""
+ try:
+ return self.getConnection(name)
+ except KeyError:
+ return default
+
+ def getAvailableConnections(self):
+ """Returns the connections available from this connection service."""
+ return self.__registry.keys()
+
+ def provideConnection(self, name, connection):
+ """ Register a connection instance for site-wide use """
+ self.__registry[name] = connection
+
+ _clear = __init__
+
+connectionService = GlobalConnectionService()
+getConnection = connectionService.getConnection
+queryConnection = connectionService.queryConnection
+getAvailableConnections = connectionService.getAvailableConnections
+provideConnection = connectionService.provideConnection
+
+_clear = connectionService._clear
+
+# Register our cleanup with Testing.CleanUp to make writing unit tests simpler.
+from zope.testing.cleanup import addCleanUp
+addCleanUp(_clear)
+del addCleanUp