[Checkins] SVN: Sandbox/J1m/resumelb/ Updated to work with gevent 1. Woo hoo! No longer need our own thread
Jim Fulton
jim at zope.com
Fri Jan 27 11:39:31 UTC 2012
Log message for revision 124212:
Updated to work with gevent 1. Woo hoo! No longer need our own thread
pool implementation and no longer need to build gevent.
In doing so, fixed a threading bug in worker.py and a race condition
in lb.test.
Changed:
U Sandbox/J1m/resumelb/buildout.cfg
U Sandbox/J1m/resumelb/setup.py
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
D Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
D Sandbox/J1m/resumelb/src/zc/resumelb/thread.test
U Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
-=-
Modified: Sandbox/J1m/resumelb/buildout.cfg
===================================================================
--- Sandbox/J1m/resumelb/buildout.cfg 2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/buildout.cfg 2012-01-27 11:39:31 UTC (rev 124212)
@@ -1,22 +1,12 @@
[buildout]
develop = .
-parts = gevent py ctl
+parts = py ctl
[ctl]
recipe = zc.recipe.rhrc
dest = ${buildout:bin-directory}
parts = lb worker
-[libevent]
-recipe = zc.recipe.cmmi
-url = https://github.com/downloads/libevent/libevent/libevent-2.0.14-stable.tar.gz
-
-[gevent]
-recipe = zc.recipe.egg:custom
-include-dirs = ${libevent:location}/include
-library-dirs = ${libevent:location}/lib
-rpath = ${:library-dirs}
-
[test]
recipe = zc.recipe.testrunner
eggs = zc.resumelb [test]
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py 2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/setup.py 2012-01-27 11:39:31 UTC (rev 124212)
@@ -14,7 +14,7 @@
name, version = 'zc.resumelb', '0'
install_requires = [
- 'setuptools', 'gevent', 'WebOb', 'zc.thread', 'zc.parse_addr',
+ 'setuptools', 'gevent >=1.0b1', 'WebOb', 'zc.thread', 'zc.parse_addr',
'zc.mappingobject', 'llist']
extras_require = dict(
test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test 2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test 2012-01-27 11:39:31 UTC (rev 124212)
@@ -44,6 +44,7 @@
>>> write_message(worker1, 0, {'h1.com': 10.0})
>>> write_message(worker2, 0, {'h2.com': 10.0})
+ >>> gevent.sleep(.01) # Give resumes time to arrive
Now, let's make a request and make sure the data gets where it's
supposed to go.
@@ -53,6 +54,7 @@
>>> g1 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
>>> rno, env1 = read_message(worker1)
+
>>> rno
1
>>> from pprint import pprint
@@ -305,7 +307,7 @@
>>> lb.connect_sleep = 0.01
>>> port = workers[0].server.server_port # We'll reuse below
- >>> workers[0].server.kill()
+ >>> workers[0].server.stop()
>>> socket = workers[0].socket
>>> socket.close()
>>> gevent.sleep(.01)
Deleted: Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/thread.py 2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/thread.py 2012-01-27 11:39:31 UTC (rev 124212)
@@ -1,99 +0,0 @@
-##############################################################################
-#
-# Copyright Zope Foundation and Contributors.
-# All Rights Reserved.
-#
-# This software is subject to the provisions of the Zope Public License,
-# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
-# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
-# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
-# FOR A PARTICULAR PURPOSE.
-#
-##############################################################################
-#
-# Thread pool implementation based on: https://bitbucket.org/denis/
-# gevent-playground/src/49d1cdcdf643/geventutil/threadpool.py
-
-import fcntl
-import gevent.core
-import gevent.event
-import os
-import Queue
-import threading
-import zc.thread
-
-###############################################################################
-# The following code is from the above URL:
-
-# Simple wrapper to os.pipe() - but sets to non-block
-def _pipe():
- r, w = os.pipe()
- fcntl.fcntl(r, fcntl.F_SETFL, os.O_NONBLOCK)
- fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
- return r, w
-
-_core_pipe_read, _core_pipe_write = _pipe()
-
-def _core_pipe_read_callback(event, evtype):
- try:
- os.read(event.fd, 1)
- except EnvironmentError:
- pass
-
-gevent.core.event(gevent.core.EV_READ|gevent.core.EV_PERSIST,
- _core_pipe_read, _core_pipe_read_callback).add()
-
-def wake_gevent():
- os.write(_core_pipe_write, '\0')
-
-# MTAsyncResult is greatly simplified from version in https://bitbucket.org/
-# denis/gevent-playground/src/49d1cdcdf643/geventutil/threadpool.py
-class MTAsyncResult(gevent.event.AsyncResult):
-
- def set_exception(self, exception):
- gevent.event.AsyncResult.set_exception(self, exception)
- wake_gevent()
-
- def set(self, value=None):
- gevent.event.AsyncResult.set(self, value)
- wake_gevent()
-
-#
-###############################################################################
-
-class Pool:
-
- def __init__(self, size):
- self.size = size
- self.queue = queue = Queue.Queue()
-
- def run():
- while 1:
- result, job, args = queue.get()
- try:
- result.set(job(*args))
- except Exception, v:
- if result is None:
- return #closes
- result.set_exception(v)
-
- run.__name__ = __name__
-
- self.threads = [zc.thread.Thread(run) for i in range(size)]
-
- def result(self, job, *args):
- result = MTAsyncResult()
- self.queue.put((result, job, args))
- return result
-
- def apply(self, job, *args):
- result = self.result(job, *args)
- return result.get()
-
- def close(self, timeout=1):
- for thread in self.threads:
- self.queue.put((None, None, None))
- for thread in self.threads:
- thread.join(timeout)
-
Deleted: Sandbox/J1m/resumelb/src/zc/resumelb/thread.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/thread.test 2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/thread.test 2012-01-27 11:39:31 UTC (rev 124212)
@@ -1,54 +0,0 @@
-resumelb thread pool
-====================
-
-Applications bloc.
-
-To deal with this, we provide a very basic thread pool.
-
- >>> import zc.resumelb.thread
-
- >>> pool = zc.resumelb.thread.Pool(4)
-
-We specified a pool size of 4, so 4 threads are created:
-
- >>> import threading
- >>> len([t for t in threading.enumerate()
- ... if t.name == 'zc.resumelb.thread'])
- 4
-
-They are all deamonic:
-
- >>> len([t for t in threading.enumerate()
- ... if t.name == 'zc.resumelb.thread'])
- 4
-
-To get something done, call the pool result method with a callable and
-arguments:
-
- >>> import time
- >>> def job(t):
- ... time.sleep(t)
- ... return threading.current_thread().ident, t
-
- >>> result = pool.result(job, 0)
-
-The result is an async result:
-
- >>> ident, sleep = result.get()
- >>> idents = set(t.ident for t in threading.enumerate()
- ... if t.name == 'zc.resumelb.thread')
-
- >>> ident in idents and sleep == 0
- True
-
-If we actually sleep, so as to block, we can end up using all of the
-threads in the thread pool:
-
- >>> results = [pool.result(job, 0.01) for i in range(6)]
- >>> set(r.get()[0] for r in results) == idents
- True
-
-When we're done with a pool, it's noce to close it. This allows us to
-wait for pending jobs and close it down in an orderly fashion:
-
- >>> pool.close()
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2012-01-27 11:26:35 UTC (rev 124211)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2012-01-27 11:39:31 UTC (rev 124212)
@@ -3,12 +3,12 @@
import gevent
import gevent.hub
import gevent.server
+import gevent.threadpool
import logging
import sys
import time
import zc.mappingobject
import zc.resumelb.util
-import zc.resumelb.thread
logger = logging.getLogger(__name__)
@@ -32,10 +32,10 @@
self.connections = set()
if settings.get('threads'):
- pool = zc.resumelb.thread.Pool(self.settings.threads)
+ pool = gevent.threadpool.ThreadPool(settings['threads'])
self.apply = pool.apply
else:
- self.apply = lambda f, *a: f(*a)
+ self.apply = apply
self.server = gevent.server.StreamServer(addr, self.handle_connection)
self.server.start()
@@ -84,12 +84,15 @@
break
f.seek(0)
+ response = [0]
def start_response(status, headers, exc_info=None):
assert not exc_info # XXX
- conn.put((rno, (status, headers)))
+ response[0] = (status, headers)
try:
- for data in self.apply(self.app, env, start_response):
+ body = self.apply(self.app, (env, start_response))
+ conn.put((rno, response[0]))
+ for data in body:
conn.put((rno, data))
conn.put((rno, ''))
More information about the checkins
mailing list