Skip to content

Commit

Permalink
Some fixes/pipeline module
Browse files Browse the repository at this point in the history
* Load pipeline from yaml

* Some updates/bug fixes
  • Loading branch information
aauker authored Apr 27, 2021
1 parent b7e511c commit ef4e306
Show file tree
Hide file tree
Showing 9 changed files with 220 additions and 111 deletions.
16 changes: 16 additions & 0 deletions data_processing/common/PipelineBuilder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import yaml, json
import data_processing.transforms

def load(stream):
if isinstance(stream, str):
stream = open(stream, 'r')
pipeline_config = yaml.safe_load( stream )
pipeline = []
for stage, job_config in enumerate(pipeline_config['stages']):
print (f"STAGE {stage}: {job_config['job']}")
print (json.dumps(job_config, indent=4))
method_to_call = getattr(data_processing.transforms , job_config['job'])
pipeline.append ( (method_to_call, job_config) )

return pipeline

94 changes: 94 additions & 0 deletions data_processing/pipelines/lung_radiomics_recist.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
stages:
- job: generate_scan
dicom_input_tag: base-etl
job_tag: original.mhd
itkImageType: mhd

- job: randomize_contours
image_input_tag: original.mhd
label_input_tag: lung.radiologists
job_tag: randomize_contours_s3-125

- job: extract_radiomics
image_input_tag: original.mhd
label_input_tag: lung.radiologists
job_tag : radiomics-original-label-1
strictGeometry: true
enableAllImageTypes: false
RadiomicsFeatureExtractor:
interpolator: sitkBSpline
resampledPixelSpacing: [1.25, 1.25, 1.25]
binWidth: 10
verbose: true
label: 1

- job: extract_radiomics
image_input_tag: original.mhd
label_input_tag: lung.radiologists
job_tag : radiomics-original-label-2
strictGeometry: true
enableAllImageTypes: false
RadiomicsFeatureExtractor:
interpolator: sitkBSpline
resampledPixelSpacing: [1.25, 1.25, 1.25]
binWidth: 10
verbose: true
label: 2

- job: extract_radiomics
image_input_tag: original.mhd
label_input_tag: lung.radiologists
job_tag : radiomics-original-label-3
strictGeometry: true
enableAllImageTypes: false
RadiomicsFeatureExtractor:
interpolator: sitkBSpline
resampledPixelSpacing: [1.25, 1.25, 1.25]
binWidth: 10
verbose: true
label: 3

- job: extract_radiomics
image_input_tag: original.mhd
label_input_tag: lung.radiologists
job_tag : radiomics-original-label-4
strictGeometry: true
enableAllImageTypes: false
RadiomicsFeatureExtractor:
interpolator: sitkBSpline
resampledPixelSpacing: [1.25, 1.25, 1.25]
binWidth: 10
verbose: true
label: 4

- job: extract_radiomics
image_input_tag: original.mhd
label_input_tag: lung.radiologists
job_tag : radiomics-original-label-5
strictGeometry: true
enableAllImageTypes: false
RadiomicsFeatureExtractor:
interpolator: sitkBSpline
resampledPixelSpacing: [1.25, 1.25, 1.25]
binWidth: 10
verbose: true
label: 5

- job: extract_radiomics
image_input_tag: original.mhd
label_input_tag: lung.radiologists
job_tag : radiomics-original-label-6
strictGeometry: true
enableAllImageTypes: false
RadiomicsFeatureExtractor:
interpolator: sitkBSpline
resampledPixelSpacing: [1.25, 1.25, 1.25]
binWidth: 10
verbose: true
label: 6

- job: collect_results
output_container: lung_radiomics_v1-original-label
input_tags: [radiomics-original-label-1, radiomics-original-label-2, radiomics-original-label-3, radiomics-original-label-4, radiomics-original-label-5, radiomics-original-label-6]


38 changes: 38 additions & 0 deletions data_processing/pipelines/qin_tcia_example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
stages:
- job: randomize_contours
image_input_tag: main_scan
label_input_tag: annotation_ata
job_tag: randomize_contours_s3-125

- job: extract_radiomics
image_input_tag: main_scan
label_input_tag: annotation_ata
job_tag : radiomics-original-label-1
strictGeometry: true
enableAllImageTypes: false
RadiomicsFeatureExtractor:
interpolator: sitkBSpline
resampledPixelSpacing: [1.25, 1.25, 1.25]
binWidth: 10
verbose: true
label: 1

- job: extract_radiomics
image_input_tag: randomize_contours_s3-125
label_input_tag: randomize_contours_s3-125
job_tag : radiomics-pertubation-label-1
strictGeometry: true
enableAllImageTypes: false
usingPertubations: true
RadiomicsFeatureExtractor:
interpolator: sitkBSpline
resampledPixelSpacing: [1.25, 1.25, 1.25]
binWidth: 10
verbose: true
label: 1

- job: collect_results
output_container: lung_radiomics_v1-original-label
input_tags: [radiomics-original-label-1, radiomics-pertubation-label-1]


7 changes: 4 additions & 3 deletions data_processing/radiology/cli/extract_radiomics.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ def extract_radiomics_with_container(cohort_id, container_id, method_data, semap
except Exception as e:
container.logger.exception (f"{e}, stopping job execution...")
else:
output_node = Node("Radiomics", method_id, properties)
container.add(output_node)
container.saveAll()
if properties:
output_node = Node("Radiomics", method_id, properties)
container.add(output_node)
container.saveAll()
finally:
return semaphore + 1

Expand Down
8 changes: 6 additions & 2 deletions data_processing/radiology/common/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,16 @@ def extract_radiomics(image_path: str, label_path: str, output_dir: str, params:

result_list = []
for label_path in label_path_list:
image, image_header = load(image_path)
label, label_header = load(label_path)

if params['RadiomicsFeatureExtractor']['label'] not in label:
logger.warning(f"No mask pixels labeled [{params['RadiomicsFeatureExtractor']['label']}] found, returning None")
return None

extractor = featureextractor.RadiomicsFeatureExtractor(**params.get('RadiomicsFeatureExtractor', {}))

if params.get("strictGeometry", False):
image, image_header = load(image_path)
label, label_header = load(label_path)
if not image_header.get_voxel_spacing() == label_header.get_voxel_spacing():
raise RuntimeError(f"Voxel spacing mismatch, image.spacing={image_header.get_voxel_spacing()}, label.spacing={label_header.get_voxel_spacing()}" )
if not image.shape == label.shape:
Expand Down
3 changes: 3 additions & 0 deletions data_processing/radiology/mirp/imageReaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ def read_itk_segmentation(path_to_seg_file):

roi_list = []

if len(np.unique(int_mask)) > 256:
raise RuntimeWarning("More than 256 unique values in mask, perhaps this is an image, or something went really wrong?")

for label_value in np.unique(int_mask):
if label_value == 0: continue

Expand Down
Empty file modified data_processing/radiology/proxy_table/transfer_files.sh
100755 → 100644
Empty file.
4 changes: 4 additions & 0 deletions data_processing/transforms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from data_processing.radiology.cli.generate_scan import generate_scan_with_container as generate_scan
from data_processing.radiology.cli.randomize_contours import randomize_contours_with_container as randomize_contours
from data_processing.radiology.cli.extract_radiomics import extract_radiomics_with_container as extract_radiomics
from data_processing.radiology.cli.collect_csv_segment import collect_result_segment_with_container as collect_results
161 changes: 55 additions & 106 deletions integration/qin_radiology_example.py
Original file line number Diff line number Diff line change
@@ -1,116 +1,65 @@
import sys
import sys, logging

from data_processing.common.DataStore import DataStore
from data_processing.common.DataStore import Node
from data_processing.common.config import ConfigSet

from data_processing.radiology.cli.randomize_contours import randomize_contours_with_container
from data_processing.radiology.cli.extract_radiomics import extract_radiomics_with_container
from data_processing.radiology.cli.collect_csv_segment import collect_result_segment_with_container
from data_processing.common.CodeTimer import CodeTimer
from data_processing.common.PipelineBuilder import load


if __name__=='__main__':
backend = sys.argv[1]
print ("Submitting to", backend)

if backend=='pool':
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor(20)

elif backend=='dask':
from dask.distributed import Client
dask_client = Client()

# This souldn't be neccessary once we have stable releases of data_processing
dask_client.upload_file('data_processing/radiology/cli/collect_csv_segment.py')
dask_client.upload_file('data_processing/radiology/cli/extract_radiomics.py')
dask_client.upload_file('data_processing/radiology/cli/randomize_contours.py')
dask_client.upload_file('data_processing/common/DataStore.py')
dask_client.upload_file('data_processing/common/Node.py')


cfg = ConfigSet("APP_CFG", "config.yaml")

container = DataStore(cfg)
container.createNamespace("my-analysis")

futures = []

for patient in ['QIN-BREAST-01-0001', 'QIN-BREAST-01-0002']:
container = DataStore(cfg).setNamespace("my-analysis")

container.createContainer(patient, 'patient')
container.setContainer (patient)

image = Node("VolumetricImage", "main_scan")
image.set_data(f"qin-test-data/{patient}.mhd")
image.set_aux (f"qin-test-data/{patient}.raw")
container.add(image)

label = Node("VolumetricLabel", "annotation_ata")
label.set_data(f"qin-test-data/{patient}.mha")
container.add(label)

container.saveAll()

pertubation_job_config = {
"job_tag": "mirp_pertubation",
"image_input_tag": "main_scan",
"label_input_tag": "annotation_ata"
}

radiomics_job_config_original = {
"image_input_tag": "main_scan",
"label_input_tag": "annotation_ata",
"job_tag" : "my_radiomics_original",
"strictGeometry": True,
"enableAllImageTypes": True,
"RadiomicsFeatureExtractor": {
"interpolator": "sitkBSpline",
"resampledPixelSpacing": [1.25, 1.25, 1.25],
"binWidth": 10,
"verbose": "True",
"label":1,
"geometryTolerance":1e-08
}
}

radiomics_job_config_pertubation = {
"image_input_tag": "mirp_pertubation",
"label_input_tag": "mirp_pertubation",
"job_tag" : "my_radiomics_pertubation",
"strictGeometry": True,
"enableAllImageTypes": True,
"usingPertubations": True,
"RadiomicsFeatureExtractor": {
"interpolator": "sitkBSpline",
"resampledPixelSpacing": [1.25, 1.25, 1.25],
"binWidth": 10,
"verbose": "True",
"label":1,
"geometryTolerance":1e-08
}
}

collect_job_config = {
"input_tags": ["my_radiomics_original", "my_radiomics_pertubation"],
"output_container": "my_results",
}

pipeline = [
(randomize_contours_with_container, pertubation_job_config),
(extract_radiomics_with_container, radiomics_job_config_original),
(extract_radiomics_with_container, radiomics_job_config_pertubation),
(collect_result_segment_with_container, collect_job_config),
]

if backend=='local':
container.runLocal(pipeline)
elif backend=='pool':
futures.append ( container.runProcessPoolExecutor(pipeline, executor) )
with CodeTimer(logging.getLogger(), f"Running qin_tcia_example with backend=[{backend}]"):

if backend=='pool':
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor(20)

elif backend=='dask':
futures.append ( container.runDaskDistributed(pipeline, dask_client) )
from dask.distributed import Client
dask_client = Client(processes = True, threads_per_worker=1, n_workers=10)

# This souldn't be neccessary once we have stable releases of data_processing
dask_client.upload_file('data_processing/radiology/cli/collect_csv_segment.py')
dask_client.upload_file('data_processing/radiology/cli/extract_radiomics.py')
dask_client.upload_file('data_processing/radiology/cli/randomize_contours.py')
dask_client.upload_file('data_processing/common/DataStore.py')
dask_client.upload_file('data_processing/common/Node.py')


cfg = ConfigSet("APP_CFG", "config.yaml")

container = DataStore(cfg)
container.createNamespace("my-analysis")

futures = []

pipeline = load("data_processing/pipelines/qin_tcia_example.yaml")

for patient in ['QIN-BREAST-01-0001', 'QIN-BREAST-01-0002']:
container = DataStore(cfg).setNamespace("my-analysis")

container.createContainer(patient, 'patient')
container.setContainer (patient)

image = Node("VolumetricImage", "main_scan")
image.set_data(f"qin-test-data/{patient}.mhd")
image.set_aux (f"qin-test-data/{patient}.raw")
container.add(image)

label = Node("VolumetricLabel", "annotation_ata")
label.set_data(f"qin-test-data/{patient}.mha")
container.add(label)

container.saveAll()

if backend=='local':
container.runLocal(pipeline)
elif backend=='pool':
futures.append ( container.runProcessPoolExecutor(pipeline, executor) )
elif backend=='dask':
futures.append ( container.runDaskDistributed(pipeline, dask_client) )

# Block in this process until everything completes
for future in futures:
print (future.result())
# Block in this process until everything completes
for future in futures:
print (future.result())

0 comments on commit ef4e306

Please sign in to comment.