[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo2/src/ZODB/ Another stab at allowing multiple threads to read.
Jim Fulton
jim at zope.com
Wed Sep 30 09:24:37 EDT 2009
Log message for revision 104650:
Another stab at allowing multiple threads to read.
Changed:
U ZODB/branches/jim-thready-zeo2/src/ZODB/FileStorage/FileStorage.py
U ZODB/branches/jim-thready-zeo2/src/ZODB/FileStorage/format.py
U ZODB/branches/jim-thready-zeo2/src/ZODB/tests/testFileStorage.py
-=-
Modified: ZODB/branches/jim-thready-zeo2/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZODB/FileStorage/FileStorage.py 2009-09-30 13:22:11 UTC (rev 104649)
+++ ZODB/branches/jim-thready-zeo2/src/ZODB/FileStorage/FileStorage.py 2009-09-30 13:24:37 UTC (rev 104650)
@@ -38,6 +38,7 @@
import logging
import os
import sys
+import threading
import time
import ZODB.blob
import ZODB.interfaces
@@ -130,7 +131,7 @@
else:
self._tfile = None
- self._file_name = file_name
+ self._file_name = os.path.abspath(file_name)
self._pack_gc = pack_gc
self.pack_keep_old = pack_keep_old
@@ -169,6 +170,7 @@
self._file = open(file_name, 'w+b')
self._file.write(packed_version)
+ self._files = FilePool(self._file_name)
r = self._restore_index()
if r is not None:
self._used_index = 1 # Marker for testing
@@ -403,6 +405,7 @@
def close(self):
self._file.close()
+ self._files.close()
if hasattr(self,'_lock_file'):
self._lock_file.close()
if self._tfile:
@@ -428,22 +431,25 @@
"""Return pickle data and serial number."""
assert not version
- self._lock_acquire()
+ _file = self._files.get()
try:
+
pos = self._lookup_pos(oid)
- h = self._read_data_header(pos, oid)
+
+ h = self._read_data_header(pos, oid, _file)
if h.plen:
- data = self._file.read(h.plen)
+ data = _file.read(h.plen)
return data, h.tid
elif h.back:
# Get the data from the backpointer, but tid from
# current txn.
- data = self._loadBack_impl(oid, h.back)[0]
+ data = self._loadBack_impl(oid, h.back, _file=_file)[0]
return data, h.tid
else:
raise POSKeyError(oid)
+
finally:
- self._lock_release()
+ self._files.put(_file)
def loadSerial(self, oid, serial):
self._lock_acquire()
@@ -464,12 +470,13 @@
self._lock_release()
def loadBefore(self, oid, tid):
- self._lock_acquire()
+ _file = self._files.get()
try:
pos = self._lookup_pos(oid)
+
end_tid = None
while True:
- h = self._read_data_header(pos, oid)
+ h = self._read_data_header(pos, oid, _file)
if h.tid < tid:
break
@@ -479,14 +486,14 @@
return None
if h.back:
- data, _, _, _ = self._loadBack_impl(oid, h.back)
+ data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
return data, h.tid, end_tid
else:
- return self._file.read(h.plen), h.tid, end_tid
-
+ return _file.read(h.plen), h.tid, end_tid
finally:
- self._lock_release()
+ self._files.put(_file)
+
def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
@@ -736,6 +743,31 @@
finally:
self._lock_release()
+ def tpc_finish(self, transaction, f=None):
+
+ # Get write lock
+ self._files.write_lock()
+ try:
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ if f is not None:
+ f(self._tid)
+ u, d, e = self._ude
+ self._finish(self._tid, u, d, e)
+ self._clear_temp()
+ finally:
+ self._ude = None
+ self._transaction = None
+ self._commit_lock_release()
+ finally:
+ self._lock_release()
+
+ finally:
+ self._files.write_unlock()
+
def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any
# data, so we don't bother writing anything to the file.
@@ -1129,8 +1161,10 @@
return
have_commit_lock = True
opos, index = pack_result
+ self._files.write_lock()
self._lock_acquire()
try:
+ self._files.empty()
self._file.close()
try:
os.rename(self._file_name, oldpath)
@@ -1152,6 +1186,7 @@
have_commit_lock = False
self._remove_blob_files_tagged_for_removal_during_pack()
finally:
+ self._files.write_unlock()
self._lock_release()
finally:
if have_commit_lock:
@@ -2001,3 +2036,72 @@
'description': d}
d.update(e)
return d
+
+class FilePool:
+
+ closed = False
+ writing = False
+
+ def __init__(self, file_name):
+ self.name = file_name
+ self._files = []
+ self._out = []
+ self._cond = threading.Condition()
+
+ def write_lock(self):
+ self._cond.acquire()
+ try:
+ self.writing = True
+ while self._out:
+ self._cond.wait()
+ finally:
+ self._cond.release()
+
+ def write_unlock(self):
+ self._cond.acquire()
+ self.writing = False
+ self._cond.notifyAll()
+ self._cond.release()
+
+ def get(self):
+ self._cond.acquire()
+ try:
+ while self.writing:
+ self._cond.wait()
+ if self.closed:
+ raise ValueError('closed')
+
+ try:
+ f = self._files.pop()
+ except IndexError:
+ f = open(self.name, 'rb')
+ self._out.append(f)
+ return f
+ finally:
+ self._cond.release()
+
+ def put(self, f):
+ self._out.remove(f)
+ self._files.append(f)
+ if not self._out:
+ self._cond.acquire()
+ try:
+ if self.writing and not self._out:
+ self._cond.notifyAll()
+ finally:
+ self._cond.release()
+
+ def empty(self):
+ while self._files:
+ self._files.pop().close()
+
+ def close(self):
+ self._cond.acquire()
+ self.closed = True
+ self._cond.release()
+
+ self.write_lock()
+ try:
+ self.empty()
+ finally:
+ self.write_unlock()
Modified: ZODB/branches/jim-thready-zeo2/src/ZODB/FileStorage/format.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZODB/FileStorage/format.py 2009-09-30 13:22:11 UTC (rev 104649)
+++ ZODB/branches/jim-thready-zeo2/src/ZODB/FileStorage/format.py 2009-09-30 13:24:37 UTC (rev 104650)
@@ -134,21 +134,24 @@
self._file.seek(pos)
return u64(self._file.read(8))
- def _read_data_header(self, pos, oid=None):
+ def _read_data_header(self, pos, oid=None, _file=None):
"""Return a DataHeader object for data record at pos.
If ois is not None, raise CorruptedDataError if oid passed
does not match oid in file.
"""
- self._file.seek(pos)
- s = self._file.read(DATA_HDR_LEN)
+ if _file is None:
+ _file = self._file
+
+ _file.seek(pos)
+ s = _file.read(DATA_HDR_LEN)
if len(s) != DATA_HDR_LEN:
raise CorruptedDataError(oid, s, pos)
h = DataHeaderFromString(s)
if oid is not None and oid != h.oid:
raise CorruptedDataError(oid, s, pos)
if not h.plen:
- h.back = u64(self._file.read(8))
+ h.back = u64(_file.read(8))
return h
def _read_txn_header(self, pos, tid=None):
@@ -164,20 +167,22 @@
h.ext = self._file.read(h.elen)
return h
- def _loadBack_impl(self, oid, back, fail=True):
+ def _loadBack_impl(self, oid, back, fail=True, _file=None):
# shared implementation used by various _loadBack methods
#
# If the backpointer ultimately resolves to 0:
# If fail is True, raise KeyError for zero backpointer.
# If fail is False, return the empty data from the record
# with no backpointer.
+ if _file is None:
+ _file = self._file
while 1:
if not back:
# If backpointer is 0, object does not currently exist.
raise POSKeyError(oid)
- h = self._read_data_header(back)
+ h = self._read_data_header(back, _file=_file)
if h.plen:
- return self._file.read(h.plen), h.tid, back, h.tloc
+ return _file.read(h.plen), h.tid, back, h.tloc
if h.back == 0 and not fail:
return None, h.tid, back, h.tloc
back = h.back
Modified: ZODB/branches/jim-thready-zeo2/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/branches/jim-thready-zeo2/src/ZODB/tests/testFileStorage.py 2009-09-30 13:22:11 UTC (rev 104649)
+++ ZODB/branches/jim-thready-zeo2/src/ZODB/tests/testFileStorage.py 2009-09-30 13:24:37 UTC (rev 104650)
@@ -583,10 +583,10 @@
>>> handler.uninstall()
- >>> fs.load('\0'*8, '')
+ >>> fs.load('\0'*8, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
- ValueError: I/O operation on closed file
+ ValueError: ...
>>> db.close()
>>> fs = ZODB.FileStorage.FileStorage('data.fs')
More information about the Zodb-checkins mailing list