Sat, 04 Nov 2006 10:34:06 -0500
[svn] Deal with a bunch of low-hanging fruit:
* Correctly cope with mimetype oddities I found on Fedora.
* I'm not doing anything with shar files yet, so take out that hook.
* Better error handling and reporting throughout, including a meaningful
exit code.
* Remove unused cruft from the BaseExtractor.run method.
* When reporting the "basename" for the archive, make sure it doesn't
include any preceding path.
* If the archive contains one directory whose name doesn't match the
archive basename, rename it after extraction.
- Although I just realized this probably does the wrong thing if there's
just one file in the archive.
#!/usr/bin/env python
#
# x -- Intelligently extract various archive types.
# Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
# Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, 5th Floor, Boston, MA, 02111.

import errno
import mimetypes
import os
import subprocess
import sys
import tempfile

try:
    from cStringIO import StringIO
except ImportError:
    # cStringIO only exists on Python 2.  StringIO is used here purely as a
    # seekable placeholder, so io.StringIO is an adequate substitute.
    from io import StringIO

mimetypes.encodings_map.setdefault('.bz2', 'bzip2')
mimetypes.types_map['.exe'] = 'application/x-msdos-program'

# Possible results of BaseExtractor.check_contents().  That method may also
# return a string: the name (with a trailing slash) of the single top-level
# directory found in the archive.
MATCHING_DIRECTORY = 1
ONE_DIRECTORY = 2
BOMB = 3
EMPTY = 4


class ExtractorError(Exception):
    """Raised when an archive cannot be opened, listed or extracted."""
    pass


class ProcessStreamer(object):
    """Iterator over the lines a subprocess writes to its standard output.

    Call stop() when finished, whether or not the iterator was exhausted:
    it drains the remaining output, reaps the process and raises
    ExtractorError if the command exited with a non-zero status.
    """

    def __init__(self, command, stdin, description="checking contents",
                 stderr=None):
        self.command = ' '.join(command)
        self.description = description
        try:
            self.process = subprocess.Popen(command, bufsize=1, stdin=stdin,
                                            stdout=subprocess.PIPE,
                                            stderr=stderr)
        except OSError as error:
            # Typically the helper program is not installed.
            raise ExtractorError("%s error: could not run '%s': %s" %
                                 (description, self.command, error.strerror))

    def __iter__(self):
        return self

    def next(self):
        """Return the next output line without its trailing newline."""
        line = self.process.stdout.readline()
        if not line:
            raise StopIteration
        if not isinstance(line, str):
            # On Python 3 the pipe yields bytes, but callers compare the
            # result against str paths.
            line = line.decode(sys.getfilesystemencoding(),
                               'surrogateescape')
        return line.rstrip('\n')

    __next__ = next

    def stop(self):
        """Drain the output, wait for the process and check its status.

        Raises ExtractorError if the command exited with a non-zero status.
        """
        # Read whatever is left so the child is not blocked writing to us.
        while self.process.stdout.readline():
            pass
        self.process.stdout.close()
        try:
            status = self.process.wait()
        finally:
            # Close stderr even if the wait fails; it is None unless the
            # caller asked for a pipe.
            if self.process.stderr is not None:
                self.process.stderr.close()
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (self.description, self.command, status))


class BaseExtractor(object):
    """Common behaviour for extractors that feed an archive to a command.

    Subclasses provide get_filenames() (returning a ProcessStreamer-like
    iterator over the archive's member names) and extract_archive(), and may
    override prepare() to transform the archive before it is inspected.
    """

    # Maps a mimetypes encoding name to the command that decodes it from
    # standard input to standard output.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}

    def __init__(self, filename, mimetype, encoding):
        """Open filename, decode it if it is encoded, then call prepare().

        Raises ExtractorError if the file cannot be opened, if there is no
        decoder for the encoding, or if decoding/preparation fails.
        """
        self.filename = filename
        self.mimetype = mimetype
        self.encoding = encoding
        try:
            self.archive = open(filename, 'r')
        except (IOError, OSError) as error:
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        try:
            if encoding:
                try:
                    decoder = self.decoders[encoding]
                except KeyError:
                    raise ExtractorError("unsupported encoding: %s" %
                                         (encoding,))
                self.pipe([decoder], "decoding")
            self.prepare()
        except ExtractorError:
            # Do not leak the open handle when setup fails part-way.
            self.archive.close()
            raise

    def run(self, command, description="extraction", stdout=None,
            stderr=None, stdin=None):
        """Run command to completion.

        Raises ExtractorError if the command cannot be started or exits with
        a non-zero status.
        """
        try:
            process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                                       stderr=stderr)
        except OSError as error:
            raise ExtractorError("%s error: could not run '%s': %s" %
                                 (description, ' '.join(command),
                                  error.strerror))
        # communicate() rather than wait(): it drains and closes any PIPE
        # stream, so a child that fills the pipe buffer cannot block forever.
        process.communicate()
        status = process.returncode
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (description, ' '.join(command), status))

    def pipe(self, command, description, stderr=None):
        """Filter the archive through command.

        The command reads the current archive on standard input; its output,
        captured in a temporary file, becomes the new self.archive.
        """
        output = tempfile.TemporaryFile()
        try:
            self.run(command, description, output, stderr, self.archive)
        except ExtractorError:
            output.close()
            raise
        self.archive.close()
        self.archive = output
        self.archive.flush()

    def prepare(self):
        """Hook called at the end of __init__; does nothing by default."""
        pass

    def check_contents(self):
        """Classify the archive by the layout of its member names.

        Returns EMPTY if there are no members; BOMB if the members do not
        all live under one top-level name, or if the archive holds just one
        top-level entry that is not inside a directory; MATCHING_DIRECTORY
        if the single top-level name equals basename(); otherwise that
        top-level name with a trailing slash.
        """
        self.archive.seek(0, 0)
        filenames = self.get_filenames()
        try:
            first_name = next(filenames)
        except StopIteration:
            filenames.stop()
            return EMPTY
        first_part = first_name.split('/', 1)[0] + '/'
        entry_count = 1
        for filename in filenames:
            entry_count += 1
            if not filename.startswith(first_part):
                filenames.stop()
                return BOMB
        filenames.stop()
        if self.basename() == first_part[:-1]:
            return MATCHING_DIRECTORY
        if entry_count == 1 and '/' not in first_name:
            # A lone top-level entry may be a plain file, which cannot be
            # renamed onto the target directory; extract it inside one.
            return BOMB
        return first_part

    def basename(self):
        """Return the archive's file name without path or known extensions.

        An encoding extension (such as .gz) is removed first, then one
        extension known to the mimetypes tables.  The last remaining piece
        of the name is never removed.
        """
        pieces = os.path.basename(self.filename).split('.')
        if len(pieces) > 1 and ('.' + pieces[-1]) in mimetypes.encodings_map:
            pieces.pop()
        if len(pieces) > 1:
            extension = '.' + pieces[-1]
            if (extension in mimetypes.types_map or
                    extension in mimetypes.common_types or
                    extension in mimetypes.suffix_map):
                pieces.pop()
        return '.'.join(pieces)

    def extract(self, path):
        """Rewind the archive and extract it into the current directory.

        path is unused here; subclasses that reopen the file by name use it
        to find the original working directory.
        """
        self.archive.seek(0, 0)
        self.extract_archive()


class TarExtractor(BaseExtractor):
    """Extractor for tar archives, read from standard input by tar."""

    def get_filenames(self):
        return ProcessStreamer(['tar', '-t'], self.archive)

    def extract_archive(self):
        self.run(['tar', '-x'], stdin=self.archive)


class ZipExtractor(BaseExtractor):
    """Extractor for zip archives, handed to zipinfo/unzip by file name."""

    def __init__(self, filename, mimetype, encoding):
        self.filename = filename
        self.mimetype = mimetype
        self.encoding = encoding
        # Placeholder so the inherited check_contents() has something to
        # seek(); the zip tools read the named file themselves.
        self.archive = StringIO()

    def get_filenames(self):
        return ProcessStreamer(['zipinfo', '-1', self.filename], None)

    def extract(self, path):
        """Extract with unzip; path leads back to the starting directory."""
        self.run(['unzip', '-q', os.path.join(path, self.filename)])


class CpioExtractor(BaseExtractor):
    """Extractor for cpio archives, read from standard input by cpio."""

    def get_filenames(self):
        return ProcessStreamer(['cpio', '-t'], self.archive,
                               stderr=subprocess.PIPE)

    def extract_archive(self):
        self.run(['cpio', '-i', '--make-directories',
                  '--no-absolute-filenames'],
                 stderr=subprocess.PIPE, stdin=self.archive)


class RPMExtractor(CpioExtractor):
    """Extractor for RPM packages, converted to cpio with rpm2cpio."""

    def prepare(self):
        self.pipe(['rpm2cpio', '-'], "rpm2cpio")

    def basename(self):
        """Return the file name without .rpm and a short preceding suffix.

        A suffix shorter than six characters before .rpm is dropped too
        (presumably the architecture tag -- TODO confirm).  Names that do
        not end in .rpm fall back to BaseExtractor.basename().
        """
        pieces = os.path.basename(self.filename).rsplit('.', 2)
        if len(pieces) == 1:
            return pieces[0]
        elif pieces[-1] != 'rpm':
            return BaseExtractor.basename(self)
        pieces.pop()
        if len(pieces) == 1:
            return pieces[0]
        elif len(pieces[-1]) < 6:
            pieces.pop()
        return '.'.join(pieces)

    def check_contents(self):
        """Always report BOMB so extraction happens in a new directory."""
        return BOMB


class DebExtractor(TarExtractor):
    """Extractor for Debian packages: unpacks the data.tar.gz member."""

    def prepare(self):
        self.pipe(['ar', 'p', self.filename, 'data.tar.gz'],
                  "data.tar.gz extraction")
        self.archive.seek(0, 0)
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        """Return the file name up to its last underscore.

        Falls back to BaseExtractor.basename() when the part after the last
        underscore is longer than ten characters or does not end in .deb.
        """
        pieces = os.path.basename(self.filename).rsplit('_', 1)
        if len(pieces) == 1:
            return pieces[0]
        elif (len(pieces[-1]) > 10) or (not pieces[-1].endswith('.deb')):
            return BaseExtractor.basename(self)
        return pieces[0]

    def check_contents(self):
        """Always report BOMB so extraction happens in a new directory."""
        return BOMB


class ExtractorApplication(object):
    """Command-line driver: extracts each file name given as an argument."""

    extractor_map = {'application/x-tar': TarExtractor,
                     'application/zip': ZipExtractor,
                     'application/x-msdos-program': ZipExtractor,
                     'application/x-debian-package': DebExtractor,
                     'application/x-redhat-package-manager': RPMExtractor,
                     'application/x-rpm': RPMExtractor,
                     'application/x-cpio': CpioExtractor}

    # Method names run in order for each file; a false result stops the
    # sequence for that file.
    actions = ['get_extractor', 'prepare_extraction', 'extract']

    def __init__(self, arguments):
        self.filenames = arguments
        self.successes = []
        self.failures = []

    def show_error(self, message):
        """Write message to standard error, prefixed by the current file."""
        sys.stderr.write("%s: %s\n" % (self.current_filename, message))

    def get_extractor(self):
        """Choose and construct the extractor for the current file.

        Returns True on success, False (after reporting) otherwise.
        """
        mimetype, encoding = mimetypes.guess_type(self.current_filename)
        try:
            handler = self.extractor_map[mimetype]
        except KeyError:
            self.show_error("not a known archive type")
            return False
        try:
            self.current_extractor = handler(self.current_filename, mimetype,
                                             encoding)
        except ExtractorError as error:
            self.show_error(error)
            return False
        return True

    def prepare_target_directory(self):
        """Create a new directory named after the archive.

        Tries the archive basename, then basename.1 through basename.9.
        Returns the name of the directory created, or None (after reporting)
        if creation fails or every candidate name is already taken.
        """
        basename = self.current_extractor.basename()
        for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]:
            directory = '%s%s' % (basename, suffix)
            try:
                os.mkdir(directory)
            except OSError as error:
                if error.errno == errno.EEXIST:
                    continue
                self.show_error("could not create extraction directory %s: %s"
                                % (error.filename, error.strerror))
                return None
            if suffix != '':
                self.show_error("extracted to %s" % (directory,))
            return directory
        # Every candidate already exists; do not hand back one of them.
        self.show_error("all good names for an extraction directory taken")
        return None

    def prepare_extraction(self):
        """Inspect the archive and set up where it will be extracted.

        Returns True when extraction may proceed, False (after reporting)
        otherwise.  May change the working directory and queue cleanup
        actions for run() to perform afterwards.
        """
        self.current_path = '.'
        try:
            contents = self.current_extractor.check_contents()
        except ExtractorError as error:
            self.show_error(error)
            return False
        if contents not in (MATCHING_DIRECTORY, EMPTY):
            directory = self.prepare_target_directory()
            if directory is None:
                return False
            if contents == BOMB:
                # Extract inside the new directory, then step back out.
                os.chdir(directory)
                self.current_path = '..'
                self.cleanup_actions.append((os.chdir, '..'))
            else:
                # One differently-named directory: rename it afterwards.
                self.cleanup_actions.append((os.rename, contents, directory))
        return True

    def extract(self):
        """Extract the current archive; return True on success."""
        try:
            self.current_extractor.extract(self.current_path)
        except ExtractorError as error:
            self.show_error(error)
            return False
        return True

    def run(self):
        """Process every file name; return 1 if any failed, otherwise 0."""
        for filename in self.filenames:
            running = True
            self.current_filename = filename
            self.cleanup_actions = []
            for name in self.actions:
                if not getattr(self, name)():
                    running = False
                    break
            for action in self.cleanup_actions:
                try:
                    action[0](*action[1:])
                except (IOError, OSError) as error:
                    # For example, renaming a directory that a failed
                    # extraction never created.  Keep going with the
                    # remaining files instead of dying with a traceback.
                    self.show_error("cleanup failed: %s" % (error.strerror,))
                    running = False
            if running:
                self.successes.append(self.current_filename)
            else:
                self.failures.append(self.current_filename)
        if self.failures:
            return 1
        return 0


if __name__ == '__main__':
    app = ExtractorApplication(sys.argv[1:])
    sys.exit(app.run())