[Checkins] SVN: zc.resumelb/trunk/src/zc/resumelb/ Fixed: Temporary files created when buffering data in the load
jim
cvs-admin at zope.org
Wed May 9 21:42:23 UTC 2012
Log message for revision 125792:
Fixed: Temporary files created when buffering data in the load
balancers weren't closed explicitly. Generally, they were closed
through garbage collection, but in certain situations, their numbers
could build quickly, leading to file-descriptor exhaustion.
Moved some more helper functions from doctests to the tests module for reuse.
Added spawn helper because gevent greenlet exception reporting sucks.
Changed:
U zc.resumelb/trunk/src/zc/resumelb/README.txt
U zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test
U zc.resumelb/trunk/src/zc/resumelb/lb.py
U zc.resumelb/trunk/src/zc/resumelb/lb.test
U zc.resumelb/trunk/src/zc/resumelb/tests.py
U zc.resumelb/trunk/src/zc/resumelb/util.py
U zc.resumelb/trunk/src/zc/resumelb/worker.py
-=-
Modified: zc.resumelb/trunk/src/zc/resumelb/README.txt
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/README.txt 2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/README.txt 2012-05-09 21:42:19 UTC (rev 125792)
@@ -242,9 +242,14 @@
Change History
==============
-0.5.2 (2012-05-??)
+0.5.2 (2012-05-09)
------------------
+- Fixed: Temporary files created when buffering data in the load
+ balancers weren't closed explicitly. Generally, they were closed
+ through garbage collection, but in certain situations, their numbers
+ could build quickly, leading to file-descriptor exhaustion.
+
- Fixed: Tracelog 'I' records didn't always contain input length information.
- Fixed: Tracelog 'I' records were only included when using thread pools.
Modified: zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test 2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/bufferedqueue.test 2012-05-09 21:42:19 UTC (rev 125792)
@@ -29,9 +29,9 @@
>>> q.qsize()
0
-If it hasn't started bufferng yet, it doesn't have a close method:
+If it hasn't started bufferng yet, it's queue is not a Buffer
- >>> hasattr(q, 'close')
+ >>> isinstance(q.queue, zc.resumelb.util.Buffer)
False
In testing, the underlying queue max size is only 999 bytes.
@@ -41,11 +41,11 @@
bytes in the queue are over the limit:
>>> q.put('a'*1000)
- >>> hasattr(q, 'close')
+ >>> isinstance(q.queue, zc.resumelb.util.Buffer)
False
>>> q.put('b'*1000)
- >>> hasattr(q, 'close')
+ >>> isinstance(q.queue, zc.resumelb.util.Buffer)
True
Now, we've triggered the buffering. We can keep adding data:
Modified: zc.resumelb/trunk/src/zc/resumelb/lb.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.py 2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.py 2012-05-09 21:42:19 UTC (rev 125792)
@@ -272,13 +272,10 @@
worker.dbacklog = worker.dbacklog*decay + worker.backlog
worker.nbacklog = worker.nbacklog*decay + 1
worker.mbacklog = worker.dbacklog / worker.nbacklog
+class Worker(zc.resumelb.util.LBWorker):
-class Worker(zc.resumelb.util.Worker):
-
maxrno = (1<<32) - 1
- ReadQueue = zc.resumelb.util.BufferedQueue
-
def __init__(self, pool, socket, addr):
self.pool = pool
self.nrequest = 0
@@ -324,7 +321,7 @@
self.nrequest = rno % self.maxrno
self.requests[rno] = time.time()
try:
- get = self.start(rno)
+ get = self.start(rno).get
put = self.put
try:
put((rno, env))
@@ -352,10 +349,12 @@
def content():
try:
+ # We yield a first value to get into the try so
+ # the generator close will execute thf finally block. :(
+ yield 1
while 1:
data = get()
if data:
- #logger.debug('yield %r', data)
yield data
else:
if data is None:
@@ -365,7 +364,10 @@
finally:
self.end(rno)
- return content()
+ # See the "yield 1" comment above. :(
+ content = content()
+ content.next()
+ return content
finally:
del self.requests[rno]
Modified: zc.resumelb/trunk/src/zc/resumelb/lb.test
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/lb.test 2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/lb.test 2012-05-09 21:42:19 UTC (rev 125792)
@@ -10,17 +10,6 @@
To test lb behavior, we'll create faux workers the lb can connect to.
- >>> import gevent.server
- >>> class Worker:
- ... def __init__(self):
- ... self.server = server = gevent.server.StreamServer(
- ... ('127.0.0.1', 0), self.handle)
- ... server.start()
- ... self.addr = '127.0.0.1', server.server_port
- ... def handle(self, socket, addr):
- ... self.socket = socket
-
-
>>> workers = [Worker() for i in range(2)]
We have some workers running. Now, let's create a load balancer:
@@ -50,6 +39,8 @@
>>> write_message(worker1, 0, {'h1.com': 10.0})
>>> write_message(worker2, 0, {'h2.com': 10.0})
+
+ >>> import gevent
>>> gevent.sleep(.01) # Give resumes time to arrive
Now, let's make a request and make sure the data gets where it's
@@ -57,7 +48,7 @@
>>> import webtest
>>> app1 = webtest.TestApp(lb.handle_wsgi)
- >>> g1 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
+ >>> g1 = spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
>>> rno, env1 = read_message(worker1)
@@ -92,7 +83,7 @@
socket. This time, we'll make a request that provides a large body:
>>> app2 = webtest.TestApp(lb.handle_wsgi)
- >>> g2 = gevent.spawn(
+ >>> g2 = spawn(
... app2.put, '/hi.html', 'i'*200000, [('Host', 'h1.com')])
>>> rno, env2 = read_message(worker1)
@@ -143,7 +134,7 @@
If we make a request to h2.com, we'll get the request on worker2:
>>> app3 = webtest.TestApp(lb.handle_wsgi)
- >>> g3 = gevent.spawn(app3.get, '/hi.html', {}, [('Host', 'h2.com')])
+ >>> g3 = spawn(app3.get, '/hi.html', {}, [('Host', 'h2.com')])
>>> rno, env3 = read_message(worker2)
>>> rno
@@ -252,7 +243,7 @@
>>> import time
>>> t1 = time.time()
>>> app1 = webtest.TestApp(lb.handle_wsgi)
- >>> g1 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
+ >>> g1 = spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
>>> rno = read_message(worker1)[0]
>>> read_message(worker1) == (rno, '')
True
@@ -264,7 +255,7 @@
>>> gevent.sleep(.01)
>>> app2 = webtest.TestApp(lb.handle_wsgi)
- >>> g2 = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
+ >>> g2 = spawn(app1.get, '/hi.html', {}, [('Host', 'h1.com')])
>>> gevent.sleep(.01)
>>> [ot] == [w.oldest_time for (_, w) in lb.pool.skilled['h1.com']]
True
@@ -308,7 +299,7 @@
... 'OPTIONS', 'TRACE'):
... app = webtest.TestApp(lb.handle_wsgi)
... greenlets.append(
- ... gevent.spawn(app.request, '/hi.html', method=method,
+ ... spawn(app.request, '/hi.html', method=method,
... headers=[('Host', 'h1.com')], status='*'))
... rno, data = read_message(worker1)
... rno2, blank = read_message(worker1)
@@ -457,7 +448,7 @@
If we submit an h4.com request, it will go to the new worker:
- >>> g = gevent.spawn(app1.get, '/hi.html', {}, [('Host', 'h4.com')])
+ >>> g = spawn(app1.get, '/hi.html', {}, [('Host', 'h4.com')])
>>> rno, env = read_message(workers[-1].socket)
>>> read_message(workers[-1].socket)
(1, '')
@@ -501,7 +492,7 @@
We already have a request in flight. Let's add another on a different
worker:
- >>> g2 = gevent.spawn(app2.get, '/hi.html', {}, [('Host', 'h2.com')])
+ >>> g2 = spawn(app2.get, '/hi.html', {}, [('Host', 'h2.com')])
>>> gevent.sleep(.01)
>>> [w.backlog for w in lb.pool.workers]
@@ -510,7 +501,7 @@
If we call shutdown, it will block until we have no in-flight
connections, so we'll call it in a greenlet:
- >>> shutdown_greenlet = gevent.spawn(lb.shutdown)
+ >>> shutdown_greenlet = spawn(lb.shutdown)
>>> gevent.sleep(.01)
>>> shutdown_greenlet.ready()
False
Modified: zc.resumelb/trunk/src/zc/resumelb/tests.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/tests.py 2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/tests.py 2012-05-09 21:42:19 UTC (rev 125792)
@@ -14,6 +14,7 @@
import bobo
import doctest
import gevent
+import gevent.server
import gevent.socket
import hashlib
import manuel.capture
@@ -28,6 +29,8 @@
import traceback
import unittest
import webob
+import webtest
+import zc.resumelb.lb
import zc.resumelb.util
import zc.resumelb.worker
import zc.zk.testing
@@ -125,6 +128,24 @@
if size_only:
print size
+def spawn(func, *a, **kw):
+ def run_func():
+ try:
+ return func(*a, **kw)
+ except Exception:
+ traceback.print_exc()
+ raise
+ return gevent.spawn(run_func)
+
+class FauxWorker:
+ def __init__(self):
+ self.server = server = gevent.server.StreamServer(
+ ('127.0.0.1', 0), self.handle)
+ server.start()
+ self.addr = '127.0.0.1', server.server_port
+ def handle(self, socket, addr):
+ self.socket = socket
+
def test_loading_recipes_with_no_history_argument():
"""A bug as introduced that caused resumes to be loaded
incorrectly when no history was given to the constructor. It
@@ -180,9 +201,52 @@
>>> handler.uninstall()
>>> worker.stop()
+ """ #"
+
+def Buffering_Temporary_Files_are_closed():
"""
+ When a worker sends data to an lb faster than it can send it to a
+ browser, the data gets buffered in a temporary file. When the
+ request is done, the tempirary fileis explicitly closed.
+ >>> worker = FauxWorker()
+ >>> lb = zc.resumelb.lb.LB([worker.addr], zc.resumelb.lb.host_classifier)
+ >>> wait(lambda : hasattr(worker, 'socket'))
+ >>> zc.resumelb.util.write_message(worker.socket, 0, {})
+ >>> wait(lambda : lb.pool.workers)
+Now make a request that doesn't read data, but waits until we tell it
+to close it's iterator:
+
+ >>> event = gevent.event.Event()
+ >>> @spawn
+ ... def client():
+ ... def start(*a):
+ ... print 'start_response', a
+ ... body = lb.handle_wsgi(
+ ... webob.Request.blank('/hi.html').environ, start)
+ ... event.wait()
+ ... body.close()
+ ... print 'closed body'
+
+Now, we'll send it enough data to make it ise a temporary file:
+
+ >>> [lbworker] = list(lb.pool.workers)
+ >>> wait(lambda : lbworker.queues)
+
+ >>> zc.resumelb.util.write_message(
+ ... worker.socket, 1, ('200 OK', []), 'x'*10000, 'x'*10000)
+
+ >>> wait(lambda : hasattr(lbworker.queues[1].queue, 'file'))
+ start_response ('200 OK', [])
+
+ >>> f = lbworker.queues[1].queue.file
+ >>> event.set()
+ >>> wait(lambda : f.closed)
+ closed body
+
+ """
+
def test_classifier(env):
return "yup, it's a test"
@@ -203,6 +267,8 @@
zc.resumelb.util.queue_size_bytes = 999
test.globs['newenv'] = newenv
test.globs['print_response'] = print_response
+ test.globs['spawn'] = spawn
+ test.globs['Worker'] = FauxWorker
def zkSetUp(test):
setUp(test)
Modified: zc.resumelb/trunk/src/zc/resumelb/util.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/util.py 2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/util.py 2012-05-09 21:42:19 UTC (rev 125792)
@@ -115,6 +115,9 @@
def qsize(self):
return self.queue.qsize()
+ def close(self):
+ pass
+
class Buffer:
size = size_bytes = read_position = write_position = 0
@@ -181,7 +184,7 @@
self.size_bytes -= len(data)
self.size -= 1
else:
- assert size == -1
+ assert self.size == -1
class Worker:
@@ -206,15 +209,13 @@
def start(self, rno):
readq = self.ReadQueue()
self.readers[rno] = readq.put
- return readq.get
+ return readq
def end(self, rno):
try:
queue = self.readers.pop(rno)
except KeyError:
return # previously cancelled
- if hasattr(queue, 'close'):
- queue.close()
def put_disconnected(self, *a, **k):
raise Disconnected()
@@ -226,3 +227,22 @@
put(None)
self.put = self.put_disconnected
+
+class LBWorker(Worker):
+
+ ReadQueue = BufferedQueue
+
+ def connected(self, socket, addr=None):
+ self.queues = {}
+ return Worker.connected(self, socket, addr)
+
+ def start(self, rno):
+ self.queues[rno] = queue = Worker.start(self, rno)
+ return queue
+
+ def end(self, rno):
+ try:
+ queue = self.readers.pop(rno)
+ except KeyError:
+ return # previously cancelled
+ self.queues.pop(rno).close()
Modified: zc.resumelb/trunk/src/zc/resumelb/worker.py
===================================================================
--- zc.resumelb/trunk/src/zc/resumelb/worker.py 2012-05-09 20:24:19 UTC (rev 125791)
+++ zc.resumelb/trunk/src/zc/resumelb/worker.py 2012-05-09 21:42:19 UTC (rev 125792)
@@ -189,7 +189,7 @@
env = data
env['zc.resumelb.lb_addr'] = addr
gevent.spawn(
- self.handle, conn, rno, conn.start(rno), env)
+ self.handle, conn, rno, conn.start(rno).get, env)
else:
rput(data)
if data is None:
More information about the checkins
mailing list