[Zope3-checkins] CVS: Zope3/src/zodb/zeo/tests - auth_plaintext.py:1.2.2.1 basethread.py:1.2.2.1 invalid.py:1.4.2.1 test_auth.py:1.2.2.1 test_mon.py:1.2.2.1 test_options.py:1.2.2.1 commitlock.py:1.7.2.1 connection.py:1.8.20.1 forker.py:1.5.26.1 test_cache.py:1.3.26.1 test_conn.py:1.5.2.1 test_zeo.py:1.12.2.1 zeoserver.py:1.8.26.1 thread.py:NONE
Grégoire Weber
zope@i-con.ch
Sun, 22 Jun 2003 10:24:04 -0400
Update of /cvs-repository/Zope3/src/zodb/zeo/tests
In directory cvs.zope.org:/tmp/cvs-serv24874/src/zodb/zeo/tests
Modified Files:
Tag: cw-mail-branch
commitlock.py connection.py forker.py test_cache.py
test_conn.py test_zeo.py zeoserver.py
Added Files:
Tag: cw-mail-branch
auth_plaintext.py basethread.py invalid.py test_auth.py
test_mon.py test_options.py
Removed Files:
Tag: cw-mail-branch
thread.py
Log Message:
Synced up with HEAD
=== Added File Zope3/src/zodb/zeo/tests/auth_plaintext.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Implements plaintext password authentication. The password is stored as
a SHA hash in the database. The client sends over the plaintext
password, and the SHA hashing is done on the server side.
This mechanism offers *no network security at all*; the only security
is provided by not storing plaintext passwords on disk. (See the
auth_srp module for a secure mechanism)"""
import sha
from zodb.zeo.server import ZEOStorage
from zodb.zeo.auth import register_module
from zodb.zeo.auth.base import Client, Database
class StorageClass(ZEOStorage):
    """Server-side half of the "plaintext" authentication protocol."""

    def auth(self, username, password):
        """Check a plaintext *password* for *username*.

        The password is SHA-hashed here and compared with the hex digest
        returned by the password database.  Returns 0 if the database
        lookup raises LookupError; otherwise returns the result of
        finish_auth() called with the outcome of the comparison.
        """
        try:
            stored_digest = self.database.get_password(username)
        except LookupError:
            # The database could not supply a digest for this username.
            return 0
        supplied_digest = sha.new(password).hexdigest()
        return self.finish_auth(stored_digest == supplied_digest)
class PlaintextClient(Client):
    """Client-side half of the "plaintext" authentication protocol."""

    # NOTE(review): presumably the extra server methods this client needs
    # to call -- confirm against zodb.zeo.auth.base.Client.
    extensions = ["auth"]

    def start(self, username, realm, password):
        """Send the credentials, unhashed, to the server's auth() method.

        *realm* is accepted but not forwarded to the server.  Returns
        whatever the server stub's auth() call returns.
        """
        outcome = self.stub.auth(username, password)
        return outcome
# Register the storage, client and database classes under the protocol
# name "plaintext".  This runs as a side effect of importing the module.
register_module("plaintext", StorageClass, PlaintextClient, Database)
=== Added File Zope3/src/zodb/zeo/tests/basethread.py ===
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A Thread base class for use with unittest."""
from cStringIO import StringIO
import threading
import traceback
class TestThread(threading.Thread):
    """Daemon thread that reports problems through a unittest test case.

    Subclasses provide a testrun() method holding the thread's work.
    run() calls it and, if it raises an Exception, passes the formatted
    traceback to the test case's fail() method.
    """

    __super_init = threading.Thread.__init__
    __super_run = threading.Thread.run

    def __init__(self, testcase, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        """Create the thread and remember *testcase* for failure reports.

        All arguments after *testcase* are handed to
        threading.Thread.__init__ unchanged, except that a *kwargs* of
        None is replaced by a new empty dict.
        """
        # Build the dict per call: a ``{}`` default would be a single
        # object shared by every TestThread ever created.
        if kwargs is None:
            kwargs = {}
        self.__super_init(group, target, name, args, kwargs, verbose)
        # Daemon threads do not keep the interpreter alive, so a stuck
        # worker cannot hang the test run at exit.
        self.setDaemon(1)
        self._testcase = testcase

    def run(self):
        """Run testrun() and report any Exception via testcase.fail()."""
        try:
            self.testrun()
        except Exception:
            s = StringIO()
            traceback.print_exc(file=s)
            self._testcase.fail("Exception in thread %s:\n%s\n" %
                                (self, s.getvalue()))

    def cleanup(self, timeout=15):
        """Join the thread; fail the test if it is still alive afterwards.

        *timeout* is the number of seconds to wait in join().
        """
        self.join(timeout)
        if self.isAlive():
            self._testcase.fail("Thread did not finish: %s" % self)
=== Added File Zope3/src/zodb/zeo/tests/invalid.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from thread import get_ident
import threading
import time
from zodb.btrees.check import check, display
from zodb.btrees.OOBTree import OOBTree
from zodb.db import DB
from zodb.interfaces import ReadConflictError, ConflictError, VersionLockError
from zodb.zeo.tests.basethread import TestThread
from zodb.zeo.tests.connection import CommonSetupTearDown
from transaction import get_transaction
import logging
# XXX stopped porting here
# The tests here let several threads have a go at one or more database
# instances simultaneously. Each thread appends a disjoint (from the
# other threads) sequence of increasing integers to an OOBTree, one at
# a time (per thread). This provokes lots of conflicts, and BTrees
# work hard at conflict resolution too. An OOBTree is used because
# that flavor has the smallest maximum bucket size, and so splits buckets
# more often than other BTree flavors.
#
# When these tests were first written, they provoked an amazing number
# of obscure timing-related bugs in cache consistency logic, revealed
# by failure of the BTree to pass internal consistency checks at the end,
# and/or by failure of the BTree to contain all the keys the threads
# thought they added (i.e., the keys for which get_transaction().commit()
# did not raise any exception).
class StressThread(TestThread):
    """Worker that keeps adding integer keys to a shared BTree.

    The keys tried are startnum, startnum + step, startnum + 2*step, ...
    and each is stored under root()["tree"] with the thread number as
    its value, until the *stop* Event is set.  If *sleep* is given, the
    thread sleeps that long after each successful commit.  When the
    thread finishes, .added_keys lists the keys whose commit did not
    raise a conflict error.
    """

    def __init__(self, testcase, db, stop, threadnum, startnum,
                 step=2, sleep=None):
        TestThread.__init__(self, testcase)
        self.db = db
        self.stop = stop
        self.threadnum = threadnum
        self.startnum = startnum
        self.step = step
        self.sleep = sleep
        self.added_keys = []

    def _store_key(self, conn, tree, key):
        """Try to commit tree[key]; return 1 on success, 0 on conflict."""
        try:
            tree[key] = self.threadnum
            get_transaction().note("add key %s" % key)
            get_transaction().commit()
            if self.sleep:
                time.sleep(self.sleep)
        except (ReadConflictError, ConflictError):
            get_transaction().abort()
            # sync() is necessary here to process invalidations
            # if we get a read conflict. In the read conflict case,
            # no objects were modified so conn never got registered
            # with the transaction.
            conn.sync()
            return 0
        return 1

    def testrun(self):
        conn = self.db.open()
        stopped = self.stop.isSet

        # Keep trying until the tree can be read from the root object.
        while not stopped():
            try:
                tree = conn.root()["tree"]
            except (ConflictError, KeyError):
                get_transaction().abort()
                conn.sync()
            else:
                break

        # Move on to the next key whether or not the store succeeded.
        candidate = self.startnum
        while not stopped():
            if self._store_key(conn, tree, candidate):
                self.added_keys.append(candidate)
            candidate += self.step
        conn.close()
class VersionStressThread(TestThread):
    """Worker that adds each key to the shared BTree inside its own version.

    For every key the thread opens a connection on a version named
    "<threadnum>:<key>", stores the key there, and then alternately
    commits or aborts that version.  .added_keys collects only the keys
    for which oneupdate() returned a true value, i.e. those whose
    version was committed.
    """

    def __init__(self, testcase, db, stop, threadnum, startnum,
                 step=2, sleep=None):
        TestThread.__init__(self, testcase)
        self.db = db
        self.stop = stop
        self.threadnum = threadnum
        self.startnum = startnum
        self.step = step
        self.sleep = sleep
        self.added_keys = []
        # get_ident() is evaluated here, so the logger is named after the
        # thread that constructs this object, not the worker thread.
        self.log = logging.getLogger("thread:%s" % get_ident()).info

    def testrun(self):
        """Process keys until stop is set, alternating commit and abort."""
        self.log("thread begin")
        commit = 0
        key = self.startnum
        while not self.stop.isSet():
            version = "%s:%s" % (self.threadnum, key)
            # Flip on every iteration; the first key is a commit.
            commit = not commit
            self.log("attempt to add key=%s version=%s commit=%d" %
                     (key, version, commit))
            if self.oneupdate(version, key, commit):
                self.added_keys.append(key)
            key += self.step

    def oneupdate(self, version, key, commit=1):
        """Store *key* in *version*, then commit or abort the version.

        Returns *commit* once the commitVersion/abortVersion transaction
        has committed, so the result is false when the version was
        aborted.  Returns 0 if the stop event ends the final loop first.
        """
        # The mess of sleeps below was added to reduce the number
        # of VersionLockErrors, based on empirical observation.
        # It looks like the threads don't switch enough without
        # the sleeps.
        cn = self.db.open(version)
        # Step 1: keep trying until the tree can be read from the root.
        while not self.stop.isSet():
            try:
                tree = cn.root()["tree"]
                break
            except (ConflictError, KeyError), msg:
                cn.sync()
        # Step 2: store the key inside the version, retrying on conflicts
        # and version-lock errors.
        while not self.stop.isSet():
            try:
                tree[key] = self.threadnum
                get_transaction().note("add key %d" % key)
                get_transaction().commit()
                if self.sleep:
                    time.sleep(self.sleep)
                break
            except (VersionLockError, ReadConflictError, ConflictError), msg:
                self.log(msg)
                get_transaction().abort()
                # sync() is necessary here to process invalidations
                # if we get a read conflict. In the read conflict case,
                # no objects were modified so cn never got registered
                # with the transaction.
                cn.sync()
                if self.sleep:
                    time.sleep(self.sleep)
        # Step 3: commit or abort the version itself.  The finally clause
        # closes the connection on every exit from this step.
        try:
            while not self.stop.isSet():
                try:
                    if commit:
                        self.db.commitVersion(version)
                        get_transaction().note("commit version %s" % version)
                    else:
                        self.db.abortVersion(version)
                        get_transaction().note("abort version %s" % version)
                    get_transaction().commit()
                    if self.sleep:
                        time.sleep(self.sleep)
                    return commit
                except ConflictError, msg:
                    self.log(msg)
                    get_transaction().abort()
                    cn.sync()
        finally:
            cn.close()
        return 0
class InvalidationTests(CommonSetupTearDown):
    """Stress tests: several threads update one BTree concurrently.

    Each test stores an empty OOBTree under root()["tree"], lets worker
    threads add keys for DELAY seconds, and then checks that the tree is
    internally consistent and holds exactly the keys the workers
    believe they added.
    """

    # NOTE(review): presumably a test-runner level selector; its meaning
    # is not visible in this module.
    level = 2
    DELAY = 15  # number of seconds the main thread lets the workers run

    def setUp(self):
        super(InvalidationTests, self).setUp()
        self.dbs = []

    def tearDown(self):
        # Close every DB created through self.db() before the base class
        # tears down the rest.
        for db in self.dbs:
            db.close()
        super(InvalidationTests, self).tearDown()

    def db(self, storage):
        """Return a DB wrapping *storage*; it is closed in tearDown()."""
        db = DB(storage)
        self.dbs.append(db)
        return db

    def _check_tree(self, cn, tree):
        """Run the BTree consistency checks, retrying on read conflicts.

        A ReadConflictError is retried (after abort + sync) until three
        attempts have been made; the third one is re-raised.  Any other
        exception displays the tree and is re-raised.  The loop ends as
        soon as one attempt passes cleanly.
        """
        # Make sure the BTree is sane and that all the updates persisted.
        retries = 3
        while retries:
            retries -= 1
            try:
                check(tree)
                tree._check()
            except ReadConflictError:
                if retries:
                    get_transaction().abort()
                    cn.sync()
                else:
                    raise
            except:
                # Dump the tree for diagnosis, then let the error propagate.
                display(tree)
                raise
            else:
                # The checks passed; re-running them would only repeat
                # the same work.
                break

    def _check_threads(self, tree, *threads):
        """Fail unless the tree's keys equal the union of added_keys."""
        # Make sure the thread's view of the world is consistent with
        # the actual database state.
        expected_keys = []
        errormsgs = []
        err = errormsgs.append
        for t in threads:
            if not t.added_keys:
                err("thread %d didn't add any keys" % t.threadnum)
            expected_keys.extend(t.added_keys)
        expected_keys.sort()
        actual_keys = list(tree.keys())
        if expected_keys != actual_keys:
            err("expected keys != actual keys")
            for k in expected_keys:
                if k not in actual_keys:
                    err("key %s expected but not in tree" % k)
            for k in actual_keys:
                if k not in expected_keys:
                    err("key %s in tree but not expected" % k)
        if errormsgs:
            display(tree)
            self.fail('\n'.join(errormsgs))

    def go(self, stop, *threads):
        """Start *threads*, let them run DELAY seconds, stop and join."""
        # Run the threads
        for t in threads:
            t.start()
        time.sleep(self.DELAY)
        stop.set()
        for t in threads:
            t.cleanup()

    def testConcurrentUpdates2Storages(self):
        self._storage = storage1 = self.openClientStorage()
        storage2 = self.openClientStorage(cache="2")
        db1 = self.db(storage1)
        stop = threading.Event()

        cn = db1.open()
        tree = cn.root()["tree"] = OOBTree()
        get_transaction().commit()

        db2 = self.db(storage2)
        # Run two threads that update the BTree
        t1 = StressThread(self, db1, stop, 1, 1)
        t2 = StressThread(self, db2, stop, 2, 2)
        self.go(stop, t1, t2)

        cn.sync()
        self._check_tree(cn, tree)
        self._check_threads(tree, t1, t2)

        cn.close()

    def testConcurrentUpdates1Storage(self):
        self._storage = storage1 = self.openClientStorage()
        db1 = self.db(storage1)
        stop = threading.Event()

        cn = db1.open()
        tree = cn.root()["tree"] = OOBTree()
        get_transaction().commit()

        # Run two threads that update the BTree
        t1 = StressThread(self, db1, stop, 1, 1, sleep=0.001)
        t2 = StressThread(self, db1, stop, 2, 2, sleep=0.001)
        self.go(stop, t1, t2)

        cn.sync()
        self._check_tree(cn, tree)
        self._check_threads(tree, t1, t2)

        cn.close()

    def testConcurrentUpdates2StoragesMT(self):
        self._storage = storage1 = self.openClientStorage()
        db1 = self.db(storage1)
        stop = threading.Event()

        cn = db1.open()
        tree = cn.root()["tree"] = OOBTree()
        get_transaction().commit()

        db2 = self.db(self.openClientStorage(cache="2"))
        # Run three threads that update the BTree.
        # Two of the threads share a single storage so that it
        # is possible for both threads to read the same object
        # at the same time.
        t1 = StressThread(self, db1, stop, 1, 1, 3)
        t2 = StressThread(self, db2, stop, 2, 2, 3, 0.001)
        t3 = StressThread(self, db2, stop, 3, 3, 3, 0.001)
        self.go(stop, t1, t2, t3)

        cn.sync()
        self._check_tree(cn, tree)
        self._check_threads(tree, t1, t2, t3)

        cn.close()

    def testConcurrentUpdatesInVersions(self):
        self._storage = storage1 = self.openClientStorage()
        db1 = self.db(storage1)
        stop = threading.Event()

        cn = db1.open()
        tree = cn.root()["tree"] = OOBTree()
        get_transaction().commit()
        cn.close()

        db2 = self.db(self.openClientStorage(cache="2"))
        # Run three threads that update the BTree.
        # Two of the threads share a single storage so that it
        # is possible for both threads to read the same object
        # at the same time.
        t1 = VersionStressThread(self, db1, stop, 1, 1, 3)
        t2 = VersionStressThread(self, db2, stop, 2, 2, 3, 0.001)
        t3 = VersionStressThread(self, db2, stop, 3, 3, 3, 0.001)
        self.go(stop, t1, t2, t3)

        cn = db1.open()
        self._check_tree(cn, tree)
        self._check_threads(tree, t1, t2, t3)

        cn.close()
=== Added File Zope3/src/zodb/zeo/tests/test_auth.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test suite for AuthZEO."""
import os
import tempfile
import time
import unittest
from zodb.zeo.client import ClientStorage
from zodb.zeo.server import StorageServer
from zodb.zeo.tests.connection import CommonSetupTearDown
from zodb.storage.file import FileStorage
class AuthTest(CommonSetupTearDown):
    """Shared authentication tests; subclasses select the protocol.

    A subclass must define ``protocol`` and ``dbclass`` and may override
    ``realm``.  setUp() writes a password database with the single user
    "foo" (password "bar") before the base class set-up runs.
    """

    realm = None

    def setUp(self):
        self.pwfile = tempfile.mktemp()
        # The realm is passed to the database class only when one is set.
        if self.realm:
            db_args = (self.pwfile, self.realm)
        else:
            db_args = (self.pwfile,)
        self.pwdb = self.dbclass(*db_args)
        self.pwdb.add_user("foo", "bar")
        self.pwdb.save()
        CommonSetupTearDown.setUp(self)

    def tearDown(self):
        CommonSetupTearDown.tearDown(self)
        os.remove(self.pwfile)

    def getConfig(self, path, create, read_only):
        """Return the storage configuration; the arguments are ignored."""
        return "<mappingstorage 1/>"

    def getServerConfig(self, addr, ro_svr):
        """Return the base server config with authentication settings added."""
        config = CommonSetupTearDown.getServerConfig(self, addr, ro_svr)
        config.authentication_protocol = self.protocol
        config.authentication_database = self.pwfile
        config.authentication_realm = self.realm
        return config

    def wait(self):
        """Poll up to 25 times, 0.1s apart, for storage.test_connection."""
        remaining = 25
        while remaining:
            if self._storage.test_connection:
                return
            time.sleep(0.1)
            remaining -= 1
        self.fail("Timed out waiting for client to authenticate")

    def testOK(self):
        # Connect without blocking, using the correct password, then wait
        # until the storage reports that a connection has been tested.
        self._storage = storage = self.openClientStorage(
            wait=0, username="foo", password="bar", realm=self.realm)
        self.wait()
        self.assert_(storage._connection)
        storage._connection.poll()
        self.assert_(storage.is_connected())

    def testNOK(self):
        # Same as testOK, but with a wrong password.
        self._storage = storage = self.openClientStorage(
            wait=0, username="foo", password="noogie", realm=self.realm)
        self.wait()
        # If the test established a connection, then it failed.
        self.failIf(storage._connection)
class PlainTextAuth(AuthTest):
    """Run the AuthTest cases with the "plaintext" protocol."""

    # Imported in the class body; this binds the name ``zodb`` as a class
    # attribute and runs the module's top-level register_module() call.
    import zodb.zeo.tests.auth_plaintext
    protocol = "plaintext"
    # NOTE(review): ``database`` is not read anywhere in this module --
    # confirm whether anything still uses it.
    database = "authdb.sha"
    dbclass = zodb.zeo.tests.auth_plaintext.Database
    realm = "Plaintext Realm"
class DigestAuth(AuthTest):
    """Run the AuthTest cases with the "digest" protocol."""

    # Imported in the class body; this binds the name ``zodb`` as a class
    # attribute.  NOTE(review): presumably the import also registers the
    # "digest" protocol -- confirm in zodb.zeo.auth.auth_digest.
    import zodb.zeo.auth.auth_digest
    protocol = "digest"
    # NOTE(review): ``database`` is not read anywhere in this module --
    # confirm whether anything still uses it.
    database = "authdb.digest"
    dbclass = zodb.zeo.auth.auth_digest.DigestDatabase
    realm = "Digest Realm"
# Concrete AuthTest subclasses that test_suite() collects.
test_classes = [PlainTextAuth, DigestAuth]
def test_suite():
    """Return a suite holding one sub-suite per class in test_classes."""
    return unittest.TestSuite(
        [unittest.makeSuite(klass) for klass in test_classes])
# When executed as a script, run the suite built by test_suite().
if __name__ == "__main__":
    unittest.main(defaultTest='test_suite')
=== Added File Zope3/src/zodb/zeo/tests/test_mon.py ===
##############################################################################
#
# Copyright (c) 2003 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test that the monitor produces sensible results.
$Id: test_mon.py,v 1.2.2.1 2003/06/22 14:22:32 gregweb Exp $
"""
import socket
import time
import unittest
from zodb.zeo.tests.connection import CommonSetupTearDown
from zodb.zeo.monitor import StorageStats
class MonitorTests(CommonSetupTearDown):
    """Check the text report served by the ZEO monitor port."""

    # Read by the base class when it builds the server configuration.
    monitor = 1

    def get_monitor_output(self):
        """Connect to localhost:42000 and return everything sent until EOF."""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # try/finally so the socket is closed even if connect() or recv()
        # raises.
        try:
            s.connect(('localhost', 42000))
            chunks = []
            while 1:
                buf = s.recv(8192)
                if not buf:
                    break
                chunks.append(buf)
        finally:
            s.close()
        return "".join(chunks)

    def parse(self, s):
        """Parse monitor output *s* into a dict of StorageStats.

        The dict is keyed by the integer that follows "Storage:" in each
        section header.  The test fails if the first line is not the
        monitor banner or if a non-blank line appears outside a section.
        """
        # Return a list of StorageStats, one for each storage.
        lines = s.split("\n")
        self.assert_(lines[0].startswith("ZEO monitor server"))
        # lines[1] is a date

        # Break up rest of lines into sections starting with Storage:
        # and ending with a blank line.
        sections = []
        cur = None
        for line in lines[2:]:
            if line.startswith("Storage:"):
                cur = [line]
            elif line:
                if cur is None:
                    # Report stray text as a test failure rather than
                    # crashing on None.append().
                    self.fail("unexpected line outside a Storage "
                              "section: %r" % line)
                cur.append(line)
            else:
                if cur is not None:
                    sections.append(cur)
                    cur = None
        assert cur is None  # bug in the test code if this fails

        d = {}
        for sect in sections:
            hdr = sect[0]
            key, value = hdr.split(":")
            storage = int(value)
            # Named ``stats`` so it does not shadow the parameter ``s``.
            stats = d[storage] = StorageStats()
            stats.parse("\n".join(sect[1:]))

        return d

    def getConfig(self, path, create, read_only):
        """Return the storage configuration; the arguments are ignored."""
        return """<mappingstorage 1/>"""

    def testMonitor(self):
        # just open a client to know that the server is up and running
        # XXX should put this in setUp
        self.storage = self.openClientStorage()
        # Close the client storage even if reading the monitor fails.
        try:
            s = self.get_monitor_output()
        finally:
            self.storage.close()
        self.assert_(s.find("monitor") != -1)
        d = self.parse(s)
        stats = d[1]
        self.assertEqual(stats.clients, 1)
        self.assertEqual(stats.commits, 0)
def test_suite():
    """Return the suite of all MonitorTests test methods."""
    suite = unittest.makeSuite(MonitorTests)
    return suite
=== Added File Zope3/src/zodb/zeo/tests/test_options.py ===
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Test suite for zodb.zeo.runzeo.ZEOOptions."""
import os
import sys
import tempfile
import unittest
from StringIO import StringIO
import zodb.config
from zodb.zeo.runzeo import ZEOOptions
from zdaemon.tests.testzdoptions import TestZDOptions
class TestZEOOptions(TestZDOptions):
    """ZEOOptions tests: config file, command line, and both combined."""

    OptionsClass = ZEOOptions

    input_args = ["-f", "Data.fs", "-a", "5555"]
    output_opts = [("-f", "Data.fs"), ("-a", "5555")]
    output_args = []

    configdata = """
<zeo>
address 5555
</zeo>
<filestorage fs>
path Data.fs
</filestorage>
"""

    def setUp(self):
        # Write the sample configuration to a temporary file for "-C".
        self.tempfilename = tempfile.mktemp()
        conf = open(self.tempfilename, "w")
        conf.write(self.configdata)
        conf.close()

    def tearDown(self):
        # Best effort: a failure to remove the file is ignored.
        try:
            os.remove(self.tempfilename)
        except os.error:
            pass

    def _realized_options(self, argv):
        """Return a new OptionsClass instance realized from *argv*."""
        options = self.OptionsClass()
        options.realize(argv)
        return options

    def _assert_untouched_defaults(self, options):
        """Check the settings that none of these tests specify."""
        self.assertEqual(options.read_only, 0)
        self.assertEqual(options.transaction_timeout, None)
        self.assertEqual(options.invalidation_queue_size, 100)

    def test_configure(self):
        # Hide the base class test_configure
        pass

    def test_defaults_with_schema(self):
        options = self._realized_options(["-C", self.tempfilename])
        self.assertEqual(options.address, ("", 5555))
        self.assertEqual(len(options.storages), 1)
        opener = options.storages[0]
        self.assertEqual(opener.name, "fs")
        self.assertEqual(opener.__class__, zodb.config.FileStorage)
        self._assert_untouched_defaults(options)

    def test_defaults_without_schema(self):
        options = self._realized_options(["-a", "5555", "-f", "Data.fs"])
        self.assertEqual(options.address, ("", 5555))
        self.assertEqual(len(options.storages), 1)
        opener = options.storages[0]
        self.assertEqual(opener.name, "1")
        self.assertEqual(opener.__class__, zodb.config.FileStorage)
        self.assertEqual(opener.config.path, "Data.fs")
        self._assert_untouched_defaults(options)

    def test_commandline_overrides(self):
        # Command-line values win over those in the config file.
        options = self._realized_options(["-C", self.tempfilename,
                                          "-a", "6666", "-f", "Wisdom.fs"])
        self.assertEqual(options.address, ("", 6666))
        self.assertEqual(len(options.storages), 1)
        opener = options.storages[0]
        self.assertEqual(opener.__class__, zodb.config.FileStorage)
        self.assertEqual(opener.config.path, "Wisdom.fs")
        self._assert_untouched_defaults(options)
def test_suite():
    """Return a suite containing the TestZEOOptions tests."""
    test_case_classes = (TestZEOOptions,)
    return unittest.TestSuite(
        [unittest.makeSuite(cls) for cls in test_case_classes])
# When executed as a script, run the suite built by test_suite().
if __name__ == "__main__":
    unittest.main(defaultTest='test_suite')
=== Zope3/src/zodb/zeo/tests/commitlock.py 1.7 => 1.7.2.1 ===
--- Zope3/src/zodb/zeo/tests/commitlock.py:1.7 Fri May 16 17:49:36 2003
+++ Zope3/src/zodb/zeo/tests/commitlock.py Sun Jun 22 10:22:32 2003
@@ -23,7 +23,7 @@
from zodb.zeo.client import ClientStorage
from zodb.zeo.interfaces import ClientDisconnected
-from zodb.zeo.tests.thread import TestThread
+from zodb.zeo.tests.basethread import TestThread
from zodb.zeo.tests.common import DummyDB
@@ -179,7 +179,7 @@
def _get_timestamp(self):
t = time.time()
- t = apply(TimeStamp,(time.gmtime(t)[:5]+(t%60,)))
+ t = TimeStamp(*time.gmtime(t)[:5]+(t%60,))
return `t`
class CommitLockUndoTests(CommitLockTests):
=== Zope3/src/zodb/zeo/tests/connection.py 1.8 => 1.8.20.1 ===
--- Zope3/src/zodb/zeo/tests/connection.py:1.8 Thu Mar 13 16:32:30 2003
+++ Zope3/src/zodb/zeo/tests/connection.py Sun Jun 22 10:22:32 2003
@@ -30,6 +30,7 @@
from zodb.zeo.tests.common import TestClientStorage, DummyDB
from transaction import get_transaction
+from zodb.db import DB
from zodb.ztransaction import Transaction
from zodb.storage.interfaces import ReadOnlyError
from zodb.storage.tests.base import StorageTestBase
@@ -37,16 +38,36 @@
from zodb.storage.tests.base import zodb_pickle, zodb_unpickle
from zodb.storage.tests.base import handle_all_serials, ZERO
+class TestClientStorage(ClientStorage):
+
+ test_connection = 0
+
+ def verify_cache(self, stub):
+ self.end_verify = threading.Event()
+ self.verify_result = ClientStorage.verify_cache(self, stub)
+
+ def endVerify(self):
+ ClientStorage.endVerify(self)
+ self.end_verify.set()
+
+ def testConnection(self, conn):
+ try:
+ return ClientStorage.testConnection(self, conn)
+ finally:
+ self.test_connection = 1
+
+class DummyDB:
+ def invalidate(self, *args, **kwargs):
+ pass
+
class CommonSetupTearDown(StorageTestBase):
"""Common boilerplate"""
- __super_setUp = StorageTestBase.setUp
- __super_tearDown = StorageTestBase.tearDown
-
keep = 0
invq = None
timeout = None
monitor = 0
+ db_class = DummyDB
def setUp(self):
"""Test setup for connection tests.
@@ -55,19 +76,23 @@
calling self._newAddr() and then self.startServer(index=i)
for i in 1, 2, ...
"""
- self.__super_setUp()
+ super(CommonSetupTearDown, self).setUp()
self.logger = logging.getLogger("testZEO")
self.logger.warn("setUp() %s", self.id())
self.file = tempfile.mktemp()
self.addr = []
self._pids = []
self._servers = []
+ self._conf_paths = []
+ self._caches = []
self._newAddr()
self.startServer()
def tearDown(self):
"""Try to cause the tests to halt"""
self.logger.warn("tearDown() %s", self.id())
+ for p in self._conf_paths:
+ os.remove(p)
if getattr(self, '_storage', None) is not None:
self._storage.close()
if hasattr(self._storage, 'cleanup'):
@@ -79,14 +104,28 @@
# Not in Windows Python until 2.3
for pid in self._pids:
os.waitpid(pid, 0)
- for i in 0, 1:
- path = "c1-test-%d.zec" % i
- if os.path.exists(path):
- try:
- os.unlink(path)
- except os.error:
- pass
- self.__super_tearDown()
+ for c in self._caches:
+ for i in 0, 1:
+ path = "c1-%s-%d.zec" % (c, i)
+ # On Windows before 2.3, we don't have a way to wait for
+ # the spawned server(s) to close, and they inherited
+ # file descriptors for our open files. So long as those
+ # processes are alive, we can't delete the files. Try
+ # a few times then give up.
+ need_to_delete = 0
+ if os.path.exists(path):
+ need_to_delete = 1
+ for dummy in range(5):
+ try:
+ os.unlink(path)
+ except:
+ time.sleep(0.5)
+ else:
+ need_to_delete = 0
+ break
+ if need_to_delete:
+ os.unlink(path) # sometimes this is just gonna fail
+ super(CommonSetupTearDown, self).tearDown()
def _newAddr(self):
self.addr.append(self._getAddr())
@@ -95,38 +134,51 @@
# port+1 is also used, so only draw even port numbers
return 'localhost', random.randrange(25000, 30000, 2)
+ def getConfig(self, path, create, read_only):
+ raise NotImplementedError
+
def openClientStorage(self, cache='', cache_size=200000, wait=True,
read_only=False, read_only_fallback=False,
- addr=None):
- if addr is None:
- addr = self.addr
- storage = TestClientStorage(addr,
+ username=None, password=None, realm=None):
+ self._caches.append(cache)
+ storage = TestClientStorage(self.addr,
client=cache,
cache_size=cache_size,
wait=wait,
min_disconnect_poll=0.1,
read_only=read_only,
- read_only_fallback=read_only_fallback)
+ read_only_fallback=read_only_fallback,
+ username=username,
+ password=password,
+ realm=realm)
storage.registerDB(DummyDB())
return storage
- # The start_zeo_server() function attempts to connect to the new
- # server process once a second. After forker_admin_retries attempts,
- # it fails with an error.
- forker_admin_retries = 10
-
- # Concrete test classes must provide a getConfig() method
+ def getServerConfig(self, addr, ro_svr):
+ zconf = forker.ZEOConfig(addr)
+ if ro_svr:
+ zconf.read_only = 1
+ if self.monitor:
+ zconf.monitor_address = ("", 42000)
+ if self.invq:
+ zconf.invalidation_queue_size = self.invq
+ if self.timeout:
+ zconf.transaction_timeout = self.timeout
+ return zconf
def startServer(self, create=True, index=0, read_only=False, ro_svr=False,
- keep=False):
+ keep=None):
addr = self.addr[index]
self.logger.warn("startServer(create=%d, index=%d, read_only=%d) @ %s",
create, index, read_only, addr)
path = "%s.%d" % (self.file, index)
- conf = self.getConfig(path, create, read_only)
- zeoport, adminaddr, pid = forker.start_zeo_server(
- conf, addr, ro_svr,
- self.monitor, self.keep, self.invq, self.timeout)
+ sconf = self.getConfig(path, create, read_only)
+ zconf = self.getServerConfig(addr, ro_svr)
+ if keep is None:
+ keep = self.keep
+ zeoport, adminaddr, pid, path = forker.start_zeo_server(
+ sconf, zconf, addr[1], keep)
+ self._conf_paths.append(path)
self._pids.append(pid)
self._servers.append(adminaddr)
@@ -168,7 +220,7 @@
start and stop a ZEO storage server.
"""
- def checkMultipleAddresses(self):
+ def testMultipleAddresses(self):
for i in range(4):
self._newAddr()
self._storage = self.openClientStorage('test', 100000)
@@ -177,7 +229,7 @@
self._dostore(oid, data=obj)
self._storage.close()
- def checkMultipleServers(self):
+ def testMultipleServers(self):
# XXX crude test at first -- just start two servers and do a
# commit at each one.
@@ -191,15 +243,18 @@
# If we can still store after shutting down one of the
# servers, we must be reconnecting to the other server.
+ did_a_store = False
for i in range(10):
try:
self._dostore()
+ did_a_store = True
break
except ClientDisconnected:
time.sleep(0.5)
self._storage.sync()
+ self.assert_(did_a_store)
- def checkReadOnlyClient(self):
+ def testReadOnlyClient(self):
# Open a read-only client to a read-write server; stores fail
# Start a read-only client for a read-write server
@@ -207,7 +262,7 @@
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
- def checkReadOnlyServer(self):
+ def testReadOnlyServer(self):
# Open a read-only client to a read-only *server*; stores fail
# We don't want the read-write server created by setUp()
@@ -220,7 +275,7 @@
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
- def checkReadOnlyFallbackWritable(self):
+ def testReadOnlyFallbackWritable(self):
# Open a fallback client to a read-write server; stores succeed
# Start a read-only-fallback client for a read-write server
@@ -228,7 +283,7 @@
# Stores should succeed here
self._dostore()
- def checkReadOnlyFallbackReadOnlyServer(self):
+ def testReadOnlyFallbackReadOnlyServer(self):
# Open a fallback client to a read-only *server*; stores fail
# We don't want the read-write server created by setUp()
@@ -245,7 +300,7 @@
# further down. Is the code here hopelessly naive, or is
# checkReconnection() overwrought?
- def checkReconnectWritable(self):
+ def testReconnectWritable(self):
# A read-write client reconnects to a read-write server
# Start a client
@@ -268,7 +323,7 @@
# Stores should succeed here
self._dostore()
- def checkDisconnectionError(self):
+ def testDisconnectionError(self):
# Make sure we get a ClientDisconnected when we try to read an
# object when we're not connected to a storage server and the
# object is not in the cache.
@@ -277,7 +332,7 @@
self.assertRaises(ClientDisconnected,
self._storage.load, 'fredwash', '')
- def checkDisconnectedAbort(self):
+ def testDisconnectedAbort(self):
self._storage = self.openClientStorage()
self._dostore()
oids = [self._storage.newObjectId() for i in range(5)]
@@ -293,7 +348,7 @@
self._storage._wait()
self._dostore()
- def checkBasicPersistence(self):
+ def testBasicPersistence(self):
# Verify cached data persists across client storage instances.
# To verify that the cache is being used, the test closes the
@@ -312,7 +367,7 @@
self.assertEqual(revid1, revid2)
self._storage.close()
- def checkRollover(self):
+ def testRollover(self):
# Check that the cache works when the files are swapped.
# In this case, only one object fits in a cache file. When the
@@ -331,7 +386,7 @@
self._storage.load(oid1, '')
self._storage.load(oid2, '')
- def checkReconnection(self):
+ def testReconnection(self):
# Check that the client reconnects when a server restarts.
# XXX Seem to get occasional errors that look like this:
@@ -365,11 +420,11 @@
self.fail("Could not reconnect to server")
self.logger.warn("checkReconnection: finished")
- def checkBadMessage1(self):
+ def testBadMessage1(self):
# not even close to a real message
self._bad_message("salty")
- def checkBadMessage2(self):
+ def testBadMessage2(self):
# just like a real message, but with an unpicklable argument
global Hack
class Hack:
@@ -403,49 +458,44 @@
self._storage = self.openClientStorage()
self._dostore()
- # Test case for multiple storages participating in a single
- # transaction. This is not really a connection test, but it needs
- # about the same infrastructure (several storage servers).
-
- # XXX WARNING: with the current ZEO code, this occasionally fails.
- # That's the point of this test. :-)
-
- def NOcheckMultiStorageTransaction(self):
- # Configuration parameters (larger values mean more likely deadlocks)
- N = 2
- # These don't *have* to be all the same, but it's convenient this way
- self.nservers = N
- self.nthreads = N
- self.ntrans = N
- self.nobj = N
+ def testCrossDBInvalidations(self):
+ db1 = DB(self.openClientStorage())
+ c1 = db1.open()
+ r1 = c1.root()
- # Start extra servers
- for i in range(1, self.nservers):
- self._newAddr()
- self.startServer(index=i)
+ r1["a"] = MinPO("a")
+ get_transaction().commit()
- # Spawn threads that each do some transactions on all storages
- threads = []
- try:
- for i in range(self.nthreads):
- t = MSTThread(self, "T%d" % i)
- threads.append(t)
- t.start()
- # Wait for all threads to finish
- for t in threads:
- t.join(60)
- self.failIf(t.isAlive(), "%s didn't die" % t.getName())
- finally:
- for t in threads:
- t.closeclients()
+ db2 = DB(self.openClientStorage())
+ r2 = db2.open().root()
+
+ self.assertEqual(r2["a"].value, "a")
+
+ r2["b"] = MinPO("b")
+ get_transaction().commit()
+
+ # make sure the invalidation is received in the other client
+ for i in range(10):
+ c1._storage.sync()
+ if r1._p_oid in c1._invalidated:
+ break
+ time.sleep(0.1)
+ self.assert_(r1._p_oid in c1._invalidated)
+
+ # force the invalidations to be applied...
+ c1.sync()
+ r1.keys() # unghostify
+ self.assertEqual(r1._p_serial, r2._p_serial)
+ db2.close()
+ db1.close()
class ReconnectionTests(CommonSetupTearDown):
keep = True
forker_admin_retries = 20
invq = 2
- def checkReadOnlyStorage(self):
+ def testReadOnlyStorage(self):
# Open a read-only client to a read-only *storage*; stores fail
# We don't want the read-write server created by setUp()
@@ -458,7 +508,7 @@
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
- def checkReadOnlyFallbackReadOnlyStorage(self):
+ def testReadOnlyFallbackReadOnlyStorage(self):
# Open a fallback client to a read-only *storage*; stores fail
# We don't want the read-write server created by setUp()
@@ -471,7 +521,7 @@
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
- def checkReconnectReadOnly(self):
+ def testReconnectReadOnly(self):
# A read-only client reconnects from a read-write to a
# read-only server
@@ -495,7 +545,7 @@
# Stores should still fail
self.assertRaises(ReadOnlyError, self._dostore)
- def checkReconnectFallback(self):
+ def testReconnectFallback(self):
# A fallback client reconnects from a read-write to a
# read-only server
@@ -519,7 +569,7 @@
# Stores should fail here
self.assertRaises(ReadOnlyError, self._dostore)
- def checkReconnectUpgrade(self):
+ def testReconnectUpgrade(self):
# A fallback client reconnects from a read-only to a
# read-write server
@@ -548,7 +598,7 @@
# Stores should now succeed
self._dostore()
- def checkReconnectSwitch(self):
+ def testReconnectSwitch(self):
# A fallback client initially connects to a read-only server,
# then discovers a read-write server and switches to that
@@ -577,7 +627,7 @@
else:
self.fail("Couldn't store after starting a read-write server")
- def checkNoVerificationOnServerRestart(self):
+ def testNoVerificationOnServerRestart(self):
self._storage = self.openClientStorage()
# When we create a new storage, it should always do a full
# verification
@@ -592,7 +642,7 @@
# should be needed.
self.assertEqual(self._storage.verify_result, "no verification")
- def checkNoVerificationOnServerRestartWith2Clients(self):
+ def testNoVerificationOnServerRestartWith2Clients(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
@@ -622,7 +672,7 @@
self.assertEqual(perstorage.verify_result, "no verification")
perstorage.close()
- def checkQuickVerificationWith2Clients(self):
+ def testQuickVerificationWith2Clients(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
@@ -646,10 +696,9 @@
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
+ perstorage.close()
-
-
- def checkVerificationWith2ClientsInvqOverflow(self):
+ def testVerificationWith2ClientsInvqOverflow(self):
perstorage = self.openClientStorage(cache="test")
self.assertEqual(perstorage.verify_result, "full verification")
@@ -681,13 +730,12 @@
self.assertEqual(self._storage.load(oid, '')[1], revid)
self.assertEqual(perstorage.load(oid, ''),
self._storage.load(oid, ''))
-
perstorage.close()
class TimeoutTests(CommonSetupTearDown):
timeout = 1
- def checkTimeout(self):
+ def testTimeout(self):
storage = self.openClientStorage()
txn = Transaction()
storage.tpcBegin(txn)
@@ -695,14 +743,14 @@
time.sleep(2)
self.assertRaises(ClientDisconnected, storage.tpcFinish, txn)
- def checkTimeoutOnAbort(self):
+ def testTimeoutOnAbort(self):
storage = self.openClientStorage()
txn = Transaction()
storage.tpcBegin(txn)
storage.tpcVote(txn)
storage.tpcAbort(txn)
- def checkTimeoutOnAbortNoLock(self):
+ def testTimeoutOnAbortNoLock(self):
storage = self.openClientStorage()
txn = Transaction()
storage.tpcBegin(txn)
=== Zope3/src/zodb/zeo/tests/forker.py 1.5 => 1.5.26.1 ===
--- Zope3/src/zodb/zeo/tests/forker.py:1.5 Tue Feb 25 13:55:05 2003
+++ Zope3/src/zodb/zeo/tests/forker.py Sun Jun 22 10:22:32 2003
@@ -16,104 +16,103 @@
import os
import sys
import time
+import logging
import errno
import random
import socket
+import StringIO
import tempfile
-import traceback
-import logging
-
-# Change value of PROFILE to enable server-side profiling
-PROFILE = False
-if PROFILE:
- import hotshot
-
-
-def get_port():
- """Return a port that is not in use.
- Checks if a port is in use by trying to connect to it. Assumes it
- is not in use if connect raises an exception.
-
- Raises RuntimeError after 10 tries.
- """
- for i in range(10):
- port = random.randrange(20000, 30000)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- try:
- s.connect(('localhost', port))
- except socket.error:
- # XXX check value of error?
- return port
- finally:
- s.close()
- raise RuntimeError, "Can't find port"
+class ZEOConfig:
+ """Class to generate ZEO configuration file. """
+ def __init__(self, addr):
+ self.address = addr
+ self.read_only = None
+ self.invalidation_queue_size = None
+ self.monitor_address = None
+ self.transaction_timeout = None
+ self.authentication_protocol = None
+ self.authentication_database = None
+ self.authentication_realm = None
+
+ def dump(self, f):
+ print >> f, "<zeo>"
+ print >> f, "address %s:%s" % self.address
+ if self.read_only is not None:
+ print >> f, "read-only", self.read_only and "true" or "false"
+ if self.invalidation_queue_size is not None:
+ print >> f, "invalidation-queue-size", self.invalidation_queue_size
+ if self.monitor_address is not None:
+ print >> f, "monitor-address %s:%s" % self.monitor_address
+ if self.transaction_timeout is not None:
+ print >> f, "transaction-timeout", self.transaction_timeout
+ if self.authentication_protocol is not None:
+ print >> f, "authentication-protocol", self.authentication_protocol
+ if self.authentication_database is not None:
+ print >> f, "authentication-database", self.authentication_database
+ if self.authentication_realm is not None:
+ print >> f, "authentication-realm", self.authentication_realm
+ print >> f, "</zeo>"
+
+ def __str__(self):
+ f = StringIO.StringIO()
+ self.dump(f)
+ return f.getvalue()
-def start_zeo_server(conf, addr=None, ro_svr=False, monitor=False, keep=False,
- invq=None, timeout=None):
+def start_zeo_server(storage_conf, zeo_conf, port, keep=0):
"""Start a ZEO server in a separate process.
- Returns the ZEO port, the test server port, and the pid.
+ Takes three positional arguments: a string containing the storage
+ conf, a ZEOConfig object, and the ZEO port.
+
+ Returns the ZEO port, the test server port, the pid, and the path
+ to the config file.
"""
+
# Store the config info in a temp file.
- tmpfile = tempfile.mktemp()
+ tmpfile = tempfile.mktemp(".conf")
fp = open(tmpfile, 'w')
- fp.write(conf)
+ zeo_conf.dump(fp)
+ fp.write(storage_conf)
fp.close()
- # Create the server
+
+ # Find the zeoserver script
import zodb.zeo.tests.zeoserver
- if addr is None:
- port = get_port()
- else:
- port = addr[1]
script = zodb.zeo.tests.zeoserver.__file__
if script.endswith('.pyc'):
script = script[:-1]
+
# Create a list of arguments, which we'll tuplify below
qa = _quote_arg
args = [qa(sys.executable), qa(script), '-C', qa(tmpfile)]
- if ro_svr:
- args.append('-r')
if keep:
- args.append('-k')
- if invq:
- args += ['-Q', str(invq)]
- if timeout:
- args += ['-T', str(timeout)]
- if monitor:
- # XXX Is it safe to reuse the port?
- args += ['-m', '42000']
- args.append(str(port))
+ args.append("-k")
d = os.environ.copy()
d['PYTHONPATH'] = os.pathsep.join(sys.path)
pid = os.spawnve(os.P_NOWAIT, sys.executable, tuple(args), d)
- adminaddr = ('localhost', port+1)
- # We need to wait until the server starts, but not forever.
- # Always do a sleep as the first thing, since we don't expect
- # the spawned process to get started right away.
- delay = 0.25
- for i in range(10):
- time.sleep(delay)
- logging.debug('forker: connect %s', i)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ adminaddr = ('localhost', port + 1)
+ logger = logging.getLogger("forker")
+ # We need to wait until the server starts, but not forever
+ for i in range(20):
+ time.sleep(0.25)
try:
+ logger.debug("connect %s", i)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(adminaddr)
ack = s.recv(1024)
+ s.close()
+ logger.debug("acked: %s", ack)
+ break
except socket.error, e:
if e[0] not in (errno.ECONNREFUSED, errno.ECONNRESET):
raise
- logging.debug('forker: failed %s' % i)
s.close()
- else:
- logging.debug('forker: acked: %s', ack)
- s.close()
- break
else:
- logging.debug('forker: boo hoo')
+ logger.debug("boo foo")
raise
- return ('localhost', port), adminaddr, pid
+ return ('localhost', port), adminaddr, pid, tmpfile
+
if sys.platform[:3].lower() == "win":
def _quote_arg(s):
@@ -122,6 +121,7 @@
def _quote_arg(s):
return s
+
def shutdown_zeo_server(adminaddr):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(adminaddr)
@@ -130,5 +130,5 @@
except socket.error, e:
if e[0] <> errno.ECONNRESET: raise
ack = 'no ack received'
- logging.debug('shutdownServer: acked: %s', ack)
+ logging.getLogger("forker").debug("shutdown server acked: %s", ack)
s.close()
=== Zope3/src/zodb/zeo/tests/test_cache.py 1.3 => 1.3.26.1 ===
--- Zope3/src/zodb/zeo/tests/test_cache.py:1.3 Tue Feb 25 13:55:05 2003
+++ Zope3/src/zodb/zeo/tests/test_cache.py Sun Jun 22 10:22:32 2003
@@ -26,109 +26,108 @@
class ClientCacheTests(unittest.TestCase):
+ _oid = 'abcdefgh'
+ _oid2 = 'bcdefghi'
+ _oid3 = 'cdefghij'
+
def setUp(self):
- unittest.TestCase.setUp(self)
self.cachesize = 10*1000*1000
self.cache = ClientCache(size=self.cachesize)
self.cache.open()
def tearDown(self):
self.cache.close()
- unittest.TestCase.tearDown(self)
+ for i in 0, 1:
+ path = "c1--%d.zec" % i
+ if os.path.exists(path):
+ os.remove(path)
def testOpenClose(self):
pass # All the work is done by setUp() / tearDown()
def testStoreLoad(self):
cache = self.cache
- oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
- cache.store(oid, data, serial, '', '', '')
- loaded = cache.load(oid, '')
+ cache.store(self._oid, data, serial, '', '', '')
+ loaded = cache.load(self._oid, '')
self.assertEqual(loaded, (data, serial))
def testMissingLoad(self):
cache = self.cache
- oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
- cache.store(oid, data, serial, '', '', '')
+ cache.store(self._oid, data, serial, '', '', '')
loaded = cache.load('garbage1', '')
self.assertEqual(loaded, None)
def testInvalidate(self):
cache = self.cache
- oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
- cache.store(oid, data, serial, '', '', '')
- loaded = cache.load(oid, '')
+ cache.store(self._oid, data, serial, '', '', '')
+ loaded = cache.load(self._oid, '')
self.assertEqual(loaded, (data, serial))
- cache.invalidate(oid, '')
- loaded = cache.load(oid, '')
+ cache.invalidate(self._oid, '')
+ loaded = cache.load(self._oid, '')
self.assertEqual(loaded, None)
def testVersion(self):
cache = self.cache
- oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
vname = 'myversion'
vdata = '5678'*200
vserial = 'IJKLMNOP'
- cache.store(oid, data, serial, vname, vdata, vserial)
- loaded = cache.load(oid, '')
+ cache.store(self._oid, data, serial, vname, vdata, vserial)
+ loaded = cache.load(self._oid, '')
self.assertEqual(loaded, (data, serial))
- vloaded = cache.load(oid, vname)
+ vloaded = cache.load(self._oid, vname)
self.assertEqual(vloaded, (vdata, vserial))
def testVersionOnly(self):
cache = self.cache
- oid = 'abcdefgh'
data = ''
serial = ''
vname = 'myversion'
vdata = '5678'*200
vserial = 'IJKLMNOP'
- cache.store(oid, data, serial, vname, vdata, vserial)
- loaded = cache.load(oid, '')
+ cache.store(self._oid, data, serial, vname, vdata, vserial)
+ loaded = cache.load(self._oid, '')
self.assertEqual(loaded, None)
- vloaded = cache.load(oid, vname)
+ vloaded = cache.load(self._oid, vname)
self.assertEqual(vloaded, (vdata, vserial))
def testInvalidateNonVersion(self):
cache = self.cache
- oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
vname = 'myversion'
vdata = '5678'*200
vserial = 'IJKLMNOP'
- cache.store(oid, data, serial, vname, vdata, vserial)
- loaded = cache.load(oid, '')
+ cache.store(self._oid, data, serial, vname, vdata, vserial)
+ loaded = cache.load(self._oid, '')
self.assertEqual(loaded, (data, serial))
- vloaded = cache.load(oid, vname)
+ vloaded = cache.load(self._oid, vname)
self.assertEqual(vloaded, (vdata, vserial))
- cache.invalidate(oid, '')
- loaded = cache.load(oid, '')
+ cache.invalidate(self._oid, '')
+ loaded = cache.load(self._oid, '')
self.assertEqual(loaded, None)
# The version data is also invalidated at this point
- vloaded = cache.load(oid, vname)
+ vloaded = cache.load(self._oid, vname)
self.assertEqual(vloaded, None)
def testInvalidateVersion(self):
# Invalidating a version should not invalidate the non-version data.
# (This tests for the same bug as testInvalidatePersists below.)
cache = self.cache
- oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
- cache.store(oid, data, serial, '', '', '')
- loaded = cache.load(oid, '')
+ cache.store(self._oid, data, serial, '', '', '')
+ loaded = cache.load(self._oid, '')
self.assertEqual(loaded, (data, serial))
- cache.invalidate(oid, 'bogus')
- loaded = cache.load(oid, '')
+ cache.invalidate(self._oid, 'bogus')
+ loaded = cache.load(self._oid, '')
self.assertEqual(loaded, (data, serial))
def testVerify(self):
@@ -138,30 +137,27 @@
results.append((oid, serial, vserial))
cache.verify(verifier)
self.assertEqual(results, [])
- oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
- cache.store(oid, data, serial, '', '', '')
+ cache.store(self._oid, data, serial, '', '', '')
results = []
cache.verify(verifier)
- self.assertEqual(results, [(oid, serial, None)])
+ self.assertEqual(results, [(self._oid, serial, None)])
def testCheckSize(self):
# Make sure that cache._index[oid] is erased for oids that are
# stored in the cache file that's rewritten after a flip.
cache = self.cache
- oid = 'abcdefgh'
data = '1234'*100
serial = 'ABCDEFGH'
- cache.store(oid, data, serial, '', '', '')
+ cache.store(self._oid, data, serial, '', '', '')
cache.checkSize(10*self.cachesize) # Force a file flip
- oid2 = 'abcdefgz'
data2 = '1234'*10
serial2 = 'ABCDEFGZ'
- cache.store(oid2, data2, serial2, '', '', '')
+ cache.store(self._oid2, data2, serial2, '', '', '')
cache.checkSize(10*self.cachesize) # Force another file flip
- self.assertNotEqual(cache._index.get(oid2), None)
- self.assertEqual(cache._index.get(oid), None)
+ self.assertNotEqual(cache._index.get(self._oid2), None)
+ self.assertEqual(cache._index.get(self._oid), None)
def testCopyToCurrent(self):
# - write some objects to cache file 0
@@ -177,91 +173,141 @@
cache = self.cache
# Create some objects
- oid1 = 'abcdefgh'
data1 = '1234' * 100
serial1 = 'ABCDEFGH'
- oid2 = 'bcdefghi'
data2 = '2345' * 200
serial2 = 'BCDEFGHI'
version2 = 'myversion'
nonversion = 'nada'
vdata2 = '5432' * 250
vserial2 = 'IHGFEDCB'
- oid3 = 'cdefghij'
data3 = '3456' * 300
serial3 = 'CDEFGHIJ'
# Store them in the cache
- cache.store(oid1, data1, serial1, '', '', '')
- cache.store(oid2, data2, serial2, version2, vdata2, vserial2)
- cache.store(oid3, data3, serial3, '', '', '')
+ cache.store(self._oid, data1, serial1, '', '', '')
+ cache.store(self._oid2, data2, serial2, version2, vdata2, vserial2)
+ cache.store(self._oid3, data3, serial3, '', '', '')
# Verify that they are in file 0
- self.assert_(None is not cache._index.get(oid1) > 0)
- self.assert_(None is not cache._index.get(oid2) > 0)
- self.assert_(None is not cache._index.get(oid3) > 0)
+ self.assert_(None is not cache._index.get(self._oid) > 0)
+ self.assert_(None is not cache._index.get(self._oid2) > 0)
+ self.assert_(None is not cache._index.get(self._oid3) > 0)
# Load them and verify that the loads return correct data
- self.assertEqual(cache.load(oid1, ''), (data1, serial1))
- self.assertEqual(cache.load(oid2, ''), (data2, serial2))
- self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
- self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
- self.assertEqual(cache.load(oid3, ''), (data3, serial3))
+ self.assertEqual(cache.load(self._oid, ''), (data1, serial1))
+ self.assertEqual(cache.load(self._oid2, ''), (data2, serial2))
+ self.assertEqual(cache.load(self._oid2, nonversion), (data2, serial2))
+ self.assertEqual(cache.load(self._oid2, version2), (vdata2, vserial2))
+ self.assertEqual(cache.load(self._oid3, ''), (data3, serial3))
# Verify that they are still in file 0
- self.assert_(None is not cache._index.get(oid1) > 0)
- self.assert_(None is not cache._index.get(oid2) > 0)
- self.assert_(None is not cache._index.get(oid3) > 0)
+ self.assert_(None is not cache._index.get(self._oid) > 0)
+ self.assert_(None is not cache._index.get(self._oid2) > 0)
+ self.assert_(None is not cache._index.get(self._oid3) > 0)
# Cause a cache flip
cache.checkSize(10*self.cachesize)
# Load o1, o2, o4 again and verify that the loads return correct data
- self.assertEqual(cache.load(oid1, ''), (data1, serial1))
- self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
- self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
- self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+ self.assertEqual(cache.load(self._oid, ''), (data1, serial1))
+ self.assertEqual(cache.load(self._oid2, version2), (vdata2, vserial2))
+ self.assertEqual(cache.load(self._oid2, nonversion), (data2, serial2))
+ self.assertEqual(cache.load(self._oid2, ''), (data2, serial2))
# Verify that o1, o2, 04 are now in file 1, o3 still in file 0
- self.assert_(None is not cache._index.get(oid1) < 0)
- self.assert_(None is not cache._index.get(oid2) < 0)
- self.assert_(None is not cache._index.get(oid3) > 0)
+ self.assert_(None is not cache._index.get(self._oid) < 0)
+ self.assert_(None is not cache._index.get(self._oid2) < 0)
+ self.assert_(None is not cache._index.get(self._oid3) > 0)
# Cause another cache flip
cache.checkSize(10*self.cachesize)
# Load o1 and o2 again and verify that the loads return correct data
- self.assertEqual(cache.load(oid1, ''), (data1, serial1))
- self.assertEqual(cache.load(oid2, nonversion), (data2, serial2))
- self.assertEqual(cache.load(oid2, version2), (vdata2, vserial2))
- self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+ self.assertEqual(cache.load(self._oid, ''), (data1, serial1))
+ self.assertEqual(cache.load(self._oid2, nonversion), (data2, serial2))
+ self.assertEqual(cache.load(self._oid2, version2), (vdata2, vserial2))
+ self.assertEqual(cache.load(self._oid2, ''), (data2, serial2))
# Verify that o1 and o2 are now back in file 0, o3 is lost
- self.assert_(None is not cache._index.get(oid1) > 0)
- self.assert_(None is not cache._index.get(oid2) > 0)
- self.assert_(None is cache._index.get(oid3))
+ self.assert_(None is not cache._index.get(self._oid) > 0)
+ self.assert_(None is not cache._index.get(self._oid2) > 0)
+ self.assert_(None is cache._index.get(self._oid3))
# Invalidate version data for o2
- cache.invalidate(oid2, nonversion)
- self.assertEqual(cache.load(oid2, ''), (data2, serial2))
- self.assertEqual(cache.load(oid2, nonversion), None)
- self.assertEqual(cache.load(oid2, version2), None)
+ cache.invalidate(self._oid2, nonversion)
+ self.assertEqual(cache.load(self._oid2, ''), (data2, serial2))
+ self.assertEqual(cache.load(self._oid2, nonversion), None)
+ self.assertEqual(cache.load(self._oid2, version2), None)
# Cause another cache flip
cache.checkSize(10*self.cachesize)
# Load o1 and o2 again and verify that the loads return correct data
- self.assertEqual(cache.load(oid1, ''), (data1, serial1))
- self.assertEqual(cache.load(oid2, version2), None)
- self.assertEqual(cache.load(oid2, nonversion), None)
- self.assertEqual(cache.load(oid2, ''), (data2, serial2))
+ self.assertEqual(cache.load(self._oid, ''), (data1, serial1))
+ self.assertEqual(cache.load(self._oid2, version2), None)
+ self.assertEqual(cache.load(self._oid2, nonversion), None)
+ self.assertEqual(cache.load(self._oid2, ''), (data2, serial2))
# Verify that o1 and o2 are now in file 1
- self.assert_(None is not cache._index.get(oid1) < 0)
- self.assert_(None is not cache._index.get(oid2) < 0)
+ self.assert_(None is not cache._index.get(self._oid) < 0)
+ self.assert_(None is not cache._index.get(self._oid2) < 0)
+
+ def testLastTid(self):
+ cache = self.cache
+ self.failUnless(cache.getLastTid() is None)
+ ltid = 'pqrstuvw'
+ cache.setLastTid(ltid)
+ self.assertEqual(cache.getLastTid(), ltid)
+ cache.checkSize(10*self.cachesize) # Force a file flip
+ self.assertEqual(cache.getLastTid(), ltid)
+ cache.setLastTid(None)
+ self.failUnless(cache.getLastTid() is None)
+ cache.checkSize(10*self.cachesize) # Force a file flip
+ self.failUnless(cache.getLastTid() is None)
+
+ def testLoadNonversionWithVersionInFlippedCache(self):
+ # This test provokes an error seen once in an unrelated test.
+ # The object is stored in the old cache file with version data,
+ # a load for non-version data occurs. The attempt to copy the
+ # non-version data to the new file fails.
+ nvdata = "Mend your speech a little, lest it may mar your fortunes."
+ nvserial = "12345678"
+ version = "folio"
+ vdata = "Mend your speech a little, lest you may mar your fortunes."
+ vserial = "12346789"
+
+ self.cache.store(self._oid, nvdata, nvserial, version, vdata, vserial)
+ self.cache.checkSize(10 * self.cachesize) # force a cache flip
+
+ for i in 1, 2: # check that we can load before and after copying
+ for xversion, xdata, xserial in [("", nvdata, nvserial),
+ (version, vdata, vserial)]:
+ data, serial = self.cache.load(self._oid, xversion)
+ self.assertEqual(data, xdata)
+ self.assertEqual(serial, xserial)
+
+ # now cause two more cache flips and make sure the data is still there
+ self.cache.store(self._oid2, "", "", "foo", "bar", "23456789")
+ self.cache.checkSize(10 * self.cachesize) # force a cache flip
+ self.cache.load(self._oid, "")
+ self.cache.store(self._oid3, "bar", "34567890", "", "", "")
+ self.cache.checkSize(10 * self.cachesize) # force a cache flip
+ self.cache.load(self._oid, "")
+
+ for i in 1, 2: # check that we can load before and after copying
+ for xversion, xdata, xserial in [("", nvdata, nvserial),
+ (version, vdata, vserial)]:
+ data, serial = self.cache.load(self._oid, xversion)
+ self.assertEqual(data, xdata)
+ self.assertEqual(serial, xserial)
class PersistentClientCacheTests(unittest.TestCase):
+ _oid = 'abcdefgh'
+ _oid2 = 'bcdefghi'
+ _oid3 = 'cdefghij'
+
def setUp(self):
unittest.TestCase.setUp(self)
self.vardir = os.getcwd() # Don't use /tmp, it's a security risk
@@ -309,18 +355,18 @@
# 'current' file when a persistent cache was opened.
cache = self.cache
self.assertEqual(cache._current, 0) # Check that file 0 is current
- oid = 'abcdefgh'
data = '1234'
serial = 'ABCDEFGH'
- cache.store(oid, data, serial, '', '', '')
+ cache.store(self._oid, data, serial, '', '', '')
+ cache.setLastTid(serial)
cache.checkSize(10*self.cachesize) # Force a file flip
self.assertEqual(cache._current, 1) # Check that the flip worked
- oid = 'abcdefgh'
data = '123'
serial = 'ABCDEFGZ'
- cache.store(oid, data, serial, '', '', '')
+ cache.store(self._oid, data, serial, '', '', '')
+ cache.setLastTid(serial)
cache = self.reopenCache()
- loaded = cache.load(oid, '')
+ loaded = cache.load(self._oid, '')
# Check that we got the most recent data:
self.assertEqual(loaded, (data, serial))
self.assertEqual(cache._current, 1) # Double check that 1 is current
@@ -334,23 +380,50 @@
cache = self.cache
magicsize = (ord('i') + 1) << 16
cache = self.cache
- oid = 'abcdefgh'
data = '!'*magicsize
serial = 'ABCDEFGH'
- cache.store(oid, data, serial, '', '', '')
- loaded = cache.load(oid, '')
+ cache.store(self._oid, data, serial, '', '', '')
+ loaded = cache.load(self._oid, '')
self.assertEqual(loaded, (data, serial))
- cache.invalidate(oid, '')
+ cache.invalidate(self._oid, '')
cache = self.reopenCache()
- loaded = cache.load(oid, '')
+ loaded = cache.load(self._oid, '')
if loaded != None:
self.fail("invalidated data resurrected, size %d, was %d" %
(len(loaded[0]), len(data)))
+ def testPersistentLastTid(self):
+ cache = self.cache
+ self.failUnless(cache.getLastTid() is None)
+ ltid = 'pqrstuvw'
+ cache.setLastTid(ltid)
+ self.assertEqual(cache.getLastTid(), ltid)
+ data = '1234'
+ serial = 'ABCDEFGH'
+ cache.store(self._oid, data, serial, '', '', '')
+ self.assertEqual(cache.getLastTid(), ltid)
+ cache.checkSize(10*self.cachesize) # Force a file flip
+ self.assertEqual(cache.getLastTid(), ltid)
+ cache = self.reopenCache()
+ self.assertEqual(cache.getLastTid(), ltid)
+ cache.setLastTid(None)
+ self.failUnless(cache.getLastTid() is None)
+ cache.checkSize(10*self.cachesize) # Force a file flip
+ self.failUnless(cache.getLastTid() is None)
+
+class ClientCacheLongOIDTests(ClientCacheTests):
+ _oid = 'abcdefghijklmnop' * 2
+ _oid2 = 'bcdefghijklmnopq' * 2
+ _oid3 = 'cdefghijklmnopqr' * 2
+
+class PersistentClientCacheLongOIDTests(PersistentClientCacheTests):
+ _oid = 'abcdefghijklmnop' * 2
+
def test_suite():
suite = unittest.TestSuite()
- suite.addTest(unittest.makeSuite(ClientCacheTests))
- suite.addTest(unittest.makeSuite(PersistentClientCacheTests))
+ for klass in (ClientCacheTests, PersistentClientCacheTests,
+ ClientCacheLongOIDTests, PersistentClientCacheLongOIDTests):
+ suite.addTest(unittest.makeSuite(klass))
return suite
if __name__ == '__main__':
=== Zope3/src/zodb/zeo/tests/test_conn.py 1.5 => 1.5.2.1 ===
--- Zope3/src/zodb/zeo/tests/test_conn.py:1.5 Fri May 16 17:49:36 2003
+++ Zope3/src/zodb/zeo/tests/test_conn.py Sun Jun 22 10:22:32 2003
@@ -1,6 +1,6 @@
##############################################################################
#
-# Copyright (c) 2001 Zope Corporation and Contributors.
+# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
@@ -19,59 +19,117 @@
import unittest
-from zodb.zeo.tests.connection import ConnectionTests, ReconnectionTests
+from zodb.zeo.tests.connection \
+ import ConnectionTests, ReconnectionTests, TimeoutTests
+from zodb.zeo.tests.invalid import InvalidationTests
from zodb.storage.base import berkeley_is_available
class FileStorageConfig:
def getConfig(self, path, create, read_only):
return """\
- <Storage>
- type FileStorage
- file_name %s
- create %s
- read_only %s
- </Storage>""" % (path,
- create and 'yes' or 'no',
- read_only and 'yes' or 'no')
+ <filestorage 1>
+ path %s
+ create %s
+ read-only %s
+ </filestorage>""" % (path,
+ create and 'yes' or 'no',
+ read_only and 'yes' or 'no')
class BerkeleyStorageConfig:
def getConfig(self, path, create, read_only):
- # Full always creates and doesn't have a read_only flag
return """\
- <Storage>
- type BDBFullStorage
- name %s
- read_only %s
- </Storage>""" % (path,
- read_only and 'yes' or 'no')
+ <fullstorage 1>
+ name %s
+ read-only %s
+ </fullstorage>""" % (path, read_only and "yes" or "no")
class MappingStorageConfig:
def getConfig(self, path, create, read_only):
- return """\
- <Storage>
- type MappingStorage
- name %s
- </Storage>""" % path
-
-tests = [
- (MappingStorageConfig, ConnectionTests, 1),
- (FileStorageConfig, ReconnectionTests, 1),
- (FileStorageConfig, ConnectionTests, 2),
- ]
-
+ return """<mappingstorage 1/>"""
+
+
+class FileStorageConnectionTests(
+ FileStorageConfig,
+ ConnectionTests,
+ InvalidationTests
+ ):
+ """FileStorage-specific connection tests."""
+ level = 2
+
+class FileStorageReconnectionTests(
+ FileStorageConfig,
+ ReconnectionTests
+ ):
+ """FileStorage-specific re-connection tests."""
+ # Run this at level 1 because MappingStorage can't do reconnection tests
+ level = 1
+
+class FileStorageTimeoutTests(
+ FileStorageConfig,
+ TimeoutTests
+ ):
+ level = 2
+
+
+class BDBConnectionTests(
+ BerkeleyStorageConfig,
+ ConnectionTests,
+ InvalidationTests
+ ):
+ """Berkeley storage connection tests."""
+ level = 2
+
+class BDBReconnectionTests(
+ BerkeleyStorageConfig,
+ ReconnectionTests
+ ):
+ """Berkeley storage re-connection tests."""
+ level = 2
+
+class BDBTimeoutTests(
+ BerkeleyStorageConfig,
+ TimeoutTests
+ ):
+ level = 2
+
+
+class MappingStorageConnectionTests(
+ MappingStorageConfig,
+ ConnectionTests
+ ):
+ """Mapping storage connection tests."""
+ level = 1
+
+# The ReconnectionTests can't work with MappingStorage because it's only an
+# in-memory storage and has no persistent state.
+
+class MappingStorageTimeoutTests(
+ MappingStorageConfig,
+ TimeoutTests
+ ):
+ level = 1
+
+
+
+test_classes = [FileStorageConnectionTests,
+ FileStorageReconnectionTests,
+ FileStorageTimeoutTests,
+ MappingStorageConnectionTests,
+ MappingStorageTimeoutTests]
+
if berkeley_is_available:
- tests += [
- (BerkeleyStorageConfig, ConnectionTests, 2),
- (BerkeleyStorageConfig, ReconnectionTests, 2),
- ]
+ test_classes.append(BDBConnectionTests)
+ test_classes.append(BDBReconnectionTests)
+ test_classes.append(BDBTimeoutTests)
+
def test_suite():
suite = unittest.TestSuite()
- for testclass, configclass, level in tests:
- # synthesize a concrete class combining tests and configuration
- name = "%s:%s" % (testclass.__name__, configclass.__name__)
- aclass = type.__new__(type, name, (configclass, testclass, object), {})
- aclass.level = level
- sub = unittest.makeSuite(aclass)
+ for klass in test_classes:
+ sub = unittest.makeSuite(klass)
suite.addTest(sub)
return suite
+
+
+if __name__ == "__main__":
+ unittest.main(defaultTest='test_suite')
=== Zope3/src/zodb/zeo/tests/test_zeo.py 1.12 => 1.12.2.1 ===
--- Zope3/src/zodb/zeo/tests/test_zeo.py:1.12 Fri May 16 18:53:47 2003
+++ Zope3/src/zodb/zeo/tests/test_zeo.py Sun Jun 22 10:22:32 2003
@@ -19,6 +19,7 @@
import time
import errno
import socket
+import random
import logging
import asyncore
import tempfile
@@ -40,6 +41,27 @@
from zodb.zeo.tests import commitlock, threadtests
from zodb.zeo.tests.common import TestClientStorage, DummyDB
+def get_port():
+ """Return a port that is not in use.
+
+ Checks if a port is in use by trying to connect to it. Assumes it
+ is not in use if connect raises an exception.
+
+ Raises RuntimeError after 10 tries.
+ """
+ for i in range(10):
+ port = random.randrange(20000, 30000)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ try:
+ s.connect(('localhost', port))
+ except socket.error:
+ # XXX check value of error?
+ return port
+ finally:
+ s.close()
+ raise RuntimeError, "Can't find port"
+
class DummyDB:
def invalidate(self, *args):
pass
@@ -102,8 +124,11 @@
logging.info("testZEO: setUp() %s", self.id())
config = self.getConfig()
for i in range(10):
+ port = get_port()
+ zconf = forker.ZEOConfig(('', port))
try:
- zeoport, adminaddr, pid = forker.start_zeo_server(config)
+ zport, adminaddr, pid, path = \
+ forker.start_zeo_server(config, zconf, port)
except socket.error, e:
if e[0] not in (errno.ECONNREFUSED, errno.ECONNRESET):
raise
@@ -113,14 +138,16 @@
raise
self._pids = [pid]
self._servers = [adminaddr]
- self._storage = TestClientStorage(zeoport, '1', cache_size=20000000,
- min_disconnect_poll=0.5, wait=1)
+ self._conf_path = path
+ self._storage = ClientStorage(zport, '1', cache_size=20000000,
+ min_disconnect_poll=0.5, wait=1)
self._storage.registerDB(DummyDB())
def tearDown(self):
# Clean up any transaction that might be left hanging around
get_transaction().abort()
self._storage.close()
+ os.remove(self._conf_path)
for server in self._servers:
forker.shutdown_zeo_server(server)
if hasattr(os, 'waitpid'):
@@ -162,7 +189,7 @@
def testTransactionalUndoIterator(self):
pass
-
+
def _iterate(self):
pass
@@ -174,11 +201,9 @@
def getConfig(self):
filename = self.__fs_base = tempfile.mktemp()
return """\
- <Storage>
- type FileStorage
- file_name %s
- create yes
- </Storage>
+ <filestorage 1>
+ path %s
+ </filestorage>
""" % filename
class BDBTests(UndoVersionStorageTests):
@@ -189,10 +214,9 @@
def getConfig(self):
self._envdir = tempfile.mktemp()
return """\
- <Storage>
- type BDBFullStorage
- name %s
- </Storage>
+ <fullstorage 1>
+ name %s
+ </fullstorage>
""" % self._envdir
# XXX These test seems to have massive failures when I run them.
@@ -207,13 +231,7 @@
class MappingStorageTests(StorageTests):
def getConfig(self):
- self._envdir = tempfile.mktemp()
- return """\
- <Storage>
- type MappingStorage
- name %s
- </Storage>
- """ % self._envdir
+ return """<mappingstorage 1/>"""
test_classes = [FileStorageTests, MappingStorageTests]
=== Zope3/src/zodb/zeo/tests/zeoserver.py 1.8 => 1.8.26.1 ===
--- Zope3/src/zodb/zeo/tests/zeoserver.py:1.8 Tue Feb 25 13:55:05 2003
+++ Zope3/src/zodb/zeo/tests/zeoserver.py Sun Jun 22 10:22:32 2003
@@ -28,12 +28,8 @@
from zodb import config
import zodb.zeo.server
from zodb.zeo import threadedasync
-
-def load_storage(fp):
- context = ZConfig.Context.Context()
- rootconf = context.loadFile(fp)
- storageconf = rootconf.getSection('Storage')
- return config.createStorage(storageconf)
+from zodb.zeo.runzeo import ZEOOptions
+from zodb.zeo.server import StorageServer
class ZEOTestServer(asyncore.dispatcher):
"""A server for killing the whole process at the end of a test.
@@ -123,56 +119,58 @@
# We don't do much sanity checking of the arguments, since if we get it
# wrong, it's a bug in the test suite.
- ro_svr = False
- keep = False
+ keep = 0
configfile = None
- invalidation_queue_size = 100
- transaction_timeout = None
- monitor_address = None
# Parse the arguments and let getopt.error percolate
- opts, args = getopt.getopt(sys.argv[1:], 'rkC:Q:T:m:')
+ opts, args = getopt.getopt(sys.argv[1:], 'kC:')
for opt, arg in opts:
- if opt == '-r':
- ro_svr = True
- elif opt == '-k':
- keep = True
+ if opt == '-k':
+ keep = 1
elif opt == '-C':
configfile = arg
- elif opt == '-Q':
- invalidation_queue_size = int(arg)
- elif opt == '-T':
- transaction_timeout = int(arg)
- elif opt == '-m':
- monitor_address = '', int(arg)
+
+ zo = ZEOOptions()
+ zo.realize(["-C", configfile])
+ zeo_port = int(zo.address[1])
+
+ # XXX a hack: this import presumably registers the "plaintext" auth protocol
+ if zo.auth_protocol == "plaintext":
+ import zodb.zeo.tests.auth_plaintext
+
# Open the config file and let ZConfig parse the data there. Then remove
# the config file, otherwise we'll leave turds.
- fp = open(configfile, 'r')
- storage = load_storage(fp)
- fp.close()
- os.remove(configfile)
# The rest of the args are hostname, portnum
- zeo_port = int(args[0])
test_port = zeo_port + 1
- test_addr = ('', test_port)
- addr = ('', zeo_port)
- serv = zodb.zeo.server.StorageServer(
- addr, {'1': storage}, ro_svr,
- invalidation_queue_size=invalidation_queue_size,
- transaction_timeout=transaction_timeout,
- monitor_address=monitor_address)
+ test_addr = ('localhost', test_port)
+ addr = ('localhost', zeo_port)
+ logger.info('creating the storage server')
+ storage = zo.storages[0].open()
+ mon_addr = None
+ if zo.monitor_address:
+ mon_addr = zo.monitor_address.address
+ server = StorageServer(
+ zo.address,
+ {"1": storage},
+ read_only=zo.read_only,
+ invalidation_queue_size=zo.invalidation_queue_size,
+ transaction_timeout=zo.transaction_timeout,
+ monitor_address=mon_addr,
+ auth_protocol=zo.auth_protocol,
+ auth_filename=zo.auth_database,
+ auth_realm=zo.auth_realm)
+
try:
- logger.info('creating the test server, ro: %s, keep: %s',
- ro_svr, keep)
- t = ZEOTestServer(test_addr, serv, keep)
+ logger.info('creating the test server, keep: %s', keep)
+ t = ZEOTestServer(test_addr, server, keep)
except socket.error, e:
if e[0] <> errno.EADDRINUSE: raise
logger.info('addr in use, closing and exiting')
storage.close()
storage.cleanup()
sys.exit(2)
- addr = ('', zeo_port)
+
logger.info('creating the storage server')
- t.register_socket(serv.dispatcher)
+ t.register_socket(server.dispatcher)
# Loop for socket events
logger.info('entering threadedasync loop')
threadedasync.loop()
=== Removed File Zope3/src/zodb/zeo/tests/thread.py ===