[Zope-CVS] CVS: Products/QueueCatalog - version.txt:1.1 CatalogEventQueue.py:1.4 QueueCatalog.py:1.13

Shane Hathaway shane@zope.com
Wed, 18 Jun 2003 15:38:00 -0400


Update of /cvs-repository/Products/QueueCatalog
In directory cvs.zope.org:/tmp/cvs-serv16520

Modified Files:
	CatalogEventQueue.py QueueCatalog.py 
Added Files:
	version.txt 
Log Message:
Added minimal unit tests and brought in sync with work done for a customer.

- The processing limit is now enforced with more precision.  Until
  now, you could get stuck in a situation where you had to process
  thousands of queue entries in a single transaction.  Fixed.

- Add to the catalog immediately only if there are indexes to update
  immediately.

- After processing the queue, show how many items were processed.

- Minor reformatting.



=== Added File Products/QueueCatalog/version.txt ===
QueueCatalog-1.0-unreleased


=== Products/QueueCatalog/CatalogEventQueue.py 1.3 => 1.4 ===
--- Products/QueueCatalog/CatalogEventQueue.py:1.3	Mon Oct 21 08:57:24 2002
+++ Products/QueueCatalog/CatalogEventQueue.py	Wed Jun 18 15:37:29 2003
@@ -99,7 +99,7 @@
     free to think of cases for which our decisions are unacceptably
     wrong and write unit tests for these cases.
 
-    There are two kinds of transactions that effect the queue:
+    There are two kinds of transactions that affect the queue:
 
     - Application transactions always add or modify events. They never
       remove events.
@@ -149,10 +149,23 @@
             state = state[1]
         return state
 
-    def process(self):
+    def process(self, limit=None):
+        """Removes and returns events from this queue.
+
+        If limit is specified, at most (limit) events are removed.
+        """
         data = self._data
-        self._data = {}
-        return data
+        if not limit or len(data) <= limit:
+            self._data = {}
+            return data
+        else:
+            self._p_changed = 1
+            res = {}
+            keys = data.keys()[:limit]
+            for key in keys:
+                res[key] = data[key]
+                del data[key]
+            return res
 
     def _p_resolveConflict(self, oldstate, committed, newstate):
         # Apply the changes made in going from old to newstate to


=== Products/QueueCatalog/QueueCatalog.py 1.12 => 1.13 ===
--- Products/QueueCatalog/QueueCatalog.py:1.12	Mon Jun  9 16:04:31 2003
+++ Products/QueueCatalog/QueueCatalog.py	Wed Jun 18 15:37:29 2003
@@ -225,6 +225,10 @@
             uid = '/'.join(uid)
 
         catalog = self.getZCatalog()
+        cat_indexes = list(catalog.indexes())
+        cat_indexes.sort()
+        immediate_indexes = list(self._immediate_indexes)
+        immediate_indexes.sort()
 
         # The ZCatalog API doesn't allow us to distinguish between
         # adds and updates, so we have to try to figure this out
@@ -241,22 +245,23 @@
 
         # Now, try to decide if the catalog has the uid (path).
 
-        if cataloged(catalog, uid):
-            event = CHANGED
-        else:
-            # Looks like we should add, but maybe there's already a
-            # pending add event. We'd better check the event queue:
-            if (self._queues[hash(uid) % self._buckets].getEvent(uid) in
-                ADDED_EVENTS):
+        if immediate_indexes != cat_indexes:
+            if cataloged(catalog, uid):
                 event = CHANGED
             else:
-                event = ADDED
+                # Looks like we should add, but maybe there's already a
+                # pending add event. We'd better check the event queue:
+                if (self._queues[hash(uid) % self._buckets].getEvent(uid) in
+                    ADDED_EVENTS):
+                    event = CHANGED
+                else:
+                    event = ADDED
 
-        self._update(uid, event)
+            self._update(uid, event)
 
-        if self._immediate_indexes:
+        if immediate_indexes:
             # Update some of the indexes immediately.
-            catalog.catalog_object(obj, uid, self._immediate_indexes)
+            catalog.catalog_object(obj, uid, immediate_indexes)
 
 
     def uncatalog_object(self, uid):
@@ -272,15 +277,22 @@
         if self._immediate_removal:
             self.process()
 
+
     def process(self, max=None):
-        """Process pending events
+        """ Process pending events and return number of events processed. """
+        if not self.manage_size():
+            return 0
 
-        Returns the number of events processed.
-        """
         count = 0
         catalog = self.getZCatalog()
         for queue in filter(None, self._queues):
-            events = queue.process()
+            limit = None
+            if max:
+                # limit the number of events
+                limit = max - count
+
+            events = queue.process(limit)
+
             for uid, (t, event) in events.items():
                 if event is REMOVED:
                     if cataloged(catalog, uid):
@@ -290,14 +302,18 @@
                     if event is CHANGED and not cataloged(catalog, uid):
                         continue
                     # Note that the uid may be relative to the catalog.
-                    obj = catalog.unrestrictedTraverse(uid)
-                    catalog.catalog_object(obj, uid)
+                    obj = catalog.unrestrictedTraverse(uid, None)
+                    if obj is not None:
+                        catalog.catalog_object(obj, uid)
+
                 count = count + 1
+
             if max and count >= max:
-                # On surpassing the maximum, return immediately
+                # On reaching the maximum, return immediately
                 # so the caller can commit the transaction,
                 # sleep for a while, or do something else.
                 break
+
         return count
 
     #
@@ -362,14 +378,11 @@
 
         return size
 
-    def manage_process(self, REQUEST):
+    def manage_process(self, REQUEST, count=100):
         "Web UI to manually process queues"
-        # make sure we have necessary perm
-        self.getZCatalog('catalog_object')
-        self.getZCatalog('uncatalog_object')
-        self.process()
-
-        msg = 'Queue processed'
+        count = int(count)
+        processed = self.process(max=count)
+        msg = '%i Queue item(s) processed' % processed
         return self.manage_queue(manage_tabs_message=msg)
 
     # Provide Zope 2 offerings
@@ -393,10 +406,12 @@
     # Disallow access to subobjects with no security assertions.
     security.setDefaultAccess('deny')
 
-    security.declarePublic('manage_process', 'getTitle', 'title_or_id')
+    security.declarePublic('getTitle', 'title_or_id')
 
     security.declareProtected(manage_zcatalog_entries,
-                              'catalog_object', 'uncatalog_object', 'refreshCatalog')
+                              'catalog_object', 'uncatalog_object',
+                              'refreshCatalog',
+                              'manage_process', 'process')
 
     security.declareProtected(
         'View management screens',