[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 -
connection.py:1.6.2.6 db.py:1.5.2.3 scanner.py:1.2.2.4
storage.py:1.8.2.3
Shane Hathaway
shane at zope.com
Sat Jan 3 15:37:10 EST 2004
Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv4617/zodb3
Modified Files:
Tag: ape-0_8-branch
connection.py db.py scanner.py storage.py
Log Message:
Changed names in the scanner to make it a little easier to understand.
=== Products/Ape/lib/apelib/zodb3/connection.py 1.6.2.5 => 1.6.2.6 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.6.2.5 Sat Jan 3 00:42:44 2004
+++ Products/Ape/lib/apelib/zodb3/connection.py Sat Jan 3 15:36:38 2004
@@ -56,13 +56,17 @@
def _setDB(self, odb):
Connection._setDB(self, odb)
- if odb._scan_ctl is not None:
+ pool_ctl = odb.pool_scan_ctl
+ if pool_ctl is not None:
ctl = self._scan_ctl
if ctl is None:
- self._scan_ctl = ctl = odb._scan_ctl.newConnection()
+ self._scan_ctl = ctl = pool_ctl.newConnection()
if ctl.elapsed():
- # Scan, letting the scanner know which OIDs still matter.
+ # Let the scanner know which OIDs matter.
ctl.setOIDs(self._cache.cache_data.keys())
+ # If it's time, scan on behalf of the whole pool.
+ if pool_ctl.elapsed():
+ pool_ctl.scan()
# If there were any invalidations, process them now.
if self._invalidated:
self._flush_invalidations()
=== Products/Ape/lib/apelib/zodb3/db.py 1.5.2.2 => 1.5.2.3 ===
--- Products/Ape/lib/apelib/zodb3/db.py:1.5.2.2 Sat Dec 20 02:31:08 2003
+++ Products/Ape/lib/apelib/zodb3/db.py Sat Jan 3 15:36:38 2004
@@ -109,11 +109,12 @@
self._conf_resource = conf_resource
scan_interval = int(scan_interval)
if scan_interval > 0:
- from scanner import ScanControl
- ctl = ScanControl(db=self, scan_interval=scan_interval)
- self._scan_ctl = ctl
- ctl.scanner.setStorage(storage)
- storage.setScanner(ctl.scanner)
+ from scanner import PoolScanControl, Scanner
+ pool_ctl = PoolScanControl(storage, db=self, scan_interval=scan_interval)
+ self.pool_scan_ctl = pool_ctl
+ scanner = Scanner()
+ storage.scanner = scanner
+ scanner.storage = storage
else:
self._scan_ctl = None
=== Products/Ape/lib/apelib/zodb3/scanner.py 1.2.2.3 => 1.2.2.4 ===
--- Products/Ape/lib/apelib/zodb3/scanner.py:1.2.2.3 Sat Jan 3 00:42:44 2004
+++ Products/Ape/lib/apelib/zodb3/scanner.py Sat Jan 3 15:36:38 2004
@@ -27,27 +27,28 @@
# FUTURE_TIMEOUT defines how long to keep source information regarding
# OIDs that might be used soon.
-FUTURE_TIMEOUT = 10 * 60
+future_timeout = 10 * 60
-class ScanControl:
+class PoolScanControl:
"""Scanning for a pool of connections.
A ScanControl instance is an attribute of an ApeDB instance. The
actual scanning is delegated to a Scanner instance attached to an
- ApeStorage. The delegation permits scanning to occur on a ZEO
- server while the ScanControl instances exist on ZEO clients.
+ ApeStorage. The delegation theoretically permits scanning to
+ occur on a ZEO server while the ScanControl instances run on
+ separate ZEO clients.
Assigns scanner-specific identities to database connections for
the purpose of tracking which OIDs are still in use.
"""
- def __init__(self, db=None, scan_interval=10):
+ def __init__(self, storage, db=None, scan_interval=10):
+ self.storage = storage
self.db = db
self.next_conn_id = 1
self.conn_oids = IOBTree() # IOBTree({ conn_id -> OOSet([oid]) } })
self.oids = OOSet() # OOSet([oid])
- self.scanner = Scanner()
self.lock = allocate_lock()
self.scan_interval = scan_interval
self.next_scan = time() + scan_interval
@@ -67,8 +68,6 @@
def setConnectionOIDs(self, conn_id, oids):
"""Records the OIDs a connection is using and periodically scans.
-
- Scans only if a timeout for the whole connection pool has elapsed.
"""
changed = 0
new_oids = OOSet()
@@ -87,30 +86,37 @@
finally:
self.lock.release()
if changed:
- self.scanner.setOIDs(new_oids)
- self._mayScan()
+ self.storage.scanner.setOIDs(new_oids)
- def _mayScan(self):
- """Scans for changes if the scanning interval has elapsed.
+ def elapsed(self):
+ """Returns true if the scan interval has elapsed.
"""
now = time()
if now >= self.next_scan:
self.next_scan = now + self.scan_interval
- LOG('Ape', DEBUG, 'Scanning %d objects.' % len(self.oids))
- inv = self.scanner.scan()
- self.scanner.pruneFuture()
- LOG('Ape', DEBUG,
- 'Finished scanning. %d objects changed.' % len(inv))
- if inv:
- # Some objects changed and the caches need to be invalidated.
- d = {}
- for oid in inv:
- d[oid] = 1
- if self.db is not None:
- self.db.invalidate(d)
- else:
- LOG('Ape', DEBUG, "No database set, so can't invalidate!")
+ return 1
+ return 0
+
+
+ def scan(self):
+ """Runs a scan and sends invalidation messages to the database.
+ """
+ LOG('Ape', DEBUG, 'Scanning %d objects.' % len(self.oids))
+ scanner = self.storage.scanner
+ inv = scanner.scan(prune)
+ scanner.pruneFuture()
+ LOG('Ape', DEBUG,
+ 'Finished scanning. %d objects changed.' % len(inv))
+ if inv:
+ # Some objects changed and the caches need to be invalidated.
+ d = {}
+ for oid in inv:
+ d[oid] = 1
+ if self.db is not None:
+ self.db.invalidate(d)
+ else:
+ LOG('Ape', DEBUG, "No database set, so can't invalidate!")
class ConnectionScanControl:
@@ -119,27 +125,27 @@
Delegates to a ScanControl, which in turn delegates to a Scanner.
"""
- def __init__(self, ctl, conn_id):
- self.ctl = ctl
+ def __init__(self, pool_ctl, conn_id):
+ self.pool_ctl = pool_ctl
self.conn_id = conn_id
self.next_update = 0
def elapsed(self):
"""Returns true if the connection-specific scan interval has elapsed.
- The interval is designed to prevent connections from calling
- scanOIDs() with excessive frequency.
+ The interval prevents connections from calling setOIDs() with
+ excessive frequency.
"""
now = time()
if now >= self.next_update:
- self.next_update = now + self.ctl.scan_interval
+ self.next_update = now + self.pool_ctl.scan_interval
return 1
return 0
def setOIDs(self, oids):
"""Records the OIDs this connection is using.
"""
- self.ctl.setConnectionOIDs(self.conn_id, oids)
+ self.pool_ctl.setConnectionOIDs(self.conn_id, oids)
class Scanner:
@@ -155,13 +161,6 @@
self.lock = allocate_lock()
self.storage = None
- def setStorage(self, s):
- """Attaches this scanner to an ApeStorage.
-
- This must be called before storage.getPollSources() will work.
- """
- self.storage = s
-
def setOIDs(self, oids):
"""Sets the list of OIDs to scan.
@@ -211,30 +210,28 @@
self.lock.release()
- def setPollSources(self, oid, sources):
- """Sets the poll sources for one OID.
-
- This method lets ApeStorage provide the source information
- before it is actually requested, which might make the system
- faster overall. The source information is recorded in either
- the 'current' or 'future' table.
+ def afterLoad(self, oid, sources):
+ """Called by the storage after an object is loaded.
"""
if sources is None:
sources = {}
self.lock.acquire()
try:
- if self.current.has_key(oid):
- # This OID is known to be in use.
- self.current[oid] = sources
- else:
- # This OID might be useful soon.
+ if not self.current.has_key(oid):
+ # This object is being loaded for the first time.
+ # Make a record of its current state immediately
+ # so that the next scan can pick up changes.
self.future[oid] = (sources, time())
+ # else we already have info about this object, and now
+ # isn't a good time to update self.current since that that
+ # would prevent changes from being detected at a time when
+ # it's possible to send invalidation messages.
finally:
self.lock.release()
- def setUncommittedSources(self, tid, oid, sources):
- """Records source information that should only be used after commit.
+ def afterStore(self, oid, tid, sources):
+ """Called by the storage after an object is stored (but not committed.)
"""
self.lock.acquire()
try:
@@ -279,14 +276,12 @@
def pruneFuture(self):
"""Prunes the cache of future source information.
-
- See setPollSources().
"""
if self.future:
self.lock.acquire()
try:
# OIDs older than some timeout will probably never be loaded.
- cutoff = time() - FUTURE_TIMEOUT
+ cutoff = time() - future_timeout
for oid, (sources, atime) in self.future.items():
if atime < cutoff:
del self.future[oid]
@@ -319,13 +314,21 @@
c = repo.poll(d)
if c:
changes.update(c)
- for oid, sources in t.items():
- new_sources = {}
- if sources:
- for source, state in sources.items():
- state = changes.get(source, state)
- new_sources[source] = state
- self.setPollSources(oid, new_sources)
+ self.lock.acquire()
+ try:
+ now = time()
+ for oid, sources in t.items():
+ new_sources = {}
+ if sources:
+ for source, state in sources.items():
+ state = changes.get(source, state)
+ new_sources[source] = state
+ if self.current.has_key(oid):
+ self.current[oid] = new_sources
+ else:
+ self.future[oid] = (new_sources, now)
+ finally:
+ self.lock.release()
def afterAbort(self, tid):
=== Products/Ape/lib/apelib/zodb3/storage.py 1.8.2.2 => 1.8.2.3 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.8.2.2 Sat Dec 20 23:24:06 2003
+++ Products/Ape/lib/apelib/zodb3/storage.py Sat Jan 3 15:36:38 2004
@@ -51,12 +51,9 @@
if not name:
name = 'ApeStorage: ' + ', '.join(names)
self._ltid = None
- self._scanner = None
+ self.scanner = None
BaseStorage.BaseStorage.__init__(self, name)
- def setScanner(self, s):
- self._scanner = s
-
def __len__(self):
return 1
@@ -99,9 +96,9 @@
h = self.hash64(hash_value)
if DEBUG:
print 'loaded', `oid`, `h`
- if self._scanner is not None:
+ if self.scanner is not None:
sources = event.mapper.gateway.getPollSources(event)
- self._scanner.setPollSources(oid, sources)
+ self.scanner.afterLoad(oid, sources)
return data, h
finally:
self._lock_release()
@@ -143,9 +140,9 @@
classified_state = u.load()
event, new_hash = self._gwio.store(oid, classified_state, is_new)
new_h64 = self.hash64(new_hash)
- if self._scanner is not None:
+ if self.scanner is not None:
sources = event.mapper.gateway.getPollSources(event)
- self._scanner.setUncommittedSources(self._serial, oid, sources)
+ self.scanner.afterStore(oid, self._serial, sources)
finally:
self._lock_release()
@@ -172,8 +169,8 @@
def _abort(self):
for c in self._conn_list:
c.abort()
- if self._scanner is not None:
- self._scanner.afterAbort(self._serial)
+ if self.scanner is not None:
+ self.scanner.afterAbort(self._serial)
def _begin(self, tid, u, d, e):
for c in self._conn_list:
@@ -183,8 +180,8 @@
for c in self._conn_list:
c.finish()
self._ltid = self._serial
- if self._scanner is not None:
- self._scanner.afterCommit(self._serial)
+ if self.scanner is not None:
+ self.scanner.afterCommit(self._serial)
def _vote(self):
for c in self._conn_list:
More information about the Zope-CVS
mailing list