[Checkins] SVN: Sandbox/J1m/resumelb/s New simpler lb algorithm. More tweaking in future, but enough for now.
Jim Fulton
jim at zope.com
Sun Jan 15 18:41:50 UTC 2012
Log message for revision 124054:
New simpler lb algorithm. More tweaking in future, but enough for now.
Changed:
U Sandbox/J1m/resumelb/setup.py
U Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
U Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
-=-
Modified: Sandbox/J1m/resumelb/setup.py
===================================================================
--- Sandbox/J1m/resumelb/setup.py 2012-01-15 18:41:47 UTC (rev 124053)
+++ Sandbox/J1m/resumelb/setup.py 2012-01-15 18:41:49 UTC (rev 124054)
@@ -15,7 +15,7 @@
install_requires = [
'setuptools', 'gevent', 'WebOb', 'zc.thread', 'zc.parse_addr',
- 'zc.mappingobject']
+ 'zc.mappingobject', 'llist']
extras_require = dict(
test=['zope.testing', 'bobo', 'manuel', 'WebTest'])
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/lb.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2012-01-15 18:41:47 UTC (rev 124053)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/lb.py 2012-01-15 18:41:49 UTC (rev 124054)
@@ -3,6 +3,7 @@
import gevent.hub
import gevent.pywsgi
import gevent.server
+import llist
import logging
import sys
import webob
@@ -66,15 +67,11 @@
def __init__(self, settings=None):
if settings is None:
- settings = dict(
- max_backlog = 40,
- unskilled_score = 1.0,
- )
+ settings = {}
self.settings = settings
self.workers = set()
- self.unskilled = [] # sorted([(uscore, poolworker)])
+ self.unskilled = llist.dllist()
self.skilled = {} # rclass -> {(score, workers)}
- self.nskills = 0 # sum of resume lengths
self.event = gevent.event.Event()
def __repr__(self):
@@ -101,51 +98,39 @@
def new_resume(self, worker, resume=None):
skilled = self.skilled
unskilled = self.unskilled
- if worker in self.workers:
- if worker.backlog < self.settings['max_backlog']:
- del unskilled[bisect_left(unskilled, (worker.uscore, worker))]
+ workers = self.workers
+
+ target_skills_per_worker = 1 + (
+ self.settings.get('redundancy', 1) * len(skilled) /
+ (len(workers) or 1))
+
+ if worker in workers:
for rclass, score in worker.resume.iteritems():
skilled[rclass].remove((score, worker))
- self.nskills -= len(worker.resume)
+ if resume is None:
+ workers.remove(worker)
+ if worker.lnode is not None:
+ unskilled.remove(worker.lnode)
+ worker.lnode = None
+ return
else:
- self.workers.add(worker)
worker.backlog = 0
+ workers.add(worker)
+ worker.lnode = unskilled.appendleft(worker)
- if resume is None:
- self.workers.remove(worker)
- else:
- worker.resume = resume
- self.nskills += len(resume)
- if resume:
- scores = sorted(resume.values())
- worker.unskilled_score = max(
- self.settings['unskilled_score'],
- scores[
- min(
- max(3, len(scores)/4),
- len(scores)-1,
- )
- ] / 10.0
- )
- else:
- worker.unskilled_score = (
- self.settings['unskilled_score'] * (1.0 + self.nskills) /
- len(self.workers))
+ resumeitems = resume.items()
+ drop = (len(resume) - target_skills_per_worker) / 2
+ if drop > 0:
+ resumeitems = sorted(resumeitems, key=lambda i: i[1])[drop:]
- uscore = (
- worker.unskilled_score /
- (1.0 + worker.backlog)
- )
- worker.uscore = uscore
- insort(unskilled, (uscore, worker))
- for rclass, score in resume.iteritems():
- try:
- skilled[rclass].add((score, worker))
- except KeyError:
- skilled[rclass] = set(((score, worker), ))
+ worker.resume = dict(resumeitems)
+ for rclass, score in resumeitems:
+ try:
+ skilled[rclass].add((score, worker))
+ except KeyError:
+ skilled[rclass] = set(((score, worker), ))
-
- if self.unskilled:
+ if unskilled:
self.event.set()
def remove(self, worker):
@@ -154,76 +139,91 @@
def get(self, rclass, timeout=None):
"""Get a worker to handle a request class
"""
-
unskilled = self.unskilled
if not unskilled:
self.event.wait(timeout)
- if not self.unskilled:
+ if not unskilled:
return None
# Look for a skilled worker
- best_score, unskilled_worker = unskilled[-1]
- best_worker = best_backlog = None
- max_backlog = self.settings['max_backlog']
- skilled = self.skilled.get(rclass, ())
+ max_backlog = self.settings.get('max_backlog', 40)
+ min_score = self.settings.get('min_score', 1.0)
+ best_score = 0
+ best_worker = None
+ skilled = self.skilled.get(rclass)
+ if skilled is None:
+ skilled = self.skilled[rclass] = set()
for score, worker in skilled:
backlog = worker.backlog + 1
- if backlog > max_backlog:
- continue
+ if backlog > 2:
+ if (
+ # Don't let a worker get too backed up
+ backlog > max_backlog or
+
+ # We use min score as a way of allowing other workers
+ # a chance to pick up work even if the skilled workers
+ # haven't reached their backlog. This is mainly a tuning
+ # tool for when a worker is doing OK, but maybe still
+ # doing too much.
+ (score < min_score and
+ unskilled and unskilled.first.value.backlog == 0
+ )
+ ):
+ continue
score /= backlog
- if (score > best_score
- or
- (best_worker is None and worker is unskilled_worker)
- ):
+ if (score > best_score):
best_score = score
best_worker = worker
- best_backlog = backlog
- if best_worker is not None:
- uscore = best_worker.uscore
- del unskilled[bisect_left(unskilled, (uscore, best_worker))]
- else:
- uscore, best_worker = unskilled.pop()
- best_backlog = best_worker.backlog + 1
- self.nskills += 1
- resume = best_worker.resume
- score = max(uscore, self.settings['unskilled_score'] * 10)
- best_worker.resume[rclass] = score
- if skilled == ():
- self.skilled[rclass] = set(((score, best_worker),))
- else:
+ if not best_score:
+ while unskilled.first.value.backlog >= max_backlog:
+ # Edge case. max_backlog was reduced after a worker
+ # with a larger backlog was added.
+ #import pdb; pdb.set_trace()
+ unskilled.first.value.lnode = None
+ unskilled.popleft()
+ if not unskilled:
+ # OK, now we need to wait. Just start over.
+ return self.get(rclass, timeout)
+
+ best_worker = unskilled.first.value
+ if rclass not in best_worker.resume:
+
+ # We now have an unskilled worker and we need to
+ # assign it a score.
+ # - It has to be >= min score, or it won't get future work.
+ # - We want to give it work somewhat gradually.
+ # - We got here because:
+ # - there are no skilled workers,
+ # - The skilled workers have all either:
+ # - Reached their max backlog, or
+ # - Have scores > min score
+ # Let's set it to min score because either:
+ # - There are no skilled workers, so they'll all get the same
+ # - Other workers are maxed out, or
+ # - The score will be higher than some of the existing, so it'll
+ # get work
+ # We also allow for an unskilled_score setting to override.
+ score = self.settings.get('unskilled_score', min_score)
+ best_worker.resume[rclass] = score
skilled.add((score, best_worker))
- lresume = len(resume)
- uscore *= lresume/(lresume + 1.0)
- uscore *= best_backlog / (1.0 + best_backlog)
- best_worker.uscore = uscore
- best_worker.backlog = best_backlog
- if best_backlog < max_backlog:
- insort(unskilled, (uscore, best_worker))
+ unskilled.remove(best_worker.lnode)
+ best_worker.backlog += 1
+ if best_worker.backlog < max_backlog:
+ best_worker.lnode = unskilled.append(best_worker)
+ else:
+ best_worker.lnode = None
+
return best_worker
def put(self, worker):
- backlog = worker.backlog
- if backlog < 1:
- return
- unskilled = self.unskilled
- max_backlog = self.settings['max_backlog']
- uscore = worker.uscore
- if backlog < max_backlog:
- del unskilled[bisect_left(unskilled, (uscore, worker))]
+ if worker.lnode is None:
+ worker.lnode = self.unskilled.append(worker)
+ self.event.set()
+ if worker.backlog:
+ worker.backlog -= 1
- uscore *= (backlog + 1.0) / backlog
- worker.uscore = uscore
-
- backlog -= 1
- worker.backlog = backlog
-
- if backlog < max_backlog:
- insort(unskilled, (uscore, worker))
-
- self.event.set()
-
class Worker(zc.resumelb.util.Worker):
maxrno = (1<<32) - 1
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/pool.test
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/pool.test 2012-01-15 18:41:47 UTC (rev 124053)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/pool.test 2012-01-15 18:41:49 UTC (rev 124054)
@@ -1,3 +1,4 @@
+===============================
Resume-based load balancer pool
===============================
@@ -11,15 +12,14 @@
the local workers, which, in turn, forward the requests to the remote
workers.
-We'll test the pool with stand-ins for the local workers.
+We'll test the pool with stand-ins for the local workers. The pool
+constructor takes a settings mapping object. This allows the settings
+to be managed in real time.
>>> import zc.resumelb.lb
- >>> pool = zc.resumelb.lb.Pool(dict(max_backlog=5, unskilled_score=1.0))
+ >>> settings = {}
+ >>> pool = zc.resumelb.lb.Pool(settings)
-We specified a maximum per-worker backlog for the pool. We specified
-a fairly low max backlog to make it easier to see what happens when
-a worker gets too backed up.
-
The get method is used to get a worker from the pool. A request class
and an optional timeout is passed. (The timeout is mainly useful for
testing.)
@@ -42,25 +42,25 @@
... Disconnected = None
>>> w1 = Worker('w1')
+
>>> pool.new_resume(w1, {})
As far as the pool is concerned, any object that can be in a set or be
used as a dictionary key can be used as a worker. The pool doesn't
-care. We could have used ``object`` as out worker class, but we
-constructed a worker class that makes testing output more useful.
+care. The pool does add some extra attributes to workers.
- >>> pool.get('foo', 0.0)
- w1
+ >>> pool.get('foo', 0.0)
+ w1
- This time, we got the one we registered.
+This time, we got the one we registered.
- If we create another and register it, we'll still get the original:
+If we create another and register it, we'll still get the original:
- >>> w2 = Worker('w2')
- >>> pool.new_resume(w2, {})
+ >>> w2 = Worker('w2')
+ >>> pool.new_resume(w2, {})
- >>> pool.get('foo')
- w1
+ >>> pool.get('foo')
+ w1
This is because w1 is known to be good at handling foo requests.
@@ -74,15 +74,20 @@
>>> pool
Request classes:
- bar: w2(10.0,1)
- foo: w1(10.0,2)
+ bar: w2(1.0,1)
+ foo: w1(1.0,2)
Backlogs:
1: [w2]
2: [w1]
Here, we can see that w1 is used for the foo class and w2 for the bar
-class. Let's add another worker:
+class. In the request classes, the worker's score and its overall
+backlog is shown in parentheses. We see that both workers have a score
+of 1.0. This is the default score for new workers. We'll say more
+about this later.
+Let's add another worker:
+
>>> w3 = Worker('w3')
>>> pool.new_resume(w3, {})
@@ -93,15 +98,21 @@
>>> pool
Request classes:
- bar: w2(10.0,1)
- foo: w1(10.0,5)
+ bar: w2(1.0,1)
+ foo: w1(1.0,5)
Backlogs:
0: [w3]
1: [w2]
5: [w1]
Even though we still had a worker with no backlog, we kept sending
-requests to w1. But but now, w1 has reached it's maximum backlog. If
+requests to w1. This is because w1 hasn't reached its maximum
+backlog. Also, its score is greater than the min score, which
+defaults to 1.0. Let's reduce the maximum backlog to 5:
+
+ >>> settings['max_backlog'] = 5
+
+So now, w1 has reached its maximum backlog. If
we make another foo request, we'll start using w3, and when that's
reached it's maximum backlog, we'll start using w2:
@@ -110,8 +121,8 @@
>>> pool
Request classes:
- bar: w2(10.0,3)
- foo: w1(10.0,5), w2(10.0,3), w3(10.0,5)
+ bar: w2(1.0,3)
+ foo: w1(1.0,5), w2(1.0,3), w3(1.0,5)
Backlogs:
3: [w2]
5: [w1, w3]
@@ -134,8 +145,8 @@
>>> pool.put(w3)
>>> pool
Request classes:
- bar: w2(10.0,4)
- foo: w1(10.0,2), w2(10.0,4), w3(10.0,3)
+ bar: w2(1.0,4)
+ foo: w1(1.0,2), w2(1.0,4), w3(1.0,3)
Backlogs:
2: [w1]
3: [w3]
@@ -146,22 +157,24 @@
>>> pool.get('foo', 0.0)
w1
-Why? We adjust each score by the worker's backlog.
+Why? We adjust each score by the worker's backlog, so even though all
+three workers had the same score, w1 is chosen because it has the smallest
+backlog.
-Now that we've done some work, let's updaye the resumes. This will
-normally be done by workers after periodically collecting performance
+Now that we've done some work, let's update the resumes. This will
+normally be done by workers periodically, after collecting performance
data.
- >>> pool.new_resume(w1, {'foo': 30.0})
+ >>> pool.new_resume(w1, {'foo': 6.0})
- ;>>> pool.new_resume(w2, {'bar': 10.0, 'foo': 10.0})
+ >>> pool.new_resume(w2, {'bar': 2.0, 'foo': 2.0})
- >>> pool.new_resume(w3, {'foo': 19.0})
+ >>> pool.new_resume(w3, {'foo': 3.8})
>>> pool
Request classes:
- bar: w2(10.0,4)
- foo: w2(10.0,4), w3(19.0,3), w1(30.0,3)
+ bar: w2(2.0,4)
+ foo: w2(2.0,4), w3(3.8,3), w1(6.0,3)
Backlogs:
3: [w1, w3]
4: [w2]
@@ -173,8 +186,8 @@
>>> pool
Request classes:
- bar: w2(10.0,4)
- foo: w2(10.0,4), w3(19.0,3), w1(30.0,5)
+ bar: w2(2.0,4)
+ foo: w2(2.0,4), w3(3.8,3), w1(6.0,5)
Backlogs:
3: [w3]
4: [w2]
@@ -197,8 +210,8 @@
>>> pool.put(w3)
>>> pool
Request classes:
- bar: w2(10.0,5)
- foo: w2(10.0,5), w3(19.0,0), w1(30.0,4)
+ bar: w2(2.0,5)
+ foo: w2(2.0,5), w3(3.8,0), w1(6.0,4)
Backlogs:
0: [w3]
4: [w1]
@@ -207,13 +220,164 @@
>>> [pool.get('foo') for i in range(5)]
[w3, w3, w3, w1, w3]
+Pool settings
+=============
+
+There are several settings that effect pools:
+
+redundancy
+ Target number of workers for each request class, defaulting to 1.
+
+max_backlog
+ Maximum worker backlog, defaulting to 40.
+
+min_score
+ A worker won't be used if it has a backlog greater than 1 and it's
+ score is less than min_score.
+
+unskilled_score
+ The score assigned to workers when given a new request class. This
+ defaults to min_score.
+
+We've already seen max_backlog at work. Let's test the other
+settings.
+
+redundancy
+----------
+
+Given a redundancy, we can compute an expected number of request
+classes per worker (resume size), which is the number of request
+classes divided by the number of workers times the redundancy. With
+special handling for no workers or request classes, this works out
+to::
+
+ 1 + redundancy * n_request_classes / max(nworkers, 1)
+
+When we get a new resume and it is larger than the expected size, we
+discard half of the excess number of items with the lowest score.
+Given the pool data:
+
+ >>> pool
+ Request classes:
+ bar: w2(2.0,5)
+ foo: w2(2.0,5), w3(3.8,4), w1(6.0,5)
+ Backlogs:
+ 4: [w3]
+ 5: [w1, w2]
+
+We see there are 2 request classes and 3 workers, so we expect one
+request class per worker.
+
+Let's add a new worker with a much larger resume:
+
+ >>> w4 = Worker('w4')
+ >>> pool.new_resume(w4, dict((str(i), float(i)) for i in range(9)))
+
+When we look at the pool, we see that 4 of the items were discarded:
+
+ >>> pool
+ Request classes:
+ 4: w4(4.0,0)
+ 5: w4(5.0,0)
+ 6: w4(6.0,0)
+ 7: w4(7.0,0)
+ 8: w4(8.0,0)
+ bar: w2(2.0,5)
+ foo: w2(2.0,5), w3(3.8,4), w1(6.0,5)
+ Backlogs:
+ 0: [w4]
+ 4: [w3]
+ 5: [w1, w2]
+
+Now we have 7 request classes and 4 workers. If we set redundancy to
+3, then the expected resume size is 6, so::
+
+ >>> settings['redundancy'] = 3
+ >>> pool.new_resume(w4, dict((str(i), float(i)) for i in range(9)))
+
+ >>> pool
+ Request classes:
+ 1: w4(1.0,0)
+ 2: w4(2.0,0)
+ 3: w4(3.0,0)
+ 4: w4(4.0,0)
+ 5: w4(5.0,0)
+ 6: w4(6.0,0)
+ 7: w4(7.0,0)
+ 8: w4(8.0,0)
+ bar: w2(2.0,5)
+ foo: w2(2.0,5), w3(3.8,4), w1(6.0,5)
+ Backlogs:
+ 0: [w4]
+ 4: [w3]
+ 5: [w1, w2]
+
+min_score
+---------
+
+min_score is mainly provided as a tool to balance work across
+skills. The algorithm favors giving work to skilled workers.
+If one worker handles a large number of request classes, relative to
+other workers, it might perform sub-optimally, but if load is too low
+to force it to its maximum backlog, it won't transfer work to other
+workers. min_score provides a tool to help with this. If a worker has
+a low score and only a modest backlog, it won't be used.
+
+To see this, let's reduce w3's backlog:
+
+ >>> pool.put(w3); pool.put(w3)
+
+but set the min_score to 4:
+
+ >>> settings['min_score'] = 4.0
+
+And the get a worker for foo:
+
+ >>> pool.get('foo')
+ w4
+ >>> pool
+ Request classes:
+ 1: w4(1.0,1)
+ 2: w4(2.0,1)
+ 3: w4(3.0,1)
+ 4: w4(4.0,1)
+ 5: w4(5.0,1)
+ 6: w4(6.0,1)
+ 7: w4(7.0,1)
+ 8: w4(8.0,1)
+ bar: w2(2.0,5)
+ foo: w2(2.0,5), w3(3.8,2), w4(4.0,1), w1(6.0,5)
+ Backlogs:
+ 1: [w4]
+ 2: [w3]
+ 5: [w1, w2]
+
+We get the unskilled w4 because w1 and w2 are at their maximum
+backlogs, and w3 has a backlog of 2 and a score of only 3.8.
+
+Note that w4 is assigned a skill for foo of 4, which is min_score.
+
+XXX It's unclear if min_score provides much, if any, benefit.
+
+Worker disconnect
+=================
+
When a worker disconnect, it's removed from the pool:
>>> pool.remove(w1)
>>> pool.remove(w3)
>>> pool
Request classes:
- bar: w2(10.0,5)
- foo: w2(10.0,5)
+ 1: w4(1.0,1)
+ 2: w4(2.0,1)
+ 3: w4(3.0,1)
+ 4: w4(4.0,1)
+ 5: w4(5.0,1)
+ 6: w4(6.0,1)
+ 7: w4(7.0,1)
+ 8: w4(8.0,1)
+ bar: w2(2.0,5)
+ foo: w2(2.0,5), w4(4.0,1)
Backlogs:
+ 1: [w4]
5: [w2]
More information about the checkins
mailing list