-
Notifications
You must be signed in to change notification settings - Fork 2
/
rocketchat_notif_queue.py
79 lines (72 loc) · 2.71 KB
/
rocketchat_notif_queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# Add libraries
from pymongo import MongoClient
from dotenv import load_dotenv
import os
import logging
#--------------------------------------------------------
# Logging
def _logger(_env:str):
_log_format = "%(asctime)s %(levelname)s %(message)s"
logging.basicConfig(filename=_getEnv("LOG_FILE"), filemode='w', level=logging.INFO, format=_log_format)
logging.info(_env)
#--------------------------------------------------------
# Output file
def _openFile(_env:str): #Output File
ABS_PATH = _getEnv("OUTPUT_PATH")
if _env == "output_notification_queue":
FILE_NAME = "output_notification_queue.prom"
NOTIFICATION_QUEUE_SIZE = open(file= ABS_PATH + FILE_NAME, mode="w")
return (NOTIFICATION_QUEUE_SIZE)
#--------------------------------------------------------
# Get Environments
def _getEnv(_env: str):
load_dotenv()
if _env == "URI":
_replicaSet = os.getenv("REPLICA_SET")
return (str(_replicaSet))
elif _env == "NotifQueue":
COLLECTION = os.getenv("COLL_NOTIFQUEUE")
return (str(COLLECTION))
elif _env == "OUTPUT_PATH":
_outputpath = os.getenv("OUTPUT_PATH")
return (str(_outputpath))
else:
pass
#--------------------------------------------------------
# Run query
def notifQueueSize():
QUERY = _mongoConnection("NotifQueue").count_documents({})
if QUERY <= 250:
return (QUERY)
elif QUERY > 250:
_mongoConnection("NotifQueue").delete_many({})
return (QUERY)
#--------------------------------------------------------
def _abuseTokenApp(): #Count Abuse Application users
CLIENT = MongoClient(_getEnv("REPLICA_SET"))
DB = CLIENT.get_database(name="rocketchat")
COLLECTION_NAMES = DB.list_collection_names()
for item in COLLECTION_NAMES:
if item == "_raix_push_app_tokens":
TOKEN_COLLECTION = DB.get_collection(name=item)
QUERY = ({"appName" : {"$nin" : ["com.osalliance.rocketchatMobile", "com.osalliance.RocketchatMobile","com.osalliance.RocketChatMobile","chat.rocket.reactnative"]}})
RESULT = TOKEN_COLLECTION.count_documents(filter=QUERY)
# return(RESULT)
print(RESULT)
#--------------------------------------------------------
# Create mongo connection
def _mongoConnection(_env: str):
MONGO_CLIENT = MongoClient(_getEnv("URI"))
DATABASE = MONGO_CLIENT.rocketchat
COLLECTION = DATABASE[_getEnv("NotifQueue")]
return (COLLECTION)
#--------------------------------------------------------
# Main script
def main():
# notificationQueueSize = notifQueueSize()
deleteAbuseToken = _abuseTokenApp()
print("Total_Notification_Queue_Size:", notificationQueueSize, file=_openFile("output_notification_queue"))
#--------------------------------------------------------
# Execution
if __name__ == "__main__":
main()