Source code for sekupy.results.base

import pandas as pd
import json
import os
from scipy.io import loadmat
from scipy.stats import ttest_1samp
import numpy as np
from itertools import product
from joblib import Parallel, delayed
import logging
import warnings

logger = logging.getLogger(__name__)



def get_values(path, directory, field_list, result_keys):
    """Extract analysis results and configuration from a results directory.

    Reads ``configuration.json`` and every file whose name contains ``.mat``
    inside ``path/directory`` and builds one record per score and per fold.

    Parameters
    ----------
    path : str
        Base path containing analysis results.
    directory : str
        Name of the directory (inside ``path``) holding the results.
    field_list : list
        Configuration fields forwarded to ``get_configuration_fields``.
    result_keys : list or None
        Additional keys to read from each ``.mat`` file; for every fold ``i``
        the ``i``-th entry of the squeezed array is stored in the record.

    Returns
    -------
    list
        List of dictionaries, one per (file, score, fold), containing the
        configuration fields plus ``roi``, ``roi_value``, ``permutation``,
        ``fold`` and ``score_<name>``.
    """
    dir_path = os.path.join(path, directory)

    conf_fname = os.path.join(dir_path, "configuration.json")
    with open(conf_fname) as f:
        conf = json.load(f)

    fields, scores = get_configuration_fields(conf, *field_list)

    mat_files = [f for f in os.listdir(dir_path) if ".mat" in f]

    results = []
    for fname in mat_files:
        # The ROI name is everything before the last four "_"-separated
        # tokens; token -4 is the ROI value and token -2 the permutation.
        fname_split = fname.split("_")
        fields['roi'] = "_".join(fname_split[:-4])
        # float64 rather than float16: half precision cannot represent
        # integers above 2048 exactly, which would merge distinct ROI labels
        # or permutation indices.
        fields['roi_value'] = np.float64(fname_split[-4])
        fields['permutation'] = np.float64(fname_split[-2])

        data = loadmat(os.path.join(dir_path, fname))
        logger.debug(data.keys())

        for score in scores:
            # Fall back to the generic 'score' entry when no key of the
            # file mentions the requested score name.
            if not any(score in k for k in data.keys()):
                score = 'score'

            for i, s in enumerate(data['test_%s' % (score)].squeeze()):
                fields["score_%s" % (score)] = s
                fields['fold'] = i + 1
                logger.debug(fields)

                if result_keys is not None:
                    for k in result_keys:
                        values = data[k].squeeze()
                        fields[k] = values[i].squeeze().copy()

                # Copy so later iterations do not overwrite stored records.
                results.append(fields.copy())

    return results
def get_results(path, pipeline_name, field_list=['sample_slicer'],
                result_keys=None, filter=None, n_jobs=-1, verbose=1):
    """Collect the results stored in analysis folders into a table.

    Parameters
    ----------
    path : str
        The pathname of the folder in which results are stored.
    pipeline_name : str
        Pattern used to select folders: a folder is loaded when its name
        contains this string and does not contain a dot.
    field_list : list, optional
        Configuration fields to extract for each folder
        (the default is ['sample_slicer']).
    result_keys : list, optional
        Other keys to read from the result files
        (e.g. cross-validation folds).
    filter : dictionary, optional
        Keyword selection passed to ``filter_dataframe`` to keep only some
        rows (the default is None, which applies no filtering).
    n_jobs : int, optional
        Number of jobs handed to ``joblib.Parallel``.
    verbose : int, optional
        Verbosity handed to ``joblib.Parallel``.

    Returns
    -------
    dataframe : pandas dataframe
        A table of the results in pandas format.
    """
    # TODO: Optimize memory
    candidates = sorted(
        entry for entry in os.listdir(path)
        if pipeline_name in entry and "." not in entry
    )

    logger.info("Loading %d files..." % (len(candidates)))

    load_one = delayed(get_values)
    runner = Parallel(n_jobs=n_jobs, verbose=verbose)
    per_directory = runner(
        load_one(path, entry, field_list, result_keys) for entry in candidates
    )

    # Flatten the per-directory lists of records into a single list.
    rows = []
    for records in per_directory:
        rows.extend(records)

    dataframe = pd.DataFrame(rows)
    if filter is None:
        return dataframe

    return filter_dataframe(dataframe, **filter)
def ttest_values(dataframe, keys, scores=["accuracy"], popmean=0.5):
    """Run a one-sample t-test of each score for every combination of keys.

    Parameters
    ----------
    dataframe : pandas dataframe
        Table of results.
    keys : list
        Column names; every combination of their unique values defines one
        condition.
    scores : list, optional
        Score columns to test (the default is ["accuracy"]).
    popmean : float, optional
        Expected value under the null hypothesis, passed to
        ``scipy.stats.ttest_1samp`` (the default is 0.5).

    Returns
    -------
    pandas dataframe
        One row per condition with the key values and, for each score,
        ``<score>_avg``, ``<score>_t`` and ``<score>_p``.
    """
    # TODO: Multiple scores (test)
    levels = {k: np.unique(dataframe[k]) for k in keys}
    names = levels.keys()

    rows = []
    for combination in product(*levels.values()):
        row = dict(zip(names, combination))

        # filter_dataframe expects a list of accepted values per column.
        selection = {name: [level] for name, level in row.items()}
        subset = filter_dataframe(dataframe.copy(), **selection)

        for score in scores:
            observed = subset[score].values
            t, p = ttest_1samp(observed, popmean)

            row[score+'_avg'] = np.mean(observed)
            row[score+'_t'] = t
            row[score+'_p'] = p

        rows.append(row)

    return pd.DataFrame(rows)
def get_permutation_values(dataframe, keys, scores=["accuracy"], permutation_key='permutation'):
    """Compare true scores against the permutation distribution.

    Rows whose ``permutation_key`` is non-zero are treated as permutations
    and averaged per combination of ``keys``; rows whose value is 0 are
    treated as the true (unpermuted) results.

    Parameters
    ----------
    dataframe : pandas dataframe
        Table of results containing ``permutation_key`` and the score columns.
    keys : list
        Columns defining the conditions. Must include ``permutation_key``.
    scores : list, optional
        Score columns to evaluate (the default is ["accuracy"]). When ``'fx'``
        is one of ``keys`` the column ``score_<fx value>`` is used instead.
    permutation_key : str, optional
        Column holding the permutation index (the default is 'permutation').

    Returns
    -------
    pandas dataframe
        One row per condition with the key values and, for each score,
        ``<score>_perm`` (mean over permutations), ``<score>_true`` (mean of
        the true rows) and ``<score>_p``.
    """
    # TODO: Multiple scores (test)
    # TODO: Cast permutation to int
    # TODO: Issue #56
    df_perm = dataframe.loc[np.int_(dataframe[permutation_key].values) != 0]

    # The string alias avoids the pandas >= 2.1 FutureWarning raised when a
    # NumPy callable such as np.mean is given as aggfunc; the result is the
    # same group-wise mean.
    table_perm = pd.pivot_table(df_perm,
                                values=scores,
                                index=keys,
                                aggfunc="mean").reset_index()

    options = {k: np.unique(table_perm[k]) for k in keys}

    # np.unique returns sorted values, so the last one is the largest
    # permutation index; it is used as the number of permutations.
    n_permutation = options.pop(permutation_key)[-1]

    opts = [dict(zip(options.keys(), items))
            for items in product(*options.values())]

    p_values = []
    for item in opts:
        cond_dict = dict(item)
        selection = {k: [v] for k, v in item.items()}

        # filter_dataframe only reads its input, so no defensive copies of
        # the (possibly large) frames are needed here.
        df_permutation = filter_dataframe(table_perm, **selection)

        selection.update({permutation_key: [0]})
        df_true = filter_dataframe(dataframe, **selection)

        for score in scores:
            if 'fx' in options:
                score = 'score_%s' % (cond_dict['fx'])

            true_values = df_true[score].values
            permutation_values = df_permutation[score].values

            df_avg = np.nanmean(true_values)
            n_values = np.count_nonzero(permutation_values > df_avg) + 1

            cond_dict[score+'_perm'] = np.nanmean(permutation_values)
            cond_dict[score+'_true'] = df_avg
            cond_dict[score+'_p'] = n_values / float(n_permutation)

        p_values.append(cond_dict)

    return pd.DataFrame(p_values)
def get_configuration_fields(conf, *args):
    """Collect fields from the configuration dictionary.

    Parameters
    ----------
    conf : dictionary
        The configuration dictionary to be searched.
    args : list of strings
        Keywords to look for. A configuration key equal to a keyword is
        copied as is; a key of the form ``<keyword>__<name>`` is stored under
        ``<name>`` after being parsed with ``ast.literal_eval`` (lists are
        joined with ``"_"``; single-item lists are unwrapped).

    Returns
    -------
    results : dictionary
        Extracted fields. Always contains ``'id'`` and ``'num'``, set to the
        string ``"None"`` when missing from ``conf``.
    scores : object or None
        ``conf['scores']`` parsed with ``ast.literal_eval`` if present,
        otherwise the raw ``conf['analysis__scoring']`` if present,
        otherwise None.
    """
    import ast

    results = dict()

    for item in ['id', 'num']:
        results[item] = conf[item] if item in conf else "None"

    for k, v in conf.items():
        for arg in args:

            if arg == k == 'ds__img_pattern':
                results[arg] = v

            # NOTE(review): the joined value stored here is overwritten with
            # the raw value by the `arg == k` check at the end of this loop
            # body; kept as is to preserve existing output -- confirm intent.
            if arg == k == 'prepro':
                value = ast.literal_eval(v)
                results[arg] = "_".join(value)

            idx_end = len(arg) + 2  # len("__")
            if k[:idx_end] == arg+"__":
                name = str(k[idx_end:])

                try:
                    value = ast.literal_eval(v)
                except (ValueError, SyntaxError):
                    # Not a Python literal (e.g. a bare word raises
                    # ValueError, a path or free text raises SyntaxError):
                    # keep the raw value.
                    if name != 'prepro' or name not in results:
                        results[name] = v
                    else:
                        results[name] += v
                    continue

                if isinstance(value, list):
                    value = [str(element) for element in value]
                    if len(value) != 1:
                        value = "_".join(value)
                    else:
                        value = value[0]

                results[name] = value

            if arg == k:
                results[k] = v

    scores = None
    if 'scores' in conf:
        scores = ast.literal_eval(conf['scores'])
    elif 'analysis__scoring' in conf:
        scores = conf['analysis__scoring']

    return results, scores
def get_searchlight_results(path, pipeline_name, field_list=['sample_slicer'], load_cv=False):
    """Collect searchlight map files from analysis folders into a table.

    Parameters
    ----------
    path : str
        The pathname of the folder in which results are stored.
    pipeline_name : str
        Pattern used to select folders: a folder is loaded when its name
        contains this string and does not contain a dot.
    field_list : list, optional
        Configuration fields to extract for each folder
        (the default is ['sample_slicer']).
    load_cv : bool, optional
        When False (default) only files whose name contains ``"avg"`` are
        listed; when True every ``.nii.gz`` file is listed.

    Returns
    -------
    dataframe : pandas dataframe
        One row per map file with the configuration fields plus ``measure``,
        ``permutation`` and ``map`` (full path of the file).
    """
    # tqdm is used for progress reporting but is not imported at module
    # level; import it here and fall back to plain iteration when missing.
    try:
        from tqdm import tqdm
    except ImportError:
        def tqdm(iterable, *args, **kwargs):
            return iterable

    # TODO: Mind BIDS!
    dir_analysis = sorted(
        d for d in os.listdir(path)
        if pipeline_name in d and "." not in d
    )

    results = []
    for d in tqdm(dir_analysis):
        # read json
        conf_fname = os.path.join(path, d, "configuration.json")
        with open(conf_fname) as f:
            conf = json.load(f)

        # TODO: Check if permutation is in fields
        fields, scores = get_configuration_fields(conf, *field_list)

        files = sorted(
            f for f in os.listdir(os.path.join(path, d)) if ".nii.gz" in f
        )
        if not load_cv:
            files = [f for f in files if "avg" in f]

        for fname in tqdm(files):
            # The measure name is everything before the last three
            # "_"-separated tokens; token -2 is the permutation index.
            fname_split = fname.split("_")
            fields['measure'] = "_".join(fname_split[:-3])
            # np.float_ was removed in NumPy 2.0; float64 is the same type.
            fields['permutation'] = np.float64(fname_split[-2])
            fields['map'] = os.path.join(path, d, fname)

            # Copy so later iterations do not overwrite stored records.
            results.append(fields.copy())

    dataframe = pd.DataFrame(results)

    return dataframe
def get_connectivity_results(path, dir_id, field_list=['sample_slicer'], load_cv=False):
    """Collect connectivity matrices from analysis folders into a table.

    Parameters
    ----------
    path : str
        The pathname of the folder in which results are stored.
    dir_id : str
        Pattern used to select folders: a folder is loaded when its name
        contains this string and does not contain a dot.
    field_list : list, optional
        Configuration fields to extract for each folder
        (the default is ['sample_slicer']).
    load_cv : bool, optional
        Currently unused by this function.

    Returns
    -------
    dataframe : pandas dataframe
        One row per folder with the configuration fields plus ``data``, the
        ``'matrix'`` entry of the folder's ``connectivity_data.mat``.
    """
    # tqdm is used for progress reporting but is not imported at module
    # level; import it here and fall back to plain iteration when missing.
    try:
        from tqdm import tqdm
    except ImportError:
        def tqdm(iterable, *args, **kwargs):
            return iterable

    dir_analysis = sorted(
        d for d in os.listdir(path)
        if dir_id in d and "." not in d
    )

    results = []
    for d in tqdm(dir_analysis):
        # read json
        conf_fname = os.path.join(path, d, "configuration.json")
        with open(conf_fname) as f:
            conf = json.load(f)

        # TODO: Check if permutation is in fields
        fields, scores = get_configuration_fields(conf, *field_list)

        data = loadmat(os.path.join(path, d, "connectivity_data.mat"))
        fields['data'] = data['matrix']

        results.append(fields.copy())

    dataframe = pd.DataFrame(results)

    return dataframe
def filter_dataframe(dataframe, return_mask=False, return_null=False, **selection_dict):
    """Select the rows of a dataframe matching a keyword selection.

    Parameters
    ----------
    dataframe : pandas dataframe
        Table to be filtered.
    return_mask : bool, optional
        If True, also return the boolean row mask (the default is False).
    return_null : bool, optional
        If False (default), a warning is issued when no row is selected.
    **selection_dict : dict
        ``column=[value, ...]`` pairs. Within a column a row is kept when it
        matches any listed value; a string value starting with ``'!'`` keeps
        rows different from the remainder of the string. Conditions on
        different columns are combined with a logical AND.

    Returns
    -------
    pandas dataframe
        The selected rows; when ``return_mask`` is True a tuple
        ``(dataframe, mask)`` is returned instead.
    """
    selection_mask = np.ones(dataframe.shape[0], dtype=bool)

    for key, values in selection_dict.items():
        ds_values = dataframe[key].values
        condition_mask = np.zeros_like(ds_values, dtype=bool)

        for value in values:
            # Only strings can carry the '!' negation prefix. Testing with
            # startswith (instead of indexing the first character) keeps an
            # empty-string value from raising IndexError.
            if isinstance(value, str) and value.startswith('!'):
                array_val = np.array(value[1:]).astype(ds_values.dtype)
                condition_mask = np.logical_or(condition_mask, ds_values != array_val)
            else:
                condition_mask = np.logical_or(condition_mask, ds_values == value)

        selection_mask = np.logical_and(selection_mask, condition_mask)

    # Equality check kept on purpose so non-bool arguments behave as before.
    if np.count_nonzero(selection_mask) == 0 and return_null == False:
        warnings.warn("No rows in filtered dataframe. Check selection field spelling or datatype.")

    if return_mask:
        return dataframe.loc[selection_mask], selection_mask

    return dataframe.loc[selection_mask]
def aggregate_searchlight(path, dir_id, filter):
    """Placeholder for aggregating within-subject searchlight results.

    Intended for a within-subject analysis, to collect data from different
    folders / subjects. Be aware of the different parameters of the analysis:
    the suggested approach is to use ``get_searchlight_results`` and then
    aggregate from that table.

    At present the function only loads the searchlight table and returns
    nothing.

    Parameters
    ----------
    path : str
        Folder passed to ``get_searchlight_results``.
    dir_id : str
        Folder-name pattern passed to ``get_searchlight_results``.
    filter : object
        Currently unused.

    Returns
    -------
    None
    """
    # The loaded table is not used yet; aggregation is not implemented.
    get_searchlight_results(
        path,
        dir_id,
        field_list=['sample_slicer'],
        load_cv=False,
    )

    return None
def dataframe_to_afni(dataframe, outpath=None, command='3dttest++', label_attr='task', **filter):
    """Build an AFNI command line from the maps listed in a dataframe.

    Parameters
    ----------
    dataframe : pandas dataframe
        Table with a ``filename`` column holding one dataset path per row.
    outpath : str, optional
        Value placed after ``-prefix`` (the default is None).
    command : str, optional
        Name of the AFNI program placed at the start of the command line
        (the default is '3dttest++').
    label_attr : str, optional
        Currently unused.
    **filter : dict
        Selection passed to ``filter_dataframe`` to choose the rows of
        interest; when empty, all rows are used.

    Returns
    -------
    str
        ``<command> -singletonA 0.5 -setB <label> <file>'[0]' ... -prefix
        <outpath>``, where labels are ``sub01``, ``sub02``, ... in row order.
    """
    # Only the selected rows go into the command; with no selection every
    # row is used.
    filtered = filter_dataframe(dataframe, **filter) if filter else dataframe

    # Number the rows sequentially rather than using index labels, which are
    # not contiguous (or not even integers) after filtering.
    set_b = " ".join(
        "sub%02d %s'[0]'" % (number, row['filename'])
        for number, (_, row) in enumerate(filtered.iterrows(), start=1)
    )

    return "%s -singletonA 0.5 -setB %s -prefix %s" % (command, set_b, outpath)