[Zope-CVS] CVS: Packages/FunctionalTests/FunctionalTests - ConfigParserExt.py:1.1 FTRunner.py:1.1 Framework.py:1.1 LoggingProxy.py:1.1 ScenarioGenerator.py:1.1 TkLoggingProxy.py:1.1 __init__.py:1.1
Tres Seaver
tseaver@zope.com
Thu, 24 Apr 2003 17:39:53 -0400
Update of /cvs-repository/Packages/FunctionalTests/FunctionalTests
In directory cvs.zope.org:/tmp/cvs-serv3971/FunctionalTests
Added Files:
ConfigParserExt.py FTRunner.py Framework.py LoggingProxy.py
ScenarioGenerator.py TkLoggingProxy.py __init__.py
Log Message:
- Repackage for distutils.
=== Added File Packages/FunctionalTests/FunctionalTests/ConfigParserExt.py ===
"""
Extend the standard ConfigParser.
"""
import ConfigParser
import re
import StringIO
import os
# Matches a "[Name]" section header at the start of any line (MULTILINE);
# group 1 is the section name, which must begin with a letter and may
# continue with letters, digits or underscores.
SECTION_HEADER = re.compile( r'^\[([A-Za-z][A-Za-z0-9_]*)\]', re.MULTILINE )
class ConfigParserExt( ConfigParser.ConfigParser ):
    """
    Allow ordered query of sections in a config file; some sections
    may be "privileged", and not show up in the main list.

    The "special" sections are the names handed to the constructor.
    Every other section header found by 'read' is remembered, in file
    order, and reported by 'listOtherSections'.
    """

    def __init__( self, special_sections=( 'DEFAULT', ) ):
        """
        Create an empty parser.

        special_sections -- sequence of privileged section names
        """
        ConfigParser.ConfigParser.__init__( self )
        self.clear()
        self._special_sections = list( special_sections )

    #
    #   Accessors
    #
    def listSpecialSections( self, include_all=1 ):
        """
        Return the special section names.

        include_all -- when true, return every configured name as a
                       tuple; when false, return a list holding only
                       those names reported by 'sections()'.
        """
        if include_all:
            return tuple( self._special_sections )

        present = self.sections()
        # A comprehension yields a real list on every Python version.
        return [ name for name in self._special_sections
                 if name in present ]

    def listOtherSections( self ):
        """
        Return, as a tuple, the non-special section names in the order
        'read' found them.
        """
        return tuple( self._other_sections )

    #
    #   Mutators
    #
    def clear( self ):
        """
        Forget the defaults and both section lists.
        """
        self._defaults = {}
        self._special_sections = []
        self._other_sections = []

    def setDefaults( self, **kw ):
        """
        Add kw to our defaults
        """
        self._defaults.update( kw )

    def read( self, filename ):
        """
        Parse the single file 'filename'.

        A '%' in 'filename' triggers interpolation against the current
        defaults.  The file's directory and base name are stored in the
        defaults as 'config_dir' and 'config_file'.

        Raises ValueError when 'filename' is None; errors from opening
        the file propagate.
        """
        if filename is None:
            raise ValueError( "Must supply 'filename'." )

        if '%' in filename:
            filename = filename % self._defaults

        config_dir, config_file = os.path.split( filename )
        self._defaults[ 'config_dir' ] = config_dir
        self._defaults[ 'config_file' ] = config_file

        # Scan the raw text ourselves: the base parser does not expose
        # the order in which sections appear.
        handle = open( filename )
        try:
            contents = handle.read()
        finally:
            handle.close()       # previously leaked

        special = self.listSpecialSections()
        self._other_sections.extend(
            [ name for name in SECTION_HEADER.findall( contents )
              if name not in special ] )

        self.defaults().update( self._defaults )
        ConfigParser.ConfigParser.read( self, filename )
=== Added File Packages/FunctionalTests/FunctionalTests/FTRunner.py ===
"""
FunctionTests driver module.
"""
import sys, getopt, re, ConfigParser
# Unique sentinel for FTRunner._app meaning "getZopeApp has not been tried
# yet"; it is compared by identity, so it cannot be confused with None.
_marker = []
class FTRunner:
    """
    Command-line driver: parse options, collect defaults, run each
    scenario file named on the command line and print its result.

    Settings are layered: class-level values first, then 'name=value'
    arguments, then the config file, then explicit command-line options.
    """

    # Class-level fallbacks, overridden per instance while parsing.
    _verbosity = 1
    _time_requests = 1
    _check_responses = 1
    _check_elapsed_times = 1
    _config_file = '.zftrc'
    _portal_url = 'http://localhost:8080/'
    _site_path = '/'
    _filenames = ()
    _app = _marker

    def __init__( self, args ):
        """
        args -- command-line arguments, without the program name
        """
        self._defaults = {}
        self.parseOptions( args )

    def printUsage( self, message=None ):
        """
        Print command-line help, then exit with status 1.

        message -- optional error text appended to the help
        """
        if message:
            error_text = 'Error: %s' % message
        else:
            error_text = ''

        print( """\
%(RUNNER_EXE)s [-?tTrReEvq] [-c config_file] \\
[-u portal_url] [-p site_path] \\
[name1=value]* scenario_1 ...
-?, --help Print this help message
-t, --time-requests Time requests (default)
-T, --no-time-requests Don't time requests
-r, --check-responses Check expected HTTP response codes (default)
-R, --no-check-responses Don't check expected HTTP response codes
-e, --check-elapsed_times Check elapsed times (default)
-E, --no-check-elapsed_times Don't check elapsed times
-v, --verbose Increment verbosity (default is '1')
-q, --quiet Set verbosity to '0'
-c, --config-file Use defaults from 'config_file'
(default is '%(CONFIG_FILE)s')
-u, --portal-url Use 'portal_url' as test target server
(default is '%(PORTAL_URL)s')
-p, --site-path Use 'site_path' as test target path
(default is '%(SITE_PATH)s')
Notes:
- Actually running a test may require being able to 'import Zope',
which means that the appropriate SOFTWARE_HOME, INSTANCE_HOME, and
PYTHONPATH variables should be set up by the script which calls us.
%(MESSAGE)s
""" % { 'RUNNER_EXE'  : sys.argv[0]
      , 'CONFIG_FILE' : self._config_file
      , 'PORTAL_URL'  : self._portal_url
      , 'SITE_PATH'   : self._site_path
      , 'MESSAGE'     : error_text
      } )
        sys.exit( 1 )

    def parseOptions( self, args ):
        """
        Parse command line options.

        Exits through 'printUsage' on an unknown option, on '-?', or
        when no scenario file is named.
        """
        # None means "not given on the command line", so that a value
        # from the config file is kept.
        verbosity = None
        portal_url = site_path = config_file = None
        time_requests = check_responses = check_elapsed_times = None

        try:
            opts, arglist = getopt.getopt( args
                                         , "?tTrReEvqc:u:p:"
                                         , [ 'help'
                                           , 'time-requests'
                                           , 'no-time-requests'
                                           , 'check-responses'
                                           , 'no-check-responses'
                                           , 'check-elapsed_times'
                                           , 'no-check-elapsed_times'
                                           , 'verbose'
                                           , 'quiet'
                                           , 'config-file='
                                           , 'portal-url='
                                           , 'site-path='
                                           ]
                                         )
        except getopt.GetoptError:
            # printUsage exits; exc_info works on every Python version.
            self.printUsage( message=sys.exc_info()[1] )

        for o, v in opts:
            if o in ( '-?', '--help' ):
                self.printUsage()
            elif o in ( '-t', '--time-requests' ):
                time_requests = 1
            elif o in ( '-T', '--no-time-requests' ):
                time_requests = 0
            elif o in ( '-r', '--check-responses' ):
                check_responses = 1
            elif o in ( '-R', '--no-check-responses' ):
                check_responses = 0
            elif o in ( '-e', '--check-elapsed_times' ):
                check_elapsed_times = 1
            elif o in ( '-E', '--no-check-elapsed_times' ):
                check_elapsed_times = 0
            elif o in ( '-v', '--verbose' ):
                if verbosity is None:
                    verbosity = FTRunner._verbosity
                verbosity = verbosity + 1
            elif o in ( '-q', '--quiet' ):
                verbosity = 0
            elif o in ( '-c', '--config-file' ):
                config_file = v
            elif o in ( '-u', '--portal-url' ):
                portal_url = v
            elif o in ( '-p', '--site-path' ):
                site_path = v

        defaults, filenames = self.parseDefaults( arglist )

        if not filenames:
            self.printUsage( message='must specify at least one scenario' )

        self._filenames = filenames

        # First initial defaults, ...
        self._defaults = { 'portal_url' : self._portal_url
                         , 'site_path'  : self._site_path
                         }
        self._defaults.update( defaults )

        # ... then config file, ...
        if config_file is None:
            default_rc = FTRunner._config_file
            try:
                probe = open( default_rc )
            except IOError:
                pass                     # no default file: run without one
            else:
                probe.close()            # only probing for readability
                config_file = default_rc

        self._config_file = config_file

        if self._config_file is not None:
            self.readConfigFile( self._defaults )

        # ... then command-line args.
        if verbosity is not None:
            self._verbosity = verbosity

        if time_requests is not None:
            self._time_requests = time_requests

        if check_responses is not None:
            self._check_responses = check_responses

        if check_elapsed_times is not None:
            self._check_elapsed_times = check_elapsed_times

        if portal_url is not None:
            self._defaults[ 'portal_url' ] = portal_url

        if site_path is not None:
            self._defaults[ 'site_path' ] = site_path

    def parseDefaults( self, arglist ):
        """
        Return a dictionary containing name-value pairs from
        arglist (those whose arg matched 'name=value'), plus
        a sequence of "normal" arguments.
        """
        ARG_PATTERN = re.compile( r'([A-Za-z_][\w_]*)[ ]*=[ ]*(.*)' )
        defaults = {}
        filenames = []

        for arg in arglist:
            match = ARG_PATTERN.match( arg )
            if match:
                defaults[ match.group(1) ] = match.group(2)
            else:
                filenames.append( arg )

        return defaults, filenames

    def readConfigFile( self, defaults ):
        """
        Merge settings from the file named by 'self._config_file'.

        defaults -- mapping used to seed the parser's defaults

        The parser's defaults are merged into 'self._defaults'.  An
        '[Options]' section may set 'Time_Requests', 'Check_Responses'
        and 'Check_Elapsed_Times' (booleans) and 'Verbosity' (integer);
        options which are absent leave the current value untouched.
        Problems are reported on stdout, not raised.
        """
        flag_options = ( ( '_time_requests',        'Time_Requests' )
                       , ( '_check_responses',      'Check_Responses' )
                       , ( '_check_elapsed_times',  'Check_Elapsed_Times' )
                       )
        cp = ConfigParser.ConfigParser( defaults )

        try:
            f = open( self._config_file )
            try:
                cp.readfp( f )
            finally:
                f.close()

            self._defaults.update( cp.defaults() )

            if cp.has_section( 'Options' ):
                # Use typed getters: plain 'get' returns strings, and
                # its third positional argument is 'raw', not a default.
                for attr, option in flag_options:
                    if cp.has_option( 'Options', option ):
                        setattr( self, attr
                               , cp.getboolean( 'Options', option ) )
                if cp.has_option( 'Options', 'Verbosity' ):
                    self._verbosity = cp.getint( 'Options', 'Verbosity' )

        except IOError:
            print( '*** No such config file: %s' % self._config_file )

        except ( ConfigParser.Error, ValueError ):
            # ValueError: an option value was not a boolean / integer.
            print( '*** Config file error: %s' % sys.exc_info()[1] )

    def printHeader( self, title ):
        """
        Print the banner and column headings for one test.
        """
        print( "=" * 79 )
        print( "Test: %s" % title )
        print( "=" * 79 )
        print( "%-12s: %-55s: %s" % ( 'Request', 'URL', 'Time (s)' ) )
        print( "-" * 79 )

    def printInvocation( self, invocation ):
        """
        Print out a single invocation.
        """
        request = invocation.getRequest()
        # One write producing the same text the former two-step
        # "print ..., / print ..." pair emitted.
        print( "%-12s: %-55s  %0.3f" % ( request.getName()[:12]
                                       , request.getURI()[:55]
                                       , invocation.getInterval()
                                       ) )

    def printResult( self, result ):
        """
        Print out a Result.

        Invocations are listed at verbosity > 0, dumped results at
        verbosity > 1; the outcome and any errors are always printed.
        """
        if self._verbosity > 0:
            for invocation in result.listInvocations():
                self.printInvocation( invocation )
            print( "-" * 79 )

        if result():
            outcome = "OK"
        else:
            outcome = "Failed"
        print( "\n Outcome: %s" % outcome )

        errors = result.listErrors()
        if errors:
            print( "\n Errors" )
            for error in errors:
                print( error )
            print( "" )

        if self._verbosity > 1:
            for dump in result.listDumpedResults():
                print( dump )

        print( "" )

    def getZopeApp( self ):
        """
        Get and return the Zope Application object, if available.

        Returns None when Zope cannot be imported or opened.
        """
        if self._app is _marker: # only try once
            try:
                import Zope
                self._app = Zope.app()
            except:
                # Deliberately broad: any failure means "no Zope here".
                self._app = None

        return self._app

    def runTestFile( self, filename ):
        """
        Run the scenario contained in 'filename'.
        """
        import Framework

        test = Framework.buildTest( filename, self._defaults )

        if self._verbosity > 0:
            self.printHeader( test.getTitle() )

        result = test( defaults=self._defaults
                     , app=self.getZopeApp()
                     , time_requests=self._time_requests
                     , check_responses=self._check_responses
                     , check_elapsed_times=self._check_elapsed_times
                     )

        self.printResult( result )

    def runAllTests( self ):
        """
        Run every scenario file named on the command line, in order.
        """
        for filename in self._filenames:
            self.runTestFile( filename )
if __name__ == '__main__':
    # Script entry point: run every scenario named on the command line.
    FTRunner( sys.argv[1:] ).runAllTests()
=== Added File Packages/FunctionalTests/FunctionalTests/Framework.py === (1156/1256 lines abridged)
import string
import os
import re
import time
import httplib
import urllib
import urlparse
import base64
import Cookie
import MimeWriter
import StringIO
import ConfigParserExt
#
# Result classes
#
class Result:
    """
    Hold on to the results of running a test or suite of tests.

    Implements the GoF Composite pattern.
    """

    def __init__( self
                , test
                , application=None
                , defaults={}
                , time_requests=1
                , check_responses=1
                , check_elapsed_times=1
                ):
        """
        Start an empty result for 'test'.

        defaults is copied, so the caller's mapping is left alone.
        Elapsed-time checking is only enabled when requests are timed.
        """
        # What is being run, and against what.
        self._test = test
        self._application = application
        self._parent = None

        # Private copy of the caller's defaults.
        copied = {}
        copied.update( defaults )
        self._defaults = copied
        self._state = {}

        # Behaviour switches.
        self._time_requests = time_requests
        self._check_responses = check_responses
        self._check_elapsed_times = check_elapsed_times and time_requests

        # Accumulators, all initially empty.
        self._invocations = []
        self._errors = []
        self._fatal_errors = []
        self._children = []
        self._cookies = []
#
# Queries
#
[-=- -=- -=- 1156 lines omitted -=- -=- -=-]
filename = filename % defaults
dir, fn = os.path.split( filename )
defaults[ 'config_dir' ] = dir
defaults[ 'config_file' ] = fn
cp = ConfigParserExt.ConfigParserExt( _SPECIAL_SECTIONS )
apply( cp.setDefaults, (), defaults )
cp.read( filename )
test = None
isScenario = 0
if _SUITE_SECTION in cp.listSpecialSections( 0 ):
isScenario = 0
test = Suite()
test.setTitle( cp.get( _SUITE_SECTION, 'Title' ) )
if _SCENARIO_SECTION in cp.listSpecialSections( 0 ):
isScenario = 1
test = Scenario()
test.setTitle( cp.get( _SCENARIO_SECTION, 'Title' ) )
test.setUseCase( cp.get( _SCENARIO_SECTION, 'Use_case' ) )
if cp.has_option( _SCENARIO_SECTION, 'load_sequence' ):
test.setLoadSequenece( cp.get( _SCENARIO_SECTION
, 'load_sequence' ) )
if _SETUP_SECTION in cp.listSpecialSections( 0 ):
test.setSetup( _buildRequest( cp, _SETUP_SECTION ) )
for section in cp.listOtherSections():
options = cp.options( section )
if 'repeat' in options:
repeat_count = cp.getint( section, 'repeat' )
else:
repeat_count = 1
if isScenario:
test.addRequest( _buildRequest( cp, section ), repeat_count )
else:
child = buildTest( cp.get( section, 'file' ), defaults )
test.addChild( child, repeat_count )
if isScenario and _POSTCONDITION_SECTION in cp.listSpecialSections( 0 ):
test.setPostcondition( _buildRequest( cp, _POSTCONDITION_SECTION ) )
if _TEARDOWN_SECTION in cp.listSpecialSections( 0 ):
test.setTeardown( _buildRequest( cp, _TEARDOWN_SECTION ) )
return test
=== Added File Packages/FunctionalTests/FunctionalTests/LoggingProxy.py ===
import sys, os, string, re
import socket, threading, getopt
from SocketServer import BaseRequestHandler, TCPServer
# Direction flags handed to the socket wrappers; they select the
# 'out' / 'in' logfile suffix and the label used in log messages.
OUTBOUND = 1
INBOUND = 0
class LoggingServer( TCPServer ):
    """
    TCP server which processes each accepted connection in a new thread.

    The listen address and logfile settings come from the command line
    (see 'printUsage'); the class attributes below are the defaults.
    """

    _verbosity = 2
    _listen_host = 'localhost'
    _listen_port = 9999
    _logfile_directory = '/tmp'
    _logfile_prefix = 'zft'
    _record_inbound = 0

    def __init__( self, args, filter_class ):
        """
        args         -- command-line arguments, without the program name
        filter_class -- request handler class passed to TCPServer
        """
        self.parseOptions( args )
        address = ( self._listen_host, self._listen_port )
        TCPServer.__init__( self, address, filter_class )

    def printUsage( self, msg=None ):
        """
        Print command-line help, then exit with status 1.

        msg -- optional error text appended to the help
        """
        if msg:
            error_text = 'Error: %s' % msg
        else:
            error_text = ''

        print( """\
%(PROXY_EXE)s [-?vqiI] [-h listen_host] [-p listen_port] \\
[-l log_dir] [-f log_prefix]
-?, --help Print this help message
-v, --verbose Increment verbosity (default is '%(VERBOSITY)s')
-q, --quiet Set verbosity to '0'
-i, --record-inbound Record inbound data
-I, --no-record-inbound Don't record inbound data (default)
-h, --listen-host Host address to listen on
(default '%(LISTEN_HOST)s')
-p, --listen-port Port number to listen on
(default '%(LISTEN_PORT)s')
-l, --logfile-directory Directory to write log files
(default '%(LOGFILE_DIRECTORY)s')
-f, --logfile-prefix Prefix for log file names
(default '%(LOGFILE_PREFIX)s')
%(MESSAGE)s
""" % { 'PROXY_EXE'         : sys.argv[0]
      , 'VERBOSITY'         : self._verbosity
      , 'LISTEN_HOST'       : self._listen_host
      , 'LISTEN_PORT'       : self._listen_port
      , 'LOGFILE_DIRECTORY' : self._logfile_directory
      , 'LOGFILE_PREFIX'    : self._logfile_prefix
      , 'MESSAGE'           : error_text
      } )
        sys.exit( 1 )

    def parseOptions( self, args ):
        """
        Parse command line options.

        Exits through 'printUsage' on an unknown option, on '-?', or
        when the listen port is not an integer.
        """
        verbosity = self._verbosity
        record_inbound = self._record_inbound
        listen_host = listen_port = logfile_directory = logfile_prefix = None

        try:
            opts, ignored = getopt.getopt( args
                                         , "?vqiIh:p:l:f:"
                                         , [ 'help'
                                           , 'verbose'
                                           , 'quiet'
                                           , 'record-inbound'
                                           , 'no-record-inbound'
                                           , 'listen-host='
                                           , 'listen-port='
                                           , 'logfile-directory='
                                           , 'logfile-prefix='
                                           ]
                                         )
        except getopt.GetoptError:
            # printUsage exits; exc_info works on every Python version.
            self.printUsage( msg=sys.exc_info()[1] )

        for o, v in opts:
            if o in ( '-?', '--help' ):
                self.printUsage()
            elif o in ( '-v', '--verbose' ):
                verbosity = verbosity + 1
            elif o in ( '-q', '--quiet' ):
                verbosity = 0
            elif o in ( '-i', '--record-inbound' ):
                record_inbound = 1
            elif o in ( '-I', '--no-record-inbound' ):
                record_inbound = 0
            elif o in ( '-h', '--listen-host' ):
                listen_host = v
            elif o in ( '-p', '--listen-port' ):
                listen_port = v
            elif o in ( '-l', '--logfile-directory' ):
                logfile_directory = v
            elif o in ( '-f', '--logfile-prefix' ):
                logfile_prefix = v

        self._verbosity = verbosity
        self._record_inbound = record_inbound

        if listen_host is not None:
            self._listen_host = listen_host

        if listen_port is not None:
            try:
                self._listen_port = int( listen_port )
            except ValueError:
                # Report a bad port like any other usage error.
                self.printUsage( msg='listen port must be an integer: %s'
                                     % listen_port )

        if logfile_directory is not None:
            self._logfile_directory = logfile_directory

        if logfile_prefix is not None:
            self._logfile_prefix = logfile_prefix

    def process_request( self, request, client_address ):
        """
        Start a new thread to process the request.
        """
        t = threading.Thread( target=self.finish_request
                            , args=( request, client_address ) )
        t.start()
# Running count of handled connections, shared by all handler threads.
handler_count = 0
# Serialises updates to handler_count.
hc_lock = threading.RLock()

def incrementHC():
    """
    Add one to the global handler counter while holding 'hc_lock', and
    return the new value.
    """
    global handler_count
    hc_lock.acquire()
    try:
        handler_count += 1
        updated = handler_count
    finally:
        hc_lock.release()
    return updated
class LoggingFilter( BaseRequestHandler ):
    """
    Request handler which relays one proxied HTTP exchange between the
    connected client and the origin server named in the request line,
    using socket wrappers for both ends.
    """

    def handle( self ):
        """
        Organise the data flows.

        Any error is reported through '_logTraceback'; both wrappers
        which were successfully created are closed afterwards.
        """
        LSW = self._getWrapperClass()
        count = incrementHC()
        # Pre-bind so the cleanup below is safe if construction fails.
        client = server = None

        try:
            directory = self._getDirectory()
            prefix = self._getPrefix()

            client = LSW( skt=self.request
                        , directory=directory
                        , prefix=prefix
                        , count=count
                        , direction=INBOUND
                        , suppress_logging=not self.server._record_inbound
                        , verbosity=self.server._verbosity
                        )

            outbound = socket.socket( socket.AF_INET, socket.SOCK_STREAM )
            try:
                server = LSW( skt=outbound
                            , directory=directory
                            , prefix=prefix
                            , count=count
                            , direction=OUTBOUND
                            , verbosity=self.server._verbosity
                            )
            except:
                outbound.close()     # nobody else owns this socket yet
                raise

            self.doRequest( client, server )
            self.doTransfer( client, server )

        except:
            # Top-level boundary of the handler thread: log, never raise.
            self._logTraceback()

        self.close( client, server )

    def doRequest( self
                 , client
                 , server
                 , HTTP_REQUEST = re.compile( r'^\s*(\S+)\s+'
                                            + r'http://([^\s/:]+)(:(\d+))'
                                            + r'?(\S*)\s*(\S+)\s*([\n\r]+)$'
                                            )
                 ):
        """
        Process the request line (first line from the client).

        Connects 'server' to the host named in the absolute URL and
        sends it a rewritten HTTP/1.0 request line.  Returns the host
        name.  Raises IOError when the line is missing or is not an
        absolute 'http://' request.
        """
        req = client.readln()
        if req is None:
            # readln returns None when the client sent nothing.
            raise IOError( 'Unexpected Url format: %s' % req )

        match = HTTP_REQUEST.search( req )
        if not match:
            raise IOError( 'Unexpected Url format: %s' % req )

        cmd, name, junk, port, path, protocol, cr = match.groups()

        if not port:
            port = "80"
        if not path:
            path = '/'               # "http://host" with no path at all

        address = ( name, int( port ) )
        req = '%s %s HTTP/1.0\r\n' % ( cmd, path )

        server.connect( address )
        server.send( req ) # send the modified request
        server.log( 'Request: %s' % req )
        return name

    def doTransfer( self, client, server ):
        """
        Transfer data back and forth.

        Forwards the client's headers (and body, when a length was
        announced) to the server, then the server's headers and body
        back to the client.
        """
        length = client.forwardHeaders( server, OUTBOUND )
        if length:
            client.forwardBinary( server, OUTBOUND, length )

        length = server.forwardHeaders( client, INBOUND )
        server.forwardBinary( client, INBOUND, length )

    def close( self, client, server ):
        """
        Close the sockets.

        Best effort: either wrapper may be None, and errors while
        closing are ignored.
        """
        for wrapper in ( client, server ):
            if wrapper is None:
                continue
            try:
                wrapper.close()
            except Exception:
                pass

    #
    #   Derived filters should override these to adapt to a different UI.
    #
    def _getWrapperClass( self ):
        """
        Return the class from which to make socket wrappers.
        """
        return LoggingServerWrapper

    def _getDirectory( self ):
        """
        Return the directory path used for storing logfiles.
        """
        return self.server._logfile_directory

    def _getPrefix( self ):
        """
        Return the prefix used for naming logfiles.
        """
        return self.server._logfile_prefix

    def _logTraceback( self ):
        """
        Log an error.
        """
        import traceback
        traceback.print_exc()
class LoggingServerWrapper:
    """
    Buffered wrapper around a socket which can copy ("tee") everything
    sent through it into a logfile.
    """

    def __init__( self
                , skt
                , directory
                , prefix
                , count
                , direction
                , suppress_logging=0
                , verbosity=0
                ):
        """
        skt              -- the socket to wrap
        directory/prefix -- location and name prefix of the tee file
        count            -- sequence number used in the tee file name
        direction        -- true selects the '.out' suffix, false '.in'
        suppress_logging -- when true, no tee file is written
        verbosity        -- when true, 'log' writes to stderr
        """
        self.skt = skt
        self.inBuffer = ''
        self.chunkSize = 8192
        self.readAll = 0 # no more data to read
        self.verbosity = verbosity

        if suppress_logging:
            self.teeFile = None
        else:
            teeFileName = '%s/%s_%03d.%s' % ( directory
                                            , prefix
                                            , count
                                            , direction and 'out' or 'in'
                                            )
            self.teeFile = open( teeFileName, 'w' )

    def readln( self
              , LINE=re.compile( r'^([^\n]*\n)(.*)', re.S )
              ):
        """
        Get a line from the buffer (reading if necessary).

        The line includes its terminator.  Once the peer has closed,
        any unterminated remainder is returned as the last line, and
        None after that.
        """
        # A line ends exactly at its '\n'.  The former optional '\r'
        # after it swallowed the CR of a following blank line.
        while 1:
            match = LINE.search( self.inBuffer )
            if match:
                break
            if self.readAll: # no more data: return what there is
                if self.inBuffer == '':
                    return None
                line = self.inBuffer
                self.inBuffer = ''
                return line
            self.extendBuffer()

        line, self.inBuffer = match.groups() # save remainder in buffer
        return line

    def extendBuffer( self ):
        """
        Read more from the socket, extending the buffer.

        An empty read marks the stream as finished ('readAll').
        """
        if not self.readAll:
            more = self.skt.recv( self.chunkSize )
            if not more:
                self.readAll = 1
            else:
                self.inBuffer = self.inBuffer + more

    def connect( self, address ):
        """
        Connect the underlying socket.
        """
        self.skt.connect( address )

    def send( self, string ):
        """
        Send to the underlying socket, and copy to the tee file if any.
        """
        # sendall: plain send() may transmit only part of the data.
        self.skt.sendall( string )
        if self.teeFile:
            self.teeFile.write( string )

    def log( self, msg ):
        """
        Write 'msg' to stderr when verbosity is enabled.
        """
        if self.verbosity:
            sys.stderr.write( '* %s\n' % msg )

    def forwardHeaders( self
                      , skt
                      , direction
                      , CONTENT_LENGTH = re.compile( r'Content-length:\s*(\d+)'
                                                   , re.I )
                      , EMPTY = re.compile( r'^\s*\n\r?$' )
                      ):
        """
        Copy headers from this socket to the given destination.

        'Connection', 'Proxy-Connection' and 'Keep-Alive' headers are
        dropped; the first one seen is replaced by 'Connection: close'.
        Copying stops after the blank line which ends the headers, or
        when the peer closes.  Returns the announced content length
        (0 when none was seen).
        """
        hop_headers = ( 'connection', 'proxy-connection', 'keep-alive' )
        label = direction and 'out' or 'in'
        length = 0
        sent_connection_close = 0

        while 1:
            line = self.readln()
            if line is None:
                break # connection closed

            # Continuation and blank lines start with whitespace.
            if line and line[0] not in ( ' ', '\t', '\r', '\n' ):
                parts = line.split( ':', 1 )
                if len( parts ) == 2 and parts[0].lower() in hop_headers:
                    if not sent_connection_close:
                        # The CRLF keeps the next header on its own line.
                        skt.send( 'Connection: close\r\n' )
                        sent_connection_close = 1
                    continue

            # Header names are case-insensitive, hence re.I above.
            match = CONTENT_LENGTH.search( line )
            if match:
                length = int( match.group( 1 ) )

            skt.send( line )
            self.log( '%s header: %s' % ( label, line ) )

            if EMPTY.search( line ):
                break

        return length

    def forwardBinary( self, skt, direction, length=0 ):
        """
        Copy body data from this socket to the given destination.

        With a positive 'length', stop once that many bytes were
        forwarded; otherwise copy until the peer closes.
        """
        label = direction and 'out' or 'in'
        count = length > 0

        while 1:
            if self.inBuffer:
                size = len( self.inBuffer )
                skt.send( self.inBuffer )
                self.log( '%s data: %d' % ( label, size ) )
                self.inBuffer = ''
                if count:
                    length = length - size
                    if length <= 0:
                        break
            self.extendBuffer()
            if self.readAll:
                break

    def close(self):
        """
        Close the underlying socket, and the tee file if one is open.
        """
        try:
            self.skt.close()
        finally:
            if self.teeFile:
                self.teeFile.close()
                self.teeFile = None
if __name__ == '__main__':
    # Script entry point: serve proxy requests until interrupted.
    import sys
    LoggingServer( sys.argv[1:], LoggingFilter ).serve_forever()
=== Added File Packages/FunctionalTests/FunctionalTests/ScenarioGenerator.py ===
"""
LoggingProxy request post-processor.
"""
import sys, string, re
import getopt, glob, rfc822, urllib
# Opening section, written once at the start of the generated scenario.
SCENARIO_SKELETON = """\
[Scenario]
Title : Enter scenario title here
Use_case: Enter use case name here
"""
# Start of one request section.  The doubled '%%' come out of formatting
# as literal '%(portal_url)s' / '%(site_path)s' placeholders.
REQUEST_SKELETON = """
[Request_%(REQUEST_NUM)03d]
HTTP_Verb: %(HTTP_VERB)s
HTTP_Version: %(HTTP_VERSION)s
URL: %%(portal_url)s/%%(site_path)s%(REQUEST_URI)s
"""
# Placeholder credentials line; the captured header value is not written.
AUTHENTICATION_SKELETON = """\
Authentication: %%(userid)s:%%(password)s
"""
# One 'Header_N: name=value' line.  The name placeholder needs its 's'
# conversion; without it, formatting raised ValueError.
HEADER_SKELETON = """\
Header_%(HEADER_NUM)d: %(HEADER_NAME)s=%(HEADER_VALUE)s
"""
# One captured cookie line.
COOKIE_SKELETON = """\
Cookie_%(COOKIE_NUM)d: %(COOKIE_VALUE)s
"""
# One form field line, 'name=value'.
FIELD_SKELETON = """\
Field_%(FIELD_NUM)d: %(FIELD_NAME)s=%(FIELD_VALUE)s
"""
# Expected HTTP status line which closes a request section.
RESULT_SKELETON = """\
Expected_Result: %(RESULT_CODE)d
"""
# '[DEFAULT]' section supplying the placeholder credentials; written only
# when an authentication header was seen.
DEFAULT_SKELETON = """
[DEFAULT]
userid: userid
password: password
"""
class ScenarioGenerator:
    """
    Convert a series of HTTP requests files (as generated by
    LoggingProxy) into a scenario file (to be consumed by
    FT_Runner).
    """

    _verbosity = 1
    _capture_cookies = 0
    _logfile_directory = '/tmp'
    _logfile_prefix = 'zft'
    _output_file = None
    # Class-level default only; '_addExcludePattern' gives each instance
    # its own list before appending.
    _exclude_patterns = []
    _exclude_file = None
    _exclude_regex = None
    _exclude_file_loaded = 0

    def __init__( self, args ):
        """
        args -- command-line arguments, without the program name
        """
        self.parseOptions( args )

    def printUsage( self, msg=None ):
        """
        Dump a help message, and bail out.
        """
        sys.stderr.write( """
%(GENERATOR_EXE)s [-?vqcC] \\
[-l log_dir] [-f log_prefix] [-o file] \\
[-x pattern] [-X file]
-?, --help Print this help message
-v, --verbose Increment verbosity (default is '1')
-q, --quiet Set verbosity to '0'
-c, --capture-cookies Capture cookies
-C, --no-capture-cookies Don't capture cookies (default)
-l, --logfile-directory Directory from which to read log files
(default '%(LOGFILE_DIRECTORY)s')
-f, --logfile-prefix Prefix for log file names
(default '%(LOGFILE_PREFIX)s')
-o, --output-file Write to 'file', instead of default
(%(LOGFILE_PREFIX)s.zft). Use '-'
to write to stdout.
-x, --exclude-pattern Exclude requests which match 'pattern'
(e.g., to suppress stylesheet or images).
-X, --exclude-file Exclude requests which match any pattern
read from 'file' (one pattern per line).
%(MESSAGE)s\n""" % { 'GENERATOR_EXE'     : sys.argv[0]
                   , 'LOGFILE_DIRECTORY' : self._logfile_directory
                   , 'LOGFILE_PREFIX'    : self._logfile_prefix
                   , 'MESSAGE'           : msg or ''
                   } )
        sys.exit( 1 )

    def parseOptions( self, args ):
        """
        Parse command-line options.

        Opens the output file (stdout for '-o -').  Exits through
        'printUsage' on an unknown option or on '-?'.
        """
        verbosity = self._verbosity
        capture_cookies = self._capture_cookies
        logfile_directory = logfile_prefix = output_file = None
        exclude_file = None

        try:
            # Long options which take a value need the trailing '='.
            opts, ignored = getopt.getopt( args
                                         , "?vqcCl:f:o:x:X:"
                                         , [ 'help'
                                           , 'verbose'
                                           , 'quiet'
                                           , 'capture-cookies'
                                           , 'no-capture-cookies'
                                           , 'logfile-directory='
                                           , 'logfile-prefix='
                                           , 'output-file='
                                           , 'exclude-pattern='
                                           , 'exclude-file='
                                           ]
                                         )
        except getopt.GetoptError:
            # printUsage exits; exc_info works on every Python version.
            self.printUsage( msg=sys.exc_info()[1] )

        for o, v in opts:
            if o in ( '-?', '--help' ):
                self.printUsage()
            elif o in ( '-v', '--verbose' ):
                verbosity = verbosity + 1
            elif o in ( '-q', '--quiet' ):
                verbosity = 0
            elif o in ( '-c', '--capture-cookies' ):
                capture_cookies = 1
            elif o in ( '-C', '--no-capture-cookies' ):
                capture_cookies = 0
            elif o in ( '-l', '--logfile-directory' ):
                logfile_directory = v
            elif o in ( '-f', '--logfile-prefix' ):
                logfile_prefix = v
            elif o in ( '-o', '--output-file' ):
                output_file = v
            elif o in ( '-x', '--exclude-pattern' ):
                self._addExcludePattern( v )
            elif o in ( '-X', '--exclude-file' ):
                exclude_file = v

        self._verbosity = verbosity
        self._capture_cookies = capture_cookies

        if logfile_directory is not None:
            self._logfile_directory = logfile_directory

        if logfile_prefix is not None:
            self._logfile_prefix = logfile_prefix

        if output_file is not None:
            if output_file == '-':
                self._output_file = sys.stdout
            else:
                self._output_file = open( output_file, 'w' )
        else:
            self._output_file = open( '%s/%s.zft'
                                    % ( self._logfile_directory
                                      , self._logfile_prefix
                                      ), 'w' )

        if exclude_file is not None:
            self._exclude_file = exclude_file

    def _log( self, msg, level ):
        """
        Write a note to stderr (if verbosity enabled).
        """
        if level <= self._verbosity:
            sys.stderr.write( "%s\n" % msg )

    def _print( self, fmt, **kw ):
        """
        Dump the appropriately-formatted values to our output
        file.
        """
        self._output_file.write( fmt % kw )

    def _handleCookie( self, value, parms ):
        """
        Dump a cookie value (only when cookie capture is enabled).
        """
        if self._capture_cookies:
            count = parms.get( 'cookie_count', 0 ) + 1
            parms[ 'cookie_count' ] = count
            self._print( COOKIE_SKELETON
                       , COOKIE_NUM=count
                       , COOKIE_VALUE=value
                       )

    def _handleAuthentication( self, value, parms ):
        """
        Dump a placeholder for authentication, and record the
        fact that we saw it (so 'processScenario' can write the
        '[DEFAULT]' section).
        """
        parms[ 'saw_authentication' ] = 1
        self._print( AUTHENTICATION_SKELETON )

    def _handleContentType( self, value, parms ):
        """
        Record the content type for later processing.
        """
        parms[ 'content_type' ] = value

    # Keyed by lower-cased header name.  'authorization' is the header
    # HTTP clients actually send; 'authentication' is kept for
    # compatibility.
    HEADER_HANDLERS =\
        { 'cookie'          : _handleCookie
        , 'authentication'  : _handleAuthentication
        , 'authorization'   : _handleAuthentication
        , 'content-type'    : _handleContentType
        }

    def _handleURLEncoded( self, data ):
        """
        Emit Field_## values for data, which is encoded using
        'application/x-www-form-urlencoded'.

        Empty pairs are skipped; a pair without '=' gets an empty
        value; only the first '=' separates name from value.
        """
        count = 0
        for key_val in data.strip().split( '&' ):
            if not key_val:
                continue
            parts = key_val.split( '=', 1 )
            if len( parts ) == 1:
                parts.append( '' )
            k, v = map( urllib.unquote_plus, parts )
            count = count + 1
            self._print( FIELD_SKELETON
                       , FIELD_NUM=count
                       , FIELD_NAME=k
                       , FIELD_VALUE=v
                       )

    def _handleFormdata( self, data ):
        """
        Emit Field_## values for data, which is encoded using
        'multipart/formdata'.
        """
        raise NotImplementedError # TODO: Implement me!

    CONTENT_HANDLERS =\
        { 'application/x-www-form-urlencoded'   : _handleURLEncoded
        , 'multipart/formdata'                  : _handleFormdata
        }

    def _addExcludePattern( self, pattern ):
        """
        Add a pattern to our list of excluded patterns.
        """
        if '_exclude_patterns' not in self.__dict__:
            # Copy on first write, so instances never share the list.
            self._exclude_patterns = list( self._exclude_patterns )
        self._exclude_patterns.append( r'(%s)' % pattern )
        self._exclude_regex = None       # force a rebuild

    def _getExcludeRegex( self
                        , join=None
                        , compile=re.compile
                        ):
        """
        Return a regex which, if matched, indicates that we should
        skip the file.

        Returns None when there are no patterns.  The exclude file is
        read once; its blank lines are ignored.
        """
        if self._exclude_regex is None:

            if self._exclude_file and not self._exclude_file_loaded:
                f = open( self._exclude_file )
                try:
                    lines = f.readlines()
                finally:
                    f.close()
                self._exclude_file_loaded = 1
                for line in lines:
                    line = line.strip()
                    if line:             # '()' would match everything
                        self._addExcludePattern( line )

            if self._exclude_patterns:
                if join is None:
                    alternation = '|'.join( self._exclude_patterns )
                else:
                    alternation = join( self._exclude_patterns, '|' )
                self._exclude_regex = compile( alternation )

        return self._exclude_regex

    def processFile( self
                   , filename
                   , request_num
                   , parms=None
                   , REQUEST_LINE=re.compile( r'^([^\s]+)\s+'
                                              r'([^\s]+)\s+'
                                              r'([^\s]+)$' )
                   ):
        """
        Process a single request file; record global context
        in parms.

        Returns the number of the last request written: one more than
        'request_num' when a section was emitted, 'request_num' itself
        when the file was excluded or had no valid request line.
        """
        if parms is None:
            parms = {}

        self._log( 'Scanning file: %s' % filename, 1 )

        parms[ 'content_type' ] = None
        parms[ 'header_count' ] = 0
        parms[ 'cookie_count' ] = 0

        f = open( filename )
        try:
            all_text = f.read()
            f.seek( 0 )

            exclude = self._getExcludeRegex()
            if exclude is not None and exclude.search( all_text ):
                self._log( '** matches exclude regex, skipping', 1 )
                return request_num

            request = f.readline().rstrip()
            match = REQUEST_LINE.match( request )
            if not match:
                self._log( 'Invalid request line: %s' % request, 0 )
                # Keep the caller's counter an integer.
                return request_num

            http_verb, uri, http_version = match.groups()
            request_num = request_num + 1

            self._print( REQUEST_SKELETON
                       , REQUEST_NUM=request_num
                       , HTTP_VERB=http_verb
                       , HTTP_VERSION=http_version
                       , REQUEST_URI=uri
                       )

            headers = rfc822.Message( f )
            payload = f.read()
        finally:
            f.close()

        for k, v in headers.items():
            self._log( 'Header %s: %s' % ( k, v ), 3 )
            handler = self.HEADER_HANDLERS.get( k, None )
            if handler:
                handler( self, v, parms )

        content_type = parms[ 'content_type' ]

        if content_type is not None:
            # Ignore parameters such as '; charset=...' for the lookup.
            media_type = content_type.split( ';' )[0].strip().lower()
            handler = self.CONTENT_HANDLERS.get( media_type, None )
            if handler is None:
                self._log( "Can't handle content type: %s"
                         % content_type, 0 )
            else:
                handler( self, payload )

        self._print( RESULT_SKELETON
                   , RESULT_CODE=200 # TODO: Read from response?
                   )

        return request_num

    def processScenario( self ):
        """
        Read all files in '_logfile_directory' whose prefix
        matches '_logfile_prefix'; create a scenario file
        with one section per request file, dumping to specified
        output file.
        """
        self._print( SCENARIO_SKELETON )

        glob_pattern = '%s/%s_*.out' % ( self._logfile_directory
                                       , self._logfile_prefix )
        request_num = 0
        parms = { 'saw_authentication' : 0 }

        # glob returns names in arbitrary order; sort them so that
        # requests keep the sequence in which they were recorded.
        filenames = glob.glob( glob_pattern )
        filenames.sort()

        for filename in filenames:
            request_num = self.processFile( filename, request_num, parms )

        if parms[ 'saw_authentication' ]:
            self._print( DEFAULT_SKELETON )

        self._output_file.flush()
if __name__ == '__main__':
    # Script entry point: build the generator from the command line and
    # write the scenario.
    generator = ScenarioGenerator( sys.argv[1:] )
    generator.processScenario()
=== Added File Packages/FunctionalTests/FunctionalTests/TkLoggingProxy.py ===
from Tkinter import *
class FormDialog( Frame ):
    """
    Simple form: one labelled widget per Tk variable, plus OK / Cancel.

    Edits are made on private copies of the variables and are copied
    back to the originals only when OK is pressed.
    """

    def __init__( self, parent, labels_and_vars ):
        """
        parent          -- containing widget
        labels_and_vars -- sequence of ( label text, Tk variable ) pairs
        """
        Frame.__init__( self, parent )
        self._org_vars = []
        self._working_vars = []
        self._makeWidgets( labels_and_vars )

    # NOTE: the grid option is spelled 'column' in full; the abbreviation
    # 'col' is ambiguous with 'columnspan' and newer Tk rejects it.
    def _makeLabeledEntry( self, frame, row, label_text, linkvar ):
        """
        Add a label and a text entry bound to 'linkvar' on 'row';
        return the entry.
        """
        l = Label( frame, text=label_text )
        l.grid( row=row, column=0, sticky="w" )
        e = Entry( frame, textvariable=linkvar )
        e.grid( row=row, column=1, sticky="we" )
        return e

    def _makeLabeledCheckbox( self, frame, row, label_text, linkvar ):
        """
        Add a label and a checkbox bound to 'linkvar' on 'row';
        return the checkbox.
        """
        l = Label( frame, text=label_text )
        l.grid( row=row, column=0, sticky="w" )
        e = Checkbutton( frame, variable=linkvar )
        e.grid( row=row, column=1, sticky="w" )
        return e

    def _makeButton( self, bar, caption, command ):
        """
        Add a button to 'bar', packed to the left; return it.
        """
        b = Button( bar, text=caption, command=command )
        b.pack( side=LEFT )
        return b

    def _makeWidgets( self, labels_and_vars ):
        """
        Build the form rows and the button bar.
        """
        self.pack()

        frame = Frame( self )
        frame.grid_columnconfigure( 0, weight=0 )
        frame.grid_columnconfigure( 1, weight=100 )

        row = 0
        for label, var in labels_and_vars:
            self._org_vars.append( var )
            # Edit a copy, so that Cancel leaves the original untouched.
            new_var = var.__class__()
            new_var.set( var.get() )
            self._working_vars.append( new_var )

            if isinstance( var, BooleanVar ):
                self._makeLabeledCheckbox( frame, row, label, new_var )
            else:
                self._makeLabeledEntry( frame, row, label, new_var )
            row = row + 1

        frame.pack( side=TOP )

        bar = Frame( self )
        self._makeButton( bar, "OK", self._ok )
        self._makeButton( bar, "Cancel", self.master.destroy )
        bar.pack( side=BOTTOM )

    def _ok( self ):
        """
        Copy working vars back to originals.
        """
        for original, working in zip( self._org_vars, self._working_vars ):
            original.set( working.get() )
        self.master.destroy()
class TkLoggingProxy( Frame ):
    """
    Main window: buttons which open the option dialogs, above a
    read-only text area used as a message log.
    """

    def __init__( self, parent=None ):
        """
        Create the Tk variables and widgets, then load the default
        settings.
        """
        Frame.__init__( self, parent )

        self._verbosity = IntVar()
        self._listen_host = StringVar()
        self._listen_port = IntVar()
        self._logfile_directory = StringVar()
        self._logfile_prefix = StringVar()
        self._record_inbound = BooleanVar()

        self._makeWidgets()

        self._verbosity.set( 2 )
        self._listen_host.set( 'localhost' )
        self._listen_port.set( 9999 )
        self._logfile_directory.set( '/tmp' )
        self._logfile_prefix.set( 'zft' )
        self._record_inbound.set( 0 )

        self.master.title( "Logging HTTP Proxy" )

    # NOTE: the grid option is spelled 'column' in full; the abbreviation
    # 'col' is ambiguous with 'columnspan' and newer Tk rejects it.
    def _makeLabeledEntry( self, frame, row, label_text, linkvar ):
        """
        Add a label and a text entry bound to 'linkvar' on 'row';
        return the entry.
        """
        l = Label( frame, text=label_text )
        l.grid( row=row, column=0, sticky="w" )
        e = Entry( frame, textvariable=linkvar )
        e.grid( row=row, column=1, sticky="we" )
        return e

    def _makeLabeledCheckbox( self, frame, row, label_text, linkvar ):
        """
        Add a label and a checkbox bound to 'linkvar' on 'row';
        return the checkbox.
        """
        l = Label( frame, text=label_text )
        l.grid( row=row, column=0, sticky="w" )
        e = Checkbutton( frame, variable=linkvar )
        e.grid( row=row, column=1, sticky="w" )
        return e

    def _makeButton( self, bar, caption, command ):
        """
        Add a button to 'bar', packed to the left; return it.
        """
        b = Button( bar, text=caption, command=command )
        b.pack( side=LEFT )
        return b

    def _showOptions( self ):
        """
        Write the current listen and logging settings to the log area.
        """
        self._showLogMessage( 'Listen: %s:%d'
                            % ( self._listen_host.get()
                              , self._listen_port.get()
                              )
                            )
        self._showLogMessage( 'Logging: %s, %s'
                            % ( self._logfile_directory.get()
                              , self._logfile_prefix.get()
                              )
                            )

    def _optionsDialog( self, labels_and_vars ):
        """
        Show a modal FormDialog for the given ( label, variable ) pairs
        and wait until it is closed.
        """
        top = Toplevel()
        FormDialog( top, labels_and_vars )
        top.grab_set()
        top.focus_set()
        top.wait_window()

    def _serverOptions( self ):
        """
        Edit the listen host and port.
        """
        self._optionsDialog( ( ( "Host", self._listen_host )
                             , ( "Port", self._listen_port )
                             )
                           )

    def _loggingOptions( self ):
        """
        Edit the logfile directory and prefix.
        """
        self._optionsDialog( ( ( "Directory", self._logfile_directory )
                             , ( "Prefix", self._logfile_prefix )
                             )
                           )

    def _showLogMessage( self, message ):
        """
        Append 'message' as a line to the log area and scroll to it.
        """
        # The text widget is kept disabled; enable it just for the insert.
        self._text.configure( state=NORMAL )
        try:
            self._text.insert( END, '%s\n' % message )
        finally:
            self._text.configure( state=DISABLED )
        self._text.see( END )

    def _makeWidgets( self ):
        """
        Build the button bar and the scrolling log area.
        """
        self.pack()

        bar = Frame( self )
        self._makeButton( bar, "Server Options", self._serverOptions )
        self._makeButton( bar, "Logging Options", self._loggingOptions )
        self._makeButton( bar, "Check", self._showOptions )
        bar.pack( side=TOP )

        frame = Frame( self )
        vbar = Scrollbar( frame, name='vbar' )
        text = self._text = Text( frame, name='text', padx=5
                                , background="white", wrap="none" )
        vbar[ 'command' ] = text.yview
        vbar.pack( side=RIGHT, fill=Y )
        text[ 'yscrollcommand' ] = vbar.set
        text.configure( state=DISABLED )
        text.pack( side=LEFT, fill=BOTH, expand=1 )
        frame.pack( fill=BOTH, expand=1 )

        self._showLogMessage( "We're up!" )
if __name__ == '__main__':
    # Script entry point: show the proxy window and run the Tk event loop.
    TkLoggingProxy().mainloop()
=== Added File Packages/FunctionalTests/FunctionalTests/__init__.py ===
import Framework
# Expose this package module's global namespace dictionary under a name.
functest_globals = globals()