Source code for sekupy.simulation.connectivity
import numpy as np
import itertools
from sekupy.preprocessing.base import Transformer
import logging
logger = logging.getLogger(__name__)
[docs]
class ConnectivityStateSimulator(Transformer):
def __init__(self, n_nodes=10, max_edges=5, fsamp=128,
n_brain_states=6, length_dynamics=100,
state_duration={'min_time': 2.5, 'max_time': 3.5},
method='random'):
"""This class is used to simulate the connectivity state dynamics.
We used a trick to use it in the loader simulator: usually a loader
fetches data from files while simulation loader creates a dynamics of the
system and then passes itself to the model which generates a dataset based on
the simulated dynamics.
Parameters
----------
n_nodes : int, optional
Number of nodes used in the simulation (e.g. brain sources), by default 10
max_edges : int, optional
Maximum number of simultaneous connections, by default 5
fsamp : int, optional
Sampling frequency of the brain nodes signal, by default 128
n_brain_states : int, optional
Number of brain states in the dynamics, by default 6
length_dynamics : int, optional
Length of the dynamics (e.g. number of brain state events), by default 100
state_duration : dict, optional
Duration of a state:
- it can be a dictionary with max_time and min_time, and duration is uniformly
sampled between min and max
- it can be a dictionary with distribution and parameters keywords to specify
how state duration should be generated (see np.random)
method : str, optional
method used to generate brain dynamics, by default 'random'
"""
edges = [e for e in itertools.combinations(np.arange(n_nodes), 2)]
n_edges = len(edges)
states = []
for i in range(max_edges):
states += [e for e in itertools.combinations(np.arange(n_edges), i+1)]
states = np.array(states)
self._edges = edges
self._n_edges = len(edges)
self._edges_states = states
self._n_states = len(states)
self._method = method
self._n_nodes = n_nodes
self._max_edges = max_edges
self._fs = fsamp
self._n_brain_states = n_brain_states
self._duration = state_duration
self._length_dynamics = length_dynamics
Transformer.__init__(self, name='connectivity_state_simulator')
[docs]
def fit(self):
# Randomize random stuff
states_idx = np.random.randint(0, self._n_states, self._n_brain_states)
selected_states = self._edges_states[states_idx]
bs_length = self.generate_duration()
# This is done using Hidden Markov Models but since
# Transition matrix is uniformly distributed we can use random sequency
# mc = mcmix(nBS,'Fix',ones(nBS)*(1/nBS));
# seqBS= simulate(mc,nbs_sequence-1);
bs_sequence = self.simulate_dynamics()
bs_dynamics = []
for i, time in enumerate(bs_length):
for _ in range(time):
bs_dynamics.append(bs_sequence[i])
bs_dynamics = np.array(bs_dynamics)
brain_matrices = []
for j in range(self._n_brain_states):
matrix = np.eye(self._n_nodes)
selected_edges = selected_states[j]
for edge in selected_edges:
matrix[self._edges[np.array(edge)]] = 1
brain_matrices.append(matrix)
brain_matrices = np.array(brain_matrices)
self._states = brain_matrices
self._state_length = bs_length
self._state_sequence = bs_sequence
self._dynamics = bs_dynamics
self._time = self._duration['params']
return self
[docs]
def simulate_dynamics(self, method='random'):
if self._method == 'random':
# Returns a random array of size `length_dynamics`
# with elements bounded between 0 and `n_states`
return np.random.randint(0,
self._n_brain_states,
self._length_dynamics)
[docs]
def generate_duration(self):
duration = {}
if ('max_time' in self._duration.keys()) or \
('min_time' in self._duration.keys()):
duration['distribution'] = np.random.uniform
duration['params'] = {'low': self._duration['min_time'],
'high': self._duration['max_time']}
# TODO: check that distribution is numpy random style
else:
duration = self._duration.copy()
logger.debug(duration)
duration['params'].update({'size': self._length_dynamics})
has_zero = True
while has_zero:
data = duration['distribution'](**duration['params'])
data = np.int_(self._fs * np.abs(data))
# logger.info(data)
if np.count_nonzero(data) == 0:
has_zero = False
self._duration = duration
return data