scripts/dtrx

Fri, 19 Oct 2007 23:03:17 -0400

author
brett
date
Fri, 19 Oct 2007 23:03:17 -0400
branch
trunk
changeset 29
5fad99c17221
parent 28
4d88f2231d33
child 30
1015bbd6dc5e
permissions
-rwxr-xr-x

[svn] Add support for Ruby Gems, and extracting metadata from .deb/.gem files.

#!/usr/bin/env python
#
# dtrx -- Intelligently extract various archive types.
# Copyright (c) 2006, 2007 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, 5th Floor, Boston, MA, 02111.

import errno
import logging
import mimetypes
import optparse
import os
import stat
import subprocess
import sys
import tempfile
import textwrap
import traceback

from cStringIO import StringIO

# Release number; it is spliced into the banner shown for --version.
VERSION = "5.0"
VERSION_BANNER = "dtrx version " + VERSION + """
Copyright (c) 2006, 2007 Brett Smith <brettcsmith@brettcsmith.org>

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 3 of the License, or (at your
option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
Public License for more details."""

# What an extractor found after unpacking (stored in content_type).
(MATCHING_DIRECTORY, ONE_ENTRY, BOMB, EMPTY, ONE_ENTRY_KNOWN) = range(1, 6)

# Possible answers for an archive that holds a single entry
# (see OneEntryPolicy.answers).
(EXTRACT_HERE, EXTRACT_WRAP, EXTRACT_RENAME) = range(1, 4)

# Possible answers for an archive that holds other archives
# (see RecursionPolicy.answers).
(RECURSE_ALWAYS, RECURSE_ONCE, RECURSE_NOT_NOW, RECURSE_NEVER) = range(1, 5)

# Register '.bz2' as an encoding unless mimetypes already knows it, and
# force '.exe' to the MIME type that the extractor map associates with
# ZipExtractor.
mimetypes.types_map.update({'.exe': 'application/x-msdos-program'})
mimetypes.encodings_map.setdefault('.bz2', 'bzip2')

def run_command(command, description, stdout=None, stderr=None, stdin=None):
    """Run an external command to completion and describe any failure.

    command is the argument list handed to subprocess.Popen and
    description is a short label used at the start of the error message.
    stdout, stderr and stdin are passed straight through to Popen.

    Returns None when the command exits with status 0, otherwise a
    string of the form "<description> error: '<command>' returned status
    code <status>".
    """
    process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                               stderr=stderr)
    # communicate() drains and closes whatever pipes were requested before
    # waiting.  Calling wait() first can deadlock: a child that fills a
    # pipe nobody is reading blocks forever, and so do we.  The output is
    # discarded, exactly as it was when the pipes were simply closed.
    process.communicate()
    status = process.returncode
    if status != 0:
        return ("%s error: '%s' returned status code %s" %
                (description, ' '.join(command), status))
    return None

class FilenameChecker(object):
    """Find an unused variant of a name.

    The candidates are the name itself followed by the name with the
    suffixes '.1' through '.9'.  Subclasses may override is_free() to
    change how a candidate is tested (or claimed).
    """

    def __init__(self, original_name):
        self.original_name = original_name

    def is_free(self, filename):
        """Return True when nothing exists at filename."""
        if os.path.exists(filename):
            return False
        return True

    def check(self):
        """Return the first free candidate, trying them in order.

        Raises ValueError when the plain name and all nine numbered
        alternatives are taken.
        """
        for number in range(10):
            if number:
                candidate = '%s.%s' % (self.original_name, number)
            else:
                candidate = '%s' % (self.original_name,)
            # Stop at the first hit: is_free() may have side effects in
            # subclasses, so later candidates must not be probed.
            if self.is_free(candidate):
                return candidate
        raise ValueError("all alternatives for name %s taken" %
                         (self.original_name,))
        

class DirectoryChecker(FilenameChecker):
    """Name checker that claims a candidate by creating it as a directory."""

    def is_free(self, filename):
        """Try to create filename as a directory.

        Returns True when the directory was created and False when
        something already exists there.  Any other OSError is re-raised.
        """
        try:
            os.mkdir(filename)
        except OSError, error:
            if error.errno != errno.EEXIST:
                raise
            return False
        else:
            return True


class ExtractorError(Exception):
    """Raised when an archive cannot be opened, recognized or processed."""


class BaseExtractor(object):
    """Shared machinery for extracting an archive with external commands.

    An extractor opens the archive, queues commands with pipe() and then
    feeds the archive through that chain with run_pipes().  extract()
    expects subclasses to provide extract_archive(); subclasses may also
    override prepare(), basename(), get_filenames() and check_contents().
    """

    # Maps a mimetypes encoding name to the command used to decode it.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}

    # Class used by the handlers to pick a free name for the result.
    name_checker = DirectoryChecker

    def __init__(self, filename, encoding):
        """Open filename and queue the decoder for encoding, if any.

        Raises ValueError when encoding has no entry in decoders and
        ExtractorError when the archive cannot be opened.
        """
        if encoding and (encoding not in self.decoders):
            raise ValueError("unrecognized encoding %s" % (encoding,))
        self.filename = os.path.realpath(filename)
        self.encoding = encoding
        self.included_archives = []
        self.target = None
        self.content_type = None
        self.content_name = None
        self.pipes = []
        try:
            # Archives are binary data; never let the platform translate
            # line endings while the commands read from this file.
            self.archive = open(filename, 'rb')
        except (IOError, OSError), error:
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        if encoding:
            self.pipe([self.decoders[encoding]], "decoding")
        self.prepare()

    def pipe(self, command, description="extraction"):
        """Append a command (an argument list) to the pipeline.

        description labels the command in error messages.
        """
        self.pipes.append((command, description))

    def run_pipes(self, final_stdout=None):
        """Run the queued commands as one pipeline fed from the archive.

        The first command reads self.archive; each later command reads
        the previous command's output.  The last command writes to
        final_stdout, or to a fresh temporary file when none is given.
        On success self.archive is replaced by that output.  Does nothing
        when no commands are queued.

        Raises ExtractorError naming the first command that exited with
        a non-zero status.
        """
        if not self.pipes:
            return
        if final_stdout is None:
            # FIXME: Buffering this might be dumb.
            final_stdout = tempfile.TemporaryFile()
        last_pipe = len(self.pipes) - 1
        processes = []
        # The commands' error output is never read, so send it to the
        # null device.  Capturing it in a pipe that nobody drains would
        # block a noisy command (and therefore this program) forever.
        devnull = open(os.devnull, 'w')
        try:
            for index, (command, description) in enumerate(self.pipes):
                if index == 0:
                    stdin = self.archive
                else:
                    stdin = processes[-1].stdout
                if index == last_pipe:
                    stdout = final_stdout
                else:
                    stdout = subprocess.PIPE
                processes.append(subprocess.Popen(command, stdin=stdin,
                                                  stdout=stdout,
                                                  stderr=devnull))
            exit_codes = [process.wait() for process in processes]
        finally:
            devnull.close()
            # Only the intermediate commands own a pipe object; the last
            # one writes straight into final_stdout.
            for process in processes:
                if process.stdout is not None:
                    process.stdout.close()
        self.archive.close()
        for index, status in enumerate(exit_codes):
            if status != 0:
                raise ExtractorError("%s error: '%s' returned status code %s" %
                                     (self.pipes[index][1],
                                      ' '.join(self.pipes[index][0]), status))
        self.archive = final_stdout

    def prepare(self):
        """Hook called at the end of __init__; subclasses queue commands."""
        pass

    def check_included_archives(self, filenames):
        """Record each name whose guessed MIME type has a known extractor."""
        for filename in filenames:
            if mimetypes.guess_type(filename)[0] in extractor_map:
                self.included_archives.append(filename)

    def check_contents(self):
        """Classify what is in the current directory after extraction.

        Sets content_type to EMPTY, MATCHING_DIRECTORY, ONE_ENTRY or
        BOMB, sets content_name when there is exactly one entry (with a
        trailing '/' for a directory), and records included archives
        found at the top level.
        """
        filenames = os.listdir('.')
        if not filenames:
            self.content_type = EMPTY
        elif len(filenames) == 1:
            if self.basename() == filenames[0]:
                self.content_type = MATCHING_DIRECTORY
            else:
                self.content_type = ONE_ENTRY
            self.content_name = filenames[0]
            if os.path.isdir(filenames[0]):
                self.content_name += '/'
        else:
            self.content_type = BOMB
        self.check_included_archives(filenames)

    def basename(self):
        """Return the archive's file name without its archive extensions.

        A trailing encoding extension (for example '.gz') is dropped
        first, then one extension known to mimetypes.
        """
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if extension in mimetypes.encodings_map:
            pieces.pop()
            extension = '.' + pieces[-1]
        if ((extension in mimetypes.types_map) or
            (extension in mimetypes.common_types) or
            (extension in mimetypes.suffix_map)):
            pieces.pop()
        return '.'.join(pieces)

    def extract(self):
        """Extract the archive into a new temporary directory.

        The directory is created in the current directory and stored in
        self.target.  The process changes into it while extract_archive()
        and check_contents() run.
        """
        self.target = tempfile.mkdtemp(prefix='.dtrx-', dir='.')
        old_path = os.path.realpath(os.curdir)
        os.chdir(self.target)
        try:
            self.archive.seek(0, 0)
            self.extract_archive()
            self.check_contents()
        finally:
            # Always go back, even when extraction fails, so the caller
            # is never left inside the temporary directory.
            os.chdir(old_path)

    def get_filenames(self):
        """Run the pipeline and yield its output one line at a time.

        Lines are yielded without their trailing newline.  The output
        file is closed once it has been read to the end.
        """
        self.run_pipes()
        self.archive.seek(0, 0)
        while True:
            line = self.archive.readline()
            if not line:
                self.archive.close()
                return
            yield line.rstrip('\n')
    

class CompressionExtractor(BaseExtractor):
    """Extractor for a file that is only compressed, not an archive.

    The result is a single file, so a free name is chosen with
    FilenameChecker rather than by creating a directory.
    """

    name_checker = FilenameChecker

    def basename(self):
        """Return the file name with a trailing encoding extension removed."""
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if extension in mimetypes.encodings_map:
            pieces.pop()
        return '.'.join(pieces)

    def get_filenames(self):
        """Yield the one name this file decompresses to."""
        yield self.basename()

    def extract(self):
        """Decompress into a new temporary file stored in self.target."""
        self.content_type = ONE_ENTRY_KNOWN
        self.content_name = self.basename()
        output_fd, self.target = tempfile.mkstemp(prefix='.dtrx-', dir='.')
        try:
            self.run_pipes(output_fd)
        finally:
            # Release the descriptor even when a command fails; otherwise
            # every failed file would leak one.
            os.close(output_fd)
        

class TarExtractor(BaseExtractor):
    """Extractor that hands the archive to the tar command."""

    def get_filenames(self):
        """Queue 'tar -t' and return the base class's line generator."""
        list_command = ['tar', '-t']
        self.pipe(list_command, "listing")
        return BaseExtractor.get_filenames(self)

    def extract_archive(self):
        """Queue 'tar -x' and run the whole pipeline."""
        extract_command = ['tar', '-x']
        self.pipe(extract_command)
        self.run_pipes()
        
        
class ZipExtractor(BaseExtractor):
    """Extractor that uses zipinfo and unzip on the archive's path."""

    def __init__(self, filename, encoding):
        # The zip commands below receive the archive path as an argument,
        # so the base class is given /dev/null to open and no encoding;
        # the real path is stored afterwards.
        BaseExtractor.__init__(self, '/dev/null', None)
        real_path = os.path.realpath(filename)
        self.filename = real_path

    def get_filenames(self):
        """Queue 'zipinfo -1' and return the base class's line generator."""
        list_command = ['zipinfo', '-1', self.filename]
        self.pipe(list_command, "listing")
        return BaseExtractor.get_filenames(self)

    def extract_archive(self):
        """Queue 'unzip -q' and run the whole pipeline."""
        extract_command = ['unzip', '-q', self.filename]
        self.pipe(extract_command)
        self.run_pipes()


class CpioExtractor(BaseExtractor):
    """Extractor that hands the archive to the cpio command."""

    def get_filenames(self):
        """Queue 'cpio -t' and return the base class's line generator."""
        list_command = ['cpio', '-t']
        self.pipe(list_command, "listing")
        return BaseExtractor.get_filenames(self)

    def extract_archive(self):
        """Queue the cpio extraction command and run the whole pipeline."""
        extract_command = ['cpio', '-i']
        extract_command.extend(['--make-directories',
                                '--no-absolute-filenames'])
        self.pipe(extract_command)
        self.run_pipes()


class RPMExtractor(CpioExtractor):
    """Extractor for RPM packages: rpm2cpio first, then cpio."""

    def prepare(self):
        """Queue rpm2cpio so the cpio commands see a cpio stream."""
        convert_command = ['rpm2cpio', '-']
        self.pipe(convert_command, "rpm2cpio")

    def basename(self):
        """Return the package file name without its trailing pieces.

        A name with no dots is returned unchanged.  A name that does not
        end in '.rpm' is handled by BaseExtractor.basename().  Otherwise
        '.rpm' is removed, and so is the piece before it when that piece
        is shorter than eight characters (presumably the architecture --
        confirm) and something would be left over.
        """
        parts = os.path.basename(self.filename).split('.')
        has_dots = len(parts) > 1
        if has_dots and parts[-1] != 'rpm':
            return BaseExtractor.basename(self)
        if has_dots:
            del parts[-1]
            if len(parts) > 1 and len(parts[-1]) < 8:
                del parts[-1]
        return '.'.join(parts)

    def check_contents(self):
        """Record included archives and always treat the result as a bomb."""
        entries = os.listdir('.')
        self.check_included_archives(entries)
        self.content_type = BOMB


class DebExtractor(TarExtractor):
    """Extractor for the data.tar.gz member of a Debian package."""

    def prepare(self):
        """Queue commands that pull out and decompress data.tar.gz."""
        member_command = ['ar', 'p', self.filename, 'data.tar.gz']
        self.pipe(member_command, "data.tar.gz extraction")
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        """Return the package file name without its last '_' piece.

        A name with no underscore is returned unchanged.  The last piece
        is only dropped when it ends in '.deb' and is at most ten
        characters long; any other name is handled by
        BaseExtractor.basename().
        """
        parts = os.path.basename(self.filename).split('_')
        if len(parts) == 1:
            return parts[0]
        tail = parts[-1]
        if tail.endswith('.deb') and len(tail) <= 10:
            return '_'.join(parts[:-1])
        return BaseExtractor.basename(self)

    def check_contents(self):
        """Record included archives and always treat the result as a bomb."""
        entries = os.listdir('.')
        self.check_included_archives(entries)
        self.content_type = BOMB


class DebMetadataExtractor(DebExtractor):
    """Extractor for the control.tar.gz member of a Debian package."""

    def prepare(self):
        """Queue commands that pull out and decompress control.tar.gz."""
        member_command = ['ar', 'p', self.filename, 'control.tar.gz']
        self.pipe(member_command, "control.tar.gz extraction")
        self.pipe(['zcat'], "control.tar.gz decompression")


class GemExtractor(TarExtractor):
    """Extractor for the data.tar.gz member of a Ruby gem."""

    def prepare(self):
        """Queue commands that pull out and decompress data.tar.gz."""
        member_command = ['tar', '-xO', 'data.tar.gz']
        self.pipe(member_command, "data.tar.gz extraction")
        self.pipe(['zcat'], "data.tar.gz decompression")

    def check_contents(self):
        """Record included archives and always treat the result as a bomb."""
        entries = os.listdir('.')
        self.check_included_archives(entries)
        self.content_type = BOMB


class GemMetadataExtractor(CompressionExtractor):
    """Extractor for the metadata.gz member of a Ruby gem."""

    def prepare(self):
        """Queue commands that pull out and decompress metadata.gz."""
        member_command = ['tar', '-xO', 'metadata.gz']
        self.pipe(member_command, "metadata.gz extraction")
        self.pipe(['zcat'], "metadata.gz decompression")

    def basename(self):
        """Return the gem's file name with '-metadata.txt' appended."""
        gem_name = os.path.basename(self.filename)
        return gem_name + '-metadata.txt'


class BaseHandler(object):
    """Base class for the objects that put extracted files in place.

    Subclasses provide organize(), which moves the extractor's temporary
    target to its final location and sets self.target.
    """

    def __init__(self, extractor, options):
        self.logger = logging.getLogger('dtrx-log')
        self.extractor = extractor
        self.options = options
        self.target = None

    def handle(self):
        """Fix permissions on the extracted files, then organize them.

        Directories are first given u+rwx with find, then everything is
        given u+rwX with chmod -R.  Returns an error string when either
        command fails, otherwise whatever organize() returns.
        """
        extracted = self.extractor.target
        steps = (
            ('find', ['find', extracted, '-type', 'd',
                      '-exec', 'chmod', 'u+rwx', '{}', ';']),
            ('chmod', ['chmod', '-R', 'u+rwX', extracted]),
            )
        for command, arguments in steps:
            status = subprocess.call(arguments)
            if status != 0:
                return "%s returned with exit status %s" % (command, status)
        return self.organize()


# The "where to extract" table, with options and archive types.
# This dictates the contents of each can_handle method.
#
#         Flat           Overwrite            None
# File    basename       basename             FilenameChecked
# Match   .              .                    tempdir + checked
# Bomb    .              basename             DirectoryChecked

class FlatHandler(BaseHandler):
    """Handler that moves the extracted files into the current directory."""

    def can_handle(contents, options):
        """Return whether the contents should be placed directly in '.'."""
        return ((options.flat and (contents != ONE_ENTRY_KNOWN)) or
                (options.overwrite and (contents == MATCHING_DIRECTORY)))
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Move everything out of the temporary directory, then remove it.

        The tree is walked bottom-up.  Each directory is recreated
        without the temporary directory component, its entries are
        renamed into the new location, and the emptied directory is
        removed.
        """
        self.target = '.'
        for curdir, dirs, filenames in os.walk(self.extractor.target,
                                               topdown=False):
            path_parts = curdir.split(os.sep)
            # Drop the temporary directory's own name from the path.
            if path_parts[0] == '.':
                path_parts.pop(1)
            else:
                path_parts.pop(0)
            if path_parts:
                newdir = os.path.join(*path_parts)
            else:
                # Nothing is left when the temporary directory was named
                # without a leading './'; that means "right here".
                newdir = os.curdir
            if not os.path.isdir(newdir):
                os.makedirs(newdir)
            # os.walk lists symbolic links to directories among dirs but
            # never descends into them, so they must be moved here like
            # files.  Real subdirectories were already emptied and
            # removed by the bottom-up walk, so they no longer match.
            links = [name for name in dirs
                     if os.path.islink(os.path.join(curdir, name))]
            for filename in filenames + links:
                os.rename(os.path.join(curdir, filename),
                          os.path.join(newdir, filename))
            os.rmdir(curdir)


class OverwriteHandler(BaseHandler):
    """Handler that replaces whatever already has the archive's basename."""

    def can_handle(contents, options):
        """Return whether an existing target should be overwritten."""
        return ((options.flat and (contents == ONE_ENTRY_KNOWN)) or
                (options.overwrite and (contents != MATCHING_DIRECTORY)))
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Remove the existing target and rename the extraction onto it.

        Returns None on success, or the error string from run_command
        when the removal fails (the extraction is then left in place).
        """
        self.target = self.extractor.basename()
        # '--' ends option parsing, so a name that begins with '-' is
        # removed as a file instead of being read as options to rm.
        result = run_command(['rm', '-rf', '--', self.target],
                             "removing %s to overwrite" % (self.target,))
        if result is None:
            os.rename(self.extractor.target, self.target)
        return result
        

class MatchHandler(BaseHandler):
    """Handler that moves an archive's single entry into place."""

    def can_handle(contents, options):
        """Return whether the single entry can be moved out on its own."""
        return ((contents == MATCHING_DIRECTORY) or
                ((contents == ONE_ENTRY) and
                 options.one_entry_policy.ok_for_match()))
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Rename the single entry to a free name and drop the temp dir.

        The destination is the entry's own name when the one-entry
        policy is EXTRACT_HERE, otherwise the archive's basename.
        """
        if self.options.one_entry_policy == EXTRACT_HERE:
            destination = self.extractor.content_name.rstrip('/')
        else:
            destination = self.extractor.basename()
        source = self.extractor.target
        if not os.path.isdir(source):
            self.target = self.extractor.name_checker(destination).check()
            os.rename(source, self.target)
            return
        entry = os.path.join(source, os.listdir(source)[0])
        if os.path.isdir(entry) and not os.path.islink(entry):
            checker = self.extractor.name_checker
        else:
            # A directory checker claims the name by creating a
            # directory, and a file or symbolic link cannot be renamed
            # onto a directory.  Only look for an unused name instead.
            checker = FilenameChecker
        self.target = checker(destination).check()
        os.rename(entry, self.target)
        os.rmdir(source)


class EmptyHandler(object):
    """Handler for an archive that extracted to nothing."""

    def can_handle(contents, options):
        """Return whether the extraction produced no entries at all."""
        return contents == EMPTY
    can_handle = staticmethod(can_handle)

    def __init__(self, extractor, options):
        self.extractor = extractor

    def handle(self):
        """Remove the empty temporary directory left by the extraction.

        Without this every empty archive would leave a stray '.dtrx-*'
        directory behind.  Returns None; an OSError from the removal
        propagates to the caller.
        """
        os.rmdir(self.extractor.target)


class BombHandler(BaseHandler):
    """Fallback handler: keep everything together under a free name."""

    @staticmethod
    def can_handle(contents, options):
        """Accept anything; this handler is the last resort."""
        return True

    def organize(self):
        """Rename the temporary target to a free name based on basename()."""
        checker = self.extractor.name_checker(self.extractor.basename())
        self.target = checker.check()
        os.rename(self.extractor.target, self.target)

        
class BasePolicy(object):
    def __init__(self, options):
        self.current_policy = None
        if options.batch:
            self.permanent_policy = self.answers['']
        else:
            self.permanent_policy = None

    def ask_question(self, question):
        question = textwrap.wrap(question) + self.choices
        while True:
            print "\n".join(question)
            try:
                answer = raw_input(self.prompt)
            except EOFError:
                return self.answers['']
            try:
                return self.answers[answer.lower()]
            except KeyError:
                print

    def __cmp__(self, other):
        return cmp(self.current_policy, other)
    

class OneEntryPolicy(BasePolicy):
    """Decides what to do with an archive that holds a single entry."""

    answers = {'h': EXTRACT_HERE, 'i': EXTRACT_WRAP, 'r': EXTRACT_RENAME,
               '': EXTRACT_WRAP}
    choices = ["You can:",
               " * extract it Inside another directory",
               " * extract it and Rename the directory",
               " * extract it Here"]
    prompt = "What do you want to do?  (I/r/h) "

    def prep(self, archive_filename, entry_name):
        """Settle the policy for this archive, asking only when needed."""
        if self.permanent_policy:
            self.current_policy = self.permanent_policy
            return
        question = ("%s contains one entry: %s." %
                    (archive_filename, entry_name))
        self.current_policy = self.ask_question(question)

    def ok_for_match(self):
        """Return whether the entry should be moved out on its own."""
        movable = (EXTRACT_RENAME, EXTRACT_HERE)
        return self.current_policy in movable


class RecursionPolicy(BasePolicy):
    """Decides whether archives found inside an archive get extracted."""

    answers = {'o': RECURSE_ONCE, 'a': RECURSE_ALWAYS, 'n': RECURSE_NOT_NOW,
               'v': RECURSE_NEVER, '': RECURSE_NOT_NOW}
    choices = ["You can:",
               " * Always extract included archives",
               " * extract included archives this Once",
               " * choose Not to extract included archives",
               " * neVer extract included archives"]
    prompt = "What do you want to do?  (a/o/N/v) "

    def __init__(self, options):
        BasePolicy.__init__(self, options)
        # The recursive option overrides whatever the base class chose.
        if options.recursive:
            self.permanent_policy = RECURSE_ALWAYS

    def prep(self, current_filename, included_archives):
        """Settle the policy for this archive, asking only when needed.

        Nothing is asked when a permanent policy exists or when there
        are no included archives.  An "always" or "never" answer becomes
        the permanent policy.
        """
        archive_count = len(included_archives)
        if (archive_count == 0) or (self.permanent_policy is not None):
            self.current_policy = self.permanent_policy or RECURSE_NOT_NOW
            return
        if archive_count == 1:
            question = ("%s contains another archive: %s." %
                        (current_filename, included_archives[0]))
        else:
            question = ("%s contains %s other archive files." %
                        (current_filename, archive_count))
        answer = self.ask_question(question)
        self.current_policy = answer
        if answer in (RECURSE_ALWAYS, RECURSE_NEVER):
            self.permanent_policy = answer

    def ok_to_recurse(self):
        """Return whether included archives should be extracted now."""
        return self.current_policy in (RECURSE_ALWAYS, RECURSE_ONCE)
            

# MIME types that count as archives when looking for archives inside an
# extracted archive (see BaseExtractor.check_included_archives).
extractor_map = dict([
    ('application/x-tar', TarExtractor),
    ('application/zip', ZipExtractor),
    ('application/x-msdos-program', ZipExtractor),
    ('application/x-debian-package', DebExtractor),
    ('application/x-redhat-package-manager', RPMExtractor),
    ('application/x-rpm', RPMExtractor),
    ('application/x-cpio', CpioExtractor),
    ('application/x-ruby-gem', GemExtractor),
    ])

# Tried in this order; the first handler whose can_handle() accepts the
# contents is used.  BombHandler accepts everything, so it comes last.
handlers = [
    FlatHandler,
    OverwriteHandler,
    MatchHandler,
    EmptyHandler,
    BombHandler,
    ]

class ExtractorBuilder(object):
    """Chooses and creates the extractor for one file name."""

    # Maps a MIME type to (extractor class, metadata extractor class or
    # None).  Each row below lists the two classes followed by the MIME
    # types they serve; 'application/' is added to types without a '/'.
    extractor_map = {}
    for args in (
        (TarExtractor, None, 'x-tar'),
        (ZipExtractor, None, 'zip', 'x-msdos-program'),
        (DebExtractor, DebMetadataExtractor, 'x-debian-package'),
        (RPMExtractor, None, 'x-redhat-package-manager', 'x-rpm'),
        (CpioExtractor, None, 'x-cpio'),
        (GemExtractor, GemMetadataExtractor, 'x-ruby-gem'),
        ):
        for entry in args[2:]:
            if '/' not in entry:
                entry = 'application/' + entry
            extractor_map[entry] = args[:2]

    def __init__(self, filename, options):
        self.filename = filename
        self.options = options
        guessed = mimetypes.guess_type(self.filename)
        self.mimetype, self.encoding = guessed

    def get_extractor(self):
        """Return a new extractor for the file.

        Raises ExtractorError when no extractor class fits the file.
        """
        extractor_class = self.find_extractor()
        if extractor_class is not None:
            return extractor_class(self.filename, self.encoding)
        raise ExtractorError("not a known archive type")

    def find_extractor(self):
        """Return the extractor class for the file, or None.

        A known MIME type selects its regular extractor, or its metadata
        extractor when the metadata option is set and one exists.  An
        unknown type with a recognized encoding falls back to
        CompressionExtractor.
        """
        extractors = self.extractor_map.get(self.mimetype)
        if extractors is None:
            if self.encoding:
                return CompressionExtractor
            return None
        regular, metadata = extractors
        if self.options.metadata and (metadata is not None):
            return metadata
        return regular


class ExtractorApplication(object):
    def __init__(self, arguments):
        self.parse_options(arguments)
        self.setup_logger()
        self.successes = []
        self.failures = []

    def parse_options(self, arguments):
        parser = optparse.OptionParser(
            usage="%prog [options] archive [archive2 ...]",
            description="Intelligent archive extractor",
            version=VERSION_BANNER
            )
        parser.add_option('-r', '--recursive', dest='recursive',
                          action='store_true', default=False,
                          help='extract archives contained in the ones listed')
        parser.add_option('-q', '--quiet', dest='quiet',
                          action='count', default=3,
                          help='suppress warning/error messages')
        parser.add_option('-v', '--verbose', dest='verbose',
                          action='count', default=0,
                          help='be verbose/print debugging information')
        parser.add_option('-o', '--overwrite', dest='overwrite',
                          action='store_true', default=False,
                          help='overwrite any existing target directory')
        parser.add_option('-f', '--flat', '--no-directory', dest='flat',
                          action='store_true', default=False,
                          help="don't put contents in their own directory")
        parser.add_option('-l', '-t', '--list', '--table', dest='show_list',
                          action='store_true', default=False,
                          help="list contents of archives on standard output")
        parser.add_option('-n', '--noninteractive', dest='batch',
                          action='store_true', default=False,
                          help="don't ask how to handle special cases")
        parser.add_option('-m', '--metadata', dest='metadata',
                          action='store_true', default=False,
                          help="extract metadata from a .deb/.gem/etc.")
        self.options, filenames = parser.parse_args(arguments)
        if not filenames:
            parser.error("you did not list any archives")
        self.options.one_entry_policy = OneEntryPolicy(self.options)
        self.options.recursion_policy = RecursionPolicy(self.options)
        self.archives = {os.path.realpath(os.curdir): filenames}

    def setup_logger(self):
        self.logger = logging.getLogger('dtrx-log')
        handler = logging.StreamHandler()
        # WARNING is the default.
        handler.setLevel(10 * (self.options.quiet - self.options.verbose))
        formatter = logging.Formatter("dtrx: %(levelname)s: %(message)s")
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def get_extractor(self):
        builder = ExtractorBuilder(self.current_filename, self.options)
        self.current_extractor = builder.get_extractor()

    def get_handler(self):
        for var_name in ('type', 'name'):
            exec('content_%s = self.current_extractor.content_%s' %
                 (var_name, var_name))
        if content_type == ONE_ENTRY:
            self.options.one_entry_policy.prep(self.current_filename,
                                               content_name)
        for handler in handlers:
            if handler.can_handle(content_type, self.options):
                self.current_handler = handler(self.current_extractor,
                                               self.options)
                break

    def recurse(self):
        archives = self.current_extractor.included_archives
        self.options.recursion_policy.prep(self.current_filename, archives)
        if self.options.recursion_policy.ok_to_recurse():
            for filename in archives:
                tail_path, basename = os.path.split(filename)
                directory = os.path.join(self.current_directory,
                                         self.current_handler.target, tail_path)
                self.archives.setdefault(directory, []).append(basename)

    def report(self, function, *args):
        try:
            error = function(*args)
        except (ExtractorError, IOError, OSError), exception:
            error = str(exception)
            self.logger.debug(traceback.format_exception(*sys.exc_info()))
        if error:
            self.logger.error("%s: %s", self.current_filename, error)
            return False
        return True

    def record_status(self, success):
        if success:
            self.successes.append(self.current_filename)
        else:
            self.failures.append(self.current_filename)

    def extract(self):
        while self.archives:
            self.current_directory, filenames = self.archives.popitem()
            for filename in filenames:
                os.chdir(self.current_directory)
                self.current_filename = filename
                success = (self.report(self.get_extractor) and
                           self.report(self.current_extractor.extract) and
                           self.report(self.get_handler) and
                           self.report(self.current_handler.handle))
                if success:
                    self.recurse()
                self.record_status(success)
            self.options.one_entry_policy.permanent_policy = EXTRACT_WRAP

    def show_contents(self):
        for filename in self.current_extractor.get_filenames():
            print filename

    def show_list(self):
        filenames = self.archives.values()[0]
        if len(filenames) > 1:
            header = "%s:\n"
        else:
            header = None
        for filename in filenames:
            if header:
                print header % (filename,),
                header = "\n%s:\n"
            self.current_filename = filename
            success = (self.report(self.get_extractor) and
                       self.report(self.show_contents))
            self.record_status(success)

    def run(self):
        if self.options.show_list:
            self.show_list()
        else:
            self.extract()
        if self.failures:
            return 1
        return 0


if __name__ == '__main__':
    # Run the application on the command-line arguments and use its
    # return value as the process exit status.
    exit_status = ExtractorApplication(sys.argv[1:]).run()
    sys.exit(exit_status)

mercurial