[Zope-CVS] CVS: Packages/zasync/client/zasync - client.py:1.7
plugins.py:1.6
Gary Poster
gary at zope.com
Fri Oct 29 21:39:22 EDT 2004
Update of /cvs-repository/Packages/zasync/client/zasync
In directory cvs.zope.org:/tmp/cvs-serv27722/client/zasync
Modified Files:
client.py plugins.py
Log Message:
Add sanitize method so that calls to (and from) zope_exec do not have persistent objects.
Remove an opportunity for Conflict Error in the client.
Fix bug in zope_exec plugin.
=== Packages/zasync/client/zasync/client.py 1.6 => 1.7 ===
--- Packages/zasync/client/zasync/client.py:1.6 Thu Oct 28 18:22:20 2004
+++ Packages/zasync/client/zasync/client.py Fri Oct 29 21:39:22 2004
@@ -241,11 +241,10 @@
call.cancel()
return value
-# pollZope schedules makeCall which schedules returnResult, possibly using
-# prepareDeferredCall if the result is a deferred (and it usually is).
+# pollZope schedules makeCall which schedules returnResult. That's it, really.
def pollZope(path):
"polls Zope to check for new calls, timeout changes, and expired calls"
- global active, app, DB
+ global active, app, DB, plugins
application = None
try:
if is_connected():
@@ -269,18 +268,28 @@
delay = 0
interval = 0.05
for zopeDeferred in tool.acceptAll():
- if zopeDeferred.remainingSeconds() <= 0:
+ remainingSeconds = zopeDeferred.remainingSeconds()
+ if remainingSeconds <= 0:
# don't bother calling if it has already timed out
zopeDeferred.errback(
failure.Failure(defer.TimeoutError('Timed out.')))
continue
- zopeDeferredId = zopeDeferred.key
name, args, kwargs = zopeDeferred.getSignature()
+ call_info = plugins.get(name)
+ if call_info is None:
+ zopeDeferred.errback(failure.Failure(KeyError(name)))
+ continue
log.debug('got call for plugin %s (args %r; kwargs %r)',
name, args, kwargs)
+ zopeDeferredId = zopeDeferred.key
+ timeout = zopeDeferred.timeout
+ calltimeout = call_info['timeout']
+ if timeout is None or calltimeout < timeout:
+ zopeDeferred.timeout = calltimeout
reactor.callLater(
delay, makeCall,
- path, zopeDeferredId, name, args, kwargs)
+ path, zopeDeferredId, name, args, kwargs,
+ remainingSeconds, timeout)
delay += interval
bad = []
for zopeDeferredId, (oldTimeout, deferred) in active.items():
@@ -380,8 +389,8 @@
application._p_jar.close()
return res
-def makeCall(path, zopeDeferredId, name, args, kwargs,
- returnResult=returnResult, count=0):
+def makeCall(path, zopeDeferredId, name, args, kwargs, remainingSeconds,
+ timeout, returnResult=returnResult, count=0):
# zopeDeferredId is an id of a deferred in the asynchronous tool
global plugins
log = logging.getLogger('zasync')
@@ -401,14 +410,19 @@
res = call((path, zopeDeferredId), *args, **kwargs)
else:
res = call(*args, **kwargs)
- get_transaction().commit() # just in case plugin touched Zope
+ get_transaction().commit() # just in case plugin touched Zope:
+ # please do not! Use zope_exec, or follow that pattern...
+ # If you do, and you start a conflict error, it may throw
+ # the timeout check off. Just don't, and regard this as
+ # paranoia.
except ConflictError:
log.warning('ZODB ConflictError in makeCall', exc_info=True)
get_transaction().abort()
if count < max_conflict_resolution_attempts:
reactor.callLater(
count, makeCall, path, zopeDeferredId,
- name, args, kwargs, returnResult, count+1)
+ name, args, kwargs, remainingSeconds, timeout,
+ returnResult, count+1)
return
else:
res = failure.Failure()
@@ -421,7 +435,7 @@
except ClientDisconnected:
scheduleServerRetry(
makeCall, path, zopeDeferredId, name, args, kwargs,
- returnResult, count)
+ remainingSeconds, timeout, returnResult, count)
return
except (KeyboardInterrupt, SystemExit):
raise
@@ -429,9 +443,15 @@
get_transaction().abort()
res = failure.Failure()
if isinstance(res, defer.Deferred):
- reactor.callLater(
- 0, prepareDeferredCall, path, zopeDeferredId, res, call_info,
- returnResult)
+ res.timeoutCall = reactor.callLater(
+ max(remainingSeconds, 0),
+ timeoutErrback, res)
+ res.addBoth(cancelDelayedCall, res.timeoutCall)
+ active[zopeDeferredId] = (timeout, res)
+ res.addCallbacks(
+ returnResult, returnResult,
+ callbackArgs=(path, zopeDeferredId),
+ errbackArgs=(path, zopeDeferredId, True))
elif isinstance(res, failure.Failure):
res.cleanFailure()
reactor.callLater(0, returnResult, res, path, zopeDeferredId, True)
@@ -440,98 +460,12 @@
else:
scheduleServerRetry(
makeCall, path, zopeDeferredId, name, args, kwargs,
- returnResult, count)
-
-def prepareDeferredCall(path, zopeDeferredId, deferred, call_info,
- returnResult, count=0):
- log = logging.getLogger('zasync')
- log.debug('prepareDeferredCall called for %s (%s)',
- zopeDeferredId, path)
- application = None
- try:
- if is_connected():
- try:
- application = app()
- try:
- tool = application.unrestrictedTraverse(path)
- except (AttributeError, LookupError):
- scheduleToolRetry(
- path, prepareDeferredCall, path, zopeDeferredId,
- deferred, call_info, returnResult, count)
- return
- zopeDeferred = tool.getDeferred(zopeDeferredId)
- if zopeDeferred is None:
- return
- timeout = zopeDeferred.timeout
- calltimeout = call_info['timeout']
- if timeout is None or calltimeout < timeout:
- zopeDeferred.timeout = calltimeout
- remainingSeconds = zopeDeferred.remainingSeconds()
- get_transaction().commit()
- except ConflictError:
- log.warning('ZODB ConflictError in prepareDeferredCall',
- exc_info=True)
- get_transaction().abort()
- if count < max_conflict_resolution_attempts:
- reactor.callLater(
- count, prepareDeferredCall, zopeDeferredId, deferred,
- call_info, returnResult, count+1)
- else:
- res = failure.Failure()
- out = StringIO.StringIO()
- res.printDetailedTraceback(out)
- log.error(
- 'Too many ConflictErrors in prepareDeferredCall: '
- 'giving up.\n\n%s', out.getvalue())
- except ClientDisconnected:
- scheduleServerRetry(
- prepareDeferredCall, path, zopeDeferredId, deferred,
- call_info, returnResult, count)
- except (KeyboardInterrupt, SystemExit):
- raise
- except: # give up. Looks like a bug. Log should help fix it.
- logException('Exception from Zope.', log=log)
- deferred.addErrback(
- returnResult, path, zopeDeferredId, True)
- deferred.errback(failure.Failure())
- else:
- deferred.timeoutCall = reactor.callLater(
- max(remainingSeconds, 0),
- timeoutErrback, deferred)
- deferred.addBoth(cancelDelayedCall, deferred.timeoutCall)
- active[zopeDeferredId] = (timeout, deferred)
- deferred.addCallbacks(
- returnResult, returnResult,
- callbackArgs=(path, zopeDeferredId),
- errbackArgs=(path, zopeDeferredId, True))
- else:
- scheduleServerRetry(
- prepareDeferredCall, path, zopeDeferredId, deferred, call_info,
- returnResult, count)
- finally:
- if application is not None:
- application._p_jar.close()
-
-def run(path=None):
- # to be called after config.initialize
- d = setPlugins(path)
- d.addCallbacks(handlePastCalls, stop)
- d.addErrback(stop)
- log = logging.getLogger('zasync')
- log.debug('beginning reactor')
- try:
- reactor.run()
- finally:
- shutdown(None)
+ remainingSeconds, timeout, returnResult, count)
def stop(ignored=None):
raise SystemExit()
-def setPlugins(path=None):
- global tool_path
- if path is None:
- path = tool_path
- tool_path = path = tuple(path)
+def setPlugins(path):
d = defer.Deferred()
reactor.callLater(0, _setPlugins, d, path)
return d
@@ -642,21 +576,32 @@
if application is not None:
application._p_jar.close()
-def shutdown(ignored=None):
+def run(path=None):
+ # to be called after config.initialize
global tool_path, app
- application = None
+ if path is None:
+ path = tool_path
+ tool_path = path = tuple(path)
+ d = setPlugins(path)
+ d.addCallbacks(handlePastCalls, stop)
+ d.addErrback(stop)
+ log = logging.getLogger('zasync')
+ log.debug('beginning reactor')
try:
- get_transaction().abort()
- application = app()
+ reactor.run()
+ finally:
+ application = None
try:
- tool = application.unrestrictedTraverse(tool_path)
- tool.setPlugins(())
- get_transaction().commit()
- except (AttributeError, KeyError, ConflictError, ClientDisconnected):
get_transaction().abort()
- finally:
- if application is not None:
- application._p_jar.close()
- logging.getLogger('zasync').critical('Shutting down')
- logging.shutdown()
- return ignored
+ application = app()
+ try:
+ tool = application.unrestrictedTraverse(tool_path)
+ tool.setPlugins(())
+ get_transaction().commit()
+ except (AttributeError, KeyError, ConflictError, ClientDisconnected):
+ get_transaction().abort()
+ finally:
+ if application is not None:
+ application._p_jar.close()
+ logging.getLogger('zasync').critical('Shutting down')
+ logging.shutdown()
=== Packages/zasync/client/zasync/plugins.py 1.5 => 1.6 ===
--- Packages/zasync/client/zasync/plugins.py:1.5 Thu Oct 28 18:22:20 2004
+++ Packages/zasync/client/zasync/plugins.py Fri Oct 29 21:39:22 2004
@@ -35,6 +35,9 @@
#### aggegate plugins
def aggregatePlugins(zopeDeferredTuple, *calls):
+ # XXX remove timeout feature; use the schedule plugin!
+ # XXX also? instead? incorporate easy and introspectable chaining into
+ # zopeDeferreds
"""aggregate calls to other plugins. each call may either be a number
(integer or float) of seconds to pause, or a tuple of (plugin name, args
tuple, kwargs dict). For example,
@@ -82,9 +85,16 @@
except (ValueError, TypeError):
self.deferred.errback(failure.Failure())
else:
- client.makeCall(
- self.path, self.zopeDeferredId, plugin, args, kwargs,
- self.returnResult)
+ try:
+ call_info = client.plugins[plugin]
+ except KeyError:
+ self.deferred.errback(failure.Failure())
+ else:
+ timeout = call_info["timeout"]
+ remainingSeconds = client.active[self.zopeDeferredId][0]
+ client.makeCall(
+ self.path, self.zopeDeferredId, plugin, args, kwargs,
+ remainingSeconds, timeout, self.returnResult)
def returnResult(self, value, path, zopeDeferredId, error=False):
if error:
@@ -202,7 +212,7 @@
from AccessControl.SecurityManagement import newSecurityManager
from Products.PageTemplates.Expressions import getEngine, SecureModuleImporter
-from Products.zasync.manager import Expression
+from Products.zasync.manager import Expression, sanitize
from Products.zasync.bucketqueue import BucketQueue
MAXTHREADPOOL = 5
@@ -216,14 +226,14 @@
serverDown = False
CANCEL = object()
-CALLBACK = 'callback'
-ERRBACK = 'errback'
+CALLBACK = 'CALLBACK'
+ERRBACK = 'ERRBACK'
-def getTaskStatus(key, del_if_cancel=False, delete=False):
+def getTaskStatus(key, del_if_cancel=False):
taskStatusLock.acquire()
try:
val = taskStatus.get(key)
- if delete or (del_if_cancel and val is CANCEL):
+ if del_if_cancel and val is CANCEL:
try:
del taskStatus[key]
except KeyError:
@@ -232,6 +242,19 @@
taskStatusLock.release()
return val
+def popTaskStatus(key):
+ taskStatusLock.acquire()
+ try:
+ val = taskStatus.get(key)
+ try:
+ del taskStatus[key]
+ except KeyError:
+ pass
+ finally:
+ taskStatusLock.release()
+ return val
+
+
def setTaskStatus(key, value):
taskStatusLock.acquire()
try:
@@ -396,7 +419,7 @@
if getTaskStatus(zopeDeferredTuple) is not None:
setTaskStatus(zopeDeferredTuple, CANCEL)
logging.getLogger('zasync.plugins').debug(
- 'zope_exec: cancelling pending %r for worker thread',
+ 'zope_exec: attempting to cancel pending %r for worker thread',
zopeDeferredTuple)
return failure
@@ -547,9 +570,10 @@
else:
result = context_dict["result"] = res
success = True
- deferred = getTaskStatus(zopeDeferredTuple, delete=True)
+ deferred = getTaskStatus(zopeDeferredTuple)
if deferred is not CANCEL:
if success:
+ result = sanitize(result)
get_transaction().commit()
logger.debug(
'zope_exec: worker %r committed successful '
@@ -626,8 +650,7 @@
thread_id, exc_info=True)
get_transaction().abort()
if deferred is None:
- deferred = getTaskStatus(
- zopeDeferredTuple, delete=True)
+ deferred = popTaskStatus(zopeDeferredTuple)
if deferred is not CANCEL: # one last chance
if deferred is None:
if out is None:
@@ -640,7 +663,9 @@
logger.error(
'zope_exec: worker %s cannot find the task '
'status for %r, so it cannot schedule '
- 'zasync to pass a failure back to Zope.%s',
+ 'zasync to pass a failure back to Zope. '
+ 'This is a significant problem. Here is a '
+ 'lot of diagnostic information:\n\n%s',
thread_id, zopeDeferredTuple, out)
else:
callbacks.put((deferred, f))
@@ -654,6 +679,7 @@
# we want to leverage the ConflictError handling in the
# main zasync process, so we need to pass the result
# back to the main thread for zasync to communicate.
+ deferred = popTaskStatus(zopeDeferredTuple)
if deferred is None:
f = failure.Failure()
out = StringIO.StringIO()
@@ -664,7 +690,7 @@
'status for %r, so it cannot schedule '
'zasync to pass a result back to Zope. '
'This is a significant problem. Here is a lot '
- 'of diagnostic information:\n\n%r',
+ 'of diagnostic information:\n\n%s',
thread_id, zopeDeferredTuple, out)
elif success:
logger.debug(
@@ -687,7 +713,7 @@
logger.debug(
'zope_exec: worker %s got too many conflict errors: '
'giving up.', thread_id)
- deferred = getTaskStatus(zopeDeferredTuple, delete=True)
+ deferred = popTaskStatus(zopeDeferredTuple)
if deferred is not None and deferred is not CANCEL:
logger.debug(
'zope_exec: worker %s scheduling failure message with '
More information about the Zope-CVS
mailing list