[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - scanner.py:1.2 connection.py:1.6 db.py:1.4 gateways.py:1.3 storage.py:1.7
Shane Hathaway
shane@zope.com
Wed, 30 Jul 2003 17:33:50 -0400
Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv5368/lib/apelib/zodb3
Modified Files:
connection.py db.py gateways.py storage.py
Added Files:
scanner.py
Log Message:
Merged ape-scan-branch, sneaking in interface updates and minor reformatting.
Ape now watches the filesystem for changes to objects that Zope has in its
cache.
=== Products/Ape/lib/apelib/zodb3/scanner.py 1.1 => 1.2 ===
--- /dev/null Wed Jul 30 17:33:49 2003
+++ Products/Ape/lib/apelib/zodb3/scanner.py Wed Jul 30 17:33:12 2003
@@ -0,0 +1,277 @@
+##############################################################################
+#
+# Copyright (c) 2003 Zope Corporation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Cache scanner.
+
+Keeps a cache up to date by scanning for changes.
+
+$Id$
+"""
+
+from thread import allocate_lock
+from time import time
+
+from BTrees.OOBTree import OOBTree, OOSet, difference
+from BTrees.IOBTree import IOBTree
+from zLOG import LOG, DEBUG
+
+# FUTURE_TIMEOUT defines how long to keep source information regarding
+# OIDs that might be used soon.
+FUTURE_TIMEOUT = 10 * 60
+
+
+class ScanControl:
+    """Aggregates the OIDs held by all connections and schedules scans.
+
+    One instance serves a whole database.  Each connection registers
+    the set of OIDs in its cache; the union of all sets is handed to
+    the Scanner, and mayScan() periodically collects changed OIDs so
+    the database can invalidate them.
+    """
+
+    def __init__(self, db=None, scan_interval=10):
+        # db: the database to invalidate; may be None (invalidation
+        # is then skipped with a log message).
+        # scan_interval: minimum number of seconds between scans.
+        self.db = db
+        self.next_conn_id = 1
+        self.conn_oids = IOBTree() # IOBTree({ conn_id -> OOSet([oid]) })
+        self.oids = OOSet() # OOSet([oid]), union of all conn_oids values
+        self.scanner = Scanner()
+        self.lock = allocate_lock() # guards next_conn_id, conn_oids, oids
+        self.scan_interval = scan_interval
+        self.next_scan = time() + scan_interval
+
+
+    def newConnection(self):
+        # Allocates a unique connection ID and returns the
+        # per-connection control object for it.
+        self.lock.acquire()
+        try:
+            conn_id = self.next_conn_id
+            self.next_conn_id = conn_id + 1
+            return ConnectionScanControl(self, conn_id)
+        finally:
+            self.lock.release()
+
+
+    def setConnectionOIDs(self, conn_id, oids):
+        # Records the OIDs a connection currently holds (an empty or
+        # false 'oids' unregisters the connection), then rebuilds the
+        # union of all connections' OIDs.  If the union changed, the
+        # scanner is given the new set.  Finally a scan may run.
+        changed = 0
+        new_oids = OOSet()
+        self.lock.acquire()
+        try:
+            if oids:
+                self.conn_oids[conn_id] = OOSet(oids)
+            else:
+                if self.conn_oids.has_key(conn_id):
+                    del self.conn_oids[conn_id]
+            for set in self.conn_oids.values():
+                new_oids.update(set)
+            # NOTE(review): under Python 2, '!=' between two OOSet
+            # instances may compare by identity rather than contents,
+            # which would make 'changed' nearly always true.  That is
+            # harmless (extra setOIDs calls) but worth confirming.
+            if self.oids != new_oids:
+                self.oids = new_oids
+                changed = 1
+        finally:
+            self.lock.release()
+        if changed:
+            self.scanner.setOIDs(new_oids)
+        self.mayScan()
+
+
+    def mayScan(self):
+        # Runs a scan if at least scan_interval seconds have elapsed
+        # since the last one; otherwise returns immediately.
+        now = time()
+        if now >= self.next_scan:
+            self.next_scan = now + self.scan_interval
+            LOG('Ape', DEBUG, 'Scanning %d objects.' % len(self.oids))
+            inv = self.scanner.scan()
+            self.scanner.pruneFuture()
+            LOG('Ape', DEBUG,
+                'Finished scanning. %d objects changed.' % len(inv))
+            if inv:
+                # Build the {oid -> 1} mapping expected by invalidate().
+                d = {}
+                for oid in inv:
+                    d[oid] = 1
+                if self.db is not None:
+                    self.db.invalidate(d)
+                else:
+                    LOG('Ape', DEBUG, "No database set, so can't invalidate!")
+
+
+class ConnectionScanControl:
+    """Per-connection facade over a shared ScanControl.
+
+    Rate-limits how often a single connection pushes its OID set to
+    the shared control (at most once per scan_interval seconds).
+    """
+
+    def __init__(self, ctl, conn_id):
+        self.ctl = ctl # the shared ScanControl
+        self.conn_id = conn_id
+        self.next_update = 0 # earliest time() at which ready() is true
+
+    def ready(self):
+        # Returns 1 (and schedules the next allowed update) when it is
+        # time for this connection to report its OIDs, else 0.
+        now = time()
+        if now >= self.next_update:
+            self.next_update = now + self.ctl.scan_interval
+            return 1
+        return 0
+
+    def setOIDs(self, oids):
+        # Forwards this connection's OID list to the shared control.
+        self.ctl.setConnectionOIDs(self.conn_id, oids)
+
+
+class Scanner:
+    """Tracks the data sources behind cached OIDs and detects changes.
+
+    A 'sources' mapping is { (repo, location) -> state }.  Each repo
+    is polled via repo.freshen() to learn which sources changed, so
+    the OIDs backed by those sources can be invalidated.
+    """
+
+    def __init__(self):
+        self.current = OOBTree() # OOBTree({ oid -> {source->state} })
+        self.future = {} # { oid -> ({source->state}, atime) }
+        self.uncommitted = {} # { tid -> {oid->{source->state}} }
+        self.lock = allocate_lock() # guards the three mappings above
+        self.storage = None
+
+    def setStorage(self, s):
+        # This is needed for calling storage.getSources().
+        self.storage = s
+
+    def setOIDs(self, oids):
+        # Replaces the set of OIDs being watched.  Source info for
+        # newly added OIDs is adopted from self.future when available;
+        # otherwise it is loaded from the storage (the slow way).
+        new_sources = {} # { oid -> sourcedict }
+        self.lock.acquire()
+        try:
+            removed = difference(self.current, oids)
+            for oid in removed.keys():
+                del self.current[oid]
+            added = difference(oids, self.current)
+            for oid in added.keys():
+                if self.future.has_key(oid):
+                    # Source info for this OID was provided earlier.
+                    sources, atime = self.future[oid]
+                    del self.future[oid]
+                    self.current[oid] = sources
+                else:
+                    new_sources[oid] = None
+        finally:
+            self.lock.release()
+        if new_sources:
+            # Load source info the slow way.  The lock is deliberately
+            # released while calling the storage.
+            if self.storage is not None:
+                LOG('Ape', DEBUG, 'Getting sources for %d oids.'
+                    % len(new_sources))
+                for oid in new_sources.keys():
+                    new_sources[oid] = self.storage.getSources(oid)
+            else:
+                LOG('Ape', DEBUG, "Can't get sources for %d oids. "
+                    "Assuming no sources!" % len(new_sources))
+                # This will cause the scanner to miss changes, but
+                # since no storage is known, there is little we can
+                # do.
+                for oid in new_sources.keys():
+                    new_sources[oid] = {}
+            self.lock.acquire()
+            try:
+                for oid, sources in new_sources.items():
+                    if not self.current.has_key(oid):
+                        self.current[oid] = sources
+                    # else something else added the source info
+                    # while self.lock was released.
+            finally:
+                self.lock.release()
+
+
+    def setSources(self, oid, sources):
+        # Records fresh source info for an OID.  If the OID is not yet
+        # watched, the info is parked in self.future in case the OID
+        # gets loaded soon (pruneFuture() expires stale entries).
+        if sources is None:
+            sources = {}
+        self.lock.acquire()
+        try:
+            if self.current.has_key(oid):
+                # This OID is known to be in use.
+                self.current[oid] = sources
+            else:
+                # This OID might be useful soon.
+                self.future[oid] = (sources, time())
+        finally:
+            self.lock.release()
+
+
+    def setUncommittedSources(self, tid, oid, sources):
+        # Holds source info for an object stored by transaction tid;
+        # adopted by afterCommit() or dropped by afterAbort().
+        self.lock.acquire()
+        try:
+            t = self.uncommitted.setdefault(tid, {})
+            t[oid] = sources
+        finally:
+            self.lock.release()
+
+
+    def scan(self):
+        # Polls every repository for changed sources and returns the
+        # list of OIDs whose sources changed.  Recorded states are
+        # updated to the new values so a change is reported only once.
+        to_scan = {} # { repo -> { source -> state } }
+        to_invalidate = {} # { oid -> 1 }
+        self.lock.acquire() # lock because oid_states might be self.current.
+        try:
+            for oid, sources in self.current.items():
+                for source, state in sources.items():
+                    repo, location = source
+                    to_scan.setdefault(repo, {})[source] = state
+        finally:
+            self.lock.release()
+        changes = {}
+        # NOTE(review): repo.freshen(d) apparently returns a mapping of
+        # changed sources to their new states (or a false value) --
+        # confirm against the repository implementations.
+        for repo, d in to_scan.items():
+            c = repo.freshen(d)
+            if c:
+                changes.update(c)
+        if changes:
+            # Something changed.  Map the changes back to oids and
+            # update self.current.
+            self.lock.acquire()
+            try:
+                for oid, sources in self.current.items():
+                    for source, state in sources.items():
+                        if changes.has_key(source):
+                            to_invalidate[oid] = 1
+                            sources[source] = changes[source]
+            finally:
+                self.lock.release()
+        return to_invalidate.keys()
+
+
+    def pruneFuture(self):
+        # Expires parked source info that was never claimed by setOIDs().
+        if self.future:
+            self.lock.acquire()
+            try:
+                # OIDs older than some timeout will probably never be loaded.
+                cutoff = time() - FUTURE_TIMEOUT
+                # items() returns a list under Python 2, so deleting
+                # entries while iterating is safe here.
+                for oid, (sources, atime) in self.future.items():
+                    if atime < cutoff:
+                        del self.future[oid]
+            finally:
+                self.lock.release()
+            LOG('Ape', DEBUG,
+                'Future sources cache size: %d objects.' % len(self.future))
+
+
+    def afterCommit(self, tid):
+        # Adopts the source info recorded by setUncommittedSources()
+        # for transaction tid, freshening states first so the commit
+        # itself is not later reported as a change.
+        self.lock.acquire()
+        try:
+            if not self.uncommitted.has_key(tid):
+                # Nothing was stored under this transaction.
+                return
+            t = self.uncommitted[tid]
+            del self.uncommitted[tid]
+        finally:
+            self.lock.release()
+        # Update the sources with new states for the committed OIDs.
+        to_scan = {} # { repo -> { source -> state } }
+        for oid, sources in t.items():
+            for source, state in sources.items():
+                repo, location = source
+                to_scan.setdefault(repo, {})[source] = state
+        changes = {}
+        for repo, d in to_scan.items():
+            c = repo.freshen(d)
+            if c:
+                changes.update(c)
+        for oid, sources in t.items():
+            new_sources = {}
+            for source, state in sources.items():
+                # Prefer the freshened state when one was reported.
+                state = changes.get(source, state)
+                new_sources[source] = state
+            self.setSources(oid, new_sources)
+
+
+    def afterAbort(self, tid):
+        # Discards source info recorded for an aborted transaction.
+        self.lock.acquire()
+        try:
+            if self.uncommitted.has_key(tid):
+                del self.uncommitted[tid]
+        finally:
+            self.lock.release()
+
=== Products/Ape/lib/apelib/zodb3/connection.py 1.5 => 1.6 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.5 Mon May 26 16:20:09 2003
+++ Products/Ape/lib/apelib/zodb3/connection.py Wed Jul 30 17:33:12 2003
@@ -47,10 +47,25 @@
tabular records.
"""
_osio = None
+ _scan_ctl = None
__implements__ = (IKeyedObjectSystem,
getattr(Connection, '__implements__', ()))
+
+ def _setDB(self, odb):
+ Connection._setDB(self, odb)
+ if odb._scan_ctl is not None:
+ ctl = self._scan_ctl
+ if ctl is None:
+ self._scan_ctl = ctl = odb._scan_ctl.newConnection()
+ if ctl.ready():
+ ctl.setOIDs(self._cache.cache_data.keys())
+ # If there were any invalidations, process them now.
+ if self._invalidated:
+ self._flush_invalidations()
+
+
def getObjectSystemIO(self):
osio = self._osio
if osio is None:
@@ -488,7 +503,7 @@
if obj is None:
return
if serial == ResolvedSerial:
- obj._p_changed = None
+ del obj._p_changed
else:
if change:
obj._p_changed = 0
@@ -502,7 +517,7 @@
if obj is None:
continue
if serial == ResolvedSerial:
- obj._p_changed = None
+ del obj._p_changed
else:
if change:
obj._p_changed = 0
=== Products/Ape/lib/apelib/zodb3/db.py 1.3 => 1.4 ===
--- Products/Ape/lib/apelib/zodb3/db.py:1.3 Wed Jun 4 11:44:45 2003
+++ Products/Ape/lib/apelib/zodb3/db.py Wed Jul 30 17:33:12 2003
@@ -20,6 +20,7 @@
from apelib.core.interfaces import IMapper
from apelib.core.exceptions import ConfigurationError
+
from connection import ApeConnection
from storage import ApeStorage
from oidencoder import OIDEncoder
@@ -47,11 +48,12 @@
klass = ApeConnection
- # SDH: two extra args.
+ # SDH: some extra args.
def __init__(self, storage,
mapper_resource=None,
factory=None,
oid_encoder=None,
+ scan_interval=10,
pool_size=7,
cache_size=400,
cache_deactivate_after=60,
@@ -111,6 +113,14 @@
assert IOIDEncoder.isImplementedBy(oid_encoder)
self._oid_encoder = oid_encoder
self._mapper_resource = mapper_resource
+ if scan_interval > 0:
+ from scanner import ScanControl
+ ctl = ScanControl(db=self, scan_interval=scan_interval)
+ self._scan_ctl = ctl
+ ctl.scanner.setStorage(storage)
+ storage.setScanner(ctl.scanner)
+ else:
+ self._scan_ctl = None
# Pass through methods:
for m in ('history',
=== Products/Ape/lib/apelib/zodb3/gateways.py 1.2 => 1.3 ===
--- Products/Ape/lib/apelib/zodb3/gateways.py:1.2 Wed Jul 9 11:40:12 2003
+++ Products/Ape/lib/apelib/zodb3/gateways.py Wed Jul 30 17:33:12 2003
@@ -56,3 +56,6 @@
% (repr(data), repr(expect)))
return None
+ def getSources(self, event):
+ return None
+
=== Products/Ape/lib/apelib/zodb3/storage.py 1.6 => 1.7 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.6 Wed Jun 4 11:45:21 2003
+++ Products/Ape/lib/apelib/zodb3/storage.py Wed Jul 30 17:33:12 2003
@@ -60,8 +60,12 @@
if not name:
name = 'ApeStorage: ' + ', '.join(names)
self._ltid = None
+ self._scanner = None
BaseStorage.BaseStorage.__init__(self, name)
+ def setScanner(self, s):
+ self._scanner = s
+
def __len__(self):
return 1
@@ -96,7 +100,7 @@
try:
self._mapper_resource.access(self) # Update mapper
keychain = self._oid_encoder.decode(oid)
- classified_state, hash_value = self._gwio.load(keychain)
+ event, classified_state, hash_value = self._gwio.load(keychain)
file = StringIO()
p = Pickler(file)
p.dump(classified_state)
@@ -104,6 +108,10 @@
h = self.hash64(hash_value)
if DEBUG:
print 'loaded', `oid`, `h`
+ if self._scanner is not None:
+ gw = event.getMapper().getGateway()
+ sources = gw.getSources(event)
+ self._scanner.setSources(oid, sources)
return data, h
finally:
self._lock_release()
@@ -128,7 +136,7 @@
if h64 != HASH0:
# Overwriting an old object. Use the hash to verify
# that the new data was derived from the old data.
- old_cs, old_hash = self._gwio.load(keychain, 1)
+ event, old_cs, old_hash = self._gwio.load(keychain)
old_h64 = self.hash64(old_hash)
if h64 != old_h64:
raise POSException.ConflictError(
@@ -140,7 +148,7 @@
# NoStateFoundError or a hash of None, otherwise
# there's a conflict.
try:
- cs, old_hash = self._gwio.load(keychain, 1)
+ event, cs, old_hash = self._gwio.load(keychain)
except NoStateFoundError:
pass
else:
@@ -152,8 +160,12 @@
file = StringIO(data)
u = Unpickler(file)
classified_state = u.load()
- new_hash = self._gwio.store(keychain, classified_state)
+ event, new_hash = self._gwio.store(keychain, classified_state)
new_h64 = self.hash64(new_hash)
+ if self._scanner is not None:
+ gw = event.getMapper().getGateway()
+ sources = gw.getSources(event)
+ self._scanner.setUncommittedSources(self._serial, oid, sources)
finally:
self._lock_release()
@@ -161,6 +173,14 @@
print 'stored', `oid`, `h64`, `new_h64`
return new_h64
+ def getSources(self, oid):
+ keychain = self._oid_encoder.decode(oid)
+ self._lock_acquire()
+ try:
+ return self._gwio.getSources(keychain)
+ finally:
+ self._lock_release()
+
def new_oid(self):
keychain = self._gwio.newKeychain()
return self._oid_encoder.encode(keychain)
@@ -174,6 +194,8 @@
def _abort(self):
for c in self._conn_list:
c.abort()
+ if self._scanner is not None:
+ self._scanner.afterAbort(self._serial)
def _begin(self, tid, u, d, e):
for c in self._conn_list:
@@ -183,6 +205,8 @@
for c in self._conn_list:
c.finish()
self._ltid = self._serial
+ if self._scanner is not None:
+ self._scanner.afterCommit(self._serial)
def _vote(self):
for c in self._conn_list: