Skip to content

Commit

Permalink
try a more RAM saving approach by only reading in var matching lines …
Browse files Browse the repository at this point in the history
…at the 1st place
  • Loading branch information
jgriesfeller committed Aug 23, 2023
1 parent f9959c8 commit 8d26b34
Showing 1 changed file with 90 additions and 19 deletions.
109 changes: 90 additions & 19 deletions pyaerocom/io/read_airnow.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ class ReadAirNow(ReadUngriddedBase):
"value",
"institute",
]
# will be used millions of times, therefore store it
FILE_COL_ROW_NUMBER = len(FILE_COL_NAMES)

# row # for the variable
ROW_VAR_COL = 5

#: Mapping of columns in station metadata file to pyaerocom standard
STATION_META_MAP = {
Expand Down Expand Up @@ -347,19 +352,25 @@ def _read_files(self, files, vars_to_retrieve):
arrs = []
for i in tqdm(range(len(files))):
fp = files[i]
filedata = self._read_file(fp)
arr = filedata.values

# filedata = self._read_file(fp)
# old_arr = filedata.values
filedata = self.read_file(fp, vars_to_retrieve=vars_to_retrieve)
for i, var in enumerate(vars_to_retrieve):
cond = arr[:, varcol] == self.VAR_MAP[var]
if i == 0:
mask = cond
else:
mask = np.logical_or(mask, cond)
matches = mask.sum()
if matches:
vardata = arr[mask]
arrs.append(vardata)
if filedata[var]["lines_retrieved"] > 0:
arrs.append(np.array(filedata[var]["linedata"]))

# arr = filedata.values

# for i, var in enumerate(vars_to_retrieve):
# cond = arr[:, varcol] == self.VAR_MAP[var]
# if i == 0:
# mask = cond
# else:
# mask = np.logical_or(mask, cond)
# matches = mask.sum()
# if matches:
# vardata = arr[mask]
# arrs.append(vardata)
if len(arrs) == 0:
raise DataRetrievalError("None of the input variables could be found in input list")
return self._filedata_to_statlist(arrs, vars_to_retrieve)
Expand All @@ -382,7 +393,10 @@ def _filedata_to_statlist(self, arrs, vars_to_retrieve):
list of :class:`StationData` objects, one for each var and station.
"""
# doubling of RAM usage!
data = np.concatenate(arrs)
# so kill the input data right afterward
arrs = None

logger.info("Converting filedata to list of StationData")
stat_meta = self.station_metadata
Expand All @@ -396,7 +410,6 @@ def _filedata_to_statlist(self, arrs, vars_to_retrieve):
dtime = self.make_datetime64_array(data[:, 0], data[:, 1])
stats = []
for var in vars_to_retrieve:

# extract only variable data (should speed things up)
var_in_file = self.VAR_MAP[var]
mask = data[:, varcol] == var_in_file
Expand All @@ -416,11 +429,11 @@ def _filedata_to_statlist(self, arrs, vars_to_retrieve):
continue
statdata = subset[statmask]
timestamps = dtime_subset[statmask]
# timezone offsets
toffs = statdata[:, tzonecol].astype(int)
# timezone offsets (there's a half hour time zone!, so float)
toffs = statdata[:, tzonecol].astype(float)
stat = StationData(**stat_meta[stat_id])

vals = statdata[:, valcol]
vals = statdata[:, valcol].astype(float)
units = np.unique(statdata[:, unitcol])
# errors that did not occur in v0 but that may occur
assert len(units) == 1
Expand All @@ -435,15 +448,73 @@ def _filedata_to_statlist(self, arrs, vars_to_retrieve):
stats.append(stat)
return stats

def read_file(self):
def read_file(self, filename, vars_to_retrieve=None):
"""
This method is not implemented (but needs to be declared for template)
This method is returns just the raw content of a file as a dict
Parameters
----------
filename : str
absolute path to filename to read
vars_to_retrieve : :obj:`list`, optional
list of str with variable names to read. If None, use
:attr:`DEFAULT_VARS`
vars_as_series : bool
if True, the data columns of all variables in the result dictionary
are converted into pandas Series objects
Returns
-------
StationData
dict-like object containing results
Raises
------
NotImplementedError
"""
raise NotImplementedError("Not needed for these data since the format is unsuitable...")

# unfortunately the files have different encodings, so we have to try them
# on the entire file first
encoding = "utf_8"
try:
with open(filename, encoding=encoding) as infile:
linedata = infile.readlines()
except UnicodeDecodeError:
encoding = "cp863"
with open(filename, encoding=encoding) as infile:
linedata = infile.readlines()
except:
encoding = self.get_file_encoding(filename)
with open(filename, encoding=encoding) as infile:
linedata = infile.readlines()

if vars_to_retrieve is None:
vars_to_retrieve = self.DEFAULT_VARS
file_vars_to_retrieve = [self.VAR_MAP[x] for x in vars_to_retrieve]

ret_data = {}
for var in vars_to_retrieve:
ret_data[var] = {}
ret_data[var]["lines_retrieved"] = 0
ret_data[var]["linedata"] = []

for line in linedata:
# line_arr = line.strip().split(self.FILE_COL_DELIM)
line_arr = line.split(self.FILE_COL_DELIM)
# skip malformed lines
if len(line_arr) != self.FILE_COL_ROW_NUMBER:
continue
# skip lines that do not contain data of an interesting variable
if line_arr[self.ROW_VAR_COL] not in file_vars_to_retrieve:
continue

# make the numerical values numerical here already
# line_arr[4] = float(line_arr[4])
# line_arr[7] = float(line_arr[7])
ret_data[var]["linedata"].append(line_arr)
ret_data[var]["lines_retrieved"] += 1

return ret_data

def read(self, vars_to_retrieve=None, first_file=None, last_file=None):
"""
Expand Down

0 comments on commit 8d26b34

Please sign in to comment.