Skip to content

Commit

Permalink
Clean up FTIR file parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
GraysonBellamy committed Jun 20, 2024
1 parent 7a3e210 commit 7c4cf8d
Showing 1 changed file with 63 additions and 69 deletions.
132 changes: 63 additions & 69 deletions src/lab_etl/bruker_ftir_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Any
import numpy as np
from brukeropus.file import get_param_label
from lab_etl.util import set_metadata
from lab_etl.util import set_metadata, get_hash


def load_ftir_data(file_path: str) -> pa.Table:
Expand All @@ -19,7 +19,10 @@ def load_ftir_data(file_path: str) -> pa.Table:
"""
opus_file = read_opus(file_path)
if bool(opus_file):
# Get FTIR data as a pa.Table
table = get_ftir_data(opus_file)

# Define column metadata
col_meta = {
"wavelength": {"unit": "µm"},
"reflectance": {"unit": "a.u."},
Expand All @@ -29,7 +32,11 @@ def load_ftir_data(file_path: str) -> pa.Table:
"sample_spectrum": {"unit": "a.u."},
"sample_phase": {"unit": "a.u."},
}

# Get table metadata
tbl_meta = get_ftir_meta(opus_file)

# Set metadata to the table
table = set_metadata(table, col_meta=col_meta, tbl_meta=tbl_meta)
return table
else:
Expand All @@ -45,71 +52,42 @@ def get_ftir_data(file: OPUSFile) -> pa.Table:
Returns:
pa.Table: The FTIR data as a pa.Table.
"""
data = []
schema_list = []
if "r" in file.all_data_keys:
data = [
np.float64(file.r.wl), # Wavelength
np.float64(file.r.y), # Reflectance
]
schema_list = [
pa.field("wavelength", pa.float64()),
pa.field(file.r.label.lower().replace(" ", "_"), pa.float64()),
]
for key in file.all_data_keys:
if key != "r":
x = np.float64(getattr(file, key).wl)
y = np.float64(getattr(file, key).y)
y_new = np.interp(file.r.wl, x, y)
data.append(y_new)
schema_list.append(
pa.field(
getattr(file, key).label.lower().replace(" ", "_"), pa.float64()
)
)
schema = pa.schema(schema_list)
return pa.Table.from_arrays(data, schema=schema)
elif "a" in file.all_data_keys:

def construct_schema_and_data(
main_key: str, label: str
) -> tuple[list[np.ndarray], list[pa.field]]:
"""Constructs schema and data arrays based on the main data key and label."""
data = [
np.float64(file.a.wl), # Wavelength
np.float64(file.a.y), # Absorbance
np.float64(getattr(file, main_key).wl),
np.float64(getattr(file, main_key).y),
]
schema_list = [
pa.field("wavelength", pa.float64()),
pa.field(file.a.label.lower().replace(" ", "_"), pa.float64()),
]
for key in file.all_data_keys:
if key != "a":
x = np.float64(getattr(file, key).wl)
y = np.float64(getattr(file, key).y)
y_new = np.interp(file.a.wl, x, y)
data.append(y_new)
schema_list.append(
pa.field(
getattr(file, key).label.lower().replace(" ", "_"), pa.float64()
)
)
schema = pa.schema(schema_list)
return pa.Table.from_arrays(data, schema=schema)
elif "t" in file.all_data_keys:
data = [np.float64(file.t.wl), np.float64(file.t.y)]
schema_list = [
pa.field("wavelength", pa.float64()),
pa.field(file.t.label.lower().replace(" ", "_"), pa.float64()),
pa.field(label.lower().replace(" ", "_"), pa.float64()),
]

for key in file.all_data_keys:
if key != "t":
if key != main_key:
x = np.float64(getattr(file, key).wl)
y = np.float64(getattr(file, key).y)
y_new = np.interp(file.t.wl, x, y)
y_new = np.interp(getattr(file, main_key).wl, x, y)
data.append(y_new)
schema_list.append(
pa.field(
getattr(file, key).label.lower().replace(" ", "_"), pa.float64()
)
)
schema = pa.schema(schema_list)
return pa.Table.from_arrays(data, schema=schema)

return data, schema_list

for main_key in ("r", "a", "t"):
if main_key in file.all_data_keys:
label = getattr(file, main_key).label
data, schema_list = construct_schema_and_data(main_key, label)
schema = pa.schema(schema_list)
return pa.Table.from_arrays(data, schema=schema)

raise ValueError("No valid data keys found in the OPUS file")


def get_ftir_meta(file: OPUSFile) -> dict[Any, Any]:
Expand All @@ -124,26 +102,42 @@ def get_ftir_meta(file: OPUSFile) -> dict[Any, Any]:
"""
meta = {}
params = {
get_param_label(key).lower().replace(" ", "_"): value
for key, value in file.params.items()
}
rf_params = {
get_param_label(key).lower().replace(" ", "_"): value
for key, value in file.rf_params.items()
}

# Get file hash
hash = get_hash(file.filepath)

# Extract parameters with formatted keys
def format_key(key):
return get_param_label(key).lower().replace(" ", "_")

params = {format_key(key): value for key, value in file.params.items()}
rf_params = {format_key(key): value for key, value in file.rf_params.items()}

# Extract labels
labels = {
key.key: key.label.lower().replace(" ", "_") for key in file.iter_all_data()
}
meta.update({"data_labels": labels})
if "r" in labels:
meta.update({"data_performed": file.r.datetime.isoformat()})
elif "a" in labels:
meta.update({"data_performed": file.a.datetime.isoformat()})
elif "t" in labels:
meta.update({"data_performed": file.t.datetime.isoformat()})
meta.update({"parameters": params})
meta.update({"reference_parameters": rf_params})

# Update metadata dictionary
meta.update(
{
"data_labels": labels,
"parameters": params,
"reference_parameters": rf_params,
"file_hash": {
"file": file.filepath.split("/")[-1],
"method": "BLAKE2b",
"hash": hash,
},
}
)

# Determine data performed date
for data_type in ("r", "a", "t"):
if data_type in labels:
meta["data_performed"] = getattr(file, data_type).datetime.isoformat()
break

return meta


Expand Down

0 comments on commit 7c4cf8d

Please sign in to comment.