diff --git a/postprocessing/compare_comstock_to_cbecs.py.template b/postprocessing/compare_comstock_to_cbecs.py.template index 4a98f42df..8446ef443 100644 --- a/postprocessing/compare_comstock_to_cbecs.py.template +++ b/postprocessing/compare_comstock_to_cbecs.py.template @@ -32,12 +32,6 @@ def main(): stock_estimation_version='2024R2', # Only updated when a new stock estimate is published truth_data_version='v01' # Typically don't change this ) - - # Scale ComStock run to CBECS 2018 AND remove non-ComStock buildings from CBECS - comstock.add_weights_aportioned_by_stock_estimate(apportionment=stock_estimate) - comstock.create_national_aggregation() - comstock.create_geospatially_resolved_aggregations(comstock.STATE_ID, pretty_geo_col_name='state_id') - comstock.create_geospatially_resolved_aggregations(comstock.COUNTY_ID, pretty_geo_col_name='county_id') # CBECS cbecs = cspp.CBECS( @@ -46,19 +40,26 @@ def main(): color_hex='#009E73', # Color used to represent CBECS in plots reload_from_csv=False # True if CSV already made and want faster reload times ) - - # TODO Update past here including ensuring we can still apply CBECS weights on top of previous weights. - + + # Scale ComStock runs to the 'truth data' from StockE V3 estimates using bucket-based apportionment + comstock.add_weights_aportioned_by_stock_estimate(apportionment=stock_estimate) # Scale ComStock run to CBECS 2018 AND remove non-ComStock buildings from CBECS comstock.add_national_scaling_weights(cbecs, remove_non_comstock_bldg_types_from_cbecs=True) - comstock.calculate_weighted_columnal_values() - comstock.export_to_csv_wide() + # TODO This needs to be rewritten with safe column names, lazyframe usage, etc. + #comstock.calculate_weighted_columnal_values() + + # Uncomment whichever to write results to disk: + comstock.create_national_aggregation() + # comstock.create_geospatially_resolved_aggregations(comstock.STATE_ID, pretty_geo_col_name='state_id') + # comstock.create_geospatially_resolved_aggregations(comstock.COUNTY_ID, pretty_geo_col_name='county_id') # Make a comparison by passing in a list of CBECs and ComStock runs to compare # upgrade_id can be 'All' or the upgrade number + comstock.create_plotting_lazyframe() comp = cspp.ComStockToCBECSComparison(cbecs_list=[cbecs], comstock_list=[comstock], upgrade_id='All',make_comparison_plots=True) comp.export_to_csv_wide() + # Code to execute the script if __name__ == "__main__": diff --git a/postprocessing/compare_runs.py.template b/postprocessing/compare_runs.py.template index e90cb22f6..c1a92d13d 100644 --- a/postprocessing/compare_runs.py.template +++ b/postprocessing/compare_runs.py.template @@ -12,14 +12,14 @@ logger = logging.getLogger(__name__) def main(): # First ComStock run comstock_a = cspp.ComStock( - s3_base_dir='eulp/comstock_fy22', # If run not on S3, download results_up**.parquet manually - comstock_run_name='com_v15_cooking', # Name of the run on S3 - comstock_run_version='v15', # Use whatever you want to see in plot and folder names + s3_base_dir='eulp/euss_com', # If run not on S3, download results_up**.parquet manually + comstock_run_name='sampling_lighting_11079_1', # Name of the run on S3 + comstock_run_version='sampling_lighting_11079_1', # Use whatever you want to see in plot and folder names comstock_year=2018, # Typically don't change this athena_table_name=None, # Typically don't change this truth_data_version='v01', # Typically don't change this buildstock_csv_name='buildstock.csv', # Download buildstock.csv manually - 
acceptable_failure_percentage=0.05, # Can increase this when testing and high failure are OK + acceptable_failure_percentage=0.25, # Can increase this when testing and high failure are OK drop_failed_runs=True, # False if you want to evaluate which runs failed in raw output data color_hex='#0072B2', # Color used to represent this run in plots skip_missing_columns=True, # False if you want to ensure you have all data specified for export @@ -29,14 +29,14 @@ def main(): # Second ComStock run comstock_b = cspp.ComStock( - s3_base_dir='eulp/comstock_fy22', # If run not on S3, download results_up**.parquet manually - comstock_run_name='com_v16_windows_lighting', # Name of the run on S3 - comstock_run_version='v16', # Use whatever you want to see in plot and folder names + s3_base_dir='eulp/euss_com', # If run not on S3, download results_up**.parquet manually + comstock_run_name='cycle_4_sampling_test_rand_985932_20240321', # Name of the run on S3 + comstock_run_version='new_sampling_test', # Use whatever you want to see in plot and folder names comstock_year=2018, # Typically don't change this - athena_table_name=None, # Typically don't change this + athena_table_name='rand_985932_20240321', # Typically same as comstock_run_name or None truth_data_version='v01', # Typically don't change this - buildstock_csv_name='buildstock.csv', # Download buildstock.csv manually - acceptable_failure_percentage=0.05, # Can increase this when testing and high failure are OK + buildstock_csv_name='rand_985932_sampling_buildstock.csv', # Download buildstock.csv manually + acceptable_failure_percentage=0.9, # Can increase this when testing and high failure are OK drop_failed_runs=True, # False if you want to evaluate which runs failed in raw output data color_hex='#56B4E9', # Color used to represent this run in plots skip_missing_columns=True, # False if you want to ensure you have all data specified for export @@ -44,6 +44,12 @@ def main(): include_upgrades=False # False if not looking at upgrades ) + # Stock Estimation for Apportionment: + stock_estimate = cspp.Apportion( + stock_estimation_version='2024R2', # Only updated when a new stock estimate is published + truth_data_version='v01' # Typically don't change this + ) + # CBECS cbecs = cspp.CBECS( cbecs_year=2018, # 2012 and 2018 currently available @@ -52,19 +58,19 @@ def main(): reload_from_csv=False # True if CSV already made and want faster reload times ) - # Scale both ComStock runs to CBECS 2018 AND remove non-ComStock buildings from CBECS + # First scale ComStock runs to the 'truth data' from StockE V3 estimates using bucket-based apportionment + # Then scale both ComStock runs to CBECS 2018 AND remove non-ComStock buildings from CBECS # This is how weights in the models are set to represent national energy consumption + comstock_a.add_weights_aportioned_by_stock_estimate(apportionment=stock_estimate) comstock_a.add_national_scaling_weights(cbecs, remove_non_comstock_bldg_types_from_cbecs=True) + comstock_b.add_weights_aportioned_by_stock_estimate(apportionment=stock_estimate) comstock_b.add_national_scaling_weights(cbecs, remove_non_comstock_bldg_types_from_cbecs=True) - # Uncomment this to correct gas consumption for a ComStock run to match CBECS - # Don't typically want to do this - # comstock_a.correct_comstock_gas_to_match_cbecs(cbecs) - # Export CBECS and ComStock data to wide and long formats for Tableau and to skip processing later cbecs.export_to_csv_wide() # May comment this out if CSV output isn't needed - comstock_a.export_to_csv_wide() # 
May comment this out if CSV output isn't needed - comstock_b.export_to_csv_wide() # May comment this out if CSV output isn't needed + # comstock_a.create_national_aggregation() # May comment this out if CSV output isn't needed + # comstock_b.create_national_aggregation() # May comment this out if CSV output isn't needed + # TODO This (long CSV export) is not yet re-implemented # comstock_a.export_to_csv_long() # Long format useful for stacking end uses and fuels # comstock_b.export_to_csv_long() # Long format useful for stacking end uses and fuels @@ -73,7 +79,7 @@ def main(): cbecs_list=[cbecs], comstock_list = [comstock_a, comstock_b], make_comparison_plots=True - ) + ) # Export the comparison data to wide format for Tableau comparison.export_to_csv_wide() diff --git a/postprocessing/compare_upgrades.py.template b/postprocessing/compare_upgrades.py.template index 4f724032f..888bce4e8 100644 --- a/postprocessing/compare_upgrades.py.template +++ b/postprocessing/compare_upgrades.py.template @@ -1,63 +1,73 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import logging - -import comstockpostproc as cspp - - -logging.basicConfig(level='INFO') # Use DEBUG, INFO, or WARNING -logger = logging.getLogger(__name__) - -def main(): - # ComStock run - comstock = cspp.ComStock( - s3_base_dir='eulp/euss_com', # If run not on S3, download results_up**.parquet manually - comstock_run_name='hprtu_stdperf_fan_test_10k', # Name of the run on S3 - comstock_run_version='hprtu_stdperf_fan_test_10k', # Use whatever you want to see in plot and folder names - comstock_year=2018, # Typically don't change this - athena_table_name=None, # Typically don't change this - truth_data_version='v01', # Typically don't change this - buildstock_csv_name='buildstock.csv', # Download buildstock.csv manually - acceptable_failure_percentage=0.025, # Can increase this when testing and high failure are OK - drop_failed_runs=True, # False if you want to evaluate which runs failed in raw output data - color_hex='#0072B2', # Color used to represent this run in plots - skip_missing_columns=True, # False if you want to ensure you have all data specified for export - reload_from_csv=False, # True if CSV already made and want faster reload times - include_upgrades=True, # False if not looking at upgrades - upgrade_ids_to_skip=[], # Use [1, 3] etc. to exclude certain upgrades - make_timeseries_plots=True, - states={ - #'MN': 'Minnesota', # specify state to use for timeseries plots in dictionary format. State ID must correspond correctly. 
- 'MA':'Massachusetts', - 'OR': 'Oregon', - 'LA': 'Louisiana', - #'AZ': 'Arizona', - #'TN': 'Tennessee' - }, - upgrade_ids_for_comparison={} # Use {'':[0,1,2]}; add as many upgrade IDs as needed, but plots look strange over 5 - ) - - # CBECS - cbecs = cspp.CBECS( - cbecs_year=2018, # 2012 and 2018 currently available - truth_data_version='v01', # Typically don't change this - color_hex='#009E73', # Color used to represent CBECS in plots - reload_from_csv=False # True if CSV already made and want faster reload times - ) - - # Scale ComStock run to CBECS 2018 AND remove non-ComStock buildings from CBECS - # This is how weights in the models are set to represent national energy consumption - comstock.add_national_scaling_weights(cbecs, remove_non_comstock_bldg_types_from_cbecs=True) - - # Export CBECS and ComStock data to wide and long formats for Tableau and to skip processing later - cbecs.export_to_csv_wide() # May comment this out after run once - comstock.export_to_csv_wide() # May comment this out after run once - # comstock.export_to_csv_long() # Long format useful for stacking end uses and fuels - - # Create measure run comparisons; only use if run has measures - comparison = cspp.ComStockMeasureComparison(comstock, states=comstock.states, make_comparison_plots = comstock.make_comparison_plots, make_timeseries_plots = comstock.make_timeseries_plots) - -# Code to execute the script -if __name__=="__main__": - main() +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import logging + +import comstockpostproc as cspp + + +logging.basicConfig(level='INFO') # Use DEBUG, INFO, or WARNING +logger = logging.getLogger(__name__) + +def main(): + # ComStock run + comstock = cspp.ComStock( + s3_base_dir='eulp/euss_com', # If run not on S3, download results_up**.parquet manually + comstock_run_name='sampling_lighting_11079_1', # Name of the run on S3 + comstock_run_version='sampling_lighting_11079_1', # Use whatever you want to see in plot and folder names + comstock_year=2018, # Typically don't change this + athena_table_name=None, # Typically don't change this + truth_data_version='v01', # Typically don't change this + buildstock_csv_name='buildstock.csv', # Download buildstock.csv manually + acceptable_failure_percentage=0.25, # Can increase this when testing and high failure are OK + drop_failed_runs=True, # False if you want to evaluate which runs failed in raw output data + color_hex='#0072B2', # Color used to represent this run in plots + skip_missing_columns=True, # False if you want to ensure you have all data specified for export + reload_from_csv=False, # True if CSV already made and want faster reload times + include_upgrades=True, # False if not looking at upgrades + upgrade_ids_to_skip=[], # Use [1, 3] etc. to exclude certain upgrades + make_timeseries_plots=False, + states={ + #'MN': 'Minnesota', # specify state to use for timeseries plots in dictionary format. State ID must correspond correctly. 
+ 'MA':'Massachusetts', + #'OR': 'Oregon', + #'LA': 'Louisiana', + #'AZ': 'Arizona', + #'TN': 'Tennessee' + }, + upgrade_ids_for_comparison={} # Use {'':[0,1,2]}; add as many upgrade IDs as needed, but plots look strange over 5 + ) + + # Stock Estimation for Apportionment: + stock_estimate = cspp.Apportion( + stock_estimation_version='2024R2', # Only updated when a new stock estimate is published + truth_data_version='v01' # Typically don't change this + ) + + # CBECS + cbecs = cspp.CBECS( + cbecs_year=2018, # 2012 and 2018 currently available + truth_data_version='v01', # Typically don't change this + color_hex='#009E73', # Color used to represent CBECS in plots + reload_from_csv=False # True if CSV already made and want faster reload times + ) + + # Scale ComStock runs to the 'truth data' from StockE V3 estimates using bucket-based apportionment + comstock.add_weights_aportioned_by_stock_estimate(apportionment=stock_estimate) + # Scale ComStock run to CBECS 2018 AND remove non-ComStock buildings from CBECS + comstock.add_national_scaling_weights(cbecs, remove_non_comstock_bldg_types_from_cbecs=True) + + # Export CBECS and ComStock data to wide and long formats for Tableau and to skip processing later + # cbecs.export_to_csv_wide() # May comment this out after run once + # comstock.create_national_aggregation() + # comstock.create_geospatially_resolved_aggregations(comstock.STATE_ID, pretty_geo_col_name='state_id') + # comstock.create_geospatially_resolved_aggregations(comstock.COUNTY_ID, pretty_geo_col_name='county_id') + # TODO Long-format export is definitely not working as expected anymore... + # comstock.export_to_csv_long() # Long format useful for stacking end uses and fuels + + # Create measure run comparisons; only use if run has measures + comparison = cspp.ComStockMeasureComparison(comstock, states=comstock.states, make_comparison_plots = comstock.make_comparison_plots, make_timeseries_plots = comstock.make_timeseries_plots) + +# Code to execute the script +if __name__=="__main__": + main() diff --git a/postprocessing/comstockpostproc/cbecs.py b/postprocessing/comstockpostproc/cbecs.py index de4005c93..7e6561a17 100644 --- a/postprocessing/comstockpostproc/cbecs.py +++ b/postprocessing/comstockpostproc/cbecs.py @@ -483,4 +483,9 @@ def export_to_csv_wide(self): file_name = f'CBECS wide.csv' file_path = os.path.join(self.output_dir, file_name) - self.data.to_csv(file_path, index=False) + try: + self.data.sink_csv(file_path) + except pl.exceptions.InvalidOperationError: + logger.warning('sink_csv not supported for metadata write in current polars version') + logger.warning('Falling back to .collect().write_csv()') + self.data.collect().write_csv(file_path) diff --git a/postprocessing/comstockpostproc/comstock.py b/postprocessing/comstockpostproc/comstock.py index df2ff0a20..d6db25b87 100644 --- a/postprocessing/comstockpostproc/comstock.py +++ b/postprocessing/comstockpostproc/comstock.py @@ -8,6 +8,7 @@ import glob import json import logging +import botocore.exceptions import numpy as np import pandas as pd import polars as pl @@ -98,6 +99,7 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc self.rename_upgrades_file_name = 'rename_upgrades.json' self.athena_table_name = athena_table_name self.data = None + self.plotting_data = None self.monthly_data = None self.monthly_data_gap = None self.ami_timeseries_data = None @@ -115,7 +117,11 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc self.unweighted_weighted_map = {}
self.dropping_columns = [] self.cached_parquet = [] # List of parquet files to reload and export + # TODO our current credential setup isn't playing well with this approach but does work with the s3 ServiceResource + # We are currently unable to list the HeadObject for automatically uploaded data + # Consider migrating all usage to s3 ServiceResource instead. self.s3_client = boto3.client('s3', config=botocore.client.Config(max_pool_connections=50)) + self.s3_resource = boto3.resource('s3') if self.athena_table_name is not None: self.athena_client = BuildStockQuery(workgroup='eulp', db_name='enduse', @@ -125,6 +131,7 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc self.make_comparison_plots = make_comparison_plots self.make_timeseries_plots = make_timeseries_plots self.APPORTIONED = False # Including this for some basic control logic in which methods are allowed + self.CBECS_WEIGHTS_APPLIED = False # Including this for some additional control logic about method order logger.info(f'Creating {self.dataset_name}') # Make directories @@ -135,7 +142,10 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc # S3 location self.s3_inpath = None if s3_base_dir is not None: - self.s3_inpath = f"s3://{s3_base_dir}/{self.comstock_run_name}/{self.comstock_run_name}" + if self.athena_table_name: + self.s3_inpath = f"s3://{s3_base_dir}/{self.comstock_run_name}/{self.athena_table_name}" + else: + self.s3_inpath = f"s3://{s3_base_dir}/{self.comstock_run_name}/{self.comstock_run_name}" # Load and transform data, preserving all columns self.download_data() @@ -238,13 +248,26 @@ def __init__(self, s3_base_dir, comstock_run_name, comstock_run_version, comstoc # logger.debug(c) def download_data(self): + # Determine the S3 bucket and key prefix to download data from: + if self.s3_inpath is None: + logger.info('The s3 path provided in the ComStock object initialization is invalid.') + s3_path_items = self.s3_inpath.lstrip('s3://').split('/') + bucket_name = s3_path_items[0] + prfx = '/'.join(s3_path_items[1:]) + # baseline/results_up00.parquet results_data_path = os.path.join(self.data_dir, self.results_file_name) if not os.path.exists(results_data_path): - s3_path = f"{self.s3_inpath}/baseline/{self.results_file_name}" - logger.info(f'Downloading: {s3_path}') - data = pd.read_parquet(s3_path, engine="pyarrow") - data.to_parquet(results_data_path) + baseline_parquet_path = f"{prfx}/baseline/{self.results_file_name}" + try: + self.s3_resource.Object(bucket_name, baseline_parquet_path).load() + except botocore.exceptions.ClientError: + logger.error(f'Could not find results_up00.parquet at {baseline_parquet_path} in bucket {bucket_name}') + raise FileNotFoundError( + f'Missing results_up00.parquet file.
Manually download and place at {results_data_path}' + ) + logger.info(f'Downloading {baseline_parquet_path} from the {bucket_name} bucket') + self.s3_resource.Object(bucket_name, baseline_parquet_path).download_file(results_data_path) # upgrades/upgrade=*/results_up*.parquet if self.include_upgrades: @@ -253,13 +276,10 @@ def download_data(self): logger.info('The s3 path passed to the constructor is invalid, ' 'cannot check for results_up**.parquet files to download') else: - s3_path_items = self.s3_inpath.lstrip('s3://').split('/') - bucket_name = s3_path_items[0] - prfx = '/'.join(s3_path_items[1:]) - prfx = f'{prfx}/upgrades' - resp = self.s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prfx) - for obj in resp.get("Contents"): - obj_path = obj['Key'] + upgrade_parquet_path = f'{prfx}/upgrades' + resp = self.s3_resource.Bucket(bucket_name).objects.filter(Prefix=upgrade_parquet_path).all() + for obj in list(resp): + obj_path = obj.key obj_name = obj_path.split('/')[-1] m = re.search('results_up(.*).parquet', obj_name) if not m: @@ -270,21 +290,26 @@ def download_data(self): continue results_data_path = os.path.join(self.data_dir, obj_name) if not os.path.exists(results_data_path): - s3_path = f"s3://{bucket_name}/{obj_path}" - logger.info(f'Downloading: {s3_path}') - data = pd.read_parquet(s3_path, engine="pyarrow") - data.to_parquet(results_data_path) + logger.info(f'Downloading {obj_path} from the {bucket_name} bucket') + self.s3_resource.Object(bucket_name, obj_path).download_file(results_data_path) # buildstock.csv - #TODO: handle the missing buildstock.csv in a more robust way #1. check the file in the data_dir #2. if not found, download from S3 #3. if not found in S3, raise an error - buildstock_csv_path = os.path.join(self.data_dir, self.buildstock_file_name) if not os.path.exists(buildstock_csv_path): - raise FileNotFoundError( - f'Missing buildstock.csv file. Manually download and place in {os.path.abspath(self.data_dir)}') + s3_path = f"{self.s3_inpath}/buildstock_csv/buildstock.csv" + bldstk_s3_path = f'{prfx}/buildstock_csv/buildstock.csv' + try: + self.s3_resource.Object(bucket_name, bldstk_s3_path).load() + except botocore.exceptions.ClientError: + logger.error(f'Could not find buildstock.csv at {bldstk_s3_path} in bucket {bucket_name}') + raise FileNotFoundError( + f'Missing buildstock.csv file. 
Manually download and place at {buildstock_csv_path}' + ) + logger.info(f'Downloading {bldstk_s3_path} from the {bucket_name} bucket') + self.s3_resource.Object(bucket_name, bldstk_s3_path).download_file(buildstock_csv_path) # EJSCREEN ejscreen_data_path = os.path.join(self.truth_data_dir, self.ejscreen_file_name) @@ -1431,7 +1456,7 @@ def diff_lists(li1, li2): # These geography columns should be close together for convenience # but have no obvious pattern to match against possible_geog_cols = [ - 'in.ashrae_iecc_climate_zone_2004', + 'in.ashrae_iecc_climate_zone_2006', 'in.building_america_climate_zone', 'in.cambium_grid_region', 'in.census_division_name', @@ -1818,13 +1843,11 @@ def add_building_type_group(self): self.data = self.data.with_columns((pl.col(self.BLDG_TYPE).cast(pl.Utf8).replace(bldg_type_groups, default=None)).alias(self.BLDG_TYPE_GROUP)) self.data = self.data.with_columns(pl.col(self.BLDG_TYPE_GROUP).cast(pl.Categorical)) - def add_national_scaling_weights(self, cbecs: CBECS, remove_non_comstock_bldg_types_from_cbecs: bool): # Remove CBECS entries for building types not included in the ComStock run # comstock_bldg_types = self.data[self.BLDG_TYPE].unique() # assert "calc.weighted.utility_bills.total_mean_bill..billion_usd" in self.data.columns assert isinstance(self.data, pl.LazyFrame) - comstock_bldg_types: set = set(self.data.select(self.BLDG_TYPE).unique().collect().to_pandas()[self.BLDG_TYPE].tolist()) cbecs.data: pd.DataFrame = cbecs.data.collect().to_pandas() @@ -1856,15 +1879,24 @@ def add_national_scaling_weights(self, cbecs: CBECS, remove_non_comstock_bldg_ty logger.debug(cbecs_bldg_type_sqft) # Total sqft of each building type, ComStock - baseline_data: pl.LazyFrame = self.data.filter(pl.col(self.UPGRADE_NAME) == self.BASE_NAME).clone() - comstock_bldg_type_sqft: pl.DataFrame = baseline_data.group_by(self.BLDG_TYPE).agg([pl.col(self.FLR_AREA).sum()]).collect() - comstock_bldg_type_sqft: pd.DataFrame = comstock_bldg_type_sqft.to_pandas().set_index(self.BLDG_TYPE) + if self.APPORTIONED: + # Since this is a national calculation, groupby on building id and upgrade only in foreign key table + national_agg = self.fkt.filter(pl.col(self.UPGRADE_ID) == 0).clone() + national_agg = national_agg.select([pl.col(self.BLDG_WEIGHT), pl.col(self.BLDG_ID)]).groupby(pl.col(self.BLDG_ID)).sum() + cs_data = self.data.filter(pl.col(self.UPGRADE_NAME) == self.BASE_NAME).select([pl.col(self.BLDG_ID), pl.col(self.FLR_AREA), pl.col(self.BLDG_TYPE)]).clone() + national_agg = national_agg.join(cs_data, on=pl.col(self.BLDG_ID)) + national_agg = national_agg.with_columns((pl.col(self.BLDG_WEIGHT) * pl.col(self.FLR_AREA)).alias(self.FLR_AREA)) + national_agg = national_agg.select([pl.col(self.BLDG_TYPE), pl.col(self.FLR_AREA)]).groupby(pl.col(self.BLDG_TYPE)).sum().collect() + comstock_bldg_type_sqft: pd.DataFrame = national_agg.to_pandas().set_index(self.BLDG_TYPE) + else: + baseline_data: pl.LazyFrame = self.data.filter(pl.col(self.UPGRADE_NAME) == self.BASE_NAME).clone() + comstock_bldg_type_sqft: pl.DataFrame = baseline_data.group_by(self.BLDG_TYPE).agg([pl.col(self.FLR_AREA).sum()]).collect() + comstock_bldg_type_sqft: pd.DataFrame = comstock_bldg_type_sqft.to_pandas().set_index(self.BLDG_TYPE) logger.debug('ComStock Baseline floor area by building type') logger.debug(comstock_bldg_type_sqft) # Calculate scaling factor for each building type based on floor area (not building/model count) sf = pd.concat([cbecs_bldg_type_sqft, comstock_bldg_type_sqft], axis = 1) - logger.info("sf 
wt_area_col shape: ", sf[wt_area_col].shape) sf[self.BLDG_WEIGHT] = sf[wt_area_col].astype(float) / sf[self.FLR_AREA].astype(float) bldg_type_scale_factors = sf[self.BLDG_WEIGHT].to_dict() if np.nan in bldg_type_scale_factors: @@ -1874,18 +1906,31 @@ def add_national_scaling_weights(self, cbecs: CBECS, remove_non_comstock_bldg_ty del bldg_type_scale_factors[np.nan] # Report any scaling factor greater than some threshold. + if self.APPORTIONED: + logger.info(f'{self.dataset_name} post-apportionment scaling factors to CBECS floor area:') + for bldg_type, scaling_factor in bldg_type_scale_factors.items(): + logger.info(f'--- {bldg_type}: {round(scaling_factor, 2)}') + if scaling_factor > 1.3: + wrn_msg = (f'The scaling factor for {bldg_type} is high, which indicates something unexpected ' + 'in the apportionment step, except for Healthcare where this is expected. Please review.') + logger.warning(wrn_msg) + elif scaling_factor < 0.6: + wrn_msg = (f'The scaling factor for {bldg_type} is low, which indicates something unexpected ' + 'in the apportionment step. Please review.') + logger.warning(wrn_msg) + else: # In situations with high failure rates of a single building, # the scaling factor will be high, and the results are likely to be # heavily skewed toward the few successful simulations of that building type. - logger.info(f'{self.dataset_name} scaling factors - scale ComStock results to CBECS floor area') - for bldg_type, scaling_factor in bldg_type_scale_factors.items(): - logger.info(f'--- {bldg_type}: {round(scaling_factor, 2)}') - if scaling_factor > 15: - wrn_msg = (f'The scaling factor for {bldg_type} is high, which indicates either a test run <350k models ' - f'or significant failed runs for this building type. Comparisons to CBECS will likely be invalid.') - logger.warning(wrn_msg) - - # For reference/comparison, here are the weights from the ComStock V1 runs + logger.info(f'{self.dataset_name} scaling factors - scale ComStock results to CBECS floor area') + for bldg_type, scaling_factor in bldg_type_scale_factors.items(): + logger.info(f'--- {bldg_type}: {round(scaling_factor, 2)}') + if scaling_factor > 15: + wrn_msg = (f'The scaling factor for {bldg_type} is high, which indicates either a test run <350k models ' + f'or significant failed runs for this building type. 
Comparisons to CBECS will likely be invalid.') + logger.warning(wrn_msg) + + # For reference/comparison, here are the weights from the ComStock Pre-EUSS 2024R2 runs # PROD_V1_COMSTOCK_WEIGHTS = { # 'small_office': 9.625838016683277, # 'medium_office': 9.625838016683277, @@ -1902,24 +1947,49 @@ def add_national_scaling_weights(self, cbecs: CBECS, remove_non_comstock_bldg_ty # 'strip_mall': 2.1106205675100735, # 'warehouse': 2.1086048544461304 # } + # Here are the 'nominal' weights from Sampling V2 implementation (EUSS 2024 R2 on): + # TODO Add weights here # Assign scaling factors to each ComStock run self.building_type_weights = bldg_type_scale_factors - self.data = self.data.with_columns((pl.col(self.BLDG_TYPE).cast(pl.Utf8).replace(bldg_type_scale_factors, default=None)).alias(self.BLDG_WEIGHT)) + if self.APPORTIONED: + cbecs_weights = pl.LazyFrame({self.BLDG_TYPE: bldg_type_scale_factors.keys(), 'cbecs_weight': bldg_type_scale_factors.values()}) + self.fkt = self.fkt.join(cbecs_weights, on=pl.col(self.BLDG_TYPE)) + self.fkt = self.fkt.with_columns((pl.col(self.BLDG_WEIGHT) * pl.col('cbecs_weight')).alias(self.BLDG_WEIGHT)) + self.fkt = self.fkt.drop(self.BLDG_TYPE, 'cbecs_weight') + else: + self.data = self.data.with_columns((pl.col(self.BLDG_TYPE).cast(pl.Utf8).replace(bldg_type_scale_factors, default=None)).alias(self.BLDG_WEIGHT)) assert isinstance(cbecs.data, pd.DataFrame) cbecs.data = pl.from_pandas(cbecs.data).lazy() assert isinstance(cbecs.data, pl.LazyFrame) + self.CBECS_WEIGHTS_APPLIED = True return bldg_type_scale_factors def _calculate_weighted_columnal_values(self, input_lf: pl.LazyFrame): # Apply the weights to the columns - input_lf = self.add_weighted_area_energy_savings_columns(input_lf) #compute out the weighted value, based on the unweighted columns and the weights. - assert isinstance(self.data, pl.LazyFrame) + #compute out the weighted value, based on the unweighted columns and the weights. 
+ input_lf = self.add_weighted_area_energy_savings_columns(input_lf) + assert isinstance(input_lf, pl.LazyFrame) return input_lf + def create_plotting_lazyframe(self): + plotting_aggregation = self.fkt.clone() + plotting_aggregation = plotting_aggregation.select( + [pl.col(self.BLDG_WEIGHT), pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID), pl.col(self.CEN_DIV)] + ).groupby([pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID), pl.col(self.CEN_DIV)]).sum() + plotting_aggregation = plotting_aggregation.join(self.data, on=[pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID)]) + plotting_aggregation = self._calculate_weighted_columnal_values(plotting_aggregation) + plotting_aggregation = self.reorder_data_columns(plotting_aggregation) + plotting_aggregation = self.add_sightglass_column_units(plotting_aggregation) + assert isinstance(plotting_aggregation, pl.LazyFrame) + self.plotting_data = plotting_aggregation + def create_national_aggregation(self): - national_aggregation = self.fkt.select([pl.col('weight'), pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID)]).groupby([pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID)]).sum() + national_aggregation = self.fkt.clone() + national_aggregation = national_aggregation.select( + [pl.col(self.BLDG_WEIGHT), pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID)] + ).groupby([pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID)]).sum() national_aggregation = national_aggregation.join(self.data, on=[pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID)]) national_aggregation = self._calculate_weighted_columnal_values(national_aggregation) @@ -1928,7 +1998,6 @@ def create_national_aggregation(self): # self.get_scaled_comstock_monthly_consumption_by_state(national_aggregation) # Reorder the columns before exporting - # TODO works on self.data national_aggregation = self.reorder_data_columns(national_aggregation) assert isinstance(national_aggregation, pl.LazyFrame) @@ -1961,13 +2030,22 @@ def create_national_aggregation(self): # Export dictionaries corresponding to the exported columns self.export_data_and_enumeration_dictionary() + # Return the nationally aggregated dataframe for use by plotting, etc. 
+ return national_aggregation + def create_geospatially_resolved_aggregations(self, geographic_col_name, pretty_geo_col_name=False): - supported_geographies = [self.STATE_ID, self.COUNTY_ID, self.TRACT_ID, self.CZ_ASHRAE] + # Ensure the geography is supported + supported_geographies = [self.CEN_DIV, self.STATE_ID, self.COUNTY_ID, self.TRACT_ID, self.CZ_ASHRAE] if geographic_col_name not in [self.STATE_ID, self.COUNTY_ID, self.TRACT_ID, self.CZ_ASHRAE]: logger.error(f'Requeted geographic aggregation {geographic_col_name} not in supported geographies.') logger.error(f'Currently supported geographies are {supported_geographies}') raise RuntimeError('Unsupported geography selected for geospatial aggregation') - spatial_aggregation = self.fkt.select([pl.col('weight'), pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID), pl.col(geographic_col_name)]).groupby([pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID), pl.col(geographic_col_name)]).sum() + + # Create the spatial aggregation + spatial_aggregation = self.fkt.clone() + spatial_aggregation = spatial_aggregation.select( + [pl.col('weight'), pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID), pl.col(geographic_col_name)] + ).groupby([pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID), pl.col(geographic_col_name)]).sum() spatial_aggregation = spatial_aggregation.join(self.data, on=[pl.col(self.UPGRADE_ID), pl.col(self.BLDG_ID)]) spatial_aggregation = self._calculate_weighted_columnal_values(spatial_aggregation) @@ -1980,7 +2058,9 @@ def create_geospatially_resolved_aggregations(self, geographic_col_name, pretty_ up_ids.sort() for up_id in up_ids: file_name = f'{self.UPGRADE_ID}={up_id}' - file_path = os.path.abspath(os.path.join(self.output_dir, 'geospatial_results', geographic_col_name.replace('in.', ''), file_name)) + file_path = os.path.abspath(os.path.join( + self.output_dir, 'geospatial_results', geographic_col_name.replace('in.', ''), file_name + )) logger.info(f'Exporting to: {file_path}') to_write = spatial_aggregation.filter(pl.col(self.UPGRADE_ID) == up_id) if pretty_geo_col_name: @@ -1992,13 +2072,21 @@ def create_geospatially_resolved_aggregations(self, geographic_col_name, pretty_ logger.warn("ulimit -n 200000") logger.warn("ulimit -u 2048") logger.info("Attempting pottentially OSERROR triggering write:") - to_write.collect().write_parquet(file_path, use_pyarrow=True, pyarrow_options={"partition_cols": [pretty_geo_col_name], 'max_partitions': 3143}) + to_write.collect().write_parquet(file_path, use_pyarrow=True, pyarrow_options={ + "partition_cols": [pretty_geo_col_name], 'max_partitions': 3143 + }) # Export dictionaries corresponding to the exported columns self.export_data_and_enumeration_dictionary() + # Return the geospatially aggregated dataframe for use by plotting, etc. + return spatial_aggregation def add_weights_aportioned_by_stock_estimate(self, apportionment: Apportion, keep_n_per_apportionment_group=False): + # This function doesn't support already CBECS-weighted self.data - error out + if self.CBECS_WEIGHTS_APPLIED: + raise RuntimeError('Unable to apply apportionment weighting after CBECS weighting - reverse order.') + # TODO this should live somewhere else - don't know where... 
self.data = self.data.with_columns( pl.col(self.COUNTY_ID).cast(str).str.slice(0, 4).alias(self.STATE_ID) @@ -2006,7 +2094,7 @@ def add_weights_aportioned_by_stock_estimate(self, apportionment: Apportion, kee # Pull the columns required to do the matching plus the annual energy total as a safety blanket # TODO this is a superset for convienience - slim down later - csdf = self.data.filter(pl.col(self.UPGRADE_NAME) == self.BASE_NAME).select(pl.col( + csdf = self.data.clone().filter(pl.col(self.UPGRADE_NAME) == self.BASE_NAME).select(pl.col( self.BLDG_ID, self.STATE_ID, self.COUNTY_ID, self.TRACT_ID, self.SAMPLING_REGION, self.CZ_ASHRAE, self.BLDG_TYPE, self.HVAC_SYS, self.SH_FUEL, self.SIZE_BIN, self.FLR_AREA, self.TOT_EUI, self.CEN_DIV )) @@ -2027,7 +2115,9 @@ def add_weights_aportioned_by_stock_estimate(self, apportionment: Apportion, kee # domain (csdf data). # TODO make the apportionment data object a lazy df nativly apportionment.data.loc[:, 'hvac_and_fueltype'] = apportionment.data.loc[:, 'system_type'] + '_' + apportionment.data.loc[:, 'heating_fuel'] - appo_group_df = apportionment.data.loc[:, ['sampling_region', 'building_type', 'size_bin', 'hvac_and_fueltype']] + appo_group_df = apportionment.data.copy(deep=True).loc[ + :, ['sampling_region', 'building_type', 'size_bin', 'hvac_and_fueltype'] + ] appo_group_df = appo_group_df.drop_duplicates(keep='first').sort_values( by=['sampling_region', 'building_type', 'size_bin', 'hvac_and_fueltype'] ).reset_index(drop=True).reset_index(names='appo_group_id') @@ -2043,7 +2133,7 @@ def add_weights_aportioned_by_stock_estimate(self, apportionment: Apportion, kee raise RuntimeError('Not all combinations of sampling region, bt, and size bin could be matched.') # Join apportionment group id into comstock data - tdf = pl.DataFrame(apportionment.data).lazy() + tdf = pl.DataFrame(apportionment.data.copy(deep=True)).lazy() tdf = tdf.join(appo_group_df, on=['sampling_region', 'building_type', 'size_bin', 'hvac_and_fueltype']) # Identify combination in the truth data not supported by the current sample. 
@@ -2114,8 +2204,10 @@ def add_weights_aportioned_by_stock_estimate(self, apportionment: Apportion, kee pl.col('county').alias(self.COUNTY_ID), pl.col('state').alias(self.STATE_ID), pl.col('cz').alias(self.CZ_ASHRAE), + pl.col('cen_div').alias(self.CEN_DIV), pl.col('sqft').alias('truth_sqft'), - pl.col('tract_assignment_type').alias('in.tract_assignment_type') + pl.col('tract_assignment_type').alias('in.tract_assignment_type'), + pl.col('building_type').alias(self.BLDG_TYPE) ), on=pl.col('tdf_id')) # Pull in the sqft calculate weights @@ -2130,11 +2222,11 @@ def add_weights_aportioned_by_stock_estimate(self, apportionment: Apportion, kee self.TRACT_ID: self.TRACT_ID.replace('in.', self.POST_APPO_SIM_COL_PREFIX), self.COUNTY_ID: self.COUNTY_ID.replace('in.', self.POST_APPO_SIM_COL_PREFIX), self.STATE_ID: self.STATE_ID.replace('in.', self.POST_APPO_SIM_COL_PREFIX), - self.CZ_ASHRAE: self.CZ_ASHRAE.replace('in.', self.POST_APPO_SIM_COL_PREFIX), + self.CEN_DIV: self.CEN_DIV.replace('in.', self.POST_APPO_SIM_COL_PREFIX), }) # Drop unwanted columns from the foreign key table and persist - fkt = fkt.drop('tdf_id', 'appo_group_id', 'truth_sqft', 'in.tract_assignment_type') + fkt = fkt.drop('tdf_id', 'appo_group_id', 'truth_sqft', 'in.tract_assignment_type', self.FLR_AREA) self.APPORTIONED = True self.fkt = fkt logger.info('Successfully completed the apportionment sampling postprocessing') @@ -2442,7 +2534,7 @@ def rmv_units(c): self.data = self.data.rename(crnms) - def add_sightglass_column_units(self): + def add_sightglass_column_units(self, lazyframe): # SightGlass requires that the energy_consumption, energy_consumption_intensity, # energy_savings, and energy_savings_intensity columns have no units on the # column names. This method adds those units back to the appropriate column names, @@ -2452,7 +2544,7 @@ def rmv_units(c): return c.replace(f'..{self.units_from_col_name(c)}', '') crnms = {} # Column renames - og_cols = self.data.columns + og_cols = lazyframe.columns for col in (self.COLS_TOT_ANN_ENGY + self.COLS_ENDUSE_ANN_ENGY): # energy_consumption if rmv_units(col) in og_cols: crnms[rmv_units(col)] = col @@ -2478,7 +2570,8 @@ def rmv_units(c): assert new.startswith(old) logger.debug(f'{old} -> {new}') - self.data = self.data.rename(crnms) + lazyframe = lazyframe.rename(crnms) + return lazyframe def get_comstock_unscaled_monthly_energy_consumption(self): """ diff --git a/postprocessing/comstockpostproc/comstock_apportionment.py b/postprocessing/comstockpostproc/comstock_apportionment.py index 59ff25c4d..c564d8301 100644 --- a/postprocessing/comstockpostproc/comstock_apportionment.py +++ b/postprocessing/comstockpostproc/comstock_apportionment.py @@ -1,3 +1,6 @@ +# ComStock™, Copyright (c) 2023 Alliance for Sustainable Energy, LLC. All rights reserved. +# See top level LICENSE.txt file for license terms.
+ import boto3 import botocore from glob import glob @@ -89,8 +92,8 @@ def __init__(self, stock_estimation_version, truth_data_version, bootstrap_coeff CEN_DIV_LKUP={ 'G090': 'New England', 'G230': 'New England', 'G250': 'New England', 'G330': 'New England', - 'G440': 'New England', 'G500': 'New England', 'G340': 'Mid-Atlantic', 'G360': 'Mid-Atlantic', - 'G420': 'Mid-Atlantic', 'G180': 'East North Central', 'G170': 'East North Central', + 'G440': 'New England', 'G500': 'New England', 'G340': 'Middle Atlantic', 'G360': 'Middle Atlantic', + 'G420': 'Middle Atlantic', 'G180': 'East North Central', 'G170': 'East North Central', 'G260': 'East North Central', 'G390': 'East North Central', 'G550': 'East North Central', 'G190': 'West North Central', 'G200': 'West North Central', 'G270': 'West North Central', 'G290': 'West North Central', 'G310': 'West North Central', 'G380': 'West North Central', @@ -534,6 +537,7 @@ def upsample_hvac_system_fuel_types(self): hcols = [col.replace('Option=', '') for col in hsdf.columns if 'Option=' in col] hsdf.columns = [col.replace('Dependency=', '').replace('Option=', '') for col in hsdf.columns] hsdf.loc[:, 'building_type'] = hsdf.loc[:, 'building_type'].map(self.BUILDING_TYPE_NAME_MAPPER) + hsdf.loc[:, 'census_region'] = hsdf.loc[:, 'census_region'].replace('Mid-Atlantic', 'Middle Atlantic') df = df.merge(hsdf, left_on=['building_type', 'heating_fuel', 'cen_div'], right_on=['building_type', 'heating_fuel', 'census_region']) # Use the merged probabilities to sample in fuel type diff --git a/postprocessing/comstockpostproc/comstock_measure_comparison.py b/postprocessing/comstockpostproc/comstock_measure_comparison.py index c8611f42a..7e242dc34 100644 --- a/postprocessing/comstockpostproc/comstock_measure_comparison.py +++ b/postprocessing/comstockpostproc/comstock_measure_comparison.py @@ -20,7 +20,12 @@ def __init__(self, comstock_object: comstock.ComStock, states, make_comparison_p # Initialize members assert isinstance(comstock_object.data, pl.LazyFrame) - self.data = comstock_object.data.clone() #not really a deep copy, only schema is copied but not data. + # Instantiate the plotting data lazyframe if it doesn't yet exist: + if not isinstance(comstock_object.plotting_data, pl.LazyFrame): + logger.info(f'Instantiating plotting lazyframe for comstock dataset {comstock_object.dataset_name}.') + comstock_object.create_plotting_lazyframe() + assert isinstance(comstock_object.plotting_data, pl.LazyFrame) + self.data = comstock_object.plotting_data.clone() #not really a deep copy, only schema is copied but not data. 
assert isinstance(self.data, pl.LazyFrame) self.color_map = {} diff --git a/postprocessing/comstockpostproc/comstock_to_cbecs_comparison.py b/postprocessing/comstockpostproc/comstock_to_cbecs_comparison.py index 3e499315d..dfa1b7489 100644 --- a/postprocessing/comstockpostproc/comstock_to_cbecs_comparison.py +++ b/postprocessing/comstockpostproc/comstock_to_cbecs_comparison.py @@ -51,9 +51,13 @@ def __init__(self, comstock_list: List[ComStock], cbecs_list: List[CBECS], upgra # remove measure data from ComStock if isinstance(dataset, ComStock): #dataset is ComStock assert isinstance(dataset.data, pl.LazyFrame) + # Instantiate the plotting data lazyframe if it doesn't yet exist: + if not isinstance(dataset.plotting_data, pl.LazyFrame): + logger.info(f'Instantiating plotting lazyframe for comstock dataset {dataset.dataset_name}.') + dataset.create_plotting_lazyframe() + assert isinstance(dataset.plotting_data, pl.LazyFrame) - dataset.add_sightglass_column_units() # Add units to SightGlass columns if missing - up_id_name: list = dataset.data.select(dataset.UPGRADE_ID, dataset.UPGRADE_NAME).collect().unique().to_numpy().tolist() + up_id_name: list = dataset.plotting_data.select(dataset.UPGRADE_ID, dataset.UPGRADE_NAME).collect().unique().to_numpy().tolist() up_name_map = {k: v for k, v in up_id_name} valid_upgrade_id = [x for x in up_name_map.keys()] valid_upgrade_name = [up_name_map[x] for x in valid_upgrade_id] @@ -62,10 +66,12 @@ def __init__(self, comstock_list: List[ComStock], cbecs_list: List[CBECS], upgra if upgrade_id == 'All': # df_data: pl.LazyFrame = dataset.data # df_data[dataset.DATASET] = df_data[dataset.DATASET] + ' - ' + df_data['upgrade_name'] - comstock_dfs_to_concat.append(dataset.data) + comstock_dfs_to_concat.append(dataset.plotting_data) # df_data[dataset.DATASET] = df_data[dataset.DATASET].astype(str) + ' - ' + df_data[dataset.UPGRADE_NAME].astype(str) - dataset.data = dataset.data.with_columns((pl.col(dataset.DATASET).cast(pl.Utf8) + ' - ' + pl.col(dataset.UPGRADE_NAME).cast(pl.Utf8)).alias(dataset.DATASET)) - dfs_to_concat.append(dataset.data) + dataset.plotting_data = dataset.plotting_data.with_columns(( + pl.col(dataset.DATASET).cast(pl.Utf8) + ' - ' + pl.col(dataset.UPGRADE_NAME).cast(pl.Utf8) + ).alias(dataset.DATASET)) + dfs_to_concat.append(dataset.plotting_data) # up_name_map = dict(zip(df_data[dataset.UPGRADE_ID].unique(), df_data[dataset.UPGRADE_NAME].unique())) # upgrade_list = list(df_data[dataset.UPGRADE_ID].unique()) color_dict = self.linear_gradient(dataset.COLOR_COMSTOCK_BEFORE, dataset.COLOR_COMSTOCK_AFTER, len(valid_upgrade_id)) @@ -78,7 +84,7 @@ def __init__(self, comstock_list: List[ComStock], cbecs_list: List[CBECS], upgra elif upgrade_id not in valid_upgrade_id: logger.error(f"Upgrade {upgrade_id} not found in {dataset.dataset_name}. 
Enter a valid upgrade ID in the ComStockToCBECSComparison constructor or \"All\" to include all upgrades.") else: - df_data = dataset.data.filter(pl.col(dataset.UPGRADE_ID) == upgrade_id) + df_data = dataset.plotting_data.filter(pl.col(dataset.UPGRADE_ID) == upgrade_id) df_data = df_data.with_columns((pl.col(dataset.DATASET).cast(pl.Utf8) + ' - ' + pl.col(dataset.UPGRADE_NAME).cast(pl.Utf8)).alias(dataset.DATASET)) dataset_name = dataset.dataset_name + " - " + up_name_map[upgrade_id] comstock_dfs_to_concat.append(df_data) @@ -112,6 +118,12 @@ def __init__(self, comstock_list: List[ComStock], cbecs_list: List[CBECS], upgra current_dir = os.path.dirname(os.path.abspath(__file__)) # Combine just comstock runs into single dataframe for QOI plots + common_columns = set(comstock_dfs_to_concat[0].columns) + all_columns = common_columns + for df in comstock_dfs_to_concat: + common_columns = common_columns & set(df.columns) + logger.info(f"Not including columns {all_columns - common_columns} in ComStock-only plots") + comstock_dfs_to_concat = [df.select(common_columns) for df in comstock_dfs_to_concat] comstock_df = pl.concat(comstock_dfs_to_concat, how="vertical_relaxed") # comstock_df = comstock_df[[self.DATASET] + self.QOI_MAX_DAILY_TIMING_COLS + self.QOI_MAX_USE_COLS + self.QOI_MIN_USE_COLS + self.QOI_MAX_USE_COLS_NORMALIZED + self.QOI_MIN_USE_COLS_NORMALIZED] comstock_qoi_columns = [self.DATASET] + self.QOI_MAX_DAILY_TIMING_COLS + self.QOI_MAX_USE_COLS + self.QOI_MIN_USE_COLS + self.QOI_MAX_USE_COLS_NORMALIZED + self.QOI_MIN_USE_COLS_NORMALIZED @@ -198,4 +210,9 @@ def export_to_csv_wide(self): file_name = f'ComStock wide.csv' file_path = os.path.join(self.output_dir, file_name) - self.data.to_csv(file_path, index=False) \ No newline at end of file + try: + self.data.sink_csv(file_path) + except pl.exceptions.InvalidOperationError: + logger.warning('sink_csv not supported for metadata write in current polars version') + logger.warning('Falling back to .collect().write_csv()') + self.data.collect().write_csv(file_path) diff --git a/sampling/resources/10k_sample_input_validated.csv.zip b/sampling/resources/10k_sample_input_validated.csv.zip new file mode 100644 index 000000000..27b8d6256 Binary files /dev/null and b/sampling/resources/10k_sample_input_validated.csv.zip differ
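The template diffs above all follow the same revised post-processing order introduced by this change set: apply the StockE apportionment weights first, then the CBECS national scaling weights, and only then build aggregations and the plotting LazyFrame consumed by the comparison classes. Below is a minimal usage sketch of that call order, assuming the comstockpostproc API shown in the diffs; the run name, S3 prefix, and buildstock CSV name are placeholders rather than references to a real run.

import comstockpostproc as cspp

def main():
    # Placeholder run identifiers - substitute a real ComStock run before using
    comstock = cspp.ComStock(
        s3_base_dir='eulp/euss_com',
        comstock_run_name='my_comstock_run',
        comstock_run_version='my_comstock_run',
        comstock_year=2018,
        athena_table_name=None,
        truth_data_version='v01',
        buildstock_csv_name='buildstock.csv',
        acceptable_failure_percentage=0.25,
        drop_failed_runs=True,
        color_hex='#0072B2',
        skip_missing_columns=True,
        reload_from_csv=False,
        include_upgrades=False
    )
    stock_estimate = cspp.Apportion(stock_estimation_version='2024R2', truth_data_version='v01')
    cbecs = cspp.CBECS(cbecs_year=2018, truth_data_version='v01', color_hex='#009E73', reload_from_csv=False)

    # Order matters: apportionment weights must be applied before CBECS scaling, because
    # add_weights_aportioned_by_stock_estimate() raises a RuntimeError once CBECS weights
    # are already on the run (see the CBECS_WEIGHTS_APPLIED guard added in comstock.py).
    comstock.add_weights_aportioned_by_stock_estimate(apportionment=stock_estimate)
    comstock.add_national_scaling_weights(cbecs, remove_non_comstock_bldg_types_from_cbecs=True)

    # Write the national aggregation and build the plotting LazyFrame used by the comparison classes
    comstock.create_national_aggregation()
    comstock.create_plotting_lazyframe()
    comp = cspp.ComStockToCBECSComparison(
        cbecs_list=[cbecs], comstock_list=[comstock], upgrade_id='All', make_comparison_plots=True
    )
    comp.export_to_csv_wide()

if __name__ == "__main__":
    main()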