[ZODB-Dev] Problems with ReadConflictError
Andrew McLean
andrew-zodb at andros.org.uk
Mon Aug 21 14:44:17 EDT 2006
I'm having a problem with my first multi-threaded application using ZODB
(v3.6).
The program basically uses an OOBTree to record whether tasks have been
completed. The key is the task identifier. There are multiple Downloader
threads that need to read the OOBTree (to check if a task has been
completed). There is a single ProcessResults thread that writes to the
OOBTree (when a task is completed). Each thread uses the same DB
instance, but uses its own connection instance. I also have a Packer
thread that periodically packs the database.
The problem is that the Downloader thread occasionally throws a
ReadConflictError exception.
I had thought that I could just wait for a bit and then retry the read,
but that didn't help. I have a workaround, which is to assume the task
is not done when the exception is thrown. But this isn't very elegant.
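
One possible explanation for why waiting and retrying did not help, offered as an assumption rather than a confirmed diagnosis: a connection that never commits or aborts may keep its old view of the database, so the same read fails again however long the wait. The sketch below retries only after calling a reset hook. It is plain Python with hypothetical names (StaleReadError, read_with_retry, reset); with ZODB the exception would be ReadConflictError and the hook would presumably be transaction.abort() or the connection's sync() method, called in the reading thread.

```python
import time


class StaleReadError(Exception):
    """Hypothetical stand-in for ZODB.POSException.ReadConflictError."""


def read_with_retry(read, reset, conflict=StaleReadError,
                    attempts=5, delay=0.0):
    """Call read(); on a conflict, call reset() and try again.

    read   -- zero-argument callable that performs the read
    reset  -- zero-argument callable that refreshes the reader's view
              (assumed to be transaction.abort() or Connection.sync()
              when used with ZODB)
    delay  -- initial sleep in seconds, doubled after each failure
    """
    for attempt in range(attempts):
        try:
            return read()
        except conflict:
            if attempt == attempts - 1:
                # Out of attempts: let the caller see the conflict.
                raise
            reset()            # refresh the view before retrying
            time.sleep(delay)  # back off
            delay *= 2
```

In the Downloader this might be called as read_with_retry(lambda: results.has_key(task), transaction.abort, conflict=ReadConflictError). That call is untested against ZODB 3.6.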
I suspect that I may not understand how the transaction manager works.
Currently I just use "transaction.get().commit()" after each write to
the database.
Any advice gratefully received.
I have attached a skeleton of the code below, if that helps.
- Andrew McLean

# Standard library modules used by the threads below
import threading
import time

# Import various modules to support the ZODB
from ZODB import FileStorage, DB
from ZODB.POSException import ReadConflictError
from BTrees.OOBTree import OOBTree
import transaction
import BTrees.check

class Packer(threading.Thread):
    """Packer class to pack the ZODB at periodic intervals"""

    def __init__(self, db, interval):
        self.__db = db
        self.__interval = interval
        threading.Thread.__init__(self)

    def run(self):
        while 1:
            print "Packing database..."
            self.__db.pack()
            time.sleep(self.__interval)

class Downloader(threading.Thread):
    """Downloader class to download urls"""

    def __init__(self, id, taskQueue, resultQueue, db):
        self.id = id
        self.__taskQueue = taskQueue
        self.__resultQueue = resultQueue
        self.__results = db.open().root()['done']
        threading.Thread.__init__(self)

    def run(self):
        while 1:
            # Get task from the queue
            task = self.__taskQueue.get()
            if (task[1] == 1) or (not self.done(task)):
                # ....
                # Add to results queue
                self.__resultQueue.put((task, result))

    def done(self, task):
        ## backoffDelay = 1
        while 1:
            try:
                return self.__results.has_key(task)
            except ReadConflictError:
                ## print "*** ReadConflictError raised: retrying in %d seconds ***" % backoffDelay
                ## time.sleep(backoffDelay)
                ## backoffDelay = backoffDelay * 2
                print "*** ReadConflictError raised: Assume task not completed for safety ***"
                return False

class ProcessResults(threading.Thread):
    """ProcessPage class to process downloaded URLs"""

    def __init__(self, taskQueue, resultQueue, resultWriter, db, problems):
        self.__taskQueue = taskQueue
        self.__resultQueue = resultQueue
        self.__writer = resultWriter
        self.__results = db.open().root()['done']
        self.__problems = problems
        threading.Thread.__init__(self)

    def run(self):
        while 1:
            fullResult = self.__resultQueue.get()
            # ...
            # Write results to database
            self.__results[task] = True
            transaction.get().commit()

def main():
    # Setting up the ZODB database
    # ...
    storage = FileStorage.FileStorage(databasePath)
    db = DB(storage)
    # Create the "done" object (if required)
    root = db.open().root()
    if not root.has_key('done'):
        root['done'] = OOBTree()
        transaction.get().commit()
    # ...
    # Set up the workers to process the task queue
    print "Starting threads..."
    ProcessResults(taskQueue, resultQueue, resultWriter, db,
                   problems).start()
    Packer(db, 15*60).start()
    for id in range(nDownloaders):
        Downloader(id, taskQueue, resultQueue, db).start()

main()