Skip to content

Commit

Permalink
Automatic cleanup jobs (#1052)
Browse files Browse the repository at this point in the history
  • Loading branch information
achantavy authored Feb 1, 2023
1 parent c120a4a commit e320232
Show file tree
Hide file tree
Showing 13 changed files with 832 additions and 34 deletions.
10 changes: 0 additions & 10 deletions cartography/data/jobs/cleanup/aws_import_emr_cleanup.json

This file was deleted.

180 changes: 180 additions & 0 deletions cartography/graph/cleanupbuilder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
from dataclasses import asdict
from string import Template
from typing import List
from typing import Optional
from typing import Set

from cartography.graph.model import CartographyNodeSchema
from cartography.graph.model import CartographyRelSchema
from cartography.graph.model import LinkDirection
from cartography.graph.model import TargetNodeMatcher
from cartography.graph.querybuilder import _build_match_clause
from cartography.graph.querybuilder import filter_selected_relationships
from cartography.graph.querybuilder import rel_present_on_node_schema


def build_cleanup_queries(
    node_schema: CartographyNodeSchema,
    selected_rels: Optional[Set[CartographyRelSchema]] = None,
) -> List[str]:
    """
    Build the ordered list of Neo4j queries that remove stale nodes and relationships for a node schema.

    :param node_schema: The CartographyNodeSchema to generate cleanup queries for.
    :param selected_rels: Optional. When given (and non-empty), cleanup queries are generated only for these
    relationships; `filter_selected_relationships()` raises a ValueError if any of them is not defined on the
    `node_schema`. When omitted or empty, every relationship defined on the `node_schema` is used.
    :return: A list of cleanup queries. The queries for the "other" relationships come first and the queries for the
    sub resource relationship come last. Order matters: stale objects are located through the sub resource, so the
    sub resource relationship must be the last thing to be cleaned up or we would leave orphaned objects behind.
    :raises ValueError: if no sub resource relationship is available, either because the `node_schema` does not define
    one or because it was left out of `selected_rels`. Cleaning up nodes that are not attached to a sub resource is
    not supported here; write a manual cleanup job for that case.
    """
    if selected_rels:
        # Narrow down to the requested rels; this also checks that they exist on the node_schema.
        sub_resource_rel, other_rels = filter_selected_relationships(node_schema, selected_rels)
    else:
        sub_resource_rel = node_schema.sub_resource_relationship
        other_rels = node_schema.other_relationships

    if not sub_resource_rel:
        raise ValueError(
            "Auto-creating a cleanup job for a node_schema without a sub resource relationship is not supported. "
            f'Please check the class definition of "{node_schema.__class__.__name__}". If the optional `selected_rels` '
            'param was specified to build_cleanup_queries(), then ensure that the sub resource relationship is '
            'present.',
        )

    queries: List[str] = []
    for other_rel in (other_rels.rels if other_rels else ()):
        queries += _build_cleanup_node_and_rel_queries(node_schema, other_rel)

    # The sub resource cleanup must stay at the end of the list; see the docstring.
    queries += _build_cleanup_node_and_rel_queries(node_schema, sub_resource_rel)
    return queries


def _build_cleanup_node_and_rel_queries(
    node_schema: CartographyNodeSchema,
    selected_relationship: CartographyRelSchema,
) -> List[str]:
    """
    Private helper holding the string template logic for one relationship's cleanup queries.

    :param node_schema: The CartographyNodeSchema to generate cleanup queries for.
    :param selected_relationship: The relationship on the node_schema to build cleanup queries for. It must be either
    the node_schema's sub resource relationship or one of its other relationships.
    :return: A list of exactly 2 queries. The first deletes stale nodes matched through the selected relationship, the
    second deletes stale instances of the selected relationship itself. For example outputs, see
    tests.unit.cartography.graph.test_cleanupbuilder.
    :raises ValueError: if the node_schema has no sub resource relationship, if `selected_relationship` is not present
    on the node_schema, or (when cleaning up the sub resource relationship) if its target node matcher fails
    `_validate_target_node_matcher_for_cleanup_job()`.
    """
    sub_rel = node_schema.sub_resource_relationship
    if not sub_rel:
        raise ValueError(
            f"_build_cleanup_node_query() failed: '{node_schema.label}' does not have a sub_resource_relationship "
            "defined, so we cannot generate a query to clean it up. Please verify that the class definition is what "
            "you expect.",
        )
    if not rel_present_on_node_schema(node_schema, selected_relationship):
        raise ValueError(
            f"_build_cleanup_node_query(): Attempted to build cleanup query for node '{node_schema.label}' and "
            f"relationship {selected_relationship.rel_label} but that relationship is not present on the node. Please "
            "verify the node class definition for the relationships that it has.",
        )

    # Draw the sub resource rel (bound to `s`) pointing in the right direction.
    arrow_pattern = (
        "<-[s:$SubResourceRelLabel]-" if sub_rel.direction == LinkDirection.INWARD
        else "-[s:$SubResourceRelLabel]->"
    )
    sub_resource_link = Template(arrow_pattern).safe_substitute(SubResourceRelLabel=sub_rel.rel_label)

    # Action that deletes stale nodes. This one must always run before the relationship action.
    stale_node_action = (
        "\n"
        "WHERE n.lastupdated <> $UPDATE_TAG\n"
        "WITH n LIMIT $LIMIT_SIZE\n"
        "DETACH DELETE n;\n"
    )

    # Action that deletes stale relationships: `s` is the sub resource rel, `r` is any other selected rel.
    if selected_relationship == sub_rel:
        _validate_target_node_matcher_for_cleanup_job(sub_rel.target_node_matcher)
        selected_rel_clause = ""
        stale_rel_action = (
            "\n"
            "WHERE s.lastupdated <> $UPDATE_TAG\n"
            "WITH s LIMIT $LIMIT_SIZE\n"
            "DELETE s;\n"
        )
    else:
        selected_rel_clause = _build_selected_rel_clause(selected_relationship)
        stale_rel_action = (
            "\n"
            "WHERE r.lastupdated <> $UPDATE_TAG\n"
            "WITH r LIMIT $LIMIT_SIZE\n"
            "DELETE r;\n"
        )

    # Every query first anchors the node to its sub resource, then applies one of the delete actions.
    query_template = Template(
        "\n"
        "MATCH (n:$node_label)$sub_resource_link(:$sub_resource_label{$match_sub_res_clause})\n"
        "$selected_rel_clause\n"
        "$delete_action_clause\n",
    )
    match_sub_res_clause = _build_match_clause(sub_rel.target_node_matcher)

    queries: List[str] = []
    for action in (stale_node_action, stale_rel_action):
        queries.append(
            query_template.safe_substitute(
                node_label=node_schema.label,
                sub_resource_link=sub_resource_link,
                sub_resource_label=sub_rel.target_node_label,
                match_sub_res_clause=match_sub_res_clause,
                selected_rel_clause=selected_rel_clause,
                delete_action_clause=action,
            ),
        )
    return queries


def _build_selected_rel_clause(selected_relationship: CartographyRelSchema) -> str:
    """
    Render the MATCH clause for the selected relationship (bound to `r`), drawn in the correct direction.

    :param selected_relationship: The relationship schema to draw.
    :return: A string shaped like `MATCH (n)<-[r:LABEL]-(:OtherNodeLabel)` when the relationship direction is
    LinkDirection.INWARD, and like `MATCH (n)-[r:LABEL]->(:OtherNodeLabel)` otherwise.
    """
    points_inward = selected_relationship.direction == LinkDirection.INWARD
    arrow_template = Template("<-[r:$SelectedRelLabel]-" if points_inward else "-[r:$SelectedRelLabel]->")
    arrow = arrow_template.safe_substitute(SelectedRelLabel=selected_relationship.rel_label)

    return Template("""MATCH (n)$selected_rel(:$other_node_label)""").safe_substitute(
        selected_rel=arrow,
        other_node_label=selected_relationship.target_node_label,
    )


def _validate_target_node_matcher_for_cleanup_job(tgm: TargetNodeMatcher):
    """
    Check that every PropertyRef on the given TargetNodeMatcher has set_in_kwargs=True.

    Auto cleanups need this on the sub resource target node matcher because the GraphJob class injects the sub
    resource id via a query kwarg parameter. See the GraphJob and GraphStatement classes.
    This is a private function meant only to be called when we clean up the sub resource relationship.

    :param tgm: The TargetNodeMatcher of the sub resource relationship.
    :raises ValueError: on the first PropertyRef found whose set_in_kwargs is falsy.
    """
    for field_name, property_ref in asdict(tgm).items():
        if property_ref.set_in_kwargs:
            continue
        raise ValueError(
            f"TargetNodeMatcher PropertyRefs in the sub_resource_relationship must have set_in_kwargs=True. "
            f"{field_name} has set_in_kwargs=False, please check.",
        )
75 changes: 74 additions & 1 deletion cartography/graph/job.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,59 @@
import json
import logging
import string
from pathlib import Path
from string import Template
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Union

import neo4j

from cartography.graph.cleanupbuilder import build_cleanup_queries
from cartography.graph.model import CartographyNodeSchema
from cartography.graph.model import CartographyRelSchema
from cartography.graph.statement import get_job_shortname
from cartography.graph.statement import GraphStatement


logger = logging.getLogger(__name__)


def _get_identifiers(template: string.Template) -> List[str]:
    """
    Collect the distinct placeholder names used in a string Template.

    :param template: A string Template
    :return: the de-duplicated variable names that start with a '$' like $this (or ${this}) in the given Template.
    The order of the returned list is not defined. Escaped ('$$') and invalid placeholders are not included.
    Stolen from https://github.com/python/cpython/issues/90465#issuecomment-1093941790.
    TODO we can get rid of this and use template.get_identifiers() once we are on python 3.11
    """
    identifiers = set()
    for match in template.pattern.finditer(template.template):
        # Only the 'named' and 'braced' groups carry an identifier; other matches yield None.
        name = match.group('named') or match.group('braced')
        if name is not None:
            identifiers.add(name)
    return list(identifiers)


def get_parameters(queries: List[str]) -> Set[str]:
    """
    Gather the parameter names referenced by a list of Neo4j queries.

    :param queries: A list of Neo4j queries with parameters indicated by leading '$' like $this.
    :return: The set of all parameters across all given Neo4j queries. Empty if `queries` is empty.
    """
    return {
        param
        for query in queries
        for param in _get_identifiers(Template(query))
    }


class GraphJobJSONEncoder(json.JSONEncoder):
"""
Support JSON serialization for GraphJob instances.
Expand Down Expand Up @@ -86,6 +124,41 @@ def from_json(cls, blob: str, short_name: Optional[str] = None) -> 'GraphJob':
name = data["name"]
return cls(name, statements, short_name)

@classmethod
def from_node_schema(
    cls,
    node_schema: CartographyNodeSchema,
    parameters: Dict[str, Any],
    selected_rels: Optional[Set[CartographyRelSchema]] = None,
) -> 'GraphJob':
    """
    Create a cleanup job from a CartographyNodeSchema object.

    :param node_schema: The node schema to build cleanup queries for (see `build_cleanup_queries()`).
    :param parameters: The query parameters to run the cleanup statements with. Every parameter referenced by the
    generated queries must be present as a key here, with the exception of LIMIT_SIZE. For a given node this
    includes the fields used in node_schema.sub_resource_relationship.target_node_matcher. Keys that the queries do
    not reference are allowed, so a shared job-parameters dict can be passed in as-is.
    :param selected_rels: Optional. Passed through to `build_cleanup_queries()` to restrict which relationships are
    cleaned up.
    :return: A GraphJob named "Cleanup <node label>" with one iterative GraphStatement per cleanup query.
    :raises ValueError: if `parameters` is missing a parameter that the generated queries reference.
    """
    queries: List[str] = build_cleanup_queries(node_schema, selected_rels)

    # Validate params
    expected_param_keys: Set[str] = get_parameters(queries)
    # Hacky, but LIMIT_SIZE is specified by default in cartography.graph.statement, so we exclude it from validation
    supplied_param_keys: Set[str] = set(parameters) | {'LIMIT_SIZE'}

    # Only missing parameters are an error. Requiring an exact match would reject callers that hand over a shared
    # parameters dict carrying keys these particular queries never reference.
    missing_param_keys: Set[str] = expected_param_keys - supplied_param_keys
    if missing_param_keys:
        raise ValueError(
            f'Expected query params "{expected_param_keys}" but "{missing_param_keys}" were not provided. Please '
            f'check the value passed to `parameters`.',
        )

    statements: List[GraphStatement] = [
        GraphStatement(query, parameters=parameters, iterative=True, iterationsize=100) for query in queries
    ]

    return cls(
        f"Cleanup {node_schema.label}",
        statements,
        node_schema.label,
    )

@classmethod
def from_json_file(cls, file_path: Union[str, Path]) -> 'GraphJob':
"""
Expand Down
6 changes: 3 additions & 3 deletions cartography/graph/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,16 @@ class PropertyRef:
(PropertyRef.set_in_kwargs=True).
"""

def __init__(self, name: str, set_in_kwargs=False):
def __init__(self, name: str, set_in_kwargs: bool = False):
"""
:param name: The name of the property
:param set_in_kwargs: Optional. If True, the property is not defined on the data dict, and we expect to find the
property in the kwargs.
If False, looks for the property in the data dict.
Defaults to False.
"""
self.name = name
self.set_in_kwargs = set_in_kwargs
self.name: str = name
self.set_in_kwargs: bool = set_in_kwargs

def _parameterize_name(self) -> str:
return f"${self.name}"
Expand Down
23 changes: 18 additions & 5 deletions cartography/graph/querybuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,20 @@ def _build_attach_relationships_statement(
return query_template.safe_substitute(attach_relationships_statement=attach_relationships_statement)


def _filter_selected_relationships(
def rel_present_on_node_schema(
    node_schema: CartographyNodeSchema,
    rel_schema: CartographyRelSchema,
) -> bool:
    """
    Answers the question: is the given rel_schema present on the given node_schema?

    The membership test is done directly against the node_schema instead of going through
    `filter_selected_relationships()`, because that function raises a ValueError for a relationship that is not
    defined on the node_schema. Delegating to it would mean this predicate could never return False.

    :param node_schema: The node schema whose relationships are inspected.
    :param rel_schema: The relationship schema to look for.
    :return: True if `rel_schema` is the node_schema's sub resource relationship or one of the rels in its
    other_relationships; False otherwise.
    """
    if rel_schema == node_schema.sub_resource_relationship:
        return True

    # other_relationships is optional on a node schema; when it is set, its `rels` holds the relationship schemas.
    other_rels = node_schema.other_relationships
    return bool(other_rels) and rel_schema in other_rels.rels


def filter_selected_relationships(
node_schema: CartographyNodeSchema,
selected_relationships: Set[CartographyRelSchema],
) -> Tuple[Optional[CartographyRelSchema], Optional[OtherRelationships]]:
Expand All @@ -277,7 +290,7 @@ def _filter_selected_relationships(
:param node_schema: The node schema object to filter relationships against
:param selected_relationships: The set of relationships to check if they exist in the node schema. If empty set,
this means that no relationships have been selected. None is not an accepted value here.
:return: a tuple of the (sub resource rel [if present in selected_relationships], an OtherRelationships object
:return: a tuple of the shape (sub resource rel [if present in selected_relationships], an OtherRelationships object
containing all values of node_schema.other_relationships that are present in selected_relationships)
"""
# The empty set means no relationships are selected
Expand All @@ -294,8 +307,8 @@ def _filter_selected_relationships(
for selected_rel in selected_relationships:
if selected_rel not in all_rels_on_node:
raise ValueError(
f"build_ingestion_query() failed: CartographyRelSchema {selected_rel.__class__.__name__} is not "
f"defined on CartographyNodeSchema type {node_schema.__class__.__name__}. Please verify the "
f"filter_selected_relationships() failed: CartographyRelSchema {selected_rel.__class__.__name__} is "
f"not defined on CartographyNodeSchema type {node_schema.__class__.__name__}. Please verify the "
f"value of `selected_relationships` passed to `build_ingestion_query()`.",
)

Expand Down Expand Up @@ -350,7 +363,7 @@ def build_ingestion_query(
sub_resource_rel: Optional[CartographyRelSchema] = node_schema.sub_resource_relationship
other_rels: Optional[OtherRelationships] = node_schema.other_relationships
if selected_relationships or selected_relationships == set():
sub_resource_rel, other_rels = _filter_selected_relationships(node_schema, selected_relationships)
sub_resource_rel, other_rels = filter_selected_relationships(node_schema, selected_relationships)

ingest_query = query_template.safe_substitute(
node_label=node_schema.label,
Expand Down
5 changes: 3 additions & 2 deletions cartography/intel/aws/emr.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import neo4j

from cartography.client.core.tx import load_graph_data
from cartography.graph.job import GraphJob
from cartography.graph.model import CartographyNodeProperties
from cartography.graph.model import CartographyNodeSchema
from cartography.graph.model import CartographyRelProperties
Expand All @@ -20,7 +21,6 @@
from cartography.graph.querybuilder import build_ingestion_query
from cartography.intel.aws.ec2.util import get_botocore_config
from cartography.util import aws_handle_regions
from cartography.util import run_cleanup_job
from cartography.util import timeit

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -134,7 +134,8 @@ def load_emr_clusters(
@timeit
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None:
    """
    Run the EMR cleanup against the graph.

    :param neo4j_session: The Neo4j session that the cleanup is run with.
    :param common_job_parameters: The parameters dict handed to the cleanup job(s).
    """
    logger.debug("Running EMR cleanup job.")
    # NOTE(review): this line and the two below both perform an EMR cleanup. The JSON job file referenced here is
    # listed as deleted in the same change, so this call looks like the pre-change line of a diff - confirm which of
    # the two is meant to remain.
    run_cleanup_job('aws_import_emr_cleanup.json', neo4j_session, common_job_parameters)
    # Build the cleanup job from the EMRClusterSchema definition and run it.
    cleanup_job = GraphJob.from_node_schema(EMRClusterSchema(), common_job_parameters)
    cleanup_job.run(neo4j_session)


@timeit
Expand Down
Loading

0 comments on commit e320232

Please sign in to comment.