forked from saucer-man/qq_msg_decode
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.py
387 lines (319 loc) · 10.3 KB
/
index.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
import sys
import json
import math
import struct
import binascii
import sqlite3
# Load the face lookup table from face.json. The path is relative, so it is
# resolved against the current working directory at import time.
with open('face.json', 'r', encoding='utf-8') as face_file:
    faceMap = json.loads(face_file.read())
# TLV type tags found in a message body. Unpack() compares each tag against
# MsgNickName and otherwise uses it as a key into MsgDecoders.
MsgText = 1  # handled by DecodeTextMsg
MsgFace = 2  # handled by DecodeFace
MsgGroupImage = 3  # handled by DecodeImage
MsgPrivateImage = 6  # handled by DecodeImage
MsgVoice = 7  # handled by DecodeVoice
MsgNickName = 18  # handled by DecodeNickname
MsgVideo = 26  # handled by DecodeVideo
class Buffer:
    """Sequential reader over a bytes object with little-endian integer helpers.

    ``off`` is the index of the next unread byte. Every read advances it.
    """

    def __init__(self, buf: bytes):
        self.buf = buf
        self.off = 0

    def empty(self) -> bool:
        """Return True when there is no buffer or the offset is at/past its end."""
        return self.buf is None or len(self.buf) <= self.off

    def read(self, n) -> bytes:
        """Read up to *n* bytes and advance the offset by *n*.

        Returns ``b""`` (never ``None``) when the buffer is already exhausted,
        so a zero-length value at the very end of the data can still be
        decoded or hex-encoded by callers. In that case the offset is left
        unchanged. If fewer than *n* bytes remain, the shorter slice is
        returned.
        """
        if self.empty():
            return b""
        chunk = self.buf[self.off : self.off + n]
        self.off += n
        return chunk

    def skip(self, n):
        """Advance the offset by *n* bytes without reading them."""
        self.off += n

    def uint32(self) -> int:
        """Read 4 bytes as a little-endian unsigned integer.

        Raises ``struct.error`` if fewer than 4 bytes remain.
        """
        # Equivalent to:
        # int.from_bytes(self.buf[self.off:self.off + 4], byteorder='little')
        (value,) = struct.unpack_from("<I", self.buf, self.off)
        self.off += 4
        return value

    def uint16(self):
        """Read 2 bytes as a little-endian unsigned integer.

        Raises ``struct.error`` if fewer than 2 bytes remain.
        """
        (value,) = struct.unpack_from("<H", self.buf, self.off)
        self.off += 2
        return value

    def byte(self) -> int:
        """Read a single byte as an int; raises ``IndexError`` past the end."""
        value = self.buf[self.off]
        self.off += 1
        return value

    def t(self) -> int:
        """Read the one-byte type field of a TLV record."""
        return self.byte()

    def l(self) -> int:
        """Read the two-byte little-endian length field of a TLV record."""
        return self.uint16()

    def tlv(self):
        """Read one record and return ``(type, length, value)``.

        The record is a 1-byte type, a 2-byte little-endian length and then
        ``length`` bytes of value.
        """
        tag = self.t()
        length = self.l()
        value = self.read(length)
        return tag, length, value
class MsgElem:
    """Bare message-element class whose Type() yields no value."""

    def Type(self):
        """Return None; this class defines no type tag of its own."""
        return None
class Header:
    """Fixed fields read from the start of a message by Unpack()."""

    def __init__(self):
        # Three 32-bit fields.
        self.Time = 0
        self.Rand = 0
        self.Color = 0
        # Four single-byte font attributes.
        self.FontSize = 0
        self.FontStyle = 0
        self.Charset = 0
        self.FontFamily = 0
        # Font name, decoded from a length-prefixed UTF-16 string.
        self.FontName = ""

    def __str__(self):
        """Render every attribute as ``name:value``, joined by commas."""
        return ",".join(f"{name}:{value}" for name, value in vars(self).items())
def DecodeNickname(b: bytes) -> str:
    """Return the sender nickname stored in a nickname payload.

    The payload is walked as a series of TLV records. The value of the first
    record whose type is 1 or 2 is decoded as UTF-16 and returned. An empty
    string is returned when no such record exists.
    """
    reader = Buffer(b)
    while True:
        if reader.empty():
            return ""
        tag, _length, value = reader.tlv()
        if tag == 1 or tag == 2:
            return value.decode("utf-16")
class Msg:
    """A decoded message: header, list of body elements and sender nickname."""

    def __init__(self):
        self.Header = Header()
        self.Elements = []
        self.SenderNickname = ""

    def __str__(self):
        """Render every attribute as ``name:value``, one per line."""
        lines = (f"{name}:{value}" for name, value in vars(self).items())
        return "\n".join(lines)
# NOTE: these repeat, with identical values, the type-tag constants already
# defined near the top of the file.
MsgText = 1  # text
MsgFace = 2  # face
MsgGroupImage = 3  # group image
MsgPrivateImage = 6  # private image
MsgVoice = 7  # voice
MsgNickName = 18  # sender nickname
MsgVideo = 26  # video
class TextElement:
    """Text fragment of a message."""

    def __init__(self, content):
        # Decoded text of the fragment.
        self.Content = content

    def Type(self):
        """Return the MsgText type tag."""
        return MsgText

    def __str__(self):
        """Return the text content unchanged."""
        return self.Content
class ImageElement:
    """Image reference: a path string and a raw hash."""

    def __init__(self):
        self.Path = ""
        self.Hash = b""

    def Type(self):
        """Return the MsgGroupImage type tag.

        MsgDecoders builds this class for both the MsgGroupImage and the
        MsgPrivateImage tags, and this method reports MsgGroupImage in
        either case.
        """
        return MsgGroupImage
class FaceElement:
    """Face (emoticon) element: numeric id plus display name."""

    def __init__(self, id, name):
        self.Id, self.Name = id, name

    def Type(self):
        """Return the MsgFace type tag."""
        return MsgFace
class VoiceElement:
    """Voice element, identified by its raw hash bytes."""

    def __init__(self, hash):
        # Raw value taken from the type-1 record of the voice payload.
        self.Hash = hash

    def Type(self):
        """Return the MsgVoice type tag."""
        return MsgVoice
class VideoElement:
    """Video element, identified by its hash bytes."""

    def __init__(self, hash):
        # Hash as produced by DecodeVideo (a bytearray there).
        self.Hash = hash

    def Type(self):
        """Return the MsgVideo type tag."""
        return MsgVideo
def DecodeTextMsg(b):
    """Decode a text payload into a TextElement.

    Walks the payload's TLV records and decodes the value of the first
    type-1 record as UTF-16. Returns None when there is no type-1 record.
    """
    reader = Buffer(b)
    while not reader.empty():
        tag, _length, payload = reader.tlv()
        if tag != 1:
            continue
        return TextElement(content=payload.decode("utf-16"))
    return None
def DecodeFace(b):
    """Decode a face payload into a FaceElement.

    The value of the first type-1 TLV record is read as a big-endian
    unsigned integer and used as the face id. The display name is looked up
    in ``faceMap``; "未知" is used when the id is not present. Returns None
    when the payload has no type-1 record.
    """
    reader = Buffer(b)
    while not reader.empty():
        tag, _length, payload = reader.tlv()
        if tag != 1:
            continue
        face_id = int.from_bytes(payload, byteorder="big")
        if face_id in faceMap:
            name = faceMap[face_id]
        elif isinstance(faceMap, dict) and str(face_id) in faceMap:
            # faceMap comes from json.load(), and JSON object keys are always
            # decoded as str, so an int-only lookup can never match such a
            # file. Fall back to the textual form of the id.
            name = faceMap[str(face_id)]
        else:
            name = "未知"
        return FaceElement(id=face_id, name=name)
    return None
def DecodeImage(b):
    """Decode an image payload into an ImageElement.

    Every TLV record is visited: a type-1 value is stored as the raw hash
    and a type-2 value is decoded as UTF-16 and stored as the path. If a
    type occurs more than once, the last occurrence wins. An element is
    always returned, even when neither record is present.
    """
    image = ImageElement()
    reader = Buffer(b)
    while not reader.empty():
        tag, _length, payload = reader.tlv()
        if tag == 2:
            image.Path = payload.decode("utf-16")
        elif tag == 1:
            image.Hash = payload
    return image
def DecodeVoice(b):
    """Decode a voice payload into a VoiceElement.

    The raw value of the first type-1 TLV record becomes the element's
    hash. Returns None when the payload has no type-1 record.
    """
    reader = Buffer(b)
    while not reader.empty():
        tag, _length, payload = reader.tlv()
        if tag != 1:
            continue
        return VoiceElement(hash=payload)
    return None
def DecodeVideo(b):
    """Decode a video payload into a VideoElement.

    From the first type-1 TLV record, the 16 bytes at offsets 244..259 of
    the value are taken and each is XORed with 0xEF. The result (a
    bytearray) is the element's hash. Returns None when the payload has no
    type-1 record.
    """
    reader = Buffer(b)
    while not reader.empty():
        tag, _length, payload = reader.tlv()
        if tag != 1:
            continue
        masked = payload[244 : 244 + 16]
        return VideoElement(hash=bytearray(octet ^ 0xEF for octet in masked))
    return None
# Dispatch table used by Unpack(): TLV type tag -> function that decodes the
# record's value into an element. MsgNickName is not listed here because
# Unpack() handles it separately.
MsgDecoders = {
    MsgText: DecodeTextMsg,
    MsgFace: DecodeFace,
    # Group and private images share one decoder.
    MsgGroupImage: DecodeImage,
    MsgPrivateImage: DecodeImage,
    MsgVoice: DecodeVoice,
    MsgVideo: DecodeVideo,
}
def encode_msg(msg) -> str:
    """Render the elements of *msg* as one string.

    Text elements and plain ``str`` items are emitted verbatim. Image, voice,
    video and face elements are emitted as bracketed ``[t:...]`` tags with
    their hashes hex-encoded. Anything else (including a None left by a
    decoder that found no usable record) contributes nothing to the result
    and prints a ``[!]`` marker to stdout instead.

    Args:
        msg: object whose ``Elements`` attribute is an iterable of elements.

    Returns:
        The concatenation of all rendered elements.
    """

    def hex_of(raw):
        # Lower-case hex text of a bytes-like hash.
        return binascii.hexlify(raw).decode('utf-8')

    parts = []
    for elem in msg.Elements:
        if isinstance(elem, TextElement):
            parts.append(elem.Content)
        elif isinstance(elem, ImageElement):
            parts.append(f"[t:img,path={elem.Path},hash={hex_of(elem.Hash)}]")
        elif isinstance(elem, VoiceElement):
            # TODO: per the original note, file= is meant to carry a
            # "base48" encoding of elem.Hash; it is still left empty.
            # The hash placeholder must live inside an f-string: previously
            # the second literal had no f prefix, so the placeholder text
            # itself was written out instead of the hash.
            parts.append(f"[t:voice,file=,hash={hex_of(elem.Hash)}.amr]")
        elif isinstance(elem, VideoElement):
            parts.append(f"[t:video,hash={hex_of(elem.Hash)}]")
        elif isinstance(elem, FaceElement):
            parts.append(f"[t:face,id={elem.Id},name={elem.Name}]")
        elif isinstance(elem, str):
            parts.append(elem)
        else:
            # Unknown element: flag it on stdout, add nothing to the output.
            print("[!]", end='', flush=True)
    return "".join(parts)
def Unpack(b: bytes) -> Msg:
    """Parse a raw message blob into a Msg.

    Layout as consumed here: 8 bytes that are skipped, three little-endian
    uint32 fields (Time, Rand, Color), four single-byte font attributes, a
    uint16 length followed by that many bytes of UTF-16 font name, 2 more
    skipped bytes, then TLV records until the data runs out.

    A MsgNickName record sets the sender nickname. Records whose type is a
    key of MsgDecoders are decoded and appended to ``Elements`` (the decoder
    result is appended as-is, even if it is None). Other records are ignored.
    """
    reader = Buffer(b)
    reader.skip(8)  # leading 8 bytes are not interpreted

    head = Header()
    head.Time = reader.uint32()
    head.Rand = reader.uint32()
    head.Color = reader.uint32()
    head.FontSize = reader.byte()
    head.FontStyle = reader.byte()
    head.Charset = reader.byte()
    head.FontFamily = reader.byte()
    name_length = int(reader.uint16())
    head.FontName = reader.read(name_length).decode("utf-16")

    reader.skip(2)  # 2 bytes between the header and the records, not interpreted

    result = Msg()
    result.Header = head
    while not reader.empty():
        tag, _length, payload = reader.tlv()
        if tag == MsgNickName:
            result.SenderNickname = DecodeNickname(payload)
        elif tag in MsgDecoders:
            decode = MsgDecoders[tag]
            result.Elements.append(decode(payload))
    return result
def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier, doubling embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def _decode_table(conn, cur, table_name: str) -> None:
    """Add and fill the DecodedMsg column of one table, reporting on stdout."""
    print(f"开始解码 {table_name}", end='|')
    # Quote the name so tables containing spaces, hyphens, etc. still work.
    ident = _quote_identifier(table_name)

    # Fetch one row only to learn the column names.
    cur.execute(f"SELECT * FROM {ident} limit 1")
    col_name_list = [column[0] for column in cur.description]
    if "MsgContent" not in col_name_list:
        # Nothing to decode in this table.
        print("不包含MsgContent, 跳过")
        return
    if "DecodedMsg" in col_name_list:
        # The output column already exists, so the table was handled before.
        print("已经解码过, 跳过")
        return

    # Deliberately best-effort: a table that cannot be altered is reported
    # and skipped instead of aborting the whole run.
    try:
        cur.execute(f"ALTER TABLE {ident} ADD DecodedMsg text")
        conn.commit()
    except Exception as e:
        print(f"添加新列到数据库失败:{e}")
        return

    # Key each row by rowid so that messages sharing the same TIME value do
    # not overwrite each other's DecodedMsg. Tables without a rowid fall back
    # to the TIME column.
    try:
        cur.execute(f"SELECT rowid, MsgContent FROM {ident}")
        key_column = "rowid"
    except sqlite3.OperationalError:
        cur.execute(f"SELECT TIME, MsgContent FROM {ident}")
        key_column = "TIME"
    try:
        rows = cur.fetchall()
    except Exception as e:
        print(f"获取查询结果失败:{e}")
        return

    row_len = len(rows)
    if row_len == 0:
        print('没有数据,跳过')
        return
    print(f"数据长度: {row_len} log10: {math.log10(row_len)}")

    # Above 500 rows a progress dot is printed (and the work committed)
    # roughly once per 1% of the rows; smaller tables run silently.
    show_progress = row_len > 500
    print_len = 0
    if show_progress:
        print_len = row_len // 100
        print(f"比例: 1:{print_len}")

    update_sql = f"UPDATE {ident} SET `DecodedMsg` = ? WHERE {key_column} = ?"
    counter = 0
    for row_key, msg_content in rows:
        msg = Unpack(msg_content)
        cur.execute(update_sql, (encode_msg(msg), row_key))
        if show_progress:
            counter += 1
            if counter > print_len:
                counter = 0
                print('.', end='', flush=True)
                conn.commit()
    conn.commit()
    print("解码完成")


def run(db_path: str):
    """Decode every message table of the SQLite database at *db_path*.

    For each table that has a ``MsgContent`` column and no ``DecodedMsg``
    column yet, a ``DecodedMsg`` text column is added and filled with
    ``encode_msg(Unpack(MsgContent))`` for every row. Progress and skip
    reasons are printed to stdout.

    The cursor and connection are closed even when decoding raises.

    Args:
        db_path: path of the SQLite database file; it is modified in place.

    Raises:
        sqlite3.Error: for database failures other than the best-effort
            steps that are reported and skipped.
        Exception: whatever Unpack()/encode_msg() raise for a malformed row.
    """
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        try:
            # Let SQLite memory-map up to 10,240,000 bytes (about 10 MB).
            cur.execute("PRAGMA mmap_size = 10240000")
            cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
            # fetchall() materializes the names first because the cursor is
            # reused for the per-table statements.
            for (table_name, *_rest) in cur.fetchall():
                _decode_table(conn, cur, table_name)
        finally:
            cur.close()
    finally:
        conn.close()
# Script entry point: the first command-line argument is the database path.
if __name__ == "__main__":
    if not sys.argv[1:]:
        # No database path given: show usage and exit with a failure status.
        print("Usage: python index.py [db_path]")
        sys.exit(1)
    db_path = sys.argv[1]
    run(db_path)