Skip to content

Commit

Permalink
Merge branch 'branch-23.08' into python-sampling-updates-23.08
Browse files Browse the repository at this point in the history
  • Loading branch information
alexbarghi-nv authored Jul 18, 2023
2 parents e8684d4 + 54c0812 commit ad08e89
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 39 deletions.
19 changes: 19 additions & 0 deletions python/cugraph/cugraph/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
stop_dask_client,
)

import os
import tempfile

# module-wide fixtures


Expand All @@ -42,3 +45,19 @@ def dask_client():
yield dask_client

stop_dask_client(dask_client, dask_cluster)


@pytest.fixture(scope="module")
def scratch_dir():
    """Yield a scratch directory path for test output.

    RAPIDS_PYTEST_SCRATCH_DIR should always be set when doing MG testing,
    since a per-process temporary directory is only accessible from the
    current process; a shared path must be provided for the workers.
    Falls back to a module-scoped ``tempfile.TemporaryDirectory`` that is
    cleaned up deterministically at teardown.
    """
    scratch = os.getenv("RAPIDS_PYTEST_SCRATCH_DIR")

    if scratch is not None:
        # Externally-managed directory: the caller owns its lifetime.
        yield scratch
    else:
        # Only create the temporary directory when no shared scratch dir
        # was provided (the original eagerly created one as the getenv
        # default even when the env var was set, leaking a temp dir).
        tempdir_object = tempfile.TemporaryDirectory()
        try:
            yield tempdir_object.name
        finally:
            # Explicit cleanup instead of relying on GC via `del`.
            tempdir_object.cleanup()
45 changes: 29 additions & 16 deletions python/cugraph/cugraph/tests/sampling/test_bulk_sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
import cugraph
from cugraph.experimental.datasets import karate
from cugraph.experimental.gnn import BulkSampler
from cugraph.utilities.utils import create_directory_with_overwrite

import tempfile
import os
import shutil


@pytest.mark.sg
def test_bulk_sampler_simple():
def test_bulk_sampler_simple(scratch_dir):
el = karate.get_edgelist().reset_index().rename(columns={"index": "eid"})
el["eid"] = el["eid"].astype("int32")
el["etp"] = cupy.int32(0)
Expand All @@ -36,10 +37,12 @@ def test_bulk_sampler_simple():
edge_attr=["wgt", "eid", "etp"],
)

tempdir_object = tempfile.TemporaryDirectory()
samples_path = os.path.join(scratch_dir, "test_bulk_sampler_simple")
create_directory_with_overwrite(samples_path)

bs = BulkSampler(
batch_size=2,
output_path=tempdir_object.name,
output_path=samples_path,
graph=G,
fanout_vals=[2, 2],
with_replacement=False,
Expand All @@ -55,14 +58,16 @@ def test_bulk_sampler_simple():
bs.add_batches(batches, start_col_name="start", batch_col_name="batch")
bs.flush()

recovered_samples = cudf.read_parquet(tempdir_object.name)
recovered_samples = cudf.read_parquet(samples_path)

for b in batches["batch"].unique().values_host.tolist():
assert b in recovered_samples["batch_id"].values_host.tolist()

shutil.rmtree(samples_path)


@pytest.mark.sg
def test_bulk_sampler_remainder():
def test_bulk_sampler_remainder(scratch_dir):
el = karate.get_edgelist().reset_index().rename(columns={"index": "eid"})
el["eid"] = el["eid"].astype("int32")
el["etp"] = cupy.int32(0)
Expand All @@ -75,10 +80,12 @@ def test_bulk_sampler_remainder():
edge_attr=["wgt", "eid", "etp"],
)

tempdir_object = tempfile.TemporaryDirectory()
samples_path = os.path.join(scratch_dir, "test_bulk_sampler_remainder")
create_directory_with_overwrite(samples_path)

bs = BulkSampler(
batch_size=2,
output_path=tempdir_object.name,
output_path=samples_path,
graph=G,
seeds_per_call=7,
batches_per_partition=2,
Expand All @@ -102,26 +109,27 @@ def test_bulk_sampler_remainder():
bs.add_batches(batches, start_col_name="start", batch_col_name="batch")
bs.flush()

tld = tempdir_object.name
recovered_samples = cudf.read_parquet(tld)
recovered_samples = cudf.read_parquet(samples_path)

for b in batches["batch"].unique().values_host.tolist():
assert b in recovered_samples["batch_id"].values_host.tolist()

for x in range(0, 6, 2):
subdir = f"{x}-{x+1}"
df = cudf.read_parquet(os.path.join(tld, f"batch={subdir}.parquet"))
df = cudf.read_parquet(os.path.join(samples_path, f"batch={subdir}.parquet"))

assert ((df.batch_id == x) | (df.batch_id == (x + 1))).all()
assert ((df.hop_id == 0) | (df.hop_id == 1)).all()

assert (
cudf.read_parquet(os.path.join(tld, "batch=6-6.parquet")).batch_id == 6
cudf.read_parquet(os.path.join(samples_path, "batch=6-6.parquet")).batch_id == 6
).all()

shutil.rmtree(samples_path)


@pytest.mark.sg
def test_bulk_sampler_large_batch_size():
def test_bulk_sampler_large_batch_size(scratch_dir):
el = karate.get_edgelist().reset_index().rename(columns={"index": "eid"})
el["eid"] = el["eid"].astype("int32")
el["etp"] = cupy.int32(0)
Expand All @@ -134,10 +142,13 @@ def test_bulk_sampler_large_batch_size():
edge_attr=["wgt", "eid", "etp"],
)

tempdir_object = tempfile.TemporaryDirectory()
samples_path = os.path.join(scratch_dir, "test_bulk_sampler_large_batch_size")
if os.path.exists(samples_path):
shutil.rmtree(samples_path)
os.makedirs(samples_path)
bs = BulkSampler(
batch_size=5120,
output_path=tempdir_object.name,
output_path=samples_path,
graph=G,
fanout_vals=[2, 2],
with_replacement=False,
Expand All @@ -153,7 +164,9 @@ def test_bulk_sampler_large_batch_size():
bs.add_batches(batches, start_col_name="start", batch_col_name="batch")
bs.flush()

recovered_samples = cudf.read_parquet(tempdir_object.name)
recovered_samples = cudf.read_parquet(samples_path)

for b in batches["batch"].unique().values_host.tolist():
assert b in recovered_samples["batch_id"].values_host.tolist()

shutil.rmtree(samples_path)
19 changes: 12 additions & 7 deletions python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@

import pytest
import cudf
import tempfile
import os
import shutil

from cugraph.gnn.data_loading.bulk_sampler_io import write_samples
from cugraph.utilities.utils import create_directory_with_overwrite


@pytest.mark.sg
def test_bulk_sampler_io():
def test_bulk_sampler_io(scratch_dir):
results = cudf.DataFrame(
{
"sources": [0, 0, 1, 2, 2, 2, 3, 4, 5, 5, 6, 7],
Expand All @@ -34,12 +35,14 @@ def test_bulk_sampler_io():

offsets = cudf.DataFrame({"offsets": [0, 8], "batch_id": [0, 1]})

tempdir_object = tempfile.TemporaryDirectory()
write_samples(results, offsets, 1, tempdir_object.name)
samples_path = os.path.join(scratch_dir, "test_bulk_sampler_io")
create_directory_with_overwrite(samples_path)

assert len(os.listdir(tempdir_object.name)) == 2
write_samples(results, offsets, 1, samples_path)

df = cudf.read_parquet(os.path.join(tempdir_object.name, "batch=0-0.parquet"))
assert len(os.listdir(samples_path)) == 2

df = cudf.read_parquet(os.path.join(samples_path, "batch=0-0.parquet"))
assert len(df) == 8

assert (
Expand All @@ -55,7 +58,7 @@ def test_bulk_sampler_io():
)
assert (df.batch_id == 0).all()

df = cudf.read_parquet(os.path.join(tempdir_object.name, "batch=1-1.parquet"))
df = cudf.read_parquet(os.path.join(samples_path, "batch=1-1.parquet"))
assert len(df) == 4
assert (
df.sources.values_host.tolist()
Expand All @@ -69,3 +72,5 @@ def test_bulk_sampler_io():
df.hop_id.values_host.tolist() == results.hop_id.iloc[8:12].values_host.tolist()
)
assert (df.batch_id == 1).all()

shutil.rmtree(samples_path)
19 changes: 12 additions & 7 deletions python/cugraph/cugraph/tests/sampling/test_bulk_sampler_io_mg.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
import cudf
import dask_cudf

import tempfile
import os
import shutil

from cugraph.gnn.data_loading.bulk_sampler_io import write_samples
from cugraph.utilities.utils import create_directory_with_overwrite


@pytest.mark.mg
def test_bulk_sampler_io():
def test_bulk_sampler_io(scratch_dir):
results = cudf.DataFrame(
{
"sources": [0, 0, 1, 2, 2, 2, 3, 4, 5, 5, 6, 7],
Expand All @@ -41,12 +42,14 @@ def test_bulk_sampler_io():
offsets = cudf.DataFrame({"offsets": [0, 0], "batch_id": [0, 1]})
offsets = dask_cudf.from_cudf(offsets, npartitions=2)

tempdir_object = tempfile.TemporaryDirectory()
write_samples(results, offsets, 1, tempdir_object.name)
samples_path = os.path.join(scratch_dir, "mg_test_bulk_sampler_io")
create_directory_with_overwrite(samples_path)

assert len(os.listdir(tempdir_object.name)) == 2
write_samples(results, offsets, 1, samples_path)

df = cudf.read_parquet(os.path.join(tempdir_object.name, "batch=0-0.parquet"))
assert len(os.listdir(samples_path)) == 2

df = cudf.read_parquet(os.path.join(samples_path, "batch=0-0.parquet"))
assert len(df) == 8

results = results.compute()
Expand All @@ -63,7 +66,7 @@ def test_bulk_sampler_io():
)
assert (df.batch_id == 0).all()

df = cudf.read_parquet(os.path.join(tempdir_object.name, "batch=1-1.parquet"))
df = cudf.read_parquet(os.path.join(samples_path, "batch=1-1.parquet"))
assert len(df) == 4
assert (
df.sources.values_host.tolist()
Expand All @@ -77,3 +80,5 @@ def test_bulk_sampler_io():
df.hop_id.values_host.tolist() == results.hop_id.iloc[8:12].values_host.tolist()
)
assert (df.batch_id == 1).all()

shutil.rmtree(samples_path)
28 changes: 19 additions & 9 deletions python/cugraph/cugraph/tests/sampling/test_bulk_sampler_mg.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import cugraph
from cugraph.experimental.datasets import karate
from cugraph.experimental import BulkSampler
from cugraph.utilities.utils import create_directory_with_overwrite

import tempfile
import os
import shutil


@pytest.mark.mg
def test_bulk_sampler_simple(dask_client):
def test_bulk_sampler_simple(dask_client, scratch_dir):
el = karate.get_edgelist().reset_index().rename(columns={"index": "eid"})
el["eid"] = el["eid"].astype("int32")
el["etp"] = cupy.int32(0)
Expand All @@ -36,10 +38,12 @@ def test_bulk_sampler_simple(dask_client):
edge_attr=["wgt", "eid", "etp"],
)

tempdir_object = tempfile.TemporaryDirectory()
samples_path = os.path.join(scratch_dir, "mg_test_bulk_sampler_simple")
create_directory_with_overwrite(samples_path)

bs = BulkSampler(
batch_size=2,
output_path=tempdir_object.name,
output_path=samples_path,
graph=G,
fanout_vals=[2, 2],
with_replacement=False,
Expand All @@ -58,14 +62,16 @@ def test_bulk_sampler_simple(dask_client):
bs.add_batches(batches, start_col_name="start", batch_col_name="batch")
bs.flush()

recovered_samples = cudf.read_parquet(tempdir_object.name)
recovered_samples = cudf.read_parquet(samples_path)

for b in batches["batch"].unique().compute().values_host.tolist():
assert b in recovered_samples["batch_id"].values_host.tolist()

shutil.rmtree(samples_path)


@pytest.mark.mg
def test_bulk_sampler_mg_graph_sg_input(dask_client):
def test_bulk_sampler_mg_graph_sg_input(dask_client, scratch_dir):
el = karate.get_edgelist().reset_index().rename(columns={"index": "eid"})
el["eid"] = el["eid"].astype("int32")
el["etp"] = cupy.int32(0)
Expand All @@ -78,10 +84,12 @@ def test_bulk_sampler_mg_graph_sg_input(dask_client):
edge_attr=["wgt", "eid", "etp"],
)

tempdir_object = tempfile.TemporaryDirectory()
samples_path = os.path.join(scratch_dir, "mg_test_bulk_sampler_mg_graph_sg_input")
create_directory_with_overwrite(samples_path)

bs = BulkSampler(
batch_size=2,
output_path=tempdir_object.name,
output_path=samples_path,
graph=G,
fanout_vals=[2, 2],
with_replacement=False,
Expand All @@ -97,7 +105,9 @@ def test_bulk_sampler_mg_graph_sg_input(dask_client):
bs.add_batches(batches, start_col_name="start", batch_col_name="batch")
bs.flush()

recovered_samples = cudf.read_parquet(tempdir_object.name)
recovered_samples = cudf.read_parquet(samples_path)

for b in batches["batch"].unique().values_host.tolist():
assert b in recovered_samples["batch_id"].values_host.tolist()

shutil.rmtree(samples_path)
12 changes: 12 additions & 0 deletions python/cugraph/cugraph/utilities/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# limitations under the License.

import importlib
import os
import shutil

from numba import cuda

Expand Down Expand Up @@ -529,3 +531,13 @@ def create_list_series_from_2d_ar(ar, index):
children=(offset_col, data),
)
return cudf.Series(lc, index=index)


def create_directory_with_overwrite(directory):
    """Create ``directory``, replacing any existing tree at that path.

    If something already exists at ``directory`` it is recursively
    deleted before the (empty) directory is created, so callers always
    get a fresh, empty directory. Intermediate parent directories are
    created as needed.
    """
    already_present = os.path.exists(directory)
    if already_present:
        shutil.rmtree(directory)
    os.makedirs(directory)

0 comments on commit ad08e89

Please sign in to comment.