[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - scanner.py:1.4.2.1 storage.py:1.10.2.2
Shane Hathaway
shane at zope.com
Thu Feb 26 11:32:31 EST 2004
Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv11522/lib/apelib/zodb3
Modified Files:
Tag: ape-fs-oid-branch
scanner.py storage.py
Log Message:
Re-enabled scanning.
It was necessary to defer scanning new objects until after transaction
commit.
=== Products/Ape/lib/apelib/zodb3/scanner.py 1.4 => 1.4.2.1 ===
--- Products/Ape/lib/apelib/zodb3/scanner.py:1.4 Tue Feb 17 00:25:13 2004
+++ Products/Ape/lib/apelib/zodb3/scanner.py Thu Feb 26 11:32:30 2004
@@ -157,7 +157,6 @@
def __init__(self):
self.current = OOBTree() # OOBTree({ oid -> {source->state} })
self.future = {} # { oid -> ({source->state}, atime) }
- self.uncommitted = {} # { tid -> {oid->{source->state}} }
self.lock = allocate_lock()
self.storage = None
@@ -230,17 +229,6 @@
self.lock.release()
- def afterStore(self, oid, tid, sources):
- """Called by the storage after an object is stored (but not committed.)
- """
- self.lock.acquire()
- try:
- t = self.uncommitted.setdefault(tid, {})
- t[oid] = sources
- finally:
- self.lock.release()
-
-
def scan(self):
"""Scan sources, returning the OIDs of changed objects.
"""
@@ -291,53 +279,9 @@
'Future sources cache size: %d objects.' % len(self.future))
- def afterCommit(self, tid):
- """Commits information recorded by setUncommittedSources().
- """
- self.lock.acquire()
- try:
- if not self.uncommitted.has_key(tid):
- return
- t = self.uncommitted[tid]
- del self.uncommitted[tid]
- finally:
- self.lock.release()
- # Update the sources with new states for the committed OIDs.
- to_scan = {} # { repo -> { source -> state } }
- for oid, sources in t.items():
- if sources:
- for source, state in sources.items():
- repo, location = source
- to_scan.setdefault(repo, {})[source] = state
- changes = {}
- for repo, d in to_scan.items():
- c = repo.poll(d)
- if c:
- changes.update(c)
- self.lock.acquire()
- try:
- now = time()
- for oid, sources in t.items():
- new_sources = {}
- if sources:
- for source, state in sources.items():
- state = changes.get(source, state)
- new_sources[source] = state
- if self.current.has_key(oid):
- self.current[oid] = new_sources
- else:
- self.future[oid] = (new_sources, now)
- finally:
- self.lock.release()
-
-
- def afterAbort(self, tid):
- """Aborts information recorded by setUncommittedSources().
+ def afterCommit(self, oid, sources):
+ """Records changes to sources after commit..
"""
- self.lock.acquire()
- try:
- if self.uncommitted.has_key(tid):
- del self.uncommitted[tid]
- finally:
- self.lock.release()
-
+ self.current[oid] = sources
+ if self.future.has_key(oid):
+ del self.future[oid]
=== Products/Ape/lib/apelib/zodb3/storage.py 1.10.2.1 => 1.10.2.2 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.10.2.1 Wed Feb 25 11:03:29 2004
+++ Products/Ape/lib/apelib/zodb3/storage.py Thu Feb 26 11:32:30 2004
@@ -52,6 +52,7 @@
name = 'ApeStorage: ' + ', '.join(names)
self._ltid = None
self.scanner = None
+ self.changed = {} # {tid: {oid: 1}}
BaseStorage.BaseStorage.__init__(self, name)
def __len__(self):
@@ -142,9 +143,13 @@
event, new_hash = self._gwio.store(
oid, classification, state, is_new)
new_h64 = self.hash64(new_hash)
-## if self.scanner is not None:
-## sources = event.mapper.gateway.getPollSources(event)
-## self.scanner.afterStore(oid, self._serial, sources)
+
+ # Remember that this OID changed (for scanning)
+ t = self.changed.get(self._serial)
+ if t is None:
+ t = {}
+ self.changed[self._serial] = t
+ t[oid] = 1
finally:
self._lock_release()
@@ -171,8 +176,8 @@
def _abort(self):
for c in self._conn_list:
c.abort()
- if self.scanner is not None:
- self.scanner.afterAbort(self._serial)
+ if self.changed.has_key(self._serial):
+ del self.changed[self._serial]
def _begin(self, tid, u, d, e):
for c in self._conn_list:
@@ -182,8 +187,13 @@
for c in self._conn_list:
c.finish()
self._ltid = self._serial
- if self.scanner is not None:
- self.scanner.afterCommit(self._serial)
+ if self.changed.has_key(self._serial):
+ oids = self.changed[self._serial]
+ del self.changed[self._serial]
+ if self.scanner:
+ for oid in oids:
+ sources = self._gwio.getPollSources(oid)
+ self.scanner.afterCommit(oid, sources)
def _vote(self):
for c in self._conn_list:
@@ -201,4 +211,3 @@
for c in self._conn_list:
c.close()
self.conf_resource.release(self)
-
More information about the Zope-CVS mailing list