[Zope-Checkins] CVS: Products/DCOracle2 - connections.py:1.1.2.1 exceptions.py:1.1.2.1 db.py:1.13.2.3

Chris Withers cvs-admin at zope.org
Tue Nov 4 16:38:37 EST 2003


Update of /cvs-repository/Products/DCOracle2
In directory cvs.zope.org:/tmp/cvs-serv21272

Modified Files:
      Tag: chrisw_fixconnectionleak_branch
	db.py 
Added Files:
      Tag: chrisw_fixconnectionleak_branch
	connections.py exceptions.py 
Log Message:
First pass at Thread Pool connection model.
Also better handling for errors in DB.query method.


=== Added File Products/DCOracle2/connections.py ===
import DCOracle2
import threading

from Shared.DC.ZRDB.TM import Surrogate
from exceptions import ConnectionReleasedError, NoConnectionError

# a pool of connections
# connection string -> list of available connections
pool = {}

# connections currently in use
# connection id -> connection object
assigned = {}

# connection count of created connections
# connecting string -> number of connections around
connection_count = {}


# the next id to be assigned
next_id = 0

# the big fat lock for when modifying the above
lock = threading.Lock()

def assignConnection(connectionString):
    global lock
    global pool
    global assigned
    global next_id
    global connection_count

    try:
        
        lock.acquire()

        available = pool.get(connectionString,None)

        if available is None:
            available = pool[connectionString] = []

        if available:
            id, conn = available.pop()
        else:
            conn = DCOracle2.connect(connectionString)
            id = next_id
            next_id += 1

            connection_count[
                connectionString
                ] = connection_count.get(
                        connectionString,
                        0) + 1
        
        assigned[id]=(conn,connectionString)

    finally:

        lock.release()
    
    return id

def getConnection(id):
    try:
        connpair = assigned[id]
    except KeyError,e:
        if e.args:
            raise ConnectionReleasedError,e.args[0]
        else:
            raise NoConnectionError,id
    return connpair[0]

def returnConnection(id):
    global lock
    global pool
    global assigned

    try:
        
        lock.acquire()

        try:
            conn,connectionString = assigned[id]
        except KeyError,e:
            if e.args:
                raise ConnectionReleasedError,e.args[0]
            else:
                raise NoConnectionError,id
        
        del assigned[id]

        pool[connectionString].append((id,conn))

    finally:

        lock.release()

def close(connectionString=None, theId=None):
    global lock
    global pool
    global assigned
    global next_id
    global connection_count

    try:
        
        lock.acquire()

        for connstring,connections in pool.items():
            if connectionString is None or connstring==connectionString:
                for id,conn in connections:
                    if theId is None or id==theId:
                        pool[connstring].remove((id,conn))
                        conn.cursor().close()
                        conn.close()
                if not pool[connstring]:
                    del pool[connstring]

        for id,connpair in assigned.items():
            if theId is None or id==theId:
                conn,connstring = connpair
                if connectionString is None or connstring==connectionString:
                    conn.cursor().close()
                    conn.close()            
                    del assigned[id]

        if connectionString is None and theId is None:

            next_id = 0        

            for key in connection_count.keys():
                del connection_count[key]

        elif theId is None:

            if connection_count.has_key(connectionString):
                del connection_count[connectionString]

        else:

            connection_count[connectionString] -= 1

    finally:

        lock.release()

def countConnections(connectionString):
    return connection_count.get(connectionString,0)

class CTM:
    """A class providing transaction manager support
       for connection pooling.
       Also stores _registered and _finalised as
       _v_ variables"""

    _v_connection_id = _v_registered = None

    def _register(self):
        if not self._v_registered:
            get_transaction().register(Surrogate(self))
            self._begin()
            self._v_registered = 1
            self._v_finalize = 0

    def tpc_begin(self, *ignored): pass
    commit=tpc_begin

    def tpc_vote(self, *ignored):
        self._v_finalize = 1

    def tpc_finish(self, *ignored):

        if self._v_finalize:
            try: self._finish()
            finally: self._v_registered=0

    def abort(self, *ignored):
        try: self._abort()
        finally: self._v_registered=0

    tpc_abort = abort

    def sortKey(self, *ignored):
        """ The sortKey method is used for recent ZODB compatibility which
        needs to have a known commit order for lock acquisition.  Most
        DA's talking to RDBMS systems do not care about commit order, so
        return the constant 1
        """
        return 1
    
    def getDB(self):
        if self._v_connection_id is None:            
            self._register()
        return getConnection(
            self._v_connection_id
            )
        
    def __del__(self):
        if self._v_connection_id is not None:
            returnConnection(
                self._v_connection_id
                )
            self._v_connection_id = None
        
    def _begin(self):
        # assertion checks we've not already got a
        # connection which would get leaked at this
        # point
        assert self._v_connection_id is None
        
        # either use the connection string
        # or, if it's not there (we're a stored procedure)
        # then get the connection string from our DA
        self._v_connection_id = assignConnection(
            getattr(self,'connection_string',None) or \
            getattr(self,self.connection).connection_string
            )
        
    def _finish(self, *ignored):
        if self._v_connection_id is not None:
            db = getConnection(
                self._v_connection_id
                )
            db.commit()
            returnConnection(
                self._v_connection_id
                )
            self._v_connection_id = None
            
    def _abort(self, *ignored):
        if self._v_connection_id is not None:
            db = getConnection(
                self._v_connection_id
                )
            db.rollback()
            returnConnection(
                self._v_connection_id
                )
            self._v_connection_id = None
    


=== Added File Products/DCOracle2/exceptions.py ===
# Exceptions raised by ZOracleDA

class ConnectionReleasedError(KeyError):
    """This is raised when code attempts to retrieve
    a connection from the connection pool by id,
    but the id doesn't has a corresponding connection
    in the pool.

    Most common cause is people hitting the 'Close Connections'
    button.
    """
    
    pass

class NoConnectionError(KeyError):
    """This is raised when code attempts to retreive
    a connection from the connection pool with an id of None.

    This shouldn't happen, but if it does, it most likely
    means a DB instance is being used without first having
    it's _register method called to associate it with
    the current transaction and make sure it has grabbed
    a connection from the pool
    """
    pass



=== Products/DCOracle2/db.py 1.13.2.2 => 1.13.2.3 ===
--- Products/DCOracle2/db.py:1.13.2.2	Mon Nov  3 10:22:48 2003
+++ Products/DCOracle2/db.py	Tue Nov  4 16:38:07 2003
@@ -87,19 +87,21 @@
 __version__='$Revision$'[11:-2]
 
 import DCOracle2, DateTime
-from Shared.DC.ZRDB.TM import TM
 
 import string, sys
 from string import strip, split, find
 from time import time
 
+from Products.ZOracleDA.connections import CTM, close
+from ZPublisher import Retry
+
 failures=0
 calls=0
 last_call_time=time()
 
 IMPLICITLOBS=1      # Turn this off to get old LOB behavior
 
-class DB(TM):
+class DB(CTM):
 
     _p_oid=_p_changed=_registered=None
 
@@ -107,45 +109,31 @@
 
     def __init__(self,connection_string):
         self.connection_string=connection_string
-        db=self.db=DCOracle2.connect(connection_string)
-        self.cursor=db.cursor()
 
     def close(self):
-        if hasattr(self,'cursor'):
-            if self.cursor.isOpen():
-                self.cursor.close()
-            del self.cursor
-        if hasattr(self,'db'):
-            if self.db.isOpen():
-                self.db.close()
-            del self.db
-            
-    def __del__(self):
-        self.close()
-        
+        # give our connection back to the pool
+        # if we have one
+        CTM.__del__(self)
+    
     def str(self,v, StringType=type('')):
         if v is None: return ''
         r=str(v)
         if r[-1:]=='L' and type(v) is not StringType: r=r[:-1]
         return r
 
-    def _finish(self, *ignored):
-        self.db.commit()
-
-    def _abort(self, *ignored):
-        self.db.rollback()
-
     def tables(self, rdb=0,
                _care=('TABLE', 'VIEW')):
         r=[]
         a=r.append
-        for name, typ in self.db.objects():
+        db = self.getDB()
+        for name, typ in db.objects():
             if typ in _care:
                 a({'TABLE_NAME': name, 'TABLE_TYPE': typ})
         return r
 
     def columns(self, table_name):
-        c=self.cursor
+        db = self.getDB()
+        c=db.cursor()
         try: r=c.execute('select * from %s' % table_name)
         except: return ()
         desc=c.description
@@ -174,9 +162,9 @@
         calls=calls+1
         desc=None
         result=()
-        self._register()
-        c=self.cursor            
         try:
+            db = self.getDB()
+            c=db.cursor()
             for qs in filter(None, map(strip,split(query_string, '\0'))):
                 r=c.execute(qs) # Returns 1 on SELECT
                 if desc is not None:
@@ -196,19 +184,32 @@
                                 
             failures=0
             last_call_time=time()
-        except self.Database_Error, mess:
-            failures=failures+1
-            if (failures > 1000 or time()-last_call_time > 600):
-                # Hm. maybe the db is hosed.  Let's try once to restart it.
-                failures=0
-                last_call_time=time()
-		c.close()
-		self.db.close()
-		db=self.db=DCOracle2.connect(self.connection_string)
-		self.cursor=db.cursor()
-                return self.query(query_string, max_rows)
-	    else: raise sys.exc_type, sys.exc_value, sys.exc_traceback
+        except self.Database_Error, e:
+            # if it looks like an error caused by a hosed connection
+            # then stomp on it and then use ZPublisher's Retry
+            # functionality.
+            # This will retry the request up to 3 times for HTTP requests
+            # if it still fails then, well, we should stop retrying ;-)
+
+            # This list should probably grow
+            if e.args[0] in (
+                3114, # ORA-03114: not connected to ORACLE
+                1012, # ORA-01012: not logged on
+                ):
+
+                # kill the connection and remove it from the pool
+                close(self.connection_string,self._v_connection_id)
+
+                # set our id back to None so _abort or _commit
+                # don't try and do anything with the hosed connection
+                self._v_connection_id = None
+                
+                # retry the whole request
+                raise Retry(*sys.exc_info())
 
+            # otherwise, just raise it so we can see what's going on
+            raise
+            
         if desc is None: return (),()
 
         items=[]




More information about the Zope-Checkins mailing list