# Source code for stytra.collectors.accumulators

from PyQt5.QtCore import QObject, pyqtSignal
import datetime
import numpy as np
from queue import Empty
import pandas as pd
from collections import namedtuple
from bisect import bisect_right
from os.path import basename

from stytra.utilities import save_df


class Accumulator(QObject):
    """Base class for objects that collect a timestamped stream of data.

    Maintains two parallel lists: ``times`` (timestamps, seconds from the
    experiment start) and ``stored_data`` (the data points collected at
    those timestamps). Subclasses define how new points are appended.

    Parameters
    ----------
    experiment :
        the experiment object, used for the reference start time ``t0``.
    name : str
        label identifying this accumulator.
    max_history_if_not_running : int
        number of most-recent points retained while no protocol is running.
    """

    def __init__(self, experiment, name="", max_history_if_not_running=1000):
        super().__init__()
        self.name = name
        self.exp = experiment
        # Parallel lists: stored_data[i] was collected at times[i].
        self.stored_data = []
        self.times = []
        # History cap applied by trim_data() when no protocol is running.
        self.max_history_if_not_running = max_history_if_not_running
class DataFrameAccumulator(Accumulator):
    """Abstract class for accumulating streams of data.

    It is used to save or plot in real time data from stimulus logs or
    behavior tracking. Data is stored in a list in the stored_data attribute.
    Specific methods for updating the stored_data list (e.g., by acquiring
    data from a Queue or a DynamicStimulus attribute) are defined in
    subclasses of the Accumulator.

    Data that end up in the stored_data list must be NamedTuples where the
    first element is a timestamp. Therefore, stored_data of an Accumulator
    that is fed 2 values will be something like
    [(t_0, x_0, y_0), (t_0, x_0, y_0), ...]

    Data can be retrieved from the Accumulator as a pandas DataFrame with
    the :meth:`get_dataframe() <Accumulator.get_dataframe()>` method.

    Parameters
    ----------
    fps_calc_points : int
        number of data points used to calculate the sampling rate of the data.

    Returns
    -------

    """

    # Emitted when the accumulator is reset (e.g. a new header list assigned).
    sig_acc_reset = pyqtSignal()
    # Emitted when the first point of a new data type has been stored.
    sig_acc_init = pyqtSignal()

    def __init__(self, *args, fps_calc_points=10, monitored_headers=None, **kwargs):
        super().__init__(*args, **kwargs)
        # Headers displayed by default in live plots (None = all).
        self.plot_columns = monitored_headers
        self.fps_calc_points = fps_calc_points
        # Lazily-built {header name: column index} map; see header_dict.
        self._header_dict = None

    def __getitem__(self, item):
        """Column access on the stored data.

        Parameters
        ----------
        item : tuple or str
            either a (row selection, field name) tuple, or a field name
            selecting the whole column.

        Returns
        -------
        np.array
            the requested column values.
        """
        # NOTE: the original passed a generator to np.array, which produces
        # a 0-d object array wrapping the generator; a list comprehension
        # yields the intended 1-d array of values.
        if isinstance(item, tuple):
            return np.array(
                [getattr(k, item[1]) for k in self.stored_data[item[0]]]
            )
        if isinstance(item, str):
            # Bug fix: use the field name itself; item[1] selected the
            # second character of the name.
            return np.array([getattr(k, item) for k in self.stored_data])

    @property
    def t(self):
        """All stored timestamps as a numpy array."""
        return np.array(self.times)

    def values_at_abs_time(self, time):
        """ Finds the values in the accumulator closest to the datetime time

        Parameters
        ----------
        time : datetime
            time to search for

        Returns
        -------
        namedtuple of values

        """
        find_time = (time - self.exp.t0).total_seconds()
        i = bisect_right(self.times, find_time)
        # Clamp so that a time before the first sample returns the first
        # entry instead of wrapping around to stored_data[-1].
        return self.stored_data[max(i - 1, 0)]

    @property
    def columns(self):
        """Header names: 't' followed by the stored namedtuple's fields.

        Raises
        ------
        ValueError
            if no data has been stored yet, so the data type is unknown.
        """
        try:
            return ("t",) + self.stored_data[-1]._fields
        except IndexError:
            raise ValueError("Accumulator empty, data types not known")

    @property
    def header_dict(self):
        """ for each header name gives the column """
        if self._header_dict is None:
            self._header_dict = {hn: i for i, hn in enumerate(self.columns)}
        return self._header_dict

    def reset(self, monitored_headers=None):
        """Reset accumulator and assign a new headers list.

        Parameters
        ----------
        monitored_headers : list of str
            List with the headers displayed by default (Default value = None)

        Returns
        -------

        """
        self.sig_acc_reset.emit()
        if monitored_headers is not None:
            self.plot_columns = monitored_headers
        self.stored_data = []
        self.times = []
        # Invalidate the cached header map: the data type may change.
        self._header_dict = None

    def trim_data(self):
        """Drop oldest points beyond the history cap while not running.

        Only trims once 1.5x the cap is exceeded, to avoid trimming on
        every single update.
        """
        if (
            not self.exp.protocol_runner.running
            and len(self.times) > self.max_history_if_not_running * 1.5
        ):
            self.times[: -self.max_history_if_not_running] = []
            self.stored_data[: -self.max_history_if_not_running] = []

    def get_fps(self):
        """Estimate the current sampling rate from the last few timestamps.

        Returns
        -------
        float
            points per second over the last fps_calc_points samples, or
            0.0 if there are not enough points (or the interval is zero).
        """
        try:
            last_t = self.times[-1]
            t_minus_dif = self.times[-self.fps_calc_points]
            return self.fps_calc_points / (last_t - t_minus_dif)
        except (IndexError, ValueError, ZeroDivisionError, OverflowError):
            return 0.0

    def get_last_n(self, n=None):
        """Return the last n data points.

        Parameters
        ----------
        n : int
            number of data points to be returned (None = all)

        Returns
        -------
        pd.DataFrame or None
            DataFrame containing the last n data points plus a 't' column
            with the timestamps, or None if the accumulator is empty.
        """
        if n is not None:
            last_n = min(n, len(self.stored_data))
        else:
            last_n = len(self.stored_data)
        if last_n == 0:
            return None

        df = pd.DataFrame.from_records(
            self.stored_data[-last_n:], columns=self.stored_data[-1]._fields
        )
        df["t"] = np.array(self.times[-last_n:])
        return df

    def get_last_t(self, t):
        """Return the data collected in the last t seconds.

        Parameters
        ----------
        t : float
            Time window in seconds from which data should be returned

        Returns
        -------
        pd.DataFrame
            the last t*fps data points (at least one).
        """
        try:
            n = int(self.get_fps() * t)
            return self.get_last_n(n)
        except (OverflowError, ValueError):
            # fps estimate unusable (e.g. nan/inf): fall back to one point.
            return self.get_last_n(1)

    def get_dataframe(self):
        """Returns pandas DataFrame with data and headers."""
        return self.get_last_n(len(self.stored_data))

    def save(self, path, format="csv"):
        """ Saves the content of the accumulator in a tabular format.
        Choose CSV for widest compatibility, HDF if using Python only,
        or feather for efficient storage compatible with Python and Julia
        data frames

        Parameters
        ----------
        path : str
            output path, without extension name
        format : str
            output format, csv, feather, hdf5, json
        """
        df = self.get_dataframe()
        if df is None:
            # Nothing accumulated: nothing to save.
            return
        saved_filename = save_df(df, path, format)
        return basename(saved_filename)

    def is_empty(self):
        """True if no data points have been stored yet."""
        return len(self.stored_data) == 0
class QueueDataAccumulator(DataFrameAccumulator):
    """General class for retrieving data from a Queue.

    The QueueDataAccumulator takes as input a multiprocessing.Queue object
    and retrieves data from it whenever its
    :meth:`update_list() <QueueDataAccumulator.update_list()>` method is
    called. All the data are then put in the stored_data list. It is
    usually connected with a QTimer() timeout to make sure that data from
    the Queue are constantly retrieved.

    Parameters
    ----------
    data_queue : (multiprocessing.Queue object)
        queue from witch to retrieve data.
    header_list : list of str
        headers for the data to stored.

    Returns
    -------

    """

    def __init__(self, data_queue, **kwargs):
        super().__init__(**kwargs)
        # Store externally the starting time make us free to keep
        # only time differences in milliseconds in the list (faster)
        self.starting_time = None
        self.data_queue = data_queue

    def update_list(self):
        """Upon calling put all available data into a list."""
        while True:
            try:
                # Pull the next (timestamp, data) pair off the queue.
                t, data = self.data_queue.get(timeout=0.001)
            except Empty:
                # Queue drained: stop until the next timer tick.
                break

            # A different namedtuple type (or first-ever point) means the
            # data schema changed: wipe history before appending.
            newtype = len(self.stored_data) == 0 or type(data) != type(
                self.stored_data[-1]
            )
            if newtype:
                self.reset()

            # Keep plain float seconds instead of datetime objects.
            self.times.append((t - self.exp.t0).total_seconds())
            self.stored_data.append(data)
            self.trim_data()

            # Notify listeners only after the first point of the new type.
            if newtype:
                self.sig_acc_init.emit()
class FramerateAccumulator(Accumulator):
    """Accumulator for framerate values, with an optional target rate.

    Parameters
    ----------
    goal_framerate : float or None
        the framerate the source is expected to reach (None = no target).
    """

    def __init__(self, *args, goal_framerate=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.goal_framerate = goal_framerate

    def trim_data(self):
        """Drop oldest samples once 1.5x the history cap is exceeded."""
        limit = self.max_history_if_not_running
        if len(self.times) > limit * 1.5:
            del self.times[:-limit]
            del self.stored_data[:-limit]

    def reset(self):
        """Discard all stored framerate samples."""
        self.times = []
        self.stored_data = []

    def update_list(self, fps):
        """Append one framerate sample, timestamped relative to exp.t0."""
        elapsed = (datetime.datetime.now() - self.exp.t0).total_seconds()
        self.stored_data.append(fps)
        self.times.append(elapsed)
class FramerateQueueAccumulator(FramerateAccumulator):
    """A simple accumulator, just for framerates"""

    def __init__(self, *args, queue, **kwargs):
        super().__init__(*args, **kwargs)
        # Source of (timestamp, fps) pairs, drained by update_list().
        self.queue = queue

    def update_list(self):
        """Drain every available (timestamp, fps) pair into the history."""
        while True:
            try:
                t, fps = self.queue.get(timeout=0.001)
            except Empty:
                # Nothing left in the queue for now.
                break
            # Float seconds from the experiment start (avoids datetimes).
            self.times.append((t - self.exp.t0).total_seconds())
            self.stored_data.append(fps)
            self.trim_data()
class DynamicLog(DataFrameAccumulator):
    """Accumulator to save feature of a stimulus, e.g. velocity of gratings
    in a closed-loop experiment.

    Parameters
    ----------
    stimuli : list
        list of the stimuli to be logged
    """

    def __init__(self, stimuli, **kwargs):
        self.name = "stimulus_params"
        # Namedtuple type holding one log entry; built by update_stimuli().
        self._tupletype = None
        super().__init__(**kwargs)
        # it is assumed the first dynamic stimulus has all the fields
        self.update_stimuli(stimuli)

    @property
    def columns(self):
        """Header names: 't' plus the logged dynamic parameter names.

        Raises
        ------
        ValueError
            if update_stimuli() has not set the data type yet.
        """
        try:
            return ("t",) + self._tupletype._fields
        # Bug fix: when _tupletype is still None this expression raises
        # AttributeError, not IndexError, so the original except clause
        # never fired and the intended ValueError was never raised.
        except AttributeError:
            raise ValueError("Data type not set for stimulus log")

    def update_list(self, time, data):
        """Append one log entry.

        Parameters
        ----------
        time : float
            timestamp of the entry
        data : dict
            mapping of dynamic parameter names to current values; missing
            parameters are filled with NaN.

        Returns
        -------

        """
        self.times.append(time)
        self.stored_data.append(
            self._tupletype(*(data.get(f, np.nan) for f in self._tupletype._fields))
        )

    def update_stimuli(self, stimuli):
        """Rebuild the entry type from the union of all stimuli's dynamic
        parameter names, then reset the stored data."""
        dynamic_params = []
        for stimulus in stimuli:
            try:
                for new_param in stimulus.dynamic_parameter_names:
                    if new_param not in dynamic_params:
                        dynamic_params.append(new_param)
            except AttributeError:
                # Stimulus with no dynamic parameters: nothing to log.
                pass
        self._tupletype = namedtuple("s", dynamic_params)
        self.reset()
class EstimatorLog(DataFrameAccumulator):
    """Accumulator for the output stream of an estimator."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.stored_data = []

    def update_list(self, t, data):
        """Append one sample to the log.

        Parameters
        ----------
        t : float
            timestamp of the sample
        data :
            namedtuple of estimator values

        Returns
        -------

        """
        self.times.append(t)
        self.stored_data.append(data)
        self.trim_data()
        # The very first sample defines the data type: notify listeners.
        if len(self.stored_data) == 1:
            self.sig_acc_init.emit()