forked from Love-Lyu/Cute-wool
-
Notifications
You must be signed in to change notification settings - Fork 1
/
xx.py
964 lines (840 loc) · 40.3 KB
/
xx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
"""
心喜 v1.0
变量 sso#备注, 多账户换行或空格
export XSSONF=""
cron: 48 11,16 * * *
const $ = new Env("心喜");
"""
import os
import requests
import random
from datetime import datetime, timezone, timedelta
import time
import sys
import io
# 控制是否启用通知的变量
enable_notification = 1
# 只有在需要发送通知时才尝试导入notify模块
if enable_notification == 1:
try:
from notify import send
except ModuleNotFoundError:
print("警告:未找到notify.py模块。它不是一个依赖项,请勿错误安装。程序将退出。")
sys.exit(1)
#---------简化的框架--------
# 配置参数
BASE_URL = "https://api.xinc818.com/mini/"
# 获取北京日期的函数
def get_beijing_date():
beijing_time = datetime.now(timezone(timedelta(hours=8)))
return beijing_time.date()
def dq_time():
# 获取当前时间戳
dqsj = int(time.time())
# 将时间戳转换为可读的时间格式
dysj = datetime.fromtimestamp(dqsj).strftime('%Y-%m-%d %H:%M:%S')
return dqsj, dysj
# 获取环境变量
def get_env_variable(var_name):
value = os.getenv(var_name)
if value is None:
print(f'环境变量{var_name}未设置,请检查。')
return None
accounts = value.strip().split('\n')
num_accounts = len(accounts)
print(f'-----------本次账号运行数量:{num_accounts}-----------')
print(f'----------项目:心喜小程序-----------')
return accounts
# 封装请求头
def create_headers(sso):
headers = {
'Host': 'api.xinc818.com',
'Connection': 'keep-alive',
'sso': sso,
'xweb_xhr': '1',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) NetType/WIFI MiniProgramEnv/Windows WindowsWechat/WMPF WindowsWechat(0x63090819) XWEB/8531',
'Content-Type': 'application/json',
'Accept': '*/*',
'Sec-Fetch-Site': 'cross-site',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'zh-CN,zh;q=0.9'
}
return headers
def rwlb(sso): # 任务列表
urlrw = BASE_URL + 'dailyTask/daily' # 确保字符串连接正确
headers = create_headers(sso)
try:
response = requests.get(urlrw, headers=headers)
response_data = response.json()
if response_data["code"] != 0 or response_data["data"] is None:
print("错误响应或数据为空,跳过当前账号")
return None
for task in response_data["data"]:
task_id = task.get("id")
task_name = task.get("name")
task_status = task.get("status")
# 跳过不需要处理的任务
if task_name in ["完善个人资料", "购买商城商品", "参加活动", "申请试用", "提交反馈建议"]:
print(f"🚫🚫'{task_name}' ⛔⛔⛔跳过❌❌。")
continue
if task_status: # 如果任务状态为 True (已完成)
print(f"任务 '{task_name}' 已完成。")
#elif task_name == "参与讨论": # 测试bug
# selected_post = fetch_posts_data(sso)
# if selected_post:
# post_id = selected_post[0] # 假设列表的第一个元素是 post_id
# cytl(sso, post_id)
continue
print(f"任务 '{task_name}' 尚未完成。进行处理...")
if task_name == "分享心喜":
fx_xx(sso)
elif task_name == "签到打卡":
sign_dk(sso, task_id)
elif task_name == "想要商品":
xysp(sso)
elif task_name == "大转盘抽奖":
dzp_cj(sso, task_id)
elif task_name == "去商城浏览30秒":
ll_sp(sso, task_id)
elif task_name == "发帖":
hitokoto_content = fetch_hitokoto() # 获取一言内容
if hitokoto_content and not hitokoto_content.startswith("请求一言失败"):
rw_post(sso, hitokoto_content, task_id) # 使用一言内容作为发帖内容
else:
print("未能获取有效的一言内容,无法执行发帖操作。跳过此任务。")
elif task_name == "点赞用户":
selected_post = fetch_posts_data(sso)
if selected_post:
post_id = selected_post[0] # 从选中的帖子中获取 post_id
like_post(sso, post_id) # 执行点赞操作
else:
print("未能获取帖子数据,无法执行点赞操作。")
elif task_name == "关注用户":
selected_post = fetch_posts_data(sso)
if selected_post:
follow_user_id = selected_post[1] # 假设使用帖子的发布者ID作为关注对象
gz_user(sso, follow_user_id)
else:
print("未能获取帖子数据,无法执行关注操作。")
elif task_name == "参与讨论":
selected_post = fetch_posts_data(sso)
if selected_post:
post_id = selected_post[0] # 假设列表的第一个元素是 post_id
cytl(sso, post_id)
else:
print("未能获取帖子数据,无法参与讨论。")
elif task_name == "给主播留言":
selected_anchor = zb_list(sso)
if selected_anchor:
circle_id, related_id = selected_anchor
add_comment(sso, circle_id, related_id, task_id)
else:
print("未能获取主播数据,无法执行留言操作。")
# 在任务之间停止 1 到 3 秒
time.sleep(random.randint(1, 2))
return response_data
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None
except ValueError:
print("响应内容不是有效的 JSON 格式")
return None
def fx_xx(sso): # 分享心喜
headers = create_headers(sso)
urlfx = BASE_URL + 'dailyTask/share'
try:
response = requests.get(urlfx, headers=headers)
if response.status_code == 200:
response_data = response.json() # 解析 JSON 响应
#print("分享心喜完整响应内容: ", response_data)
# 检查 data 是否存在
if response_data.get('data'):
task_name = response_data['data'].get('taskName', '未知任务')
single_reward = response_data['data'].get('singleReward', '未知奖励')
print(f"完成🎉 {task_name},🥂奖励: {single_reward}")
print()
else:
#print("分享心喜任务完成,但未获取到详细信息。")
print("🤡🤡分享心喜任务🤡🤡。")
time.sleep(3)
else:
print(f"请求失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
def sign_dk(sso, task_id): # 签到打卡
sign_in_url = BASE_URL + f'sign/in?dailyTaskId={task_id}' # 构造签到 URL
headers = create_headers(sso) # 使用 create_headers 函数创建 headers
try:
response = requests.get(sign_in_url, headers=headers) # 使用 GET 方法签到
if response.status_code == 200:
response_data = response.json()
#print(f"签到成功!响应内容: {response.text}")
# 在访问 taskResult 之前检查它是否存在
if response_data.get('data') and response_data['data'].get('taskResult'):
task_name = response_data['data']['taskResult'].get('taskName', '未知任务')
single_reward = response_data['data']['taskResult'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 奖励: {single_reward}")
print()
else:
print("签到成功,但未获取到任务结果。")
return True # 签到成功
else:
print(f"签到失败,状态码:{response.status_code}, 响应内容: {response.text}")
return False # 签到失败
except requests.exceptions.RequestException as e:
print(f"签到请求失败: {e}")
return False # 请求失败
def xysp(sso): # 想要商品
headers = {
'sso': sso,
'Host': 'cdn-api.xinc818.com' # 注意这里的 Host
}
random_page_num = random.randint(1, 15)
url_desire_goods = f'https://cdn-api.xinc818.com/mini/integralGoods?orderField=sort&orderScheme=DESC&pageSize=10&pageNum={random_page_num}'
try:
response = requests.get(url_desire_goods, headers=headers)
print(f"随机选择的页数为: {random_page_num}")
if response.status_code == 200:
response_json = response.json()
goods_list = response_json.get('data', {}).get('list', [])
if goods_list:
random_id = random.choice(goods_list)['id']
print("随机选取的商品ID:", random_id)
headers['Host'] = 'api.xinc818.com' # 更新headers为 api.xinc818.com
url_specific_good = f'https://api.xinc818.com/mini/integralGoods/{random_id}?type'
response_specific = requests.get(url_specific_good, headers=headers)
if response_specific.status_code == 200:
response_specific_json = response_specific.json()
outer_id = response_specific_json.get('data', {}).get('outerId', '')
print(f"提取的outerId: {outer_id}")
# 新的POST请求
url_submit = 'https://api.xinc818.com/mini/live/likeLiveItem'
data = {
"isLike": True,
"dailyTaskId": 20,
"productId": outer_id
}
print(data)
submit_response = requests.post(url_submit, headers=headers, json=data)
if submit_response.status_code == 200 and submit_response.headers.get('Content-Type') == 'application/json':
response_data = submit_response.json()
#print(submit_response.json())
if response_data is None:
print(f"响应的JSON数据为空,原始响应内容: {submit_response.text}")
return # 退出当前任务
if response_data.get("data"):
task_name = response_data["data"].get("taskName", "未知任务")
single_reward = response_data["data"].get("singleReward", 0)
print(f" 🎉: {task_name},奖励: {single_reward}")
else:
print("想要商品 请求成功但未完成任务,响应码或数据内容不正确")
return # 退出当前任务
else:
print(f"POST请求失败,状态码:{submit_response.status_code}, 响应内容: {submit_response.text}")
return # 退出当前任务
else:
print("商品列表为空,跳过当前账号")
return # 退出当前任务
else:
print(f"获取商品列表失败,状态码:{response.status_code}, 响应内容: {response.text}")
return # 退出当前任务
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return # 退出当前任务
def dzp_cj(sso, task_id):#大转盘抽奖
try:
headers = create_headers(sso)
# 获取抽奖活动列表
activity_list_url = BASE_URL + 'lottery/list'
activity_response = requests.get(activity_list_url, headers=headers)
if activity_response.status_code != 200:
print('获取活动列表失败,状态码:', activity_response.status_code)
return
activity_data = activity_response.json()
# 查找“幸运大转盘抽奖”的id
activity_id = None
for activity in activity_data['data']:
if activity['activityName'] == "幸运大转盘抽奖":
activity_id = activity['id']
break
if activity_id is None:
print("没有找到‘幸运大转盘抽奖’活动")
return
print(f"找到‘幸运大转盘抽奖’活动, ID: {activity_id}")
# 检查抽奖次数
lottery_url = BASE_URL + f'lottery/{activity_id}/freeNum'
lottery_response = requests.get(lottery_url, headers=headers)
if lottery_response.status_code != 200:
print('检查抽奖次数失败,状态码:', lottery_response.status_code)
return
lottery_data = lottery_response.json()
print('抽奖次数:', lottery_data['data'])
if lottery_data['data'] == 0:
print("没有抽奖机会,过程跳过。")
return
# 获取用户id
user_url = BASE_URL + 'user'
user_response = requests.get(user_url, headers=headers)
if user_response.status_code != 200:
print("获取用户ID失败,状态码: ", user_response.status_code)
return
user_data = user_response.json()
user_id = user_data['data']['id']
print(f"用户ID获取成功: {user_id}")
# 抽奖
lottery_draw_url = BASE_URL + 'lottery/draw'
payload = {
"activityId": activity_id,
"batch": False,
"isIntegral": False,
"userId": user_id,
"dailyTaskId": task_id # 使用变量task_id作为dailyTaskId
}
#print(payload)
lottery_draw_response = requests.post(lottery_draw_url, headers=headers, json=payload)
if lottery_draw_response.status_code == 200:
#print("抽奖成功!")
lottery_result = lottery_draw_response.json()
lottery_result_list = lottery_result['data']['lotteryResult']
if lottery_result_list:
prize_name = lottery_result_list[0].get('prizeName', '未知奖品')
print("中奖奖品🎉:", prize_name)
else:
print("未获取到抽奖结果")
else:
print("抽奖失败,状态码:", lottery_draw_response.status_code)
time.sleep(3)
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
def ll_sp(sso, task_id): # 浏览商品30秒
browse_url = BASE_URL + f'dailyTask/browseGoods/{task_id}'
headers = create_headers(sso)
print(browse_url)
try:
# 模拟浏览前的准备请求
pre_browse_response = requests.get(browse_url, headers=headers)
if pre_browse_response.status_code != 200:
print(f"浏览前的请求失败,状态码:{pre_browse_response.status_code}")
return False
# 模拟用户浏览30秒
time.sleep(3) # 注意这里应该是30秒,但现在是3秒
# 模拟浏览后的完成请求
post_browse_response = requests.get(browse_url, headers=headers)
if post_browse_response.status_code == 200:
print("成功模拟浏览商品30秒")
# 打印响应内容,如果需要
print(post_browse_response.json()) # 更正的行
print(f"浏览后的响应内容: {post_browse_response.text}")
return True
else:
print(f"浏览后的请求失败,状态码:{post_browse_response.status_code}")
return False
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return False
def fetch_posts_data(sso):#获取帖子数据并提取 点赞用户id 和 关注用户publisherId,然后随机选择一个
url = "https://cdn-api.xinc818.com/mini/posts/sorts?sortType=SPAM&pageNum=1&pageSize=10&groupClassId=0"
headers = {
"Host": "cdn-api.xinc818.com",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36",
"sso": sso
}
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json().get('data', {}).get('list', [])
extracted_data = [(item['id'], item['publisherId']) for item in data]
if extracted_data:
# 随机选择一个帖子
selected_post = random.choice(extracted_data)
return selected_post
else:
print("没有找到帖子数据")
return None
else:
print(f"请求失败,状态码:{response.status_code}, 响应内容: {response.text}")
return None
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None
def like_post(sso, post_id): # 点赞用户
url = BASE_URL + "posts/like"
headers = create_headers(sso)
payload = {
"postsId": str(post_id),
"decision": True
}
try:
response = requests.put(url, headers=headers, json=payload)
if response.status_code == 200:
print(f"💨成功点赞帖子,帖子ID: {post_id}")
#print(f"点赞帖子 完整响应内容: {response.text}")
response_json = response.json()
# 检查 response_json['data'] 是否存在
if response_json.get('data'):
task_name = response_json['data'].get('taskName', '未知任务')
single_reward = response_json['data'].get('singleReward', '未知奖励')
print(f"任务名称: {task_name}, 单次奖励: {single_reward}")
print()
else:
#print("点赞成功,但未获取到任务详情。")
#print("👻👻点赞👻👻。")
print()
else:
print(f"点赞失败,状态码:{response.status_code}")
print(f"完整响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def gz_user(sso, follow_user_id): # 关注用户
url = BASE_URL + "user/follow"
headers = create_headers(sso)
payload = {
"decision": True,
"followUserId": follow_user_id
}
try:
response = requests.put(url, headers=headers, json=payload)
if response.status_code == 200:
print(f"成功关注💗用户,用户ID: {follow_user_id}")
#print(f"完整响应内容: {response.text}") # 打印完整的响应内容
response_json = response.json()
# 检查 response_json['data'] 是否存在
if response_json.get('data'):
task_name = response_json['data'].get('taskName', '未知任务')
single_reward = response_json['data'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 奖励: {single_reward}")
print()
else:
#print("关注成功,但未获取到任务详情。")
print()
else:
print(f"关注用户失败,状态码:{response.status_code}")
print(f"完整响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def user_info(sso):#"获取用户信息ID 积分
url = BASE_URL + "user"
headers = create_headers(sso)
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
response_json = response.json()
user_data = response_json.get('data', {})
user_id = user_data.get('id', '')
integral = user_data.get('integral', '')
history_integral = user_data.get('historyIntegral', '')
print(f"用户ID: {user_id}, 当前积分: {integral}, 历史积分: {history_integral}")
return user_id # 返回用户ID
else:
print(f"获取用户信息失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None # 如果请求失败或异常,返回None
def cytl(sso, post_id): # "参与讨论,对指定帖子发表随机评论
url = BASE_URL + "postsComments"
headers = create_headers(sso)
user_info_result = user_info(sso) # 获取用户信息的结果
# 定义一个评论内容的字典
comments = [
"非常好!",
"我同意!",
"这确实很重要。",
"我从这个评论学到了很多。",
"加油,",
"为你打call!",
"今天也要加油哦!",
"很有见地!",
"太精彩了!",
"赞同这个观点。",
"好好",
"真的很棒!",
"这个我喜欢!",
"太赞了!",
"很有帮助!",
"这是我见过的最好的观点!",
"太有创意了!",
"这让我思考良多。",
"绝对同意!",
"讲得太好了!",
"这才是重点!",
"我刚想到这个!",
"太同意了!",
"这解释得太清楚了!",
"这个分析很到位!",
"你抓住了核心!",
"这个角度很独特!",
"没想到这样的观点,太棒了!"
"情感丰富,真实感人!",
"每次看到你的评论都很期待!",
"你的观点总能给人启发!",
"你的评论总是那么独到!",
"期待你更多的分享!",
"你的观点太有深度了!",
"每次看到你的评论都很受益!",
"这分析太透彻了!",
"你的评论总能点亮我的思考!",
"看到你的评论,我的心情都好了!"
"这让我看到了不同的视角!",
"每个人的看法都很有意思!",
"太有创造力了,我喜欢!",
"这个讨论很有价值!",
"你的见解让人耳目一新!",
"这是一个很棒的开始!",
"从你的评论中学到了很多!",
"你的想法很有启发性!",
"这个观点很有趣,赞一个!",
"你的理解深度让我佩服!",
"这确实是个好问题,值得探讨。",
"谢谢分享,我受益匪浅!",
"这种观点很难得,很欣赏!",
"你的分析很到位,赞同!",
"这样的讨论太精彩了,期待更多!"
]
content = random.choice(comments) # 随机选择一个评论内容
if user_info_result:
user_id = user_info_result[0] # 仅提取 user_id
payload = {
"customizeImages": [],
"content": content,
"postsId": post_id,
"publisherId": user_id, # 使用提取的 user_id
"floorId": "",
"voice": ""
}
#print(payload)
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
print(f"🙋参与讨论,帖子ID: {post_id}, 内容: '{content}'")
# 解析响应内容以获取taskName和singleReward
response_json = response.json()
#print("参与讨论完整响应内容: ", response_json)
task_name = response_json.get('data', {}).get('taskResult', {}).get('taskName', '')
single_reward = response_json.get('data', {}).get('taskResult', {}).get('singleReward', '')
print(f"完成🎉{task_name}, 奖励: {single_reward}🔝🔝🔝")
print()
else:
print(f"参与讨论失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
else:
print("未能获取用户ID,无法参与讨论。")
def zb_list(sso): # 主播列表
url = BASE_URL + "groups/defaultGroupList"
headers = create_headers(sso)
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
group_list = response.json()
# 提取所有主播的id和associatedAnchorId
anchors = [(group.get('id'), group.get('associatedAnchorId')) for group in group_list.get('data', [])]
# 随机选择一个主播
if anchors:
selected_anchor = random.choice(anchors)
group_id, associated_anchor_id = selected_anchor
#print(f"随机选取的群组ID: {group_id}, 关联主播ID: {associated_anchor_id}")
return group_id, associated_anchor_id # 返回随机选取的群组ID和关联主播ID
else:
print("没有可用的主播列表。")
return None, None
else:
print(f"获取主播列表失败,状态码:{response.status_code}, 响应内容: {response.text}")
return None, None
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None, None
def add_comment(sso, circle_id, related_id, daily_task_id):#给主播留言
"""在指定圈子中对指定主播添加评论"""
url = BASE_URL + "anchorComment/addComment"
headers = create_headers(sso)
# 预定义的评论列表
comments = [
"加油,我们支持你!",
"你是最棒的!",
"永远支持你!",
"我们在这里,永远爱你!",
"你总是能给我们带来欢笑!",
"你的直播太棒了!",
"你总是能够打动我们!",
"你的笑容真的很治愈!",
"我们是你坚强的后盾!",
"为你打call!",
"今天也要加油哦!",
"一直在你身后支持你!",
"你是最亮的星!",
"很高兴认识你!",
"你的努力我们都看到了!",
"每一次直播都是一次享受!",
"你的粉丝们都在这里!",
"继续前进,我们会一直在这里!",
"你总是那么有活力!",
"我们看到的不只是努力的你!",
"每次看你的直播都很开心!",
"你的每一次努力我们都看在眼里!",
"你的直播总能给我带来好心情!",
"你的才华无人能及!",
"你的直播是我一天中最期待的时刻!",
"你的魅力真的无法抗拒!",
"在你的直播中总能找到快乐!",
"你的每个瞬间都充满了惊喜!",
"你的努力值得每一份赞赏!",
"你的存在让这个平台更加精彩!",
"期待你的每一次直播!",
"你的每个直播都值得反复观看!",
"你总是能带给我们满满的正能量!",
"你的每一次分享都很有价值!",
"每次看你的直播都能学到很多东西!",
"你的直播里总有无限的乐趣!",
"你的直播总是那么充满活力!",
"你的直播是我们的快乐源泉!",
"感谢你带来这么多美好的直播时光!",
"你的每一场直播都是一场视听盛宴!",
"你的每场直播都是我们的心灵鸡汤!",
"看到你的努力,我们都非常感动!",
"你的笑声太迷人了,每次听都很开心!",
"你的才华横溢,每场直播都令人期待!",
"感谢你总是带给我们这么多正能量!",
"你的每一次直播都给我留下深刻印象!",
"你是我们的超级明星,永远支持你!",
"你在直播中的每一刻都是那么的真实可爱!",
"你的直播是我一天中最放松的时光!",
"每次看到你,都觉得世界变得更美好了!",
"你的直播充满了温暖和力量!",
"你的存在就是我们的幸运!",
"你的直播总是那么富有创造力和想象力!",
"你是那么的不同凡响,总能带来惊喜!",
"你的直播是我的精神食粮!",
"看着你的成长和进步,我们都为你感到骄傲!",
"你总是那么的充满魅力和活力!",
"你的每一次直播都是一次美好的旅行!",
"你的存在让我们的生活充满了乐趣!",
"你是我们心中的英雄,永远支持你!",
"你的直播总是能点亮我们的生活!",
"每次听你说话都特别有感染力!",
"你的直播总是那么有趣,让人忍不住一直看!",
"你的每一次表演都是那么精彩,无法挪开眼!",
"你的直播总是给我带来好心情,谢谢你!",
"每次看你直播都有新的收获,真的很棒!",
"你的直播里总有无尽的正能量,真的很喜欢!",
"你的直播总是那么温馨,感觉像回到家一样!",
"你是我们的快乐小天使,每次看到你都特别开心!",
"你的每一次直播都是我们的期待!",
"你的直播中总有许多惊喜,让人意犹未尽!",
"每次看你的直播都能感受到你的用心!",
"你的直播就像一股清泉,沁人心脾!",
"你的每一次直播都是我们的精神食粮!",
"你的直播总能带给我不一样的感受,太棒了!",
"你的直播充满了智慧和趣味,真是太有才了!",
"你的直播总是能给我们带来欢乐和知识,感谢你!",
"每次看你直播都有种被治愈的感觉!",
"你的直播总是那么充满活力,真是太赞了!",
"你的直播给了我们很多快乐,永远支持你!",
]
# 随机选择一个评论内容
content = random.choice(comments)
payload = {
"content": content,
"circleId": circle_id,
"relatedId": related_id,
"contentType": 0, # contentType固定为0
"dailyTaskId": daily_task_id,
"topCommentId": 0, # topCommentId固定为0
}
#print(payload) # 打印请求内容
try:
response = requests.post(url, headers=headers, json=payload)
if response.status_code == 200:
response_data = response.json()
print(f"💬评论成功,圈子ID: {circle_id}, 主播ID: {related_id}, ✍️: '{content}'")
#print(f"评论 完整响应内容: {response_data}") # 打印完整的响应内容
# 检查 response_data['data'] 和 response_data['data']['taskResult'] 是否存在
if response_data.get('data') and response_data['data'].get('taskResult'):
task_name = response_data['data']['taskResult'].get('taskName', '未知任务')
single_reward = response_data['data']['taskResult'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 奖励: {single_reward}")
print()
else:
#print("评论成功,但未获取到任务详情。")
print()
else:
print(f"评论失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def user_info(sso):
url = BASE_URL + "user"
headers = create_headers(sso)
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
response_json = response.json()
user_data = response_json.get('data', {})
if user_data is None: # 检查 user_data 是否为 None
#print(f"未能获取到 {sso} 的用户数据。")
return None
user_id = user_data.get('id', '')
integral = user_data.get('integral', '')
history_integral = user_data.get('historyIntegral', '')
#print(f"用户ID: {user_id}, 当前积分: {integral}, 历史积分: {history_integral}")
return user_id, integral, history_integral # 以元组形式返回
else:
print(f"获取用户信息失败,状态码:{response.status_code}, 响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
return None # 如果请求失败或异常,返回None
def fetch_hitokoto(): #一言
url_hitokoto = 'https://v1.hitokoto.cn/'
# 设置请求参数
params = {
'c': 'k', # 类型为哲学
'min_length': 10 # 设置返回句子的最小长度为 10
}
try:
response_hitokoto = requests.get(url_hitokoto, params=params)
if response_hitokoto.status_code == 200:
data = response_hitokoto.json()
return data.get('hitokoto')
else:
# 主要 API 请求失败,尝试备用 API
return fetch_hitokoto_backup()
except requests.exceptions.RequestException:
# 主要 API 请求异常,尝试备用 API
return fetch_hitokoto_backup()
def fetch_hitokoto_backup(): # 备用一言
url_backup = 'https://api.7585.net.cn/yan/api.php?charset=utf-8'
try:
response_backup = requests.get(url_backup)
if response_backup.status_code == 200:
return response_backup.text.strip() # 返回备用 API 的响应内容
else:
return f"备用API请求失败,状态码:{response_backup.status_code}"
except requests.exceptions.RequestException as e:
return f"备用API请求异常: {e}"
def rw_post(sso, content, task_id): # 发帖
headers = create_headers(sso)
url = BASE_URL + "posts"
sid = int(time.time() * 1000) # 生成时间戳
data = {
"topicNames": [],
"content": content,
"groupId": 0,
"groupClassifyId": 0,
"attachments": [],
"voteType": 0,
"commentType": "0",
"dailyTaskId": task_id,
"platform": "android",
"sid": sid
}
#print(data)
try:
response = requests.post(url, headers=headers, json=data)
if response.status_code == 200:
response_json = response.json()
print(f"💬发帖成功,✍️{content}, 任务ID: {task_id}, 时间戳: '{sid}'")
# 检查 response_json['data'] 是否存在
if response_json.get('data'):
task_name = response_json['data'].get('taskName', '未知任务')
single_reward = response_json['data'].get('singleReward', '未知奖励')
print(f" 🎉: {task_name}, 单次奖励: {single_reward}")
print()
else:
#print("发帖成功,但未获取到任务详情。")
print()
#print(f"发帖完整响应内容: {response.text}")
else:
print(f"发帖请求失败,状态码:{response.status_code}")
print(f"完整响应内容: {response.text}")
except requests.exceptions.RequestException as e:
print(f"请求异常: {e}")
def hqjljl(sso): # 奖励记录 积分
"""获取奖励记录,并返回当天的积分总和"""
records_url = BASE_URL + f'user/integralRecord?pageNum=1&pageSize=20'
headers = create_headers(sso) # 使用 create_headers 函数创建 headers
try:
response = requests.get(records_url, headers=headers)
if response.status_code == 200:
data = response.json()
total_points_today = 0
today = datetime.now(timezone(timedelta(hours=8))).date() # 获取当前日期
for record in data.get('data', {}).get('list', []):
# 提取 remark、changeValue 和 changeTime
remark = record.get('remark')
change_value = record.get('changeValue')
change_time = record.get('changeTime')
# 将时间戳转换为北京时间
beijing_time = datetime.fromtimestamp(change_time / 1000, timezone(timedelta(hours=8)))
formatted_time = beijing_time.strftime('%Y-%m-%d %H:%M:%S')
# 判断记录是否为当天的
if beijing_time.date() == today:
total_points_today += change_value
# 在循环内部打印每条记录的详细信息
#print(f"任务: {remark}, 积分: {change_value}, 时间: {formatted_time}")
print(f"今日积分: {total_points_today}")
return total_points_today # 返回当天的积分总和
else:
print(f"获取奖励记录失败,状态码:{response.status_code}, 响应内容: {response.text}")
return None # 获取失败时返回 None
except requests.exceptions.RequestException as e:
print(f"请求失败: {e}")
return None # 请求异常时返回 None
#本地测试用
os.environ['XSSONF1'] = '''
Wmeimob_eyJ0ebGciOiJIUzI1NiJ9.eyJzdWIiOiIx#大号
'''
class Tee:
def __init__(self, *files):
self.files = files
def write(self, obj):
for file in self.files:
file.write(obj)
file.flush() # 确保及时输出
def flush(self):
for file in self.files:
file.flush()
# 主函数
def main():
var_name = 'XSSONF'
tokens = get_env_variable(var_name)
if not tokens:
return
yxsl = len(tokens) # 账号总数
# 首先对每个账号运行 rwlb(sso)
for i, token in enumerate(tokens):
parts = token.split('#')
if len(parts) < 2:
print("令牌格式不正确。跳过处理。")
continue
sso = parts[0]
account_no = parts[1]
print(f'------账号 {i+1}/{yxsl} {account_no} -------')
rwlb(sso) # 任务列表
# 设置Tee类实例并开始捕获输出
original_stdout = sys.stdout # 保存原始stdout
string_io = io.StringIO() # 创建StringIO对象以捕获输出
sys.stdout = Tee(sys.stdout, string_io) # 将stdout重定向
# 所有账号运行完 rwlb(sso) 后再统一运行 hqjljl(sso) 和 user_info(sso)
for i, token in enumerate(tokens):
parts = token.split('#')
if len(parts) < 2:
continue # 如果令牌格式不正确,继续下一个
sso = parts[0]
account_no = parts[1]
print(f'---账号{i+1}/{yxsl} {account_no}---')
user_info_result = user_info(sso) # 获取 user_info 函数的返回值
if user_info_result is None:
print(f"由于某些原因/过期/不正确 跳过此账号。")
continue # 跳过此次循环的剩余部分
# 解包 user_info 函数返回的元组
user_id, integral, history_integral = user_info_result
print(f"🎊当前积分: {integral}, 历史积分: {history_integral}") # 打印积分信息
hqjljl(sso) # 处理奖励
# 捕获完成后,重置sys.stdout并获取内容
sys.stdout = original_stdout # 重置stdout
output_content = string_io.getvalue() # 获取捕获的输出
# 如果需要发送通知
if enable_notification == 1:
send("心喜-通知", output_content)
if __name__ == "__main__":
main()