diff --git a/README.md b/README.md index 4e8dc62..a810f0c 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,7 @@ optional arguments: --exclude EXCLUDE specify threat IDs to be ignored --seq output sequential diagram --list list all available threats + --colormap color the risk in the diagram --describe DESCRIBE describe the properties available for a given element --list-elements list all elements which can be part of a threat model --json JSON output a JSON file @@ -113,6 +114,9 @@ Element class attributes: ``` +The *colormap* argument, used together with *dfd*, outputs a color-coded DFD where the elements are painted red, yellow or green depending on their risk level (as identified by running the rules). + + ## Creating a Threat Model The following is a sample `tm.py` file that describes a simple application where a User logs into the application @@ -200,6 +204,8 @@ tm.process() ``` +You also have the option of using [pytmGPT](https://chat.openai.com/g/g-soISG24ix-pytmgpt) to create your models from prose! + ### Generating Diagrams Diagrams are output as [Dot](https://graphviz.gitlab.io/) and [PlantUML](https://plantuml.com/). @@ -303,10 +309,13 @@ user_to_web.overrides = [ threat_id="INP02", cvss="9.3", response="""**To Mitigate**: run a memory sanitizer to validate the binary""", + severity="Very High", ) ] ``` +If you are adding a Finding, make sure to add a severity: "Very High", "High", "Medium", "Low", "Very Low". + ## Threats database For the security practitioner, you may supply your own threats file by setting `TM.threatsFile`. It should contain entries like: diff --git a/pytm/images/datastore_black.png b/pytm/images/datastore_black.png new file mode 100644 index 0000000..3258f66 Binary files /dev/null and b/pytm/images/datastore_black.png differ diff --git a/pytm/images/datastore_darkgreen.png b/pytm/images/datastore_darkgreen.png new file mode 100644 index 0000000..759a1ce Binary files /dev/null and b/pytm/images/datastore_darkgreen.png differ diff --git a/pytm/images/datastore_firebrick3.png b/pytm/images/datastore_firebrick3.png new file mode 100644 index 0000000..5c8446d Binary files /dev/null and b/pytm/images/datastore_firebrick3.png differ diff --git a/pytm/images/datastore_gold.png b/pytm/images/datastore_gold.png new file mode 100644 index 0000000..6ed29d9 Binary files /dev/null and b/pytm/images/datastore_gold.png differ diff --git a/pytm/pytm.py b/pytm/pytm.py index e6e005f..9906648 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -34,6 +34,18 @@ """ +def sev_to_color(sev): + # calculate the color depending on the severity + if sev == 5: + return 'firebrick3; fillcolor="#b2222222"; style=filled ' + elif sev <= 4 and sev >= 2: + return 'gold; fillcolor="#ffd80022"; style=filled' + elif sev < 2 and sev >= 0: + return 'darkgreen; fillcolor="#00630022"; style=filled' + + return "black" + + class UIError(Exception): def __init__(self, e, context): self.error = e @@ -248,15 +260,16 @@ def __ne__(self, other): def __str__(self): return ", ".join(sorted(set(d.name for d in self))) + class varControls(var): def __set__(self, instance, value): if not isinstance(value, Controls): raise ValueError( - "expecting an Controls " - "value, got a {}".format(type(value)) + "expecting an Controls " "value, got a {}".format(type(value)) ) super().__set__(instance, value) + class Action(Enum): """Action taken when validating a threat model.""" @@ -448,18 +461,25 @@ def _apply_defaults(flows, data): e._safeset("dstPort", e.sink.port) if hasattr(e.sink.controls, "isEncrypted"): e.controls._safeset("isEncrypted", e.sink.controls.isEncrypted) - e.controls._safeset("authenticatesDestination", e.source.controls.authenticatesDestination) - e.controls._safeset("checksDestinationRevocation", e.source.controls.checksDestinationRevocation) + e.controls._safeset( + "authenticatesDestination", e.source.controls.authenticatesDestination + ) + e.controls._safeset( + "checksDestinationRevocation", e.source.controls.checksDestinationRevocation + ) for d in e.data: if d.isStored: if hasattr(e.sink.controls, "isEncryptedAtRest"): for d in e.data: - d._safeset("isDestEncryptedAtRest", e.sink.controls.isEncryptedAtRest) + d._safeset( + "isDestEncryptedAtRest", e.sink.controls.isEncryptedAtRest + ) if hasattr(e.source, "isEncryptedAtRest"): for d in e.data: d._safeset( - "isSourceEncryptedAtRest", e.source.controls.isEncryptedAtRest + "isSourceEncryptedAtRest", + e.source.controls.isEncryptedAtRest, ) if d.credentialsLife != Lifetime.NONE and not d.isCredentials: d._safeset("isCredentials", True) @@ -528,28 +548,26 @@ def _describe_classes(classes): def _list_elements(): """List all elements which can be used in a threat model with the corresponding description""" + def all_subclasses(cls): """Get all sub classes of a class""" subclasses = set(cls.__subclasses__()) - return subclasses.union( - (s for c in subclasses for s in all_subclasses(c))) + return subclasses.union((s for c in subclasses for s in all_subclasses(c))) def print_components(cls_list): elements = sorted(cls_list, key=lambda c: c.__name__) max_len = max((len(e.__name__) for e in elements)) for sc in elements: - doc = sc.__doc__ if sc.__doc__ is not None else '' - print(f'{sc.__name__:<{max_len}} -- {doc}') - #print all elements - print('Elements:') + doc = sc.__doc__ if sc.__doc__ is not None else "" + print(f"{sc.__name__:<{max_len}} -- {doc}") + + # print all elements + print("Elements:") print_components(all_subclasses(Element)) # Print Attributes - print('\nAtributes:') - print_components( - all_subclasses(OrderedEnum) | {Data, Action, Lifetime} - ) - + print("\nAtributes:") + print_components(all_subclasses(OrderedEnum) | {Data, Action, Lifetime}) def _get_elements_and_boundaries(flows): @@ -735,7 +753,14 @@ class TM: _data = [] _threatsExcluded = [] _sf = None - _duplicate_ignored_attrs = "name", "note", "order", "response", "responseTo", "controls" + _duplicate_ignored_attrs = ( + "name", + "note", + "order", + "response", + "responseTo", + "controls", + ) name = varString("", required=True, doc="Model name") description = varString("", required=True, doc="Model description") threatsFile = varString( @@ -757,6 +782,7 @@ class TM: required=False, doc="A list of assumptions about the design/model.", ) + _colormap = False def __init__(self, name, **kwargs): for key, value in kwargs.items(): @@ -787,7 +813,9 @@ def _add_threats(self): with open(self.threatsFile, "r", encoding="utf8") as threat_file: threats_json = json.load(threat_file) except (FileNotFoundError, PermissionError, IsADirectoryError) as e: - raise UIError(e, f"while trying to open the the threat file ({self.threatsFile}).") + raise UIError( + e, f"while trying to open the the threat file ({self.threatsFile})." + ) for i in threats_json: TM._threats.append(Threat(**i)) @@ -819,9 +847,9 @@ def resolve(self): finding_count += 1 f = Finding(e, id=str(finding_count), threat=t) - logger.debug(f"new finding: {f}") findings.append(f) elements[e].append(f) + e._set_severity(f.severity) self.findings = findings for e, findings in elements.items(): e.findings = findings @@ -886,7 +914,7 @@ def _check_duplicates(self, flows): left_controls_attrs = left.controls._attr_values() right_controls_attrs = right.controls._attr_values() - #for a in self._duplicate_ignored_attrs: + # for a in self._duplicate_ignored_attrs: # del left_controls_attrs[a], right_controls_attrs[a] if left_controls_attrs != right_controls_attrs: continue @@ -894,7 +922,6 @@ def _check_duplicates(self, flows): right._is_drawn = True continue - raise ValueError( "Duplicate Dataflow found between {} and {}: " "{} is same as {}".format( @@ -1013,7 +1040,9 @@ def report(self, template_path): with open(template_path) as file: template = file.read() except (FileNotFoundError, PermissionError, IsADirectoryError) as e: - raise UIError(e, f"while trying to open the report template file ({template_path}).") + raise UIError( + e, f"while trying to open the report template file ({template_path})." + ) threats = encode_threat_data(TM._threats) findings = encode_threat_data(self.findings) @@ -1049,7 +1078,6 @@ def process(self): sys.stderr.write(erromsg) sys.exit(127) - def _process(self): self.check() result = get_args() @@ -1065,7 +1093,9 @@ def _process(self): print(self.seq()) if result.dfd is True: - print(self.dfd(levels=(result.levels or set()))) + if result.colormap is True: + self.resolve() + print(self.dfd(colormap=result.colormap, levels=(result.levels or set()))) if ( result.report is not None @@ -1083,7 +1113,9 @@ def _process(self): with open(result.json, "w", encoding="utf8") as f: json.dump(self, f, default=to_serializable) except (FileExistsError, PermissionError, IsADirectoryError) as e: - raise UIError(e, f"while trying to write to the result file ({result.json})") + raise UIError( + e, f"while trying to write to the result file ({result.json})" + ) if result.report is not None: print(self.report(result.report)) @@ -1114,7 +1146,6 @@ def _stale(self, days): print(f"Checking for code {days} days older than this model.") for e in TM._elements: - for src in e.sourceFiles: try: src_mtime = datetime.fromtimestamp( @@ -1191,6 +1222,7 @@ def get_table(self, db, klass): ] return db.define_table(name, fields) + class Controls: """Controls implemented by/on and Element""" @@ -1283,7 +1315,6 @@ def _attr_values(self): result[i] = value return result - def _safeset(self, attr, value): try: setattr(self, attr, value) @@ -1291,7 +1322,6 @@ def _safeset(self, attr, value): pass - class Element: """A generic element""" @@ -1322,6 +1352,7 @@ class Element: doc="Location of the source code that describes this element relative to the directory of the model script.", ) controls = varControls(None) + severity = 0 def __init__(self, name, **kwargs): for key, value in kwargs.items(): @@ -1353,7 +1384,7 @@ def _dfd_template(self): return """{uniq_name} [ shape = {shape}; color = {color}; - fontcolor = {color}; + fontcolor = black; label = "{label}"; margin = 0.02; ] @@ -1366,10 +1397,14 @@ def dfd(self, **kwargs): if levels and not levels & self.levels: return "" + color = self._color() + if kwargs.get("colormap", False): + color = sev_to_color(self.severity) + return self._dfd_template().format( uniq_name=self._uniq_name(), label=self._label(), - color=self._color(), + color=color, shape=self._shape(), ) @@ -1463,6 +1498,23 @@ def _attr_values(self): def checkTLSVersion(self, flows): return any(f.tlsVersion < self.minTLSVersion for f in flows) + def _set_severity(self, sev): + sevs = { + "very high": 5, + "high": 4, + "medium": 3, + "low": 2, + "very low": 1, + "info": 0, + } + + if sev.lower() not in sevs.keys(): + return + + if self.severity < sevs[sev.lower()]: + self.severity = sevs[sev.lower()] + return + class Data: """Represents a single piece of data that traverses the system""" @@ -1564,7 +1616,7 @@ def _dfd_template(self): shape = {shape}; color = {color}; - fontcolor = {color}; + fontcolor = "black"; label = < @@ -1580,10 +1632,15 @@ def dfd(self, **kwargs): if levels and not levels & self.levels: return "" + color = self._color() + + if kwargs.get("colormap", False): + color = sev_to_color(self.severity) + return self._dfd_template().format( uniq_name=self._uniq_name(), label=self._label(), - color=self._color(), + color=color, shape=self._shape(), ) @@ -1634,7 +1691,7 @@ class Datastore(Asset): * FILE_SYSTEM - files on a file system * SQL - A SQL Database * LDAP - An LDAP Server -* AWS_S3 - An S3 Bucket within AWS""" +* AWS_S3 - An S3 Bucket within AWS""", ) def __init__(self, name, **kwargs): @@ -1647,7 +1704,7 @@ def _dfd_template(self): image = "{image}"; imagescale = true; color = {color}; - fontcolor = {color}; + fontcolor = black; xlabel = "{label}"; label = ""; ] @@ -1663,12 +1720,21 @@ def dfd(self, **kwargs): if levels and not levels & self.levels: return "" + color = self._color() + color_file = "black" + + if kwargs.get("colormap", False): + color = sev_to_color(self.severity) + color_file = color.split(";")[0] + return self._dfd_template().format( uniq_name=self._uniq_name(), label=self._label(), - color=self._color(), + color=color, shape=self._shape(), - image=os.path.join(os.path.dirname(__file__), "images", "datastore.png"), + image=os.path.join( + os.path.dirname(__file__), "images", f"datastore_{color_file}.png" + ), ) @@ -1734,6 +1800,7 @@ class Dataflow(Element): note = varString("") usesVPN = varBool(False) usesSessionTokens = varBool(False) + severity = 0 def __init__(self, source, sink, name, **kwargs): self.source = source @@ -1766,6 +1833,11 @@ def dfd(self, mergeResponses=False, **kwargs): ): return "" + color = self._color() + + if kwargs.get("colormap", False): + color = sev_to_color(self.severity) + direction = "forward" label = self._label() if mergeResponses and self.response is not None: @@ -1777,7 +1849,7 @@ def dfd(self, mergeResponses=False, **kwargs): sink=self.sink._uniq_name(), direction=direction, label=label, - color=self._color(), + color=color, ) def hasDataLeaks(self): @@ -1801,7 +1873,7 @@ def _dfd_template(self): return """subgraph cluster_{uniq_name} {{ graph [ fontsize = 10; - fontcolor = {color}; + fontcolor = black; style = dashed; color = {color}; label = <{label}>; @@ -1817,23 +1889,25 @@ def dfd(self, **kwargs): self._is_drawn = True - logger.debug("Now drawing boundary " + self.name) edges = [] for e in TM._elements: if e.inBoundary != self or e._is_drawn: continue # The content to draw can include Boundary objects - logger.debug("Now drawing content {}".format(e.name)) edges.append(e.dfd(**kwargs)) + return self._dfd_template().format( uniq_name=self._uniq_name(), label=self._label(), - color=self._color(), + color=self._color(**kwargs), edges=indent("\n".join(edges), " "), ) - def _color(self): - return "firebrick2" + def _color(self, **kwargs): + if kwargs.get("colormap", False): + return "black" + else: + return "firebrick2" def parents(self): result = [] @@ -1901,27 +1975,29 @@ def serialize(obj, nested=False): result[i.lstrip("_")] = value return result + def encode_element_threat_data(obj): """Used to html encode threat data from a list of Elements""" encoded_elements = [] - if (type(obj) is not list): - raise ValueError("expecting a list value, got a {}".format(type(value))) + if type(obj) is not list: + raise ValueError("expecting a list value, got a {}".format(type(obj))) for o in obj: - c = copy.deepcopy(o) - for a in o._attr_values(): - if (a == "findings"): - encoded_findings = encode_threat_data(o.findings) - c._safeset("findings", encoded_findings) + c = copy.deepcopy(o) + for a in o._attr_values(): + if a == "findings": + encoded_findings = encode_threat_data(o.findings) + c._safeset("findings", encoded_findings) else: - v = getattr(o, a) - if (type(v) is not list or (type(v) is list and len(v) != 0)): - c._safeset(a, v) - - encoded_elements.append(c) + v = getattr(o, a) + if type(v) is not list or (type(v) is list and len(v) != 0): + c._safeset(a, v) + + encoded_elements.append(c) return encoded_elements + def encode_threat_data(obj): """Used to html encode threat data from a list of threats or findings""" encoded_threat_data = [] @@ -1978,11 +2054,16 @@ def get_args(): _parser.add_argument( "--list", action="store_true", help="list all available threats" ) + _parser.add_argument( + "--colormap", action="store_true", help="color the risk in the diagram" + ) _parser.add_argument( "--describe", help="describe the properties available for a given element" ) _parser.add_argument( - "--list-elements", action="store_true", help="list all elements which can be part of a threat model" + "--list-elements", + action="store_true", + help="list all elements which can be part of a threat model", ) _parser.add_argument("--json", help="output a JSON file") _parser.add_argument( diff --git a/tests/dfd.dot b/tests/dfd.dot index 1a16d24..2f707bb 100644 --- a/tests/dfd.dot +++ b/tests/dfd.dot @@ -21,7 +21,7 @@ digraph tm { subgraph cluster_boundary_Companynet_88f2d9c06f { graph [ fontsize = 10; - fontcolor = firebrick2; + fontcolor = black; style = dashed; color = firebrick2; label = <Company net>; @@ -30,7 +30,7 @@ digraph tm { subgraph cluster_boundary_dmz_579e9aae81 { graph [ fontsize = 10; - fontcolor = firebrick2; + fontcolor = black; style = dashed; color = firebrick2; label = <dmz>; @@ -49,7 +49,7 @@ digraph tm { subgraph cluster_boundary_backend_f2eb7a3ff7 { graph [ fontsize = 10; - fontcolor = firebrick2; + fontcolor = black; style = dashed; color = firebrick2; label = <backend>; @@ -66,7 +66,7 @@ digraph tm { datastore_SQLDatabase_0291419f72 [ shape = none; fixedsize = shape; - image = "INSTALL_PATH/pytm/images/datastore.png"; + image = "INSTALL_PATH/pytm/images/datastore_black.png"; imagescale = true; color = black; fontcolor = black; @@ -81,7 +81,7 @@ digraph tm { subgraph cluster_boundary_Internet_acf3059e70 { graph [ fontsize = 10; - fontcolor = firebrick2; + fontcolor = black; style = dashed; color = firebrick2; label = <Internet>; diff --git a/tests/dfd_colormap.dot b/tests/dfd_colormap.dot new file mode 100644 index 0000000..1c4e61a --- /dev/null +++ b/tests/dfd_colormap.dot @@ -0,0 +1,142 @@ +digraph tm { + graph [ + fontname = Arial; + fontsize = 14; + ] + node [ + fontname = Arial; + fontsize = 14; + rankdir = lr; + ] + edge [ + shape = none; + arrowtail = onormal; + fontname = Arial; + fontsize = 12; + ] + labelloc = "t"; + fontsize = 20; + nodesep = 1; + + subgraph cluster_boundary_Companynet_88f2d9c06f { + graph [ + fontsize = 10; + fontcolor = black; + style = dashed; + color = black; + label = <Company net>; + ] + + subgraph cluster_boundary_dmz_579e9aae81 { + graph [ + fontsize = 10; + fontcolor = black; + style = dashed; + color = black; + label = <dmz>; + ] + + server_Gateway_f8af758679 [ + shape = circle; + color = firebrick3; fillcolor="#b2222222"; style=filled ; + fontcolor = black; + label = "Gateway"; + margin = 0.02; + ] + + } + + subgraph cluster_boundary_backend_f2eb7a3ff7 { + graph [ + fontsize = 10; + fontcolor = black; + style = dashed; + color = black; + label = <backend>; + ] + + server_WebServer_2c440ebe53 [ + shape = circle; + color = firebrick3; fillcolor="#b2222222"; style=filled ; + fontcolor = black; + label = "Web Server"; + margin = 0.02; + ] + + datastore_SQLDatabase_0291419f72 [ + shape = none; + fixedsize = shape; + image = "INSTALL_PATH/pytm/images/datastore_gold.png"; + imagescale = true; + color = gold; fillcolor="#ffd80022"; style=filled; + fontcolor = black; + xlabel = "SQL Database"; + label = ""; + ] + + } + + } + + subgraph cluster_boundary_Internet_acf3059e70 { + graph [ + fontsize = 10; + fontcolor = black; + style = dashed; + color = black; + label = <Internet>; + ] + + actor_User_d2006ce1bb [ + shape = square; + color = darkgreen; fillcolor="#00630022"; style=filled; + fontcolor = black; + label = "User"; + margin = 0.02; + ] + + } + + actor_User_d2006ce1bb -> server_Gateway_f8af758679 [ + color = gold; fillcolor="#ffd80022"; style=filled; + fontcolor = gold; fillcolor="#ffd80022"; style=filled; + dir = forward; + label = "User enters\ncomments (*)"; + ] + + server_Gateway_f8af758679 -> server_WebServer_2c440ebe53 [ + color = gold; fillcolor="#ffd80022"; style=filled; + fontcolor = gold; fillcolor="#ffd80022"; style=filled; + dir = forward; + label = "Request"; + ] + + server_WebServer_2c440ebe53 -> datastore_SQLDatabase_0291419f72 [ + color = gold; fillcolor="#ffd80022"; style=filled; + fontcolor = gold; fillcolor="#ffd80022"; style=filled; + dir = forward; + label = "Insert query with\ncomments"; + ] + + datastore_SQLDatabase_0291419f72 -> server_WebServer_2c440ebe53 [ + color = gold; fillcolor="#ffd80022"; style=filled; + fontcolor = gold; fillcolor="#ffd80022"; style=filled; + dir = forward; + label = "Retrieve comments"; + ] + + server_WebServer_2c440ebe53 -> server_Gateway_f8af758679 [ + color = gold; fillcolor="#ffd80022"; style=filled; + fontcolor = gold; fillcolor="#ffd80022"; style=filled; + dir = forward; + label = "Response"; + ] + + server_Gateway_f8af758679 -> actor_User_d2006ce1bb [ + color = gold; fillcolor="#ffd80022"; style=filled; + fontcolor = gold; fillcolor="#ffd80022"; style=filled; + dir = forward; + label = "Show comments (*)"; + ] + +} diff --git a/tests/dfd_level0.txt b/tests/dfd_level0.txt index 6624edb..bf888d2 100644 --- a/tests/dfd_level0.txt +++ b/tests/dfd_level0.txt @@ -21,7 +21,7 @@ digraph tm { subgraph cluster_boundary_Internet_acf3059e70 { graph [ fontsize = 10; - fontcolor = firebrick2; + fontcolor = black; style = dashed; color = firebrick2; label = <Internet>; @@ -40,7 +40,7 @@ digraph tm { subgraph cluster_boundary_ServerDB_88f2d9c06f { graph [ fontsize = 10; - fontcolor = firebrick2; + fontcolor = black; style = dashed; color = firebrick2; label = <Server/DB>; @@ -49,7 +49,7 @@ digraph tm { datastore_SQLDatabase_d2006ce1bb [ shape = none; fixedsize = shape; - image = "INSTALL_PATH/pytm/images/datastore.png"; + image = "INSTALL_PATH/pytm/images/datastore_black.png"; imagescale = true; color = black; fontcolor = black; diff --git a/tests/dfd_level1.txt b/tests/dfd_level1.txt index 8509f3c..61e4dfa 100644 --- a/tests/dfd_level1.txt +++ b/tests/dfd_level1.txt @@ -21,7 +21,7 @@ digraph tm { subgraph cluster_boundary_Internet_acf3059e70 { graph [ fontsize = 10; - fontcolor = firebrick2; + fontcolor = black; style = dashed; color = firebrick2; label = <Internet>; @@ -40,7 +40,7 @@ digraph tm { subgraph cluster_boundary_ServerDB_88f2d9c06f { graph [ fontsize = 10; - fontcolor = firebrick2; + fontcolor = black; style = dashed; color = firebrick2; label = <Server/DB>; diff --git a/tests/output.json b/tests/output.json index bfd2627..94d3326 100644 --- a/tests/output.json +++ b/tests/output.json @@ -68,6 +68,7 @@ "overrides": [], "port": -1, "protocol": "", + "severity": 0, "sourceFiles": [] } ], @@ -145,6 +146,7 @@ "overrides": [], "port": -1, "protocol": "", + "severity": 0, "sourceFiles": [], "usesCache": false, "usesEnvironmentVariables": false, @@ -222,6 +224,7 @@ "overrides": [], "port": -1, "protocol": "", + "severity": 0, "sourceFiles": [], "usesEnvironmentVariables": false }, @@ -298,6 +301,7 @@ "overrides": [], "port": -1, "protocol": "", + "severity": 0, "sourceFiles": [], "tracksExecutionFlow": false, "usesEnvironmentVariables": false @@ -377,6 +381,7 @@ "overrides": [], "port": -1, "protocol": "", + "severity": 0, "sourceFiles": [], "storesLogData": false, "storesPII": false, @@ -444,6 +449,7 @@ "minTLSVersion": "TLSVersion.NONE", "name": "Internet", "overrides": [], + "severity": 0, "sourceFiles": [] }, { @@ -503,9 +509,11 @@ "minTLSVersion": "TLSVersion.NONE", "name": "Server/DB", "overrides": [], + "severity": 0, "sourceFiles": [] } ], + "colormap": false, "data": [ { "carriedBy": [ @@ -597,6 +605,7 @@ "overrides": [], "port": -1, "protocol": "", + "severity": 0, "sourceFiles": [] }, { @@ -672,6 +681,7 @@ "overrides": [], "port": -1, "protocol": "", + "severity": 0, "sourceFiles": [], "usesCache": false, "usesEnvironmentVariables": false, @@ -749,6 +759,7 @@ "overrides": [], "port": -1, "protocol": "", + "severity": 0, "sourceFiles": [], "usesEnvironmentVariables": false }, @@ -825,6 +836,7 @@ "overrides": [], "port": -1, "protocol": "", + "severity": 0, "sourceFiles": [], "tracksExecutionFlow": false, "usesEnvironmentVariables": false @@ -904,6 +916,7 @@ "overrides": [], "port": -1, "protocol": "", + "severity": 0, "sourceFiles": [], "storesLogData": false, "storesPII": false, @@ -982,6 +995,7 @@ "protocol": "", "response": null, "responseTo": null, + "severity": 0, "sink": "Web Server", "source": "User", "sourceFiles": [], @@ -1056,6 +1070,7 @@ "protocol": "", "response": null, "responseTo": null, + "severity": 0, "sink": "SQL Database", "source": "Web Server", "sourceFiles": [], @@ -1130,6 +1145,7 @@ "protocol": "", "response": null, "responseTo": null, + "severity": 0, "sink": "Lambda func", "source": "Web Server", "sourceFiles": [], @@ -1204,6 +1220,7 @@ "protocol": "", "response": null, "responseTo": null, + "severity": 0, "sink": "Web Server", "source": "SQL Database", "sourceFiles": [], @@ -1278,6 +1295,7 @@ "protocol": "", "response": null, "responseTo": null, + "severity": 0, "sink": "User", "source": "Web Server", "sourceFiles": [], @@ -1352,6 +1370,7 @@ "protocol": "", "response": null, "responseTo": null, + "severity": 0, "sink": "SQL Database", "source": "Task queue worker", "sourceFiles": [], @@ -1368,4 +1387,4 @@ "onDuplicates": "Action.NO_ACTION", "threatsExcluded": [], "threatsFile": "pytm/threatlib/threats.json" -} \ No newline at end of file +} diff --git a/tests/test_pytmfunc.py b/tests/test_pytmfunc.py index ab1a8d4..f5129d5 100644 --- a/tests/test_pytmfunc.py +++ b/tests/test_pytmfunc.py @@ -126,6 +126,43 @@ def test_dfd(self): self.maxDiff = None self.assertEqual(output, expected) + def test_dfd_colormap(self): + dir_path = os.path.dirname(os.path.realpath(__file__)) + install_path = os.path.dirname(os.path.realpath(pytm.__file__)) + + with open(os.path.join(dir_path, "dfd_colormap.dot")) as x: + expected = ( + x.read().strip().replace("INSTALL_PATH", os.path.dirname(install_path)) + ) + + random.seed(0) + + TM.reset() + tm = TM("my test tm", description="aaa") + internet = Boundary("Internet") + net = Boundary("Company net") + dmz = Boundary("dmz", inBoundary=net) + backend = Boundary("backend", inBoundary=net) + user = Actor("User", inBoundary=internet) + gw = Server("Gateway", inBoundary=dmz) + web = Server("Web Server", inBoundary=backend) + db = Datastore("SQL Database", inBoundary=backend, isEncryptedAtRest=True) + comment = Data("Comment", isStored=True) + + Dataflow(user, gw, "User enters comments (*)") + Dataflow(gw, web, "Request") + Dataflow(web, db, "Insert query with comments", data=[comment]) + Dataflow(db, web, "Retrieve comments") + Dataflow(web, gw, "Response") + Dataflow(gw, user, "Show comments (*)") + + self.assertTrue(tm.check()) + tm.resolve() + output = tm.dfd(colormap=True) + + self.maxDiff = None + self.assertEqual(output, expected) + def test_dfd_duplicates_ignore(self): dir_path = os.path.dirname(os.path.realpath(__file__)) install_path = os.path.dirname(os.path.realpath(pytm.__file__)) @@ -227,7 +264,7 @@ def test_resolve(self): resp = Dataflow(web, user, "Show comments (*)") TM._threats = [ - Threat(SID=klass, target=klass) + Threat(SID=klass, target=klass, severity="") for klass in ["Actor", "Server", "Datastore", "Dataflow"] ] tm.resolve() @@ -276,8 +313,8 @@ def test_overrides(self): resp = Dataflow(web, user, "Show comments (*)") TM._threats = [ - Threat(SID="Server", target="Server", condition="False"), - Threat(SID="Datastore", target="Datastore"), + Threat(SID="Server", severity="High", target="Server", condition="False"), + Threat(SID="Datastore", target="Datastore", severity="High"), ] tm.resolve() @@ -437,6 +474,7 @@ def test_multilevel_dfd(self): output = tm.dfd(levels={0}) with open(os.path.join(output_path, "0.txt"), "w") as x: x.write(output) + self.maxDiff = None self.assertEqual(output, level_0) TM.reset()
{label}