[Zope-CVS] CVS: Packages/pcgi/Util - README.txt:1.1 killpcgi.py:1.1 pcgifile.py:1.1
Fred L. Drake, Jr.
fred@zope.com
Thu, 20 Mar 2003 15:39:57 -0500
Update of /cvs-repository/Packages/pcgi/Util
In directory cvs.zope.org:/tmp/cvs-serv8256/Util
Added Files:
README.txt killpcgi.py pcgifile.py
Log Message:
Make the PCGI support code available via CVS without having to maintain and
distribute it as part of the Zope application server. The appserver still
contains the necessary support to allow a PCGI-capable web server to talk
to it.
=== Added File Packages/pcgi/Util/README.txt ===
pcgifile.py - JeffBauer@bigfoot.com - May 21, 1998
The pcgifile.py is a sanity tester for pcgi info files and
can be run as a stand-alone program or a CGI script. For
more information, refer to:
http://starship.skyport.net/crew/jbauer/persistcgi/pcgifile/
=== Added File Packages/pcgi/Util/killpcgi.py ===
#!/usr/bin/env python
# killpcgi.py - kill a running pcgi process
# Copyright(c) 1998, Jeff Bauer, All rights reserved.
# killpcgi is a convenience script to kill a running
# pcgi process, provided that it is running on a
# Unix system. You pass it the path of the pcgi info
# file, and killpcgi will kill the running process and
# remove the socket/pid files.
#
# killpcgi has been built at the request of several users,
# but you may want to first examine the script and modify it
# to suit your environment.
#
# Bugs: skimpy error handling
# 0.3a minor win32 exception caught
# 0.2a Added support for NT - 8/8/98
# 0.1a Initial version
__version__ = "0.3a"
def win32_kill(pid):
"""posix-style kill command for the Win32 environment"""
import win32api, pywintypes
try:
handle = win32api.OpenProcess(1, 0, pid)
except pywintypes.error:
return -1
return (0 != win32api.TerminateProcess(handle, 0))
def killpcgi(pcgiInfoFile):
import os, sys, string
delimiter = '='
pidFile = None
socketFile = None
infoFile = open(pcgiInfoFile, 'r')
for i in infoFile.readlines():
if delimiter in i:
n,v = string.split(string.strip(i), delimiter)
if n == 'PCGI_PID_FILE':
pidFile = v
elif n == 'PCGI_SOCKET_FILE':
socketFile = v
infoFile.close()
if pidFile is None:
print "unknown pid file"
else:
f = open(pidFile)
pid = int(f.read())
f.close()
if os.name == 'nt':
if not win32_kill(pid):
print "process %d killed" % pid
elif os.name == 'posix':
if os.kill(pid, 0):
print "process %d doesn't appear to be running" % pid
else:
# safety measure (exclude it if you're fearless)
if os.system('/bin/ps -p %d | grep python > /dev/null' % pid):
print "process %d doesn't appear to be python" % pid
else:
os.kill(pid, 15)
print "process %d killed" % pid
# conservative approach: don't remove pid/socket files
# unless we're reasonably certain we have killed the
# running process
os.unlink(pidFile)
if socketFile is not None:
os.unlink(socketFile)
else:
print "kill not supported for platform:", os.name
if __name__ == '__main__':
import sys
usage = "usage: killpcgi pcgi-info-file"
if len(sys.argv) < 2:
print usage
else:
killpcgi(sys.argv[1])
=== Added File Packages/pcgi/Util/pcgifile.py ===
#!/usr/local/bin/python
# pcgifile.py - pcgi info file sanity testing - jeffbauer@bigfoot.com
# Copyright(c) 1998, Jeff Bauer
# 0.6a August 9, 1998
# - added socket checking
#
# 0.5a August 7, 1998
# - added NT compatibility
# - improved import checking
#
# 0.4a July 27, 1998
# - added checks for executable permissions
# - print versions of relevant modules
__version__ = "0.6a"
Delimiter = '='
# no class-based exceptions due to 1.4 compatbility
PcgiFileException='PcgiFileException'
class PcgiFile:
def __init__(self, pcgifile):
self.pcgifile = pcgifile
self.infodict = {}
self.combined = {}
self.log = []
self.resource = []
self.file_contents = []
self.module_version = []
self.continue_testing = 1
self.pcgi_wrapper = None
try:
self.readInfoFile()
self.checkRequiredKeys()
self.checkPCGIValues()
self.lookupPCGIPublisher()
self.checkWritePermissions()
self.checkImports()
self.checkSockets()
except PcgiFileException:
self.continue_testing = 0
def checkImports(self):
try:
from cgi_module_publisher import publish_module
except ImportError:
self.log.append("error attempting: 'from cgi_module_publisher import publish_module'")
raise PcgiFileException
try:
import cgi_module_publisher, CGIResponse
self.module_version.append("%-20s %-7s %s" % \
('cgi_module_publisher',
cgi_module_publisher.__version__,
cgi_module_publisher.__file__) )
self.module_version.append("%-20s %-7s %s" % \
('CGIResponse',
CGIResponse.__version__,
CGIResponse.__file__) )
self.module_version.append("%-20s %-7s %s" %
('pcgifile',
__version__,
sys.argv[0]) )
self.module_version.append("%-20s %-7s %s" % \
('pcgi-wrapper',
self.getPcgiWrapperVersion(),
self.pcgi_wrapper))
except ImportError:
pass
except NameError:
pass
try:
import pcgi_publisher
except ImportError:
if self.combined.has_key('PCGI_PUBLISHER'):
d, s = os.path.split(self.combined['PCGI_PUBLISHER'])
if not d in sys.path:
sys.path.append(d)
try:
import pcgi_publisher
self.module_version.append("%-20s %-7s %s" % \
('pcgi_publisher',
pcgi_publisher.__version__,
pcgi_publisher.__file__))
except ImportError:
pass
except NameError:
pass
def checkPCGIValues(self):
if self.combined.has_key('PCGI_EXE'):
sw_exe = self.combined['PCGI_EXE']
else:
sw_exe = sys.executable
self.log.append("advisory recommendation: specify PCGI_EXE=%s" % \
sys.executable)
if os.path.isfile(sw_exe):
self.resource.append("Executable:\t%s" % sw_exe)
else:
self.log.append("executable not found: %s" % sw_exe)
raise PcgiFileException
if self.combined.has_key('PCGI_PID_FILE'):
f = self.combined['PCGI_PID_FILE']
d = os.path.split(f)[0]
if os.path.isdir(d):
self.resource.append("PID file:\t%s" % f)
else:
self.log.append("directory not found: %s" % d)
raise PcgiFileException
if self.combined.has_key('PCGI_SOCKET_FILE'):
f = self.combined['PCGI_SOCKET_FILE']
d = os.path.split(f)[0]
if os.path.isdir(d):
self.resource.append("Socket file:\t%s" % f)
else:
self.log.append("directory not found: %s" % d)
raise PcgiFileException
if not self.combined.has_key('PCGI_NAME'):
self.log.append("advisory recommendation: specify PCGI_NAME")
if self.combined.has_key('PCGI_MODULE_PATH'):
p = self.combined['PCGI_MODULE_PATH']
if os.path.isfile(p):
self.resource.append("Module:\t%s" % p)
else:
self.log.append("module not found: %s" % p)
raise PcgiFileException
if self.combined.has_key('PCGI_ERROR_LOG'):
self.resource.append("Error Log:\t%s" % \
self.combined['PCGI_ERROR_LOG'])
if self.combined.has_key('PCGI_WORKING_DIR'): # deprecated
d = self.combined['PCGI_WORKING_DIR']
if os.path.isfile(d):
self.resource.append("Working Directory:\t%s" % d)
else:
self.log.append("working directory not found: %s" % d)
raise PcgiFileException
def checkRequiredKeys(self):
"""
Check for the required PCGI keys.
"""
for (k,v) in os.environ.items():
self.combined[k] = v
for (k,v) in self.infodict.items():
if self.combined.has_key(k):
self.log.append("%s=%s, overwrites: %s" % (k, v, self.combined[k]))
self.combined[k] = v
for k in ['PCGI_PID_FILE','PCGI_SOCKET_FILE','PCGI_MODULE_PATH']:
if not self.combined.has_key(k):
self.log.append("missing parameter: %s" % k)
raise PcgiFileException
# PCGI_INFO_FILE is assigned by the pcgi-wrapper, so that it
# may be known (made available) to pcgi_publisher.
self.combined['PCGI_INFO_FILE'] = self.pcgifile
def checkSockets(self):
"""
Check for possible socket-related error conditions.
"""
try:
import socket
except ImportError:
self.log.append("unable to import socket module")
raise PcgiFileException
port = None
if self.combined.has_key('PCGI_PORT'):
try:
port = string.atoi(self.combined['PCGI_PORT'])
except ValueError:
self.log.append("invalid port '%s', PCGI_PORT must be an integer" % self.combined['PCGI_PORT'])
raise PcgiFileException
if os.name == 'posix':
if port:
self.log.append("cannot specify PCGI_PORT directive on Unix - no support for INET sockets")
raise PcgiFileException
elif not port:
self.log.append("win32 platform must specify missing PCGI_PORT directive (default=7244)");
raise PcgiFileException
if port:
if self.combined.has_key('PCGI_HOST'):
hostname = self.combined['PCGI_HOST']
if hostname != socket.gethostname():
self.log.append("advisory recommendation: PCGI_HOST '%s' doesn't match '%s'" % (hostname, socket.gethostname()))
else:
hostname = socket.gethostname()
if port:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((hostname, port))
except socket.error:
self.log.append("error creating/binding INET socket (%s, %s)" % (hostname, port))
raise PcgiFileException
else:
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
except socket.error:
self.log.append("error creating UNIX socket")
raise PcgiFileException
socketFile = self.combined.get('PCGI_SOCKET_FILE')
if os.path.exists(socketFile):
self.log.append("advisory: socket %s in use, bind() not tested" % socketFile)
else:
try:
sock.bind(socketFile)
os.unlink(socketFile)
except socket.error:
self.log.append("error binding UNIX socket (%s)" % socketFile)
raise PcgiFileException
def checkWritePermissions(self):
"""
Check write permissions for PCGI_SOCKET_FILE, PCGI_PID_FILE, and
(if specified) PCGI_ERROR_LOG.
"""
fn = {}
if self.combined.has_key('PCGI_PID_FILE'):
fn['PCGI_PID_FILE'] = self.combined['PCGI_PID_FILE']
if self.combined.has_key('PCGI_SOCKET_FILE'):
fn['PCGI_SOCKET_FILE'] = self.combined['PCGI_SOCKET_FILE']
if self.combined.has_key('PCGI_ERROR_LOG'):
fn['PCGI_ERROR_LOG'] = self.combined['PCGI_ERROR_LOG']
for key, file in fn.items():
if os.path.exists(file):
try:
f = open(file,'a+')
f.close()
except IOError:
self.log.append("%s write permission error: %s" % \
(key, file))
raise PcgiFileException
else:
path = os.path.split(file)[0]
import tempfile
tempfile.tempdir = path
tmpfile = tempfile.mktemp()
try:
f = open(tmpfile,'w+')
f.close()
os.unlink(tmpfile)
except IOError:
self.log.append("%s write permission error: %s" % \
(key, file))
raise PcgiFileException
def environList(self):
"""
return a sorted list of how the environment would likely appear
if run through the pcgi-wrapper.
"""
e = []
keys = self.combined.keys()
keys.sort()
for k in keys:
e.append("%s\t%s" % (k, self.combined[k]))
return e
def getPcgiWrapperVersion(self):
"""
Execute pcgi-wrapper with no arguments and grab the version id.
"""
try:
import tempfile
tmpfile = tempfile.mktemp()
os.system("%s > %s" % (self.pcgi_wrapper, tmpfile))
f = open(tmpfile, 'r')
r = f.readlines()
f.close()
os.unlink(tmpfile)
for l in r:
s = string.strip(l)
if s[:21] == 'pcgi-wrapper-version ':
return string.split(s)[1]
except ImportError:
pass
return None
def isexecutable(self, path, real=None):
if os.name == 'posix':
return self.pathperm(path, real)[2]
else:
return 1
def lookupPCGIPublisher(self):
"""
The most efficient way for pcgi-wrapper to determine which
pcgi_publisher to use is for the pcgi info file to specify
it with the PCGI_PUBLISHER directive. Using the PCGI_PUBLISHER
is arguably the *best* method, as pcgi-wrapper will find it
quicker than otherwise. Still, in the interest of flexibility,
pcgi-wrapper will attempt to locate pcgi_publisher using the
following rules:
1. PCGI_PUBLISHER (*best*)
Rules 2-5, look in the paths below for files named: pcgi_publisher.py,
pcgi_publisher.pyc, pcgi_publisher.pyo, pcgi_publisher.
2. PCGI_INSERT_PATH, if available
3. PYTHONPATH, if available
4. Look in the directory of PCGI_MODULE_PATH
5. Look in the directory of the pcgi info file
"""
if self.combined.has_key('PCGI_PUBLISHER'):
p = self.combined['PCGI_PUBLISHER']
if os.path.isfile(p):
self.resource.append("Publisher:\t%s" % p)
else:
self.log.append("publisher not found: %s" % p)
raise PcgiFileException
return
self.log.append("advisory recommendation: specify PCGI_PUBLISHER")
# search through combined PCGI_INSERT_PATH + PYTHONPATH directories
searchPath = ""
if self.combined.has_key('PCGI_INSERT_PATH'):
searchPath = searchPath + self.combined['PCGI_INSERT_PATH']
if self.combined.has_key('PYTHONPATH'):
searchPath = searchPath + self.combined['PYTHONPATH']
publisherName = ['pcgi_publisher.py','pcgi_publisher.pyc','pcgi_publisher.pyo','pcgi_publisher']
for d in string.split(searchPath, ':'):
for p in publisherName:
pcgiPublisher = "%s%s%s" % (d, os.sep, p)
if os.path.isfile(pcgiPublisher):
self.resource.append("Publisher:\t%s" % pcgiPublisher)
return
# look in module directory
if self.combined.has_key('PCGI_MODULE_PATH'):
(d, x) = os.path.split(self.combined['PCGI_MODULE_PATH'])
for p in publisherName:
pcgiPublisher = "%s%s%s" % (d, os.sep, p)
if os.path.isfile(pcgiPublisher):
self.resource.append("Publisher:\t%s" % pcgiPublisher)
return
# look in pcgi info file directory
(d, x) = os.path.split(self.pcgifile)
for p in publisherName:
pcgiPublisher = "%s%s%s" % (d, os.sep, p)
if os.path.isfile(pcgiPublisher):
self.resource.append("Publisher:\t%s" % pcgiPublisher)
return
self.log.append("Unable to locate the pcgi_publisher")
raise PcgiFileException
def pathperm(self, path, real=None):
"""
Returns a 3-tuple of booleans indicating whether the process has
(read, write, execute) permission. A true value for the 'real'
argument indicates the test should occur for the real id rather
than the effective id.
"""
stat = os.stat(path)
if real is None:
uid = os.geteuid()
gid = os.getegid()
else:
uid = os.getuid()
gid = os.getgid()
if uid == stat[4]:
return (00400 & stat[0], 00200 & stat[0], 00100 & stat[0])
elif gid == stat[5]:
return (00040 & stat[0], 00020 & stat[0], 00010 & stat[0])
else:
return (00004 & stat[0], 00002 & stat[0], 00001 & stat[0])
def readInfoFile(self):
max_directives = 12 # arbitrary number defined in pcgi.h
if not os.path.isfile(self.pcgifile):
self.log.append("unable to locate file: %s" % self.pcgifile)
raise PcgiFileException
elif not self.isexecutable(self.pcgifile):
self.log.append("info file '%s' not executable" % self.pcgifile)
raise PcgiFileException
f = open(self.pcgifile, 'r')
lc = 0
for r in f.readlines():
lc = lc + 1
s = string.strip(r)
self.file_contents.append(s)
if lc == 1:
if s[:2] != '#!':
self.log.append("first line missing header, e.g. #!/usr/local/bin/pcgi-wrapper")
raise PcgiFileException
else:
self.pcgi_wrapper = string.strip(s[2:])
if not os.path.isfile(self.pcgi_wrapper):
self.log.append("unable to find wrapper: %s" % \
self.pcgi_wrapper)
raise PcgiFileException
elif not self.isexecutable(self.pcgi_wrapper):
self.log.append("wrapper '%s' is not executable" % \
self.pcgi_wrapper)
raise PcgiFileException
if len(s) < 1 or s[0] == '#':
continue
pos = string.find(s, Delimiter)
if pos < 0:
self.log.append("missing '%s' delimiter at line %d: %s" % \
(Delimiter, lc, s))
else:
self.infodict[string.strip(s[0:pos])] = string.strip(s[pos+1:])
f.close()
if len(self.infodict.keys()) > max_directives:
self.log.append("info fileexceeds maximum (%d) number of directives" % max_directives)
raise PcgiFileException
class PcgiFileTest:
"""
CGI sanity check of the pcgi info file.
"""
def __init__(self):
fs = cgi.FieldStorage()
infofile = None
if fs.has_key('infofile'):
infofile = fs['infofile'].value
elif fs.has_key('filename'):
infofile = fs['filename'].value
if infofile is None:
print "Please specify the pcgi info file in the following manner:"
print "<pre>"
print " http://.../cgi-bin/pcgifile.py?<STRONG>infofile=</STRONG><I>pcgifile</I>"
print "</pre>"
print "where <I>pcgifile</I> is the absolute path of the pcgi info file."
else:
print "<pre>"
print "<strong>Python %s</strong>" % sys.version
if os.environ.has_key('SERVER_SOFTWARE'):
print "<strong>%s</strong>" % os.environ['SERVER_SOFTWARE']
print
print "PCGI info file:\t%s" % infofile
pcgiFile = PcgiFile(infofile)
print "PCGI wrapper:\t%s" % pcgiFile.pcgi_wrapper
for m in pcgiFile.log:
print m
if not pcgiFile.continue_testing:
print "status: FAILURE"
print
print "<STRONG>%s</STRONG>" % infofile
for r in pcgiFile.file_contents:
print " %s" % r
else:
print "looks OK"
print
print "<STRONG>%s</STRONG>" % infofile
for r in pcgiFile.file_contents:
print " %s" % r
print
print "<STRONG>Likely publisher resource values:</STRONG>"
for r in pcgiFile.resource:
print " %s" % r
print
print "<STRONG>Versions of modules used:</STRONG>"
for r in pcgiFile.module_version:
print " %s" % r
print
print "<STRONG>Resulting environment will probably appear to the publisher as:</STRONG>"
for e in pcgiFile.environList():
print " %s" % e
def test():
usage = 'usage: pcgifile pcgi_info_file'
if len(sys.argv) < 2:
print usage
sys.exit(1)
infoFile = sys.argv[1]
pcgiFile = PcgiFile(infoFile)
for m in pcgiFile.log:
print m
if pcgiFile.continue_testing:
print "%s looks OK" % infoFile
if __name__ == '__main__':
try:
import cgi, os, sys, string, traceback
if os.environ.has_key('SERVER_PROTOCOL'):
print "Content-type: text/html"
print "Expires: Monday, 1-Jan-96 00:00:00 GMT"
print "Pragma: no-cache"
print
sys.stderr = sys.stdout
try:
pcgiFileTest = PcgiFileTest()
except:
print "<pre>"
traceback.print_exc()
else:
test()
except ImportError:
print "Content-type: text/html"
print
print "error during python imports; to fix try adding to pcgifile.py: <br><pre>"
print " sys.path[0:0] = [path1, path2, ...]"