Thu, 22 Nov 2007 22:37:40 -0500
[svn] Add support for LZMA compression. Holy crap that was easy.
#!/usr/bin/env python
#
# dtrx -- Intelligently extract various archive types.
# Copyright (c) 2006, 2007 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, 5th Floor, Boston, MA, 02111.

import errno
import logging
import mimetypes
import optparse
import os
import re
import stat
import subprocess
import sys
import tempfile
import textwrap
import traceback

VERSION = "5.0"
VERSION_BANNER = """dtrx version %s
Copyright (c) 2006, 2007 Brett Smith <brettcsmith@brettcsmith.org>

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 3 of the License, or (at your
option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
Public License for more details.""" % (VERSION,)

# What an extractor found once the archive was unpacked into its temporary
# directory (stored in BaseExtractor.content_type).
MATCHING_DIRECTORY = 1
ONE_ENTRY = 2
BOMB = 3
EMPTY = 4
ONE_ENTRY_KNOWN = 5

# Answers for OneEntryPolicy.
EXTRACT_HERE = 1
EXTRACT_WRAP = 2
EXTRACT_RENAME = 3

# Answers for RecursionPolicy.
RECURSE_ALWAYS = 1
RECURSE_ONCE = 2
RECURSE_NOT_NOW = 3
RECURSE_NEVER = 4

mimetypes.encodings_map.setdefault('.bz2', 'bzip2')
mimetypes.encodings_map.setdefault('.lzma', 'lzma')
# The type must carry the 'application/' prefix: ExtractorBuilder.mimetype_map
# is keyed by full mimetypes, so a bare 'x-ruby-gem' could never match.
mimetypes.types_map.setdefault('.gem', 'application/x-ruby-gem')

logger = logging.getLogger('dtrx-log')

try:
    _input = raw_input  # Python 2
except NameError:
    _input = input  # Python 3


def _to_text(data):
    """Return subprocess output as the interpreter's native str type."""
    if isinstance(data, str):
        # Always the case on Python 2, where str is the byte string type.
        return data
    return os.fsdecode(data)


def run_command(command, description, stdout=None, stderr=None, stdin=None):
    """Run command and wait for it to finish.

    Returns None on success, or an error message built from description,
    the command line and the exit status when the command exits non-zero.
    """
    process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                               stderr=stderr)
    status = process.wait()
    for pipe in (process.stdout, process.stderr):
        # These attributes are None unless subprocess.PIPE was requested.
        if pipe is not None:
            pipe.close()
    if status != 0:
        return ("%s error: '%s' returned status code %s" %
                (description, ' '.join(command), status))
    return None


class FilenameChecker(object):
    """Find an unused filename, trying name, name.1, ... name.9."""

    def __init__(self, original_name):
        self.original_name = original_name

    def is_free(self, filename):
        """Return True if nothing exists at filename."""
        return not os.path.exists(filename)

    def check(self):
        """Return the first free candidate name.

        Raises ValueError when the name and all nine suffixed
        alternatives are taken.
        """
        for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]:
            filename = '%s%s' % (self.original_name, suffix)
            if self.is_free(filename):
                return filename
        raise ValueError("all alternatives for name %s taken" %
                         (self.original_name,))


class DirectoryChecker(FilenameChecker):
    """FilenameChecker that claims the name by creating it as a directory."""

    def is_free(self, filename):
        """Try to mkdir filename; return False only if it already exists."""
        try:
            os.mkdir(filename)
        except OSError as error:
            if error.errno == errno.EEXIST:
                return False
            raise
        return True


class ExtractorError(Exception):
    """Raised when an archive cannot be opened or a helper command fails."""
    pass


class BaseExtractor(object):
    """Common machinery for listing and extracting one archive.

    Subclasses queue external commands with pipe(); run_pipes() then feeds
    the archive through them.  Extraction happens inside a fresh temporary
    directory (self.target) whose contents are classified afterwards.
    """

    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat',
                'lzma': 'lzcat'}
    name_checker = DirectoryChecker

    def __init__(self, filename, encoding):
        """Open filename and queue a decoder if encoding is given.

        Raises ValueError for an encoding without a known decoder and
        ExtractorError when the file cannot be opened.
        """
        if encoding and (encoding not in self.decoders):
            raise ValueError("unrecognized encoding %s" % (encoding,))
        self.filename = os.path.realpath(filename)
        self.encoding = encoding
        self.included_archives = []
        self.target = None
        self.content_type = None
        self.content_name = None
        self.pipes = []
        try:
            # Binary mode: the file is only handed to subprocesses as stdin.
            self.archive = open(filename, 'rb')
        except (IOError, OSError) as error:
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        if encoding:
            self.pipe([self.decoders[encoding]], "decoding")
        self.prepare()

    def pipe(self, command, description="extraction"):
        """Append command (an argv list) to the pipeline."""
        self.pipes.append((command, description))

    def run_pipes(self, final_stdout=None):
        """Run the queued commands as one pipeline fed from self.archive.

        The last command writes to final_stdout (a file object or file
        descriptor; a new temporary file when None).  Afterwards
        self.archive is replaced by final_stdout.  Does nothing when no
        commands are queued.  Raises ExtractorError if any command exits
        non-zero.
        """
        if not self.pipes:
            return
        if final_stdout is None:
            # FIXME: Buffering this might be dumb.
            final_stdout = tempfile.TemporaryFile()
        last_pipe = len(self.pipes) - 1
        processes = []
        # Helper chatter is discarded.  Sending it to an unread pipe would
        # block a child once the pipe buffer filled up.
        devnull = open(os.devnull, 'w')
        try:
            try:
                for index, (command, description) in enumerate(self.pipes):
                    if index == 0:
                        stdin = self.archive
                    else:
                        stdin = processes[-1].stdout
                    if index == last_pipe:
                        stdout = final_stdout
                    else:
                        stdout = subprocess.PIPE
                    processes.append(subprocess.Popen(command, stdin=stdin,
                                                      stdout=stdout,
                                                      stderr=devnull))
                    if index > 0:
                        # Drop our copy of the read end so the producer gets
                        # SIGPIPE, rather than hanging, if the consumer
                        # exits early.
                        stdin.close()
            except (IOError, OSError):
                # A command could not be started; reap the ones that were.
                for process in processes:
                    if process.stdout is not None:
                        process.stdout.close()
                    process.wait()
                raise
            exit_codes = [process.wait() for process in processes]
        finally:
            devnull.close()
        self.archive.close()
        for index, status in enumerate(exit_codes):
            if status != 0:
                raise ExtractorError("%s error: '%s' returned status code %s" %
                                     (self.pipes[index][1],
                                      ' '.join(self.pipes[index][0]), status))
        self.archive = final_stdout

    def prepare(self):
        """Hook for subclasses to queue format-specific commands."""
        pass

    def check_included_archives(self, filenames):
        """Record which of filenames look like archives themselves."""
        for filename in filenames:
            if (ExtractorBuilder.try_by_mimetype(filename)[0] or
                    ExtractorBuilder.try_by_extension(filename)[0]):
                self.included_archives.append(filename)

    def check_contents(self):
        """Classify the current directory's contents after extraction."""
        filenames = os.listdir('.')
        if not filenames:
            self.content_type = EMPTY
        elif len(filenames) == 1:
            if self.basename() == filenames[0]:
                self.content_type = MATCHING_DIRECTORY
            else:
                self.content_type = ONE_ENTRY
            self.content_name = filenames[0]
            if os.path.isdir(filenames[0]):
                self.content_name += '/'
        else:
            self.content_type = BOMB
        self.check_included_archives(filenames)

    def basename(self):
        """Return the archive's name minus encoding and type extensions.

        A name without any dot (for example a file called "gz") is
        returned unchanged instead of being stripped down to nothing.
        """
        pieces = os.path.basename(self.filename).split('.')
        if (len(pieces) > 1 and
                ('.' + pieces[-1]) in mimetypes.encodings_map):
            pieces.pop()
        if len(pieces) > 1:
            extension = '.' + pieces[-1]
            if (extension in mimetypes.types_map or
                    extension in mimetypes.common_types or
                    extension in mimetypes.suffix_map):
                pieces.pop()
        return '.'.join(pieces)

    def extract(self):
        """Extract into a new temporary directory under the current one.

        The working directory is always restored.  If extraction fails
        with ExtractorError, IOError or OSError the temporary directory is
        removed and the exception re-raised.
        """
        self.target = tempfile.mkdtemp(prefix='.dtrx-', dir='.')
        old_path = os.path.realpath(os.curdir)
        os.chdir(self.target)
        try:
            try:
                self.archive.seek(0, 0)
                self.extract_archive()
                self.check_contents()
            finally:
                # Later archives are resolved relative to the original
                # directory, so never leave the process inside the tempdir.
                os.chdir(old_path)
        except (ExtractorError, IOError, OSError):
            subprocess.call(['rm', '-rf', self.target])
            raise

    def get_filenames(self):
        """Run the pipeline and yield each output line as a filename."""
        self.run_pipes()
        self.archive.seek(0, 0)
        while True:
            line = self.archive.readline()
            if not line:
                self.archive.close()
                return
            yield _to_text(line).rstrip('\n')


class CompressionExtractor(BaseExtractor):
    """Extractor for a single compressed file (not an archive)."""

    name_checker = FilenameChecker

    def basename(self):
        """Return the filename without its compression extension."""
        pieces = os.path.basename(self.filename).split('.')
        if (len(pieces) > 1 and
                ('.' + pieces[-1]) in mimetypes.encodings_map):
            pieces.pop()
        return '.'.join(pieces)

    def get_filenames(self):
        yield self.basename()

    def extract(self):
        """Decode into a temporary file in the current directory.

        On failure the temporary file is removed before re-raising.
        """
        self.content_type = ONE_ENTRY_KNOWN
        self.content_name = self.basename()
        output_fd, self.target = tempfile.mkstemp(prefix='.dtrx-', dir='.')
        try:
            try:
                self.run_pipes(output_fd)
            finally:
                os.close(output_fd)
        except (ExtractorError, IOError, OSError):
            try:
                os.unlink(self.target)
            except OSError:
                pass
            raise


class TarExtractor(BaseExtractor):
    def get_filenames(self):
        self.pipe(['tar', '-t'], "listing")
        return BaseExtractor.get_filenames(self)

    def extract_archive(self):
        self.pipe(['tar', '-x'])
        self.run_pipes()


class ZipExtractor(BaseExtractor):
    """Zip tools take a filename, so the pipeline's stdin is /dev/null."""

    def __init__(self, filename, encoding):
        BaseExtractor.__init__(self, '/dev/null', None)
        self.filename = os.path.realpath(filename)

    def get_filenames(self):
        self.pipe(['zipinfo', '-1', self.filename], "listing")
        return BaseExtractor.get_filenames(self)

    def extract_archive(self):
        self.pipe(['unzip', '-q', self.filename])
        self.run_pipes()


class CpioExtractor(BaseExtractor):
    def get_filenames(self):
        self.pipe(['cpio', '-t'], "listing")
        return BaseExtractor.get_filenames(self)

    def extract_archive(self):
        self.pipe(['cpio', '-i', '--make-directories',
                   '--no-absolute-filenames'])
        self.run_pipes()


class RPMExtractor(CpioExtractor):
    def prepare(self):
        self.pipe(['rpm2cpio', '-'], "rpm2cpio")

    def basename(self):
        """Strip '.rpm' and a short trailing component (under 8 chars)."""
        pieces = os.path.basename(self.filename).split('.')
        if len(pieces) == 1:
            return pieces[0]
        elif pieces[-1] != 'rpm':
            return BaseExtractor.basename(self)
        pieces.pop()
        if len(pieces) == 1:
            return pieces[0]
        elif len(pieces[-1]) < 8:
            pieces.pop()
        return '.'.join(pieces)

    def check_contents(self):
        # Packages are always treated as bombs (wrapped in a directory).
        self.check_included_archives(os.listdir('.'))
        self.content_type = BOMB


class DebExtractor(TarExtractor):
    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'data.tar.gz'],
                  "data.tar.gz extraction")
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        """Strip a short trailing '_<something>.deb' component."""
        pieces = os.path.basename(self.filename).split('_')
        if len(pieces) == 1:
            return pieces[0]
        last_piece = pieces.pop()
        if (len(last_piece) > 10) or (not last_piece.endswith('.deb')):
            return BaseExtractor.basename(self)
        return '_'.join(pieces)

    def check_contents(self):
        self.check_included_archives(os.listdir('.'))
        self.content_type = BOMB


class DebMetadataExtractor(DebExtractor):
    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'control.tar.gz'],
                  "control.tar.gz extraction")
        self.pipe(['zcat'], "control.tar.gz decompression")


class GemExtractor(TarExtractor):
    def prepare(self):
        self.pipe(['tar', '-xO', 'data.tar.gz'], "data.tar.gz extraction")
        self.pipe(['zcat'], "data.tar.gz decompression")

    def check_contents(self):
        self.check_included_archives(os.listdir('.'))
        self.content_type = BOMB


class GemMetadataExtractor(CompressionExtractor):
    def prepare(self):
        self.pipe(['tar', '-xO', 'metadata.gz'], "metadata.gz extraction")
        self.pipe(['zcat'], "metadata.gz decompression")

    def basename(self):
        return os.path.basename(self.filename) + '-metadata.txt'


class SevenExtractor(BaseExtractor):
    """7z takes a filename, so the pipeline's stdin is /dev/null."""

    border_re = re.compile('^[- ]+$')

    def __init__(self, filename, encoding):
        BaseExtractor.__init__(self, '/dev/null', None)
        self.filename = os.path.realpath(filename)

    def get_filenames(self):
        """Yield names from the table printed by '7z l'.

        The table is delimited by two border lines of dashes and spaces;
        the last column of the first border gives the name column offset.
        """
        self.pipe(['7z', 'l', self.filename], "listing")
        self.run_pipes()
        self.archive.seek(0, 0)
        fn_index = None
        for line in self.archive:
            line = _to_text(line)
            if self.border_re.match(line):
                if fn_index is not None:
                    break
                else:
                    fn_index = line.rindex(' ') + 1
            elif fn_index is not None:
                yield line[fn_index:-1]
        self.archive.close()

    def extract_archive(self):
        self.pipe(['7z', 'x', self.filename])
        self.run_pipes()


class BaseHandler(object):
    """Moves an extractor's temporary output to its final location."""

    def __init__(self, extractor, options):
        self.extractor = extractor
        self.options = options
        self.target = None

    def handle(self):
        """Make the output accessible to the user, then organize() it.

        Returns an error message if find or chmod fails, otherwise
        whatever organize() returns.
        """
        command = 'find'
        status = subprocess.call(['find', self.extractor.target, '-type', 'd',
                                  '-exec', 'chmod', 'u+rwx', '{}', ';'])
        if status == 0:
            command = 'chmod'
            status = subprocess.call(['chmod', '-R', 'u+rwX',
                                      self.extractor.target])
        if status != 0:
            return "%s returned with exit status %s" % (command, status)
        return self.organize()


# The "where to extract" table, with options and archive types.
# This dictates the contents of each can_handle method.
#
#         Flat           Overwrite            None
# File    basename       basename             FilenameChecked
# Match   .              .                    tempdir + checked
# Bomb    .              basename             DirectoryChecked

class FlatHandler(BaseHandler):
    def can_handle(contents, options):
        return ((options.flat and (contents != ONE_ENTRY_KNOWN)) or
                (options.overwrite and (contents == MATCHING_DIRECTORY)))
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Merge the temporary tree into the current directory."""
        self.target = '.'
        for curdir, dirs, filenames in os.walk(self.extractor.target,
                                               topdown=False):
            # Drop the temporary directory's own path component.
            path_parts = curdir.split(os.sep)
            if path_parts[0] == '.':
                del path_parts[1]
            else:
                del path_parts[0]
            if path_parts:
                newdir = os.path.join(*path_parts)
            else:
                newdir = '.'
            if not os.path.isdir(newdir):
                os.makedirs(newdir)
            for filename in filenames:
                os.rename(os.path.join(curdir, filename),
                          os.path.join(newdir, filename))
            os.rmdir(curdir)


class OverwriteHandler(BaseHandler):
    def can_handle(contents, options):
        return ((options.flat and (contents == ONE_ENTRY_KNOWN)) or
                (options.overwrite and (contents != MATCHING_DIRECTORY)))
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Replace anything named after the archive with the output."""
        self.target = self.extractor.basename()
        result = run_command(['rm', '-rf', self.target],
                             "removing %s to overwrite" % (self.target,))
        if result is None:
            os.rename(self.extractor.target, self.target)
        return result


class MatchHandler(BaseHandler):
    def can_handle(contents, options):
        return ((contents == MATCHING_DIRECTORY) or
                ((contents == ONE_ENTRY) and
                 options.one_entry_policy.ok_for_match()))
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Move the single extracted entry out of the temporary directory."""
        if self.options.one_entry_policy == EXTRACT_HERE:
            destination = self.extractor.content_name.rstrip('/')
        else:
            destination = self.extractor.basename()
        self.target = self.extractor.name_checker(destination).check()
        if os.path.isdir(self.extractor.target):
            os.rename(os.path.join(self.extractor.target,
                                   os.listdir(self.extractor.target)[0]),
                      self.target)
            os.rmdir(self.extractor.target)
        else:
            os.rename(self.extractor.target, self.target)


class EmptyHandler(object):
    """Handler for archives that produced no files at all."""

    # ExtractionAction.run reads handler.target after handle(); there is
    # nothing to point at for an empty archive.
    target = ''

    def can_handle(contents, options):
        return contents == EMPTY
    can_handle = staticmethod(can_handle)

    def __init__(self, extractor, options):
        self.extractor = extractor

    def handle(self):
        """Remove the (empty) temporary extraction directory."""
        os.rmdir(self.extractor.target)


class BombHandler(BaseHandler):
    def can_handle(contents, options):
        return True
    can_handle = staticmethod(can_handle)

    def organize(self):
        """Rename the temporary output after the archive's basename."""
        basename = self.extractor.basename()
        self.target = self.extractor.name_checker(basename).check()
        os.rename(self.extractor.target, self.target)


class BasePolicy(object):
    """An interactive decision, optionally fixed for the whole run.

    Subclasses provide answers (input -> value, '' being the default),
    choices (lines shown under the question) and prompt.
    """

    def __init__(self, options):
        self.current_policy = None
        if options.batch:
            self.permanent_policy = self.answers['']
        else:
            self.permanent_policy = None

    def ask_question(self, question):
        """Prompt until a recognized answer is given; EOF means default."""
        question = textwrap.wrap(question) + self.choices
        while True:
            print("\n".join(question))
            try:
                answer = _input(self.prompt)
            except EOFError:
                return self.answers['']
            try:
                return self.answers[answer.lower()]
            except KeyError:
                print("")

    # A policy compares equal to the constant it currently holds.
    def __eq__(self, other):
        return self.current_policy == other

    def __ne__(self, other):
        return not (self == other)

    # Defining __eq__ disables hashing on Python 3; keep identity hashing.
    __hash__ = object.__hash__

    def __cmp__(self, other):
        # Only consulted on Python 2.
        return ((self.current_policy > other) -
                (self.current_policy < other))


class OneEntryPolicy(BasePolicy):
    answers = {'h': EXTRACT_HERE, 'i': EXTRACT_WRAP, 'r': EXTRACT_RENAME,
               '': EXTRACT_WRAP}
    choices = ["You can:",
               " * extract it Inside another directory",
               " * extract it and Rename the directory",
               " * extract it Here"]
    prompt = "What do you want to do?  (I/r/h) "

    def prep(self, archive_filename, entry_name):
        question = ("%s contains one entry: %s." %
                    (archive_filename, entry_name))
        self.current_policy = (self.permanent_policy or
                               self.ask_question(question))

    def ok_for_match(self):
        return self.current_policy in (EXTRACT_RENAME, EXTRACT_HERE)


class RecursionPolicy(BasePolicy):
    answers = {'o': RECURSE_ONCE, 'a': RECURSE_ALWAYS, 'n': RECURSE_NOT_NOW,
               'v': RECURSE_NEVER, '': RECURSE_NOT_NOW}
    choices = ["You can:",
               " * Always extract included archives",
               " * extract included archives this Once",
               " * choose Not to extract included archives",
               " * neVer extract included archives"]
    prompt = "What do you want to do?  (a/o/N/v) "

    def __init__(self, options):
        BasePolicy.__init__(self, options)
        if options.show_list:
            self.permanent_policy = RECURSE_NEVER
        elif options.recursive:
            self.permanent_policy = RECURSE_ALWAYS

    def prep(self, current_filename, included_archives):
        """Decide whether to recurse, asking only when nothing is fixed."""
        archive_count = len(included_archives)
        if (self.permanent_policy is not None) or (archive_count == 0):
            self.current_policy = self.permanent_policy or RECURSE_NOT_NOW
            return
        elif archive_count > 1:
            question = ("%s contains %s other archive files." %
                        (current_filename, archive_count))
        else:
            question = ("%s contains another archive: %s." %
                        (current_filename, included_archives[0]))
        self.current_policy = self.ask_question(question)
        if self.current_policy in (RECURSE_ALWAYS, RECURSE_NEVER):
            self.permanent_policy = self.current_policy

    def ok_to_recurse(self):
        return self.current_policy in (RECURSE_ALWAYS, RECURSE_ONCE)


class ExtractorBuilder(object):
    """Guesses an archive's type and builds candidate extractors."""

    # archive type -> (extractor, metadata extractor or None)
    extractor_map = {'tar': (TarExtractor, None),
                     'zip': (ZipExtractor, None),
                     'deb': (DebExtractor, DebMetadataExtractor),
                     'rpm': (RPMExtractor, None),
                     'cpio': (CpioExtractor, None),
                     'gem': (GemExtractor, GemMetadataExtractor),
                     'compress': (CompressionExtractor, None),
                     '7z': (SevenExtractor, None)}

    mimetype_map = {}
    for mapping in (('tar', 'x-tar'),
                    ('zip', 'x-msdos-program', 'zip'),
                    ('deb', 'x-debian-package'),
                    ('rpm', 'x-redhat-package-manager', 'x-rpm'),
                    ('cpio', 'x-cpio'),
                    ('gem', 'x-ruby-gem'),
                    ('7z', 'x-7z-compressed')):
        for mimetype in mapping[1:]:
            if '/' not in mimetype:
                mimetype = 'application/' + mimetype
            mimetype_map[mimetype] = mapping[0]

    magic_mime_map = {}
    for mapping in (('deb', 'Debian binary package'),
                    ('cpio', 'cpio archive'),
                    ('tar', 'POSIX tar archive'),
                    ('zip', 'Zip archive'),
                    ('rpm', 'RPM'),
                    ('7z', '7-zip archive')):
        for pattern in mapping[1:]:
            magic_mime_map[re.compile(pattern)] = mapping[0]

    magic_encoding_map = {}
    for mapping in (('bzip2', 'bzip2 compressed'),
                    ('gzip', 'gzip compressed')):
        for pattern in mapping[1:]:
            magic_encoding_map[re.compile(pattern)] = mapping[0]

    # extension -> (archive type, encoding)
    extension_map = {}
    for mapping in (('tar', 'bzip2', 'tar.bz2'),
                    ('tar', 'gzip', 'tar.gz', 'tgz'),
                    ('tar', None, 'tar'),
                    ('zip', None, 'zip', 'exe'),
                    ('deb', None, 'deb'),
                    ('rpm', None, 'rpm'),
                    ('cpio', None, 'cpio'),
                    ('gem', None, 'gem'),
                    ('compress', None, 'Z', 'gz', 'bz2', 'lzma'),
                    ('7z', None, '7z')):
        for extension in mapping[2:]:
            extension_map[extension] = mapping[:2]

    def __init__(self, filename, options):
        self.filename = filename
        self.options = options

    def build_extractor(self, archive_type, encoding):
        """Instantiate the (metadata) extractor for archive_type."""
        extractors = self.extractor_map[archive_type]
        if self.options.metadata and (extractors[1] is not None):
            extractor = extractors[1]
        else:
            extractor = extractors[0]
        return extractor(self.filename, encoding)

    def get_extractor(self):
        """Yield extractors guessed by mimetype, extension, then magic.

        A guess whose extractor cannot be constructed (unreadable file,
        unsupported encoding) is logged and skipped, so the caller can
        report the file as a normal failure instead of crashing.
        """
        for func_name in ('mimetype', 'extension', 'magic'):
            archive_type, encoding = \
                getattr(self, 'try_by_' + func_name)(self.filename)
            logger.debug("%s extractor is %s, %s",
                         func_name, archive_type, encoding)
            if archive_type is None:
                continue
            try:
                extractor = self.build_extractor(archive_type, encoding)
            except (ExtractorError, ValueError) as error:
                logger.info("%s: %s", self.filename, error)
                continue
            yield extractor

    def try_by_mimetype(cls, filename):
        """Guess (archive type, encoding) with the mimetypes module."""
        mimetype, encoding = mimetypes.guess_type(filename)
        try:
            return cls.mimetype_map[mimetype], encoding
        except KeyError:
            if encoding:
                return 'compress', encoding
        return None, None
    try_by_mimetype = classmethod(try_by_mimetype)

    def try_by_magic(cls, filename):
        """Guess [archive type, encoding] from the output of 'file -z'.

        Returns (None, None) if file cannot be run or exits non-zero.
        """
        try:
            process = subprocess.Popen(['file', '-z', filename],
                                       stdout=subprocess.PIPE)
        except OSError:
            return None, None
        # communicate() drains and closes the pipe before reaping, so the
        # child can never block on a full pipe.
        output = _to_text(process.communicate()[0])
        if process.returncode != 0:
            return None, None
        output = output.split('\n', 1)[0]
        if output.startswith('%s: ' % filename):
            output = output[len(filename) + 2:]
        results = [None, None]
        for index, mapping in enumerate((cls.magic_mime_map,
                                         cls.magic_encoding_map)):
            for regexp, result in mapping.items():
                if regexp.search(output):
                    results[index] = result
                    break
        return results
    try_by_magic = classmethod(try_by_magic)

    def try_by_extension(cls, filename):
        """Guess (archive type, encoding) from the last two extensions."""
        parts = filename.rsplit('.', 2)[1:]
        while parts:
            try:
                return cls.extension_map['.'.join(parts)]
            except KeyError:
                del parts[0]
        return [None, None]
    try_by_extension = classmethod(try_by_extension)


class BaseAction(object):
    def __init__(self, options, filenames):
        self.options = options
        self.filenames = filenames
        self.target = None

    def report(self, function, *args):
        """Call function(*args); log any error and return success.

        An error is either a truthy return value or a raised
        ExtractorError, IOError or OSError.
        """
        try:
            error = function(*args)
        except (ExtractorError, IOError, OSError) as exception:
            error = str(exception)
            logger.debug(''.join(traceback.format_exception(*sys.exc_info())))
        if error:
            logger.info("%s: %s", self.current_filename, error)
            return False
        return True


class ExtractionAction(BaseAction):
    # Order matters: the first handler whose can_handle() accepts wins.
    handlers = [FlatHandler, OverwriteHandler, MatchHandler, EmptyHandler,
                BombHandler]

    def get_handler(self, extractor):
        if extractor.content_type == ONE_ENTRY:
            self.options.one_entry_policy.prep(self.current_filename,
                                               extractor.content_name)
        for handler in self.handlers:
            if handler.can_handle(extractor.content_type, self.options):
                self.current_handler = handler(extractor, self.options)
                break

    def run(self, filename, extractor):
        """Extract filename with extractor; return True on success."""
        self.current_filename = filename
        success = (self.report(extractor.extract) and
                   self.report(self.get_handler, extractor) and
                   self.report(self.current_handler.handle))
        if success:
            self.target = self.current_handler.target
        return success


class ListAction(BaseAction):
    def __init__(self, options, filenames):
        BaseAction.__init__(self, options, filenames)

    def get_list(self, extractor):
        # Note: The reason I'm getting all the filenames up front is
        # because if we run into trouble partway through the archive, we'll
        # try another extractor.  So before we display anything we have to
        # be sure this one is successful.  We maybe don't have to be quite
        # this conservative but this is the easy way out for now.
        self.filelist = list(extractor.get_filenames())

    def show_list(self, filename):
        """Print the listing, with a header when several archives given."""
        if len(self.filenames) != 1:
            if filename != self.filenames[0]:
                print("")
            print("%s:" % (filename,))
        print('\n'.join(self.filelist))

    def run(self, filename, extractor):
        self.current_filename = filename
        return (self.report(self.get_list, extractor) and
                self.report(self.show_list, filename))


class ExtractorApplication(object):
    def __init__(self, arguments):
        self.parse_options(arguments)
        self.setup_logger()
        self.successes = []
        self.failures = []

    def parse_options(self, arguments):
        parser = optparse.OptionParser(
            usage="%prog [options] archive [archive2 ...]",
            description="Intelligent archive extractor",
            version=VERSION_BANNER
            )
        parser.add_option('-r', '--recursive', dest='recursive',
                          action='store_true', default=False,
                          help='extract archives contained in the ones listed')
        parser.add_option('-q', '--quiet', dest='quiet',
                          action='count', default=3,
                          help='suppress warning/error messages')
        parser.add_option('-v', '--verbose', dest='verbose',
                          action='count', default=0,
                          help='be verbose/print debugging information')
        parser.add_option('-o', '--overwrite', dest='overwrite',
                          action='store_true', default=False,
                          help='overwrite any existing target directory')
        parser.add_option('-f', '--flat', '--no-directory', dest='flat',
                          action='store_true', default=False,
                          help="don't put contents in their own directory")
        parser.add_option('-l', '-t', '--list', '--table', dest='show_list',
                          action='store_true', default=False,
                          help="list contents of archives on standard output")
        parser.add_option('-n', '--noninteractive', dest='batch',
                          action='store_true', default=False,
                          help="don't ask how to handle special cases")
        parser.add_option('-m', '--metadata', dest='metadata',
                          action='store_true', default=False,
                          help="extract metadata from a .deb/.gem/etc.")
        self.options, filenames = parser.parse_args(arguments)
        if not filenames:
            parser.error("you did not list any archives")
        self.options.one_entry_policy = OneEntryPolicy(self.options)
        self.options.recursion_policy = RecursionPolicy(self.options)
        # directory -> archive names still to process there
        self.archives = {os.path.realpath(os.curdir): filenames}

    def setup_logger(self):
        # WARNING is the default.
        log_level = (10 * (self.options.quiet - self.options.verbose))
        logging.getLogger().setLevel(log_level)
        handler = logging.StreamHandler()
        handler.setLevel(log_level)
        formatter = logging.Formatter("dtrx: %(levelname)s: %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.debug("logger is set up")

    def recurse(self, filename, extractor, action):
        """Queue archives found inside filename if the policy allows."""
        archives = extractor.included_archives
        self.options.recursion_policy.prep(filename, archives)
        if self.options.recursion_policy.ok_to_recurse():
            for filename in archives:
                tail_path, basename = os.path.split(filename)
                directory = os.path.join(self.current_directory,
                                         action.target, tail_path)
                self.archives.setdefault(directory, []).append(basename)

    def run(self):
        """Process every queued archive; return the process exit status."""
        if self.options.show_list:
            action = ListAction
        else:
            action = ExtractionAction
        action = action(self.options, list(self.archives.values())[0])
        while self.archives:
            self.current_directory, self.filenames = self.archives.popitem()
            os.chdir(self.current_directory)
            for filename in self.filenames:
                builder = ExtractorBuilder(filename, self.options)
                for extractor in builder.get_extractor():
                    if action.run(filename, extractor):
                        self.successes.append(filename)
                        self.recurse(filename, extractor, action)
                        break
                else:
                    logger.error("%s: could not find a way to extract this" %
                                 (filename,))
                    self.failures.append(filename)
            # NOTE(review): nesting reconstructed from whitespace-mangled
            # source -- this appears to apply once the first batch is done,
            # so archives found by recursion are wrapped without asking.
            self.options.one_entry_policy.permanent_policy = EXTRACT_WRAP
        if self.failures:
            return 1
        return 0


if __name__ == '__main__':
    app = ExtractorApplication(sys.argv[1:])
    sys.exit(app.run())