[Zope-Checkins] CVS: Products/DCOracle2/test - test_connections.py:1.1.2.1 test_DB.py:1.1.2.1

Chris Withers cvs-admin at zope.org
Tue Nov 4 16:38:08 EST 2003


Update of /cvs-repository/Products/DCOracle2/test
In directory cvs.zope.org:/tmp/cvs-serv21272/test

Added Files:
      Tag: chrisw_fixconnectionleak_branch
	test_connections.py test_DB.py 
Log Message:
First pass at Thread Pool connection model.
Also better handling for errors in DB.query method.


=== Added File Products/DCOracle2/test/test_connections.py ===
import unittest
import common

def _closeIfOpen(conn):
    # Close a connection that still reports itself open, closing a
    # cursor obtained from it first; anything else is left untouched.
    if conn.isOpen():
        conn.cursor().close()
        conn.close()


def _clearPool(c):
    """Reset the bookkeeping of the connections module `c` in place.

    Closes every open connection found in `c.pool` and `c.assigned`,
    removes all entries from `c.pool`, `c.assigned` and
    `c.connection_count`, and sets `c.next_id` back to 0.

    `c.pool` maps a connection string to a sequence of (junk, conn)
    pairs; `c.assigned` maps an id to a sequence whose first item is
    the connection, or None for a dummy entry.
    """
    # Every loop below walks a snapshot (list(...)) because entries are
    # deleted while looping: deleting from a dict while iterating over a
    # live view of it raises RuntimeError on Python 3.

    # clear pool
    for connstring, connections in list(c.pool.items()):
        for junk, conn in connections:
            _closeIfOpen(conn)
        del c.pool[connstring]
    # clear assigned
    for conn_id, connpair in list(c.assigned.items()):
        conn = connpair[0]
        if conn is not None:  # None = dummy conn
            _closeIfOpen(conn)
        del c.assigned[conn_id]
    # clear next id variable
    c.next_id = 0
    # clear the assigned count
    for key in list(c.connection_count.keys()):
        del c.connection_count[key]
    
class test_connections(unittest.TestCase):

    # White-box tests of the connection module: they read and poke its
    # pool, assigned, connection_count and next_id bookkeeping directly.
    #
    # Test methods are documented with comments rather than docstrings
    # so that unittest keeps reporting them by method name.

    def setUp(self):
        # Start Zope when it offers a startup() hook, then begin each
        # test from an emptied connection module.
        import Testing
        import Zope
        if hasattr(Zope, 'startup'):
            Zope.startup()
        import Products.ZOracleDA.connections
        self.c = Products.ZOracleDA.connections
        _clearPool(self.c)
        self.cs = common.getConnectionString()

    def test_getConnection(self):
        # Two assignments are expected to yield ids 0 and 1 and two
        # distinct connection objects, with nothing left in the pool.
        from Products.ZOracleDA.DCOracle2.DCOracle2 import connection
        # get connection
        id1 = self.c.assignConnection(self.cs)
        self.assertEqual(len(self.c.pool.keys()), 1)
        self.assertEqual(len(self.c.pool[self.cs]), 0)
        self.assertEqual(len(self.c.assigned.keys()), 1)
        self.assertEqual(id1, 0)
        conn1 = self.c.getConnection(id1)
        self.failUnless(isinstance(conn1, connection))
        # get second connection
        id2 = self.c.assignConnection(self.cs)
        self.assertEqual(len(self.c.pool.keys()), 1)
        self.assertEqual(len(self.c.pool[self.cs]), 0)
        self.assertEqual(len(self.c.assigned.keys()), 2)
        self.assertEqual(id2, 1)
        conn2 = self.c.getConnection(id2)
        self.failUnless(isinstance(conn2, connection))
        # make sure they're different
        self.failIf(conn1 is conn2)

    def test_returnConnection(self):
        # A returned connection is expected to be handed out again by
        # the next assignment for the same connection string.
        # assign connections
        id1 = self.c.assignConnection(self.cs)
        conn1 = self.c.getConnection(id1)
        id2 = self.c.assignConnection(self.cs)
        # return connection
        self.c.returnConnection(id1)
        self.assertEqual(len(self.c.pool.keys()), 1)
        self.assertEqual(len(self.c.assigned.keys()), 1)
        # re-assign connection
        id3 = self.c.assignConnection(self.cs)
        conn2 = self.c.getConnection(id3)
        self.failUnless(conn1 is conn2)
        # return both connections
        self.c.returnConnection(id2)
        self.c.returnConnection(id3)
        self.assertEqual(len(self.c.pool.keys()), 1)
        self.assertEqual(len(self.c.assigned.keys()), 0)
        self.assertEqual(len(self.c.pool[self.cs]), 2)

    def test_close(self):
        # close() without arguments is expected to close pooled and
        # assigned connections alike and to reset all bookkeeping.
        # setup: conn1 goes back to the pool, conn2 stays assigned
        id1 = self.c.assignConnection(self.cs)
        id2 = self.c.assignConnection(self.cs)
        conn1 = self.c.getConnection(id1)
        conn2 = self.c.getConnection(id2)
        self.c.returnConnection(id1)
        # close
        self.c.close()
        # both connections are closed and nothing is left behind
        self.failIf(conn1.isOpen())
        self.failIf(conn2.isOpen())
        self.failIf(self.c.pool.keys())
        self.failIf(self.c.assigned.keys())
        self.failIf(self.c.connection_count.keys())
        self.assertEqual(self.c.next_id, 0)

    def test_closeSpecific(self):
        # close(connstring) is expected to affect only that connection
        # string and to leave entries for other strings alone.
        # setup
        id1 = self.c.assignConnection(self.cs)
        conn1 = self.c.getConnection(id1)
        # setup dummy keys
        self.c.pool['dummy'] = []
        self.c.assigned[666] = (None, 'dummy')
        self.c.connection_count['dummy'] = 1
        # close
        self.c.close(self.cs)
        # check one connection is gone, but other key remains
        self.failIf(conn1.isOpen())
        self.failUnless(self.c.pool.has_key('dummy'))
        self.assertEqual(len(self.c.pool.keys()), 1)
        self.assertEqual(self.c.assigned[666], (None, 'dummy'))
        self.assertEqual(len(self.c.assigned.keys()), 1)
        self.assertEqual(self.c.connection_count['dummy'], 1)
        self.assertEqual(len(self.c.connection_count.keys()), 1)
        self.assertEqual(self.c.next_id, 1)

    def test_closeSpecificBad(self):
        # call close with a string that has no open connections
        # should not error...
        self.c.close(self.cs)

    def test_closeSpecificID(self):
        # close(connstring, id) is expected to close just the assigned
        # connection with that id.
        # setup: conn1 and conn3 stay assigned, conn2 is returned
        id1 = self.c.assignConnection(self.cs)
        conn1 = self.c.getConnection(id1)
        id2 = self.c.assignConnection(self.cs)
        conn2 = self.c.getConnection(id2)
        id3 = self.c.assignConnection(self.cs)
        conn3 = self.c.getConnection(id3)
        self.c.returnConnection(id2)
        # setup dummy keys
        self.c.pool['dummy'] = []
        self.c.assigned[666] = (None, 'dummy')
        self.c.connection_count['dummy'] = 1
        # close
        self.c.close(self.cs, id1)
        self.c.close(self.cs, id3)
        # check conn1 and conn3 are gone, but the pooled conn2 remains
        self.failIf(conn1.isOpen())
        self.failUnless(conn2.isOpen())
        self.failIf(conn3.isOpen())
        self.failUnless(self.c.pool.has_key('dummy'))
        csp = self.c.pool[self.cs]
        self.assertEqual(len(csp), 1)
        # _clearPool unpacks pool entries as (junk, conn) pairs, so the
        # connection is the second item of the entry.  The comparison
        # has to sit inside the call; placed outside it, only the
        # truthiness of the entry was being checked.
        self.failUnless(csp[0][1] is conn2)
        self.assertEqual(len(self.c.pool.keys()), 2)
        self.assertEqual(self.c.assigned[666], (None, 'dummy'))
        self.failIf(self.c.assigned.has_key(id3))
        self.assertEqual(len(self.c.assigned.keys()), 1)
        self.assertEqual(self.c.connection_count['dummy'], 1)
        self.assertEqual(self.c.connection_count[self.cs], 1)
        self.assertEqual(len(self.c.connection_count.keys()), 2)
        self.assertEqual(self.c.next_id, 3)

    def test_count_assigned(self):
        # countConnections is expected to count pooled as well as
        # assigned connections, and to drop to 0 after close().
        self.assertEqual(self.c.countConnections(self.cs), 0)
        id1 = self.c.assignConnection(self.cs)
        self.assertEqual(self.c.countConnections(self.cs), 1)
        id2 = self.c.assignConnection(self.cs)
        self.assertEqual(self.c.countConnections(self.cs), 2)
        self.c.returnConnection(id1)
        self.assertEqual(self.c.countConnections(self.cs), 2)
        self.c.returnConnection(id2)
        self.assertEqual(self.c.countConnections(self.cs), 2)
        # re-assigning is expected to reuse the two pooled connections
        self.c.assignConnection(self.cs)
        self.c.assignConnection(self.cs)
        self.assertEqual(self.c.countConnections(self.cs), 2)
        self.c.close()
        self.assertEqual(self.c.countConnections(self.cs), 0)

    def test_conn_gone_away(self):
        # looking up an id that was already returned should raise
        from Products.ZOracleDA.exceptions import ConnectionReleasedError
        conn_id = self.c.assignConnection(self.cs)
        self.c.returnConnection(conn_id)
        self.assertRaises(ConnectionReleasedError,
                          self.c.getConnection, conn_id)

    def test_no_conn_id(self):
        # looking up None as an id should raise
        from Products.ZOracleDA.exceptions import NoConnectionError
        self.assertRaises(NoConnectionError, self.c.getConnection, None)

    def test_conn_gone_return(self):
        # returning the same id twice should raise
        from Products.ZOracleDA.exceptions import ConnectionReleasedError
        conn_id = self.c.assignConnection(self.cs)
        self.c.returnConnection(conn_id)
        self.assertRaises(ConnectionReleasedError,
                          self.c.returnConnection, conn_id)

    def test_no_conn_id_return(self):
        # returning None as an id should raise
        from Products.ZOracleDA.exceptions import NoConnectionError
        self.assertRaises(NoConnectionError, self.c.returnConnection, None)

# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()


=== Added File Products/DCOracle2/test/test_DB.py ===
import common
import unittest

class test_DB(unittest.TestCase):

    # Tests of how DB objects obtain connections from the connections
    # module and hand them back to it.

    def setUp(self):
        # Start Zope when it offers a startup() hook, empty the
        # connection module, and remember the pieces the tests use.
        import Testing
        import Zope
        if hasattr(Zope, 'startup'):
            Zope.startup()
        from Products.ZOracleDA.db import DB
        from Products.ZOracleDA import connections
        from test_connections import _clearPool
        _clearPool(connections)
        self.DB = DB
        self.c = connections
        self.cs = common.getConnectionString()

    # -- small helpers for reading the connection module's bookkeeping --

    def _poolKeys(self):
        # number of connection strings known to the pool
        return len(self.c.pool.keys())

    def _pooled(self):
        # number of connections pooled for our connection string
        return len(self.c.pool[self.cs])

    def _assigned(self):
        # number of connections currently handed out
        return len(self.c.assigned.keys())

    def test_hosed_db(self):
        db = self.DB(self.cs)

        # grab the connection the DB object is working with
        db._begin()
        conn = self.c.getConnection(db._v_connection_id)

        # hose it: every getDB() call now fails with a DatabaseError
        def hosed_getDB():
            from Products.ZOracleDA.DCOracle2 import DatabaseError
            raise DatabaseError(3114,'ORA-03114: not connected to ORACLE')
        db.getDB = hosed_getDB

        # a query should now ask for a retry and close the connection
        from ZPublisher import Retry
        self.assertRaises(Retry, db.query, 'SELECT * from EMP')
        self.failIf(conn.isOpen())

        # the connection must not linger in the pool, among the
        # assigned connections, or in the connection count
        for junk, pooledConn in self.c.pool.get(self.cs, []):
            self.failIf(conn is pooledConn)
        for assignedConn, junk in self.c.assigned.values():
            self.failIf(conn is assignedConn)
        self.assertEqual(self.c.connection_count.get(self.cs, 0), 0)

    def test_close_on_del(self, close=0):
        db = self.DB(self.cs)

        # begin, then read the connection id (the value is not used)
        db._begin()
        conn_id = db._v_connection_id

        if not close:
            # drop the only reference to the DB object
            db = None
        else:
            # close explicitly; the connection id must be cleared
            db.close()
            self.failUnless(db._v_connection_id is None, db._v_connection_id)

        # either way the connection should be back in the pool
        self.assertEqual(self._poolKeys(), 1)
        self.assertEqual(self._pooled(), 1)
        self.assertEqual(self._assigned(), 0)

    def test_close_on_close(self):
        # same scenario as test_close_on_del, with an explicit close()
        self.test_close_on_del(1)

    def test_close_on_del_nc(self, close=0):
        # a DB that never took a connection must close or go away
        # without raising
        db = self.DB(self.cs)

        if not close:
            # drop the only reference to the DB object
            db = None
        else:
            # close explicitly; the connection id must be empty
            db.close()
            self.failUnless(db._v_connection_id is None)

    def test_close_on_close_nc(self):
        # same scenario as test_close_on_del_nc, with an explicit close()
        self.test_close_on_del_nc(1)

    def test_commit(self):
        db = self.DB(self.cs)
        db._register()
        # registering should have assigned one connection
        self.assertEqual(self._poolKeys(), 1)
        self.assertEqual(self._pooled(), 0)
        self.assertEqual(self._assigned(), 1)
        # ... under the first id
        self.assertEqual(db._v_connection_id, 0)
        # run a complete two-phase commit
        db.tpc_begin()
        db.tpc_vote()
        db.tpc_finish()
        # the id is cleared again
        self.assertEqual(db._v_connection_id, None)
        # and the connection is back in the pool
        self.assertEqual(self._poolKeys(), 1)
        self.assertEqual(self._assigned(), 0)
        self.assertEqual(self._pooled(), 1)

    def test_abort(self):
        db = self.DB(self.cs)
        db._register()
        # registering should have assigned one connection
        self.assertEqual(self._poolKeys(), 1)
        self.assertEqual(self._pooled(), 0)
        self.assertEqual(self._assigned(), 1)
        # ... under the first id
        self.assertEqual(db._v_connection_id, 0)
        db.abort()
        # the id is cleared again
        self.assertEqual(db._v_connection_id, None)
        # and the connection is back in the pool
        self.assertEqual(self._poolKeys(), 1)
        self.assertEqual(self._assigned(), 0)
        self.assertEqual(self._pooled(), 1)

    def test_tpc_abort(self):
        db = self.DB(self.cs)
        db._register()
        # registering should have assigned one connection
        self.assertEqual(self._poolKeys(), 1)
        self.assertEqual(self._pooled(), 0)
        self.assertEqual(self._assigned(), 1)
        # ... under the first id
        self.assertEqual(db._v_connection_id, 0)
        # begin and vote, then abort the two-phase commit
        db.tpc_begin()
        db.tpc_vote()
        db.tpc_abort()
        # the id is cleared again
        self.assertEqual(db._v_connection_id, None)
        # and the connection is back in the pool
        self.assertEqual(self._poolKeys(), 1)
        self.assertEqual(self._assigned(), 0)
        self.assertEqual(self._pooled(), 1)

    def test_concurrent(self):
        first = self.DB(self.cs)
        first._register()
        second = self.DB(self.cs)
        second._register()
        # both DB objects should hold an assigned connection
        self.assertEqual(self._poolKeys(), 1)
        self.assertEqual(self._pooled(), 0)
        self.assertEqual(self._assigned(), 2)
        # with consecutive ids
        self.assertEqual(first._v_connection_id, 0)
        self.assertEqual(second._v_connection_id, 1)
        # that resolve to different connections
        conn1 = self.c.getConnection(first._v_connection_id)
        conn2 = self.c.getConnection(second._v_connection_id)
        self.failIf(conn1 is conn2)


    def test_concurrent_abort(self):
        first = self.DB(self.cs)
        first._register()
        second = self.DB(self.cs)
        second._register()
        # both DB objects should hold an assigned connection
        self.assertEqual(self._poolKeys(), 1)
        self.assertEqual(self._pooled(), 0)
        self.assertEqual(self._assigned(), 2)
        # abort only the first one
        first.abort()
        # one connection pooled, the other still assigned
        self.assertEqual(self._poolKeys(), 1)
        self.assertEqual(self._pooled(), 1)
        self.assertEqual(self._assigned(), 1)
        
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()




More information about the Zope-Checkins mailing list