[Zodb-checkins] SVN: ZODB/trunk/src/ ZEO Servers now provide an option, invalidation-age, that allows
Jim Fulton
jim at zope.com
Fri Jan 2 16:22:40 EST 2009
Log message for revision 94461:
ZEO Servers now provide an option, invalidation-age, that allows
quick verification of ZEO clients less than a given age even if the
number of transactions the client hasn't seen exceeds the
invalidation queue size. This is only recommended if the storage
being served supports efficient iteration from a point near the end
of the transaction history.
Also refactored ZEO/tests/zeoserver and ZEO/runzeo to have the test
server use, and thus exercise, the option-handling code from runzeo.
Changed:
U ZODB/trunk/src/CHANGES.txt
U ZODB/trunk/src/ZEO/StorageServer.py
U ZODB/trunk/src/ZEO/component.xml
U ZODB/trunk/src/ZEO/runzeo.py
U ZODB/trunk/src/ZEO/tests/forker.py
A ZODB/trunk/src/ZEO/tests/invalidation-age.txt
U ZODB/trunk/src/ZEO/tests/testZEO.py
U ZODB/trunk/src/ZEO/tests/zeoserver.py
-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt 2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/CHANGES.txt 2009-01-02 21:22:39 UTC (rev 94461)
@@ -2,7 +2,7 @@
Change History
================
-3.9.0 (2008-??-??)
+3.9.0 (2009-??-??)
====================
New Features
@@ -22,7 +22,7 @@
XXX There are known issues with this implementation that need to be
sorted out before it is "released".
-3.9.0a9 (2008-12-??)
+3.9.0a9 (2009-01-??)
====================
New Features
@@ -42,10 +42,18 @@
- As a small convenience (mainly for tests), you can now specify
initial data as a string argument to the Blob constructor.
-- The FileStorage iterator now handles large files better. Whenm
+- ZEO Servers now provide an option, invalidation-age, that allows
+ quick verification of ZEO clients less than a given age even if the
+ number of transactions the client hasn't seen exceeds the
+ invalidation queue size. This is only recommended if the storage
+ being served supports efficient iteration from a point near the end
+ of the transaction history.
+
+- The FileStorage iterator now handles large files better. When
iteratng from a starting transaction near the end of the file, the
iterator will scan backward from the end of the file to find the
- starting point.
+ starting point. This enhancement makes it practical to take
+ advantage of the new storage server invalidation-age option.
3.9.0a8 (2008-12-15)
====================
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2009-01-02 21:22:39 UTC (rev 94461)
@@ -34,6 +34,7 @@
import transaction
import ZODB.serialize
+import ZODB.TimeStamp
import ZEO.zrpc.error
import zope.interface
@@ -48,7 +49,7 @@
from ZODB.POSException import StorageError, StorageTransactionError
from ZODB.POSException import TransactionError, ReadOnlyError, ConflictError
from ZODB.serialize import referencesf
-from ZODB.utils import u64, oid_repr, mktemp
+from ZODB.utils import u64, p64, oid_repr, mktemp
from ZODB.loglevels import BLATHER
@@ -817,6 +818,7 @@
def __init__(self, addr, storages, read_only=0,
invalidation_queue_size=100,
+ invalidation_age=None,
transaction_timeout=None,
monitor_address=None,
auth_protocol=None,
@@ -854,6 +856,13 @@
speed client cache verification when a client disconnects
for a short period of time.
+ invalidation_age --
+ If the invalidation queue isn't big enough to support a
+ quick verification, but the last transaction seen by a
+ client is younger than the invalidation age, then
+ invalidations will be computed by iterating over
+ transactions later than the given transaction.
+
transaction_timeout -- The maximum amount of time to wait for
a transaction to commit after acquiring the storage lock.
If the transaction takes too long, the client connection
@@ -907,7 +916,7 @@
for name, storage in storages.items():
self._setup_invq(name, storage)
storage.registerDB(StorageServerDB(self, name))
-
+ self.invalidation_age = invalidation_age
self.connections = {}
self.dispatcher = self.DispatcherClass(addr,
factory=self.new_connection)
@@ -1126,31 +1135,35 @@
do full cache verification.
"""
-
- invq = self.invq[storage_id]
-
# We make a copy of invq because it might be modified by a
# foreign (other than main thread) calling invalidate above.
- invq = invq[:]
+ invq = self.invq[storage_id][:]
- if not invq:
+ oids = set()
+ latest_tid = None
+ if invq and invq[-1][0] <= tid:
+ # We have needed data in the queue
+ for _tid, L in invq:
+ if _tid <= tid:
+ break
+ oids.update(L)
+ latest_tid = invq[0][0]
+ elif (self.invalidation_age and
+ (self.invalidation_age >
+ (time.time()-ZODB.TimeStamp.TimeStamp(tid).timeTime())
+ )
+ ):
+ for t in self.storages[storage_id].iterator(p64(u64(tid)+1)):
+ for r in t:
+ oids.add(r.oid)
+ latest_tid = t.tid
+ elif not invq:
log("invq empty")
- return None, []
+ else:
+ log("tid to old for invq %s < %s" % (u64(tid), u64(invq[-1][0])))
- earliest_tid = invq[-1][0]
- if earliest_tid > tid:
- log("tid to old for invq %s < %s" % (u64(tid), u64(earliest_tid)))
- return None, []
+ return latest_tid, list(oids)
- oids = {}
- for _tid, L in invq:
- if _tid <= tid:
- break
- for key in L:
- oids[key] = 1
- latest_tid = invq[0][0]
- return latest_tid, oids.keys()
-
def close_server(self):
"""Close the dispatcher so that there are no new connections.
Modified: ZODB/trunk/src/ZEO/component.xml
===================================================================
--- ZODB/trunk/src/ZEO/component.xml 2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/component.xml 2009-01-02 21:22:39 UTC (rev 94461)
@@ -45,6 +45,16 @@
</description>
</key>
+ <key name="invalidation-age" datatype="float" required="no">
+ <description>
+ The maximum age of a client for which quick-verification
+ invalidations will be provided by iterating over the served
+ storage. This option should only be used if the served storage
+ supports efficient iteration from a starting point near the
+ end of the transaction history (e.g. end of file).
+ </description>
+ </key>
+
<key name="monitor-address" datatype="socket-binding-address"
required="no">
<description>
Modified: ZODB/trunk/src/ZEO/runzeo.py
===================================================================
--- ZODB/trunk/src/ZEO/runzeo.py 2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/runzeo.py 2009-01-02 21:22:39 UTC (rev 94461)
@@ -100,6 +100,7 @@
self.add("read_only", "zeo.read_only", default=0)
self.add("invalidation_queue_size", "zeo.invalidation_queue_size",
default=100)
+ self.add("invalidation_age", "zeo.invalidation_age")
self.add("transaction_timeout", "zeo.transaction_timeout",
"t:", "timeout=", float)
self.add("monitor_address", "zeo.monitor_address.address",
@@ -137,8 +138,8 @@
if s.name is None:
s.name = '1'
break
-
+
class ZEOServer:
def __init__(self, options):
@@ -243,17 +244,7 @@
SignalHandler.registerHandler(SIGUSR2, self.handle_sigusr2)
def create_server(self):
- from ZEO.StorageServer import StorageServer
- self.server = StorageServer(
- self.options.address,
- self.storages,
- read_only=self.options.read_only,
- invalidation_queue_size=self.options.invalidation_queue_size,
- transaction_timeout=self.options.transaction_timeout,
- monitor_address=self.options.monitor_address,
- auth_protocol=self.options.auth_protocol,
- auth_database=self.options.auth_database,
- auth_realm=self.options.auth_realm)
+ self.server = create_server(self.storages, self.options)
def loop_forever(self):
asyncore.loop()
@@ -332,6 +323,23 @@
except IOError:
logger.error("PID file '%s' could not be removed" % pidfile)
+
+def create_server(storages, options):
+ from ZEO.StorageServer import StorageServer
+ return StorageServer(
+ options.address,
+ storages,
+ read_only = options.read_only,
+ invalidation_queue_size = options.invalidation_queue_size,
+ invalidation_age = options.invalidation_age,
+ transaction_timeout = options.transaction_timeout,
+ monitor_address = options.monitor_address,
+ auth_protocol = options.auth_protocol,
+ auth_database = options.auth_database,
+ auth_realm = options.auth_realm,
+ )
+
+
# Signal names
signames = None
Modified: ZODB/trunk/src/ZEO/tests/forker.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/forker.py 2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/tests/forker.py 2009-01-02 21:22:39 UTC (rev 94461)
@@ -36,6 +36,7 @@
self.address = addr
self.read_only = None
self.invalidation_queue_size = None
+ self.invalidation_age = None
self.monitor_address = None
self.transaction_timeout = None
self.authentication_protocol = None
@@ -49,6 +50,8 @@
print >> f, "read-only", self.read_only and "true" or "false"
if self.invalidation_queue_size is not None:
print >> f, "invalidation-queue-size", self.invalidation_queue_size
+ if self.invalidation_age is not None:
+ print >> f, "invalidation-age", self.invalidation_age
if self.monitor_address is not None:
print >> f, "monitor-address %s:%s" % self.monitor_address
if self.transaction_timeout is not None:
Added: ZODB/trunk/src/ZEO/tests/invalidation-age.txt
===================================================================
--- ZODB/trunk/src/ZEO/tests/invalidation-age.txt (rev 0)
+++ ZODB/trunk/src/ZEO/tests/invalidation-age.txt 2009-01-02 21:22:39 UTC (rev 94461)
@@ -0,0 +1,141 @@
+Invalidation age
+================
+
+When a ZEO client with a non-empty cache connects to the server, it
+needs to verify whether the data in its cache is current. It does
+this in one of 2 ways:
+
+quick verification
+ It gets a list of invalidations from the server since the last
+ transaction the client has seen and applies those to its disk and
+ in-memory caches. This is only possible if there haven't been too
+ many transactions since the client was last connected.
+
+full verification
+ If quick verification isn't possible, the client iterates through
+ its disk cache asking the server to verify whether each current
+ entry is valid.
+
+Unfortunately, for large caches, full verification is soooooo not
+quick that it is impractical. Quick verification is highly
+desirable.
+
+To support quick verification, the server keeps a list of recent
+invalidations. The size of this list is controlled by the
+invalidation_queue_size parameter. If there is a lot of database
+activity, the size might need to be quite large to support having
+clients be disconnected for more than a few minutes. A very large
+invalidation queue size can use a lot of memory.
+
+To supplement the invalidation queue, you can also specify an
+invalidation_age parameter. When a client connects and presents the
+last transaction id it has seen, we first check to see if the
+invalidation queue has that transaction id. If it does, then we send
+all transactions since that id. Otherwise, we check to see if the
+age of the given id (the current time minus the id's time stamp) is
+less than the invalidation age. If it is, then we iterate over the
+storage, starting just after the given id, to get the invalidations
+since the given id.
+
+NOTE: This assumes that iterating from a point near the "end" of a
+database is inexpensive. Don't use this option for a storage for which
+that is not the case.
+
+Here's an example. We set up a server, using an
+invalidation-queue-size of 5:
+
+ >>> addr, admin = start_server(zeo_conf=dict(invalidation_queue_size=5),
+ ... keep=True)
+
+Now, we'll open a client with a persistent cache, set up some data,
+and then close client:
+
+ >>> import ZEO, transaction
+ >>> db = ZEO.DB(addr, client='test')
+ >>> conn = db.open()
+ >>> for i in range(9):
+ ... conn.root()[i] = conn.root().__class__()
+ ... conn.root()[i].x = 0
+ >>> transaction.commit()
+ >>> db.close()
+
+We'll open another client, and commit some transactions:
+
+ >>> db = ZEO.DB(addr)
+ >>> conn = db.open()
+ >>> import transaction
+ >>> for i in range(2):
+ ... conn.root()[i].x = 1
+ ... transaction.commit()
+ >>> db.close()
+
+If we reopen the first client, we'll do quick verification. We'll
+turn on logging so we can see this:
+
+ >>> import logging, sys
+ >>> logging.getLogger().setLevel(logging.INFO)
+ >>> handler = logging.StreamHandler(sys.stdout)
+ >>> logging.getLogger().addHandler(handler)
+
+ >>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
+ ('localhost', ...
+ ('localhost', ...) Recovering 2 invalidations
+
+ >>> logging.getLogger().removeHandler(handler)
+
+ >>> [v.x for v in db.open().root().values()]
+ [1, 1, 0, 0, 0, 0, 0, 0, 0]
+
+Now, if we disconnect and commit more than 5 transactions, we'll see
+that verification is necessary:
+
+ >>> db.close()
+ >>> db = ZEO.DB(addr)
+ >>> conn = db.open()
+ >>> import transaction
+ >>> for i in range(9):
+ ... conn.root()[i].x = 2
+ ... transaction.commit()
+ >>> db.close()
+
+ >>> logging.getLogger().addHandler(handler)
+ >>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
+ ('localhost', ...
+ ('localhost', ...) Verifying cache
+ ('localhost', ...) endVerify finishing
+ ('localhost', ...) endVerify finished
+
+ >>> logging.getLogger().removeHandler(handler)
+
+ >>> [v.x for v in db.open().root().values()]
+ [2, 2, 2, 2, 2, 2, 2, 2, 2]
+
+ >>> db.close()
+
+But if we restart the server with invalidation-age set, we can
+do quick verification:
+
+ >>> stop_server(admin)
+ >>> addr, admin = start_server(zeo_conf=dict(invalidation_queue_size=5,
+ ... invalidation_age=100))
+ >>> db = ZEO.DB(addr)
+ >>> conn = db.open()
+ >>> import transaction
+ >>> for i in range(9):
+ ... conn.root()[i].x = 3
+ ... transaction.commit()
+ >>> db.close()
+
+
+ >>> logging.getLogger().addHandler(handler)
+ >>> db = ZEO.DB(addr, client='test') # doctest: +ELLIPSIS
+ ('localhost', ...
+ ('localhost', ...) Recovering 9 invalidations
+
+ >>> logging.getLogger().removeHandler(handler)
+
+ >>> [v.x for v in db.open().root().values()]
+ [3, 3, 3, 3, 3, 3, 3, 3, 3]
+
+ >>> db.close()
+
Property changes on: ZODB/trunk/src/ZEO/tests/invalidation-age.txt
___________________________________________________________________
Added: svn:eol-style
+ native
Modified: ZODB/trunk/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/testZEO.py 2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/tests/testZEO.py 2009-01-02 21:22:39 UTC (rev 94461)
@@ -68,7 +68,7 @@
"""ZEO tests that don't fit in elsewhere."""
def checkCreativeGetState(self):
- # This test covers persistent objects that provide their own
+ # This test covers persistent objects that provide their own
# __getstate__ which modifies the state of the object.
# For details see bug #98275
@@ -456,7 +456,7 @@
)
ZEO.zrpc.connection.client_map[None] = Evil()
-
+
try:
ZEO.zrpc.connection.client_trigger.pull_trigger()
except DisconnectedError:
@@ -498,7 +498,7 @@
self._invalidatedCache += 1
def invalidate(*a, **k):
pass
-
+
db = DummyDB()
storage.registerDB(db)
@@ -517,8 +517,8 @@
# Now, the root object in the connection should have been invalidated:
self.assertEqual(db._invalidatedCache, base+1)
-
+
class CommonBlobTests:
def getConfig(self):
@@ -638,7 +638,7 @@
for i in range(1000000):
somedata.write("%s\n" % i)
somedata.seek(0)
-
+
blob = Blob()
bd_fh = blob.open('w')
ZODB.utils.cp(somedata, bd_fh)
@@ -665,7 +665,7 @@
def check_data(path):
self.assert_(os.path.exists(path))
f = open(path, 'rb')
- somedata.seek(0)
+ somedata.seek(0)
d1 = d2 = 1
while d1 or d2:
d1 = f.read(8096)
@@ -681,7 +681,7 @@
self.blobdir,
ZODB.blob.BushyLayout().getBlobFilePath(oid, revid),
)
-
+
self.assert_(server_filename.startswith(self.blobdir))
check_data(server_filename)
@@ -800,12 +800,12 @@
>>> for i in range(20):
... o2.x = PersistentDict(); o2 = o2.x
... commit()
-
+
>>> trans, oids = s1.getInvalidations(last)
>>> from ZODB.utils import u64
>>> sorted([int(u64(oid)) for oid in oids])
[10, 11, 12, 13, 14]
-
+
>>> server.close_server()
"""
@@ -834,7 +834,7 @@
>>> db.close()
Now we'll open a storage server on the data, simulating a restart:
-
+
>>> fs = FileStorage('t.fs')
>>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> s = ZEOStorage(sv, sv.read_only)
@@ -884,7 +884,7 @@
>>> sv = StorageServer(('', get_port()), dict(fs=fs))
>>> st = StorageServerWrapper(sv, 'fs')
>>> s = st.server
-
+
Now, if we ask for the invalidations since the last committed
transaction, we'll get a result:
@@ -1070,7 +1070,7 @@
>>> handler.uninstall()
>>> stop_server(admin)
-
+
"""
def history_over_zeo():
@@ -1106,7 +1106,7 @@
"""If we delete on one client, the delete should be reflected on the other.
First, we'll create an object:
-
+
>>> addr, _ = start_server()
>>> db = ZEO.DB(addr)
>>> conn = db.open()
@@ -1116,10 +1116,10 @@
We verify that we can read it in another client, which also loads
it into the client cache.
-
+
>>> cs = ClientStorage(addr)
>>> p, s = cs.load(oid)
-
+
Now, we'll remove the object:
>>> txn = transaction.begin()
@@ -1137,7 +1137,7 @@
We'll wait for our other storage to get the invalidation and then
try to access the object. We'll get a POSKeyError there too:
-
+
>>> tid = db.storage.lastTransaction()
>>> forker.wait_until(lambda : cs.lastTransaction() == tid)
>>> cs.load(oid) # doctest: +ELLIPSIS
@@ -1153,7 +1153,7 @@
BlobAdaptedFileStorageTests, BlobWritableCacheTests,
DemoStorageTests, FileStorageTests, MappingStorageTests,
]
-
+
quick_test_classes = [
FileStorageRecoveryTests, ConfigurationTests, HeartbeatTests,
CatastrophicClientLoopFailure, ConnectionInvalidationOnReconnect,
@@ -1194,7 +1194,7 @@
shared_blob_dir=True)
else:
ClientStorage.__init__(self, addr, blob_dir=blob_dir)
-
+
def close(self):
ClientStorage.close(self)
zope.testing.setupstack.tearDown(self)
@@ -1228,7 +1228,7 @@
doctest.DocFileSuite(
'zeo-fan-out.test', 'zdoptions.test',
'drop_cache_rather_than_verify.txt', 'client-config.test',
- 'protocols.test', 'zeo_blob_cache.test',
+ 'protocols.test', 'zeo_blob_cache.test', 'invalidation-age.txt',
setUp=forker.setUp, tearDown=zope.testing.setupstack.tearDown,
),
)
Modified: ZODB/trunk/src/ZEO/tests/zeoserver.py
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeoserver.py 2009-01-02 19:36:41 UTC (rev 94460)
+++ ZODB/trunk/src/ZEO/tests/zeoserver.py 2009-01-02 21:22:39 UTC (rev 94461)
@@ -13,20 +13,18 @@
##############################################################################
"""Helper file used to launch a ZEO server cross platform"""
-from ZEO.StorageServer import StorageServer
-from ZEO.runzeo import ZEOOptions
-
import asyncore
-import os
-import sys
-import time
import errno
import getopt
-import socket
+import logging
+import os
import signal
-import asyncore
+import socket
+import sys
import threading
-import logging
+import time
+import ZEO.runzeo
+import ZEO.zrpc.connection
def cleanup(storage):
# FileStorage and the Berkeley storages have this method, which deletes
@@ -164,15 +162,14 @@
elif opt == '-S':
suicide = False
elif opt == '-v':
- import ZEO.zrpc.connection
ZEO.zrpc.connection.Connection.current_protocol = arg
- zo = ZEOOptions()
+ zo = ZEO.runzeo.ZEOOptions()
zo.realize(["-C", configfile])
zeo_port = int(zo.address[1])
if zo.auth_protocol == "plaintext":
- import ZEO.tests.auth_plaintext
+ __import__('ZEO.tests.auth_plaintext')
# Open the config file and let ZConfig parse the data there. Then remove
# the config file, otherwise we'll leave turds.
@@ -185,16 +182,7 @@
mon_addr = None
if zo.monitor_address:
mon_addr = zo.monitor_address
- server = StorageServer(
- zo.address,
- {"1": storage},
- read_only=zo.read_only,
- invalidation_queue_size=zo.invalidation_queue_size,
- transaction_timeout=zo.transaction_timeout,
- monitor_address=mon_addr,
- auth_protocol=zo.auth_protocol,
- auth_database=zo.auth_database,
- auth_realm=zo.auth_realm)
+ server = ZEO.runzeo.create_server({"1": storage}, zo)
try:
log(label, 'creating the test server, keep: %s', keep)
More information about the Zodb-checkins
mailing list