[svn] Add support for just decompressing files that are compressed. So, if you trunk

Sun, 31 Dec 2006 19:20:26 -0500

author
brett
date
Sun, 31 Dec 2006 19:20:26 -0500
branch
trunk
changeset 14
6f9e1bb59719
parent 13
0a3ef1b9f6d4
child 15
28dbd52a8bb8

[svn] Add support for just decompressing files that are compressed. So, if you
have foobar.txt.gz, running x on it will give you foobar.txt.

I also moved the test definitions to a YAML file, which makes them easier
to write and understand. Hopefully.

scripts/x file | annotate | diff | comparison | revisions
tests/compare.py file | annotate | diff | comparison | revisions
tests/test-text.gz file | annotate | diff | comparison | revisions
tests/tests.yml file | annotate | diff | comparison | revisions
--- a/scripts/x	Tue Dec 19 19:27:14 2006 -0500
+++ b/scripts/x	Sun Dec 31 19:20:26 2006 -0500
@@ -28,7 +28,7 @@
 
 from cStringIO import StringIO
 
-VERSION = "2.0"
+VERSION = "2.1"
 VERSION_BANNER = """x version %s
 Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>
 
@@ -46,10 +46,57 @@
 # ONE_DIRECTORY = 2
 BOMB = 3
 EMPTY = 4
+DECOMPRESSED = 5
 
 mimetypes.encodings_map.setdefault('.bz2', 'bzip2')
 mimetypes.types_map['.exe'] = 'application/x-msdos-program'
 
+def run_command(command, description, stdout=None, stderr=None, stdin=None):
+    process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
+                               stderr=stderr)
+    status = process.wait()
+    for pipe in (process.stdout, process.stderr):
+        try:
+            pipe.close()
+        except AttributeError:
+            pass
+    if status != 0:
+        return ("%s error: '%s' returned status code %s" %
+                (description, ' '.join(command), status))
+    return None
+
+class FilenameChecker(object):
+    def __init__(self, original_name):
+        self.original_name = original_name
+
+    def is_free(self, filename=None):
+        if filename is None:
+            filename = self.original_name
+        return self._is_free(filename)
+
+    def _is_free(self, filename):
+        return not os.path.exists(filename)
+
+    def check(self):
+        for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]:
+            filename = '%s%s' % (self.original_name, suffix)
+            if self.is_free(filename):
+                return filename
+        raise ValueError("all alternatives for name %s taken" %
+                         (self.original_name,))
+        
+
+class DirectoryChecker(FilenameChecker):
+    def _is_free(self, filename):
+        try:
+            os.mkdir(filename)
+        except OSError, error:
+            if error.errno == errno.EEXIST:
+                return False
+            raise
+        return True
+
+
 class ExtractorError(Exception):
     pass
 
@@ -89,10 +136,12 @@
 class BaseExtractor(object):
     decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}
 
+    name_checker = DirectoryChecker
+
     def __init__(self, filename, mimetype, encoding):
         if encoding and (not self.decoders.has_key(encoding)):
             raise ValueError("unrecognized encoding %s" % (encoding,))
-        self.filename = filename
+        self.filename = os.path.realpath(filename)
         self.mimetype = mimetype
         self.encoding = encoding
         self.included_archives = []
@@ -107,17 +156,9 @@
 
     def run(self, command, description="extraction", stdout=None, stderr=None,
             stdin=None):
-        process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
-                                   stderr=stderr)
-        status = process.wait()
-        if status != 0:
-            raise ExtractorError("%s error: '%s' returned status code %s" %
-                                 (description, ' '.join(command), status))
-        for pipe in (process.stdout, process.stderr):
-            try:
-                pipe.close()
-            except AttributeError:
-                pass
+        error = run_command(command, description, stdout, stderr, stdin)
+        if error:
+            raise ExtractorError(error)
 
     def pipe(self, command, description, stderr=None):
         output = tempfile.TemporaryFile()
@@ -166,21 +207,24 @@
         return '.'.join(pieces)
 
     def extract(self, path):
+        old_path = os.path.realpath(os.curdir)
+        os.chdir(path)
         self.archive.seek(0, 0)
         self.extract_archive()
+        os.chdir(old_path)
     
 
 class TarExtractor(BaseExtractor):
     def get_filenames(self):
         return ProcessStreamer(['tar', '-t'], self.archive)
 
-    def extract_archive(self):
+    def extract_archive(self): 
         self.run(['tar', '-x'], stdin=self.archive)
         
         
 class ZipExtractor(BaseExtractor):
     def __init__(self, filename, mimetype, encoding):
-        self.filename = filename
+        self.filename = os.path.realpath(filename)
         self.mimetype = mimetype
         self.encoding = encoding
         self.included_archives = []
@@ -189,8 +233,8 @@
     def get_filenames(self):
         return ProcessStreamer(['zipinfo', '-1', self.filename], None)
 
-    def extract(self, path):
-        self.run(['unzip', '-q', os.path.join(path, self.filename)])
+    def extract_archive(self):
+        self.run(['unzip', '-q', self.filename])
 
 
 class CpioExtractor(BaseExtractor):
@@ -247,64 +291,90 @@
         return BOMB
         
 
-class MatchHandler(object):
-    def __init__(self, extractor, contents):
+class CompressionExtractor(BaseExtractor):
+    name_checker = FilenameChecker
+
+    def basename(self):
+        pieces = os.path.basename(self.filename).split('.')
+        extension = '.' + pieces[-1]
+        if mimetypes.encodings_map.has_key(extension):
+            pieces.pop()
+        return '.'.join(pieces)
+
+    def suggest_target(self):
+        return FilenameChecker().check(self.basename())
+
+    def check_contents(self):
+        return self.basename()
+
+    def extract(self, path):
+        output = open(path, 'w')
+        self.run(['cat'], "output write", stdin=self.archive, stdout=output)
+        output.close()
+        
+
+class BaseHandler(object):
+    def __init__(self, extractor, contents, options):
         self.logger = logging.getLogger('x-log')
         self.extractor = extractor
         self.contents = contents
-        self.directory = extractor.basename()
+        self.options = options
 
-    def extract(self, directory='.'):
+    def extract(self):
+        checker = self.extractor.name_checker(self.extractor.basename())
+        if self.options.overwrite or checker.is_free():
+            self.target = self.extractor.basename()
+            self.overwrite()
+        else:
+            self.target = checker.check()
+            self.safe_extract()
+
+    def do_extract(self, directory):
         try:
             self.extractor.extract(directory)
         except ExtractorError, error:
             return error.strerror
-
+        
     def cleanup(self):
-        command = 'chmod'
-        status = subprocess.call(['chmod', '-R', 'u+rw', self.directory])
+        command = 'find'
+        status = subprocess.call(['find', self.target, '-type', 'd',
+                                  '-exec', 'chmod', 'u+rwx', '{}', ';'])
         if status == 0:
-            command = 'find'
-            status = subprocess.call(['find', self.directory, '-type', 'd',
-                                      '-exec', 'chmod', 'u+x', '{}', ';'])
+            command = 'chmod'
+            status = subprocess.call(['chmod', '-R', 'u+rw', self.target])
         if status != 0:
             return "%s returned with exit status %s" % (command, status)
+
+
+class MatchHandler(BaseHandler):
+    def overwrite(self):
+        return self.do_extract('.')
+
+    def safe_extract(self):
+        tempdir = tempfile.mkdtemp()
+        result = self.do_extract(tempdir)
+        if result is None:
+            os.rename(os.path.join(tempdir, self.extractor.basename()),
+                      self.target)
+            os.rmdir(tempdir)
+        return result
         
         
-class BombHandler(MatchHandler):
-    def __init__(self, extractor, contents):
-        MatchHandler.__init__(self, extractor, contents)
-        basename = self.directory
-        for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]:
-            self.directory = '%s%s' % (basename, suffix)
-            try:
-                os.mkdir(self.directory)
-            except OSError, error:
-                if error.errno == errno.EEXIST:
-                    continue
-                raise ValueError("could not make extraction directory %s: %s" %
-                                 (error.filename, error.strerror))
-            if suffix != '':
-                self.logger.warning("%s: extracted to %s",
-                                    extractor.filename, self.directory)
-            break
-        else:
-            raise ValueError("all good names for an extraction directory taken")
+class BombHandler(BaseHandler):
+    def safe_extract(self):
+        return self.do_extract(self.target)
 
-    def extract(self):
-        os.chdir(self.directory)
-        return MatchHandler.extract(self, '..')
+    def overwrite(self):
+        self.target = self.extractor.basename()
+        return self.do_extract(self.target)
 
-    def cleanup(self):
-        os.chdir('..')
-        return MatchHandler.cleanup(self)
-        
 
 class EmptyHandler(object):
-    def __init__(self, extractor, contents): pass
+    def __init__(self, extractor, contents, options): pass
     def extract(self): pass
     def cleanup(self): pass
 
+
 extractor_map = {'application/x-tar': TarExtractor,
                  'application/zip': ZipExtractor,
                  'application/x-msdos-program': ZipExtractor,
@@ -314,7 +384,8 @@
                  'application/x-cpio': CpioExtractor}
 
 handler_map = {EMPTY: EmptyHandler,
-               MATCHING_DIRECTORY: MatchHandler}
+               MATCHING_DIRECTORY: MatchHandler,
+               DECOMPRESSED: BombHandler}
 
 class ExtractorApplication(object):
     def __init__(self, arguments):
@@ -338,6 +409,15 @@
         parser.add_option('-v', '--verbose', dest='verbose',
                           action='count', default=0,
                           help='be verbose/print debugging information')
+        parser.add_option('-o', '--overwrite', dest='overwrite',
+                          action='store_true', default=False,
+                          help='overwrite any existing target directory')
+##         parser.add_option('-f', '--flat', '--no-directory', dest='flat',
+##                           action='store_true', default=False,
+##                           help="don't put contents in their own directory")
+##         parser.add_option('-n', '--noninteractive', dest='batch',
+##                           action='store_true', default=False,
+##                           help="don't ask how to handle special cases")
         self.options, filenames = parser.parse_args(arguments)
         if not filenames:
             parser.error("you did not list any archives")
@@ -357,13 +437,17 @@
         try:
             extractor = extractor_map[mimetype]
         except KeyError:
-            return "not a known archive type"
+            if encoding:
+                extractor = CompressionExtractor
+            else:
+                return "not a known archive type"
         try:
             self.current_extractor = extractor(self.current_filename, mimetype,
                                                encoding)
             content = self.current_extractor.check_contents()
             handler = handler_map.get(content, BombHandler)
-            self.current_handler = handler(self.current_extractor, content)
+            self.current_handler = handler(self.current_extractor, content,
+                                           self.options)
         except ExtractorError, error:
             return str(error)
 
@@ -373,7 +457,7 @@
         for filename in self.current_extractor.included_archives:
             tail_path, basename = os.path.split(filename)
             directory = os.path.join(self.current_directory,
-                                     self.current_handler.directory, tail_path)
+                                     self.current_handler.target, tail_path)
             self.archives.setdefault(directory, []).append(basename)
 
     def report(self, function, *args):
--- a/tests/compare.py	Tue Dec 19 19:27:14 2006 -0500
+++ b/tests/compare.py	Sun Dec 31 19:20:26 2006 -0500
@@ -19,46 +19,11 @@
 
 import os
 import subprocess
+import syck
 import sys
 
 from sets import Set as set
 
-TESTSCRIPT_NAME = 'testscript.sh'
-SCRIPT_PROLOGUE = """#!/bin/sh
-set -e
-"""
-
-tests = {'test-1.23.tar': ([], ['tar -xf $1'], []),
-         'test-1.23.tar.gz': ([], ['tar -xzf $1'], []),
-         'test-1.23.tar.bz2': ([], ['mkdir test-1.23',
-                                    'cd test-1.23',
-                                    'tar -jxf ../$1'], []),
-         'test-1.23.zip': ([], ['mkdir test-1.23',
-                                'cd test-1.23',
-                                'unzip -q ../$1'], []),
-         'test-1.23.cpio': ([], ['cpio -i --make-directories \
-                                  <$1 2>/dev/null'], []),
-         'test-1.23_all.deb': ([], ['TD=$PWD',
-                                    'mkdir test-1.23',
-                                    'cd /tmp',
-                                    'ar x $TD/$1 data.tar.gz',
-                                    'cd $TD/test-1.23',
-                                    'tar -zxf /tmp/data.tar.gz',
-                                    'rm /tmp/data.tar.gz'], []),
-         'test-recursive-badperms.tar.bz2': (
-    ['-r'],
-    ['mkdir test-recursive-badperms',
-     'cd test-recursive-badperms',
-     'tar -jxf ../$1',
-     'mkdir test-badperms',
-     'cd test-badperms',
-     'tar -xf ../test-badperms.tar',
-     'chmod 755 testdir'],
-    ['if [ "x`cat test-recursive-badperms/test-badperms/testdir/testfile`" = \
-      "xhey" ]',
-     'then exit 0; else exit 1; fi']
-    )}
-
 if os.path.exists('scripts/x') and os.path.exists('tests'):
     os.chdir('tests')
 elif os.path.exists('../scripts/x') and os.path.exists('../tests'):
@@ -67,15 +32,26 @@
     print "ERROR: Can't run tests in this directory!"
     sys.exit(2)
 
+X_SCRIPT = os.path.realpath('../scripts/x')
+ROOT_DIR = os.path.realpath(os.curdir)
+OUTCOMES = ['error', 'failed', 'passed']
+TESTSCRIPT_NAME = 'testscript.sh'
+SCRIPT_PROLOGUE = """#!/bin/sh
+set -e
+"""
+
 class ExtractorTestError(Exception):
     pass
 
 
 class ExtractorTest(object):
-    def __init__(self, directory, archive_filename, info):
-        self.directory = directory
-        self.archive_filename = os.path.join(directory, archive_filename)
-        self.arguments, self.shell_commands, self.shell_test = info
+    def __init__(self, **kwargs):
+        for key in ('name', 'filename', 'baseline'):
+            setattr(self, key, kwargs[key])
+        for key in ('directory', 'prerun', 'posttest'):
+            setattr(self, key, kwargs.get(key, None))
+        for key in ('options',):
+            setattr(self, key, kwargs.get(key, '').split())
         
     def get_results(self, commands):
         status = subprocess.call(commands)
@@ -89,35 +65,52 @@
         
     def write_script(self, commands):
         script = open(TESTSCRIPT_NAME, 'w')
-        script.write("%s%s\n" % (SCRIPT_PROLOGUE, '\n'.join(commands)))
+        script.write("%s%s\n" % (SCRIPT_PROLOGUE, commands))
         script.close()
         subprocess.call(['chmod', 'u+w', TESTSCRIPT_NAME])
 
     def get_shell_results(self):
-        self.write_script(self.shell_commands)
-        return self.get_results(['sh', TESTSCRIPT_NAME, self.archive_filename])
+        self.write_script(self.baseline)
+        return self.get_results(['sh', TESTSCRIPT_NAME, self.filename])
 
     def get_extractor_results(self):
-        script = os.path.join(self.directory, '../scripts/x')
-        return self.get_results([script] + self.arguments +
-                                [self.archive_filename])
+        if self.prerun:
+            self.write_script(self.prerun)
+            subprocess.call(['sh', TESTSCRIPT_NAME])
+        return self.get_results([X_SCRIPT] + self.options + [self.filename])
         
     def get_posttest_result(self):
-        if not self.shell_test:
+        if not self.posttest:
             return 0
-        self.write_script(self.shell_test)
+        self.write_script(self.posttest)
         return subprocess.call(['sh', TESTSCRIPT_NAME])
 
     def clean(self):
-        status = subprocess.call(['find', '-mindepth', '1', '-maxdepth', '1',
-                                  '-type', 'd',
-                                  '!', '-name', 'CVS', '!', '-name', '.svn',
-                                  '-exec', 'rm', '-rf', '{}', ';'])
+        if self.directory:
+            target = os.path.join(ROOT_DIR, self.directory)
+            extra_options = ['!', '-name', TESTSCRIPT_NAME]
+        else:
+            target = ROOT_DIR
+            extra_options = ['-type', 'd',
+                             '!', '-name', 'CVS',
+                             '!', '-name', '.svn']
+        status = subprocess.call(['find', target,
+                                  '-mindepth', '1', '-maxdepth', '1'] +
+                                 extra_options +
+                                 ['-exec', 'rm', '-rf', '{}', ';'])
         if status != 0:
             raise ExtractorTestError("cleanup exited with status code %s" %
                                      (status,))
 
-    def run(self):
+    def show_status(self, status, message=None):
+        if message is None:
+            last_part = ''
+        else:
+            last_part = ': ' + message
+        print "%7s: %s%s" % (status, self.name, last_part)
+        return status.lower()
+
+    def compare_results(self):
         self.clean()
         expected = self.get_shell_results()
         self.clean()
@@ -129,41 +122,48 @@
         elif actual is None:
             raise ExtractorTestError("could not get extractor results")
         elif expected != actual:
-            print "FAILED:", self.archive_filename
+            result = self.show_status('FAILED')
             print "Only in baseline results:"
             print '\n'.join(expected.difference(actual))
             print "Only in actual results:"
             print '\n'.join(actual.difference(expected))
-            return False
         elif posttest_result != 0:
-            print "FAILED:", self.archive_filename
+            result = self.show_status('FAILED')
             print "Posttest returned status code", posttest_result
-            print actual
-            return False
         else:
-            print "Passed:", self.archive_filename
-            return True
+            result = self.show_status('Passed')
+        return result
+    
+    def run(self):
+        if self.directory:
+            os.mkdir(self.directory)
+            os.chdir(self.directory)
+        try:
+            result = self.compare_results()
+        except ExtractorTestError, error:
+            result = self.show_status('ERROR', error)
+        if self.directory:
+            os.chdir(ROOT_DIR)
+            subprocess.call(['rm', '-rf', self.directory])
+        return result
 
 
-def run_tests(directory, testnames):
-    successes = 0
-    failures = 0
-    for testname in testnames:
-        test = ExtractorTest(directory, testname, tests[testname])
-        if test.run():
-            successes += 1
-        else:
-            failures += 1
-    return successes, failures
-    
-results = []
-testnames = tests.keys()
-testnames.sort()
-results.append(run_tests('.', testnames))
-os.mkdir('inside-dir')
-os.chdir('inside-dir')
-results.append(run_tests('..', testnames))
-os.chdir('..')
-subprocess.call(['rm', '-rf', 'inside-dir'])
-print "Totals: %s successes, %s failures" % \
-      tuple([sum(total) for total in zip(*results)])
+test_db = open('tests.yml')
+test_data = syck.load(test_db.read(-1))
+test_db.close()
+tests = [ExtractorTest(**data) for data in test_data]
+for original_data in test_data:
+    if original_data.has_key('directory'):
+        continue
+    data = original_data.copy()
+    data['name'] += ' in ..'
+    data['directory'] = 'inside-dir'
+    data['filename'] = os.path.join('..', data['filename'])
+    tests.append(ExtractorTest(**data))
+results = [test.run() for test in tests]
+counts = {}
+for outcome in OUTCOMES:
+    counts[outcome] = 0
+for result in results:
+    counts[result] += 1
+print " Totals:", ', '.join(["%s %s" % (counts[key], key) for key in OUTCOMES])
Binary file tests/test-text.gz has changed
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/tests.yml	Sun Dec 31 19:20:26 2006 -0500
@@ -0,0 +1,81 @@
+- name: basic .tar
+  filename: test-1.23.tar
+  baseline: |
+    tar -xf $1
+
+- name: basic .tar.gz
+  filename: test-1.23.tar.gz
+  baseline: |
+    tar -zxf $1
+
+- name: basic .tar.bz2
+  filename: test-1.23.tar.bz2
+  baseline: |
+    mkdir test-1.23
+    cd test-1.23
+    tar -jxf ../$1
+
+- name: basic .zip
+  filename: test-1.23.zip
+  baseline: |
+    mkdir test-1.23
+    cd test-1.23
+    unzip -q ../$1
+
+- name: basic .deb
+  filename: test-1.23_all.deb
+  baseline: |
+    TD=$PWD
+    mkdir test-1.23
+    cd /tmp
+    ar x $TD/$1 data.tar.gz
+    cd $TD/test-1.23
+    tar -zxf /tmp/data.tar.gz
+    rm /tmp/data.tar.gz
+
+- name: recursion and permissions
+  filename: test-recursive-badperms.tar.bz2
+  options: -r
+  baseline: |
+    mkdir test-recursive-badperms
+    cd test-recursive-badperms
+    tar -jxf ../$1
+    mkdir test-badperms
+    cd test-badperms
+    tar -xf ../test-badperms.tar
+    chmod 755 testdir
+  posttest: |
+    if [ "x`cat test-recursive-badperms/test-badperms/testdir/testfile`" = \
+         "xhey" ]; then exit 0; else exit 1; fi
+
+- name: decompression
+  directory: inside-dir
+  filename: ../test-text.gz
+  baseline: |
+    zcat $1 >test-text
+
+- name: decompression with -r
+  directory: inside-dir
+  filename: ../test-text.gz
+  options: -r
+  baseline: |
+    zcat $1 >test-text
+
+- name: overwrite protection
+  filename: test-1.23.tar.bz2
+  baseline: |
+    mkdir test-1.23 test-1.23.1
+    cd test-1.23.1
+    tar -jxf ../$1
+  prerun: |
+    mkdir test-1.23
+
+- name: overwrite option
+  filename: test-1.23.tar.bz2
+  options: -o
+  baseline: |
+    mkdir test-1.23
+    cd test-1.23
+    tar -jxf ../$1
+  prerun: |
+    mkdir test-1.23 

mercurial