[Zodb-checkins] SVN: ZODB/branches/ctheune-bushy-directory-3.8/ Removed merge of changes from 3.8 branch.
Jim Fulton
jim at zope.com
Wed Aug 27 05:47:18 EDT 2008
Log message for revision 90416:
Removed merge of changes from 3.8 branch.
Changed:
U ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py
D ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt
U ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py
-=-
Modified: ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/NEWS.txt 2008-08-27 09:47:18 UTC (rev 90416)
@@ -4,15 +4,6 @@
Bugs Fixed:
-- (???) Fixed bug #251037: Made packing of blob storages non-blocking.
-
-- (beta 6) Fixed a bug that could cause InvalidObjectReference errors
- for objects that were explicitly added to a database if the object
- was modified after a savepoint that added the object.
-
-- (beta 5) Fixed several bugs that caused ZEO cache corruption when connecting
- to servers. These bugs affected both persistent and non-persistent caches.
-
- (beta 5) Improved the ZEO client shutdown support to try to
avoid spurious errors on exit, especially for scripts, such as zeopack.
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/ClientStorage.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -251,6 +251,8 @@
# _is_read_only stores the constructor argument
self._is_read_only = read_only
+ # _conn_is_read_only stores the status of the current connection
+ self._conn_is_read_only = 0
self._storage = storage
self._read_only_fallback = read_only_fallback
self._username = username
@@ -338,6 +340,8 @@
else:
cache_path = None
self._cache = self.ClientCacheClass(cache_path, size=cache_size)
+ # TODO: maybe there's a better time to open the cache? Unclear.
+ self._cache.open()
self._rpc_mgr = self.ConnectionManagerClass(addr, self,
tmin=min_disconnect_poll,
@@ -378,18 +382,13 @@
def close(self):
"""Storage API: finalize the storage, releasing external resources."""
- if self._rpc_mgr is not None:
- self._rpc_mgr.close()
- self._rpc_mgr = None
- if self._connection is not None:
- self._connection.register_object(None) # Don't call me!
- self._connection.close()
- self._connection = None
-
self._tbuf.close()
if self._cache is not None:
self._cache.close()
self._cache = None
+ if self._rpc_mgr is not None:
+ self._rpc_mgr.close()
+ self._rpc_mgr = None
def registerDB(self, db):
"""Storage API: register a database for invalidation messages.
@@ -455,7 +454,7 @@
"""
log2("Testing connection %r" % conn)
# TODO: Should we check the protocol version here?
- conn._is_read_only = self._is_read_only
+ self._conn_is_read_only = 0
stub = self.StorageServerStubClass(conn)
auth = stub.getAuthProtocol()
@@ -477,7 +476,7 @@
raise
log2("Got ReadOnlyError; trying again with read_only=1")
stub.register(str(self._storage), read_only=1)
- conn._is_read_only = True
+ self._conn_is_read_only = 1
return 0
def notifyConnected(self, conn):
@@ -491,26 +490,24 @@
# this method before it was stopped.
return
+ # invalidate our db cache
+ if self._db is not None:
+ self._db.invalidateCache()
+ # TODO: report whether we get a read-only connection.
if self._connection is not None:
- # If we are upgrading from a read-only fallback connection,
- # we must close the old connection to prevent it from being
- # used while the cache is verified against the new connection.
- self._connection.register_object(None) # Don't call me!
- self._connection.close()
- self._connection = None
- self._ready.clear()
reconnect = 1
else:
reconnect = 0
-
self.set_server_addr(conn.get_addr())
+
+ # If we are upgrading from a read-only fallback connection,
+ # we must close the old connection to prevent it from being
+ # used while the cache is verified against the new connection.
+ if self._connection is not None:
+ self._connection.close()
self._connection = conn
- # invalidate our db cache
- if self._db is not None:
- self._db.invalidateCache()
-
if reconnect:
log2("Reconnected to storage: %s" % self._server_addr)
else:
@@ -564,6 +561,54 @@
else:
return '%s:%s' % (self._storage, self._server_addr)
+ def verify_cache(self, server):
+ """Internal routine called to verify the cache.
+
+ The return value (indicating which path we took) is used by
+ the test suite.
+ """
+
+ # If verify_cache() finishes the cache verification process,
+ # it should set self._server. If it goes through full cache
+ # verification, then endVerify() should set self._server.
+
+ last_inval_tid = self._cache.getLastTid()
+ if last_inval_tid is not None:
+ ltid = server.lastTransaction()
+ if ltid == last_inval_tid:
+ log2("No verification necessary (last_inval_tid up-to-date)")
+ self._server = server
+ self._ready.set()
+ return "no verification"
+
+ # log some hints about last transaction
+ log2("last inval tid: %r %s\n"
+ % (last_inval_tid, tid2time(last_inval_tid)))
+ log2("last transaction: %r %s" %
+ (ltid, ltid and tid2time(ltid)))
+
+ pair = server.getInvalidations(last_inval_tid)
+ if pair is not None:
+ log2("Recovering %d invalidations" % len(pair[1]))
+ self.invalidateTransaction(*pair)
+ self._server = server
+ self._ready.set()
+ return "quick verification"
+
+ log2("Verifying cache")
+ # setup tempfile to hold zeoVerify results
+ self._tfile = tempfile.TemporaryFile(suffix=".inv")
+ self._pickler = cPickle.Pickler(self._tfile, 1)
+ self._pickler.fast = 1 # Don't use the memo
+
+ # TODO: should batch these operations for efficiency; would need
+ # to acquire lock ...
+ for oid, tid, version in self._cache.contents():
+ server.verify(oid, version, tid)
+ self._pending_server = server
+ server.endZeoVerify()
+ return "full verification"
+
### Is there a race condition between notifyConnected and
### notifyDisconnected? In particular, what if we get
### notifyDisconnected in the middle of notifyConnected?
@@ -629,16 +674,12 @@
def isReadOnly(self):
"""Storage API: return whether we are in read-only mode."""
if self._is_read_only:
- return True
+ return 1
else:
# If the client is configured for a read-write connection
- # but has a read-only fallback connection, conn._is_read_only
- # will be True. If self._connection is None, we'll behave as
- # read_only
- try:
- return self._connection._is_read_only
- except AttributeError:
- return True
+ # but has a read-only fallback connection, _conn_is_read_only
+ # will be True.
+ return self._conn_is_read_only
def _check_trans(self, trans):
"""Internal helper to check a transaction argument for sanity."""
@@ -1111,7 +1152,7 @@
return
for oid, version, data in self._tbuf:
- self._cache.invalidate(oid, version, tid, False)
+ self._cache.invalidate(oid, version, tid)
# If data is None, we just invalidate.
if data is not None:
s = self._seriald[oid]
@@ -1169,6 +1210,8 @@
"""Storage API: return a sequence of versions in the storage."""
return self._server.versions(max)
+ # Below are methods invoked by the StorageServer
+
def serialnos(self, args):
"""Server callback to pass a list of changed (oid, serial) pairs."""
self._serials.extend(args)
@@ -1177,57 +1220,6 @@
"""Server callback to update the info dictionary."""
self._info.update(dict)
- def verify_cache(self, server):
- """Internal routine called to verify the cache.
-
- The return value (indicating which path we took) is used by
- the test suite.
- """
-
- self._pending_server = server
-
- # setup tempfile to hold zeoVerify results and interim
- # invalidation results
- self._tfile = tempfile.TemporaryFile(suffix=".inv")
- self._pickler = cPickle.Pickler(self._tfile, 1)
- self._pickler.fast = 1 # Don't use the memo
-
- # allow incoming invalidations:
- self._connection.register_object(self)
-
- # If verify_cache() finishes the cache verification process,
- # it should set self._server. If it goes through full cache
- # verification, then endVerify() should set self._server.
-
- last_inval_tid = self._cache.getLastTid()
- if last_inval_tid is not None:
- ltid = server.lastTransaction()
- if ltid == last_inval_tid:
- log2("No verification necessary (last_inval_tid up-to-date)")
- self.finish_verification()
- return "no verification"
-
- # log some hints about last transaction
- log2("last inval tid: %r %s\n"
- % (last_inval_tid, tid2time(last_inval_tid)))
- log2("last transaction: %r %s" %
- (ltid, ltid and tid2time(ltid)))
-
- pair = server.getInvalidations(last_inval_tid)
- if pair is not None:
- log2("Recovering %d invalidations" % len(pair[1]))
- self.finish_verification(pair)
- return "quick verification"
-
- log2("Verifying cache")
-
- # TODO: should batch these operations for efficiency; would need
- # to acquire lock ...
- for oid, tid, version in self._cache.contents():
- server.verify(oid, version, tid)
- server.endZeoVerify()
- return "full verification"
-
def invalidateVerify(self, args):
"""Server callback to invalidate an (oid, version) pair.
@@ -1239,93 +1231,68 @@
# This should never happen. TODO: assert it doesn't, or log
# if it does.
return
- oid, version = args
- self._pickler.dump((oid, version, None))
+ self._pickler.dump(args)
- def endVerify(self):
- """Server callback to signal end of cache validation."""
+ def _process_invalidations(self, invs):
+ # Invalidations are sent by the ZEO server as a sequence of
+ # oid, version pairs. The DB's invalidate() method expects a
+ # dictionary of oids.
- log2("endVerify finishing")
- self.finish_verification()
- log2("endVerify finished")
-
- def finish_verification(self, catch_up=None):
self._lock.acquire()
try:
- if catch_up:
- # process catch-up invalidations
- tid, invalidations = catch_up
- self._process_invalidations(
- (oid, version, tid)
- for oid, version in invalidations
- )
-
- if self._pickler is None:
- return
- # write end-of-data marker
- self._pickler.dump((None, None, None))
- self._pickler = None
- self._tfile.seek(0)
- unpickler = cPickle.Unpickler(self._tfile)
- min_tid = self._cache.getLastTid()
- def InvalidationLogIterator():
- while 1:
- oid, version, tid = unpickler.load()
- if oid is None:
- break
- if ((tid is None)
- or (min_tid is None)
- or (tid > min_tid)
- ):
- yield oid, version, tid
+ # versions maps version names to dictionary of invalidations
+ versions = {}
+ for oid, version, tid in invs:
+ if oid == self._load_oid:
+ self._load_status = 0
+ self._cache.invalidate(oid, version, tid)
+ oids = versions.get((version, tid))
+ if not oids:
+ versions[(version, tid)] = [oid]
+ else:
+ oids.append(oid)
- self._process_invalidations(InvalidationLogIterator())
- self._tfile.close()
- self._tfile = None
+ if self._db is not None:
+ for (version, tid), d in versions.items():
+ self._db.invalidate(tid, d, version=version)
finally:
self._lock.release()
+ def endVerify(self):
+ """Server callback to signal end of cache validation."""
+ if self._pickler is None:
+ return
+ # write end-of-data marker
+ self._pickler.dump((None, None))
+ self._pickler = None
+ self._tfile.seek(0)
+ f = self._tfile
+ self._tfile = None
+ self._process_invalidations(InvalidationLogIterator(f))
+ f.close()
+
+ log2("endVerify finishing")
self._server = self._pending_server
self._ready.set()
- self._pending_server = None
+ self._pending_conn = None
+ log2("endVerify finished")
-
def invalidateTransaction(self, tid, args):
- """Server callback: Invalidate objects modified by tid."""
+ """Invalidate objects modified by tid."""
self._lock.acquire()
try:
- if self._pickler is not None:
- log2("Transactional invalidation during cache verification",
- level=BLATHER)
- for oid, version in args:
- self._pickler.dump((oid, version, tid))
- return
- self._process_invalidations([(oid, version, tid)
- for oid, version in args])
+ self._cache.setLastTid(tid)
finally:
self._lock.release()
+ if self._pickler is not None:
+ log2("Transactional invalidation during cache verification",
+ level=BLATHER)
+ for t in args:
+ self._pickler.dump(t)
+ return
+ self._process_invalidations([(oid, version, tid)
+ for oid, version in args])
- def _process_invalidations(self, invs):
- # Invalidations are sent by the ZEO server as a sequence of
- # oid, version, tid triples. The DB's invalidate() method expects a
- # dictionary of oids.
-
- # versions maps version names to dictionary of invalidations
- versions = {}
- for oid, version, tid in invs:
- if oid == self._load_oid:
- self._load_status = 0
- self._cache.invalidate(oid, version, tid)
- oids = versions.get((version, tid))
- if not oids:
- versions[(version, tid)] = [oid]
- else:
- oids.append(oid)
-
- if self._db is not None:
- for (version, tid), d in versions.items():
- self._db.invalidate(tid, d, version=version)
-
# The following are for compatibility with protocol version 2.0.0
def invalidateTrans(self, args):
@@ -1334,3 +1301,11 @@
invalidate = invalidateVerify
end = endVerify
Invalidate = invalidateTrans
+
+def InvalidationLogIterator(fileobj):
+ unpickler = cPickle.Unpickler(fileobj)
+ while 1:
+ oid, version = unpickler.load()
+ if oid is None:
+ break
+ yield oid, version, None
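For context on the invalidation-log handling restored above: while the cache is being verified, incoming (oid, version) pairs are pickled to a temporary file and replayed once endVerify() arrives, with (None, None) as the end-of-data marker. Below is a minimal standalone sketch of that mechanism; the oid values are made up purely for illustration.

    import cPickle
    import tempfile

    # Queue invalidations received while the cache is being verified.
    tfile = tempfile.TemporaryFile(suffix=".inv")
    pickler = cPickle.Pickler(tfile, 1)
    pickler.fast = 1                         # no memo; each entry is independent

    pickler.dump(('\0\0\0\0\0\0\0\x01', ''))  # (oid, version)
    pickler.dump(('\0\0\0\0\0\0\0\x02', ''))
    pickler.dump((None, None))               # end-of-data marker

    # Replay them, as InvalidationLogIterator does.
    tfile.seek(0)
    unpickler = cPickle.Unpickler(tfile)
    while 1:
        oid, version = unpickler.load()
        if oid is None:
            break
        print repr(oid), repr(version)
    tfile.close()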
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/cache.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -30,7 +30,6 @@
import logging
import os
import tempfile
-import threading
import time
import ZODB.fsIndex
@@ -122,22 +121,7 @@
# to the end of the file that the new object can't fit in one
# contiguous chunk, currentofs is reset to ZEC3_HEADER_SIZE first.
-class locked(object):
- def __init__(self, func):
- self.func = func
-
- def __get__(self, inst, class_):
- if inst is None:
- return self
- def call(*args, **kw):
- inst._lock.acquire()
- try:
- return self.func(inst, *args, **kw)
- finally:
- inst._lock.release()
- return call
-
class ClientCache(object):
"""A simple in-memory cache."""
@@ -216,10 +200,6 @@
self._setup_trace(path)
- self.open()
-
- self._lock = threading.RLock()
-
# Backward compatibility. Client code used to have to use the fc
# attr to get to the file cache to get cache stats.
@property
@@ -373,7 +353,6 @@
# instance, and also written out near the start of the cache file. The
# new tid must be strictly greater than our current idea of the most
# recent tid.
- @locked
def setLastTid(self, tid):
if self.tid is not None and tid <= self.tid:
raise ValueError("new last tid (%s) must be greater than "
@@ -390,11 +369,10 @@
# @return a transaction id
# @defreturn string, or None if no transaction is yet known
def getLastTid(self):
- tid = self.tid
- if tid == z64:
+ if self.tid == z64:
return None
else:
- return tid
+ return self.tid
##
# Return the current data record for oid and version.
@@ -404,7 +382,6 @@
# in the cache
# @defreturn 3-tuple: (string, string, string)
- @locked
def load(self, oid, version=""):
ofs = self.current.get(oid)
if ofs is None:
@@ -437,7 +414,6 @@
# @return data record, serial number, start tid, and end tid
# @defreturn 4-tuple: (string, string, string, string)
- @locked
def loadBefore(self, oid, before_tid):
noncurrent_for_oid = self.noncurrent.get(u64(oid))
if noncurrent_for_oid is None:
@@ -479,7 +455,6 @@
# @defreturn string or None
# XXX This approach is wrong, but who cares
- @locked
def modifiedInVersion(self, oid):
ofs = self.current.get(oid)
if ofs is None:
@@ -507,7 +482,6 @@
# @param data the actual data
# @exception ValueError tried to store non-current version data
- @locked
def store(self, oid, version, start_tid, end_tid, data):
# It's hard for the client to avoid storing the same object
# more than once. One case is when the client requests
@@ -607,30 +581,14 @@
# data for `oid`, stop believing we have current data, and mark the
# data we had as being valid only up to `tid`. In all other cases, do
# nothing.
- #
- # Parameters:
- #
- # - oid object id
- # - version name of version to invalidate.
- # - tid the id of the transaction that wrote a new revision of oid,
+ # @param oid object id
+ # @param version name of version to invalidate.
+ # @param tid the id of the transaction that wrote a new revision of oid,
# or None to forget all cached info about oid (version, current
# revision, and non-current revisions)
- # - server_invalidation, a flag indicating whether the
- # invalidation has come from the server. It's possible, due
- # to threading issues, that when applying a local
- # invalidation after a store, that later invalidations from
- # the server may already have arrived.
-
- @locked
- def invalidate(self, oid, version, tid, server_invalidation=True):
- if tid is not None:
- if tid > self.tid:
- self.setLastTid(tid)
- elif tid < self.tid:
- if server_invalidation:
- raise ValueError("invalidation tid (%s) must not be less"
- " than previous one (%s)" %
- (u64(tid), u64(self.tid)))
+ def invalidate(self, oid, version, tid):
+ if tid > self.tid and tid is not None:
+ self.setLastTid(tid)
ofs = self.current.get(oid)
if ofs is None:
@@ -672,25 +630,17 @@
seek = self.f.seek
read = self.f.read
for oid, ofs in self.current.iteritems():
+ seek(ofs)
+ assert read(1) == 'a', (ofs, self.f.tell(), oid)
+ size, saved_oid, tid, end_tid, lver = unpack(">I8s8s8sh", read(30))
+ assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
+ assert end_tid == z64, (ofs, self.f.tell(), oid)
+ if lver:
+ version = read(lver)
+ else:
+ version = ''
+ yield oid, tid, version
- self._lock.acquire()
- try:
- seek(ofs)
- assert read(1) == 'a', (ofs, self.f.tell(), oid)
- size, saved_oid, tid, end_tid, lver = unpack(
- ">I8s8s8sh", read(30))
- assert saved_oid == oid, (ofs, self.f.tell(), oid, saved_oid)
- assert end_tid == z64, (ofs, self.f.tell(), oid)
- if lver:
- version = read(lver)
- else:
- version = ''
- result = oid, tid, version
- finally:
- self._lock.release()
-
- yield result
-
def dump(self):
from ZODB.utils import oid_repr
print "cache size", len(self)
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/ConnectionTests.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -158,7 +158,8 @@
self.addr.append(self._getAddr())
def _getAddr(self):
- return 'localhost', forker.get_port()
+ # port+1 is also used, so only draw even port numbers
+ return 'localhost', random.randrange(25000, 30000, 2)
def getConfig(self, path, create, read_only):
raise NotImplementedError
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/InvalidationTests.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -144,25 +144,24 @@
self.commitdict = commitdict
def _testrun(self):
- tm = transaction.TransactionManager()
- cn = self.db.open(transaction_manager=tm)
+ cn = self.db.open()
while not self.stop.isSet():
try:
tree = cn.root()["tree"]
break
except (ConflictError, KeyError):
- tm.abort()
+ transaction.abort()
key = self.startnum
while not self.stop.isSet():
try:
tree[key] = self.threadnum
- tm.get().note("add key %s" % key)
- tm.commit()
+ transaction.get().note("add key %s" % key)
+ transaction.commit()
self.commitdict[self] = 1
if self.sleep:
time.sleep(self.sleep)
except (ReadConflictError, ConflictError), msg:
- tm.abort()
+ transaction.abort()
else:
self.added_keys.append(key)
key += self.step
@@ -339,23 +338,16 @@
def _check_threads(self, tree, *threads):
# Make sure the thread's view of the world is consistent with
# the actual database state.
-
expected_keys = []
+ errormsgs = []
+ err = errormsgs.append
for t in threads:
if not t.added_keys:
err("thread %d didn't add any keys" % t.threadnum)
expected_keys.extend(t.added_keys)
expected_keys.sort()
-
- for i in range(100):
- tree._p_jar.sync()
- actual_keys = list(tree.keys())
- if expected_keys == actual_keys:
- break
- time.sleep(.1)
- else:
- errormsgs = []
- err = errormsgs.append
+ actual_keys = list(tree.keys())
+ if expected_keys != actual_keys:
err("expected keys != actual keys")
for k in expected_keys:
if k not in actual_keys:
@@ -363,7 +355,8 @@
for k in actual_keys:
if k not in expected_keys:
err("key %s in tree but not expected" % k)
-
+ if errormsgs:
+ display(tree)
self.fail('\n'.join(errormsgs))
def go(self, stop, commitdict, *threads):
@@ -495,9 +488,10 @@
self.go(stop, cd, t1, t2, t3)
while db1.lastTransaction() != db2.lastTransaction():
- time.sleep(.1)
+ db1._storage.sync()
+ db2._storage.sync()
- time.sleep(.1)
+
cn = db1.open()
tree = cn.root()["tree"]
self._check_tree(cn, tree)
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/forker.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -14,7 +14,6 @@
"""Library for forking storage server and connecting client storage"""
import os
-import random
import sys
import time
import errno
@@ -202,29 +201,3 @@
ack = 'no ack received'
logger.debug('shutdown_zeo_server(): acked: %s' % ack)
s.close()
-
-def get_port():
- """Return a port that is not in use.
-
- Checks if a port is in use by trying to connect to it. Assumes it
- is not in use if connect raises an exception. We actually look for
- 2 consecutive free ports because most of the clients of this
- function will use the returned port and the next one.
-
- Raises RuntimeError after 10 tries.
- """
- for i in range(10):
- port = random.randrange(20000, 30000)
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- try:
- s.connect(('localhost', port))
- s1.connect(('localhost', port+1))
- except socket.error:
- # Perhaps we should check value of error too.
- return port
- finally:
- s.close()
- s1.close()
- raise RuntimeError("Can't find port")
Deleted: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/invalidations_while_connecting.test 2008-08-27 09:47:18 UTC (rev 90416)
@@ -1,104 +0,0 @@
-Invalidations while connecting
-==============================
-
-As soon as a client registers with a server, it will receive
-invalidations from the server. The client must be careful to queue
-these invalidations until it is ready to deal with them. At the time
-of the writing of this test, clients weren't careful enough about
-queuing invalidations. This led to cache corruption in the form of
-both low-level file corruption as well as out-of-date records marked
-as current.
-
-This test tries to provoke this bug by:
-
-- starting a server
-
- >>> import ZEO.tests.testZEO, ZEO.tests.forker
- >>> addr = 'localhost', ZEO.tests.testZEO.get_port()
- >>> zconf = ZEO.tests.forker.ZEOConfig(addr)
- >>> sconf = '<filestorage 1>\npath Data.fs\n</filestorage>\n'
- >>> _, adminaddr, pid, conf_path = ZEO.tests.forker.start_zeo_server(
- ... sconf, zconf, addr[1])
-
-- opening a client to the server that writes some objects, filling
- its cache at the same time,
-
- >>> import ZEO.ClientStorage, ZODB.tests.MinPO, transaction
- >>> db = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr, client='x'))
- >>> conn = db.open()
- >>> nobs = 1000
- >>> for i in range(nobs):
- ... conn.root()[i] = ZODB.tests.MinPO.MinPO(0)
- >>> transaction.commit()
-
-- disconnecting the first client (closing it with a persistent cache),
-
- >>> db.close()
-
-- starting a second client that writes objects more or less
- constantly,
-
- >>> import random, threading
- >>> stop = False
- >>> db2 = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr))
- >>> tm = transaction.TransactionManager()
- >>> conn2 = db2.open(transaction_manager=tm)
- >>> random = random.Random(0)
- >>> lock = threading.Lock()
- >>> def run():
- ... while 1:
- ... i = random.randint(0, nobs-1)
- ... if stop:
- ... return
- ... lock.acquire()
- ... try:
- ... conn2.root()[i].value += 1
- ... tm.commit()
- ... finally:
- ... lock.release()
- ... time.sleep(0)
- >>> thread = threading.Thread(target=run)
- >>> thread.start()
-
-- restarting the first client, and
-- testing for cache validity.
-
- >>> import zope.testing.loggingsupport, logging
- >>> handler = zope.testing.loggingsupport.InstalledHandler(
- ... 'ZEO', level=logging.ERROR)
-
- >>> import time
- >>> for c in range(10):
- ... time.sleep(.1)
- ... db = ZODB.DB(ZEO.ClientStorage.ClientStorage(addr, client='x'))
- ... _ = lock.acquire()
- ... try:
- ... time.sleep(.1)
- ... assert (db._storage.lastTransaction()
- ... == db._storage._server.lastTransaction()), (
- ... db._storage.lastTransaction(),
- ... db._storage._server.lastTransaction())
- ... conn = db.open()
- ... for i in range(1000):
- ... if conn.root()[i].value != conn2.root()[i].value:
- ... print 'bad', c, i, conn.root()[i].value,
- ... print conn2.root()[i].value
- ... finally:
- ... _ = lock.release()
- ... db.close()
-
- >>> stop = True
- >>> thread.join(10)
- >>> thread.isAlive()
- False
-
- >>> for record in handler.records:
- ... print record.name, record.levelname
- ... print handler.format(record)
-
- >>> handler.uninstall()
-
- >>> db.close()
- >>> db2.close()
- >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr)
-
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testConnection.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -21,8 +21,8 @@
import unittest
# Import the actual test class
from ZEO.tests import ConnectionTests, InvalidationTests
-from zope.testing import doctest, setupstack
+
class FileStorageConfig:
def getConfig(self, path, create, read_only):
return """\
@@ -135,10 +135,6 @@
for klass in test_classes:
sub = unittest.makeSuite(klass, 'check')
suite.addTest(sub)
- suite.addTest(doctest.DocFileSuite(
- 'invalidations_while_connecting.test',
- setUp=setupstack.setUpDirectory, tearDown=setupstack.tearDown,
- ))
return suite
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/testZEO.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -18,7 +18,9 @@
import doctest
import logging
import os
+import random
import signal
+import socket
import stat
import tempfile
import threading
@@ -48,7 +50,6 @@
import ZEO.zrpc.connection
from ZEO.tests import forker, Cache, CommitLockTests, ThreadTests
-from ZEO.tests.forker import get_port
import ZEO.tests.ConnectionTests
@@ -127,6 +128,27 @@
finally:
storage2.close()
+def get_port():
+ """Return a port that is not in use.
+
+ Checks if a port is in use by trying to connect to it. Assumes it
+ is not in use if connect raises an exception.
+
+ Raises RuntimeError after 10 tries.
+ """
+ for i in range(10):
+ port = random.randrange(20000, 30000)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try:
+ try:
+ s.connect(('localhost', port))
+ except socket.error:
+ # Perhaps we should check value of error too.
+ return port
+ finally:
+ s.close()
+ raise RuntimeError("Can't find port")
+
class GenericTests(
# Base class for all ZODB tests
StorageTestBase.StorageTestBase,
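get_port(), moved back into testZEO.py above, treats a port as free when a TCP connect to it fails. A hedged sketch of just that probe follows; the port number is arbitrary and the helper name is made up for illustration.

    import socket

    def port_is_free(port):
        # Assume the port is unused if nothing accepts a connection on it.
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            try:
                s.connect(('localhost', port))
            except socket.error:
                return True      # connect failed; probably nothing listening
            return False         # something answered, so the port is taken
        finally:
            s.close()

    print port_is_free(28765)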
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/tests/test_cache.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -35,6 +35,7 @@
# testSerialization reads the entire file into a string; it's not
# good to leave it that big.
self.cache = ZEO.cache.ClientCache(size=1024**2)
+ self.cache.open()
def tearDown(self):
if self.cache.path:
@@ -44,6 +45,7 @@
self.assertEqual(self.cache.getLastTid(), None)
self.cache.setLastTid(n2)
self.assertEqual(self.cache.getLastTid(), n2)
+ self.cache.invalidate(n1, "", n1)
self.assertEqual(self.cache.getLastTid(), n2)
self.cache.invalidate(n1, "", n3)
self.assertEqual(self.cache.getLastTid(), n3)
@@ -63,8 +65,8 @@
def testInvalidate(self):
data1 = "data for n1"
self.cache.store(n1, "", n3, None, data1)
- self.cache.invalidate(n2, "", n2)
self.cache.invalidate(n1, "", n4)
+ self.cache.invalidate(n2, "", n2)
self.assertEqual(self.cache.load(n1, ""), None)
self.assertEqual(self.cache.loadBefore(n1, n4),
(data1, n3, n4))
@@ -140,6 +142,7 @@
dst.write(src.read(self.cache.maxsize))
dst.close()
copy = ZEO.cache.ClientCache(path)
+ copy.open()
# Verify that internals of both objects are the same.
# Could also test that external API produces the same results.
@@ -155,6 +158,7 @@
if self.cache.path:
os.remove(self.cache.path)
cache = ZEO.cache.ClientCache(size=50)
+ cache.open()
# We store an object that is a bit larger than the cache can handle.
cache.store(n1, '', n2, None, "x"*64)
@@ -170,6 +174,7 @@
if self.cache.path:
os.remove(self.cache.path)
cache = ZEO.cache.ClientCache(size=50)
+ cache.open()
# We store an object that is a bit larger than the cache can handle.
cache.store(n1, '', n2, n3, "x"*64)
@@ -213,6 +218,7 @@
... _ = os.spawnl(os.P_WAIT, sys.executable, sys.executable, 't')
... if os.path.exists('cache'):
... cache = ZEO.cache.ClientCache('cache')
+ ... cache.open()
... cache.close()
... os.remove('cache')
... os.remove('cache.lock')
@@ -232,6 +238,7 @@
>>> cache.store(ZODB.utils.p64(1), '', ZODB.utils.p64(1), None, data)
>>> cache.close()
>>> cache = ZEO.cache.ClientCache('cache', 1000)
+ >>> cache.open()
>>> cache.store(ZODB.utils.p64(2), '', ZODB.utils.p64(2), None, 'XXX')
>>> cache.close()
@@ -248,57 +255,6 @@
>>> cache.close()
""",
-
- thread_safe =
- r"""
-
- >>> import ZEO.cache, ZODB.utils
- >>> cache = ZEO.cache.ClientCache('cache', 1000000)
-
- >>> for i in range(100):
- ... cache.store(ZODB.utils.p64(i), '', ZODB.utils.p64(1), None, '0')
-
- >>> import random, sys, threading
- >>> random = random.Random(0)
- >>> stop = False
- >>> read_failure = None
-
- >>> def read_thread():
- ... def pick_oid():
- ... return ZODB.utils.p64(random.randint(0,99))
- ...
- ... try:
- ... while not stop:
- ... cache.load(pick_oid())
- ... cache.loadBefore(pick_oid(), ZODB.utils.p64(2))
- ... cache.modifiedInVersion(pick_oid())
- ... except:
- ... global read_failure
- ... read_failure = sys.exc_info()
-
- >>> thread = threading.Thread(target=read_thread)
- >>> thread.start()
-
- >>> for tid in range(2,10):
- ... for oid in range(100):
- ... oid = ZODB.utils.p64(oid)
- ... cache.invalidate(oid, '', ZODB.utils.p64(tid))
- ... cache.store(oid, '', ZODB.utils.p64(tid), None, str(tid))
-
- >>> stop = True
- >>> thread.join()
- >>> if read_failure:
- ... print 'Read failure:'
- ... import traceback
- ... traceback.print_exception(*read_failure)
-
- >>> expected = '9', ZODB.utils.p64(9), ''
- >>> for oid in range(100):
- ... loaded = cache.load(ZODB.utils.p64(oid))
- ... if loaded != expected:
- ... print oid, loaded
-
- """,
)
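The test changes above reflect that, on this branch, a ClientCache must be open()ed explicitly before use. A minimal usage sketch, assuming the anonymous (temp-file backed) constructor shown in setUp and illustrative oid/tid values:

    import ZEO.cache
    import ZODB.utils

    cache = ZEO.cache.ClientCache(size=1024**2)   # anonymous temporary cache
    cache.open()                                  # required before store/load here
    try:
        oid = ZODB.utils.p64(1)
        cache.store(oid, '', ZODB.utils.p64(2), None, 'some data')
        print cache.load(oid, '')                 # current data record for oid
    finally:
        cache.close()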
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/client.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -447,7 +447,8 @@
Call the client's testConnection(), giving the client a chance
to do app-level check of the connection.
"""
- self.conn = ManagedClientConnection(self.sock, self.addr, self.mgr)
+ self.conn = ManagedClientConnection(self.sock, self.addr,
+ self.client, self.mgr)
self.sock = None # The socket is now owned by the connection
try:
self.preferred = self.client.testConnection(self.conn)
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZEO/zrpc/connection.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -555,23 +555,14 @@
self.replies_cond.release()
def handle_request(self, msgid, flags, name, args):
- obj = self.obj
-
- if name.startswith('_') or not hasattr(obj, name):
- if obj is None:
- if __debug__:
- self.log("no object calling %s%s"
- % (name, short_repr(args)),
- level=logging.DEBUG)
- return
-
- msg = "Invalid method name: %s on %s" % (name, repr(obj))
+ if not self.check_method(name):
+ msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
raise ZRPCError(msg)
if __debug__:
self.log("calling %s%s" % (name, short_repr(args)),
level=logging.DEBUG)
- meth = getattr(obj, name)
+ meth = getattr(self.obj, name)
try:
self.waiting_for_reply = True
try:
@@ -610,6 +601,12 @@
level=logging.ERROR, exc_info=True)
self.close()
+ def check_method(self, name):
+ # TODO: This is hardly "secure".
+ if name.startswith('_'):
+ return None
+ return hasattr(self.obj, name)
+
def send_reply(self, msgid, ret):
# encode() can pass on a wide variety of exceptions from cPickle.
# While a bare `except` is generally poor practice, in this case
@@ -900,7 +897,7 @@
__super_close = Connection.close
base_message_output = Connection.message_output
- def __init__(self, sock, addr, mgr):
+ def __init__(self, sock, addr, obj, mgr):
self.mgr = mgr
# We can't use the base smac's message_output directly because the
@@ -917,7 +914,7 @@
self.queue_output = True
self.queued_messages = []
- self.__super_init(sock, addr, None, tag='C', map=client_map)
+ self.__super_init(sock, addr, obj, tag='C', map=client_map)
self.thr_async = True
self.trigger = client_trigger
client_trigger.pull_trigger()
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/Connection.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -594,14 +594,7 @@
oid = obj._p_oid
serial = getattr(obj, "_p_serial", z64)
- if ((serial == z64)
- and
- ((self._savepoint_storage is None)
- or (oid not in self._savepoint_storage.creating)
- or self._savepoint_storage.creating[oid]
- )
- ):
-
+ if serial == z64:
# obj is a new object
# Because obj was added, it is now in _creating, so it
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/blob.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -531,10 +531,6 @@
LAYOUTS['lawn'] = LawnLayout()
-class BlobStorageError(Exception):
- """The blob storage encountered an invalid state."""
-
-
class BlobStorage(SpecificationDecoratorBase):
"""A storage to support blobs."""
@@ -542,8 +538,7 @@
# Proxies can't have a __dict__ so specifying __slots__ here allows
# us to have instance attributes explicitly on the proxy.
- __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo',
- '_blobs_pack_is_in_progress', )
+ __slots__ = ('fshelper', 'dirty_oids', '_BlobStorage__supportsUndo')
def __new__(self, base_directory, storage, layout='automatic'):
return SpecificationDecoratorBase.__new__(self, storage)
@@ -562,7 +557,6 @@
else:
supportsUndo = supportsUndo()
self.__supportsUndo = supportsUndo
- self._blobs_pack_is_in_progress = False
@non_overridable
def temporaryDirectory(self):
@@ -668,29 +662,21 @@
@non_overridable
def pack(self, packtime, referencesf):
- """Remove all unused OID/TID combinations."""
- self._lock_acquire()
- try:
- if self._blobs_pack_is_in_progress:
- raise BlobStorageError('Already packing')
- self._blobs_pack_is_in_progress = True
- finally:
- self._lock_release()
+ """Remove all unused oid/tid combinations."""
+ unproxied = getProxiedObject(self)
- try:
- # Pack the underlying storage, which will allow us to determine
- # which serials are current.
- unproxied = getProxiedObject(self)
- result = unproxied.pack(packtime, referencesf)
+ # pack the underlying storage, which will allow us to determine
+ # which serials are current.
+ result = unproxied.pack(packtime, referencesf)
- # Perform a pack on the blob data.
+ # perform a pack on blob data
+ self._lock_acquire()
+ try:
if self.__supportsUndo:
self._packUndoing(packtime, referencesf)
else:
self._packNonUndoing(packtime, referencesf)
finally:
- self._lock_acquire()
- self._blobs_pack_is_in_progress = False
self._lock_release()
return result
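The pack() rewrite above drops the _blobs_pack_is_in_progress guard. That guard is the usual check-and-set-a-flag-under-a-lock pattern; a generic sketch of it follows, with class and method names that are illustrative only, not the BlobStorage API.

    import threading

    class PackGuardSketch(object):
        def __init__(self):
            self._lock = threading.Lock()
            self._pack_in_progress = False

        def pack(self):
            # Refuse to start a second pack while one is running.
            self._lock.acquire()
            try:
                if self._pack_in_progress:
                    raise RuntimeError('Already packing')
                self._pack_in_progress = True
            finally:
                self._lock.release()
            try:
                pass    # ... the actual pack work would happen here ...
            finally:
                self._lock.acquire()
                self._pack_in_progress = False
                self._lock.release()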
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/blob_packing.txt 2008-08-27 09:47:18 UTC (rev 90416)
@@ -240,37 +240,6 @@
>>> os.path.exists(os.path.split(fns[0])[0])
False
-Avoiding parallel packs
-=======================
-
-Blob packing (similar to FileStorage) can only be run once at a time. For
-this, a flag (_blobs_pack_is_in_progress) is set. If the pack method is called
-while this flag is set, it will refuse to perform another pack, until the flag
-is reset:
-
- >>> blob_storage._blobs_pack_is_in_progress
- False
- >>> blob_storage._blobs_pack_is_in_progress = True
- >>> blob_storage.pack(packtime, referencesf)
- Traceback (most recent call last):
- BlobStorageError: Already packing
- >>> blob_storage._blobs_pack_is_in_progress = False
- >>> blob_storage.pack(packtime, referencesf)
-
-We can also see that the flag is set during the pack, by leveraging the
-knowledge that the underlying storage's pack method is also called:
-
- >>> def dummy_pack(time, ref):
- ... print "_blobs_pack_is_in_progress =", blob_storage._blobs_pack_is_in_progress
- ... return base_pack(time, ref)
- >>> base_pack = base_storage.pack
- >>> base_storage.pack = dummy_pack
- >>> blob_storage.pack(packtime, referencesf)
- _blobs_pack_is_in_progress = True
- >>> blob_storage._blobs_pack_is_in_progress
- False
- >>> base_storage.pack = base_pack
-
Clean up our blob directory:
>>> shutil.rmtree(blob_dir)
Modified: ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py
===================================================================
--- ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py 2008-08-27 09:33:59 UTC (rev 90415)
+++ ZODB/branches/ctheune-bushy-directory-3.8/src/ZODB/tests/testcrossdatabasereferences.py 2008-08-27 09:47:18 UTC (rev 90416)
@@ -146,32 +146,6 @@
"""
-def test_explicit_adding_with_savepoint2():
- """
-
- >>> import ZODB.tests.util, transaction, persistent
- >>> databases = {}
- >>> db1 = ZODB.tests.util.DB(databases=databases, database_name='1')
- >>> db2 = ZODB.tests.util.DB(databases=databases, database_name='2')
- >>> tm = transaction.TransactionManager()
- >>> conn1 = db1.open(transaction_manager=tm)
- >>> conn2 = conn1.get_connection('2')
- >>> z = MyClass()
-
- >>> conn1.root()['z'] = z
- >>> conn1.add(z)
- >>> s = tm.savepoint()
- >>> conn2.root()['z'] = z
- >>> z.x = 1
- >>> tm.commit()
- >>> z._p_jar.db().database_name
- '1'
-
- >>> db1.close()
- >>> db2.close()
-
-"""
-
def tearDownDbs(test):
test.globs['db1'].close()
test.globs['db2'].close()