[svn] Make ExtractorApplication suck less. Now the strategies for handling trunk

Mon, 13 Nov 2006 23:06:30 -0500

author
brett
date
Mon, 13 Nov 2006 23:06:30 -0500
branch
trunk
changeset 8
97388f5ff770
parent 7
1f3cb3845dfd
child 9
920417b8acc9

[svn] Make ExtractorApplication suck less. Now the strategies for handling
different archive types are out in their own classes, and polymorphism
takes care of everything for us. This is way cleaner.

While I was at it I changed the behavior in the case where an archive
contains one directory that doesn't match the basename. I now treat that
the same as a bomb. This can lead to silly directory structures but
ensures that there's no "data" loss nor unexpected results.

TODO file | annotate | diff | comparison | revisions
scripts/x file | annotate | diff | comparison | revisions
tests/compare.py file | annotate | diff | comparison | revisions
--- a/TODO	Sat Nov 11 18:42:19 2006 -0500
+++ b/TODO	Mon Nov 13 23:06:30 2006 -0500
@@ -1,13 +1,16 @@
 Things which I have a use case/anti-use case for:
-* Clean up ExtractorApplication, ugh.
-* Reconsider the current extraction policy for the ONE_DIRECTORY case.
 * Decompress non-archive files.
+* Write tests for extracting in a different directory from where the archives
+  live.
+* Figure out what to do about warnings in the Handlers.
 
 Things that are generally good:
 * Better tests.
 * Better error messages.
 
 Things I think might be good but can't prove:
+* Consider having options about whether or not to make sane directories,
+  have tarbomb protection, etc.
 * Use zipfile instead of the zip commands.
 * Processing from stdin.
 * Extracting control.tar.gz from deb files.
--- a/scripts/x	Sat Nov 11 18:42:19 2006 -0500
+++ b/scripts/x	Mon Nov 13 23:06:30 2006 -0500
@@ -245,6 +245,62 @@
         return BOMB
         
 
+class MatchHandler(object):
+    def __init__(self, extractor, contents):
+        self.extractor = extractor
+        self.contents = contents
+        self.directory = extractor.basename()
+
+    def extract(self, directory='.'):
+        try:
+            self.extractor.extract(directory)
+        except ExtractorError, error:
+            return error.strerror
+
+    def cleanup(self):
+        command = 'chmod'
+        status = subprocess.call(['chmod', '-R', 'u+rw', self.directory])
+        if status == 0:
+            command = 'find'
+            status = subprocess.call(['find', self.directory, '-type', 'd',
+                                      '-exec', 'chmod', 'u+x', '{}', ';'])
+        if status != 0:
+            return "%s returned with exit status %s" % (command, status)
+        
+        
+class BombHandler(MatchHandler):
+    def __init__(self, extractor, contents):
+        MatchHandler.__init__(self, extractor, contents)
+        basename = self.directory
+        for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]:
+            self.directory = '%s%s' % (basename, suffix)
+            try:
+                os.mkdir(self.directory)
+            except OSError, error:
+                if error.errno == errno.EEXIST:
+                    continue
+                raise ValueError("could not make extraction directory %s: %s" %
+                                 (error.filename, error.strerror))
+##             if suffix != '':
+##                 self.show_error("extracted to %s" % (directory,))
+            break
+        else:
+            raise ValueError("all good names for an extraction directory taken")
+
+    def extract(self):
+        os.chdir(self.directory)
+        return MatchHandler.extract(self, '..')
+
+    def cleanup(self):
+        os.chdir('..')
+        return MatchHandler.cleanup(self)
+        
+
+class EmptyHandler(object):
+    def __init__(self, extractor, contents): pass
+    def extract(self): pass
+    def cleanup(self): pass
+
 extractor_map = {'application/x-tar': TarExtractor,
                  'application/zip': ZipExtractor,
                  'application/x-msdos-program': ZipExtractor,
@@ -253,9 +309,10 @@
                  'application/x-rpm': RPMExtractor,
                  'application/x-cpio': CpioExtractor}
 
+handler_map = {EMPTY: EmptyHandler,
+               MATCHING_DIRECTORY: MatchHandler}
+
 class ExtractorApplication(object):
-    actions = ['get_extractor', 'prepare_extraction', 'extract', 'recurse']
-
     def __init__(self, arguments):
         self.parse_options(arguments)
         self.successes = []
@@ -281,109 +338,49 @@
     def get_extractor(self):
         mimetype, encoding = mimetypes.guess_type(self.current_filename)
         try:
-            handler = extractor_map[mimetype]
+            extractor = extractor_map[mimetype]
         except KeyError:
-            self.show_error("not a known archive type")
-            return False
+            return "not a known archive type"
         try:
-            self.current_extractor = handler(self.current_filename, mimetype,
-                                             encoding)
+            self.current_extractor = extractor(self.current_filename, mimetype,
+                                               encoding)
+            content = self.current_extractor.check_contents()
+            handler = handler_map.get(content, BombHandler)
+            self.current_handler = handler(self.current_extractor, content)
         except ExtractorError, error:
+            return str(error)
+
+    def recurse(self):
+        if not self.options.recursive:
+            return
+        archive_path = os.path.split(self.current_filename)[0]
+        for filename in self.current_extractor.included_archives:
+            tail_path, basename = os.path.split(filename)
+            directory = os.path.join(self.current_directory, archive_path,
+                                     self.current_handler.directory, tail_path)
+            self.archives.setdefault(directory, []).append(basename)
+
+    def report(self, function, *args):
+        error = function(*args)
+        if error:
             self.show_error(error)
             return False
         return True
 
-    def prepare_target_directory(self):
-        basename = self.current_extractor.basename()
-        for suffix in [''] + ['.%s' % (x,) for x in range(1, 10)]:
-            directory = '%s%s' % (basename, suffix)
-            try:
-                os.mkdir(directory)
-            except OSError, error:
-                if error.errno == errno.EEXIST:
-                    continue
-                self.show_error("could not create extraction directory %s: %s" %
-                                (error.filename, error.strerror))
-                return None
-            if suffix != '':
-                self.show_error("extracted to %s" % (directory,))
-            break
-        else:
-            self.show_error("all good names for an extraction directory taken")
-        return directory
-
-    def move_to_directory(self, filename, target):
-        if not os.path.isdir(filename):
-            filename = os.path.split(filename)[0]
-            target = os.path.join(target, filename)
-        os.rename(filename, target)
-
-    def prepare_extraction(self):
-        self.current_path = '.'
-        contents = self.current_extractor.check_contents()
-        if contents == MATCHING_DIRECTORY:
-            self.target_directory = self.current_filename
-        elif contents != EMPTY:
-            self.target_directory = self.prepare_target_directory()
-            if self.target_directory is None:
-                return False
-            if contents == BOMB:
-                os.chdir(self.target_directory)
-                self.current_path = '..'
-            else:
-                self.cleanup_actions.append((self.move_to_directory, contents,
-                                             self.target_directory))
-        else:
-            self.target_directory = None
-        return True
-
-    def extract(self):
-        try:
-            self.current_extractor.extract(self.current_path)
-        except ExtractorError, error:
-            self.show_error(error)
-            return False
-        return True
-
-    def recurse(self):
-        if not self.options.recursive:
-            return True
-        for filename in self.current_extractor.included_archives:
-            tail_path, basename = os.path.split(filename)
-            directory = os.path.join(self.current_directory,
-                                     self.target_directory, tail_path)
-            self.archives.setdefault(directory, []).append(basename)
-        return True
-
-    def fix_perms(self):
-        if self.target_directory is None:
-            return True
-        status = subprocess.call(['chmod', '-R', 'u+rw',
-                                  os.path.join(self.current_directory,
-                                               self.target_directory)])
-        if status == 0:
-            status = subprocess.call(['find',
-                                      os.path.join(self.current_directory,
-                                                   self.target_directory),
-                                      '-type', 'd',
-                                      '-exec', 'chmod', 'u+x', '{}', ';'])
-        return status == 0
-
     def run(self):
         while self.archives:
             self.current_directory, filenames = self.archives.popitem()
             for filename in filenames:
                 os.chdir(self.current_directory)
-                running = True
                 self.current_filename = filename
                 self.cleanup_actions = []
-                actions = [getattr(self, name) for name in self.actions]
-                while running and actions:
-                    running = actions.pop(0)()
-                for action in self.cleanup_actions:
-                    action[0](*action[1:])
-                running = self.fix_perms()
-                if running:
+                success = self.report(self.get_extractor)
+                if success:
+                    for name in 'extract', 'cleanup':
+                        success = (self.report(getattr(self.current_handler,
+                                                       name)) and success)
+                    self.recurse()
+                if success:
                     self.successes.append(self.current_filename)
                 else:
                     self.failures.append(self.current_filename)
--- a/tests/compare.py	Sat Nov 11 18:42:19 2006 -0500
+++ b/tests/compare.py	Mon Nov 13 23:06:30 2006 -0500
@@ -50,10 +50,12 @@
     ['mkdir test-recursive-badperms',
      'cd test-recursive-badperms',
      'tar -jxf ../test-recursive-badperms.tar.bz2',
-     'tar -xf test-badperms.tar',
-     'mv testdir test-badperms',
-     'chmod 755 test-badperms'],
-    ['if [ "x`cat test-recursive-badperms/test-badperms/testfile`" = "xhey" ]',
+     'mkdir test-badperms',
+     'cd test-badperms',
+     'tar -xf ../test-badperms.tar',
+     'chmod 755 testdir'],
+    ['if [ "x`cat test-recursive-badperms/test-badperms/testdir/testfile`" = \
+      "xhey" ]',
      'then exit 0; else exit 1; fi']
     )}
 

mercurial