Merge pull request #10 from LamaAni/add_support_for_file_paths_and_secret_mask

Add support for file paths and secret mask
LamaAni authored Apr 19, 2023
2 parents 0388e79 + ead3fc8 commit 2fea17f
Showing 17 changed files with 167 additions and 177 deletions.
8 changes: 8 additions & 0 deletions .flake8
@@ -0,0 +1,8 @@
[flake8]
ignore = E203, W503
max-line-length = 120
exclude =
    .*
    build/*
    dist/*
    *.egg-info/*
2 changes: 1 addition & 1 deletion .gitignore
@@ -2,9 +2,9 @@
.local/
logs/
.vscode/
Pipefile.lock
.devcontainer/
.devcontainer
Pipfile.lock

# ignore the test airflow db.
tests/airflow.db
6 changes: 4 additions & 2 deletions Pipfile
@@ -1,12 +1,14 @@
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
zthreading=">=0.1.15"
zthreading = ">=0.1.15"

[dev-packages]
apache-airflow = ">=2.5.3"
pytest = "*"

[requires]
python_version = "3.8"
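
With `apache-airflow` and `pytest` now listed as dev packages, a local test environment can be stood up with standard pipenv commands (nothing below is specific to this repo):

```shell
pipenv install --dev   # creates the virtualenv and installs [packages] + [dev-packages]
pipenv run pytest      # runs the test suite inside that environment
```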
56 changes: 28 additions & 28 deletions README.md
@@ -39,14 +39,14 @@ Add to airflow.cfg,
logging_config_class = airflow_db_logger.LOGGING_CONFIG

[db_logger]
SQL_ALCHEMY_CONN=
SQL_ALCHEMY_SCHEMA=
SQL_ALCHEMY_POOL_ENABLED=True
SQL_ALCHEMY_POOL_SIZE=5
SQL_ALCHEMY_MAX_OVERFLOW=1
SQL_ALCHEMY_POOL_RECYCLE=1800
SQL_ALCHEMY_POOL_PRE_PING=True
SQL_ENGINE_ENCODING=utf-8
sql_alchemy_conn =
sql_alchemy_schema =
sql_alchemy_pool_enabled = true
sql_alchemy_pool_size = 5
sql_alchemy_max_overflow = 1
sql_alchemy_pool_recycle = 1800
sql_alchemy_pool_pre_ping = true
sql_engine_encoding = utf-8
```

Or use the airflow builtin envs,
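
Every `[db_logger]` key can also be supplied through Airflow's standard `AIRFLOW__<SECTION>__<KEY>` environment-variable convention; for example (the connection string is a placeholder):

```shell
export AIRFLOW__DB_LOGGER__SQL_ALCHEMY_CONN="postgresql+psycopg2://user:pass@logs-db/airflow_logs"
export AIRFLOW__DB_LOGGER__SQL_ALCHEMY_POOL_ENABLED=true
export AIRFLOW__DB_LOGGER__SHOW_REVERSE_ORDER=false
```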
@@ -62,27 +62,27 @@ configuration, or apply these values using envs, like so,

| section | description | type/values | default |
| ---------------------------------------- | --------------------------------------------------------- | ----------- | --------------------------- |
| [db_logger].`SQL_ALCHEMY_CONN`           | The sqlalchemy connection string                          | `string`    | [core].`SQL_ALCHEMY_CONN`   |
| [db_logger].`SQL_ALCHEMY_CONN_ARGS`      | The sqlalchemy connection args                            | `string`    | None                        |
| [db_logger].`SQL_ALCHEMY_SCHEMA`         | The schema where to put the logging tables.               | `string`    | [core].`SQL_ALCHEMY_SCHEMA` |
| [db_logger].`SQL_ALCHEMY_POOL_ENABLED`   | If true, enable the sqlalchemy pool                       | `boolean`   | True                        |
| [db_logger].`SQL_ALCHEMY_POOL_SIZE`      | The size of the sqlalchemy pool.                          | `int`       | 5                           |
| [db_logger].`SQL_ALCHEMY_MAX_OVERFLOW`   | The max overflow for sqlalchemy                           | `int`       | 1                           |
| [db_logger].`SQL_ALCHEMY_POOL_RECYCLE`   | The pool recycle time                                     | `int`       | 1800                        |
| [db_logger].`SQL_ALCHEMY_POOL_PRE_PING`  | If true, ping the server at connection start.             | `boolean`   | true                        |
| [db_logger].`SQL_ENGINE_ENCODING`        | The encoding for the sql engine                           | `string`    | utf-8                       |
| [db_logger].`sql_alchemy_conn`           | The sqlalchemy connection string                          | `string`    | [core].`sql_alchemy_conn`   |
| [db_logger].`sql_alchemy_conn_args`      | The sqlalchemy connection args                            | `string`    | None                        |
| [db_logger].`sql_alchemy_schema`         | The schema where to put the logging tables.               | `string`    | [core].`sql_alchemy_schema` |
| [db_logger].`sql_alchemy_pool_enabled`   | If true, enable the sqlalchemy pool                       | `boolean`   | True                        |
| [db_logger].`sql_alchemy_pool_size`      | The size of the sqlalchemy pool.                          | `int`       | 5                           |
| [db_logger].`sql_alchemy_max_overflow`   | The max overflow for sqlalchemy                           | `int`       | 1                           |
| [db_logger].`sql_alchemy_pool_recycle`   | The pool recycle time                                     | `int`       | 1800                        |
| [db_logger].`sql_alchemy_pool_pre_ping`  | If true, ping the server at connection start.             | `boolean`   | true                        |
| [db_logger].`sql_engine_encoding`        | The encoding for the sql engine                           | `string`    | utf-8                       |
| | | | |
| [db_logger].`SHOW_REVERSE_ORDER`         | Show logs in reverse order in the Airflow log UI          | `boolean`   | false                       |
| [db_logger].`CREATE_INDEXES`             | If true, create db indexes                                | `boolean`   | false                       |
| [db_logger].`GOOGLE_APP_CREDS_PATH`      | The credentials file path for google bucket writing (gcs) | `string`    | None                        |
| [db_logger].`WRITE_TO_GCS_BUCKET`        | The gcs bucket to write to                                | `string`    | None                        |
| [db_logger].`WRITE_TO_GCS_PROJECT_ID`    | The gcs project to write to                               | `string`    | None                        |
| [db_logger].`WRITE_TO_FILES`             | If true, also write the log to files                      | `boolean`   | false                       |
| [db_logger].`WRITE_TO_SHELL`             | If true, also output the logs to the shell                | `boolean`   | false                       |
| [db_logger].`WRITE_DAG_PROCESSING_TO_DB` | Write all dag processing to the database (a lot of logs)  | `boolean`   | false                       |
| [db_logger].`CONSOLE_FORMATTER`          | The formatter to use for the console                      | `string`    | airflow_coloured            |
| [db_logger].`TASK_FORMATTER`             | The formatter to use for the task                         | `string`    | airflow                     |
| [db_logger].`PROCESSER_LOG_LEVEL`        | The log level to use for dag processing                   | `string`    | "WARN"                      |
| [db_logger].`show_reverse_order`         | Show logs in reverse order in the Airflow log UI          | `boolean`   | false                       |
| [db_logger].`create_indexes`             | If true, create db indexes                                | `boolean`   | false                       |
| [db_logger].`google_application_credentials` | The credentials file path for google bucket writing (gcs) | `string` | None                       |
| [db_logger].`write_to_gcs_bucket`        | The gcs bucket to write to                                | `string`    | None                        |
| [db_logger].`write_to_gcs_project_id`    | The gcs project to write to                               | `string`    | None                        |
| [db_logger].`write_to_files`             | If true, also write the log to files                      | `boolean`   | false                       |
| [db_logger].`write_to_shell`             | If true, also output the logs to the shell                | `boolean`   | false                       |
| [db_logger].`write_dag_processing_to_db` | Write all dag processing to the database (a lot of logs)  | `boolean`   | false                       |
| [db_logger].`console_formatter`          | The formatter to use for the console                      | `string`    | airflow_coloured            |
| [db_logger].`task_formatter`             | The formatter to use for the task                         | `string`    | airflow                     |
| [db_logger].`processer_log_level`        | The log level to use for dag processing                   | `string`    | "WARN"                      |
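
As a worked example, a minimal `[db_logger]` block that keeps database logging and additionally mirrors logs to files and the shell; the GCS lines are optional and assume `google-cloud-storage` is installed, and all values below are placeholders:

```
[db_logger]
write_to_files = true
write_to_shell = true
google_application_credentials = /opt/secrets/gcs-writer.json
write_to_gcs_bucket = my-airflow-logs
write_to_gcs_project_id = my-gcp-project
```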

# Maintenance

77 changes: 28 additions & 49 deletions airflow_db_logger/__init__.py
@@ -1,74 +1,53 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
__title__ = "airflow_db_logger"
__author__ = "Zav Shotan"

import sys
import os
from copy import deepcopy
import airflow_db_logger.consts as consts


LOGGING_CONFIG = consts.DB_LOGGER_LOGGING_CONFIG
AIRFLOW_DEFAULT_LOGGING_CONFIG = consts.get_default_loggin_config()

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow_db_logger.config import ( # noqa
check_cli_for_init_db, # noqa: E402
DB_LOGGER_TASK_FORMATTER, # noqa: E402
DB_LOGGER_CONSOLE_FORMATTER, # noqa: E402
DB_LOGGER_WRITE_DAG_PROCESSING_TO_DB, # noqa: E402
DB_LOGGER_PROCESSER_LOG_LEVEL,
airflow_db_logger_log,
)


def update_config_from_defaults():
consts.IS_DB_LOGGER_LOADING_CONFIG

if consts.IS_DB_LOGGER_LOADING_CONFIG is True:
return

# Remove any other loads.
try:
consts.IS_DB_LOGGER_LOADING_CONFIG = True
def create_logging_config():
config = deepcopy(DEFAULT_LOGGING_CONFIG)
processor_handler_config = {
"class": "airflow_db_logger.handlers.StreamHandler",
"formatter": DB_LOGGER_CONSOLE_FORMATTER,
"level": DB_LOGGER_PROCESSER_LOG_LEVEL,
}

if DB_LOGGER_WRITE_DAG_PROCESSING_TO_DB:
processor_handler_config = {
"class": "airflow_db_logger.handlers.StreamHandler",
"class": "airflow_db_logger.handlers.DBProcessLogHandler",
"formatter": DB_LOGGER_CONSOLE_FORMATTER,
"level": DB_LOGGER_PROCESSER_LOG_LEVEL,
}

if DB_LOGGER_WRITE_DAG_PROCESSING_TO_DB:
processor_handler_config = {
"class": "airflow_db_logger.handlers.DBProcessLogHandler",
"formatter": DB_LOGGER_CONSOLE_FORMATTER,
"level": DB_LOGGER_PROCESSER_LOG_LEVEL,
}

LOGGING_CONFIG.update(deepcopy(AIRFLOW_DEFAULT_LOGGING_CONFIG))
LOGGING_CONFIG["handlers"] = {
"console": {
"class": "airflow_db_logger.handlers.StreamHandler",
"formatter": DB_LOGGER_CONSOLE_FORMATTER,
},
"task": {
"class": "airflow_db_logger.handlers.DBTaskLogHandler",
"formatter": DB_LOGGER_TASK_FORMATTER,
},
"processor": processor_handler_config,
}
config["handlers"] = {
"console": {
"class": "airflow_db_logger.handlers.StreamHandler",
"formatter": DB_LOGGER_CONSOLE_FORMATTER,
},
"task": {
"class": "airflow_db_logger.handlers.DBTaskLogHandler",
"formatter": DB_LOGGER_TASK_FORMATTER,
},
"processor": processor_handler_config,
}
return config

loggers = LOGGING_CONFIG.get("loggers", {})

# for logger_name in loggers.keys():
# loggers[logger_name]["level"] = LOG_LEVEL
LOGGING_CONFIG = create_logging_config()

# Checking for database initialization
check_cli_for_init_db()
# Checking for database initialization
check_cli_for_init_db()

finally:
consts.IS_DB_LOGGER_LOADING_CONFIG = False

if __name__ == "__main__":
import json

update_config_from_defaults()
# airflow_db_logger_log.info("airflow_db_logger initialized")
print(json.dumps(LOGGING_CONFIG, indent=2))
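
Since `LOGGING_CONFIG` is now built eagerly at import time by `create_logging_config()`, the generated dict-config can be inspected from any Python shell where Airflow is installed and configured. A minimal sketch of such a check (assumes a working Airflow environment; nothing here beyond `LOGGING_CONFIG` is part of the package API):

```python
import json

# Importing the package runs create_logging_config() and check_cli_for_init_db().
import airflow_db_logger

# Expect "console", "task", and "processor" entries under the handlers section.
print(json.dumps(airflow_db_logger.LOGGING_CONFIG, indent=2))
```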
2 changes: 1 addition & 1 deletion airflow_db_logger/airflow_utils.py
Expand Up @@ -2,7 +2,7 @@
import airflow
from datetime import datetime
from airflow import settings
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python import PythonOperator
from sqlalchemy.orm import Session, Query

from airflow_db_logger.config import DBLoggerSession
49 changes: 28 additions & 21 deletions airflow_db_logger/config.py
@@ -1,7 +1,6 @@
import sys
import os
import logging
import warnings
import colorlog
from typing import Union, List
from typing import Type
@@ -30,26 +29,32 @@ def conf_get_no_warnings_no_errors(*args, **kwargs):
return val


def __add_core(*collections):
if AIRFLOW_MAJOR_VERSION < 2:  # on Airflow 1.x these keys live under [core]
collections = ["core", *collections]
return collections


def get(
key: str,
default=None,
otype: Type = None,
allow_empty: bool = False,
collection: Union[str, List[str]] = None,
):
collection = collection or "db_logger"
collection = collection if isinstance(collection, list) else [collection]
otype = otype or str if default is None else default.__class__
collection = collection or AIRFLOW_CONFIG_SECTION_NAME
if isinstance(collection, str):
collection = [collection]
assert all(isinstance(v, str) for v in collection), ValueError("Collection must be a string or a list of strings")
collection = [v for v in collection if len(v.strip()) > 0]
assert len(collection) > 0, ValueError("Collection must be a non empty string or list of non empty strings")

otype = otype or str if default is None else default.__class__
for col in collection:
val = conf_get_no_warnings_no_errors(col, key)
if val is not None:
break

assert all([isinstance(v, str) for v in collection]), AirflowConfigException(
"Collection must be a non empty string or a collection of non empty strings"
)

if issubclass(otype, Enum):
allow_empty = False

@@ -73,21 +78,21 @@ def get(


# Loading airflow parameters
LOG_LEVEL = get(collection=["logging", "core"], key="logging_level").upper()
FILENAME_TEMPLATE = get(collection=["logging", "core"], key="LOG_FILENAME_TEMPLATE")
LOG_LEVEL = get(collection=__add_core("logging"), key="logging_level").upper()
FILENAME_TEMPLATE = get(collection=__add_core("logging"), key="LOG_FILENAME_TEMPLATE")
AIRFLOW_EXECUTOR = get(collection="core", key="executor")
IS_RUNNING_DEBUG_EXECUTOR = AIRFLOW_EXECUTOR == "DebugExecutor"
IS_USING_COLORED_CONSOLE = get(collection=["logging", "core"], key="colored_console_log").lower() == "true"
IS_USING_COLORED_CONSOLE = get(collection=__add_core("logging"), key="colored_console_log").lower() == "true"
DAGS_FOLDER = os.path.expanduser(get(collection="core", key="dags_folder"))
BASE_LOG_FOLDER = os.path.expanduser(get(collection=["logging", "core"], key="base_log_folder"))
BASE_LOG_FOLDER = os.path.expanduser(get(collection=__add_core("logging"), key="base_log_folder"))

# Loading sql parameters
SQL_ALCHEMY_CONN = get(collection=["core", "database"], key="sql_alchemy_conn", allow_empty=False)
SQL_ALCHEMY_SCHEMA = get(collection=["core", "database"], key="sql_alchemy_schema", allow_empty=True)
SQL_ALCHEMY_CONN = get(collection=__add_core("database"), key="sql_alchemy_conn", allow_empty=False)
SQL_ALCHEMY_SCHEMA = get(collection=__add_core("database"), key="sql_alchemy_schema", allow_empty=True)

TASK_LOG_FILENAME_TEMPLATE = (
get(
collection="core",
collection=__add_core("logging"),
key="LOG_FILENAME_TEMPLATE",
default="{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log",
)
@@ -96,7 +101,7 @@ def get(
)
PROCESS_LOG_FILENAME_TEMPLATE = (
get(
collection="core",
collection=__add_core("logging"),
key="log_processor_filename_template",
default="{{ filename }}.log",
)
@@ -117,7 +122,7 @@ def get(
DB_LOGGER_SQL_ALCHEMY_MAX_OVERFLOW = get("sql_alchemy_max_overflow", 1)
DB_LOGGER_SQL_ALCHEMY_POOL_RECYCLE = get("sql_alchemy_pool_recycle", 1800)
DB_LOGGER_SQL_ALCHEMY_POOL_PRE_PING = get("sql_alchemy_pool_pre_ping", True)
DB_LOGGER_SQL_ENGINE_ENCODING = get("sql_engine_encoding", "utf-8")
DB_LOGGER_SQL_ENGINE_ENCODING = get("sql_engine_encoding", None, allow_empty=True)

DB_LOGGER_GOOGLE_APP_CREDS_PATH = get("google_application_credentials", default=None, allow_empty=True, otype=str)
# A bucket path, requires google-cloud-storage to be installed.
@@ -142,7 +147,6 @@ def get(


def create_db_logger_sqlalchemy_engine():

# Configuring the db_logger sql engine.
engine_args = {}
if DB_LOGGER_SQL_ALCHEMY_POOL_ENABLED:
@@ -203,9 +207,12 @@ def create_db_logger_sqlalchemy_engine():
# Allow the user to specify an encoding for their DB otherwise default
# to utf-8 so jobs & users with non-latin1 characters can still use
# us.
engine_args["encoding"] = DB_LOGGER_SQL_ENGINE_ENCODING
# For Python2 we get back a newstr and need a str
engine_args["encoding"] = engine_args["encoding"].__str__()
if DB_LOGGER_SQL_ENGINE_ENCODING:
engine_args["encoding"] = DB_LOGGER_SQL_ENGINE_ENCODING

# DEPRECATED:
# # For Python2 we get back a newstr and need a str
# engine_args["encoding"] = engine_args["encoding"].__str__()

return create_engine(DB_LOGGER_SQL_ALCHEMY_CONNECTION, **engine_args)
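
The `__add_core` and `get` pair above implements a first-match section fallback: optionally prepend `core`, then scan the sections in order and return the first value found. A simplified, dict-backed sketch of that pattern (`FAKE_CONF` and `lookup` are illustrative stand-ins, not repo code; the real `get()` reads Airflow's configuration via `conf_get_no_warnings_no_errors`):

```python
from typing import Dict, List, Optional, Union

# Stand-in config store; contents are illustrative only.
FAKE_CONF: Dict[str, Dict[str, str]] = {
    "core": {"sql_alchemy_conn": "sqlite:///airflow.db"},
    "db_logger": {},
}

AIRFLOW_MAJOR_VERSION = 1  # resolved from airflow.version in the real module


def lookup(key: str, sections: Union[str, List[str]]) -> Optional[str]:
    """First-match section fallback, mirroring get() and __add_core above."""
    if isinstance(sections, str):
        sections = [sections]
    if AIRFLOW_MAJOR_VERSION < 2:
        # Airflow 1.x kept these keys under [core], so it is searched first.
        sections = ["core", *sections]
    for section in sections:
        value = FAKE_CONF.get(section, {}).get(key)
        if value is not None:
            return value
    return None


print(lookup("sql_alchemy_conn", "database"))  # -> sqlite:///airflow.db on Airflow 1.x
```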

37 changes: 0 additions & 37 deletions airflow_db_logger/consts.py
@@ -1,38 +1 @@
import os
from copy import deepcopy
from airflow_db_logger.shell_logging_config import create_shell_logging_config

global IS_DB_LOGGER_LOADING_CONFIG
IS_DB_LOGGER_LOADING_CONFIG = False

global DB_LOGGER_LOGGING_CONFIG
DB_LOGGER_LOGGING_CONFIG = create_shell_logging_config()


def get_default_loggin_config():
from airflow.version import version as AIRFLOW_VERSION

AIRFLOW_VERSION_PARTS = AIRFLOW_VERSION.split(".")
AIRFLOW_VERSION_PARTS = [int(v) for v in AIRFLOW_VERSION_PARTS]

AIRFLOW_MAJOR_VERSION = AIRFLOW_VERSION_PARTS[0]

"""Returns the airflow default logging config from the settings.
Start the airflow system. settings.initialize should be called if the logging configuration is to be reset.
"""

config_env_name = (
"AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS" if AIRFLOW_MAJOR_VERSION > 1 else "AIRFLOW__CORE__LOGGING_CONFIG_CLASS"
)

action_logging_config_env = os.environ.get(config_env_name, None)

os.environ[config_env_name] = "airflow_db_logger.shell_logging_config.SIMPLE_LOGGING_CONFIG"

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG # noqa

if action_logging_config_env is not None:
os.environ[config_env_name] = action_logging_config_env

return DEFAULT_LOGGING_CONFIG
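
For context, the helper deleted above appears to have worked around re-entrant loading of the logging configuration: with `logging_config_class` pointing at this package, importing Airflow's defaults could loop back into the logger setup, so the env var was temporarily pointed at a plain shell config while importing, then restored. A distilled, hypothetical sketch of that save/override/restore pattern (cleaned up; not the repo's code):

```python
import os
from contextlib import contextmanager


@contextmanager
def env_override(name: str, value: str):
    """Temporarily set an environment variable, restoring its prior state on exit."""
    previous = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        if previous is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous


# Usage mirroring the deleted helper (Airflow 2 env name, as in the code above):
# with env_override(
#     "AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS",
#     "airflow_db_logger.shell_logging_config.SIMPLE_LOGGING_CONFIG",
# ):
#     from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
```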
