Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of instance-level download #112

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ repos:
rev: "v2.2.6"
hooks:
- id: codespell
args: ["--quiet-level 3"]

- repo: https://github.com/shellcheck-py/shellcheck-py
rev: "v0.9.0.6"
Expand Down
206 changes: 167 additions & 39 deletions idc_index/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
aws_endpoint_url = "https://s3.amazonaws.com"
gcp_endpoint_url = "https://storage.googleapis.com"
asset_endpoint_url = f"https://github.com/ImagingDataCommons/idc-index-data/releases/download/{idc_index_data.__version__}"
# TODO: remove later
asset_endpoint_url = (
"https://github.com/ImagingDataCommons/idc-index-data/releases/download/18.2.0"
)


logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -119,7 +124,12 @@ def _filter_dataframe_by_id(key, dataframe, _id):

@staticmethod
def _safe_filter_by_selection(
df_index, collection_id, patientId, studyInstanceUID, seriesInstanceUID
df_index,
collection_id,
patientId,
studyInstanceUID,
seriesInstanceUID,
sopInstanceUID=None,
):
if collection_id is not None:
if not isinstance(collection_id, str) and not isinstance(
Expand All @@ -139,6 +149,11 @@ def _safe_filter_by_selection(
seriesInstanceUID, list
):
raise TypeError("seriesInstanceUID must be a string or list of strings")
if sopInstanceUID is not None:
if not isinstance(sopInstanceUID, str) and not isinstance(
sopInstanceUID, list
):
raise TypeError("sopInstanceUID must be a string or list of strings")

if collection_id is not None:
result_df = IDCClient._filter_by_collection_id(df_index, collection_id)
Expand All @@ -158,6 +173,11 @@ def _safe_filter_by_selection(
result_df, seriesInstanceUID
)

if sopInstanceUID is not None:
result_df = IDCClient._filter_by_dicom_instance_uid(
result_df, sopInstanceUID
)

return result_df

@staticmethod
Expand All @@ -182,6 +202,12 @@ def _filter_by_dicom_series_uid(df_index, dicom_series_uid):
"SeriesInstanceUID", df_index, dicom_series_uid
)

@staticmethod
def _filter_by_dicom_instance_uid(df_index, dicom_instance_uid):
return IDCClient._filter_dataframe_by_id(
"SOPInstanceUID", df_index, dicom_instance_uid
)

@staticmethod
def get_idc_version():
"""
Expand All @@ -192,7 +218,7 @@ def get_idc_version():

def fetch_index(self, index) -> None:
"""
Downloads requested index.
Downloads requested index and adds this index joined with the main index as respective class attribute.

Args:
index (str): Name of the index to be downloaded.
Expand All @@ -211,10 +237,19 @@ def fetch_index(self, index) -> None:
idc_index_data.IDC_INDEX_PARQUET_FILEPATH.parents[0],
f"{index}.parquet",
)

with open(filepath, mode="wb") as file:
file.write(response.content)
setattr(self.__class__, index, pd.read_parquet(filepath))

# Join new index with main index
sm_instance_index = pd.read_parquet(filepath)
sm_instance_index = sm_instance_index.merge(
self.index, on="SeriesInstanceUID", how="left"
)

setattr(self.__class__, index, sm_instance_index)
self.indices_overview[index]["installed"] = True

else:
logger.error(
f"Failed to fetch index from URL {self.indices_overview[index]['url']}: {response.status_code}"
Expand Down Expand Up @@ -1123,7 +1158,7 @@ def _s5cmd_run(
logger.info(
f"Requested total download size is {total_size} MB, \
however at least {existing_data_size} MB is already present,\
so downloading only remaining upto {sync_size} MB\n\
so downloading only remaining up to {sync_size} MB\n\
Please note that disk sizes are calculated at series level, \
so if individual files are missing, displayed progress bar may\
not be accurate."
Expand Down Expand Up @@ -1392,6 +1427,7 @@ def download_from_selection(
patientId=None,
studyInstanceUID=None,
seriesInstanceUID=None,
sopInstanceUID=None,
quiet=True,
show_progress_bar=True,
use_s5cmd_sync=False,
Expand All @@ -1407,6 +1443,7 @@ def download_from_selection(
patientId: string or list of strings containing the values of PatientID to filter by
studyInstanceUID: string or list of strings containing the values of DICOM StudyInstanceUID to filter by
seriesInstanceUID: string or list of strings containing the values of DICOM SeriesInstanceUID to filter by
sopInstanceUID: string or list of strings containing the values of DICOM SOPInstanceUID to filter by
quiet (bool): If True, suppresses the output of the subprocess. Defaults to True
show_progress_bar (bool): If True, tracks the progress of download
use_s5cmd_sync (bool): If True, will use s5cmd sync operation instead of cp when downloadDirectory is not empty; this can significantly improve the download speed if the content is partially downloaded
Expand All @@ -1418,21 +1455,43 @@ def download_from_selection(
if not os.path.exists(downloadDir):
raise ValueError("Download directory does not exist.")

# If SOPInstanceUID(s) are given, we need to join the main index with the instance-level index
if sopInstanceUID:
if hasattr(
self, "sm_instance_index"
): # check if instance-level index is installed
index_to_be_filtered = self.sm_instance_index
else:
logger.error(
"Instance-level access not possible because instance-level index not installed."
)
raise ValueError(
"Instance-level access not possible because instance-level index not installed."
)
else:
index_to_be_filtered = self.index

result_df = self._safe_filter_by_selection(
self.index,
index_to_be_filtered,
collection_id=collection_id,
patientId=patientId,
studyInstanceUID=studyInstanceUID,
seriesInstanceUID=seriesInstanceUID,
sopInstanceUID=sopInstanceUID,
)

total_size = round(result_df["series_size_MB"].sum(), 2)
logger.info("Total size of files to download: " + self._format_size(total_size))
logger.info(
"Total free space on disk: "
+ str(psutil.disk_usage(downloadDir).free / (1000 * 1000 * 1000))
+ "GB"
)
if not sopInstanceUID:
total_size = round(result_df["series_size_MB"].sum(), 2)
logger.info(
"Total size of files to download: " + self._format_size(total_size)
)
logger.info(
"Total free space on disk: "
+ str(psutil.disk_usage(downloadDir).free / (1000 * 1000 * 1000))
+ "GB"
)
else:
total_size = 0

if dry_run:
logger.info(
Expand All @@ -1445,51 +1504,77 @@ def download_from_selection(
downloadDir=downloadDir,
dirTemplate=dirTemplate,
)
sql = f"""
WITH temp as
(
SELECT
seriesInstanceUID
FROM
result_df
)
SELECT
series_aws_url,
{hierarchy} as path
FROM
temp
JOIN
index using (seriesInstanceUID)
"""
if sopInstanceUID:
sql = f"""
WITH temp as
(
SELECT
sopInstanceUID
FROM
result_df
)
SELECT
CONCAT(TRIM('*' FROM series_aws_url), crdc_instance_uuid, '.dcm') as instance_url,
CONCAT({hierarchy}, '/') as path
FROM
temp
JOIN
sm_instance_index using (sopInstanceUID)
"""
else:
sql = f"""
WITH temp as
(
SELECT
seriesInstanceUID
FROM
result_df
)
SELECT
series_aws_url,
{hierarchy} as path
FROM
temp
JOIN
index using (seriesInstanceUID)
"""
result_df = self.sql_query(sql)
# Download the files
# make temporary file to store the list of files to download
# Download the files and make temporary file to store the list of files to download

with tempfile.NamedTemporaryFile(mode="w", delete=False) as manifest_file:
# Determine column containing the URL for instance / series-level access
if sopInstanceUID:
if not "instance_url" in result_df:
result_df["instance_url"] = (
result_df["series_aws_url"].replace("/*", "/")
+ result_df["crdc_instance_uuid"]
+ ".dcm"
)
url_column = "instance_url"
else:
url_column = "series_aws_url"

if use_s5cmd_sync and len(os.listdir(downloadDir)) != 0:
if dirTemplate is not None:
result_df["s5cmd_cmd"] = (
"sync "
+ result_df["series_aws_url"]
+ ' "'
+ result_df["path"]
+ '"'
"sync " + result_df[url_column] + ' "' + result_df["path"] + '"'
)
else:
result_df["s5cmd_cmd"] = (
"sync " + result_df["series_aws_url"] + ' "' + downloadDir + '"'
"sync " + result_df[url_column] + ' "' + downloadDir + '"'
)
elif dirTemplate is not None:
result_df["s5cmd_cmd"] = (
"cp " + result_df["series_aws_url"] + ' "' + result_df["path"] + '"'
"cp " + result_df[url_column] + ' "' + result_df["path"] + '"'
)
else:
result_df["s5cmd_cmd"] = (
"cp " + result_df["series_aws_url"] + ' "' + downloadDir + '"'
"cp " + result_df[url_column] + ' "' + downloadDir + '"'
)

# Combine all commands into a single string with newline separators
commands = "\n".join(result_df["s5cmd_cmd"])

print(commands)
manifest_file.write(commands)

if dirTemplate is not None:
Expand All @@ -1513,6 +1598,44 @@ def download_from_selection(
list_of_directories=list_of_directories,
)

def download_dicom_instance(
self,
sopInstanceUID,
downloadDir,
dry_run=False,
quiet=True,
show_progress_bar=True,
use_s5cmd_sync=False,
dirTemplate=DOWNLOAD_HIERARCHY_DEFAULT,
) -> None:
"""
Download the files corresponding to the seriesInstanceUID to the specified directory.

Args:
sopInstanceUID: string or list of strings containing the values of DICOM SOPInstanceUID to filter by
downloadDir: string containing the path to the directory to download the files to
dry_run: calculates the size of the cohort but download does not start
quiet (bool): If True, suppresses the output of the subprocess. Defaults to True.
show_progress_bar (bool): If True, tracks the progress of download
use_s5cmd_sync (bool): If True, will use s5cmd sync operation instead of cp when downloadDirectory is not empty; this can significantly improve the download speed if the content is partially downloaded
dirTemplate (str): Download directory hierarchy template. This variable defines the folder hierarchy for the organizing the downloaded files in downloadDirectory. Defaults to index.DOWNLOAD_HIERARCHY_DEFAULT set to %collection_id/%PatientID/%StudyInstanceUID/%Modality_%SeriesInstanceUID. The template string can be built using a combination of selected metadata attributes (PatientID, collection_id, Modality, StudyInstanceUID, SeriesInstanceUID) that must be prefixed by '%'. The following special characters can be used as separators: '-' (hyphen), '/' (slash for subdirectories), '_' (underscore). When set to None all files will be downloaded to the download directory with no subdirectories.

Returns: None

Raises:
TypeError: If sopInstanceUID(s) passed is(are) not a string or list

"""
self.download_from_selection(
downloadDir,
sopInstanceUID=sopInstanceUID,
dry_run=dry_run,
quiet=quiet,
show_progress_bar=show_progress_bar,
use_s5cmd_sync=use_s5cmd_sync,
dirTemplate=dirTemplate,
)

def download_dicom_series(
self,
seriesInstanceUID,
Expand Down Expand Up @@ -1679,4 +1802,9 @@ def sql_query(self, sql_query):
"""

index = self.index
# TODO: find a more elegant way to automate the following
if hasattr(self, "sm_index"):
sm_index = self.sm_index
if hasattr(self, "sm_instance_index"):
sm_instance_index = self.sm_instance_index
return duckdb.query(sql_query).to_df()
15 changes: 11 additions & 4 deletions tests/idcindex.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,17 @@ def test_download_dicom_series(self):
)
self.assertEqual(sum([len(files) for r, d, files in os.walk(temp_dir)]), 3)

def test_download_dicom_instance(self):
i = IDCClient()
i.fetch_index("sm_instance_index")
with tempfile.TemporaryDirectory() as temp_dir:
self.client.download_dicom_instance(
sopInstanceUID="1.3.6.1.4.1.5962.99.1.528744472.1087975700.1641206284312.14.0",
downloadDir=temp_dir,
)

self.assertEqual(sum([len(files) for r, d, files in os.walk(temp_dir)]), 1)

def test_download_with_template(self):
dirTemplateValues = [
None,
Expand Down Expand Up @@ -483,10 +494,6 @@ def test_prior_version_manifest(self):
with open(temp_manifest_file) as file:
assert len(file.readlines()) == 0

def test_list_indices(self):
i = IDCClient()
assert i.indices_overview # assert that dict was created

def test_fetch_index(self):
i = IDCClient()
assert i.indices_overview["sm_index"]["installed"] is False
Expand Down
Loading