[Zodb-checkins] SVN: ZODB/branches/jim-zeo-registerdb/src/ZEO/ Got
fan-out tests to pass.
Jim Fulton
jim at zope.com
Fri May 11 15:10:40 EDT 2007
Log message for revision 75684:
Got fan-out tests to pass.
Changed:
U ZODB/branches/jim-zeo-registerdb/src/ZEO/ClientStorage.py
U ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py
U ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test
U ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py
U ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test
-=-
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/ClientStorage.py 2007-05-11 16:29:07 UTC (rev 75683)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/ClientStorage.py 2007-05-11 19:10:39 UTC (rev 75684)
@@ -1028,6 +1028,9 @@
self._server.vote(id(txn))
return self._check_serials()
+ def tpc_transaction(self):
+ return self._transaction
+
def tpc_begin(self, txn, tid=None, status=' '):
"""Storage API: begin a transaction."""
if self._is_read_only:
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py 2007-05-11 16:29:07 UTC (rev 75683)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/StorageServer.py 2007-05-11 19:10:39 UTC (rev 75684)
@@ -33,6 +33,7 @@
import ZODB.serialize
import ZEO.zrpc.error
+
from ZEO import ClientStub
from ZEO.CommitLog import CommitLog
from ZEO.monitor import StorageStats, StatsServer
@@ -627,13 +628,13 @@
self.log(msg, logging.ERROR)
err = StorageServerError(msg)
# The exception is reported back as newserial for this oid
- newserial = err
+ newserial = [(oid, err)]
else:
if serial != "\0\0\0\0\0\0\0\0":
self.invalidated.append((oid, version))
- if isinstance(newserial, str):
- newserial = (oid, newserial)
+ if isinstance(newserial, str):
+ newserial = [(oid, newserial)]
if newserial:
for oid, s in newserial:
@@ -764,10 +765,18 @@
self.references = ZODB.serialize.referencesf
def invalidate(self, tid, oids, version=''):
+ storage_id = self.storage_id
self.server.invalidate(
- None, self.storage_id, tid,
+ None, storage_id, tid,
[(oid, version) for oid in oids],
)
+ for zeo_server in self.server.connections.get(storage_id, ())[:]:
+ try:
+ zeo_server.connection.poll()
+ except ZEO.zrpc.error.DisconnectedError:
+ pass
+ else:
+ break # We only need to pull one :)
def invalidateCache(self):
self.server._invalidateCache(self.storage_id)
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test 2007-05-11 16:29:07 UTC (rev 75683)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/registerDB.test 2007-05-11 19:10:39 UTC (rev 75684)
@@ -59,6 +59,8 @@
... def should_close(self):
... print 'closed', self.obj.name
... self.mgr.close_conn(self)
+ ... def poll(self):
+ ... pass
>>> class ZEOStorage:
... def __init__(self, server, name):
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py 2007-05-11 16:29:07 UTC (rev 75683)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/testZEO.py 2007-05-11 19:10:39 UTC (rev 75684)
@@ -27,6 +27,7 @@
import shutil
import zope.testing.setupstack
+from zope.testing import doctest
# ZODB test support
import ZODB
Modified: ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test
===================================================================
--- ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test 2007-05-11 16:29:07 UTC (rev 75683)
+++ ZODB/branches/jim-zeo-registerdb/src/ZEO/tests/zeo-fan-out.test 2007-05-11 19:10:39 UTC (rev 75684)
@@ -30,6 +30,7 @@
... '<zeoclient 1>\n server %s\n</zeoclient>\n' % port0,
... zconf2, port2)
+
Now, let's create some client storages that connect to these:
>>> import ZEO.ClientStorage
@@ -55,21 +56,27 @@
>>> r2
{}
+ >>> db2 = DB(cs2)
+ >>> tm2 = transaction.TransactionManager()
+ >>> c2 = db2.open(transaction_manager=tm2)
+ >>> r2 = c2.root()
+ >>> r2
+ {}
+
If we update c1, we'll eventually see the change in c2:
- >>> import persistent
- >>> class P(persistent.Persistent):
- ... pass
+ >>> import persistent.mapping
- >>> r1[1] = P()
+ >>> r1[1] = persistent.mapping.PersistentMapping()
>>> r1[1].v = 1000
- >>> r1[2] = P()
+ >>> r1[2] = persistent.mapping.PersistentMapping()
>>> r1[2].v = -1000
+ >>> tm1.commit()
>>> import time
>>> for i in range(100):
- ... c2.sync()
- ... if r2:
+ ... t = tm2.begin()
+ ... if 1 in r2:
... break
... time.sleep(0.01)
@@ -82,26 +89,30 @@
Now, let's see if we can break it. :)
>>> def f():
- ... for in in range(100):
- ... r1[1] -= 1
- ... r1[2] += 1
+ ... for i in range(100):
+ ... r1[1].v -= 1
+ ... r1[2].v += 1
... tm1.commit()
... time.sleep(0.01)
- >>> import thread
- >>> thread.start_new_thread(f, ())
+ >>> import threading
+ >>> thread = threading.Thread(target=f)
+ >>> thread.start()
>>> for i in range(1000):
- ... c2.sync()
- ... if c2[1] + c2[2]:
- ... print 'oops', c2[1], c2[2]
- ... if not c2[1]:
- ... break
+ ... t = tm2.begin()
+ ... if r2[1].v + r2[2].v:
+ ... print 'oops', r2[1], r2[2]
+ ... if r1[1].v == 900:
+ ... break # we caught up
... time.sleep(0.01)
+
+ >>> thread.join()
+
If we shutdown and restart the source server, the variables will be
invalidated:
- >>> forker.shutdown_zeo_server(adminaddr0)
+ >>> ZEO.tests.forker.shutdown_zeo_server(adminaddr0)
>>> zport0, adminaddr0, pid0, path0 = ZEO.tests.forker.start_zeo_server(
... '<filestorage 1>\n path fs\n</filestorage>\n', zconf0, port0)
More information about the Zodb-checkins
mailing list