import numpy as np
import scipy as sp
from scipy import signal
from sekupy.dataset.base import Dataset
from sekupy.dataset.collections import SampleAttributesCollection, \
FeatureAttributesCollection, DatasetAttributesCollection
from sekupy.preprocessing.base import Transformer
import logging
logger = logging.getLogger(__name__)
# This must be moved to __init__
[docs]
class SimulationModel(Transformer):
def __init__(self, name='simulation_model', order=None, noise=None, delay=None, snr=1e6, **kwargs):
self.order = order
self.noise = noise
self.delay = delay
self.snr = snr
# Missing sampling_frequency for dataset
Transformer.__init__(self, name=name)
[docs]
def fit(self, dynamics_model):
a_dict = {
'states': dynamics_model._states,
'sample_frequency': dynamics_model._fs,
'time': dynamics_model._time,
'snr': self.snr
}
sa_dict = {'targets': dynamics_model._dynamics}
a = DatasetAttributesCollection(a_dict)
sa = SampleAttributesCollection(sa_dict)
ds = Dataset(self.data, sa=sa, a=a)
return ds
# TODO: Not proper here!
def _create_ar_matrices(self, matrices, c=2.5, n=20000):
"""This function randomly creates autoregressive matrices
Parameters
----------
matrices : n x n binary matrix
This is the ground truth matrix on which other must be based on.
c : float, optional
Coefficient of coupling strenght (keep it close to default value to avoid
non convergence) see also n specification, by default 2.5
n : int, optional
Number of random trials before raising a convergence error, in that case
try to lower c or increase n, by default 8000
Returns
-------
ar_matrices: n_ored
[description]
"""
n_brain_states = matrices.shape[0]
n_nodes = matrices.shape[1]
full_bs_matrix = np.zeros((n_brain_states, n_nodes, n_nodes, self.order))
for j in range(n_brain_states):
has_converged = False
matrix = np.dstack([matrices[j] for _ in range(self.order)])
FA = np.zeros((n_nodes*self.order, n_nodes*self.order))
eye_idx = n_nodes*self.order - n_nodes
FA[n_nodes:, :eye_idx] = np.eye(eye_idx)
for k in range(n):
A = 1/c * np.random.rand(n_nodes, n_nodes, self.order) * matrix
FA[:n_nodes, :] = np.reshape(A, (n_nodes, -1), 'F')
eig, _ = sp.linalg.eig(FA)
if np.all(np.abs(eig) < 1):
logger.info(k)
has_converged = True
break
if not has_converged:
logger.info("Not converged")
raise Exception("Solutions not found, try to lower c or increase n")
full_bs_matrix[j] = A.copy()
return full_bs_matrix
[docs]
class AutoRegressiveModel(SimulationModel):
def __init__(self, name='ar_model', order=10, noise=0.01, **kwargs):
SimulationModel.__init__(self, name, order, noise, **kwargs)
[docs]
def fit(self, dynamics_model):
# Check if model is fitted (or fit yourself) ?
matrices = dynamics_model._states
bs_sequence = dynamics_model._state_sequence
bs_length = dynamics_model._state_length
ar_matrices = self._create_ar_matrices(matrices)
data = []
for i, state in enumerate(bs_sequence):
length = bs_length[i]
# TODO: move data_bs in superclass and build create data in subclasses
data_bs = self.noise * np.random.randn(length, ar_matrices.shape[1])
for t in np.arange(self.order, length):
for d in range(self.order):
data_bs[t, :] = data_bs[t, :] + np.dot(data_bs[t-d, :], ar_matrices[state, :, :, d])
data.append(data_bs)
data = np.hstack(data)
random_noise = np.random.randn(*data.shape)
data_noise = data + np.sqrt(1/self.snr)*(random_noise/np.std(random_noise))
self.data = data_noise
return SimulationModel.fit(self, dynamics_model)
[docs]
class DelayedModel(SimulationModel):
def __init__(self, name='delayed_model', order=5, noise=1, delay=0.0195, **kwargs):
self.noise = noise
SimulationModel.__init__(self, name, order, noise, delay, **kwargs)
[docs]
def fit(self, dynamics_model):
import matplotlib.pyplot as pl
data = []
matrices_ = dynamics_model._states
bs_sequence = dynamics_model._state_sequence
bs_length = dynamics_model._state_length
ar_matrices = self._create_ar_matrices(matrices_)
for i, state in enumerate(bs_sequence):
m = ar_matrices[state]
adjacency_matrix = np.int_(np.mean(np.abs(m), axis=2) != 0) - np.eye(m.shape[1])
diagonal = np.dstack([np.diag(np.diag(m[..., i])) for i in range(m.shape[-1])])
length = bs_length[i]
# TODO: move data_bs in superclass and build create data in subclasses
remove_idx = np.int_(np.sum(adjacency_matrix, axis=0) == 0)
data_bs = self.noise * np.random.randn(length, ar_matrices.shape[1]) * remove_idx
# TODO: move in superclass, remember diagonal!
for t in np.arange(self.order, length):
for d in range(self.order):
data_bs[t, :] = data_bs[t, :] + np.dot(data_bs[t-d,:], diagonal[:,:,d])
# data_bs[t-d, :] @ diagonal[:, :, d]
leading, following = np.nonzero(adjacency_matrix)
logger.info(ar_matrices.shape)
logger.info((leading, following))
logger.info(data_bs.shape)
for l, f in zip(leading, following):
data_bs[:, f] = data_bs[:, f] + self._get_delayed_signal(data_bs[:, l])
data.append(data_bs)
data = np.vstack(data)
random_noise = np.random.randn(*data.shape)
random_noise /= np.std(random_noise)
data_noise = data + np.sqrt(1/self.snr) * (random_noise)
self.data = data_noise
return SimulationModel.fit(self, dynamics_model)
[docs]
class TimeDelayedModel(DelayedModel):
def __init__(self, name='time_delayed_model', order=5, noise=1,
delay=0.0195, fsample=256, **kwargs):
self.delay = np.int16(delay * fsample)
DelayedModel.__init__(self, name, order, noise, delay, **kwargs)
def _get_delayed_signal(self, leading_signal):
return np.hstack((np.random.randn(self.delay), leading_signal[self.delay:]))
[docs]
class PhaseDelayedModel(DelayedModel):
def __init__(self, name='phase_delayed_model', **kwargs):
DelayedModel.__init__(self, name, **kwargs)
def _get_delayed_signal(self, leading_signal):
hilbert_l = signal.hilbert(leading_signal)
hilbert_f = signal.hilbert(np.random.randn(*leading_signal.shape))
angle = (np.angle(hilbert_l)+self.delay)
shifted_signal = np.abs(hilbert_f) * np.exp(angle*1j)
return np.real(shifted_signal)