[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Actually get rid of Blobs
directory
Jim Fulton
jim at zope.com
Mon Jun 4 11:16:08 EDT 2007
Log message for revision 76304:
Actually get rid of Blobs directory
Changed:
D ZODB/trunk/src/ZODB/Blobs/
U ZODB/trunk/src/ZODB/blob.py
-=-
Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py 2007-06-04 13:30:08 UTC (rev 76303)
+++ ZODB/trunk/src/ZODB/blob.py 2007-06-04 15:16:08 UTC (rev 76304)
@@ -16,12 +16,14 @@
import base64
import logging
+import logging
import os
import shutil
import sys
-import time
import tempfile
-import logging
+import threading
+import time
+import weakref
import zope.interface
@@ -42,78 +44,114 @@
valid_modes = 'r', 'w', 'r+', 'a'
+# Threading issues:
+# We want to support closing blob files when they are destroyed.
+# This introduces a threading issue, since a blob file may be destroyed
+# via GC in any thread.
+
+
class Blob(persistent.Persistent):
"""A BLOB supports efficient handling of large data within ZODB."""
zope.interface.implements(ZODB.interfaces.IBlob)
- _os_link = os.rename
-
- _p_blob_readers = 0
- _p_blob_writers = 0
_p_blob_uncommitted = None # Filename of the uncommitted (dirty) data
- _p_blob_data = None # Filename of the committed data
+ _p_blob_committed = None # Filename of the committed data
- # All persistent object store a reference to their data manager, a database
- # connection in the _p_jar attribute. So we are going to do the same with
- # blobs here.
- _p_blob_manager = None
+ def __setstate__(self, state=None):
+ # We use lists here because it will allow us to add and remove
+ # atomically
+ self.readers = []
+ self.writers = []
+
+ __init__ = __setstate__
- # Blobs need to participate in transactions even when not connected to
- # a database yet. If you want to use a non-default transaction manager,
- # you can override it via _p_blob_transaction. This is currently
- # required for unit testing.
- _p_blob_transaction = None
+ def __getstate__(self):
+ return None
+ def _p_deactivate(self):
+ # Only ghostify if we are unopened.
+ if self.readers or self.writers:
+ return
+ super(Blob, self)._p_deactivate()
+
+ def _p_invalidate(self):
+ # Force-close any open readers or writers,
+ # XXX should we warn of this? Maybe?
+ for ref in self.readers+self.writers:
+ f = ref()
+ if f is not None:
+ f.close()
+ super(Blob, self)._p_invalidate()
+
+ @property
+ def opened(self):
+ return bool(self.readers or self.writers)
+
+ def closed(self, f):
+
+ # We use try/except below because, if the file is GCed, another
+ # thread might remove the ref after we check it.
+
+ for file_refs in (self.readers, self.writers):
+ for ref in self.file_refs:
+ if ref() is f:
+ try:
+ file_refs.remove(ref)
+ except ValueError:
+ pass
+ return
+
def open(self, mode="r"):
- """Returns a file(-like) object representing blob data."""
- result = None
-
if mode not in valid_modes:
raise ValueError("invalid mode", mode)
+ if self.writers:
+ raise BlobError("Already opened for writing.")
+
if mode == 'r':
if self._current_filename() is None:
- raise BlobError("Blob does not exist.")
+ self._create_uncommitted_file()
- if self._p_blob_writers != 0:
- raise BlobError("Already opened for writing.")
-
- self._p_blob_readers += 1
result = BlobFile(self._current_filename(), mode, self)
- elif mode == 'w':
- if self._p_blob_readers != 0:
+ def destroyed(ref, readers=self.readers):
+ try:
+ readers.remove(ref)
+ except ValueError:
+ pass
+
+ self.readers.append(weakref.ref(result, destroyed))
+ else:
+ if self._p_blob_readers:
raise BlobError("Already opened for reading.")
- self._p_blob_writers += 1
- if self._p_blob_uncommitted is None:
- self._create_uncommitted_file()
- result = BlobFile(self._p_blob_uncommitted, mode, self)
-
- elif mode in ('a', 'r+'):
- if self._p_blob_readers != 0:
- raise BlobError("Already opened for reading.")
-
- if self._p_blob_uncommitted is None:
- # Create a new working copy
- uncommitted = BlobFile(self._create_uncommitted_file(),
- mode, self)
- # NOTE: _p_blob data appears by virtue of Connection._setstate
- utils.cp(file(self._p_blob_data), uncommitted)
- uncommitted.seek(0)
+ if mode == 'w':
+ if self._p_blob_uncommitted is None:
+ self._create_uncommitted_file()
+ result = BlobFile(self._p_blob_uncommitted, mode, self)
else:
- # Re-use existing working copy
- uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
+ if self._p_blob_uncommitted is None:
+ # Create a new working copy
+ self._create_uncommitted_file()
+ result = BlobFile(self._p_blob_uncommitted, mode, self)
+ utils.cp(file(self._p_blob_committed), result)
+ if mode == 'r+':
+ result.seek(0)
+ else:
+ # Re-use existing working copy
+ result = BlobFile(self._p_blob_uncommitted, mode, self)
- self._p_blob_writers += 1
- result = uncommitted
+ def destroyed(ref, writers=self.writers):
+ try:
+ writers.remove(ref)
+ except ValueError:
+ pass
+
+ self.writers.append(weakref.ref(result, destroyed))
- else:
- raise IOError('invalid mode: %s ' % mode)
+ self._p_changed = True
- if result is not None:
- self._setup_transaction_manager(result)
return result
def openDetached(self, class_=file):
@@ -151,7 +189,7 @@
os.unlink(target)
try:
- self._os_link(filename, target)
+ os.rename(filename, target)
except:
# Recover from the failed consumption: First remove the file, it
# might exist and mark the pointer to the uncommitted file.
@@ -175,14 +213,14 @@
# We changed the blob state and have to make sure we join the
# transaction.
- self._change()
+ self._p_changed = True
# utility methods
def _current_filename(self):
- # NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
+ # NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
# Connection._setstate
- return self._p_blob_uncommitted or self._p_blob_data
+ return self._p_blob_uncommitted or self._p_blob_committed
def _create_uncommitted_file(self):
assert self._p_blob_uncommitted is None, (
@@ -191,148 +229,6 @@
self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
return self._p_blob_uncommitted
- def _change(self):
- self._p_changed = 1
-
- def _setup_transaction_manager(self, result):
- # We join the transaction with our own data manager in order to be
- # notified of commit/vote/abort events. We do this because at
- # transaction boundaries, we need to fix up _p_ reference counts
- # that keep track of open readers and writers and close any
- # writable filehandles we've opened.
- if self._p_blob_manager is None:
- # Blobs need to always participate in transactions.
- if self._p_jar is not None:
- # If we are connected to a database, then we use the
- # transaction manager that belongs to this connection
- tm = self._p_jar.transaction_manager
- else:
- # If we are not connected to a database, we check whether
- # we have been given an explicit transaction manager
- if self._p_blob_transaction:
- tm = self._p_blob_transaction
- else:
- # Otherwise we use the default
- # transaction manager as an educated guess.
- tm = transaction.manager
- # Create our datamanager and join the current transaction.
- dm = BlobDataManager(self, result, tm)
- tm.get().join(dm)
- elif result:
- # Each blob data manager should manage only the one blob
- # assigned to it. Assert that this is the case and it is the
- # correct blob
- assert self._p_blob_manager.blob is self
- self._p_blob_manager.register_fh(result)
-
- # utility methods which should not cause the object's state to be
- # loaded if they are called while the object is a ghost. Thus,
- # they are named with the _p_ convention and only operate against
- # other _p_ instance attributes. We conventionally name these methods
- # and attributes with a _p_blob prefix.
-
- def _p_blob_clear(self):
- self._p_blob_readers = 0
- self._p_blob_writers = 0
-
- def _p_blob_decref(self, mode):
- if mode == 'r':
- self._p_blob_readers = max(0, self._p_blob_readers - 1)
- else:
- assert mode in valid_modes, "Invalid mode %r" % mode
- self._p_blob_writers = max(0, self._p_blob_writers - 1)
-
- def _p_blob_refcounts(self):
- # used by unit tests
- return self._p_blob_readers, self._p_blob_writers
-
-
-class BlobDataManager:
- """Special data manager to handle transaction boundaries for blobs.
-
- Blobs need some special care-taking on transaction boundaries. As
-
- a) the ghost objects might get reused, the _p_reader and _p_writer
- refcount attributes must be set to a consistent state
- b) the file objects might get passed out of the thread/transaction
- and must deny any relationship to the original blob.
- c) writable blob filehandles must be closed at the end of a txn so
- as to not allow reuse between two transactions.
-
- """
-
- zope.interface.implements(transaction.interfaces.IDataManager)
-
- def __init__(self, blob, filehandle, tm):
- self.blob = blob
- self.transaction = tm.get()
- # we keep a weakref to the file handle because we don't want to
- # keep it alive if all other references to it die (e.g. in the
- # case it's opened without assigning it to a name).
- self.fhrefs = utils.WeakSet()
- self.register_fh(filehandle)
- self.sortkey = time.time()
- self.prepared = False
-
- # Blob specific methods
-
- def register_fh(self, filehandle):
- self.fhrefs.add(filehandle)
-
- def _remove_uncommitted_data(self):
- self.blob._p_blob_clear()
- self.fhrefs.map(lambda fhref: fhref.close())
- if (self.blob._p_blob_uncommitted is not None and
- os.path.exists(self.blob._p_blob_uncommitted)):
- os.unlink(self.blob._p_blob_uncommitted)
- self.blob._p_blob_uncommitted = None
-
- # IDataManager
-
- def tpc_begin(self, transaction):
- if self.prepared:
- raise TypeError('Already prepared')
- self._checkTransaction(transaction)
- self.prepared = True
- self.transaction = transaction
- self.fhrefs.map(lambda fhref: fhref.close())
-
- def commit(self, transaction):
- if not self.prepared:
- raise TypeError('Not prepared to commit')
- self._checkTransaction(transaction)
- self.transaction = None
- self.prepared = False
-
- self.blob._p_blob_clear()
-
- def abort(self, transaction):
- self.tpc_abort(transaction)
-
- def tpc_abort(self, transaction):
- self._checkTransaction(transaction)
- if self.transaction is not None:
- self.transaction = None
- self.prepared = False
-
- self._remove_uncommitted_data()
-
- def tpc_finish(self, transaction):
- pass
-
- def tpc_vote(self, transaction):
- pass
-
- def sortKey(self):
- return self.sortkey
-
- def _checkTransaction(self, transaction):
- if (self.transaction is not None and
- self.transaction is not transaction):
- raise TypeError("Transaction missmatch",
- transaction, self.transaction)
-
-
class BlobFile(file):
"""A BlobFile that holds a file handle to actual blob data.
@@ -349,36 +245,11 @@
def __init__(self, name, mode, blob):
super(BlobFile, self).__init__(name, mode+'b')
self.blob = blob
- self.close_called = False
-
- def write(self, data):
- super(BlobFile, self).write(data)
- self.blob._change()
-
- def writelines(self, lines):
- super(BlobFile, self).writelines(lines)
- self.blob._change()
-
- def truncate(self, size=0):
- super(BlobFile, self).truncate(size)
- self.blob._change()
-
+
def close(self):
- # we don't want to decref twice
- if not self.close_called:
- self.blob._p_blob_decref(self.mode[:-1])
- self.close_called = True
- super(BlobFile, self).close()
+ self.blob.closed(self)
+ file.close(self)
- def __del__(self):
- # XXX we need to ensure that the file is closed at object
- # expiration or our blob's refcount won't be decremented.
- # This probably needs some work; I don't know if the names
- # 'BlobFile' or 'super' will be available at program exit, but
- # we'll assume they will be for now in the name of not
- # muddying the code needlessly.
- self.close()
-
_pid = str(os.getpid())
def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):
More information about the Zodb-checkins
mailing list