scripts/dtrx

Thu, 17 Jan 2008 22:36:07 -0500

author
brett
date
Thu, 17 Jan 2008 22:36:07 -0500
branch
trunk
changeset 45
37d555407334
parent 43
4591a32eedc8
child 46
652871d804ab
permissions
-rwxr-xr-x

[svn] At work I was getting an unhelpful "No such file or directory" error when I
tried to extract an .exe. It turns out this was because I didn't have
cabextract installed. This inspired a few changes:

* BaseExtractor now raises an ExtractorUnusable error when this happens.
* Various points in the code deal with this properly.
* The loop for trying extractors will now report all the errors it got, if
extraction is unsuccessful, to better help you understand why.

Also snuck in a bug fix where things weren't being cleaned properly with a
half-extracted archive.

Also got my version number situation in order, in anticipation of the next
release, which may be a while yet but at least I won't have to worry about
it then.

#!/usr/bin/env python
#
# dtrx -- Intelligently extract various archive types.
# Copyright (c) 2006, 2007 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

import errno
import logging
import mimetypes
import optparse
import os
import re
import shutil
import stat
import subprocess
import sys
import tempfile
import textwrap
import traceback

from sets import Set

# Release number; interpolated into the banner below.
VERSION = "6.0"
# Version and license notice handed to the option parser as its version text.
VERSION_BANNER = """dtrx version %s
Copyright (c) 2006, 2007, 2008 Brett Smith <brettcsmith@brettcsmith.org>

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 3 of the License, or (at your
option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
Public License for more details.""" % (VERSION,)

# What the top level of an extracted archive looks like; stored in an
# extractor's content_type and matched by each handler's can_handle().
MATCHING_DIRECTORY, ONE_ENTRY, BOMB, EMPTY, ONE_ENTRY_KNOWN = range(1, 6)

# Possible answers when an archive contains a single entry (OneEntryPolicy).
EXTRACT_HERE, EXTRACT_WRAP, EXTRACT_RENAME = range(1, 4)

# Possible answers when an archive contains other archives (RecursionPolicy).
RECURSE_ALWAYS, RECURSE_ONCE, RECURSE_NOT_NOW, RECURSE_NEVER = range(1, 5)

# Register the suffixes this script relies on.  setdefault() leaves any
# mapping the mimetypes module already has for a suffix untouched.
mimetypes.encodings_map.setdefault('.bz2', 'bzip2')
mimetypes.encodings_map.setdefault('.lzma', 'lzma')
mimetypes.types_map.setdefault('.gem', 'application/x-ruby-gem')

# Logger shared by everything in this script.
logger = logging.getLogger('dtrx-log')

def run_command(command, description, stdout=None, stderr=None, stdin=None):
    """Run an external command and describe its failure, if any.

    command is the argument list handed to subprocess.Popen, and
    description is a short label used as the prefix of the error message.
    stdout, stderr and stdin are passed through to Popen unchanged.

    Returns None when the command exits with status 0; otherwise returns a
    string of the form
    "<description> error: '<command>' returned status code <N>".
    An OSError raised while starting the command propagates to the caller.
    """
    process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                               stderr=stderr)
    # communicate() drains and closes whatever pipes were requested while it
    # waits.  A bare wait() would hang forever once the child filled a pipe
    # that nobody was reading.  Any captured output is discarded.
    process.communicate()
    status = process.returncode
    if status != 0:
        return ("%s error: '%s' returned status code %s" %
                (description, ' '.join(command), status))
    return None

class FilenameChecker(object):
    """Find an unused name for an extracted file.

    check() prefers original_name itself, then original_name.1 through
    original_name.9, and finally falls back to a uniquely named file made
    by tempfile.  Subclasses change what "unused" means by overriding
    is_free() and create().
    """

    def __init__(self, original_name):
        self.original_name = original_name

    def is_free(self, filename):
        """Return True if nothing exists at filename."""
        return not os.path.exists(filename)

    def create(self):
        """Create an empty file whose name starts with "original_name." in
        the current directory and return its path."""
        fd, filename = tempfile.mkstemp(prefix=self.original_name + '.',
                                        dir='.')
        os.close(fd)
        return filename

    def check(self):
        """Return the first free candidate name, creating one if none is.

        Every candidate is probed exactly once.  is_free() is an overridable
        hook that may have side effects, so the bare name must not be tried
        a second time.
        """
        candidates = [self.original_name]
        candidates.extend(['%s.%s' % (self.original_name, number)
                           for number in range(1, 10)])
        for filename in candidates:
            if self.is_free(filename):
                return filename
        return self.create()


class DirectoryChecker(FilenameChecker):
    """FilenameChecker variant that claims a name by creating a directory."""

    def is_free(self, filename):
        """Try to create filename as a directory.

        Returns True when the directory was created, and False when
        something already exists under that name.  Any other OSError
        propagates to the caller.
        """
        try:
            os.mkdir(filename)
        except OSError:
            if sys.exc_info()[1].errno != errno.EEXIST:
                raise
            return False
        return True

    def create(self):
        """Create a directory whose name starts with "original_name." in the
        current directory and return its path."""
        fallback_prefix = self.original_name + '.'
        return tempfile.mkdtemp(prefix=fallback_prefix, dir='.')


class ExtractorError(Exception):
    """An extractor could not open, list or unpack its archive."""


class ExtractorUnusable(Exception):
    """The external program an extractor depends on could not be run."""


# Exceptions that mean "this extractor failed", as opposed to a programming
# error; caught wherever a failed attempt should be reported, not crash.
EXTRACTION_ERRORS = (ExtractorError, ExtractorUnusable, OSError, IOError)

class BaseExtractor(object):
    """Base class for extractors that drive external command-line tools.

    An extractor opens the archive, queues commands with pipe(), and runs
    them as a pipeline with run_pipes().  Subclasses provide
    extract_archive(), which extract() calls, and may override prepare(),
    basename(), check_contents() and get_filenames().

    After a successful extract():
      * target is the temporary directory holding the extracted files;
      * content_type is one of MATCHING_DIRECTORY, ONE_ENTRY, BOMB or EMPTY;
      * content_name is the single top-level entry, when there is just one;
      * included_archives lists top-level entries that look like archives.
    """

    # Filter command used to decode each supported encoding.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat',
                'lzma': 'lzcat'}

    # Checker class handlers use to pick a free name for the final result.
    name_checker = DirectoryChecker

    def __init__(self, filename, encoding):
        """Open filename and queue the decoder for encoding, if any.

        Raises ValueError for an encoding that has no decoder, and
        ExtractorError when the file cannot be opened.
        """
        if encoding and (encoding not in self.decoders):
            raise ValueError("unrecognized encoding %s" % (encoding,))
        self.filename = os.path.realpath(filename)
        self.encoding = encoding
        self.included_archives = []
        self.target = None
        self.content_type = None
        self.content_name = None
        self.pipes = []
        try:
            self.archive = open(filename, 'r')
        except (IOError, OSError):
            error = sys.exc_info()[1]
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        if encoding:
            self.pipe([self.decoders[encoding]], "decoding")
        self.prepare()

    def pipe(self, command, description="extraction"):
        """Queue command (an argument list) as the next pipeline stage.

        description labels the stage in error messages.
        """
        self.pipes.append((command, description))

    def run_pipes(self, final_stdout=None):
        """Run the queued commands as one pipeline.

        The first command reads from self.archive, each later command reads
        the previous one's output, and the last writes to final_stdout (a
        fresh temporary file when not given).  On success self.archive is
        replaced by final_stdout.  Does nothing when no commands are queued.

        Raises ExtractorUnusable when a command cannot be found, and
        ExtractorError when any command exits with a non-zero status.
        """
        if not self.pipes:
            return
        elif final_stdout is None:
            # FIXME: Buffering this might be dumb.
            final_stdout = tempfile.TemporaryFile()
        last_pipe = len(self.pipes) - 1
        processes = []
        for index, pipe in enumerate(self.pipes):
            command = pipe[0]
            if index == 0:
                stdin = self.archive
            else:
                stdin = processes[-1].stdout
            if index == last_pipe:
                stdout = final_stdout
            else:
                stdout = subprocess.PIPE
            try:
                processes.append(subprocess.Popen(command, stdin=stdin,
                                                  stdout=stdout,
                                                  stderr=subprocess.PIPE))
            except OSError:
                error = sys.exc_info()[1]
                if error.errno == errno.ENOENT:
                    raise ExtractorUnusable("could not run %s" % (command[0],))
                raise
        exit_codes = [process.wait() for process in processes]
        self.archive.close()
        for index, process in enumerate(processes):
            # Only intermediate stages own a stdout pipe; the last stage
            # wrote straight to final_stdout.
            if index != last_pipe:
                process.stdout.close()
            # Every stage, including the last, was given a stderr pipe, so
            # every one of them has to be closed.
            process.stderr.close()
        for index, status in enumerate(exit_codes):
            if status != 0:
                raise ExtractorError("%s error: '%s' returned status code %s" %
                                     (self.pipes[index][1],
                                      ' '.join(self.pipes[index][0]), status))
        self.archive = final_stdout

    def prepare(self):
        """Hook run at the end of __init__; subclasses queue extra stages."""
        pass

    def check_included_archives(self, filenames):
        """Record each of filenames that looks like an archive, judging by
        its mimetype or its extension."""
        for filename in filenames:
            if (ExtractorBuilder.try_by_mimetype(filename) or
                ExtractorBuilder.try_by_extension(filename)):
                self.included_archives.append(filename)

    def check_contents(self):
        """Classify what the current directory holds after extraction.

        Sets content_type, sets content_name when there is exactly one entry
        (with a trailing '/' for a directory), and records included
        archives.
        """
        filenames = os.listdir('.')
        if not filenames:
            self.content_type = EMPTY
        elif len(filenames) == 1:
            if self.basename() == filenames[0]:
                self.content_type = MATCHING_DIRECTORY
            else:
                self.content_type = ONE_ENTRY
            self.content_name = filenames[0]
            if os.path.isdir(filenames[0]):
                self.content_name += '/'
        else:
            self.content_type = BOMB
        self.check_included_archives(filenames)

    def basename(self):
        """Return the archive's file name without its archive suffixes.

        Drops a trailing encoding suffix known to mimetypes, and then a
        trailing suffix that mimetypes knows as a type or suffix alias.
        """
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if extension in mimetypes.encodings_map:
            pieces.pop()
            extension = '.' + pieces[-1]
        if ((extension in mimetypes.types_map) or
            (extension in mimetypes.common_types) or
            (extension in mimetypes.suffix_map)):
            pieces.pop()
        return '.'.join(pieces)

    def extract(self):
        """Extract the archive into a new hidden temporary directory.

        The directory is created in the current directory and stored in
        self.target.  The working directory is restored afterwards.  If
        extraction fails with one of EXTRACTION_ERRORS, the temporary
        directory is removed and the error is re-raised.

        Raises ExtractorError when the temporary directory cannot be made.
        """
        try:
            self.target = tempfile.mkdtemp(prefix='.dtrx-', dir='.')
        except (OSError, IOError):
            error = sys.exc_info()[1]
            raise ExtractorError("cannot extract here: %s" % (error.strerror,))
        old_path = os.path.realpath(os.curdir)
        os.chdir(self.target)
        try:
            self.archive.seek(0, 0)
            self.extract_archive()
            self.check_contents()
        except EXTRACTION_ERRORS:
            # self.target is a relative path, so go back before removing it.
            os.chdir(old_path)
            shutil.rmtree(self.target, ignore_errors=True)
            raise
        os.chdir(old_path)

    def get_filenames(self):
        """Run the queued pipeline and yield its output one line at a time,
        without the trailing newline."""
        self.run_pipes()
        self.archive.seek(0, 0)
        while True:
            line = self.archive.readline()
            if not line:
                self.archive.close()
                return
            yield line.rstrip('\n')
    

class CompressionExtractor(BaseExtractor):
    """Extractor for a lone compressed file, not an archive of files."""

    file_type = 'compressed file'
    name_checker = FilenameChecker

    def basename(self):
        """Return the file name with a trailing encoding suffix removed."""
        name_parts = os.path.basename(self.filename).split('.')
        if ('.' + name_parts[-1]) in mimetypes.encodings_map:
            del name_parts[-1]
        return '.'.join(name_parts)

    def get_filenames(self):
        """Yield the one name this file decompresses to."""
        yield self.basename()

    def extract(self):
        """Decompress into a hidden temporary file in the current directory.

        The file's path is stored in self.target.  Raises ExtractorError
        when the temporary file cannot be created.  If the pipeline fails
        with one of EXTRACTION_ERRORS, the temporary file is removed and the
        error re-raised.
        """
        self.content_type = ONE_ENTRY_KNOWN
        self.content_name = self.basename()
        try:
            output_fd, self.target = tempfile.mkstemp(prefix='.dtrx-', dir='.')
        except (OSError, IOError):
            problem = sys.exc_info()[1]
            raise ExtractorError("cannot extract here: %s" %
                                 (problem.strerror,))
        try:
            self.run_pipes(output_fd)
        except EXTRACTION_ERRORS:
            os.close(output_fd)
            os.unlink(self.target)
            raise
        os.close(output_fd)
        

class TarExtractor(BaseExtractor):
    """Extractor that hands the archive stream to tar."""

    file_type = 'tar file'

    def get_filenames(self):
        """Queue a `tar -t` stage and return the generator of listed names."""
        listing_stage = ['tar', '-t']
        self.pipe(listing_stage, "listing")
        return super(TarExtractor, self).get_filenames()

    def extract_archive(self):
        """Queue a `tar -x` stage and run the whole pipeline."""
        unpack_stage = ['tar', '-x']
        self.pipe(unpack_stage)
        self.run_pipes()
        
        
class CpioExtractor(BaseExtractor):
    """Extractor that hands the archive stream to cpio."""

    file_type = 'cpio file'

    def get_filenames(self):
        """Queue a `cpio -t` stage and return the generator of listed names."""
        listing_stage = ['cpio', '-t']
        self.pipe(listing_stage, "listing")
        return super(CpioExtractor, self).get_filenames()

    def extract_archive(self):
        """Queue the cpio extraction stage and run the whole pipeline."""
        unpack_stage = ['cpio', '-i']
        unpack_stage.append('--make-directories')
        unpack_stage.append('--no-absolute-filenames')
        self.pipe(unpack_stage)
        self.run_pipes()


class RPMExtractor(CpioExtractor):
    """Extractor for RPM packages: rpm2cpio first, then the cpio stages."""

    file_type = 'RPM'

    def prepare(self):
        """Queue rpm2cpio, reading the package from standard input."""
        converter = ['rpm2cpio', '-']
        self.pipe(converter, "rpm2cpio")

    def basename(self):
        """Return the package file name without `.rpm` and, when present, a
        short trailing piece before it.

        Names that do not end in `.rpm` fall back to the generic rule.
        """
        name = os.path.basename(self.filename)
        if '.' not in name:
            return name
        parts = name.split('.')
        if parts[-1] != 'rpm':
            return BaseExtractor.basename(self)
        stem = parts[:-1]
        # A final piece under 8 characters is dropped too (presumably the
        # architecture tag), unless it is all that remains of the name.
        if len(stem) > 1 and len(stem[-1]) < 8:
            stem = stem[:-1]
        return '.'.join(stem)

    def check_contents(self):
        """Record included archives and always classify the result as BOMB."""
        extracted = os.listdir('.')
        self.check_included_archives(extracted)
        self.content_type = BOMB


class DebExtractor(TarExtractor):
    """Extractor for the data.tar.gz member of a Debian package."""

    file_type = 'Debian package'

    def prepare(self):
        """Queue `ar p` to emit data.tar.gz, then zcat to decompress it."""
        member = 'data.tar.gz'
        self.pipe(['ar', 'p', self.filename, member], member + " extraction")
        self.pipe(['zcat'], member + " decompression")

    def basename(self):
        """Return the package file name up to its last underscore.

        That rule applies only when the piece after the last underscore ends
        in `.deb` and is at most 10 characters long.  A name with no
        underscore is returned unchanged; anything else falls back to the
        generic rule.
        """
        name = os.path.basename(self.filename)
        if '_' not in name:
            return name
        pieces = name.split('_')
        tail = pieces[-1]
        if len(tail) <= 10 and tail.endswith('.deb'):
            return '_'.join(pieces[:-1])
        return BaseExtractor.basename(self)

    def check_contents(self):
        """Record included archives and always classify the result as BOMB."""
        extracted = os.listdir('.')
        self.check_included_archives(extracted)
        self.content_type = BOMB


class DebMetadataExtractor(DebExtractor):
    """Extractor for the control.tar.gz member of a Debian package."""

    def prepare(self):
        """Queue `ar p` to emit control.tar.gz, then zcat to decompress it."""
        member = 'control.tar.gz'
        self.pipe(['ar', 'p', self.filename, member], member + " extraction")
        self.pipe(['zcat'], member + " decompression")


class GemExtractor(TarExtractor):
    """Extractor for the data.tar.gz member of a Ruby gem."""

    file_type = 'Ruby gem'

    def prepare(self):
        """Queue tar to emit data.tar.gz on stdout, then zcat to decompress."""
        member = 'data.tar.gz'
        self.pipe(['tar', '-xO', member], member + " extraction")
        self.pipe(['zcat'], member + " decompression")

    def check_contents(self):
        """Record included archives and always classify the result as BOMB."""
        extracted = os.listdir('.')
        self.check_included_archives(extracted)
        self.content_type = BOMB


class GemMetadataExtractor(CompressionExtractor):
    """Extractor for the metadata.gz member of a Ruby gem."""

    file_type = 'Ruby gem'

    def prepare(self):
        """Queue tar to emit metadata.gz on stdout, then zcat to decompress."""
        member = 'metadata.gz'
        self.pipe(['tar', '-xO', member], member + " extraction")
        self.pipe(['zcat'], member + " decompression")

    def basename(self):
        """Return the gem's file name with `-metadata.txt` appended."""
        gem_name = os.path.basename(self.filename)
        return gem_name + '-metadata.txt'


class NoPipeExtractor(BaseExtractor):
    """Base class for tools that cannot read the archive from stdin.

    Such tools get the archive's path on their command line instead, so
    the usual pipeline is seeded with /dev/null rather than with the
    archive itself; that keeps most of the piping machinery usable without
    holding the archive open.  The real file is still opened once with
    os.open to confirm that it can be read (permissions and so on).  This
    class does nothing on its own; it exists to be subclassed.
    """

    def __init__(self, filename, encoding):
        """Verify filename is readable, then initialise against /dev/null.

        encoding is accepted for interface compatibility but not used; no
        decoder stage is queued.
        """
        probe_fd = os.open(filename, os.O_RDONLY)
        os.close(probe_fd)
        super(NoPipeExtractor, self).__init__('/dev/null', None)
        # The base class recorded /dev/null; point back at the real archive.
        self.filename = os.path.realpath(filename)


class ZipExtractor(NoPipeExtractor):
    """Extractor that uses zipinfo to list and unzip to unpack."""

    file_type = 'Zip file'

    def get_filenames(self):
        """Queue `zipinfo -1` and return the generator of listed names."""
        listing_stage = ['zipinfo', '-1', self.filename]
        self.pipe(listing_stage, "listing")
        return super(ZipExtractor, self).get_filenames()

    def extract_archive(self):
        """Queue `unzip -q` on the archive and run the pipeline."""
        unpack_stage = ['unzip', '-q', self.filename]
        self.pipe(unpack_stage)
        self.run_pipes()


class SevenExtractor(NoPipeExtractor):
    """Extractor that drives the 7z command."""

    file_type = '7z file'
    # A table border in the listing: only dashes and spaces.
    border_re = re.compile('^[- ]+$')

    def get_filenames(self):
        """Yield the names found between the two borders of `7z l` output.

        The first border fixes the column at which names start (just after
        its last space); the second border ends the listing.
        """
        self.pipe(['7z', 'l', self.filename], "listing")
        self.run_pipes()
        self.archive.seek(0, 0)
        name_column = None
        for row in self.archive:
            is_border = self.border_re.match(row)
            if is_border and name_column is not None:
                break
            if is_border:
                name_column = row.rindex(' ') + 1
            elif name_column is not None:
                # Slice off the trailing newline as well.
                yield row[name_column:-1]
        self.archive.close()

    def extract_archive(self):
        """Queue `7z x` on the archive and run the pipeline."""
        unpack_stage = ['7z', 'x', self.filename]
        self.pipe(unpack_stage)
        self.run_pipes()
        

class CABExtractor(NoPipeExtractor):
    """Extractor that drives the cabextract command."""

    file_type = 'CAB archive'
    # The rule under the listing's header: only dashes and plus signs.
    border_re = re.compile(r'^[-\+]+$')

    def get_filenames(self):
        """Yield the third ' | '-separated column of each listing row.

        Rows before the header rule are skipped, and the first row with
        fewer than three columns ends the listing.
        """
        self.pipe(['cabextract', '-l', self.filename], "listing")
        self.run_pipes()
        self.archive.seek(0, 0)
        for row in self.archive:
            if self.border_re.match(row):
                break
        for row in self.archive:
            columns = row.split(' | ', 2)
            if len(columns) < 3:
                break
            yield columns[2].rstrip('\n')
        self.archive.close()

    def extract_archive(self):
        """Queue `cabextract -q` on the archive and run the pipeline."""
        unpack_stage = ['cabextract', '-q', self.filename]
        self.pipe(unpack_stage)
        self.run_pipes()


class BaseHandler(object):
    """Base class for objects that move extracted files to their final place.

    Subclasses implement organize(), and a static can_handle(contents,
    options) that says whether they apply.
    """

    def __init__(self, extractor, options):
        self.extractor = extractor
        self.options = options
        self.target = None

    def handle(self):
        """Fix permissions on the extracted files, then organize them.

        Runs find to make every directory u+rwx and then chmod -R u+rwX over
        the extractor's target.  Returns an error string when either command
        exits non-zero, otherwise whatever organize() returns.
        """
        extracted = self.extractor.target
        steps = (
            ('find', ['find', extracted, '-type', 'd',
                      '-exec', 'chmod', 'u+rwx', '{}', ';']),
            ('chmod', ['chmod', '-R', 'u+rwX', extracted]),
            )
        for program, arguments in steps:
            status = subprocess.call(arguments)
            if status != 0:
                return "%s returned with exit status %s" % (program, status)
        return self.organize()

    def set_target(self, target, checker):
        """Set self.target to a free name based on target.

        checker is the checker class to instantiate with target.  A warning
        is logged when the chosen name differs from the one requested.
        """
        chosen = checker(target).check()
        self.target = chosen
        if chosen != target:
            logger.warning("extracting %s to %s" %
                           (self.extractor.filename, chosen))


# The "where to extract" table, with options and archive types.
# This dictates the contents of each can_handle method.
#
#         Flat           Overwrite            None
# File    basename       basename             FilenameChecked
# Match   .              .                    tempdir + checked
# Bomb    .              basename             DirectoryChecked

class FlatHandler(BaseHandler):
    """Handler that merges the extracted tree into the current directory."""

    def can_handle(contents, options):
        """Applies with --flat (except for a known single file), or with
        --overwrite when the archive holds a matching directory."""
        if options.flat and (contents != ONE_ENTRY_KNOWN):
            return True
        return options.overwrite and (contents == MATCHING_DIRECTORY)
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Move every extracted file to the same relative path under '.'.

        Walks the temporary tree bottom-up, so that each directory is empty
        and can be removed once its files have been moved.
        """
        self.target = '.'
        for curdir, dirs, filenames in os.walk(self.extractor.target,
                                               topdown=False):
            components = curdir.split(os.sep)
            # Drop the temporary directory's own name from the path: it is
            # the second component after a leading '.', else the first.
            if components[0] == '.':
                doomed = 1
            else:
                doomed = 0
            del components[doomed]
            destination = os.path.join(*components)
            if not os.path.isdir(destination):
                os.makedirs(destination)
            for filename in filenames:
                old_path = os.path.join(curdir, filename)
                new_path = os.path.join(destination, filename)
                os.rename(old_path, new_path)
            os.rmdir(curdir)


class OverwriteHandler(BaseHandler):
    """Handler that replaces whatever already has the archive's basename."""

    def can_handle(contents, options):
        """Applies with --flat for a known single file, or with --overwrite
        for anything but a matching directory."""
        if options.flat and (contents == ONE_ENTRY_KNOWN):
            return True
        return options.overwrite and (contents != MATCHING_DIRECTORY)
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Remove the existing target, then rename the extraction onto it.

        Returns the error string from the removal if it failed, else None.
        """
        self.target = self.extractor.basename()
        failure = run_command(['rm', '-rf', self.target],
                              "removing %s to overwrite" % (self.target,))
        if failure is not None:
            return failure
        os.rename(self.extractor.target, self.target)
        return None
        

class MatchHandler(BaseHandler):
    """Handler for archives whose single entry can be moved out directly."""

    def can_handle(contents, options):
        """Applies to a matching directory, or to a single entry when the
        one-entry policy permits."""
        if contents == MATCHING_DIRECTORY:
            return True
        return ((contents == ONE_ENTRY) and
                options.one_entry_policy.ok_for_match())
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Move the single extracted entry to a free name.

        The name is based on the entry's own name when the policy is
        EXTRACT_HERE, and on the archive's basename otherwise.
        """
        extracted = self.extractor.target
        source = os.path.join(extracted, os.listdir(extracted)[0])
        checker = FilenameChecker
        if os.path.isdir(source):
            checker = DirectoryChecker
        destination = self.extractor.basename()
        if self.options.one_entry_policy == EXTRACT_HERE:
            destination = self.extractor.content_name.rstrip('/')
        self.set_target(destination, checker)
        if not os.path.isdir(self.extractor.target):
            os.rename(self.extractor.target, self.target)
            return
        os.rename(source, self.target)
        os.rmdir(self.extractor.target)


class EmptyHandler(object):
    """Handler for archives that extracted to nothing at all."""

    # Nothing ends up on disk, so there is no result path.  The attribute
    # must exist all the same: callers read handler.target after a
    # successful handle().
    target = None

    def can_handle(contents, options):
        """Applies only when the extraction produced no entries."""
        return contents == EMPTY
    can_handle = staticmethod(can_handle)

    def __init__(self, extractor, options):
        self.extractor = extractor
        self.options = options

    def handle(self):
        """Remove the empty temporary directory the extractor left behind.

        Returns None.  An OSError from the removal propagates to the caller.
        """
        os.rmdir(self.extractor.target)


class BombHandler(BaseHandler):
    """Catch-all handler: the extraction directory becomes the result."""

    def can_handle(contents, options):
        """Always applies."""
        return True
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Rename the temporary extraction to a free name based on the
        archive's basename, using the extractor's own name checker."""
        wanted_name = self.extractor.basename()
        checker = self.extractor.name_checker
        self.set_target(wanted_name, checker)
        os.rename(self.extractor.target, self.target)

        
class BasePolicy(object):
    """Base class for decisions that may require asking the user.

    Subclasses define answers (reply letter -> policy value, with '' as the
    default), choices (lines printed under the question) and prompt.
    """

    def __init__(self, options):
        """Start undecided; in batch mode the default answer is permanent."""
        self.current_policy = None
        self.permanent_policy = None
        if options.batch:
            self.permanent_policy = self.answers['']

    def ask_question(self, question):
        """Print question and the choices until a recognised reply arrives.

        Returns the policy value for the reply.  End-of-file on input
        counts as the default answer.
        """
        prompt_text = "\n".join(textwrap.wrap(question) + self.choices)
        while True:
            print(prompt_text)
            try:
                reply = raw_input(self.prompt)
            except EOFError:
                return self.answers['']
            choice = reply.lower()
            if choice in self.answers:
                return self.answers[choice]
            # Unrecognised reply: leave a blank line and ask again.
            print("")

    def __cmp__(self, other):
        """Compare by current policy, so a policy object can be tested
        directly against the policy constants."""
        return cmp(self.current_policy, other)
    

class OneEntryPolicy(BasePolicy):
    """Decides what to do with an archive that holds exactly one entry."""

    answers = {'h': EXTRACT_HERE, 'i': EXTRACT_WRAP, 'r': EXTRACT_RENAME,
               '': EXTRACT_WRAP}
    choices = ["You can:",
               " * extract it Inside another directory",
               " * extract it and Rename the directory",
               " * extract it Here"]
    prompt = "What do you want to do?  (I/r/h) "

    def prep(self, archive_filename, entry_name):
        """Settle the policy for this archive, asking the user unless a
        permanent policy is already in force."""
        if self.permanent_policy:
            self.current_policy = self.permanent_policy
            return
        question = ("%s contains one entry: %s." %
                    (archive_filename, entry_name))
        self.current_policy = self.ask_question(question)

    def ok_for_match(self):
        """Return True when the entry should be moved out by itself."""
        policy = self.current_policy
        return (policy == EXTRACT_RENAME) or (policy == EXTRACT_HERE)


class RecursionPolicy(BasePolicy):
    """Decides whether archives found inside an archive get extracted too."""

    answers = {'o': RECURSE_ONCE, 'a': RECURSE_ALWAYS, 'n': RECURSE_NOT_NOW,
               'v': RECURSE_NEVER, '': RECURSE_NOT_NOW}
    choices = ["You can:",
               " * Always extract included archives",
               " * extract included archives this Once",
               " * choose Not to extract included archives",
               " * neVer extract included archives"]
    prompt = "What do you want to do?  (a/o/N/v) "

    def __init__(self, options):
        """Listing mode never recurses; --recursive always does."""
        BasePolicy.__init__(self, options)
        if options.show_list:
            self.permanent_policy = RECURSE_NEVER
        elif options.recursive:
            self.permanent_policy = RECURSE_ALWAYS

    def prep(self, current_filename, included_archives):
        """Settle the policy for the archives found in current_filename.

        Nothing is asked when a permanent policy exists or no archives were
        found.  An "always" or "never" answer becomes permanent.
        """
        archive_count = len(included_archives)
        decided = self.permanent_policy is not None
        if decided or (archive_count == 0):
            self.current_policy = self.permanent_policy or RECURSE_NOT_NOW
            return
        if archive_count == 1:
            question = ("%s contains another archive: %s." %
                        (current_filename, included_archives[0]))
        else:
            question = ("%s contains %s other archive files." %
                        (current_filename, archive_count))
        answer = self.ask_question(question)
        self.current_policy = answer
        if answer in (RECURSE_ALWAYS, RECURSE_NEVER):
            self.permanent_policy = answer

    def ok_to_recurse(self):
        """Return True when included archives should be extracted now."""
        policy = self.current_policy
        return (policy == RECURSE_ALWAYS) or (policy == RECURSE_ONCE)
            

class ExtractorBuilder(object):
    """Guess which extractors might handle a file and build them.

    Candidates come from three sources, tried in order: the mimetype
    guessed from the file name, the file name's extension, and the output
    of the `file` command.  Each candidate is an (archive type, encoding)
    pair; archive types are the keys of extractor_map.
    """

    # archive type -> (normal extractor, metadata extractor or None)
    extractor_map = {'tar': (TarExtractor, None),
                     'zip': (ZipExtractor, None),
                     'deb': (DebExtractor, DebMetadataExtractor),
                     'rpm': (RPMExtractor, None),
                     'cpio': (CpioExtractor, None),
                     'gem': (GemExtractor, GemMetadataExtractor),
                     'compress': (CompressionExtractor, None),
                     '7z': (SevenExtractor, None),
                     'cab': (CABExtractor, None)}

    # full mimetype -> archive type.  Table entries without a '/' get the
    # 'application/' prefix.
    mimetype_map = {}
    for mapping in (('tar', 'x-tar'),
                    ('zip', 'x-msdos-program', 'zip'),
                    ('deb', 'x-debian-package'),
                    ('rpm', 'x-redhat-package-manager', 'x-rpm'),
                    ('cpio', 'x-cpio'),
                    ('gem', 'x-ruby-gem'),
                    ('7z', 'x-7z-compressed'),
                    ('cab', 'x-cab')):
        for mimetype in mapping[1:]:
            if '/' not in mimetype:
                mimetype = 'application/' + mimetype
            mimetype_map[mimetype] = mapping[0]

    # compiled pattern searched for in `file` output -> archive type
    magic_mime_map = {}
    for mapping in (('deb', 'Debian binary package'),
                    ('cpio', 'cpio archive'),
                    ('tar', 'POSIX tar archive'),
                    ('zip', 'Zip archive'),
                    ('rpm', 'RPM'),
                    ('7z', '7-zip archive'),
                    ('cab', 'Microsoft Cabinet archive')):
        for pattern in mapping[1:]:
            magic_mime_map[re.compile(pattern)] = mapping[0]

    # compiled pattern searched for in `file` output -> encoding
    magic_encoding_map = {}
    for mapping in (('bzip2', 'bzip2 compressed'),
                    ('gzip', 'gzip compressed')):
        for pattern in mapping[1:]:
            magic_encoding_map[re.compile(pattern)] = mapping[0]

    # extension (without the leading dot) -> list of (archive type, encoding)
    extension_map = {}
    for mapping in (('tar', 'bzip2', 'tar.bz2'),
                    ('tar', 'gzip', 'tar.gz', 'tgz'),
                    ('tar', None, 'tar'),
                    ('zip', None, 'zip', 'exe'),
                    ('deb', None, 'deb'),
                    ('rpm', None, 'rpm'),
                    ('cpio', None, 'cpio'),
                    ('gem', None, 'gem'),
                    ('compress', 'gzip', 'Z', 'gz'),
                    ('compress', 'bzip2', 'bz2'),
                    ('compress', 'lzma', 'lzma'),
                    ('7z', None, '7z'),
                    ('cab', None, 'cab', 'exe')):
        for extension in mapping[2:]:
            extension_map.setdefault(extension, []).append(mapping[:2])

    def __init__(self, filename, options):
        self.filename = filename
        self.options = options

    def build_extractor(self, archive_type, encoding):
        """Instantiate the extractor for archive_type on self.filename.

        The metadata extractor is used when the metadata option is set and
        the archive type has one.
        """
        extractors = self.extractor_map[archive_type]
        if self.options.metadata and (extractors[1] is not None):
            extractor = extractors[1]
        else:
            extractor = extractors[0]
        return extractor(self.filename, encoding)

    def get_extractor(self):
        """Yield an extractor for each distinct candidate, best guess first.

        Constructing an extractor can itself raise, so the caller should
        expect errors while iterating.
        """
        tried_types = Set()
        # As smart as it is, the magic test can't go first, because at least
        # on my system it just recognizes gem files as tar files.  I guess
        # it's possible for the opposite problem to occur -- where the mimetype
        # or extension suggests something less than ideal -- but it seems less
        # likely so I'm sticking with this.
        for func_name in ('mimetype', 'extension', 'magic'):
            logger.debug("getting extractors by %s" % (func_name,))
            extractor_types = \
                            getattr(self, 'try_by_' + func_name)(self.filename)
            logger.debug("done getting extractors")
            for ext_args in extractor_types:
                if ext_args in tried_types:
                    continue
                tried_types.add(ext_args)
                logger.debug("trying %s extractor from %s" %
                             (ext_args, func_name))
                yield self.build_extractor(*ext_args)

    def try_by_mimetype(cls, filename):
        """Return candidates based on mimetypes.guess_type(filename).

        An unknown mimetype with a known encoding is treated as a plain
        compressed file.
        """
        mimetype, encoding = mimetypes.guess_type(filename)
        try:
            return [(cls.mimetype_map[mimetype], encoding)]
        except KeyError:
            if encoding:
                return [('compress', encoding)]
        return []
    try_by_mimetype = classmethod(try_by_mimetype)

    def magic_map_matches(cls, output, magic_map):
        """Return the values of magic_map whose pattern occurs in output."""
        return [result for regexp, result in magic_map.items()
                if regexp.search(output)]
    magic_map_matches = classmethod(magic_map_matches)

    def try_by_magic(cls, filename):
        """Return candidates based on the first line of `file -z` output.

        Returns an empty list when `file` exits with a non-zero status.
        """
        process = subprocess.Popen(['file', '-z', filename],
                                   stdout=subprocess.PIPE)
        # communicate() reads the output while waiting, and closes the pipe
        # whether or not the command succeeded.
        output = process.communicate()[0]
        if process.returncode != 0:
            return []
        # Only the first line of output is examined.
        output = output.split('\n', 1)[0]
        if output.startswith('%s: ' % filename):
            output = output[len(filename) + 2:]
        mimes = cls.magic_map_matches(output, cls.magic_mime_map)
        encodings = cls.magic_map_matches(output, cls.magic_encoding_map)
        if mimes and not encodings:
            encodings = [None]
        elif encodings and not mimes:
            mimes = ['compress']
        return [(m, e) for m in mimes for e in encodings]
    try_by_magic = classmethod(try_by_magic)

    def try_by_extension(cls, filename):
        """Return candidates based on the last two dot-separated pieces of
        filename, trying the two-piece extension before the one-piece one."""
        parts = filename.split('.')[-2:]
        results = []
        while parts:
            results.extend(cls.extension_map.get('.'.join(parts), []))
            del parts[0]
        return results
    try_by_extension = classmethod(try_by_extension)


class BaseAction(object):
    """Base class for the operations performed on each archive."""

    def __init__(self, options, filenames):
        self.options = options
        self.filenames = filenames
        self.target = None

    def report(self, function, *args):
        """Call function(*args) and turn extraction failures into text.

        Returns whatever function returns.  If it raises one of
        EXTRACTION_ERRORS, the traceback is logged at debug level and the
        exception's message is returned instead.
        """
        try:
            return function(*args)
        except EXTRACTION_ERRORS:
            failure = sys.exc_info()
            message = str(failure[1])
            logger.debug(''.join(traceback.format_exception(*failure)))
            return message


class ExtractionAction(BaseAction):
    """Action that extracts an archive and puts the result in place."""

    # Tried in order; the first handler whose can_handle() accepts wins.
    handlers = [FlatHandler, OverwriteHandler, MatchHandler, EmptyHandler,
                BombHandler]

    def get_handler(self, extractor):
        """Choose and instantiate the handler for what extractor produced.

        For a single-entry archive the one-entry policy is settled first,
        since handler selection can depend on it.
        """
        contents = extractor.content_type
        if contents == ONE_ENTRY:
            self.options.one_entry_policy.prep(self.current_filename,
                                               extractor.content_name)
        for handler in self.handlers:
            if not handler.can_handle(contents, self.options):
                continue
            logger.debug("using %s handler" % (handler.__name__,))
            self.current_handler = handler(extractor, self.options)
            return

    def run(self, filename, extractor):
        """Extract filename with extractor, stopping at the first error.

        Returns the error (a truthy value) or a falsy value on success, in
        which case self.target is taken from the handler.
        """
        self.current_filename = filename
        error = self.report(extractor.extract)
        if not error:
            error = self.report(self.get_handler, extractor)
        if not error:
            error = self.report(self.current_handler.handle)
        if not error:
            self.target = self.current_handler.target
        return error


class ListAction(BaseAction):
    """Action that prints the names of the files inside an archive."""

    def __init__(self, options, filenames):
        BaseAction.__init__(self, options, filenames)
        # Number of archives listed so far.
        self.count = 0

    def get_list(self, extractor):
        """Collect every file name from extractor into self.filelist."""
        # Note: The names are all gathered up front because if we run into
        # trouble partway through the archive, we'll try another extractor.
        # So before displaying anything we have to be sure this one is
        # successful.  We maybe don't have to be quite this conservative
        # but this is the easy way out for now.
        self.filelist = list(extractor.get_filenames())

    def show_list(self, filename):
        """Print the collected names.

        When more than one archive was requested, each listing gets a
        "<filename>:" header, with a blank line between listings.
        """
        self.count += 1
        several_archives = len(self.filenames) != 1
        if several_archives and self.count > 1:
            print("")
        if several_archives:
            print("%s:" % (filename,))
        print('\n'.join(self.filelist))

    def run(self, filename, extractor):
        """List filename's contents; return an error, or a falsy value."""
        error = self.report(self.get_list, extractor)
        if not error:
            error = self.report(self.show_list, filename)
        return error


class ExtractorApplication(object):
    """Command-line driver: parse options, then extract or list archives.

    self.archives maps a directory to the list of archive names to process
    there.  It starts with the files named on the command line and may grow
    while running, as archives found inside extracted ones are queued.
    """

    def __init__(self, arguments):
        """Parse arguments, configure logging and reset the result lists."""
        self.parse_options(arguments)
        self.setup_logger()
        self.successes = []
        self.failures = []

    def parse_options(self, arguments):
        """Turn the argument list into self.options and self.archives.

        Reports a usage error through the parser when no archive is named.
        """
        parser = optparse.OptionParser(
            usage="%prog [options] archive [archive2 ...]",
            description="Intelligent archive extractor",
            version=VERSION_BANNER)
        # One row per option: (flags, dest, action, default, help text).
        # Options are registered in row order.
        option_table = [
            (('-r', '--recursive'), 'recursive', 'store_true', False,
             'extract archives contained in the ones listed'),
            (('-q', '--quiet'), 'quiet', 'count', 3,
             'suppress warning/error messages'),
            (('-v', '--verbose'), 'verbose', 'count', 0,
             'be verbose/print debugging information'),
            (('-o', '--overwrite'), 'overwrite', 'store_true', False,
             'overwrite any existing target directory'),
            (('-f', '--flat', '--no-directory'), 'flat', 'store_true', False,
             "don't put contents in their own directory"),
            (('-l', '-t', '--list', '--table'), 'show_list', 'store_true',
             False, "list contents of archives on standard output"),
            (('-n', '--noninteractive'), 'batch', 'store_true', False,
             "don't ask how to handle special cases"),
            (('-m', '--metadata'), 'metadata', 'store_true', False,
             "extract metadata from a .deb/.gem"),
            ]
        for flags, dest, action, default, help_text in option_table:
            parser.add_option(dest=dest, action=action, default=default,
                              help=help_text, *flags)
        options, filenames = parser.parse_args(arguments)
        self.options = options
        if not filenames:
            parser.error("you did not list any archives")
        options.one_entry_policy = OneEntryPolicy(options)
        options.recursion_policy = RecursionPolicy(options)
        start_directory = os.path.realpath(os.curdir)
        self.archives = {start_directory: filenames}

    def setup_logger(self):
        """Set the logging threshold and attach a stream handler.

        The level is 10 * (quiet - verbose); with the option defaults
        (quiet=3, verbose=0) that is 30, i.e. WARNING.
        """
        threshold = (self.options.quiet - self.options.verbose) * 10
        stream_handler = logging.StreamHandler()
        stream_handler.setLevel(threshold)
        stream_handler.setFormatter(
            logging.Formatter("dtrx: %(levelname)s: %(message)s"))
        logging.getLogger().setLevel(threshold)
        logger.addHandler(stream_handler)
        logger.debug("logger is set up")

    def recurse(self, filename, extractor, action):
        """Queue the archives found inside filename, if the policy allows.

        Each included archive is filed in self.archives under the directory
        built from the current directory, action.target and the archive's
        own directory part.
        """
        included = extractor.included_archives
        policy = self.options.recursion_policy
        policy.prep(filename, included)
        if not policy.ok_to_recurse():
            return
        for archive_path in included:
            tail_path, basename = os.path.split(archive_path)
            # action.target is read here, per archive, so it is never
            # touched when there is nothing to queue.
            directory = os.path.join(self.current_directory,
                                     action.target, tail_path)
            self.archives.setdefault(directory, []).append(basename)

    def check_file(self, filename):
        """Return a message if filename can't be processed, else None.

        A failing stat yields the system's error string; a directory yields
        a fixed message.
        """
        try:
            mode = os.stat(filename).st_mode
        except OSError, error:
            return error.strerror
        if stat.S_ISDIR(mode):
            return "cannot extract a directory"
        return None

    def try_extractors(self, filename, builder):
        """Run self.action with each extractor until one succeeds.

        builder is iterated to obtain the extractors.  Returns None after
        the first success (having queued any included archives).  When
        every extractor fails, or there are none, the reasons are logged
        and True is returned.
        """
        attempts = []
        for extractor in builder:
            problem = self.action.run(filename, extractor)
            if not problem:
                self.recurse(filename, extractor, self.action)
                return None
            attempts.append((extractor.file_type, extractor.encoding,
                             problem))
        logger.error("could not handle %s" % (filename,))
        if attempts:
            # Report every failed attempt so the user can see why.
            for file_type, encoding, problem in attempts:
                words = ["treating as"]
                if encoding:
                    words.append("%s-encoded" % (encoding,))
                words.extend([file_type, "failed:", problem])
                logger.error(' '.join(words))
        else:
            logger.error("not a known archive type")
        return True

    def run(self):
        """Process every queued archive; return 1 if any failed, else 0."""
        action_class = ExtractionAction
        if self.options.show_list:
            action_class = ListAction
        initial_filenames = self.archives.values()[0]
        self.action = action_class(self.options, initial_filenames)
        while self.archives:
            self.current_directory, self.filenames = self.archives.popitem()
            os.chdir(self.current_directory)
            for filename in self.filenames:
                builder = ExtractorBuilder(filename, self.options)
                error = self.check_file(filename)
                if not error:
                    error = self.try_extractors(filename,
                                                builder.get_extractor())
                if not error:
                    self.successes.append(filename)
                    continue
                # A bare True means try_extractors already logged the
                # details; anything else is a message still to be logged.
                if error != True:
                    logger.error("%s: %s" % (filename, error))
                self.failures.append(filename)
            # Once one batch of files is done, the one-entry policy is
            # pinned to EXTRACT_WRAP for whatever is processed afterwards.
            self.options.one_entry_policy.permanent_policy = EXTRACT_WRAP
        exit_status = 0
        if self.failures:
            exit_status = 1
        return exit_status


if __name__ == '__main__':
    # Script entry point: run the application on the command-line
    # arguments and use its result as the process exit status.
    app = ExtractorApplication(sys.argv[1:])
    exit_status = app.run()
    sys.exit(exit_status)

mercurial