[Checkins] SVN: Sandbox/J1m/resumelb/ Added a simulation script.
Jim Fulton
jim at zope.com
Fri Jan 6 10:41:10 UTC 2012
Log message for revision 123965:
Added a simulation script.
Refactored the way unskilled workers are handled.
Changed the way settings are handled to facilitate managing settings
with ZooKeeper (as the simulation script does).
Changed:
U Sandbox/J1m/resumelb/buildout.cfg
U Sandbox/J1m/resumelb/setup.py
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
U Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
A Sandbox/J1m/resumelb/src/zc/resumelb/simul.py
U Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
U Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
-=-
Modified: Sandbox/J1m/resumelb/buildout.cfg
===================================================================
--- Sandbox/J1m/resumelb/buildout.cfg 2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/buildout.cfg 2012-01-06 10:41:09 UTC (rev 123965)
@@ -1,6 +1,5 @@
[buildout]
develop = .
-#parts = test py
parts = gevent py ctl
[ctl]
@@ -25,7 +24,10 @@
recipe = zc.recipe.egg
eggs = ${test:eggs}
PasteScript
-
+ zc.zk [static]
+ pylru
+ ZODB3
+entry-points = simul=zc.resumelb.simul:main
interpreter = py
[lb]
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py 2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/setup.py 2012-01-06 10:41:09 UTC (rev 123965)
@@ -14,7 +14,8 @@
name, version = 'zc.resumelb', '0'
install_requires = [
- 'setuptools', 'gevent', 'WebOb', 'zc.thread', 'zc.mappingobject']
+ 'setuptools', 'gevent', 'WebOb', 'zc.thread', 'zc.parse_addr',
+ 'zc.mappingobject']
extras_require = dict(
test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2012-01-06 10:41:09 UTC (rev 123965)
@@ -1,4 +1,4 @@
-import bisect
+from bisect import bisect_left, insort
import gevent
import gevent.hub
import gevent.pywsgi
@@ -6,6 +6,7 @@
import logging
import sys
import webob
+import zc.mappingobject
import zc.resumelb.util
block_size = 1<<16
@@ -22,11 +23,12 @@
class LB:
def __init__(self, worker_addr, classifier,
- disconnect_message=default_disconnect_message
+ settings=None,
+ disconnect_message=default_disconnect_message,
):
self.classifier = classifier
self.disconnect_message = disconnect_message
- self.pool = Pool()
+ self.pool = Pool(settings)
self.worker_server = gevent.server.StreamServer(
worker_addr, self.handle_worker)
self.worker_server.start()
@@ -62,117 +64,164 @@
class Pool:
- def __init__(self, max_backlog=40):
- self.max_backlog = max_backlog
- self.unskilled = [set() for i in range(max_backlog+1)]
- self.skilled = {}
- self.resumes = {}
- self.backlogs = {}
+ def __init__(self, settings=None):
+ if settings is None:
+ settings = dict(
+ max_backlog = 40,
+ unskilled_score = 1.0,
+ )
+ self.settings = settings
+ self.workers = set()
+ self.unskilled = [] # sorted([(uscore, poolworker)])
+ self.skilled = {} # rclass -> {(score, workers)}
+ self.nskills = 0 # sum of resume lengths
self.event = gevent.event.Event()
def __repr__(self):
- skilled = self.skilled
- backlogs = self.backlogs
outl = []
out = outl.append
out('Request classes:')
- for rclass in sorted(skilled):
+ for (rclass, skilled) in sorted(self.skilled.items()):
out(' %s: %s'
% (rclass,
- ', '.join('%s(%s,%s)' % (worker, score, backlogs[worker])
- for (score, worker) in skilled[rclass])
+ ', '.join(
+ '%s(%s,%s)' %
+ (worker, score, worker.backlog)
+ for (score, worker) in sorted(skilled)
))
+ )
out('Backlogs:')
- for backlog, workers in enumerate(self.unskilled):
- if workers:
- out(' %s: %s' % (backlog, sorted(workers)))
+ backlogs = {}
+ for worker in self.workers:
+ backlogs.setdefault(worker.backlog, []).append(worker)
+ for backlog, workers in sorted(backlogs.items()):
+ out(' %s: %r' % (backlog, sorted(workers)))
return '\n'.join(outl)
- def new_resume(self, worker, resume):
+ def new_resume(self, worker, resume=None):
skilled = self.skilled
- resumes = self.resumes
- try:
- old = resumes[worker]
- except KeyError:
- self.backlogs[worker] = 0
- self.unskilled[0].add(worker)
- self.event.set()
+ unskilled = self.unskilled
+ if worker in self.workers:
+ if worker.backlog < self.settings['max_backlog']:
+ del unskilled[bisect_left(unskilled, (worker.uscore, worker))]
+ for rclass, score in worker.resume.iteritems():
+ skilled[rclass].remove((score, worker))
+ self.nskills -= len(worker.resume)
else:
- for rclass, score in old.iteritems():
- workers = skilled[rclass]
- del workers[bisect.bisect_left(workers, (score, worker))]
+ self.workers.add(worker)
+ worker.backlog = 0
- for rclass, score in resume.iteritems():
- bisect.insort(skilled.setdefault(rclass, []), (score, worker))
+ if resume is None:
+ self.workers.remove(worker)
+ else:
+ worker.resume = resume
+ self.nskills += len(resume)
+ if resume:
+ scores = sorted(resume.values())
+ worker.unskilled_score = max(
+ self.settings['unskilled_score'],
+ scores[
+ min(
+ max(3, len(scores)/4),
+ len(scores)-1,
+ )
+ ] / 10.0
+ )
+ else:
+ worker.unskilled_score = (
+ self.settings['unskilled_score'] * (1.0 + self.nskills) /
+ len(self.workers))
- resumes[worker] = resume
+ uscore = (
+ worker.unskilled_score /
+ (1.0 + worker.backlog)
+ )
+ worker.uscore = uscore
+ insort(unskilled, (uscore, worker))
+ for rclass, score in resume.iteritems():
+ try:
+ skilled[rclass].add((score, worker))
+ except KeyError:
+ skilled[rclass] = set(((score, worker), ))
+
+ if self.unskilled:
+ self.event.set()
+
def remove(self, worker):
- self.new_resume(worker, {})
- backlog = self.backlogs.pop(worker)
- self.unskilled[backlog].remove(worker)
- del self.resumes[worker]
+ self.new_resume(worker)
def get(self, rclass, timeout=None):
"""Get a worker to handle a request class
"""
- max_backlog = self.max_backlog
- backlogs = self.backlogs
+
unskilled = self.unskilled
- while 1:
+ if not unskilled:
+ self.event.wait(timeout)
+ if not self.unskilled:
+ return None
- # Look for a skilled worker
- best_score = 0
- for score, worker in reversed(self.skilled.get(rclass, ())):
- backlog = backlogs[worker] + 1
- if backlog > max_backlog:
- continue
- score /= backlog
- if score <= best_score:
- break
+ # Look for a skilled worker
+ best_score, unskilled_worker = unskilled[-1]
+ best_worker = best_backlog = None
+ max_backlog = self.settings['max_backlog']
+ skilled = self.skilled.get(rclass, ())
+ for score, worker in skilled:
+ backlog = worker.backlog + 1
+ if backlog > max_backlog:
+ continue
+ score /= backlog
+ if (score > best_score
+ or
+ (best_worker is None and worker is unskilled_worker)
+ ):
best_score = score
+ best_worker = worker
best_backlog = backlog
- best_worker = worker
- if best_score:
- unskilled[best_backlog-1].remove(best_worker)
- unskilled[best_backlog].add(best_worker)
- backlogs[best_worker] = best_backlog
- return best_worker
+ if best_worker is not None:
+ uscore = best_worker.uscore
+ del unskilled[bisect_left(unskilled, (uscore, best_worker))]
+ else:
+ uscore, best_worker = unskilled.pop()
+ best_backlog = best_worker.backlog + 1
+ self.nskills += 1
+ resume = best_worker.resume
+ score = max(uscore, self.settings['unskilled_score'] * 10)
+ best_worker.resume[rclass] = score
+ if skilled == ():
+ self.skilled[rclass] = set(((score, best_worker),))
+ else:
+ skilled.add((score, best_worker))
+ lresume = len(resume)
+ uscore *= lresume/(lresume + 1.0)
- # Look for an unskilled worker
- for backlog, workers in enumerate(unskilled):
- if workers:
- worker = workers.pop()
- backlog += 1
- try:
- unskilled[backlog].add(worker)
- except IndexError:
- workers.add(worker)
- else:
- backlogs[worker] = backlog
- resume = self.resumes[worker]
- if rclass not in resume:
- self.resumes[worker][rclass] = 1.0
- bisect.insort(self.skilled.setdefault(rclass, []),
- (1.0, worker))
- return worker
+ uscore *= best_backlog / (1.0 + best_backlog)
+ best_worker.uscore = uscore
+ best_worker.backlog = best_backlog
+ if best_backlog < max_backlog:
+ insort(unskilled, (uscore, best_worker))
+ return best_worker
- # Dang. Couldn't find a worker, either because we don't
- # have any yet, or because they're all too busy.
- self.event.clear()
- self.event.wait(timeout)
- if timeout is not None and not self.event.is_set():
- return None
-
def put(self, worker):
- backlogs = self.backlogs
+ backlog = worker.backlog
+ if backlog < 1:
+ return
unskilled = self.unskilled
- backlog = backlogs[worker]
- unskilled[backlog].remove(worker)
+ max_backlog = self.settings['max_backlog']
+ uscore = worker.uscore
+ if backlog < max_backlog:
+ del unskilled[bisect_left(unskilled, (uscore, worker))]
+
+ uscore *= (backlog + 1.0) / backlog
+ worker.uscore = uscore
+
backlog -= 1
- unskilled[backlog].add(worker)
- backlogs[worker] = backlog
+ worker.backlog = backlog
+
+ if backlog < max_backlog:
+ insort(unskilled, (uscore, worker))
+
self.event.set()
class Worker(zc.resumelb.util.Worker):
@@ -197,6 +246,9 @@
else:
readers[rno](data)
+ def __repr__(self):
+ return "worker-%s" % id(self)
+
def handle(self, rclass, env, start_response):
logger.debug('handled by %s', self.addr)
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.test 2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.test 2012-01-06 10:41:09 UTC (rev 123965)
@@ -15,11 +15,11 @@
>>> from zc.resumelb.util import read_message, write_message
>>> worker1 = gevent.socket.create_connection(
... ('127.0.0.1', lb.worker_server.server_port))
- >>> write_message(worker1, 0, {'h1.com': 1.0})
+ >>> write_message(worker1, 0, {'h1.com': 10.0})
>>> worker2 = gevent.socket.create_connection(
... ('127.0.0.1', lb.worker_server.server_port))
- >>> write_message(worker2, 0, {'h2.com': 1.0})
+ >>> write_message(worker2, 0, {'h2.com': 10.0})
Now, let's make a request and make sure the data gets where it's
supposed to go.
@@ -64,6 +64,7 @@
... app2.put, '/hi.html', 'i'*200000, [('Host', 'h1.com')])
>>> rno, env2 = read_message(worker1)
+
>>> rno
2
>>> pprint(env2)
@@ -176,7 +177,7 @@
At this point, there are no outstanding requests. The pool back-logs
should all be 0:
- >>> sum(lb.pool.backlogs.values())
+ >>> sum(worker.backlog for worker in lb.pool.workers)
0
Worker disconnection
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test 2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test 2012-01-06 10:41:09 UTC (rev 123965)
@@ -14,7 +14,7 @@
We'll test the pool with stand-ins for the local workers.
>>> import zc.resumelb.lb
- >>> pool = zc.resumelb.lb.Pool(max_backlog=5)
+ >>> pool = zc.resumelb.lb.Pool(dict(max_backlog=5, unskilled_score=1.0))
We specified a maximum per-worker backlog for the pool. We specified
a fairly low max backlog to make it easier to see what happens when
@@ -37,6 +37,9 @@
... return cmp(self.name, other.name)
... def __hash__(self):
... return hash(self.name)
+ ... def handle(self, *args):
+ ... pass
+ ... Disconnected = None
>>> w1 = Worker('w1')
>>> pool.new_resume(w1, {})
@@ -71,8 +74,8 @@
>>> pool
Request classes:
- bar: w2(1.0,1)
- foo: w1(1.0,2)
+ bar: w2(10.0,1)
+ foo: w1(10.0,2)
Backlogs:
1: [w2]
2: [w1]
@@ -90,8 +93,8 @@
>>> pool
Request classes:
- bar: w2(1.0,1)
- foo: w1(1.0,5)
+ bar: w2(10.0,1)
+ foo: w1(10.0,5)
Backlogs:
0: [w3]
1: [w2]
@@ -107,8 +110,8 @@
>>> pool
Request classes:
- bar: w2(1.0,3)
- foo: w1(1.0,5), w2(1.0,3), w3(1.0,5)
+ bar: w2(10.0,3)
+ foo: w1(10.0,5), w2(10.0,3), w3(10.0,5)
Backlogs:
3: [w2]
5: [w1, w3]
@@ -131,49 +134,61 @@
>>> pool.put(w3)
>>> pool
Request classes:
- bar: w2(1.0,4)
- foo: w1(1.0,2), w2(1.0,4), w3(1.0,3)
+ bar: w2(10.0,4)
+ foo: w1(10.0,2), w2(10.0,4), w3(10.0,3)
Backlogs:
2: [w1]
3: [w3]
4: [w2]
-Now, when we get a worker, we'll get w3.
+Now, when we get a worker, we'll get w1.
>>> pool.get('foo', 0.0)
- w3
+ w1
-Why? We adjust each score by the worker's backlog and search workers
-from high score to low until the adjusted score increases. Because w2
-has a higher backlog than w3, it's adjusted score is lower so we stop
-looking. This is for 2 reasons:
+Why? We adjust each score by the worker's backlog.
-1. We want to bias selection towards a smaller number of workers,
- ideally those with the best scores,
-
-2. We want to reduce the amount of work done in each get call.
-
Now that we've done some work, let's update the resumes. This will
normally be done by workers after periodically collecting performance
data.
- >>> pool.new_resume(w1, {'foo': 3.0})
+ >>> pool.new_resume(w1, {'foo': 30.0})
- >>> pool.new_resume(w2, {'bar': 1.0, 'foo': 1.0})
+ >>> pool.new_resume(w2, {'bar': 10.0, 'foo': 10.0})
- >>> pool.new_resume(w3, {'foo': 2.0})
+ >>> pool.new_resume(w3, {'foo': 19.0})
- pool
+ >>> pool
Request classes:
- bar: w2(1.0,4)
- foo: w2(1.0,4), w3(2.0,4), w1(3.0,2)
+ bar: w2(10.0,4)
+ foo: w2(10.0,4), w3(19.0,3), w1(30.0,3)
Backlogs:
- 2: [w1]
- 4: [w2, w3]
+ 3: [w1, w3]
+ 4: [w2]
- >>> [pool.get('foo') for i in range(5)]
- [w1, w1, w1, w3, w2]
+ >>> pool.get('foo')
+ w1
+ >>> pool.get('foo')
+ w1
+ >>> pool
+ Request classes:
+ bar: w2(10.0,4)
+ foo: w2(10.0,4), w3(19.0,3), w1(30.0,5)
+ Backlogs:
+ 3: [w3]
+ 4: [w2]
+ 5: [w1]
+
+Because w1 has reached the maximum backlog, it's out of the running.
+
+ >>> pool.get('foo')
+ w3
+ >>> pool.get('foo')
+ w3
+ >>> pool.get('foo')
+ w2
+
>>> pool.put(w1)
>>> pool.put(w3)
>>> pool.put(w3)
@@ -182,8 +197,8 @@
>>> pool.put(w3)
>>> pool
Request classes:
- bar: w2(1.0,5)
- foo: w2(1.0,5), w3(2.0,0), w1(3.0,4)
+ bar: w2(10.0,5)
+ foo: w2(10.0,5), w3(19.0,0), w1(30.0,4)
Backlogs:
0: [w3]
4: [w1]
@@ -195,10 +210,10 @@
When a worker disconnects, it's removed from the pool:
>>> pool.remove(w1)
+ >>> pool.remove(w3)
>>> pool
Request classes:
- bar: w2(1.0,5)
- foo: w2(1.0,5), w3(2.0,4)
+ bar: w2(10.0,5)
+ foo: w2(10.0,5)
Backlogs:
- 4: [w3]
5: [w2]
Added: Sandbox/J1m/resumelb/src/zc/resumelb/simul.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/simul.py (rev 0)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/simul.py 2012-01-06 10:41:09 UTC (rev 123965)
@@ -0,0 +1,388 @@
+##############################################################################
+#
+# Copyright Zope Foundation and Contributors.
+# All Rights Reserved.
+#
+# This software is subject to the provisions of the Zope Public License,
+# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
+# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
+# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
+# FOR A PARTICULAR PURPOSE.
+#
+##############################################################################
+"""Simulation
+
+Client makes requests. It has some number of outstanding requests. It
+accomplishes this through a pool of greenlets.
+
+Have some number of workers.
+
+Each worker app has an lru cache of a given size.
+
+There are some number of "sites". Each site has a number of objects.
+
+A request "requests" a set of objects for a site. The worker app
+sleeps .01 for every object not in its cache.
+
+The whole thing is controlled from a zookeeper node with properties:
+
+- history lb worker history, which controls how many requests to
+ perform between resume updates.
+- sim_cache_size Size of client lru cache
+- sim_clients # concurrent requests
+- sim_lambda exponential distribution parameter for selecting sites (.1)
+- sim_objects_per_site average number of objects per site.
+- sim_objects_per_request
+- sim_sites number of sites
+- sim_workers number of workers
+
+"""
+import json
+import logging
+import os
+import pylru
+import random
+import sys
+import time
+import threading
+import zc.mappingobject
+import zc.parse_addr
+import zc.thread
+import zc.zk
+import zookeeper
+
+logger = logging.getLogger(__name__)
+
+class Sample:
+
+ def __init__(self, size=1000, data=None):
+ self.size = size
+ self.data = data or []
+ self.n = len(self.data)
+
+ def add(self, v):
+ self.n += 1
+ try:
+ self.data[self.n % self.size] = v
+ except IndexError:
+ self.data.append(v)
+
+ def stats(self, prefix=None):
+ data = sorted(self.data)
+ return {
+ 'n': self.n,
+ 'mean': float(sum(data))/len(data),
+ 'min': data[0],
+ '10': data[len(data)/10],
+ '50': data[len(data)/2],
+ '90': data[9*len(data)/10],
+ 'max': data[-1],
+ }
+
+ def __repr__(self):
+ return ("%(n)s %(min)s %(10)s %(50)s(%(mean)s) %(90)s %(max)s" %
+ self.stats(''))
+
+class App:
+
+ def __init__(self, properties):
+ settings = zc.mappingobject.mappingobject(properties)
+ self.cache_size = settings.sim_cache_size
+ self.cache = pylru.lrucache(self.cache_size)
+ self.hitrates = Sample()
+
+ @properties
+ def changed(*a):
+ if settings.sim_cache_size != self.cache_size:
+ self.cache_size = settings.sim_cache_size
+ self.cache.size(self.cache_size)
+
+ def __call__(self, environ, start_response):
+ """Simplest possible application object"""
+
+ n = nhit = nmiss = nevict = 0
+ for oid in environ['PATH_INFO'].rsplit('/', 1)[1].split('_'):
+ n += 1
+ key = environ['HTTP_HOST'], oid
+ if key in self.cache:
+ nhit += 1
+ else:
+ nmiss += 1
+ if len(self.cache) >= self.cache_size:
+ nevict += 1
+ self.cache[key] = 1
+
+ time.sleep(.01)
+
+ result = ' '.join(map(str, (os.getpid(), n, nhit, nmiss, nevict)))+'\n'
+ response_headers = [
+ ('Content-type', 'text/plain'),
+ ('Content-Length', str(len(result))),
+ ]
+ start_response('200 OK', response_headers)
+ if n:
+ self.hitrates.add(100.0*nhit/n)
+ return [result]
+
+def worker(path):
+ import logging
+ logging.basicConfig()
+ logger = logging.getLogger(__name__+'-worker')
+ try:
+ import zc.resumelb.worker
+ import zc.zk
+
+ zk = zc.zk.ZooKeeper()
+ lbpath = path + '/lb'
+ while not (zk.exists(lbpath) and zk.get_children(lbpath)):
+ time.sleep(.01)
+ [lbaddr] = zk.get_children(lbpath)
+
+ properties = zk.properties(path)
+
+ class Worker(zc.resumelb.worker.Worker):
+
+ def new_resume(self, resume):
+ stats = dict(hitrate=str(app.hitrates))
+ stats.update(resume)
+ zk.set(worker_path, json.dumps(stats))
+ zc.resumelb.worker.Worker.new_resume(self, resume)
+
+ worker_path = path + '/workers/%s' % os.getpid()
+
+ zk.create(worker_path, '',
+ zc.zk.OPEN_ACL_UNSAFE, zookeeper.EPHEMERAL)
+
+ app = App(properties)
+ Worker(app, zc.parse_addr.parse_addr(lbaddr), properties)
+ except:
+ logger.exception('worker')
+
+def clients(path):
+ import zc.zk
+ zk = zc.zk.ZooKeeper()
+
+ properties = zk.properties(path)
+ settings = zc.mappingobject.mappingobject(properties)
+
+ logging.basicConfig()
+
+ wpath = path + '/wsgi'
+ while not (zk.exists(wpath) and zk.get_children(wpath)):
+ time.sleep(.01)
+ [waddr] = zk.get_children(wpath)
+ waddr = zc.parse_addr.parse_addr(waddr)
+
+ stats = zc.mappingobject.mappingobject(dict(
+ sim_truncated = 0,
+ sim_requests = 0,
+ sim_bypid = {},
+ sim_nobs = 0,
+ sim_nhits = 0,
+ ))
+
+ spath = path + '/stats'
+ if not zk.exists(spath):
+ zk.create(spath, '', zc.zk.OPEN_ACL_UNSAFE)
+
+ import gevent.socket
+
+ def do_request():
+ siteid = random.randint(0, settings.sim_sites)
+ oids = set(
+ int(random.gauss(0, settings.sim_objects_per_site/4))
+ for i in range(settings.sim_objects_per_request)
+ )
+ socket = gevent.socket.create_connection(waddr)
+ try:
+ socket.sendall(
+ request_template % dict(
+ data='_'.join(map(str, oids)),
+ host='h%s' % siteid,
+ )
+ )
+ response = ''
+ while '\r\n\r\n' not in response:
+ data = socket.recv(9999)
+ if not data:
+ stats.sim_truncated += 1
+ return
+ response += data
+ headers, body = response.split('\r\n\r\n')
+ headers = headers.split('\r\n')
+ status = headers.pop(0)
+ headers = dict(l.strip().lower().split(':', 1)
+ for l in headers if ':' in l)
+ content_length = int(headers['content-length'])
+ while len(body) < content_length:
+ data = socket.recv(9999)
+ if not data:
+ stats.sim_truncated += 1
+ return
+ body += data
+
+ pid, n, nhit, nmiss, nevict = map(int, body.strip().split())
+ stats.sim_requests += 1
+ stats.sim_nobs += n
+ stats.sim_nhits += nhit
+ bypid = stats.sim_bypid.get(pid)
+ if bypid is None:
+ bypid = stats.sim_bypid[pid] = dict(nr=0, n=0, nhit=0)
+ bypid['nr'] += 1
+ bypid['n'] += n
+ bypid['nhit'] += nhit
+ logger.info(' '.join(map(str, (
+ 100*stats.sim_nhits/stats.sim_nobs,
+ pid, n, nhit, 100*nhit/n,
+ ))))
+ finally:
+ socket.close()
+
+ def client():
+ try:
+ while 1:
+ do_request()
+ except:
+ print 'client error'
+ logging.getLogger(__name__+'-client').exception('client')
+
+ greenlets = [gevent.spawn(client) for i in range(settings.sim_clients)]
+
+ import gevent.queue, zc.resumelb.thread
+ update_queue = gevent.queue.Queue()
+
+ @properties
+ def update(*a):
+ print 'put update'
+ update_queue.put(None)
+ zc.resumelb.thread.wake_gevent()
+
+ while 1:
+ update_queue.get()
+ print 'got update event'
+ while settings.sim_clients > len(greenlets):
+ greenlets.append(gevent.spawn(client))
+ while settings.sim_clients < len(greenlets):
+ greenlets.pop().kill()
+
+request_template = """GET /%(data)s HTTP/1.1\r
+Host: %(host)s\r
+\r
+"""
+
+class LBLogger:
+
+ def __init__(self, lb, zk, path):
+ self.lb = lb
+ self.requests = Sample()
+ self.nr = self.requests.n
+ self.zk = zk
+ self.path = path
+ self.then = time.time()
+
+ def write(self, line):
+ status, _, t = line.split()[-3:]
+ if status != '200':
+ print 'error', line
+ self.requests.add(float(t))
+ if ((time.time() - self.then > 30)
+ #or self.nr < 30
+ ):
+ pool = self.lb.pool
+ self.then = time.time()
+ print
+ print 'requests', self.requests.n-self.nr, self.requests
+ self.nr = self.requests.n
+ print pool.unskilled
+ print 'backlogs', str(Sample(data=[
+ worker.backlog for worker in pool.workers]))
+ print 'resumes', str(Sample(data=[
+ len(worker.resume) for worker in pool.workers]))
+ print 'skilled', str(Sample(
+ data=map(len, pool.skilled.values())))
+
+ for rclass, skilled in sorted(pool.skilled.items()):
+ if (len(skilled) > len(pool.workers) or
+ len(set(i[1] for i in skilled)) != len(skilled)
+ ):
+ print 'bad skilled', sorted(skilled, key=lambda i: i[1])
+
+
+
+
+
+
+
+ # print 'backlogs', str(Sample(data=self.lb.pool.backlogs.values()))
+ # print 'resumes', str(Sample(
+ # data=map(len, self.lb.pool.resumes.values())))
+ # print 'skilled', str(Sample(
+ # data=map(len, self.lb.pool.skilled.values())))
+ # self.zk.set(self.path, json.dumps(dict(
+ # requests = str(self.requests),
+ # backlogs = str(Sample(data=self.lb.pool.backlogs.values())),
+ # resumes = str(Sample(
+ # data=map(len, self.lb.pool.resumes.values()))),
+ # skilled = str(Sample(
+ # data=map(len, self.lb.pool.skilled.values()))),
+ # )))
+
+def main(args=None):
+ if args is None:
+ args = sys.argv[1:]
+ [path] = args
+ logging.basicConfig()
+
+ @zc.thread.Process(args=(path,))
+ def lb(path):
+ import logging
+ logging.basicConfig()
+ logger = logging.getLogger(__name__+'-lb')
+ try:
+ import zc.resumelb.lb
+ import gevent.pywsgi
+ zk = zc.zk.ZooKeeper()
+ lb = zc.resumelb.lb.LB(
+ ('127.0.0.1', 0), zc.resumelb.lb.host_classifier,
+ settings=zk.properties(path))
+ lbpath = path + '/lb'
+ if not zk.exists(lbpath):
+ zk.create(lbpath, '', zc.zk.OPEN_ACL_UNSAFE)
+ zk.register_server(
+ lbpath, ('127.0.0.1', lb.worker_server.server_port))
+
+ wsgi_server = gevent.pywsgi.WSGIServer(
+ ('127.0.0.1', 0), lb.handle_wsgi, log=LBLogger(lb, zk, lbpath),
+ )
+ wsgi_server.start()
+ wpath = path + '/wsgi'
+ if not zk.exists(wpath):
+ zk.create(wpath, '', zc.zk.OPEN_ACL_UNSAFE)
+ zk.register_server(wpath, ('127.0.0.1', wsgi_server.server_port))
+ wsgi_server.serve_forever()
+ except:
+ logger.exception('lb')
+
+ zk = zc.zk.ZooKeeper()
+
+ workers_path = path + '/workers'
+ if not zk.exists(workers_path):
+ zk.create(workers_path, '', zc.zk.OPEN_ACL_UNSAFE)
+
+ properties = zk.properties(path)
+ settings = zc.mappingobject.mappingobject(properties)
+
+ workers = [zc.thread.Process(worker, args=(path,))
+ for i in range(settings.sim_workers)]
+
+ clients_process = zc.thread.Process(clients, args=(path,))
+
+ @properties
+ def update(*a):
+ while settings.sim_workers > len(workers):
+ workers.append(zc.thread.Process(worker, args=(path,)))
+ while settings.sim_workers < len(workers):
+ workers.pop().terminate()
+
+ threading.Event().wait() # sleep forever
Property changes on: Sandbox/J1m/resumelb/src/zc/resumelb/simul.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/thread.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/thread.py 2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/thread.py 2012-01-06 10:41:09 UTC (rev 123965)
@@ -41,20 +41,23 @@
except EnvironmentError:
pass
-gevent.core.event(gevent.core.EV_READ|gevent.core.EV_PERSIST, \
- _core_pipe_read, _core_pipe_read_callback).add()
+gevent.core.event(gevent.core.EV_READ|gevent.core.EV_PERSIST,
+ _core_pipe_read, _core_pipe_read_callback).add()
+def wake_gevent():
+ os.write(_core_pipe_write, '\0')
+
# MTAsyncResult is greatly simplified from version in https://bitbucket.org/
# denis/gevent-playground/src/49d1cdcdf643/geventutil/threadpool.py
class MTAsyncResult(gevent.event.AsyncResult):
def set_exception(self, exception):
gevent.event.AsyncResult.set_exception(self, exception)
- os.write(_core_pipe_write, '\0')
+ wake_gevent()
def set(self, value=None):
gevent.event.AsyncResult.set(self, value)
- os.write(_core_pipe_write, '\0')
+ wake_gevent()
#
###############################################################################
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/worker.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2012-01-05 22:42:50 UTC (rev 123964)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/worker.py 2012-01-06 10:41:09 UTC (rev 123965)
@@ -98,15 +98,19 @@
sumn[1] += 1
else:
byrclass[rclass] = [elapsed, 1]
- self.resume = dict(
+ self.new_resume(dict(
(rclass, n/sum)
for (rclass, (sum, n)) in byrclass.iteritems()
- )
- self.put((0, self.resume))
+ ))
except self.Disconnected:
return # whatever
+ def new_resume(self, resume):
+ self.resume = resume
+ self.put((0, resume))
+
+
def server_runner(app, global_conf, lb, history=500): # paste deploy hook
logging.basicConfig(level=logging.INFO)
host, port = lb.split(':')
More information about the checkins
mailing list