Skip to content

Commit

Permalink
Refactor AWS DynamoDB to use new data model (#1153)
Browse files Browse the repository at this point in the history
Refactors DynamoDB sync to use the cartography data model.
  • Loading branch information
phishoes authored Apr 13, 2023
1 parent ab1584f commit 97f64db
Show file tree
Hide file tree
Showing 6 changed files with 354 additions and 105 deletions.
5 changes: 0 additions & 5 deletions cartography/data/indexes.cypher
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ CREATE INDEX IF NOT EXISTS FOR (n:DODroplet) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:DODroplet) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:DOProject) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:DOProject) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:DynamoDBGlobalSecondaryIndex) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:DynamoDBGlobalSecondaryIndex) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:DynamoDBTable) ON (n.arn);
CREATE INDEX IF NOT EXISTS FOR (n:DynamoDBTable) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:DynamoDBTable) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:EBSSnapshot) ON (n.id);
CREATE INDEX IF NOT EXISTS FOR (n:EBSSnapshot) ON (n.lastupdated);
CREATE INDEX IF NOT EXISTS FOR (n:EC2KeyPair) ON (n.keyfingerprint);
Expand Down

This file was deleted.

123 changes: 61 additions & 62 deletions cartography/intel/aws/dynamodb.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
import logging
from typing import Any
from typing import Dict
from typing import List

import boto3
import neo4j

from cartography.client.core.tx import load
from cartography.graph.job import GraphJob
from cartography.intel.aws.ec2.util import get_botocore_config
from cartography.models.aws.dynamodb.gsi import DynamoDBGSISchema
from cartography.models.aws.dynamodb.tables import DynamoDBTableSchema
from cartography.stats import get_stats_client
from cartography.util import aws_handle_regions
from cartography.util import merge_module_sync_metadata
from cartography.util import run_cleanup_job
from cartography.util import timeit

logger = logging.getLogger(__name__)
Expand All @@ -18,7 +23,7 @@
@timeit
@aws_handle_regions
def get_dynamodb_tables(boto3_session: boto3.session.Session, region: str) -> List[Dict]:
client = boto3_session.client('dynamodb', region_name=region)
client = boto3_session.client('dynamodb', region_name=region, config=get_botocore_config())
paginator = client.get_paginator('list_tables')
dynamodb_tables = []
for page in paginator.paginate():
Expand All @@ -27,77 +32,69 @@ def get_dynamodb_tables(boto3_session: boto3.session.Session, region: str) -> Li
return dynamodb_tables


@timeit
def transform_dynamodb_tables(dynamodb_tables: List, region: str) -> Any:
ddb_table_data: List[Dict[str, Any]] = []
ddb_gsi_data: List[Dict[str, Any]] = []

for table in dynamodb_tables:
ddb_table_data.append({
'Arn': table['Table']['TableArn'],
'TableName': table['Table']['TableName'],
'Region': region,
'Rows': table['Table']['ItemCount'],
'Size': table['Table']['TableSizeBytes'],
'ProvisionedThroughputReadCapacityUnits': table['Table']['ProvisionedThroughput']['ReadCapacityUnits'],
'ProvisionedThroughputWriteCapacityUnits': table['Table']['ProvisionedThroughput']['WriteCapacityUnits'],
})
for gsi in table['Table'].get('GlobalSecondaryIndexes', []):
ddb_gsi_data.append({
'Arn': gsi['IndexArn'],
'TableArn': table['Table']['TableArn'],
'Region': region,
'ProvisionedThroughputReadCapacityUnits': gsi['ProvisionedThroughput']['ReadCapacityUnits'],
'ProvisionedThroughputWriteCapacityUnits': gsi['ProvisionedThroughput']['WriteCapacityUnits'],
'GSIName': gsi['IndexName'],
})
return ddb_table_data, ddb_gsi_data


@timeit
def load_dynamodb_tables(
neo4j_session: neo4j.Session, data: List[Dict], region: str, current_aws_account_id: str,
neo4j_session: neo4j.Session, tables_data: List[Dict[str, Any]], region: str, current_aws_account_id: str,
aws_update_tag: int,
) -> None:
ingest_table = """
MERGE (table:DynamoDBTable{id: $Arn})
ON CREATE SET table.firstseen = timestamp(), table.arn = $Arn, table.name = $TableName,
table.region = $Region
SET table.lastupdated = $aws_update_tag, table.rows = $Rows, table.size = $Size,
table.provisioned_throughput_read_capacity_units = $ProvisionedThroughputReadCapacityUnits,
table.provisioned_throughput_write_capacity_units = $ProvisionedThroughputWriteCapacityUnits
WITH table
MATCH (owner:AWSAccount{id: $AWS_ACCOUNT_ID})
MERGE (owner)-[r:RESOURCE]->(table)
ON CREATE SET r.firstseen = timestamp()
SET r.lastupdated = $aws_update_tag
"""

for table in data:
neo4j_session.run(
ingest_table,
Arn=table['Table']['TableArn'],
Region=region,
ProvisionedThroughputReadCapacityUnits=table['Table']['ProvisionedThroughput']['ReadCapacityUnits'],
ProvisionedThroughputWriteCapacityUnits=table['Table']['ProvisionedThroughput']['WriteCapacityUnits'],
Size=table['Table']['TableSizeBytes'],
TableName=table['Table']['TableName'],
Rows=table['Table']['ItemCount'],
AWS_ACCOUNT_ID=current_aws_account_id,
aws_update_tag=aws_update_tag,
)
load_gsi(neo4j_session, table, region, current_aws_account_id, aws_update_tag)
logger.info(f"Loading Dynamo DB tables {len(tables_data)} for region '{region}' into graph.")
load(
neo4j_session,
DynamoDBTableSchema(),
tables_data,
lastupdated=aws_update_tag,
Region=region,
AWS_ID=current_aws_account_id,
)


@timeit
def load_gsi(
neo4j_session: neo4j.Session, table: Dict, region: str, current_aws_account_id: str,
def load_dynamodb_gsi(
neo4j_session: neo4j.Session, gsi_data: List[Dict[str, Any]], region: str, current_aws_account_id: str,
aws_update_tag: int,
) -> None:
ingest_gsi = """
MERGE (gsi:DynamoDBGlobalSecondaryIndex{id: $Arn})
ON CREATE SET gsi.firstseen = timestamp(), gsi.arn = $Arn, gsi.name = $GSIName,
gsi.region = $Region
SET gsi.lastupdated = $aws_update_tag,
gsi.provisioned_throughput_read_capacity_units = $ProvisionedThroughputReadCapacityUnits,
gsi.provisioned_throughput_write_capacity_units = $ProvisionedThroughputWriteCapacityUnits
WITH gsi
MATCH (table:DynamoDBTable{arn: $TableArn})
MERGE (table)-[r:GLOBAL_SECONDARY_INDEX]->(gsi)
ON CREATE SET r.firstseen = timestamp()
SET r.lastupdated = $aws_update_tag
"""

for gsi in table['Table'].get('GlobalSecondaryIndexes', []):
neo4j_session.run(
ingest_gsi,
TableArn=table['Table']['TableArn'],
Arn=gsi['IndexArn'],
Region=region,
ProvisionedThroughputReadCapacityUnits=gsi['ProvisionedThroughput']['ReadCapacityUnits'],
ProvisionedThroughputWriteCapacityUnits=gsi['ProvisionedThroughput']['WriteCapacityUnits'],
GSIName=gsi['IndexName'],
AWS_ACCOUNT_ID=current_aws_account_id,
aws_update_tag=aws_update_tag,
)
logger.info(f"Loading Dynamo DB GSI {len(gsi_data)} for region '{region}' into graph.")
load(
neo4j_session,
DynamoDBGSISchema(),
gsi_data,
lastupdated=aws_update_tag,
Region=region,
AWS_ID=current_aws_account_id,
)


@timeit
def cleanup_dynamodb_tables(neo4j_session: neo4j.Session, common_job_parameters: Dict) -> None:
run_cleanup_job('aws_import_dynamodb_tables_cleanup.json', neo4j_session, common_job_parameters)
GraphJob.from_node_schema(DynamoDBTableSchema(), common_job_parameters).run(neo4j_session)
GraphJob.from_node_schema(DynamoDBGSISchema(), common_job_parameters).run(neo4j_session)


@timeit
Expand All @@ -107,8 +104,10 @@ def sync_dynamodb_tables(
) -> None:
for region in regions:
logger.info("Syncing DynamoDB for region in '%s' in account '%s'.", region, current_aws_account_id)
data = get_dynamodb_tables(boto3_session, region)
load_dynamodb_tables(neo4j_session, data, region, current_aws_account_id, aws_update_tag)
dynamodb_tables = get_dynamodb_tables(boto3_session, region)
ddb_table_data, ddb_gsi_data = transform_dynamodb_tables(dynamodb_tables, region)
load_dynamodb_tables(neo4j_session, ddb_table_data, region, current_aws_account_id, aws_update_tag)
load_dynamodb_gsi(neo4j_session, ddb_gsi_data, region, current_aws_account_id, aws_update_tag)
cleanup_dynamodb_tables(neo4j_session, common_job_parameters)


Expand Down
68 changes: 68 additions & 0 deletions cartography/models/aws/dynamodb/gsi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from dataclasses import dataclass

from cartography.models.core.common import PropertyRef
from cartography.models.core.nodes import CartographyNodeProperties
from cartography.models.core.nodes import CartographyNodeSchema
from cartography.models.core.relationships import CartographyRelProperties
from cartography.models.core.relationships import CartographyRelSchema
from cartography.models.core.relationships import LinkDirection
from cartography.models.core.relationships import make_target_node_matcher
from cartography.models.core.relationships import OtherRelationships
from cartography.models.core.relationships import TargetNodeMatcher


@dataclass(frozen=True)
class DynamoDBGSINodeProperties(CartographyNodeProperties):
id: PropertyRef = PropertyRef('Arn')
arn: PropertyRef = PropertyRef('Arn')
name: PropertyRef = PropertyRef('GSIName')
region: PropertyRef = PropertyRef('Region', set_in_kwargs=True)
lastupdated: PropertyRef = PropertyRef('lastupdated', set_in_kwargs=True)
provisioned_throughput_read_capacity_units: PropertyRef = PropertyRef('ProvisionedThroughputReadCapacityUnits')
provisioned_throughput_write_capacity_units: PropertyRef = PropertyRef('ProvisionedThroughputWriteCapacityUnits')


@dataclass(frozen=True)
class DynamoDBGSIToAwsAccountRelProperties(CartographyRelProperties):
lastupdated: PropertyRef = PropertyRef('lastupdated', set_in_kwargs=True)


@dataclass(frozen=True)
# (:DynamoDBGlobalSecondaryIndex)<-[:RESOURCE]-(:AWSAccount)
class DynamoDBGSIToAWSAccount(CartographyRelSchema):
target_node_label: str = 'AWSAccount'
target_node_matcher: TargetNodeMatcher = make_target_node_matcher(
{'id': PropertyRef('AWS_ID', set_in_kwargs=True)},
)
direction: LinkDirection = LinkDirection.INWARD
rel_label: str = "RESOURCE"
properties: DynamoDBGSIToAwsAccountRelProperties = DynamoDBGSIToAwsAccountRelProperties()


@dataclass(frozen=True)
class DynamoDBGSIToDynamoDBTableRelProperties(CartographyRelProperties):
lastupdated: PropertyRef = PropertyRef('lastupdated', set_in_kwargs=True)


@dataclass(frozen=True)
# (:DynamoDBGlobalSecondaryIndex)<-[:GLOBAL_SECONDARY_INDEX]-(:DynamoDBTable)
class DynamoDBGSIToDynamoDBTable(CartographyRelSchema):
target_node_label: str = 'DynamoDBTable'
target_node_matcher: TargetNodeMatcher = make_target_node_matcher(
{'arn': PropertyRef('TableArn')},
)
direction: LinkDirection = LinkDirection.INWARD
rel_label: str = "GLOBAL_SECONDARY_INDEX"
properties: DynamoDBGSIToDynamoDBTableRelProperties = DynamoDBGSIToDynamoDBTableRelProperties()


@dataclass(frozen=True)
class DynamoDBGSISchema(CartographyNodeSchema):
label: str = 'DynamoDBGlobalSecondaryIndex'
properties: DynamoDBGSINodeProperties = DynamoDBGSINodeProperties()
sub_resource_relationship: DynamoDBGSIToAWSAccount = DynamoDBGSIToAWSAccount()
other_relationships: OtherRelationships = OtherRelationships(
[
DynamoDBGSIToDynamoDBTable(),
],
)
47 changes: 47 additions & 0 deletions cartography/models/aws/dynamodb/tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from dataclasses import dataclass

from cartography.models.core.common import PropertyRef
from cartography.models.core.nodes import CartographyNodeProperties
from cartography.models.core.nodes import CartographyNodeSchema
from cartography.models.core.relationships import CartographyRelProperties
from cartography.models.core.relationships import CartographyRelSchema
from cartography.models.core.relationships import LinkDirection
from cartography.models.core.relationships import make_target_node_matcher
from cartography.models.core.relationships import TargetNodeMatcher


@dataclass(frozen=True)
class DynamoDBTableNodeProperties(CartographyNodeProperties):
id: PropertyRef = PropertyRef('Arn')
arn: PropertyRef = PropertyRef('Arn')
name: PropertyRef = PropertyRef('TableName')
region: PropertyRef = PropertyRef('Region', set_in_kwargs=True)
lastupdated: PropertyRef = PropertyRef('lastupdated', set_in_kwargs=True)
rows: PropertyRef = PropertyRef('Rows')
size: PropertyRef = PropertyRef('Size')
provisioned_throughput_read_capacity_units: PropertyRef = PropertyRef('ProvisionedThroughputReadCapacityUnits')
provisioned_throughput_write_capacity_units: PropertyRef = PropertyRef('ProvisionedThroughputWriteCapacityUnits')


@dataclass(frozen=True)
class DynamoDBTableToAwsAccountRelProperties(CartographyRelProperties):
lastupdated: PropertyRef = PropertyRef('lastupdated', set_in_kwargs=True)


@dataclass(frozen=True)
# (:DynamoDBTable)<-[:RESOURCE]-(:AWSAccount)
class DynamoDBTableToAWSAccount(CartographyRelSchema):
target_node_label: str = 'AWSAccount'
target_node_matcher: TargetNodeMatcher = make_target_node_matcher(
{'id': PropertyRef('AWS_ID', set_in_kwargs=True)},
)
direction: LinkDirection = LinkDirection.INWARD
rel_label: str = "RESOURCE"
properties: DynamoDBTableToAwsAccountRelProperties = DynamoDBTableToAwsAccountRelProperties()


@dataclass(frozen=True)
class DynamoDBTableSchema(CartographyNodeSchema):
label: str = 'DynamoDBTable'
properties: DynamoDBTableNodeProperties = DynamoDBTableNodeProperties()
sub_resource_relationship: DynamoDBTableToAWSAccount = DynamoDBTableToAWSAccount()
Loading

0 comments on commit 97f64db

Please sign in to comment.