Skip to content

Commit

Permalink
Change sensitive fields to sensitive data types. We will now return a…
Browse files Browse the repository at this point in the history
… dictionary of sensitive data types, with a list of the matching field names.
  • Loading branch information
codestronger committed Jun 18, 2024
1 parent 7ec8290 commit 9205985
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 103 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# CHANGELOG


## Version v0.2.0
## Version v0.3.0

### Added
* Add warning when sensitive fields are detected by @codestronger in https://github.com/SuffolkLITLab/RateMyPDF/issues/25
Expand All @@ -12,7 +12,7 @@ N/A
### Fixed
N/A

**Full Changelog**: https://github.com/SuffolkLITLab/FormFyxer/compare/v0.2.0...v0.3.0a2
**Full Changelog**: https://github.com/SuffolkLITLab/FormFyxer/compare/v0.2.0...v0.3.0

## Version v0.2.0

Expand Down
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -448,17 +448,21 @@ An object grouping together similar field names.



### formfyxer.get_sensitive_fields(fields)
Given a list of fields, identify those related to sensitive information. Sensitive fields include Social Security Number(SSN)), Driver's License (DL), and account numbers.
### formfyxer.get_sensitive_data_types(fields, fields_old)
Given a list of fields, identify those related to sensitive information and return a dictionary with the sensitive fields grouped by type. A list of the old field names (`fields_old`) can also be provided; it must have the same number of items as `fields` and be in the same order, otherwise a `ValueError` is raised. Passing the old field names allows the sensitive field algorithm to match more accurately. The return value will not contain the old field names, only the corresponding field names from the first parameter.

The sensitive field types are: Bank Account Number, Credit Card Number, Driver's License Number, and Social Security Number.
#### Parameters:
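* **fields : List[str]** List of field names.
* **fields_old : Optional[List[str]]** Optional list of the old field names, with the same number of items and in the same order as `fields`.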
#### Returns:
Dictionary mapping each sensitive data type found to the list of matching field names from `fields`.
#### Example:
```python
>>> import formfyxer
>>> formfyxer.get_sensitive_fields(["users1_name", "users1_address", "users1_ssn", "users1_routing_number"])
['Social Security Number', 'Bank Account Number']
>>> formfyxer.get_sensitive_data_types(["users1_name", "users1_address", "users1_ssn", "users1_routing_number"])
{'Social Security Number': ['users1_ssn'], 'Bank Account Number': ['users1_routing_number']}
>>> formfyxer.get_sensitive_data_types(["user_ban1", "user_credit_card_number", "user_cvc", "user_cdl", "user_social_security"], ["old_bank_account_number", "old_credit_card_number", "old_cvc", "old_drivers_license", "old_ssn"])
{'Bank Account Number': ['user_ban1'], 'Credit Card Number': ['user_credit_card_number', 'user_cvc'], "Driver's License Number": ['user_cdl'], 'Social Security Number': ['user_social_security']}
```
[back to top](#formfyxer)

Expand Down
166 changes: 96 additions & 70 deletions formfyxer/lit_explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
FieldType,
unlock_pdf_in_place,
is_tagged,
)
)

try:
from nltk.corpus import stopwords
Expand Down Expand Up @@ -131,18 +131,20 @@
with open(
os.path.join(os.path.dirname(__file__), "keys", "openai_key.txt"), "r"
) as in_file:
default_key:Optional[str] = in_file.read().rstrip()
default_key: Optional[str] = in_file.read().rstrip()
except:
default_key = None
try:
with open(
os.path.join(os.path.dirname(__file__), "keys", "openai_org.txt"), "r"
) as in_file:
default_org:Optional[str] = in_file.read().rstrip()
default_org: Optional[str] = in_file.read().rstrip()
except:
default_org = None
if default_key:
client:Optional[OpenAI] = OpenAI(api_key=default_key, organization=default_org or None)
client: Optional[OpenAI] = OpenAI(
api_key=default_key, organization=default_org or None
)
elif os.getenv("OPENAI_API_KEY"):
client = OpenAI()
else:
Expand All @@ -160,6 +162,7 @@
CURRENT_DIRECTORY, "data", "simplified_words.yml"
)


# This creates a timeout exception that can be triggered when something hangs too long.
class TimeoutException(Exception):
pass
Expand Down Expand Up @@ -429,18 +432,19 @@ def normalize_name(
not, to a snake_case variable name of appropriate length.
HACK: temporarily all we do is re-case it and normalize it using regex rules.
Will be replaced with call to LLM soon.
Will be replaced with call to LLM soon.
"""

if this_field not in included_fields:
this_field = re_case(this_field)
this_field = regex_norm_field(this_field)

if this_field in included_fields:
return f"*{this_field}", 0.01

return reformat_field(this_field, tools_token=tools_token), 0.5


# Takes a list of AL variables and spits out suggested groupings. Here's what's going on:
#
# 1. It reads in a list of fields (e.g., `["user_name","user_address"]`)
Expand Down Expand Up @@ -652,23 +656,21 @@ def classify_field(field: FieldInfo, new_name: str) -> AnswerType:
return AnswerType.GATHERED


def get_adjusted_character_count(
field: FieldInfo
)-> float:
def get_adjusted_character_count(field: FieldInfo) -> float:
"""
Determines the bracketed length of an input field based on its max_length attribute,
returning a float representing the approximate length of the field content.
Determines the bracketed length of an input field based on its max_length attribute,
returning a float representing the approximate length of the field content.
The function chunks the answers into 5 different lengths (checkboxes, 2 words, short, medium, and long)
instead of directly using the character count, as forms can allocate different spaces
for the same data without considering the space the user actually needs.
Args:
field (FieldInfo): An object containing information about the input field,
field (FieldInfo): An object containing information about the input field,
including the "max_length" attribute.
Returns:
float: The approximate length of the field content, categorized into checkboxes, 2 words, short,
float: The approximate length of the field content, categorized into checkboxes, 2 words, short,
medium, or long based on the max_length attribute.
Examples:
Expand All @@ -694,10 +696,8 @@ def get_adjusted_character_count(
) # Anything over 10 lines probably needs a full page but form author skimped on space
if field["type"] != InputType.TEXT:
return ONE_WORD

if field["max_length"] <= ONE_LINE or (
field["max_length"] <= ONE_LINE * 2
):

if field["max_length"] <= ONE_LINE or (field["max_length"] <= ONE_LINE * 2):
return ONE_WORD * 2
elif field["max_length"] <= SHORT_ANSWER:
return SHORT_ANSWER
Expand Down Expand Up @@ -816,7 +816,12 @@ class OpenAiCreds(TypedDict):
key: str


def text_complete(prompt:str, max_tokens:int=500, creds: Optional[OpenAiCreds] = None, temperature:float=0) -> str:
def text_complete(
prompt: str,
max_tokens: int = 500,
creds: Optional[OpenAiCreds] = None,
temperature: float = 0,
) -> str:
"""Run a prompt via openAI's API and return the result.
Args:
Expand All @@ -836,16 +841,13 @@ def text_complete(prompt:str, max_tokens:int=500, creds: Optional[OpenAiCreds] =
response = openai_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{
"role": "system",
"content": prompt
},
{"role": "system", "content": prompt},
],
temperature=temperature,
max_tokens=max_tokens,
top_p=1.0,
frequency_penalty=0.0,
presence_penalty=0.0
presence_penalty=0.0,
)
return str((response.choices[0].message.content or "").strip())
except Exception as ex:
Expand Down Expand Up @@ -972,38 +974,52 @@ def get_citations(text: str, tokenized_sentences: List[str]) -> List[str]:
return citations_with_context


def get_sensitive_fields(fields: List[str]) -> List[str]:
# Regex fragments that flag a field name as holding sensitive data, keyed by
# the human-readable data type name. Insertion order is preserved when the
# compiled table below is iterated.
# NOTE: omitting "CID" for Credit Card IDs since it has a lot of false positives.
FIELD_PATTERNS = {
    "Bank Account Number": [
        r"account[\W_]*number",
        r"ABA$",
        r"routing[\W_]*number",
        r"checking",
    ],
    "Credit Card Number": [
        r"credit[\W_]*card",
        r"(CV[CDV]2?|CCV|CSC)",
    ],
    "Driver's License Number": [
        r"drivers[\W_]*license",
        r".?DL$",
    ],
    "Social Security Number": [
        r"social[\W_]*security[\W_]*number",
        r"SSN",
        r"TIN$",
    ],
}

# One compiled alternation per data type. Matching ignores case, and with
# MULTILINE the `$` anchors also match at the end of each line.
FIELD_REGEXES = {
    data_type: re.compile(
        "|".join(FIELD_PATTERNS[data_type]),
        re.IGNORECASE | re.MULTILINE,
    )
    for data_type in FIELD_PATTERNS
}


def get_sensitive_data_types(
fields: List[str], fields_old: Optional[List[str]] = None
) -> Dict[str, List[str]]:
"""
Given a list of fields, identify those related to sensitive information. Sensitive fields include
Social Security Number(SSN)), Driver's License (DL), and account numbers.
Given a list of fields, identify those related to sensitive information and return a dictionary with the sensitive
fields grouped by type. A list of the old field names can also be provided. These fields should be in the same
order. Passing the old field names allows the sensitive field algorithm to match more accurately. The return value
will not contain the old field name, only the corresponding field name from the first parameter.
The sensitive data types are: Bank Account Number, Credit Card Number, Driver's License Number, and Social Security
Number.
"""
# NOTE: omitting CID since it has a lot of false positives.
field_patterns = {
"Social Security Number": [
"social[\W_]*security[\W_]*number",
"SSN",
"TIN$"
],
"Bank Account Number": [
"account[\W_]*number",
"ABA$",
"routing[\W_]*number",
"checking"
],
"Credit Card Number": [
"credit[\W_]*card",
"(CV[CDV]2?|CCV|CSC)"
],
"Driver's License Number": [
"drivers[\W_]*license",
".?DL$"
]
}
text = "\n".join(fields)
field_regexes = {name: re.compile("|".join(patterns), re.IGNORECASE | re.MULTILINE) for name, patterns in field_patterns.items()}
sensitive_fields = [name for name, regex in field_regexes.items() if re.search(regex, text)]

return sensitive_fields
if fields_old is not None and len(fields) != len(fields_old):
raise ValueError(
"If provided, fields_old must have the same number of items as fields."
)

sensitive_data_types: Dict[str, List[str]] = {}
num_fields = len(fields)
for i, field in enumerate(fields):
for name, regex in FIELD_REGEXES.items():
if re.search(regex, field):
sensitive_data_types.setdefault(name, []).append(field)
elif fields_old is not None and re.search(regex, fields_old[i]):
sensitive_data_types.setdefault(name, []).append(field)
return sensitive_data_types


def substitute_phrases(
input_string: str, substitution_phrases: Dict[str, str]
Expand Down Expand Up @@ -1037,7 +1053,9 @@ def substitute_phrases(

# Find all matches for the substitution phrases
for original, replacement in sorted_phrases:
for match in re.finditer(r"\b" + re.escape(original) + r"\b", input_string, re.IGNORECASE):
for match in re.finditer(
r"\b" + re.escape(original) + r"\b", input_string, re.IGNORECASE
):
matches.append((match.start(), match.end(), replacement))

# Sort the matches based on their starting position
Expand Down Expand Up @@ -1141,7 +1159,11 @@ def parse_form(
except:
readability = -1
# Still attempt to re-evaluate if not using openai
if not original_text or (openai_creds and description == "abortthisnow.") or readability > 30:
if (
not original_text
or (openai_creds and description == "abortthisnow.")
or readability > 30
):
# We do not care what the PDF output is, doesn't add that much time
ocr_p = [
"ocrmypdf",
Expand Down Expand Up @@ -1231,10 +1253,7 @@ def parse_form(
classify_field(field, new_names[index])
for index, field in enumerate(field_types)
]
# NOTE: we send both the original and the cleaned up field names. There are cases where one or the other is cleaner.
# Since the sensitive fields are tagged as a group name rather than individual field names, it does no harm to send
# more variations to help detection.
sensitive_fields = get_sensitive_fields(field_names + new_names)
sensitive_data_types = get_sensitive_data_types(new_names, field_names)

slotin_count = sum(1 for c in classified if c == AnswerType.SLOT_IN)
gathered_count = sum(1 for c in classified if c == AnswerType.GATHERED)
Expand All @@ -1253,15 +1272,17 @@ def parse_form(
"category": cat,
"pages": pages_count,
"reading grade level": readability,
"time to answer": time_to_answer_form(field_types_and_sizes(ff), new_names)
if ff
else [-1, -1],
"time to answer": (
time_to_answer_form(field_types_and_sizes(ff), new_names)
if ff
else [-1, -1]
),
"list": nsmi,
"avg fields per page": f_per_page,
"fields": new_names,
"fields_conf": new_names_conf,
"fields_old": field_names,
"sensitive fields": sensitive_fields,
"sensitive data types": sensitive_data_types,
"text": text,
"original_text": original_text,
"number of sentences": sentence_count,
Expand All @@ -1274,16 +1295,21 @@ def parse_form(
"slotin percent": slotin_count / field_count if field_count > 0 else 0,
"gathered percent": gathered_count / field_count if field_count > 0 else 0,
"created percent": created_count / field_count if field_count > 0 else 0,
"third party percent": third_party_count / field_count
if field_count > 0
else 0,
"third party percent": (
third_party_count / field_count if field_count > 0 else 0
),
"passive voice percent": (
passive_sentences_count / sentence_count if sentence_count > 0 else 0
),
"citations per field": citation_count / field_count if field_count > 0 else 0,
"citation count": citation_count,
"all caps percent": all_caps_count / word_count,
"normalized characters per field": sum(get_adjusted_character_count(field) for field in field_types ) / field_count if ff else 0,
"normalized characters per field": (
sum(get_adjusted_character_count(field) for field in field_types)
/ field_count
if ff
else 0
),
"difficult words": difficult_words,
"difficult word count": difficult_word_count,
"difficult word percent": difficult_word_count / word_count,
Expand Down Expand Up @@ -1342,7 +1368,7 @@ def _form_complexity_per_metric(stats):
{"name": "pages", "weight": 2},
{"name": "citations per field", "weight": 1.2},
{"name": "avg fields per page", "weight": 1 / 8},
{"name": "normalized characters per field", "weight": 1/8},
{"name": "normalized characters per field", "weight": 1 / 8},
{"name": "sentences per page", "weight": 0.05},
# percents will have a higher weight, because they are between 0 and 1
{"name": "slotin percent", "weight": 2},
Expand All @@ -1360,11 +1386,11 @@ def weight(stats, metric):
weight = metric.get("weight") or 1
val = 0
if "clip" in metric:
val = min(max(stats.get(name,0), metric["clip"][0]), metric["clip"][1])
val = min(max(stats.get(name, 0), metric["clip"][0]), metric["clip"][1])
elif isinstance(stats.get(name), bool):
val = 1 if stats.get(name) else 0
else:
val = stats.get(name,0)
val = stats.get(name, 0)
if "intercept" in metric:
val -= metric["intercept"]
return val * weight
Expand Down
Loading

0 comments on commit 9205985

Please sign in to comment.