diff -r 10840258891a -r a86a0cb0dd57 scripts/x --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/x Sun Oct 29 19:34:46 2006 -0500 @@ -0,0 +1,236 @@ +#!/usr/bin/env python +# +# x -- Intelligently extract various archive types. +# Copyright (c) 2006 Brett Smith . +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, 5th Floor, Boston, MA, 02111. + +import mimetypes +import os +import subprocess +import sys +import tempfile + +from cStringIO import StringIO + +mimetypes.encodings_map.setdefault('.bz2', 'bzip2') + +MATCHING_DIRECTORY = 1 +ONE_DIRECTORY = 2 +BOMB = 3 +EMPTY = 4 + +class ExtractorError(Exception): + pass + + +class ProcessStreamer(object): + def __init__(self, command, stdin, description="checking contents", + stderr=None): + self.process = subprocess.Popen(command, bufsize=1, stdin=stdin, + stdout=subprocess.PIPE, stderr=stderr) + self.command = ' '.join(command) + self.description = description + + def __iter__(self): + return self + + def next(self): + line = self.process.stdout.readline() + if line: + return line.rstrip('\n') + else: + raise StopIteration + + def stop(self): + while self.process.stdout.readline(): + pass + self.process.stdout.close() + status = self.process.wait() + if status != 0: + raise ExtractorError("%s error: '%s' returned status code %s" % + (self.description, self.command, status)) + try: + self.process.stderr.close() + except AttributeError: + pass + + +class BaseExtractor(object): + decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'} + + def __init__(self, filename, mimetype, encoding): + self.filename = filename + self.mimetype = mimetype + self.encoding = encoding + self.archive = open(filename, 'r') + if encoding: + self.pipe([self.decoders[encoding]], "decoding") + self.prepare() + + def run(self, command, description="extraction", stdout=None, stderr=None, + stdin=None, string_output=True): + process = subprocess.Popen(command, stdin=stdin, stdout=stdout, + stderr=stderr) + status = process.wait() + if status != 0: + raise ExtractorError("%s error: '%s' returned status code %s" % + (description, ' '.join(command), status)) + try: + process.stderr.close() + except AttributeError: + pass + output = process.stdout + if string_output: + try: + output = output.read(-1) + process.stdout.close() + except AttributeError: + pass + return output + + def pipe(self, command, description, stderr=None): + output = tempfile.TemporaryFile() + self.run(command, description, output, stderr, self.archive, False) + self.archive.close() + self.archive = output + self.archive.flush() + + def prepare(self): + pass + + def check_contents(self): + self.archive.seek(0, 0) + filenames = self.get_filenames() + try: + first_part = filenames.next().split('/', 1)[0] + '/' + except IndexError: + filenames.stop() + return EMPTY + for filename in filenames: + if not filename.startswith(first_part): + filenames.stop() + return BOMB + filenames.stop() + if self.basename() == first_part[:-1]: + return MATCHING_DIRECTORY + return ONE_DIRECTORY + + def basename(self): + pieces = self.filename.split('.') + if mimetypes.encodings_map.has_key('.' + pieces.pop()): + pieces.pop() + return '.'.join(pieces) + + def extract(self, path): + self.archive.seek(0, 0) + self.extract_archive() + + +class TarExtractor(BaseExtractor): + def get_filenames(self): + return ProcessStreamer(['tar', '-t'], self.archive) + + def extract_archive(self): + self.run(['tar', '-x'], stdin=self.archive) + + +class ZipExtractor(BaseExtractor): + def __init__(self, filename, mimetype, encoding): + self.filename = filename + self.mimetype = mimetype + self.encoding = encoding + self.archive = StringIO() + + def get_filenames(self): + return ProcessStreamer(['zipinfo', '-1', self.filename], None) + + def extract(self, path): + self.run(['unzip', '-q', os.path.join(path, self.filename)]) + + +class CpioExtractor(BaseExtractor): + def get_filenames(self): + return ProcessStreamer(['cpio', '-t'], self.archive, + stderr=subprocess.PIPE) + + def extract_archive(self): + self.run(['cpio', '-i', '--make-directories', + '--no-absolute-filenames'], + stderr=subprocess.PIPE, stdin=self.archive) + + +class RPMExtractor(CpioExtractor): + def prepare(self): + self.pipe(['rpm2cpio', '-'], "rpm2cpio") + + def basename(self): + return self.filename.rsplit('.', 2)[0] + + def check_contents(self): + return BOMB + + +class DebExtractor(TarExtractor): + def prepare(self): + self.pipe(['ar', 'p', self.filename, 'data.tar.gz'], + "data.tar.gz extraction") + self.archive.seek(0, 0) + self.pipe(['zcat'], "data.tar.gz decompression") + + def basename(self): + return self.filename.rsplit('_', 1)[0] + + def check_contents(self): + return BOMB + + +extractor_map = {'application/x-tar': TarExtractor, + 'application/zip': ZipExtractor, + 'application/x-debian-package': DebExtractor, + 'application/x-redhat-package-manager': RPMExtractor, + 'application/x-shar': None, + 'application/x-cpio': CpioExtractor} + +def show_error(filename, message): + print >>sys.stderr, "%s: %s" % (filename, message) + +def main(arguments): + for filename in arguments: + mimetype, encoding = mimetypes.guess_type(filename) + try: + handler = extractor_map[mimetype] + except KeyError: + show_error(filename, "doesn't look like an archive") + continue + extractor = handler(filename, mimetype, encoding) + contents = extractor.check_contents() + path = '.' + if contents == BOMB: + directory = extractor.basename() + try: + os.mkdir(directory) + except OSError, error: + show_error(filename, "could not create %s: %s" % + (error.filename, error.strerror)) + continue + os.chdir(directory) + path = '..' + extractor.extract(path) + if contents == BOMB: + os.chdir('..') + +if __name__ == '__main__': + main(sys.argv[1:])