Wed, 01 Nov 2006 22:03:46 -0500
[svn] Additions.
#!/usr/bin/env python
#
# x -- Intelligently extract various archive types.
# Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, 5th Floor, Boston, MA, 02111.

"""Extract archives named on the command line with external tools.

Each archive's file listing is inspected first.  When the extractor reports
BOMB, the archive is unpacked inside a freshly created directory named after
the archive instead of directly in the current directory.
"""

import mimetypes
import os
import subprocess
import sys
import tempfile

try:
    from cStringIO import StringIO
except ImportError:
    # cStringIO does not exist on Python 3; io.StringIO replaces it.
    from io import StringIO

mimetypes.encodings_map.setdefault('.bz2', 'bzip2')

# Possible results of BaseExtractor.check_contents().
MATCHING_DIRECTORY = 1  # one top-level entry, named like the archive
ONE_DIRECTORY = 2       # one top-level entry with some other name
BOMB = 3                # more than one top-level entry
EMPTY = 4               # the listing contained no entries


class ExtractorError(Exception):
    """Raised when an archive cannot be decoded, listed or extracted."""


class ProcessStreamer(object):
    """Iterator over the lines a child process writes to its stdout.

    command     -- argument list handed to subprocess.Popen
    stdin       -- file object (or None) used as the child's stdin
    description -- text used as the prefix of error messages
    stderr      -- stderr argument for the child; see __init__
    """

    def __init__(self, command, stdin, description="checking contents",
                 stderr=None):
        self.stderr_spool = None
        if stderr == subprocess.PIPE:
            # Nothing in this class reads the child's stderr, so a real pipe
            # would block the child once the pipe buffer fills up.  Send the
            # output to a temporary file instead; it is discarded in stop().
            stderr = self.stderr_spool = tempfile.TemporaryFile()
        # universal_newlines gives text lines on Python 3 as well, which the
        # string handling in next() and check_contents() relies on.
        self.process = subprocess.Popen(command, bufsize=1, stdin=stdin,
                                        stdout=subprocess.PIPE,
                                        stderr=stderr,
                                        universal_newlines=True)
        self.command = ' '.join(command)
        self.description = description

    def __iter__(self):
        return self

    def next(self):
        """Return the next output line without its trailing newline."""
        line = self.process.stdout.readline()
        if not line:
            raise StopIteration
        return line.rstrip('\n')

    __next__ = next  # Python 3 iterator protocol

    def stop(self):
        """Drain remaining output, reap the child and check its status.

        Raises ExtractorError when the child exits with a non-zero status.
        The stderr handles are released whether or not that happens.
        """
        try:
            while self.process.stdout.readline():
                pass
            self.process.stdout.close()
            status = self.process.wait()
        finally:
            for stream in (self.process.stderr, self.stderr_spool):
                if stream is not None:
                    stream.close()
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (self.description, self.command, status))


class BaseExtractor(object):
    """Common machinery for extractors that feed an archive to a command.

    Subclasses supply get_filenames() and extract_archive(), and may
    override prepare() to transform self.archive before it is used.
    """

    # Maps a mimetypes encoding name to the command that decodes it.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}

    def __init__(self, filename, mimetype, encoding):
        """Open filename and decode it when an encoding is given.

        Raises ExtractorError when no decoder is known for the encoding or
        when a decoding/preparation command fails.
        """
        self.filename = filename
        self.mimetype = mimetype
        self.encoding = encoding
        if encoding and encoding not in self.decoders:
            raise ExtractorError("decoding error: no decoder for '%s' "
                                 "encoding" % encoding)
        # Archives are binary data; never let the platform translate them.
        self.archive = open(filename, 'rb')
        try:
            if encoding:
                self.pipe([self.decoders[encoding]], "decoding")
            self.prepare()
        except Exception:
            self.archive.close()
            raise

    def run(self, command, description="extraction", stdout=None,
            stderr=None, stdin=None, string_output=True):
        """Run command to completion and return its output.

        With string_output true the return value is whatever the child
        wrote to a stdout pipe, or None when stdout is not a pipe.  With
        string_output false the child's stdout attribute is returned
        unread.  Raises ExtractorError on a non-zero exit status.
        """
        process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                                   stderr=stderr)
        if string_output:
            # communicate() reads the pipes while waiting, so a child that
            # writes a lot to a pipe cannot block forever.
            output = process.communicate()[0]
        else:
            output = process.stdout
            if process.stderr is not None:
                # Nobody looks at this text; drain it so the child can exit.
                process.stderr.read()
                process.stderr.close()
            process.wait()
        status = process.returncode
        if status != 0:
            if process.stdout is not None:
                process.stdout.close()
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (description, ' '.join(command), status))
        return output

    def pipe(self, command, description, stderr=None):
        """Replace self.archive with the output of command run on it."""
        output = tempfile.TemporaryFile()
        try:
            self.run(command, description, output, stderr, self.archive,
                     False)
        except Exception:
            output.close()
            raise
        self.archive.close()
        self.archive = output
        self.archive.flush()

    def prepare(self):
        """Hook for subclasses; called at the end of __init__."""
        pass

    def check_contents(self):
        """Classify the archive by its top-level entries.

        Returns EMPTY, BOMB, MATCHING_DIRECTORY or ONE_DIRECTORY.
        """
        self.archive.seek(0, 0)
        filenames = self.get_filenames()
        first_part = None
        for filename in filenames:
            # A "./" prefix says nothing about the layout.  Without this,
            # a listing such as "./a", "./b" looks like one directory
            # called "." and is never reported as a bomb.
            while filename.startswith('./'):
                filename = filename[2:]
            if filename in ('', '.'):
                continue
            if first_part is None:
                first_part = filename.split('/', 1)[0] + '/'
            elif not filename.startswith(first_part):
                filenames.stop()
                return BOMB
        filenames.stop()
        if first_part is None:
            return EMPTY
        if self.basename() == first_part[:-1]:
            return MATCHING_DIRECTORY
        return ONE_DIRECTORY

    def basename(self):
        """Return the filename with encoding and type suffixes removed."""
        pieces = self.filename.split('.')
        extension = '.' + pieces[-1]
        # The len() guards keep a dot-less name such as "gz" intact rather
        # than popping the only piece.
        if len(pieces) > 1 and extension in mimetypes.encodings_map:
            pieces.pop()
            extension = '.' + pieces[-1]
        if len(pieces) > 1 and (extension in mimetypes.types_map or
                                extension in mimetypes.common_types or
                                extension in mimetypes.suffix_map):
            pieces.pop()
        return '.'.join(pieces)

    def extract(self, path):
        """Extract into the current directory; path is not used here."""
        self.archive.seek(0, 0)
        self.extract_archive()


class TarExtractor(BaseExtractor):
    """Extractor that feeds the archive to tar on stdin."""

    def get_filenames(self):
        return ProcessStreamer(['tar', '-t'], self.archive)

    def extract_archive(self):
        self.run(['tar', '-x'], stdin=self.archive)


class ZipExtractor(BaseExtractor):
    """Extractor that passes the archive's file name to zipinfo/unzip."""

    def __init__(self, filename, mimetype, encoding):
        # The zip tools are given the file name, so nothing is opened or
        # decoded here; archive is only a placeholder that supports seek().
        self.filename = filename
        self.mimetype = mimetype
        self.encoding = encoding
        self.archive = StringIO()

    def get_filenames(self):
        return ProcessStreamer(['zipinfo', '-1', self.filename], None)

    def extract(self, path):
        """Run unzip on the archive, located relative to path."""
        self.run(['unzip', '-q', os.path.join(path, self.filename)])


class CpioExtractor(BaseExtractor):
    """Extractor that feeds the archive to cpio on stdin."""

    def get_filenames(self):
        return ProcessStreamer(['cpio', '-t'], self.archive,
                               stderr=subprocess.PIPE)

    def extract_archive(self):
        self.run(['cpio', '-i', '--make-directories',
                  '--no-absolute-filenames'],
                 stderr=subprocess.PIPE, stdin=self.archive)


class RPMExtractor(CpioExtractor):
    """Extractor that converts the package with rpm2cpio first."""

    def prepare(self):
        self.pipe(['rpm2cpio', '-'], "rpm2cpio")

    def basename(self):
        """Strip ".rpm" and a preceding component shorter than 6 chars."""
        pieces = self.filename.rsplit('.', 2)
        if len(pieces) == 1:
            return pieces[0]
        elif pieces[-1] != 'rpm':
            return BaseExtractor.basename(self)
        pieces.pop()
        if len(pieces) == 1:
            return pieces[0]
        elif len(pieces[-1]) < 6:
            pieces.pop()
        return '.'.join(pieces)

    def check_contents(self):
        # Always unpack packages into their own directory.
        return BOMB


class DebExtractor(TarExtractor):
    """Extractor that unpacks the data.tar.gz member of a package."""

    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'data.tar.gz'],
                  "data.tar.gz extraction")
        self.archive.seek(0, 0)
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        """Strip a trailing "_<something>.deb" part of up to 10 chars.

        Any other name, including one with no underscore at all, goes
        through BaseExtractor.basename().  Returning a name like "foo.deb"
        unchanged would make main() try to create a directory with the
        archive's own name.
        """
        pieces = self.filename.rsplit('_', 1)
        if (len(pieces) == 1 or len(pieces[-1]) > 10 or
                not pieces[-1].endswith('.deb')):
            return BaseExtractor.basename(self)
        return pieces[0]

    def check_contents(self):
        # Always unpack packages into their own directory.
        return BOMB


# MIME type -> extractor class.  None marks a type that is listed but has
# no extractor.
extractor_map = {'application/x-tar': TarExtractor,
                 'application/zip': ZipExtractor,
                 'application/x-msdos-program': ZipExtractor,
                 'application/x-debian-package': DebExtractor,
                 'application/x-redhat-package-manager': RPMExtractor,
                 'application/x-shar': None,
                 'application/x-cpio': CpioExtractor}


def show_error(filename, message):
    """Write "<filename>: <message>" to standard error."""
    sys.stderr.write("%s: %s\n" % (filename, message))


def _extract_file(filename, handler, mimetype, encoding, start_dir):
    """Extract one archive; may leave the process in a new directory."""
    extractor = handler(filename, mimetype, encoding)
    contents = extractor.check_contents()
    path = '.'
    if contents == BOMB:
        directory = extractor.basename()
        try:
            os.mkdir(directory)
        except OSError as error:
            show_error(filename, "could not create %s: %s" %
                       (error.filename, error.strerror))
            return
        os.chdir(directory)
        # The new directory may be several levels deep, so '..' is not a
        # reliable way to refer back to where the archive name is valid.
        path = start_dir
    extractor.extract(path)


def main(arguments):
    """Extract every archive named in arguments.

    Problems with one archive are reported on standard error and do not
    stop the remaining archives from being processed.
    """
    start_dir = os.getcwd()
    for filename in arguments:
        mimetype, encoding = mimetypes.guess_type(filename)
        try:
            handler = extractor_map[mimetype]
        except KeyError:
            show_error(filename, "doesn't look like an archive")
            continue
        if handler is None:
            show_error(filename, "unsupported archive type")
            continue
        try:
            _extract_file(filename, handler, mimetype, encoding, start_dir)
        except (ExtractorError, EnvironmentError) as error:
            show_error(filename, error)
        finally:
            # Later relative file names are only valid from here.
            os.chdir(start_dir)


if __name__ == '__main__':
    main(sys.argv[1:])