scripts/dtrx

Sat, 10 Feb 2007 16:43:44 -0500

author
brett
date
Sat, 10 Feb 2007 16:43:44 -0500
branch
trunk
changeset 19
bb6e9f4af1a5
parent 18
scripts/x@1600807a32bd
child 20
69c93c3e6972
permissions
-rwxr-xr-x

[svn] Rename the program to dtrx.

Add a -l/-t option, which just lists the contents of an archive without
extracting anything.

#!/usr/bin/env python
#
# dtrx -- Intelligently extract various archive types.
# Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, 5th Floor, Boston, MA, 02111.

import errno
import logging
import mimetypes
import optparse
import os
import stat
import subprocess
import sys
import tempfile

from cStringIO import StringIO

VERSION = "4.0"
VERSION_BANNER = """dtrx version %s
Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
Public License for more details.""" % (VERSION,)

MATCHING_DIRECTORY = 1
# ONE_DIRECTORY = 2
BOMB = 3
EMPTY = 4
COMPRESSED = 5

mimetypes.encodings_map.setdefault('.bz2', 'bzip2')
mimetypes.types_map['.exe'] = 'application/x-msdos-program'

def run_command(command, description, stdout=None, stderr=None, stdin=None):
    process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                               stderr=stderr)
    status = process.wait()
    for pipe in (process.stdout, process.stderr):
        try:
            pipe.close()
        except AttributeError:
            pass
    if status != 0:
        return ("%s error: '%s' returned status code %s" %
                (description, ' '.join(command), status))
    return None

class FilenameChecker(object):
    def __init__(self, original_name):
        self.original_name = original_name

    def is_free(self, filename):
        return not os.path.exists(filename)

    def check(self):
        for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]:
            filename = '%s%s' % (self.original_name, suffix)
            if self.is_free(filename):
                return filename
        raise ValueError("all alternatives for name %s taken" %
                         (self.original_name,))
        

class DirectoryChecker(FilenameChecker):
    def is_free(self, filename):
        try:
            os.mkdir(filename)
        except OSError, error:
            if error.errno == errno.EEXIST:
                return False
            raise
        return True


class ExtractorError(Exception):
    pass


class ProcessStreamer(object):
    def __init__(self, command, stdin, description="checking contents",
                 stderr=None):
        self.process = subprocess.Popen(command, bufsize=1, stdin=stdin,
                                        stdout=subprocess.PIPE, stderr=stderr)
        self.command = ' '.join(command)
        self.description = description

    def __iter__(self):
        return self

    def next(self):
        line = self.process.stdout.readline()
        if line:
            return line.rstrip('\n')
        else:
            raise StopIteration

    def stop(self):
        while self.process.stdout.readline():
            pass
        self.process.stdout.close()
        status = self.process.wait()
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (self.description, self.command, status))
        try:
            self.process.stderr.close()
        except AttributeError:
            pass
    

class BaseExtractor(object):
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}

    name_checker = DirectoryChecker

    def __init__(self, filename, mimetype, encoding):
        if encoding and (not self.decoders.has_key(encoding)):
            raise ValueError("unrecognized encoding %s" % (encoding,))
        self.filename = os.path.realpath(filename)
        self.mimetype = mimetype
        self.encoding = encoding
        self.included_archives = []
        try:
            self.archive = open(filename, 'r')
        except (IOError, OSError), error:
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        if encoding:
            self.pipe([self.decoders[encoding]], "decoding")
        self.prepare()

    def run(self, command, description="extraction", stdout=None, stderr=None,
            stdin=None):
        error = run_command(command, description, stdout, stderr, stdin)
        if error:
            raise ExtractorError(error)

    def pipe(self, command, description, stderr=None):
        output = tempfile.TemporaryFile()
        self.run(command, description, output, stderr, self.archive)
        self.archive.close()
        self.archive = output
        self.archive.flush()
    
    def prepare(self):
        pass

    def check_contents(self):
        archive_type = None
        filenames = self.get_filenames()
        try:
            filename = filenames.next()
            if extractor_map.has_key(mimetypes.guess_type(filename)[0]):
                self.included_archives.append(filename)
            first_part = filename.split('/', 1)[0] + '/'
        except StopIteration:
            filenames.stop()
            return EMPTY
        for filename in filenames:
            if extractor_map.has_key(mimetypes.guess_type(filename)[0]):
                self.included_archives.append(filename)
            if (archive_type is None) and (not filename.startswith(first_part)):
                archive_type = BOMB
        filenames.stop()
        if archive_type:
            return archive_type
        if self.basename() == first_part[:-1]:
            return MATCHING_DIRECTORY
        return first_part

    def basename(self):
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if mimetypes.encodings_map.has_key(extension):
            pieces.pop()
            extension = '.' + pieces[-1]
        if (mimetypes.types_map.has_key(extension) or
            mimetypes.common_types.has_key(extension) or
            mimetypes.suffix_map.has_key(extension)):
            pieces.pop()
        return '.'.join(pieces)

    def extract(self, path):
        old_path = os.path.realpath(os.curdir)
        os.chdir(path)
        self.archive.seek(0, 0)
        self.extract_archive()
        os.chdir(old_path)
    

class TarExtractor(BaseExtractor):
    def get_filenames(self):
        self.archive.seek(0, 0)
        return ProcessStreamer(['tar', '-t'], self.archive)

    def extract_archive(self): 
        self.run(['tar', '-x'], stdin=self.archive)
        
        
class ZipExtractor(BaseExtractor):
    def __init__(self, filename, mimetype, encoding):
        self.filename = os.path.realpath(filename)
        self.mimetype = mimetype
        self.encoding = encoding
        self.included_archives = []
        self.archive = StringIO()

    def get_filenames(self):
        self.archive.seek(0, 0)
        return ProcessStreamer(['zipinfo', '-1', self.filename], None)

    def extract_archive(self):
        self.run(['unzip', '-q', self.filename])


class CpioExtractor(BaseExtractor):
    def get_filenames(self):
        self.archive.seek(0, 0)
        return ProcessStreamer(['cpio', '-t'], self.archive,
                               stderr=subprocess.PIPE)

    def extract_archive(self):
        self.run(['cpio', '-i', '--make-directories',
                  '--no-absolute-filenames'],
                 stderr=subprocess.PIPE, stdin=self.archive)


class RPMExtractor(CpioExtractor):
    def prepare(self):
        self.pipe(['rpm2cpio', '-'], "rpm2cpio")

    def basename(self):
        pieces = os.path.basename(self.filename).split('.')
        if len(pieces) == 1:
            return pieces[0]
        elif pieces[-1] != 'rpm':
            return BaseExtractor.basename(self)
        pieces.pop()
        if len(pieces) == 1:
            return pieces[0]
        elif len(pieces[-1]) < 8:
            pieces.pop()
        return '.'.join(pieces)

    def check_contents(self):
        CpioExtractor.check_contents(self)
        return BOMB


class DebExtractor(TarExtractor):
    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'data.tar.gz'],
                  "data.tar.gz extraction")
        self.archive.seek(0, 0)
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        pieces = os.path.basename(self.filename).split('_')
        if len(pieces) == 1:
            return pieces[0]
        last_piece = pieces.pop()
        if (len(last_piece) > 10) or (not last_piece.endswith('.deb')):
            return BaseExtractor.basename(self)
        return '_'.join(pieces)

    def check_contents(self):
        TarExtractor.check_contents(self)
        return BOMB
        

class CompressionExtractor(BaseExtractor):
    name_checker = FilenameChecker

    def basename(self):
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if mimetypes.encodings_map.has_key(extension):
            pieces.pop()
        return '.'.join(pieces)

    def get_filenames(self):
        yield self.basename()

    def check_contents(self):
        return COMPRESSED

    def extract(self, path):
        output = open(path, 'w')
        self.run(['cat'], "output write", stdin=self.archive, stdout=output)
        output.close()
        

class BaseHandler(object):
    def __init__(self, extractor, contents, options):
        self.logger = logging.getLogger('dtrx-log')
        self.extractor = extractor
        self.contents = contents
        self.options = options
        self.target = None

    def extract(self):
        try:
            self.extractor.extract(self.target)
        except (ExtractorError, IOError, OSError), error:
            return str(error)
        
    def cleanup(self):
        if self.target is None:
            return
        command = 'find'
        status = subprocess.call(['find', self.target, '-type', 'd',
                                  '-exec', 'chmod', 'u+rwx', '{}', ';'])
        if status == 0:
            command = 'chmod'
            status = subprocess.call(['chmod', '-R', 'u+rw', self.target])
        if status != 0:
            return "%s returned with exit status %s" % (command, status)


# The "where to extract" table, with options and archive types.
# This dictates the contents of each can_handle method.
#
#         Flat           Overwrite            None
# File    basename       basename             FilenameChecked
# Match   .              .                    tempdir + checked
# Bomb    .              basename             DirectoryChecked

class FlatHandler(BaseHandler):
    def can_handle(contents, options):
        return ((options.flat and (contents != COMPRESSED)) or
                (options.overwrite and (contents == MATCHING_DIRECTORY)))
    can_handle = staticmethod(can_handle)

    def __init__(self, extractor, contents, options):
        BaseHandler.__init__(self, extractor, contents, options)
        self.target = '.'

    def cleanup(self):
        for filename in self.extractor.get_filenames():
            stat_info = os.stat(filename)
            perms = stat.S_IRUSR | stat.S_IWUSR
            if stat.S_ISDIR(stat_info.st_mode):
                perms |= stat.S_IXUSR
            os.chmod(filename, stat_info.st_mode | perms)


class OverwriteHandler(BaseHandler):
    def can_handle(contents, options):
        return ((options.flat and (contents == COMPRESSED)) or
                (options.overwrite and (contents != MATCHING_DIRECTORY)))
    can_handle = staticmethod(can_handle)

    def __init__(self, extractor, contents, options):
        BaseHandler.__init__(self, extractor, contents, options)
        self.target = self.extractor.basename()
        

class MatchHandler(BaseHandler):
    def can_handle(contents, options):
        return contents == MATCHING_DIRECTORY
    can_handle = staticmethod(can_handle)

    def extract(self):
        basename = self.extractor.basename()
        self.target = tempfile.mkdtemp(dir='.')
        result = BaseHandler.extract(self)
        if result is None:
            tempdir = self.target
            checker = self.extractor.name_checker(basename)
            self.target = checker.check()
            os.rename(os.path.join(tempdir, basename), self.target)
            os.rmdir(tempdir)
        return result


class EmptyHandler(object):
    def can_handle(contents, options):
        return contents == EMPTY
    can_handle = staticmethod(can_handle)

    def __init__(self, extractor, contents, options): pass
    def extract(self): pass
    def cleanup(self): pass


class BombHandler(BaseHandler):
    def can_handle(contents, options):
        return True
    can_handle = staticmethod(can_handle)

    def __init__(self, extractor, contents, options):
        BaseHandler.__init__(self, extractor, contents, options)
        checker = self.extractor.name_checker(self.extractor.basename())
        self.target = checker.check()

        
extractor_map = {'application/x-tar': TarExtractor,
                 'application/zip': ZipExtractor,
                 'application/x-msdos-program': ZipExtractor,
                 'application/x-debian-package': DebExtractor,
                 'application/x-redhat-package-manager': RPMExtractor,
                 'application/x-rpm': RPMExtractor,
                 'application/x-cpio': CpioExtractor}

handlers = [FlatHandler, OverwriteHandler, MatchHandler, EmptyHandler,
            BombHandler]

class ExtractorApplication(object):
    def __init__(self, arguments):
        self.parse_options(arguments)
        self.setup_logger()
        self.successes = []
        self.failures = []

    def parse_options(self, arguments):
        parser = optparse.OptionParser(
            usage="%prog [options] archive [archive2 ...]",
            description="Intelligent archive extractor",
            version=VERSION_BANNER
            )
        parser.add_option('-r', '--recursive', dest='recursive',
                          action='store_true', default=False,
                          help='extract archives contained in the ones listed')
        parser.add_option('-q', '--quiet', dest='quiet',
                          action='count', default=3,
                          help='suppress warning/error messages')
        parser.add_option('-v', '--verbose', dest='verbose',
                          action='count', default=0,
                          help='be verbose/print debugging information')
        parser.add_option('-o', '--overwrite', dest='overwrite',
                          action='store_true', default=False,
                          help='overwrite any existing target directory')
        parser.add_option('-f', '--flat', '--no-directory', dest='flat',
                          action='store_true', default=False,
                          help="don't put contents in their own directory")
        parser.add_option('-l', '-t', '--list', '--table', dest='show_list',
                          action='store_true', default=False,
                          help="list contents of archives on standard output")
##         parser.add_option('-n', '--noninteractive', dest='batch',
##                           action='store_true', default=False,
##                           help="don't ask how to handle special cases")
        self.options, filenames = parser.parse_args(arguments)
        if not filenames:
            parser.error("you did not list any archives")
        self.archives = {os.path.realpath(os.curdir): filenames}

    def setup_logger(self):
        self.logger = logging.getLogger('dtrx-log')
        handler = logging.StreamHandler()
        # WARNING is the default.
        handler.setLevel(10 * (self.options.quiet - self.options.verbose))
        formatter = logging.Formatter("dtrx: %(levelname)s: %(message)s")
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def get_extractor(self):
        mimetype, encoding = mimetypes.guess_type(self.current_filename)
        try:
            extractor = extractor_map[mimetype]
        except KeyError:
            if encoding:
                extractor = CompressionExtractor
                contents = COMPRESSED
            else:
                return "not a known archive type"
        try:
            self.current_extractor = extractor(self.current_filename, mimetype,
                                               encoding)
            content = self.current_extractor.check_contents()
            for handler in handlers:
                if handler.can_handle(content, self.options):
                    self.current_handler = handler(self.current_extractor,
                                                   content, self.options)
                    break
        except ExtractorError, error:
            return str(error)

    def recurse(self):
        if not self.options.recursive:
            return
        for filename in self.current_extractor.included_archives:
            tail_path, basename = os.path.split(filename)
            directory = os.path.join(self.current_directory,
                                     self.current_handler.target, tail_path)
            self.archives.setdefault(directory, []).append(basename)

    def report(self, function, *args):
        try:
            error = function(*args)
        except (ExtractorError, IOError, OSError), exception:
            error = str(exception)
        if error:
            self.logger.error("%s: %s", self.current_filename, error)
            return False
        return True

    def record_status(self, success):
        if success:
            self.successes.append(self.current_filename)
        else:
            self.failures.append(self.current_filename)

    def extract(self):
        while self.archives:
            self.current_directory, filenames = self.archives.popitem()
            for filename in filenames:
                os.chdir(self.current_directory)
                self.current_filename = filename
                success = self.report(self.get_extractor)
                if success:
                    for name in 'extract', 'cleanup':
                        success = (self.report(getattr(self.current_handler,
                                                       name)) and success)
                    self.recurse()
                self.record_status(success)

    def show_contents(self):
        for filename in self.current_extractor.get_filenames():
            print filename

    def show_list(self):
        filenames = self.archives.values()[0]
        if len(filenames) > 1:
            header = "%s:\n"
        else:
            header = None
        for filename in filenames:
            if header:
                print header % (filename,),
                header = "\n%s:\n"
            self.current_filename = filename
            success = (self.report(self.get_extractor) and
                       self.report(self.show_contents))
            self.record_status(success)

    def run(self):
        if self.options.show_list:
            self.show_list()
        else:
            self.extract()
        if self.failures:
            return 1
        return 0


if __name__ == '__main__':
    app = ExtractorApplication(sys.argv[1:])
    sys.exit(app.run())

mercurial