scripts/dtrx

branch
trunk
changeset 30
1015bbd6dc5e
parent 29
5fad99c17221
child 31
c3a2760d1c3a
--- a/scripts/dtrx	Fri Oct 19 23:03:17 2007 -0400
+++ b/scripts/dtrx	Fri Oct 19 23:06:53 2007 -0400
@@ -22,6 +22,7 @@
 import mimetypes
 import optparse
 import os
+import re
 import stat
 import subprocess
 import sys
@@ -548,41 +549,86 @@
             BombHandler]
 
 class ExtractorBuilder(object):
-    extractor_map = {}
-    for args in ((TarExtractor, None, 'x-tar'),
-                 (ZipExtractor, None, 'zip', 'x-msdos-program'),
-                 (DebExtractor, DebMetadataExtractor, 'x-debian-package'),
-                 (RPMExtractor, None, 'x-redhat-package-manager', 'x-rpm'),
-                 (CpioExtractor, None, 'x-cpio'),
-                 (GemExtractor, GemMetadataExtractor, 'x-ruby-gem')):
-        for entry in args[2:]:
-            if '/' not in entry:
-                entry = 'application/' + entry
-            extractor_map[entry] = args[0:2]
+    extractor_map = {'tar': (TarExtractor, None),
+                     'zip': (ZipExtractor, None),
+                     'deb': (DebExtractor, DebMetadataExtractor),
+                     'rpm': (RPMExtractor, None),
+                     'cpio': (CpioExtractor, None),
+                     'gem': (GemExtractor, GemMetadataExtractor),
+                     'compress': (CompressionExtractor, None)}
+
+    mimetype_map = {}
+    for mapping in (('tar', 'x-tar'),
+                    ('zip', 'x-msdos-program', 'zip'),
+                    ('deb', 'x-debian-package'),
+                    ('rpm', 'x-redhat-package-manager', 'x-rpm'),
+                    ('cpio', 'x-cpio'),
+                    ('gem', 'x-ruby-gem')):
+        for mimetype in mapping[1:]:
+            if '/' not in mimetype:
+                mimetype = 'application/' + mimetype
+            mimetype_map[mimetype] = mapping[0]
+
+    magic_mime_map = {}
+    for mapping in (('deb', 'Debian binary package'),
+                    ('cpio', 'cpio archive'),
+                    ('tar', 'POSIX tar archive'),
+                    ('zip', 'Zip archive'),
+                    ('rpm', 'RPM')):
+        for pattern in mapping[1:]:
+            magic_mime_map[re.compile(pattern)] = mapping[0]
+    
+    magic_encoding_map = {}
+    for mapping in (('bzip2', 'bzip2 compressed'),
+                    ('gzip', 'gzip compressed')):
+        for pattern in mapping[1:]:
+            magic_encoding_map[re.compile(pattern)] = mapping[0]
 
     def __init__(self, filename, options):
         self.filename = filename
         self.options = options
-        self.mimetype, self.encoding = mimetypes.guess_type(self.filename)
+
+    def build_extractor(self, archive_type, encoding):
+        """Instantiate the extractor registered for archive_type."""
+        regular, metadata = self.extractor_map[archive_type]
+        # Use the metadata variant only when requested and one is registered.
+        if self.options.metadata and metadata is not None:
+            return metadata(self.filename, encoding)
+        return regular(self.filename, encoding)
 
     def get_extractor(self):
-        extractor = self.find_extractor()
-        if extractor is None:
-            raise ExtractorError("not a known archive type")
-        return extractor(self.filename, self.encoding)
+        for func_name in ('mimetype', 'magic'):
+            archive_type, encoding = getattr(self, 'try_by_' + func_name)()
+            if archive_type is not None:
+                yield self.build_extractor(archive_type, encoding)
 
-    def find_extractor(self):
-        extractor = None
+    def try_by_mimetype(self):
+        mimetype, encoding = mimetypes.guess_type(self.filename)
         try:
-            extractors = self.extractor_map[self.mimetype]
-            if self.options.metadata and (extractors[1] is not None):
-                extractor = extractors[1]
-            else:
-                extractor = extractors[0]
+            return self.mimetype_map[mimetype], encoding
         except KeyError:
-            if self.encoding:
-                extractor = CompressionExtractor
-        return extractor
+            if encoding:
+                return 'compress', encoding
+        return None, None
+
+    def try_by_magic(self):
+        """Guess the archive type and encoding from file(1) output."""
+        process = subprocess.Popen(['file', '-z', self.filename],
+                                   stdout=subprocess.PIPE)
+        # Drain the pipe before reaping: wait() with unread output can hang.
+        output = process.communicate()[0].split('\n', 1)[0]
+        if process.returncode != 0:
+            return None, None
+        if output.startswith('%s: ' % self.filename):
+            output = output[len(self.filename) + 2:]
+        results = [None, None]
+        for index, mapping in enumerate((self.magic_mime_map,
+                                         self.magic_encoding_map)):
+            for regexp, result in mapping.items():
+                if regexp.search(output):
+                    results[index] = result
+                    break
+        return results
 
 
 class ExtractorApplication(object):
@@ -638,10 +684,6 @@
         handler.setFormatter(formatter)
         self.logger.addHandler(handler)
 
-    def get_extractor(self):
-        builder = ExtractorBuilder(self.current_filename, self.options)
-        self.current_extractor = builder.get_extractor()
-
     def get_handler(self):
         for var_name in ('type', 'name'):
             exec('content_%s = self.current_extractor.content_%s' %
@@ -683,44 +725,50 @@
             self.failures.append(self.current_filename)
 
     def extract(self):
-        while self.archives:
-            self.current_directory, filenames = self.archives.popitem()
-            for filename in filenames:
-                os.chdir(self.current_directory)
-                self.current_filename = filename
-                success = (self.report(self.get_extractor) and
-                           self.report(self.current_extractor.extract) and
-                           self.report(self.get_handler) and
-                           self.report(self.current_handler.handle))
-                if success:
-                    self.recurse()
-                self.record_status(success)
-            self.options.one_entry_policy.permanent_policy = EXTRACT_WRAP
+        success = (self.report(self.current_extractor.extract) and
+                   self.report(self.get_handler) and
+                   self.report(self.current_handler.handle))
+        if success:
+            self.recurse()
+        return success
 
     def show_contents(self):
         for filename in self.current_extractor.get_filenames():
             print filename
 
-    def show_list(self):
-        filenames = self.archives.values()[0]
-        if len(filenames) > 1:
-            header = "%s:\n"
+    def make_list(self):
+        """Return the action run() uses to list one archive's contents."""
+        if len(self.archives.values()[0]) == 1:
+            show_list = lambda: self.report(self.show_contents)
         else:
-            header = None
-        for filename in filenames:
-            if header:
-                print header % (filename,),
-                header = "\n%s:\n"
-            self.current_filename = filename
-            success = (self.report(self.get_extractor) and
-                       self.report(self.show_contents))
-            self.record_status(success)
+            def show_list():
+                header = "\n%s:\n"
+                if self.current_filename == self.filenames[0]:
+                    header = "%s:\n"
+                print header % (self.current_filename,),
+                return self.report(self.show_contents)
+        return show_list
 
     def run(self):
         if self.options.show_list:
-            self.show_list()
+            action_function = self.make_list()
         else:
-            self.extract()
+            action_function = self.extract
+        while self.archives:
+            self.current_directory, self.filenames = self.archives.popitem()
+            os.chdir(self.current_directory)
+            for filename in self.filenames:
+                self.current_filename = filename
+                builder = ExtractorBuilder(self.current_filename, self.options)
+                for extractor in builder.get_extractor():
+                    self.current_extractor = extractor
+                    success = action_function()
+                    if success:
+                        self.record_status(success)
+                        break
+                else:
+                    self.record_status(success=False)
+            self.options.one_entry_policy.permanent_policy = EXTRACT_WRAP
         if self.failures:
             return 1
         return 0

mercurial