From 0e6326934a251739d5f21712241c9715dd631213 Mon Sep 17 00:00:00 2001 From: The n6 Development Team Date: Wed, 1 Dec 2021 06:42:32 +0100 Subject: [PATCH] Version 3.0.0 (final) --- .n6-version | 2 +- CHANGELOG.md | 40 +- N6BrokerAuthApi/n6brokerauthapi/__init__.py | 2 +- N6Core/README.md | 16 +- N6Core/n6/archiver/archive_raw.py | 58 +- N6Core/n6/data/conf/00_pipeline.conf | 2 +- N6DataPipeline/console_scripts | 2 +- N6DataPipeline/n6datapipeline/archive_raw.py | 943 ++++++++++++++++++ .../n6datapipeline/data/conf/00_pipeline.conf | 2 +- .../n6datapipeline/tests/test_stub.py | 2 - do_setup.py | 10 +- docs/changelog.md | 1 + docs/docker.md | 8 +- docs/guides/api/streamapi.md | 114 +++ docs/guides/new_source/index.md | 2 +- docs/installation/configuration.md | 7 - docs/installation/index.md | 10 +- docs/installation/n6_core.md | 7 - docs/installation/n6_web.md | 3 - docs/installation/system.md | 6 - mkdocs.yml | 5 +- test_do_setup.py | 8 +- 22 files changed, 1159 insertions(+), 91 deletions(-) create mode 100644 N6DataPipeline/n6datapipeline/archive_raw.py delete mode 100644 N6DataPipeline/n6datapipeline/tests/test_stub.py create mode 120000 docs/changelog.md create mode 100644 docs/guides/api/streamapi.md diff --git a/.n6-version b/.n6-version index 005e92c..4a36342 100644 --- a/.n6-version +++ b/.n6-version @@ -1 +1 @@ -3.0.0b3 +3.0.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 511b7e7..97a87a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,13 +1,45 @@ # Changelog -Starting with the 3.0.0 release, all notable changes applied to -the code of _n6_ will be continuously documented in this file. +Starting with the 3.0.0 release, all notable changes applied to the +[code of _n6_](https://github.com/CERT-Polska/n6) are continuously +documented here. The format of this file is based, to much extent, on [Keep a Changelog](https://keepachangelog.com/). -## 3.0.0b... (beta releases...) - since 2021-10-13... +## [3.0.0] - 2021-12-01 -TBD in the description of the 3.0.0 final release (soon...). +**This release is a big milestone.** It includes, among others: +* migration to Python 3 +* in the *n6* data pipeline infrastructure: optional integration + with [IntelMQ](https://github.com/certtools/intelmq) +* in the *n6 Portal:* a new frontend (implemented using + [React](https://reactjs.org/)), two-factor authentication + (based on [TOTP](https://datatracker.ietf.org/doc/html/rfc6238)), + user's/organization's own data management (including config update + and password reset forms, with related e-mail notices), and other + goodies... +* in the *n6 REST API:* API-key-based authentication +* and many, many more improvements, a bunch of fixes, as well as + some refactorization, removals and cleanups... + +Beware that many of the changes are *not* backwards-compatible. + +Note that most of the main elements of *n6* -- namely: +`N6DataPipeline`, `N6DataSources`, `N6Portal`, `N6RestApi`, +`N6AdminPanel`, `N6BrokerAuthApi`, `N6Lib` and `N6SDK` -- are now +*Python-3-only* (more precisely: are compatible with CPython 3.9). + +The legacy, *Python-2-only* stuff -- most of which are *collectors* and +*parsers* (external-data-sources-related components) -- reside in +`N6Core` and `N6CoreLib`; the collectors and parsers placed in `N6Core`, +if related to non-obsolete external data sources, will be gradually +migrated to *Python-3-only* `N6DataSources` (so that, finally, we will +be able to rid of `N6Core` and `N6CoreLib`). 
There are also +*Python-2-only* variants of `N6Lib` and `N6SDK`: `N6Lib-py2` and +`N6SDK-py2` (needed only as dependencies of `N6Core`/`N6CoreLib`). + + +[3.0.0]: https://github.com/CERT-Polska/n6/compare/v2.0.6a2-dev1...v3.0.0 diff --git a/N6BrokerAuthApi/n6brokerauthapi/__init__.py b/N6BrokerAuthApi/n6brokerauthapi/__init__.py index 16c7791..be1374d 100644 --- a/N6BrokerAuthApi/n6brokerauthapi/__init__.py +++ b/N6BrokerAuthApi/n6brokerauthapi/__init__.py @@ -1,5 +1,5 @@ # Copyright (c) 2013-2021 NASK. All rights reserved. -#TODO: Module modernized to Python 3, but no changes detected, comment to be deleted after MR + """ This package provides a REST API implementation intended to cooperate with `rabbitmq-auth-backend-http` -- the RabbitMQ AMQP message broker's diff --git a/N6Core/README.md b/N6Core/README.md index 762275c..4b9fd6f 100644 --- a/N6Core/README.md +++ b/N6Core/README.md @@ -1,8 +1,10 @@ -**Note:** `N6Core` contains legacy *Python-2-only* stuff. Typically, -you will want to use -- instead of it -- the new, *Python-3-only* stuff -residing in `N6DataPipeline`. +**Note:** `N6Core` contains legacy *Python-2-only* stuff. -Then it comes to data sources -- i.e., collectors and parsers -- -`N6DataSources` is the place where new sources should be implemented -(in Python 3). The collectors and parsers residing in `N6Core` will -be gradually migrated to `N6DataSources` (if not obsolete). +When it comes to the basic *n6* pipeline components, please use the new, +*Python-3-only* stuff residing in `N6DataPipeline`. + +When it comes to the data-sources-related components -- i.e., collectors +and parsers -- `N6DataSources` is the place where any new stuff is to be +implemented (in Python 3). The collectors and parsers residing in +`N6Core` will be gradually migrated to `N6DataSources` (for those data +sources than are not obsolete). diff --git a/N6Core/n6/archiver/archive_raw.py b/N6Core/n6/archiver/archive_raw.py index d7e7284..0cfded9 100644 --- a/N6Core/n6/archiver/archive_raw.py +++ b/N6Core/n6/archiver/archive_raw.py @@ -5,11 +5,11 @@ Component archive_raw -- adds raw data to the archive database (MongoDB). A new source is added as a new collection. 
""" +from __future__ import print_function #3-- import datetime import hashlib import itertools -import math import os import socket import subprocess @@ -20,10 +20,10 @@ import gridfs import pymongo -from gridfs import GridFS from bson.json_util import loads from bson.json_util import dumps +from n6lib.common_helpers import open_file from n6lib.config import Config from n6.base.queue import QueuedBase, n6QueueProcessingException from n6lib.log_helpers import get_logger, logging_configured @@ -39,16 +39,16 @@ def backup_msg(fname, collection, msg, header): - with open(fname, 'w') as f: - if isinstance(msg, basestring): - payload = (msg.encode('utf-8') if isinstance(msg, unicode) + with open_file(fname, 'w') as f: #3: add 'b'-mode flag + if isinstance(msg, (bytes, unicode)): #3: unicode->str + payload = (msg.encode('utf-8') if isinstance(msg, unicode) #3: unicode->str else msg) else: - payload = (repr(msg).encode('utf-8') if isinstance(repr(msg), unicode) + payload = (repr(msg).encode('utf-8') if isinstance(repr(msg), unicode) #3: unicode->str else repr(msg)) - hdr = (repr(header).encode('utf-8') if isinstance(repr(header), unicode) - else repr(header)) + hdr = (repr(header).encode('utf-8') if isinstance(repr(header), unicode) #3: unicode->str + else repr(header)) f.write('\n'.join(( collection, hdr, payload ))) @@ -58,8 +58,8 @@ def timed(*args, **kw): result = method(*args, **kw) stop = datetime.datetime.now() delta = stop - start - print '%r %r (%r, %r) %r ' % \ - (str(datetime.datetime.now()), method.__name__, args, kw, str(delta)) + print('%r %r (%r, %r) %r ' % \ + (str(datetime.datetime.now()), method.__name__, args, kw, str(delta))) #3: __name__ -> __qualname__ return result return timed @@ -106,11 +106,11 @@ def __init__(self, connection, db_name, collection_name): docs = collection.find().sort("ns", pymongo.ASCENDING) for i in docs: coll = Collection(i['ns'].replace(''.join((db_name, '.')), '')) - self.add_to_storage(coll, i['key'].keys()[0]) + self.add_to_storage(coll, list(i['key'].keys())[0]) @staticmethod def add_to_storage(collection, index): - if collection.name not in IndexesStore._collections_tmp_store.keys(): + if collection.name not in list(IndexesStore._collections_tmp_store.keys()): # new collection, add index, and initialize key in store dict collection.indexes.append(index) IndexesStore._collections_tmp_store.update({collection.name: collection}) @@ -122,7 +122,7 @@ def add_to_storage(collection, index): def name_of_indexed_collection_n6(): # simple select a collection, no system and no tip chunks # to check the amount of indexes - return [name for name in IndexesStore._collections_tmp_store.keys() + return [name for name in list(IndexesStore._collections_tmp_store.keys()) if ('.chunks' not in name) and name not in ('n6.system.namespaces')] @staticmethod @@ -573,7 +573,7 @@ def input_callback(self, routing_key, body, properties): # Add to archive if writing: type_ = properties.type - payload = (body.encode('utf-8') if isinstance(body, unicode) + payload = (body.encode('utf-8') if isinstance(body, unicode) #3: unicode -> str else body) if type_ == 'stream': @@ -591,7 +591,7 @@ def input_callback(self, routing_key, body, properties): "Unknown message type: {0}, source: {1}".format(type_, routing_key)) #finally: # self.__tf.append(time.time() - t0) - # if next(self.__count) % 5000 == 0: + # if next(self.__count) % 5000 == 0: #spr. 
# try: # LOGGER.critical('ARCHIVE-RAW INPUT CALLBACK TIMES: min %s, avg %s', # min(tf), @@ -686,14 +686,14 @@ def init_files(self): self.list_tmp_files.append((self.tempfilefd_ver, self.tempfile_ver)) # save orig init file - with open(self.tempfile_file_init, 'w') as fid: + with open_file(self.tempfile_file_init, 'w') as fid: LOGGER.debug('WTF: %r', type(self.payload)) fid.write(self.payload) self.file_init = self.tempfile_file_init for fd, fn in self.list_tmp_files: os.close(fd) - os.chmod(fn, 0644) + os.chmod(fn, 0o644) LOGGER.debug('run blacklist init tmp files') @safe_mongocall @@ -750,7 +750,7 @@ def get_patches(self): "marker": self.marker_db_init} ).sort("received", pymongo.DESCENDING).limit(1) - row = cursor.next() + row = next(cursor) date = row["received"] first_file_id = row["_id"] @@ -773,7 +773,7 @@ def save_diff_in_db(self, files): Return: None """ file1, file2 = files - f_sout = open(self.tempfile_patch_u, "w") + f_sout = open_file(self.tempfile_patch_u, "w") if BlackListCompacter.init: BlackListCompacter.init = 0 subprocess.call("diff -u " + file1 + " " + file2, @@ -781,7 +781,7 @@ def save_diff_in_db(self, files): f_sout.close() self.save_file_in_db(self.marker_db_init, - open(self.tempfile_patch_u, 'r').read()) + open_file(self.tempfile_patch_u, 'r').read()) LOGGER.debug(' marker init in db:%s ', self.marker_db_init) else: subprocess.call("diff -u " + file1 + " " + @@ -789,7 +789,7 @@ def save_diff_in_db(self, files): f_sout.close() self.save_file_in_db(self.marker_db_diff, - open(self.tempfile_patch_u, 'r').read()) + open_file(self.tempfile_patch_u, 'r').read()) LOGGER.debug('marker in period in db :%s ', self.marker_db_diff) def generate_orig_file(self, cursor, file_id): @@ -807,7 +807,7 @@ def generate_orig_file(self, cursor, file_id): # generate first file files_count = 1 # stdout in file - f_sout = open(self.tempfile_patch_u, "w") + f_sout = open_file(self.tempfile_patch_u, "w") # first diff file post init in GENERATE_ALL_FILE mode if cursor.count() > 0 and BlackListCompacter.generate_all_file: out = subprocess.call("patch " + self.tempfile_file + " -i " + @@ -818,7 +818,7 @@ def generate_orig_file(self, cursor, file_id): self.list_tmp_files.append((self.tempfile_ver, self.tempfile_ver + str(files_count - 1))) - os.chmod(self.tempfile_ver + str(files_count - 1), 0644) + os.chmod(self.tempfile_ver + str(files_count - 1), 0o644) # # first diff file post init in GENERATE_ONE_FILE mode elif cursor.count() > 0 and (not BlackListCompacter.generate_all_file): @@ -830,7 +830,7 @@ def generate_orig_file(self, cursor, file_id): else: file_db = self.dbm.get_file_from_db_raw(file_id) - patch_file = open(self.tempfile_patch_tmp, 'w') + patch_file = open_file(self.tempfile_patch_tmp, 'w') patch_file.write(file_db) patch_file.close() @@ -844,7 +844,7 @@ def generate_orig_file(self, cursor, file_id): # set prev id in current doc. 
self.prev_id = id_dba file_db = self.dbm.get_file_from_db_raw(id_dba) - patch_file = open(self.tempfile_patch_tmp, 'w') + patch_file = open_file(self.tempfile_patch_tmp, 'w') patch_file.write(file_db) patch_file.close() @@ -860,7 +860,7 @@ def generate_orig_file(self, cursor, file_id): self.tempfile_patch_tmp + " -o " + self.tempfile_ver + str(files_count), stdout=f_sout, stderr=subprocess.STDOUT, shell=True) - os.chmod(self.tempfile_ver + str(files_count), 0644) + os.chmod(self.tempfile_ver + str(files_count), 0o644) LOGGER.debug('patch_all_files(return code): %r', out) else: @@ -890,7 +890,7 @@ def start(self): self.marker_db_diff = files_count LOGGER.debug('files_count: %r', files_count) - except StopIteration, exc: + except StopIteration as exc: # first file LOGGER.warning('First file, initialize: %r', exc) BlackListCompacter.init = 1 @@ -936,12 +936,12 @@ def main(): except socket.timeout as exc: # at the moment need to capture sys.exit tool for monitoring LOGGER.critical('socket.timeout: %r', exc) - print >> sys.stderr, exc + print(exc, file=sys.stderr) sys.exit(1) except socket.error as exc: # at the moment need to capture sys.exit tool for monitoring LOGGER.critical('socket.error: %r', exc) - print >> sys.stderr, exc + print(exc, file=sys.stderr) sys.exit(1) diff --git a/N6Core/n6/data/conf/00_pipeline.conf b/N6Core/n6/data/conf/00_pipeline.conf index 9ffcabc..d1bef04 100644 --- a/N6Core/n6/data/conf/00_pipeline.conf +++ b/N6Core/n6/data/conf/00_pipeline.conf @@ -25,4 +25,4 @@ comparator = enriched filter = enriched, compared anonymizer = filtered recorder = filtered -counter= recorded +counter = recorded diff --git a/N6DataPipeline/console_scripts b/N6DataPipeline/console_scripts index 337eec6..914dbdf 100644 --- a/N6DataPipeline/console_scripts +++ b/N6DataPipeline/console_scripts @@ -1,4 +1,4 @@ -#n6archiveraw = n6datapipeline.archive_raw:main +n6archiveraw = n6datapipeline.archive_raw:main n6aggregator = n6datapipeline.aggregator:main n6enrich = n6datapipeline.enrich:main n6comparator = n6datapipeline.comparator:main diff --git a/N6DataPipeline/n6datapipeline/archive_raw.py b/N6DataPipeline/n6datapipeline/archive_raw.py new file mode 100644 index 0000000..7c13545 --- /dev/null +++ b/N6DataPipeline/n6datapipeline/archive_raw.py @@ -0,0 +1,943 @@ +""" +Component archive_raw -- adds raw data to the archive database (MongoDB). +A new source is added as a new collection. +""" + +import datetime +import hashlib +import itertools +import os +import socket +import subprocess +import sys +import tempfile +import time +import re + +import gridfs +import pymongo +from bson.json_util import loads +from bson.json_util import dumps + +from n6lib.common_helpers import open_file +from n6lib.config import Config +from n6datapipeline.base import LegacyQueuedBase, n6QueueProcessingException +from n6lib.log_helpers import get_logger, logging_configured + + +LOGGER = get_logger(__name__) + +FORBIDDEN_DB_NAME_CHAR = '/\\." 
\n\t\r' +FORBIDDEN_COLLECTION_NAME_CHAR = '$ \n\t\r' +INSUFFICIENT_DISK_SPACE_CODE = 17035 + +first_letter_collection_name = re.compile("^(?!system)[a-z_].*", re.UNICODE) + + +def backup_msg(fname, collection, msg, header): + with open_file(fname, 'wb') as f: + if isinstance(msg, (bytes, str)): + payload = (msg.encode('utf-8') if isinstance(msg, str) + else msg) + else: + payload = (ascii(msg).encode('utf-8') if isinstance(ascii(msg), str) + else ascii(msg)) + + hdr = (ascii(header).encode('utf-8') if isinstance(ascii(header), str) + else ascii(header)) + f.write('\n'.join(( collection, hdr, payload ))) + + +def timeit(method): + def timed(*args, **kw): + start = datetime.datetime.now() + result = method(*args, **kw) + stop = datetime.datetime.now() + delta = stop - start + print('%a %a (%a, %a) %a ' % \ + (str(datetime.datetime.now()), method.__qualname__, args, kw, str(delta))) + return result + return timed + + +def safe_mongocall(call): + def _safe_mongocall(*args, **kwargs): + count_try_connection = 86400 # 5 days + while True: + try: + return call(*args, **kwargs) + except pymongo.errors.AutoReconnect: + LOGGER.error("Cannot connect to mongodb. Retrying...") + time.sleep(5) + count_try_connection -= 1 + ob = args[0] + if (isinstance(ob, JsonStream) or + isinstance(ob, FileGridfs) or + isinstance(ob, BlackListCompacter)): + LOGGER.debug("backup_msg") + try: + backup_msg(ob.dbm.backup_msg, ob.dbm.currcoll, ob.data, ob.headers) + except Exception as exc: + LOGGER.debug('backup_msg_error: %a', exc) + ob.dbm.get_connection() + elif isinstance(ob, DbManager): + LOGGER.debug("backup_msg") + try: + backup_msg(ob.backup_msg, ob.currcoll, ob.backup_msg_data, ob.backup_msg_headers) + except Exception as exc: + LOGGER.error('backup_msg_error: %a', exc) + ob.get_connection() + if count_try_connection < 1: + LOGGER.error("Could not connect to mongodb. 
Exiting...") + sys.exit(1) + return _safe_mongocall + + +class IndexesStore: + _collections_tmp_store = {} + + def __init__(self, connection, db_name, collection_name): + self.db = connection[db_name] + collection = self.db[collection_name] + docs = collection.find().sort("ns", pymongo.ASCENDING) + for i in docs: + coll = Collection(i['ns'].replace(''.join((db_name, '.')), '')) + self.add_to_storage(coll, list(i['key'].keys())[0]) + + @staticmethod + def add_to_storage(collection, index): + if collection.name not in list(IndexesStore._collections_tmp_store.keys()): + # new collection, add index, and initialize key in store dict + collection.indexes.append(index) + IndexesStore._collections_tmp_store.update({collection.name: collection}) + else: + # collection in store, add only new index name + IndexesStore._collections_tmp_store[collection.name].indexes.append(index) + + @staticmethod + def name_of_indexed_collection_n6(): + # simple select a collection, no system and no tip chunks + # to check the amount of indexes + return [name for name in list(IndexesStore._collections_tmp_store.keys()) + if ('.chunks' not in name) and name not in ('n6.system.namespaces')] + + @staticmethod + def cleanup_store(): + IndexesStore._collections_tmp_store = {} + + +class Collection: + __slots__ = ['name', 'indexes'] + + def __init__(self, name): + self.name = name + self.indexes = [] + + +class DbManager: + """""" + + def __init__(self, config=None): + """ + Args: + config: dict containing: mongohost, mongoport, mongodb, + count_try_connection, time_sleep_between_try_connect, uri + """ + if config is None: + config = Config(required={"archiveraw": ("mongohost", + "mongoport", + "mongodb", + "count_try_connection", + "time_sleep_between_try_connect", + "uri")}) + self.config = config["archiveraw"] + else: + self.config = config + self.host = self.config['mongohost'] + self.port = int(self.config['mongoport']) + self._currdb = self.config['mongodb'] + self.uri = self.config["uri"] + self.connection = None + self._currcoll = None + self.conn_gridfs = None + self.time_sleep_between_try_connect = int(self.config['time_sleep_between_try_connect']) + self.count_try_connection = int(self.config['count_try_connection']) + self.indexes_store = [] + self.backup_msg = '.backup_msg' + self.backup_msg_data = None + self.backup_msg_headers = None + + def get_connection(self): + """ + Get a connection to MongoDB. + Try `self.count_try_connection` times, then (if not succeeded) + raise SystemExit. + + Returns: + `self.connection`, as returned by MongoClient(, ). + + Raises: + SystemExit + """ + count_try_connection = self.count_try_connection + while True: + try: + #self.connection = pymongo.mongo_client.MongoClient(self.host, port=self.port) + self.connection = pymongo.mongo_client.MongoClient(self.uri, + sockettimeoutms=2000, + connecttimeoutms=2000, + waitqueuetimeoutms=2000, + ) + return self.connection + + except pymongo.errors.ConnectionFailure: + LOGGER.error("Cannot connect to mongodb@ %s:%s. Retrying...", + self.host, self.port) + time.sleep(self.time_sleep_between_try_connect) + count_try_connection -= 1 + if count_try_connection < 1: + LOGGER.error("Cannot connect to mongodb@ %s:%s. 
Exiting...", + self.host, self.port) + sys.exit(1) + + @safe_mongocall + def get_conn_db(self): + """Get connection to db.""" + return self.connection[self.currdb] + + @safe_mongocall + def get_conn_collection(self, gridfs=False): + """Get connection to collection.""" + return self.get_conn_db()[self.currcoll] + + @safe_mongocall + def get_conn_gridfs(self): + """Get connection to gridfs api to put, and get files.""" + assert self.currcoll, 'not set self.currcoll' + self.conn_gridfs = gridfs.GridFS(self.get_conn_db(), collection=self.currcoll) + + @safe_mongocall + def put_file_to_db(self, data, **kwargs): + """Put file in mongo.""" + assert self.conn_gridfs, 'not set self.conn_gridfs' + return self.conn_gridfs.put(data, **kwargs) + + @safe_mongocall + def get_file_from_db(self, id_): + """Get file from db.""" + assert self.conn_gridfs, 'not set self.conn_gridfs' + return str(self.conn_gridfs.get(id_).read()) + + @safe_mongocall + def get_file_from_db_raw(self, id_): + """Get file from db, raw not str.""" + assert self.conn_gridfs, 'not set self.conn_gridfs' + return self.conn_gridfs.get(id_).read() + + @property + def currdb(self): + return self._currdb + + @currdb.setter + def currdb(self, value): + value_str = str(value) + if len(value_str) >= 64 or len(value_str) < 1: + LOGGER.error('to long db name in mongo, max 63 chars, min 1 char : %a', value_str) + raise n6QueueProcessingException("to long db name in mongo, max 63 chars, min 1 char" + ": {0}".format(value_str)) + for forbidden_char in FORBIDDEN_DB_NAME_CHAR: + if forbidden_char in value_str: + LOGGER.error('name of db: %a, contains forbidden_char: %a', value_str, + forbidden_char) + raise n6QueueProcessingException("name of db: {}, " + "contains forbidden_char: {}".format(value_str, forbidden_char)) + self._currdb = value + + @property + def currcoll(self): + return self._currcoll + + @currcoll.setter + def currcoll(self, value): + if value is None: + self._currcoll = value + return + value_str = str(value) + m = re.match(first_letter_collection_name, value_str) + if not m or len(value_str) < 1: + raise n6QueueProcessingException('Collection names should begin with an underscore ' + 'or a letter character, and not be an empty string ' + '(e.g. ""), and not begin with the system. prefix. ' + '(Reserved for internal use.)') + for forbidden_char in FORBIDDEN_COLLECTION_NAME_CHAR: + if forbidden_char in value_str: + LOGGER.error('name of collection: %a, contains forbidden_char: %a', value_str, + forbidden_char) + raise n6QueueProcessingException("name of collection: {0}, " + "contains forbidden_char: {1}". + format(value_str, forbidden_char)) + self._currcoll = value + + def database_exists(self): + """Check if the database exists on the server.""" + return self.currdb in self.connection.database_names() + + def collection_exists(self): + """Check if the collection exists in the database. 
+ Not very good in terms of performance!.""" + if self.currcoll not in self.get_conn_db().collection_names(): + # only for manageApi + return self.currcoll + '.files' in self.get_conn_db().collection_names() + return self.currcoll in self.get_conn_db().collection_names() + + def initialize_index_store(self): + if self.connection: + IndexesStore.cleanup_store() + index_store = IndexesStore(self.connection, self.currdb, 'system.indexes') + self.indexes_store = index_store.name_of_indexed_collection_n6() + else: + LOGGER.error('No connection to initialize index store') + + +class MongoConnection: + """ + MongoConnection - a set of common attributes of classes + (JsonStream, FileGridfs, BlackListCompacter). + + Args: + `dbmanager` : object DbManager type. + `properties` : properties from AMQP. Required for the next processing + `**kwargs` : (dict with additional data) + """ + indexes_common = ['rid', 'received', 'md5'] + + def __init__(self, dbmanager=None, properties=None, **kwargs): + self.dbm = dbmanager + self.data = {} + self.raw = None + self.content_type = None + self.headers = {} + if properties: + if properties.headers: + self.headers = properties.headers.copy() + if "meta" in properties.headers: + self.headers['meta'].update(properties.headers['meta']) + else: + # empty meta, add key meta, adds key meta + # another data such. rid, received, contentType.... + self.headers["meta"] = {} + LOGGER.debug('No "meta" in headers: %a', properties.headers) + else: + # empty headers, add key meta, adds key + # meta another data such. rid, received, contentType.... + self.headers["meta"] = {} + LOGGER.debug('Empty headers: %a', properties.headers) + + if properties.type in ('file', 'blacklist'): + # content_type required fo type file and blacklist + try: + self.headers['meta'].update({'contentType': properties.content_type}) + except AttributeError as exc: + LOGGER.error('No "content_type" in properties: %a', properties.headers) + raise + # always add + self.headers['meta'].update({'rid': properties.message_id, + 'received': self.get_time_created(properties.timestamp)}) + else: + # empty properties, it is very bad + raise n6QueueProcessingException("empty properties, it is very bad" + ": {0}".format(properties)) + + def get_time_created(self, ts): + try: + return datetime.datetime.utcfromtimestamp(ts) + except TypeError as exc: + LOGGER.error("Bad type timestamp: %a, exc: %a, collection: %a", ts, exc, + self.dbm.currcoll) + raise + + def create_indexes(self, coll): + """Create indexes on new collection.""" + for idx in MongoConnection.indexes_common: + LOGGER.info("Create indexes: %a on collection: %a", idx, coll.name) + coll.create_index(idx) + # refresh indexes store + self.dbm.initialize_index_store() + + +class JsonStream(MongoConnection): + """ + This class is responsible for the different types of writing to mongo. + This class extJson|Json format stores only + (http://docs.mongodb.org/manual/reference/mongodb-extended-json/). + JsonStream inherits from the MongoConnection. + """ + def preparations_data(self, data): + """ + Data preparation. + + Args: + `data` : data from AMQP. + + Raises: + `n6QueueProcessingException` when except processing data. 
+ """ + try: + self.raw = loads(data) + # calculate md5, inplace its fastest + self.headers['meta'].update({ + 'md5': hashlib.md5(dumps(self.raw, sort_keys=True).encode('utf-8')).hexdigest()}) + + except Exception as exc: + LOGGER.error('exception when processing: %a %a %a (%a)', + self.dbm.currdb, self.dbm.currcoll, data, exc) + raise + + else: + self.write() + + @safe_mongocall + def write(self): + """ + Write data to db as json store. + + Raises: + `UnicodeDecodeError` when collection name or the database name is not allowed + `pymongo.errors.AutoReconnect` when problem with connection to mongo. + `n6QueueProcessingException` if catch other exception. + """ + LOGGER.debug('Stream inserting...') + LOGGER.debug('HEADER: %a', self.headers) + self.data['data'] = self.raw + self.data['uploadDate'] = datetime.datetime.utcfromtimestamp(time.time()) + self.data.update(self.headers['meta']) + + # for backup msg + self.dbm.backup_msg_data = self.data + self.dbm.backup_msg_headers = self.headers + + try: + try: + if self.dbm.currcoll not in self.dbm.indexes_store: + self.create_indexes(self.dbm.get_conn_collection()) + + self.dbm.get_conn_collection().insert(self.data) + except pymongo.errors.OperationFailure as exc: + if exc.code == INSUFFICIENT_DISK_SPACE_CODE: + sys.exit(ascii(exc)) + raise + except pymongo.errors.AutoReconnect as exc: + LOGGER.error('%a', exc) + raise + except UnicodeDecodeError as exc: + LOGGER.error("collection name or the database name is not allowed: %a, %a, %a", + self.dbm.currdb, self.dbm.currcoll, exc) + raise + except Exception as exc: + LOGGER.error('save data in mongodb FAILED, header: %a , exception: %a', + self.headers, exc) + raise n6QueueProcessingException('save data in mongob FAILED') + else: + LOGGER.debug('Insert done.') + + def gen_md5(self, data): + """Generate md5 hash In the data field.""" + return hashlib.md5(dumps(data, sort_keys=True)).hexdigest() + + +class FileGridfs(MongoConnection): + """ + This class is responsible for the different types of writing to mongo. + This class files and other binary format stores. + FileGridfs inherits from the MongoConnection. + """ + def preparations_data(self, data): + """ + Data preparation. + + Args: + `data` : data from AMQP. + + Raises: + `n6QueueProcessingException` when except processing data. + """ + + try: + self.data = data + except Exception as exc: + LOGGER.error('exception when processing: %a %a %a (%a)', + self.dbm.currdb, self.dbm.currcoll, data, exc) + raise + else: + self.write() + + @safe_mongocall + def write(self): + """ + Write data to db as GridFS store. + + Raises: + `UnicodeDecodeError` when collection name or the database name is not allowed. + `pymongo.errors.AutoReconnect` when problem with connection to mongo. + `n6QueueProcessingException` if catch other exception. 
+ """ + LOGGER.debug('Binary inserting...') + LOGGER.debug('HEADER: %a', self.headers) + + # for backup msg + self.dbm.backup_msg_data = self.data + self.dbm.backup_msg_headers = self.headers + + try: + try: + self.dbm.get_conn_gridfs() + coll = self.dbm.get_conn_collection().files + if coll.name not in self.dbm.indexes_store: + self.create_indexes(coll) + self.dbm.put_file_to_db(self.data, **self.headers['meta']) + except pymongo.errors.OperationFailure as exc: + if exc.code == INSUFFICIENT_DISK_SPACE_CODE: + sys.exit(ascii(exc)) + raise + except pymongo.errors.AutoReconnect as exc: + LOGGER.error('%a', exc) + raise + except UnicodeDecodeError as exc: + LOGGER.error("collection name or the database name is not allowed: %a, %a, %a", + self.dbm.currdb, self.dbm.currcoll, exc) + raise + except Exception as exc: + LOGGER.error('save data in mongodb FAILED, header: %a , exception: %a', + self.headers, exc) + raise n6QueueProcessingException('save data in mongob FAILED') + else: + LOGGER.debug('Saving data, with meta key, done') + + def get_file(self, currdb, currcoll, **kw): + """Get file/s from mongo gridfs system. Not implemented.""" + pass + + +class DBarchiver(LegacyQueuedBase): + """ Archive data """ + input_queue = {"exchange": "raw", + "exchange_type": "topic", + "queue_name": "dba", + "binding_keys": ["#"] + } + + def __init__(self, *args, **kwargs): + self.manager = DbManager() + self.connectdb = self.manager.get_connection() + self.manager.initialize_index_store() # after call get_connection + self.connectdb.secondary_acceptable_latency_ms = 5000 # max latency for ping + super().__init__(*args, **kwargs) + + __count = itertools.count(1) + __tf = [] + def input_callback(self, routing_key, body, properties): + #t0 = time.time() + #try: + """ + Channel callback method. + + Args: + `routing_key` : routing_key from AMQP. + `body` : message body from AMQP. + `properties` : properties from AMQP. Required for the next processing + + Raises: + `n6QueueProcessingException`: + From JsonStream/FileGridfs or when message type is unknown. + Other exceptions (e.g. pymongo.errors.DuplicateKeyError). + """ + # Headers required for the next processing + if properties.headers is None: + properties.headers = {} + + # Suspend writing to Mongo if header is set to False + try: + writing = properties.headers['write_to_mongo'] + except KeyError: + writing = True + + LOGGER.debug("Received properties :%a", properties) + LOGGER.debug("Received headers :%a", properties.headers) + # set collection name + self.manager.currcoll = routing_key + + # Add to archive + if writing: + type_ = properties.type + payload = (body.encode('utf-8') if isinstance(body, str) + else body) + + if type_ == 'stream': + s = JsonStream(dbmanager=self.manager, properties=properties) + s.preparations_data(payload) + elif type_ == 'file': + s = FileGridfs(dbmanager=self.manager, properties=properties) + s.preparations_data(payload) + elif type_ == 'blacklist': + s = BlackListCompacter(dbmanager=self.manager, properties=properties) + s.preparations_data(payload) + s.start() + else: + raise n6QueueProcessingException( + "Unknown message type: {0}, source: {1}".format(type_, routing_key)) + #finally: + # self.__tf.append(time.time() - t0) + # if next(self.__count) % 5000 == 0: #spr. 
+ # try: + # LOGGER.critical('ARCHIVE-RAW INPUT CALLBACK TIMES: min %s, avg %s', + # min(tf), + # math.fsum(tf) / len(tf)) + # finally: + # del tf[:] + + +class BlackListCompacter(MongoConnection): + """ + Performs a diff of a record (patches) to the database, the differences recovers file ORIGINAL + (saves space) + """ + generate_all_file = False + init = 1 + period = 14 + + def __init__(self, dbmanager=None, properties=None): + LOGGER.debug('run blacklist : collection: %a', + dbmanager.currcoll) + super().__init__(dbmanager=dbmanager, properties=properties) + self.list_tmp_files = [] + self.prefix = '.csv_' + self.suffix = 'bl-' + self.marker_db_init = 0 + self.marker_db_diff = 1 + self.prev_id = None + self.file_init = None + self.payload = None + self.dbm = dbmanager + # for backup msg + self.dbm.backup_msg_data = self.data + self.dbm.backup_msg_headers = self.headers + try: + self.dbm.get_conn_gridfs() + # name collection in gridfs is src.subsrc.files|chunks + self.collection = self.dbm.get_conn_collection().files + except UnicodeDecodeError as exc: + LOGGER.error("collection name or the database name is not allowed: %a, %a", + self.dbm.currcoll, exc) + raise + # create indexes + if self.collection.name not in self.dbm.indexes_store: + self.create_indexes(self.collection) + + def preparations_data(self, data): + """ + Data preparation. + + Args: + `data` : data from AMQP. + + Raises: + `n6QueueProcessingException` when except processing data. + """ + + try: + self.payload = data + self.init_files() + except Exception as exc: + LOGGER.error('exception when processing: %a %a %a (%a)', + self.dbm.currdb, self.dbm.currcoll, data, exc) + raise + + def init_files(self): + """Init all tmp files""" + self.tempfilefd_file_init, self.tempfile_file_init = tempfile.mkstemp(self.prefix, + self.suffix) + self.tempfilefd_file, self.tempfile_file = tempfile.mkstemp(self.prefix, + self.suffix) + self.tempfilefd_patch_all, self.tempfile_patch_all = tempfile.mkstemp(self.prefix, + self.suffix) + self.tempfilefd_patch, self.tempfile_patch = tempfile.mkstemp(self.prefix, self.suffix) + self.tempfilefd_patch_tmp, self.tempfile_patch_tmp = tempfile.mkstemp(self.prefix, + self.suffix) + self.tempfilefd_patch_u, self.tempfile_patch_u = tempfile.mkstemp(self.prefix, + self.suffix) + (self.tempfilefd_file_recovery_0, + self.tempfile_file_recovery_0) = tempfile.mkstemp(self.prefix, self.suffix) + self.tempfilefd_ver, self.tempfile_ver = tempfile.mkstemp(self.prefix, self.suffix) + + self.list_tmp_files.append((self.tempfilefd_file_init, self.tempfile_file_init)) + self.list_tmp_files.append((self.tempfilefd_file, self.tempfile_file)) + self.list_tmp_files.append((self.tempfilefd_patch_all, self.tempfile_patch_all)) + self.list_tmp_files.append((self.tempfilefd_patch_tmp, self.tempfile_patch_tmp)) + self.list_tmp_files.append((self.tempfilefd_patch_u, self.tempfile_patch_u)) + self.list_tmp_files.append((self.tempfilefd_file_recovery_0, + self.tempfile_file_recovery_0)) + self.list_tmp_files.append((self.tempfilefd_patch, self.tempfile_patch)) + self.list_tmp_files.append((self.tempfilefd_ver, self.tempfile_ver)) + + # save orig init file + with open_file(self.tempfile_file_init, 'wb') as fid: + LOGGER.debug('WTF: %a', type(self.payload)) + fid.write(self.payload) + self.file_init = self.tempfile_file_init + + for fd, fn in self.list_tmp_files: + os.close(fd) + os.chmod(fn, 0o644) + LOGGER.debug('run blacklist init tmp files') + + @safe_mongocall + def save_file_in_db(self, marker, data): + """ + Save file in DB 
+ + Args: `marker` int, 0 - init file, 1,2,...,self.period - diff files + `data` file + + Return: None + + Raises: + `pymongo.errors.AutoReconnect` when problem with connection to mongo. + `n6QueueProcessingException` if catch other exception. + """ + # marker indicates the beginning of a sequence of file patch, length period. + # override these attr, results in a new sequence differences(diffs) + # override is very important ! + self.headers["meta"]["marker"] = marker + self.headers["meta"]["prev_id"] = self.prev_id + # for bakup_msg + self.data = data + self.dbm.backup_msg_data = self.data + self.dbm.backup_msg_headers = self.headers + try: + try: + self.dbm.put_file_to_db(data, **self.headers["meta"]) + except pymongo.errors.OperationFailure as exc: + if exc.code == INSUFFICIENT_DISK_SPACE_CODE: + sys.exit(ascii(exc)) + raise + except pymongo.errors.AutoReconnect as exc: + LOGGER.error('%a', exc) + raise + except Exception as exc: + LOGGER.error('save file in mongodb FAILED, header: %a , exception: %a', + self.headers, exc) + raise n6QueueProcessingException('save file in mongob FAILED') + else: + LOGGER.debug('save file in db marker: %a', marker) + + @safe_mongocall + def get_patches(self): + """ + Get patch from DB + + Args: None + + Return: first_file_id, cursor(with patch files without first init file) + """ + cursor = self.collection.find( + { + "marker": self.marker_db_init} + ).sort("received", pymongo.DESCENDING).limit(1) + + row = next(cursor) + date = row["received"] + first_file_id = row["_id"] + + cursor = self.collection.find( + { + "marker": {"$gte": self.marker_db_init}, + "received": {"$gte": date} + } + ).sort("received", pymongo.ASCENDING) + + LOGGER.debug('first_file_id :%s date: %s', first_file_id, date) + return first_file_id, cursor + + def save_diff_in_db(self, files): + """ + Saves Diff, used Unix features: diff. + + Args: `files` + + Return: None + """ + file1, file2 = files + f_sout = open_file(self.tempfile_patch_u, "w") + if BlackListCompacter.init: + BlackListCompacter.init = 0 + subprocess.call("diff -u " + file1 + " " + file2, + stdout=f_sout, stderr=subprocess.STDOUT, shell=True) + f_sout.close() + + self.save_file_in_db(self.marker_db_init, + open_file(self.tempfile_patch_u, 'rb').read()) + LOGGER.debug(' marker init in db:%s ', self.marker_db_init) + else: + subprocess.call("diff -u " + file1 + " " + + file2, stdout=f_sout, stderr=subprocess.STDOUT, shell=True) + f_sout.close() + + self.save_file_in_db(self.marker_db_diff, + open_file(self.tempfile_patch_u, 'rb').read()) + LOGGER.debug('marker in period in db :%s ', self.marker_db_diff) + + def generate_orig_file(self, cursor, file_id): + """ + Generates one or more files, patching one file to another. + Used Unix features: patch. 
+ + Args: `cursor`: (with all the patch from one period) + `file_id`: first init file id to generate first patch + + Return: None + """ + LOGGER.debug('BlackListCompacter.GENERATE_ALL_FILE: %a', + BlackListCompacter.generate_all_file) + # generate first file + files_count = 1 + # stdout in file + f_sout = open_file(self.tempfile_patch_u, "w") + # first diff file post init in GENERATE_ALL_FILE mode + if cursor.count() > 0 and BlackListCompacter.generate_all_file: + out = subprocess.call("patch " + self.tempfile_file + " -i " + + self.tempfile_patch_tmp + " -o " + + self.tempfile_ver + str(files_count - 1), + stdout=f_sout, stderr=subprocess.STDOUT, shell=True) + LOGGER.debug('patch_next_file(return code): %a', out) + self.list_tmp_files.append((self.tempfile_ver, + self.tempfile_ver + + str(files_count - 1))) + os.chmod(self.tempfile_ver + str(files_count - 1), 0o644) + + # # first diff file post init in GENERATE_ONE_FILE mode + elif cursor.count() > 0 and (not BlackListCompacter.generate_all_file): + out = subprocess.call("patch " + self.tempfile_file + " -i " + + self.tempfile_patch_tmp + " -o " + + self.tempfile_file_recovery_0, + stdout=f_sout, stderr=subprocess.STDOUT, shell=True) + LOGGER.debug('patch_next_file(return code): %a', out) + + else: + file_db = self.dbm.get_file_from_db_raw(file_id) + patch_file = open_file(self.tempfile_patch_tmp, 'wb') + patch_file.write(file_db) + patch_file.close() + + out = subprocess.call("patch " + + self.tempfile_file_recovery_0 + " -i " + + self.tempfile_patch_tmp, stdout=f_sout, stderr=subprocess.STDOUT, shell=True) + LOGGER.debug('patch_first_file(return code): %a', out) + + for i in cursor: + id_dba = i["_id"] + # set prev id in current doc. + self.prev_id = id_dba + file_db = self.dbm.get_file_from_db_raw(id_dba) + patch_file = open_file(self.tempfile_patch_tmp, 'wb') + patch_file.write(file_db) + patch_file.close() + + # # gen. 
tmp all version files + self.list_tmp_files.append((self.tempfilefd_ver, + self.tempfile_ver + + str(files_count))) + + if BlackListCompacter.generate_all_file: + # # generate all partial files + out = subprocess.call("patch " + self.tempfile_ver + + str(files_count - 1) + " -i " + + self.tempfile_patch_tmp + " -o " + + self.tempfile_ver + str(files_count), + stdout=f_sout, stderr=subprocess.STDOUT, shell=True) + os.chmod(self.tempfile_ver + str(files_count), 0o644) + LOGGER.debug('patch_all_files(return code): %a', out) + + else: + out = subprocess.call( + "patch " + self.tempfile_file_recovery_0 + " -i " + self.tempfile_patch_tmp, + stdout=f_sout, stderr=subprocess.STDOUT, shell=True + ) + LOGGER.debug('patch(return code): %a', out) + + files_count += 1 + f_sout.close() + return self.tempfile_file_recovery_0 + + def start(self): + """Start BlackListCompacter.""" + LOGGER.debug('BlackListCompacter.GENERATE_ALL_FILE: %a', + BlackListCompacter.generate_all_file) + LOGGER.debug('BlackListCompacter.PERIOD: %a', BlackListCompacter.period) + LOGGER.debug('BlackListCompacter.INIT: %a', BlackListCompacter.init) + file_id = None + try: + file_id, cursor = self.get_patches() + LOGGER.debug('file_id:%a, cursor:%a:', file_id, cursor) + # count files orig + diffs + # cursor.count it is ok, if we count from zero(marker=0) + files_count = cursor.count() + + self.marker_db_diff = files_count + LOGGER.debug('files_count: %a', files_count) + except StopIteration as exc: + # first file + LOGGER.warning('First file, initialize: %a', exc) + BlackListCompacter.init = 1 + # init files, set marker = 0 + self.marker_db_diff = self.marker_db_init + # init file, set prev id = null + self.prev_id = None + + if file_id: # patch_start exist + if files_count <= BlackListCompacter.period: + # add new patch_diffs.txt in DB + BlackListCompacter.init = 0 + orig = self.generate_orig_file(cursor, file_id) + self.save_diff_in_db((orig, self.file_init)) + else: + # # generate new patch_start.txt, and save to DB + BlackListCompacter.init = 1 + self.save_diff_in_db((self.tempfile_file, self.file_init)) + else: + # failure to file patch_start.txt, initialize new cycle + BlackListCompacter.init = 1 + self.save_diff_in_db((self.tempfile_file, self.file_init)) + + self.cleanup() + + def cleanup(self): + """Cleanup all tmp files.""" + + for fd, fn in self.list_tmp_files: + if os.path.exists(fn): + os.remove(fn) + LOGGER.debug('cleanup tmp files') + + +def main(): + with logging_configured(): + t = DBarchiver() + try: + t.run() + except KeyboardInterrupt: + LOGGER.debug('SIGINT. 
waiting for ...') + t.stop() + except socket.timeout as exc: + # at the moment need to capture sys.exit tool for monitoring + LOGGER.critical('socket.timeout: %a', exc) + print(exc, file=sys.stderr) + sys.exit(1) + except socket.error as exc: + # at the moment need to capture sys.exit tool for monitoring + LOGGER.critical('socket.error: %a', exc) + print(exc, file=sys.stderr) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/N6DataPipeline/n6datapipeline/data/conf/00_pipeline.conf b/N6DataPipeline/n6datapipeline/data/conf/00_pipeline.conf index 9ffcabc..d1bef04 100644 --- a/N6DataPipeline/n6datapipeline/data/conf/00_pipeline.conf +++ b/N6DataPipeline/n6datapipeline/data/conf/00_pipeline.conf @@ -25,4 +25,4 @@ comparator = enriched filter = enriched, compared anonymizer = filtered recorder = filtered -counter= recorded +counter = recorded diff --git a/N6DataPipeline/n6datapipeline/tests/test_stub.py b/N6DataPipeline/n6datapipeline/tests/test_stub.py deleted file mode 100644 index 993c50b..0000000 --- a/N6DataPipeline/n6datapipeline/tests/test_stub.py +++ /dev/null @@ -1,2 +0,0 @@ -def test(): - assert True diff --git a/do_setup.py b/do_setup.py index ed5ac0b..748d222 100755 --- a/do_setup.py +++ b/do_setup.py @@ -188,12 +188,11 @@ def parse_arguments(): # **except** N6SDK (see below). if PY2: # (Python 2: always coerce `N6Lib` to `N6Lib-py2`) - if N6_LIB in arguments.components or N6_LIB_py2 in arguments.components: + if (not arguments.no_n6lib) or (N6_LIB in arguments.components + or N6_LIB_py2 in arguments.components): if N6_LIB in arguments.components: arguments.components.remove(N6_LIB) if N6_LIB_py2 in arguments.components: arguments.components.remove(N6_LIB_py2) arguments.components.insert(0, N6_LIB_py2) - elif not arguments.no_n6lib: - arguments.components.insert(0, N6_LIB_py2) else: if N6_LIB in arguments.components: arguments.components.remove(N6_LIB) @@ -204,12 +203,11 @@ def parse_arguments(): # * N6SDK, if needed, must the set up **before** any other components. 
    if PY2:
        # (Python 2: always coerce `N6SDK` to `N6SDK-py2`)
-        if N6_SDK in arguments.components or N6_SDK_py2 in arguments.components:
+        if (not arguments.no_n6lib) or (N6_SDK in arguments.components
+                                        or N6_SDK_py2 in arguments.components):
             if N6_SDK in arguments.components:
                 arguments.components.remove(N6_SDK)
             if N6_SDK_py2 in arguments.components:
                 arguments.components.remove(N6_SDK_py2)
             arguments.components.insert(0, N6_SDK_py2)
-        elif not arguments.no_n6lib:
-            arguments.components.insert(0, N6_SDK_py2)
     else:
         if N6_SDK in arguments.components:
             arguments.components.remove(N6_SDK)
diff --git a/docs/changelog.md b/docs/changelog.md
new file mode 120000
index 0000000..04c99a5
--- /dev/null
+++ b/docs/changelog.md
@@ -0,0 +1 @@
+../CHANGELOG.md
\ No newline at end of file
diff --git a/docs/docker.md b/docs/docker.md
index ab85831..da0487b 100644
--- a/docs/docker.md
+++ b/docs/docker.md
@@ -1,9 +1,9 @@
 # Docker-Based Installation Guide
 
-**TBD: this document needs an update regarding the stuff that now works
-under Python 3.9; in particular, the current implementation of the *n6*
-basic data pipeline now resides in `N6DataPipeline`, *not* in `N6Core`
-(where the legacy Python-2.7 stuff resides).**
+**TBD: this guide needs an update regarding the migration from Python
+2.7 to 3.9; in particular, the current implementation of the *n6* basic
+data pipeline now resides in `N6DataPipeline` (Python-3-only), *not* in
+`N6Core` (where the legacy Python-2 stuff is kept).**
 
 This short guide describes how to run, for testing and exploration, the
 latest version of *n6* -- using the *Docker* and *Docker Compose* tools.
diff --git a/docs/guides/api/streamapi.md b/docs/guides/api/streamapi.md
new file mode 100644
index 0000000..cb27be7
--- /dev/null
+++ b/docs/guides/api/streamapi.md
@@ -0,0 +1,114 @@
# n6 Stream API

The stream API complements the REST API of the n6 platform. The stream API
allows events to be received asynchronously, in near real time. The JSON data
format is identical to the one used in the REST API (with a single exception:
see the next sections).

## Transport layer

The stream API is based on STOMP (Simple Text Oriented Message Protocol), and
connections are authenticated with X.509 client certificates.
Address of the STOMP server: **n6stream.cert.pl:61614**

Supported STOMP versions: 1.0, 1.1 and 1.2. TLS is mandatory. We recommend
using the most recent version of the protocol (1.2) and the OpenSSL
cryptographic library.

To receive data from n6, the client must subscribe to an appropriate STOMP
destination. The client uses the destination header to define which of the
available events should be delivered through the connection. The format is as
follows (ABNF syntax):

```
destination = "destination:/exchange/" id "/" resource "."
category "." source "." source-detail
```

Meaning of the variables:

* **id**: the n6 client identifier (equal to the Organization field in the X.509
certificate)
* **resource**: analogous to the REST API resource; can take one of the following
values:
* **inside**: events that occurred within the client’s network
* **threats**: data on threats relevant to the recipient, which might not be present
in the client’s network (e.g.
command and control servers)
* **category**: equal to the value of the category field in events
* **source, source-detail**: the name of the source of the information; it is split
into two components (a general group of sources and a specific feed)

Except for id, each of the variables can be substituted with an asterisk (*), which
matches any value.

## Data format

Each STOMP message corresponds to a single n6 event in JSON format. All
attributes described in the REST API documentation are available in the stream
API with identical semantics.

Additionally, there is a **type** attribute that can take the following values:

* event: a single event
* bl-new: a new blacklist entry
* bl-update: an update of the expiration time of a blacklist entry
* bl-change: a change of any attribute (other than the expiration time) of a blacklist entry
* bl-delist: removal of a blacklist entry

## Examples

### Example 1

Subscription to all available events for the client `nask.pl` (no filtering):

```
SUBSCRIBE
destination:/exchange/nask.pl/*.*.*.*

^@
```

Note: `^@` denotes the NUL character (ASCII 0x00), which signals the end of a
STOMP frame; in most terminals it can be typed with `Ctrl` + `Shift` + `2`.

Message from the server (lines wrapped for readability):

```
MESSAGE
destination:/exchange/clients/inside.bots.hidden.48
message-id:Q_/exchange/nask.pl/inside.#@@session-FOUv4xFVkvfMtmK_4A@@1
n6-client-id:nask.pl
persistent:1
content-length:263
{"category": "bots", "origin": "sinkhole", "confidence": "medium",
"name": "slenfbot", "address": [{"cc": "PL", "ip": "10.20.30.40",
"asn": 8308}], "source": "hidden.48", "time": "2015-08-28T09:32:05Z",
"type": "event", "id": "0f56ebba9129003dc6192e72eef50e70"}
```

### Example 2

A STOMP *destination* used to receive only information about malware infections
(category "bots") within the protected network, regardless of the original data
source:

```
destination:/exchange/nask.pl/inside.bots.*.*
```

### Example 3

Connecting to the server using the OpenSSL command-line tools:

```
openssl s_client -cert [CLIENT CERTIFICATE] -key [PRIVATE KEY] \
-CAfile [n6 CA BUNDLE] -host n6stream.cert.pl -port 61614
```

If you get no errors, then the TLS connection is working. This example can be
extended to create a very basic command-line STOMP client:

```
(echo -e "SUBSCRIBE\ndestination:/exchange/CLIENT-ID/*.*.*.*\n\n\0"; \
read) | openssl s_client -cert [CLIENT CERTIFICATE] -key [PRIVATE KEY] \
-CAfile [n6 CA BUNDLE] -host n6stream.cert.pl -port 61614
```

Note: the example above must be adapted to suit your client id and file paths.
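### Example 4

The following example is an illustrative sketch only (it is *not* part of the
official *n6* client tooling): a minimal Python 3 client, using only the
standard library, that performs the TLS handshake and the STOMP
CONNECT/SUBSCRIBE exchange by hand. The certificate, key and CA file names,
the client id `example.org`, and the CONNECT frame details (the `host` header
value, and the omission of `login`/`passcode` on the assumption that the
broker authenticates the client certificate) are placeholders/assumptions to
be adapted to your environment.

```python
import socket
import ssl

HOST, PORT = "n6stream.cert.pl", 61614
CLIENT_ID = "example.org"        # placeholder -- use your own n6 client id
DESTINATION = f"/exchange/{CLIENT_ID}/inside.bots.*.*"   # destination syntax: see above

# TLS context with the client certificate (all file names are placeholders).
context = ssl.create_default_context(cafile="n6-ca-bundle.pem")
context.load_cert_chain(certfile="client.crt", keyfile="client.key")

with socket.create_connection((HOST, PORT)) as raw_sock:
    with context.wrap_socket(raw_sock, server_hostname=HOST) as sock:
        # A STOMP frame is: a command line, header lines, an empty line,
        # an optional body, and a terminating NUL byte.
        sock.sendall(b"CONNECT\naccept-version:1.2\nhost:/\n\n\x00")
        subscribe_frame = (f"SUBSCRIBE\nid:0\ndestination:{DESTINATION}\n"
                           "ack:auto\n\n\x00")
        sock.sendall(subscribe_frame.encode("ascii"))
        # Print whatever the server sends back (a CONNECTED frame, then
        # MESSAGE frames, each carrying one JSON event); a real client
        # should parse the frames instead of dumping raw chunks.
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                break
            print(chunk.decode("utf-8", errors="replace"))
```

Compared with the `openssl s_client` one-liner above, this keeps the TLS and
STOMP layers explicit, so it is easier to extend with proper frame parsing or
reconnection logic.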
+ diff --git a/docs/guides/new_source/index.md b/docs/guides/new_source/index.md index 84f37e5..c7c5fe9 100644 --- a/docs/guides/new_source/index.md +++ b/docs/guides/new_source/index.md @@ -2,7 +2,7 @@ **TBD: this guide needs an update regarding the stuff that now works under Python 3.9 and resides in `N6DataPipeline` and `N6DataSources` -(*not* in `N6Core` where the legacy Python-2.7 stuff resides).** +(*not* in `N6Core` where the legacy Python-2.7 stuff is kept).** The aim of this guide is to describe how to implement new *n6* components necessary for collecting and parsing data from some diff --git a/docs/installation/configuration.md b/docs/installation/configuration.md index 5ed61f1..3f5106f 100644 --- a/docs/installation/configuration.md +++ b/docs/installation/configuration.md @@ -1,12 +1,5 @@ # Configuration of _n6_ components -**TBD: the following description needs an update regarding the -stuff that now works under Python 3.9; in particular, the current -implementation of the *n6* basic data pipeline now resides in -`N6DataPipeline`, *not* in `N6Core` (where the legacy Python-2.7 -stuff resides).** - - ## Generating pipeline components' configuration files To create configuration files required for the _n6_ pipeline (`N6Core`) diff --git a/docs/installation/index.md b/docs/installation/index.md index 9aed4d1..ead0d64 100644 --- a/docs/installation/index.md +++ b/docs/installation/index.md @@ -1,10 +1,10 @@ # Step-by-Step Installation Guide: Introduction -**TBD: this guide needs an update regarding the stuff that now works -under Python 3.9, and the software it now depends on (including the OS); -notably, the current implementation of the *n6* basic data pipeline now -resides in `N6DataPipeline`, *not* in `N6Core` (where the legacy -Python-2.7 stuff resides).** +**TBD: this guide needs an update regarding the migration from Python +2.7 to 3.9 (as well as the related upgrades applied to other software in +use, including the OS). 
Notably, the current implementation of the *n6* +basic data pipeline resides in `N6DataPipeline` (Python-3-only), *not* in +`N6Core` (where the legacy Python-2 stuff is kept).** The goal of this guide is to give you an example of how you can glue the relevant elements together in a (relatively) easy way, so that you can diff --git a/docs/installation/n6_core.md b/docs/installation/n6_core.md index 4c6e25c..7163527 100644 --- a/docs/installation/n6_core.md +++ b/docs/installation/n6_core.md @@ -1,12 +1,5 @@ # Installation of _n6_ components -**TBD: the following description needs an update regarding the -stuff that now works under Python 3.9; in particular, the current -implementation of the *n6* basic data pipeline now resides in -`N6DataPipeline`, *not* in `N6Core` (where the legacy Python-2.7 -stuff resides).** - - First, install `git` and change the shell user to `dataman`: ```bash diff --git a/docs/installation/n6_web.md b/docs/installation/n6_web.md index 7ece9fd..fdd09bf 100644 --- a/docs/installation/n6_web.md +++ b/docs/installation/n6_web.md @@ -1,8 +1,5 @@ # HTTP Services -**TBD: the following description needs an update regarding the -components that now work under Python 3.9.** - > **Note**: to complete any of the steps described below you need to have: > > * installed the relevant *n6* component(s); see: section [Installation of n6 components](n6_core.md) diff --git a/docs/installation/system.md b/docs/installation/system.md index 0984bcf..320e108 100644 --- a/docs/installation/system.md +++ b/docs/installation/system.md @@ -1,11 +1,5 @@ # System Preparation -**TBD: the following description needs an update regarding the stuff -that now works under Python 3.9, and the software it now depends on -(including the OS); notably, the current implementation of the *n6* -basic data pipeline now resides in `N6DataPipeline`, *not* in `N6Core` -(where the legacy Python-2.7 stuff resides).** - ## RabbitMQ RabbitMQ is an open source message broker software (sometimes called message-oriented middleware) diff --git a/mkdocs.yml b/mkdocs.yml index cc7fce9..cf9f5bd 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -7,6 +7,7 @@ repo_url: "https://github.com/CERT-Polska/n6" nav: - Home: index.md + - Changelog: changelog.md - n6 Architecture and Data Flow Overview: data_flow_overview.md - Docker-Based Installation Guide: docker.md - Step-by-Step Installation Guide: @@ -17,7 +18,7 @@ nav: - Certificates: installation/certs.md - HTTP Services: installation/n6_web.md - Supervisor: installation/supervisord.md - - Data Flow: installation/examining_data_flow.md + - Examining the Data Flow: installation/examining_data_flow.md - Running IntelMQ Bots in the n6 Pipeline: - Introduction: guides/intelmq/index.md - Installation and Configuration: guides/intelmq/config.md @@ -36,6 +37,8 @@ nav: - Parser Base Classes: guides/new_source/parsers/baseclasses.md - High-Frequency Data Sources: guides/new_source/parsers/hifreq.md - Parser Tests: guides/new_source/parsers/testing.md + - API: + - Stream API: guides/api/streamapi.md theme: readthedocs markdown_extensions: diff --git a/test_do_setup.py b/test_do_setup.py index f30b116..f61670b 100755 --- a/test_do_setup.py +++ b/test_do_setup.py @@ -29,12 +29,12 @@ class Case(namedtuple('Case', 'input, expected, py')): def __new__(cls, input, expected, py=None): - return super(Case, cls).__new__(cls, input, expected, py) + return super(Case, cls).__new__(cls, input, expected, py) # noqa class Call(namedtuple('Call', 'name, args, kwargs')): - def __new__(cls, name, *args, 
**kwargs): - return super(Call, cls).__new__(cls, name, args, kwargs) + def __new__(cls, __name, *args, **kwargs): + return super(Call, cls).__new__(cls, __name, args, kwargs) # noqa class PrototypeDict(dict): @@ -134,7 +134,7 @@ def test__seq(self): def test__iterator(self): iterator = iter(self.INPUT_SEQ) - self._test(iter(iterator)) + self._test(iterator) @using_template_and_cases
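A side note on the `Call` helper change above: the first positional parameter
was renamed from `name` to `__name`, presumably so that a recorded call may
itself carry a keyword argument literally named `name` without clashing with
the parameter. A small, self-contained illustration (not taken from the test
suite; the argument values are made up):

```python
from collections import namedtuple

class Call(namedtuple('Call', 'name, args, kwargs')):
    def __new__(cls, __name, *args, **kwargs):
        # `__name` is bound positionally, so a keyword argument called
        # `name` falls through into **kwargs instead of colliding.
        return super(Call, cls).__new__(cls, __name, args, kwargs)

# With the previous signature, `def __new__(cls, name, *args, **kwargs)`,
# the call below would raise:
#   TypeError: __new__() got multiple values for argument 'name'
call = Call('do_something', '/some/path', name='value passed through')
print(call)
# Call(name='do_something', args=('/some/path',), kwargs={'name': 'value passed through'})
```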