diff --git a/data_processing/common/PipelineBuilder.py b/data_processing/common/PipelineBuilder.py new file mode 100644 index 00000000..e793cf91 --- /dev/null +++ b/data_processing/common/PipelineBuilder.py @@ -0,0 +1,16 @@ +import yaml, json +import data_processing.transforms + +def load(stream): + if isinstance(stream, str): + stream = open(stream, 'r') + pipeline_config = yaml.safe_load( stream ) + pipeline = [] + for stage, job_config in enumerate(pipeline_config['stages']): + print (f"STAGE {stage}: {job_config['job']}") + print (json.dumps(job_config, indent=4)) + method_to_call = getattr(data_processing.transforms , job_config['job']) + pipeline.append ( (method_to_call, job_config) ) + + return pipeline + diff --git a/data_processing/pipelines/lung_radiomics_recist.yaml b/data_processing/pipelines/lung_radiomics_recist.yaml new file mode 100644 index 00000000..7f419b4e --- /dev/null +++ b/data_processing/pipelines/lung_radiomics_recist.yaml @@ -0,0 +1,94 @@ +stages: + - job: generate_scan + dicom_input_tag: base-etl + job_tag: original.mhd + itkImageType: mhd + + - job: randomize_contours + image_input_tag: original.mhd + label_input_tag: lung.radiologists + job_tag: randomize_contours_s3-125 + + - job: extract_radiomics + image_input_tag: original.mhd + label_input_tag: lung.radiologists + job_tag : radiomics-original-label-1 + strictGeometry: true + enableAllImageTypes: false + RadiomicsFeatureExtractor: + interpolator: sitkBSpline + resampledPixelSpacing: [1.25, 1.25, 1.25] + binWidth: 10 + verbose: true + label: 1 + + - job: extract_radiomics + image_input_tag: original.mhd + label_input_tag: lung.radiologists + job_tag : radiomics-original-label-2 + strictGeometry: true + enableAllImageTypes: false + RadiomicsFeatureExtractor: + interpolator: sitkBSpline + resampledPixelSpacing: [1.25, 1.25, 1.25] + binWidth: 10 + verbose: true + label: 2 + + - job: extract_radiomics + image_input_tag: original.mhd + label_input_tag: lung.radiologists + job_tag : radiomics-original-label-3 + strictGeometry: true + enableAllImageTypes: false + RadiomicsFeatureExtractor: + interpolator: sitkBSpline + resampledPixelSpacing: [1.25, 1.25, 1.25] + binWidth: 10 + verbose: true + label: 3 + + - job: extract_radiomics + image_input_tag: original.mhd + label_input_tag: lung.radiologists + job_tag : radiomics-original-label-4 + strictGeometry: true + enableAllImageTypes: false + RadiomicsFeatureExtractor: + interpolator: sitkBSpline + resampledPixelSpacing: [1.25, 1.25, 1.25] + binWidth: 10 + verbose: true + label: 4 + + - job: extract_radiomics + image_input_tag: original.mhd + label_input_tag: lung.radiologists + job_tag : radiomics-original-label-5 + strictGeometry: true + enableAllImageTypes: false + RadiomicsFeatureExtractor: + interpolator: sitkBSpline + resampledPixelSpacing: [1.25, 1.25, 1.25] + binWidth: 10 + verbose: true + label: 5 + + - job: extract_radiomics + image_input_tag: original.mhd + label_input_tag: lung.radiologists + job_tag : radiomics-original-label-6 + strictGeometry: true + enableAllImageTypes: false + RadiomicsFeatureExtractor: + interpolator: sitkBSpline + resampledPixelSpacing: [1.25, 1.25, 1.25] + binWidth: 10 + verbose: true + label: 6 + + - job: collect_results + output_container: lung_radiomics_v1-original-label + input_tags: [radiomics-original-label-1, radiomics-original-label-2, radiomics-original-label-3, radiomics-original-label-4, radiomics-original-label-5, radiomics-original-label-6] + + diff --git a/data_processing/pipelines/qin_tcia_example.yaml b/data_processing/pipelines/qin_tcia_example.yaml new file mode 100644 index 00000000..1b8bb695 --- /dev/null +++ b/data_processing/pipelines/qin_tcia_example.yaml @@ -0,0 +1,38 @@ +stages: + - job: randomize_contours + image_input_tag: main_scan + label_input_tag: annotation_ata + job_tag: randomize_contours_s3-125 + + - job: extract_radiomics + image_input_tag: main_scan + label_input_tag: annotation_ata + job_tag : radiomics-original-label-1 + strictGeometry: true + enableAllImageTypes: false + RadiomicsFeatureExtractor: + interpolator: sitkBSpline + resampledPixelSpacing: [1.25, 1.25, 1.25] + binWidth: 10 + verbose: true + label: 1 + + - job: extract_radiomics + image_input_tag: randomize_contours_s3-125 + label_input_tag: randomize_contours_s3-125 + job_tag : radiomics-pertubation-label-1 + strictGeometry: true + enableAllImageTypes: false + usingPertubations: true + RadiomicsFeatureExtractor: + interpolator: sitkBSpline + resampledPixelSpacing: [1.25, 1.25, 1.25] + binWidth: 10 + verbose: true + label: 1 + + - job: collect_results + output_container: lung_radiomics_v1-original-label + input_tags: [radiomics-original-label-1, radiomics-pertubation-label-1] + + diff --git a/data_processing/radiology/cli/extract_radiomics.py b/data_processing/radiology/cli/extract_radiomics.py index 41709118..6da020d3 100644 --- a/data_processing/radiology/cli/extract_radiomics.py +++ b/data_processing/radiology/cli/extract_radiomics.py @@ -68,9 +68,10 @@ def extract_radiomics_with_container(cohort_id, container_id, method_data, semap except Exception as e: container.logger.exception (f"{e}, stopping job execution...") else: - output_node = Node("Radiomics", method_id, properties) - container.add(output_node) - container.saveAll() + if properties: + output_node = Node("Radiomics", method_id, properties) + container.add(output_node) + container.saveAll() finally: return semaphore + 1 diff --git a/data_processing/radiology/common/preprocess.py b/data_processing/radiology/common/preprocess.py index 657e2e92..7306fc9b 100644 --- a/data_processing/radiology/common/preprocess.py +++ b/data_processing/radiology/common/preprocess.py @@ -409,12 +409,16 @@ def extract_radiomics(image_path: str, label_path: str, output_dir: str, params: result_list = [] for label_path in label_path_list: + image, image_header = load(image_path) + label, label_header = load(label_path) + + if params['RadiomicsFeatureExtractor']['label'] not in label: + logger.warning(f"No mask pixels labeled [{params['RadiomicsFeatureExtractor']['label']}] found, returning None") + return None extractor = featureextractor.RadiomicsFeatureExtractor(**params.get('RadiomicsFeatureExtractor', {})) if params.get("strictGeometry", False): - image, image_header = load(image_path) - label, label_header = load(label_path) if not image_header.get_voxel_spacing() == label_header.get_voxel_spacing(): raise RuntimeError(f"Voxel spacing mismatch, image.spacing={image_header.get_voxel_spacing()}, label.spacing={label_header.get_voxel_spacing()}" ) if not image.shape == label.shape: diff --git a/data_processing/radiology/mirp/imageReaders.py b/data_processing/radiology/mirp/imageReaders.py index bfd553a4..fb8e9713 100644 --- a/data_processing/radiology/mirp/imageReaders.py +++ b/data_processing/radiology/mirp/imageReaders.py @@ -50,6 +50,9 @@ def read_itk_segmentation(path_to_seg_file): roi_list = [] + if len(np.unique(int_mask)) > 256: + raise RuntimeWarning("More than 256 unique values in mask, perhaps this is an image, or something went really wrong?") + for label_value in np.unique(int_mask): if label_value == 0: continue diff --git a/data_processing/radiology/proxy_table/transfer_files.sh b/data_processing/radiology/proxy_table/transfer_files.sh old mode 100755 new mode 100644 diff --git a/data_processing/transforms.py b/data_processing/transforms.py new file mode 100644 index 00000000..b2a508b1 --- /dev/null +++ b/data_processing/transforms.py @@ -0,0 +1,4 @@ +from data_processing.radiology.cli.generate_scan import generate_scan_with_container as generate_scan +from data_processing.radiology.cli.randomize_contours import randomize_contours_with_container as randomize_contours +from data_processing.radiology.cli.extract_radiomics import extract_radiomics_with_container as extract_radiomics +from data_processing.radiology.cli.collect_csv_segment import collect_result_segment_with_container as collect_results diff --git a/integration/qin_radiology_example.py b/integration/qin_radiology_example.py index 0f762c3a..3fda8c37 100644 --- a/integration/qin_radiology_example.py +++ b/integration/qin_radiology_example.py @@ -1,116 +1,65 @@ -import sys +import sys, logging from data_processing.common.DataStore import DataStore from data_processing.common.DataStore import Node from data_processing.common.config import ConfigSet - -from data_processing.radiology.cli.randomize_contours import randomize_contours_with_container -from data_processing.radiology.cli.extract_radiomics import extract_radiomics_with_container -from data_processing.radiology.cli.collect_csv_segment import collect_result_segment_with_container +from data_processing.common.CodeTimer import CodeTimer +from data_processing.common.PipelineBuilder import load if __name__=='__main__': backend = sys.argv[1] - print ("Submitting to", backend) - - if backend=='pool': - from concurrent.futures import ProcessPoolExecutor - executor = ProcessPoolExecutor(20) - - elif backend=='dask': - from dask.distributed import Client - dask_client = Client() - - # This souldn't be neccessary once we have stable releases of data_processing - dask_client.upload_file('data_processing/radiology/cli/collect_csv_segment.py') - dask_client.upload_file('data_processing/radiology/cli/extract_radiomics.py') - dask_client.upload_file('data_processing/radiology/cli/randomize_contours.py') - dask_client.upload_file('data_processing/common/DataStore.py') - dask_client.upload_file('data_processing/common/Node.py') - - - cfg = ConfigSet("APP_CFG", "config.yaml") - - container = DataStore(cfg) - container.createNamespace("my-analysis") - - futures = [] - - for patient in ['QIN-BREAST-01-0001', 'QIN-BREAST-01-0002']: - container = DataStore(cfg).setNamespace("my-analysis") - - container.createContainer(patient, 'patient') - container.setContainer (patient) - - image = Node("VolumetricImage", "main_scan") - image.set_data(f"qin-test-data/{patient}.mhd") - image.set_aux (f"qin-test-data/{patient}.raw") - container.add(image) - - label = Node("VolumetricLabel", "annotation_ata") - label.set_data(f"qin-test-data/{patient}.mha") - container.add(label) - - container.saveAll() - - pertubation_job_config = { - "job_tag": "mirp_pertubation", - "image_input_tag": "main_scan", - "label_input_tag": "annotation_ata" - } - - radiomics_job_config_original = { - "image_input_tag": "main_scan", - "label_input_tag": "annotation_ata", - "job_tag" : "my_radiomics_original", - "strictGeometry": True, - "enableAllImageTypes": True, - "RadiomicsFeatureExtractor": { - "interpolator": "sitkBSpline", - "resampledPixelSpacing": [1.25, 1.25, 1.25], - "binWidth": 10, - "verbose": "True", - "label":1, - "geometryTolerance":1e-08 - } - } - - radiomics_job_config_pertubation = { - "image_input_tag": "mirp_pertubation", - "label_input_tag": "mirp_pertubation", - "job_tag" : "my_radiomics_pertubation", - "strictGeometry": True, - "enableAllImageTypes": True, - "usingPertubations": True, - "RadiomicsFeatureExtractor": { - "interpolator": "sitkBSpline", - "resampledPixelSpacing": [1.25, 1.25, 1.25], - "binWidth": 10, - "verbose": "True", - "label":1, - "geometryTolerance":1e-08 - } - } - - collect_job_config = { - "input_tags": ["my_radiomics_original", "my_radiomics_pertubation"], - "output_container": "my_results", - } - - pipeline = [ - (randomize_contours_with_container, pertubation_job_config), - (extract_radiomics_with_container, radiomics_job_config_original), - (extract_radiomics_with_container, radiomics_job_config_pertubation), - (collect_result_segment_with_container, collect_job_config), - ] - - if backend=='local': - container.runLocal(pipeline) - elif backend=='pool': - futures.append ( container.runProcessPoolExecutor(pipeline, executor) ) + with CodeTimer(logging.getLogger(), f"Running qin_tcia_example with backend=[{backend}]"): + + if backend=='pool': + from concurrent.futures import ProcessPoolExecutor + executor = ProcessPoolExecutor(20) + elif backend=='dask': - futures.append ( container.runDaskDistributed(pipeline, dask_client) ) + from dask.distributed import Client + dask_client = Client(processes = True, threads_per_worker=1, n_workers=10) + + # This souldn't be neccessary once we have stable releases of data_processing + dask_client.upload_file('data_processing/radiology/cli/collect_csv_segment.py') + dask_client.upload_file('data_processing/radiology/cli/extract_radiomics.py') + dask_client.upload_file('data_processing/radiology/cli/randomize_contours.py') + dask_client.upload_file('data_processing/common/DataStore.py') + dask_client.upload_file('data_processing/common/Node.py') + + + cfg = ConfigSet("APP_CFG", "config.yaml") + + container = DataStore(cfg) + container.createNamespace("my-analysis") + + futures = [] + + pipeline = load("data_processing/pipelines/qin_tcia_example.yaml") + + for patient in ['QIN-BREAST-01-0001', 'QIN-BREAST-01-0002']: + container = DataStore(cfg).setNamespace("my-analysis") + + container.createContainer(patient, 'patient') + container.setContainer (patient) + + image = Node("VolumetricImage", "main_scan") + image.set_data(f"qin-test-data/{patient}.mhd") + image.set_aux (f"qin-test-data/{patient}.raw") + container.add(image) + + label = Node("VolumetricLabel", "annotation_ata") + label.set_data(f"qin-test-data/{patient}.mha") + container.add(label) + + container.saveAll() + + if backend=='local': + container.runLocal(pipeline) + elif backend=='pool': + futures.append ( container.runProcessPoolExecutor(pipeline, executor) ) + elif backend=='dask': + futures.append ( container.runDaskDistributed(pipeline, dask_client) ) - # Block in this process until everything completes - for future in futures: - print (future.result()) + # Block in this process until everything completes + for future in futures: + print (future.result())