From 8a3d7f133cd2728a40634c74cddcdcc8f36822ea Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Fri, 26 Jul 2024 12:09:57 -0700
Subject: [PATCH 1/4] preserve dtypes in DaskExecutor.fit_phase

---
 merlin/dag/executors.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/merlin/dag/executors.py b/merlin/dag/executors.py
index fcb8c9535..15ce96d99 100644
--- a/merlin/dag/executors.py
+++ b/merlin/dag/executors.py
@@ -374,7 +374,6 @@ def transform(
         # If so, we should perform column selection at the ddf level.
         # Otherwise, Dask will not push the column selection into the
         # IO function.
-
         if not nodes:
             return ddf[_get_unique(additional_columns)] if additional_columns else ddf
 
@@ -389,19 +388,20 @@ def transform(
                 if col_dtype:
                     output_dtypes[col_name] = md.dtype(col_dtype).to_numpy
 
+        def empty_like(df):
+            # Construct an empty DataFrame with the same dtypes as df
+            return df._constructor(
+                {k: df._constructor_sliced([], dtype=df[k].dtype) for k in df.columns}
+            )
+
         if isinstance(output_dtypes, dict) and isinstance(ddf._meta, pd.DataFrame):
             dtypes = output_dtypes
-            output_dtypes = type(ddf._meta)({k: [] for k in columns})
+            output_dtypes = empty_like(ddf._meta[columns])
             for col_name, col_dtype in dtypes.items():
                 output_dtypes[col_name] = output_dtypes[col_name].astype(col_dtype)
 
         elif not output_dtypes:
-            # TODO: constructing meta like this loses dtype information on the ddf
-            # and sets it all to 'float64'. We should propagate dtype information along
-            # with column names in the columngroup graph. This currently only
-            # happens during intermediate 'fit' transforms, so as long as statoperators
-            # don't require dtype information on the DDF this doesn't matter all that much
-            output_dtypes = type(ddf._meta)({k: [] for k in columns})
+            output_dtypes = empty_like(ddf._meta[columns])
 
         return ensure_optimize_dataframe_graph(
             ddf=ddf.map_partitions(

From 77b963c7ae7f1a49872d8e07ddef385a8ca25b0b Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Fri, 26 Jul 2024 12:31:59 -0700
Subject: [PATCH 2/4] account for the case that columns are not consistent

---
 merlin/dag/executors.py | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/merlin/dag/executors.py b/merlin/dag/executors.py
index 15ce96d99..2cd667d49 100644
--- a/merlin/dag/executors.py
+++ b/merlin/dag/executors.py
@@ -388,20 +388,25 @@ def transform(
                 if col_dtype:
                     output_dtypes[col_name] = md.dtype(col_dtype).to_numpy
 
-        def empty_like(df):
+        def empty_like(df, cols):
             # Construct an empty DataFrame with the same dtypes as df
             return df._constructor(
-                {k: df._constructor_sliced([], dtype=df[k].dtype) for k in df.columns}
+                {
+                    col: df._constructor_sliced(
+                        [], dtype=df[col].dtype if col in df.columns else "float64"
+                    )
+                    for col in cols
+                }
             )
 
         if isinstance(output_dtypes, dict) and isinstance(ddf._meta, pd.DataFrame):
             dtypes = output_dtypes
-            output_dtypes = empty_like(ddf._meta[columns])
+            output_dtypes = empty_like(ddf._meta, columns)
             for col_name, col_dtype in dtypes.items():
                 output_dtypes[col_name] = output_dtypes[col_name].astype(col_dtype)
 
         elif not output_dtypes:
-            output_dtypes = empty_like(ddf._meta[columns])
+            output_dtypes = empty_like(ddf._meta, columns)
 
         return ensure_optimize_dataframe_graph(
             ddf=ddf.map_partitions(

From acb1688a6592da479b95eae486f81a927898f327 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Fri, 26 Jul 2024 12:35:43 -0700
Subject: [PATCH 3/4] add back comment

---
 merlin/dag/executors.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/merlin/dag/executors.py b/merlin/dag/executors.py
index 2cd667d49..75ca53dac 100644
--- a/merlin/dag/executors.py
+++ b/merlin/dag/executors.py
@@ -374,6 +374,7 @@ def transform(
         # If so, we should perform column selection at the ddf level.
         # Otherwise, Dask will not push the column selection into the
         # IO function.
+
         if not nodes:
             return ddf[_get_unique(additional_columns)] if additional_columns else ddf
 
@@ -390,6 +391,13 @@ def transform(
 
         def empty_like(df, cols):
             # Construct an empty DataFrame with the same dtypes as df
+
+            # TODO: constructing meta like this can loose dtype information for
+            # columns that are arbitrarily set to 'float64'. We should propagate
+            # dtype information along with column names in the columngroup graph.
+            # This currently only happens during intermediate 'fit' transforms,
+            # so as long as statoperators don't require dtype information on the
+            # DDF this doesn't matter all that much
             return df._constructor(
                 {
                     col: df._constructor_sliced(

From 6cf5cb6a5c66e7ca03462b0c605145d9e30626e9 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Fri, 26 Jul 2024 13:19:27 -0700
Subject: [PATCH 4/4] punt on type preservation

---
 merlin/dag/executors.py | 28 +++++++++++-----------------
 1 file changed, 11 insertions(+), 17 deletions(-)

diff --git a/merlin/dag/executors.py b/merlin/dag/executors.py
index 75ca53dac..ba2550ee1 100644
--- a/merlin/dag/executors.py
+++ b/merlin/dag/executors.py
@@ -389,32 +389,26 @@ def transform(
                 if col_dtype:
                     output_dtypes[col_name] = md.dtype(col_dtype).to_numpy
 
-        def empty_like(df, cols):
-            # Construct an empty DataFrame with the same dtypes as df
-
-            # TODO: constructing meta like this can loose dtype information for
-            # columns that are arbitrarily set to 'float64'. We should propagate
-            # dtype information along with column names in the columngroup graph.
-            # This currently only happens during intermediate 'fit' transforms,
-            # so as long as statoperators don't require dtype information on the
-            # DDF this doesn't matter all that much
+        def make_empty(df, cols):
+            # Construct an empty DataFrame
+
+            # TODO: constructing meta like this loses dtype information on the ddf
+            # and sets it all to 'float64'. We should propagate dtype information along
+            # with column names in the columngroup graph. This currently only
+            # happens during intermediate 'fit' transforms, so as long as statoperators
+            # don't require dtype information on the DDF this doesn't matter all that much
             return df._constructor(
-                {
-                    col: df._constructor_sliced(
-                        [], dtype=df[col].dtype if col in df.columns else "float64"
-                    )
-                    for col in cols
-                }
+                {col: df._constructor_sliced([], dtype="float64") for col in cols}
             )
 
         if isinstance(output_dtypes, dict) and isinstance(ddf._meta, pd.DataFrame):
             dtypes = output_dtypes
-            output_dtypes = empty_like(ddf._meta, columns)
+            output_dtypes = make_empty(ddf._meta, columns)
             for col_name, col_dtype in dtypes.items():
                 output_dtypes[col_name] = output_dtypes[col_name].astype(col_dtype)
 
         elif not output_dtypes:
-            output_dtypes = empty_like(ddf._meta, columns)
+            output_dtypes = make_empty(ddf._meta, columns)
 
         return ensure_optimize_dataframe_graph(
             ddf=ddf.map_partitions(
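As a quick illustration of the trade-off this series lands on, the snippet below is a minimal, pandas-only sketch contrasting the final make_empty helper from PATCH 4 (every output column is arbitrarily typed as float64) with the dtype-preserving empty_like variant from PATCH 2/3. The sample DataFrame and the printed dtypes are illustrative assumptions, not taken from the merlin codebase.

    import pandas as pd

    def make_empty(df, cols):
        # PATCH 4 behavior: every output column defaults to float64
        return df._constructor(
            {col: df._constructor_sliced([], dtype="float64") for col in cols}
        )

    def empty_like(df, cols):
        # PATCH 2/3 behavior: reuse the input dtype when the column exists,
        # fall back to float64 for columns missing from df
        return df._constructor(
            {
                col: df._constructor_sliced(
                    [], dtype=df[col].dtype if col in df.columns else "float64"
                )
                for col in cols
            }
        )

    df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
    print(make_empty(df, ["a", "b"]).dtypes)       # a and b both become float64
    print(empty_like(df, ["a", "b", "c"]).dtypes)  # a -> int64, b -> object, c -> float64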