[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Multi-threaded IO support.
Jim Fulton
jim at zope.com
Mon Feb 1 14:12:20 EST 2010
Log message for revision 108696:
Multi-threaded IO support.
Changed:
U ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
U ZODB/trunk/src/ZODB/FileStorage/format.py
U ZODB/trunk/src/ZODB/tests/testFileStorage.py
-=-
Modified: ZODB/trunk/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/FileStorage.py 2010-02-01 19:12:17 UTC (rev 108695)
+++ ZODB/trunk/src/ZODB/FileStorage/FileStorage.py 2010-02-01 19:12:19 UTC (rev 108696)
@@ -36,6 +36,7 @@
import logging
import os
import sys
+import threading
import time
import ZODB.blob
import ZODB.interfaces
@@ -128,7 +129,7 @@
else:
self._tfile = None
- self._file_name = file_name
+ self._file_name = os.path.abspath(file_name)
self._pack_gc = pack_gc
self.pack_keep_old = pack_keep_old
@@ -167,6 +168,7 @@
self._file = open(file_name, 'w+b')
self._file.write(packed_version)
+ self._files = FilePool(self._file_name)
r = self._restore_index()
if r is not None:
self._used_index = 1 # Marker for testing
@@ -401,6 +403,7 @@
def close(self):
self._file.close()
+ self._files.close()
if hasattr(self,'_lock_file'):
self._lock_file.close()
if self._tfile:
@@ -426,22 +429,22 @@
"""Return pickle data and serial number."""
assert not version
- self._lock_acquire()
+ _file = self._files.get()
try:
pos = self._lookup_pos(oid)
- h = self._read_data_header(pos, oid)
+ h = self._read_data_header(pos, oid, _file)
if h.plen:
- data = self._file.read(h.plen)
+ data = _file.read(h.plen)
return data, h.tid
elif h.back:
# Get the data from the backpointer, but tid from
# current txn.
- data = self._loadBack_impl(oid, h.back)[0]
+ data = self._loadBack_impl(oid, h.back, _file=_file)[0]
return data, h.tid
else:
raise POSKeyError(oid)
finally:
- self._lock_release()
+ self._files.put(_file)
def loadSerial(self, oid, serial):
self._lock_acquire()
@@ -462,12 +465,12 @@
self._lock_release()
def loadBefore(self, oid, tid):
- self._lock_acquire()
+ _file = self._files.get()
try:
pos = self._lookup_pos(oid)
end_tid = None
while True:
- h = self._read_data_header(pos, oid)
+ h = self._read_data_header(pos, oid, _file)
if h.tid < tid:
break
@@ -477,13 +480,12 @@
return None
if h.back:
- data, _, _, _ = self._loadBack_impl(oid, h.back)
+ data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
return data, h.tid, end_tid
else:
- return self._file.read(h.plen), h.tid, end_tid
-
+ return _file.read(h.plen), h.tid, end_tid
finally:
- self._lock_release()
+ self._files.put(_file)
def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only:
@@ -735,6 +737,32 @@
finally:
self._lock_release()
+ def tpc_finish(self, transaction, f=None):
+
+ # Get write lock
+ self._files.write_lock()
+ try:
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ raise POSException.StorageTransactionError(
+ "tpc_finish called with wrong transaction")
+ try:
+ if f is not None:
+ f(self._tid)
+ u, d, e = self._ude
+ self._finish(self._tid, u, d, e)
+ self._clear_temp()
+ finally:
+ self._ude = None
+ self._transaction = None
+ self._commit_lock_release()
+ finally:
+ self._lock_release()
+
+ finally:
+ self._files.write_unlock()
+
def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any
# data, so we don't bother writing anything to the file.
@@ -1131,8 +1159,10 @@
return
have_commit_lock = True
opos, index = pack_result
+ self._files.write_lock()
self._lock_acquire()
try:
+ self._files.empty()
self._file.close()
try:
os.rename(self._file_name, oldpath)
@@ -1146,6 +1176,7 @@
self._initIndex(index, self._tindex)
self._pos = opos
finally:
+ self._files.write_unlock()
self._lock_release()
# We're basically done. Now we need to deal with removed
@@ -2037,3 +2068,72 @@
'description': d}
d.update(e)
return d
+
+class FilePool:
+
+ closed = False
+ writing = False
+
+ def __init__(self, file_name):
+ self.name = file_name
+ self._files = []
+ self._out = []
+ self._cond = threading.Condition()
+
+ def write_lock(self):
+ self._cond.acquire()
+ try:
+ self.writing = True
+ while self._out:
+ self._cond.wait()
+ finally:
+ self._cond.release()
+
+ def write_unlock(self):
+ self._cond.acquire()
+ self.writing = False
+ self._cond.notifyAll()
+ self._cond.release()
+
+ def get(self):
+ self._cond.acquire()
+ try:
+ while self.writing:
+ self._cond.wait()
+ if self.closed:
+ raise ValueError('closed')
+
+ try:
+ f = self._files.pop()
+ except IndexError:
+ f = open(self.name, 'rb')
+ self._out.append(f)
+ return f
+ finally:
+ self._cond.release()
+
+ def put(self, f):
+ self._out.remove(f)
+ self._files.append(f)
+ if not self._out:
+ self._cond.acquire()
+ try:
+ if self.writing and not self._out:
+ self._cond.notifyAll()
+ finally:
+ self._cond.release()
+
+ def empty(self):
+ while self._files:
+ self._files.pop().close()
+
+ def close(self):
+ self._cond.acquire()
+ self.closed = True
+ self._cond.release()
+
+ self.write_lock()
+ try:
+ self.empty()
+ finally:
+ self.write_unlock()
Modified: ZODB/trunk/src/ZODB/FileStorage/format.py
===================================================================
--- ZODB/trunk/src/ZODB/FileStorage/format.py 2010-02-01 19:12:17 UTC (rev 108695)
+++ ZODB/trunk/src/ZODB/FileStorage/format.py 2010-02-01 19:12:19 UTC (rev 108696)
@@ -134,21 +134,24 @@
self._file.seek(pos)
return u64(self._file.read(8))
- def _read_data_header(self, pos, oid=None):
+ def _read_data_header(self, pos, oid=None, _file=None):
"""Return a DataHeader object for data record at pos.
If oid is not None, raise CorruptedDataError if oid passed
does not match oid in file.
"""
- self._file.seek(pos)
- s = self._file.read(DATA_HDR_LEN)
+ if _file is None:
+ _file = self._file
+
+ _file.seek(pos)
+ s = _file.read(DATA_HDR_LEN)
if len(s) != DATA_HDR_LEN:
raise CorruptedDataError(oid, s, pos)
h = DataHeaderFromString(s)
if oid is not None and oid != h.oid:
raise CorruptedDataError(oid, s, pos)
if not h.plen:
- h.back = u64(self._file.read(8))
+ h.back = u64(_file.read(8))
return h
def _read_txn_header(self, pos, tid=None):
@@ -164,20 +167,22 @@
h.ext = self._file.read(h.elen)
return h
- def _loadBack_impl(self, oid, back, fail=True):
+ def _loadBack_impl(self, oid, back, fail=True, _file=None):
# shared implementation used by various _loadBack methods
#
# If the backpointer ultimately resolves to 0:
# If fail is True, raise KeyError for zero backpointer.
# If fail is False, return the empty data from the record
# with no backpointer.
+ if _file is None:
+ _file = self._file
while 1:
if not back:
# If backpointer is 0, object does not currently exist.
raise POSKeyError(oid)
- h = self._read_data_header(back)
+ h = self._read_data_header(back, _file=_file)
if h.plen:
- return self._file.read(h.plen), h.tid, back, h.tloc
+ return _file.read(h.plen), h.tid, back, h.tloc
if h.back == 0 and not fail:
return None, h.tid, back, h.tloc
back = h.back
Modified: ZODB/trunk/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/trunk/src/ZODB/tests/testFileStorage.py 2010-02-01 19:12:17 UTC (rev 108695)
+++ ZODB/trunk/src/ZODB/tests/testFileStorage.py 2010-02-01 19:12:19 UTC (rev 108696)
@@ -587,10 +587,10 @@
>>> handler.uninstall()
- >>> fs.load('\0'*8, '')
+ >>> fs.load('\0'*8, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
- ValueError: I/O operation on closed file
+ ValueError: ...
>>> db.close()
>>> fs = ZODB.FileStorage.FileStorage('data.fs')
More information about the Zodb-checkins
mailing list