[Checkins] SVN: Sandbox/J1m/resumelb/ Updated simulation script to work with recent changes.
Jim Fulton
jim at zope.com
Tue Jan 31 12:50:13 UTC 2012
Log message for revision 124268:
Updated simulation script to work with recent changes.
Changed:
A Sandbox/J1m/resumelb/simul_setup.py
U Sandbox/J1m/resumelb/src/zc/resumelb/simul.py
-=-
Added: Sandbox/J1m/resumelb/simul_setup.py
===================================================================
--- Sandbox/J1m/resumelb/simul_setup.py (rev 0)
+++ Sandbox/J1m/resumelb/simul_setup.py 2012-01-31 12:50:12 UTC (rev 124268)
@@ -0,0 +1,20 @@
+import zc.zk
+
+zk = zc.zk.ZooKeeper()
+
+zk.import_tree("""
+/simul
+ cache_size=20000
+ clients=12
+ lambda=0.01
+ objects_per_site=1000
+ objects_per_request=100
+ sites=40
+ workers=2
+ /lb
+ /providers
+ /workers
+ history=999
+ threads=1
+ /providers
+""", trim=True)
Property changes on: Sandbox/J1m/resumelb/simul_setup.py
___________________________________________________________________
Added: svn:keywords
+ Id
Added: svn:eol-style
+ native
Modified: Sandbox/J1m/resumelb/src/zc/resumelb/simul.py
===================================================================
--- Sandbox/J1m/resumelb/src/zc/resumelb/simul.py 2012-01-31 12:50:09 UTC (rev 124267)
+++ Sandbox/J1m/resumelb/src/zc/resumelb/simul.py 2012-01-31 12:50:12 UTC (rev 124268)
@@ -29,13 +29,13 @@
- history lb worker history, which controls how many requests to
perform between resume updates.
-- sim_cache_size Size of client lru cache
-- sim_clients # concurrent requests
-- sim_lambda exponential distribution parameter for selecting sites (.1)
-- sim_objects_per_site average number of objects per site.
-- sim_objects_per_request
-- sim_sites number of sites
-- sim_workers number of workers
+- cache_size Size of client lru cache
+- clients # concurrent requests
+- lambda exponential distribution parameter for selecting sites (.1)
+- objects_per_site average number of objects per site.
+- objects_per_request
+- sites number of sites
+- workers number of workers
"""
from pprint import pprint
@@ -52,6 +52,7 @@
import zc.thread
import zc.zk
import zookeeper
+import zope.testing.wait
logger = logging.getLogger(__name__)
@@ -89,14 +90,14 @@
def __init__(self, properties):
settings = zc.mappingobject.mappingobject(properties)
- self.cache_size = settings.sim_cache_size
+ self.cache_size = settings.cache_size
self.cache = pylru.lrucache(self.cache_size)
self.hitrates = Sample()
@properties
def changed(*a):
- if settings.sim_cache_size != self.cache_size:
- self.cache_size = settings.sim_cache_size
+ if settings.cache_size != self.cache_size:
+ self.cache_size = settings.cache_size
self.cache.size(self.cache_size)
def __call__(self, environ, start_response):
@@ -122,6 +123,9 @@
start_response('200 OK', response_headers)
if n:
self.hitrates.add(100.0*nhit/n)
+ if self.hitrates.n % 1000 == 0:
+ print os.getpid(), 'hitrate', self.hitrates
+
return [result]
def worker(path):
@@ -129,38 +133,27 @@
logging.basicConfig()
logger = logging.getLogger(__name__+'-worker')
try:
- import zc.resumelb.worker
- import zc.zk
-
+ import zc.resumelb.zk
zk = zc.zk.ZooKeeper()
- lbpath = path + '/lb'
- while not (zk.exists(lbpath) and zk.get_children(lbpath)):
- time.sleep(.01)
- [lbaddr] = zk.get_children(lbpath)
-
properties = zk.properties(path)
-
- class Worker(zc.resumelb.worker.Worker):
-
- def new_resume(self, resume):
- print '\nNEW RESUME:', len(resume), os.getpid(), time.ctime()
- pprint(resume)
- stats = dict(hitrate=str(app.hitrates))
- stats.update(resume)
- zk.set(worker_path, json.dumps(stats))
- zc.resumelb.worker.Worker.new_resume(self, resume)
-
- worker_path = path + '/workers/%s' % os.getpid()
-
- zk.create(worker_path, '',
- zc.zk.OPEN_ACL_UNSAFE, zookeeper.EPHEMERAL)
-
app = App(properties)
- Worker(app, zc.parse_addr.parse_addr(lbaddr), properties)
+ resume_file = 'resume%s.mar' % os.getpid()
+ if os.path.exists(resume_file):
+ os.remove(resume_file)
+ zc.resumelb.zk.worker(
+ app, {},
+ zookeeper='127.0.0.1:2181',
+ path=path+'/lb/workers',
+ address='127.0.0.1:0',
+ resume_file=resume_file,
+ )
except:
logger.exception('worker')
def clients(path):
+ logging.basicConfig()
+ random.seed(0)
+
import zc.zk
zk = zc.zk.ZooKeeper()
@@ -171,27 +164,25 @@
@properties
def _(*a):
- n = settings.sim_sites
+ n = settings.sites
siteids[:] = [0]
for i in range(4):
if n:
siteids.extend(range(n))
n /= 2
- logging.basicConfig()
+ wpath = path + '/lb/providers'
+ zope.testing.wait.wait(lambda : zk.get_children(wpath))
- wpath = path + '/wsgi'
- while not (zk.exists(wpath) and zk.get_children(wpath)):
- time.sleep(.01)
[waddr] = zk.get_children(wpath)
waddr = zc.parse_addr.parse_addr(waddr)
stats = zc.mappingobject.mappingobject(dict(
- sim_truncated = 0,
- sim_requests = 0,
- sim_bypid = {},
- sim_nobs = 0,
- sim_nhits = 0,
+ truncated = 0,
+ requests = 0,
+ bypid = {},
+ nobs = 0,
+ nhits = 0,
))
spath = path + '/stats'
@@ -203,8 +194,8 @@
def do_request():
siteid = random.choice(siteids)
oids = set(
- int(random.gauss(0, settings.sim_objects_per_site/4))
- for i in range(settings.sim_objects_per_request)
+ int(random.gauss(0, settings.objects_per_site/4))
+ for i in range(settings.objects_per_request)
)
socket = gevent.socket.create_connection(waddr)
try:
@@ -218,7 +209,7 @@
while '\r\n\r\n' not in response:
data = socket.recv(9999)
if not data:
- stats.sim_truncated += 1
+ stats.truncated += 1
return
response += data
headers, body = response.split('\r\n\r\n')
@@ -230,22 +221,22 @@
while len(body) < content_length:
data = socket.recv(9999)
if not data:
- stats.sim_truncated += 1
+ stats.truncated += 1
return
body += data
pid, n, nhit, nmiss, nevict = map(int, body.strip().split())
- stats.sim_requests += 1
- stats.sim_nobs += n
- stats.sim_nhits += nhit
- bypid = stats.sim_bypid.get(pid)
+ stats.requests += 1
+ stats.nobs += n
+ stats.nhits += nhit
+ bypid = stats.bypid.get(pid)
if bypid is None:
- bypid = stats.sim_bypid[pid] = dict(nr=0, n=0, nhit=0)
+ bypid = stats.bypid[pid] = dict(nr=0, n=0, nhit=0)
bypid['nr'] += 1
bypid['n'] += n
bypid['nhit'] += nhit
logger.info(' '.join(map(str, (
- 100*stats.sim_nhits/stats.sim_nobs,
+ 100*stats.nhits/stats.nobs,
pid, n, nhit, 100*nhit/n,
))))
finally:
@@ -259,25 +250,24 @@
print 'client error'
logging.getLogger(__name__+'-client').exception('client')
- greenlets = [gevent.spawn(client) for i in range(settings.sim_clients)]
+ greenlets = [gevent.spawn(client) for i in range(settings.clients)]
- import gevent.queue, zc.resumelb.thread
- update_queue = gevent.queue.Queue()
- @properties
- def update(*a):
- print 'put update'
- update_queue.put(None)
- zc.resumelb.thread.wake_gevent()
-
- while 1:
- update_queue.get()
+ # Set up notification of address changes.
+ watcher = gevent.get_hub().loop.async()
+ @watcher.start
+ def _():
print 'got update event'
- while settings.sim_clients > len(greenlets):
+ while settings.clients > len(greenlets):
greenlets.append(gevent.spawn(client))
- while settings.sim_clients < len(greenlets):
+ while settings.clients < len(greenlets):
greenlets.pop().kill()
+ properties(lambda a: watcher.send())
+
+ while 1:
+ gevent.sleep(60.0)
+
request_template = """GET /%(data)s HTTP/1.1\r
Host: %(host)s\r
\r
@@ -285,12 +275,9 @@
class LBLogger:
- def __init__(self, lb, zk, path):
- self.lb = lb
+ def __init__(self):
self.requests = Sample()
self.nr = self.requests.n
- self.zk = zk
- self.path = path
self.then = time.time()
def write(self, line):
@@ -328,57 +315,36 @@
[path] = args
logging.basicConfig()
- random.seed(0)
+ zk = zc.zk.ZooKeeper()
+ properties = zk.properties(path)
+ settings = zc.mappingobject.mappingobject(properties)
+ workers = [zc.thread.Process(worker, args=(path,))
+ for i in range(settings.workers)]
+
@zc.thread.Process(args=(path,))
def lb(path):
import logging
logging.basicConfig()
logger = logging.getLogger(__name__+'-lb')
try:
- import zc.resumelb.lb
- import gevent.pywsgi
- zk = zc.zk.ZooKeeper()
- lb = zc.resumelb.lb.LB(
- ('127.0.0.1', 0), zc.resumelb.lb.host_classifier,
- settings=zk.properties(path))
- lbpath = path + '/lb'
- if not zk.exists(lbpath):
- zk.create(lbpath, '', zc.zk.OPEN_ACL_UNSAFE)
- zk.register_server(
- lbpath, ('127.0.0.1', lb.worker_server.server_port))
-
- wsgi_server = gevent.pywsgi.WSGIServer(
- ('127.0.0.1', 0), lb.handle_wsgi, log=LBLogger(lb, zk, lbpath),
- )
- wsgi_server.start()
- wpath = path + '/wsgi'
- if not zk.exists(wpath):
- zk.create(wpath, '', zc.zk.OPEN_ACL_UNSAFE)
- zk.register_server(wpath, ('127.0.0.1', wsgi_server.server_port))
- wsgi_server.serve_forever()
+ import zc.resumelb.zk
+ lb, server, logger = zc.resumelb.zk.lbmain(
+ ['-a127.0.0.1:0', '-l', LBLogger(),
+ '127.0.0.1:2181', '/simul/lb'],
+ run=False)
+ logger.lb = lb
+ server.serve_forever()
except:
logger.exception('lb')
- zk = zc.zk.ZooKeeper()
-
- workers_path = path + '/workers'
- if not zk.exists(workers_path):
- zk.create(workers_path, '', zc.zk.OPEN_ACL_UNSAFE)
-
- properties = zk.properties(path)
- settings = zc.mappingobject.mappingobject(properties)
-
- workers = [zc.thread.Process(worker, args=(path,))
- for i in range(settings.sim_workers)]
-
clients_process = zc.thread.Process(clients, args=(path,))
@properties
def update(*a):
- while settings.sim_workers > len(workers):
+ while settings.workers > len(workers):
workers.append(zc.thread.Process(worker, args=(path,)))
- while settings.sim_workers < len(workers):
+ while settings.workers < len(workers):
workers.pop().terminate()
threading.Event().wait() # sleep forever
More information about the checkins mailing list