[Zope-Checkins] CVS: Products/DCOracle2 - connections.py:1.1.2.1
exceptions.py:1.1.2.1 db.py:1.13.2.3
Chris Withers
cvs-admin at zope.org
Tue Nov 4 16:38:37 EST 2003
Update of /cvs-repository/Products/DCOracle2
In directory cvs.zope.org:/tmp/cvs-serv21272
Modified Files:
Tag: chrisw_fixconnectionleak_branch
db.py
Added Files:
Tag: chrisw_fixconnectionleak_branch
connections.py exceptions.py
Log Message:
First pass at Thread Pool connection model.
Also better handling for errors in DB.query method.
=== Added File Products/DCOracle2/connections.py ===
import DCOracle2
import threading
from Shared.DC.ZRDB.TM import Surrogate
from exceptions import ConnectionReleasedError, NoConnectionError
# Idle connections waiting to be handed out.
# connection string -> list of (connection id, connection object) tuples
pool = {}
# Connections currently handed out to a caller.
# connection id -> (connection object, connection string) tuple
assigned = {}
# Number of connections created so far for each connection string
# (idle and handed out alike).
# connection string -> number of connections around
connection_count = {}
# The id that the next newly created connection will receive.
next_id = 0
# One coarse lock guarding every modification of the structures above.
lock = threading.Lock()
def assignConnection(connectionString):
    """Reserve a connection for ``connectionString`` and return its id.

    An idle connection from the pool is reused when one is available;
    otherwise a new one is opened with DCOracle2.connect(), given the
    next free id and counted in ``connection_count``.  Either way the
    connection is recorded in ``assigned`` under the returned id, which
    is the handle expected by getConnection() and returnConnection().

    Any error raised by DCOracle2.connect() propagates to the caller;
    the lock is released in every case.
    """
    # Only next_id is rebound here; the containers are mutated in place.
    global next_id

    # Acquire *before* entering the try block: if acquire() itself
    # failed, the finally clause must not release a lock we never held.
    lock.acquire()
    try:
        available = pool.setdefault(connectionString, [])
        if available:
            conn_id, conn = available.pop()
        else:
            # The connection is opened while the lock is held, exactly
            # as before, so pool bookkeeping stays atomic.
            conn = DCOracle2.connect(connectionString)
            conn_id = next_id
            next_id += 1
            connection_count[connectionString] = (
                connection_count.get(connectionString, 0) + 1)
        assigned[conn_id] = (conn, connectionString)
    finally:
        lock.release()
    return conn_id
def getConnection(id):
    """Return the connection object currently assigned to ``id``.

    ``id`` is a handle previously returned by assignConnection().

    Raises NoConnectionError when ``id`` is None, i.e. the caller never
    obtained a connection, and ConnectionReleasedError when ``id`` is
    not (or is no longer) present in ``assigned``.  Both exceptions are
    KeyError subclasses and carry the offending id as their argument.
    """
    try:
        connpair = assigned[id]
    except KeyError:
        # A KeyError raised by a dict lookup always carries the missing
        # key in its args, so testing ``e.args`` could never select the
        # NoConnectionError branch.  Test the id itself instead.
        if id is None:
            raise NoConnectionError(id)
        raise ConnectionReleasedError(id)
    return connpair[0]
def returnConnection(id):
    """Give the connection assigned to ``id`` back to the idle pool.

    The entry is removed from ``assigned`` and appended, as an
    ``(id, connection)`` tuple, to the idle list of its connection
    string.

    Raises NoConnectionError when ``id`` is None and
    ConnectionReleasedError when ``id`` is not (or is no longer) present
    in ``assigned``.  Both are KeyError subclasses.  The lock is released
    in every case.
    """
    # Acquire *before* entering the try block: if acquire() itself
    # failed, the finally clause must not release a lock we never held.
    lock.acquire()
    try:
        try:
            conn, connectionString = assigned[id]
        except KeyError:
            # A dict-lookup KeyError always has non-empty args, so the
            # id has to be inspected directly to tell the cases apart.
            if id is None:
                raise NoConnectionError(id)
            raise ConnectionReleasedError(id)
        del assigned[id]
        # close() deletes a connection string's idle list once it is
        # empty, so the list may be missing while connections for that
        # string are still handed out.  Recreate it rather than fail
        # with a bare KeyError after the entry has already left
        # ``assigned`` (which would lose the connection).
        pool.setdefault(connectionString, []).append((id, conn))
    finally:
        lock.release()
def close(connectionString=None, theId=None):
    """Close pooled connections and drop them from the bookkeeping.

    connectionString -- only touch connections created for this
                        connection string; None means any string.
    theId            -- only touch the connection with this id;
                        None means any id.

    Both idle connections (``pool``) and handed-out connections
    (``assigned``) are affected.  With both arguments None everything is
    closed, ``connection_count`` is emptied and ``next_id`` is reset to
    0.  With only ``connectionString`` given, that string's counter is
    removed.  With ``theId`` given, the counter of the owning connection
    string is reduced by the number of connections actually closed.

    All bookkeeping is completed before any connection is closed, so an
    error raised while closing a connection still propagates to the
    caller but cannot leave a half-closed connection registered in the
    pool.  The lock is held for the whole operation.
    """
    global next_id

    # Acquire *before* entering the try block: if acquire() itself
    # failed, the finally clause must not release a lock we never held.
    lock.acquire()
    try:
        doomed = []  # connection objects to shut down
        closed = {}  # connection string -> number of connections dropped

        # Idle connections.  The survivors are collected in a new list
        # instead of calling remove() on the list being iterated, which
        # skipped every other matching entry.
        for connstring, connections in list(pool.items()):
            if connectionString is not None and connstring != connectionString:
                continue
            survivors = []
            for entry in connections:
                if theId is None or entry[0] == theId:
                    doomed.append(entry[1])
                    closed[connstring] = closed.get(connstring, 0) + 1
                else:
                    survivors.append(entry)
            if survivors:
                connections[:] = survivors
            else:
                del pool[connstring]

        # Connections currently handed out.
        for conn_id, (conn, connstring) in list(assigned.items()):
            if theId is not None and conn_id != theId:
                continue
            if connectionString is not None and connstring != connectionString:
                continue
            del assigned[conn_id]
            doomed.append(conn)
            closed[connstring] = closed.get(connstring, 0) + 1

        # Counters.
        if connectionString is None and theId is None:
            # NOTE(review): restarting ids at 0 means an id still held
            # by a caller from before the close can later be handed out
            # again for a different connection -- confirm this is
            # intended.
            next_id = 0
            connection_count.clear()
        elif theId is None:
            if connectionString in connection_count:
                del connection_count[connectionString]
        else:
            # Only count what was really closed, and look the counter up
            # under the owning connection string: connectionString may
            # be None here, and the id may not have matched anything.
            for connstring, how_many in closed.items():
                if connstring in connection_count:
                    connection_count[connstring] -= how_many

        # Bookkeeping is consistent from here on; now shut things down.
        for conn in doomed:
            conn.cursor().close()
            conn.close()
    finally:
        lock.release()
def countConnections(connectionString):
    """Return the number of connections recorded in ``connection_count``
    for ``connectionString``, or 0 when there is no entry for it."""
    try:
        return connection_count[connectionString]
    except KeyError:
        return 0
class CTM:
    """Transaction manager support for objects that use the
    connection pool.

    On first use inside a transaction the object registers itself with
    the current transaction and reserves a pooled connection; when the
    transaction finishes or aborts the connection is committed or
    rolled back and handed back to the pool.

    The registration flag, the finalize flag and the id of the reserved
    connection are kept in the ``_v_registered``, ``_v_finalize`` and
    ``_v_connection_id`` attributes.
    """

    # _v_finalize gets a class-level default as well, so that
    # tpc_finish() on an instance that never registered or voted is a
    # no-op instead of an AttributeError.
    _v_connection_id = _v_registered = _v_finalize = None

    def _register(self):
        """Join the current transaction and reserve a connection,
        unless this object is already registered."""
        if not self._v_registered:
            # get_transaction is not defined or imported in this module;
            # presumably the builtin installed by Zope/ZODB -- confirm.
            get_transaction().register(Surrogate(self))
            self._begin()
            self._v_registered = 1
            self._v_finalize = 0

    def tpc_begin(self, *ignored):
        """Nothing to do when two-phase commit begins."""
        pass

    commit = tpc_begin

    def tpc_vote(self, *ignored):
        """Record that the transaction got as far as the vote, which
        allows tpc_finish() to commit."""
        self._v_finalize = 1

    def tpc_finish(self, *ignored):
        """Commit through _finish() if a vote was recorded; the
        registration flag is cleared even when the commit fails."""
        if self._v_finalize:
            try:
                self._finish()
            finally:
                self._v_registered = 0

    def abort(self, *ignored):
        """Roll back through _abort(); the registration flag is cleared
        even when the rollback fails."""
        try:
            self._abort()
        finally:
            self._v_registered = 0

    tpc_abort = abort

    def sortKey(self, *ignored):
        """ The sortKey method is used for recent ZODB compatibility which
        needs to have a known commit order for lock acquisition.  Most
        DA's talking to RDBMS systems do not care about commit order, so
        return the constant 1
        """
        return 1

    def getDB(self):
        """Return the pooled connection reserved for this object,
        registering with the transaction first when no connection id is
        held yet.

        Errors from getConnection() propagate to the caller.
        """
        if self._v_connection_id is None:
            self._register()
        return getConnection(self._v_connection_id)

    def __del__(self):
        """Hand a still-reserved connection back to the pool."""
        conn_id = self._v_connection_id
        if conn_id is not None:
            # Forget the id first: should returnConnection() fail, this
            # object must not keep pointing at a connection it can no
            # longer use.
            self._v_connection_id = None
            returnConnection(conn_id)

    def _begin(self):
        """Reserve a pooled connection for this object."""
        # assertion checks we've not already got a
        # connection which would get leaked at this
        # point
        assert self._v_connection_id is None
        # either use the connection string
        # or, if it's not there (we're a stored procedure)
        # then get the connection string from our DA
        self._v_connection_id = assignConnection(
            getattr(self, 'connection_string', None) or
            getattr(self, self.connection).connection_string
        )

    def _finish(self, *ignored):
        """Commit on the reserved connection, then give it back to the
        pool.  Does nothing when no connection is held."""
        conn_id = self._v_connection_id
        if conn_id is None:
            return
        # Drop our reference up front.  If the lookup or the commit
        # fails, a stale id would otherwise make getDB() skip
        # re-registration and keep using a connection that no
        # transaction manages any more.
        self._v_connection_id = None
        db = getConnection(conn_id)
        try:
            db.commit()
        finally:
            # Returned even when commit() raises, so the connection is
            # not left checked out forever.
            returnConnection(conn_id)

    def _abort(self, *ignored):
        """Roll back on the reserved connection, then give it back to
        the pool.  Does nothing when no connection is held."""
        conn_id = self._v_connection_id
        if conn_id is None:
            return
        # Same reasoning as in _finish(): never keep a stale id.
        self._v_connection_id = None
        db = getConnection(conn_id)
        try:
            db.rollback()
        finally:
            # Returned even when rollback() raises, so the connection is
            # not left checked out forever.
            returnConnection(conn_id)
=== Added File Products/DCOracle2/exceptions.py ===
# Exceptions raised by ZOracleDA
class ConnectionReleasedError(KeyError):
    """Raised when a connection is requested from the connection pool
    by id, but the pool holds no connection for that id.

    The most common cause is somebody pressing the 'Close Connections'
    button.
    """
class NoConnectionError(KeyError):
    """Raised when a connection is requested from the connection pool
    with an id of None.

    This should not happen.  If it does, a DB instance is most likely
    being used before its _register method has been called; that method
    associates the instance with the current transaction and makes sure
    it has taken a connection from the pool.
    """
=== Products/DCOracle2/db.py 1.13.2.2 => 1.13.2.3 ===
--- Products/DCOracle2/db.py:1.13.2.2 Mon Nov 3 10:22:48 2003
+++ Products/DCOracle2/db.py Tue Nov 4 16:38:07 2003
@@ -87,19 +87,21 @@
__version__='$Revision$'[11:-2]
import DCOracle2, DateTime
-from Shared.DC.ZRDB.TM import TM
import string, sys
from string import strip, split, find
from time import time
+from Products.ZOracleDA.connections import CTM, close
+from ZPublisher import Retry
+
failures=0
calls=0
last_call_time=time()
IMPLICITLOBS=1 # Turn this off to get old LOB behavior
-class DB(TM):
+class DB(CTM):
_p_oid=_p_changed=_registered=None
@@ -107,45 +109,31 @@
def __init__(self,connection_string):
self.connection_string=connection_string
- db=self.db=DCOracle2.connect(connection_string)
- self.cursor=db.cursor()
def close(self):
- if hasattr(self,'cursor'):
- if self.cursor.isOpen():
- self.cursor.close()
- del self.cursor
- if hasattr(self,'db'):
- if self.db.isOpen():
- self.db.close()
- del self.db
-
- def __del__(self):
- self.close()
-
+ # give our connection back to the pool
+ # if we have one
+ CTM.__del__(self)
+
def str(self,v, StringType=type('')):
if v is None: return ''
r=str(v)
if r[-1:]=='L' and type(v) is not StringType: r=r[:-1]
return r
- def _finish(self, *ignored):
- self.db.commit()
-
- def _abort(self, *ignored):
- self.db.rollback()
-
def tables(self, rdb=0,
_care=('TABLE', 'VIEW')):
r=[]
a=r.append
- for name, typ in self.db.objects():
+ db = self.getDB()
+ for name, typ in db.objects():
if typ in _care:
a({'TABLE_NAME': name, 'TABLE_TYPE': typ})
return r
def columns(self, table_name):
- c=self.cursor
+ db = self.getDB()
+ c=db.cursor()
try: r=c.execute('select * from %s' % table_name)
except: return ()
desc=c.description
@@ -174,9 +162,9 @@
calls=calls+1
desc=None
result=()
- self._register()
- c=self.cursor
try:
+ db = self.getDB()
+ c=db.cursor()
for qs in filter(None, map(strip,split(query_string, '\0'))):
r=c.execute(qs) # Returns 1 on SELECT
if desc is not None:
@@ -196,19 +184,32 @@
failures=0
last_call_time=time()
- except self.Database_Error, mess:
- failures=failures+1
- if (failures > 1000 or time()-last_call_time > 600):
- # Hm. maybe the db is hosed. Let's try once to restart it.
- failures=0
- last_call_time=time()
- c.close()
- self.db.close()
- db=self.db=DCOracle2.connect(self.connection_string)
- self.cursor=db.cursor()
- return self.query(query_string, max_rows)
- else: raise sys.exc_type, sys.exc_value, sys.exc_traceback
+ except self.Database_Error, e:
+ # if it looks like an error caused by a hosed connection
+ # then stomp on it and then use ZPublisher's Retry
+ # functionality.
+ # This will retry the request up to 3 times for HTTP requests
+ # if it still fails then, well, we should stop retrying ;-)
+
+ # This list should probably grow
+ if e.args[0] in (
+ 3114, # ORA-03114: not connected to ORACLE
+ 1012, # ORA-01012: not logged on
+ ):
+
+ # kill the connection and remove it from the pool
+ close(self.connection_string,self._v_connection_id)
+
+ # set our id back to None so that _abort or _finish
+ # don't try to do anything with the hosed connection
+ self._v_connection_id = None
+
+ # retry the whole request
+ raise Retry(*sys.exc_info())
+ # otherwise, just raise it so we can see what's going on
+ raise
+
if desc is None: return (),()
items=[]
More information about the Zope-Checkins
mailing list