[Zope-CVS] CVS: Products/QueueCatalog - version.txt:1.1 CatalogEventQueue.py:1.4 QueueCatalog.py:1.13
Shane Hathaway
shane@zope.com
Wed, 18 Jun 2003 15:38:00 -0400
Update of /cvs-repository/Products/QueueCatalog
In directory cvs.zope.org:/tmp/cvs-serv16520
Modified Files:
CatalogEventQueue.py QueueCatalog.py
Added Files:
version.txt
Log Message:
Added minimal unit tests and brought in sync with work done for a customer.
- The processing limit is now enforced with more precision. Until
now, you could get stuck in a situation where you had to process
thousands of queue entries in a single transaction. Fixed.
- Add to the catalog immediately only if there are indexes to update
immediately.
- After processing the queue, show how many items were processed.
- Minor reformatting.
=== Added File Products/QueueCatalog/version.txt ===
QueueCatalog-1.0-unreleased
=== Products/QueueCatalog/CatalogEventQueue.py 1.3 => 1.4 ===
--- Products/QueueCatalog/CatalogEventQueue.py:1.3 Mon Oct 21 08:57:24 2002
+++ Products/QueueCatalog/CatalogEventQueue.py Wed Jun 18 15:37:29 2003
@@ -99,7 +99,7 @@
free to think of cases for which our decisions are unacceptably
wrong and write unit tests for these cases.
- There are two kinds of transactions that effect the queue:
+ There are two kinds of transactions that affect the queue:
- Application transactions always add or modify events. They never
remove events.
@@ -149,10 +149,23 @@
state = state[1]
return state
- def process(self):
+ def process(self, limit=None):
+ """Removes and returns events from this queue.
+
+ If limit is specified, at most (limit) events are removed.
+ """
data = self._data
- self._data = {}
- return data
+ if not limit or len(data) <= limit:
+ self._data = {}
+ return data
+ else:
+ self._p_changed = 1
+ res = {}
+ keys = data.keys()[:limit]
+ for key in keys:
+ res[key] = data[key]
+ del data[key]
+ return res
def _p_resolveConflict(self, oldstate, committed, newstate):
# Apply the changes made in going from old to newstate to
=== Products/QueueCatalog/QueueCatalog.py 1.12 => 1.13 ===
--- Products/QueueCatalog/QueueCatalog.py:1.12 Mon Jun 9 16:04:31 2003
+++ Products/QueueCatalog/QueueCatalog.py Wed Jun 18 15:37:29 2003
@@ -225,6 +225,10 @@
uid = '/'.join(uid)
catalog = self.getZCatalog()
+ cat_indexes = list(catalog.indexes())
+ cat_indexes.sort()
+ immediate_indexes = list(self._immediate_indexes)
+ immediate_indexes.sort()
# The ZCatalog API doesn't allow us to distinguish between
# adds and updates, so we have to try to figure this out
@@ -241,22 +245,23 @@
# Now, try to decide if the catalog has the uid (path).
- if cataloged(catalog, uid):
- event = CHANGED
- else:
- # Looks like we should add, but maybe there's already a
- # pending add event. We'd better check the event queue:
- if (self._queues[hash(uid) % self._buckets].getEvent(uid) in
- ADDED_EVENTS):
+ if immediate_indexes != cat_indexes:
+ if cataloged(catalog, uid):
event = CHANGED
else:
- event = ADDED
+ # Looks like we should add, but maybe there's already a
+ # pending add event. We'd better check the event queue:
+ if (self._queues[hash(uid) % self._buckets].getEvent(uid) in
+ ADDED_EVENTS):
+ event = CHANGED
+ else:
+ event = ADDED
- self._update(uid, event)
+ self._update(uid, event)
- if self._immediate_indexes:
+ if immediate_indexes:
# Update some of the indexes immediately.
- catalog.catalog_object(obj, uid, self._immediate_indexes)
+ catalog.catalog_object(obj, uid, immediate_indexes)
def uncatalog_object(self, uid):
@@ -272,15 +277,22 @@
if self._immediate_removal:
self.process()
+
def process(self, max=None):
- """Process pending events
+ """ Process pending events and return number of events processed. """
+ if not self.manage_size():
+ return 0
- Returns the number of events processed.
- """
count = 0
catalog = self.getZCatalog()
for queue in filter(None, self._queues):
- events = queue.process()
+ limit = None
+ if max:
+ # limit the number of events
+ limit = max - count
+
+ events = queue.process(limit)
+
for uid, (t, event) in events.items():
if event is REMOVED:
if cataloged(catalog, uid):
@@ -290,14 +302,18 @@
if event is CHANGED and not cataloged(catalog, uid):
continue
# Note that the uid may be relative to the catalog.
- obj = catalog.unrestrictedTraverse(uid)
- catalog.catalog_object(obj, uid)
+ obj = catalog.unrestrictedTraverse(uid, None)
+ if obj is not None:
+ catalog.catalog_object(obj, uid)
+
count = count + 1
+
if max and count >= max:
- # On surpassing the maximum, return immediately
+ # On reaching the maximum, return immediately
# so the caller can commit the transaction,
# sleep for a while, or do something else.
break
+
return count
#
@@ -362,14 +378,11 @@
return size
- def manage_process(self, REQUEST):
+ def manage_process(self, REQUEST, count=100):
"Web UI to manually process queues"
- # make sure we have necessary perm
- self.getZCatalog('catalog_object')
- self.getZCatalog('uncatalog_object')
- self.process()
-
- msg = 'Queue processed'
+ count = int(count)
+ processed = self.process(max=count)
+ msg = '%i Queue item(s) processed' % processed
return self.manage_queue(manage_tabs_message=msg)
# Provide Zope 2 offerings
@@ -393,10 +406,12 @@
# Disallow access to subobjects with no security assertions.
security.setDefaultAccess('deny')
- security.declarePublic('manage_process', 'getTitle', 'title_or_id')
+ security.declarePublic('getTitle', 'title_or_id')
security.declareProtected(manage_zcatalog_entries,
- 'catalog_object', 'uncatalog_object', 'refreshCatalog')
+ 'catalog_object', 'uncatalog_object',
+ 'refreshCatalog',
+ 'manage_process', 'process')
security.declareProtected(
'View management screens',