Sat, 04 Nov 2006 10:34:06 -0500
[svn] Deal with a bunch of low-hanging fruit:
* Correctly cope with mimetype oddities I found on Fedora.
* I'm not doing anything with shar files yet, so take out that hook.
* Better error handling and reporting throughout, including a meaningful
exit code.
* Remove unused cruft from the BaseExtractor.run method.
* When reporting the "basename" for the archive, make sure it doesn't
include any preceding path.
* If the archive contains one directory whose name doesn't match the
archive basename, rename it after extraction.
- Caveat: I just realized this rename probably does the wrong thing if
there's just one file (rather than one directory) in the archive.
TODO | file | annotate | diff | comparison | revisions | |
scripts/x | file | annotate | diff | comparison | revisions |
--- a/TODO Wed Nov 01 22:03:46 2006 -0500 +++ b/TODO Sat Nov 04 10:34:06 2006 -0500 @@ -1,19 +1,10 @@ Things which I have a use case/anti-use case for: -* Make sure the basename method just works with the basename, and doesn't - include any preceding path. -* Take out the shar support, you're probably not gonna use it. -* Handle the one misnamed directory case. -* Provide a real exit status. * Fix permissions as you extract. * Recursive extraction. Things that are generally good: -* Better error messages: - * Problems opening/understanding the file. - * Extra strategies for making a directory, and then better errors. - * Running subcommands. -* Clean up the run method. * Usage information. +* Better error messages. Things I think might be good but can't prove: * Use zipfile instead of the zip commands.
--- a/scripts/x Wed Nov 01 22:03:46 2006 -0500 +++ b/scripts/x Sat Nov 04 10:34:06 2006 -0500 @@ -17,6 +17,7 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, 5th Floor, Boston, MA, 02111. +import errno import mimetypes import os import subprocess @@ -26,6 +27,7 @@ from cStringIO import StringIO mimetypes.encodings_map.setdefault('.bz2', 'bzip2') +mimetypes.types_map['.exe'] = 'application/x-msdos-program' MATCHING_DIRECTORY = 1 ONE_DIRECTORY = 2 @@ -75,35 +77,32 @@ self.filename = filename self.mimetype = mimetype self.encoding = encoding - self.archive = open(filename, 'r') + try: + self.archive = open(filename, 'r') + except (IOError, OSError), error: + raise ExtractorError("could not open %s: %s" % + (filename, error.strerror)) if encoding: self.pipe([self.decoders[encoding]], "decoding") self.prepare() def run(self, command, description="extraction", stdout=None, stderr=None, - stdin=None, string_output=True): + stdin=None): process = subprocess.Popen(command, stdin=stdin, stdout=stdout, stderr=stderr) status = process.wait() if status != 0: raise ExtractorError("%s error: '%s' returned status code %s" % (description, ' '.join(command), status)) - try: - process.stderr.close() - except AttributeError: - pass - output = process.stdout - if string_output: + for pipe in (process.stdout, process.stderr): try: - output = output.read(-1) - process.stdout.close() + pipe.close() except AttributeError: pass - return output def pipe(self, command, description, stderr=None): output = tempfile.TemporaryFile() - self.run(command, description, output, stderr, self.archive, False) + self.run(command, description, output, stderr, self.archive) self.archive.close() self.archive = output self.archive.flush() @@ -126,10 +125,10 @@ filenames.stop() if self.basename() == first_part[:-1]: return MATCHING_DIRECTORY - return ONE_DIRECTORY + return first_part def basename(self): - pieces = self.filename.split('.') + pieces = 
os.path.basename(self.filename).split('.') extension = '.' + pieces[-1] if mimetypes.encodings_map.has_key(extension): pieces.pop() @@ -183,7 +182,7 @@ self.pipe(['rpm2cpio', '-'], "rpm2cpio") def basename(self): - pieces = self.filename.rsplit('.', 2) + pieces = os.path.basename(self.filename).rsplit('.', 2) if len(pieces) == 1: return pieces[0] elif pieces[-1] != 'rpm': @@ -207,7 +206,7 @@ self.pipe(['zcat'], "data.tar.gz decompression") def basename(self): - pieces = self.filename.rsplit('_', 1) + pieces = os.path.basename(self.filename).rsplit('_', 1) if len(pieces) == 1: return pieces[0] elif (len(pieces[-1]) > 10) or (not pieces[-1].endswith('.deb')): @@ -218,41 +217,100 @@ return BOMB -extractor_map = {'application/x-tar': TarExtractor, - 'application/zip': ZipExtractor, - 'application/x-msdos-program': ZipExtractor, - 'application/x-debian-package': DebExtractor, - 'application/x-redhat-package-manager': RPMExtractor, - 'application/x-shar': None, - 'application/x-cpio': CpioExtractor} +class ExtractorApplication(object): + extractor_map = {'application/x-tar': TarExtractor, + 'application/zip': ZipExtractor, + 'application/x-msdos-program': ZipExtractor, + 'application/x-debian-package': DebExtractor, + 'application/x-redhat-package-manager': RPMExtractor, + 'application/x-rpm': RPMExtractor, + 'application/x-cpio': CpioExtractor} + actions = ['get_extractor', 'prepare_extraction', 'extract'] -def show_error(filename, message): - print >>sys.stderr, "%s: %s" % (filename, message) + def __init__(self, arguments): + self.filenames = arguments + self.successes = [] + self.failures = [] + + def show_error(self, message): + print >>sys.stderr, "%s: %s" % (self.current_filename, message) -def main(arguments): - for filename in arguments: - mimetype, encoding = mimetypes.guess_type(filename) + def get_extractor(self): + mimetype, encoding = mimetypes.guess_type(self.current_filename) try: - handler = extractor_map[mimetype] + handler = 
self.extractor_map[mimetype] except KeyError: - show_error(filename, "doesn't look like an archive") - continue - extractor = handler(filename, mimetype, encoding) - contents = extractor.check_contents() - path = '.' - if contents == BOMB: - directory = extractor.basename() + self.show_error("not a known archive type") + return False + try: + self.current_extractor = handler(self.current_filename, mimetype, + encoding) + except ExtractorError, error: + self.show_error(error) + return False + return True + + def prepare_target_directory(self): + basename = self.current_extractor.basename() + for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]: + directory = '%s%s' % (basename, suffix) try: os.mkdir(directory) except OSError, error: - show_error(filename, "could not create %s: %s" % - (error.filename, error.strerror)) - continue - os.chdir(directory) - path = '..' - extractor.extract(path) - if contents == BOMB: - os.chdir('..') + if error.errno == errno.EEXIST: + continue + self.show_error("could not create extraction directory %s: %s" % + (error.filename, error.strerror)) + return None + if suffix != '': + self.show_error("extracted to %s" % (directory,)) + break + else: + self.show_error("all good names for an extraction directory taken") + return directory + + def prepare_extraction(self): + self.current_path = '.' + contents = self.current_extractor.check_contents() + if contents not in (MATCHING_DIRECTORY, EMPTY): + directory = self.prepare_target_directory() + if directory is None: + return False + if contents == BOMB: + os.chdir(directory) + self.current_path = '..' 
+ self.cleanup_actions.append((os.chdir, '..')) + else: + self.cleanup_actions.append((os.rename, contents, directory)) + return True + + def extract(self): + try: + self.current_extractor.extract(self.current_path) + except ExtractorError, error: + self.show_error(error) + return False + return True + + def run(self): + for filename in self.filenames: + running = True + self.current_filename = filename + self.cleanup_actions = [] + actions = [getattr(self, name) for name in self.actions] + while running and actions: + running = actions.pop(0)() + for action in self.cleanup_actions: + action[0](*action[1:]) + if running: + self.successes.append(self.current_filename) + else: + self.failures.append(self.current_filename) + if self.failures: + return 1 + return 0 + if __name__ == '__main__': - main(sys.argv[1:]) + app = ExtractorApplication(sys.argv[1:]) + sys.exit(app.run())