[Zope-CVS] CVS: Packages/zasync/client/zasync - client.py:1.6
plugins.py:1.5 zasync.conf:1.4
Gary Poster
gary at zope.com
Thu Oct 28 18:22:21 EDT 2004
Update of /cvs-repository/Packages/zasync/client/zasync
In directory cvs.zope.org:/tmp/cvs-serv21659/client/zasync
Modified Files:
client.py plugins.py zasync.conf
Log Message:
Add new "aggregate" plugin. Include exception info in Conflict Error logs.
=== Packages/zasync/client/zasync/client.py 1.5 => 1.6 ===
--- Packages/zasync/client/zasync/client.py:1.5 Tue Oct 26 10:35:34 2004
+++ Packages/zasync/client/zasync/client.py Thu Oct 28 18:22:20 2004
@@ -186,7 +186,8 @@
get_transaction().commit()
except ConflictError:
logging.getLogger('zasync').info(
- 'Received ConflictError while trying to retry tool; retrying.')
+ 'Received ConflictError while trying to retry tool; retrying.',
+ exc_info=True)
get_transaction().abort()
reactor.callLater(delay, retryTool, path, delay)
except ClientDisconnected:
@@ -233,7 +234,7 @@
def timeoutErrback(deferred):
if not deferred.called:
- deferred.errback(defer.TimeoutError())
+ deferred.errback(defer.TimeoutError('Timed out.'))
def cancelDelayedCall(value, call):
if call.active():
@@ -298,7 +299,8 @@
return
except ConflictError:
get_transaction().abort()
- log.warning('ZODB ConflictError in pollZope. Never give up.')
+ log.warning('ZODB ConflictError in pollZope. Never give up.',
+ exc_info=True)
reactor.callLater(1, pollZope, path) # never give up
return
except (KeyboardInterrupt, SystemExit):
@@ -314,7 +316,72 @@
if application is not None:
application._p_jar.close()
-def makeCall(path, zopeDeferredId, name, args, kwargs, count=0):
+def returnResult(value, path, zopeDeferredId, error=False, count=0):
+ global active, app
+ if isinstance(value, failure.Failure):
+ value.cleanFailure()
+ log = logging.getLogger('zasync')
+ log.debug('returnResult got value for %s (%s):\n\n%r',
+ zopeDeferredId, path, value)
+ try:
+ del active[zopeDeferredId]
+ except KeyError:
+ pass
+ application = None
+ try:
+ if is_connected():
+ try:
+ application = app()
+ try:
+ tool = getRequestApp(application).unrestrictedTraverse(path)
+ except (AttributeError, LookupError):
+ scheduleToolRetry(
+ path, returnResult, value, path, zopeDeferredId,
+ error, count)
+ return value
+ zopeDeferred = tool.getDeferred(zopeDeferredId)
+ if zopeDeferred is None:
+ return value
+ if error:
+ call = zopeDeferred.errback
+ else:
+ call = zopeDeferred.callback
+ res = call(value)
+ get_transaction().commit()
+ except ConflictError:
+ log.warning('ZODB ConflictError in returnResult', exc_info=True)
+ get_transaction().abort()
+ if count < max_conflict_resolution_attempts:
+ reactor.callLater(
+ count, returnResult, value, path, zopeDeferredId,
+ error, count+1)
+ else:
+ res = failure.Failure()
+ out = StringIO.StringIO()
+ res.printDetailedTraceback(out)
+ log.error(
+ 'Too many ConflictErrors in returnResult: '
+ 'giving up.\n\n%s', out.getvalue())
+ except ClientDisconnected:
+ scheduleServerRetry(
+ returnResult, value, path, zopeDeferredId,
+ error, count)
+ res = value
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except: # give up. Looks like a bug. Log should help fix it.
+ res = logException('Exception from Zope.', log=log)
+ return res
+ else:
+ scheduleServerRetry(
+ returnResult, value, path, zopeDeferredId, error, count)
+ finally:
+ if application is not None:
+ application._p_jar.close()
+ return res
+
+def makeCall(path, zopeDeferredId, name, args, kwargs,
+ returnResult=returnResult, count=0):
# zopeDeferredId is an id of a deferred in the asynchronous tool
global plugins
log = logging.getLogger('zasync')
@@ -336,12 +403,12 @@
res = call(*args, **kwargs)
get_transaction().commit() # just in case plugin touched Zope
except ConflictError:
- log.warning('ZODB ConflictError in makeCall')
+ log.warning('ZODB ConflictError in makeCall', exc_info=True)
get_transaction().abort()
if count < max_conflict_resolution_attempts:
reactor.callLater(
count, makeCall, path, zopeDeferredId,
- name, args, kwargs, count+1)
+ name, args, kwargs, returnResult, count+1)
return
else:
res = failure.Failure()
@@ -353,7 +420,8 @@
# now we use that res (Failure)
except ClientDisconnected:
scheduleServerRetry(
- makeCall, path, zopeDeferredId, name, args, kwargs, count)
+ makeCall, path, zopeDeferredId, name, args, kwargs,
+ returnResult, count)
return
except (KeyboardInterrupt, SystemExit):
raise
@@ -362,17 +430,20 @@
res = failure.Failure()
if isinstance(res, defer.Deferred):
reactor.callLater(
- 0, prepareDeferredCall, path, zopeDeferredId, res, call_info)
+ 0, prepareDeferredCall, path, zopeDeferredId, res, call_info,
+ returnResult)
elif isinstance(res, failure.Failure):
res.cleanFailure()
- reactor.callLater(1, returnResult, res, path, zopeDeferredId, True)
+ reactor.callLater(0, returnResult, res, path, zopeDeferredId, True)
else:
- reactor.callLater(1, returnResult, res, path, zopeDeferredId)
+ reactor.callLater(0, returnResult, res, path, zopeDeferredId)
else:
scheduleServerRetry(
- makeCall, path, zopeDeferredId, name, args, kwargs, count)
+ makeCall, path, zopeDeferredId, name, args, kwargs,
+ returnResult, count)
-def prepareDeferredCall(path, zopeDeferredId, deferred, call_info, count=0):
+def prepareDeferredCall(path, zopeDeferredId, deferred, call_info,
+ returnResult, count=0):
log = logging.getLogger('zasync')
log.debug('prepareDeferredCall called for %s (%s)',
zopeDeferredId, path)
@@ -386,7 +457,7 @@
except (AttributeError, LookupError):
scheduleToolRetry(
path, prepareDeferredCall, path, zopeDeferredId,
- deferred, call_info, count)
+ deferred, call_info, returnResult, count)
return
zopeDeferred = tool.getDeferred(zopeDeferredId)
if zopeDeferred is None:
@@ -398,12 +469,13 @@
remainingSeconds = zopeDeferred.remainingSeconds()
get_transaction().commit()
except ConflictError:
- log.warning('ZODB ConflictError in prepareDeferredCall')
+ log.warning('ZODB ConflictError in prepareDeferredCall',
+ exc_info=True)
get_transaction().abort()
if count < max_conflict_resolution_attempts:
reactor.callLater(
count, prepareDeferredCall, zopeDeferredId, deferred,
- call_info, count+1)
+ call_info, returnResult, count+1)
else:
res = failure.Failure()
out = StringIO.StringIO()
@@ -414,7 +486,7 @@
except ClientDisconnected:
scheduleServerRetry(
prepareDeferredCall, path, zopeDeferredId, deferred,
- call_info, count)
+ call_info, returnResult, count)
except (KeyboardInterrupt, SystemExit):
raise
except: # give up. Looks like a bug. Log should help fix it.
@@ -434,74 +506,11 @@
errbackArgs=(path, zopeDeferredId, True))
else:
scheduleServerRetry(
- makeCall, path, zopeDeferredId, name, args, kwargs, count)
+ prepareDeferredCall, path, zopeDeferredId, deferred, call_info,
+ returnResult, count)
finally:
if application is not None:
application._p_jar.close()
-
-def returnResult(value, path, zopeDeferredId, error=False, count=0):
- global active, app
- if isinstance(value, failure.Failure):
- value.cleanFailure()
- log = logging.getLogger('zasync')
- log.debug('returnResult got value for %s (%s):\n\n%r',
- zopeDeferredId, path, value)
- try:
- del active[zopeDeferredId]
- except KeyError:
- pass
- application = None
- try:
- if is_connected():
- try:
- application = app()
- try:
- tool = getRequestApp(application).unrestrictedTraverse(path)
- except (AttributeError, LookupError):
- scheduleToolRetry(
- path, returnResult, value, path, zopeDeferredId,
- error, count)
- return value
- zopeDeferred = tool.getDeferred(zopeDeferredId)
- if zopeDeferred is None:
- return value
- if error:
- call = zopeDeferred.errback
- else:
- call = zopeDeferred.callback
- res = call(value)
- get_transaction().commit()
- except ConflictError:
- log.warning('ZODB ConflictError in returnResult')
- get_transaction().abort()
- if count < max_conflict_resolution_attempts:
- reactor.callLater(
- count, returnResult, value, path, zopeDeferredId,
- error, count+1)
- else:
- res = failure.Failure()
- out = StringIO.StringIO()
- res.printDetailedTraceback(out)
- log.error(
- 'Too many ConflictErrors in returnResult: '
- 'giving up.\n\n%s', out.getvalue())
- except ClientDisconnected:
- scheduleServerRetry(
- returnResult, value, path, zopeDeferredId,
- error, count)
- res = value
- except (KeyboardInterrupt, SystemExit):
- raise
- except: # give up. Looks like a bug. Log should help fix it.
- res = logException('Exception from Zope.', log=log)
- return res
- else:
- scheduleServerRetry(
- returnResult, value, path, zopeDeferredId, error, count)
- finally:
- if application is not None:
- application._p_jar.close()
- return res
def run(path=None):
# to be called after config.initialize
=== Packages/zasync/client/zasync/plugins.py 1.4 => 1.5 ===
--- Packages/zasync/client/zasync/plugins.py:1.4 Tue Oct 26 10:35:34 2004
+++ Packages/zasync/client/zasync/plugins.py Thu Oct 28 18:22:20 2004
@@ -16,20 +16,86 @@
"""
import logging
+from twisted.python import failure
from twisted.internet import reactor, defer
+from zasync import client
#### simple schedule
def schedule(seconds):
"""proof of concept and "Hello World"; use to fire your callbacks after
- seconds (approximately). zope_exec (below) is better for potentially
+ seconds (approximately). zope_exec is better for potentially
expensive tasks because it is cancellable (can time out) and you can guage
better if the expensive task has started. You might schedule a zope_exec
within a schedule callback, if you needed a scheduled expensive task."""
d = defer.Deferred()
- reactor.callLater(seconds, d, seconds)
+ reactor.callLater(seconds, d.callback, seconds)
return d
+#### aggegate plugins
+
+def aggregatePlugins(zopeDeferredTuple, *calls):
+ """aggregate calls to other plugins. each call may either be a number
+ (integer or float) of seconds to pause, or a tuple of (plugin name, args
+ tuple, kwargs dict). For example,
+
+ aggregatePlugins(
+ ('zope_exec', ('/my_site', 'home/my_script'), {}),
+ 5,
+ ('zope_exec', ('/my_site', 'home/my_other_script'), {}))
+
+ would ask zope_exec to call my_script in one transaction, wait 5 seconds,
+ and then call my_other_script.
+
+ If any of the plugins fail, the failure is returned without proceeding
+ further down the remaining calls. The failure is annotated with a list of
+ the completed calls ('completed_calls'), the call active during the failure
+ ('active_call'), and remaining calls('remaining_calls').
+ """
+ thunkmaker = AggregateThunkMaker(zopeDeferredTuple, *calls)
+ reactor.callLater(0, thunkmaker.makeCall)
+ return thunkmaker.deferred
+
+class AggregateThunkMaker(object):
+
+ def __init__(self, zopeDeferredTuple, *calls):
+ self.deferred = defer.Deferred()
+ self.pending = list(calls)
+ self.completed = []
+ self.begun = None
+ self.results = []
+ self.path, self.zopeDeferredId = zopeDeferredTuple
+
+ def makeCall(self):
+ if not self.pending:
+ self.deferred.callback(self.results)
+ else:
+ if self.begun is not None:
+ self.completed.append(self.begun)
+ task = self.pending.pop(0)
+ self.begun = task
+ if isinstance(task, (int, float)):
+ reactor.callLater(task, self.makeCall)
+ else:
+ try:
+ plugin, args, kwargs = task
+ except (ValueError, TypeError):
+ self.deferred.errback(failure.Failure())
+ else:
+ client.makeCall(
+ self.path, self.zopeDeferredId, plugin, args, kwargs,
+ self.returnResult)
+
+ def returnResult(self, value, path, zopeDeferredId, error=False):
+ if error:
+ value.completed_calls = tuple(self.completed)
+ value.active_call = self.begun
+ value.remaining_calls = tuple(self.pending)
+ self.deferred.errback(value)
+ else:
+ self.results.append(value)
+ reactor.callLater(0, self.makeCall)
+
#### LDAP, protected (with SSL) and unprotected
try:
@@ -130,7 +196,6 @@
#### Zope Exec
import Queue, thread, time, sys, StringIO
-from twisted.python import failure
from ZODB.POSException import ConflictError
from ZEO.Exceptions import ClientDisconnected
from Acquisition import aq_parent, aq_inner
@@ -139,7 +204,6 @@
from Products.zasync.manager import Expression
from Products.zasync.bucketqueue import BucketQueue
-from zasync import client
MAXTHREADPOOL = 5
@@ -515,7 +579,7 @@
except ConflictError:
logger.debug(
'zope_exec: worker %s got conflict error',
- thread_id)
+ thread_id, exc_info=True)
get_transaction().abort()
if attempt < max_conflict_resolution_attempts:
time.sleep(attempt + 1) # XXX better idea?
=== Packages/zasync/client/zasync/zasync.conf 1.3 => 1.4 ===
--- Packages/zasync/client/zasync/zasync.conf:1.3 Wed Oct 27 21:31:27 2004
+++ Packages/zasync/client/zasync/zasync.conf Thu Oct 28 18:22:20 2004
@@ -274,6 +274,15 @@
# retry yes
</plugin>
+<plugin aggregate>
+ handler zasync.plugins.aggregatePlugins
+ # 14400 seconds is four hours
+ timeout 14400
+ zope-aware yes
+ # description Aggregate calls to other plugins
+ retry no
+</plugin>
+
##############################################################################
# configure the loggers
##############################################################################
More information about the Zope-CVS
mailing list