scripts/x

branch
trunk
changeset 19
bb6e9f4af1a5
parent 18
1600807a32bd
child 20
69c93c3e6972
--- a/scripts/x	Tue Jan 02 20:30:17 2007 -0500
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,538 +0,0 @@
-#!/usr/bin/env python
-#
-# x -- Intelligently extract various archive types.
-# Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>.
-#
-# This program is free software; you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the
-# Free Software Foundation; either version 2 of the License, or (at your
-# option) any later version.
-#
-# This program is distributed in the hope that it will be useful, but
-# WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
-# Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, 5th Floor, Boston, MA, 02111.
-
-import errno
-import logging
-import mimetypes
-import optparse
-import os
-import stat
-import subprocess
-import sys
-import tempfile
-
-from cStringIO import StringIO
-
-# Release number, interpolated into the banner below.
-VERSION = "3.0"
-# Text handed to optparse as the program's version output.
-VERSION_BANNER = """x version %s
-Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>
-
-This program is free software; you can redistribute it and/or modify it
-under the terms of the GNU General Public License as published by the
-Free Software Foundation; either version 2 of the License, or (at your
-option) any later version.
-
-This program is distributed in the hope that it will be useful, but
-WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
-Public License for more details.""" % (VERSION,)
-
-# Content classifications returned by the extractors' check_contents()
-# methods and tested by the handlers' can_handle() methods.
-MATCHING_DIRECTORY = 1
-# ONE_DIRECTORY = 2
-BOMB = 3
-EMPTY = 4
-COMPRESSED = 5
-
-# Make sure '.bz2' is known as an encoding suffix, and give '.exe' a type
-# that extractor_map sends to ZipExtractor.
-mimetypes.encodings_map.setdefault('.bz2', 'bzip2')
-mimetypes.types_map['.exe'] = 'application/x-msdos-program'
-
-def run_command(command, description, stdout=None, stderr=None, stdin=None):
-    """Run an external command and report whether it failed.
-
-    command is the argument list handed to subprocess.Popen, and
-    description is a short label placed at the start of the error message.
-    stdout, stderr, and stdin are passed straight through to Popen.
-
-    Returns None when the command exits with status 0, and an error
-    message string otherwise.  Output captured through subprocess.PIPE is
-    read and discarded.
-    """
-    process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
-                               stderr=stderr)
-    # communicate() drains and closes any PIPE streams while it waits.  A
-    # bare wait() can deadlock once the child fills an unread pipe buffer.
-    process.communicate()
-    status = process.returncode
-    if status != 0:
-        return ("%s error: '%s' returned status code %s" %
-                (description, ' '.join(command), status))
-    return None
-
-class FilenameChecker(object):
-    """Pick a filesystem name that is not already in use.
-
-    check() tries the original name first, then the same name with the
-    suffixes .1 through .9, and returns the first candidate that
-    is_free() accepts.
-    """
-
-    def __init__(self, original_name):
-        self.original_name = original_name
-
-    def is_free(self, filename):
-        """Return True when nothing exists at filename."""
-        if os.path.exists(filename):
-            return False
-        return True
-
-    def check(self):
-        """Return the first free candidate name.
-
-        Raises ValueError when the original name and all nine numbered
-        alternatives are taken.
-        """
-        suffixes = ['']
-        for number in range(1, 10):
-            suffixes.append('.%s' % (number,))
-        for suffix in suffixes:
-            candidate = '%s%s' % (self.original_name, suffix)
-            if self.is_free(candidate):
-                return candidate
-        raise ValueError("all alternatives for name %s taken" %
-                         (self.original_name,))
-        
-
-class DirectoryChecker(FilenameChecker):
-    """FilenameChecker that claims a name by creating it as a directory."""
-
-    def is_free(self, filename):
-        """Try to create the directory filename.
-
-        Returns True when the directory was created and False when the
-        name already exists.  Any other OSError is re-raised.
-        """
-        try:
-            os.mkdir(filename)
-        except OSError, error:
-            if error.errno != errno.EEXIST:
-                raise
-            return False
-        return True
-
-
-class ExtractorError(Exception):
-    """Raised when an archive cannot be opened or a helper command fails."""
-
-
-class ProcessStreamer(object):
-    """Iterate over the lines a command writes to standard output.
-
-    The command is started immediately.  Callers iterate over the object
-    to read its output and must call stop() when done so the process is
-    reaped and its exit status is checked.
-    """
-
-    def __init__(self, command, stdin, description="checking contents",
-                 stderr=None):
-        """Start command with stdout captured through a pipe.
-
-        stdin and stderr are passed to subprocess.Popen unchanged, and
-        description labels the error raised by stop().
-        """
-        self.process = subprocess.Popen(command, bufsize=1, stdin=stdin,
-                                        stdout=subprocess.PIPE, stderr=stderr)
-        self.command = ' '.join(command)
-        self.description = description
-
-    def __iter__(self):
-        return self
-
-    def next(self):
-        """Return the next output line without its trailing newline."""
-        line = self.process.stdout.readline()
-        if not line:
-            raise StopIteration
-        return line.rstrip('\n')
-
-    def stop(self):
-        """Discard remaining output, reap the command, and check its status.
-
-        Raises ExtractorError when the command exited with a nonzero
-        status.
-        """
-        # Read to end-of-file so the command is not blocked on a full pipe.
-        while self.process.stdout.readline():
-            pass
-        self.process.stdout.close()
-        status = self.process.wait()
-        # Close stderr before reporting a failure; raising first would
-        # leave the pipe open.
-        if self.process.stderr is not None:
-            self.process.stderr.close()
-        if status != 0:
-            raise ExtractorError("%s error: '%s' returned status code %s" %
-                                 (self.description, self.command, status))
-    
-
-class BaseExtractor(object):
-    """Shared behavior for extractors that drive external archive tools.
-
-    The constructor opens the archive as self.archive.  When an encoding
-    is given, the archive is first run through the matching decoder and
-    self.archive becomes a temporary file holding the decoded data.
-    Subclasses provide get_filenames() and extract_archive(), and may
-    override prepare() to preprocess the archive further.
-    """
-
-    # Decompression command for each encoding name.
-    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}
-
-    # Class the handlers use to pick an unused extraction target.
-    name_checker = DirectoryChecker
-
-    def __init__(self, filename, mimetype, encoding):
-        """Open filename and decode it when an encoding is given.
-
-        Raises ValueError for an encoding with no known decoder, and
-        ExtractorError when the file cannot be opened or a helper command
-        fails.
-        """
-        if encoding and (encoding not in self.decoders):
-            raise ValueError("unrecognized encoding %s" % (encoding,))
-        self.filename = os.path.realpath(filename)
-        self.mimetype = mimetype
-        self.encoding = encoding
-        # Names inside this archive that look like archives themselves.
-        self.included_archives = []
-        try:
-            # Archives are binary data; text mode can corrupt them on
-            # platforms that translate line endings.
-            self.archive = open(filename, 'rb')
-        except (IOError, OSError), error:
-            raise ExtractorError("could not open %s: %s" %
-                                 (filename, error.strerror))
-        if encoding:
-            self.pipe([self.decoders[encoding]], "decoding")
-        self.prepare()
-
-    def run(self, command, description="extraction", stdout=None, stderr=None,
-            stdin=None):
-        """Run command via run_command(); raise ExtractorError on failure."""
-        error = run_command(command, description, stdout, stderr, stdin)
-        if error:
-            raise ExtractorError(error)
-
-    def pipe(self, command, description, stderr=None):
-        """Filter the archive through command.
-
-        The command reads self.archive on standard input and writes to a
-        new temporary file, which then replaces self.archive.
-        """
-        output = tempfile.TemporaryFile()
-        self.run(command, description, output, stderr, self.archive)
-        self.archive.close()
-        self.archive = output
-        self.archive.flush()
-
-    def prepare(self):
-        """Hook for subclasses; called at the end of __init__."""
-        pass
-
-    def _note_included_archive(self, filename):
-        """Record filename if its guessed type has an extractor."""
-        if mimetypes.guess_type(filename)[0] in extractor_map:
-            self.included_archives.append(filename)
-
-    def check_contents(self):
-        """Classify the archive by the layout of the names it lists.
-
-        Returns EMPTY when the archive lists no names, BOMB when some
-        later entry does not start with the first entry's leading path
-        component, MATCHING_DIRECTORY when that component equals
-        self.basename(), and otherwise the component itself with a
-        trailing slash.  Names that look like known archive types are
-        collected in self.included_archives along the way.
-        """
-        archive_type = None
-        filenames = self.get_filenames()
-        try:
-            filename = filenames.next()
-        except StopIteration:
-            filenames.stop()
-            return EMPTY
-        self._note_included_archive(filename)
-        first_part = filename.split('/', 1)[0] + '/'
-        for filename in filenames:
-            self._note_included_archive(filename)
-            if (archive_type is None) and (not filename.startswith(first_part)):
-                archive_type = BOMB
-        filenames.stop()
-        if archive_type:
-            return archive_type
-        if self.basename() == first_part[:-1]:
-            return MATCHING_DIRECTORY
-        return first_part
-
-    def basename(self):
-        """Return the archive's file name without its archive extensions.
-
-        A trailing encoding extension is dropped first, then one
-        extension known to the mimetypes tables.
-        """
-        pieces = os.path.basename(self.filename).split('.')
-        extension = '.' + pieces[-1]
-        if extension in mimetypes.encodings_map:
-            pieces.pop()
-            extension = '.' + pieces[-1]
-        if ((extension in mimetypes.types_map) or
-            (extension in mimetypes.common_types) or
-            (extension in mimetypes.suffix_map)):
-            pieces.pop()
-        return '.'.join(pieces)
-
-    def extract(self, path):
-        """Extract the archive with path as the working directory.
-
-        The previous working directory is restored afterwards, even when
-        extraction raises.
-        """
-        old_path = os.path.realpath(os.curdir)
-        os.chdir(path)
-        try:
-            self.archive.seek(0, 0)
-            self.extract_archive()
-        finally:
-            os.chdir(old_path)
-    
-
-class TarExtractor(BaseExtractor):
-    """Extractor that feeds the archive to tar on standard input."""
-
-    def get_filenames(self):
-        """Return a ProcessStreamer over the output of 'tar -t'."""
-        self.archive.seek(0, 0)
-        listing = ProcessStreamer(['tar', '-t'], self.archive)
-        return listing
-
-    def extract_archive(self):
-        """Run 'tar -x' with the archive on standard input."""
-        extract_command = ['tar', '-x']
-        self.run(extract_command, stdin=self.archive)
-        
-        
-class ZipExtractor(BaseExtractor):
-    """Extractor for zip files, which zipinfo and unzip read by name.
-
-    BaseExtractor.__init__ is not called, so the file is never opened or
-    decoded here.  self.archive is an empty StringIO, which still
-    supports the seek() calls made on it.
-    """
-
-    def __init__(self, filename, mimetype, encoding):
-        self.archive = StringIO()
-        self.included_archives = []
-        self.filename = os.path.realpath(filename)
-        self.mimetype = mimetype
-        self.encoding = encoding
-
-    def get_filenames(self):
-        """Return a ProcessStreamer over the output of 'zipinfo -1'."""
-        self.archive.seek(0, 0)
-        list_command = ['zipinfo', '-1', self.filename]
-        return ProcessStreamer(list_command, None)
-
-    def extract_archive(self):
-        """Run 'unzip -q' on the file."""
-        extract_command = ['unzip', '-q', self.filename]
-        self.run(extract_command)
-
-
-class CpioExtractor(BaseExtractor):
-    """Extractor that feeds the archive to cpio on standard input."""
-
-    def get_filenames(self):
-        """Return a ProcessStreamer over the output of 'cpio -t'.
-
-        cpio's standard error is captured through a pipe.
-        """
-        self.archive.seek(0, 0)
-        list_command = ['cpio', '-t']
-        return ProcessStreamer(list_command, self.archive,
-                               stderr=subprocess.PIPE)
-
-    def extract_archive(self):
-        """Run 'cpio -i' with the archive on standard input."""
-        extract_command = ['cpio', '-i', '--make-directories',
-                           '--no-absolute-filenames']
-        self.run(extract_command, stderr=subprocess.PIPE, stdin=self.archive)
-
-
-class RPMExtractor(CpioExtractor):
-    """Extractor for RPM packages, converted to cpio with rpm2cpio."""
-
-    def prepare(self):
-        """Replace the archive with the output of 'rpm2cpio -'."""
-        self.pipe(['rpm2cpio', '-'], "rpm2cpio")
-
-    def basename(self):
-        """Return the package file name without its RPM-specific suffixes.
-
-        For a name ending in '.rpm', that extension is dropped, and so is
-        the dotted piece before it when that piece is shorter than eight
-        characters and is not the only piece left.  Names with some other
-        extension use BaseExtractor.basename().
-        """
-        name = os.path.basename(self.filename)
-        parts = name.split('.')
-        if len(parts) == 1:
-            return name
-        if parts[-1] != 'rpm':
-            return BaseExtractor.basename(self)
-        stem = parts[:-1]
-        if (len(stem) > 1) and (len(stem[-1]) < 8):
-            stem = stem[:-1]
-        return '.'.join(stem)
-
-    def check_contents(self):
-        """Scan the contents, then always classify the package as BOMB."""
-        CpioExtractor.check_contents(self)
-        return BOMB
-
-
-class DebExtractor(TarExtractor):
-    """Extractor for Debian packages; unpacks the data.tar.gz member."""
-
-    def prepare(self):
-        """Replace the archive with the decompressed data.tar.gz member."""
-        member_command = ['ar', 'p', self.filename, 'data.tar.gz']
-        self.pipe(member_command, "data.tar.gz extraction")
-        self.archive.seek(0, 0)
-        self.pipe(['zcat'], "data.tar.gz decompression")
-
-    def basename(self):
-        """Return the package file name without its final '_' piece.
-
-        The piece after the last underscore is dropped only when it ends
-        in '.deb' and is at most ten characters long.  Otherwise, names
-        containing an underscore use BaseExtractor.basename(), and names
-        without one are returned whole.
-        """
-        name = os.path.basename(self.filename)
-        if '_' not in name:
-            return name
-        stem, last_piece = name.rsplit('_', 1)
-        if (len(last_piece) <= 10) and last_piece.endswith('.deb'):
-            return stem
-        return BaseExtractor.basename(self)
-
-    def check_contents(self):
-        """Scan the contents, then always classify the package as BOMB."""
-        TarExtractor.check_contents(self)
-        return BOMB
-        
-
-class CompressionExtractor(BaseExtractor):
-    """Extractor for a single compressed file that is not an archive.
-
-    When an encoding was given, BaseExtractor.__init__ has already
-    decoded the data into self.archive, so extraction only copies it to
-    the target file.
-    """
-
-    # The target is a plain file, so no directory should be created.
-    name_checker = FilenameChecker
-
-    def basename(self):
-        """Return the file name with a trailing encoding extension removed."""
-        pieces = os.path.basename(self.filename).split('.')
-        extension = '.' + pieces[-1]
-        if extension in mimetypes.encodings_map:
-            pieces.pop()
-        return '.'.join(pieces)
-
-    def get_filenames(self):
-        """Yield the single name this file extracts to."""
-        yield self.basename()
-
-    def check_contents(self):
-        return COMPRESSED
-
-    def extract(self, path):
-        """Write the decoded data to the file at path.
-
-        Raises ExtractorError when the copy command fails.
-        """
-        # The data is binary, so do not open the output in text mode.
-        output = open(path, 'wb')
-        try:
-            # Rewind first, as BaseExtractor.extract() does.  The decoder
-            # wrote through this file's descriptor, so its offset may sit
-            # at the end of the data and cat would copy nothing.
-            self.archive.seek(0, 0)
-            self.run(['cat'], "output write", stdin=self.archive,
-                     stdout=output)
-        finally:
-            output.close()
-        
-
-class BaseHandler(object):
-    """Base class for the strategies that decide where an archive goes.
-
-    Subclasses set self.target to the path that is passed to the
-    extractor's extract() method.
-    """
-
-    def __init__(self, extractor, contents, options):
-        self.extractor = extractor
-        self.contents = contents
-        self.options = options
-        self.target = None
-        self.logger = logging.getLogger('x-log')
-
-    def extract(self):
-        """Extract into self.target.
-
-        Returns None on success, or the error text when the extractor
-        raises ExtractorError, IOError, or OSError.
-        """
-        try:
-            self.extractor.extract(self.target)
-        except (ExtractorError, IOError, OSError), error:
-            return str(error)
-        return None
-
-    def cleanup(self):
-        """Give the user access to everything under self.target.
-
-        Directories get u+rwx first, then everything gets u+rw.  Returns
-        None on success or when there is no target, and otherwise a
-        message naming the command that failed.
-        """
-        if self.target is None:
-            return
-        steps = [('find', ['find', self.target, '-type', 'd',
-                           '-exec', 'chmod', 'u+rwx', '{}', ';']),
-                 ('chmod', ['chmod', '-R', 'u+rw', self.target])]
-        for command, arguments in steps:
-            status = subprocess.call(arguments)
-            if status != 0:
-                return "%s returned with exit status %s" % (command, status)
-
-
-# The "where to extract" table, with options and archive types.
-# This dictates the contents of each can_handle method.
-#
-#         Flat           Overwrite            None
-# File    basename       basename             FilenameChecked
-# Match   .              .                    tempdir + checked
-# Bomb    .              basename             DirectoryChecked
-
-class FlatHandler(BaseHandler):
-    """Handler that extracts straight into the current directory."""
-
-    def can_handle(contents, options):
-        return ((options.flat and (contents != COMPRESSED)) or
-                (options.overwrite and (contents == MATCHING_DIRECTORY)))
-    can_handle = staticmethod(can_handle)
-
-    def __init__(self, extractor, contents, options):
-        BaseHandler.__init__(self, extractor, contents, options)
-        self.target = '.'
-
-    def cleanup(self):
-        """Give the user access to each extracted entry.
-
-        Only the names listed by the archive are touched, since the
-        target directory may hold unrelated files.  Every entry gets
-        user read and write permission, and directories also get user
-        execute permission.
-        """
-        filenames = self.extractor.get_filenames()
-        try:
-            for filename in filenames:
-                stat_info = os.stat(filename)
-                perms = stat.S_IRUSR | stat.S_IWUSR
-                if stat.S_ISDIR(stat_info.st_mode):
-                    perms |= stat.S_IXUSR
-                os.chmod(filename, stat_info.st_mode | perms)
-        finally:
-            # A ProcessStreamer must be stopped so the listing command is
-            # reaped and its pipe closed.  Plain generators have no stop().
-            stop = getattr(filenames, 'stop', None)
-            if stop is not None:
-                stop()
-
-
-class OverwriteHandler(BaseHandler):
-    """Handler whose target is the extractor's basename.
-
-    The name is used as-is, without checking whether it is already taken.
-    """
-
-    def can_handle(contents, options):
-        if options.flat and (contents == COMPRESSED):
-            return True
-        return (options.overwrite and (contents != MATCHING_DIRECTORY))
-    can_handle = staticmethod(can_handle)
-
-    def __init__(self, extractor, contents, options):
-        BaseHandler.__init__(self, extractor, contents, options)
-        target_name = self.extractor.basename()
-        self.target = target_name
-        
-
-class MatchHandler(BaseHandler):
-    """Handler for archives whose one top-level directory matches their name.
-
-    The archive is extracted into a temporary directory, and the directory
-    it contains is then renamed to a name the checker reports as free.
-    """
-
-    def can_handle(contents, options):
-        return contents == MATCHING_DIRECTORY
-    can_handle = staticmethod(can_handle)
-
-    def extract(self):
-        """Extract via a temporary directory.
-
-        Returns None on success or the error text from
-        BaseHandler.extract().  On failure self.target is left pointing
-        at the temporary directory.
-        """
-        expected_name = self.extractor.basename()
-        staging = tempfile.mkdtemp(dir='.')
-        self.target = staging
-        failure = BaseHandler.extract(self)
-        if failure is not None:
-            return failure
-        self.target = self.extractor.name_checker(expected_name).check()
-        os.rename(os.path.join(staging, expected_name), self.target)
-        os.rmdir(staging)
-        return failure
-
-
-class EmptyHandler(object):
-    """Do-nothing handler chosen for archives that list no files."""
-
-    def can_handle(contents, options):
-        return contents == EMPTY
-    can_handle = staticmethod(can_handle)
-
-    def __init__(self, extractor, contents, options):
-        """Accept the usual handler arguments and ignore them."""
-        return None
-
-    def extract(self):
-        """Nothing to extract."""
-        return None
-
-    def cleanup(self):
-        """Nothing to clean up."""
-        return None
-
-
-class BombHandler(BaseHandler):
-    """Catch-all handler that extracts into a target picked by the checker.
-
-    The target is chosen by the extractor's name_checker, starting from
-    the extractor's basename.
-    """
-
-    def can_handle(contents, options):
-        return True
-    can_handle = staticmethod(can_handle)
-
-    def __init__(self, extractor, contents, options):
-        BaseHandler.__init__(self, extractor, contents, options)
-        wanted_name = self.extractor.basename()
-        self.target = self.extractor.name_checker(wanted_name).check()
-
-        
-# Maps a guessed MIME type to the extractor class that handles it.
-extractor_map = {'application/x-tar': TarExtractor,
-                 'application/zip': ZipExtractor,
-                 'application/x-msdos-program': ZipExtractor,
-                 'application/x-debian-package': DebExtractor,
-                 'application/x-redhat-package-manager': RPMExtractor,
-                 'application/x-rpm': RPMExtractor,
-                 'application/x-cpio': CpioExtractor}
-
-# Handler classes in the order get_extractor() tries them; the first one
-# whose can_handle() returns true is used.  BombHandler accepts everything,
-# so anything listed after it would never be chosen.
-handlers = [FlatHandler, OverwriteHandler, MatchHandler, EmptyHandler,
-            BombHandler]
-
-class ExtractorApplication(object):
-    """Command-line driver: parse the options, then extract each archive."""
-
-    def __init__(self, arguments):
-        """Parse arguments (the command line without the program name)."""
-        self.parse_options(arguments)
-        self.setup_logger()
-        self.successes = []
-        self.failures = []
-
-    def parse_options(self, arguments):
-        """Fill in self.options and self.archives from the command line.
-
-        self.archives maps a directory to the archive names to extract
-        while in it.  It starts with the current directory and the names
-        given on the command line.  Exits through parser.error() when no
-        archive is listed.
-        """
-        parser = optparse.OptionParser(
-            usage="%prog [options] archive [archive2 ...]",
-            description="Intelligent archive extractor",
-            version=VERSION_BANNER
-            )
-        parser.add_option('-r', '--recursive', dest='recursive',
-                          action='store_true', default=False,
-                          help='extract archives contained in the ones listed')
-        parser.add_option('-q', '--quiet', dest='quiet',
-                          action='count', default=3,
-                          help='suppress warning/error messages')
-        parser.add_option('-v', '--verbose', dest='verbose',
-                          action='count', default=0,
-                          help='be verbose/print debugging information')
-        parser.add_option('-o', '--overwrite', dest='overwrite',
-                          action='store_true', default=False,
-                          help='overwrite any existing target directory')
-        parser.add_option('-f', '--flat', '--no-directory', dest='flat',
-                          action='store_true', default=False,
-                          help="don't put contents in their own directory")
-##         parser.add_option('-n', '--noninteractive', dest='batch',
-##                           action='store_true', default=False,
-##                           help="don't ask how to handle special cases")
-        self.options, filenames = parser.parse_args(arguments)
-        if not filenames:
-            parser.error("you did not list any archives")
-        self.archives = {os.path.realpath(os.curdir): filenames}
-
-    def setup_logger(self):
-        """Attach a stream handler whose level follows the -q/-v counts."""
-        self.logger = logging.getLogger('x-log')
-        # Let every record through the logger itself.  Without this it
-        # inherits the root logger's WARNING threshold, so -v could never
-        # enable anything below WARNING.  The handler does the filtering.
-        self.logger.setLevel(logging.DEBUG)
-        handler = logging.StreamHandler()
-        # WARNING is the default.
-        handler.setLevel(10 * (self.options.quiet - self.options.verbose))
-        formatter = logging.Formatter("x: %(levelname)s: %(message)s")
-        handler.setFormatter(formatter)
-        self.logger.addHandler(handler)
-
-    def get_extractor(self):
-        """Choose the extractor and handler for self.current_filename.
-
-        Sets self.current_extractor and self.current_handler.  Returns
-        None on success or an error message string.
-        """
-        mimetype, encoding = mimetypes.guess_type(self.current_filename)
-        try:
-            extractor = extractor_map[mimetype]
-        except KeyError:
-            if not encoding:
-                return "not a known archive type"
-            # Not a known archive, but compressed: just decompress it.
-            extractor = CompressionExtractor
-        try:
-            self.current_extractor = extractor(self.current_filename, mimetype,
-                                               encoding)
-            content = self.current_extractor.check_contents()
-            for handler in handlers:
-                if handler.can_handle(content, self.options):
-                    self.current_handler = handler(self.current_extractor,
-                                                   content, self.options)
-                    break
-        except ExtractorError, error:
-            return str(error)
-        return None
-
-    def recurse(self):
-        """Queue archives found inside the one just extracted.
-
-        Does nothing unless the recursive option is set.
-        """
-        if not self.options.recursive:
-            return
-        for filename in self.current_extractor.included_archives:
-            tail_path, basename = os.path.split(filename)
-            directory = os.path.join(self.current_directory,
-                                     self.current_handler.target, tail_path)
-            self.archives.setdefault(directory, []).append(basename)
-
-    def report(self, function, *args):
-        """Call function and log any error it returns or raises.
-
-        Returns True when function returned nothing and raised none of
-        the handled exceptions, and False otherwise.
-        """
-        try:
-            error = function(*args)
-        # ValueError is raised by the name checkers and for unrecognized
-        # encodings; report it like any other per-archive failure.
-        except (ExtractorError, IOError, OSError, ValueError), exception:
-            error = str(exception)
-        if error:
-            self.logger.error("%s: %s", self.current_filename, error)
-            return False
-        return True
-
-    def run(self):
-        """Extract every queued archive.
-
-        Returns 1 when at least one archive failed and 0 otherwise.
-        """
-        while self.archives:
-            self.current_directory, filenames = self.archives.popitem()
-            for filename in filenames:
-                os.chdir(self.current_directory)
-                self.current_filename = filename
-                success = self.report(self.get_extractor)
-                if success:
-                    # cleanup still runs when extract fails, because
-                    # report() is called before the result is combined.
-                    for name in 'extract', 'cleanup':
-                        success = (self.report(getattr(self.current_handler,
-                                                       name)) and success)
-                    self.recurse()
-                if success:
-                    self.successes.append(self.current_filename)
-                else:
-                    self.failures.append(self.current_filename)
-        if self.failures:
-            return 1
-        return 0
-
-
-if __name__ == '__main__':
-    # Use the application's return value as the process exit status.
-    exit_status = ExtractorApplication(sys.argv[1:]).run()
-    sys.exit(exit_status)

mercurial