[Zope3-checkins] CVS: Zope3/src/zope/app/browser/content - fssync.py:1.18
Guido van Rossum
guido@python.org
Thu, 5 Jun 2003 12:38:38 -0400
Update of /cvs-repository/Zope3/src/zope/app/browser/content
In directory cvs.zope.org:/tmp/cvs-serv27205
Modified Files:
fssync.py
Log Message:
Refactor commit() using smaller helper methods. A bunch of local
variables become instance variables.
=== Zope3/src/zope/app/browser/content/fssync.py 1.17 => 1.18 ===
--- Zope3/src/zope/app/browser/content/fssync.py:1.17 Tue Jun 3 14:46:21 2003
+++ Zope3/src/zope/app/browser/content/fssync.py Thu Jun 5 12:38:37 2003
@@ -18,6 +18,7 @@
"""
import os
+import cgi
import shutil
import tempfile
@@ -66,63 +67,97 @@
"""
def commit(self):
+ self.check_content_type()
+ self.get_transaction()
+ self.set_note()
+ try:
+ self.make_tempdir()
+ self.unsnarf_body()
+ self.set_arguments()
+ self.make_metadata()
+ self.call_checker()
+ if self.errors:
+ return self.send_errors()
+ else:
+ self.call_committer()
+ self.write_to_filesystem()
+ return self.send_archive()
+ finally:
+ self.remove_tempdir()
+
+ def check_content_type(self):
if not self.request.getHeader("Content-Type") == "application/x-snarf":
- self.request.response.setHeader("Content-Type", "text/plain")
- return "ERROR: Content-Type is not application/x-snarf\n"
- txn = get_transaction() # Save for later
- # 00) Set transaction note
+ raise ValueError("Content-Type is not application/x-snarf")
+
+ def get_transaction(self):
+ self.txn = get_transaction()
+
+ def set_note(self):
note = self.request.get("note")
if not note:
# XXX Hack because cgi doesn't parse the query string
qs = self.request._environ.get("QUERY_STRING")
- if qs and qs.startswith("note="):
- note = qs[5:]
- import urllib
- note = urllib.unquote(note)
+ if qs:
+ d = cgi.parse_qs(qs)
+ notes = d.get("note")
+ if notes:
+ note = " ".join(notes)
if note:
- txn.note(note)
- # 0) Allocate temporary names
- working = tempfile.mktemp()
- try:
- # 1) Create the working directory
- os.mkdir(working)
- # 2) Unsnarf into the working directory
- istr = self.request.bodyFile
- istr.seek(0)
- uns = Unsnarfer(istr)
- uns.unsnarf(working)
- # 3) Check uptodateness (may raise SynchronizationError)
- name = objectName(self.context)
- container = getParent(self.context)
- if container is None and name == "":
- # Hack to get loading the root to work
- container = getRoot(self.context)
- fspath = os.path.join(working, "root")
- else:
- fspath = os.path.join(working, name)
- md = Metadata()
- c = Checker(md)
- c.check(container, name, fspath)
- errors = c.errors()
- if errors:
- # 3.1) Generate error response
- txn.abort()
- lines = ["Up-to-date check failed:"]
- working_sep = os.path.join(working, "") # E.g. foo -> foo/
- for e in errors:
- lines.append(e.replace(working_sep, ""))
- lines.append("")
- self.request.response.setHeader("Content-Type", "text/plain")
- return "\n".join(lines)
- # 4) Commit (may raise SynchronizationError)
- c = Committer(md)
- c.synch(container, name, fspath)
- # 5) Call toFS() to return the complete new state
- shutil.rmtree(working) # Start with clean slate
- os.mkdir(working)
- toFS(self.context, objectName(self.context) or "root", working)
- # 6) Return successful response
- return snarf_dir(self.request.response, working)
- finally:
- if os.path.exists(working):
- shutil.rmtree(working)
+ self.txn.note(note)
+
+ tempdir = None
+
+ def make_tempdir(self):
+ self.tempdir = tempfile.mktemp()
+ os.mkdir(self.tempdir)
+
+ def remove_tempdir(self):
+ if self.tempdir and os.path.exists(self.tempdir):
+ shutil.rmtree(self.tempdir)
+
+ def unsnarf_body(self):
+ fp = self.request.bodyFile
+ fp.seek(0)
+ uns = Unsnarfer(fp)
+ uns.unsnarf(self.tempdir)
+
+ def set_arguments(self):
+ # Set self.{name, container, fspath} based on self.context
+ self.name = objectName(self.context)
+ self.container = getParent(self.context)
+ if self.container is None and self.name == "":
+ # Hack to get loading the root to work
+ self.container = getRoot(self.context)
+ self.fspath = os.path.join(self.tempdir, "root")
+ else:
+ self.fspath = os.path.join(self.tempdir, self.name)
+
+ def make_metadata(self):
+ self.metadata = Metadata()
+
+ def call_checker(self):
+ c = Checker(self.metadata)
+ c.check(self.container, self.name, self.fspath)
+ self.errors = c.errors()
+
+ def send_errors(self):
+ self.txn.abort()
+ lines = ["Up-to-date check failed:"]
+ tempdir_sep = os.path.join(self.tempdir, "") # E.g. foo -> foo/
+ for e in self.errors:
+ lines.append(e.replace(tempdir_sep, ""))
+ lines.append("")
+ self.request.response.setHeader("Content-Type", "text/plain")
+ return "\n".join(lines)
+
+ def call_committer(self):
+ c = Committer(self.metadata)
+ c.synch(self.container, self.name, self.fspath)
+
+ def write_to_filesystem(self):
+ shutil.rmtree(self.tempdir) # Start with clean slate
+ os.mkdir(self.tempdir)
+ toFS(self.context, objectName(self.context) or "root", self.tempdir)
+
+ def send_archive(self):
+ return snarf_dir(self.request.response, self.tempdir)