[Zope3-checkins] CVS: Zope3/src/zodb/zeo/tests - __init__.py:1.1.2.1 cache.py:1.1.2.1 commitlock.py:1.1.2.1 connection.py:1.1.2.1 forker.py:1.1.2.1 multi.py:1.1.2.1 speed.py:1.1.2.1 stress.py:1.1.2.1 test_cache.py:1.1.2.1 test_conn.py:1.1.2.1 test_tbuf.py:1.1.2.1 test_zeo.py:1.1.2.1 threadtests.py:1.1.2.1 zeoserver.py:1.1.2.1
Jim Fulton
jim@zope.com
Mon, 23 Dec 2002 14:30:55 -0500
Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv19908/zodb/zeo/tests
Added Files:
Tag: NameGeddon-branch
__init__.py cache.py commitlock.py connection.py forker.py
multi.py speed.py stress.py test_cache.py test_conn.py
test_tbuf.py test_zeo.py threadtests.py zeoserver.py
Log Message:
Initial renaming before debugging
=== Added File Zope3/src/zodb/zeo/tests/__init__.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
=== Added File Zope3/src/zodb/zeo/tests/cache.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Tests of the ZEO cache"""
from zodb.ztransaction import Transaction
from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_unpickle
class TransUndoStorageWithCache:
def checkUndoInvalidation(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(23))
revid = self._dostore(oid, revid=revid, data=MinPO(24))
revid = self._dostore(oid, revid=revid, data=MinPO(25))
info = self._storage.undoInfo()
if not info:
# XXX perhaps we have an old storage implementation that
# does do the negative nonsense
info = self._storage.undoInfo(0, 20)
tid = info[0]['id']
# We may need to bail at this point if the storage doesn't
# support transactional undo
if not self._storage.supportsTransactionalUndo():
return
# Now start an undo transaction
t = Transaction()
t.note('undo1')
self._storage.tpc_begin(t)
oids = self._storage.transactionalUndo(tid, t)
# Make sure this doesn't load invalid data into the cache
self._storage.load(oid, '')
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
assert len(oids) == 1
assert oids[0] == oid
data, revid = self._storage.load(oid, '')
obj = zodb_unpickle(data)
assert obj == MinPO(24)
class StorageWithCache:
def checkAbortVersionInvalidation(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(1))
revid = self._dostore(oid, revid=revid, data=MinPO(2))
revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
revid = self._dostore(oid, revid=revid, data=MinPO(4), version="foo")
t = Transaction()
self._storage.tpc_begin(t)
self._storage.abortVersion("foo", t)
self._storage.load(oid, "foo")
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
data, revid = self._storage.load(oid, "foo")
obj = zodb_unpickle(data)
assert obj == MinPO(2), obj
def checkCommitEmptyVersionInvalidation(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(1))
revid = self._dostore(oid, revid=revid, data=MinPO(2))
revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
t = Transaction()
self._storage.tpc_begin(t)
self._storage.commitVersion("foo", "", t)
self._storage.load(oid, "")
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
data, revid = self._storage.load(oid, "")
obj = zodb_unpickle(data)
assert obj == MinPO(3), obj
def checkCommitVersionInvalidation(self):
oid = self._storage.new_oid()
revid = self._dostore(oid, data=MinPO(1))
revid = self._dostore(oid, revid=revid, data=MinPO(2))
revid = self._dostore(oid, revid=revid, data=MinPO(3), version="foo")
t = Transaction()
self._storage.tpc_begin(t)
self._storage.commitVersion("foo", "bar", t)
self._storage.load(oid, "")
self._storage.tpc_vote(t)
self._storage.tpc_finish(t)
data, revid = self._storage.load(oid, "bar")
obj = zodb_unpickle(data)
assert obj == MinPO(3), obj
=== Added File Zope3/src/zodb/zeo/tests/commitlock.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Tests of the distributed commit lock."""
import threading
import time
from zodb.ztransaction import Transaction
from zodb.timestamp import TimeStamp
from zodb.storage.tests.base import zodb_pickle, MinPO
import zodb.zeo.client
from zodb.zeo.exceptions import Disconnected
ZERO = '\0'*8
class DummyDB:
def invalidate(self, *args):
pass
class TestThread(threading.Thread):
__super_init = threading.Thread.__init__
__super_run = threading.Thread.run
def __init__(self, testcase, group=None, target=None, name=None,
args=(), kwargs={}, verbose=None):
self.__super_init(group, target, name, args, kwargs, verbose)
self.setDaemon(1)
self._testcase = testcase
def run(self):
try:
self.testrun()
except Exception:
s = StringIO()
traceback.print_exc(file=s)
self._testcase.fail("Exception in thread %s:\n%s\n" %
(self, s.getvalue()))
def cleanup(self, timeout=15):
self.join(timeout)
if self.isAlive():
self._testcase.fail("Thread did not finish: %s" % self)
class WorkerThread(TestThread):
# run the entire test in a thread so that the blocking call for
# tpc_vote() doesn't hang the test suite.
def __init__(self, testcase, storage, trans, method="tpc_finish"):
self.storage = storage
self.trans = trans
self.method = method
self.ready = threading.Event()
TestThread.__init__(self, testcase)
def testrun(self):
try:
self.storage.tpc_begin(self.trans)
oid = self.storage.new_oid()
p = zodb_pickle(MinPO("c"))
self.storage.store(oid, ZERO, p, '', self.trans)
oid = self.storage.new_oid()
p = zodb_pickle(MinPO("c"))
self.storage.store(oid, ZERO, p, '', self.trans)
self.ready.set()
self.storage.tpc_vote(self.trans)
if self.method == "tpc_finish":
self.storage.tpc_finish(self.trans)
else:
self.storage.tpc_abort(self.trans)
except Disconnected:
pass
class CommitLockTests:
# The commit lock tests verify that the storage successfully
# blocks and restarts transactions when there is content for a
# single storage. There are a lot of cases to cover.
# CommitLock1 checks the case where a single transaction delays
# other transactions before they actually block. IOW, by the time
# the other transactions get to the vote stage, the first
# transaction has finished.
def checkCommitLock1OnCommit(self):
self._storages = []
try:
self._checkCommitLock("tpc_finish", self._dosetup1, self._dowork1)
finally:
self._cleanup()
def checkCommitLock1OnAbort(self):
self._storages = []
try:
self._checkCommitLock("tpc_abort", self._dosetup1, self._dowork1)
finally:
self._cleanup()
def checkCommitLock2OnCommit(self):
self._storages = []
try:
self._checkCommitLock("tpc_finish", self._dosetup2, self._dowork2)
finally:
self._cleanup()
def checkCommitLock2OnAbort(self):
self._storages = []
try:
self._checkCommitLock("tpc_abort", self._dosetup2, self._dowork2)
finally:
self._cleanup()
def _cleanup(self):
for store, trans in self._storages:
store.tpc_abort(trans)
store.close()
self._storages = []
def _checkCommitLock(self, method_name, dosetup, dowork):
# check the commit lock when a client attemps a transaction,
# but fails/exits before finishing the commit.
# The general flow of these tests is to start a transaction by
# calling tpc_begin(). Then begin one or more other
# connections that also want to commit. This causes the
# commit lock code to be exercised. Once the other
# connections are started, the first transaction completes.
# Either by commit or abort, depending on whether method_name
# is "tpc_finish."
# The tests are parameterized by method_name, dosetup(), and
# dowork(). The dosetup() function is called with a
# connectioned client storage, transaction, and timestamp.
# Any work it does occurs after the first transaction has
# started, but before it finishes. The dowork() function
# executes after the first transaction has completed.
# Start on transaction normally and get the lock.
t = Transaction()
self._storage.tpc_begin(t)
oid = self._storage.new_oid()
self._storage.store(oid, ZERO, zodb_pickle(MinPO(1)), '', t)
self._storage.tpc_vote(t)
# Start a second transaction on a different connection without
# blocking the test thread.
self._storages = []
for i in range(4):
storage2 = self._duplicate_client()
t2 = Transaction()
tid = self._get_timestamp()
dosetup(storage2, t2, tid)
if i == 0:
storage2.close()
else:
self._storages.append((storage2, t2))
if method_name == "tpc_finish":
self._storage.tpc_finish(t)
self._storage.load(oid, '')
else:
self._storage.tpc_abort(t)
dowork(method_name)
# Make sure the server is still responsive
self._dostore()
def _dosetup1(self, storage, trans, tid):
storage.tpc_begin(trans, tid)
def _dowork1(self, method_name):
for store, trans in self._storages:
oid = store.new_oid()
store.store(oid, ZERO, zodb_pickle(MinPO("c")), '', trans)
store.tpc_vote(trans)
if method_name == "tpc_finish":
store.tpc_finish(trans)
else:
store.tpc_abort(trans)
def _dosetup2(self, storage, trans, tid):
self._threads = []
t = WorkerThread(self, storage, trans)
self._threads.append(t)
t.start()
t.ready.wait()
def _dowork2(self, method_name):
for t in self._threads:
t.cleanup()
def _duplicate_client(self):
"Open another ClientStorage to the same server."
# XXX argh it's hard to find the actual address
# The rpc mgr addr attribute is a list. Each element in the
# list is a socket domain (AF_INET, AF_UNIX, etc.) and an
# address.
addr = self._storage._addr
new = ZEO.ClientStorage.ClientStorage(addr, wait=1)
new.registerDB(DummyDB())
return new
def _get_timestamp(self):
t = time.time()
ts = TimeStamp(*(time.gmtime(t)[:5] + (t % 60,)))
return ts.raw()
=== Added File Zope3/src/zodb/zeo/tests/connection.py === (530/630 lines abridged)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import sys
import time
import random
import select
import socket
import asyncore
import tempfile
import threading
import logging
from zodb.zeo.client import ClientStorage
from zodb.zeo.exceptions import Disconnected
from zodb.zeo.zrpc.marshal import Marshaller
from zodb.zeo.tests import forker
from transaction import get_transaction
from zodb.interfaces import ReadOnlyError
from zodb.ztransaction import Transaction
from zodb.storage.tests.base import StorageTestBase
from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
from zodb.storage.tests.base import handle_all_serials, ZERO
class DummyDB:
def invalidate(self, *args, **kws):
pass
class CommonSetupTearDown(StorageTestBase):
"""Tests that explicitly manage the server process.
To test the cache or re-connection, these test cases explicit
start and stop a ZEO storage server.
"""
[-=- -=- -=- 530 lines omitted -=- -=- -=-]
# expanded in-line (mostly).
# Create oid->serial mappings
for c in clients:
c.__oids = []
c.__serials = {}
# Begin a transaction
t = Transaction()
for c in clients:
#print "%s.%s.%s begin\n" % (tname, c.__name, i),
c.tpc_begin(t)
for j in range(testcase.nobj):
for c in clients:
# Create and store a new object on each server
oid = c.new_oid()
c.__oids.append(oid)
data = MinPO("%s.%s.t%d.o%d" % (tname, c.__name, i, j))
#print data.value
data = zodb_pickle(data)
s = c.store(oid, ZERO, data, '', t)
c.__serials.update(handle_all_serials(oid, s))
# Vote on all servers and handle serials
for c in clients:
#print "%s.%s.%s vote\n" % (tname, c.__name, i),
s = c.tpc_vote(t)
c.__serials.update(handle_all_serials(None, s))
# Finish on all servers
for c in clients:
#print "%s.%s.%s finish\n" % (tname, c.__name, i),
c.tpc_finish(t)
for c in clients:
# Check that we got serials for all oids
for oid in c.__oids:
testcase.failUnless(c.__serials.has_key(oid))
# Check that we got serials for no other oids
for oid in c.__serials.keys():
testcase.failUnless(oid in c.__oids)
def closeclients(self):
# Close clients opened by run()
for c in self.clients:
try:
c.close()
except:
pass
=== Added File Zope3/src/zodb/zeo/tests/forker.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Library for forking storage server and connecting client storage"""
import os
import sys
import time
import errno
import random
import socket
import tempfile
import traceback
import logging
# Change value of PROFILE to enable server-side profiling
PROFILE = False
if PROFILE:
import hotshot
def get_port():
"""Return a port that is not in use.
Checks if a port is in use by trying to connect to it. Assumes it
is not in use if connect raises an exception.
Raises RuntimeError after 10 tries.
"""
for i in range(10):
port = random.randrange(20000, 30000)
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
try:
s.connect(('localhost', port))
except socket.error:
# XXX check value of error?
return port
finally:
s.close()
raise RuntimeError, "Can't find port"
def start_zeo_server(conf, addr=None, ro_svr=False, keep=False):
"""Start a ZEO server in a separate process.
Returns the ZEO port, the test server port, and the pid.
"""
# Store the config info in a temp file.
tmpfile = tempfile.mktemp()
fp = open(tmpfile, 'w')
fp.write(conf)
fp.close()
# Create the server
import ZEO.tests.zeoserver
if addr is None:
port = get_port()
else:
port = addr[1]
script = ZEO.tests.zeoserver.__file__
if script.endswith('.pyc'):
script = script[:-1]
# Create a list of arguments, which we'll tuplify below
args = [sys.executable, script, '-C', tmpfile]
if ro_svr:
args.append('-r')
if keep:
args.append('-k')
args.append(str(port))
d = os.environ.copy()
d['PYTHONPATH'] = os.pathsep.join(sys.path)
pid = os.spawnve(os.P_NOWAIT, sys.executable, tuple(args), d)
adminaddr = ('localhost', port+1)
# We need to wait until the server starts, but not forever
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
for i in range(5):
try:
logging.debug('forker: connect %s', i)
s.connect(adminaddr)
ack = s.recv(1024)
logging.debug('forker: acked: %s', ack)
break
except socket.error, e:
if e[0] <> errno.ECONNREFUSED: raise
time.sleep(1)
else:
logging.debug('forker: boo hoo')
raise
return ('localhost', port), adminaddr, pid
def shutdown_zeo_server(adminaddr):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(adminaddr)
try:
ack = s.recv(1024)
except socket.error, e:
if e[0] <> errno.ECONNRESET: raise
ack = 'no ack received'
logging.debug('shutdownServer: acked: %s', ack)
s.close()
=== Added File Zope3/src/zodb/zeo/tests/multi.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A multi-client test of the ZEO storage server"""
# XXX This code is currently broken.
import ZODB, ZODB.DB, ZODB.FileStorage, ZODB.POSException
import persistence
import PersistentMapping
from zodb.zeo.tests import forker
import asyncore
import os
import tempfile
import time
import types
VERBOSE = 1
CLIENTS = 4
RECORDS_PER_CLIENT = 100
CONFLICT_DELAY = 0.1
CONNECT_DELAY = 0.1
CLIENT_CACHE = '' # use temporary cache
class Record(Persistence.Persistent):
def __init__(self, client=None, value=None):
self.client = client
self.value = None
self.next = None
def set_next(self, next):
self.next = next
class Stats(Persistence.Persistent):
def __init__(self):
self.begin = time.time()
self.end = None
def done(self):
self.end = time.time()
def init_storage():
path = tempfile.mktemp()
if VERBOSE:
print "FileStorage path:", path
fs = ZODB.FileStorage.FileStorage(path)
db = ZODB.DB(fs)
root = db.open().root()
root["multi"] = PersistentMapping.PersistentMapping()
get_transaction().commit()
return fs
def start_server(addr):
storage = init_storage()
pid, exit = forker.start_zeo_server(storage, addr)
return pid, exit
def start_client(addr, client_func=None):
pid = os.fork()
if pid == 0:
try:
import ZEO.ClientStorage
if VERBOSE:
print "Client process started:", os.getpid()
cli = ZEO.ClientStorage.ClientStorage(addr, client=CLIENT_CACHE)
if client_func is None:
run(cli)
else:
client_func(cli)
cli.close()
finally:
os._exit(0)
else:
return pid
def run(storage):
if hasattr(storage, 'is_connected'):
while not storage.is_connected():
time.sleep(CONNECT_DELAY)
pid = os.getpid()
print "Client process connected:", pid, storage
db = ZODB.DB(storage)
root = db.open().root()
while 1:
try:
s = root[pid] = Stats()
get_transaction().commit()
except ZODB.POSException.ConflictError:
get_transaction().abort()
time.sleep(CONFLICT_DELAY)
else:
break
dict = root["multi"]
prev = None
i = 0
while i < RECORDS_PER_CLIENT:
try:
size = len(dict)
r = dict[size] = Record(pid, size)
if prev:
prev.set_next(r)
get_transaction().commit()
except ZODB.POSException.ConflictError, err:
get_transaction().abort()
time.sleep(CONFLICT_DELAY)
else:
i = i + 1
if VERBOSE and (i < 5 or i % 10 == 0):
print "Client %s: %s of %s" % (pid, i, RECORDS_PER_CLIENT)
s.done()
get_transaction().commit()
print "Client completed:", pid
def main(client_func=None):
if VERBOSE:
print "Main process:", os.getpid()
addr = tempfile.mktemp()
t0 = time.time()
server_pid, server = start_server(addr)
t1 = time.time()
pids = []
for i in range(CLIENTS):
pids.append(start_client(addr, client_func))
for pid in pids:
assert type(pid) == types.IntType, "invalid pid type: %s (%s)" % \
(repr(pid), type(pid))
try:
if VERBOSE:
print "waitpid(%s)" % repr(pid)
os.waitpid(pid, 0)
except os.error, err:
print "waitpid(%s) failed: %s" % (repr(pid), err)
t2 = time.time()
server.close()
os.waitpid(server_pid, 0)
# XXX Should check that the results are consistent!
print "Total time:", t2 - t0
print "Server start time", t1 - t0
print "Client time:", t2 - t1
if __name__ == "__main__":
main()
=== Added File Zope3/src/zodb/zeo/tests/speed.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
usage="""Test speed of a ZODB storage
Options:
-d file The data file to use as input.
The default is this script.
-n n The number of repititions
-s module A module that defines a 'Storage'
attribute, which is an open storage.
If not specified, a FileStorage will ne
used.
-z Test compressing data
-D Run in debug mode
-L Test loads as well as stores by minimizing
the cache after eachrun
-M Output means only
-C Run with a persistent client cache
-U Run ZEO using a Unix domain socket
-t n Number of concurrent threads to run.
"""
import asyncore
import sys, os, getopt, string, time
##sys.path.insert(0, os.getcwd())
import ZODB, ZODB.FileStorage
import persistence
import ZEO.ClientStorage, ZEO.StorageServer
from zodb.zeo.tests import forker
from zodb.interfaces import ConflictError
class P(Persistence.Persistent):
pass
fs_name = "zeo-speed.fs"
class ZEOExit(asyncore.file_dispatcher):
"""Used to exit ZEO.StorageServer when run is done"""
def writable(self):
return 0
def readable(self):
return 1
def handle_read(self):
buf = self.recv(4)
assert buf == "done"
self.delete_fs()
os._exit(0)
def handle_close(self):
print "Parent process exited unexpectedly"
self.delete_fs()
os._exit(0)
def delete_fs(self):
os.unlink(fs_name)
os.unlink(fs_name + ".lock")
os.unlink(fs_name + ".tmp")
def work(db, results, nrep, compress, data, detailed, minimize, threadno=None):
for j in range(nrep):
for r in 1, 10, 100, 1000:
t = time.time()
conflicts = 0
jar = db.open()
while 1:
try:
get_transaction().begin()
rt = jar.root()
key = 's%s' % r
if rt.has_key(key):
p = rt[key]
else:
rt[key] = p =P()
for i in range(r):
v = getattr(p, str(i), P())
if compress is not None:
v.d = compress(data)
else:
v.d = data
setattr(p, str(i), v)
get_transaction().commit()
except ConflictError:
conflicts = conflicts + 1
else:
break
jar.close()
t = time.time() - t
if detailed:
if threadno is None:
print "%s\t%s\t%.4f\t%d" % (j, r, t, conflicts)
else:
print "%s\t%s\t%.4f\t%d\t%d" % (j, r, t, conflicts,
threadno)
results[r].append((t, conflicts))
rt=d=p=v=None # release all references
if minimize:
time.sleep(3)
jar.cacheMinimize(3)
def main(args):
opts, args = getopt.getopt(args, 'zd:n:Ds:LMt:U')
s = None
compress = None
data=sys.argv[0]
nrep=5
minimize=0
detailed=1
cache = None
domain = 'AF_INET'
threads = 1
for o, v in opts:
if o=='-n': nrep = int(v)
elif o=='-d': data = v
elif o=='-s': s = v
elif o=='-z':
import zlib
compress = zlib.compress
elif o=='-L':
minimize=1
elif o=='-M':
detailed=0
elif o=='-D':
global debug
os.environ['STUPID_LOG_FILE']=''
os.environ['STUPID_LOG_SEVERITY']='-999'
debug = 1
elif o == '-C':
cache = 'speed'
elif o == '-U':
domain = 'AF_UNIX'
elif o == '-t':
threads = int(v)
zeo_pipe = None
if s:
s = __import__(s, globals(), globals(), ('__doc__',))
s = s.Storage
server = None
else:
s, server, pid = forker.start_zeo("FileStorage",
(fs_name, 1), domain=domain)
data=open(data).read()
db=ZODB.DB(s,
# disable cache deactivation
cache_size=4000,
cache_deactivate_after=6000,)
print "Beginning work..."
results={1:[], 10:[], 100:[], 1000:[]}
if threads > 1:
import threading
l = []
for i in range(threads):
t = threading.Thread(target=work,
args=(db, results, nrep, compress, data,
detailed, minimize, i))
l.append(t)
for t in l:
t.start()
for t in l:
t.join()
else:
work(db, results, nrep, compress, data, detailed, minimize)
if server is not None:
server.close()
os.waitpid(pid, 0)
if detailed:
print '-'*24
print "num\tmean\tmin\tmax"
for r in 1, 10, 100, 1000:
times = []
for time, conf in results[r]:
times.append(time)
t = mean(times)
print "%d\t%.4f\t%.4f\t%.4f" % (r, t, min(times), max(times))
def mean(l):
tot = 0
for v in l:
tot = tot + v
return tot / len(l)
##def compress(s):
## c = zlib.compressobj()
## o = c.compress(s)
## return o + c.flush()
if __name__=='__main__':
main(sys.argv[1:])
=== Added File Zope3/src/zodb/zeo/tests/stress.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A ZEO client-server stress test to look for leaks.
The stress test should run in an infinite loop and should involve
multiple connections.
"""
# XXX This code is currently broken.
from __future__ import nested_scopes
import zodb
from zodb.zeo.client import ClientStorage
from zodb.storage.mapping import MappingStorage
from zodb.zeo.tests import forker
from zodb.storage.tests import MinPO
import os
import random
import sys
import types
NUM_TRANSACTIONS_PER_CONN = 10
NUM_CONNECTIONS = 10
NUM_ROOTS = 20
MAX_DEPTH = 20
MIN_OBJSIZE = 128
MAX_OBJSIZE = 2048
def an_object():
"""Return an object suitable for a PersistentMapping key"""
size = random.randrange(MIN_OBJSIZE, MAX_OBJSIZE)
if os.path.exists("/dev/urandom"):
f = open("/dev/urandom")
buf = f.read(size)
f.close()
return buf
else:
f = open(MinPO.__file__)
l = list(f.read(size))
f.close()
random.shuffle(l)
return "".join(l)
def setup(cn):
"""Initialize the database with some objects"""
root = cn.root()
for i in range(NUM_ROOTS):
prev = an_object()
for j in range(random.randrange(1, MAX_DEPTH)):
o = MinPO.MinPO(prev)
prev = o
root[an_object()] = o
get_transaction().commit()
cn.close()
def work(cn):
"""Do some work with a transaction"""
cn.sync()
root = cn.root()
obj = random.choice(root.values())
# walk down to the bottom
while not isinstance(obj.value, types.StringType):
obj = obj.value
obj.value = an_object()
get_transaction().commit()
def main():
# Yuck! Need to cleanup forker so that the API is consistent
# across Unix and Windows, at least if that's possible.
if os.name == "nt":
zaddr, tport, pid = forker.start_zeo_server('MappingStorage', ())
def exitserver():
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(tport)
s.close()
else:
zaddr = '', random.randrange(20000, 30000)
pid, exitobj = forker.start_zeo_server(MappingStorage(), zaddr)
def exitserver():
exitobj.close()
while 1:
pid = start_child(zaddr)
print "started", pid
os.waitpid(pid, 0)
exitserver()
def start_child(zaddr):
pid = os.fork()
if pid != 0:
return pid
try:
_start_child(zaddr)
finally:
os._exit(0)
def _start_child(zaddr):
storage = ClientStorage(zaddr, debug=1, min_disconnect_poll=0.5, wait=1)
db = ZODB.DB(storage, pool_size=NUM_CONNECTIONS)
setup(db.open())
conns = []
conn_count = 0
for i in range(NUM_CONNECTIONS):
c = db.open()
c.__count = 0
conns.append(c)
conn_count += 1
while conn_count < 25:
c = random.choice(conns)
if c.__count > NUM_TRANSACTIONS_PER_CONN:
conns.remove(c)
c.close()
conn_count += 1
c = db.open()
c.__count = 0
conns.append(c)
else:
c.__count += 1
work(c)
if __name__ == "__main__":
main()
=== Added File Zope3/src/zodb/zeo/tests/test_cache.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test suite for the ZEO.ClientCache module.
At times, we do 'white box' testing, i.e. we know about the internals
of the ClientCache object.
"""
from __future__ import nested_scopes
import os
import time
import tempfile
import unittest
from zodb.zeo.cache import ClientCache
class ClientCacheTests(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.cachesize = 10*1000*1000
self.cache = ClientCache(size=self.cachesize)
self.cache.open()
def tearDown(self):
self.cache.close()
unittest.TestCase.tearDown(self)
def testOpenClose(self):
pass # All the work is done by setUp() / tearDown()
def testStoreLoad(self):
cache = self.cache
oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
loaded = cache.load(oid, '')
self.assertEqual(loaded, (data, serial))
def testMissingLoad(self):
cache = self.cache
oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
loaded = cache.load('garbage1', '')
self.assertEqual(loaded, None)
def testInvalidate(self):
cache = self.cache
oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
loaded = cache.load(oid, '')
self.assertEqual(loaded, (data, serial))
cache.invalidate(oid, '')
loaded = cache.load(oid, '')
self.assertEqual(loaded, None)
def testVersion(self):
cache = self.cache
oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
vname = 'myversion'
vdata = '5678'*200
vserial = 'IJKLMNOP'
cache.store(oid, data, serial, vname, vdata, vserial)
loaded = cache.load(oid, '')
self.assertEqual(loaded, (data, serial))
vloaded = cache.load(oid, vname)
self.assertEqual(vloaded, (vdata, vserial))
def testVersionOnly(self):
cache = self.cache
oid = 'abcdefgh'
data = ''
serial = ''
vname = 'myversion'
vdata = '5678'*200
vserial = 'IJKLMNOP'
cache.store(oid, data, serial, vname, vdata, vserial)
loaded = cache.load(oid, '')
self.assertEqual(loaded, None)
vloaded = cache.load(oid, vname)
self.assertEqual(vloaded, (vdata, vserial))
def testInvalidateNonVersion(self):
cache = self.cache
oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
vname = 'myversion'
vdata = '5678'*200
vserial = 'IJKLMNOP'
cache.store(oid, data, serial, vname, vdata, vserial)
loaded = cache.load(oid, '')
self.assertEqual(loaded, (data, serial))
vloaded = cache.load(oid, vname)
self.assertEqual(vloaded, (vdata, vserial))
cache.invalidate(oid, '')
loaded = cache.load(oid, '')
self.assertEqual(loaded, None)
# The version data is also invalidated at this point
vloaded = cache.load(oid, vname)
self.assertEqual(vloaded, None)
def testInvalidateVersion(self):
# Invalidating a version should not invalidate the non-version data.
# (This tests for the same bug as testInvalidatePersists below.)
cache = self.cache
oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
loaded = cache.load(oid, '')
self.assertEqual(loaded, (data, serial))
cache.invalidate(oid, 'bogus')
loaded = cache.load(oid, '')
self.assertEqual(loaded, (data, serial))
def testVerify(self):
cache = self.cache
results = []
def verifier(oid, serial, vserial):
results.append((oid, serial, vserial))
cache.verify(verifier)
self.assertEqual(results, [])
oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
results = []
cache.verify(verifier)
self.assertEqual(results, [(oid, serial, None)])
def testCheckSize(self):
# Make sure that cache._index[oid] is erased for oids that are
# stored in the cache file that's rewritten after a flip.
cache = self.cache
oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
cache.checkSize(10*self.cachesize) # Force a file flip
oid2 = 'abcdefgz'
data2 = '1234'*10
serial2 = 'ABCDEFGZ'
cache.store(oid2, data2, serial2, '', '', '')
cache.checkSize(10*self.cachesize) # Force another file flip
self.assertNotEqual(cache._index.get(oid2), None)
self.assertEqual(cache._index.get(oid), None)
def testCopyToCurrent(self):
# - write some objects to cache file 0
# - force a flip
# - write some objects to cache file 1
# - load some objects that are in cache file 0
# - load the same objects, making sure they are now in file 1
# - write some more objects
# - force another flip
# - load the same objects again
# - make sure they are now in file 0 again
cache = self.cache
# Create some objects
oid1 = 'abcdefgh'
data1 = '1234' * 100
serial1 = 'ABCDEFGH'
oid2 = 'bcdefghi'
data2 = '2345' * 200
serial2 = 'BCDEFGHI'
version2 = 'myversion'
nonversion = 'nada'
vdata2 = '5432' * 250
vserial2 = 'IHGFEDCB'
oid3 = 'cdefghij'
data3 = '3456' * 300
serial3 = 'CDEFGHIJ'
# Store them in the cache
cache.store(oid1, data1, serial1, '', '', '')
cache.store(oid2, data2, serial2, version2, vdata2, vserial2)
cache.store(oid3, data3, serial3, '', '', '')
# Verify that they are in file 0
self.assert_(None is not cache._index.get(oid1) > 0)
self.assert_(None is not cache._index.get(oid2) > 0)
self.assert_(None is not cache._index.get(oid3) > 0)
# Load them and verify that the loads return correct data
self.assertEqual(cache.load(oid1, ''), (data1, serial1))
self.assertEqual(cache.load(oid2, ''), (data2, serial2))
self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
self.assertEqual(cache.load(oid3, ''), (data3, serial3))
# Verify that they are still in file 0
self.assert_(None is not cache._index.get(oid1) > 0)
self.assert_(None is not cache._index.get(oid2) > 0)
self.assert_(None is not cache._index.get(oid3) > 0)
# Cause a cache flip
cache.checkSize(10*self.cachesize)
# Load o1, o2, o4 again and verify that the loads return correct data
self.assertEqual(cache.load(oid1, ''), (data1, serial1))
self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
self.assertEqual(cache.load(oid2, ''), (data2, serial2))
# Verify that o1, o2, 04 are now in file 1, o3 still in file 0
self.assert_(None is not cache._index.get(oid1) < 0)
self.assert_(None is not cache._index.get(oid2) < 0)
self.assert_(None is not cache._index.get(oid3) > 0)
# Cause another cache flip
cache.checkSize(10*self.cachesize)
# Load o1 and o2 again and verify that the loads return correct data
self.assertEqual(cache.load(oid1, ''), (data1, serial1))
self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
self.assertEqual(cache.load(oid2, ''), (data2, serial2))
# Verify that o1 and o2 are now back in file 0, o3 is lost
self.assert_(None is not cache._index.get(oid1) > 0)
self.assert_(None is not cache._index.get(oid2) > 0)
self.assert_(None is cache._index.get(oid3))
# Invalidate version data for o2
cache.invalidate(oid2, nonversion)
self.assertEqual(cache.load(oid2, ''), (data2, serial2))
self.assertEqual(cache.load(oid2, nonversion), None)
self.assertEqual(cache.load(oid2, version2), None)
# Cause another cache flip
cache.checkSize(10*self.cachesize)
# Load o1 and o2 again and verify that the loads return correct data
self.assertEqual(cache.load(oid1, ''), (data1, serial1))
self.assertEqual(cache.load(oid2, version2), None)
self.assertEqual(cache.load(oid2, nonversion), None)
self.assertEqual(cache.load(oid2, ''), (data2, serial2))
# Verify that o1 and o2 are now in file 1
self.assert_(None is not cache._index.get(oid1) < 0)
self.assert_(None is not cache._index.get(oid2) < 0)
class PersistentClientCacheTests(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
self.cachesize = 10*1000*1000
self.storagename = 'foo'
self.clientname = 'test'
# Predict file names
fn0 = 'c%s-%s-0.zec' % (self.storagename, self.clientname)
fn1 = 'c%s-%s-1.zec' % (self.storagename, self.clientname)
for fn in fn0, fn1:
fn = os.path.join(self.vardir, fn)
try:
os.unlink(fn)
except os.error:
pass
self.openCache()
def openCache(self):
self.cache = ClientCache(storage=self.storagename,
size=self.cachesize,
client=self.clientname,
var=self.vardir)
self.cache.open()
def reopenCache(self):
self.cache.close()
self.openCache()
return self.cache
def tearDown(self):
self.cache.close()
for filename in self.cache._p:
if filename is not None:
try:
os.unlink(filename)
except os.error:
pass
unittest.TestCase.tearDown(self)
def testCacheFileSelection(self):
# A bug in __init__ read the wrong slice of the file to determine
# the serial number of the first record, reading the
# last byte of the data size plus the first seven bytes of the
# serial number. This caused random selection of the proper
# 'current' file when a persistent cache was opened.
cache = self.cache
self.assertEqual(cache._current, 0) # Check that file 0 is current
oid = 'abcdefgh'
data = '1234'
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
cache.checkSize(10*self.cachesize) # Force a file flip
self.assertEqual(cache._current, 1) # Check that the flip worked
oid = 'abcdefgh'
data = '123'
serial = 'ABCDEFGZ'
cache.store(oid, data, serial, '', '', '')
cache = self.reopenCache()
loaded = cache.load(oid, '')
# Check that we got the most recent data:
self.assertEqual(loaded, (data, serial))
self.assertEqual(cache._current, 1) # Double check that 1 is current
def testInvalidationPersists(self):
# A bug in invalidate() caused invalidation to overwrite the
# 2nd byte of the data size on disk, rather rather than
# overwriting the status byte. For certain data sizes this
# can be observed by reopening a persistent cache: the
# invalidated data will appear valid (but with altered size).
cache = self.cache
magicsize = (ord('i') + 1) << 16
cache = self.cache
oid = 'abcdefgh'
data = '!'*magicsize
serial = 'ABCDEFGH'
cache.store(oid, data, serial, '', '', '')
loaded = cache.load(oid, '')
self.assertEqual(loaded, (data, serial))
cache.invalidate(oid, '')
cache = self.reopenCache()
loaded = cache.load(oid, '')
if loaded != None:
self.fail("invalidated data resurrected, size %d, was %d" %
(len(loaded[0]), len(data)))
def test_suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(ClientCacheTests))
suite.addTest(unittest.makeSuite(PersistentClientCacheTests))
return suite
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')
=== Added File Zope3/src/zodb/zeo/tests/test_conn.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test setup for ZEO connection logic.
The actual tests are in ConnectionTests.py; this file provides the
platform-dependent scaffolding.
"""
# System imports
import unittest
# Import the actual test class
from zodb.zeo.tests import ConnectionTests
class FileStorageConfig:
def getConfig(self, path, create, read_only):
return """\
<Storage>
type FileStorage
file_name %s
create %s
read_only %s
</Storage>""" % (path,
create and 'yes' or 'no',
read_only and 'yes' or 'no')
class BerkeleyStorageConfig:
def getConfig(self, path, create, read_only):
# Full always creates and doesn't have a read_only flag
return """\
<Storage>
type BDBFullStorage
name %s
read_only %s
</Storage>""" % (path,
read_only and 'yes' or 'no')
class FileStorageConnectionTests(
FileStorageConfig,
ConnectionTests.ConnectionTests
):
"""FileStorage-specific connection tests."""
class FileStorageReconnectionTests(
FileStorageConfig,
ConnectionTests.ReconnectionTests
):
"""FileStorage-specific re-connection tests."""
class BDBConnectionTests(
BerkeleyStorageConfig,
ConnectionTests.ConnectionTests
):
"""Berkeley storage connection tests."""
class BDBReconnectionTests(
BerkeleyStorageConfig,
ConnectionTests.ReconnectionTests
):
"""Berkeley storage re-connection tests."""
test_classes = [FileStorageConnectionTests, FileStorageReconnectionTests]
try:
from zodb.storage.bdbfull import BDBFullStorage
except ImportError:
pass
else:
test_classes.append(BDBConnectionTests)
test_classes.append(BDBReconnectionTests)
def test_suite():
# shutup warnings about mktemp
import warnings
warnings.filterwarnings("ignore", "mktemp")
suite = unittest.TestSuite()
for klass in test_classes:
sub = unittest.makeSuite(klass, 'check')
suite.addTest(sub)
return suite
if __name__ == "__main__":
unittest.main(defaultTest='test_suite')
=== Added File Zope3/src/zodb/zeo/tests/test_tbuf.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import random
import unittest
from zodb.zeo.tbuf import TransactionBuffer
def random_string(size):
"""Return a random string of size size."""
l = [chr(random.randrange(256)) for i in range(size)]
return "".join(l)
def new_store_data():
"""Return arbitrary data to use as argument to store() method."""
return random_string(8), '', random_string(random.randrange(1000))
def new_invalidate_data():
"""Return arbitrary data to use as argument to invalidate() method."""
return random_string(8), ''
class TransBufTests(unittest.TestCase):
def checkTypicalUsage(self):
tbuf = TransactionBuffer()
tbuf.store(*new_store_data())
tbuf.invalidate(*new_invalidate_data())
tbuf.begin_iterate()
while 1:
o = tbuf.next()
if o is None:
break
tbuf.clear()
def doUpdates(self, tbuf):
data = []
for i in range(10):
d = new_store_data()
tbuf.store(*d)
data.append(d)
d = new_invalidate_data()
tbuf.invalidate(*d)
data.append(d)
tbuf.begin_iterate()
for i in range(len(data)):
x = tbuf.next()
if x[2] is None:
# the tbuf add a dummy None to invalidates
x = x[:2]
self.assertEqual(x, data[i])
def checkOrderPreserved(self):
tbuf = TransactionBuffer()
self.doUpdates(tbuf)
def checkReusable(self):
tbuf = TransactionBuffer()
self.doUpdates(tbuf)
tbuf.clear()
self.doUpdates(tbuf)
tbuf.clear()
self.doUpdates(tbuf)
def test_suite():
return unittest.makeSuite(TransBufTests, 'check')
=== Added File Zope3/src/zodb/zeo/tests/test_zeo.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test suite for ZEO based on ZODB.tests."""
# System imports
import os
import sys
import time
import socket
import asyncore
import tempfile
import unittest
import logging
# ZODB test support
import zodb
from zodb.storage.tests.minpo import MinPO
from zodb.storage.tests.base import zodb_unpickle
# ZODB test mixin classes
from zodb.storage.tests import StorageTestBase, BasicStorage, VersionStorage, \
TransactionalUndoStorage, TransactionalUndoVersionStorage, \
PackableStorage, Synchronization, ConflictResolution, RevisionStorage, \
MTStorage, ReadOnlyStorage
# ZEO imports
from zodb.zeo.client import ClientStorage
from zodb.zeo.exceptions import Disconnected
# ZEO test support
from zodb.zeo.tests import forker, Cache
# ZEO test mixin classes
from zodb.zeo.tests import CommitLockTests, ThreadTests
class DummyDB:
def invalidate(self, *args):
pass
class MiscZEOTests:
"""ZEO tests that don't fit in elsewhere."""
def checkLargeUpdate(self):
obj = MinPO("X" * (10 * 128 * 1024))
self._dostore(data=obj)
def checkZEOInvalidation(self):
addr = self._storage._addr
storage2 = ClientStorage(addr, wait=1, min_disconnect_poll=0.1)
try:
oid = self._storage.new_oid()
ob = MinPO('first')
revid1 = self._dostore(oid, data=ob)
data, serial = storage2.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO('first'))
self.assertEqual(serial, revid1)
revid2 = self._dostore(oid, data=MinPO('second'), revid=revid1)
for n in range(3):
# Let the server and client talk for a moment.
# Is there a better way to do this?
asyncore.poll(0.1)
data, serial = storage2.load(oid, '')
self.assertEqual(zodb_unpickle(data), MinPO('second'),
'Invalidation message was not sent!')
self.assertEqual(serial, revid2)
finally:
storage2.close()
class GenericTests(
# Base class for all ZODB tests
StorageTestBase.StorageTestBase,
# ZODB test mixin classes (in the same order as imported)
BasicStorage.BasicStorage,
VersionStorage.VersionStorage,
TransactionalUndoStorage.TransactionalUndoStorage,
TransactionalUndoVersionStorage.TransactionalUndoVersionStorage,
PackableStorage.PackableStorage,
Synchronization.SynchronizedStorage,
ConflictResolution.ConflictResolvingStorage,
ConflictResolution.ConflictResolvingTransUndoStorage,
RevisionStorage.RevisionStorage,
MTStorage.MTStorage,
ReadOnlyStorage.ReadOnlyStorage,
# ZEO test mixin classes (in the same order as imported)
Cache.StorageWithCache,
Cache.TransUndoStorageWithCache,
CommitLockTests.CommitLockTests,
ThreadTests.ThreadTests,
# Locally defined (see above)
MiscZEOTests
):
"""Combine tests from various origins in one class."""
def open(self, read_only=0):
# XXX Needed to support ReadOnlyStorage tests. Ought to be a
# cleaner way.
addr = self._storage._addr
self._storage.close()
self._storage = ClientStorage(addr, read_only=read_only, wait=1)
def unresolvable(self, klass):
# This helper method is used to test the implementation of
# conflict resolution. That code runs in the server, and there
# is no way for the test suite (a client) to inquire about it.
pass
class FileStorageTests(GenericTests):
"""Test ZEO backed by a FileStorage."""
def setUp(self):
logging.info("testZEO: setUp() %s", self.id())
zeoport, adminaddr, pid = forker.start_zeo_server(self.getConfig())
self._pids = [pid]
self._servers = [adminaddr]
self._storage = ClientStorage(zeoport, '1', cache_size=20000000,
min_disconnect_poll=0.5, wait=1)
self._storage.registerDB(DummyDB())
def tearDown(self):
self._storage.close()
for server in self._servers:
forker.shutdown_zeo_server(server)
if hasattr(os, 'waitpid'):
# Not in Windows Python until 2.3
for pid in self._pids:
os.waitpid(pid, 0)
def getConfig(self):
filename = self.__fs_base = tempfile.mktemp()
# Return a 1-tuple
return """\
<Storage>
type FileStorage
file_name %s
create yes
</Storage>
""" % filename
class BDBTests(FileStorageTests):
"""ZEO backed by a Berkeley Full storage."""
def getStorage(self):
self._envdir = tempfile.mktemp()
# Return a 1-tuple
return """\
<Storage>
type BDBFullStorage
name %s
</Storage>
""" % self._envdir
test_classes = [FileStorageTests]
try:
from zodb.storage.bdbfull import BDBFullStorage
except ImportError:
pass
else:
test_classes.append(BDBTests)
def test_suite():
# shutup warnings about mktemp
import warnings
warnings.filterwarnings("ignore", "mktemp")
suite = unittest.TestSuite()
for klass in test_classes:
sub = unittest.makeSuite(klass, 'check')
suite.addTest(sub)
return suite
if __name__ == "__main__":
unittest.main(defaultTest='test_suite')
=== Added File Zope3/src/zodb/zeo/tests/threadtests.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Compromising positions involving threads."""
import threading
from zodb.ztransaction import Transaction
from zodb.storage.tests.base import zodb_pickle, MinPO
import zodb.zeo.client
from zodb.zeo.exceptions import Disconnected
ZERO = '\0'*8
class BasicThread(threading.Thread):
def __init__(self, storage, doNextEvent, threadStartedEvent):
self.storage = storage
self.trans = Transaction()
self.doNextEvent = doNextEvent
self.threadStartedEvent = threadStartedEvent
self.gotValueError = 0
self.gotDisconnected = 0
threading.Thread.__init__(self)
self.setDaemon(1)
def join(self):
threading.Thread.join(self, 10)
assert not self.isAlive()
class GetsThroughVoteThread(BasicThread):
# This thread gets partially through a transaction before it turns
# execution over to another thread. We're trying to establish that a
# tpc_finish() after a storage has been closed by another thread will get
# a ClientStorageError error.
#
# This class gets does a tpc_begin(), store(), tpc_vote() and is waiting
# to do the tpc_finish() when the other thread closes the storage.
def run(self):
self.storage.tpc_begin(self.trans)
oid = self.storage.new_oid()
self.storage.store(oid, ZERO, zodb_pickle(MinPO("c")), '', self.trans)
self.storage.tpc_vote(self.trans)
self.threadStartedEvent.set()
self.doNextEvent.wait(10)
try:
self.storage.tpc_finish(self.trans)
except ZEO.ClientStorage.ClientStorageError:
self.gotValueError = 1
self.storage.tpc_abort(self.trans)
class GetsThroughBeginThread(BasicThread):
# This class is like the above except that it is intended to be run when
# another thread is already in a tpc_begin(). Thus, this thread will
# block in the tpc_begin until another thread closes the storage. When
# that happens, this one will get disconnected too.
def run(self):
try:
self.storage.tpc_begin(self.trans)
except ZEO.ClientStorage.ClientStorageError:
self.gotValueError = 1
class AbortsAfterBeginFailsThread(BasicThread):
# This class is identical to GetsThroughBeginThread except that it
# attempts to tpc_abort() after the tpc_begin() fails. That will raise a
# ClientDisconnected exception which implies that we don't have the lock,
# and that's what we really want to test (but it's difficult given the
# threading module's API).
def run(self):
try:
self.storage.tpc_begin(self.trans)
except ZEO.ClientStorage.ClientStorageError:
self.gotValueError = 1
try:
self.storage.tpc_abort(self.trans)
except Disconnected:
self.gotDisconnected = 1
class ThreadTests:
# Thread 1 should start a transaction, but not get all the way through it.
# Main thread should close the connection. Thread 1 should then get
# disconnected.
def checkDisconnectedOnThread2Close(self):
doNextEvent = threading.Event()
threadStartedEvent = threading.Event()
thread1 = GetsThroughVoteThread(self._storage,
doNextEvent, threadStartedEvent)
thread1.start()
threadStartedEvent.wait(10)
self._storage.close()
doNextEvent.set()
thread1.join()
self.assertEqual(thread1.gotValueError, 1)
# Thread 1 should start a transaction, but not get all the way through
# it. While thread 1 is in the middle of the transaction, a second thread
# should start a transaction, and it will block in the tcp_begin() --
# because thread 1 has acquired the lock in its tpc_begin(). Now the main
# thread closes the storage and both sub-threads should get disconnected.
def checkSecondBeginFails(self):
doNextEvent = threading.Event()
threadStartedEvent = threading.Event()
thread1 = GetsThroughVoteThread(self._storage,
doNextEvent, threadStartedEvent)
thread2 = GetsThroughBeginThread(self._storage,
doNextEvent, threadStartedEvent)
thread1.start()
threadStartedEvent.wait(1)
thread2.start()
self._storage.close()
doNextEvent.set()
thread1.join()
thread2.join()
self.assertEqual(thread1.gotValueError, 1)
self.assertEqual(thread2.gotValueError, 1)
def checkThatFailedBeginDoesNotHaveLock(self):
doNextEvent = threading.Event()
threadStartedEvent = threading.Event()
thread1 = GetsThroughVoteThread(self._storage,
doNextEvent, threadStartedEvent)
thread2 = AbortsAfterBeginFailsThread(self._storage,
doNextEvent, threadStartedEvent)
thread1.start()
threadStartedEvent.wait(1)
thread2.start()
self._storage.close()
doNextEvent.set()
thread1.join()
thread2.join()
self.assertEqual(thread1.gotValueError, 1)
self.assertEqual(thread2.gotValueError, 1)
self.assertEqual(thread2.gotDisconnected, 1)
# Run a bunch of threads doing small and large stores in parallel
def checkMTStores(self):
threads = []
for i in range(5):
t = threading.Thread(target=self.mtstorehelper)
threads.append(t)
t.start()
for t in threads:
t.join(30)
for i in threads:
self.failUnless(not t.isAlive())
# Helper for checkMTStores
def mtstorehelper(self):
name = threading.currentThread().getName()
objs = []
for i in range(10):
objs.append(MinPO("X" * 200000))
objs.append(MinPO("X"))
for obj in objs:
self._dostore(data=obj)
=== Added File Zope3/src/zodb/zeo/tests/zeoserver.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Helper file used to launch a ZEO server cross platform"""
import os
import sys
import errno
import getopt
import random
import socket
import logging
import asyncore
import .threadedasync
import ZConfig
from zodb import StorageConfig
import zodb.zeo.server
def load_storage(fp):
rootconf = ZConfig.loadfile(fp)
storageconf = rootconf.getSection('Storage')
return StorageConfig.createStorage(storageconf)
def cleanup(storage):
# FileStorage and the Berkeley storages have this method, which deletes
# all files and directories used by the storage. This prevents @-files
# from clogging up /tmp
try:
storage.cleanup()
except AttributeError:
pass
class ZEOTestServer(asyncore.dispatcher):
"""A server for killing the whole process at the end of a test.
The first time we connect to this server, we write an ack character down
the socket. The other end should block on a recv() of the socket so it
can guarantee the server has started up before continuing on.
The second connect to the port immediately exits the process, via
os._exit(), without writing data on the socket. It does close and clean
up the storage first. The other end will get the empty string from its
recv() which will be enough to tell it that the server has exited.
I think this should prevent us from ever getting a legitimate addr-in-use
error.
"""
__super_init = asyncore.dispatcher.__init__
def __init__(self, addr, storage, keep):
self.__super_init()
self._storage = storage
self._keep = keep
# Count down to zero, the number of connects
self._count = 1
# Create a logger
self.logger = logging.getLogger('zeoserver.%d.%s' %
(os.getpid(), addr))
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
# Some ZEO tests attempt a quick start of the server using the same
# port so we have to set the reuse flag.
self.set_reuse_addr()
try:
self.bind(addr)
except:
# We really want to see these exceptions
import traceback
traceback.print_exc()
raise
self.listen(5)
self.logger.info('bound and listening')
def handle_accept(self):
sock, addr = self.accept()
self.logger.info('in handle_accept()')
# When we're done with everything, close the storage. Do not write
# the ack character until the storage is finished closing.
if self._count <= 0:
self.logger.info('closing the storage')
self._storage.close()
if not self._keep:
cleanup(self._storage)
self.logger.info('exiting')
os._exit(0)
self.logger.info('continuing')
sock.send('X')
self._count -= 1
def main():
# Initialize the logging module.
import logging.config
logging.basicConfig()
logging.root.setLevel(logging.CRITICAL)
# If log.ini exists, use it
if os.path.exists("log.ini"):
logging.config.fileConfig("log.ini")
# Create a logger
logger = logging.getLogger('zeoserver.%d' % os.getpid())
logger.info('starting')
# We don't do much sanity checking of the arguments, since if we get it
# wrong, it's a bug in the test suite.
ro_svr = False
keep = False
configfile = None
# Parse the arguments and let getopt.error percolate
opts, args = getopt.getopt(sys.argv[1:], 'rkC:')
for opt, arg in opts:
if opt == '-r':
ro_svr = True
elif opt == '-k':
keep = True
elif opt == '-C':
configfile = arg
# Open the config file and let ZConfig parse the data there. Then remove
# the config file, otherwise we'll leave turds.
fp = open(configfile, 'r')
storage = load_storage(fp)
fp.close()
os.remove(configfile)
# The rest of the args are hostname, portnum
zeo_port = int(args[0])
test_port = zeo_port + 1
try:
logger.info('creating the test server, ro: %s, keep: %s',
ro_svr, keep)
t = ZEOTestServer(('', test_port), storage, keep)
except socket.error, e:
if e[0] <> errno.EADDRINUSE: raise
logger.info('addr in use, closing and exiting')
storage.close()
cleanup(storage)
sys.exit(2)
addr = ('', zeo_port)
logger.info('creating the storage server')
serv = ZEO.StorageServer.StorageServer(addr, {'1': storage}, ro_svr)
logger.info('entering ThreadedAsync loop')
ThreadedAsync.LoopCallback.loop()
if __name__ == '__main__':
main()