scripts/x

Sat, 11 Nov 2006 18:42:19 -0500

author
brett
date
Sat, 11 Nov 2006 18:42:19 -0500
branch
trunk
changeset 7
1f3cb3845dfd
parent 6
77043f4e6a9f
child 8
97388f5ff770
permissions
-rwxr-xr-x

[svn] Add a test for recursive extraction which also makes sure that we fix
permissions after we extract the archive, and DTRT when an archive contains
one file. Add code to handle the latter two cases.

ExtractorApplication is a total mess at this point. I already am having a
hard time following how the pieces fit together. Cleaning it up is my next
task; that'll be easier now that I test most of the functionality again.

#!/usr/bin/env python
#
# x -- Intelligently extract various archive types.
# Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, 5th Floor, Boston, MA, 02111.

import errno
import mimetypes
import optparse
import os
import subprocess
import sys
import tempfile

from cStringIO import StringIO

# Release number, interpolated into the --version banner below.
VERSION = "1.1"
VERSION_BANNER = """x version %s
Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
Public License for more details.""" % VERSION

# Result codes returned by the extractors' check_contents() methods.
# MATCHING_DIRECTORY: every entry sits under one top-level name that equals
# the archive's basename.
MATCHING_DIRECTORY = 1
# ONE_DIRECTORY = 2
# BOMB: the entries do not all share the same top-level name.
BOMB = 3
# EMPTY: the archive listing produced no entries at all.
EMPTY = 4

# Teach mimetypes about bzip2 (only if it does not already know the suffix)
# and force .exe to a type that extractor_map routes to the zip handler.
if '.bz2' not in mimetypes.encodings_map:
    mimetypes.encodings_map['.bz2'] = 'bzip2'
mimetypes.types_map.update({'.exe': 'application/x-msdos-program'})

class ExtractorError(Exception):
    """Raised when an archive cannot be opened, listed, or unpacked."""


class ProcessStreamer(object):
    """Iterate over the lines a child process writes to standard output.

    The child is started immediately by the constructor.  Iterating yields
    each output line with its trailing newline removed.  Call stop() when
    finished to reap the child and check its exit status.

    command -- argument list handed to subprocess.Popen.
    stdin -- file object (or None) used as the child's standard input.
    description -- short label used in error messages.
    stderr -- passed straight through to subprocess.Popen.
    """

    def __init__(self, command, stdin, description="checking contents",
                 stderr=None):
        self.process = subprocess.Popen(command, bufsize=1, stdin=stdin,
                                        stdout=subprocess.PIPE, stderr=stderr)
        self.command = ' '.join(command)
        self.description = description

    def __iter__(self):
        return self

    def next(self):
        """Return the next output line, or raise StopIteration at EOF."""
        line = self.process.stdout.readline()
        if not line:
            raise StopIteration
        return line.rstrip('\n')

    def stop(self):
        """Discard remaining output, reap the child, and check its status.

        Raises ExtractorError if the child exited with a non-zero status.
        Every pipe opened for the child is closed before that happens, so
        a failing command no longer leaks the stderr descriptor.
        """
        # Drain whatever the caller did not consume so the child is never
        # left blocked writing to a full stdout pipe.
        while self.process.stdout.readline():
            pass
        self.process.stdout.close()
        error_pipe = self.process.stderr
        if error_pipe is not None:
            # Nobody reads a piped stderr; empty it before waiting so a
            # chatty child cannot stall on a full pipe while we wait on it.
            error_pipe.read()
        status = self.process.wait()
        if error_pipe is not None:
            error_pipe.close()
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (self.description, self.command, status))
    

class BaseExtractor(object):
    """Common machinery for extractors that shell out to archive tools.

    The archive is opened as a file and, when it carries a compression
    encoding, replaced by a temporary file holding the decoded stream.
    Subclasses supply get_filenames() (an iterator with a stop() method that
    lists member names) and extract_archive() (unpacks self.archive into the
    current directory); they may override prepare() to add more filters.
    """

    # Maps a mimetypes encoding name to the command that decodes it.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}

    def __init__(self, filename, mimetype, encoding):
        """Open filename and decode it if an encoding is given.

        Raises ValueError for an encoding with no known decoder, and
        ExtractorError if the file cannot be opened or a filter fails.
        """
        if encoding and encoding not in self.decoders:
            raise ValueError("unrecognized encoding %s" % (encoding,))
        self.filename = filename
        self.mimetype = mimetype
        self.encoding = encoding
        # Member names that look like archives themselves; filled in by
        # check_contents().
        self.included_archives = []
        try:
            # Archives are binary data; never let the platform translate it.
            self.archive = open(filename, 'rb')
        except (IOError, OSError) as error:
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        try:
            if encoding:
                self.pipe([self.decoders[encoding]], "decoding")
            self.prepare()
        except Exception:
            # Do not leave the archive open when set-up fails part way.
            self.archive.close()
            raise

    def run(self, command, description="extraction", stdout=None, stderr=None,
            stdin=None):
        """Run command to completion.

        Raises ExtractorError if the command cannot be started or exits with
        a non-zero status.
        """
        try:
            process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                                       stderr=stderr)
        except OSError as error:
            raise ExtractorError("%s error: could not run '%s': %s" %
                                 (description, ' '.join(command),
                                  error.strerror))
        # communicate() reads and closes any pipes while waiting, so a child
        # that fills a piped stderr cannot deadlock against wait().
        process.communicate()
        status = process.returncode
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (description, ' '.join(command), status))

    def pipe(self, command, description, stderr=None):
        """Filter self.archive through command into a new temporary file.

        On success the temporary file becomes self.archive, rewound to the
        start.  On failure self.archive is left untouched.
        """
        output = tempfile.TemporaryFile()
        try:
            self.run(command, description, output, stderr, self.archive)
        except Exception:
            output.close()
            raise
        self.archive.close()
        self.archive = output
        self.archive.flush()
        # The child wrote through our descriptor, which leaves the shared
        # file offset at the end; rewind so the next reader sees the data.
        self.archive.seek(0, 0)

    def prepare(self):
        """Hook for subclasses that need extra filtering after decoding."""
        pass

    def check_contents(self):
        """Classify the archive's layout from its member listing.

        Returns EMPTY when there are no members, BOMB when the members do
        not share one top-level name, MATCHING_DIRECTORY when the shared
        top-level name equals basename(), and otherwise that top-level name
        with a trailing '/'.  Members whose names map to a known extractor
        are recorded in self.included_archives along the way.
        """
        self.archive.seek(0, 0)
        archive_type = None
        filenames = self.get_filenames()
        try:
            filename = filenames.next()
        except StopIteration:
            filenames.stop()
            return EMPTY
        if mimetypes.guess_type(filename)[0] in extractor_map:
            self.included_archives.append(filename)
        # NOTE(review): a listing whose entries begin with './' yields './'
        # here -- confirm callers cope with that.
        first_part = filename.split('/', 1)[0] + '/'
        for filename in filenames:
            if mimetypes.guess_type(filename)[0] in extractor_map:
                self.included_archives.append(filename)
            if (archive_type is None) and (not filename.startswith(first_part)):
                archive_type = BOMB
        filenames.stop()
        if archive_type:
            return archive_type
        if self.basename() == first_part[:-1]:
            return MATCHING_DIRECTORY
        return first_part

    def basename(self):
        """Return the archive's file name minus its known extensions.

        At most one encoding suffix (such as .gz) and one type suffix (such
        as .tar) are removed.  The final remaining component is never
        removed, so the result is not emptied by a name like 'tar.gz'.
        """
        pieces = os.path.basename(self.filename).split('.')
        if len(pieces) > 1 and ('.' + pieces[-1]) in mimetypes.encodings_map:
            pieces.pop()
        if len(pieces) > 1:
            extension = '.' + pieces[-1]
            if (extension in mimetypes.types_map or
                extension in mimetypes.common_types or
                extension in mimetypes.suffix_map):
                pieces.pop()
        return '.'.join(pieces)

    def extract(self, path):
        """Unpack the archive into the current directory.

        path is unused here; it is accepted so subclasses that need it
        share the same signature.
        """
        self.archive.seek(0, 0)
        self.extract_archive()
    

class TarExtractor(BaseExtractor):
    """Extractor that feeds the archive stream to the ``tar`` command."""

    def get_filenames(self):
        """Return a ProcessStreamer over the names printed by ``tar -t``."""
        list_command = ['tar', '-t']
        return ProcessStreamer(list_command, self.archive)

    def extract_archive(self):
        """Unpack the archive stream with ``tar -x``."""
        unpack_command = ['tar', '-x']
        self.run(unpack_command, stdin=self.archive)
        
        
class ZipExtractor(BaseExtractor):
    """Extractor that hands the archive's path to ``zipinfo`` and ``unzip``.

    The zip tools are given a file name rather than a stream, so the archive
    is never opened here; self.archive is only an empty placeholder.
    """

    def __init__(self, filename, mimetype, encoding):
        # Deliberately does not call BaseExtractor.__init__: no file is
        # opened and no decoder is run.
        self.archive = StringIO()
        self.included_archives = []
        self.filename, self.mimetype, self.encoding = (filename, mimetype,
                                                       encoding)

    def get_filenames(self):
        """Return a ProcessStreamer over the names from ``zipinfo -1``."""
        list_command = ['zipinfo', '-1', self.filename]
        return ProcessStreamer(list_command, None)

    def extract(self, path):
        """Run ``unzip -q`` on the archive, located relative to path."""
        archive_path = os.path.join(path, self.filename)
        self.run(['unzip', '-q', archive_path])


class CpioExtractor(BaseExtractor):
    """Extractor that feeds the archive stream to the ``cpio`` command."""

    def get_filenames(self):
        """Return a ProcessStreamer over the names printed by ``cpio -t``.

        The command's stderr is sent to a pipe rather than the terminal.
        """
        list_command = ['cpio', '-t']
        return ProcessStreamer(list_command, self.archive,
                               stderr=subprocess.PIPE)

    def extract_archive(self):
        """Unpack the archive stream with ``cpio -i``, stderr piped away."""
        unpack_command = ['cpio', '-i', '--make-directories',
                          '--no-absolute-filenames']
        self.run(unpack_command, stderr=subprocess.PIPE, stdin=self.archive)


class RPMExtractor(CpioExtractor):
    """Extractor for RPM packages, converted to cpio with ``rpm2cpio``."""

    def prepare(self):
        """Replace the archive stream with its ``rpm2cpio`` output."""
        self.pipe(['rpm2cpio', '-'], "rpm2cpio")

    def basename(self):
        """Return the file name without '.rpm' and a short preceding suffix.

        After dropping '.rpm', the dot-separated piece before it is also
        dropped when it is shorter than six characters.  Names that do not
        end in '.rpm' fall back to BaseExtractor.basename(), and names with
        no dot are returned unchanged.
        """
        name = os.path.basename(self.filename)
        parts = name.rsplit('.', 2)
        if len(parts) == 1:
            return name
        if parts[-1] != 'rpm':
            return BaseExtractor.basename(self)
        stem = parts[:-1]
        if len(stem) > 1 and len(stem[-1]) < 6:
            stem = stem[:-1]
        return '.'.join(stem)

    def check_contents(self):
        """Scan the listing for nested archives, then always report BOMB."""
        CpioExtractor.check_contents(self)
        return BOMB


class DebExtractor(TarExtractor):
    """Extractor for Debian packages: unpacks the data.tar.gz member."""

    def prepare(self):
        """Replace the archive stream with the decompressed data tarball."""
        member_command = ['ar', 'p', self.filename, 'data.tar.gz']
        self.pipe(member_command, "data.tar.gz extraction")
        # Rewind so zcat reads the extracted member from its beginning.
        self.archive.seek(0, 0)
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        """Return the part of the file name before its last underscore.

        That is used only when the text after the underscore ends in '.deb'
        and is at most ten characters long; otherwise the result comes from
        BaseExtractor.basename().  Names without an underscore are returned
        unchanged.
        """
        name = os.path.basename(self.filename)
        if '_' not in name:
            return name
        package, tail = name.rsplit('_', 1)
        if tail.endswith('.deb') and len(tail) <= 10:
            return package
        return BaseExtractor.basename(self)

    def check_contents(self):
        """Scan the listing for nested archives, then always report BOMB."""
        TarExtractor.check_contents(self)
        return BOMB
        

# Dispatch table from the MIME type guessed for a file name to the
# extractor class that handles it.
extractor_map = {
    'application/x-tar': TarExtractor,
    'application/x-cpio': CpioExtractor,
    'application/zip': ZipExtractor,
    'application/x-msdos-program': ZipExtractor,
    'application/x-debian-package': DebExtractor,
    'application/x-rpm': RPMExtractor,
    'application/x-redhat-package-manager': RPMExtractor,
}

class ExtractorApplication(object):
    """Command-line driver that extracts each archive named by the user.

    For every archive the methods named in ``actions`` run in order; the
    first one to return a false value aborts the rest for that archive.
    Archives found inside extracted archives are queued in self.archives
    when the --recursive option is given.
    """

    actions = ['get_extractor', 'prepare_extraction', 'extract', 'recurse']

    # Per-archive state, reset by run() before each archive is processed.
    # target_directory: where the archive's contents end up (None if nothing
    # was, or will be, extracted).
    target_directory = None
    # strip_top_level: true when the archive's single top-level entry is
    # itself the target, so member paths must lose their first component to
    # be found under target_directory.
    strip_top_level = False

    def __init__(self, arguments):
        self.parse_options(arguments)
        self.successes = []
        self.failures = []

    def parse_options(self, arguments):
        """Parse arguments into self.options and the initial work queue.

        self.archives maps a directory to the archive names to process from
        within it.  Exits through the option parser when no archive is
        listed.
        """
        parser = optparse.OptionParser(
            usage="%prog [options] archive [archive2 ...]",
            description="Intelligent archive extractor",
            version=VERSION_BANNER
            )
        parser.add_option('-r', '--recursive', dest='recursive',
                          action='store_true', default=False,
                          help='extract archives contained in the ones listed')
        self.options, filenames = parser.parse_args(arguments)
        if not filenames:
            parser.error("you did not list any archives")
        self.archives = {os.path.realpath(os.curdir): filenames}

    def show_error(self, message):
        """Write message to stderr, prefixed with the current archive name."""
        sys.stderr.write("%s: %s\n" % (self.current_filename, message))

    def get_extractor(self):
        """Pick and construct the extractor for the current archive.

        Returns False, after reporting why, when the type is unknown or the
        extractor cannot be set up.
        """
        mimetype, encoding = mimetypes.guess_type(self.current_filename)
        try:
            handler = extractor_map[mimetype]
        except KeyError:
            self.show_error("not a known archive type")
            return False
        try:
            self.current_extractor = handler(self.current_filename, mimetype,
                                             encoding)
        except (ExtractorError, ValueError) as error:
            # ValueError is what the extractors raise for an encoding they
            # have no decoder for.
            self.show_error(error)
            return False
        return True

    def prepare_target_directory(self):
        """Create a fresh directory named after the archive and return it.

        Tries the archive's basename, then basename.1 through basename.9.
        Returns None, after reporting why, if none could be created.
        """
        basename = self.current_extractor.basename()
        suffixes = [''] + ['.%s' % (number,) for number in range(1, 10)]
        for suffix in suffixes:
            directory = '%s%s' % (basename, suffix)
            try:
                os.mkdir(directory)
            except OSError as error:
                if error.errno == errno.EEXIST:
                    continue
                self.show_error("could not create extraction directory %s: %s" %
                                (error.filename, error.strerror))
                return None
            if suffix != '':
                self.show_error("extracted to %s" % (directory,))
            return directory
        # Every candidate already exists; never hand back a directory that
        # belongs to something else.
        self.show_error("all good names for an extraction directory taken")
        return None

    def move_to_directory(self, filename, target):
        """Move the archive's single top-level entry into place.

        A directory is renamed onto target; anything else is moved inside
        target under its own name.
        """
        if not os.path.isdir(filename):
            filename = os.path.split(filename)[0]
            target = os.path.join(target, filename)
        os.rename(filename, target)

    def prepare_extraction(self):
        """Inspect the archive and set up where its contents will land.

        Sets self.current_path (how to reach the starting directory from
        where extraction runs), self.target_directory and
        self.strip_top_level.  Returns False, after reporting why, if the
        listing fails or no target directory can be created.
        """
        self.current_path = '.'
        self.target_directory = None
        self.strip_top_level = False
        try:
            contents = self.current_extractor.check_contents()
        except ExtractorError as error:
            self.show_error(error)
            return False
        if contents == EMPTY:
            return True
        self.strip_top_level = (contents != BOMB)
        if contents == MATCHING_DIRECTORY:
            # The archive unpacks into a directory named after itself, so
            # that directory -- not the archive file -- is the target.
            self.target_directory = self.current_extractor.basename()
            return True
        self.target_directory = self.prepare_target_directory()
        if self.target_directory is None:
            return False
        if contents == BOMB:
            # Contain the scattered files by extracting inside the target.
            os.chdir(self.target_directory)
            self.current_path = '..'
        else:
            # One top-level entry with an unexpected name: extract here and
            # move it onto the target afterwards.
            self.cleanup_actions.append((self.move_to_directory, contents,
                                         self.target_directory))
        return True

    def extract(self):
        """Run the extractor; return False after reporting any failure."""
        try:
            self.current_extractor.extract(self.current_path)
        except ExtractorError as error:
            self.show_error(error)
            return False
        return True

    def recurse(self):
        """Queue archives found inside the current one, if --recursive."""
        if not self.options.recursive:
            return True
        for filename in self.current_extractor.included_archives:
            if self.strip_top_level and '/' in filename:
                # The top-level entry became the target directory itself.
                filename = filename.split('/', 1)[1]
            tail_path, basename = os.path.split(filename)
            directory = os.path.join(self.current_directory,
                                     self.target_directory, tail_path)
            self.archives.setdefault(directory, []).append(basename)
        return True

    def fix_perms(self):
        """Make the extracted tree readable, writable and traversable.

        Returns True when there is nothing to fix or both commands succeed.
        """
        if self.target_directory is None:
            return True
        target = os.path.join(self.current_directory, self.target_directory)
        status = subprocess.call(['chmod', '-R', 'u+rw', target])
        if status == 0:
            status = subprocess.call(['find', target, '-type', 'd',
                                      '-exec', 'chmod', 'u+x', '{}', ';'])
        return status == 0

    def _process_current_archive(self):
        """Run every step for the current archive; return overall success."""
        try:
            os.chdir(self.current_directory)
        except OSError as error:
            self.show_error("could not enter directory %s: %s" %
                            (self.current_directory, error.strerror))
            return False
        succeeded = True
        for name in self.actions:
            if not getattr(self, name)():
                succeeded = False
                break
        if succeeded:
            try:
                for action in self.cleanup_actions:
                    action[0](*action[1:])
            except OSError as error:
                self.show_error("could not move extracted files: %s" %
                                (error.strerror,))
                succeeded = False
        # Repair permissions even after a failure so partial output can
        # still be inspected or removed, but never let that mask the failure.
        perms_fixed = self.fix_perms()
        return succeeded and perms_fixed

    def run(self):
        """Process every queued archive; return the process exit status.

        Returns 1 if any archive failed and 0 otherwise.
        """
        while self.archives:
            self.current_directory, filenames = self.archives.popitem()
            for filename in filenames:
                self.current_filename = filename
                self.cleanup_actions = []
                # Forget the previous archive's state so a step that fails
                # early cannot act on a stale target.
                self.target_directory = None
                self.strip_top_level = False
                if self._process_current_archive():
                    self.successes.append(self.current_filename)
                else:
                    self.failures.append(self.current_filename)
        if self.failures:
            return 1
        return 0


if __name__ == '__main__':
    # Exit with the application's status: 1 if any archive failed, else 0.
    sys.exit(ExtractorApplication(sys.argv[1:]).run())

mercurial