[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Made a number of blob
changes:
Jim Fulton
jim at zope.com
Wed Jun 6 12:25:13 EDT 2007
Log message for revision 76436:
Made a number of blob changes:
- Unwritten blobs can now be read, and are empty.
- Blobs are considered modified when opened for writing. This is a
little more conservative than before, but it fixes a bug: opening a
file with 'w' does modify the file, yet that wasn't previously
considered to be a change.
- Non-optimistic savepoints now work.
- Fixed bug: could open multiple files for writing.
- Fixed bug: aborting a transaction removed uncommitted data for
uncommitted blobs.
Todo:
Need to remove uncommitted data file if a blob is GCed even when a
transaction isn't aborted or when it hasn't been added to anything.
- No longer close files on transaction boundaries.
This allows us to get rid of the transaction-manager dance.
Changed:
U ZODB/trunk/src/ZODB/blob.py
U ZODB/trunk/src/ZODB/tests/blob_basic.txt
U ZODB/trunk/src/ZODB/tests/blob_transaction.txt
-=-
Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py 2007-06-06 16:21:05 UTC (rev 76435)
+++ ZODB/trunk/src/ZODB/blob.py 2007-06-06 16:25:12 UTC (rev 76436)
@@ -16,12 +16,14 @@
import base64
import logging
+import logging
import os
import shutil
import sys
-import time
import tempfile
-import logging
+import threading
+import time
+import weakref
import zope.interface
@@ -42,78 +44,123 @@
valid_modes = 'r', 'w', 'r+', 'a'
+# Threading issues:
+# We want to support closing blob files when they are destroyed.
+# This introduces a threading issue, since a blob file may be destroyed
+# via GC in any thread.
+
+
class Blob(persistent.Persistent):
"""A BLOB supports efficient handling of large data within ZODB."""
zope.interface.implements(ZODB.interfaces.IBlob)
- _os_link = os.rename
-
- _p_blob_readers = 0
- _p_blob_writers = 0
_p_blob_uncommitted = None # Filename of the uncommitted (dirty) data
- _p_blob_data = None # Filename of the committed data
+ _p_blob_committed = None # Filename of the committed data
- # All persistent object store a reference to their data manager, a database
- # connection in the _p_jar attribute. So we are going to do the same with
- # blobs here.
- _p_blob_manager = None
+ readers = writers = None
+ def __setstate__(self, state=None):
+ # We use lists here because it will allow us to add and remove
+ # atomically
+ self.readers = []
+ self.writers = []
+
+ __init__ = __setstate__
- # Blobs need to participate in transactions even when not connected to
- # a database yet. If you want to use a non-default transaction manager,
- # you can override it via _p_blob_transaction. This is currently
- # required for unit testing.
- _p_blob_transaction = None
+ def __getstate__(self):
+ return None
- def open(self, mode="r"):
- """Returns a file(-like) object representing blob data."""
- result = None
+ def _p_deactivate(self):
+ # Only ghostify if we are unopened.
+ if self.readers or self.writers:
+ return
+ super(Blob, self)._p_deactivate()
+
+ def _p_invalidate(self):
+ # Force-close any open readers or writers,
+ # XXX should we warn of this? Maybe?
+ if self._p_changed is None:
+ return
+ for ref in self.readers+self.writers:
+ f = ref()
+ if f is not None:
+ f.close()
+
+ if (self._p_blob_uncommitted
+ and os.path.exists(self._p_blob_uncommitted)
+ ):
+ os.remove(self._p_blob_uncommitted)
+ super(Blob, self)._p_invalidate()
+
+ @property
+ def opened(self):
+ return bool(self.readers or self.writers)
+
+ def closed(self, f):
+
+ # We use try/except below because another thread might remove
+ # the ref after we check it if the file is GCed.
+
+ for file_refs in (self.readers, self.writers):
+ for ref in file_refs:
+ if ref() is f:
+ try:
+ file_refs.remove(ref)
+ except ValueError:
+ pass
+ return
+
+ def open(self, mode="r"):
if mode not in valid_modes:
raise ValueError("invalid mode", mode)
+ if self.writers:
+ raise BlobError("Already opened for writing.")
+
if mode == 'r':
if self._current_filename() is None:
- raise BlobError("Blob does not exist.")
+ self._create_uncommitted_file()
- if self._p_blob_writers != 0:
- raise BlobError("Already opened for writing.")
-
- self._p_blob_readers += 1
result = BlobFile(self._current_filename(), mode, self)
- elif mode == 'w':
- if self._p_blob_readers != 0:
+ def destroyed(ref, readers=self.readers):
+ try:
+ readers.remove(ref)
+ except ValueError:
+ pass
+
+ self.readers.append(weakref.ref(result, destroyed))
+ else:
+ if self.readers:
raise BlobError("Already opened for reading.")
- self._p_blob_writers += 1
- if self._p_blob_uncommitted is None:
- self._create_uncommitted_file()
- result = BlobFile(self._p_blob_uncommitted, mode, self)
-
- elif mode in ('a', 'r+'):
- if self._p_blob_readers != 0:
- raise BlobError("Already opened for reading.")
-
- if self._p_blob_uncommitted is None:
- # Create a new working copy
- uncommitted = BlobFile(self._create_uncommitted_file(),
- mode, self)
- # NOTE: _p_blob data appears by virtue of Connection._setstate
- utils.cp(file(self._p_blob_data), uncommitted)
- uncommitted.seek(0)
+ if mode == 'w':
+ if self._p_blob_uncommitted is None:
+ self._create_uncommitted_file()
+ result = BlobFile(self._p_blob_uncommitted, mode, self)
else:
- # Re-use existing working copy
- uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
+ if self._p_blob_uncommitted is None:
+ # Create a new working copy
+ self._create_uncommitted_file()
+ result = BlobFile(self._p_blob_uncommitted, mode, self)
+ utils.cp(file(self._p_blob_committed), result)
+ if mode == 'r+':
+ result.seek(0)
+ else:
+ # Re-use existing working copy
+ result = BlobFile(self._p_blob_uncommitted, mode, self)
- self._p_blob_writers += 1
- result = uncommitted
+ def destroyed(ref, writers=self.writers):
+ try:
+ writers.remove(ref)
+ except ValueError:
+ pass
+
+ self.writers.append(weakref.ref(result, destroyed))
- else:
- raise IOError('invalid mode: %s ' % mode)
+ self._p_changed = True
- if result is not None:
- self._setup_transaction_manager(result)
return result
def openDetached(self, class_=file):
@@ -123,7 +170,7 @@
"""
if self._current_filename() is None:
raise BlobError("Blob does not exist.")
- if self._p_blob_writers != 0:
+ if self.writers:
raise BlobError("Already opened for writing.")
# XXX this should increase the reader number and have a test !?!
return class_(self._current_filename(), "rb")
@@ -132,9 +179,9 @@
"""Will replace the current data of the blob with the file given under
filename.
"""
- if self._p_blob_writers != 0:
+ if self.writers:
raise BlobError("Already opened for writing.")
- if self._p_blob_readers != 0:
+ if self.readers:
raise BlobError("Already opened for reading.")
previous_uncommitted = bool(self._p_blob_uncommitted)
@@ -151,7 +198,7 @@
os.unlink(target)
try:
- self._os_link(filename, target)
+ os.rename(filename, target)
except:
# Recover from the failed consumption: First remove the file, it
# might exist and mark the pointer to the uncommitted file.
@@ -175,14 +222,14 @@
# We changed the blob state and have to make sure we join the
# transaction.
- self._change()
+ self._p_changed = True
# utility methods
def _current_filename(self):
- # NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
+ # NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
# Connection._setstate
- return self._p_blob_uncommitted or self._p_blob_data
+ return self._p_blob_uncommitted or self._p_blob_committed
def _create_uncommitted_file(self):
assert self._p_blob_uncommitted is None, (
@@ -191,148 +238,6 @@
self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
return self._p_blob_uncommitted
- def _change(self):
- self._p_changed = 1
-
- def _setup_transaction_manager(self, result):
- # We join the transaction with our own data manager in order to be
- # notified of commit/vote/abort events. We do this because at
- # transaction boundaries, we need to fix up _p_ reference counts
- # that keep track of open readers and writers and close any
- # writable filehandles we've opened.
- if self._p_blob_manager is None:
- # Blobs need to always participate in transactions.
- if self._p_jar is not None:
- # If we are connected to a database, then we use the
- # transaction manager that belongs to this connection
- tm = self._p_jar.transaction_manager
- else:
- # If we are not connected to a database, we check whether
- # we have been given an explicit transaction manager
- if self._p_blob_transaction:
- tm = self._p_blob_transaction
- else:
- # Otherwise we use the default
- # transaction manager as an educated guess.
- tm = transaction.manager
- # Create our datamanager and join he current transaction.
- dm = BlobDataManager(self, result, tm)
- tm.get().join(dm)
- elif result:
- # Each blob data manager should manage only the one blob
- # assigned to it. Assert that this is the case and it is the
- # correct blob
- assert self._p_blob_manager.blob is self
- self._p_blob_manager.register_fh(result)
-
- # utility methods which should not cause the object's state to be
- # loaded if they are called while the object is a ghost. Thus,
- # they are named with the _p_ convention and only operate against
- # other _p_ instance attributes. We conventionally name these methods
- # and attributes with a _p_blob prefix.
-
- def _p_blob_clear(self):
- self._p_blob_readers = 0
- self._p_blob_writers = 0
-
- def _p_blob_decref(self, mode):
- if mode == 'r':
- self._p_blob_readers = max(0, self._p_blob_readers - 1)
- else:
- assert mode in valid_modes, "Invalid mode %r" % mode
- self._p_blob_writers = max(0, self._p_blob_writers - 1)
-
- def _p_blob_refcounts(self):
- # used by unit tests
- return self._p_blob_readers, self._p_blob_writers
-
-
-class BlobDataManager:
- """Special data manager to handle transaction boundaries for blobs.
-
- Blobs need some special care-taking on transaction boundaries. As
-
- a) the ghost objects might get reused, the _p_reader and _p_writer
- refcount attributes must be set to a consistent state
- b) the file objects might get passed out of the thread/transaction
- and must deny any relationship to the original blob.
- c) writable blob filehandles must be closed at the end of a txn so
- as to not allow reuse between two transactions.
-
- """
-
- zope.interface.implements(transaction.interfaces.IDataManager)
-
- def __init__(self, blob, filehandle, tm):
- self.blob = blob
- self.transaction = tm.get()
- # we keep a weakref to the file handle because we don't want to
- # keep it alive if all other references to it die (e.g. in the
- # case it's opened without assigning it to a name).
- self.fhrefs = utils.WeakSet()
- self.register_fh(filehandle)
- self.sortkey = time.time()
- self.prepared = False
-
- # Blob specific methods
-
- def register_fh(self, filehandle):
- self.fhrefs.add(filehandle)
-
- def _remove_uncommitted_data(self):
- self.blob._p_blob_clear()
- self.fhrefs.map(lambda fhref: fhref.close())
- if (self.blob._p_blob_uncommitted is not None and
- os.path.exists(self.blob._p_blob_uncommitted)):
- os.unlink(self.blob._p_blob_uncommitted)
- self.blob._p_blob_uncommitted = None
-
- # IDataManager
-
- def tpc_begin(self, transaction):
- if self.prepared:
- raise TypeError('Already prepared')
- self._checkTransaction(transaction)
- self.prepared = True
- self.transaction = transaction
- self.fhrefs.map(lambda fhref: fhref.close())
-
- def commit(self, transaction):
- if not self.prepared:
- raise TypeError('Not prepared to commit')
- self._checkTransaction(transaction)
- self.transaction = None
- self.prepared = False
-
- self.blob._p_blob_clear()
-
- def abort(self, transaction):
- self.tpc_abort(transaction)
-
- def tpc_abort(self, transaction):
- self._checkTransaction(transaction)
- if self.transaction is not None:
- self.transaction = None
- self.prepared = False
-
- self._remove_uncommitted_data()
-
- def tpc_finish(self, transaction):
- pass
-
- def tpc_vote(self, transaction):
- pass
-
- def sortKey(self):
- return self.sortkey
-
- def _checkTransaction(self, transaction):
- if (self.transaction is not None and
- self.transaction is not transaction):
- raise TypeError("Transaction missmatch",
- transaction, self.transaction)
-
-
class BlobFile(file):
"""A BlobFile that holds a file handle to actual blob data.
@@ -349,36 +254,11 @@
def __init__(self, name, mode, blob):
super(BlobFile, self).__init__(name, mode+'b')
self.blob = blob
- self.close_called = False
-
- def write(self, data):
- super(BlobFile, self).write(data)
- self.blob._change()
-
- def writelines(self, lines):
- super(BlobFile, self).writelines(lines)
- self.blob._change()
-
- def truncate(self, size=0):
- super(BlobFile, self).truncate(size)
- self.blob._change()
-
+
def close(self):
- # we don't want to decref twice
- if not self.close_called:
- self.blob._p_blob_decref(self.mode[:-1])
- self.close_called = True
- super(BlobFile, self).close()
+ self.blob.closed(self)
+ file.close(self)
- def __del__(self):
- # XXX we need to ensure that the file is closed at object
- # expiration or our blob's refcount won't be decremented.
- # This probably needs some work; I don't know if the names
- # 'BlobFile' or 'super' will be available at program exit, but
- # we'll assume they will be for now in the name of not
- # muddying the code needlessly.
- self.close()
-
_pid = str(os.getpid())
def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):
Modified: ZODB/trunk/src/ZODB/tests/blob_basic.txt
===================================================================
--- ZODB/trunk/src/ZODB/tests/blob_basic.txt 2007-06-06 16:21:05 UTC (rev 76435)
+++ ZODB/trunk/src/ZODB/tests/blob_basic.txt 2007-06-06 16:25:12 UTC (rev 76436)
@@ -26,12 +26,10 @@
>>> IBlob.providedBy(myblob)
True
-Opening a new Blob for reading fails:
+We can open a new blob file for reading, but it won't have any data:
- >>> myblob.open("r")
- Traceback (most recent call last):
- ...
- BlobError: Blob does not exist.
+ >>> myblob.open("r").read()
+ ''
But we can write data to a new Blob by opening it for writing:
Modified: ZODB/trunk/src/ZODB/tests/blob_transaction.txt
===================================================================
--- ZODB/trunk/src/ZODB/tests/blob_transaction.txt 2007-06-06 16:21:05 UTC (rev 76435)
+++ ZODB/trunk/src/ZODB/tests/blob_transaction.txt 2007-06-06 16:25:12 UTC (rev 76436)
@@ -35,88 +35,98 @@
>>> blob1 = Blob()
>>> blob1.open('w').write('this is blob 1')
>>> root1['blob1'] = blob1
- >>> transaction.commit()
+ >>> 'blob1' in root1
+ True
+
+Aborting a blob add leaves the blob unchanged:
-Aborting a transaction involving a blob write cleans up uncommitted
-file data::
+ >>> transaction.abort()
+ >>> 'blob1' in root1
+ False
- >>> dead_blob = Blob()
- >>> dead_blob.open('w').write('this is a dead blob')
- >>> root1['dead_blob'] = dead_blob
- >>> fname = dead_blob._p_blob_uncommitted
+ >>> blob1._p_oid
+ >>> blob1._p_jar
+ >>> blob1.open().read()
+ 'this is blob 1'
+
+It doesn't clear the file because there is no previously committed version:
+
+ >>> fname = blob1._p_blob_uncommitted
>>> import os
>>> os.path.exists(fname)
True
+
+Let's put the blob back into the root and commit the change:
+
+ >>> root1['blob1'] = blob1
+ >>> transaction.commit()
+
+Now, if we make a change and abort it, we'll return to the committed
+state:
+
+ >>> os.path.exists(fname)
+ False
+ >>> blob1._p_blob_uncommitted
+
+ >>> blob1.open('w').write('this is new blob 1')
+ >>> blob1.open().read()
+ 'this is new blob 1'
+ >>> fname = blob1._p_blob_uncommitted
+ >>> os.path.exists(fname)
+ True
+
>>> transaction.abort()
>>> os.path.exists(fname)
False
+ >>> blob1._p_blob_uncommitted
+ >>> blob1.open().read()
+ 'this is blob 1'
+
Opening a blob gives us a filehandle. Getting data out of the
resulting filehandle is accomplished via the filehandle's read method::
>>> connection2 = database.open()
>>> root2 = connection2.root()
>>> blob1a = root2['blob1']
- >>> blob1a._p_blob_refcounts()
- (0, 0)
- >>>
+
>>> blob1afh1 = blob1a.open("r")
>>> blob1afh1.read()
'this is blob 1'
- >>> # The filehandle keeps a reference to its blob object
- >>> blob1afh1.blob._p_blob_refcounts()
- (1, 0)
-Let's make another filehandle for read only to blob1a, this should bump
-up its refcount by one, and each file handle has a reference to the
-(same) underlying blob::
+Let's make another filehandle for read only to blob1a. Each file
+handle has a reference to the (same) underlying blob::
>>> blob1afh2 = blob1a.open("r")
- >>> blob1afh2.blob._p_blob_refcounts()
- (2, 0)
- >>> blob1afh1.blob._p_blob_refcounts()
- (2, 0)
>>> blob1afh2.blob is blob1afh1.blob
True
-Let's close the first filehandle we got from the blob, this should decrease
-its refcount by one::
+Let's close the first filehandle we got from the blob::
>>> blob1afh1.close()
- >>> blob1a._p_blob_refcounts()
- (1, 0)
Let's abort this transaction, and ensure that the filehandles that we
-opened are now closed and that the filehandle refcounts on the blob
-object are cleared::
+opened are still open::
>>> transaction.abort()
- >>> blob1afh1.blob._p_blob_refcounts()
- (0, 0)
- >>> blob1afh2.blob._p_blob_refcounts()
- (0, 0)
- >>> blob1a._p_blob_refcounts()
- (0, 0)
>>> blob1afh2.read()
- Traceback (most recent call last):
- ...
- ValueError: I/O operation on closed file
+ 'this is blob 1'
-If we open a blob for append, its write refcount should be nonzero.
-Additionally, writing any number of bytes to the blobfile should
-result in the blob being marked "dirty" in the connection (we just
-aborted above, so the object should be "clean" when we start)::
+ >>> blob1afh2.close()
+If we open a blob for append, writing any number of bytes to the
+blobfile should result in the blob being marked "dirty" in the
+connection (we just aborted above, so the object should be "clean"
+when we start)::
+
>>> bool(blob1a._p_changed)
False
>>> blob1a.open('r').read()
'this is blob 1'
>>> blob1afh3 = blob1a.open('a')
- >>> blob1afh3.write('woot!')
- >>> blob1a._p_blob_refcounts()
- (0, 1)
>>> bool(blob1a._p_changed)
True
+ >>> blob1afh3.write('woot!')
We can open more than one blob object during the course of a single
transaction::
@@ -125,10 +135,6 @@
>>> blob2.open('w').write('this is blob 3')
>>> root2['blob2'] = blob2
>>> transaction.commit()
- >>> blob2._p_blob_refcounts()
- (0, 0)
- >>> blob1._p_blob_refcounts()
- (0, 0)
Since we committed the current transaction above, the aggregate
changes we've made to blob, blob1a (these refer to the same object) and
@@ -200,7 +206,7 @@
Savepoints and Blobs
--------------------
-We do support optimistic savepoints ::
+We do support optimistic savepoints:
>>> connection5 = database.open()
>>> root5 = connection5.root()
@@ -222,17 +228,16 @@
"I'm a happy blob. And I'm singing."
>>> transaction.get().commit()
-We do not support non-optimistic savepoints::
+We support non-optimistic savepoints too:
- >>> blob_fh = root5['blob'].open("a")
- >>> blob_fh.write(" And the weather is beautiful.")
- >>> blob_fh.close()
+ >>> root5['blob'].open("a").write(" And I'm dancing.")
>>> root5['blob'].open("r").read()
- "I'm a happy blob. And I'm singing. And the weather is beautiful."
- >>> savepoint = transaction.savepoint() # doctest: +ELLIPSIS
- Traceback (most recent call last):
- ...
- TypeError: ('Savepoints unsupported', <ZODB.blob.BlobDataManager instance at 0x...>)
+ "I'm a happy blob. And I'm singing. And I'm dancing."
+ >>> savepoint = transaction.savepoint()
+ >>> root5['blob'].open("w").write(" And the weather is beautiful.")
+ >>> savepoint.rollback()
+ >>> root5['blob'].open("r").read()
+ "I'm a happy blob. And I'm singing. And I'm dancing."
>>> transaction.abort()
Reading Blobs outside of a transaction
More information about the Zodb-checkins
mailing list