Source code for omni.masks

"""Functions for image mask creation.
"""
import os
import logging
from typing import Tuple
import numpy as np
import nibabel as nib
from memori.helpers import hashable
from skimage.filters import threshold_otsu
from scipy.ndimage import binary_dilation, binary_closing, binary_fill_holes
from scipy.linalg import eig
from scipy.ndimage import gaussian_filter
from scipy.stats import zscore
from scipy.ndimage import binary_dilation, binary_opening, generate_binary_structure, iterate_structure
from omni.interfaces.common import append_suffix, repath, normalize
from omni.interfaces.afni import Allineate, NwarpApply


# @hashable
[docs]def compute_weight_mask( brain: nib.Nifti1Image, brain_mask: nib.Nifti1Image, eye_mask: nib.Nifti1Image = None, dilation_size: int = 15 ) -> nib.Nifti1Image: """Compute a weight mask for a given brain mask. This function computes a weight mask for a given brain mask. The weight mask is computed by dilating the brain mask by `dilation_size` and then finding the non-brain portions through the use of otsu's method + thresholding + morphology The weight mask is weighted by the number of voxels in brain vs. non-brain. Parameters ---------- brain: nib.Nifti1Image Brain image. brain_mask: nib.Nifti1Image Brain mask. eye_mask: nib.Nifti1Image, optional Eye mask. dilation_size: int Size to dilate brain mask by. Returns ------- nib.Nifti1Image Weight mask. """ # get the data for the brain mask brain_mask_data = brain_mask.get_fdata().astype(bool) # get the brain data brain_data = brain.get_fdata() # ensure they are the same size assert brain_mask_data.shape == brain_data.shape, "Brain mask and brain data must be the same size" # pad the data so later operations don't go out of bounds padded_brain_data = np.pad(brain_data, pad_width=dilation_size, mode="constant") padded_brain_mask_data = np.pad(brain_mask_data, pad_width=dilation_size, mode="constant") # create structural element for dilation strel = generate_binary_structure(3, 3) # now dilate the brain_mask dilated_brain_mask_data = binary_dilation(padded_brain_mask_data, structure=strel, iterations=dilation_size) # now take the difference between the dilated and brain_mask non_brain_mask_data = np.logical_not(padded_brain_mask_data) # within the non-brain mask, find the otsu's method threshold / 2 threshold = threshold_otsu(padded_brain_data[non_brain_mask_data]) / 2 # threshold the brain image thresholded_brain_data = padded_brain_data > threshold # now compute the intersection of the thresholded brain and the non-brain mask non_brain_mask_data2 = np.logical_and( np.logical_and(thresholded_brain_data, dilated_brain_mask_data), non_brain_mask_data ) # now apply a sphere to the non-brain mask with a closing operation strel2 = generate_binary_structure(3, 1) non_brain_mask_data3 = binary_closing(non_brain_mask_data2, structure=strel2, iterations=dilation_size) # and dilate out the mask a bit non_brain_mask_data4 = binary_dilation(non_brain_mask_data3, structure=strel2, iterations=1) # fill in any holes in the mask non_brain_mask_data5 = binary_fill_holes(non_brain_mask_data4) # get the data for the eye mask if available if eye_mask: eye_mask_data = eye_mask.get_fdata() # pad the eye mas padded_eye_mask_data = np.pad(eye_mask_data, pad_width=dilation_size, mode="constant") # and add it to the brain mask padded_brain_data = np.logical_or(padded_brain_data, padded_eye_mask_data) # remove part of non-brain mask overlapping with the brain mask (and eye mask) non_brain_mask_data6 = np.logical_and(non_brain_mask_data5, np.logical_not(padded_brain_mask_data)) # get number of voxels in each mask num_brain_voxels = np.sum(padded_brain_mask_data) num_non_brain_voxels = np.sum(non_brain_mask_data6) # combine the images weight_mask_data = (padded_brain_mask_data / num_brain_voxels) + (0.5 * non_brain_mask_data6 / num_non_brain_voxels) # normalize 0 to 1 weight_mask_data = normalize(weight_mask_data)[ dilation_size:-dilation_size, dilation_size:-dilation_size, dilation_size:-dilation_size ] # return image return nib.Nifti1Image(weight_mask_data, brain_mask.affine)
[docs]@hashable def lda(X: np.ndarray, labels: np.ndarray) -> np.ndarray: """Linear Discriminant Analysis for time series data. Parameters ---------- X: np.ndarray The data in a numpy array. Where rows are samples, cols are features. labels: np.ndarray Labels for data. Corresponds to samples. Returns ------- np.ndarray Projected data. """ # get the mean over all time for each voxel u = np.mean(X, axis=1)[:, np.newaxis] exclude_mask = np.all(X == u, axis=1) # save original array origX = X.copy() # filter voxel using exclude mask X = X[~exclude_mask, :] labels = labels[~exclude_mask] # get mean of each voxel time series (And for each class) M = np.mean(X, axis=0) # mean time series M0 = np.mean(X[labels, :], axis=0) # mean time series of brain voxels M1 = np.mean(X[~labels, :], axis=0) # mean time series of non-brain voxels # get number of nonzero voxels and number of zero voxels N0 = np.count_nonzero(labels) N1 = np.count_nonzero(~labels) # get demeaned time series for each class demean0 = X[labels, :] - M0[np.newaxis, :] demean1 = X[~labels, :] - M1[np.newaxis, :] # get within/between class covariance matrices within_cov = np.matmul(demean0.T, demean0) + np.matmul(demean1.T, demean1) between_cov = N0 * np.matmul((M0 - M)[:, np.newaxis], (M0 - M)[:, np.newaxis].T) + N1 * np.matmul( (M1 - M)[:, np.newaxis], (M1 - M)[:, np.newaxis].T ) # do eigendecomposition W, R = eig(between_cov, within_cov) idx = np.argsort(-W) # sort from largest to smallest eigenvalue orderedR = R[:, idx] # project data onto eigenvectors (only grab real values) return np.real(np.matmul(origX, orderedR))
[docs]@hashable def generate_noise_mask( img: nib.Nifti1Image, mask: nib.Nifti1Image, size: int = 2, iterations: int = 20, sigma: float = 3 ) -> Tuple[np.ndarray, np.ndarray]: """Generates a mask identifying noise and signal voxels. Parameters ---------- img: nib.Nifti1Image Image to construct noise mask on. mask: nib.Nifti1Image Mask outlining a prior guess between noise/signal voxels. size: int Size to dilate noise mask by. iterations: int Number of iterations to run LDA. sigma: float Size of smoothing kernel for weight mask. Returns ------- np.ndarray Noise mask. np.ndarray Signal weight mask. """ # get data; reshape to voxels x time img_data = img.get_fdata() voxel_time = img_data.reshape(np.multiply.reduce(img_data.shape[:3]), img_data.shape[3]) # get class for voxels between brain and skull labels = (mask.get_fdata().ravel() > 0.5).astype(bool) # get z-scored functional time series z_voxel_time = zscore(voxel_time, axis=1) # remove NaN voxels z_voxel_time[np.where(np.isnan(z_voxel_time))] = 0 # do lda to find noisy voxels signal_label = labels num_voxels = 0 for i in range(iterations): logging.info("Iteration %d", i) # get lda on functional data Y0 = lda(voxel_time, signal_label) Y1 = lda(z_voxel_time, signal_label) Y2 = Y0 * Y1 # do lda on second level of lda on first components Y = lda(np.stack((Y0[:, 0], Y1[:, 0], Y2[:, 0]), axis=1), signal_label) # get otsu threshold threshold = threshold_otsu(Y[:, 0]) # get classes separated by otsu's method class1 = Y[:, 0] < threshold class2 = Y[:, 0] > threshold # identify the class corresponding to non-signal non_signal_array = ( class1 if np.sum(np.logical_and(class1, labels)) < np.sum(np.logical_and(class2, labels)) else class2 ) # take intersection between bet mask to get signal dropout areas only signal_drop_out_array = np.logical_and(labels, non_signal_array) current_num_voxels = np.count_nonzero(signal_drop_out_array) logging.info(f"Number of voxels in mask: {current_num_voxels}") if current_num_voxels == num_voxels: logging.info(f"Optimized mask found after {i} iterations.") break num_voxels = current_num_voxels # format noise mask noise_mask_array = signal_drop_out_array noise_mask_data = noise_mask_array.reshape(img.shape[:3]) # get new signal labels signal_label = np.logical_and(signal_label, np.logical_not(noise_mask_data.ravel())) # do opening on noise mask + dilation opened_noise_mask_data = binary_opening(noise_mask_data, generate_binary_structure(3, 1)) dilated_opened_noise_mask_data = binary_dilation( opened_noise_mask_data, iterate_structure(generate_binary_structure(3, 1), size) ) # construct noise mask image noise_mask_img = nib.Nifti1Image(dilated_opened_noise_mask_data.astype("f8"), img.affine, img.header) # smooth noise mask smooth_noise_mask = nib.Nifti1Image( gaussian_filter(noise_mask_img.get_fdata(), sigma), noise_mask_img.affine, noise_mask_img.header ) # return noise mask, signal weight mask return (noise_mask_img, smooth_noise_mask)
[docs]@hashable def make_regression_mask( output_prefix: str, epi: str, anat_bet_mask: str, anat_weight_mask: str, affine: str, iaffine: str, warp: str, iwarp: str, noise_mask_dilation_size: int = 2, noise_mask_iterations: int = 20, noise_mask_sigma: float = 2, ): """Make regression mask. Parameters ---------- output_prefix: str Set prefix for output files. epi: str EPI file to apply LDA. anat_bet_mask: str Anatomical brain mask. anat_weight_mask: str Anatomical weight mask. affine: str Affine transform (anat to func) (afni). iaffine: str Inverse affine transform (func to anat) (afni). warp: str Forward warp (anat to func). iwarp: str Inverse warp (func to anat). noise_mask_dilation_size: int Size to dilate noise mask by. noise_mask_iterations: int Number of iterations to run LDA. noise_mask_sigma: float Size of smoothing kernel for weight mask. Returns ------- str Regression mask. """ # get output path output_path = os.path.dirname(output_prefix) # apply affine to anat mask anat_bet_mask_epispace = append_suffix(repath(output_path, anat_bet_mask), "_epispace") Allineate(anat_bet_mask_epispace, epi, anat_bet_mask, matrix_apply=affine) # warp the anat bet mask anat_bet_mask_epispace_warped = append_suffix(anat_bet_mask_epispace, "_warped") NwarpApply(anat_bet_mask_epispace_warped, epi, anat_bet_mask_epispace, warp) # create noise mask epi_img = nib.load(epi) anat_bet_mask_epispace_warped_img = nib.load(anat_bet_mask_epispace_warped) _, noise_mask_smooth_img = generate_noise_mask( epi_img, anat_bet_mask_epispace_warped_img, noise_mask_dilation_size, noise_mask_iterations, noise_mask_sigma ) noise_mask_smooth = output_prefix + "noise_mask_smooth.nii.gz" noise_mask_smooth_img.to_filename(noise_mask_smooth) # unwarp the noise mask noise_mask_smooth_unwarped = append_suffix(noise_mask_smooth, "_unwarped") NwarpApply(noise_mask_smooth_unwarped, epi, noise_mask_smooth, iwarp) # align back to anat space noise_mask_smooth_unwarped_anatspace = append_suffix(noise_mask_smooth_unwarped, "_anatspace") Allineate(noise_mask_smooth_unwarped_anatspace, anat_bet_mask, noise_mask_smooth_unwarped, matrix_apply=iaffine) # load up weight mask/noise mask anat_weight_mask_img = nib.load(anat_weight_mask) noise_mask_smooth_unwarped_anatspace_img = nib.load(noise_mask_smooth_unwarped_anatspace) regression_mask_data = ( 1 - normalize(noise_mask_smooth_unwarped_anatspace_img.get_fdata()) ) * anat_weight_mask_img.get_fdata() regression_mask = output_prefix + "regression_mask.nii.gz" nib.Nifti1Image(regression_mask_data, anat_weight_mask_img.affine, anat_weight_mask_img.header).to_filename( regression_mask ) # return regression mask return regression_mask