# HG changeset patch # User brett # Date 1171143824 18000 # Node ID bb6e9f4af1a54a768b9f3f8f16e13bf61edd66c0 # Parent 1600807a32bdf612d4d1b9c436d178695ee969dc [svn] Rename the program to dtrx. Add a -l/-t option, which just lists the contents of an archive without extracting anything. diff -r 1600807a32bd -r bb6e9f4af1a5 README --- a/README Tue Jan 02 20:30:17 2007 -0500 +++ b/README Sat Feb 10 16:43:44 2007 -0500 @@ -1,38 +1,38 @@ -x - Intelligent archive extraction -================================== +dtrx - Intelligent archive extraction +===================================== Introduction ------------ -x extracts archives in a number of different formats; it currently supports -tar, zip, cpio, rpm, and deb. It can also decompress files compressed with -gzip, bzip2, or compress. +dtrx extracts archives in a number of different formats; it currently +supports tar, zip, cpio, rpm, and deb. It can also decompress files +compressed with gzip, bzip2, or compress. In addition to providing one command to handle many different archive -types, x also aids the user by extracting contents consistently. By +types, dtrx also aids the user by extracting contents consistently. By default, everything will be written to a dedicated directory that's named -after the archive. x will also change the permissions to ensure that the +after the archive. dtrx will also change the permissions to ensure that the owner can read and write all those files. -Running x ---------- +Running dtrx +------------ -To run x, simply call it with the archive(s) you wish to extract as +To run dtrx, simply call it with the archive(s) you wish to extract as arguments. For example:: - x coreutils-5.*.tar.gz + dtrx coreutils-5.*.tar.gz -x supports a number of options to mandate specific behavior: +dtrx supports a number of options to mandate specific behavior: -r, --recursive - With this option, x will search inside the archives you specify to see + With this option, dtrx will search inside the archives you specify to see if any of the contents are themselves archives, and extract those as well. -o, --overwrite - Normally, x will avoid extracting into a directory that already exists, + Normally, dtrx will avoid extracting into a directory that already exists, and instead try to find an alternative name to use. If this option is - listed, x will use the default directory name no matter what. + listed, dtrx will use the default directory name no matter what. -f, --flat Extract archive contents into the current directory, instead of their @@ -40,9 +40,12 @@ files which all need to be extracted into the same directory structure. Note that existing files may be overwritten with this option. +-l, -t, --list, --table + Don't extract the archives; just list their contents on standard output. + -q, --quiet Suppress warning messages. Listing this option twice will cause the - x to be silent if at all possible. + dtrx to be silent if at all possible. -v, --verbose Print more information about x's behavior. You can list this option up @@ -57,14 +60,14 @@ Other Useful Information ------------------------ -x 3.0 is copyright (c) 2006 `Brett Smith`_. Feel free to send comments, -bug reports, patches, and so on. You can find the latest version of x on -`its home page`_. +dtrx 4.0 is copyright (c) 2006, 2007 `Brett Smith`_. Feel free to send +comments, bug reports, patches, and so on. You can find the latest version +of dtrx on `its home page`_. .. _`Brett Smith`: mailto:brettcsmith@brettcsmith.org .. _`its home page`: http://www.brettcsmith.org/2006/x/ -x is free software; you can redistribute it and/or modify it under the +dtrx is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. diff -r 1600807a32bd -r bb6e9f4af1a5 scripts/dtrx --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/dtrx Sat Feb 10 16:43:44 2007 -0500 @@ -0,0 +1,569 @@ +#!/usr/bin/env python +# +# dtrx -- Intelligently extract various archive types. +# Copyright (c) 2006 Brett Smith . +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2 of the License, or (at your +# option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +# Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, 5th Floor, Boston, MA, 02111. + +import errno +import logging +import mimetypes +import optparse +import os +import stat +import subprocess +import sys +import tempfile + +from cStringIO import StringIO + +VERSION = "4.0" +VERSION_BANNER = """dtrx version %s +Copyright (c) 2006 Brett Smith + +This program is free software; you can redistribute it and/or modify it +under the terms of the GNU General Public License as published by the +Free Software Foundation; either version 2 of the License, or (at your +option) any later version. + +This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +Public License for more details.""" % (VERSION,) + +MATCHING_DIRECTORY = 1 +# ONE_DIRECTORY = 2 +BOMB = 3 +EMPTY = 4 +COMPRESSED = 5 + +mimetypes.encodings_map.setdefault('.bz2', 'bzip2') +mimetypes.types_map['.exe'] = 'application/x-msdos-program' + +def run_command(command, description, stdout=None, stderr=None, stdin=None): + process = subprocess.Popen(command, stdin=stdin, stdout=stdout, + stderr=stderr) + status = process.wait() + for pipe in (process.stdout, process.stderr): + try: + pipe.close() + except AttributeError: + pass + if status != 0: + return ("%s error: '%s' returned status code %s" % + (description, ' '.join(command), status)) + return None + +class FilenameChecker(object): + def __init__(self, original_name): + self.original_name = original_name + + def is_free(self, filename): + return not os.path.exists(filename) + + def check(self): + for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]: + filename = '%s%s' % (self.original_name, suffix) + if self.is_free(filename): + return filename + raise ValueError("all alternatives for name %s taken" % + (self.original_name,)) + + +class DirectoryChecker(FilenameChecker): + def is_free(self, filename): + try: + os.mkdir(filename) + except OSError, error: + if error.errno == errno.EEXIST: + return False + raise + return True + + +class ExtractorError(Exception): + pass + + +class ProcessStreamer(object): + def __init__(self, command, stdin, description="checking contents", + stderr=None): + self.process = subprocess.Popen(command, bufsize=1, stdin=stdin, + stdout=subprocess.PIPE, stderr=stderr) + self.command = ' '.join(command) + self.description = description + + def __iter__(self): + return self + + def next(self): + line = self.process.stdout.readline() + if line: + return line.rstrip('\n') + else: + raise StopIteration + + def stop(self): + while self.process.stdout.readline(): + pass + self.process.stdout.close() + status = self.process.wait() + if status != 0: + raise ExtractorError("%s error: '%s' returned status code %s" % + (self.description, self.command, status)) + try: + self.process.stderr.close() + except AttributeError: + pass + + +class BaseExtractor(object): + decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'} + + name_checker = DirectoryChecker + + def __init__(self, filename, mimetype, encoding): + if encoding and (not self.decoders.has_key(encoding)): + raise ValueError("unrecognized encoding %s" % (encoding,)) + self.filename = os.path.realpath(filename) + self.mimetype = mimetype + self.encoding = encoding + self.included_archives = [] + try: + self.archive = open(filename, 'r') + except (IOError, OSError), error: + raise ExtractorError("could not open %s: %s" % + (filename, error.strerror)) + if encoding: + self.pipe([self.decoders[encoding]], "decoding") + self.prepare() + + def run(self, command, description="extraction", stdout=None, stderr=None, + stdin=None): + error = run_command(command, description, stdout, stderr, stdin) + if error: + raise ExtractorError(error) + + def pipe(self, command, description, stderr=None): + output = tempfile.TemporaryFile() + self.run(command, description, output, stderr, self.archive) + self.archive.close() + self.archive = output + self.archive.flush() + + def prepare(self): + pass + + def check_contents(self): + archive_type = None + filenames = self.get_filenames() + try: + filename = filenames.next() + if extractor_map.has_key(mimetypes.guess_type(filename)[0]): + self.included_archives.append(filename) + first_part = filename.split('/', 1)[0] + '/' + except StopIteration: + filenames.stop() + return EMPTY + for filename in filenames: + if extractor_map.has_key(mimetypes.guess_type(filename)[0]): + self.included_archives.append(filename) + if (archive_type is None) and (not filename.startswith(first_part)): + archive_type = BOMB + filenames.stop() + if archive_type: + return archive_type + if self.basename() == first_part[:-1]: + return MATCHING_DIRECTORY + return first_part + + def basename(self): + pieces = os.path.basename(self.filename).split('.') + extension = '.' + pieces[-1] + if mimetypes.encodings_map.has_key(extension): + pieces.pop() + extension = '.' + pieces[-1] + if (mimetypes.types_map.has_key(extension) or + mimetypes.common_types.has_key(extension) or + mimetypes.suffix_map.has_key(extension)): + pieces.pop() + return '.'.join(pieces) + + def extract(self, path): + old_path = os.path.realpath(os.curdir) + os.chdir(path) + self.archive.seek(0, 0) + self.extract_archive() + os.chdir(old_path) + + +class TarExtractor(BaseExtractor): + def get_filenames(self): + self.archive.seek(0, 0) + return ProcessStreamer(['tar', '-t'], self.archive) + + def extract_archive(self): + self.run(['tar', '-x'], stdin=self.archive) + + +class ZipExtractor(BaseExtractor): + def __init__(self, filename, mimetype, encoding): + self.filename = os.path.realpath(filename) + self.mimetype = mimetype + self.encoding = encoding + self.included_archives = [] + self.archive = StringIO() + + def get_filenames(self): + self.archive.seek(0, 0) + return ProcessStreamer(['zipinfo', '-1', self.filename], None) + + def extract_archive(self): + self.run(['unzip', '-q', self.filename]) + + +class CpioExtractor(BaseExtractor): + def get_filenames(self): + self.archive.seek(0, 0) + return ProcessStreamer(['cpio', '-t'], self.archive, + stderr=subprocess.PIPE) + + def extract_archive(self): + self.run(['cpio', '-i', '--make-directories', + '--no-absolute-filenames'], + stderr=subprocess.PIPE, stdin=self.archive) + + +class RPMExtractor(CpioExtractor): + def prepare(self): + self.pipe(['rpm2cpio', '-'], "rpm2cpio") + + def basename(self): + pieces = os.path.basename(self.filename).split('.') + if len(pieces) == 1: + return pieces[0] + elif pieces[-1] != 'rpm': + return BaseExtractor.basename(self) + pieces.pop() + if len(pieces) == 1: + return pieces[0] + elif len(pieces[-1]) < 8: + pieces.pop() + return '.'.join(pieces) + + def check_contents(self): + CpioExtractor.check_contents(self) + return BOMB + + +class DebExtractor(TarExtractor): + def prepare(self): + self.pipe(['ar', 'p', self.filename, 'data.tar.gz'], + "data.tar.gz extraction") + self.archive.seek(0, 0) + self.pipe(['zcat'], "data.tar.gz decompression") + + def basename(self): + pieces = os.path.basename(self.filename).split('_') + if len(pieces) == 1: + return pieces[0] + last_piece = pieces.pop() + if (len(last_piece) > 10) or (not last_piece.endswith('.deb')): + return BaseExtractor.basename(self) + return '_'.join(pieces) + + def check_contents(self): + TarExtractor.check_contents(self) + return BOMB + + +class CompressionExtractor(BaseExtractor): + name_checker = FilenameChecker + + def basename(self): + pieces = os.path.basename(self.filename).split('.') + extension = '.' + pieces[-1] + if mimetypes.encodings_map.has_key(extension): + pieces.pop() + return '.'.join(pieces) + + def get_filenames(self): + yield self.basename() + + def check_contents(self): + return COMPRESSED + + def extract(self, path): + output = open(path, 'w') + self.run(['cat'], "output write", stdin=self.archive, stdout=output) + output.close() + + +class BaseHandler(object): + def __init__(self, extractor, contents, options): + self.logger = logging.getLogger('dtrx-log') + self.extractor = extractor + self.contents = contents + self.options = options + self.target = None + + def extract(self): + try: + self.extractor.extract(self.target) + except (ExtractorError, IOError, OSError), error: + return str(error) + + def cleanup(self): + if self.target is None: + return + command = 'find' + status = subprocess.call(['find', self.target, '-type', 'd', + '-exec', 'chmod', 'u+rwx', '{}', ';']) + if status == 0: + command = 'chmod' + status = subprocess.call(['chmod', '-R', 'u+rw', self.target]) + if status != 0: + return "%s returned with exit status %s" % (command, status) + + +# The "where to extract" table, with options and archive types. +# This dictates the contents of each can_handle method. +# +# Flat Overwrite None +# File basename basename FilenameChecked +# Match . . tempdir + checked +# Bomb . basename DirectoryChecked + +class FlatHandler(BaseHandler): + def can_handle(contents, options): + return ((options.flat and (contents != COMPRESSED)) or + (options.overwrite and (contents == MATCHING_DIRECTORY))) + can_handle = staticmethod(can_handle) + + def __init__(self, extractor, contents, options): + BaseHandler.__init__(self, extractor, contents, options) + self.target = '.' + + def cleanup(self): + for filename in self.extractor.get_filenames(): + stat_info = os.stat(filename) + perms = stat.S_IRUSR | stat.S_IWUSR + if stat.S_ISDIR(stat_info.st_mode): + perms |= stat.S_IXUSR + os.chmod(filename, stat_info.st_mode | perms) + + +class OverwriteHandler(BaseHandler): + def can_handle(contents, options): + return ((options.flat and (contents == COMPRESSED)) or + (options.overwrite and (contents != MATCHING_DIRECTORY))) + can_handle = staticmethod(can_handle) + + def __init__(self, extractor, contents, options): + BaseHandler.__init__(self, extractor, contents, options) + self.target = self.extractor.basename() + + +class MatchHandler(BaseHandler): + def can_handle(contents, options): + return contents == MATCHING_DIRECTORY + can_handle = staticmethod(can_handle) + + def extract(self): + basename = self.extractor.basename() + self.target = tempfile.mkdtemp(dir='.') + result = BaseHandler.extract(self) + if result is None: + tempdir = self.target + checker = self.extractor.name_checker(basename) + self.target = checker.check() + os.rename(os.path.join(tempdir, basename), self.target) + os.rmdir(tempdir) + return result + + +class EmptyHandler(object): + def can_handle(contents, options): + return contents == EMPTY + can_handle = staticmethod(can_handle) + + def __init__(self, extractor, contents, options): pass + def extract(self): pass + def cleanup(self): pass + + +class BombHandler(BaseHandler): + def can_handle(contents, options): + return True + can_handle = staticmethod(can_handle) + + def __init__(self, extractor, contents, options): + BaseHandler.__init__(self, extractor, contents, options) + checker = self.extractor.name_checker(self.extractor.basename()) + self.target = checker.check() + + +extractor_map = {'application/x-tar': TarExtractor, + 'application/zip': ZipExtractor, + 'application/x-msdos-program': ZipExtractor, + 'application/x-debian-package': DebExtractor, + 'application/x-redhat-package-manager': RPMExtractor, + 'application/x-rpm': RPMExtractor, + 'application/x-cpio': CpioExtractor} + +handlers = [FlatHandler, OverwriteHandler, MatchHandler, EmptyHandler, + BombHandler] + +class ExtractorApplication(object): + def __init__(self, arguments): + self.parse_options(arguments) + self.setup_logger() + self.successes = [] + self.failures = [] + + def parse_options(self, arguments): + parser = optparse.OptionParser( + usage="%prog [options] archive [archive2 ...]", + description="Intelligent archive extractor", + version=VERSION_BANNER + ) + parser.add_option('-r', '--recursive', dest='recursive', + action='store_true', default=False, + help='extract archives contained in the ones listed') + parser.add_option('-q', '--quiet', dest='quiet', + action='count', default=3, + help='suppress warning/error messages') + parser.add_option('-v', '--verbose', dest='verbose', + action='count', default=0, + help='be verbose/print debugging information') + parser.add_option('-o', '--overwrite', dest='overwrite', + action='store_true', default=False, + help='overwrite any existing target directory') + parser.add_option('-f', '--flat', '--no-directory', dest='flat', + action='store_true', default=False, + help="don't put contents in their own directory") + parser.add_option('-l', '-t', '--list', '--table', dest='show_list', + action='store_true', default=False, + help="list contents of archives on standard output") +## parser.add_option('-n', '--noninteractive', dest='batch', +## action='store_true', default=False, +## help="don't ask how to handle special cases") + self.options, filenames = parser.parse_args(arguments) + if not filenames: + parser.error("you did not list any archives") + self.archives = {os.path.realpath(os.curdir): filenames} + + def setup_logger(self): + self.logger = logging.getLogger('dtrx-log') + handler = logging.StreamHandler() + # WARNING is the default. + handler.setLevel(10 * (self.options.quiet - self.options.verbose)) + formatter = logging.Formatter("dtrx: %(levelname)s: %(message)s") + handler.setFormatter(formatter) + self.logger.addHandler(handler) + + def get_extractor(self): + mimetype, encoding = mimetypes.guess_type(self.current_filename) + try: + extractor = extractor_map[mimetype] + except KeyError: + if encoding: + extractor = CompressionExtractor + contents = COMPRESSED + else: + return "not a known archive type" + try: + self.current_extractor = extractor(self.current_filename, mimetype, + encoding) + content = self.current_extractor.check_contents() + for handler in handlers: + if handler.can_handle(content, self.options): + self.current_handler = handler(self.current_extractor, + content, self.options) + break + except ExtractorError, error: + return str(error) + + def recurse(self): + if not self.options.recursive: + return + for filename in self.current_extractor.included_archives: + tail_path, basename = os.path.split(filename) + directory = os.path.join(self.current_directory, + self.current_handler.target, tail_path) + self.archives.setdefault(directory, []).append(basename) + + def report(self, function, *args): + try: + error = function(*args) + except (ExtractorError, IOError, OSError), exception: + error = str(exception) + if error: + self.logger.error("%s: %s", self.current_filename, error) + return False + return True + + def record_status(self, success): + if success: + self.successes.append(self.current_filename) + else: + self.failures.append(self.current_filename) + + def extract(self): + while self.archives: + self.current_directory, filenames = self.archives.popitem() + for filename in filenames: + os.chdir(self.current_directory) + self.current_filename = filename + success = self.report(self.get_extractor) + if success: + for name in 'extract', 'cleanup': + success = (self.report(getattr(self.current_handler, + name)) and success) + self.recurse() + self.record_status(success) + + def show_contents(self): + for filename in self.current_extractor.get_filenames(): + print filename + + def show_list(self): + filenames = self.archives.values()[0] + if len(filenames) > 1: + header = "%s:\n" + else: + header = None + for filename in filenames: + if header: + print header % (filename,), + header = "\n%s:\n" + self.current_filename = filename + success = (self.report(self.get_extractor) and + self.report(self.show_contents)) + self.record_status(success) + + def run(self): + if self.options.show_list: + self.show_list() + else: + self.extract() + if self.failures: + return 1 + return 0 + + +if __name__ == '__main__': + app = ExtractorApplication(sys.argv[1:]) + sys.exit(app.run()) diff -r 1600807a32bd -r bb6e9f4af1a5 scripts/x --- a/scripts/x Tue Jan 02 20:30:17 2007 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,538 +0,0 @@ -#!/usr/bin/env python -# -# x -- Intelligently extract various archive types. -# Copyright (c) 2006 Brett Smith . -# -# This program is free software; you can redistribute it and/or modify it -# under the terms of the GNU General Public License as published by the -# Free Software Foundation; either version 2 of the License, or (at your -# option) any later version. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General -# Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with this program; if not, write to the Free Software Foundation, Inc., -# 51 Franklin Street, 5th Floor, Boston, MA, 02111. - -import errno -import logging -import mimetypes -import optparse -import os -import stat -import subprocess -import sys -import tempfile - -from cStringIO import StringIO - -VERSION = "3.0" -VERSION_BANNER = """x version %s -Copyright (c) 2006 Brett Smith - -This program is free software; you can redistribute it and/or modify it -under the terms of the GNU General Public License as published by the -Free Software Foundation; either version 2 of the License, or (at your -option) any later version. - -This program is distributed in the hope that it will be useful, but -WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General -Public License for more details.""" % (VERSION,) - -MATCHING_DIRECTORY = 1 -# ONE_DIRECTORY = 2 -BOMB = 3 -EMPTY = 4 -COMPRESSED = 5 - -mimetypes.encodings_map.setdefault('.bz2', 'bzip2') -mimetypes.types_map['.exe'] = 'application/x-msdos-program' - -def run_command(command, description, stdout=None, stderr=None, stdin=None): - process = subprocess.Popen(command, stdin=stdin, stdout=stdout, - stderr=stderr) - status = process.wait() - for pipe in (process.stdout, process.stderr): - try: - pipe.close() - except AttributeError: - pass - if status != 0: - return ("%s error: '%s' returned status code %s" % - (description, ' '.join(command), status)) - return None - -class FilenameChecker(object): - def __init__(self, original_name): - self.original_name = original_name - - def is_free(self, filename): - return not os.path.exists(filename) - - def check(self): - for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]: - filename = '%s%s' % (self.original_name, suffix) - if self.is_free(filename): - return filename - raise ValueError("all alternatives for name %s taken" % - (self.original_name,)) - - -class DirectoryChecker(FilenameChecker): - def is_free(self, filename): - try: - os.mkdir(filename) - except OSError, error: - if error.errno == errno.EEXIST: - return False - raise - return True - - -class ExtractorError(Exception): - pass - - -class ProcessStreamer(object): - def __init__(self, command, stdin, description="checking contents", - stderr=None): - self.process = subprocess.Popen(command, bufsize=1, stdin=stdin, - stdout=subprocess.PIPE, stderr=stderr) - self.command = ' '.join(command) - self.description = description - - def __iter__(self): - return self - - def next(self): - line = self.process.stdout.readline() - if line: - return line.rstrip('\n') - else: - raise StopIteration - - def stop(self): - while self.process.stdout.readline(): - pass - self.process.stdout.close() - status = self.process.wait() - if status != 0: - raise ExtractorError("%s error: '%s' returned status code %s" % - (self.description, self.command, status)) - try: - self.process.stderr.close() - except AttributeError: - pass - - -class BaseExtractor(object): - decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'} - - name_checker = DirectoryChecker - - def __init__(self, filename, mimetype, encoding): - if encoding and (not self.decoders.has_key(encoding)): - raise ValueError("unrecognized encoding %s" % (encoding,)) - self.filename = os.path.realpath(filename) - self.mimetype = mimetype - self.encoding = encoding - self.included_archives = [] - try: - self.archive = open(filename, 'r') - except (IOError, OSError), error: - raise ExtractorError("could not open %s: %s" % - (filename, error.strerror)) - if encoding: - self.pipe([self.decoders[encoding]], "decoding") - self.prepare() - - def run(self, command, description="extraction", stdout=None, stderr=None, - stdin=None): - error = run_command(command, description, stdout, stderr, stdin) - if error: - raise ExtractorError(error) - - def pipe(self, command, description, stderr=None): - output = tempfile.TemporaryFile() - self.run(command, description, output, stderr, self.archive) - self.archive.close() - self.archive = output - self.archive.flush() - - def prepare(self): - pass - - def check_contents(self): - archive_type = None - filenames = self.get_filenames() - try: - filename = filenames.next() - if extractor_map.has_key(mimetypes.guess_type(filename)[0]): - self.included_archives.append(filename) - first_part = filename.split('/', 1)[0] + '/' - except StopIteration: - filenames.stop() - return EMPTY - for filename in filenames: - if extractor_map.has_key(mimetypes.guess_type(filename)[0]): - self.included_archives.append(filename) - if (archive_type is None) and (not filename.startswith(first_part)): - archive_type = BOMB - filenames.stop() - if archive_type: - return archive_type - if self.basename() == first_part[:-1]: - return MATCHING_DIRECTORY - return first_part - - def basename(self): - pieces = os.path.basename(self.filename).split('.') - extension = '.' + pieces[-1] - if mimetypes.encodings_map.has_key(extension): - pieces.pop() - extension = '.' + pieces[-1] - if (mimetypes.types_map.has_key(extension) or - mimetypes.common_types.has_key(extension) or - mimetypes.suffix_map.has_key(extension)): - pieces.pop() - return '.'.join(pieces) - - def extract(self, path): - old_path = os.path.realpath(os.curdir) - os.chdir(path) - self.archive.seek(0, 0) - self.extract_archive() - os.chdir(old_path) - - -class TarExtractor(BaseExtractor): - def get_filenames(self): - self.archive.seek(0, 0) - return ProcessStreamer(['tar', '-t'], self.archive) - - def extract_archive(self): - self.run(['tar', '-x'], stdin=self.archive) - - -class ZipExtractor(BaseExtractor): - def __init__(self, filename, mimetype, encoding): - self.filename = os.path.realpath(filename) - self.mimetype = mimetype - self.encoding = encoding - self.included_archives = [] - self.archive = StringIO() - - def get_filenames(self): - self.archive.seek(0, 0) - return ProcessStreamer(['zipinfo', '-1', self.filename], None) - - def extract_archive(self): - self.run(['unzip', '-q', self.filename]) - - -class CpioExtractor(BaseExtractor): - def get_filenames(self): - self.archive.seek(0, 0) - return ProcessStreamer(['cpio', '-t'], self.archive, - stderr=subprocess.PIPE) - - def extract_archive(self): - self.run(['cpio', '-i', '--make-directories', - '--no-absolute-filenames'], - stderr=subprocess.PIPE, stdin=self.archive) - - -class RPMExtractor(CpioExtractor): - def prepare(self): - self.pipe(['rpm2cpio', '-'], "rpm2cpio") - - def basename(self): - pieces = os.path.basename(self.filename).split('.') - if len(pieces) == 1: - return pieces[0] - elif pieces[-1] != 'rpm': - return BaseExtractor.basename(self) - pieces.pop() - if len(pieces) == 1: - return pieces[0] - elif len(pieces[-1]) < 8: - pieces.pop() - return '.'.join(pieces) - - def check_contents(self): - CpioExtractor.check_contents(self) - return BOMB - - -class DebExtractor(TarExtractor): - def prepare(self): - self.pipe(['ar', 'p', self.filename, 'data.tar.gz'], - "data.tar.gz extraction") - self.archive.seek(0, 0) - self.pipe(['zcat'], "data.tar.gz decompression") - - def basename(self): - pieces = os.path.basename(self.filename).split('_') - if len(pieces) == 1: - return pieces[0] - last_piece = pieces.pop() - if (len(last_piece) > 10) or (not last_piece.endswith('.deb')): - return BaseExtractor.basename(self) - return '_'.join(pieces) - - def check_contents(self): - TarExtractor.check_contents(self) - return BOMB - - -class CompressionExtractor(BaseExtractor): - name_checker = FilenameChecker - - def basename(self): - pieces = os.path.basename(self.filename).split('.') - extension = '.' + pieces[-1] - if mimetypes.encodings_map.has_key(extension): - pieces.pop() - return '.'.join(pieces) - - def get_filenames(self): - yield self.basename() - - def check_contents(self): - return COMPRESSED - - def extract(self, path): - output = open(path, 'w') - self.run(['cat'], "output write", stdin=self.archive, stdout=output) - output.close() - - -class BaseHandler(object): - def __init__(self, extractor, contents, options): - self.logger = logging.getLogger('x-log') - self.extractor = extractor - self.contents = contents - self.options = options - self.target = None - - def extract(self): - try: - self.extractor.extract(self.target) - except (ExtractorError, IOError, OSError), error: - return str(error) - - def cleanup(self): - if self.target is None: - return - command = 'find' - status = subprocess.call(['find', self.target, '-type', 'd', - '-exec', 'chmod', 'u+rwx', '{}', ';']) - if status == 0: - command = 'chmod' - status = subprocess.call(['chmod', '-R', 'u+rw', self.target]) - if status != 0: - return "%s returned with exit status %s" % (command, status) - - -# The "where to extract" table, with options and archive types. -# This dictates the contents of each can_handle method. -# -# Flat Overwrite None -# File basename basename FilenameChecked -# Match . . tempdir + checked -# Bomb . basename DirectoryChecked - -class FlatHandler(BaseHandler): - def can_handle(contents, options): - return ((options.flat and (contents != COMPRESSED)) or - (options.overwrite and (contents == MATCHING_DIRECTORY))) - can_handle = staticmethod(can_handle) - - def __init__(self, extractor, contents, options): - BaseHandler.__init__(self, extractor, contents, options) - self.target = '.' - - def cleanup(self): - for filename in self.extractor.get_filenames(): - stat_info = os.stat(filename) - perms = stat.S_IRUSR | stat.S_IWUSR - if stat.S_ISDIR(stat_info.st_mode): - perms |= stat.S_IXUSR - os.chmod(filename, stat_info.st_mode | perms) - - -class OverwriteHandler(BaseHandler): - def can_handle(contents, options): - return ((options.flat and (contents == COMPRESSED)) or - (options.overwrite and (contents != MATCHING_DIRECTORY))) - can_handle = staticmethod(can_handle) - - def __init__(self, extractor, contents, options): - BaseHandler.__init__(self, extractor, contents, options) - self.target = self.extractor.basename() - - -class MatchHandler(BaseHandler): - def can_handle(contents, options): - return contents == MATCHING_DIRECTORY - can_handle = staticmethod(can_handle) - - def extract(self): - basename = self.extractor.basename() - self.target = tempfile.mkdtemp(dir='.') - result = BaseHandler.extract(self) - if result is None: - tempdir = self.target - checker = self.extractor.name_checker(basename) - self.target = checker.check() - os.rename(os.path.join(tempdir, basename), self.target) - os.rmdir(tempdir) - return result - - -class EmptyHandler(object): - def can_handle(contents, options): - return contents == EMPTY - can_handle = staticmethod(can_handle) - - def __init__(self, extractor, contents, options): pass - def extract(self): pass - def cleanup(self): pass - - -class BombHandler(BaseHandler): - def can_handle(contents, options): - return True - can_handle = staticmethod(can_handle) - - def __init__(self, extractor, contents, options): - BaseHandler.__init__(self, extractor, contents, options) - checker = self.extractor.name_checker(self.extractor.basename()) - self.target = checker.check() - - -extractor_map = {'application/x-tar': TarExtractor, - 'application/zip': ZipExtractor, - 'application/x-msdos-program': ZipExtractor, - 'application/x-debian-package': DebExtractor, - 'application/x-redhat-package-manager': RPMExtractor, - 'application/x-rpm': RPMExtractor, - 'application/x-cpio': CpioExtractor} - -handlers = [FlatHandler, OverwriteHandler, MatchHandler, EmptyHandler, - BombHandler] - -class ExtractorApplication(object): - def __init__(self, arguments): - self.parse_options(arguments) - self.setup_logger() - self.successes = [] - self.failures = [] - - def parse_options(self, arguments): - parser = optparse.OptionParser( - usage="%prog [options] archive [archive2 ...]", - description="Intelligent archive extractor", - version=VERSION_BANNER - ) - parser.add_option('-r', '--recursive', dest='recursive', - action='store_true', default=False, - help='extract archives contained in the ones listed') - parser.add_option('-q', '--quiet', dest='quiet', - action='count', default=3, - help='suppress warning/error messages') - parser.add_option('-v', '--verbose', dest='verbose', - action='count', default=0, - help='be verbose/print debugging information') - parser.add_option('-o', '--overwrite', dest='overwrite', - action='store_true', default=False, - help='overwrite any existing target directory') - parser.add_option('-f', '--flat', '--no-directory', dest='flat', - action='store_true', default=False, - help="don't put contents in their own directory") -## parser.add_option('-n', '--noninteractive', dest='batch', -## action='store_true', default=False, -## help="don't ask how to handle special cases") - self.options, filenames = parser.parse_args(arguments) - if not filenames: - parser.error("you did not list any archives") - self.archives = {os.path.realpath(os.curdir): filenames} - - def setup_logger(self): - self.logger = logging.getLogger('x-log') - handler = logging.StreamHandler() - # WARNING is the default. - handler.setLevel(10 * (self.options.quiet - self.options.verbose)) - formatter = logging.Formatter("x: %(levelname)s: %(message)s") - handler.setFormatter(formatter) - self.logger.addHandler(handler) - - def get_extractor(self): - mimetype, encoding = mimetypes.guess_type(self.current_filename) - try: - extractor = extractor_map[mimetype] - except KeyError: - if encoding: - extractor = CompressionExtractor - contents = COMPRESSED - else: - return "not a known archive type" - try: - self.current_extractor = extractor(self.current_filename, mimetype, - encoding) - content = self.current_extractor.check_contents() - for handler in handlers: - if handler.can_handle(content, self.options): - self.current_handler = handler(self.current_extractor, - content, self.options) - break - except ExtractorError, error: - return str(error) - - def recurse(self): - if not self.options.recursive: - return - for filename in self.current_extractor.included_archives: - tail_path, basename = os.path.split(filename) - directory = os.path.join(self.current_directory, - self.current_handler.target, tail_path) - self.archives.setdefault(directory, []).append(basename) - - def report(self, function, *args): - try: - error = function(*args) - except (ExtractorError, IOError, OSError), exception: - error = str(exception) - if error: - self.logger.error("%s: %s", self.current_filename, error) - return False - return True - - def run(self): - while self.archives: - self.current_directory, filenames = self.archives.popitem() - for filename in filenames: - os.chdir(self.current_directory) - self.current_filename = filename - success = self.report(self.get_extractor) - if success: - for name in 'extract', 'cleanup': - success = (self.report(getattr(self.current_handler, - name)) and success) - self.recurse() - if success: - self.successes.append(self.current_filename) - else: - self.failures.append(self.current_filename) - if self.failures: - return 1 - return 0 - - -if __name__ == '__main__': - app = ExtractorApplication(sys.argv[1:]) - sys.exit(app.run()) diff -r 1600807a32bd -r bb6e9f4af1a5 setup.py --- a/setup.py Tue Jan 02 20:30:17 2007 -0500 +++ b/setup.py Sat Feb 10 16:43:44 2007 -0500 @@ -2,12 +2,12 @@ from distutils.core import setup -setup(name="x", - version = "3.0", +setup(name="dtrx", + version = "4.0", description = "Script to intelligently extract multiple archive types", author = "Brett Smith", author_email = "brettcsmith@brettcsmith.org", - url = "http://www.brettcsmith.org/2006/x", - scripts = ['scripts/x'], + url = "http://www.brettcsmith.org/2007/dtrx", + scripts = ['scripts/dtrx'], license = "GNU General Public License, version 2 or later" ) diff -r 1600807a32bd -r bb6e9f4af1a5 tests/compare.py --- a/tests/compare.py Tue Jan 02 20:30:17 2007 -0500 +++ b/tests/compare.py Sat Feb 10 16:43:44 2007 -0500 @@ -26,15 +26,15 @@ from sets import Set as set -if os.path.exists('scripts/x') and os.path.exists('tests'): +if os.path.exists('scripts/dtrx') and os.path.exists('tests'): os.chdir('tests') -elif os.path.exists('../scripts/x') and os.path.exists('../tests'): +elif os.path.exists('../scripts/dtrx') and os.path.exists('../tests'): pass else: print "ERROR: Can't run tests in this directory!" sys.exit(2) -X_SCRIPT = os.path.realpath('../scripts/x') +X_SCRIPT = os.path.realpath('../scripts/dtrx') ROOT_DIR = os.path.realpath(os.curdir) OUTCOMES = ['error', 'failed', 'passed'] TESTSCRIPT_NAME = 'testscript.sh' @@ -53,7 +53,7 @@ for key in ('name',): setattr(self, key, kwargs[key]) for key in ('directory', 'prerun', 'posttest', 'baseline', 'error', - 'grep', 'antigrep'): + 'grep', 'antigrep', 'output'): setattr(self, key, kwargs.get(key, None)) for key in ('options', 'filenames'): setattr(self, key, kwargs.get(key, '').split()) @@ -144,25 +144,35 @@ return "x returned error code %s" % (status,) return None - def grep_output(self): - output_buffer.seek(0, 0) - output_buffer.readline() - output = output_buffer.read(-1) + def grep_output(self, output): if self.grep and (not re.search(self.grep, output)): return "output did not match %s" % (self.grep) elif self.antigrep and re.search(self.antigrep, output): return "output matched antigrep %s" % (self.antigrep) return None + def check_output(self, output): + if ((self.output is not None) and + (self.output.strip() != output.strip())): + return "output did not match provided text" + return None + def check_results(self): output_buffer.seek(0, 0) output_buffer.truncate() self.clean() status, actual = self.get_extractor_results() - problem = self.have_error_mismatch(status) or self.grep_output() + output_buffer.seek(0, 0) + output_buffer.readline() + output = output_buffer.read(-1) + problem = (self.have_error_mismatch(status) or + self.check_output(output) or self.grep_output(output)) if problem: return self.show_status('FAILED', problem) - return self.compare_results(actual) + if self.baseline: + return self.compare_results(actual) + else: + return self.show_status('Passed') def run(self): if self.directory: diff -r 1600807a32bd -r bb6e9f4af1a5 tests/tests.yml --- a/tests/tests.yml Tue Jan 02 20:30:17 2007 -0500 +++ b/tests/tests.yml Sat Feb 10 16:43:44 2007 -0500 @@ -150,3 +150,36 @@ prerun: | chmod 500 . +- name: list contents of one file + options: -l + filenames: test-1.23.tar + output: | + test-1.23/ + test-1.23/1/ + test-1.23/1/2/ + test-1.23/1/2/3 + test-1.23/a/ + test-1.23/a/b + test-1.23/foobar + +- name: list contents of multiple files + options: --table + filenames: test-1.23_all.deb test-1.23.zip + output: | + test-1.23_all.deb: + 1/ + 1/2/ + 1/2/3 + a/ + a/b + foobar + + test-1.23.zip: + 1/2/3 + a/b + foobar + +- name: list contents of compressed file + options: -t + filenames: test-text.gz + output: test-text