[Zope3-checkins] CVS: Zope3/src/zope/fssync - fssync.py:1.7
Guido van Rossum
guido@python.org
Tue, 13 May 2003 13:16:56 -0400
Update of /cvs-repository/Zope3/src/zope/fssync
In directory cvs.zope.org:/tmp/cvs-serv13651
Modified Files:
fssync.py
Log Message:
Unittests for the Network class.
=== Zope3/src/zope/fssync/fssync.py 1.6 => 1.7 ===
--- Zope3/src/zope/fssync/fssync.py:1.6 Tue May 13 11:28:03 2003
+++ Zope3/src/zope/fssync/fssync.py Tue May 13 13:16:25 2003
@@ -81,18 +81,58 @@
class Network(object):
+ """Handle network communication.
+
+ This class has various methods for managing the root url (which is
+ stored in a file @@Zope/Root) and has a method to send an HTTP(S)
+ request to the root URL, expecting a zip file back (that's all the
+ application needs).
+
+ Public instance variables:
+
+ rooturl -- full root url, e.g. 'http://user:passwd@host:port/path'
+ roottype -- 'http' or 'https'
+ user_passwd -- 'user:passwd'
+ host_port -- 'host:port'
+ rootpath -- '/path'
+ """
+
+ def __init__(self, rooturl=None):
+ """Constructor. Optionally pass the root url."""
+ self.setrooturl(rooturl)
+
def loadrooturl(self, target):
+ """Load the root url for the given target.
+
+ This calls findrooturl() to find the root url for the target,
+ and then calls setrooturl() to set it. If self.findrooturl()
+ can't find a root url, Error() is raised.
+ """
rooturl = self.findrooturl(target)
if not rooturl:
raise Error("can't find root url for target", target)
self.setrooturl(rooturl)
def saverooturl(self, target):
+ """Save the root url in the target's @@Zope directory.
+
+ This writes the file <target>/@@Zope/Root; the directory
+ <target>/@@Zope must already exist.
+ """
if self.rooturl:
self.writefile(self.rooturl + "\n",
join(target, "@@Zope", "Root"))
def findrooturl(self, target):
+ """Find the root url for the given target.
+
+ This looks in <target>/@@Zope/Root, and then in the
+ corresponding place for target's parent, and then further
+ ancestors, until the filesystem root is reached.
+
+ If no root url is found, return None.
+ """
+ unwanted = ("", os.curdir, os.pardir)
dir = realpath(target)
while dir:
zopedir = join(dir, "@@Zope")
@@ -105,24 +145,39 @@
data = data.strip()
if data:
return data
- dir = dirname(dir)
+ head, tail = split(dir)
+ if tail in unwanted:
+ break
+ dir = head
return None
def setrooturl(self, rooturl):
+ """Set the root url.
+
+ If the argument is None or empty, self.rooturl and all derived
+ instance variables are set to None. Otherwise, self.rooturl
+ is set to the argument the broken-down root url is stored in
+ the other instance variables.
+ """
+ if not rooturl:
+ rooturl = roottype = rootpath = user_passwd = host_port = None
+ else:
+ roottype, rest = urllib.splittype(rooturl)
+ if roottype not in ("http", "https"):
+ raise Error("root url must be 'http' or 'https'", rooturl)
+ if roottype == "https" and not hasattr(httplib, "HTTPS"):
+ raise Error("https not supported by this Python build")
+ netloc, rootpath = urllib.splithost(rest)
+ user_passwd, host_port = urllib.splituser(netloc)
+
self.rooturl = rooturl
- if not self.rooturl:
- self.roottype = self.rootpath = None
- self.user_passwd = self.host_port = None
- return
- self.roottype, rest = urllib.splittype(self.rooturl)
- if self.roottype not in ("http", "https"):
- raise Error("root url must be 'http' or 'https'", self.rooturl)
- if self.roottype == "https" and not hasattr(httplib, "HTTPS"):
- raise Error("https not supported by this Python build")
- netloc, self.rootpath = urllib.splithost(rest)
- self.user_passwd, self.host_port = urllib.splituser(netloc)
+ self.roottype = roottype
+ self.rootpath = rootpath
+ self.user_passwd = user_passwd
+ self.host_port = host_port
def readfile(self, file, mode="r"):
+ # Internal helper to read a file
f = open(file, mode)
try:
return f.read()
@@ -130,6 +185,7 @@
f.close()
def writefile(self, data, file, mode="w"):
+ # Internal helper to write a file
f = open(file, mode)
try:
f.write(data)
@@ -137,6 +193,38 @@
f.close()
def httpreq(self, path, view, datafp=None, content_type="application/zip"):
+ """Issue an HTTP or HTTPS request.
+
+ The request parameters are taken from the root url, except
+ that the requested path is constructed by concatenating the
+ path and view arguments.
+
+ If the optional 'datafp' argument is not None, it should be a
+ seekable stream from which the input document for the request
+ is taken. In this case, a POST request is issued, and the
+ content-type header is set to the 'content_type' argument,
+ defaulting to 'application/zip'. Otherwise (if datafp is
+ None), a GET request is issued and no input document is sent.
+
+ If the request succeeds and returns a document whose
+ content-type is 'application/zip', the return value is a tuple
+ (fp, headers) where fp is a non-seekable stream from which the
+ return document can be read, and headers is a case-insensitive
+ mapping giving the response headers.
+
+ If the request returns an HTTP error, the Error exception is
+ raised. If it returns success (error code 200) but the
+ content-type of the result is not 'application/zip', the Error
+ exception is also raised. In these error cases, if the result
+ document's content-type is a text type (anything starting with
+ 'text/'), the text of the result document is included in the
+ Error exception object; in the specific case that the type is
+ text/html, HTML formatting is removed using a primitive
+ formatter.
+
+ XXX This doesn't support proxies or redirect responses.
+ """
+ assert self.rooturl
if not path.endswith("/"):
path += "/"
path += view
@@ -175,10 +263,16 @@
errcode, errmsg,
self.slurptext(fp, headers))
if headers["Content-type"] != "application/zip":
- raise Error(self.slurptext(fp, headers).strip())
+ raise Error(self.slurptext(fp, headers))
return fp, headers
def slurptext(self, fp, headers):
+ """Helper to read the result document.
+
+ This removes the formatting from a text/html document; returns
+ other text documents as-is; and for non-text documents,
+ returns just a string giving the content-type.
+ """
data = fp.read()
ctype = headers["Content-type"]
if ctype == "text/html":
@@ -187,10 +281,10 @@
p = htmllib.HTMLParser(f)
p.feed(data)
p.close()
- return s.getvalue()
+ return s.getvalue().strip()
if ctype.startswith("text/"):
- return data
- return "Content-Type %r" % ctype
+ return data.strip()
+ return "Content-type: %s" % ctype
class FSSync(object):