scripts/x

Sun, 29 Oct 2006 19:34:46 -0500

author
brett
date
Sun, 29 Oct 2006 19:34:46 -0500
branch
trunk
changeset 1
a86a0cb0dd57
child 2
1570351bf863
permissions
-rwxr-xr-x

[svn] Repository reorganization to make tags easy

#!/usr/bin/env python
#
# x -- Intelligently extract various archive types.
# Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

import mimetypes
import os
import subprocess
import sys
import tempfile

from cStringIO import StringIO

# Teach mimetypes about the '.bz2' suffix unless it already knows it, so
# that bzip2-compressed archives get an encoding of 'bzip2'.
if '.bz2' not in mimetypes.encodings_map:
    mimetypes.encodings_map['.bz2'] = 'bzip2'

# Result codes returned by the extractors' check_contents() methods:
#   MATCHING_DIRECTORY -- one top-level entry, named like the archive
#   ONE_DIRECTORY      -- one top-level entry with some other name
#   BOMB               -- entries are not all under one top-level entry
#   EMPTY              -- the archive lists no entries
MATCHING_DIRECTORY, ONE_DIRECTORY, BOMB, EMPTY = 1, 2, 3, 4

class ExtractorError(Exception):
    """Raised when an external command run on an archive exits non-zero."""


class ProcessStreamer(object):
    """Iterate over the standard output of a child process, line by line.

    The command is started immediately with its stdout connected to a pipe.
    Iterating yields each output line with trailing newlines removed.  Call
    stop() when done to reap the process and release its pipes.
    """

    def __init__(self, command, stdin, description="checking contents",
                 stderr=None):
        """Start the command.

        command     -- argument list handed to subprocess.Popen
        stdin       -- file object (or None) used as the child's stdin
        description -- label used in the error message raised by stop()
        stderr      -- passed through to Popen (e.g. subprocess.PIPE)
        """
        self.process = subprocess.Popen(command, bufsize=1, stdin=stdin,
                                        stdout=subprocess.PIPE, stderr=stderr)
        self.command = ' '.join(command)
        self.description = description

    def __iter__(self):
        return self

    def next(self):
        """Return the next output line, or raise StopIteration at EOF."""
        line = self.process.stdout.readline()
        if not line:
            raise StopIteration
        return line.rstrip('\n')

    def stop(self):
        """Discard remaining output, reap the process and close its pipes.

        Raises ExtractorError when the command exited with a non-zero
        status.  The stderr pipe (if any) is closed whether or not the
        command succeeded, so a failing command does not leak a descriptor.
        """
        process = self.process
        try:
            # Drain stdout so the child cannot block on a full pipe while
            # we wait for it to exit.
            while process.stdout.readline():
                pass
            process.stdout.close()
            status = process.wait()
        finally:
            if process.stderr is not None:
                process.stderr.close()
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (self.description, self.command, status))
    

class BaseExtractor(object):
    """Common machinery for handling one archive with external commands.

    The archive is kept as an open file in self.archive and fed to commands
    on their standard input.  Subclasses supply get_filenames() (returning
    an object with next(), iteration and stop(), such as ProcessStreamer)
    and extract_archive(); they may override prepare() to preprocess the
    archive once it has been decoded.
    """

    # Maps a mimetypes encoding name to the command that decodes it,
    # reading the encoded data on stdin and writing the result to stdout.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}

    def __init__(self, filename, mimetype, encoding):
        """Open the archive, decode it if it is encoded, then prepare() it.

        filename -- path of the archive file
        mimetype -- MIME type guessed for the file
        encoding -- encoding name (a key of `decoders`) or None
        """
        self.filename = filename
        self.mimetype = mimetype
        self.encoding = encoding
        # Archives are binary data, so avoid any text-mode translation.
        self.archive = open(filename, 'rb')
        try:
            if encoding:
                self.pipe([self.decoders[encoding]], "decoding")
            self.prepare()
        except Exception:
            # Do not leave the file open when set-up fails.
            self.archive.close()
            raise

    def run(self, command, description="extraction", stdout=None, stderr=None,
            stdin=None, string_output=True):
        """Run a command to completion and return its output.

        With string_output true (the default) the return value is whatever
        the command wrote to a stdout pipe, or None when stdout is not a
        pipe.  With string_output false the process's stdout attribute is
        returned unread.

        Raises ExtractorError when the command exits with a non-zero status.
        """
        process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                                   stderr=stderr)
        if string_output:
            # communicate() reads the pipes while waiting; calling wait()
            # first can deadlock once the child fills a pipe.
            output = process.communicate()[0]
        else:
            try:
                process.wait()
            finally:
                if process.stderr is not None:
                    process.stderr.close()
            output = process.stdout
        status = process.returncode
        if status != 0:
            if process.stdout is not None:
                process.stdout.close()
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (description, ' '.join(command), status))
        return output

    def pipe(self, command, description, stderr=None):
        """Filter the archive through a command.

        The command reads the current archive on stdin and writes to a
        temporary file, which then replaces self.archive.
        """
        output = tempfile.TemporaryFile()
        try:
            self.run(command, description, output, stderr, self.archive,
                     False)
        except Exception:
            output.close()
            raise
        self.archive.close()
        self.archive = output
        self.archive.flush()
        # The command wrote through this file's descriptor, which leaves
        # the position after the data; rewind so the next reader starts at
        # the beginning.
        self.archive.seek(0, 0)

    def prepare(self):
        """Hook for subclasses; called at the end of __init__."""
        pass

    def check_contents(self):
        """Classify the layout of the archive's member names.

        Returns EMPTY when the listing has no names, BOMB when some name
        does not start with the first name's leading path component plus
        '/', MATCHING_DIRECTORY when that component equals basename(), and
        ONE_DIRECTORY otherwise.
        """
        self.archive.seek(0, 0)
        filenames = self.get_filenames()
        try:
            first_part = filenames.next().split('/', 1)[0] + '/'
        except StopIteration:
            # An exhausted listing signals itself with StopIteration;
            # str.split() never raises IndexError here.
            filenames.stop()
            return EMPTY
        for filename in filenames:
            if not filename.startswith(first_part):
                filenames.stop()
                return BOMB
        filenames.stop()
        if self.basename() == first_part[:-1]:
            return MATCHING_DIRECTORY
        return ONE_DIRECTORY

    def basename(self):
        """Return the file name without its archive extension(s).

        The last dot-separated component is removed; when it is a known
        encoding suffix (such as '.gz') the component before it is removed
        as well, so 'foo.tar.gz' gives 'foo'.
        """
        pieces = self.filename.split('.')
        extension = '.' + pieces.pop()
        if extension in mimetypes.encodings_map and pieces:
            pieces.pop()
        return '.'.join(pieces)

    def extract(self, path):
        """Rewind the archive and extract it into the current directory.

        `path` is not used by this implementation.
        """
        self.archive.seek(0, 0)
        self.extract_archive()
    

class TarExtractor(BaseExtractor):
    """Extractor for tar archives, driven by the ``tar`` command.

    Both commands pass ``-f -`` so tar reads the archive from standard
    input explicitly instead of falling back to its default archive
    (the TAPE environment variable or a build-time default device).
    """

    def get_filenames(self):
        """Return a ProcessStreamer over the output of ``tar -t``."""
        return ProcessStreamer(['tar', '-t', '-f', '-'], self.archive)

    def extract_archive(self):
        """Unpack the archive, fed on stdin, into the current directory."""
        self.run(['tar', '-x', '-f', '-'], stdin=self.archive)
        
        
class ZipExtractor(BaseExtractor):
    """Extractor for zip files, using ``zipinfo`` and ``unzip``.

    Both tools are given the file name on the command line, so the file is
    never opened here and BaseExtractor.__init__ is not called.
    """

    def __init__(self, filename, mimetype, encoding):
        self.filename, self.mimetype, self.encoding = (filename, mimetype,
                                                       encoding)
        # In-memory stand-in: the inherited check_contents() seeks on
        # self.archive before listing the names.
        self.archive = StringIO()

    def get_filenames(self):
        """Return a ProcessStreamer over the output of ``zipinfo -1``."""
        listing_command = ['zipinfo', '-1', self.filename]
        return ProcessStreamer(listing_command, None)

    def extract(self, path):
        """Run ``unzip -q`` on the archive, located relative to `path`."""
        archive_path = os.path.join(path, self.filename)
        self.run(['unzip', '-q', archive_path])


class CpioExtractor(BaseExtractor):
    """Extractor for cpio archives, driven by the ``cpio`` command.

    cpio's stderr is sent to a pipe in both commands rather than to the
    terminal (presumably to hide its summary output -- confirm).
    """

    def get_filenames(self):
        """Return a ProcessStreamer over the output of ``cpio -t``."""
        listing_command = ['cpio', '-t']
        return ProcessStreamer(listing_command, self.archive,
                               stderr=subprocess.PIPE)

    def extract_archive(self):
        """Unpack the archive, fed on stdin, into the current directory."""
        extraction_command = ['cpio', '-i', '--make-directories',
                              '--no-absolute-filenames']
        self.run(extraction_command, stdin=self.archive,
                 stderr=subprocess.PIPE)


class RPMExtractor(CpioExtractor):
    """Extractor for RPM packages: convert to cpio, then treat as cpio."""

    def prepare(self):
        """Replace the archive with the output of ``rpm2cpio -``."""
        converter = ['rpm2cpio', '-']
        self.pipe(converter, "rpm2cpio")

    def basename(self):
        """Return the file name minus its last two dot-separated parts.

        For example 'pkg-1.0-1.i386.rpm' gives 'pkg-1.0-1'.
        """
        stem = self.filename.rsplit('.', 2)[0]
        return stem

    def check_contents(self):
        """Always report BOMB, without inspecting the package contents."""
        return BOMB
        

class DebExtractor(TarExtractor):
    """Extractor for Debian packages: unpack the data.tar.gz member."""

    def prepare(self):
        """Replace the archive with the package's uncompressed data tarball.

        ``ar p`` prints the data.tar.gz member of the package file, and
        ``zcat`` then decompresses that output.
        """
        member_command = ['ar', 'p', self.filename, 'data.tar.gz']
        self.pipe(member_command, "data.tar.gz extraction")
        # Start zcat's input at the beginning of the extracted member.
        self.archive.seek(0, 0)
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        """Return the file name up to its last underscore."""
        stem = self.filename.rsplit('_', 1)[0]
        return stem

    def check_contents(self):
        """Always report BOMB, without inspecting the package contents."""
        return BOMB
        

# Maps the MIME type guessed for a file name to the extractor class used
# for it.  The shar entry is listed with None instead of a class.
extractor_map = {
    'application/x-tar': TarExtractor,
    'application/x-cpio': CpioExtractor,
    'application/zip': ZipExtractor,
    'application/x-debian-package': DebExtractor,
    'application/x-redhat-package-manager': RPMExtractor,
    'application/x-shar': None,
}

def show_error(filename, message):
    """Write ``filename: message`` and a newline to standard error.

    Uses sys.stderr.write() rather than the ``print >>`` statement, which
    only exists in Python 2; the bytes written are the same.
    """
    sys.stderr.write("%s: %s\n" % (filename, message))

def main(arguments):
    """Extract every archive named in `arguments`.

    Each file's type is guessed from its name.  When check_contents()
    reports BOMB, a directory named after the archive is created and the
    archive is extracted inside it; otherwise it is extracted in place.

    A problem with one archive is reported on standard error and the loop
    moves on to the next one, instead of aborting with a traceback.
    """
    for filename in arguments:
        mimetype, encoding = mimetypes.guess_type(filename)
        if mimetype not in extractor_map:
            show_error(filename, "doesn't look like an archive")
            continue
        handler = extractor_map[mimetype]
        if handler is None:
            # The type is listed in extractor_map but has no extractor
            # class; calling None would raise TypeError.
            show_error(filename,
                       "%s archives are not supported" % mimetype)
            continue
        try:
            extractor = handler(filename, mimetype, encoding)
            contents = extractor.check_contents()
        except (ExtractorError, EnvironmentError):
            # sys.exc_info() is used because it reads the same on every
            # Python version.
            show_error(filename, sys.exc_info()[1])
            continue
        path = '.'
        if contents == BOMB:
            directory = extractor.basename()
            try:
                os.mkdir(directory)
            except OSError:
                error = sys.exc_info()[1]
                show_error(filename, "could not create %s: %s" %
                           (error.filename, error.strerror))
                continue
            os.chdir(directory)
            path = '..'
        try:
            try:
                extractor.extract(path)
            except (ExtractorError, EnvironmentError):
                show_error(filename, sys.exc_info()[1])
        finally:
            # Always return to the starting directory, so the remaining
            # file names are still resolved from the right place.
            if contents == BOMB:
                os.chdir('..')

# Run as a script: every command-line argument is an archive to extract.
if __name__ == '__main__':
    archive_names = sys.argv[1:]
    main(archive_names)

mercurial