Source code for sekupy.preprocessing.connectivity
import numpy as np
from sekupy.utils.matrix import array_to_matrix, copy_matrix
from sekupy.io.base import add_attributes
from sekupy.preprocessing.base import Transformer
from scipy.spatial.distance import pdist
from scipy import signal
from sekupy.dataset.collections import SampleAttributesCollection
from sekupy.dataset.base import Dataset
import itertools
import logging
logger = logging.getLogger(__name__)
# TODO: Document better!
[docs]
class SingleRowMatrixTransformer(Transformer):
"""This transformer change a row dataset representing a matrix
in a square matrix dataset.
"""
def __init__(self, name='upper_matrix', **kwargs):
Transformer.__init__(self, name=name, **kwargs)
[docs]
def transform(self, ds):
"""This function performs the transformation into
a square matrix dataset
Parameters
----------
ds : pymvpa Dataset
The dataset to be transformed.
Returns
-------
ds : pymvpa Dataset
The transformed dataset
"""
data = np.dstack([copy_matrix(array_to_matrix(a)) for a in ds.samples])
data = np.hstack([d for d in data[:, :]]).T
attr = self._edit_attr(ds, data.shape)
ds_ = Dataset.from_wizard(data)
ds_ = add_attributes(ds_, attr)
return Transformer.transform(self, ds_)
def _edit_attr(self, ds, shape):
attr = dict()
for key in ds.sa.keys():
attr[key] = []
for v in ds.sa[key].value:
attr[key] += [v for _ in range(shape[1])]
attr['roi_labels'] = []
for _ in range(shape[0]/shape[1]):
for i in range(shape[1]):
attr['roi_labels'] += ["roi_%02d" % (i+1)]
logger.debug(shape)
return SampleAttributesCollection(attr)
[docs]
class SpeedEstimator(Transformer):
# TODO: Is it a transformer or an estimator??
def __init__(self, name='speed_tc', **kwargs):
Transformer.__init__(self, name=name, **kwargs)
[docs]
def transform(self, ds, metric='euclidean'):
ds_ = ds.copy()
X = ds.samples
trajectory = [pdist(np.vstack((X[i+1], X[i])), metric) \
for i in range(ds.shape[0]-1)]
ds_.samples = np.array(trajectory)
return Transformer.transform(self, ds_)
[docs]
class AverageEstimator(Transformer):
def __init__(self, name='average_estimator', **kwargs):
Transformer.__init__(self, name=name, **kwargs)
[docs]
def transform(self, ds):
ds_ = ds.copy()
ds_.samples = np.mean(ds_.samples, axis=1, keepdims=True)
return super().transform(ds)()
[docs]
class SlidingWindowConnectivity(Transformer):
def __init__(self, window_length=1, shift=1, overlap=0):
# Units are in seconds ?
self.window_length = window_length
self.shift = shift
self.overlap = overlap
Transformer.__init__(self, name='sliding_window')
[docs]
def transform(self, ds):
"""Connectivity"""
# TODO: Replace with get_data_ds
data = ds.samples
n_edges = int(ds.shape[1] * (ds.shape[1] - 1) * 0.5)
edges = [e for e in itertools.combinations(np.arange(ds.shape[1]), 2)]
window_length = self.window_length * ds.a.sample_frequency
window_start = np.arange(0, (data.shape[0] - window_length + 1),
self.shift)
connectivity_lenght = len(window_start)
timewise_connectivity = np.zeros((connectivity_lenght, n_edges))
for w in window_start:
data_window = data[w:w+window_length, :]
# From here must be included in a function.
phi = np.angle(signal.hilbert(data_window, axis=0))
for e, (x, y) in enumerate(edges):
coh = np.imag(np.exp(1j*(phi[:, x] - phi[:, y])))
iplv = np.abs(np.mean(coh))
timewise_connectivity[w, e] = iplv
ds.samples = timewise_connectivity
ds = self.update_ds(ds, window_start)
return ds
[docs]
def update_ds(self, ds, windows_start):
sa = {}
for k in ds.sa.keys():
sa.update({k: ds.sa[k].value[windows_start]})
ds_ = Dataset(ds.samples, sa=sa, a=ds.a)
return ds_