[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - scanner.py:1.1.2.1 connection.py:1.5.4.1 db.py:1.3.4.1 storage.py:1.6.4.1
Shane Hathaway
shane@zope.com
Wed, 23 Jul 2003 00:12:59 -0400
Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv21220/lib/apelib/zodb3
Modified Files:
Tag: ape-scan-branch
connection.py db.py storage.py
Added Files:
Tag: ape-scan-branch
scanner.py
Log Message:
Rough implementation of cache freshness scanning.
This will hopefully enable smoother filesystem storage.
=== Added File Products/Ape/lib/apelib/zodb3/scanner.py ===
##############################################################################
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""Cache scanner.
Keeps a cache up to date by scanning for changes.
$Id: scanner.py,v 1.1.2.1 2003/07/23 04:12:52 shane Exp $
"""
from thread import allocate_lock
from time import time
from BTrees.OOBTree import OOBTree, OOSet, difference
from BTrees.IOBTree import IOBTree
# All three settings below are durations in seconds: they are added to or
# subtracted from values returned by time().
# FUTURE_TIMEOUT defines how long to keep source information regarding
# OIDs that might be used soon.
FUTURE_TIMEOUT = 10 * 60
# CONNECTION_UPDATE_INTERVAL is the minimum delay between two successful
# ConnectionScanControl.ready() calls on the same connection.
CONNECTION_UPDATE_INTERVAL = 15
# SCAN_INTERVAL is the minimum delay between two full scans started by
# ScanControl.mayScan().
SCAN_INTERVAL = 30
class ScanControl:
    """Coordinates cache freshness scanning for the connections of a database.

    Each connection reports the OIDs it holds through a ConnectionScanControl
    handle.  This object keeps the union of those OIDs, keeps the shared
    Scanner in sync with that union, and runs a full scan at most once
    every SCAN_INTERVAL seconds.  OIDs the scanner reports as changed are
    passed to db.invalidate().
    """

    def __init__(self, db=None):
        """Set up empty bookkeeping and schedule the first full scan.

        db -- object whose invalidate() method receives a { oid -> 1 }
        dict of changed OIDs.  May be None, in which case changes are
        only reported on stdout.
        """
        self.db = db
        self.next_conn_id = 1
        self.conn_oids = IOBTree()  # IOBTree({ conn_id -> OOSet([oid]) })
        self.oids = OOSet()         # OOSet([oid])
        self.scanner = Scanner()
        self.lock = allocate_lock()
        self.next_scan = time() + SCAN_INTERVAL

    def newConnection(self):
        """Return a ConnectionScanControl bound to a newly allocated id."""
        self.lock.acquire()
        try:
            conn_id = self.next_conn_id
            self.next_conn_id = conn_id + 1
            return ConnectionScanControl(self, conn_id)
        finally:
            self.lock.release()

    def setConnectionOIDs(self, conn_id, oids):
        """Record the OIDs held by one connection, then scan if needed.

        conn_id -- id handed out by newConnection().
        oids -- sequence of OIDs; an empty (false) value removes the
        connection's entry altogether.

        If the union of all connections' OIDs differs from the previously
        recorded one, the scanner is given the new set and the sources that
        have no status yet are scanned.  mayScan() is called in every case.
        """
        changed = 0
        new_oids = OOSet()
        self.lock.acquire()
        try:
            if oids:
                self.conn_oids[conn_id] = OOSet(oids)
            else:
                if self.conn_oids.has_key(conn_id):
                    del self.conn_oids[conn_id]
            # Rebuild the union of the OIDs of all connections.
            for conn_set in self.conn_oids.values():
                new_oids.update(conn_set)
            if self.oids != new_oids:
                self.oids = new_oids
                changed = 1
        finally:
            self.lock.release()
        # The scanner does its own locking, so it is called only after
        # the control lock has been released.
        if changed:
            self.scanner.setOIDs(new_oids)
            print('pre-scanning')
            self.scanner.scan(new_only=1)
            print('pre-scan done')
        self.mayScan()

    def mayScan(self):
        """Run a full scan if at least SCAN_INTERVAL has passed.

        OIDs reported by the scan are passed to self.db.invalidate() as a
        { oid -> 1 } dict; when no database is set a message is printed
        instead.
        """
        now = time()
        # Claim the scan slot under the lock, so that two threads arriving
        # at the same moment cannot both decide to run the full scan.
        self.lock.acquire()
        try:
            if now < self.next_scan:
                return
            self.next_scan = now + SCAN_INTERVAL
        finally:
            self.lock.release()
        print('Scanning %d objects' % len(self.oids))
        inv = self.scanner.scan()
        print('Finished scanning')
        if inv:
            print('Invalidating %s' % (inv,))
            d = {}
            for oid in inv:
                d[oid] = 1
            if self.db is not None:
                self.db.invalidate(d)
            else:
                print('No database set!')
class ConnectionScanControl:
    """Handle through which one connection reports its OIDs.

    Pairs a ScanControl with the id it allocated for the connection and
    rate-limits how often the connection reports.
    """

    def __init__(self, ctl, conn_id):
        """Remember the owning ScanControl and this connection's id."""
        self.ctl = ctl
        self.conn_id = conn_id
        # Starting at 0 puts the first deadline in the past.
        self.next_update = 0

    def ready(self):
        """Return 1 if it is time to report OIDs again, otherwise 0.

        A true answer also pushes the next deadline
        CONNECTION_UPDATE_INTERVAL seconds into the future.
        """
        now = time()
        if now < self.next_update:
            return 0
        self.next_update = now + CONNECTION_UPDATE_INTERVAL
        return 1

    def setOIDs(self, oids):
        """Forward this connection's OIDs to the ScanControl."""
        self.ctl.setConnectionOIDs(self.conn_id, oids)
class Scanner:
    """Remembers the sources behind OIDs and detects when they change.

    self.current maps every OID in use to a dict { source -> status },
    where a status of None means the source has not been scanned yet.
    self.future holds source lists for OIDs that are not in use yet,
    together with the time they were recorded.
    """

    def __init__(self):
        """Start with nothing tracked."""
        self.current = OOBTree()  # OOBTree({ oid -> { source -> status } })
        self.future = {}          # { oid -> ([source], atime) }
        self.lock = allocate_lock()

    def setOIDs(self, oids):
        """Make the tracked OIDs match oids.

        OIDs missing from oids are dropped.  Newly seen OIDs get a fresh
        status dict, seeded from self.future when sources were recorded
        there earlier.
        """
        self.lock.acquire()
        try:
            # Stop tracking OIDs that are no longer wanted.
            stale = difference(self.current, oids)
            for oid in stale.keys():
                del self.current[oid]
            # Begin tracking the OIDs that are new.
            fresh = difference(oids, self.current)
            for oid in fresh.keys():
                statuses = {}
                remembered = self.future.get(oid)
                if remembered:
                    # Sources for this OID arrived before the OID itself.
                    del self.future[oid]
                    for source in remembered[0]:
                        statuses[source] = None
                self.current[oid] = statuses
        finally:
            self.lock.release()

    def setSources(self, oid, sources):
        """Record the sources that back oid.

        For an OID in use, its status dict is adjusted to contain exactly
        the given sources; sources already present keep their status and
        added ones start at None.  For any other OID the sources are kept
        in self.future with the current time.
        """
        self.lock.acquire()
        try:
            if not self.current.has_key(oid):
                # Not in use yet, but it may be soon.
                self.future[oid] = (sources, time())
                return
            statuses = self.current[oid]
            known = statuses.keys()
            known.sort()
            if known == sources:
                return
            # Forget sources that are no longer listed ...
            for source in known:
                if source not in sources:
                    del statuses[source]
            # ... and add the missing ones, with no status yet.
            for source in sources:
                if source not in statuses:
                    statuses[source] = None
        finally:
            self.lock.release()

    def scan(self, new_only=0):
        """Ask the repositories what changed; return the OIDs to invalidate.

        new_only -- when true, only sources whose status is still None are
        submitted to the repositories.

        Each source is a (repo, location) pair.  repo.freshen() is handed
        a { source -> status } dict and its result is treated as a
        { source -> new status } dict of changes.  An OID is returned when
        one of its sources changed and already had a status.
        """
        pending = {}     # { repo -> { source -> status } }
        invalidated = {}  # { oid -> 1 }
        self.lock.acquire()
        try:
            for oid, statuses in self.current.items():
                for source, status in statuses.items():
                    if status is None or not new_only:
                        repo, location = source
                        pending.setdefault(repo, {})[source] = status
        finally:
            self.lock.release()
        # The repositories are consulted without holding the lock.
        changes = {}
        for repo, repo_sources in pending.items():
            changed = repo.freshen(repo_sources)
            if changed:
                changes.update(changed)
        if changes:
            # Map the changed sources back to OIDs and store the new
            # statuses in self.current.
            self.lock.acquire()
            try:
                for oid, statuses in self.current.items():
                    for source, status in statuses.items():
                        if source not in changes:
                            continue
                        if status is not None:
                            invalidated[oid] = 1
                        statuses[source] = changes[source]
            finally:
                self.lock.release()
        self.pruneFuture()
        return invalidated.keys()

    def pruneFuture(self):
        """Drop self.future entries older than FUTURE_TIMEOUT."""
        if not self.future:
            return
        self.lock.acquire()
        try:
            # OIDs older than some timeout will probably never be loaded.
            cutoff = time() - FUTURE_TIMEOUT
            expired = [oid for oid, (sources, atime) in self.future.items()
                       if atime < cutoff]
            for oid in expired:
                del self.future[oid]
        finally:
            self.lock.release()
=== Products/Ape/lib/apelib/zodb3/connection.py 1.5 => 1.5.4.1 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.5 Mon May 26 16:20:09 2003
+++ Products/Ape/lib/apelib/zodb3/connection.py Wed Jul 23 00:12:52 2003
@@ -47,9 +47,24 @@
tabular records.
"""
_osio = None
+ _scan_ctl = None
__implements__ = (IKeyedObjectSystem,
getattr(Connection, '__implements__', ()))
+
+
+ def _setDB(self, odb):
+ Connection._setDB(self, odb)
+ if odb._scan_ctl is not None:
+ ctl = self._scan_ctl
+ if ctl is None:
+ self._scan_ctl = ctl = odb._scan_ctl.newConnection()
+ if ctl.ready():
+ ctl.setOIDs(self._cache.cache_data.keys())
+ # If there were any invalidations, process them now.
+ if self._invalidated:
+ self._flush_invalidations()
+
def getObjectSystemIO(self):
osio = self._osio
=== Products/Ape/lib/apelib/zodb3/db.py 1.3 => 1.3.4.1 ===
--- Products/Ape/lib/apelib/zodb3/db.py:1.3 Wed Jun 4 11:44:45 2003
+++ Products/Ape/lib/apelib/zodb3/db.py Wed Jul 23 00:12:52 2003
@@ -20,6 +20,7 @@
from apelib.core.interfaces import IMapper
from apelib.core.exceptions import ConfigurationError
+
from connection import ApeConnection
from storage import ApeStorage
from oidencoder import OIDEncoder
@@ -47,11 +48,12 @@
klass = ApeConnection
- # SDH: two extra args.
+ # SDH: some extra args.
def __init__(self, storage,
mapper_resource=None,
factory=None,
oid_encoder=None,
+ scan=1,
pool_size=7,
cache_size=400,
cache_deactivate_after=60,
@@ -111,6 +113,13 @@
assert IOIDEncoder.isImplementedBy(oid_encoder)
self._oid_encoder = oid_encoder
self._mapper_resource = mapper_resource
+ if scan:
+ from scanner import ScanControl
+ ctl = ScanControl(self)
+ self._scan_ctl = ctl
+ storage.setScanner(ctl.scanner)
+ else:
+ self._scan_ctl = None
# Pass through methods:
for m in ('history',
=== Products/Ape/lib/apelib/zodb3/storage.py 1.6 => 1.6.4.1 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.6 Wed Jun 4 11:45:21 2003
+++ Products/Ape/lib/apelib/zodb3/storage.py Wed Jul 23 00:12:52 2003
@@ -60,8 +60,12 @@
if not name:
name = 'ApeStorage: ' + ', '.join(names)
self._ltid = None
+ self._scanner = None
BaseStorage.BaseStorage.__init__(self, name)
+ def setScanner(self, s):
+ self._scanner = s
+
def __len__(self):
return 1
@@ -96,7 +100,7 @@
try:
self._mapper_resource.access(self) # Update mapper
keychain = self._oid_encoder.decode(oid)
- classified_state, hash_value = self._gwio.load(keychain)
+ event, classified_state, hash_value = self._gwio.load(keychain)
file = StringIO()
p = Pickler(file)
p.dump(classified_state)
@@ -104,6 +108,9 @@
h = self.hash64(hash_value)
if DEBUG:
print 'loaded', `oid`, `h`
+ if self._scanner is not None:
+ sources = event.getSources()
+ self._scanner.setSources(oid, sources)
return data, h
finally:
self._lock_release()
@@ -128,7 +135,7 @@
if h64 != HASH0:
# Overwriting an old object. Use the hash to verify
# that the new data was derived from the old data.
- old_cs, old_hash = self._gwio.load(keychain, 1)
+ event, old_cs, old_hash = self._gwio.load(keychain, 1)
old_h64 = self.hash64(old_hash)
if h64 != old_h64:
raise POSException.ConflictError(
@@ -140,7 +147,7 @@
# NoStateFoundError or a hash of None, otherwise
# there's a conflict.
try:
- cs, old_hash = self._gwio.load(keychain, 1)
+ event, cs, old_hash = self._gwio.load(keychain, 1)
except NoStateFoundError:
pass
else:
@@ -152,8 +159,11 @@
file = StringIO(data)
u = Unpickler(file)
classified_state = u.load()
- new_hash = self._gwio.store(keychain, classified_state)
+ event, new_hash = self._gwio.store(keychain, classified_state)
new_h64 = self.hash64(new_hash)
+ if self._scanner is not None:
+ sources = event.getSources()
+ self._scanner.setSources(oid, sources)
finally:
self._lock_release()