[Zope-CVS] CVS: Products/Ape/lib/apelib/zodb3 - connection.py:1.5.4.2 gateways.py:1.2.2.1 scanner.py:1.1.2.2 storage.py:1.6.4.2

Shane Hathaway shane@zope.com
Thu, 24 Jul 2003 08:15:48 -0400


Update of /cvs-repository/Products/Ape/lib/apelib/zodb3
In directory cvs.zope.org:/tmp/cvs-serv8851/lib/apelib/zodb3

Modified Files:
      Tag: ape-scan-branch
	connection.py gateways.py scanner.py storage.py 
Log Message:
Modified strategy that asks gateways for sources directly.


=== Products/Ape/lib/apelib/zodb3/connection.py 1.5.4.1 => 1.5.4.2 ===
--- Products/Ape/lib/apelib/zodb3/connection.py:1.5.4.1	Wed Jul 23 00:12:52 2003
+++ Products/Ape/lib/apelib/zodb3/connection.py	Thu Jul 24 08:15:40 2003
@@ -503,7 +503,7 @@
             if obj is None:
                 return
             if serial == ResolvedSerial:
-                obj._p_changed = None
+                del obj._p_changed
             else:
                 if change:
                     obj._p_changed = 0
@@ -517,7 +517,7 @@
                 if obj is None:
                     continue
                 if serial == ResolvedSerial:
-                    obj._p_changed = None
+                    del obj._p_changed
                 else:
                     if change:
                         obj._p_changed = 0


=== Products/Ape/lib/apelib/zodb3/gateways.py 1.2 => 1.2.2.1 ===
--- Products/Ape/lib/apelib/zodb3/gateways.py:1.2	Wed Jul  9 11:40:12 2003
+++ Products/Ape/lib/apelib/zodb3/gateways.py	Thu Jul 24 08:15:40 2003
@@ -56,3 +56,6 @@
                 % (repr(data), repr(expect)))
         return None
 
+    def getSources(self, event):
+        return None
+


=== Products/Ape/lib/apelib/zodb3/scanner.py 1.1.2.1 => 1.1.2.2 ===
--- Products/Ape/lib/apelib/zodb3/scanner.py:1.1.2.1	Wed Jul 23 00:12:52 2003
+++ Products/Ape/lib/apelib/zodb3/scanner.py	Thu Jul 24 08:15:40 2003
@@ -27,8 +27,8 @@
 # OIDs that might be used soon.
 FUTURE_TIMEOUT = 10 * 60
 
-CONNECTION_UPDATE_INTERVAL = 15
-SCAN_INTERVAL = 30
+CONNECTION_UPDATE_INTERVAL = 10
+SCAN_INTERVAL = 10
 
 
 class ScanControl:
@@ -42,6 +42,7 @@
         self.lock = allocate_lock()
         self.next_scan = time() + SCAN_INTERVAL
 
+
     def newConnection(self):
         self.lock.acquire()
         try:
@@ -51,6 +52,7 @@
         finally:
             self.lock.release()
 
+
     def setConnectionOIDs(self, conn_id, oids):
         changed = 0
         new_oids = OOSet()
@@ -70,17 +72,16 @@
             self.lock.release()
         if changed:
             self.scanner.setOIDs(new_oids)
-            print 'pre-scanning'
-            self.scanner.scan(new_only=1)
-            print 'pre-scan done'
         self.mayScan()
 
+
     def mayScan(self):
         now = time()
         if now >= self.next_scan:
             self.next_scan = now + SCAN_INTERVAL
             print 'Scanning %d objects' % len(self.oids)
             inv = self.scanner.scan()
+            self.scanner.pruneFuture()
             print 'Finished scanning'
             if inv:
                 print 'Invalidating', inv
@@ -114,8 +115,9 @@
 class Scanner:
 
     def __init__(self):
-        self.current = OOBTree()  # OOBTree({ oid -> { source -> status } })
-        self.future = {}          # { oid -> ([source], atime) }
+        self.current = OOBTree()  # OOBTree({ oid -> {source->state} })
+        self.future = {}          # { oid -> ({source->state}, atime) }
+        self.uncommitted = {}     # { tid -> {oid->{source->state}} }
         self.lock = allocate_lock()
 
 
@@ -127,35 +129,25 @@
                 del self.current[oid]
             added = difference(oids, self.current)
             for oid in added.keys():
-                d = {}
-                info = self.future.get(oid)
-                if info:
+                if self.future.has_key(oid):
                     # Source info for this OID was provided earlier.
+                    sources, atime = self.future[oid]
                     del self.future[oid]
-                    for source in info[0]:
-                        d[source] = None
-                self.current[oid] = d
+                else:
+                    sources = {}
+                self.current[oid] = sources
         finally:
             self.lock.release()
 
 
     def setSources(self, oid, sources):
+        if sources is None:
+            sources = {}
         self.lock.acquire()
         try:
             if self.current.has_key(oid):
                 # This OID is known to be in use.
-                d = self.current[oid]
-                keys = d.keys()
-                keys.sort()
-                if keys != sources:
-                    for key in keys:
-                        if not key in sources:
-                            # Remove a source
-                            del d[key]
-                    for source in sources:
-                        if not d.has_key(source):
-                            # Add a source with no status yet
-                            d[source] = None
+                self.current[oid] = sources
             else:
                 # This OID might be useful soon.
                 self.future[oid] = (sources, time())
@@ -163,17 +155,24 @@
             self.lock.release()
 
 
-    def scan(self, new_only=0):
-        to_scan = {}        # { repo -> { source -> status } }
-        to_invalidate = {}  # { oid -> 1 }
+    def setUncommittedSources(self, tid, oid, sources):
         self.lock.acquire()
         try:
-            for oid, statdict in self.current.items():
-                for source, status in statdict.items():
-                    if new_only and status is not None:
-                        continue
+            t = self.uncommitted.setdefault(tid, {})
+            t[oid] = sources
+        finally:
+            self.lock.release()
+
+
+    def scan(self):
+        to_scan = {}        # { repo -> { source -> state } }
+        to_invalidate = {}  # { oid -> 1 }
+        self.lock.acquire()  # lock because oid_states might be self.current.
+        try:
+            for oid, sources in self.current.items():
+                for source, state in sources.items():
                     repo, location = source
-                    to_scan.setdefault(repo, {})[source] = status
+                    to_scan.setdefault(repo, {})[source] = state
         finally:
             self.lock.release()
         changes = {}
@@ -186,15 +185,13 @@
             # update self.current.
             self.lock.acquire()
             try:
-                for oid, statdict in self.current.items():
-                    for source, status in statdict.items():
+                for oid, sources in self.current.items():
+                    for source, state in sources.items():
                         if changes.has_key(source):
-                            if statdict.get(source) is not None:
-                                to_invalidate[oid] = 1
-                            statdict[source] = changes[source]
+                            to_invalidate[oid] = 1
+                            sources[source] = changes[source]
             finally:
                 self.lock.release()
-        self.pruneFuture()
         return to_invalidate.keys()
 
 
@@ -209,4 +206,41 @@
                         del self.future[oid]
             finally:
                 self.lock.release()
+
+
+    def afterCommit(self, tid):
+        self.lock.acquire()
+        try:
+            if not self.uncommitted.has_key(tid):
+                return
+            t = self.uncommitted[tid]
+            del self.uncommitted[tid]
+        finally:
+            self.lock.release()
+        # Update the sources with new states for the committed OIDs.
+        to_scan = {}        # { repo -> { source -> state } }
+        for oid, sources in t.items():
+            for source, state in sources.items():
+                repo, location = source
+                to_scan.setdefault(repo, {})[source] = state
+        changes = {}
+        for repo, d in to_scan.items():
+            c = repo.freshen(d)
+            if c:
+                changes.update(c)
+        for oid, sources in t.items():
+            new_sources = {}
+            for source, state in sources.items():
+                state = changes.get(source, state)
+                new_sources[source] = state
+            self.setSources(oid, new_sources)
+
+
+    def afterAbort(self, tid):
+        self.lock.acquire()
+        try:
+            if self.uncommitted.has_key(tid):
+                del self.uncommitted[tid]
+        finally:
+            self.lock.release()
 


=== Products/Ape/lib/apelib/zodb3/storage.py 1.6.4.1 => 1.6.4.2 ===
--- Products/Ape/lib/apelib/zodb3/storage.py:1.6.4.1	Wed Jul 23 00:12:52 2003
+++ Products/Ape/lib/apelib/zodb3/storage.py	Thu Jul 24 08:15:40 2003
@@ -109,7 +109,8 @@
             if DEBUG:
                 print 'loaded', `oid`, `h`
             if self._scanner is not None:
-                sources = event.getSources()
+                gw = event.getMapper().getGateway()
+                sources = gw.getSources(event)
                 self._scanner.setSources(oid, sources)
             return data, h
         finally:
@@ -135,7 +136,7 @@
             if h64 != HASH0:
                 # Overwriting an old object.  Use the hash to verify
                 # that the new data was derived from the old data.
-                event, old_cs, old_hash = self._gwio.load(keychain, 1)
+                event, old_cs, old_hash = self._gwio.load(keychain)
                 old_h64 = self.hash64(old_hash)
                 if h64 != old_h64:
                     raise POSException.ConflictError(
@@ -147,7 +148,7 @@
                 # NoStateFoundError or a hash of None, otherwise
                 # there's a conflict.
                 try:
-                    event, cs, old_hash = self._gwio.load(keychain, 1)
+                    event, cs, old_hash = self._gwio.load(keychain)
                 except NoStateFoundError:
                     pass
                 else:
@@ -162,8 +163,9 @@
             event, new_hash = self._gwio.store(keychain, classified_state)
             new_h64 = self.hash64(new_hash)
             if self._scanner is not None:
-                sources = event.getSources()
-                self._scanner.setSources(oid, sources)
+                gw = event.getMapper().getGateway()
+                sources = gw.getSources(event)
+                self._scanner.setUncommittedSources(self._serial, oid, sources)
         finally:
             self._lock_release()
 
@@ -184,6 +186,8 @@
     def _abort(self):
         for c in self._conn_list:
             c.abort()
+        if self._scanner is not None:
+            self._scanner.afterAbort(self._serial)
 
     def _begin(self, tid, u, d, e):
         for c in self._conn_list:
@@ -193,6 +197,8 @@
         for c in self._conn_list:
             c.finish()
         self._ltid = self._serial
+        if self._scanner is not None:
+            self._scanner.afterCommit(self._serial)
 
     def _vote(self):
         for c in self._conn_list: