Tue, 15 Jul 2008 22:18:03 -0400
Give extraction stderr more context, and suppress normal cpio stderr junk.
cpio writes a useless "N blocks" message to stderr unless it is run with
--quiet, so pass that option when extracting.
When we show the extraction's stderr to the user, first write a header line
explaining what it is, and strip the trailing newline from the output, since
logger.warning() writes its own.
#!/usr/bin/env python
#
# dtrx -- Intelligently extract various archive types.
# Copyright (c) 2006, 2007, 2008 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, see <http://www.gnu.org/licenses/>.

# Python 2.3 string methods: 'rfind', 'rindex', 'rjust', 'rstrip'

import errno
import glob
import logging
import mimetypes
import optparse
import os
import re
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import textwrap
import traceback

from sets import Set as set

VERSION = "6.1"
# NOTE(review): the layout of this literal is reproduced exactly as it appears
# in this copy of the file.  It looks as though the original line breaks were
# collapsed into spaces -- confirm against upstream before relying on the
# rendered --version output.
VERSION_BANNER = """dtrx version %s Copyright (c) 2006, 2007, 2008 Brett Smith <brettcsmith@brettcsmith.org> This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.""" % (VERSION,)

# Values for an extractor's content_type, describing what the archive
# unpacked to.  They are set in the check_contents()/extract() methods below.
MATCHING_DIRECTORY = 1
ONE_ENTRY_KNOWN = 2
BOMB = 3
EMPTY = 4
ONE_ENTRY_FILE = 'file'
ONE_ENTRY_DIRECTORY = 'directory'

# The two "single entry whose name does not match the archive" cases.  These
# are strings because OneEntryPolicy.prep() interpolates content_type into the
# question it shows the user.
ONE_ENTRY_UNKNOWN = [ONE_ENTRY_FILE, ONE_ENTRY_DIRECTORY]

# Answers understood by OneEntryPolicy.
EXTRACT_HERE = 1
EXTRACT_WRAP = 2
EXTRACT_RENAME = 3

# Answers understood by RecursionPolicy.
RECURSE_ALWAYS = 1
RECURSE_ONCE = 2
RECURSE_NOT_NOW = 3
RECURSE_NEVER = 4
RECURSE_LIST = 5

# Register suffixes with the mimetypes module unless it already knows them.
mimetypes.encodings_map.setdefault('.bz2', 'bzip2')
mimetypes.encodings_map.setdefault('.lzma', 'lzma')
mimetypes.types_map.setdefault('.gem', 'application/x-ruby-gem')

logger = logging.getLogger('dtrx-log')


class FilenameChecker(object):
    """Pick a filename that is not in use, starting from a preferred name.

    A name is tested by trying to create it: free_func is called with the
    candidate name plus free_args, and an EEXIST failure means the name is
    taken.  A successful test therefore leaves the (empty) file behind.
    """
    free_func = os.open
    free_args = (os.O_CREAT | os.O_EXCL,)
    free_close = os.close

    def __init__(self, original_name):
        self.original_name = original_name

    def is_free(self, filename):
        """Return True if filename could be created, False if it exists.

        Any OSError other than EEXIST is re-raised.
        """
        try:
            result = self.free_func(filename, *self.free_args)
        except OSError, error:
            if error.errno == errno.EEXIST:
                return False
            raise
        # Release whatever free_func returned (a file descriptor for os.open);
        # subclasses set free_close to None when there is nothing to release.
        if self.free_close:
            self.free_close(result)
        return True

    def create(self):
        """Fallback: create a uniquely named file prefixed by the original
        name in the current directory and return its path."""
        fd, filename = tempfile.mkstemp(prefix=self.original_name + '.',
                                        dir='.')
        os.close(fd)
        return filename

    def check(self):
        """Return the first free name among original_name, original_name.1,
        ..., original_name.9; otherwise fall back to create()."""
        for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]:
            filename = '%s%s' % (self.original_name, suffix)
            if self.is_free(filename):
                return filename
        return self.create()


class DirectoryChecker(FilenameChecker):
    """FilenameChecker variant that claims names by creating directories."""
    free_func = os.mkdir
    free_args = ()
    free_close = None

    def create(self):
        return tempfile.mkdtemp(prefix=self.original_name + '.', dir='.')


class ExtractorError(Exception):
    """Raised when an extractor cannot open or process an archive."""
    pass


class ExtractorUnusable(Exception):
    """Raised when a command needed by an extractor cannot be run."""
    pass


# Exceptions that BaseAction.report() turns into error strings and that the
# extract() methods clean up after.
EXTRACTION_ERRORS = (ExtractorError, ExtractorUnusable, OSError, IOError)


class BaseExtractor(object):
    """Base class for extractors that feed an archive through a pipeline.

    Subclasses queue commands with pipe() and run them with run_pipes().
    extract() calls self.extract_archive(), which this class does not define;
    the concrete subclasses below provide it.
    """
    # Maps an encoding name to the command that decodes it on stdin/stdout.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat',
                'lzma': 'lzcat'}

    # Checker class used by BombHandler to pick the final target name.
    name_checker = DirectoryChecker
    # Prepended to collected stderr so the user knows where the text is from.
    warning_header = "The extraction process output the following errors:\n"

    def __init__(self, filename, encoding):
        """Open filename and queue a decoder if encoding is given.

        Raises ValueError for an encoding that is not in self.decoders and
        ExtractorError if the file cannot be opened.
        """
        if encoding and (not self.decoders.has_key(encoding)):
            raise ValueError("unrecognized encoding %s" % (encoding,))
        self.filename = os.path.realpath(filename)
        self.encoding = encoding
        self.file_count = 0
        self.included_archives = []
        self.target = None
        self.content_type = None
        self.content_name = None
        self.pipes = []
        # Every process in the pipeline writes its stderr here; the contents
        # are reported by check_success().
        self.stderr = tempfile.TemporaryFile()
        self.exit_codes = []
        try:
            # NOTE(review): the archive is opened in text mode ('r') --
            # confirm this is acceptable on every supported platform.
            self.archive = open(filename, 'r')
        except (IOError, OSError), error:
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        if encoding:
            self.pipe([self.decoders[encoding]], "decoding")
        self.prepare()

    def pipe(self, command, description="extraction"):
        """Queue command (an argument list) as the next pipeline stage.

        description is used in the error message built by check_success().
        """
        self.pipes.append((command, description))

    def first_bad_exit_code(self):
        """Return the index of the first stage that exited non-zero, or None
        if every recorded exit code is zero."""
        for index, code in enumerate(self.exit_codes):
            if code != 0:
                return index
        return None

    def run_pipes(self, final_stdout=None):
        """Run the queued commands as a chain and wait for all of them.

        The first stage reads self.archive and each later stage reads the
        previous stage's stdout.  The last stage writes to final_stdout, or
        to a new temporary file when final_stdout is None.  Afterwards
        self.archive is replaced by that final output and self.exit_codes
        holds one exit status per stage.  Does nothing if no commands are
        queued.

        Raises ExtractorUnusable if a command does not exist (ENOENT); other
        OSErrors from starting a process propagate unchanged.
        """
        if not self.pipes:
            return
        elif final_stdout is None:
            # FIXME: Buffering this might be dumb.
            final_stdout = tempfile.TemporaryFile()
        num_pipes = len(self.pipes)
        last_pipe = num_pipes - 1
        processes = []
        for index, command in enumerate([pipe[0] for pipe in self.pipes]):
            if index == 0:
                stdin = self.archive
            else:
                stdin = processes[-1].stdout
            if index == last_pipe:
                stdout = final_stdout
            else:
                stdout = subprocess.PIPE
            try:
                processes.append(subprocess.Popen(command, stdin=stdin,
                                                  stdout=stdout,
                                                  stderr=self.stderr))
            except OSError, error:
                if error.errno == errno.ENOENT:
                    raise ExtractorUnusable("could not run %s" %
                                            (command[0],))
                raise
        self.exit_codes = [pipe.wait() for pipe in processes]
        self.archive.close()
        # Only the intermediate stages have PIPE stdouts to close.
        for index in range(last_pipe):
            processes[index].stdout.close()
        self.archive = final_stdout

    def prepare(self):
        """Hook called at the end of __init__; subclasses override it to
        queue their initial pipeline stages."""
        pass

    def check_included_archives(self):
        """Walk the extracted tree, counting files and recording archives.

        Sets self.included_root, adds to self.file_count, and appends to
        self.included_archives the path (relative to included_root) of every
        file that ExtractorBuilder recognizes by mimetype or extension.
        """
        if (self.content_name is None) or (not self.content_name.endswith('/')):
            self.included_root = './'
        else:
            self.included_root = self.content_name
        start_index = len(self.included_root)
        for path, dirname, filenames in os.walk(self.included_root):
            self.file_count += len(filenames)
            # Strip the included_root prefix from the walked path.
            path = path[start_index:]
            for filename in filenames:
                if (ExtractorBuilder.try_by_mimetype(filename) or
                    ExtractorBuilder.try_by_extension(filename)):
                    self.included_archives.append(os.path.join(path,
                                                               filename))

    def check_contents(self):
        """Classify what was extracted into the current directory.

        Sets self.contents and self.content_type, sets self.content_name
        (with a trailing '/' for directories) when there is exactly one
        entry, then calls check_included_archives().
        """
        self.contents = os.listdir('.')
        if not self.contents:
            self.content_type = EMPTY
        elif len(self.contents) == 1:
            if self.basename() == self.contents[0]:
                self.content_type = MATCHING_DIRECTORY
            elif os.path.isdir(self.contents[0]):
                self.content_type = ONE_ENTRY_DIRECTORY
            else:
                self.content_type = ONE_ENTRY_FILE
            self.content_name = self.contents[0]
            if os.path.isdir(self.contents[0]):
                self.content_name += '/'
        else:
            self.content_type = BOMB
        self.check_included_archives()

    def basename(self):
        """Return the archive's file name with a trailing encoding suffix
        and/or type suffix (as known to the mimetypes module) removed."""
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if mimetypes.encodings_map.has_key(extension):
            pieces.pop()
            extension = '.' + pieces[-1]
        if (mimetypes.types_map.has_key(extension) or
            mimetypes.common_types.has_key(extension) or
            mimetypes.suffix_map.has_key(extension)):
            pieces.pop()
        return '.'.join(pieces)

    def check_success(self, got_output):
        """Report collected stderr and fail if nothing useful was produced.

        Any text the pipeline wrote to stderr is logged as one warning,
        prefixed by warning_header.  The trailing newline is stripped because
        the logger adds its own.  Raises ExtractorError when got_output is
        false and some stage exited non-zero.
        """
        self.stderr.seek(0, 0)
        # Read one byte to find out whether anything was written at all.
        if self.stderr.read(1):
            self.stderr.seek(0, 0)
            logger.warning(self.warning_header +
                           self.stderr.read(-1).rstrip('\n'))
        self.stderr.close()
        error_index = self.first_bad_exit_code()
        if (not got_output) and (error_index is not None):
            command = ' '.join(self.pipes[error_index][0])
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (self.pipes[error_index][1], command,
                                  self.exit_codes[error_index]))

    def extract(self):
        """Extract the archive into a new '.dtrx-*' directory.

        The directory is created in the current directory and recorded in
        self.target.  The process chdir()s into it for the extraction and
        back afterwards.  On any of EXTRACTION_ERRORS the directory is
        removed and the exception re-raised.  Raises ExtractorError if the
        directory cannot be created.
        """
        try:
            self.target = tempfile.mkdtemp(prefix='.dtrx-', dir='.')
        except (OSError, IOError), error:
            raise ExtractorError("cannot extract here: %s" %
                                 (error.strerror,))
        old_path = os.path.realpath(os.curdir)
        os.chdir(self.target)
        try:
            self.archive.seek(0, 0)
            self.extract_archive()
            self.check_contents()
            self.check_success(self.content_type != EMPTY)
        except EXTRACTION_ERRORS:
            self.archive.close()
            os.chdir(old_path)
            shutil.rmtree(self.target, ignore_errors=True)
            raise
        self.archive.close()
        os.chdir(old_path)

    def get_filenames(self):
        """Run the queued pipeline and yield each line of its output with
        the trailing newline removed."""
        self.run_pipes()
        self.archive.seek(0, 0)
        while True:
            line = self.archive.readline()
            if not line:
                self.archive.close()
                return
            yield line.rstrip('\n')


class CompressionExtractor(BaseExtractor):
    """Extractor for a single compressed file (not an archive)."""
    file_type = 'compressed file'
    name_checker = FilenameChecker

    def basename(self):
        """Return the file name with only a trailing encoding suffix
        removed."""
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if mimetypes.encodings_map.has_key(extension):
            pieces.pop()
        return '.'.join(pieces)

    def get_filenames(self):
        yield self.basename()

    def extract(self):
        """Decode the file into a new '.dtrx-*' temporary file.

        The temporary file's path is stored in self.target.  It is deleted
        again if check_success() raises.
        """
        self.content_type = ONE_ENTRY_KNOWN
        self.content_name = self.basename()
        self.contents = None
        self.included_root = './'
        try:
            output_fd, self.target = tempfile.mkstemp(prefix='.dtrx-', dir='.')
        except (OSError, IOError), error:
            raise ExtractorError("cannot extract here: %s" %
                                 (error.strerror,))
        self.run_pipes(output_fd)
        os.close(output_fd)
        try:
            # "Got output" here means the decoded file is non-empty.
            self.check_success(os.stat(self.target)[stat.ST_SIZE] > 0)
        except EXTRACTION_ERRORS:
            os.unlink(self.target)
            raise


class TarExtractor(BaseExtractor):
    """Extractor that pipes the archive through tar."""
    file_type = 'tar file'

    def get_filenames(self):
        self.pipe(['tar', '-t'], "listing")
        return BaseExtractor.get_filenames(self)

    def extract_archive(self):
        self.pipe(['tar', '-x'])
        self.run_pipes()


class CpioExtractor(BaseExtractor):
    """Extractor that pipes the archive through cpio."""
    file_type = 'cpio file'

    def get_filenames(self):
        self.pipe(['cpio', '-t'], "listing")
        return BaseExtractor.get_filenames(self)

    def extract_archive(self):
        # --quiet suppresses cpio's "N blocks" summary on stderr, which would
        # otherwise be reported to the user as an extraction warning.
        self.pipe(['cpio', '-i', '--make-directories', '--quiet',
                   '--no-absolute-filenames'])
        self.run_pipes()


class RPMExtractor(CpioExtractor):
    """Extractor for RPMs: rpm2cpio first, then the cpio handling."""
    file_type = 'RPM'

    def prepare(self):
        self.pipe(['rpm2cpio', '-'], "rpm2cpio")

    def basename(self):
        """Return a directory name derived from the RPM file name.

        Drops the '.rpm' suffix and, when more than one piece remains, also
        drops a last dot-separated piece shorter than 8 characters.  Names
        not ending in '.rpm' fall back to BaseExtractor.basename().
        """
        pieces = os.path.basename(self.filename).split('.')
        if len(pieces) == 1:
            return pieces[0]
        elif pieces[-1] != 'rpm':
            return BaseExtractor.basename(self)
        pieces.pop()
        if len(pieces) == 1:
            return pieces[0]
        elif len(pieces[-1]) < 8:
            pieces.pop()
        return '.'.join(pieces)

    def check_contents(self):
        # Always treated as a bomb, whatever was actually extracted.
        self.check_included_archives()
        self.content_type = BOMB


class DebExtractor(TarExtractor):
    """Extractor for the data.tar.gz member of a Debian package."""
    file_type = 'Debian package'

    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'data.tar.gz'],
                  "data.tar.gz extraction")
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        """Return the file name up to its last '_'-separated piece.

        Falls back to BaseExtractor.basename() when that last piece is longer
        than 10 characters or does not end in '.deb'.
        """
        pieces = os.path.basename(self.filename).split('_')
        if len(pieces) == 1:
            return pieces[0]
        last_piece = pieces.pop()
        if (len(last_piece) > 10) or (not last_piece.endswith('.deb')):
            return BaseExtractor.basename(self)
        return '_'.join(pieces)

    def check_contents(self):
        # Always treated as a bomb, whatever was actually extracted.
        self.check_included_archives()
        self.content_type = BOMB


class DebMetadataExtractor(DebExtractor):
    """Extractor for the control.tar.gz member of a Debian package."""
    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'control.tar.gz'],
                  "control.tar.gz extraction")
        self.pipe(['zcat'], "control.tar.gz decompression")


class GemExtractor(TarExtractor):
    """Extractor for the data.tar.gz member of a Ruby gem."""
    file_type = 'Ruby gem'

    def prepare(self):
        self.pipe(['tar', '-xO', 'data.tar.gz'], "data.tar.gz extraction")
        self.pipe(['zcat'], "data.tar.gz decompression")

    def check_contents(self):
        # Always treated as a bomb, whatever was actually extracted.
        self.check_included_archives()
        self.content_type = BOMB


class GemMetadataExtractor(CompressionExtractor):
    """Extractor for the metadata.gz member of a Ruby gem."""
    file_type = 'Ruby gem'

    def prepare(self):
        self.pipe(['tar', '-xO', 'metadata.gz'], "metadata.gz extraction")
        self.pipe(['zcat'], "metadata.gz decompression")

    def basename(self):
        return os.path.basename(self.filename) + '-metadata.txt'


class NoPipeExtractor(BaseExtractor):
    # Some extraction tools won't accept the archive from stdin.  With
    # these, the piping infrastructure we normally set up generally doesn't
    # work, at least at first.  We can still use most of it; we just don't
    # want to seed self.archive with the archive file, since that sucks up
    # memory.  So instead we seed it with /dev/null, and specify the
    # filename on the command line as necessary.  We also open the actual
    # file with os.open, to make sure we can actually do it (permissions
    # are good, etc.).  This class doesn't do anything by itself; it's just
    # meant to be a base class for extractors that rely on these dumb
    # tools.
    def __init__(self, filename, encoding):
        # Open and immediately close the real file purely as an access check.
        os.close(os.open(filename, os.O_RDONLY))
        # The encoding argument is not forwarded; None is passed instead.
        BaseExtractor.__init__(self, '/dev/null', None)
        # BaseExtractor.__init__ recorded '/dev/null'; restore the real path.
        self.filename = os.path.realpath(filename)


class ZipExtractor(NoPipeExtractor):
    """Extractor that uses zipinfo/unzip on the file name."""
    file_type = 'Zip file'

    def get_filenames(self):
        self.pipe(['zipinfo', '-1', self.filename], "listing")
        return BaseExtractor.get_filenames(self)

    def extract_archive(self):
        self.pipe(['unzip', '-q', self.filename])
        self.run_pipes()


class SevenExtractor(NoPipeExtractor):
    """Extractor that uses the 7z command on the file name."""
    file_type = '7z file'
    # Matches a line made only of dashes and spaces.
    border_re = re.compile('^[- ]+$')

    def get_filenames(self):
        """Yield the names found between the two border lines of '7z l'."""
        self.pipe(['7z', 'l', self.filename], "listing")
        self.run_pipes()
        self.archive.seek(0, 0)
        fn_index = None
        for line in self.archive:
            if self.border_re.match(line):
                if fn_index is not None:
                    # Second border line: stop reading.
                    break
                else:
                    # First border line: names start just after its last
                    # space.
                    fn_index = line.rindex(' ') + 1
            elif fn_index is not None:
                # Drop the final character of the line (the newline).
                yield line[fn_index:-1]
        self.archive.close()

    def extract_archive(self):
        self.pipe(['7z', 'x', self.filename])
        self.run_pipes()


class CABExtractor(NoPipeExtractor):
    """Extractor that uses cabextract on the file name."""
    file_type = 'CAB archive'
    border_re = re.compile(r'^[-\+]+$')

    def get_filenames(self):
        """Yield the third ' | '-separated field of each line that follows
        the border line in the output of 'cabextract -l'."""
        self.pipe(['cabextract', '-l', self.filename], "listing")
        self.run_pipes()
        self.archive.seek(0, 0)
        # NOTE(review): fn_index is assigned but never read in this method.
        fn_index = None
        # Skip everything up to and including the border line.
        for line in self.archive:
            if self.border_re.match(line):
                break
        for line in self.archive:
            try:
                yield line.split(' | ', 2)[2].rstrip('\n')
            except IndexError:
                # First line without three fields ends the listing.
                break
        self.archive.close()

    def extract_archive(self):
        self.pipe(['cabextract', '-q', self.filename])
        self.run_pipes()


class ShieldExtractor(NoPipeExtractor):
    """Extractor that uses unshield on the file name."""
    file_type = 'InstallShield archive'
    # Leading whitespace, a number, and more whitespace before the name.
    prefix_re = re.compile(r'^\s+\d+\s+')
    # The dashed line that ends the listing.
    end_re = re.compile(r'^\s+-+\s+-+\s*$')

    def get_filenames(self):
        """Yield the text after the numeric prefix of each matching line of
        'unshield l', stopping at the line matched by end_re."""
        self.pipe(['unshield', 'l', self.filename], "listing")
        self.run_pipes()
        self.archive.seek(0, 0)
        for line in self.archive:
            if self.end_re.match(line):
                break
            else:
                match = self.prefix_re.match(line)
                if match:
                    yield line[match.end():].rstrip('\n')
        self.archive.close()

    def extract_archive(self):
        self.pipe(['unshield', 'x', self.filename])
        self.run_pipes()

    def basename(self):
        """Like the inherited basename(), but also drop a '.hdr' suffix."""
        result = NoPipeExtractor.basename(self)
        if result.endswith('.hdr'):
            result = result[:-4]
        return result


class BaseHandler(object):
    """Base class for objects that move extracted contents into place.

    Subclasses provide can_handle(contents, options) and organize().
    """
    def __init__(self, extractor, options):
        self.extractor = extractor
        self.options = options
        self.target = None

    def handle(self):
        """Fix permissions on the extracted tree, then call organize().

        Returns an error string if find or chmod exits non-zero; otherwise
        returns whatever organize() returns.
        """
        command = 'find'
        # Give the user rwx on every directory first...
        status = subprocess.call(['find', self.extractor.target, '-type', 'd',
                                  '-exec', 'chmod', 'u+rwx', '{}', ';'])
        if status == 0:
            command = 'chmod'
            # ...then apply u+rwX recursively to the whole tree.
            status = subprocess.call(['chmod', '-R', 'u+rwX',
                                      self.extractor.target])
        if status != 0:
            return "%s returned with exit status %s" % (command, status)
        return self.organize()

    def set_target(self, target, checker):
        """Set self.target to a free name based on target, warning the user
        if the chosen name differs from the one requested."""
        self.target = checker(target).check()
        if self.target != target:
            logger.warning("extracting %s to %s" %
                           (self.extractor.filename, self.target))


# The "where to extract" table, with options and archive types.
# This dictates the contents of each can_handle method.
#
#         Flat           Overwrite            None
# File    basename       basename             FilenameChecked
# Match   .              .                    tempdir + checked
# Bomb    .              basename             DirectoryChecked

class FlatHandler(BaseHandler):
    """Moves the extracted files directly into the current directory."""
    def can_handle(contents, options):
        return ((options.flat and (contents != ONE_ENTRY_KNOWN)) or
                (options.overwrite and (contents == MATCHING_DIRECTORY)))
    can_handle = staticmethod(can_handle)

    def organize(self):
        self.target = '.'
        # Walk bottom-up so each directory is already empty when os.rmdir()
        # is called on it.
        for curdir, dirs, filenames in os.walk(self.extractor.target,
                                               topdown=False):
            path_parts = curdir.split(os.sep)
            # Drop one leading path component: the one after '.' if the path
            # starts with '.', else the first one.  This presumably removes
            # the temporary extraction directory -- depends on the form of
            # extractor.target.
            if path_parts[0] == '.':
                del path_parts[1]
            else:
                del path_parts[0]
            newdir = os.path.join(*path_parts)
            if not os.path.isdir(newdir):
                os.makedirs(newdir)
            for filename in filenames:
                os.rename(os.path.join(curdir, filename),
                          os.path.join(newdir, filename))
            os.rmdir(curdir)


class OverwriteHandler(BaseHandler):
    """Renames the extraction onto the archive's basename, removing any
    existing directory of that name first."""
    def can_handle(contents, options):
        return ((options.flat and (contents == ONE_ENTRY_KNOWN)) or
                (options.overwrite and (contents != MATCHING_DIRECTORY)))
    can_handle = staticmethod(can_handle)

    def organize(self):
        self.target = self.extractor.basename()
        if os.path.isdir(self.target):
            shutil.rmtree(self.target)
        os.rename(self.extractor.target, self.target)


class MatchHandler(BaseHandler):
    """Moves a single extracted entry out of the temporary location."""
    def can_handle(contents, options):
        return ((contents == MATCHING_DIRECTORY) or
                ((contents in ONE_ENTRY_UNKNOWN) and
                 options.one_entry_policy.ok_for_match()))
    can_handle = staticmethod(can_handle)

    def organize(self):
        source = os.path.join(self.extractor.target,
                              os.listdir(self.extractor.target)[0])
        if os.path.isdir(source):
            checker = DirectoryChecker
        else:
            checker = FilenameChecker
        # The policy object compares equal to its current answer (see
        # BasePolicy.__cmp__).
        if self.options.one_entry_policy == EXTRACT_HERE:
            destination = self.extractor.content_name.rstrip('/')
        else:
            destination = self.extractor.basename()
        self.set_target(destination, checker)
        if os.path.isdir(self.extractor.target):
            os.rename(source, self.target)
            os.rmdir(self.extractor.target)
        else:
            os.rename(self.extractor.target, self.target)
        self.extractor.included_root = './'


class EmptyHandler(object):
    """Handler for archives that extracted nothing; it does no work.

    NOTE(review): unlike BaseHandler this class defines no `target`
    attribute, yet ExtractionAction.show_extraction() and
    ExtractionAction.run() read current_handler.target.  It also does not
    remove the extractor's temporary directory.  Verify both.
    """
    def can_handle(contents, options):
        return contents == EMPTY
    can_handle = staticmethod(can_handle)

    def __init__(self, extractor, options):
        pass

    def handle(self):
        pass


class BombHandler(BaseHandler):
    """Catch-all handler: renames the extraction to a free name based on
    the archive's basename."""
    def can_handle(contents, options):
        return True
    can_handle = staticmethod(can_handle)

    def organize(self):
        basename = self.extractor.basename()
        self.set_target(basename, self.extractor.name_checker)
        os.rename(self.extractor.target, self.target)


class BasePolicy(object):
    """Base class for interactive decisions.

    Subclasses define `answers` (reply letter -> policy value, with '' as
    the default), `choices` (lines appended to every question) and `prompt`.
    """
    def __init__(self, options):
        self.current_policy = None
        # In batch mode the default answer is used without ever prompting.
        if options.batch:
            self.permanent_policy = self.answers['']
        else:
            self.permanent_policy = None

    def ask_question(self, question):
        """Print question (a list of lines) plus the choices and read a
        reply, repeating until it is a key of self.answers.  End-of-file on
        input selects the default answer."""
        question = question + self.choices
        while True:
            print "\n".join(question)
            try:
                answer = raw_input(self.prompt)
            except EOFError:
                return self.answers['']
            try:
                return self.answers[answer.lower()]
            except KeyError:
                print

    def __cmp__(self, other):
        # Lets callers compare the policy object directly with a policy
        # constant, e.g. `policy == EXTRACT_HERE`.
        return cmp(self.current_policy, other)


class OneEntryPolicy(BasePolicy):
    """Decides what to do with an archive whose single entry has an
    unexpected name."""
    answers = {'h': EXTRACT_HERE, 'i': EXTRACT_WRAP, 'r': EXTRACT_RENAME,
               '': EXTRACT_WRAP}
    choices = ["You can:",
               " * extract it Inside another directory",
               " * extract it and Rename the directory",
               " * extract it Here"]
    prompt = "What do you want to do? (I/r/h) "

    def __init__(self, options):
        BasePolicy.__init__(self, options)
        if options.flat:
            self.permanent_policy = EXTRACT_HERE

    def prep(self, archive_filename, extractor):
        """Set current_policy from the permanent policy, or by asking."""
        question = ["%s contains one %s, but it has a weird name." %
                    (archive_filename, extractor.content_type)]
        question.append(" Expected: " + extractor.basename())
        question.append(" Actual: " + extractor.content_name)
        self.current_policy = (self.permanent_policy or
                               self.ask_question(question))

    def ok_for_match(self):
        return self.current_policy in (EXTRACT_RENAME, EXTRACT_HERE)


class RecursionPolicy(BasePolicy):
    """Decides whether archives found inside an archive get extracted."""
    answers = {'o': RECURSE_ONCE, 'a': RECURSE_ALWAYS, 'n': RECURSE_NOT_NOW,
               'v': RECURSE_NEVER, 'l': RECURSE_LIST, '': RECURSE_NOT_NOW}
    choices = ["You can:",
               " * Always extract included archives",
               " * extract included archives this Once",
               " * choose Not to extract included archives",
               " * neVer extract included archives",
               " * List included archives"]
    # NOTE(review): this copy of the file had a line break inside the string
    # below; it is joined here with a single space -- confirm the spacing.
    prompt = "What do you want to do? (a/o/N/v/l) "

    def __init__(self, options):
        BasePolicy.__init__(self, options)
        if options.show_list:
            self.permanent_policy = RECURSE_NEVER
        elif options.recursive:
            self.permanent_policy = RECURSE_ALWAYS

    def prep(self, current_filename, target, extractor):
        """Set current_policy for the archives found inside extractor.

        Uses the permanent policy if there is one, or RECURSE_NOT_NOW when
        no archives were found; otherwise asks the user.  Each "list" answer
        prints the included archives and asks again.  An "always" or "never"
        answer becomes the permanent policy.
        """
        archive_count = len(extractor.included_archives)
        if (self.permanent_policy is not None) or (archive_count == 0):
            self.current_policy = self.permanent_policy or RECURSE_NOT_NOW
            return
        question = (("%s contains %s other archive file(s), " +
                     "out of %s file(s) total.") %
                    (current_filename, archive_count, extractor.file_count))
        question = textwrap.wrap(question)
        # Blank out the no-op path components so the listed paths are not
        # prefixed with './'.
        if target == '.':
            target = ''
        included_root = extractor.included_root
        if included_root == './':
            included_root = ''
        while True:
            self.current_policy = self.ask_question(question)
            if self.current_policy != RECURSE_LIST:
                break
            print ("\n%s\n" %
                   '\n'.join([os.path.join(target, included_root, filename)
                              for filename in extractor.included_archives]))
        if self.current_policy in (RECURSE_ALWAYS, RECURSE_NEVER):
            self.permanent_policy = self.current_policy

    def ok_to_recurse(self):
        return self.current_policy in (RECURSE_ALWAYS, RECURSE_ONCE)


class ExtractorBuilder(object):
    """Works out which extractor classes to try for a given file."""
    # archive type -> (normal extractor, metadata extractor or None)
    extractor_map = {'tar': (TarExtractor, None),
                     'zip': (ZipExtractor, None),
                     'deb': (DebExtractor, DebMetadataExtractor),
                     'rpm': (RPMExtractor, None),
                     'cpio': (CpioExtractor, None),
                     'gem': (GemExtractor, GemMetadataExtractor),
                     'compress': (CompressionExtractor, None),
                     '7z': (SevenExtractor, None),
                     'cab': (CABExtractor, None),
                     'shield': (ShieldExtractor, None)}

    # MIME type -> archive type.  Names without a '/' get 'application/'
    # prepended.  A MIME type listed twice keeps the later entry, so
    # 'application/x-cab' ends up mapped to 'shield'.
    mimetype_map = {}
    for mapping in (('tar', 'x-tar'),
                    ('zip', 'zip'),
                    ('deb', 'x-debian-package'),
                    ('rpm', 'x-redhat-package-manager', 'x-rpm'),
                    ('cpio', 'x-cpio'),
                    ('gem', 'x-ruby-gem'),
                    ('7z', 'x-7z-compressed'),
                    ('cab', 'x-cab'),
                    ('shield', 'x-cab')):
        for mimetype in mapping[1:]:
            if '/' not in mimetype:
                mimetype = 'application/' + mimetype
            mimetype_map[mimetype] = mapping[0]

    # Compiled regexp (searched in the output of `file`) -> archive type.
    magic_mime_map = {}
    for mapping in (('deb', 'Debian binary package'),
                    ('cpio', 'cpio archive'),
                    ('tar', 'POSIX tar archive'),
                    ('zip', '(Zip|ZIP self-extracting) archive'),
                    ('rpm', 'RPM'),
                    ('7z', '7-zip archive'),
                    ('cab', 'Microsoft Cabinet archive'),
                    ('shield', 'InstallShield CAB')):
        for pattern in mapping[1:]:
            magic_mime_map[re.compile(pattern)] = mapping[0]

    # Compiled regexp (searched in the output of `file`) -> encoding name.
    magic_encoding_map = {}
    for mapping in (('bzip2', 'bzip2 compressed'),
                    ('gzip', 'gzip compressed')):
        for pattern in mapping[1:]:
            magic_encoding_map[re.compile(pattern)] = mapping[0]

    # File extension -> list of (archive type, encoding) pairs to try, in
    # the order given here.
    extension_map = {}
    for mapping in (('tar', 'bzip2', 'tar.bz2'),
                    ('tar', 'gzip', 'tar.gz', 'tgz'),
                    ('tar', None, 'tar'),
                    ('zip', None, 'zip'),
                    ('deb', None, 'deb'),
                    ('rpm', None, 'rpm'),
                    ('cpio', None, 'cpio'),
                    ('gem', None, 'gem'),
                    ('compress', 'gzip', 'Z', 'gz'),
                    ('compress', 'bzip2', 'bz2'),
                    ('compress', 'lzma', 'lzma'),
                    ('7z', None, '7z'),
                    ('cab', None, 'cab'),
                    ('shield', None, 'cab', 'hdr')):
        for extension in mapping[2:]:
            extension_map.setdefault(extension, []).append(mapping[:2])

    def __init__(self, filename, options):
        self.filename = filename
        self.options = options

    def build_extractor(self, archive_type, encoding):
        """Instantiate the extractor for archive_type, choosing the metadata
        variant when options.metadata is set and one exists."""
        extractors = self.extractor_map[archive_type]
        if self.options.metadata and (extractors[1] is not None):
            extractor = extractors[1]
        else:
            extractor = extractors[0]
        return extractor(self.filename, encoding)

    def get_extractor(self):
        """Yield extractor instances to try, in order of preference.

        Each distinct (archive type, encoding) guess is built at most once.
        """
        tried_types = set()
        # As smart as it is, the magic test can't go first, because at least
        # on my system it just recognizes gem files as tar files.  I guess
        # it's possible for the opposite problem to occur -- where the mimetype
        # or extension suggests something less than ideal -- but it seems less
        # likely so I'm sticking with this.
        for func_name in ('mimetype', 'extension', 'magic'):
            logger.debug("getting extractors by %s" % (func_name,))
            extractor_types = \
                getattr(self, 'try_by_' + func_name)(self.filename)
            logger.debug("done getting extractors")
            for ext_args in extractor_types:
                if ext_args in tried_types:
                    continue
                tried_types.add(ext_args)
                logger.debug("trying %s extractor from %s" %
                             (ext_args, func_name))
                yield self.build_extractor(*ext_args)

    def try_by_mimetype(cls, filename):
        """Guess from mimetypes.guess_type(); returns a list holding zero or
        one (archive type, encoding) pair."""
        mimetype, encoding = mimetypes.guess_type(filename)
        try:
            return [(cls.mimetype_map[mimetype], encoding)]
        except KeyError:
            # Unknown type but known encoding: a plain compressed file.
            if encoding:
                return [('compress', encoding)]
        return []
    try_by_mimetype = classmethod(try_by_mimetype)

    def magic_map_matches(cls, output, magic_map):
        """Return the values of magic_map whose regexp key is found in
        output."""
        return [result for regexp, result in magic_map.items()
                if regexp.search(output)]
    magic_map_matches = classmethod(magic_map_matches)

    def try_by_magic(cls, filename):
        """Guess from the first output line of `file -z`; returns a list of
        (archive type, encoding) pairs, empty if `file` fails."""
        process = subprocess.Popen(['file', '-z', filename],
                                   stdout=subprocess.PIPE)
        # NOTE(review): this waits for exit before reading the stdout pipe;
        # confirm the output of `file` is always small enough not to block.
        status = process.wait()
        if status != 0:
            return []
        output = process.stdout.readline()
        process.stdout.close()
        # Strip the leading "<filename>: " so it cannot match a pattern.
        if output.startswith('%s: ' % filename):
            output = output[len(filename) + 2:]
        mimes = cls.magic_map_matches(output, cls.magic_mime_map)
        encodings = cls.magic_map_matches(output, cls.magic_encoding_map)
        if mimes and not encodings:
            encodings = [None]
        elif encodings and not mimes:
            mimes = ['compress']
        return [(m, e) for m in mimes for e in encodings]
    try_by_magic = classmethod(try_by_magic)

    def try_by_extension(cls, filename):
        """Guess from the file name: look up its last two dot-separated
        parts joined together, then the last part alone."""
        parts = filename.split('.')[-2:]
        results = []
        while parts:
            results.extend(cls.extension_map.get('.'.join(parts), []))
            del parts[0]
        return results
    try_by_extension = classmethod(try_by_extension)


class BaseAction(object):
    """Base class for what is done with each archive (extract or list)."""
    def __init__(self, options, filenames):
        self.options = options
        self.filenames = filenames
        self.target = None

    def report(self, function, *args):
        """Call function(*args) and return its result.

        If it raises one of EXTRACTION_ERRORS, return str(exception) instead
        and log the traceback at debug level.  A falsy return value therefore
        means success.
        """
        try:
            error = function(*args)
        except EXTRACTION_ERRORS, exception:
            error = str(exception)
            logger.debug(''.join(traceback.format_exception(*sys.exc_info())))
        return error


class ExtractionAction(BaseAction):
    """Extracts an archive and moves the result into place."""
    # Tried in this order; BombHandler.can_handle() always returns True.
    handlers = [FlatHandler, OverwriteHandler, MatchHandler, EmptyHandler,
                BombHandler]

    def __init__(self, options, filenames):
        BaseAction.__init__(self, options, filenames)
        self.did_print = False

    def get_handler(self, extractor):
        """Set self.current_handler to an instance of the first handler
        class that can handle the extractor's content type."""
        if extractor.content_type in ONE_ENTRY_UNKNOWN:
            self.options.one_entry_policy.prep(self.current_filename,
                                               extractor)
        for handler in self.handlers:
            if handler.can_handle(extractor.content_type, self.options):
                logger.debug("using %s handler" % (handler.__name__,))
                self.current_handler = handler(extractor, self.options)
                break

    def show_extraction(self, extractor):
        """Print what was extracted, unless the log level is above INFO."""
        if self.options.log_level > logging.INFO:
            return
        elif self.did_print:
            # Blank line between the listings of successive archives.
            print
        else:
            self.did_print = True
        print "%s:" % (self.current_filename,)
        if extractor.contents is None:
            print self.current_handler.target
            return
        def reverser(x, y):
            return cmp(y, x)
        # filenames is used as a stack: it is sorted in reverse and popped
        # from the end, so names are printed in ascending order.
        if self.current_handler.target == '.':
            filenames = extractor.contents
            filenames.sort(reverser)
        else:
            filenames = [self.current_handler.target]
        pathjoin = os.path.join
        isdir = os.path.isdir
        while filenames:
            filename = filenames.pop()
            if isdir(filename):
                print "%s/" % (filename,)
                new_filenames = os.listdir(filename)
                new_filenames.sort(reverser)
                filenames.extend([pathjoin(filename, new_filename)
                                  for new_filename in new_filenames])
            else:
                print filename

    def run(self, filename, extractor):
        """Extract, choose a handler, handle and show, stopping at the first
        step that reports an error.  Returns that error, or a falsy value
        on success."""
        self.current_filename = filename
        error = (self.report(extractor.extract) or
                 self.report(self.get_handler, extractor) or
                 self.report(self.current_handler.handle) or
                 self.report(self.show_extraction, extractor))
        if not error:
            self.target = self.current_handler.target
        return error


class ListAction(BaseAction):
    """Lists the contents of an archive on standard output."""
    def __init__(self, options, filenames):
        BaseAction.__init__(self, options, filenames)
        self.count = 0

    def get_list(self, extractor):
        # Note: The reason I'm getting all the filenames up front is
        # because if we run into trouble partway through the archive, we'll
        # try another extractor.  So before we display anything we have to
        # be sure this one is successful.  We maybe don't have to be quite
        # this conservative but this is the easy way out for now.
        self.filelist = list(extractor.get_filenames())

    def show_list(self, filename):
        """Print the collected names, preceded by a "<filename>:" header
        when more than one archive is being listed."""
        self.count += 1
        if len(self.filenames) != 1:
            if self.count > 1:
                print
            print "%s:" % (filename,)
        print '\n'.join(self.filelist)

    def run(self, filename, extractor):
        return (self.report(self.get_list, extractor) or
                self.report(self.show_list, filename))


class ExtractorApplication(object):
    """Command-line front end: parses options and processes each archive."""
    def __init__(self, arguments):
        for signal_num in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signal_num, self.abort)
        self.parse_options(arguments)
        self.setup_logger()
        self.successes = []
        self.failures = []

    def abort(self, signal_num, frame):
        """Signal handler: remove every '.dtrx-*' entry from the starting
        directory and the current working directory, then exit with
        status 1."""
        # Ignore repeats of this signal while cleaning up.
        signal.signal(signal_num, signal.SIG_IGN)
        print
        logger.debug("traceback:\n" +
                     ''.join(traceback.format_stack(frame)).rstrip())
        logger.debug("got signal %s; cleaning up" % (signal_num,))
        clean_targets = set([os.path.realpath('.')])
        # current_directory is only set once run() has started.
        if hasattr(self, 'current_directory'):
            clean_targets.add(os.path.realpath(self.current_directory))
        for directory in clean_targets:
            os.chdir(directory)
            for path in glob.glob('.dtrx-*'):
                try:
                    os.unlink(path)
                except OSError, error:
                    # unlink() on a directory: remove the whole tree instead.
                    if error.errno == errno.EISDIR:
                        shutil.rmtree(path, ignore_errors=True)
        sys.exit(1)

    def parse_options(self, arguments):
        """Parse arguments into self.options and self.archives.

        Also stores the computed log level and the two policy objects on
        self.options.  Calls parser.error() if no archive was named.
        """
        parser = optparse.OptionParser(
            usage="%prog [options] archive [archive2 ...]",
            description="Intelligent archive extractor",
            version=VERSION_BANNER
            )
        parser.add_option('-r', '--recursive', dest='recursive',
                          action='store_true', default=False,
                          help='extract archives contained in the ones listed')
        parser.add_option('-q', '--quiet', dest='quiet',
                          action='count', default=3,
                          help='suppress warning/error messages')
        parser.add_option('-v', '--verbose', dest='verbose',
                          action='count', default=0,
                          help='be verbose/print debugging information')
        parser.add_option('-o', '--overwrite', dest='overwrite',
                          action='store_true', default=False,
                          help='overwrite any existing target directory')
        parser.add_option('-f', '--flat', '--no-directory', dest='flat',
                          action='store_true', default=False,
                          help="don't put contents in their own directory")
        parser.add_option('-l', '-t', '--list', '--table', dest='show_list',
                          action='store_true', default=False,
                          help="list contents of archives on standard output")
        parser.add_option('-n', '--noninteractive', dest='batch',
                          action='store_true', default=False,
                          help="don't ask how to handle special cases")
        parser.add_option('-m', '--metadata', dest='metadata',
                          action='store_true', default=False,
                          help="extract metadata from a .deb/.gem")
        self.options, filenames = parser.parse_args(arguments)
        if not filenames:
            parser.error("you did not list any archives")
        # This makes WARNING the default level: quiet starts at 3 and
        # verbose at 0, so with no flags the level is 10 * 3 == 30.  Each -q
        # raises it by 10 and each -v lowers it by 10.
        self.options.log_level = (10 * (self.options.quiet -
                                        self.options.verbose))
        self.options.one_entry_policy = OneEntryPolicy(self.options)
        self.options.recursion_policy = RecursionPolicy(self.options)
        # Work queue: directory -> archive names to process from there.
        self.archives = {os.path.realpath(os.curdir): filenames}

    def setup_logger(self):
        logging.getLogger().setLevel(self.options.log_level)
        handler = logging.StreamHandler()
        handler.setLevel(self.options.log_level)
        formatter = logging.Formatter("dtrx: %(levelname)s: %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.debug("logger is set up")

    def recurse(self, filename, extractor, action):
        """Queue the archives found inside extractor, if the recursion
        policy allows it.

        Each one is added to self.archives under the directory it now
        lives in.
        """
        self.options.recursion_policy.prep(filename, action.target, extractor)
        if self.options.recursion_policy.ok_to_recurse():
            for filename in extractor.included_archives:
                logger.debug("recursing with %s archive" %
                             (extractor.content_type,))
                tail_path, basename = os.path.split(filename)
                path_args = [self.current_directory, extractor.included_root,
                             tail_path]
                logger.debug("included root: %s" % (extractor.included_root,))
                logger.debug("tail path: %s" % (tail_path,))
                # When the action's target is a directory, it goes between
                # the current directory and the included root.
                if os.path.isdir(action.target):
                    logger.debug("action target: %s" % (action.target,))
                    path_args.insert(1, action.target)
                directory = os.path.join(*path_args)
                self.archives.setdefault(directory, []).append(basename)

    def check_file(self, filename):
        """Return an error string if filename cannot be stat()ed or is a
        directory; otherwise return None."""
        try:
            result = os.stat(filename)
        except OSError, error:
            return error.strerror
        if stat.S_ISDIR(result.st_mode):
            return "cannot extract a directory"

    def try_extractors(self, filename, builder):
        """Run the action with each extractor from builder until one works.

        Returns None on success.  On failure the errors are logged here and
        True is returned, which tells run() not to log them again.
        """
        errors = []
        for extractor in builder:
            error = self.action.run(filename, extractor)
            if error:
                errors.append((extractor.file_type, extractor.encoding, error))
            else:
                self.recurse(filename, extractor, self.action)
                return
        logger.error("could not handle %s" % (filename,))
        if not errors:
            logger.error("not a known archive type")
            return True
        for file_type, encoding, error in errors:
            message = ["treating as", file_type, "failed:", error]
            if encoding:
                message.insert(1, "%s-encoded" % (encoding,))
            logger.error(' '.join(message))
        return True

    def run(self):
        """Process every queued archive, including any queued by recursion.

        Returns 1 if any archive failed, otherwise 0.
        """
        if self.options.show_list:
            action = ListAction
        else:
            action = ExtractionAction
        self.action = action(self.options, self.archives.values()[0])
        while self.archives:
            self.current_directory, self.filenames = self.archives.popitem()
            os.chdir(self.current_directory)
            for filename in self.filenames:
                builder = ExtractorBuilder(filename, self.options)
                error = (self.check_file(filename) or
                         self.try_extractors(filename, builder.get_extractor()))
                if error:
                    # True means try_extractors() has already logged it.
                    if error != True:
                        logger.error("%s: %s" % (filename, error))
                    self.failures.append(filename)
                else:
                    self.successes.append(filename)
            # After the first batch, only archives queued by recurse()
            # remain; wrap their one-entry contents without prompting.
            self.options.one_entry_policy.permanent_policy = EXTRACT_WRAP
        if self.failures:
            return 1
        return 0


if __name__ == '__main__':
    app = ExtractorApplication(sys.argv[1:])
    sys.exit(app.run())