scripts/x

branch
trunk
changeset 1
a86a0cb0dd57
child 2
1570351bf863
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/scripts/x	Sun Oct 29 19:34:46 2006 -0500
@@ -0,0 +1,236 @@
+#!/usr/bin/env python
+#
+# x -- Intelligently extract various archive types.
+# Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>.
+#
+# This program is free software; you can redistribute it and/or modify it
+# under the terms of the GNU General Public License as published by the
+# Free Software Foundation; either version 2 of the License, or (at your
+# option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
+# Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with this program; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+import mimetypes
+import os
+import subprocess
+import sys
+import tempfile
+
+from cStringIO import StringIO
+
+mimetypes.encodings_map.setdefault('.bz2', 'bzip2')  # no-op if '.bz2' is already mapped
+
+MATCHING_DIRECTORY = 1  # check_contents(): one top-level name, equal to basename()
+ONE_DIRECTORY = 2  # check_contents(): one top-level name, different from basename()
+BOMB = 3  # check_contents(): members do not share a single top-level name
+EMPTY = 4  # check_contents(): intended result for a listing with no names
+
+class ExtractorError(Exception):
+    """Raised when an external listing or extraction command fails."""
+
+
+class ProcessStreamer(object):
+    """Iterate over the stdout lines of a child process run from command."""
+    def __init__(self, command, stdin, description="checking contents",
+                 stderr=None):
+        self.process = subprocess.Popen(command, bufsize=1, stdin=stdin,
+                                        stdout=subprocess.PIPE, stderr=stderr)
+        self.command = ' '.join(command)  # kept only for error messages
+        self.description = description
+
+    def __iter__(self):
+        return self
+
+    def next(self):
+        """Return the next output line, stripped of its newline."""
+        line = self.process.stdout.readline()
+        if not line:
+            raise StopIteration
+        return line.rstrip('\n')
+
+    def stop(self):
+        """Drain and reap the child; raise ExtractorError on a nonzero exit."""
+        while self.process.stdout.readline():
+            pass  # discard whatever output has not been consumed yet
+        self.process.stdout.close()
+        status = self.process.wait()
+        if self.process.stderr is not None:  # None unless a pipe was requested
+            self.process.stderr.close()  # released even when we raise below
+        if status != 0:
+            raise ExtractorError("%s error: '%s' returned status code %s" %
+                                 (self.description, self.command, status))
+    
+
+class BaseExtractor(object):
+    """Shared listing and extraction logic; subclasses supply the commands."""
+    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}
+
+    def __init__(self, filename, mimetype, encoding):
+        self.filename = filename
+        self.mimetype = mimetype
+        self.encoding = encoding
+        self.archive = open(filename, 'r')
+        if encoding:  # decode first, so self.archive becomes the decoded data
+            self.pipe([self.decoders[encoding]], "decoding")
+        self.prepare()
+
+    def run(self, command, description="extraction", stdout=None, stderr=None,
+            stdin=None, string_output=True):
+        """Run command and return its stdout; ExtractorError on failure."""
+        process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
+                                   stderr=stderr)
+        status = process.wait()
+        if process.stderr is not None:
+            process.stderr.close()  # released even when we raise below
+        if status != 0:
+            raise ExtractorError("%s error: '%s' returned status code %s" %
+                                 (description, ' '.join(command), status))
+        output = process.stdout  # None unless the caller asked for a pipe
+        if string_output and output is not None:
+            output = output.read(-1)
+            process.stdout.close()
+        return output
+
+    def pipe(self, command, description, stderr=None):
+        """Replace self.archive with command's output, held in a temp file."""
+        output = tempfile.TemporaryFile()
+        self.run(command, description, output, stderr, self.archive, False)
+        self.archive.close()
+        self.archive = output
+        self.archive.flush()
+
+    def prepare(self):
+        """Hook called at the end of __init__; does nothing by default."""
+
+    def check_contents(self):
+        """Return EMPTY, BOMB, MATCHING_DIRECTORY or ONE_DIRECTORY."""
+        self.archive.seek(0, 0)
+        filenames = self.get_filenames()
+        try:
+            first_part = filenames.next().split('/', 1)[0] + '/'
+        except StopIteration:  # what next() raises when nothing was listed
+            filenames.stop()
+            return EMPTY
+        strays = [n for n in filenames if not n.startswith(first_part)]
+        filenames.stop()
+        if strays:
+            return BOMB
+        if self.basename() == first_part[:-1]:
+            return MATCHING_DIRECTORY
+        return ONE_DIRECTORY
+
+    def basename(self):
+        """Return the file's name without directories or archive extensions."""
+        pieces = os.path.basename(self.filename).split('.')
+        if '.' + pieces.pop() in mimetypes.encodings_map:
+            pieces.pop()
+        return '.'.join(pieces)
+
+    def extract(self, path):
+        """Rewind the archive and run extract_archive(); path is unused here."""
+        self.archive.seek(0, 0)
+        self.extract_archive()
+    
+
+class TarExtractor(BaseExtractor):  # tar archives, streamed to tar on stdin
+    def get_filenames(self):
+        return ProcessStreamer(['tar', '-t'], self.archive)  # 'tar -t' lists members
+
+    def extract_archive(self):
+        self.run(['tar', '-x'], stdin=self.archive)  # unpacks into the current directory
+        
+        
+class ZipExtractor(BaseExtractor):  # zip files are handed to the tools by name
+    def __init__(self, filename, mimetype, encoding):  # base __init__ is not called
+        self.filename = filename
+        self.mimetype = mimetype
+        self.encoding = encoding
+        self.archive = StringIO()  # empty stand-in so check_contents() can seek()
+
+    def get_filenames(self):
+        return ProcessStreamer(['zipinfo', '-1', self.filename], None)
+
+    def extract(self, path):  # main() passes '.' or, after its chdir, '..'
+        self.run(['unzip', '-q', os.path.join(path, self.filename)])
+
+
+class CpioExtractor(BaseExtractor):  # cpio archives, streamed to cpio on stdin
+    def get_filenames(self):
+        return ProcessStreamer(['cpio', '-t'], self.archive,
+                               stderr=subprocess.PIPE)  # stderr is piped but never read
+
+    def extract_archive(self):
+        self.run(['cpio', '-i', '--make-directories',
+                  '--no-absolute-filenames'],
+                 stderr=subprocess.PIPE, stdin=self.archive)  # stderr piped, never read
+
+
+class RPMExtractor(CpioExtractor):  # unpacks the cpio stream that rpm2cpio emits
+    def prepare(self):
+        self.pipe(['rpm2cpio', '-'], "rpm2cpio")  # '-' makes rpm2cpio read stdin
+
+    def basename(self):
+        return self.filename.rsplit('.', 2)[0]  # drop the last two dot-separated parts
+
+    def check_contents(self):
+        return BOMB  # payload is not inspected; main() then uses a new directory
+        
+
+class DebExtractor(TarExtractor):  # unpacks the package's data.tar.gz member
+    def prepare(self):
+        self.pipe(['ar', 'p', self.filename, 'data.tar.gz'],
+                  "data.tar.gz extraction")  # 'ar p' prints that member to stdout
+        self.archive.seek(0, 0)  # rewind before the result is fed to zcat
+        self.pipe(['zcat'], "data.tar.gz decompression")
+
+    def basename(self):
+        return self.filename.rsplit('_', 1)[0]  # everything before the last underscore
+
+    def check_contents(self):
+        return BOMB  # payload is not inspected; main() then uses a new directory
+        
+
+extractor_map = {'application/x-tar': TarExtractor,  # MIME type -> extractor class
+                 'application/zip': ZipExtractor,
+                 'application/x-debian-package': DebExtractor,
+                 'application/x-redhat-package-manager': RPMExtractor,
+                 'application/x-shar': None,  # listed, but no extractor class exists
+                 'application/x-cpio': CpioExtractor}
+
+def show_error(filename, message):  # one "name: problem" line on stderr
+    sys.stderr.write("%s: %s" % (filename, message) + "\n")
+
+def main(arguments):
+    """Unpack each archive in arguments, reporting unusable files on stderr."""
+    start = os.getcwd()
+    for filename in arguments:
+        mimetype, encoding = mimetypes.guess_type(filename)
+        handler = extractor_map.get(mimetype)
+        if handler is None:  # unknown type, or one mapped to no extractor
+            show_error(filename, "doesn't look like an archive")
+            continue
+        extractor = handler(filename, mimetype, encoding)
+        contents = extractor.check_contents()
+        path = '.'
+        if contents == BOMB:
+            directory = extractor.basename()
+            try:
+                os.mkdir(directory)
+            except OSError, error:
+                show_error(filename, "could not create %s: %s" %
+                           (error.filename, error.strerror))
+                continue
+            os.chdir(directory)  # keep the members inside their own directory
+            path = '..'
+        extractor.extract(path)
+        os.chdir(start)  # return to the start even if directory was nested
+
+if __name__ == '__main__':
+    main(sys.argv[1:])  # each command-line argument is treated as an archive

mercurial