-
Notifications
You must be signed in to change notification settings - Fork 2
/
pytar.py
executable file
·237 lines (200 loc) · 7.22 KB
/
pytar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
#!/bin/env python
import tarfile
import traceback
import sys
from itertools import chain
from contextlib import closing
###################################################################
# Feed out info
#
# Global switch read by the verbose* helpers; main() sets it from the -v flag.
verbose = False

# Short display label for each tar member type flag, used by verboseInfo.
ftypes = dict([
    (tarfile.REGTYPE, "REG"),
    (tarfile.AREGTYPE, "AREG"),
    (tarfile.LNKTYPE, "LNK"),
    (tarfile.SYMTYPE, "SYM"),
    (tarfile.CHRTYPE, "CHR"),
    (tarfile.BLKTYPE, "BLK"),
    (tarfile.DIRTYPE, "DIR"),
    (tarfile.FIFOTYPE, "FIFO"),
    (tarfile.CONTTYPE, "CONT"),
    (tarfile.GNUTYPE_LONGNAME, "GNULONGNAME"),
    (tarfile.GNUTYPE_LONGLINK, "GNULONGLINK"),
    (tarfile.GNUTYPE_SPARSE, "GNUSPARSE"),
])
def verboseInfo(tinfo):
    """Write a one-line description of a tar member to stderr.

    Does nothing unless the module-level ``verbose`` flag is set.

    The line holds, each in single quotes: name, size, mtime, mode, type
    label, uid, gid and gname. For symbolic and hard links the type label
    is followed by the link target.

    tinfo: the tarfile.TarInfo to describe.
    """
    if not verbose:
        return
    # A type flag that is not in the table must not abort the run just
    # because verbose output was requested, so fall back to a fixed label.
    ftype = ftypes.get(tinfo.type, "UNKNOWN")
    if tinfo.issym() or tinfo.islnk():
        ftype = "{}: {}".format(ftype, tinfo.linkname)
    sys.stderr.write("'{tinfo.name}' "
                     "'{tinfo.size}' "
                     "'{tinfo.mtime}' "
                     "'{tinfo.mode}' "
                     "'{type}' "
                     "'{tinfo.uid}' "
                     "'{tinfo.gid}' "
                     "'{tinfo.gname}'\n".
                     format(tinfo=tinfo, type=ftype))
def verboseConvert(srcname, sinkname, imode, omode):
    """Announce on stderr that a conversion is starting.

    Writes "<srcname> (<imode>) -> <sinkname> (<omode>)" when the
    module-level ``verbose`` flag is set; otherwise does nothing.
    """
    if verbose:
        line = "{} ({}) -> {} ({})\n".format(srcname, imode, sinkname, omode)
        sys.stderr.write(line)
def verboseConverted(srcname, sinkname, imode, omode, complete):
    """Report on stderr how a conversion ended.

    The line is prefixed with "Completed" when ``complete`` is truthy and
    "Incomplete" otherwise. Nothing is written unless the module-level
    ``verbose`` flag is set.
    """
    if verbose:
        status = "Incomplete"
        if complete:
            status = "Completed"
        line = "{}: {} ({}) -> {} ({})\n".format(
            status, srcname, imode, sinkname, omode)
        sys.stderr.write(line)
def verboseCheck(srcname, mode):
    """Announce on stderr that a check of ``srcname`` is starting.

    Writes "Check: <srcname> (<mode>)" when the module-level ``verbose``
    flag is set; otherwise does nothing.
    """
    if verbose:
        line = "Check: {} ({})\n".format(srcname, mode)
        sys.stderr.write(line)
###################################################################
#
# the trick here is to check whether the member we're looking at
# is still good before we attempt to write it out
# We return the file if this is a regular file, otherwise the
# tinfo we got is enough
#
def extract(srctar, tinfo):
    """Return a verified, rewound file object for a regular-file member.

    The member's last byte is read and the resulting position is compared
    with the size recorded in the header, so a truncated member is
    detected before anything is copied.

    srctar: archive object providing ``extractfile``.
    tinfo:  the member to extract.

    Returns None when ``tinfo`` is not a regular file, otherwise the open
    file object positioned at offset 0. The caller must close it.

    Raises ValueError when the last byte cannot be read or the final
    position does not match ``tinfo.size``. Any error raised by
    ``extractfile`` itself propagates unchanged.
    """
    if not tinfo.isfile():
        return None
    # Opened outside the try block: if this call fails there is no file
    # object to close yet, and the real error must reach the caller.
    f = srctar.extractfile(tinfo)
    try:
        # Go to the last byte. Per the original author's note, for gz input
        # the seek alone may not touch the data yet, hence the read below.
        f.seek(-1, 2)
        try:
            f.read(1)
        except Exception:
            sys.stderr.write("Error seeking {}\n".format(tinfo.name))
            traceback.print_exc()
            raise ValueError("Bad File: {tinfo.name}, {tinfo.size}".
                             format(tinfo=tinfo))
        # Per the original author's note, for non-gzipped input the read can
        # succeed on a short member, so the position is checked as well.
        p = f.tell()
        if p != tinfo.size:
            raise ValueError("File truncated: {tinfo.name}, {tinfo.size}".
                             format(tinfo=tinfo))
        f.seek(0)
        return f
    except BaseException:
        # Cleanup only: never hand back or leak a handle for a bad member.
        f.close()
        raise
#
# Like extract, but just do the check
# Much slower than simply running tar t...
#
def checkinfo(tar, tinfo):
    """Verify that a regular-file member can be read through to its end.

    Members that are not regular files are accepted without any check.
    The member's file object is always closed before returning.

    tar:   archive object providing ``extractfile``.
    tinfo: the member to verify.

    Raises ValueError when the last byte cannot be read or the final
    position does not match ``tinfo.size``.
    """
    if not tinfo.isfile():
        return
    with closing(tar.extractfile(tinfo)) as f:
        # Seek to the last byte and read it to force the data to be there.
        f.seek(-1, 2)
        try:
            f.read(1)
        except Exception:
            # Only real read failures mean a bad member; an interrupt or
            # interpreter exit must not be reported as archive damage.
            sys.stderr.write("Error seeking {}\n".format(tinfo.name))
            traceback.print_exc()
            raise ValueError("Bad File: {tinfo.name}, {tinfo.size}".
                             format(tinfo=tinfo))
        p = f.tell()
        if p != tinfo.size:
            raise ValueError("File truncated: {tinfo.name}, {tinfo.size}".
                             format(tinfo=tinfo))
#
# So in the transfer, if the extract fails to get to the end
# we bail and stop updating the output tar
#
# Return: false if we bailed partway through
# srctar & sinktar: tarfile objects
#
def transfer(srctar, sinktar):
    """Copy members from ``srctar`` to ``sinktar`` until one fails.

    Each member is validated with ``extract`` before it is added to the
    sink, so a damaged member is never written.

    srctar:  source archive, iterated member by member.
    sinktar: destination archive providing ``addfile``.

    Returns True when every member was copied, False when copying stopped
    early because a member could not be extracted.
    """
    for tinfo in srctar:
        verboseInfo(tinfo)
        try:
            f = extract(srctar, tinfo)
        except BaseException:
            # Deliberately broad: whatever stops the extraction, copying
            # ends here and the function returns normally, so the caller
            # keeps control of the sink archive.
            sys.stderr.write("Error extracting {}\n".format(tinfo.name))
            traceback.print_exc()
            return False  # Bail! Bail!
        # No exception? We should be safe to copy.
        try:
            sinktar.addfile(tinfo, f)
        finally:
            # Close the member even when the write to the sink fails.
            if f is not None:
                f.close()
    return True
#
# Like transfer, but just do the check on the source
# Much slower than tar t...
def checkfile(tar):
    """Check every member of ``tar`` in order, stopping at the first failure.

    tar: archive object, iterated member by member.

    Returns True when every member passed ``checkinfo``, False as soon as
    one fails (the error and its traceback are written to stderr).
    """
    for tinfo in tar:
        verboseInfo(tinfo)
        try:
            checkinfo(tar, tinfo)
        except Exception:
            sys.stderr.write("Error checking {}\n".format(tinfo.name))
            traceback.print_exc()
            return False
    # Report success explicitly so the result mirrors transfer().
    return True
#############################
# convert:
# Here's the entry point
# srcname and sinkname are file names,
# imode and omode are '' or 'gz' (bz2 doesn't work)
#
def convert(srcname, sinkname, imode='', omode=''):
    """Copy the readable part of one tar file into another.

    srcname:  path of the archive to read.
    sinkname: path of the archive to write.
    imode:    compression suffix used to open the source ('' or 'gz').
    omode:    compression suffix used to open the sink ('' or 'gz').

    Both archives are closed when the copy ends. The outcome is only
    reported through the verbose helpers.
    """
    verboseConvert(srcname, sinkname, imode, omode)
    read_mode = "r:{}".format(imode)
    write_mode = "w:{}".format(omode)
    with tarfile.open(srcname, mode=read_mode) as srctar, \
            tarfile.open(sinkname, mode=write_mode) as sinktar:
        finished = transfer(srctar, sinktar)
    verboseConverted(srcname, sinkname, imode, omode, finished)
#
# Like convert, but just do the check on source
# Much slower than tar t...
def check(srcname, mode=''):
    """Open ``srcname`` as a tar stream and check its members in order.

    srcname: path of the archive to check.
    mode:    compression suffix used to open it ('' or 'gz').

    The archive is opened in stream mode ('r|') and closed afterwards.
    The result of the check is not returned.
    """
    verboseCheck(srcname, mode)
    stream_mode = 'r|{}'.format(mode)
    with tarfile.open(srcname, mode=stream_mode) as archive:
        checkfile(archive)
##############################################################
#
# Command line wrapper
#
# convert src trg
def main_convert(src, trg):
    """Pick compression modes from the file names and run the conversion.

    A name ending in '.gz' selects mode 'gz'; anything else selects ''.
    """
    supported = ('gz',)  # bz2 doesn't work

    def mode_for(name):
        # The last matching suffix wins; no match means uncompressed.
        chosen = ''
        for suffix in supported:
            if name.endswith('.' + suffix):
                chosen = suffix
        return chosen

    convert(src, trg, mode_for(src), mode_for(trg))
# check src
# Like convert, but just do the check
# Guess what? Much slower than tar t...
def main_check(src):
    """Pick the compression mode from the file name and run the check.

    A name ending in '.gz' selects mode 'gz'; anything else selects ''.
    """
    srcmode = ''
    # This must be a tuple: iterating the bare string 'gz' would test the
    # suffixes '.g' and '.z' and never recognise a gzipped archive.
    for mode in ('gz',):  # bz2 doesn't work
        if src.endswith('.{}'.format(mode)):
            srcmode = mode
    check(src, srcmode)
# arg wrapper
def main():
    """Parse the command line and run the selected subcommand.

    Subcommands:
      convert SRC TRG  copy SRC into TRG, stopping at the first bad member
      check SRC        check the members of SRC from the beginning

    The global ``-v`` flag sets the module-level ``verbose`` switch.
    Exits with a usage error when no subcommand is given.
    """
    import argparse
    global verbose
    parser = argparse.ArgumentParser(
        description="Some utilities for broken tar files"
    )
    parser.add_argument('-v', help="be verbose", action='store_true')
    # Either convert or check.
    # Each sub parser stores the function to call as its default for 'func';
    # the function receives the parsed namespace.
    sub = parser.add_subparsers()
    # convert
    trans = sub.add_parser("convert",
                           help='safely convert broken tar',
                           description="Read a tar file into another. "
                                       "Stops when it fails with a file, "
                                       "allowing a 'good' "
                                       "partial tar file to be created.")
    trans.add_argument('src', help="input tar name, ending with .tar{,.gz}")
    trans.add_argument('trg', help="output tar name, ending with .tar{,.gz}")
    trans.set_defaults(func=lambda ns: main_convert(ns.src, ns.trg))
    # check
    chk = sub.add_parser("check",
                         help='Check whether tar is broken',
                         description="Check files from beginning, "
                                     "stop with failure")
    chk.add_argument('src', help="input tar name, ending with .tar{,.gz}")
    chk.set_defaults(func=lambda ns: main_check(ns.src))
    # Parse args, set verbose and call mode
    args = parser.parse_args()
    # On Python 3 a missing subcommand is not an argparse error, so 'func'
    # may be absent; turn that into a normal usage message.
    handler = getattr(args, 'func', None)
    if handler is None:
        parser.error("a subcommand is required: convert or check")
    verbose = args.v
    handler(args)
###################################
# Run the command line wrapper only when executed as a script.
if __name__ == "__main__":
    main()
###################################