tests/compare.py

branch
trunk
changeset 17
481a2b4be471
parent 15
28dbd52a8bb8
child 19
bb6e9f4af1a5
--- a/tests/compare.py	Sun Dec 31 19:27:23 2006 -0500
+++ b/tests/compare.py	Sun Dec 31 19:29:46 2006 -0500
@@ -18,9 +18,11 @@
 # 51 Franklin Street, 5th Floor, Boston, MA, 02111.
 
 import os
+import re
 import subprocess
 import syck
 import sys
+import tempfile
 
 from sets import Set as set
 
@@ -40,28 +42,33 @@
 set -e
 """
 
+output_buffer = tempfile.TemporaryFile()
+
 class ExtractorTestError(Exception):
     pass
 
 
 class ExtractorTest(object):
     def __init__(self, **kwargs):
-        for key in ('name', 'filename', 'baseline'):
+        for key in ('name',):
             setattr(self, key, kwargs[key])
-        for key in ('directory', 'prerun', 'posttest'):
+        for key in ('directory', 'prerun', 'posttest', 'baseline', 'error',
+                    'grep', 'antigrep'):
             setattr(self, key, kwargs.get(key, None))
-        for key in ('options',):
+        for key in ('options', 'filenames'):
             setattr(self, key, kwargs.get(key, '').split())
         
     def get_results(self, commands):
-        status = subprocess.call(commands)
-        if status != 0:
-            return None
-        process = subprocess.Popen(['find'], stdout=subprocess.PIPE)
+        print >>output_buffer, "Output from %s:" % (' '.join(commands),)
+        output_buffer.flush()
+        status = subprocess.call(commands, stdout=output_buffer,
+                                 stderr=output_buffer)
+        process = subprocess.Popen(['find', '!', '-name', TESTSCRIPT_NAME],
+                                   stdout=subprocess.PIPE)
         process.wait()
         output = process.stdout.read(-1)
         process.stdout.close()
-        return set(output.split('\n'))
+        return status, set(output.split('\n'))
         
     def write_script(self, commands):
         script = open(TESTSCRIPT_NAME, 'w')
@@ -71,13 +78,13 @@
 
     def get_shell_results(self):
         self.write_script(self.baseline)
-        return self.get_results(['sh', TESTSCRIPT_NAME, self.filename])
+        return self.get_results(['sh', TESTSCRIPT_NAME] + self.filenames)
 
     def get_extractor_results(self):
         if self.prerun:
             self.write_script(self.prerun)
             subprocess.call(['sh', TESTSCRIPT_NAME])
-        return self.get_results([X_SCRIPT] + self.options + [self.filename])
+        return self.get_results([X_SCRIPT] + self.options + self.filenames)
         
     def get_posttest_result(self):
         if not self.posttest:
@@ -103,47 +110,71 @@
                                      (status,))
 
     def show_status(self, status, message=None):
+        raw_status = status.lower()
+        if raw_status != 'passed':
+            output_buffer.seek(0, 0)
+            sys.stdout.write(output_buffer.read(-1))
         if message is None:
             last_part = ''
         else:
-            last_part = ': ' + str(message)
+            last_part = ': %s' % (message,)
         print "%7s: %s%s" % (status, self.name, last_part)
-        return status.lower()
+        return raw_status
 
-    def compare_results(self):
-        self.clean()
-        expected = self.get_shell_results()
-        self.clean()
-        actual = self.get_extractor_results()
+    def compare_results(self, actual):
         posttest_result = self.get_posttest_result()
         self.clean()
-        if expected is None:
-            raise ExtractorTestError("could not get baseline results")
-        elif actual is None:
-            raise ExtractorTestError("could not get extractor results")
-        elif expected != actual:
-            result = self.show_status('FAILED')
-            print "Only in baseline results:"
-            print '\n'.join(expected.difference(actual))
-            print "Only in actual results:"
-            print '\n'.join(actual.difference(expected))
+        status, expected = self.get_shell_results()
+        self.clean()
+        if expected != actual:
+            print >>output_buffer, "Only in baseline results:"
+            print >>output_buffer, '\n'.join(expected.difference(actual))
+            print >>output_buffer, "Only in actual results:"
+            print >>output_buffer, '\n'.join(actual.difference(expected))
+            return self.show_status('FAILED')
         elif posttest_result != 0:
-            result = self.show_status('FAILED')
-            print "Posttest returned status code", posttest_result
-        else:
-            result = self.show_status('Passed')
-        return result
+            print >>output_buffer, "Posttest gave status code", posttest_result
+            return self.show_status('FAILED')
+        return self.show_status('Passed')
     
+    def have_error_mismatch(self, status):
+        if self.error and (status == 0):
+            return "x did not return expected error"
+        elif (not self.error) and (status != 0):
+            return "x returned error code %s" % (status,)
+        return None
+
+    def grep_output(self):
+        output_buffer.seek(0, 0)
+        output_buffer.readline()
+        output = output_buffer.read(-1)
+        if self.grep and (not re.search(self.grep, output)):
+            return "output did not match %s" % (self.grep)
+        elif self.antigrep and re.search(self.antigrep, output):
+            return "output matched antigrep %s" % (self.antigrep)
+        return None
+
+    def check_results(self):
+        output_buffer.seek(0, 0)
+        output_buffer.truncate()
+        self.clean()
+        status, actual = self.get_extractor_results()
+        problem = self.have_error_mismatch(status) or self.grep_output()
+        if problem:
+            return self.show_status('FAILED', problem)
+        return self.compare_results(actual)
+
     def run(self):
         if self.directory:
             os.mkdir(self.directory)
             os.chdir(self.directory)
         try:
-            result = self.compare_results()
+            result = self.check_results()
         except ExtractorTestError, error:
             result = self.show_status('ERROR', error)
         if self.directory:
             os.chdir(ROOT_DIR)
+            subprocess.call(['chmod', '-R', '700', self.directory])
             subprocess.call(['rm', '-rf', self.directory])
         return result
 
@@ -153,12 +184,14 @@
 test_db.close()
 tests = [ExtractorTest(**data) for data in test_data]
 for original_data in test_data:
-    if original_data.has_key('directory'):
+    if (original_data.has_key('directory') or
+        (not original_data.has_key('baseline'))):
         continue
     data = original_data.copy()
     data['name'] += ' in ..'
     data['directory'] = 'inside-dir'
-    data['filename'] = os.path.join('..', data['filename'])
+    data['filenames'] = ' '.join(['../%s' % filename for filename in
+                                  data.get('filenames', '').split()])
     tests.append(ExtractorTest(**data))
 results = [test.run() for test in tests]
 counts = {}
@@ -167,3 +200,4 @@
 for result in results:
     counts[result] += 1
 print " Totals:", ', '.join(["%s %s" % (counts[key], key) for key in OUTCOMES])
+output_buffer.close()

mercurial