# # DAV client library # # Copyright (C) 1998-1999 Guido van Rossum. All Rights Reserved. # Written by Greg Stein. Given to Guido. Licensed using the Python license. # # This module is maintained by Greg and is available at: # http://www.lyra.org/greg/python/davlib.py # # Since this isn't in the Python distribution yet, we'll use the CVS ID # for tracking: # $Id: davlib.py,v 1.2 1999/11/07 13:11:05 gstein Exp $ # import httplib import string import types import mimetypes import qp_xml import base64 error = 'davlib.error' INFINITY = 'infinity' XML_DOC_HEADER = '' XML_CONTENT_TYPE = 'text/xml; charset="utf-8"' # block size for copying files up to the server BLOCKSIZE = 16384 class HTTPConnectionAuth(httplib.HTTPConnection): def __init__(self, *args, **kw): apply(httplib.HTTPConnection.__init__, (self,) + args, kw) self.__username = None self.__password = None self.__nonce = None self.__opaque = None def setauth(self, username, password): self.__username = username self.__password = password def getreply(self): result = httplib.HTTPConnection.getreply(self) if result[0] != 401 or not self.__username: return result response = result[2] challenges = response.getallmatchingheaders('www-authenticate') assert challenges, 'HTTP violation: 401 with no WWW-Authenticate hdr' ### fill in stuff here... return result _textof = qp_xml.textof def _parse_status(elem): text = _textof(elem) idx1 = string.find(text, ' ') idx2 = string.find(text, ' ', idx1+1) return int(text[idx1:idx2]), text[idx2+1:] class _blank: def __init__(self, **kw): self.__dict__.update(kw) class _propstat(_blank): pass class _response(_blank): pass class _multistatus(_blank): pass def _extract_propstat(elem): ps = _propstat(prop={}, status=None, responsedescription=None) for child in elem.children: if child.ns != 'DAV:': continue if child.name == 'prop': for prop in child.children: ps.prop[(prop.ns, prop.name)] = prop elif child.name == 'status': ps.status = _parse_status(child) elif child.name == 'responsedescription': ps.responsedescription = _textof(child) ### unknown element name return ps def _extract_response(elem): resp = _response(href=[], status=None, responsedescription=None, propstat=[]) for child in elem.children: if child.ns != 'DAV:': continue if child.name == 'href': resp.href.append(_textof(child)) elif child.name == 'status': resp.status = _parse_status(child) elif child.name == 'responsedescription': resp.responsedescription = _textof(child) elif child.name == 'propstat': resp.propstat.append(_extract_propstat(child)) ### unknown child element return resp def _extract_msr(root): if root.ns != 'DAV:' or root.name != 'multistatus': raise 'invalid response: expected' msr = _multistatus(responses=[ ], responsedescription=None) for child in root.children: if child.ns != 'DAV:': continue if child.name == 'responsedescription': msr.responsedescription = _textof(child) elif child.name == 'response': msr.responses.append(_extract_response(child)) ### unknown child element return msr def _extract_locktoken(root): if root.ns != 'DAV:' or root.name != 'prop': raise 'invalid response: expected' elem = root.find('lockdiscovery', 'DAV:') if not elem: raise 'invalid response: expected' elem = elem.find('activelock', 'DAV:') if not elem: raise 'invalid response: expected' elem = elem.find('locktoken', 'DAV:') if not elem: raise 'invalid response: expected' elem = elem.find('href', 'DAV:') if not elem: raise 'invalid response: expected' return elem.textof() class DAVResponse(httplib.HTTPResponse): def parse_multistatus(self): self.root = qp_xml.Parser().parse(self) self.msr = _extract_msr(self.root) def parse_lock_response(self): self.root = qp_xml.Parser().parse(self) self.locktoken = _extract_locktoken(self.root) class DAV(HTTPConnectionAuth): response_class = DAVResponse __username=None __password=None def setauth(self,username=None,password=None): self.__username=username self.__password=password def get(self, url): return self._request('GET', url) def head(self, url): return self._request('HEAD', url) def post(self, url, data={ }, body=None, extra_hdrs={ }): headers = { } headers.update(extra_hdrs) assert body or data, "body or data must be supplied" assert not (body and data), "cannot supply both body and data" if data: body = '' for key, value in data.items(): if type(value) == types.ListType: for item in value: body = body + '&' + key + '=' + urllib.quote(str(item)) else: body = body + '&' + key + '=' + urllib.quote(str(value)) body = body[1:] headers['Content-Type'] = 'application/x-www-form-urlencoded' return self._request('POST', url, body, headers) def options(self, url='*'): return self._request('OPTIONS', url) def trace(self, url): return self._request('TRACE', url) def put(self, url, contents, content_type=None, content_enc=None): if not content_type: if type(contents) is types.FileType: content_type, content_enc = mimetypes.guess_type(contents.name) else: content_type, content_enc = mimetypes.guess_type(url) headers = { } if content_type: headers['Content-Type'] = content_type if content_enc: headers['Content-Encoding'] = content_enc return self._request('PUT', url, contents, headers) def delete(self, url): return self._request('DELETE', url) def propfind(self, url, body=None, depth=None): extra_hdrs = { 'Content-Type' : XML_CONTENT_TYPE } if depth is not None: extra_hdrs['Depth'] = str(depth) return self._request('PROPFIND', url, body, extra_hdrs) def proppatch(self, url, body): extra_hdrs = { 'Content-Type' : XML_CONTENT_TYPE } return self._request('PROPPATCH', url, body, extra_hdrs) def mkcol(self, url): return self._request('MKCOL', url) def move(self, src, dst): return self._request('MOVE', src, extra_hdrs={ 'Destination' : dst }) def copy(self, src, dst, depth=None): extra_hdrs = { 'Destination' : dst } if depth is not None: extra_hdrs['Depth'] = str(depth) return self._request('COPY', src, extra_hdrs=extra_hdrs) def lock(self, url, owner='', timeout=None, depth=None): extra_hdrs = { 'Content-Type' : XML_CONTENT_TYPE } if depth is not None: extra_hdrs['Depth'] = str(depth) if timeout is not None: extra_hdrs['Timeout'] = timeout body = XML_DOC_HEADER + \ '' + \ '' + \ '' + \ owner + \ '' return self._request('LOCK', url, body, extra_hdrs=extra_hdrs) def unlock(self, url, locktoken): if locktoken[0] != '<': locktoken = '<' + locktoken + '>' return self._request('UNLOCK', url, extra_hdrs={'Lock-Token' : locktoken}) ### the _request() method needs some more work for reconnects... ### (especially w/ regard to body values that are Files) def _request(self, method, url, body=None, extra_hdrs={}): "Internal method for sending a request." self.putrequest(method, url) if body: ### length broken for files... self.putheader('Content-Length', str(len(body))) for hdr, value in extra_hdrs.items(): self.putheader(hdr, value) if self.__username and self.__password: self.putheader("AUTHORIZATION","Basic %s" % string.replace(base64.encodestring("%s:%s" % (self.__username,self.__password)),"\012","")) self.endheaders() if body: if type(body) is types.FileType: while 1: block = body.read(BLOCKSIZE) if not block: break self.send(block) else: self.send(body) errcode, errmsg, response = self.getreply() if errcode == -1: raise error, (errmsg, response) response.errcode = errcode response.errmsg = errmsg return response # # Higher-level methods for typical client use # def allprops(self, url, depth=None): return self.propfind(url, depth=depth) def propnames(self, url, depth=None): body = XML_DOC_HEADER + \ '' return self.propfind(url, body, depth) def getprops(self, url, *names, **kw): assert names, 'at least one property name must be provided' if kw.has_key('ns'): xmlns = ' xmlns:NS="' + kw['ns'] + '"' ns = 'NS:' del kw['ns'] else: xmlns = ns = '' if kw.has_key('depth'): depth = kw['depth'] del kw['depth'] else: depth = 0 assert not kw, 'unknown arguments' body = XML_DOC_HEADER + \ '<' + ns + \ string.joinfields(names, '/><' + ns) + \ '/>' return self.propfind(url, body, depth) def delprops(self, url, *names, **kw): assert names, 'at least one property name must be provided' if kw.has_key('ns'): xmlns = ' xmlns:NS="' + kw['ns'] + '"' ns = 'NS:' del kw['ns'] else: xmlns = ns = '' assert not kw, 'unknown arguments' body = XML_DOC_HEADER + \ '<' + ns + \ string.joinfields(names, '/><' + ns) + \ '/>' return self.proppatch(url, body) def setprops(self, url, *xmlprops, **props): assert xmlprops or props, 'at least one property must be provided' elems = string.joinfields(xmlprops, '') if props.has_key('ns'): xmlns = ' xmlns:NS="' + props['ns'] + '"' ns = 'NS:' del props['ns'] else: xmlns = ns = '' for key, value in props.items(): if value: elems = '%s<%s%s>%s' % (elems, ns, key, value, ns, key) else: elems = '%s<%s%s/>' % (elems, ns, key) body = XML_DOC_HEADER + \ '' + \ elems + \ '' return self.proppatch(url, body) def get_lock(self, url, owner='', timeout=None, depth=None): response = self.lock(url, owner, timeout, depth) response.parse_lock_response() return response.locktoken