Source code for omni.pipelines.preprocessing

import os
import json
import shutil
from typing import Dict, List
import nibabel as nib
import numpy as np
from memori.helpers import *
from memori.pathman import append_suffix, repath
from memori import Pipeline, Stage
from omni.interfaces.afni import NwarpApply
from omni.pipelines.anat.masks import weight_mask_and_autobox2
from omni.pipelines.epi.align import align_affine_epi_to_anat, distortion_correction
from omni.affine import _decompose
from .anat.processing import anat_proc, anat_to_epi_results
from .func.processing import func_proc, func_to_epi_results
from .epi.processing import epi_proc


[docs]@create_output_path def pre_proc( output_path: str, anat_path: str = "anat", func_path: str = "func", epi_path: str = "epi", **kwargs ) -> None: """Run Synth preprocessing pipeline. Parameters ---------- output_path: str Output path to write out files to. anat_path: str Subpath for anatomical outputs. func_path: str Subpath for functional outputs. epi_path: str Subpath for EPI outputs. kwargs: dict Various keyword Arguments. """ # run anatomical pipeline anat_output = os.path.join(output_path, anat_path) anat_results = anat_proc(anat_output, **kwargs) # copy anatomically aligned atlas files to an output_data folder anat_output_data = os.path.join(anat_output, "output_data") os.makedirs(anat_output_data, exist_ok=True) if anat_results["t1_atlas"]: shutil.copy2(anat_results["t1_atlas"], anat_output_data) if anat_results["t2_atlas"]: shutil.copy2(anat_results["t2_atlas"], anat_output_data) # open the metadata from the bids sidecar, if metadata not defined. if "metadata" not in kwargs: with open(kwargs["bids_sidecar"], "r") as metadata_file: metadata = json.load(metadata_file) else: metadata = kwargs["metadata"] # run functional pipeline func_output = os.path.join(output_path, func_path) func_results = func_proc( func_output, TR=metadata["RepetitionTime"], slice_times=metadata["SliceTiming"] if "SliceTiming" in metadata else [], **kwargs, ) # copy moco func and rigid body params to output_data folder func_output_data = os.path.join(func_output, "output_data") os.makedirs(func_output_data, exist_ok=True) shutil.copy2(func_results["func_do_moco"], func_output_data) shutil.copy2(func_results["rigid_body_params"], func_output_data) # delete old values ckwargs = kwargs.copy() del ckwargs["t1"] del ckwargs["t2"] # run EPI pipeline epi_output = os.path.join(output_path, epi_path) epi_results = epi_proc( epi_output, **anat_to_epi_results(anat_results), **func_to_epi_results(func_results, (ckwargs["moco"] == "allineate")), **ckwargs, ) # copy atlas aligned epi to output_data folder epi_output_data = os.path.join(epi_output, "output_data") os.makedirs(epi_output_data, exist_ok=True) shutil.copy2(epi_results["raw_epi_atlas"], epi_output_data)
[docs]@create_output_path def make_ref_epi_bet(output_path: str, ref_epi: str, ref_epi_bet_mask: str, initial_warp_field: str, **kwargs): """Function for applying mask to ref epi Parameters ---------- ref_epi: str Reference EPI. ref_epi_bet_mask: str Brain mask for EPI. (NOTE: if initial_warp_field provided this should be a mask after that warp) initial_warp_field: str Initial distortion correction warp field. Returns ------- str Brain extracted EPI. """ # if a warp field is provided, then apply it to the ref_epi if initial_warp_field: ref_epi_warped = repath(output_path, ref_epi) NwarpApply(ref_epi_warped, ref_epi, ref_epi, initial_warp_field) ref_epi = ref_epi_warped # create path ref_epi_bet = repath(output_path, append_suffix(ref_epi, "_bet")) # load data ref_epi_img = nib.load(ref_epi) ref_epi_bet_mask_img = nib.load(ref_epi_bet_mask) # write masked image to file nib.Nifti1Image( ref_epi_img.get_fdata() * ref_epi_bet_mask_img.get_fdata(), ref_epi_img.affine, ref_epi_img.header ).to_filename(ref_epi_bet) # return return ref_epi_bet
[docs]@create_output_path @use_abspaths @use_output_path_working_directory @create_symlinks_to_input_files() def synthunwarp( output_path: str, epi: str, ref_epi: str, ref_epi_bet_mask: str, anat_bet_mask: str, anat_eye_mask: str, t1_debias: str = None, t2_debias: str = None, initial_synth_model: str = "rbf(0;4)+rbf(1;4)+rbf(0;4)*rbf(1;4)", final_synth_model: str = "rbf(0;12)+rbf(1;12)+rbf(0;12)*rbf(1;12)", program: str = "fsl", dilation_size: int = 30, bandwidth: int = 16, initial_affine: str = None, skip_affine: bool = False, skip_synthtarget_affine: bool = False, resolution_pyramid: List[float] = [4, 2, 1], synthtarget_max_iterations: List[int] = [2000, 500, 100], synthtarget_err_tol: List[float] = [1e-4, 1e-4, 5e-4], synthtarget_step_size: List[float] = [1e-3, 1e-3, 1e-3], resample_resolution: float = 1, sigma_t2: float = 0.5, initial_warp_field: str = None, distortion_correction_smoothing: str = "2x1x0x0", distortion_correction_shrink_factors: str = "4x3x2x1", distortion_correction_step_size: List[float] = [3, 1, 0.1], warp_direction: str = "none", noise_mask_dilation_size: int = 2, noise_mask_iterations: int = 20, noise_mask_sigma: float = 2, autobox_mask: str = None, **kwargs, ) -> Dict: """SynthUnwarp Pipeline. Parameters ---------- output_path: str Output path to write out files to. epi: str EPI image. ref_epi: str Reference EPI image. ref_epi_bet_mask: str Reference EPI brain mask. anat_bet_mask: str Anatomical brain mask. anat_eye_mask: str Anatomical eye mask. t1_debias: str Bias field corrected T1 image. t2_debias: str Bias field corrected T2 image. inital_synth_model: str Initial model used to generate synthetic image. final_synth_model: str Final model used to generate synthetic image. program: str Program to use for affine alignment. dilation_size: int Size of dilation kernel for weight mask. bandwidth: int Bandwidth for Epanechnikov kernel. initial_affine: str Initial affine transformation. skip_affine: bool Skip affine alignment step. skip_synthtarget_affine : bool Skip synthtarget affine alignment step. resolution_pyramid: List[float] Resampling pyramid to use for affine alignment (mm). synthtarget_max_iterations: List[int] Max iterations for each SynthTarget call. synthtarget_err_tol: List[float] Error tolerance level for each SynthTarget call. synthtarget_step_size: List[float] Step size for gradient descent. resample_resolution: float Resample resolution space to do warps on (mm). sigma_t2: float Parameter to smooth T2 for initial warp. initial_warp_field: str Uses this file as an initial warp field instead of computing it, this should be from ref_epi -> T2. distortion_correction_step_size : List[float] Set the gradient descent step size for each iteration of warp. distortion_correction_smoothing: str Smoothing kernel size for each level of optimization. distortion_correction_shrink_factors: str Resampling factor for each level of optimization. warp_direction: str Warp direction noise_mask_dilation_size : int Dilation size for noise mask. noise_mask_iterations : int Number of iterations to run noise mask LDA. noise_mask_sigma : float Size of gaussian smoothing kernel for noise mask. autobox_mask : str Mask of autobox on functional Returns ------- Dict Results of pipeline. """ # check obliqueness of data beforehand for data in [epi, ref_epi]: affine = nib.load(data).affine _, R, _, _ = _decompose(affine) if not np.allclose(R, np.eye(3)): raise ValueError("Data is oblique, please deoblique inputs before running synthunwarp.") # make dummy eye mask if not specified if anat_eye_mask is None: # set default flag compute_mask = False # set path for the eye mask anat_eye_mask = os.path.join("input_data", "dummy_eye_mask.nii.gz") # load the brain mask brain_mask = nib.load(anat_bet_mask) # check if dummy eye mask already exists if os.path.exists(anat_eye_mask): # check that the mask is the same size as the bet mask if brain_mask.shape != nib.load(anat_eye_mask).shape: # recompute the mask if it is not the same size compute_mask = True else: # if not, make one compute_mask = True # make the mask if flag raised if compute_mask: dummy_data = np.zeros(brain_mask.shape) nib.Nifti1Image(dummy_data, brain_mask.affine, brain_mask.header).to_filename(anat_eye_mask) # make stages make_ref_epi_bet_stage = Stage( make_ref_epi_bet, stage_outputs=["ref_epi_bet"], hash_output="synthunwarp_0_make_ref_epi_bet", output_path="synthunwarp_0_make_ref_epi_bet", ref_epi=ref_epi, ref_epi_bet_mask=ref_epi_bet_mask, initial_warp_field=initial_warp_field, ) weight_mask_and_autobox_stage = Stage( weight_mask_and_autobox2, stage_outputs=[ "notused0", "anat_weight_mask", "anat_bet_mask", "notused1", "notused2", "t1", "t2", "t1_bet", "t2_bet", ], hash_output="synthunwarp_1_weight_mask_and_autobox", output_path="synthunwarp_1_weight_mask_and_autobox", t1_debias=t1_debias, t2_debias=t2_debias, anat_bet_mask=anat_bet_mask, anat_eye_mask=anat_eye_mask, dilation_size=dilation_size, ) align_affine_epi_to_anat_stage = Stage( align_affine_epi_to_anat, stage_outputs=["final_epi_to_anat_affine", "final_anat_to_epi_affine"], hash_output="synthunwarp_2_align_affine_epi_to_anat", output_path="synthunwarp_2_align_affine_epi_to_anat", initial_synth_model=initial_synth_model, program=program, bandwidth=bandwidth, initial_affine=initial_affine, skip_affine=skip_affine, skip_synthtarget_affine=skip_synthtarget_affine, resolution_pyramid=resolution_pyramid, max_iterations=synthtarget_max_iterations, err_tol=synthtarget_err_tol, step_size=synthtarget_step_size, ) distortion_correction_stage = Stage( distortion_correction, stage_outputs=["final_synth_to_epi_warp", "final_epi_to_synth_warp"], hash_output="synthunwarp_3_distortion_correction", output_path="synthunwarp_3_distortion_correction", epi_moco=epi, ref_epi=ref_epi, final_synth_model=final_synth_model, bandwidth=bandwidth, resample_resolution=resample_resolution, sigma_t2=sigma_t2, use_initializing_warp=kwargs["use_initializing_warp"], skip_synth_warp=kwargs["skip_synth_warp"], contrast_enhance_method=kwargs["contrast_enhance_method"], initial_warp_field=initial_warp_field, SyN_step_size=distortion_correction_step_size, SyN_smoothing=distortion_correction_smoothing, SyN_shrink_factors=distortion_correction_shrink_factors, SyN_field_variance=kwargs["field_variance"], noise_mask_dilation_size=noise_mask_dilation_size, noise_mask_iterations=noise_mask_iterations, noise_mask_sigma=noise_mask_sigma, warp_direction=warp_direction, autobox_mask=autobox_mask, ) # create pipeline pipeline = Pipeline( [ ("start", make_ref_epi_bet_stage), ("start", weight_mask_and_autobox_stage), ((make_ref_epi_bet_stage, weight_mask_and_autobox_stage), align_affine_epi_to_anat_stage), ((weight_mask_and_autobox_stage, align_affine_epi_to_anat_stage), distortion_correction_stage), ] ) # run pipeline pipeline.run() # return results return pipeline.results