[Zodb-checkins] SVN: ZODB/trunk/src/ Fixed a threading bug in the StorageServerDB implementation that cause
Jim Fulton
jim at zope.com
Tue Mar 23 17:12:20 EDT 2010
Log message for revision 110118:
Fixed a threading bug in the StorageServerDB implementation that cause
the fan-out to fail.
Changed:
U ZODB/trunk/src/CHANGES.txt
U ZODB/trunk/src/ZEO/StorageServer.py
U ZODB/trunk/src/ZEO/tests/zeo-fan-out.test
-=-
Modified: ZODB/trunk/src/CHANGES.txt
===================================================================
--- ZODB/trunk/src/CHANGES.txt 2010-03-23 12:51:06 UTC (rev 110117)
+++ ZODB/trunk/src/CHANGES.txt 2010-03-23 21:12:19 UTC (rev 110118)
@@ -5,6 +5,12 @@
3.10.0a2 (2010-??-??)
=====================
+Bugs Fixed
+----------
+
+- When using using a ClientStorage in a Storage server, there was a
+ threading bug that caused clients to get disconnected.
+
New Features
------------
Modified: ZODB/trunk/src/ZEO/StorageServer.py
===================================================================
--- ZODB/trunk/src/ZEO/StorageServer.py 2010-03-23 12:51:06 UTC (rev 110117)
+++ ZODB/trunk/src/ZEO/StorageServer.py 2010-03-23 21:12:19 UTC (rev 110118)
@@ -786,13 +786,6 @@
raise StorageServerError("Versions aren't supported.")
storage_id = self.storage_id
self.server.invalidate(None, storage_id, tid, oids)
- for zeo_server in self.server.connections.get(storage_id, ())[:]:
- try:
- zeo_server.connection.poll()
- except ZEO.zrpc.error.DisconnectedError:
- pass
- else:
- break # We only need to pull one :)
def invalidateCache(self):
self.server._invalidateCache(self.storage_id)
Modified: ZODB/trunk/src/ZEO/tests/zeo-fan-out.test
===================================================================
--- ZODB/trunk/src/ZEO/tests/zeo-fan-out.test 2010-03-23 12:51:06 UTC (rev 110117)
+++ ZODB/trunk/src/ZEO/tests/zeo-fan-out.test 2010-03-23 21:12:19 UTC (rev 110118)
@@ -10,25 +10,29 @@
We'll start the first server:
>>> (_, port0), adminaddr0 = start_server(
- ... '<filestorage>\npath fs\n</filestorage>', keep=1)
+ ... '<filestorage>\npath fs\nblob-dir blobs\n</filestorage>', keep=1)
Then we'll start 2 others that use this one:
- >>> addr1, _ = start_server('<zeoclient>\nserver %s\n</zeoclient>' % port0)
- >>> addr2, _ = start_server('<zeoclient>\nserver %s\n</zeoclient>' % port0)
+ >>> addr1, _ = start_server(
+ ... '<zeoclient>\nserver %s\nblob-dir b1\n</zeoclient>' % port0)
+ >>> addr2, _ = start_server(
+ ... '<zeoclient>\nserver %s\nblob-dir b2\n</zeoclient>' % port0)
+
Now, let's create some client storages that connect to these:
- >>> import ZEO, transaction
+ >>> import os, ZEO, ZODB.blob, ZODB.POSException, transaction
- >>> db1 = ZEO.DB(addr1)
+ >>> db0 = ZEO.DB(port0, blob_dir='cb0')
+ >>> db1 = ZEO.DB(addr1, blob_dir='cb1')
>>> tm1 = transaction.TransactionManager()
>>> c1 = db1.open(transaction_manager=tm1)
>>> r1 = c1.root()
>>> r1
{}
- >>> db2 = ZEO.DB(addr2)
+ >>> db2 = ZEO.DB(addr2, blob_dir='cb2')
>>> tm2 = transaction.TransactionManager()
>>> c2 = db2.open(transaction_manager=tm2)
>>> r2 = c2.root()
@@ -43,7 +47,12 @@
>>> r1[1].v = 1000
>>> r1[2] = persistent.mapping.PersistentMapping()
>>> r1[2].v = -1000
+ >>> r1[3] = ZODB.blob.Blob('x'*4111222)
+ >>> for i in range(1000, 2000):
+ ... r1[i] = persistent.mapping.PersistentMapping()
+ ... r1[i].v = 0
>>> tm1.commit()
+ >>> blob_id = r1[3]._p_oid, r1[1]._p_serial
>>> import time
>>> for i in range(100):
@@ -51,7 +60,9 @@
... if 1 in r2:
... break
... time.sleep(0.01)
-
+ >>> tm2.abort()
+
+
>>> r2[1].v
1000
@@ -61,33 +72,60 @@
Now, let's see if we can break it. :)
>>> def f():
+ ... c = db1.open(transaction.TransactionManager())
+ ... r = c.root()
+ ... i = 0
+ ... while i < 100:
+ ... r[1].v -= 1
+ ... r[2].v += 1
+ ... try:
+ ... c.transaction_manager.commit()
+ ... i += 1
+ ... except ZODB.POSException.ConflictError:
+ ... c.transaction_manager.abort()
+ ... c.close()
+
+ >>> def g():
+ ... c = db0.open(transaction.TransactionManager())
+ ... r = c.root()
... for i in range(100):
- ... r1[1].v -= 1
- ... r1[2].v += 1
- ... tm1.commit()
- ... time.sleep(0.01)
+ ... for j in range(1000, 2000):
+ ... r[j].v += 1
+ ... c.transaction_manager.commit()
+ ... c.close()
+
>>> import threading
- >>> thread = threading.Thread(target=f)
- >>> thread.start()
+ >>> threadf = threading.Thread(target=f)
+ >>> threadg = threading.Thread(target=f)
+ >>> threadf.start()
- >>> for i in range(1000):
+ >>> threadg.start()
+
+ >>> s2 = db2.storage
+ >>> start_time = time.time()
+ >>> while time.time() - start_time < 999:
... t = tm2.begin()
... if r2[1].v + r2[2].v:
... print 'oops', r2[1], r2[2]
- ... if r1[1].v == 900:
+ ... if r2[1].v == 900:
... break # we caught up
- ... time.sleep(0.01)
+ ... path = s2.fshelper.getBlobFilename(*blob_id)
+ ... if os.path.exists(path):
+ ... ZODB.blob.remove_committed(path)
+ ... s2._server.sendBlob(*blob_id)
+ ... else: print 'Dang'
- >>> thread.join()
-
-
+ >>> threadf.join()
+
+ >>> threadg.join()
+
If we shutdown and restart the source server, the variables will be
invalidated:
>>> stop_server(adminaddr0)
>>> _ = start_server('<filestorage 1>\npath fs\n</filestorage>\n',
... port=port0)
-
+
>>> for i in range(1000):
... c1.sync()
... c2.sync()
@@ -109,5 +147,6 @@
Cleanup:
+ >>> db0.close()
>>> db1.close()
>>> db2.close()
More information about the Zodb-checkins
mailing list