# HG changeset patch # User brett # Date 1167610826 18000 # Node ID 6f9e1bb597197395e8608fe84e271f25e839906e # Parent 0a3ef1b9f6d4244a9b323f33df992cf931894acd [svn] Add support for just decompressing files that are compressed. So, if you have foobar.txt.gz, running x on it will give you foobar.txt. I also moved the test definitions to a YAML file, which makes them easier to write and understand. Hopefully. diff -r 0a3ef1b9f6d4 -r 6f9e1bb59719 scripts/x --- a/scripts/x Tue Dec 19 19:27:14 2006 -0500 +++ b/scripts/x Sun Dec 31 19:20:26 2006 -0500 @@ -28,7 +28,7 @@ from cStringIO import StringIO -VERSION = "2.0" +VERSION = "2.1" VERSION_BANNER = """x version %s Copyright (c) 2006 Brett Smith @@ -46,10 +46,57 @@ # ONE_DIRECTORY = 2 BOMB = 3 EMPTY = 4 +DECOMPRESSED = 5 mimetypes.encodings_map.setdefault('.bz2', 'bzip2') mimetypes.types_map['.exe'] = 'application/x-msdos-program' +def run_command(command, description, stdout=None, stderr=None, stdin=None): + process = subprocess.Popen(command, stdin=stdin, stdout=stdout, + stderr=stderr) + status = process.wait() + for pipe in (process.stdout, process.stderr): + try: + pipe.close() + except AttributeError: + pass + if status != 0: + return ("%s error: '%s' returned status code %s" % + (description, ' '.join(command), status)) + return None + +class FilenameChecker(object): + def __init__(self, original_name): + self.original_name = original_name + + def is_free(self, filename=None): + if filename is None: + filename = self.original_name + return self._is_free(filename) + + def _is_free(self, filename): + return not os.path.exists(filename) + + def check(self): + for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]: + filename = '%s%s' % (self.original_name, suffix) + if self.is_free(filename): + return filename + raise ValueError("all alternatives for name %s taken" % + (self.original_name,)) + + +class DirectoryChecker(FilenameChecker): + def _is_free(self, filename): + try: + os.mkdir(filename) + except OSError, error: + if error.errno == errno.EEXIST: + return False + raise + return True + + class ExtractorError(Exception): pass @@ -89,10 +136,12 @@ class BaseExtractor(object): decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'} + name_checker = DirectoryChecker + def __init__(self, filename, mimetype, encoding): if encoding and (not self.decoders.has_key(encoding)): raise ValueError("unrecognized encoding %s" % (encoding,)) - self.filename = filename + self.filename = os.path.realpath(filename) self.mimetype = mimetype self.encoding = encoding self.included_archives = [] @@ -107,17 +156,9 @@ def run(self, command, description="extraction", stdout=None, stderr=None, stdin=None): - process = subprocess.Popen(command, stdin=stdin, stdout=stdout, - stderr=stderr) - status = process.wait() - if status != 0: - raise ExtractorError("%s error: '%s' returned status code %s" % - (description, ' '.join(command), status)) - for pipe in (process.stdout, process.stderr): - try: - pipe.close() - except AttributeError: - pass + error = run_command(command, description, stdout, stderr, stdin) + if error: + raise ExtractorError(error) def pipe(self, command, description, stderr=None): output = tempfile.TemporaryFile() @@ -166,21 +207,24 @@ return '.'.join(pieces) def extract(self, path): + old_path = os.path.realpath(os.curdir) + os.chdir(path) self.archive.seek(0, 0) self.extract_archive() + os.chdir(old_path) class TarExtractor(BaseExtractor): def get_filenames(self): return ProcessStreamer(['tar', '-t'], self.archive) - def extract_archive(self): + def extract_archive(self): self.run(['tar', '-x'], stdin=self.archive) class ZipExtractor(BaseExtractor): def __init__(self, filename, mimetype, encoding): - self.filename = filename + self.filename = os.path.realpath(filename) self.mimetype = mimetype self.encoding = encoding self.included_archives = [] @@ -189,8 +233,8 @@ def get_filenames(self): return ProcessStreamer(['zipinfo', '-1', self.filename], None) - def extract(self, path): - self.run(['unzip', '-q', os.path.join(path, self.filename)]) + def extract_archive(self): + self.run(['unzip', '-q', self.filename]) class CpioExtractor(BaseExtractor): @@ -247,64 +291,90 @@ return BOMB -class MatchHandler(object): - def __init__(self, extractor, contents): +class CompressionExtractor(BaseExtractor): + name_checker = FilenameChecker + + def basename(self): + pieces = os.path.basename(self.filename).split('.') + extension = '.' + pieces[-1] + if mimetypes.encodings_map.has_key(extension): + pieces.pop() + return '.'.join(pieces) + + def suggest_target(self): + return FilenameChecker().check(self.basename()) + + def check_contents(self): + return self.basename() + + def extract(self, path): + output = open(path, 'w') + self.run(['cat'], "output write", stdin=self.archive, stdout=output) + output.close() + + +class BaseHandler(object): + def __init__(self, extractor, contents, options): self.logger = logging.getLogger('x-log') self.extractor = extractor self.contents = contents - self.directory = extractor.basename() + self.options = options - def extract(self, directory='.'): + def extract(self): + checker = self.extractor.name_checker(self.extractor.basename()) + if self.options.overwrite or checker.is_free(): + self.target = self.extractor.basename() + self.overwrite() + else: + self.target = checker.check() + self.safe_extract() + + def do_extract(self, directory): try: self.extractor.extract(directory) except ExtractorError, error: return error.strerror - + def cleanup(self): - command = 'chmod' - status = subprocess.call(['chmod', '-R', 'u+rw', self.directory]) + command = 'find' + status = subprocess.call(['find', self.target, '-type', 'd', + '-exec', 'chmod', 'u+rwx', '{}', ';']) if status == 0: - command = 'find' - status = subprocess.call(['find', self.directory, '-type', 'd', - '-exec', 'chmod', 'u+x', '{}', ';']) + command = 'chmod' + status = subprocess.call(['chmod', '-R', 'u+rw', self.target]) if status != 0: return "%s returned with exit status %s" % (command, status) + + +class MatchHandler(BaseHandler): + def overwrite(self): + return self.do_extract('.') + + def safe_extract(self): + tempdir = tempfile.mkdtemp() + result = self.do_extract(tempdir) + if result is None: + os.rename(os.path.join(tempdir, self.extractor.basename()), + self.target) + os.rmdir(tempdir) + return result -class BombHandler(MatchHandler): - def __init__(self, extractor, contents): - MatchHandler.__init__(self, extractor, contents) - basename = self.directory - for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]: - self.directory = '%s%s' % (basename, suffix) - try: - os.mkdir(self.directory) - except OSError, error: - if error.errno == errno.EEXIST: - continue - raise ValueError("could not make extraction directory %s: %s" % - (error.filename, error.strerror)) - if suffix != '': - self.logger.warning("%s: extracted to %s", - extractor.filename, self.directory) - break - else: - raise ValueError("all good names for an extraction directory taken") +class BombHandler(BaseHandler): + def safe_extract(self): + return self.do_extract(self.target) - def extract(self): - os.chdir(self.directory) - return MatchHandler.extract(self, '..') + def overwrite(self): + self.target = self.extractor.basename() + return self.do_extract(self.target) - def cleanup(self): - os.chdir('..') - return MatchHandler.cleanup(self) - class EmptyHandler(object): - def __init__(self, extractor, contents): pass + def __init__(self, extractor, contents, options): pass def extract(self): pass def cleanup(self): pass + extractor_map = {'application/x-tar': TarExtractor, 'application/zip': ZipExtractor, 'application/x-msdos-program': ZipExtractor, @@ -314,7 +384,8 @@ 'application/x-cpio': CpioExtractor} handler_map = {EMPTY: EmptyHandler, - MATCHING_DIRECTORY: MatchHandler} + MATCHING_DIRECTORY: MatchHandler, + DECOMPRESSED: BombHandler} class ExtractorApplication(object): def __init__(self, arguments): @@ -338,6 +409,15 @@ parser.add_option('-v', '--verbose', dest='verbose', action='count', default=0, help='be verbose/print debugging information') + parser.add_option('-o', '--overwrite', dest='overwrite', + action='store_true', default=False, + help='overwrite any existing target directory') +## parser.add_option('-f', '--flat', '--no-directory', dest='flat', +## action='store_true', default=False, +## help="don't put contents in their own directory") +## parser.add_option('-n', '--noninteractive', dest='batch', +## action='store_true', default=False, +## help="don't ask how to handle special cases") self.options, filenames = parser.parse_args(arguments) if not filenames: parser.error("you did not list any archives") @@ -357,13 +437,17 @@ try: extractor = extractor_map[mimetype] except KeyError: - return "not a known archive type" + if encoding: + extractor = CompressionExtractor + else: + return "not a known archive type" try: self.current_extractor = extractor(self.current_filename, mimetype, encoding) content = self.current_extractor.check_contents() handler = handler_map.get(content, BombHandler) - self.current_handler = handler(self.current_extractor, content) + self.current_handler = handler(self.current_extractor, content, + self.options) except ExtractorError, error: return str(error) @@ -373,7 +457,7 @@ for filename in self.current_extractor.included_archives: tail_path, basename = os.path.split(filename) directory = os.path.join(self.current_directory, - self.current_handler.directory, tail_path) + self.current_handler.target, tail_path) self.archives.setdefault(directory, []).append(basename) def report(self, function, *args): diff -r 0a3ef1b9f6d4 -r 6f9e1bb59719 tests/compare.py --- a/tests/compare.py Tue Dec 19 19:27:14 2006 -0500 +++ b/tests/compare.py Sun Dec 31 19:20:26 2006 -0500 @@ -19,46 +19,11 @@ import os import subprocess +import syck import sys from sets import Set as set -TESTSCRIPT_NAME = 'testscript.sh' -SCRIPT_PROLOGUE = """#!/bin/sh -set -e -""" - -tests = {'test-1.23.tar': ([], ['tar -xf $1'], []), - 'test-1.23.tar.gz': ([], ['tar -xzf $1'], []), - 'test-1.23.tar.bz2': ([], ['mkdir test-1.23', - 'cd test-1.23', - 'tar -jxf ../$1'], []), - 'test-1.23.zip': ([], ['mkdir test-1.23', - 'cd test-1.23', - 'unzip -q ../$1'], []), - 'test-1.23.cpio': ([], ['cpio -i --make-directories \ - <$1 2>/dev/null'], []), - 'test-1.23_all.deb': ([], ['TD=$PWD', - 'mkdir test-1.23', - 'cd /tmp', - 'ar x $TD/$1 data.tar.gz', - 'cd $TD/test-1.23', - 'tar -zxf /tmp/data.tar.gz', - 'rm /tmp/data.tar.gz'], []), - 'test-recursive-badperms.tar.bz2': ( - ['-r'], - ['mkdir test-recursive-badperms', - 'cd test-recursive-badperms', - 'tar -jxf ../$1', - 'mkdir test-badperms', - 'cd test-badperms', - 'tar -xf ../test-badperms.tar', - 'chmod 755 testdir'], - ['if [ "x`cat test-recursive-badperms/test-badperms/testdir/testfile`" = \ - "xhey" ]', - 'then exit 0; else exit 1; fi'] - )} - if os.path.exists('scripts/x') and os.path.exists('tests'): os.chdir('tests') elif os.path.exists('../scripts/x') and os.path.exists('../tests'): @@ -67,15 +32,26 @@ print "ERROR: Can't run tests in this directory!" sys.exit(2) +X_SCRIPT = os.path.realpath('../scripts/x') +ROOT_DIR = os.path.realpath(os.curdir) +OUTCOMES = ['error', 'failed', 'passed'] +TESTSCRIPT_NAME = 'testscript.sh' +SCRIPT_PROLOGUE = """#!/bin/sh +set -e +""" + class ExtractorTestError(Exception): pass class ExtractorTest(object): - def __init__(self, directory, archive_filename, info): - self.directory = directory - self.archive_filename = os.path.join(directory, archive_filename) - self.arguments, self.shell_commands, self.shell_test = info + def __init__(self, **kwargs): + for key in ('name', 'filename', 'baseline'): + setattr(self, key, kwargs[key]) + for key in ('directory', 'prerun', 'posttest'): + setattr(self, key, kwargs.get(key, None)) + for key in ('options',): + setattr(self, key, kwargs.get(key, '').split()) def get_results(self, commands): status = subprocess.call(commands) @@ -89,35 +65,52 @@ def write_script(self, commands): script = open(TESTSCRIPT_NAME, 'w') - script.write("%s%s\n" % (SCRIPT_PROLOGUE, '\n'.join(commands))) + script.write("%s%s\n" % (SCRIPT_PROLOGUE, commands)) script.close() subprocess.call(['chmod', 'u+w', TESTSCRIPT_NAME]) def get_shell_results(self): - self.write_script(self.shell_commands) - return self.get_results(['sh', TESTSCRIPT_NAME, self.archive_filename]) + self.write_script(self.baseline) + return self.get_results(['sh', TESTSCRIPT_NAME, self.filename]) def get_extractor_results(self): - script = os.path.join(self.directory, '../scripts/x') - return self.get_results([script] + self.arguments + - [self.archive_filename]) + if self.prerun: + self.write_script(self.prerun) + subprocess.call(['sh', TESTSCRIPT_NAME]) + return self.get_results([X_SCRIPT] + self.options + [self.filename]) def get_posttest_result(self): - if not self.shell_test: + if not self.posttest: return 0 - self.write_script(self.shell_test) + self.write_script(self.posttest) return subprocess.call(['sh', TESTSCRIPT_NAME]) def clean(self): - status = subprocess.call(['find', '-mindepth', '1', '-maxdepth', '1', - '-type', 'd', - '!', '-name', 'CVS', '!', '-name', '.svn', - '-exec', 'rm', '-rf', '{}', ';']) + if self.directory: + target = os.path.join(ROOT_DIR, self.directory) + extra_options = ['!', '-name', TESTSCRIPT_NAME] + else: + target = ROOT_DIR + extra_options = ['-type', 'd', + '!', '-name', 'CVS', + '!', '-name', '.svn'] + status = subprocess.call(['find', target, + '-mindepth', '1', '-maxdepth', '1'] + + extra_options + + ['-exec', 'rm', '-rf', '{}', ';']) if status != 0: raise ExtractorTestError("cleanup exited with status code %s" % (status,)) - def run(self): + def show_status(self, status, message=None): + if message is None: + last_part = '' + else: + last_part = ': ' + message + print "%7s: %s%s" % (status, self.name, last_part) + return status.lower() + + def compare_results(self): self.clean() expected = self.get_shell_results() self.clean() @@ -129,41 +122,48 @@ elif actual is None: raise ExtractorTestError("could not get extractor results") elif expected != actual: - print "FAILED:", self.archive_filename + result = self.show_status('FAILED') print "Only in baseline results:" print '\n'.join(expected.difference(actual)) print "Only in actual results:" print '\n'.join(actual.difference(expected)) - return False elif posttest_result != 0: - print "FAILED:", self.archive_filename + result = self.show_status('FAILED') print "Posttest returned status code", posttest_result - print actual - return False else: - print "Passed:", self.archive_filename - return True + result = self.show_status('Passed') + return result + + def run(self): + if self.directory: + os.mkdir(self.directory) + os.chdir(self.directory) + try: + result = self.compare_results() + except ExtractorTestError, error: + result = self.show_status('ERROR', error) + if self.directory: + os.chdir(ROOT_DIR) + subprocess.call(['rm', '-rf', self.directory]) + return result -def run_tests(directory, testnames): - successes = 0 - failures = 0 - for testname in testnames: - test = ExtractorTest(directory, testname, tests[testname]) - if test.run(): - successes += 1 - else: - failures += 1 - return successes, failures - -results = [] -testnames = tests.keys() -testnames.sort() -results.append(run_tests('.', testnames)) -os.mkdir('inside-dir') -os.chdir('inside-dir') -results.append(run_tests('..', testnames)) -os.chdir('..') -subprocess.call(['rm', '-rf', 'inside-dir']) -print "Totals: %s successes, %s failures" % \ - tuple([sum(total) for total in zip(*results)]) +test_db = open('tests.yml') +test_data = syck.load(test_db.read(-1)) +test_db.close() +tests = [ExtractorTest(**data) for data in test_data] +for original_data in test_data: + if original_data.has_key('directory'): + continue + data = original_data.copy() + data['name'] += ' in ..' + data['directory'] = 'inside-dir' + data['filename'] = os.path.join('..', data['filename']) + tests.append(ExtractorTest(**data)) +results = [test.run() for test in tests] +counts = {} +for outcome in OUTCOMES: + counts[outcome] = 0 +for result in results: + counts[result] += 1 +print " Totals:", ', '.join(["%s %s" % (counts[key], key) for key in OUTCOMES]) diff -r 0a3ef1b9f6d4 -r 6f9e1bb59719 tests/test-text.gz Binary file tests/test-text.gz has changed diff -r 0a3ef1b9f6d4 -r 6f9e1bb59719 tests/tests.yml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/tests.yml Sun Dec 31 19:20:26 2006 -0500 @@ -0,0 +1,81 @@ +- name: basic .tar + filename: test-1.23.tar + baseline: | + tar -xf $1 + +- name: basic .tar.gz + filename: test-1.23.tar.gz + baseline: | + tar -zxf $1 + +- name: basic .tar.bz2 + filename: test-1.23.tar.bz2 + baseline: | + mkdir test-1.23 + cd test-1.23 + tar -jxf ../$1 + +- name: basic .zip + filename: test-1.23.zip + baseline: | + mkdir test-1.23 + cd test-1.23 + unzip -q ../$1 + +- name: basic .deb + filename: test-1.23_all.deb + baseline: | + TD=$PWD + mkdir test-1.23 + cd /tmp + ar x $TD/$1 data.tar.gz + cd $TD/test-1.23 + tar -zxf /tmp/data.tar.gz + rm /tmp/data.tar.gz + +- name: recursion and permissions + filename: test-recursive-badperms.tar.bz2 + options: -r + baseline: | + mkdir test-recursive-badperms + cd test-recursive-badperms + tar -jxf ../$1 + mkdir test-badperms + cd test-badperms + tar -xf ../test-badperms.tar + chmod 755 testdir + posttest: | + if [ "x`cat test-recursive-badperms/test-badperms/testdir/testfile`" = \ + "xhey" ]; then exit 0; else exit 1; fi + +- name: decompression + directory: inside-dir + filename: ../test-text.gz + baseline: | + zcat $1 >test-text + +- name: decompression with -r + directory: inside-dir + filename: ../test-text.gz + options: -r + baseline: | + zcat $1 >test-text + +- name: overwrite protection + filename: test-1.23.tar.bz2 + baseline: | + mkdir test-1.23 test-1.23.1 + cd test-1.23.1 + tar -jxf ../$1 + prerun: | + mkdir test-1.23 + +- name: overwrite option + filename: test-1.23.tar.bz2 + options: -o + baseline: | + mkdir test-1.23 + cd test-1.23 + tar -jxf ../$1 + prerun: | + mkdir test-1.23