[Zodb-checkins] SVN: ZODB/trunk/ Port rev 26199 from the 3.3 branch.

Tim Peters tim.one at comcast.net
Wed Jul 7 22:45:19 EDT 2004


Log message for revision 26200:
Port rev 26199 from the 3.3 branch.

Zope3-dev Collector #139: Memory leak involving buckets and connections

Connection objects were typically immortal because the threaded
transaction manager kept them in ever-growing lists.  Reworked the
transaction manager internals to use a simple implementation of weak sets
instead.  This plugs all leaks in the test program attached to the
collector report (which was leaking about 100KB/sec on my box).
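
The leak described above can be reproduced in miniature. The following is only an illustrative sketch, not code from this checkin: `Synch`, `strong_registry` and `weak_registry` are hypothetical names, and the example is written for Python 3. It contrasts a plain list, which keeps a registered object alive, with a `weakref.WeakValueDictionary` keyed by `id()`, which does not.

```python
import gc
import weakref


class Synch(object):
    """Hypothetical stand-in for a Connection or other synch object."""


# Old approach: a plain list holds strong references.
strong_registry = []
# New approach (sketch): map id(obj) -> obj through weak references.
weak_registry = weakref.WeakValueDictionary()

a = Synch()
b = Synch()
strong_registry.append(a)
weak_registry[id(b)] = b

# Weak references let us observe liveness without affecting it.
a_ref = weakref.ref(a)
b_ref = weakref.ref(b)

# Drop the only "user" references and force a collection pass.
del a, b
gc.collect()

a_alive = a_ref() is not None   # the list still holds a strong reference
b_alive = b_ref() is not None   # nothing strong refers to b any more
weak_len = len(weak_registry)   # the dead entry has been discarded
```

On CPython, `a_alive` should end up true and `b_alive` false: registration in the list alone is enough to keep `a` (and everything reachable from it) alive.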



-=-
Modified: ZODB/trunk/NEWS.txt
===================================================================
--- ZODB/trunk/NEWS.txt	2004-07-08 02:14:43 UTC (rev 26199)
+++ ZODB/trunk/NEWS.txt	2004-07-08 02:45:19 UTC (rev 26200)
@@ -2,6 +2,21 @@
 =======================
 Release date: DD-MMM-2004
 
+Transaction Managers
+--------------------
+
+Zope3-dev Collector #139: Memory leak involving buckets and connections
+
+The transaction manager internals effectively made every Connection
+object immortal, except for those explicitly closed.  Since typical
+practice is not to close connections explicitly (and closing a DB
+happens not to close the connections to it -- although that may
+change), this caused massive memory leaks when many connections were
+opened.  The transaction manager internals were reworked to use weak
+references instead, so that connection memory (and other registered
+synch objects) now get cleaned up when nothing other than the
+transaction manager knows about them.
+
 Storages
 --------
 

Modified: ZODB/trunk/src/transaction/_manager.py
===================================================================
--- ZODB/trunk/src/transaction/_manager.py	2004-07-08 02:14:43 UTC (rev 26199)
+++ ZODB/trunk/src/transaction/_manager.py	2004-07-08 02:45:19 UTC (rev 26200)
@@ -18,24 +18,70 @@
 """
 
 import thread
+import weakref
 
 from transaction._transaction import Transaction
 
+# We have to remember sets of synch objects, especially Connections.
+# But we don't want mere registration with a transaction manager to
+# keep a synch object alive forever; in particular, it's common
+# practice not to explicitly close Connection objects, and keeping
+# a Connection alive keeps a potentially huge number of other objects
+# alive (e.g., the cache, and everything reachable from it too).
+#
+# Therefore we use "weak sets" internally.  The implementation here
+# implements just enough of Python's sets.Set interface for our needs.
+
+class WeakSet(object):
+    """A set of objects that doesn't keep its elements alive.
+
+    The objects in the set must be weakly referencable.
+    The objects need not be hashable, and need not support comparison.
+    Two objects are considered to be the same iff their id()s are equal.
+
+    When the only references to an object are weak references (including
+    those from WeakSets), the object can be garbage-collected, and
+    will vanish from any WeakSets it may be a member of at that time.
+    """
+
+    def __init__(self):
+        # Map id(obj) to obj.  By using ids as keys, we avoid requiring
+        # that the elements be hashable or comparable.
+        self.data = weakref.WeakValueDictionary()
+
+    # Same as a Set, add obj to the collection.
+    def add(self, obj):
+        self.data[id(obj)] = obj
+
+    # Same as a Set, remove obj from the collection, and raise
+    # KeyError if obj not in the collection.
+    def remove(self, obj):
+        del self.data[id(obj)]
+
+    # Return a list of all the objects in the collection.
+    # Because a weak dict is used internally, iteration
+    # is dicey (the underlying dict may change size during
+    # iteration, due to gc or activity from other threads).
+    # as_list() attempts to be safe.
+    def as_list(self):
+        return self.data.values()
+
+
 class TransactionManager(object):
 
     def __init__(self):
         self._txn = None
-        self._synchs = []
+        self._synchs = WeakSet()
 
     def begin(self):
         if self._txn is not None:
             self._txn.abort()
-        self._txn = Transaction(self._synchs, self)
+        self._txn = Transaction(self._synchs.as_list(), self)
         return self._txn
 
     def get(self):
         if self._txn is None:
-            self._txn = Transaction(self._synchs, self)
+            self._txn = Transaction(self._synchs.as_list(), self)
         return self._txn
 
     def free(self, txn):
@@ -43,7 +89,7 @@
         self._txn = None
 
     def registerSynch(self, synch):
-        self._synchs.append(synch)
+        self._synchs.add(synch)
 
     def unregisterSynch(self, synch):
         self._synchs.remove(synch)
@@ -57,9 +103,9 @@
     def __init__(self):
         # _threads maps thread ids to transactions
         self._txns = {}
-        # _synchs maps a thread id to a list of registered synchronizers.
-        # The list is passed to the Transaction constructor, because
-        # it needs to call the synchronizers when it commits.
+        # _synchs maps a thread id to a WeakSet of registered synchronizers.
+        # The set elements are passed to the Transaction constructor,
+        # because the latter needs to call the synchronizers when it commits.
         self._synchs = {}
 
     def begin(self):
@@ -67,14 +113,20 @@
         txn = self._txns.get(tid)
         if txn is not None:
             txn.abort()
-        txn = self._txns[tid] = Transaction(self._synchs.get(tid), self)
+        synchs = self._synchs.get(tid)
+        if synchs is not None:
+            synchs = synchs.as_list()
+        txn = self._txns[tid] = Transaction(synchs, self)
         return txn
 
     def get(self):
         tid = thread.get_ident()
         txn = self._txns.get(tid)
         if txn is None:
-            txn = self._txns[tid] = Transaction(self._synchs.get(tid), self)
+            synchs = self._synchs.get(tid)
+            if synchs is not None:
+                synchs = synchs.as_list()
+            txn = self._txns[tid] = Transaction(synchs, self)
         return txn
 
     def free(self, txn):
@@ -84,10 +136,12 @@
 
     def registerSynch(self, synch):
         tid = thread.get_ident()
-        L = self._synchs.setdefault(tid, [])
-        L.append(synch)
+        ws = self._synchs.get(tid)
+        if ws is None:
+            ws = self._synchs[tid] = WeakSet()
+        ws.add(synch)
 
     def unregisterSynch(self, synch):
         tid = thread.get_ident()
-        L = self._synchs.get(tid)
-        L.remove(synch)
+        ws = self._synchs[tid]
+        ws.remove(synch)


