scripts/x

branch
trunk
changeset 19
bb6e9f4af1a5
parent 18
1600807a32bd
child 20
69c93c3e6972
equal deleted inserted replaced
18:1600807a32bd 19:bb6e9f4af1a5
1 #!/usr/bin/env python
2 #
3 # x -- Intelligently extract various archive types.
4 # Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>.
5 #
6 # This program is free software; you can redistribute it and/or modify it
7 # under the terms of the GNU General Public License as published by the
8 # Free Software Foundation; either version 2 of the License, or (at your
9 # option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
14 # Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License along
17 # with this program; if not, write to the Free Software Foundation, Inc.,
18 # 51 Franklin Street, 5th Floor, Boston, MA, 02111.
19
20 import errno
21 import logging
22 import mimetypes
23 import optparse
24 import os
25 import stat
26 import subprocess
27 import sys
28 import tempfile
29
30 from cStringIO import StringIO
31
# Release number; interpolated into the banner that --version prints.
VERSION = "3.0"
VERSION_BANNER = """x version %s
Copyright (c) 2006 Brett Smith <brettcsmith@brettcsmith.org>

This program is free software; you can redistribute it and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation; either version 2 of the License, or (at your
option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
Public License for more details.""" % VERSION

# Classifications an extractor's check_contents() can report.  The value 2
# is left unused; it belonged to a ONE_DIRECTORY category that is disabled.
MATCHING_DIRECTORY, BOMB, EMPTY, COMPRESSED = 1, 3, 4, 5

# Teach mimetypes about bzip2 (only if it has no entry yet) and force .exe
# to the MS-DOS program type, which the extractor table treats as a zip.
if '.bz2' not in mimetypes.encodings_map:
    mimetypes.encodings_map['.bz2'] = 'bzip2'
mimetypes.types_map.update({'.exe': 'application/x-msdos-program'})
54
def run_command(command, description, stdout=None, stderr=None, stdin=None):
    """Run an external command to completion and summarize any failure.

    command is the argument list given to subprocess.Popen, and
    description is a short label used as the prefix of the error message.
    stdout, stderr and stdin are handed to Popen unchanged.

    Returns None when the command exits with status 0; otherwise returns
    the string "<description> error: '<command>' returned status code <n>".
    """
    process = subprocess.Popen(command, stdin=stdin, stdout=stdout,
                               stderr=stderr)
    # communicate() rather than wait(): when a caller asks for
    # subprocess.PIPE, a bare wait() never reads the pipe, so a child that
    # writes more than the pipe buffer holds would block forever.
    # communicate() drains and closes any pipes, then reaps the child.
    process.communicate()
    status = process.returncode
    if status != 0:
        return ("%s error: '%s' returned status code %s" %
                (description, ' '.join(command), status))
    return None
68
class FilenameChecker(object):
    """Find an unused name based on a preferred one.

    The preferred name is tried first, then the same name with the
    suffixes ".1" through ".9".
    """

    def __init__(self, original_name):
        self.original_name = original_name

    def is_free(self, filename):
        """Return True when nothing exists at filename."""
        return not os.path.exists(filename)

    def check(self):
        """Return the first candidate name for which is_free() is true.

        Raises ValueError when the preferred name and all nine numbered
        alternatives are taken.
        """
        candidates = ['%s' % (self.original_name,)]
        candidates.extend(['%s.%s' % (self.original_name, number)
                           for number in range(1, 10)])
        for candidate in candidates:
            if self.is_free(candidate):
                return candidate
        raise ValueError("all alternatives for name %s taken" %
                         (self.original_name,))
83
84
class DirectoryChecker(FilenameChecker):
    """FilenameChecker variant that claims a name by creating a directory."""

    def is_free(self, filename):
        """Try to create filename as a directory.

        Returns True if the directory was created and False if something
        already exists under that name.  Any other OSError from mkdir is
        re-raised.
        """
        try:
            os.mkdir(filename)
        except OSError as error:
            if error.errno != errno.EEXIST:
                raise
            return False
        else:
            return True
94
95
class ExtractorError(Exception):
    """Raised when an archive cannot be opened, listed, or extracted."""
98
99
class ProcessStreamer(object):
    """Iterate over the lines a subprocess writes to its standard output.

    command is the argument list for subprocess.Popen, stdin is handed to
    Popen as the child's standard input, description labels the error
    raised by stop(), and stderr is handed to Popen unchanged.
    """

    def __init__(self, command, stdin, description="checking contents",
                 stderr=None):
        self.process = subprocess.Popen(command, bufsize=1, stdin=stdin,
                                        stdout=subprocess.PIPE, stderr=stderr)
        self.command = ' '.join(command)
        self.description = description

    def __iter__(self):
        return self

    def next(self):
        """Return the next output line without its trailing newline."""
        line = self.process.stdout.readline()
        if not line:
            raise StopIteration
        return line.rstrip('\n')

    def stop(self):
        """Discard remaining output, reap the child and check its status.

        Raises ExtractorError if the child exited with a non-zero status.
        """
        try:
            while self.process.stdout.readline():
                pass
            self.process.stdout.close()
            status = self.process.wait()
        finally:
            # Release the stderr pipe (if one was requested) on every exit
            # path; it used to be skipped when the status check raised.
            if self.process.stderr is not None:
                self.process.stderr.close()
        if status != 0:
            raise ExtractorError("%s error: '%s' returned status code %s" %
                                 (self.description, self.command, status))
130
131
class BaseExtractor(object):
    """Common machinery for listing and extracting one archive file.

    Subclasses supply get_filenames() and extract_archive(), and may
    override prepare() to transform the opened archive first.
    """

    # Maps a mimetypes encoding name to the command that decodes it.
    decoders = {'bzip2': 'bzcat', 'gzip': 'zcat', 'compress': 'zcat'}

    # Class used to choose (and reserve) the extraction target name.
    name_checker = DirectoryChecker

    def __init__(self, filename, mimetype, encoding):
        """Open filename, decode it if it is encoded, then call prepare().

        Raises ValueError for an encoding without a known decoder and
        ExtractorError when the file cannot be opened.
        """
        if encoding and (encoding not in self.decoders):
            raise ValueError("unrecognized encoding %s" % (encoding,))
        self.filename = os.path.realpath(filename)
        self.mimetype = mimetype
        self.encoding = encoding
        self.included_archives = []
        try:
            # Archives are binary data, so avoid text-mode translation.
            self.archive = open(filename, 'rb')
        except (IOError, OSError) as error:
            raise ExtractorError("could not open %s: %s" %
                                 (filename, error.strerror))
        if encoding:
            self.pipe([self.decoders[encoding]], "decoding")
        self.prepare()

    def run(self, command, description="extraction", stdout=None, stderr=None,
            stdin=None):
        """Run command via run_command; raise ExtractorError if it fails."""
        error = run_command(command, description, stdout, stderr, stdin)
        if error:
            raise ExtractorError(error)

    def pipe(self, command, description, stderr=None):
        """Feed the archive through command and keep its output instead.

        The command's output goes to a temporary file which replaces
        self.archive; the previous archive object is closed.
        """
        output = tempfile.TemporaryFile()
        try:
            self.run(command, description, output, stderr, self.archive)
        except Exception:
            # Do not leave the temporary file open when the command fails.
            output.close()
            raise
        self.archive.close()
        self.archive = output
        self.archive.flush()

    def prepare(self):
        """Hook run at the end of __init__; does nothing by default."""
        pass

    def _note_if_archive(self, filename):
        """Record filename when its guessed MIME type has an extractor."""
        if mimetypes.guess_type(filename)[0] in extractor_map:
            self.included_archives.append(filename)

    def check_contents(self):
        """Classify the archive's layout from its list of member names.

        Returns EMPTY when there are no members, BOMB when some member
        does not start with the first member's top-level component,
        MATCHING_DIRECTORY when that component equals basename(), and
        otherwise the component itself with a trailing '/'.  Members that
        look like archives are collected in self.included_archives.
        """
        filenames = self.get_filenames()
        try:
            filename = filenames.next()
        except StopIteration:
            filenames.stop()
            return EMPTY
        self._note_if_archive(filename)
        first_part = filename.split('/', 1)[0] + '/'
        archive_type = None
        for filename in filenames:
            self._note_if_archive(filename)
            if (archive_type is None) and (not filename.startswith(first_part)):
                archive_type = BOMB
        filenames.stop()
        if archive_type:
            return archive_type
        if self.basename() == first_part[:-1]:
            return MATCHING_DIRECTORY
        return first_part

    def basename(self):
        """Return the archive's file name without its known extensions.

        A trailing encoding extension is removed first, then one
        extension known to the mimetypes type, common-type or suffix maps.
        """
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if extension in mimetypes.encodings_map:
            pieces.pop()
            extension = '.' + pieces[-1]
        if (extension in mimetypes.types_map or
            extension in mimetypes.common_types or
            extension in mimetypes.suffix_map):
            pieces.pop()
        return '.'.join(pieces)

    def extract(self, path):
        """Extract the archive with path as the working directory."""
        old_path = os.path.realpath(os.curdir)
        os.chdir(path)
        try:
            self.archive.seek(0, 0)
            self.extract_archive()
        finally:
            # Return to the starting directory even when extraction fails.
            os.chdir(old_path)
210
211
class TarExtractor(BaseExtractor):
    """Extractor that lists and unpacks archives with the tar command."""

    def get_filenames(self):
        """Return a ProcessStreamer over the output of 'tar -t'."""
        self.archive.seek(0, 0)
        list_command = ['tar', '-t']
        return ProcessStreamer(list_command, self.archive)

    def extract_archive(self):
        """Run 'tar -x' with the archive on its standard input."""
        extract_command = ['tar', '-x']
        self.run(extract_command, stdin=self.archive)
219
220
class ZipExtractor(BaseExtractor):
    """Extractor that lists and unpacks archives with zipinfo and unzip.

    Both commands are given the archive's path, so this class does not
    call BaseExtractor.__init__ and never opens the file itself; an empty
    StringIO is stored as the archive object instead.
    """

    def __init__(self, filename, mimetype, encoding):
        self.archive = StringIO()
        self.included_archives = []
        self.encoding = encoding
        self.mimetype = mimetype
        self.filename = os.path.realpath(filename)

    def get_filenames(self):
        """Return a ProcessStreamer over the output of 'zipinfo -1'."""
        self.archive.seek(0, 0)
        list_command = ['zipinfo', '-1', self.filename]
        return ProcessStreamer(list_command, None)

    def extract_archive(self):
        """Run 'unzip -q' on the archive's path."""
        extract_command = ['unzip', '-q', self.filename]
        self.run(extract_command)
235
236
class CpioExtractor(BaseExtractor):
    """Extractor that lists and unpacks archives with the cpio command.

    cpio's standard error is sent to a pipe in both operations rather
    than being inherited from this process.
    """

    def get_filenames(self):
        """Return a ProcessStreamer over the output of 'cpio -t'."""
        self.archive.seek(0, 0)
        list_command = ['cpio', '-t']
        return ProcessStreamer(list_command, self.archive,
                               stderr=subprocess.PIPE)

    def extract_archive(self):
        """Run 'cpio -i' with the archive on its standard input."""
        extract_command = ['cpio', '-i', '--make-directories',
                           '--no-absolute-filenames']
        self.run(extract_command, stderr=subprocess.PIPE, stdin=self.archive)
247
248
class RPMExtractor(CpioExtractor):
    """Extractor for RPM packages: converts to cpio, then acts like cpio."""

    def prepare(self):
        """Replace the archive with the output of 'rpm2cpio -'."""
        self.pipe(['rpm2cpio', '-'], "rpm2cpio")

    def basename(self):
        """Return the package file name with its RPM suffixes removed.

        A name without dots is returned unchanged, and a name not ending
        in '.rpm' falls back to BaseExtractor.basename().  Otherwise
        '.rpm' is dropped, along with the component before it when that
        component is shorter than eight characters and is not the only
        one left.
        """
        name = os.path.basename(self.filename)
        parts = name.split('.')
        if len(parts) == 1:
            return name
        if parts[-1] != 'rpm':
            return BaseExtractor.basename(self)
        parts = parts[:-1]
        if len(parts) > 1 and len(parts[-1]) < 8:
            parts = parts[:-1]
        return '.'.join(parts)

    def check_contents(self):
        """Scan the contents as usual, but always classify as BOMB."""
        CpioExtractor.check_contents(self)
        return BOMB
269
270
class DebExtractor(TarExtractor):
    """Extractor for Debian packages: unpacks the data.tar.gz member."""

    def prepare(self):
        """Replace the archive with the decompressed data.tar.gz member."""
        member_command = ['ar', 'p', self.filename, 'data.tar.gz']
        self.pipe(member_command, "data.tar.gz extraction")
        self.archive.seek(0, 0)
        self.pipe(['zcat'], "data.tar.gz decompression")

    def basename(self):
        """Return the package file name without its last '_' component.

        A name without underscores is returned unchanged.  If the last
        component is longer than ten characters or does not end in
        '.deb', BaseExtractor.basename() is used instead.
        """
        name = os.path.basename(self.filename)
        if '_' not in name:
            return name
        leading, last_piece = name.rsplit('_', 1)
        if len(last_piece) <= 10 and last_piece.endswith('.deb'):
            return leading
        return BaseExtractor.basename(self)

    def check_contents(self):
        """Scan the contents as usual, but always classify as BOMB."""
        TarExtractor.check_contents(self)
        return BOMB
290
291
class CompressionExtractor(BaseExtractor):
    """Extractor for a single compressed file that is not an archive.

    The decoding itself is done by BaseExtractor.__init__; extract() only
    copies the already-decoded data to the target file.
    """

    # The target is a plain file, so only check that the name is unused.
    name_checker = FilenameChecker

    def basename(self):
        """Return the file name without a trailing encoding extension."""
        pieces = os.path.basename(self.filename).split('.')
        extension = '.' + pieces[-1]
        if extension in mimetypes.encodings_map:
            pieces.pop()
        return '.'.join(pieces)

    def get_filenames(self):
        """Yield the single output name, basename()."""
        yield self.basename()

    def check_contents(self):
        """Always classify the contents as COMPRESSED."""
        return COMPRESSED

    def extract(self, path):
        """Write the decoded data to the file named path."""
        output = open(path, 'wb')
        try:
            self.run(['cat'], "output write", stdin=self.archive,
                     stdout=output)
        finally:
            # Close the target even when the copy fails.
            output.close()
312
313
class BaseHandler(object):
    """Base for the strategies that decide where an archive is extracted.

    Subclasses set self.target; extract() and cleanup() act on it and
    return None on success or an error message string on failure.
    """

    def __init__(self, extractor, contents, options):
        self.extractor = extractor
        self.contents = contents
        self.options = options
        self.target = None
        self.logger = logging.getLogger('x-log')

    def extract(self):
        """Extract into self.target; return an error string on failure."""
        try:
            self.extractor.extract(self.target)
        except (ExtractorError, IOError, OSError) as error:
            return str(error)
        return None

    def cleanup(self):
        """Give the owner access to everything under self.target.

        Directories get u+rwx via find, then the whole tree gets u+rw via
        chmod -R.  The second step is skipped if the first one fails.
        Does nothing when no target was set.
        """
        if self.target is None:
            return None
        steps = (
            ('find', ['find', self.target, '-type', 'd',
                      '-exec', 'chmod', 'u+rwx', '{}', ';']),
            ('chmod', ['chmod', '-R', 'u+rw', self.target]),
        )
        for command, arguments in steps:
            status = subprocess.call(arguments)
            if status != 0:
                return "%s returned with exit status %s" % (command, status)
        return None
339
340
# The "where to extract" table, with options and archive types.
# This dictates the contents of each can_handle method.
#
#         Flat        Overwrite   None
# File    basename    basename    FilenameChecked
# Match   .           .           tempdir + checked
# Bomb    .           basename    DirectoryChecked
348
class FlatHandler(BaseHandler):
    """Handler that extracts straight into the current directory."""

    @staticmethod
    def can_handle(contents, options):
        """Accept non-compressed contents with --flat, or a matching
        directory with --overwrite."""
        return ((options.flat and (contents != COMPRESSED)) or
                (options.overwrite and (contents == MATCHING_DIRECTORY)))

    def __init__(self, extractor, contents, options):
        BaseHandler.__init__(self, extractor, contents, options)
        self.target = '.'

    def cleanup(self):
        """Give the owner read/write access to each extracted entry.

        Directories additionally get owner execute permission.  Only the
        names listed by the extractor are touched, since the target is
        the current directory.
        """
        filenames = self.extractor.get_filenames()
        try:
            for filename in filenames:
                stat_info = os.stat(filename)
                perms = stat.S_IRUSR | stat.S_IWUSR
                if stat.S_ISDIR(stat_info.st_mode):
                    perms |= stat.S_IXUSR
                os.chmod(filename, stat_info.st_mode | perms)
        finally:
            # A listing that has a stop() method wraps a child process;
            # call it so the child is reaped and its pipe is closed.
            stop = getattr(filenames, 'stop', None)
            if stop is not None:
                stop()
366
367
class OverwriteHandler(BaseHandler):
    """Handler that extracts to the archive's basename without checking
    whether that name is already in use."""

    @staticmethod
    def can_handle(contents, options):
        """Accept compressed files with --flat, or anything other than a
        matching directory with --overwrite."""
        if options.flat and (contents == COMPRESSED):
            return True
        return options.overwrite and (contents != MATCHING_DIRECTORY)

    def __init__(self, extractor, contents, options):
        BaseHandler.__init__(self, extractor, contents, options)
        self.target = self.extractor.basename()
377
378
class MatchHandler(BaseHandler):
    """Handler for archives whose single top-level entry matches the
    archive's basename."""

    @staticmethod
    def can_handle(contents, options):
        return contents == MATCHING_DIRECTORY

    def extract(self):
        """Extract into a temporary directory, then move the entry out.

        On success the extracted entry is renamed to a name chosen by the
        extractor's name checker and the temporary directory is removed.
        On failure the error string is returned and self.target is left
        pointing at the temporary directory.
        """
        basename = self.extractor.basename()
        self.target = tempfile.mkdtemp(dir='.')
        result = BaseHandler.extract(self)
        if result is not None:
            return result
        tempdir = self.target
        self.target = self.extractor.name_checker(basename).check()
        os.rename(os.path.join(tempdir, basename), self.target)
        os.rmdir(tempdir)
        return result
395
396
class EmptyHandler(object):
    """Handler for archives with no contents: every step is a no-op."""

    @staticmethod
    def can_handle(contents, options):
        return contents == EMPTY

    def __init__(self, extractor, contents, options):
        pass

    def extract(self):
        return None

    def cleanup(self):
        return None
405
406
class BombHandler(BaseHandler):
    """Fallback handler: extracts into a name picked by the name checker."""

    @staticmethod
    def can_handle(contents, options):
        # Accepts everything, so it must come last in the handlers list.
        return True

    def __init__(self, extractor, contents, options):
        BaseHandler.__init__(self, extractor, contents, options)
        preferred_name = self.extractor.basename()
        self.target = self.extractor.name_checker(preferred_name).check()
416
417
# Maps a guessed MIME type to the extractor class that handles it.
extractor_map = {
    'application/x-tar': TarExtractor,
    'application/zip': ZipExtractor,
    'application/x-msdos-program': ZipExtractor,
    'application/x-debian-package': DebExtractor,
    'application/x-redhat-package-manager': RPMExtractor,
    'application/x-rpm': RPMExtractor,
    'application/x-cpio': CpioExtractor,
}

# Tried in order; the first class whose can_handle() accepts is used.
handlers = [
    FlatHandler,
    OverwriteHandler,
    MatchHandler,
    EmptyHandler,
    BombHandler,
]
428
class ExtractorApplication(object):
    """Command-line driver: parse options, then extract each archive."""

    def __init__(self, arguments):
        self.parse_options(arguments)
        self.setup_logger()
        self.successes = []
        self.failures = []

    def parse_options(self, arguments):
        """Parse arguments into self.options and self.archives.

        self.archives maps a directory to the archive names to process
        there; it starts with the current directory and the names given
        on the command line.  Exits via parser.error() if none are given.
        """
        parser = optparse.OptionParser(
            usage="%prog [options] archive [archive2 ...]",
            description="Intelligent archive extractor",
            version=VERSION_BANNER
            )
        parser.add_option('-r', '--recursive', dest='recursive',
                          action='store_true', default=False,
                          help='extract archives contained in the ones listed')
        parser.add_option('-q', '--quiet', dest='quiet',
                          action='count', default=3,
                          help='suppress warning/error messages')
        parser.add_option('-v', '--verbose', dest='verbose',
                          action='count', default=0,
                          help='be verbose/print debugging information')
        parser.add_option('-o', '--overwrite', dest='overwrite',
                          action='store_true', default=False,
                          help='overwrite any existing target directory')
        parser.add_option('-f', '--flat', '--no-directory', dest='flat',
                          action='store_true', default=False,
                          help="don't put contents in their own directory")
##         parser.add_option('-n', '--noninteractive', dest='batch',
##                           action='store_true', default=False,
##                           help="don't ask how to handle special cases")
        self.options, filenames = parser.parse_args(arguments)
        if not filenames:
            parser.error("you did not list any archives")
        self.archives = {os.path.realpath(os.curdir): filenames}

    def setup_logger(self):
        """Configure the 'x-log' logger from the -q/-v counts."""
        self.logger = logging.getLogger('x-log')
        handler = logging.StreamHandler()
        # WARNING is the default.
        level = 10 * (self.options.quiet - self.options.verbose)
        handler.setLevel(level)
        formatter = logging.Formatter("x: %(levelname)s: %(message)s")
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        # The logger filters records before any handler sees them, so it
        # needs the threshold as well.  Level 0 (NOTSET) would defer to
        # the root logger, hence the floor of 1.
        self.logger.setLevel(max(level, 1))

    def get_extractor(self):
        """Pick the extractor and handler for self.current_filename.

        Sets self.current_extractor and self.current_handler.  Returns
        None on success or an error message string on failure.
        """
        mimetype, encoding = mimetypes.guess_type(self.current_filename)
        try:
            extractor = extractor_map[mimetype]
        except KeyError:
            if encoding:
                extractor = CompressionExtractor
            else:
                return "not a known archive type"
        try:
            self.current_extractor = extractor(self.current_filename, mimetype,
                                               encoding)
            content = self.current_extractor.check_contents()
            for handler in handlers:
                if handler.can_handle(content, self.options):
                    self.current_handler = handler(self.current_extractor,
                                                   content, self.options)
                    break
        except ExtractorError as error:
            return str(error)

    def recurse(self):
        """Queue archives found inside the one just extracted (with -r)."""
        if not self.options.recursive:
            return
        for filename in self.current_extractor.included_archives:
            tail_path, basename = os.path.split(filename)
            directory = os.path.join(self.current_directory,
                                     self.current_handler.target, tail_path)
            self.archives.setdefault(directory, []).append(basename)

    def report(self, function, *args):
        """Call function(*args) and log any error for the current file.

        An error is either a true return value or one of the caught
        exceptions.  ValueError is included because extractor and handler
        construction raise it for an unrecognized encoding or when no
        free target name is left.  Returns True if there was no error.
        """
        try:
            error = function(*args)
        except (ExtractorError, IOError, OSError, ValueError) as exception:
            error = str(exception)
        if error:
            self.logger.error("%s: %s", self.current_filename, error)
            return False
        return True

    def run(self):
        """Process every queued archive; return the process exit status.

        Returns 1 if any archive failed and 0 otherwise.
        """
        while self.archives:
            self.current_directory, filenames = self.archives.popitem()
            for filename in filenames:
                self.current_filename = filename
                # chdir goes through report() too, so a missing directory
                # fails this one archive instead of ending the whole run.
                success = (self.report(os.chdir, self.current_directory) and
                           self.report(self.get_extractor))
                if success:
                    for name in 'extract', 'cleanup':
                        success = (self.report(getattr(self.current_handler,
                                                       name)) and success)
                    self.recurse()
                if success:
                    self.successes.append(self.current_filename)
                else:
                    self.failures.append(self.current_filename)
        if self.failures:
            return 1
        return 0
534
535
if __name__ == '__main__':
    # The exit status is whatever ExtractorApplication.run() returns.
    sys.exit(ExtractorApplication(sys.argv[1:]).run())

mercurial