[Zope-Checkins] CVS: Products/DCOracle2/test -
test_connections.py:1.1.2.1 test_DB.py:1.1.2.1
Chris Withers
cvs-admin at zope.org
Tue Nov 4 16:38:08 EST 2003
Update of /cvs-repository/Products/DCOracle2/test
In directory cvs.zope.org:/tmp/cvs-serv21272/test
Added Files:
Tag: chrisw_fixconnectionleak_branch
test_connections.py test_DB.py
Log Message:
First pass at Thread Pool connection model.
Also better handling for errors in DB.query method.
=== Added File Products/DCOracle2/test/test_connections.py ===
import unittest
import common
def _clearPool(c):
# clear pool
for connstring,connections in c.pool.items():
for junk,conn in connections:
if conn.isOpen():
conn.cursor().close()
conn.close()
del c.pool[connstring]
# clear assigned
for id,connpair in c.assigned.items():
conn = connpair[0]
if conn is not None: # None = dummu conn
if conn.isOpen():
conn.cursor().close()
conn.close()
del c.assigned[id]
# clear next id variable
c.next_id = 0
# clear the assigned count
for key in c.connection_count.keys():
del c.connection_count[key]
class test_connections(unittest.TestCase):
# white-box tests of the connection module
def setUp(self):
import Testing
import Zope
if hasattr(Zope,'startup'):
Zope.startup()
import Products.ZOracleDA.connections
self.c = Products.ZOracleDA.connections
_clearPool(self.c)
self.cs = common.getConnectionString()
def test_getConnection(self):
from Products.ZOracleDA.DCOracle2.DCOracle2 import connection
# get connection
id = self.c.assignConnection(self.cs)
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.pool[self.cs]),0)
self.assertEqual(len(self.c.assigned.keys()),1)
self.assertEqual(id,0)
conn1 = self.c.getConnection(id)
self.failUnless(isinstance(conn1,connection))
# get second connection
id2 = self.c.assignConnection(self.cs)
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.pool[self.cs]),0)
self.assertEqual(len(self.c.assigned.keys()),2)
self.assertEqual(id2,1)
conn2 = self.c.getConnection(id2)
self.failUnless(isinstance(conn2,connection))
# make sure they're different
self.failIf(conn1 is conn2)
def test_returnConnection(self):
# assign connections
id1 = self.c.assignConnection(self.cs)
conn1 = self.c.getConnection(id1)
id2 = self.c.assignConnection(self.cs)
# return connection
self.c.returnConnection(id1)
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.assigned.keys()),1)
# re-assign conection
id3 = self.c.assignConnection(self.cs)
conn2 = self.c.getConnection(id3)
self.failUnless(conn1 is conn2)
# return both connections
self.c.returnConnection(id2)
self.c.returnConnection(id3)
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.assigned.keys()),0)
self.assertEqual(len(self.c.pool[self.cs]),2)
def test_close(self):
# setup
id1 = self.c.assignConnection(self.cs)
id2 = self.c.assignConnection(self.cs)
conn1 = self.c.getConnection(id1)
conn2 = self.c.getConnection(id2)
self.c.returnConnection(id1)
# close
self.c.close()
# next
self.failIf(conn1.isOpen())
self.failIf(conn2.isOpen())
self.failIf(self.c.pool.keys())
self.failIf(self.c.assigned.keys())
self.failIf(self.c.connection_count.keys())
self.assertEqual(self.c.next_id,0)
def test_closeSpecific(self):
# setup
id1 = self.c.assignConnection(self.cs)
conn1 = self.c.getConnection(id1)
# setup dummy keys
self.c.pool['dummy']=[]
self.c.assigned[666]=(None,'dummy')
self.c.connection_count['dummy']=1
# close
self.c.close(self.cs)
# check one connection is gone, but other key remains
self.failIf(conn1.isOpen())
self.c.pool['dummy']
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(self.c.assigned[666],(None,'dummy'))
self.assertEqual(len(self.c.assigned.keys()),1)
self.assertEqual(self.c.connection_count['dummy'],1)
self.assertEqual(len(self.c.connection_count.keys()),1)
self.assertEqual(self.c.next_id,1)
def test_closeSpecificBad(self):
# call close with a string that has no open connections
# should not error...
self.c.close(self.cs)
def test_closeSpecificID(self):
# setup
id1 = self.c.assignConnection(self.cs)
conn1 = self.c.getConnection(id1)
id2 = self.c.assignConnection(self.cs)
conn2 = self.c.getConnection(id2)
id3 = self.c.assignConnection(self.cs)
conn3 = self.c.getConnection(id3)
self.c.returnConnection(id2)
# setup dummy keys
self.c.pool['dummy']=[]
self.c.assigned[666]=(None,'dummy')
self.c.connection_count['dummy']=1
# close
self.c.close(self.cs,id1)
self.c.close(self.cs,id3)
# check one connection is gone, but other 2 remain
self.failIf(conn1.isOpen())
self.failUnless(conn2.isOpen())
self.failIf(conn3.isOpen())
self.c.pool['dummy']
csp = self.c.pool[self.cs]
self.assertEqual(len(csp),1)
self.failUnless(csp[0]) is conn2
self.assertEqual(len(self.c.pool.keys()),2)
self.assertEqual(self.c.assigned[666],(None,'dummy'))
self.failIf(self.c.assigned.has_key(id3))
self.assertEqual(len(self.c.assigned.keys()),1)
self.assertEqual(self.c.connection_count['dummy'],1)
self.assertEqual(self.c.connection_count[self.cs],1)
self.assertEqual(len(self.c.connection_count.keys()),2)
self.assertEqual(self.c.next_id,3)
def test_count_assigned(self):
self.assertEqual(self.c.countConnections(self.cs),0)
id1 = self.c.assignConnection(self.cs)
self.assertEqual(self.c.countConnections(self.cs),1)
id2 = self.c.assignConnection(self.cs)
self.assertEqual(self.c.countConnections(self.cs),2)
self.c.returnConnection(id1)
self.assertEqual(self.c.countConnections(self.cs),2)
self.c.returnConnection(id2)
self.assertEqual(self.c.countConnections(self.cs),2)
self.c.assignConnection(self.cs)
self.c.assignConnection(self.cs)
self.assertEqual(self.c.countConnections(self.cs),2)
self.c.close()
self.assertEqual(self.c.countConnections(self.cs),0)
def test_conn_gone_away(self):
from Products.ZOracleDA.exceptions import ConnectionReleasedError
id = self.c.assignConnection(self.cs)
self.c.returnConnection(id)
self.assertRaises(ConnectionReleasedError,self.c.getConnection,id)
def test_no_conn_id(self):
from Products.ZOracleDA.exceptions import NoConnectionError
self.assertRaises(NoConnectionError,self.c.getConnection,None)
def test_conn_gone_return(self):
from Products.ZOracleDA.exceptions import ConnectionReleasedError
id = self.c.assignConnection(self.cs)
self.c.returnConnection(id)
self.assertRaises(ConnectionReleasedError,self.c.returnConnection,id)
def test_no_conn_id_return(self):
from Products.ZOracleDA.exceptions import NoConnectionError
self.assertRaises(NoConnectionError,self.c.returnConnection,None)
if __name__ == '__main__':
unittest.main()
=== Added File Products/DCOracle2/test/test_DB.py ===
import common
import unittest
class test_DB(unittest.TestCase):
def setUp(self):
import Testing
import Zope
if hasattr(Zope,'startup'):
Zope.startup()
from Products.ZOracleDA.db import DB
from Products.ZOracleDA import connections
from test_connections import _clearPool
_clearPool(connections)
self.DB = DB
self.c = connections
self.cs = common.getConnectionString()
def test_hosed_db(self):
DB = self.DB(self.cs)
# get connection
DB._begin()
conn = self.c.getConnection(DB._v_connection_id)
# hose it
def hosed_getDB():
from Products.ZOracleDA.DCOracle2 import DatabaseError
raise DatabaseError(3114,'ORA-03114: not connected to ORACLE')
DB.getDB=hosed_getDB
# query
from ZPublisher import Retry
self.assertRaises(Retry,DB.query,'SELECT * from EMP')
self.failIf(conn.isOpen())
# check the conn is dead and burried
for junk,theConn in self.c.pool.get(self.cs,[]):
self.failIf(conn is theConn)
for theConn,junk in self.c.assigned.values():
self.failIf(conn is theConn)
self.assertEqual(self.c.connection_count.get(self.cs,0),0)
def test_close_on_del(self,close=0):
DB = self.DB(self.cs)
# get connection id
DB._begin()
id = DB._v_connection_id
# close it
if close:
DB.close()
# make sure connection id is empty
self.failUnless(DB._v_connection_id is None,DB._v_connection_id)
else:
DB = None
# make sure connection has been given back to the pool
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.pool[self.cs]),1)
self.assertEqual(len(self.c.assigned.keys()),0)
def test_close_on_close(self):
self.test_close_on_del(1)
def test_close_on_del_nc(self, close=0):
# check we don't get errors closing if we have no connection
DB = self.DB(self.cs)
# close it
if close:
DB.close()
# make sure connection id is empty
self.failUnless(DB._v_connection_id is None)
else:
DB = None
def test_close_on_close_nc(self):
self.test_close_on_del_nc(1)
def test_commit(self):
DB = self.DB(self.cs)
DB._register()
# check we now have an id
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.pool[self.cs]),0)
self.assertEqual(len(self.c.assigned.keys()),1)
# check this matches a DB connection
self.assertEqual(DB._v_connection_id,0)
DB.tpc_begin()
DB.tpc_vote()
DB.tpc_finish()
# check the id is None
self.assertEqual(DB._v_connection_id,None)
# check DB connection is in pool
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.assigned.keys()),0)
self.assertEqual(len(self.c.pool[self.cs]),1)
def test_abort(self):
DB = self.DB(self.cs)
DB._register()
# check we now have an id
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.pool[self.cs]),0)
self.assertEqual(len(self.c.assigned.keys()),1)
# check this matches a DB connection
self.assertEqual(DB._v_connection_id,0)
DB.abort()
# check the id is None
self.assertEqual(DB._v_connection_id,None)
# check DB connection is in pool
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.assigned.keys()),0)
self.assertEqual(len(self.c.pool[self.cs]),1)
def test_tpc_abort(self):
DB = self.DB(self.cs)
DB._register()
# check we now have an id
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.pool[self.cs]),0)
self.assertEqual(len(self.c.assigned.keys()),1)
# check this matches a DB connection
self.assertEqual(DB._v_connection_id,0)
DB.tpc_begin()
DB.tpc_vote()
DB.tpc_abort()
# check the id is None
self.assertEqual(DB._v_connection_id,None)
# check DB connection is in pool
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.assigned.keys()),0)
self.assertEqual(len(self.c.pool[self.cs]),1)
def test_concurrent(self):
DB1 = self.DB(self.cs)
DB1._register()
DB2 = self.DB(self.cs)
DB2._register()
# check we now have an id
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.pool[self.cs]),0)
self.assertEqual(len(self.c.assigned.keys()),2)
# check this matches a DB connection
self.assertEqual(DB1._v_connection_id,0)
self.assertEqual(DB2._v_connection_id,1)
# check they're different connections
conn1 = self.c.getConnection(DB1._v_connection_id)
conn2 = self.c.getConnection(DB2._v_connection_id)
self.failIf(conn1 is conn2)
def test_concurrent_abort(self):
DB1 = self.DB(self.cs)
DB1._register()
DB2 = self.DB(self.cs)
DB2._register()
# check we now have an id
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.pool[self.cs]),0)
self.assertEqual(len(self.c.assigned.keys()),2)
# abort 1
DB1.abort()
# check things are split
self.assertEqual(len(self.c.pool.keys()),1)
self.assertEqual(len(self.c.pool[self.cs]),1)
self.assertEqual(len(self.c.assigned.keys()),1)
if __name__ == '__main__':
unittest.main()
More information about the Zope-Checkins
mailing list