[Zodb-checkins] CVS: Zope3/src/zodb/tests - test_pool.py:1.1 test_invalidation.py:1.2
Jeremy Hylton
jeremy@zope.com
Tue, 21 Jan 2003 13:20:00 -0500
Update of /cvs-repository/Zope3/src/zodb/tests
In directory cvs.zope.org:/tmp/cvs-serv9061/zodb/tests
Modified Files:
test_invalidation.py
Added Files:
test_pool.py
Log Message:
Repair pool and invalidation logic in database.
Make sure invalidations are sent to all connections, regardless of
whether they are opened. Fix logic of connection pools so that
_allocated contains all connections and _pool contains the ones
currently available.
Add test cases to verify fixes.
=== Added File Zope3/src/zodb/tests/test_pool.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Test the pool size logic in the database."""
import threading
import time
import unittest
from zodb.storage.mapping import DB
class Counter:
    """A tally shared between threads; every access is guarded by a lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0

    def inc(self):
        """Add one to the tally while holding the lock."""
        with self._lock:
            self._count += 1

    def get(self):
        """Return the current tally, read while holding the lock."""
        with self._lock:
            return self._count
class ConnectThread(threading.Thread):
    """Thread that opens a database connection and holds it until released.

    The thread increments start_counter as soon as it runs and
    open_counter once db.open() has returned, so an observer can tell
    how many threads are running and how many of them actually obtained
    a connection.  The connection is then held until close_event is set.

    Arguments:
    db -- object whose open() returns a connection providing root()
          and close()
    start_counter -- object with an inc() method; bumped when run() begins
    open_counter -- object with an inc() method; bumped after open() returns
    close_event -- object with a wait() method (e.g. threading.Event);
                   the connection is closed once wait() returns
    """

    def __init__(self, db, start_counter, open_counter, close_event):
        threading.Thread.__init__(self)
        self._db = db
        # Descriptive names also keep these attributes well away from any
        # private names threading.Thread may use for itself.
        self._start_counter = start_counter
        self._open_counter = open_counter
        self._close_event = close_event

    def run(self):
        """Open a connection, touch its root, wait for release, close it."""
        self._start_counter.inc()
        # NOTE(review): open() presumably blocks while the pool is
        # exhausted -- confirm against the database implementation.
        cn = self._db.open()
        try:
            self._open_counter.inc()
            cn.root()
            self._close_event.wait()
        finally:
            # Always hand the connection back, even if root() or the
            # wait raised; otherwise it would never be closed.
            cn.close()
class PoolTest(unittest.TestCase):
    """Check that the database never hands out more than pool_size
    connections at the same time.
    """

    # Size of the connection pool under test.
    pool_size = 7
    # Number of competing threads; must exceed pool_size.
    num_threads = 10
    # Polling budget used while waiting for the threads to make progress.
    poll_tries = 10
    poll_delay = 0.1

    def setUp(self):
        self.close = threading.Event()
        self.db = DB(pool_size=self.pool_size)
        self.threads = []

    def tearDown(self):
        # Set the event the threads were given so that any thread still
        # holding a connection can finish, then wait for all of them.
        self.close.set()
        for t in self.threads:
            t.join()

    def _wait_for(self, counter, target):
        """Poll until counter.get() reaches target or the budget runs out.

        Returns the counter's value after the wait, whether or not the
        target was reached; the caller decides what counts as a failure.
        """
        for attempt in range(self.poll_tries):
            if counter.get() >= target:
                break
            time.sleep(self.poll_delay)
        return counter.get()

    def testPoolLimit(self):
        """Start more threads than the pool allows and count who gets in."""
        started = Counter()
        opened = Counter()
        for i in range(self.num_threads):
            t = ConnectThread(self.db, started, opened, self.close)
            t.start()
            self.threads.append(t)

        # Thread scheduling is not deterministic, so poll rather than
        # assert immediately.  A thread may have started without having
        # called open() yet.
        n = self._wait_for(started, self.num_threads)
        if n < self.num_threads:
            self.fail("Only started %d threads out of %d"
                      % (n, self.num_threads))

        # Exactly pool_size threads may hold a connection.  The count is
        # checked unconditionally: checking it only when the wait timed
        # out would let a pool that admits too many threads pass.
        self._wait_for(opened, self.pool_size)
        # Give any thread that slipped past the limit a moment to show up.
        time.sleep(self.poll_delay)
        n = opened.get()
        if n != self.pool_size:
            self.fail("Expected %d threads to open, %d did"
                      % (self.pool_size, n))

        # Release the holders; the remaining threads should now get in.
        self.close.set()
        n = self._wait_for(opened, self.num_threads)
        if n != self.num_threads:
            self.fail("Expected %d threads to open, %d did"
                      % (self.num_threads, n))
def test_suite():
return unittest.makeSuite(PoolTest)
=== Zope3/src/zodb/tests/test_invalidation.py 1.1 => 1.2 ===
--- Zope3/src/zodb/tests/test_invalidation.py:1.1 Wed Jan 15 18:33:06 2003
+++ Zope3/src/zodb/tests/test_invalidation.py Tue Jan 21 13:19:56 2003
@@ -16,6 +16,7 @@
import unittest
from zodb.db import DB
+from zodb.connection import Connection
from zodb.storage.mapping import MappingStorage
from zodb.storage.tests.minpo import MinPO
@@ -23,12 +24,12 @@
class InvalidationTests(unittest.TestCase):
- num_connections = 3
+ num_conn = 4
def setUp(self):
self.storage = MappingStorage()
self.db = DB(self.storage)
- self.connections = [self.db.open() for i in range(4)]
+ self.connections = [self.db.open() for i in range(self.num_conn)]
def tearDown(self):
self.db.close()
@@ -47,7 +48,26 @@
cn.sync()
root = cn.root()
self.assertEqual(root[1].value, 2)
-
+
+ def testSimpleInvalidationClosedConnection(self):
+ # Test that invalidations are still sent to closed connections.
+ root = self.connections[0].root()
+ obj = root[1] = MinPO(1)
+ get_transaction().commit()
+ for cn in self.connections[1:]:
+ cn.sync()
+ root = cn.root()
+ self.assertEqual(root[1].value, 1)
+ cn.close()
+ obj.value = 2
+ get_transaction().commit()
+ for cn in self.connections[1:]:
+ self.assert_(obj._p_oid in cn._invalidated)
+ # Call reset() which is equivalent to re-opening it via
+ # the db connection pool.
+ cn.reset()
+ root = cn.root()
+ self.assertEqual(root[1].value, 2)
def test_suite():
return unittest.makeSuite(InvalidationTests)