# HG changeset patch # User brett # Date 1163477190 18000 # Node ID 97388f5ff770108021d7eadae63cccb19806a5bc # Parent 1f3cb3845dfd77cce92fd85ae935d29f71ca527b [svn] Make ExtractorApplication suck less. Now the strategies for handling different archive types are out in their own classes, and polymorphism takes care of everything for us. This is way cleaner. While I was at it I changed the behavior in the case where an archive contains one directory that doesn't match the basename. I now treat that the same as a bomb. This can lead to silly directory structures but ensures that there's no "data" loss nor unexpected results. diff -r 1f3cb3845dfd -r 97388f5ff770 TODO --- a/TODO Sat Nov 11 18:42:19 2006 -0500 +++ b/TODO Mon Nov 13 23:06:30 2006 -0500 @@ -1,13 +1,16 @@ Things which I have a use case/anti-use case for: -* Clean up ExtractorApplication, ugh. -* Reconsider the current extraction policy for the ONE_DIRECTORY case. * Decompress non-archive files. +* Write tests for extracting in a different directory from where the archives + live. +* Figure out what to do about warnings in the Handlers. Things that are generally good: * Better tests. * Better error messages. Things I think might be good but can't prove: +* Consider having options about whether or not to make sane directories, + have tarbomb protection, etc. * Use zipfile instead of the zip commands. * Processing from stdin. * Extracting control.tar.gz from deb files. diff -r 1f3cb3845dfd -r 97388f5ff770 scripts/x --- a/scripts/x Sat Nov 11 18:42:19 2006 -0500 +++ b/scripts/x Mon Nov 13 23:06:30 2006 -0500 @@ -245,6 +245,62 @@ return BOMB +class MatchHandler(object): + def __init__(self, extractor, contents): + self.extractor = extractor + self.contents = contents + self.directory = extractor.basename() + + def extract(self, directory='.'): + try: + self.extractor.extract(directory) + except ExtractorError, error: + return error.strerror + + def cleanup(self): + command = 'chmod' + status = subprocess.call(['chmod', '-R', 'u+rw', self.directory]) + if status == 0: + command = 'find' + status = subprocess.call(['find', self.directory, '-type', 'd', + '-exec', 'chmod', 'u+x', '{}', ';']) + if status != 0: + return "%s returned with exit status %s" % (command, status) + + +class BombHandler(MatchHandler): + def __init__(self, extractor, contents): + MatchHandler.__init__(self, extractor, contents) + basename = self.directory + for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]: + self.directory = '%s%s' % (basename, suffix) + try: + os.mkdir(self.directory) + except OSError, error: + if error.errno == errno.EEXIST: + continue + raise ValueError("could not make extraction directory %s: %s" % + (error.filename, error.strerror)) +## if suffix != '': +## self.show_error("extracted to %s" % (directory,)) + break + else: + raise ValueError("all good names for an extraction directory taken") + + def extract(self): + os.chdir(self.directory) + return MatchHandler.extract(self, '..') + + def cleanup(self): + os.chdir('..') + return MatchHandler.cleanup(self) + + +class EmptyHandler(object): + def __init__(self, extractor, contents): pass + def extract(self): pass + def cleanup(self): pass + extractor_map = {'application/x-tar': TarExtractor, 'application/zip': ZipExtractor, 'application/x-msdos-program': ZipExtractor, @@ -253,9 +309,10 @@ 'application/x-rpm': RPMExtractor, 'application/x-cpio': CpioExtractor} +handler_map = {EMPTY: EmptyHandler, + MATCHING_DIRECTORY: MatchHandler} + class ExtractorApplication(object): - actions = ['get_extractor', 'prepare_extraction', 'extract', 'recurse'] - def __init__(self, arguments): self.parse_options(arguments) self.successes = [] @@ -281,109 +338,49 @@ def get_extractor(self): mimetype, encoding = mimetypes.guess_type(self.current_filename) try: - handler = extractor_map[mimetype] + extractor = extractor_map[mimetype] except KeyError: - self.show_error("not a known archive type") - return False + return "not a known archive type" try: - self.current_extractor = handler(self.current_filename, mimetype, - encoding) + self.current_extractor = extractor(self.current_filename, mimetype, + encoding) + content = self.current_extractor.check_contents() + handler = handler_map.get(content, BombHandler) + self.current_handler = handler(self.current_extractor, content) except ExtractorError, error: + return str(error) + + def recurse(self): + if not self.options.recursive: + return + archive_path = os.path.split(self.current_filename)[0] + for filename in self.current_extractor.included_archives: + tail_path, basename = os.path.split(filename) + directory = os.path.join(self.current_directory, archive_path, + self.current_handler.directory, tail_path) + self.archives.setdefault(directory, []).append(basename) + + def report(self, function, *args): + error = function(*args) + if error: self.show_error(error) return False return True - def prepare_target_directory(self): - basename = self.current_extractor.basename() - for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]: - directory = '%s%s' % (basename, suffix) - try: - os.mkdir(directory) - except OSError, error: - if error.errno == errno.EEXIST: - continue - self.show_error("could not create extraction directory %s: %s" % - (error.filename, error.strerror)) - return None - if suffix != '': - self.show_error("extracted to %s" % (directory,)) - break - else: - self.show_error("all good names for an extraction directory taken") - return directory - - def move_to_directory(self, filename, target): - if not os.path.isdir(filename): - filename = os.path.split(filename)[0] - target = os.path.join(target, filename) - os.rename(filename, target) - - def prepare_extraction(self): - self.current_path = '.' - contents = self.current_extractor.check_contents() - if contents == MATCHING_DIRECTORY: - self.target_directory = self.current_filename - elif contents != EMPTY: - self.target_directory = self.prepare_target_directory() - if self.target_directory is None: - return False - if contents == BOMB: - os.chdir(self.target_directory) - self.current_path = '..' - else: - self.cleanup_actions.append((self.move_to_directory, contents, - self.target_directory)) - else: - self.target_directory = None - return True - - def extract(self): - try: - self.current_extractor.extract(self.current_path) - except ExtractorError, error: - self.show_error(error) - return False - return True - - def recurse(self): - if not self.options.recursive: - return True - for filename in self.current_extractor.included_archives: - tail_path, basename = os.path.split(filename) - directory = os.path.join(self.current_directory, - self.target_directory, tail_path) - self.archives.setdefault(directory, []).append(basename) - return True - - def fix_perms(self): - if self.target_directory is None: - return True - status = subprocess.call(['chmod', '-R', 'u+rw', - os.path.join(self.current_directory, - self.target_directory)]) - if status == 0: - status = subprocess.call(['find', - os.path.join(self.current_directory, - self.target_directory), - '-type', 'd', - '-exec', 'chmod', 'u+x', '{}', ';']) - return status == 0 - def run(self): while self.archives: self.current_directory, filenames = self.archives.popitem() for filename in filenames: os.chdir(self.current_directory) - running = True self.current_filename = filename self.cleanup_actions = [] - actions = [getattr(self, name) for name in self.actions] - while running and actions: - running = actions.pop(0)() - for action in self.cleanup_actions: - action[0](*action[1:]) - running = self.fix_perms() - if running: + success = self.report(self.get_extractor) + if success: + for name in 'extract', 'cleanup': + success = (self.report(getattr(self.current_handler, + name)) and success) + self.recurse() + if success: self.successes.append(self.current_filename) else: self.failures.append(self.current_filename) diff -r 1f3cb3845dfd -r 97388f5ff770 tests/compare.py --- a/tests/compare.py Sat Nov 11 18:42:19 2006 -0500 +++ b/tests/compare.py Mon Nov 13 23:06:30 2006 -0500 @@ -50,10 +50,12 @@ ['mkdir test-recursive-badperms', 'cd test-recursive-badperms', 'tar -jxf ../test-recursive-badperms.tar.bz2', - 'tar -xf test-badperms.tar', - 'mv testdir test-badperms', - 'chmod 755 test-badperms'], - ['if [ "x`cat test-recursive-badperms/test-badperms/testfile`" = "xhey" ]', + 'mkdir test-badperms', + 'cd test-badperms', + 'tar -xf ../test-badperms.tar', + 'chmod 755 testdir'], + ['if [ "x`cat test-recursive-badperms/test-badperms/testdir/testfile`" = \ + "xhey" ]', 'then exit 0; else exit 1; fi'] )}