[svn] If we can't figure out what the file is by mimetype, try using the file command to figure out what it is instead. (branch: trunk)

Fri, 19 Oct 2007 23:06:53 -0400

author
brett
date
Fri, 19 Oct 2007 23:06:53 -0400
branch
trunk
changeset 30
1015bbd6dc5e
parent 29
5fad99c17221
child 31
c3a2760d1c3a

[svn] If we can't figure out what the file is by mimetype, try using the file
command to figure out what it is instead.

This completely changes the program structure because now we might try to
use several extractors on a particular file before giving up. I haven't
really done the refactoring that would be appropriate for a change this
fundamental. I'd like to do that before the next release.

scripts/dtrx file | annotate | diff | comparison | revisions
tests/compare.py file | annotate | diff | comparison | revisions
tests/tests.yml file | annotate | diff | comparison | revisions
--- a/scripts/dtrx	Fri Oct 19 23:03:17 2007 -0400
+++ b/scripts/dtrx	Fri Oct 19 23:06:53 2007 -0400
@@ -22,6 +22,7 @@
 import mimetypes
 import optparse
 import os
+import re
 import stat
 import subprocess
 import sys
@@ -548,41 +549,86 @@
             BombHandler]
 
 class ExtractorBuilder(object):
-    extractor_map = {}
-    for args in ((TarExtractor, None, 'x-tar'),
-                 (ZipExtractor, None, 'zip', 'x-msdos-program'),
-                 (DebExtractor, DebMetadataExtractor, 'x-debian-package'),
-                 (RPMExtractor, None, 'x-redhat-package-manager', 'x-rpm'),
-                 (CpioExtractor, None, 'x-cpio'),
-                 (GemExtractor, GemMetadataExtractor, 'x-ruby-gem')):
-        for entry in args[2:]:
-            if '/' not in entry:
-                entry = 'application/' + entry
-            extractor_map[entry] = args[0:2]
+    extractor_map = {'tar': (TarExtractor, None),
+                     'zip': (ZipExtractor, None),
+                     'deb': (DebExtractor, DebMetadataExtractor),
+                     'rpm': (RPMExtractor, None),
+                     'cpio': (CpioExtractor, None),
+                     'gem': (GemExtractor, GemMetadataExtractor),
+                     'compress': (CompressionExtractor, None)}
+
+    mimetype_map = {}
+    for mapping in (('tar', 'x-tar'),
+                    ('zip', 'x-msdos-program', 'zip'),
+                    ('deb', 'x-debian-package'),
+                    ('rpm', 'x-redhat-package-manager', 'x-rpm'),
+                    ('cpio', 'x-cpio'),
+                    ('gem', 'x-ruby-gem')):
+        for mimetype in mapping[1:]:
+            if '/' not in mimetype:
+                mimetype = 'application/' + mimetype
+            mimetype_map[mimetype] = mapping[0]
+
+    magic_mime_map = {}
+    for mapping in (('deb', 'Debian binary package'),
+                    ('cpio', 'cpio archive'),
+                    ('tar', 'POSIX tar archive'),
+                    ('zip', 'Zip archive'),
+                    ('rpm', 'RPM')):
+        for pattern in mapping[1:]:
+            magic_mime_map[re.compile(pattern)] = mapping[0]
+    
+    magic_encoding_map = {}
+    for mapping in (('bzip2', 'bzip2 compressed'),
+                    ('gzip', 'gzip compressed')):
+        for pattern in mapping[1:]:
+            magic_encoding_map[re.compile(pattern)] = mapping[0]
 
     def __init__(self, filename, options):
         self.filename = filename
         self.options = options
-        self.mimetype, self.encoding = mimetypes.guess_type(self.filename)
+
+    def build_extractor(self, archive_type, encoding):
+        extractors = self.extractor_map[archive_type]
+        if self.options.metadata and (extractors[1] is not None):
+            extractor = extractors[1]
+        else:
+            extractor = extractors[0]
+        return extractor(self.filename, encoding)
 
     def get_extractor(self):
-        extractor = self.find_extractor()
-        if extractor is None:
-            raise ExtractorError("not a known archive type")
-        return extractor(self.filename, self.encoding)
+        for func_name in ('mimetype', 'magic'):
+            archive_type, encoding = getattr(self, 'try_by_' + func_name)()
+            if archive_type is not None:
+                yield self.build_extractor(archive_type, encoding)
 
-    def find_extractor(self):
-        extractor = None
+    def try_by_mimetype(self):
+        mimetype, encoding = mimetypes.guess_type(self.filename)
         try:
-            extractors = self.extractor_map[self.mimetype]
-            if self.options.metadata and (extractors[1] is not None):
-                extractor = extractors[1]
-            else:
-                extractor = extractors[0]
+            return self.mimetype_map[mimetype], encoding
         except KeyError:
-            if self.encoding:
-                extractor = CompressionExtractor
-        return extractor
+            if encoding:
+                return 'compress', encoding
+        return None, None
+
+    def try_by_magic(self):
+        process = subprocess.Popen(['file', '-z', self.filename],
+                                   stdout=subprocess.PIPE)
+        status = process.wait()
+        if status != 0:
+            return None, None
+        output = process.stdout.readline()
+        process.stdout.close()
+        if output.startswith('%s: ' % self.filename):
+            output = output[len(self.filename) + 2:]
+        results = [None, None]
+        for index, mapping in enumerate((self.magic_mime_map,
+                                         self.magic_encoding_map)):
+            for regexp, result in mapping.items():
+                if regexp.search(output):
+                    results[index] = result
+                    break
+        return results
 
 
 class ExtractorApplication(object):
@@ -638,10 +684,6 @@
         handler.setFormatter(formatter)
         self.logger.addHandler(handler)
 
-    def get_extractor(self):
-        builder = ExtractorBuilder(self.current_filename, self.options)
-        self.current_extractor = builder.get_extractor()
-
     def get_handler(self):
         for var_name in ('type', 'name'):
             exec('content_%s = self.current_extractor.content_%s' %
@@ -683,44 +725,50 @@
             self.failures.append(self.current_filename)
 
     def extract(self):
-        while self.archives:
-            self.current_directory, filenames = self.archives.popitem()
-            for filename in filenames:
-                os.chdir(self.current_directory)
-                self.current_filename = filename
-                success = (self.report(self.get_extractor) and
-                           self.report(self.current_extractor.extract) and
-                           self.report(self.get_handler) and
-                           self.report(self.current_handler.handle))
-                if success:
-                    self.recurse()
-                self.record_status(success)
-            self.options.one_entry_policy.permanent_policy = EXTRACT_WRAP
+        success = (self.report(self.current_extractor.extract) and
+                   self.report(self.get_handler) and
+                   self.report(self.current_handler.handle))
+        if success:
+            self.recurse()
+        return success
 
     def show_contents(self):
         for filename in self.current_extractor.get_filenames():
             print filename
 
-    def show_list(self):
-        filenames = self.archives.values()[0]
-        if len(filenames) > 1:
-            header = "%s:\n"
+    def make_list(self):
+        if len(self.archives.values()[0]) == 1:
+            def show_list():
+                return self.report(self.show_contents)
         else:
-            header = None
-        for filename in filenames:
-            if header:
-                print header % (filename,),
-                header = "\n%s:\n"
-            self.current_filename = filename
-            success = (self.report(self.get_extractor) and
-                       self.report(self.show_contents))
-            self.record_status(success)
+            def show_list():
+                if self.current_filename == self.filenames[0]:
+                    print "%s:\n" % (self.current_filename,),
+                else:
+                    print "\n%s:\n" % (self.current_filename,),
+                return self.report(self.show_contents)
+        return show_list
 
     def run(self):
         if self.options.show_list:
-            self.show_list()
+            action_function = self.make_list()
         else:
-            self.extract()
+            action_function = self.extract
+        while self.archives:
+            self.current_directory, self.filenames = self.archives.popitem()
+            os.chdir(self.current_directory)
+            for filename in self.filenames:
+                self.current_filename = filename
+                builder = ExtractorBuilder(self.current_filename, self.options)
+                for extractor in builder.get_extractor():
+                    self.current_extractor = extractor
+                    success = action_function()
+                    if success:
+                        self.record_status(success)
+                        break
+                else:
+                    self.record_status(success=False)
+            self.options.one_entry_policy.permanent_policy = EXTRACT_WRAP
         if self.failures:
             return 1
         return 0
--- a/tests/compare.py	Fri Oct 19 23:03:17 2007 -0400
+++ b/tests/compare.py	Fri Oct 19 23:06:53 2007 -0400
@@ -76,14 +76,23 @@
         script.close()
         subprocess.call(['chmod', 'u+w', TESTSCRIPT_NAME])
 
+    def run_script(self, key):
+        commands = getattr(self, key)
+        if commands is not None:
+            if self.directory:
+                directory_hint = '../'
+            else:
+                directory_hint = ''
+            self.write_script(commands)
+            subprocess.call(['sh', TESTSCRIPT_NAME, directory_hint])
+
     def get_shell_results(self):
+        self.run_script('prerun')
         self.write_script(self.baseline)
         return self.get_results(['sh', TESTSCRIPT_NAME] + self.filenames)
 
     def get_extractor_results(self):
-        if self.prerun:
-            self.write_script(self.prerun)
-            subprocess.call(['sh', TESTSCRIPT_NAME])
+        self.run_script('prerun')
         input_buffer.seek(0, 0)
         input_buffer.truncate()
         if self.input:
@@ -102,9 +111,7 @@
         return subprocess.call(['sh', TESTSCRIPT_NAME])
 
     def clean(self):
-        if self.cleanup is not None:
-            self.write_script(self.cleanup)
-            subprocess.call(['sh', TESTSCRIPT_NAME])
+        self.run_script('cleanup')
         if self.directory:
             target = os.path.join(ROOT_DIR, self.directory)
             extra_options = ['!', '-name', TESTSCRIPT_NAME]
--- a/tests/tests.yml	Fri Oct 19 23:03:17 2007 -0400
+++ b/tests/tests.yml	Fri Oct 19 23:06:53 2007 -0400
@@ -101,7 +101,7 @@
 - name: overwrite protection
   filenames: test-1.23.tar.bz2
   baseline: |
-    mkdir test-1.23 test-1.23.1
+    mkdir test-1.23.1
     cd test-1.23.1
     tar -jxf ../$1
   prerun: |
@@ -111,7 +111,6 @@
   filenames: test-1.23.tar.bz2
   options: -n -o
   baseline: |
-    mkdir test-1.23
     cd test-1.23
     tar -jxf ../$1
   prerun: |
@@ -364,3 +363,10 @@
     }
     extract $1
     extract $1 .1
+
+- name: extracting file with bad extension
+  filenames: test-1.23.bin
+  prerun: cp ${1}test-1.23.tar.gz ${1}test-1.23.bin
+  cleanup: rm -f ${1}test-1.23.bin
+  baseline: |
+    tar -zxf $1

mercurial