Skip to content

Commit

Permalink
Updates for API module (MITRE module) and submodules
Browse files Browse the repository at this point in the history
-   APP-4442 - [Mitre] Updated MITRE module to support default value and make logging optional
-   APP-4443 - [API] Updated multiple API modules to be cached properties
-   APP-4444 - [Util] Replaced inflect package with inflection package
-   APP-4445 - [Util] Updated utils module to use inflection for case conversion
  • Loading branch information
bsummers-tc authored Apr 10, 2024
1 parent b82cfb1 commit b82854c
Show file tree
Hide file tree
Showing 26 changed files with 115 additions and 104 deletions.
1 change: 1 addition & 0 deletions .cspell/custom-dictionary-workspace.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ openapi
outdir
paco
paho
passthru
pathlib
playbookdb
poolblock
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The ThreatConnect™ TcEx App Framework provides functionality for writing T

* arrow (https://pypi.python.org/pypi/arrow)
* black (https://pypi.org/project/black/)
* inflect (https://pypi.python.org/pypi/inflect)
* inflection (https://pypi.org/project/inflection/)
* isort (https://pypi.org/project/isort/)
* jmespath (https://pypi.org/project/jmespath/)
* paho-mqtt (https://pypi.org/project/paho-mqtt/)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ classifiers = [
dependencies = [
"arrow",
"black",
"inflect",
"inflection",
"isort",
"jmespath",
"paho-mqtt<2.0.0",
Expand Down
7 changes: 7 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Release Notes

## 4.0.5

- APP-4442 - [Mitre] Updated MITRE module to support default value and make logging optional
- APP-4443 - [API] Updated multiple API modules to be cached properties
- APP-4444 - [Util] Replaced inflect package with inflection package
- APP-4445 - [Util] Updated utils module to use inflection for case conversion

## 4.0.4

- APP-4307 - [API] Added support for paginating indicator custom associations
Expand Down
2 changes: 1 addition & 1 deletion tcex/__metadata__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""TcEx Framework Module"""

__license__ = 'Apache-2.0'
__version__ = '4.0.4'
__version__ = '4.0.5'
17 changes: 15 additions & 2 deletions tcex/api/tc/v3/_gen/model/_property_model.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""TcEx Framework Module"""

# third-party
from pydantic import BaseModel, Extra, Field, validator

# first-party
from tcex.pleb.cached_property import cached_property
from tcex.util import Util
from tcex.util.render.render import Render
from tcex.util.string_operation import CamelString


Expand Down Expand Up @@ -132,8 +134,12 @@ def __extra_data(self) -> dict[str, str]:
self.__process_special_types(self, extra)

if extra.get('typing_type') is None:
raise RuntimeError(
f'Unable to determine typing_type for name={self.name}, type={self.type}.'
Render.panel.failure(
f'New type found for object name="{self.name}"\n'
f'Type: "{self.type}" should be added to PropertyModel '
f'in one of the following methods:\n'
f'_process_dict, _process_tc_type, _process_special_types\n'
f'\nextra: {extra}'
)

return extra
Expand Down Expand Up @@ -163,6 +169,7 @@ def __process_bool_types(cls, pm: 'PropertyModel', extra: dict[str, str]):
def __process_dict_types(cls, pm: 'PropertyModel', extra: dict[str, str]):
"""Process standard type."""
types = [
'AiInsights',
'AttackSecurityCoverage',
'AttributeSource',
'DNSResolutions',
Expand Down Expand Up @@ -315,6 +322,12 @@ def __process_special_types(cls, pm: 'PropertyModel', extra: dict[str, str]):
'typing_type': 'list[str]',
}
)
elif pm.name == 'customAssociationNames':
extra.update(
{
'typing_type': 'list[str]',
}
)
elif pm.type == 'TaskAssignees':
extra.update(
{
Expand Down
2 changes: 0 additions & 2 deletions tcex/api/tc/v3/case_attributes/case_attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class CaseAttribute(ObjectABC):
the one(s) specified).
source (str, kwargs): The attribute source.
type (str, kwargs): The attribute type.
update_last_modified_date (bool, kwargs): A flag giving the client the ability to choose if
the attribute last modified date should be changed.
value (str, kwargs): The attribute value.
"""

Expand Down
10 changes: 0 additions & 10 deletions tcex/api/tc/v3/case_attributes/case_attribute_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,6 @@ class CaseAttributeModel(
read_only=False,
title='type',
)
update_last_modified_date: bool = Field(
None,
description=(
'A flag giving the client the ability to choose if the attribute last modified date '
'should be changed.'
),
methods=['POST', 'PUT'],
read_only=False,
title='updateLastModifiedDate',
)
value: str | None = Field(
None,
description='The attribute value.',
Expand Down
2 changes: 0 additions & 2 deletions tcex/api/tc/v3/group_attributes/group_attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class GroupAttribute(ObjectABC):
the one(s) specified).
source (str, kwargs): The attribute source.
type (str, kwargs): The attribute type.
update_last_modified_date (bool, kwargs): A flag giving the client the ability to choose if
the attribute last modified date should be changed.
value (str, kwargs): The attribute value.
"""

Expand Down
10 changes: 0 additions & 10 deletions tcex/api/tc/v3/group_attributes/group_attribute_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,6 @@ class GroupAttributeModel(
read_only=False,
title='type',
)
update_last_modified_date: bool = Field(
None,
description=(
'A flag giving the client the ability to choose if the attribute last modified date '
'should be changed.'
),
methods=['POST', 'PUT'],
read_only=False,
title='updateLastModifiedDate',
)
value: str | None = Field(
None,
description='The attribute value.',
Expand Down
2 changes: 1 addition & 1 deletion tcex/api/tc/v3/groups/group_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ class GroupModel(
read_only=True,
title='id',
)
insights: str | None = Field(
insights: dict | None = Field(
None,
allow_mutation=False,
applies_to=['Document', 'Report'],
Expand Down
2 changes: 0 additions & 2 deletions tcex/api/tc/v3/indicator_attributes/indicator_attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class IndicatorAttribute(ObjectABC):
the one(s) specified).
source (str, kwargs): The attribute source.
type (str, kwargs): The attribute type.
update_last_modified_date (bool, kwargs): A flag giving the client the ability to choose if
the attribute last modified date should be changed.
value (str, kwargs): The attribute value.
"""

Expand Down
10 changes: 0 additions & 10 deletions tcex/api/tc/v3/indicator_attributes/indicator_attribute_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,6 @@ class IndicatorAttributeModel(
read_only=False,
title='type',
)
update_last_modified_date: bool = Field(
None,
description=(
'A flag giving the client the ability to choose if the attribute last modified date '
'should be changed.'
),
methods=['POST', 'PUT'],
read_only=False,
title='updateLastModifiedDate',
)
value: str | None = Field(
None,
description='The attribute value.',
Expand Down
15 changes: 15 additions & 0 deletions tcex/api/tc/v3/indicators/indicator_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,21 @@ def attribute(self, operator: Enum, attribute: list | str):

self._tql.add_filter('attribute', operator, attribute, TqlType.STRING)

def common_id(self, operator: Enum, common_id: int | list):
"""Filter Common Id based on **commonId** keyword.
Args:
operator: The operator enum for the filter.
common_id: The common ID of the indicator linking it between owners.
"""
if isinstance(common_id, list) and operator not in self.list_types:
raise RuntimeError(
'Operator must be CONTAINS, NOT_CONTAINS, IN'
'or NOT_IN when filtering on a list of values.'
)

self._tql.add_filter('commonId', operator, common_id, TqlType.INTEGER)

def confidence(self, operator: Enum, confidence: int | list):
"""Filter Confidence Rating based on **confidence** keyword.
Expand Down
5 changes: 3 additions & 2 deletions tcex/api/tc/v3/security/users/user_filter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""TcEx Framework Module"""

# standard library
from datetime import datetime
from enum import Enum
Expand Down Expand Up @@ -157,8 +158,8 @@ def password_reset_required(self, operator: Enum, password_reset_required: bool)
Args:
operator: The operator enum for the filter.
password_reset_required: A flag indicating whether or not
the user's password needs to be reset upon next login.
password_reset_required: A flag indicating whether or not the user's password needs to
be reset upon next login.
"""
self._tql.add_filter(
'passwordResetRequired', operator, password_reset_required, TqlType.BOOLEAN
Expand Down
39 changes: 22 additions & 17 deletions tcex/api/tc/v3/tags/mitre_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ def formatted(self):
class MitreTags:
"""MitreTags Class"""

def __init__(self, mitre_tags: dict[str, str]):
def __init__(self, mitre_tags: dict[str, str], verbose: bool = False):
"""Initialize instance properties."""
self._mitre_tags = {id_: MitreTag(id=id_, name=name) for id_, name in mitre_tags.items()}
self.verbose = verbose
self.log = _logger

@cached_property
def mitre_tags_name_id(self):
def mitre_tags_name_id(self) -> dict[str, MitreTag]:
"""Return a dict of MitreTags keyed by name."""
mitre_tags = {}
for tag in self._mitre_tags.values():
Expand All @@ -46,29 +47,33 @@ def mitre_tags_name_id(self):

return mitre_tags

def get_by_name(self, name: str) -> str | None:
def get_by_name(self, name: str, default: str | None = None) -> str | None:
"""Return the tag id for the provided name."""
mitre_tag = self.mitre_tags_name_id.get(name.lower())
if mitre_tag is None:
self.log.warning(f'No Mitre match found for {name}.')
return None
if self.verbose is True:
self.log.warning(f'No Mitre match found for {name}.')
return default
return mitre_tag.formatted

def get_by_id(self, id_: str) -> str:
"""Return the tag name for the provided id."""
mitre_tag = self._mitre_tags.get(id_)
def get_by_id(self, id_: str, default: str | None = None) -> str | None:
"""Return the tag name for the provided id (e.g., T1000)."""
mitre_tag = self._mitre_tags.get(str(id_).upper())
if mitre_tag is None:
self.log.warning(f'No Mitre match found for {id_}, returning id unformatted.')
return id_
if self.verbose is True:
self.log.warning(f'No Mitre match found for {id_}, returning id unformatted.')
return default
return mitre_tag.formatted

def get_by_id_regex(self, value: str):
def get_by_id_regex(self, value: str, default: str | None = None) -> str | None:
r"""Get the appropriate MitreTag using the (T\d+(?:\.\d+)?) regex."""
matches = re.findall(r'(T\d+(?:\.\d+)?)', value)
matches = re.findall(r'([Tt]\d+(?:\.\d+)?)', value)
if not matches:
self.log.warning(f'No Mitre matches found for {value}')
return None
if self.verbose is True:
self.log.warning(f'No Mitre matches found for {value}')
return default
if len(matches) > 1:
self.log.warning(f'Multiple Mitre matches found for {value}: {matches}')
return None
return self.get_by_id(matches[0])
if self.verbose is True:
self.log.warning(f'Multiple Mitre matches found for {value}: {matches}')
return default
return self.get_by_id(matches[0], default)
36 changes: 17 additions & 19 deletions tcex/api/tc/v3/threat_intelligence/threat_intelligence.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""TcEx Framework Module"""

# standard library
import logging

Expand Down Expand Up @@ -37,15 +38,12 @@ class ThreatIntelligence:
def __init__(self, session: Session):
"""Initialize instance properties."""
self.session = session
self._ti_utils = None
self.log = _logger

@property
@cached_property
def ti_utils(self):
"""Return instance of Threat Intel Utils."""
if not self._ti_utils:
self._ti_utils = ThreatIntelUtil(session_tc=self.session)
return self._ti_utils
return ThreatIntelUtil(session_tc=self.session)

def create_entity(self, entity: dict, owner: str) -> dict | None:
"""Create a CM object provided a dict and owner."""
Expand Down Expand Up @@ -192,20 +190,6 @@ def groups(self, **kwargs) -> Groups:
"""
return Groups(session=self.session, **kwargs)

@cached_property
def mitre_tags(self) -> MitreTags:
"""Mitre Tags"""
mitre_tags = {}
try:
tags = Tags(session=self.session, params={'resultLimit': 1_000})
tags.filter.technique_id(TqlOperator.NE, None) # type: ignore
for tag in tags:
mitre_tags[str(tag.model.technique_id)] = tag.model.name
except Exception as e:
self.log.exception('Error downloading Mitre Tags')
raise e
return MitreTags(mitre_tags)

def indicator(self, **kwargs) -> Indicator:
"""Return a instance of Group object.
Expand Down Expand Up @@ -315,6 +299,20 @@ def indicators(self, **kwargs) -> Indicators:
"""
return Indicators(session=self.session, **kwargs)

@cached_property
def mitre_tags(self) -> MitreTags:
"""Mitre Tags"""
mitre_tags = {}
try:
tags = Tags(session=self.session, params={'resultLimit': 1_000})
tags.filter.technique_id(TqlOperator.NE, None) # type: ignore
for tag in tags:
mitre_tags[str(tag.model.technique_id)] = tag.model.name
except Exception as e:
self.log.exception('Error downloading Mitre Tags')
raise e
return MitreTags(mitre_tags)

def security_label(self, **kwargs) -> SecurityLabel:
"""Return a instance of Case Attributes object.
Expand Down
9 changes: 5 additions & 4 deletions tcex/api/tc/v3/v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from tcex.api.tc.v3.intel_requirement.ir import IR
from tcex.api.tc.v3.security.security import Security
from tcex.api.tc.v3.threat_intelligence.threat_intelligence import ThreatIntelligence
from tcex.pleb.cached_property import cached_property


class V3(CaseManagement, Security, ThreatIntelligence):
Expand All @@ -23,22 +24,22 @@ def attribute_types(self, **kwargs) -> AttributeTypes:
"""Return a instance of Attribute Types object."""
return AttributeTypes(session=self.session, **kwargs)

@property
@cached_property
def cm(self) -> CaseManagement:
"""Return Case Management API collection."""
return CaseManagement(self.session)

@property
@cached_property
def ir(self) -> IR:
"""Return Intel Requirement API collection."""
return IR(self.session)

@property
@cached_property
def security(self) -> Security:
"""Return Security API collection."""
return Security(self.session)

@property
@cached_property
def ti(self) -> ThreatIntelligence:
"""Return Threat Intelligence API collection."""
return ThreatIntelligence(self.session)
2 changes: 0 additions & 2 deletions tcex/api/tc/v3/victim_attributes/victim_attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ class VictimAttribute(ObjectABC):
the one(s) specified).
source (str, kwargs): The attribute source.
type (str, kwargs): The attribute type.
update_last_modified_date (bool, kwargs): A flag giving the client the ability to choose if
the attribute last modified date should be changed.
value (str, kwargs): The attribute value.
victim_id (int, kwargs): Victim associated with attribute.
"""
Expand Down
Loading

0 comments on commit b82854c

Please sign in to comment.