[Zope-CVS] CVS: Packages/zasync/client/zasync - client.py:1.7 plugins.py:1.6

Gary Poster gary at zope.com
Fri Oct 29 21:39:22 EDT 2004


Update of /cvs-repository/Packages/zasync/client/zasync
In directory cvs.zope.org:/tmp/cvs-serv27722/client/zasync

Modified Files:
	client.py plugins.py 
Log Message:
Add sanitize method so that calls to (and from zope_exec) do not have persistent objects.

Remove an opportunity for Conflict Error in the client.

Fix bug in zope_exec plugin.



=== Packages/zasync/client/zasync/client.py 1.6 => 1.7 ===
--- Packages/zasync/client/zasync/client.py:1.6	Thu Oct 28 18:22:20 2004
+++ Packages/zasync/client/zasync/client.py	Fri Oct 29 21:39:22 2004
@@ -241,11 +241,10 @@
         call.cancel()
     return value
 
-# pollZope schedules makeCall which schedules returnResult, possibly using
-# prepareDeferredCall if the result is a deferred (and it usually is).
+# pollZope schedules makeCall which schedules returnResult.  That's it, really.
 def pollZope(path):
     "polls Zope to check for new calls, timeout changes, and expired calls"
-    global active, app, DB
+    global active, app, DB, plugins
     application = None
     try:
         if is_connected():
@@ -269,18 +268,28 @@
                 delay = 0
                 interval = 0.05
                 for zopeDeferred in tool.acceptAll():
-                    if zopeDeferred.remainingSeconds() <= 0:
+                    remainingSeconds = zopeDeferred.remainingSeconds()
+                    if remainingSeconds <= 0:
                         # don't bother calling if it has already timed out
                         zopeDeferred.errback(
                             failure.Failure(defer.TimeoutError('Timed out.')))
                         continue
-                    zopeDeferredId = zopeDeferred.key
                     name, args, kwargs = zopeDeferred.getSignature()
+                    call_info = plugins.get(name)
+                    if call_info is None:
+                        zopeDeferred.errback(failure.Failure(KeyError(name)))
+                        continue
                     log.debug('got call for plugin %s (args %r; kwargs %r)', 
                               name, args, kwargs)
+                    zopeDeferredId = zopeDeferred.key
+                    timeout = zopeDeferred.timeout
+                    calltimeout = call_info['timeout']
+                    if timeout is None or calltimeout < timeout:
+                        zopeDeferred.timeout = calltimeout
                     reactor.callLater(
                         delay, makeCall, 
-                        path, zopeDeferredId, name, args, kwargs)
+                        path, zopeDeferredId, name, args, kwargs, 
+                        remainingSeconds, timeout)
                     delay += interval
                 bad = []
                 for zopeDeferredId, (oldTimeout, deferred) in active.items():
@@ -380,8 +389,8 @@
             application._p_jar.close()
     return res
 
-def makeCall(path, zopeDeferredId, name, args, kwargs, 
-             returnResult=returnResult, count=0):
+def makeCall(path, zopeDeferredId, name, args, kwargs, remainingSeconds,
+             timeout, returnResult=returnResult, count=0):
     # zopeDeferredId is an id of a deferred in the asynchronous tool
     global plugins
     log = logging.getLogger('zasync')
@@ -401,14 +410,19 @@
                     res = call((path, zopeDeferredId), *args, **kwargs)
                 else:
                     res = call(*args, **kwargs)
-                get_transaction().commit() # just in case plugin touched Zope
+                get_transaction().commit() # just in case plugin touched Zope:
+                # please do not!  Use zope_exec, or follow that pattern...
+                # If you do, and you start a conflict error, it may throw
+                # the timeout check off.  Just don't, and regard this as 
+                # paranoia.
             except ConflictError:
                 log.warning('ZODB ConflictError in makeCall', exc_info=True)
                 get_transaction().abort()
                 if count < max_conflict_resolution_attempts:
                     reactor.callLater(
                         count, makeCall, path, zopeDeferredId, 
-                        name, args, kwargs, returnResult, count+1)
+                        name, args, kwargs, remainingSeconds, timeout, 
+                        returnResult, count+1)
                     return
                 else:
                     res = failure.Failure()
@@ -421,7 +435,7 @@
             except ClientDisconnected:
                 scheduleServerRetry(
                     makeCall, path, zopeDeferredId, name, args, kwargs, 
-                    returnResult, count)
+                    remainingSeconds, timeout, returnResult, count)
                 return
             except (KeyboardInterrupt, SystemExit):
                 raise
@@ -429,9 +443,15 @@
                 get_transaction().abort()
                 res = failure.Failure()
         if isinstance(res, defer.Deferred):
-            reactor.callLater(
-                0, prepareDeferredCall, path, zopeDeferredId, res, call_info, 
-                returnResult)
+            res.timeoutCall = reactor.callLater(
+                max(remainingSeconds, 0), 
+                timeoutErrback, res)
+            res.addBoth(cancelDelayedCall, res.timeoutCall)
+            active[zopeDeferredId] = (timeout, res)
+            res.addCallbacks(
+                returnResult, returnResult,
+                callbackArgs=(path, zopeDeferredId),
+                errbackArgs=(path, zopeDeferredId, True))
         elif isinstance(res, failure.Failure):
             res.cleanFailure()
             reactor.callLater(0, returnResult, res, path, zopeDeferredId, True)
@@ -440,98 +460,12 @@
     else:
         scheduleServerRetry(
             makeCall, path, zopeDeferredId, name, args, kwargs, 
-            returnResult, count)
-
-def prepareDeferredCall(path, zopeDeferredId, deferred, call_info, 
-                        returnResult, count=0):
-    log = logging.getLogger('zasync')
-    log.debug('prepareDeferredCall called for %s (%s)',
-        zopeDeferredId, path)
-    application = None
-    try:
-        if is_connected():
-            try:
-                application = app()
-                try:
-                    tool = application.unrestrictedTraverse(path)
-                except (AttributeError, LookupError):
-                    scheduleToolRetry(
-                        path, prepareDeferredCall, path, zopeDeferredId, 
-                        deferred, call_info, returnResult, count)
-                    return
-                zopeDeferred = tool.getDeferred(zopeDeferredId)
-                if zopeDeferred is None:
-                    return
-                timeout = zopeDeferred.timeout
-                calltimeout = call_info['timeout']
-                if timeout is None or calltimeout < timeout:
-                    zopeDeferred.timeout = calltimeout
-                remainingSeconds = zopeDeferred.remainingSeconds()
-                get_transaction().commit()
-            except ConflictError:
-                log.warning('ZODB ConflictError in prepareDeferredCall', 
-                            exc_info=True)
-                get_transaction().abort()
-                if count < max_conflict_resolution_attempts:
-                    reactor.callLater(
-                        count, prepareDeferredCall, zopeDeferredId, deferred, 
-                        call_info, returnResult, count+1)
-                else:
-                    res = failure.Failure()
-                    out = StringIO.StringIO()
-                    res.printDetailedTraceback(out)
-                    log.error(
-                        'Too many ConflictErrors in prepareDeferredCall: '
-                        'giving up.\n\n%s', out.getvalue())
-            except ClientDisconnected:
-                scheduleServerRetry(
-                    prepareDeferredCall, path, zopeDeferredId, deferred, 
-                    call_info, returnResult, count)
-            except (KeyboardInterrupt, SystemExit):
-                raise
-            except: # give up.  Looks like a bug.  Log should help fix it.
-                logException('Exception from Zope.', log=log)
-                deferred.addErrback(
-                    returnResult, path, zopeDeferredId, True)
-                deferred.errback(failure.Failure())
-            else:
-                deferred.timeoutCall = reactor.callLater(
-                    max(remainingSeconds, 0), 
-                    timeoutErrback, deferred)
-                deferred.addBoth(cancelDelayedCall, deferred.timeoutCall)
-                active[zopeDeferredId] = (timeout, deferred)
-                deferred.addCallbacks(
-                    returnResult, returnResult,
-                    callbackArgs=(path, zopeDeferredId),
-                    errbackArgs=(path, zopeDeferredId, True))
-        else:
-            scheduleServerRetry(
-                prepareDeferredCall, path, zopeDeferredId, deferred, call_info,
-                returnResult, count)
-    finally:
-        if application is not None:
-            application._p_jar.close()
-
-def run(path=None):
-    # to be called after config.initialize
-    d = setPlugins(path)
-    d.addCallbacks(handlePastCalls, stop)
-    d.addErrback(stop)
-    log = logging.getLogger('zasync')
-    log.debug('beginning reactor')
-    try:
-        reactor.run()
-    finally:
-        shutdown(None)
+            remainingSeconds, timeout, returnResult, count)
 
 def stop(ignored=None):
     raise SystemExit()
 
-def setPlugins(path=None):
-    global tool_path
-    if path is None:
-        path = tool_path
-    tool_path = path = tuple(path)
+def setPlugins(path):
     d = defer.Deferred()
     reactor.callLater(0, _setPlugins, d, path)
     return d
@@ -642,21 +576,32 @@
         if application is not None:
             application._p_jar.close()
 
-def shutdown(ignored=None):
+def run(path=None):
+    # to be called after config.initialize
     global tool_path, app
-    application = None
+    if path is None:
+        path = tool_path
+    tool_path = path = tuple(path)
+    d = setPlugins(path)
+    d.addCallbacks(handlePastCalls, stop)
+    d.addErrback(stop)
+    log = logging.getLogger('zasync')
+    log.debug('beginning reactor')
     try:
-        get_transaction().abort()
-        application = app()
+        reactor.run()
+    finally:
+        application = None
         try:
-            tool = application.unrestrictedTraverse(tool_path)
-            tool.setPlugins(())
-            get_transaction().commit()
-        except (AttributeError, KeyError, ConflictError, ClientDisconnected):
             get_transaction().abort()
-    finally:
-        if application is not None:
-            application._p_jar.close()
-        logging.getLogger('zasync').critical('Shutting down')
-        logging.shutdown()
-    return ignored
+            application = app()
+            try:
+                tool = application.unrestrictedTraverse(tool_path)
+                tool.setPlugins(())
+                get_transaction().commit()
+            except (AttributeError, KeyError, ConflictError, ClientDisconnected):
+                get_transaction().abort()
+        finally:
+            if application is not None:
+                application._p_jar.close()
+            logging.getLogger('zasync').critical('Shutting down')
+            logging.shutdown()


=== Packages/zasync/client/zasync/plugins.py 1.5 => 1.6 ===
--- Packages/zasync/client/zasync/plugins.py:1.5	Thu Oct 28 18:22:20 2004
+++ Packages/zasync/client/zasync/plugins.py	Fri Oct 29 21:39:22 2004
@@ -35,6 +35,9 @@
 #### aggegate plugins
 
 def aggregatePlugins(zopeDeferredTuple, *calls):
+    # XXX remove timeout feature; use the schedule plugin!
+    # XXX also? instead? incorporate easy and introspectable chaining into
+    # zopeDeferreds
     """aggregate calls to other plugins.  each call may either be a number
     (integer or float) of seconds to pause, or a tuple of (plugin name, args 
     tuple, kwargs dict).  For example, 
@@ -82,9 +85,16 @@
                 except (ValueError, TypeError):
                     self.deferred.errback(failure.Failure())
                 else:
-                    client.makeCall(
-                        self.path, self.zopeDeferredId, plugin, args, kwargs, 
-                        self.returnResult)
+                    try:
+                        call_info = client.plugins[plugin]
+                    except KeyError:
+                        self.deferred.errback(failure.Failure())
+                    else:
+                        timeout = call_info["timeout"]
+                        remainingSeconds = client.active[self.zopeDeferredId][0]
+                        client.makeCall(
+                            self.path, self.zopeDeferredId, plugin, args, kwargs, 
+                            remainingSeconds, timeout, self.returnResult)
             
     def returnResult(self, value, path, zopeDeferredId, error=False):
         if error:
@@ -202,7 +212,7 @@
 from AccessControl.SecurityManagement import newSecurityManager
 from Products.PageTemplates.Expressions import getEngine, SecureModuleImporter
 
-from Products.zasync.manager import Expression
+from Products.zasync.manager import Expression, sanitize
 from Products.zasync.bucketqueue import BucketQueue
 
 MAXTHREADPOOL = 5
@@ -216,14 +226,14 @@
 serverDown = False
 
 CANCEL = object()
-CALLBACK = 'callback'
-ERRBACK = 'errback'
+CALLBACK = 'CALLBACK'
+ERRBACK = 'ERRBACK'
 
-def getTaskStatus(key, del_if_cancel=False, delete=False):
+def getTaskStatus(key, del_if_cancel=False):
     taskStatusLock.acquire()
     try:
         val = taskStatus.get(key)
-        if delete or (del_if_cancel and val is CANCEL):
+        if del_if_cancel and val is CANCEL:
             try:
                 del taskStatus[key]
             except KeyError:
@@ -232,6 +242,19 @@
         taskStatusLock.release()
     return val
 
+def popTaskStatus(key):
+    taskStatusLock.acquire()
+    try:
+        val = taskStatus.get(key)
+        try:
+            del taskStatus[key]
+        except KeyError:
+            pass
+    finally:
+        taskStatusLock.release()
+    return val
+        
+
 def setTaskStatus(key, value):
     taskStatusLock.acquire()
     try:
@@ -396,7 +419,7 @@
     if getTaskStatus(zopeDeferredTuple) is not None:
         setTaskStatus(zopeDeferredTuple, CANCEL)
         logging.getLogger('zasync.plugins').debug(
-            'zope_exec: cancelling pending %r for worker thread',
+            'zope_exec: attempting to cancel pending %r for worker thread',
             zopeDeferredTuple)
     return failure
 
@@ -547,9 +570,10 @@
                             else:
                                 result = context_dict["result"] = res
                                 success = True
-                        deferred = getTaskStatus(zopeDeferredTuple, delete=True)
+                        deferred = getTaskStatus(zopeDeferredTuple)
                         if deferred is not CANCEL:
                             if success:
+                                result = sanitize(result)
                                 get_transaction().commit()
                                 logger.debug(
                                     'zope_exec: worker %r committed successful '
@@ -626,8 +650,7 @@
                                 thread_id, exc_info=True)
                         get_transaction().abort()
                         if deferred is None:
-                            deferred = getTaskStatus(
-                                zopeDeferredTuple, delete=True)
+                            deferred = popTaskStatus(zopeDeferredTuple)
                         if deferred is not CANCEL: # one last chance
                             if deferred is None:
                                 if out is None:
@@ -640,7 +663,9 @@
                                 logger.error(
                                     'zope_exec: worker %s cannot find the task '
                                     'status for %r, so it cannot schedule '
-                                    'zasync to pass a failure back to Zope.%s',
+                                    'zasync to pass a failure back to Zope.  '
+                                    'This is a significant problem.  Here is a '
+                                    'lot of diagnostic information:\n\n%s',
                                     thread_id, zopeDeferredTuple, out)
                             else:
                                 callbacks.put((deferred, f))
@@ -654,6 +679,7 @@
                         # we want to leverage the ConflictError handling in the
                         # main zasync process, so we need to pass the result
                         # back to the main thread for zasync to communicate.
+                        deferred = popTaskStatus(zopeDeferredTuple)
                         if deferred is None:
                             f = failure.Failure()
                             out = StringIO.StringIO()
@@ -664,7 +690,7 @@
                                 'status for %r, so it cannot schedule '
                                 'zasync to pass a result back to Zope.  '
                                 'This is a significant problem.  Here is a lot '
-                                'of diagnostic information:\n\n%r',
+                                'of diagnostic information:\n\n%s',
                                 thread_id, zopeDeferredTuple, out)
                         elif success:
                             logger.debug(
@@ -687,7 +713,7 @@
                 logger.debug(
                     'zope_exec: worker %s got too many conflict errors: '
                     'giving up.', thread_id)
-                deferred = getTaskStatus(zopeDeferredTuple, delete=True)
+                deferred = popTaskStatus(zopeDeferredTuple)
                 if deferred is not None and deferred is not CANCEL:
                     logger.debug(
                         'zope_exec: worker %s scheduling failure message with '



More information about the Zope-CVS mailing list