From 20dca85c65204ed6b63e60454f8f3bff4dc490be Mon Sep 17 00:00:00 2001
From: Joseph Nke <76006812+jnke2016@users.noreply.github.com>
Date: Mon, 14 Aug 2023 15:06:04 +0100
Subject: [PATCH] Makes copy of input ddf to work around dropped column names
 (#3776)

When creating multiple graphs from the same dask_cudf dataframe, a metadata
mismatch occurs when one or more partitions are empty. During the second
graph creation with the dask_cudf dataframe that was used/modified earlier,
the metadata is not preserved for partitions holding empty dataframes. This
happens because a _reference_ to the input dataframe, partly modified during
the first graph creation, is reused in the second graph creation. This PR
makes a copy of the input dataframe right after the repartition call to
avoid that alteration.

Authors:
  - jnke2016 (jnke@gmail.com)

Approvers:
  - Vibhu Jawa (https://github.com/VibhuJawa)
  - Alex Barghi (https://github.com/alexbarghi-nv)
  - Rick Ratzel (https://github.com/rlratzel)

---
 .../structure/graph_implementation/simpleDistributedGraph.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py
index 90db2c6b1f5..bb546ab4e5d 100644
--- a/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py
+++ b/python/cugraph/cugraph/structure/graph_implementation/simpleDistributedGraph.py
@@ -182,6 +182,8 @@ def __from_edgelist(
         workers = _client.scheduler_info()["workers"]
         # Repartition to 2 partitions per GPU for memory efficient process
         input_ddf = input_ddf.repartition(npartitions=len(workers) * 2)
+        # FIXME: Make a copy of the input ddf before implicitly altering it.
+        input_ddf = input_ddf.map_partitions(lambda df: df.copy())
         # The dataframe will be symmetrized iff the graph is undirected
         # otherwise, the inital dataframe will be returned
         if edge_attr is not None:
@@ -318,7 +320,6 @@ def __from_edgelist(
             is_symmetric=not self.properties.directed,
         )
         ddf = ddf.repartition(npartitions=len(workers) * 2)
-        ddf = ddf.map_partitions(lambda df: df.copy())
         ddf = persist_dask_df_equal_parts_per_worker(ddf, _client)
         num_edges = len(ddf)
         ddf = get_persisted_df_worker_map(ddf, _client)
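
For context, below is a minimal sketch of the usage pattern the commit message describes: building two graphs from the same dask_cudf dataframe. It is not part of the patch; the file name `edges.csv`, the column names `src`/`dst`/`wgt`, and the single-node cluster setup are illustrative assumptions.

```python
# Hypothetical reproduction sketch (not part of the patch). Before this fix,
# the second from_dask_cudf_edgelist call could see partitions whose column
# metadata was dropped, because __from_edgelist reused and mutated a
# reference to the shared dask_cudf dataframe.
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from cugraph.dask.comms import comms as Comms
import dask_cudf
import cugraph

if __name__ == "__main__":
    cluster = LocalCUDACluster()
    client = Client(cluster)
    Comms.initialize(p2p=True)

    # Edge list with assumed column names "src", "dst", "wgt".
    ddf = dask_cudf.read_csv(
        "edges.csv",
        names=["src", "dst", "wgt"],
        dtype=["int32", "int32", "float32"],
    )

    # First graph creation: internally repartitions the input and, before
    # this fix, could alter the referenced partitions.
    G1 = cugraph.Graph(directed=True)
    G1.from_dask_cudf_edgelist(ddf, source="src", destination="dst", edge_attr="wgt")

    # Second graph creation reusing the same ddf: previously this could hit a
    # metadata mismatch when some partitions were empty; the copy added right
    # after the repartition call keeps the caller's dataframe intact.
    G2 = cugraph.Graph(directed=False)
    G2.from_dask_cudf_edgelist(ddf, source="src", destination="dst", edge_attr="wgt")

    Comms.destroy()
    client.close()
    cluster.close()
```

The patch also drops the later `map_partitions(lambda df: df.copy())` on the symmetrized dataframe, since copying once immediately after the initial repartition is enough to decouple graph construction from the caller's dataframe.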