[Zodb-checkins] CVS: Packages/ZEO - multi.py:1.1.2.1 testZEO.py:1.1.2.6
jeremy@digicool.com
jeremy@digicool.com
Wed, 25 Apr 2001 17:05:43 -0400 (EDT)
Update of /cvs-repository/Packages/ZEO/tests
In directory korak:/tmp/cvs-serv30414
Modified Files:
Tag: ZEO-ZRPC-Dev
testZEO.py
Added Files:
Tag: ZEO-ZRPC-Dev
multi.py
Log Message:
Add optional argument to testZEO that allows you to limit the tests
that are run. A bit of a hack.
Add multi.py a test of multiple clients using the same server (and the
same storage on that server)
--- Added File multi.py in package Packages/ZEO ---
"""A multi-client test of the ZEO storage server"""
import ZODB, ZODB.DB, ZODB.FileStorage, ZODB.POSException
import Persistence
import PersistentMapping
import asyncore
import os
import tempfile
import time
import types
VERBOSE = 1
CLIENTS = 4
RECORDS_PER_CLIENT = 100
CONFLICT_DELAY = 0.1
CONNECT_DELAY = 0.1
CLIENT_CACHE = '' # use temporary cache
class Record(Persistence.Persistent):
def __init__(self, client=None, value=None):
self.client = client
self.value = None
self.next = None
def set_next(self, next):
self.next = next
class Stats(Persistence.Persistent):
def __init__(self):
self.begin = time.time()
self.end = None
def done(self):
self.end = time.time()
def init_storage():
path = tempfile.mktemp()
if VERBOSE:
print "FileStorage path:", path
fs = ZODB.FileStorage.FileStorage(path)
db = ZODB.DB(fs)
root = db.open().root()
root["multi"] = PersistentMapping.PersistentMapping()
get_transaction().commit()
return fs
def start_server(addr):
pid = os.fork()
if pid == 0:
import ZEO.StorageServer
storage = init_storage()
server = ZEO.StorageServer.StorageServer(addr, {'1': storage})
print "Server ready:", os.getpid()
asyncore.loop()
os._exit(0)
else:
return pid
def start_client(addr):
pid = os.fork()
if pid == 0:
import ZEO.ClientStorage
if VERBOSE:
print "Client process started:", os.getpid()
cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
run(cli)
cli.close()
os._exit(0)
else:
return pid
def run(storage):
if hasattr(storage, 'is_connected'):
while not storage.is_connected():
time.sleep(CONNECT_DELAY)
pid = os.getpid()
print "Client process connected:", pid, storage
db = ZODB.DB(storage)
root = db.open().root()
while 1:
try:
s = root[pid] = Stats()
get_transaction().commit()
except ZODB.POSException.ConflictError:
get_transaction().abort()
time.sleep(CONFLICT_DELAY)
else:
break
dict = root["multi"]
prev = None
i = 0
while i < RECORDS_PER_CLIENT:
time.sleep(CONFLICT_DELAY)
try:
size = len(dict)
r = dict[size] = Record(pid, size)
if prev:
prev.set_next(r)
get_transaction().commit()
except ZODB.POSException.ConflictError, err:
## print "ConflictError: %s: %s" % (pid, err)
get_transaction().abort()
time.sleep(CONFLICT_DELAY)
else:
i = i + 1
if VERBOSE and (i < 5 or i % 10 == 0):
print "Client %s: %s of %s" % (pid, i, RECORDS_PER_CLIENT)
s.done()
get_transaction().commit()
print "Client completed:", pid
def shutdown_server(addr):
import ZEO.ClientStorage
cli = ZEO.ClientStorage.ClientStorage(addr)
cli._server.rpc.callAsync('shutdown')
def main():
if VERBOSE:
print "Main process:", os.getpid()
addr = tempfile.mktemp()
t0 = time.time()
server_pid = start_server(addr)
t1 = time.time()
pids = [start_client(addr) for i in range(CLIENTS)]
for pid in pids:
assert type(pid) == types.IntType, "invalid pid type: %s (%s)" % \
(repr(pid), type(pid))
try:
if VERBOSE:
print "waitpid(%s)" % repr(pid)
os.waitpid(pid, 0)
except os.error, err:
print "waitpid(%s) failed: %s" % (repr(pid), err)
t2 = time.time()
shutdown_server(addr)
os.waitpid(server_pid, 0)
print "Total time:", t2 - t0
print "Server start time", t1 - t0
print "Client time:", t2 - t1
if __name__ == "__main__":
main()
--- Updated File testZEO.py in package Packages/ZEO --
--- testZEO.py 2001/04/20 19:18:52 1.1.2.5
+++ testZEO.py 2001/04/25 21:05:42 1.1.2.6
@@ -2,7 +2,6 @@
import asyncore
import os
-import sys
import tempfile
import time
import types
@@ -163,7 +162,20 @@
os.unlink(path)
def main():
- tests = unittest.makeSuite(ZEOFileStorageTests, 'check')
+ import sys, getopt
+
+ name_of_test = ''
+
+ opts, args = getopt.getopt(sys.argv[1:], 'n:')
+ for flag, val in opts:
+ if flag == '-n':
+ name_of_test = val
+
+ if args:
+ print >> sys.stderr, "Did not expect arguments. Got %s" % args
+ return 0
+
+ tests = unittest.makeSuite(ZEOFileStorageTests, 'check' + name_of_test)
runner = unittest.TextTestRunner()
runner.run(tests)