Mon, 13 Nov 2006 23:06:30 -0500
[svn] Make ExtractorApplication suck less. Now the strategies for handling
different archive types are out in their own classes, and polymorphism
takes care of everything for us. This is way cleaner.
While I was at it I changed the behavior in the case where an archive
contains one directory that doesn't match the basename. I now treat that
the same as a bomb. This can lead to silly directory structures but
ensures that there's no "data" loss nor unexpected results.
#!/usr/bin/env python
#
# x -- Intelligently extract various archive types.
# Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, 5th Floor, Boston, MA, 02111.

import errno
import mimetypes
import optparse
import os
import subprocess
import sys
import tempfile

try:
    from cStringIO import StringIO
except ImportError:  # Python 3 has no cStringIO module.
    from io import StringIO

VERSION = "1.1"
VERSION_BANNER = """x version %s
Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
Public License for more details.""" % (VERSION,)

# Values returned by check_contents() to classify an archive's layout.
# An archive holding a single directory that does NOT match the archive's
# basename is reported by returning that directory name (a string) instead;
# it is handled like BOMB.
MATCHING_DIRECTORY = 1
# ONE_DIRECTORY = 2
BOMB = 3
EMPTY = 4

mimetypes.encodings_map.setdefault('.bz2', 'bzip2')
mimetypes.types_map['.exe'] = 'application/x-msdos-program'


class ExtractorError(Exception):
    """Raised when an archive cannot be opened, listed, or extracted."""


class ProcessStreamer(object):
    """Iterate over the lines a child process writes to its stdout.

    Call stop() once iteration is finished to reap the process and learn
    whether it succeeded.
    """

    def __init__(self, command, stdin, description="checking contents",
                 stderr=None):
        """Start `command` (an argument list) reading from `stdin`.

        `description` is only used to label the error raised by stop().
        """
        # Text mode makes readline() return native strings on both Python 2
        # and Python 3, and is what makes bufsize=1 (line buffering) valid.
        self.process = subprocess.Popen(command, bufsize=1, stdin=stdin,
                                        stdout=subprocess.PIPE,
                                        stderr=stderr,
                                        universal_newlines=True)
        self.command = ' '.join(command)
        self.description = description

    def __iter__(self):
        return self

    def next(self):
        """Return the next output line without its trailing newline."""
        line = self.process.stdout.readline()
        if line:
            return line.rstrip('\n')
        raise StopIteration

    __next__ = next  # Python 3 spelling of the iterator protocol.

    def stop(self):
        """Drain and reap the process.

        Raises ExtractorError if the process exited with a non-zero status.
        """
        stdout = self.process.stdout
        # Consume whatever is left so the child never blocks on a full pipe.
        while stdout.readline():
            pass
        stdout.close()
        stderr = self.process.stderr
        if stderr is not None:
            # Nobody reads stderr when it is a pipe; discard it before
            # waiting so the child cannot stall on it, and close it even
            # when we are about to raise.
            stderr.read()
            stderr.close()
        status = self.process.wait()
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (self.description, self.command, status))


class BaseExtractor(object):
    """Common machinery for extractors that feed an archive to a command.

    Subclasses provide get_filenames() (returning a ProcessStreamer-like
    iterator with a stop() method) and extract_archive(); they may override
    prepare() to transform self.archive before it is inspected.
    """

    # Maps a mimetypes encoding name to the command that decodes it.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}

    def __init__(self, filename, mimetype, encoding):
        """Open `filename` and decode it if `encoding` is set.

        Raises ValueError for an encoding with no known decoder and
        ExtractorError if the file cannot be opened or decoded.
        """
        if encoding and (encoding not in self.decoders):
            raise ValueError("unrecognized encoding %s" % (encoding,))
        self.filename = filename
        self.mimetype = mimetype
        self.encoding = encoding
        self.included_archives = []
        try:
            # The file is only ever handed to child processes as stdin.
            self.archive = open(filename, 'rb')
        except (IOError, OSError) as error:
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        if encoding:
            self.pipe([self.decoders[encoding]], "decoding")
        self.prepare()

    def run(self, command, description="extraction", stdout=None,
            stderr=None, stdin=None):
        """Run `command` to completion.

        Raises ExtractorError if it exits with a non-zero status.
        """
        process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                                   stderr=stderr)
        # communicate() drains and closes any pipes we asked for, so a
        # chatty child cannot deadlock against an unread pipe and nothing
        # is leaked when we raise below.  The captured output is discarded.
        process.communicate()
        status = process.returncode
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (description, ' '.join(command), status))

    def pipe(self, command, description, stderr=None):
        """Replace self.archive with the output of filtering it via `command`.

        Raises ExtractorError (from run()) if the command fails.
        """
        output = tempfile.TemporaryFile()
        try:
            self.run(command, description, output, stderr, self.archive)
        except ExtractorError:
            output.close()
            raise
        self.archive.close()
        self.archive = output
        self.archive.flush()
        # The child wrote through a shared file offset; rewind so that the
        # next consumer (possibly another pipe() call) starts at the top.
        self.archive.seek(0, 0)

    def prepare(self):
        """Hook for subclasses to transform self.archive; does nothing."""
        pass

    def check_contents(self):
        """Classify the archive's layout.

        Returns EMPTY, BOMB, MATCHING_DIRECTORY, or -- when everything lives
        under one top-level name that differs from basename() -- that name
        with a trailing slash.  As a side effect, records in
        self.included_archives every member whose name looks like an archive
        this program can handle.
        """
        self.archive.seek(0, 0)
        archive_type = None
        filenames = self.get_filenames()
        try:
            filename = next(filenames)
        except StopIteration:
            filenames.stop()
            return EMPTY
        if mimetypes.guess_type(filename)[0] in extractor_map:
            self.included_archives.append(filename)
        first_part = filename.split('/', 1)[0] + '/'
        for filename in filenames:
            if mimetypes.guess_type(filename)[0] in extractor_map:
                self.included_archives.append(filename)
            if (archive_type is None) and \
                    (not filename.startswith(first_part)):
                archive_type = BOMB
        filenames.stop()
        if archive_type:
            return archive_type
        if self.basename() == first_part[:-1]:
            return MATCHING_DIRECTORY
        return first_part

    def basename(self):
        """Return the archive's file name minus encoding/type extensions."""
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if extension in mimetypes.encodings_map:
            pieces.pop()
            if not pieces:
                # The whole name was an encoding suffix; nothing is left.
                return ''
            extension = '.' + pieces[-1]
        if (extension in mimetypes.types_map or
                extension in mimetypes.common_types or
                extension in mimetypes.suffix_map):
            pieces.pop()
        return '.'.join(pieces)

    def extract(self, path):
        """Extract the archive into the current working directory.

        `path` is unused here; ZipExtractor uses it to locate the file.
        """
        self.archive.seek(0, 0)
        self.extract_archive()


class TarExtractor(BaseExtractor):
    """Handle tar archives with the `tar` command."""

    def get_filenames(self):
        return ProcessStreamer(['tar', '-t'], self.archive)

    def extract_archive(self):
        self.run(['tar', '-x'], stdin=self.archive)


class ZipExtractor(BaseExtractor):
    """Handle zip archives with `zipinfo` and `unzip`.

    Those tools are given the file name rather than a stream, so the
    archive is never opened here; self.archive is a dummy so that the
    seek() in check_contents() still works.
    """

    def __init__(self, filename, mimetype, encoding):
        self.filename = filename
        self.mimetype = mimetype
        self.encoding = encoding
        self.included_archives = []
        self.archive = StringIO()

    def get_filenames(self):
        return ProcessStreamer(['zipinfo', '-1', self.filename], None)

    def extract(self, path):
        """Unzip into the current directory; `path` leads back to the file."""
        self.run(['unzip', '-q', os.path.join(path, self.filename)])


class CpioExtractor(BaseExtractor):
    """Handle cpio archives with the `cpio` command."""

    def get_filenames(self):
        return ProcessStreamer(['cpio', '-t'], self.archive,
                               stderr=subprocess.PIPE)

    def extract_archive(self):
        self.run(['cpio', '-i', '--make-directories',
                  '--no-absolute-filenames'],
                 stderr=subprocess.PIPE, stdin=self.archive)


class RPMExtractor(CpioExtractor):
    """Handle RPM packages by converting them to cpio with `rpm2cpio`."""

    def prepare(self):
        self.pipe(['rpm2cpio', '-'], "rpm2cpio")

    def basename(self):
        """Return the file name minus `.rpm` and a short preceding suffix."""
        pieces = os.path.basename(self.filename).rsplit('.', 2)
        if len(pieces) == 1:
            return pieces[0]
        elif pieces[-1] != 'rpm':
            return BaseExtractor.basename(self)
        pieces.pop()
        if len(pieces) == 1:
            return pieces[0]
        elif len(pieces[-1]) < 6:
            # Presumably the architecture field (e.g. "i386") -- drop it.
            pieces.pop()
        return '.'.join(pieces)

    def check_contents(self):
        """Scan for included archives, but always classify as BOMB."""
        CpioExtractor.check_contents(self)
        return BOMB


class DebExtractor(TarExtractor):
    """Handle Debian packages by pulling out their data.tar.gz member."""

    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'data.tar.gz'],
                  "data.tar.gz extraction")
        self.archive.seek(0, 0)
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        """Return the part of the file name before the final `_xxx.deb`."""
        pieces = os.path.basename(self.filename).rsplit('_', 1)
        if len(pieces) == 1:
            return pieces[0]
        elif (len(pieces[-1]) > 10) or (not pieces[-1].endswith('.deb')):
            return BaseExtractor.basename(self)
        return pieces[0]

    def check_contents(self):
        """Scan for included archives, but always classify as BOMB."""
        TarExtractor.check_contents(self)
        return BOMB


class MatchHandler(object):
    """Extract an archive whose contents sit in a directory named after it.

    Handler methods return None on success or an error message string.
    """

    def __init__(self, extractor, contents):
        self.extractor = extractor
        self.contents = contents
        self.directory = extractor.basename()
        # Where the archive's members end up, relative to the directory the
        # application was in when it started on this archive.
        self.target = os.curdir

    def extract(self, directory='.'):
        """Run the extractor; `directory` leads back to the starting dir."""
        try:
            self.extractor.extract(directory)
        except ExtractorError as error:
            # ExtractorError carries only a message (no strerror attribute).
            return str(error)

    def cleanup(self):
        """Make everything under self.directory accessible to the user."""
        command = 'chmod'
        status = subprocess.call(['chmod', '-R', 'u+rw', self.directory])
        if status == 0:
            command = 'find'
            status = subprocess.call(['find', self.directory, '-type', 'd',
                                      '-exec', 'chmod', 'u+x', '{}', ';'])
        if status != 0:
            return "%s returned with exit status %s" % (command, status)


class BombHandler(MatchHandler):
    """Extract an archive into a freshly created directory of its own."""

    def __init__(self, extractor, contents):
        """Create the extraction directory.

        Tries the archive's basename, then basename.1 through basename.9.
        Raises ValueError if none can be created.
        """
        MatchHandler.__init__(self, extractor, contents)
        basename = self.directory
        for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]:
            self.directory = '%s%s' % (basename, suffix)
            try:
                os.mkdir(self.directory)
            except OSError as error:
                if error.errno == errno.EEXIST:
                    continue
                raise ValueError(
                    "could not make extraction directory %s: %s" %
                    (error.filename, error.strerror))
            ## if suffix != '':
            ##     self.show_error("extracted to %s" % (directory,))
            break
        else:
            raise ValueError(
                "all good names for an extraction directory taken")
        # Everything is unpacked inside the directory we just made.
        self.target = self.directory

    def extract(self):
        os.chdir(self.directory)
        return MatchHandler.extract(self, '..')

    def cleanup(self):
        os.chdir('..')
        return MatchHandler.cleanup(self)


class EmptyHandler(object):
    """Handler for archives with no members: there is nothing to do."""

    def __init__(self, extractor, contents):
        self.target = os.curdir

    def extract(self):
        pass

    def cleanup(self):
        pass


extractor_map = {'application/x-tar': TarExtractor,
                 'application/zip': ZipExtractor,
                 'application/x-msdos-program': ZipExtractor,
                 'application/x-debian-package': DebExtractor,
                 'application/x-redhat-package-manager': RPMExtractor,
                 'application/x-rpm': RPMExtractor,
                 'application/x-cpio': CpioExtractor}

# Any check_contents() result not listed here is handled by BombHandler.
handler_map = {EMPTY: EmptyHandler,
               MATCHING_DIRECTORY: MatchHandler}


class ExtractorApplication(object):
    """Command-line driver: extract every archive named in the arguments."""

    def __init__(self, arguments):
        self.parse_options(arguments)
        self.successes = []
        self.failures = []

    def parse_options(self, arguments):
        """Parse `arguments` and seed the work queue (self.archives)."""
        parser = optparse.OptionParser(
            usage="%prog [options] archive [archive2 ...]",
            description="Intelligent archive extractor",
            version=VERSION_BANNER
            )
        parser.add_option('-r', '--recursive', dest='recursive',
                          action='store_true', default=False,
                          help='extract archives contained in the ones listed')
        self.options, filenames = parser.parse_args(arguments)
        if not filenames:
            parser.error("you did not list any archives")
        # Maps a working directory to the archives to extract from there.
        self.archives = {os.path.realpath(os.curdir): filenames}

    def show_error(self, message):
        """Print `message` to stderr, prefixed with the current archive."""
        sys.stderr.write("%s: %s\n" % (self.current_filename, message))

    def get_extractor(self):
        """Set up the extractor and handler for the current archive.

        Returns None on success or an error message string.
        """
        mimetype, encoding = mimetypes.guess_type(self.current_filename)
        try:
            extractor = extractor_map[mimetype]
        except KeyError:
            return "not a known archive type"
        try:
            self.current_extractor = extractor(self.current_filename,
                                               mimetype, encoding)
            content = self.current_extractor.check_contents()
            handler = handler_map.get(content, BombHandler)
            self.current_handler = handler(self.current_extractor, content)
        except (ExtractorError, ValueError) as error:
            # The extractors and BombHandler signal some failures with
            # ValueError; report those too rather than crash the whole run.
            return str(error)

    def recurse(self):
        """Queue archives found inside the one just extracted (with -r)."""
        if not self.options.recursive:
            return
        for filename in self.current_extractor.included_archives:
            tail_path, basename = os.path.split(filename)
            # Extraction always happens relative to current_directory (not
            # next to the archive file), under the handler's target.
            directory = os.path.normpath(
                os.path.join(self.current_directory,
                             self.current_handler.target, tail_path))
            self.archives.setdefault(directory, []).append(basename)

    def report(self, function, *args):
        """Call `function`; show any error it returns.  True on success."""
        error = function(*args)
        if error:
            self.show_error(error)
            return False
        return True

    def run(self):
        """Process the whole queue.  Returns the process exit status."""
        while self.archives:
            self.current_directory, filenames = self.archives.popitem()
            for filename in filenames:
                os.chdir(self.current_directory)
                self.current_filename = filename
                self.cleanup_actions = []
                success = self.report(self.get_extractor)
                if success:
                    for name in 'extract', 'cleanup':
                        # Call first, then combine, so cleanup still runs
                        # after a failed extract.
                        success = (self.report(getattr(self.current_handler,
                                                       name))
                                   and success)
                    self.recurse()
                if success:
                    self.successes.append(self.current_filename)
                else:
                    self.failures.append(self.current_filename)
        if self.failures:
            return 1
        return 0


if __name__ == '__main__':
    app = ExtractorApplication(sys.argv[1:])
    sys.exit(app.run())