[Zodb-checkins]
SVN: ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py
- better locking strategy
Christian Theune
ct at gocept.com
Sat Feb 25 17:26:17 EST 2006
Log message for revision 65482:
- better locking strategy
Changed:
U ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py
-=-
Modified: ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py
===================================================================
--- ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py 2006-02-25 22:25:43 UTC (rev 65481)
+++ ZODB/branches/blob-merge-branch/src/ZEO/ClientStorage.py 2006-02-25 22:26:17 UTC (rev 65482)
@@ -112,7 +112,7 @@
wait=None, wait_timeout=None,
read_only=0, read_only_fallback=0,
username='', password='', realm=None,
- blob_dir=tempfile.gettempdir()):
+ blob_dir=None):
"""ClientStorage constructor.
This is typically invoked from a custom_zodb.py file.
@@ -316,6 +316,10 @@
self.blob_dir = blob_dir
+ # Initialize locks
+ self.blob_status_lock = threading.Lock()
+ self.blob_status = {}
+
# Decide whether to use non-temporary files
if client is not None:
dir = var or os.getcwd()
@@ -759,6 +763,7 @@
return self.loadEx(oid, version)[:2]
def loadEx(self, oid, version):
+ print "LOAD"
self._lock.acquire() # for atomic processing of invalidations
try:
t = self._cache.load(oid, version)
@@ -899,6 +904,7 @@
return self._check_serials()
def storeBlob(self, oid, serial, data, blobfilename, version, txn):
+ """Storage API: store a blob object."""
serials = self.store(oid, serial, data, version, txn)
blobfile = open(blobfilename, "rb")
while True:
@@ -925,33 +931,87 @@
BLOB_SUFFIX,)
)
+    def _do_load_blob(self, oid, serial, version):
+        """Do the actual loading from the RPC server.
+
+        Fetches the blob for (oid, serial, version) from the server in
+        chunks, writes it to the dirty filename and then renames it to
+        the clean filename, which is returned.
+
+        Raises ClientDisconnected if there is no server connection.
+        """
+        blob_filename = self._getCleanFilename(oid, serial)
+        if self._server is None:
+            raise ClientDisconnected()
+
+        # We write to a temporary file first, so we do not accidentally
+        # allow half-baked copies of this blob to be loaded
+        tempfilename = self._getDirtyFilename(oid, serial)
+        # Not named `tempfile`, to avoid shadowing the stdlib module.
+        blobfile = open(tempfilename, "wb")
+        try:
+            offset = 0
+            while True:
+                chunk = self._server.loadBlob(oid, serial, version, offset)
+                if not chunk:
+                    break
+                offset += len(chunk)
+                blobfile.write(chunk)
+        finally:
+            # Close the file even if the download fails half-way.
+            blobfile.close()
+
+        utils.best_rename(tempfilename, blob_filename)
+        return blob_filename
+
     def loadBlob(self, oid, serial, version):
+        """Loading a blob has to know about loading the same blob
+        from another thread at the same time.
+
+        1. Check if the blob is downloaded already
+        2. Check whether it is currently being downloaded
+        2a. Wait for other download to finish, return
+        3. If not being downloaded, start download
+
+        Returns the filename of the downloaded blob.  Raises
+        POSException.Unsupported if no blob directory is configured.
+        """
+        if self.blob_dir is None:
+            raise POSException.Unsupported("No blob cache directory is configured. Can not load blob.")
+
         blob_filename = self._getCleanFilename(oid, serial)
-        if os.path.exists(blob_filename):    # XXX see race condition below
-            return blob_filename
+        # Case 1: Blob is available already, just use it
+        if os.path.exists(blob_filename):
+            return blob_filename
-        self._load_lock.acquire()
+        # Case 2,3: Blob might still be downloading or not there yet
+
+        # Try to get or create a lock for the downloading of this blob,
+        # identified by its oid and serial
+        lock_key = (oid, serial)
+
+        # We need to make the check for an existing lock and the possible
+        # creation of a new one atomic, so there is another lock:
+        self.blob_status_lock.acquire()
         try:
-            if self._server is None:
-                raise ClientDisconnected()
+            # The dictionary is keyed by (oid, serial), so that is what
+            # has to be looked up here -- not the bare oid.
+            if lock_key not in self.blob_status:
+                self.blob_status[lock_key] = threading.Lock()
+            lock = self.blob_status[lock_key]
+        finally:
+            self.blob_status_lock.release()
-            tempfilename = self._getDirtyFilename(oid, serial)
-            tempfile = open(tempfilename, "wb")
-
-            offset = 0
-            while True:
-                chunk = self._server.loadBlob(oid, serial, version, offset)
-                if not chunk:
-                    break
-                offset += len(chunk)
-                tempfile.write(chunk)
+        # We acquire the lock to either start downloading, or wait
+        # for another download to finish
+        lock.acquire()
+        try:
+            # If there was another download that is finished by now,
+            # we just take the result.
+            if os.path.exists(blob_filename):
+                return blob_filename
-            tempfile.close()
-            utils.best_rename(tempfilename, blob_filename)
-            return blob_filename
+            # Otherwise we download and use that
+            return self._do_load_blob(oid, serial, version)
         finally:
-            self._load_lock.release()
+            # When done we release the download lock ...
+            lock.release()
+            # And the status information isn't needed anymore either,
+            # but we have to use the second lock here as well, to avoid
+            # making the creation of this status lock non-atomic (see above)
+            self.blob_status_lock.acquire()
+            try:
+                # pop() instead of del: a thread that was only waiting on
+                # this lock finds the entry already removed by the thread
+                # that did the download.
+                self.blob_status.pop(lock_key, None)
+            finally:
+                self.blob_status_lock.release()
+
+
def tpc_vote(self, txn):
"""Storage API: vote on a transaction."""
if txn is not self._transaction:
More information about the Zodb-checkins
mailing list