[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/blob.py Reverted accidental checkin.
Jim Fulton
jim at zope.com
Mon Jun 4 12:03:23 EDT 2007
Log message for revision 76314:
Reverted accidental checkin.
Changed:
U ZODB/trunk/src/ZODB/blob.py
-=-
Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py 2007-06-04 15:53:36 UTC (rev 76313)
+++ ZODB/trunk/src/ZODB/blob.py 2007-06-04 16:03:22 UTC (rev 76314)
@@ -16,14 +16,12 @@
import base64
import logging
-import logging
import os
import shutil
import sys
+import time
import tempfile
-import threading
-import time
-import weakref
+import logging
import zope.interface
@@ -44,114 +42,78 @@
valid_modes = 'r', 'w', 'r+', 'a'
-# Threading issues:
-# We want to support closing blob files when they are destroyed.
-# This introduces a threading issue, since a blob file may be destroyed
-# via GC in any thread.
-
-
class Blob(persistent.Persistent):
"""A BLOB supports efficient handling of large data within ZODB."""
zope.interface.implements(ZODB.interfaces.IBlob)
+ _os_link = os.rename
+
+ _p_blob_readers = 0
+ _p_blob_writers = 0
_p_blob_uncommitted = None # Filename of the uncommitted (dirty) data
- _p_blob_committed = None # Filename of the committed data
+ _p_blob_data = None # Filename of the committed data
- def __setstate__(self, state=None):
- # We use lists here because it will allow us to add and remove
- # atomically
- self.readers = []
- self.writers = []
-
- __init__ = __setstate__
+ # All persistent objects store a reference to their data manager, a database
+ # connection, in the _p_jar attribute. So we are going to do the same with
+ # blobs here.
+ _p_blob_manager = None
- def __getstate__(self):
- return None
+ # Blobs need to participate in transactions even when not connected to
+ # a database yet. If you want to use a non-default transaction manager,
+ # you can override it via _p_blob_transaction. This is currently
+ # required for unit testing.
+ _p_blob_transaction = None
- def _p_deactivate(self):
- # Only ghostify if we are unopened.
- if self.readers or self.writers:
- return
- super(Blob, self)._p_deactivate()
-
- def _p_invalidate(self):
- # Force-close any open readers or writers,
- # XXX should we warn of this? Maybe?
- for ref in self.readers+self.writers:
- f = ref()
- if f is not None:
- f.close()
- super(Blob, self)._p_invalidate()
-
- @property
- def opened(self):
- return bool(self.readers or self.writers)
-
- def closed(self, f):
-
- # We use try/except below because another thread might remove
- # the ref after we check it if the file is GCed.
-
- for file_refs in (self.readers, self.writers):
- for ref in file_refs:
- if ref() is f:
- try:
- file_refs.remove(ref)
- except ValueError:
- pass
- return
-
def open(self, mode="r"):
+ """Returns a file(-like) object representing blob data."""
+ result = None
+
if mode not in valid_modes:
raise ValueError("invalid mode", mode)
- if self.writers:
- raise BlobError("Already opened for writing.")
-
if mode == 'r':
if self._current_filename() is None:
- self._create_uncommitted_file()
+ raise BlobError("Blob does not exist.")
+ if self._p_blob_writers != 0:
+ raise BlobError("Already opened for writing.")
+
+ self._p_blob_readers += 1
result = BlobFile(self._current_filename(), mode, self)
- def destroyed(ref, readers=self.readers):
- try:
- readers.remove(ref)
- except ValueError:
- pass
-
- self.readers.append(weakref.ref(result, destroyed))
- else:
- if self._p_blob_readers:
+ elif mode == 'w':
+ if self._p_blob_readers != 0:
raise BlobError("Already opened for reading.")
- if mode == 'w':
- if self._p_blob_uncommitted is None:
- self._create_uncommitted_file()
- result = BlobFile(self._p_blob_uncommitted, mode, self)
+ self._p_blob_writers += 1
+ if self._p_blob_uncommitted is None:
+ self._create_uncommitted_file()
+ result = BlobFile(self._p_blob_uncommitted, mode, self)
+
+ elif mode in ('a', 'r+'):
+ if self._p_blob_readers != 0:
+ raise BlobError("Already opened for reading.")
+
+ if self._p_blob_uncommitted is None:
+ # Create a new working copy
+ uncommitted = BlobFile(self._create_uncommitted_file(),
+ mode, self)
+ # NOTE: _p_blob_data appears by virtue of Connection._setstate
+ utils.cp(file(self._p_blob_data), uncommitted)
+ uncommitted.seek(0)
else:
- if self._p_blob_uncommitted is None:
- # Create a new working copy
- self._create_uncommitted_file()
- result = BlobFile(self._p_blob_uncommitted, mode, self)
- utils.cp(file(self._p_blob_committed), result)
- if mode == 'r+':
- result.seek(0)
- else:
- # Re-use existing working copy
- result = BlobFile(self._p_blob_uncommitted, mode, self)
+ # Re-use existing working copy
+ uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
- def destroyed(ref, writers=self.writers):
- try:
- writers.remove(ref)
- except ValueError:
- pass
-
- self.writers.append(weakref.ref(result, destroyed))
+ self._p_blob_writers += 1
+ result = uncommitted
- self._p_changed = True
+ else:
+ raise IOError('invalid mode: %s ' % mode)
+ if result is not None:
+ self._setup_transaction_manager(result)
return result
def openDetached(self, class_=file):
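
To make the restored open() semantics above easier to follow, here is a minimal
usage sketch. It is not part of the committed diff, and it assumes that Blob and
BlobError are importable from ZODB.blob and ZODB.interfaces respectively, which
is an assumption about the surrounding package layout rather than something
shown in this change.

    import transaction
    from ZODB.blob import Blob
    from ZODB.interfaces import BlobError   # assumed import location

    blob = Blob()

    # Reading before any data has been written or committed is refused.
    try:
        blob.open('r')
    except BlobError:
        pass   # "Blob does not exist."

    # Opening for writing creates the uncommitted working copy and bumps the
    # writer refcount; the returned handle is a BlobFile.
    f = blob.open('w')
    f.write('some bytes')
    assert blob._p_blob_refcounts() == (0, 1)

    # While a writer is open, opening for reading is refused as well.
    try:
        blob.open('r')
    except BlobError:
        pass   # "Already opened for writing."

    f.close()   # decrements the writer refcount exactly once
    assert blob._p_blob_refcounts() == (0, 0)

    transaction.abort()   # discard the uncommitted temporary file
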
@@ -189,7 +151,7 @@
os.unlink(target)
try:
- os.rename(filename, target)
+ self._os_link(filename, target)
except:
# Recover from the failed consumption: First remove the file, it
# might exist and mark the pointer to the uncommitted file.
@@ -213,14 +175,14 @@
# We changed the blob state and have to make sure we join the
# transaction.
- self._p_changed = True
+ self._change()
# utility methods
def _current_filename(self):
- # NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
+ # NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
# Connection._setstate
- return self._p_blob_uncommitted or self._p_blob_committed
+ return self._p_blob_uncommitted or self._p_blob_data
def _create_uncommitted_file(self):
assert self._p_blob_uncommitted is None, (
@@ -229,6 +191,148 @@
self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
return self._p_blob_uncommitted
+ def _change(self):
+ self._p_changed = 1
+
+ def _setup_transaction_manager(self, result):
+ # We join the transaction with our own data manager in order to be
+ # notified of commit/vote/abort events. We do this because at
+ # transaction boundaries, we need to fix up _p_ reference counts
+ # that keep track of open readers and writers and close any
+ # writable filehandles we've opened.
+ if self._p_blob_manager is None:
+ # Blobs need to always participate in transactions.
+ if self._p_jar is not None:
+ # If we are connected to a database, then we use the
+ # transaction manager that belongs to this connection
+ tm = self._p_jar.transaction_manager
+ else:
+ # If we are not connected to a database, we check whether
+ # we have been given an explicit transaction manager
+ if self._p_blob_transaction:
+ tm = self._p_blob_transaction
+ else:
+ # Otherwise we use the default
+ # transaction manager as an educated guess.
+ tm = transaction.manager
+ # Create our data manager and join the current transaction.
+ dm = BlobDataManager(self, result, tm)
+ tm.get().join(dm)
+ elif result:
+ # Each blob data manager should manage only the one blob
+ # assigned to it. Assert that this is the case and it is the
+ # correct blob
+ assert self._p_blob_manager.blob is self
+ self._p_blob_manager.register_fh(result)
+
+ # utility methods which should not cause the object's state to be
+ # loaded if they are called while the object is a ghost. Thus,
+ # they are named with the _p_ convention and only operate against
+ # other _p_ instance attributes. We conventionally name these methods
+ # and attributes with a _p_blob prefix.
+
+ def _p_blob_clear(self):
+ self._p_blob_readers = 0
+ self._p_blob_writers = 0
+
+ def _p_blob_decref(self, mode):
+ if mode == 'r':
+ self._p_blob_readers = max(0, self._p_blob_readers - 1)
+ else:
+ assert mode in valid_modes, "Invalid mode %r" % mode
+ self._p_blob_writers = max(0, self._p_blob_writers - 1)
+
+ def _p_blob_refcounts(self):
+ # used by unit tests
+ return self._p_blob_readers, self._p_blob_writers
+
+
+class BlobDataManager:
+ """Special data manager to handle transaction boundaries for blobs.
+
+ Blobs need some special care-taking on transaction boundaries. As
+
+ a) the ghost objects might get reused, so the _p_blob_readers and
+ _p_blob_writers refcount attributes must be set to a consistent state
+ b) the file objects might get passed out of the thread/transaction
+ and must deny any relationship to the original blob.
+ c) writable blob filehandles must be closed at the end of a txn so
+ as to not allow reuse between two transactions.
+
+ """
+
+ zope.interface.implements(transaction.interfaces.IDataManager)
+
+ def __init__(self, blob, filehandle, tm):
+ self.blob = blob
+ self.transaction = tm.get()
+ # we keep a weakref to the file handle because we don't want to
+ # keep it alive if all other references to it die (e.g. in the
+ # case it's opened without assigning it to a name).
+ self.fhrefs = utils.WeakSet()
+ self.register_fh(filehandle)
+ self.sortkey = time.time()
+ self.prepared = False
+
+ # Blob specific methods
+
+ def register_fh(self, filehandle):
+ self.fhrefs.add(filehandle)
+
+ def _remove_uncommitted_data(self):
+ self.blob._p_blob_clear()
+ self.fhrefs.map(lambda fhref: fhref.close())
+ if (self.blob._p_blob_uncommitted is not None and
+ os.path.exists(self.blob._p_blob_uncommitted)):
+ os.unlink(self.blob._p_blob_uncommitted)
+ self.blob._p_blob_uncommitted = None
+
+ # IDataManager
+
+ def tpc_begin(self, transaction):
+ if self.prepared:
+ raise TypeError('Already prepared')
+ self._checkTransaction(transaction)
+ self.prepared = True
+ self.transaction = transaction
+ self.fhrefs.map(lambda fhref: fhref.close())
+
+ def commit(self, transaction):
+ if not self.prepared:
+ raise TypeError('Not prepared to commit')
+ self._checkTransaction(transaction)
+ self.transaction = None
+ self.prepared = False
+
+ self.blob._p_blob_clear()
+
+ def abort(self, transaction):
+ self.tpc_abort(transaction)
+
+ def tpc_abort(self, transaction):
+ self._checkTransaction(transaction)
+ if self.transaction is not None:
+ self.transaction = None
+ self.prepared = False
+
+ self._remove_uncommitted_data()
+
+ def tpc_finish(self, transaction):
+ pass
+
+ def tpc_vote(self, transaction):
+ pass
+
+ def sortKey(self):
+ return self.sortkey
+
+ def _checkTransaction(self, transaction):
+ if (self.transaction is not None and
+ self.transaction is not transaction):
+ raise TypeError("Transaction missmatch",
+ transaction, self.transaction)
+
+
class BlobFile(file):
"""A BlobFile that holds a file handle to actual blob data.
@@ -245,11 +349,36 @@
def __init__(self, name, mode, blob):
super(BlobFile, self).__init__(name, mode+'b')
self.blob = blob
-
+ self.close_called = False
+
+ def write(self, data):
+ super(BlobFile, self).write(data)
+ self.blob._change()
+
+ def writelines(self, lines):
+ super(BlobFile, self).writelines(lines)
+ self.blob._change()
+
+ def truncate(self, size=0):
+ super(BlobFile, self).truncate(size)
+ self.blob._change()
+
def close(self):
- self.blob.closed(self)
- file.close(self)
+ # we don't want to decref twice
+ if not self.close_called:
+ self.blob._p_blob_decref(self.mode[:-1])
+ self.close_called = True
+ super(BlobFile, self).close()
+ def __del__(self):
+ # XXX we need to ensure that the file is closed at object
+ # expiration or our blob's refcount won't be decremented.
+ # This probably needs some work; I don't know if the names
+ # 'BlobFile' or 'super' will be available at program exit, but
+ # we'll assume they will be for now in the name of not
+ # muddying the code needlessly.
+ self.close()
+
_pid = str(os.getpid())
def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):
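
Finally, a short sketch of the BlobFile bookkeeping restored in the last hunk,
again not part of the committed diff: mutating calls mark the owning blob as
changed, and the close_called flag ensures that an explicit close() followed by
garbage collection (which triggers __del__) cannot decrement the blob's writer
count twice.

    import transaction
    from ZODB.blob import Blob

    blob = Blob()
    f = blob.open('w')
    f.writelines(['a\n', 'b\n'])   # each mutating call invokes blob._change()
    f.truncate(0)

    f.close()
    f.close()                      # second close is guarded by close_called
    assert blob._p_blob_refcounts() == (0, 0)

    del f                          # __del__ calls close() again; still a no-op
    transaction.abort()            # discard the uncommitted working copy
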