[Zodb-checkins] SVN: ZODB/trunk/src/ZODB/ Actually get rid of Blobs directory

Jim Fulton jim at zope.com
Mon Jun 4 11:16:08 EDT 2007


Log message for revision 76304:
  Actually get rid of Blobs directory

Changed:
  D   ZODB/trunk/src/ZODB/Blobs/
  U   ZODB/trunk/src/ZODB/blob.py

-=-
Modified: ZODB/trunk/src/ZODB/blob.py
===================================================================
--- ZODB/trunk/src/ZODB/blob.py	2007-06-04 13:30:08 UTC (rev 76303)
+++ ZODB/trunk/src/ZODB/blob.py	2007-06-04 15:16:08 UTC (rev 76304)
@@ -16,12 +16,14 @@
 
 import base64
 import logging
+import logging
 import os
 import shutil
 import sys
-import time
 import tempfile
-import logging
+import threading
+import time
+import weakref
 
 import zope.interface
 
@@ -42,78 +44,114 @@
 
 valid_modes = 'r', 'w', 'r+', 'a'
 
+# Threading issues:
+# We want to support closing blob files when they are destroyed.
+# This introduces a threading issue, since a blob file may be destroyed
+# via GC in any thread.
+
+
 class Blob(persistent.Persistent):
     """A BLOB supports efficient handling of large data within ZODB."""
 
     zope.interface.implements(ZODB.interfaces.IBlob)
 
-    _os_link = os.rename
-
-    _p_blob_readers = 0
-    _p_blob_writers = 0
     _p_blob_uncommitted = None  # Filename of the uncommitted (dirty) data
-    _p_blob_data = None         # Filename of the committed data
+    _p_blob_committed = None    # Filename of the committed data
 
-    # All persistent object store a reference to their data manager, a database
-    # connection in the _p_jar attribute. So we are going to do the same with
-    # blobs here.
-    _p_blob_manager = None
+    def __setstate__(self, state=None):
+        # We use lists here because it will allow us to add and remove
+        # atomically
+        self.readers = []
+        self.writers = []
+        
+    __init__ = __setstate__
 
-    # Blobs need to participate in transactions even when not connected to
-    # a database yet. If you want to use a non-default transaction manager,
-    # you can override it via _p_blob_transaction. This is currently
-    # required for unit testing.
-    _p_blob_transaction = None
+    def __getstate__(self):
+        return None
 
+    def _p_deactivate(self):
+        # Only ghostify if we are unopened.
+        if self.readers or self.writers:
+            return
+        super(Blob, self)._p_deactivate()
+
+    def _p_invalidate(self):
+        # Force-close any open readers or writers,
+        # XXX should we warn of this? Maybe?
+        for ref in self.readers+self.writers:
+            f = ref()
+            if f is not None:
+                f.close()
+        super(Blob, self)._p_invalidate()
+
+    @property
+    def opened(self):
+        return bool(self.readers or self.writers)
+
+    def closed(self, f):
+        
+        # We use try/except below because another thread might remove
+        # the ref after we check it if the file is GCed.
+
+        for file_refs in (self.readers, self.writers):
+            for ref in self.file_refs:
+                if ref() is f:
+                    try:
+                        file_refs.remove(ref)
+                    except ValueError:
+                        pass
+                    return
+
     def open(self, mode="r"):
-        """Returns a file(-like) object representing blob data."""
-        result = None
-            
         if mode not in valid_modes:
             raise ValueError("invalid mode", mode)
 
+        if self.writers:
+            raise BlobError("Already opened for writing.")
+
         if mode == 'r':
             if self._current_filename() is None:
-                raise BlobError("Blob does not exist.")
+                self._create_uncommitted_file()
 
-            if self._p_blob_writers != 0:
-                raise BlobError("Already opened for writing.")
-
-            self._p_blob_readers += 1
             result = BlobFile(self._current_filename(), mode, self)
 
-        elif mode == 'w':
-            if self._p_blob_readers != 0:
+            def destroyed(ref, readers=self.readers):
+                try:
+                    readers.remove(ref)
+                except ValueError:
+                    pass
+            
+            self.readers.append(weakref.ref(result, destroyed))
+        else:
+            if self._p_blob_readers:
                 raise BlobError("Already opened for reading.")
 
-            self._p_blob_writers += 1
-            if self._p_blob_uncommitted is None:
-                self._create_uncommitted_file()
-            result = BlobFile(self._p_blob_uncommitted, mode, self)
-
-        elif mode in ('a', 'r+'):
-            if self._p_blob_readers != 0:
-                raise BlobError("Already opened for reading.")
-
-            if self._p_blob_uncommitted is None:
-                # Create a new working copy
-                uncommitted = BlobFile(self._create_uncommitted_file(),
-                                       mode, self)
-                # NOTE: _p_blob data appears by virtue of Connection._setstate
-                utils.cp(file(self._p_blob_data), uncommitted)
-                uncommitted.seek(0)
+            if mode == 'w':
+                if self._p_blob_uncommitted is None:
+                    self._create_uncommitted_file()
+                result = BlobFile(self._p_blob_uncommitted, mode, self)
             else:
-                # Re-use existing working copy
-                uncommitted = BlobFile(self._p_blob_uncommitted, mode, self)
+                if self._p_blob_uncommitted is None:
+                    # Create a new working copy
+                    self._create_uncommitted_file()
+                    result = BlobFile(self._p_blob_uncommitted, mode, self)
+                    utils.cp(file(self._p_blob_committed), result)
+                    if mode == 'r+':
+                        result.seek(0)
+                else:
+                    # Re-use existing working copy
+                    result = BlobFile(self._p_blob_uncommitted, mode, self)
 
-            self._p_blob_writers += 1
-            result = uncommitted
+            def destroyed(ref, writers=self.writers):
+                try:
+                    writers.remove(ref)
+                except ValueError:
+                    pass
+            
+            self.writers.append(weakref.ref(result, destroyed))
 
-        else:
-            raise IOError('invalid mode: %s ' % mode)
+            self._p_changed = True
 
-        if result is not None:
-            self._setup_transaction_manager(result)
         return result
 
     def openDetached(self, class_=file):
@@ -151,7 +189,7 @@
             os.unlink(target)
 
         try:
-            self._os_link(filename, target)
+            os.rename(filename, target)
         except:
             # Recover from the failed consumption: First remove the file, it
             # might exist and mark the pointer to the uncommitted file.
@@ -175,14 +213,14 @@
 
             # We changed the blob state and have to make sure we join the
             # transaction.
-            self._change()
+            self._p_changed = True
 
     # utility methods
 
     def _current_filename(self):
-        # NOTE: _p_blob_data and _p_blob_uncommitted appear by virtue of
+        # NOTE: _p_blob_committed and _p_blob_uncommitted appear by virtue of
         # Connection._setstate
-        return self._p_blob_uncommitted or self._p_blob_data
+        return self._p_blob_uncommitted or self._p_blob_committed
 
     def _create_uncommitted_file(self):
         assert self._p_blob_uncommitted is None, (
@@ -191,148 +229,6 @@
         self._p_blob_uncommitted = utils.mktemp(dir=tempdir)
         return self._p_blob_uncommitted
 
-    def _change(self):
-        self._p_changed = 1
-
-    def _setup_transaction_manager(self, result):
-        # We join the transaction with our own data manager in order to be
-        # notified of commit/vote/abort events.  We do this because at
-        # transaction boundaries, we need to fix up _p_ reference counts
-        # that keep track of open readers and writers and close any
-        # writable filehandles we've opened.
-        if self._p_blob_manager is None:
-            # Blobs need to always participate in transactions.
-            if self._p_jar is not None:
-                # If we are connected to a database, then we use the
-                # transaction manager that belongs to this connection
-                tm = self._p_jar.transaction_manager
-            else:
-                # If we are not connected to a database, we check whether
-                # we have been given an explicit transaction manager
-                if self._p_blob_transaction:
-                    tm = self._p_blob_transaction
-                else:
-                    # Otherwise we use the default
-                    # transaction manager as an educated guess.
-                    tm = transaction.manager
-            # Create our datamanager and join he current transaction.
-            dm = BlobDataManager(self, result, tm)
-            tm.get().join(dm)
-        elif result:
-            # Each blob data manager should manage only the one blob
-            # assigned to it.  Assert that this is the case and it is the
-            # correct blob
-            assert self._p_blob_manager.blob is self
-            self._p_blob_manager.register_fh(result)
-
-    # utility methods which should not cause the object's state to be
-    # loaded if they are called while the object is a ghost.  Thus,
-    # they are named with the _p_ convention and only operate against
-    # other _p_ instance attributes. We conventionally name these methods
-    # and attributes with a _p_blob prefix.
-
-    def _p_blob_clear(self):
-        self._p_blob_readers = 0
-        self._p_blob_writers = 0
-
-    def _p_blob_decref(self, mode):
-        if mode == 'r':
-            self._p_blob_readers = max(0, self._p_blob_readers - 1)
-        else:
-            assert mode in valid_modes, "Invalid mode %r" % mode
-            self._p_blob_writers = max(0, self._p_blob_writers - 1)
-
-    def _p_blob_refcounts(self):
-        # used by unit tests
-        return self._p_blob_readers, self._p_blob_writers
-
-
-class BlobDataManager:
-    """Special data manager to handle transaction boundaries for blobs.
-
-    Blobs need some special care-taking on transaction boundaries. As
-
-    a) the ghost objects might get reused, the _p_reader and _p_writer
-       refcount attributes must be set to a consistent state
-    b) the file objects might get passed out of the thread/transaction
-       and must deny any relationship to the original blob.
-    c) writable blob filehandles must be closed at the end of a txn so
-       as to not allow reuse between two transactions.
-
-    """
-
-    zope.interface.implements(transaction.interfaces.IDataManager)
-
-    def __init__(self, blob, filehandle, tm):
-        self.blob = blob
-        self.transaction = tm.get()
-        # we keep a weakref to the file handle because we don't want to
-        # keep it alive if all other references to it die (e.g. in the
-        # case it's opened without assigning it to a name).
-        self.fhrefs = utils.WeakSet()
-        self.register_fh(filehandle)
-        self.sortkey = time.time()
-        self.prepared = False
-
-    # Blob specific methods
-
-    def register_fh(self, filehandle):
-        self.fhrefs.add(filehandle)
-
-    def _remove_uncommitted_data(self):
-        self.blob._p_blob_clear()
-        self.fhrefs.map(lambda fhref: fhref.close())
-        if (self.blob._p_blob_uncommitted is not None and
-            os.path.exists(self.blob._p_blob_uncommitted)):
-            os.unlink(self.blob._p_blob_uncommitted)
-            self.blob._p_blob_uncommitted = None
-
-    # IDataManager
-
-    def tpc_begin(self, transaction):
-        if self.prepared:
-            raise TypeError('Already prepared')
-        self._checkTransaction(transaction)
-        self.prepared = True
-        self.transaction = transaction
-        self.fhrefs.map(lambda fhref: fhref.close())
-
-    def commit(self, transaction):
-        if not self.prepared:
-            raise TypeError('Not prepared to commit')
-        self._checkTransaction(transaction)
-        self.transaction = None
-        self.prepared = False
-
-        self.blob._p_blob_clear() 
-
-    def abort(self, transaction):
-        self.tpc_abort(transaction)
-
-    def tpc_abort(self, transaction):
-        self._checkTransaction(transaction)
-        if self.transaction is not None:
-            self.transaction = None
-        self.prepared = False
-
-        self._remove_uncommitted_data()
-
-    def tpc_finish(self, transaction):
-        pass
-
-    def tpc_vote(self, transaction):
-        pass
-
-    def sortKey(self):
-        return self.sortkey
-
-    def _checkTransaction(self, transaction):
-        if (self.transaction is not None and
-            self.transaction is not transaction):
-            raise TypeError("Transaction missmatch",
-                            transaction, self.transaction)
-
-
 class BlobFile(file):
     """A BlobFile that holds a file handle to actual blob data.
 
@@ -349,36 +245,11 @@
     def __init__(self, name, mode, blob):
         super(BlobFile, self).__init__(name, mode+'b')
         self.blob = blob
-        self.close_called = False
-
-    def write(self, data):
-        super(BlobFile, self).write(data)
-        self.blob._change()
-
-    def writelines(self, lines):
-        super(BlobFile, self).writelines(lines)
-        self.blob._change()
-
-    def truncate(self, size=0):
-        super(BlobFile, self).truncate(size)
-        self.blob._change()
-
+            
     def close(self):
-        # we don't want to decref twice
-        if not self.close_called:
-            self.blob._p_blob_decref(self.mode[:-1])
-            self.close_called = True
-            super(BlobFile, self).close()
+        self.blob.closed(self)
+        file.close(self)
 
-    def __del__(self):
-        # XXX we need to ensure that the file is closed at object
-        # expiration or our blob's refcount won't be decremented.
-        # This probably needs some work; I don't know if the names
-        # 'BlobFile' or 'super' will be available at program exit, but
-        # we'll assume they will be for now in the name of not
-        # muddying the code needlessly.
-        self.close()
-
 _pid = str(os.getpid())
 
 def log(msg, level=logging.INFO, subsys=_pid, exc_info=False):



More information about the Zodb-checkins mailing list