Tue, 02 Jan 2007 20:30:17 -0500
[svn] Add basic documentation, and make this version 3.0.
#!/usr/bin/env python
#
# x -- Intelligently extract various archive types.
# Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, 5th Floor, Boston, MA, 02111.

"""Extract archives by delegating to tar, unzip, cpio, rpm2cpio, ar and zcat.

Each archive named on the command line is matched to an extractor class by
its guessed MIME type, its file listing is inspected, and a handler class
then decides where the contents are unpacked.
"""

import errno
import logging
import mimetypes
import optparse
import os
import stat
import subprocess
import sys
import tempfile

try:
    from cStringIO import StringIO
except ImportError:  # Python 3 has no cStringIO module.
    from io import StringIO

VERSION = "3.0"
VERSION_BANNER = """x version %s
Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.""" % (VERSION,)

# Values returned by the extractors' check_contents() methods and consumed
# by the handlers' can_handle() methods.
MATCHING_DIRECTORY = 1
# ONE_DIRECTORY = 2
BOMB = 3
EMPTY = 4
COMPRESSED = 5

mimetypes.encodings_map.setdefault('.bz2', 'bzip2')
mimetypes.types_map['.exe'] = 'application/x-msdos-program'


def run_command(command, description, stdout=None, stderr=None, stdin=None):
    """Run *command* to completion and describe any failure.

    command -- argument list handed to subprocess.Popen.
    description -- short label used as the prefix of the error message.
    stdout, stderr, stdin -- passed straight through to subprocess.Popen.

    Returns None when the command exits with status 0, otherwise a string
    naming the description, the command line and the exit status.
    """
    process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                               stderr=stderr)
    # communicate() drains and closes any PIPE streams while it waits, so a
    # chatty child cannot block forever on a full pipe the way it can with
    # a bare wait().  Whatever was captured is deliberately discarded.
    process.communicate()
    status = process.returncode
    if status != 0:
        return ("%s error: '%s' returned status code %s" %
                (description, ' '.join(command), status))
    return None


class FilenameChecker(object):
    """Pick a name that does not exist yet, adding .1 through .9 if needed."""

    def __init__(self, original_name):
        self.original_name = original_name

    def is_free(self, filename):
        """Return True when nothing exists at *filename*."""
        return not os.path.exists(filename)

    def check(self):
        """Return the first free candidate name.

        The candidates are the original name followed by the original name
        with the suffixes .1 through .9.  Raises ValueError when every
        candidate is taken.
        """
        suffixes = [''] + ['.%s' % (number,) for number in range(1, 10)]
        for suffix in suffixes:
            candidate = '%s%s' % (self.original_name, suffix)
            if self.is_free(candidate):
                return candidate
        raise ValueError("all alternatives for name %s taken" %
                         (self.original_name,))


class DirectoryChecker(FilenameChecker):
    """FilenameChecker that claims the free name by creating a directory."""

    def is_free(self, filename):
        """Try to create *filename* as a directory.

        Returns True when the directory was created and False when the name
        already exists; any other OSError propagates.
        """
        try:
            os.mkdir(filename)
        except OSError as error:
            if error.errno == errno.EEXIST:
                return False
            raise
        return True


class ExtractorError(Exception):
    """Raised when an archive cannot be opened or a helper command fails."""


class ProcessStreamer(object):
    """Iterate over the lines a command writes to its standard output."""

    def __init__(self, command, stdin, description="checking contents",
                 stderr=None):
        # System-default buffering: line buffering (bufsize=1) is not
        # available for the binary pipe read here on Python 3.
        self.process = subprocess.Popen(command, bufsize=-1, stdin=stdin,
                                        stdout=subprocess.PIPE,
                                        stderr=stderr)
        self.command = ' '.join(command)
        self.description = description

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next output line without its trailing newline."""
        line = self.process.stdout.readline()
        if not line:
            raise StopIteration
        if not isinstance(line, str):
            # Python 3 pipes yield bytes; decode the way the os module
            # decodes filenames so the result can go back to os.stat().
            line = os.fsdecode(line)
        return line.rstrip('\n')

    next = __next__  # Python 2 spelling of the iterator protocol.

    def stop(self):
        """Discard remaining output, reap the process and check its status.

        Raises ExtractorError when the command exited with a non-zero
        status.
        """
        output = self.process.stdout
        while output.readline():
            pass
        output.close()
        status = self.process.wait()
        # Close stderr before the status check so a failing command does
        # not leave its pipe open.
        if self.process.stderr is not None:
            self.process.stderr.close()
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (self.description, self.command, status))


class BaseExtractor(object):
    """Common machinery for the extractors.

    Subclasses supply get_filenames() and extract_archive(), and may
    override prepare(), basename() and check_contents().
    """

    # Maps an encoding reported by mimetypes to the command that decodes it.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}
    name_checker = DirectoryChecker

    def __init__(self, filename, mimetype, encoding):
        """Open *filename*, decode it if *encoding* is set, then prepare().

        Raises ValueError for an encoding missing from ``decoders`` and
        ExtractorError when the file cannot be opened or decoding fails.
        """
        if encoding and (encoding not in self.decoders):
            raise ValueError("unrecognized encoding %s" % (encoding,))
        self.filename = os.path.realpath(filename)
        self.mimetype = mimetype
        self.encoding = encoding
        self.included_archives = []
        try:
            # Binary mode: the file is only ever fed to helper commands.
            self.archive = open(filename, 'rb')
        except (IOError, OSError) as error:
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        if encoding:
            self.pipe([self.decoders[encoding]], "decoding")
        self.prepare()

    def run(self, command, description="extraction", stdout=None,
            stderr=None, stdin=None):
        """Run *command*; raise ExtractorError if it reports a failure."""
        error = run_command(command, description, stdout, stderr, stdin)
        if error:
            raise ExtractorError(error)

    def pipe(self, command, description, stderr=None):
        """Replace self.archive with the output of *command* run on it.

        The command reads the current archive on stdin and writes into a
        fresh temporary file, which then becomes self.archive.
        """
        output = tempfile.TemporaryFile()
        try:
            self.run(command, description, output, stderr, self.archive)
        except (ExtractorError, OSError):
            output.close()  # Do not leak the temporary file on failure.
            raise
        self.archive.close()
        self.archive = output
        self.archive.flush()

    def prepare(self):
        """Hook for subclasses that must transform the archive first."""
        pass

    def _note_included_archive(self, filename):
        """Remember *filename* if its name maps to a known archive type."""
        if mimetypes.guess_type(filename)[0] in extractor_map:
            self.included_archives.append(filename)

    def check_contents(self):
        """Classify the archive by looking at its file listing.

        Returns EMPTY when nothing is listed, BOMB when entries do not all
        share the first entry's top-level name, MATCHING_DIRECTORY when that
        shared name equals basename(), and otherwise the shared name itself
        with a trailing slash.  Entries that look like archives are added to
        self.included_archives along the way.
        """
        filenames = self.get_filenames()
        try:
            filename = next(filenames)
        except StopIteration:
            filenames.stop()
            return EMPTY
        self._note_included_archive(filename)
        first_part = filename.split('/', 1)[0] + '/'
        archive_type = None
        for filename in filenames:
            self._note_included_archive(filename)
            if (archive_type is None) and \
                    (not filename.startswith(first_part)):
                archive_type = BOMB
        filenames.stop()
        if archive_type:
            return archive_type
        if self.basename() == first_part[:-1]:
            return MATCHING_DIRECTORY
        return first_part

    def basename(self):
        """Return the file's name minus encoding and type extensions."""
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if extension in mimetypes.encodings_map:
            pieces.pop()
            extension = '.' + pieces[-1]
        if (extension in mimetypes.types_map or
                extension in mimetypes.common_types or
                extension in mimetypes.suffix_map):
            pieces.pop()
        return '.'.join(pieces)

    def extract(self, path):
        """Unpack the archive with *path* as the working directory."""
        old_path = os.path.realpath(os.curdir)
        os.chdir(path)
        try:
            self.archive.seek(0, 0)
            self.extract_archive()
        finally:
            # Go back even when extraction fails.
            os.chdir(old_path)


class TarExtractor(BaseExtractor):
    """Handle tar archives with the tar command."""

    def get_filenames(self):
        self.archive.seek(0, 0)
        return ProcessStreamer(['tar', '-t'], self.archive)

    def extract_archive(self):
        self.run(['tar', '-x'], stdin=self.archive)


class ZipExtractor(BaseExtractor):
    """Handle zip archives with zipinfo and unzip.

    Both commands are given the file by name, so the archive is never
    opened here and the *encoding* argument is stored but not acted on.
    """

    def __init__(self, filename, mimetype, encoding):
        self.filename = os.path.realpath(filename)
        self.mimetype = mimetype
        self.encoding = encoding
        self.included_archives = []
        # Placeholder so the inherited seek() calls have something to act on.
        self.archive = StringIO()

    def get_filenames(self):
        self.archive.seek(0, 0)
        return ProcessStreamer(['zipinfo', '-1', self.filename], None)

    def extract_archive(self):
        self.run(['unzip', '-q', self.filename])


class CpioExtractor(BaseExtractor):
    """Handle cpio archives; the command's stderr is captured, not shown."""

    def get_filenames(self):
        self.archive.seek(0, 0)
        return ProcessStreamer(['cpio', '-t'], self.archive,
                               stderr=subprocess.PIPE)

    def extract_archive(self):
        self.run(['cpio', '-i', '--make-directories',
                  '--no-absolute-filenames'],
                 stderr=subprocess.PIPE, stdin=self.archive)


class RPMExtractor(CpioExtractor):
    """Handle RPM packages by converting them to cpio with rpm2cpio."""

    def prepare(self):
        self.pipe(['rpm2cpio', '-'], "rpm2cpio")

    def basename(self):
        """Return the name without '.rpm' and a short trailing component.

        After '.rpm' is removed, a last dot-separated component shorter
        than eight characters is removed as well.  Names that do not end in
        '.rpm' fall back to BaseExtractor.basename().
        """
        pieces = os.path.basename(self.filename).split('.')
        if len(pieces) == 1:
            return pieces[0]
        elif pieces[-1] != 'rpm':
            return BaseExtractor.basename(self)
        pieces.pop()
        if len(pieces) == 1:
            return pieces[0]
        elif len(pieces[-1]) < 8:
            pieces.pop()
        return '.'.join(pieces)

    def check_contents(self):
        """Scan the listing for included archives; always report BOMB."""
        CpioExtractor.check_contents(self)
        return BOMB


class DebExtractor(TarExtractor):
    """Handle Debian packages by pulling data.tar.gz out with ar."""

    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'data.tar.gz'],
                  "data.tar.gz extraction")
        self.archive.seek(0, 0)
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        """Return the name without its final '_'-separated '.deb' piece.

        Falls back to BaseExtractor.basename() when that last piece is
        longer than ten characters or does not end in '.deb'.
        """
        pieces = os.path.basename(self.filename).split('_')
        if len(pieces) == 1:
            return pieces[0]
        last_piece = pieces.pop()
        if (len(last_piece) > 10) or (not last_piece.endswith('.deb')):
            return BaseExtractor.basename(self)
        return '_'.join(pieces)

    def check_contents(self):
        """Scan the listing for included archives; always report BOMB."""
        TarExtractor.check_contents(self)
        return BOMB


class CompressionExtractor(BaseExtractor):
    """Handle a single compressed file that is not a known archive type."""

    name_checker = FilenameChecker

    def basename(self):
        """Return the file's name minus a recognized encoding extension."""
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if extension in mimetypes.encodings_map:
            pieces.pop()
        return '.'.join(pieces)

    def get_filenames(self):
        yield self.basename()

    def check_contents(self):
        return COMPRESSED

    def extract(self, path):
        """Write the decoded data to the file *path*."""
        output = open(path, 'wb')
        try:
            # The decoder left the file offset at the end of the temporary
            # file; without rewinding, cat would copy nothing.
            self.archive.seek(0, 0)
            self.run(['cat'], "output write", stdin=self.archive,
                     stdout=output)
        finally:
            output.close()


class BaseHandler(object):
    """Decide where an archive is extracted and tidy up afterwards.

    Every handler offers can_handle(contents, options) as a static method,
    plus extract() and cleanup(), both of which return None on success or
    an error message.
    """

    def __init__(self, extractor, contents, options):
        self.logger = logging.getLogger('x-log')
        self.extractor = extractor
        self.contents = contents
        self.options = options
        self.target = None

    def extract(self):
        """Extract into self.target; return an error message on failure."""
        try:
            self.extractor.extract(self.target)
        except (ExtractorError, IOError, OSError) as error:
            return str(error)

    def cleanup(self):
        """Give the user access to everything under self.target.

        Directories get u+rwx first, then everything gets u+rw.  Returns an
        error message naming the command that failed, or None.
        """
        if self.target is None:
            return
        command = 'find'
        status = subprocess.call(['find', self.target, '-type', 'd',
                                  '-exec', 'chmod', 'u+rwx', '{}', ';'])
        if status == 0:
            command = 'chmod'
            status = subprocess.call(['chmod', '-R', 'u+rw', self.target])
        if status != 0:
            return "%s returned with exit status %s" % (command, status)


# The "where to extract" table, with options and archive types.
# This dictates the contents of each can_handle method.
#
#         Flat           Overwrite            None
# File    basename       basename             FilenameChecked
# Match   .              .                    tempdir + checked
# Bomb    .              basename             DirectoryChecked

class FlatHandler(BaseHandler):
    """Extract straight into the current directory."""

    @staticmethod
    def can_handle(contents, options):
        return ((options.flat and (contents != COMPRESSED)) or
                (options.overwrite and (contents == MATCHING_DIRECTORY)))

    def __init__(self, extractor, contents, options):
        BaseHandler.__init__(self, extractor, contents, options)
        self.target = '.'

    def cleanup(self):
        """Add user read/write (and search, for directories) permissions.

        Only the files named in the archive listing are touched.
        """
        filenames = self.extractor.get_filenames()
        try:
            for filename in filenames:
                stat_info = os.stat(filename)
                perms = stat.S_IRUSR | stat.S_IWUSR
                if stat.S_ISDIR(stat_info.st_mode):
                    perms |= stat.S_IXUSR
                os.chmod(filename, stat_info.st_mode | perms)
        finally:
            # Reap the listing process instead of leaving it unwaited.
            stop = getattr(filenames, 'stop', None)
            if stop is not None:
                stop()


class OverwriteHandler(BaseHandler):
    """Extract to the archive's basename without checking for clashes."""

    @staticmethod
    def can_handle(contents, options):
        return ((options.flat and (contents == COMPRESSED)) or
                (options.overwrite and (contents != MATCHING_DIRECTORY)))

    def __init__(self, extractor, contents, options):
        BaseHandler.__init__(self, extractor, contents, options)
        self.target = self.extractor.basename()


class MatchHandler(BaseHandler):
    """Extract to a temporary directory, then move the result into place."""

    @staticmethod
    def can_handle(contents, options):
        return contents == MATCHING_DIRECTORY

    def extract(self):
        basename = self.extractor.basename()
        self.target = tempfile.mkdtemp(dir='.')
        result = BaseHandler.extract(self)
        if result is None:
            tempdir = self.target
            checker = self.extractor.name_checker(basename)
            self.target = checker.check()
            os.rename(os.path.join(tempdir, basename), self.target)
            os.rmdir(tempdir)
        return result


class EmptyHandler(object):
    """Do nothing for archives that list no files."""

    @staticmethod
    def can_handle(contents, options):
        return contents == EMPTY

    def __init__(self, extractor, contents, options):
        pass

    def extract(self):
        pass

    def cleanup(self):
        pass


class BombHandler(BaseHandler):
    """Fallback: extract into a freshly checked name based on basename()."""

    @staticmethod
    def can_handle(contents, options):
        return True

    def __init__(self, extractor, contents, options):
        BaseHandler.__init__(self, extractor, contents, options)
        checker = self.extractor.name_checker(self.extractor.basename())
        self.target = checker.check()


extractor_map = {'application/x-tar': TarExtractor,
                 'application/zip': ZipExtractor,
                 'application/x-msdos-program': ZipExtractor,
                 'application/x-debian-package': DebExtractor,
                 'application/x-redhat-package-manager': RPMExtractor,
                 'application/x-rpm': RPMExtractor,
                 'application/x-cpio': CpioExtractor}

# Tried in order; BombHandler accepts anything, so it must stay last.
handlers = [FlatHandler, OverwriteHandler, MatchHandler, EmptyHandler,
            BombHandler]


class ExtractorApplication(object):
    """Command-line driver: parse options, then extract each archive."""

    def __init__(self, arguments):
        self.parse_options(arguments)
        self.setup_logger()
        self.successes = []
        self.failures = []

    def parse_options(self, arguments):
        """Parse *arguments* into self.options and self.archives."""
        parser = optparse.OptionParser(
            usage="%prog [options] archive [archive2 ...]",
            description="Intelligent archive extractor",
            version=VERSION_BANNER
            )
        parser.add_option('-r', '--recursive', dest='recursive',
                          action='store_true', default=False,
                          help='extract archives contained in the ones listed')
        parser.add_option('-q', '--quiet', dest='quiet',
                          action='count', default=3,
                          help='suppress warning/error messages')
        parser.add_option('-v', '--verbose', dest='verbose',
                          action='count', default=0,
                          help='be verbose/print debugging information')
        parser.add_option('-o', '--overwrite', dest='overwrite',
                          action='store_true', default=False,
                          help='overwrite any existing target directory')
        parser.add_option('-f', '--flat', '--no-directory', dest='flat',
                          action='store_true', default=False,
                          help="don't put contents in their own directory")
##         parser.add_option('-n', '--noninteractive', dest='batch',
##                           action='store_true', default=False,
##                           help="don't ask how to handle special cases")
        self.options, filenames = parser.parse_args(arguments)
        if not filenames:
            parser.error("you did not list any archives")
        # Maps a directory to the archive names to process from inside it.
        self.archives = {os.path.realpath(os.curdir): filenames}

    def setup_logger(self):
        """Attach a stream handler whose level follows -q and -v."""
        self.logger = logging.getLogger('x-log')
        handler = logging.StreamHandler()
        # WARNING is the default.
        handler.setLevel(10 * (self.options.quiet - self.options.verbose))
        formatter = logging.Formatter("x: %(levelname)s: %(message)s")
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        # Let the handler's level be the only filter; otherwise the logger
        # inherits the root logger's WARNING threshold and -v has no effect.
        self.logger.setLevel(logging.DEBUG)

    def get_extractor(self):
        """Set up the extractor and handler for self.current_filename.

        Returns an error message on failure and None on success.
        """
        mimetype, encoding = mimetypes.guess_type(self.current_filename)
        try:
            extractor = extractor_map[mimetype]
        except KeyError:
            if encoding:
                extractor = CompressionExtractor
            else:
                return "not a known archive type"
        try:
            self.current_extractor = extractor(self.current_filename,
                                               mimetype, encoding)
            content = self.current_extractor.check_contents()
            for handler in handlers:
                if handler.can_handle(content, self.options):
                    self.current_handler = handler(self.current_extractor,
                                                   content, self.options)
                    break
        except ExtractorError as error:
            return str(error)

    def recurse(self):
        """Queue archives found inside the current one when -r is given."""
        if not self.options.recursive:
            return
        for filename in self.current_extractor.included_archives:
            tail_path, basename = os.path.split(filename)
            directory = os.path.join(self.current_directory,
                                     self.current_handler.target, tail_path)
            self.archives.setdefault(directory, []).append(basename)

    def report(self, function, *args):
        """Call *function*; log any error it returns or raises.

        Returns True when the call produced no error and False otherwise.
        """
        try:
            error = function(*args)
        except (ExtractorError, IOError, OSError, ValueError) as exception:
            # ValueError covers an unrecognized encoding and a name checker
            # that has run out of alternative names.
            error = str(exception)
        if error:
            self.logger.error("%s: %s", self.current_filename, error)
            return False
        return True

    def run(self):
        """Process every queued archive; return 1 if any failed, else 0."""
        while self.archives:
            self.current_directory, filenames = self.archives.popitem()
            for filename in filenames:
                os.chdir(self.current_directory)
                self.current_filename = filename
                success = self.report(self.get_extractor)
                if success:
                    # cleanup runs even when extract reported a failure.
                    for name in 'extract', 'cleanup':
                        success = (self.report(getattr(self.current_handler,
                                                       name)) and success)
                    self.recurse()
                if success:
                    self.successes.append(self.current_filename)
                else:
                    self.failures.append(self.current_filename)
        if self.failures:
            return 1
        return 0


if __name__ == '__main__':
    app = ExtractorApplication(sys.argv[1:])
    sys.exit(app.run())