[Checkins] SVN: relstorage/branches/1.1/ Merged from trunk (-r95006:95060)
Shane Hathaway
shane at hathawaymix.org
Mon Jan 26 21:40:20 EST 2009
Log message for revision 95065:
Merged from trunk (-r95006:95060)
Changed:
U relstorage/branches/1.1/CHANGES.txt
U relstorage/branches/1.1/relstorage/adapters/common.py
U relstorage/branches/1.1/relstorage/adapters/mysql.py
U relstorage/branches/1.1/relstorage/adapters/oracle.py
U relstorage/branches/1.1/relstorage/adapters/postgresql.py
U relstorage/branches/1.1/relstorage/component.xml
U relstorage/branches/1.1/relstorage/relstorage.py
U relstorage/branches/1.1/relstorage/tests/fakecache.py
U relstorage/branches/1.1/relstorage/tests/reltestbase.py
-=-
Modified: relstorage/branches/1.1/CHANGES.txt
===================================================================
--- relstorage/branches/1.1/CHANGES.txt 2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/CHANGES.txt 2009-01-27 02:40:19 UTC (rev 95065)
@@ -1,5 +1,10 @@
Next Release
+- When both cache-servers and poll-interval are set, we now poll the
+ cache for changes on every request. This makes it possible to use
+ a high poll-interval to reduce the database polling burden, yet
+ every client can see changes immediately.
+
- Added the pack-dry-run option, which causes pack operations to only
populate the pack tables with the list of objects and states to pack,
but not actually pack.
Modified: relstorage/branches/1.1/relstorage/adapters/common.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/common.py 2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/adapters/common.py 2009-01-27 02:40:19 UTC (rev 95065)
@@ -176,7 +176,27 @@
stmt = '\n'.join(lines)
self._run_script_stmt(cursor, stmt, params)
+ def _open_and_call(self, callback):
+ """Call a function with an open connection and cursor.
+ If the function returns, commits the transaction and returns the
+ result returned by the function.
+ If the function raises an exception, aborts the transaction
+ then propagates the exception.
+ """
+ conn, cursor = self.open()
+ try:
+ try:
+ res = callback(conn, cursor)
+ except:
+ conn.rollback()
+ raise
+ else:
+ conn.commit()
+ return res
+ finally:
+ self.close(conn, cursor)
+
def _transaction_iterator(self, cursor):
"""Iterate over a list of transactions returned from the database.
@@ -727,7 +747,7 @@
pass
- def pack(self, pack_tid, options):
+ def pack(self, pack_tid, options, sleep=time.sleep):
"""Pack. Requires the information provided by pre_pack."""
# Read committed mode is sufficient.
@@ -777,7 +797,7 @@
if delay > 0:
log.debug('pack: sleeping %.4g second(s)',
delay)
- time.sleep(delay)
+ sleep(delay)
self._hold_commit_lock(cursor)
start = time.time()
Modified: relstorage/branches/1.1/relstorage/adapters/mysql.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/mysql.py 2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/adapters/mysql.py 2009-01-27 02:40:19 UTC (rev 95065)
@@ -235,66 +235,39 @@
def prepare_schema(self):
"""Create the database schema if it does not already exist."""
- conn, cursor = self.open()
- try:
- try:
- cursor.execute("SHOW TABLES LIKE 'object_state'")
- if not cursor.rowcount:
- self.create_schema(cursor)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
+ def callback(conn, cursor):
+ cursor.execute("SHOW TABLES LIKE 'object_state'")
+ if not cursor.rowcount:
+ self.create_schema(cursor)
+ self._open_and_call(callback)
-
def zap_all(self):
"""Clear all data out of the database."""
- conn, cursor = self.open()
- try:
- try:
- stmt = """
- DELETE FROM object_refs_added;
- DELETE FROM object_ref;
- DELETE FROM current_object;
- DELETE FROM object_state;
- TRUNCATE new_oid;
- DELETE FROM transaction;
- -- Create a transaction to represent object creation.
- INSERT INTO transaction (tid, username, description) VALUES
- (0, 'system', 'special transaction for object creation');
- """
- self._run_script(cursor, stmt)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
+ def callback(conn, cursor):
+ stmt = """
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ TRUNCATE new_oid;
+ DELETE FROM transaction;
+ -- Create a transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, 'system', 'special transaction for object creation');
+ """
+ self._run_script(cursor, stmt)
+ self._open_and_call(callback)
-
def drop_all(self):
"""Drop all tables and sequences."""
- conn, cursor = self.open()
- try:
- try:
- for tablename in ('pack_state_tid', 'pack_state',
- 'pack_object', 'object_refs_added', 'object_ref',
- 'current_object', 'object_state', 'new_oid',
- 'transaction'):
- cursor.execute("DROP TABLE IF EXISTS %s" % tablename)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
+ def callback(conn, cursor):
+ for tablename in ('pack_state_tid', 'pack_state',
+ 'pack_object', 'object_refs_added', 'object_ref',
+ 'current_object', 'object_state', 'new_oid',
+ 'transaction'):
+ cursor.execute("DROP TABLE IF EXISTS %s" % tablename)
+ self._open_and_call(callback)
-
def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED"):
"""Open a database connection and return (conn, cursor)."""
try:
Modified: relstorage/branches/1.1/relstorage/adapters/oracle.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/oracle.py 2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/adapters/oracle.py 2009-01-27 02:40:19 UTC (rev 95065)
@@ -15,6 +15,8 @@
import logging
import re
+import time
+
import cx_Oracle
from ZODB.POSException import StorageError
@@ -263,77 +265,53 @@
);
"""
self._run_script(cursor, stmt)
+ # Let Oracle catch up with the new data definitions by sleeping.
+ # This reduces the likelihood of spurious ORA-01466 errors.
+ time.sleep(5)
def prepare_schema(self):
"""Create the database schema if it does not already exist."""
- conn, cursor = self.open()
- try:
- try:
- cursor.execute("""
- SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
- """)
- if not cursor.fetchall():
- self.create_schema(cursor)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
+ def callback(conn, cursor):
+ cursor.execute("""
+ SELECT 1 FROM USER_TABLES WHERE TABLE_NAME = 'OBJECT_STATE'
+ """)
+ if not cursor.fetchall():
+ self.create_schema(cursor)
+ self._open_and_call(callback)
-
def zap_all(self):
"""Clear all data out of the database."""
- conn, cursor = self.open()
- try:
- try:
- stmt = """
- DELETE FROM object_refs_added;
- DELETE FROM object_ref;
- DELETE FROM current_object;
- DELETE FROM object_state;
- DELETE FROM transaction;
- -- Create a transaction to represent object creation.
- INSERT INTO transaction (tid, username, description) VALUES
- (0, UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
- UTL_I18N.STRING_TO_RAW(
- 'special transaction for object creation', 'US7ASCII'));
- DROP SEQUENCE zoid_seq;
- CREATE SEQUENCE zoid_seq;
- """
- self._run_script(cursor, stmt)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
+ def callback(conn, cursor):
+ stmt = """
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ DELETE FROM transaction;
+ -- Create a transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, UTL_I18N.STRING_TO_RAW('system', 'US7ASCII'),
+ UTL_I18N.STRING_TO_RAW(
+ 'special transaction for object creation', 'US7ASCII'));
+ DROP SEQUENCE zoid_seq;
+ CREATE SEQUENCE zoid_seq;
+ """
+ self._run_script(cursor, stmt)
+ self._open_and_call(callback)
-
def drop_all(self):
"""Drop all tables and sequences."""
- conn, cursor = self.open()
- try:
- try:
- for tablename in ('pack_state_tid', 'pack_state',
- 'pack_object', 'object_refs_added', 'object_ref',
- 'current_object', 'object_state', 'transaction',
- 'commit_lock', 'pack_lock',
- 'temp_store', 'temp_undo', 'temp_pack_visit'):
- cursor.execute("DROP TABLE %s" % tablename)
- cursor.execute("DROP SEQUENCE zoid_seq")
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
+ def callback(conn, cursor):
+ for tablename in ('pack_state_tid', 'pack_state',
+ 'pack_object', 'object_refs_added', 'object_ref',
+ 'current_object', 'object_state', 'transaction',
+ 'commit_lock', 'pack_lock',
+ 'temp_store', 'temp_undo', 'temp_pack_visit'):
+ cursor.execute("DROP TABLE %s" % tablename)
+ cursor.execute("DROP SEQUENCE zoid_seq")
+ self._open_and_call(callback)
-
def open(self, transaction_mode="ISOLATION LEVEL READ COMMITTED",
twophase=False):
"""Open a database connection and return (conn, cursor)."""
Modified: relstorage/branches/1.1/relstorage/adapters/postgresql.py
===================================================================
--- relstorage/branches/1.1/relstorage/adapters/postgresql.py 2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/adapters/postgresql.py 2009-01-27 02:40:19 UTC (rev 95065)
@@ -142,73 +142,46 @@
def prepare_schema(self):
"""Create the database schema if it does not already exist."""
- conn, cursor = self.open()
- try:
- try:
- cursor.execute("""
- SELECT tablename
- FROM pg_tables
- WHERE tablename = 'object_state'
- """)
- if not cursor.rowcount:
- self.create_schema(cursor)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
+ def callback(conn, cursor):
+ cursor.execute("""
+ SELECT tablename
+ FROM pg_tables
+ WHERE tablename = 'object_state'
+ """)
+ if not cursor.rowcount:
+ self.create_schema(cursor)
+ self._open_and_call(callback)
-
def zap_all(self):
"""Clear all data out of the database."""
- conn, cursor = self.open()
- try:
- try:
- cursor.execute("""
- DELETE FROM object_refs_added;
- DELETE FROM object_ref;
- DELETE FROM current_object;
- DELETE FROM object_state;
- DELETE FROM transaction;
- -- Create a special transaction to represent object creation.
- INSERT INTO transaction (tid, username, description) VALUES
- (0, 'system', 'special transaction for object creation');
- ALTER SEQUENCE zoid_seq START WITH 1;
- """)
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
+ def callback(conn, cursor):
+ cursor.execute("""
+ DELETE FROM object_refs_added;
+ DELETE FROM object_ref;
+ DELETE FROM current_object;
+ DELETE FROM object_state;
+ DELETE FROM transaction;
+ -- Create a special transaction to represent object creation.
+ INSERT INTO transaction (tid, username, description) VALUES
+ (0, 'system', 'special transaction for object creation');
+ ALTER SEQUENCE zoid_seq START WITH 1;
+ """)
+ self._open_and_call(callback)
-
def drop_all(self):
"""Drop all tables and sequences."""
- conn, cursor = self.open()
- try:
- try:
- cursor.execute("SELECT tablename FROM pg_tables")
- existent = set([name for (name,) in cursor])
- for tablename in ('pack_state_tid', 'pack_state',
- 'pack_object', 'object_refs_added', 'object_ref',
- 'current_object', 'object_state', 'transaction',
- 'commit_lock', 'pack_lock'):
- if tablename in existent:
- cursor.execute("DROP TABLE %s" % tablename)
- cursor.execute("DROP SEQUENCE zoid_seq")
- except:
- conn.rollback()
- raise
- else:
- conn.commit()
- finally:
- self.close(conn, cursor)
+ def callback(conn, cursor):
+ cursor.execute("SELECT tablename FROM pg_tables")
+ existent = set([name for (name,) in cursor])
+ for tablename in ('pack_state_tid', 'pack_state',
+ 'pack_object', 'object_refs_added', 'object_ref',
+ 'current_object', 'object_state', 'transaction',
+ 'commit_lock', 'pack_lock'):
+ if tablename in existent:
+ cursor.execute("DROP TABLE %s" % tablename)
+ cursor.execute("DROP SEQUENCE zoid_seq")
+ self._open_and_call(callback)
-
def open(self,
isolation=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED):
"""Open a database connection and return (conn, cursor)."""
@@ -278,12 +251,10 @@
def get_db_size(self):
"""Returns the approximate size of the database in bytes"""
- conn, cursor = self.open()
- try:
+ def callback(conn, cursor):
cursor.execute("SELECT pg_database_size(current_database())")
return cursor.fetchone()[0]
- finally:
- self.close(conn, cursor)
+ return self._open_and_call(callback)
def get_current_tid(self, cursor, oid):
"""Returns the current integer tid for an object.
Modified: relstorage/branches/1.1/relstorage/component.xml
===================================================================
--- relstorage/branches/1.1/relstorage/component.xml 2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/component.xml 2009-01-27 02:40:19 UTC (rev 95065)
@@ -26,18 +26,26 @@
</key>
<key name="poll-interval" datatype="float" required="no">
<description>
- Defer polling the database for the specified maximum time interval.
- Set to 0 (the default) to always poll. Fractional seconds are
- allowed.
+ Defer polling the database for the specified maximum time interval,
+ in seconds. Set to 0 (the default) to always poll. Fractional
+ seconds are allowed. Use this to lighten the database load on
+ servers with high read volume and low write volume.
- Use this to lighten the database load on servers with high read
- volume and low write volume. A setting of 1-5 seconds is sufficient
- for most systems.
+ The poll-interval option works best in conjunction with
+ the cache-servers option. If both are enabled, RelStorage will
+ poll a single cache key for changes on every request.
+ The database will not be polled unless the cache indicates
+ there have been changes, or the timeout specified by poll-interval
+ has expired. This configuration keeps clients fully up to date,
+ while removing much of the polling burden from the database.
+ A good cluster configuration is to use memcache servers
+ and a high poll-interval (say, 60 seconds).
- While this setting should not affect database integrity,
- it increases the probability of basing transactions on stale data,
- leading to conflicts. Thus a nonzero setting can hurt
- the performance of servers with high write volume.
+ This option can be used without the cache-servers option,
+ but a large poll-interval without cache-servers increases the
+ probability of basing transactions on stale data, which does not
+ affect database consistency, but does increase the probability
+ of conflict errors, leading to low performance.
</description>
</key>
<key name="pack-gc" datatype="boolean" default="true">
Modified: relstorage/branches/1.1/relstorage/relstorage.py
===================================================================
--- relstorage/branches/1.1/relstorage/relstorage.py 2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/relstorage.py 2009-01-27 02:40:19 UTC (rev 95065)
@@ -664,6 +664,15 @@
txn = self._prepared_txn
assert txn is not None
self._adapter.commit_phase2(self._store_cursor, txn)
+ cache = self._cache_client
+ if cache is not None:
+ if cache.incr('commit_count') is None:
+ # Use the current time as an initial commit_count value.
+ cache.add('commit_count', int(time.time()))
+ # A concurrent committer could have won the race to set the
+ # initial commit_count. Increment commit_count so that it
+ # doesn't matter who won.
+ cache.incr('commit_count')
self._prepared_txn = None
self._ltid = self._tid
self._tid = None
@@ -820,7 +829,7 @@
self._lock_release()
- def pack(self, t, referencesf):
+ def pack(self, t, referencesf, sleep=time.sleep):
if self._is_read_only:
raise POSException.ReadOnlyError()
@@ -867,7 +876,7 @@
log.info("pack: dry run complete")
else:
# Now pack.
- adapter.pack(tid_int, self._options)
+ adapter.pack(tid_int, self._options, sleep=sleep)
self._after_pack()
finally:
adapter.release_pack_lock(lock_cursor)
@@ -901,6 +910,10 @@
options=parent._options)
# _prev_polled_tid contains the tid at the previous poll
self._prev_polled_tid = None
+ # _commit_count contains the last polled value of the
+ # 'commit_count' cache key
+ self._commit_count = 0
+ # _poll_at is the time to poll regardless of commit_count
self._poll_at = 0
def _get_oid_cache_key(self, oid_int):
@@ -925,6 +938,30 @@
finally:
self._lock_release()
+ def need_poll(self):
+ """Return true if polling is needed"""
+ now = time.time()
+
+ cache = self._cache_client
+ if cache is not None:
+ new_commit_count = cache.get('commit_count')
+ if new_commit_count != self._commit_count:
+ # There is new data ready to poll
+ self._commit_count = new_commit_count
+ self._poll_at = now
+ return True
+
+ if not self._load_transaction_open:
+ # Since the load connection is closed or does not have
+ # a transaction in progress, polling is required.
+ return True
+
+ if now >= self._poll_at:
+ # The poll timeout has expired
+ return True
+
+ return False
+
def poll_invalidations(self):
"""Looks for OIDs of objects that changed since _prev_polled_tid
@@ -938,14 +975,10 @@
return {}
if self._options.poll_interval:
- now = time.time()
- if self._load_transaction_open and now < self._poll_at:
- # It's not yet time to poll again. The previous load
- # transaction is still open, so it's safe to
- # ignore this poll.
+ if not self.need_poll():
return {}
- # else poll now after resetting the timeout
- self._poll_at = now + self._options.poll_interval
+ # reset the timeout
+ self._poll_at = time.time() + self._options.poll_interval
self._restart_load()
conn = self._load_conn
Modified: relstorage/branches/1.1/relstorage/tests/fakecache.py
===================================================================
--- relstorage/branches/1.1/relstorage/tests/fakecache.py 2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/tests/fakecache.py 2009-01-27 02:40:19 UTC (rev 95065)
@@ -14,14 +14,27 @@
"""A memcache-like module sufficient for testing without an actual memcache.
"""
+data = {}
+
class Client(object):
def __init__(self, servers):
self.servers = servers
- self.data = {}
def get(self, key):
- return self.data.get(key)
+ return data.get(key)
def set(self, key, value):
- self.data[key] = value
+ data[key] = value
+
+ def add(self, key, value):
+ if key not in data:
+ data[key] = value
+
+ def incr(self, key):
+ value = data.get(key)
+ if value is None:
+ return None
+ value = int(value) + 1
+ data[key] = value
+ return value
Modified: relstorage/branches/1.1/relstorage/tests/reltestbase.py
===================================================================
--- relstorage/branches/1.1/relstorage/tests/reltestbase.py 2009-01-27 02:18:53 UTC (rev 95064)
+++ relstorage/branches/1.1/relstorage/tests/reltestbase.py 2009-01-27 02:40:19 UTC (rev 95065)
@@ -16,6 +16,7 @@
import itertools
import time
from relstorage.relstorage import RelStorage
+from relstorage.tests import fakecache
from ZODB.DB import DB
from ZODB.utils import p64
@@ -42,13 +43,14 @@
def open(self, **kwargs):
adapter = self.make_adapter()
- self._storage = RelStorage(adapter, **kwargs)
+ self._storage = RelStorage(adapter, pack_gc=True, **kwargs)
def setUp(self):
self.open(create=1)
self._storage.zap_all()
def tearDown(self):
+ transaction.abort()
self._storage.close()
self._storage.cleanup()
@@ -231,25 +233,32 @@
def checkLoadFromCache(self):
# Store an object, cache it, then retrieve it from the cache
self._storage._options.cache_servers = 'x:1 y:2'
- self._storage._options.cache_module_name = 'relstorage.tests.fakecache'
+ self._storage._options.cache_module_name = fakecache.__name__
+ fakecache.data.clear()
db = DB(self._storage)
try:
c1 = db.open()
- cache = c1._storage._cache_client
- self.assertEqual(cache.servers, ['x:1', 'y:2'])
- self.assertEqual(len(cache.data), 0)
+ self.assertEqual(c1._storage._cache_client.servers, ['x:1', 'y:2'])
+ self.assertEqual(len(fakecache.data), 0)
r1 = c1.root()
- self.assertEqual(len(cache.data), 2)
+ # the root tid and state should now be cached
+ self.assertEqual(len(fakecache.data), 2)
r1['alpha'] = PersistentMapping()
+ self.assertFalse('commit_count' in fakecache.data)
transaction.commit()
+ self.assertTrue('commit_count' in fakecache.data)
+ self.assertEqual(len(fakecache.data), 3)
oid = r1['alpha']._p_oid
- self.assertEqual(len(cache.data), 2)
- got, serialno = c1._storage.load(oid, '')
- self.assertEqual(len(cache.data), 4)
- # load the object from the cache
- got, serialno = c1._storage.load(oid, '')
+ got, serial = c1._storage.load(oid, '')
+ # another tid and state should now be cached
+ self.assertEqual(len(fakecache.data), 5)
+
+ # load the object via loadSerial()
+ got2 = c1._storage.loadSerial(oid, serial)
+ self.assertEqual(got, got2)
+
# try to load an object that doesn't exist
self.assertRaises(KeyError, c1._storage.load, 'bad.oid.', '')
finally:
@@ -291,7 +300,7 @@
finally:
db.close()
- def checkPollInterval(self):
+ def checkPollInterval(self, using_cache=False):
# Verify the poll_interval parameter causes RelStorage to
# delay invalidation polling.
self._storage._options.poll_interval = 3600
@@ -318,9 +327,19 @@
# flush invalidations to c2, but the poll timer has not
# yet expired, so the change to r2 should not be seen yet.
self.assertTrue(c2._storage._poll_at > 0)
- c2._flush_invalidations()
- r2 = c2.root()
- self.assertEqual(r2['alpha'], 1)
+ if using_cache:
+ # The cache reveals that a poll is needed even though
+ # the poll timeout has not expired.
+ self.assertTrue(c2._storage.need_poll())
+ c2._flush_invalidations()
+ r2 = c2.root()
+ self.assertEqual(r2['alpha'], 2)
+ self.assertFalse(c2._storage.need_poll())
+ else:
+ self.assertFalse(c2._storage.need_poll())
+ c2._flush_invalidations()
+ r2 = c2.root()
+ self.assertEqual(r2['alpha'], 1)
# expire the poll timer and verify c2 sees the change
c2._storage._poll_at -= 3601
@@ -335,7 +354,13 @@
finally:
db.close()
+ def checkPollIntervalWithCache(self):
+ self._storage._options.cache_servers = 'x:1'
+ self._storage._options.cache_module_name = fakecache.__name__
+ fakecache.data.clear()
+ self.checkPollInterval(using_cache=True)
+
def checkTransactionalUndoIterator(self):
# this test overrides the broken version in TransactionalUndoStorage.
@@ -452,7 +477,7 @@
finally:
db.close()
- def checkPackGC(self, gc_enabled=True):
+ def checkPackGC(self, expect_object_deleted=True):
db = DB(self._storage)
try:
c1 = db.open()
@@ -473,7 +498,7 @@
packtime = time.time()
self._storage.pack(packtime, referencesf)
- if gc_enabled:
+ if expect_object_deleted:
# The object should now be gone
self.assertRaises(KeyError, self._storage.load, oid, '')
else:
@@ -484,8 +509,12 @@
def checkPackGCDisabled(self):
self._storage._options.pack_gc = False
- self.checkPackGC(gc_enabled=False)
+ self.checkPackGC(expect_object_deleted=False)
+ def checkPackGCDryRun(self):
+ self._storage._options.pack_dry_run = True
+ self.checkPackGC(expect_object_deleted=False)
+
def checkPackOldUnreferenced(self):
db = DB(self._storage)
try:
@@ -515,6 +544,27 @@
finally:
db.close()
+
+ def checkPackDutyCycle(self):
+ # Exercise the code in the pack algorithm that releases the
+ # commit lock for a time to allow concurrent transactions to commit.
+ self._storage._options.pack_batch_timeout = 0 # pause after every txn
+
+ slept = []
+ def sim_sleep(seconds):
+ slept.append(seconds)
+
+ db = DB(self._storage)
+ try:
+ # Pack
+ now = packtime = time.time()
+ while packtime <= now:
+ packtime = time.time()
+ self._storage.pack(packtime, referencesf, sleep=sim_sleep)
+
+ self.assertEquals(len(slept), 1)
+ finally:
+ db.close()
More information about the Checkins mailing list