[Zodb-checkins] SVN: ZODB/branches/jim-thready-zeo/src/ZODB/ Allow multiple file-storage loads to happen at once.
Jim Fulton
jim at zope.com
Wed Sep 16 16:10:08 EDT 2009
Log message for revision 104153:
Allow multiple file-storage loads to happen at once.
Changed:
U ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/FileStorage.py
U ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/format.py
U ZODB/branches/jim-thready-zeo/src/ZODB/tests/testFileStorage.py
-=-
Modified: ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/FileStorage.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/FileStorage.py 2009-09-16 15:38:14 UTC (rev 104152)
+++ ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/FileStorage.py 2009-09-16 20:10:08 UTC (rev 104153)
@@ -38,6 +38,7 @@
import logging
import os
import sys
+import threading
import time
import ZODB.blob
import ZODB.interfaces
@@ -130,7 +131,7 @@
else:
self._tfile = None
- self._file_name = file_name
+ self._file_name = os.path.abspath(file_name)
self._pack_gc = pack_gc
self.pack_keep_old = pack_keep_old
@@ -139,6 +140,9 @@
BaseStorage.BaseStorage.__init__(self, file_name)
+ self._oid_locks = {}
+ self._oid_lock_condition = threading.Condition()
+
index, tindex = self._newIndexes()
self._initIndex(index, tindex)
@@ -147,7 +151,7 @@
self._file = None
if not create:
try:
- self._file = open(file_name, read_only and 'rb' or 'r+b')
+ self._file = open(file_name, read_only and 'rb' or 'r+b', 0)
except IOError, exc:
if exc.errno == errno.EFBIG:
# The file is too big to open. Fail visibly.
@@ -166,9 +170,10 @@
if self._file is None and create:
if os.path.exists(file_name):
os.remove(file_name)
- self._file = open(file_name, 'w+b')
+ self._file = open(file_name, 'w+b', 0)
self._file.write(packed_version)
+ self._files = FilePool(self._file_name)
r = self._restore_index()
if r is not None:
self._used_index = 1 # Marker for testing
@@ -218,6 +223,39 @@
self.blob_dir = None
self._blob_init_no_blobs()
+ def _lock_oid(self, oid, read=True):
+ self._oid_lock_condition.acquire()
+ while 1:
+ lock = self._oid_locks.get(oid, 0)
+ if read:
+ if lock < 0:
+ self._oid_lock_condition.wait()
+ continue
+ lock += 1
+ else:
+ if lock:
+ self._oid_lock_condition.wait()
+ continue
+ lock = -1
+ break
+ self._oid_locks[oid] = lock
+ self._oid_lock_condition.release()
+
+ def _unlock_oid(self, oid, read=True):
+ self._oid_lock_condition.acquire()
+ lock = self._oid_locks[oid]
+ if lock > 1:
+ assert read
+ self._oid_locks[oid] = lock - 1
+ else:
+ if read:
+ assert lock == 1
+ else:
+ assert lock == -1
+ del self._oid_locks[oid]
+ self._oid_lock_condition.notifyAll()
+ self._oid_lock_condition.release()
+
def copyTransactionsFrom(self, other):
if self.blob_dir:
return ZODB.blob.BlobStorageMixin.copyTransactionsFrom(self, other)
@@ -403,6 +441,7 @@
def close(self):
self._file.close()
+ self._files.close()
if hasattr(self,'_lock_file'):
self._lock_file.close()
if self._tfile:
@@ -428,22 +467,32 @@
"""Return pickle data and serial number."""
assert not version
- self._lock_acquire()
+ self._lock_oid(oid)
try:
- pos = self._lookup_pos(oid)
- h = self._read_data_header(pos, oid)
- if h.plen:
- data = self._file.read(h.plen)
- return data, h.tid
- elif h.back:
- # Get the data from the backpointer, but tid from
- # current txn.
- data = self._loadBack_impl(oid, h.back)[0]
- return data, h.tid
- else:
- raise POSKeyError(oid)
+
+ self._lock_acquire()
+ try:
+ pos = self._lookup_pos(oid)
+ _file = self._files.get()
+ finally:
+ self._lock_release()
+
+ try:
+ h = self._read_data_header(pos, oid, _file)
+ if h.plen:
+ data = _file.read(h.plen)
+ return data, h.tid
+ elif h.back:
+ # Get the data from the backpointer, but tid from
+ # current txn.
+ data = self._loadBack_impl(oid, h.back, _file=_file)[0]
+ return data, h.tid
+ else:
+ raise POSKeyError(oid)
+ finally:
+ self._files.put(_file)
finally:
- self._lock_release()
+ self._unlock_oid(oid)
def loadSerial(self, oid, serial):
self._lock_acquire()
@@ -467,9 +516,14 @@
self._lock_acquire()
try:
pos = self._lookup_pos(oid)
+ _file = self._files.get()
+ finally:
+ self._lock_release()
+
+ try:
end_tid = None
while True:
- h = self._read_data_header(pos, oid)
+ h = self._read_data_header(pos, oid, _file)
if h.tid < tid:
break
@@ -479,14 +533,14 @@
return None
if h.back:
- data, _, _, _ = self._loadBack_impl(oid, h.back)
+ data, _, _, _ = self._loadBack_impl(oid, h.back, _file=_file)
return data, h.tid, end_tid
else:
- return self._file.read(h.plen), h.tid, end_tid
-
+ return _file.read(h.plen), h.tid, end_tid
finally:
- self._lock_release()
+ self._files.put(_file)
+
def store(self, oid, oldserial, data, version, transaction):
if self._is_read_only:
raise POSException.ReadOnlyError()
@@ -736,6 +790,36 @@
finally:
self._lock_release()
+ def tpc_finish(self, transaction, f=None):
+
+ try:
+ # Get write locks
+ locked = []
+ for oid in self._tindex:
+ self._lock_oid(oid, False)
+ locked.append(oid)
+
+ self._lock_acquire()
+ try:
+ if transaction is not self._transaction:
+ return
+ try:
+ if f is not None:
+ f(self._tid)
+ u, d, e = self._ude
+ self._finish(self._tid, u, d, e)
+ self._clear_temp()
+ finally:
+ self._ude = None
+ self._transaction = None
+ self._commit_lock_release()
+ finally:
+ self._lock_release()
+
+ finally:
+ for oid in locked:
+ self._unlock_oid(oid, False)
+
def _finish(self, tid, u, d, e):
# If self._nextpos is 0, then the transaction didn't write any
# data, so we don't bother writing anything to the file.
@@ -1131,18 +1215,20 @@
opos, index = pack_result
self._lock_acquire()
try:
+ self._files.close()
self._file.close()
try:
os.rename(self._file_name, oldpath)
except Exception:
- self._file = open(self._file_name, 'r+b')
+ self._file = open(self._file_name, 'r+b', 0)
raise
# OK, we're beyond the point of no return
os.rename(self._file_name + '.pack', self._file_name)
if not self.pack_keep_old:
os.remove(oldpath)
- self._file = open(self._file_name, 'r+b')
+ self._file = open(self._file_name, 'r+b', 0)
+ self._files.open()
self._initIndex(index, self._tindex)
self._pos = opos
self._save_index()
@@ -2001,3 +2087,61 @@
'description': d}
d.update(e)
return d
+
+class FilePool:
+
+ closed = False
+
+ def __init__(self, file_name):
+ self.name = file_name
+ self._files = []
+ self._out = []
+ self._cond = threading.Condition()
+
+ def get(self):
+ self._cond.acquire()
+ try:
+ if self.closed:
+ raise ValueError('closed')
+ try:
+ f = self._files.pop()
+ except IndexError:
+ f = open(self.name, 'rb', 0)
+ self._out.append(f)
+ return f
+ finally:
+ self._cond.release()
+
+ def put(self, f):
+ self._cond.acquire()
+ try:
+ self._out.remove(f)
+ if self.closed:
+ f.close()
+ if not self._out:
+ self._cond.notifyAll()
+ else:
+ self._files.append(f)
+ finally:
+ self._cond.release()
+
+ def close(self):
+ self._cond.acquire()
+ try:
+ self.closed = True
+ while self._out:
+ self._cond.wait()
+ while self._files:
+ self._files.pop().close()
+ finally:
+ self._cond.release()
+
+ def open(self):
+ self._cond.acquire()
+ try:
+ assert self.closed
+ assert not self._files
+ assert not self._out
+ self.closed = False
+ finally:
+ self._cond.release()
Modified: ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/format.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/format.py 2009-09-16 15:38:14 UTC (rev 104152)
+++ ZODB/branches/jim-thready-zeo/src/ZODB/FileStorage/format.py 2009-09-16 20:10:08 UTC (rev 104153)
@@ -134,21 +134,24 @@
self._file.seek(pos)
return u64(self._file.read(8))
- def _read_data_header(self, pos, oid=None):
+ def _read_data_header(self, pos, oid=None, _file=None):
"""Return a DataHeader object for data record at pos.
If oid is not None, raise CorruptedDataError if oid passed
does not match oid in file.
"""
- self._file.seek(pos)
- s = self._file.read(DATA_HDR_LEN)
+ if _file is None:
+ _file = self._file
+
+ _file.seek(pos)
+ s = _file.read(DATA_HDR_LEN)
if len(s) != DATA_HDR_LEN:
raise CorruptedDataError(oid, s, pos)
h = DataHeaderFromString(s)
if oid is not None and oid != h.oid:
raise CorruptedDataError(oid, s, pos)
if not h.plen:
- h.back = u64(self._file.read(8))
+ h.back = u64(_file.read(8))
return h
def _read_txn_header(self, pos, tid=None):
@@ -164,20 +167,22 @@
h.ext = self._file.read(h.elen)
return h
- def _loadBack_impl(self, oid, back, fail=True):
+ def _loadBack_impl(self, oid, back, fail=True, _file=None):
# shared implementation used by various _loadBack methods
#
# If the backpointer ultimately resolves to 0:
# If fail is True, raise KeyError for zero backpointer.
# If fail is False, return the empty data from the record
# with no backpointer.
+ if _file is None:
+ _file = self._file
while 1:
if not back:
# If backpointer is 0, object does not currently exist.
raise POSKeyError(oid)
- h = self._read_data_header(back)
+ h = self._read_data_header(back, _file=_file)
if h.plen:
- return self._file.read(h.plen), h.tid, back, h.tloc
+ return _file.read(h.plen), h.tid, back, h.tloc
if h.back == 0 and not fail:
return None, h.tid, back, h.tloc
back = h.back
Modified: ZODB/branches/jim-thready-zeo/src/ZODB/tests/testFileStorage.py
===================================================================
--- ZODB/branches/jim-thready-zeo/src/ZODB/tests/testFileStorage.py 2009-09-16 15:38:14 UTC (rev 104152)
+++ ZODB/branches/jim-thready-zeo/src/ZODB/tests/testFileStorage.py 2009-09-16 20:10:08 UTC (rev 104153)
@@ -583,10 +583,10 @@
>>> handler.uninstall()
- >>> fs.load('\0'*8, '')
+ >>> fs.load('\0'*8, '') # doctest: +ELLIPSIS
Traceback (most recent call last):
...
- ValueError: I/O operation on closed file
+ ValueError: ...
>>> db.close()
>>> fs = ZODB.FileStorage.FileStorage('data.fs')
More information about the Zodb-checkins
mailing list