[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - connection.py:1.5.4.2 gateways.py:1.2.2.1 scanner.py:1.1.2.2 storage.py:1.6.4.2
Shane Hathaway
shane@zope.com
Thu, 24 Jul 2003 08:15:48 -0400
Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv8851/lib/apelib/zodb3
Modified Files:
Tag: ape-scan-branch
connection.py gateways.py scanner.py storage.py
Log Message:
Modified the strategy: the storage now asks gateways for their sources directly.
=== Products/Ape/lib/apelib/zodb3/connection.py 1.5.4.1 => 1.5.4.2 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.5.4.1 Wed Jul 23 00:12:52 2003
+++ Products/Ape/lib/apelib/zodb3/connection.py Thu Jul 24 08:15:40 2003
@@ -503,7 +503,7 @@
if obj is None:
return
if serial == ResolvedSerial:
- obj._p_changed = None
+ del obj._p_changed
else:
if change:
obj._p_changed = 0
@@ -517,7 +517,7 @@
if obj is None:
continue
if serial == ResolvedSerial:
- obj._p_changed = None
+ del obj._p_changed
else:
if change:
obj._p_changed = 0
=== Products/Ape/lib/apelib/zodb3/gateways.py 1.2 => 1.2.2.1 ===
--- Products/Ape/lib/apelib/zodb3/gateways.py:1.2 Wed Jul 9 11:40:12 2003
+++ Products/Ape/lib/apelib/zodb3/gateways.py Thu Jul 24 08:15:40 2003
@@ -56,3 +56,6 @@
% (repr(data), repr(expect)))
return None
+ def getSources(self, event):
+ return None
+
=== Products/Ape/lib/apelib/zodb3/scanner.py 1.1.2.1 => 1.1.2.2 ===
--- Products/Ape/lib/apelib/zodb3/scanner.py:1.1.2.1 Wed Jul 23 00:12:52 2003
+++ Products/Ape/lib/apelib/zodb3/scanner.py Thu Jul 24 08:15:40 2003
@@ -27,8 +27,8 @@
# OIDs that might be used soon.
FUTURE_TIMEOUT = 10 * 60
-CONNECTION_UPDATE_INTERVAL = 15
-SCAN_INTERVAL = 30
+CONNECTION_UPDATE_INTERVAL = 10
+SCAN_INTERVAL = 10
class ScanControl:
@@ -42,6 +42,7 @@
self.lock = allocate_lock()
self.next_scan = time() + SCAN_INTERVAL
+
def newConnection(self):
self.lock.acquire()
try:
@@ -51,6 +52,7 @@
finally:
self.lock.release()
+
def setConnectionOIDs(self, conn_id, oids):
changed = 0
new_oids = OOSet()
@@ -70,17 +72,16 @@
self.lock.release()
if changed:
self.scanner.setOIDs(new_oids)
- print 'pre-scanning'
- self.scanner.scan(new_only=1)
- print 'pre-scan done'
self.mayScan()
+
def mayScan(self):
now = time()
if now >= self.next_scan:
self.next_scan = now + SCAN_INTERVAL
print 'Scanning %d objects' % len(self.oids)
inv = self.scanner.scan()
+ self.scanner.pruneFuture()
print 'Finished scanning'
if inv:
print 'Invalidating', inv
@@ -114,8 +115,9 @@
class Scanner:
def __init__(self):
- self.current = OOBTree() # OOBTree({ oid -> { source -> status } })
- self.future = {} # { oid -> ([source], atime) }
+ self.current = OOBTree() # OOBTree({ oid -> {source->state} })
+ self.future = {} # { oid -> ({source->state}, atime) }
+ self.uncommitted = {} # { tid -> {oid->{source->state}} }
self.lock = allocate_lock()
@@ -127,35 +129,25 @@
del self.current[oid]
added = difference(oids, self.current)
for oid in added.keys():
- d = {}
- info = self.future.get(oid)
- if info:
+ if self.future.has_key(oid):
# Source info for this OID was provided earlier.
+ sources, atime = self.future[oid]
del self.future[oid]
- for source in info[0]:
- d[source] = None
- self.current[oid] = d
+ else:
+ sources = {}
+ self.current[oid] = sources
finally:
self.lock.release()
def setSources(self, oid, sources):
+ if sources is None:
+ sources = {}
self.lock.acquire()
try:
if self.current.has_key(oid):
# This OID is known to be in use.
- d = self.current[oid]
- keys = d.keys()
- keys.sort()
- if keys != sources:
- for key in keys:
- if not key in sources:
- # Remove a source
- del d[key]
- for source in sources:
- if not d.has_key(source):
- # Add a source with no status yet
- d[source] = None
+ self.current[oid] = sources
else:
# This OID might be useful soon.
self.future[oid] = (sources, time())
@@ -163,17 +155,24 @@
self.lock.release()
- def scan(self, new_only=0):
- to_scan = {} # { repo -> { source -> status } }
- to_invalidate = {} # { oid -> 1 }
+ def setUncommittedSources(self, tid, oid, sources):
self.lock.acquire()
try:
- for oid, statdict in self.current.items():
- for source, status in statdict.items():
- if new_only and status is not None:
- continue
+ t = self.uncommitted.setdefault(tid, {})
+ t[oid] = sources
+ finally:
+ self.lock.release()
+
+
+ def scan(self):
+ to_scan = {} # { repo -> { source -> state } }
+ to_invalidate = {} # { oid -> 1 }
+ self.lock.acquire() # lock because oid_states might be self.current.
+ try:
+ for oid, sources in self.current.items():
+ for source, state in sources.items():
repo, location = source
- to_scan.setdefault(repo, {})[source] = status
+ to_scan.setdefault(repo, {})[source] = state
finally:
self.lock.release()
changes = {}
@@ -186,15 +185,13 @@
# update self.current.
self.lock.acquire()
try:
- for oid, statdict in self.current.items():
- for source, status in statdict.items():
+ for oid, sources in self.current.items():
+ for source, state in sources.items():
if changes.has_key(source):
- if statdict.get(source) is not None:
- to_invalidate[oid] = 1
- statdict[source] = changes[source]
+ to_invalidate[oid] = 1
+ sources[source] = changes[source]
finally:
self.lock.release()
- self.pruneFuture()
return to_invalidate.keys()
@@ -209,4 +206,41 @@
del self.future[oid]
finally:
self.lock.release()
+
+
+ def afterCommit(self, tid):
+ self.lock.acquire()
+ try:
+ if not self.uncommitted.has_key(tid):
+ return
+ t = self.uncommitted[tid]
+ del self.uncommitted[tid]
+ finally:
+ self.lock.release()
+ # Update the sources with new states for the committed OIDs.
+ to_scan = {} # { repo -> { source -> state } }
+ for oid, sources in t.items():
+ for source, state in sources.items():
+ repo, location = source
+ to_scan.setdefault(repo, {})[source] = state
+ changes = {}
+ for repo, d in to_scan.items():
+ c = repo.freshen(d)
+ if c:
+ changes.update(c)
+ for oid, sources in t.items():
+ new_sources = {}
+ for source, state in sources.items():
+ state = changes.get(source, state)
+ new_sources[source] = state
+ self.setSources(oid, new_sources)
+
+
+ def afterAbort(self, tid):
+ self.lock.acquire()
+ try:
+ if self.uncommitted.has_key(tid):
+ del self.uncommitted[tid]
+ finally:
+ self.lock.release()
=== Products/Ape/lib/apelib/zodb3/storage.py 1.6.4.1 => 1.6.4.2 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.6.4.1 Wed Jul 23 00:12:52 2003
+++ Products/Ape/lib/apelib/zodb3/storage.py Thu Jul 24 08:15:40 2003
@@ -109,7 +109,8 @@
if DEBUG:
print 'loaded', `oid`, `h`
if self._scanner is not None:
- sources = event.getSources()
+ gw = event.getMapper().getGateway()
+ sources = gw.getSources(event)
self._scanner.setSources(oid, sources)
return data, h
finally:
@@ -135,7 +136,7 @@
if h64 != HASH0:
# Overwriting an old object. Use the hash to verify
# that the new data was derived from the old data.
- event, old_cs, old_hash = self._gwio.load(keychain, 1)
+ event, old_cs, old_hash = self._gwio.load(keychain)
old_h64 = self.hash64(old_hash)
if h64 != old_h64:
raise POSException.ConflictError(
@@ -147,7 +148,7 @@
# NoStateFoundError or a hash of None, otherwise
# there's a conflict.
try:
- event, cs, old_hash = self._gwio.load(keychain, 1)
+ event, cs, old_hash = self._gwio.load(keychain)
except NoStateFoundError:
pass
else:
@@ -162,8 +163,9 @@
event, new_hash = self._gwio.store(keychain, classified_state)
new_h64 = self.hash64(new_hash)
if self._scanner is not None:
- sources = event.getSources()
- self._scanner.setSources(oid, sources)
+ gw = event.getMapper().getGateway()
+ sources = gw.getSources(event)
+ self._scanner.setUncommittedSources(self._serial, oid, sources)
finally:
self._lock_release()
@@ -184,6 +186,8 @@
def _abort(self):
for c in self._conn_list:
c.abort()
+ if self._scanner is not None:
+ self._scanner.afterAbort(self._serial)
def _begin(self, tid, u, d, e):
for c in self._conn_list:
@@ -193,6 +197,8 @@
for c in self._conn_list:
c.finish()
self._ltid = self._serial
+ if self._scanner is not None:
+ self._scanner.afterCommit(self._serial)
def _vote(self):
for c in self._conn_list: