Module koma.signal

Expand source code
from scipy.signal import csd
import numpy as np
from scipy.signal import correlate, correlation_lags
from scipy.signal import resample_poly, resample
from scipy.interpolate import interp1d

def xwelch(x, **kwargs):
    '''
    Compute Welch cross-spectral density matrix for given inputs (stacked column-wise).

    Arguments
    -----------
    x : float
        data matrix (column-wise data)
    **kwargs 
        passed on to scipy's csd function for cross-spectral density estimation

    Returns
    -----------
    f : float
        numpy array of frequencies
    cpsd : float
        3d numpy array with CPSD matrix (frequency component on third index)
    
    '''
    f, __ = csd(x[:,0], x[:,0], **kwargs)
    cpsd = np.zeros([x.shape[1], x.shape[1], len(f)]).astype('complex')

    for i, xi in enumerate(x.T):
        for j, xj in enumerate(x.T):
            f, cpsd[i,j,:] = csd(xi, xj, **kwargs)

    return f, cpsd


def estimate_lags(data, ref=0, upsample=None, fs=1.0, upsample_method='fourier'):
    '''
    Compute lags between all channels of data (channels stacked column-wise),
    based on max of correlation.

    Arguments
    -----------
    data : float
        data matrix (column-wise data)
    ref : int, default=0
        reference channel (to define resulting lags, will not affect analysis itself)

    upsample : int, optional
        upsample factor; if not specified the lag will be restricted to a resolution
        given by the original sampling factor
    fs : float, default=1.0
        sampling factor of data; if not given
    upsample_method : str, default='fourier'
        method used to conduct the upsampling {'fourier', 'linear', 'poly'}

    Returns
    -----------
    lags : float
        numpy 1d array with lags between all channels and reference channel;
        if fs is not given (i.e. set to its default value of 1.0), 
        the output defines the sample lag
    
    '''   
    if upsample is not None:
        if upsample_method == 'fourier':
            data = resample_poly(data, upsample, 1.0)
        elif upsample_method == 'linear':
            t = np.arange(0, (data.shape[0])*1/fs, 1/fs)
            fsi = fs*upsample
            ti = np.arange(0, (data.shape[0])*upsample*1/fsi, 1/fsi)
            data = interp1d(t, data, axis=0, fill_value='extrapolate')(ti)
        elif upsample_method == 'poly':
            data = resample(data, upsample)
    else:
        upsample = 1.0
    
    l = np.shape(data)[1]        
    R0 = np.zeros([l, l, 2*data.shape[0]-1])
    
    for dof1 in range(0, l):
        for dof2 in range(0, l):
            R0[dof1, dof2, :] = correlate(data[:, dof2], data[:,dof1], mode='full', method='auto')
   
    lag_length = correlation_lags(data[:, dof1].size, data[:, dof2].size, mode="full")   
    lags = lag_length[np.argmax(R0[ref, :, :], axis=-1)]
    
    return lags/(upsample*fs)


def shift_data(data, lags, cut=True):
    '''
    Shift data by specified lags.

    Arguments
    -----------
    data : float
        data matrix (column-wise data)
    lags : float
        list or numpy 1d array with sample lags (not time lag) to apply to all channels
    cut : boolean, default=True
        whether or not to cut the data to common valid range; if not,
        nans are used

    Returns
    -----------
    data_shifted : float
        shifted data matrix
    '''
    data_shifted = data*np.nan
    for ix, lag in enumerate(lags):
        x = np.arange(0, data.shape[0], 1.0)
        data_shifted[:, ix] = interp1d(x, data[:, ix], axis=0, bounds_error=False)(x+lag)
        
    if cut:
        nan_start = int(np.ceil(np.max([0,-np.min(lags)])))
        nan_end = int(-np.ceil(np.max(lags)))
        
        if nan_end == 0:
            nan_end = None 
        
        data_shifted = data_shifted[nan_start:nan_end, :]
        
    return data_shifted

Functions

def estimate_lags(data, ref=0, upsample=None, fs=1.0, upsample_method='fourier')

Compute lags between all channels of data (channels stacked column-wise), based on max of correlation.

Arguments

data : float
data matrix (column-wise data)
ref : int, default=0
reference channel (to define resulting lags, will not affect analysis itself)
upsample : int, optional
upsample factor; if not specified the lag will be restricted to a resolution given by the original sampling factor
fs : float, default=1.0
sampling factor of data; if not given
upsample_method : str, default='fourier'
method used to conduct the upsampling {'fourier', 'linear', 'poly'}

Returns

lags : float
numpy 1d array with lags between all channels and reference channel; if fs is not given (i.e. set to its default value of 1.0), the output defines the sample lag
def shift_data(data, lags, cut=True)

Shift data by specified lags.

Arguments

data : float
data matrix (column-wise data)
lags : float
list or numpy 1d array with sample lags (not time lag) to apply to all channels
cut : boolean, default=True
whether or not to cut the data to common valid range; if not, nans are used

Returns

data_shifted : float
shifted data matrix
def xwelch(x, **kwargs)

Compute Welch cross-spectral density matrix for given inputs (stacked column-wise).

Arguments

x : float
data matrix (column-wise data)

**kwargs passed on to scipy's csd function for cross-spectral density estimation

Returns

f : float
numpy array of frequencies
cpsd : float
3d numpy array with CPSD matrix (frequency component on third index)