"""Trial test that provokes a ZODB ReadConflictError between two connections.

One connection keeps appending articles to a persistent list and committing,
while a second connection, opened before each write, iterates that list.
"""

import pdb
import sys
import time
import threading
import os

from ZODB import DB
from ZODB.PersistentList import PersistentList
from ZODB.FileStorage import FileStorage
from ZODB.POSException import ReadConflictError
from Persistence import Persistent

from twisted.trial import unittest


class Computer(Persistent):
    """Persistent article record holding a status, a price and a location."""

    def __init__(self, oidInteger):
        """Store ``oidInteger`` as the id; the other attributes start as None."""
        self._oid = oidInteger
        self._articleStatus = None
        self._endUserPrice = None
        self._location = None

    def setArticleStatus(self, anObject):
        """Set the article status to ``anObject``."""
        self._articleStatus = anObject

    def setEndUserPrice(self, anObject):
        """Set the end user price to ``anObject``."""
        self._endUserPrice = anObject

    def setLocation(self, anObject):
        """Set the location to ``anObject``."""
        self._location = anObject


class MockArticleDb(Persistent):
    """Minimal persistent container keeping articles in a PersistentList."""

    def __init__(self):
        """Start with an empty article list."""
        self._articles = PersistentList()

    def addArticle(self, anArticle):
        """Append ``anArticle`` to the stored list."""
        self._articles.append(anArticle)

    def articles(self):
        """Return the underlying PersistentList (not a copy)."""
        return self._articles


class TestReadConflictError(unittest.TestCase):
    """Exercise ReadConflictError handling with a writer and a reader."""

    def setUp(self):
        """Create a fresh ``test.fs`` holding one article and reopen it.

        The storage is first created and populated, then everything is
        closed and the file is reopened with ``create=False``; the resulting
        database is kept in ``self._db`` for the test.
        """
        try:
            os.unlink('test.fs')
        except OSError:
            # Best effort: the file usually does not exist yet.
            pass

        storage = FileStorage('test.fs', create=True)
        db = DB(storage, cache_size=100000)
        connection = db.open()
        connection.setLocalTransaction()
        connection.root()['articledb'] = MockArticleDb()

        computer = Computer(1)
        computer.setArticleStatus('For sale')
        computer.setEndUserPrice(10050)
        computer.setLocation('H0101')
        connection.root()['articledb'].addArticle(computer)
        connection.getTransaction().commit()

        connection.close()
        db.close()
        storage.close()

        storage = FileStorage('test.fs', create=False)
        self._db = DB(storage, cache_size=100000)

    def tearDown(self):
        """Close the database opened at the end of setUp."""
        self._db.close()

    def writeStuff(self, connection):
        """Append one more Computer through ``connection`` and commit.

        ``connection`` is synced first, and the new length of the article
        list is printed after the commit.
        """
        connection.sync()
        articleDb = connection.root()['articledb']

        computer = Computer(1)
        computer.setArticleStatus('For sale')
        computer.setEndUserPrice(10050)
        computer.setLocation('H0101')
        articleDb.addArticle(computer)
        connection.getTransaction().commit()

        # Single-argument form prints the same text as the old statement.
        print('write list length %d' % len(articleDb.articles()))

    def doRead(self, oddRead, connection):
        """Iterate all articles via ``connection``, retrying once on conflict.

        ``oddRead`` tells whether this is an odd-numbered read; the test
        asserts that exactly those reads need a second attempt.
        """
        tryCount = 1
        articleDb = connection.root()['articledb']
        try:
            for article in articleDb.articles():
                pass
        except ReadConflictError:
            # Syncing the connection is enough for the retry to succeed.
            # Per the original author, closing the connection and opening
            # a new one from self._db also works.
            connection.sync()
            for article in articleDb.articles():
                pass
            tryCount += 1

        print('read list length %d' % len(articleDb.articles()))

        if oddRead:
            # on first, third, fifth read we get a ReadConflictError
            self.assertEquals(2, tryCount)
        else:
            # but on second, fourth and so forth we don't
            self.assertEquals(1, tryCount)

    def testProvokeReadConflictError(self):
        """Alternate writes and reads on separate connections ten times."""
        conn2 = self._db.open()
        conn2.setLocalTransaction()
        try:
            for each in range(1, 11):  # 1000 works also but takes a bit of time
                # The reading connection is opened before the write happens.
                connection = self._db.open()
                connection.setLocalTransaction()
                try:
                    self.writeStuff(conn2)
                    oddRead = each % 2 != 0
                    self.doRead(oddRead, connection)
                finally:
                    # Release the connection even when an assertion fails.
                    connection.close()
        finally:
            conn2.close()


if __name__ == '__main__':
    from twisted.scripts.trial import run
    # Pass this file's own path to trial as the thing to run.
    sys.argv.append(sys.argv[0])
    run()