Source code for stytra.stimulation

import datetime
from copy import deepcopy

from PyQt5.QtCore import pyqtSignal, QTimer, QObject
from stytra.stimulation.stimuli import Pause, DynamicStimulus
from stytra.collectors.accumulators import DynamicLog, FramerateAccumulator
from stytra.utilities import FramerateRecorder
from lightparam.param_qt import ParametrizedQt, Param

import logging


class ProtocolRunner(QObject):
    """Class for managing and running stimulation Protocols.

    It is thought to be integrated with the
    stytra.gui.protocol_control.ProtocolControlWidget GUI.

    In stytra Protocols are parameterized objects required just for
    generating a list of Stimulus objects. The engine that runs this
    sequence of Stimuli is the ProtocolRunner class.

    The initial Protocol is read from ``experiment.protocol``; another one
    can be installed later through the internal ``_set_new_protocol()``
    method. Every time a Protocol is set or updated (e.g., after one of its
    parameters changed), the ProtocolRunner calls the Protocol's
    ``_get_stimulus_list()`` method to generate a new list of stimuli.

    For running the Protocol (i.e., going through the list of Stimulus
    objects keeping track of time), ProtocolRunner has an internal QTimer
    whose timeout calls the timestep() method, which:

        - checks elapsed time from beginning of the last stimulus;
        - if required, updates current stimulus state;
        - if elapsed time has passed stimulus duration, changes current
          stimulus.

    Parameters
    ----------
    experiment : :obj:`stytra.experiment.Experiment`
        the Experiment object where directory, calibrator *et similia*
        will be found. The default of None is kept for signature
        compatibility only: the constructor reads ``experiment.protocol``.
    target_dt : int (optional)
        interval handed to ``QTimer.start()`` when the protocol is started
        (Qt interprets it as milliseconds).
    log_print : bool (optional)
        stored as ``self.log_print``; it is not read anywhere in this class.


    **Signals**
    """

    sig_timestep = pyqtSignal(int)
    """Emitted at every timestep with the index of the current stimulus."""
    sig_stim_change = pyqtSignal(int)
    """Emitted every change of stimulation, with the index of the new
    stimulus."""
    sig_protocol_started = pyqtSignal()
    """Emitted when the protocol sequence starts."""
    sig_protocol_finished = pyqtSignal()
    """Emitted when the protocol sequence ends."""
    sig_protocol_updated = pyqtSignal()  # parameters changed in the protocol
    """Emitted when protocol is changed/updated"""
    sig_protocol_interrupted = pyqtSignal()
    """Emitted by stop() once a running protocol has been halted."""

    def __init__(self, experiment=None, target_dt=0, log_print=True):
        """Create the timer, the bookkeeping attributes and load the
        protocol found in ``experiment.protocol``.
        """
        super().__init__()

        self.experiment = experiment
        self.target_dt = target_dt

        self.t_end = None
        self.completed = False
        self.t = 0

        self.timer = QTimer()
        self.timer.timeout.connect(self.timestep)  # connect timer to update fun
        self.timer.setSingleShot(False)

        self.protocol = experiment.protocol
        self.stimuli = []
        self.i_current_stimulus = None  # index of current stimulus
        self.current_stimulus = None  # current stimulus object
        self.past_stimuli_elapsed = None  # time elapsed in previous stimuli
        self.dynamic_log = None  # dynamic log for stimuli

        # _set_new_protocol() connects sig_param_changed to update_protocol()
        # and builds the stimulus list. Doing either again here would make
        # every parameter change run update_protocol() twice (a slot
        # connected twice is called twice), so it is done only once.
        self._set_new_protocol(self.protocol)

        # Log will be a list of stimuli states:
        self.log = []
        self.log_print = log_print
        self.running = False

        self.framerate_rec = FramerateRecorder()
        self.framerate_acc = FramerateAccumulator(experiment=self.experiment)

    def _set_new_protocol(self, protocol):
        """Set new Protocol.

        Nothing happens when ``protocol`` is None.

        Parameters
        ----------
        protocol : :obj:`stytra.experiment.Protocol`
            Protocol to be set.

        """
        if protocol is None:
            return

        self.protocol = protocol

        # Connect changes to protocol parameters to update function:
        self.protocol.sig_param_changed.connect(self.update_protocol)
        self.experiment.dc.add(self.protocol)

        self.reset()
        self.update_protocol()
        self.sig_protocol_updated.emit()

    def update_protocol(self):
        """Update current Protocol (get a new stimulus list if a protocol
        exists) and emit ``sig_protocol_updated``.

        An empty stimulus list leaves ``current_stimulus`` set to None, in
        the same way reset() does.
        """
        if self.protocol is None:
            return

        self.stimuli = self.protocol._get_stimulus_list()
        self.current_stimulus = self.stimuli[0] if self.stimuli else None

        # pass experiment to stimuli for calibrator and asset folders:
        for stimulus in self.stimuli:
            stimulus.initialise_external(self.experiment)

        if self.dynamic_log is None:
            self.dynamic_log = DynamicLog(self.stimuli, experiment=self.experiment)
        else:
            self.dynamic_log.update_stimuli(self.stimuli)  # new stimulus log

        self.sig_protocol_updated.emit()

    def reset(self):
        """Make the protocol ready to start again. Reset all ProtocolRunner
        and stimuli timers and elapsed times.
        """
        self.t_end = None
        self.completed = False
        self.t = 0

        for stimulus in self.stimuli:
            stimulus._started = None
            stimulus._elapsed = 0.0

        self.i_current_stimulus = 0
        self.current_stimulus = self.stimuli[0] if self.stimuli else None

    def start(self):
        """Start the protocol by starting the timers.

        If the refreshed protocol contains no stimulus the runner logs the
        fact and returns without starting.
        """
        # Updating protocol before starting has been added to include changes
        # to the calibrator that are considered only in initializing the
        # stimulus and not while it is running (e.g., gratings). Consider
        # removing if it slows down significantly the starting event.
        self.update_protocol()

        if self.current_stimulus is None:
            self.experiment.logger.info("Protocol has no stimuli; not starting.")
            return

        self.log = []
        self.experiment.logger.info("{} protocol started...".format(self.protocol.name))

        self.past_stimuli_elapsed = self.experiment.t0
        # NOTE(review): reset() clears `_started` while this assigns
        # `started` -- confirm which attribute the stimulus actually reads.
        self.current_stimulus.started = self.experiment.t0
        self.sig_protocol_started.emit()
        self.running = True
        self.current_stimulus.start()

        # start the timer
        self.timer.start(self.target_dt)

    def timestep(self):
        """Update displayed stimulus. This function is the core of the
        ProtocolRunner class. It is called by every timer timeout.
        At every timestep, if protocol is running:

            - check elapsed time from beginning of the last stimulus;
            - if required, update current stimulus state;
            - if elapsed time has passed stimulus duration, change current
              stimulus.
        """
        if not self.running:
            return

        # One clock reading per tick, so that the total time and the
        # per-stimulus elapsed time refer to the same instant.
        now = datetime.datetime.now()

        # Get total time from start in seconds:
        self.t = (now - self.experiment.t0).total_seconds()

        # Calculate elapsed time for current stimulus:
        self.current_stimulus._elapsed = (
            now - self.past_stimuli_elapsed
        ).total_seconds()

        # If stimulus time is over:
        if self.current_stimulus._elapsed > self.current_stimulus.duration:
            self.current_stimulus.stop()
            self.sig_stim_change.emit(self.i_current_stimulus)
            self.update_log()

            # If this stimulus was also the last one, end the protocol:
            if self.i_current_stimulus >= len(self.stimuli) - 1:
                self.completed = True
                self.sig_protocol_finished.emit()
            else:
                # Update the variable which keeps track when the last
                # stimulus *should* have ended, in order to avoid
                # drifting:
                self.past_stimuli_elapsed += datetime.timedelta(
                    seconds=float(self.current_stimulus.duration)
                )
                self.i_current_stimulus += 1
                self.current_stimulus = self.stimuli[self.i_current_stimulus]
                self.current_stimulus.start()

        self.current_stimulus.update()  # use stimulus update function
        self.sig_timestep.emit(self.i_current_stimulus)

        # If stimulus is a constantly changing stimulus:
        if isinstance(self.current_stimulus, DynamicStimulus):
            self.sig_stim_change.emit(self.i_current_stimulus)
            self.update_dynamic_log()  # update dynamic log for stimulus

        # NOTE(review): the flattened source does not show whether the
        # framerate bookkeeping sat inside the `running` guard; it is kept
        # inside, which is equivalent whenever the timer drives this method.
        self.framerate_rec.update_framerate()
        if self.framerate_rec.i_fps == self.framerate_rec.n_fps_frames - 1:
            self.framerate_acc.update_list(self.framerate_rec.current_framerate)

    def stop(self):
        """Stop the stimulation sequence. Update log and stop timer.
        """
        if not self.completed:  # if protocol was interrupted, update log anyway
            self.update_log()
            self.experiment.logger.info(
                "{} protocol interrupted.".format(self.protocol.name)
            )
        else:
            self.experiment.logger.info(
                "{} protocol finished.".format(self.protocol.name)
            )

        # NOTE(review): the extent of this guard is not visible in the
        # flattened source; all the teardown is kept under it.
        if self.running:
            self.running = False
            self.t_end = datetime.datetime.now()
            self.timer.stop()
            self.i_current_stimulus = 0
            self.t = 0
            self.sig_protocol_interrupted.emit()

    def update_log(self):
        """Append to the log the info from the last stimulus. Add to the
        stimulus info from Stimulus.get_state() start and stop times
        (in seconds from ``experiment.t0``).

        Does nothing when there is no current stimulus.
        """
        if self.current_stimulus is None:
            return

        # Update with the data of the current stimulus:
        current_stim_dict = self.current_stimulus.get_state()
        # A stimulus that has not been stopped yet is logged as ending now.
        t_stim_stop = current_stim_dict["real_time_stop"] or datetime.datetime.now()
        try:
            new_dict = dict(
                current_stim_dict,
                t_start=(
                    current_stim_dict["real_time_start"] - self.experiment.t0
                ).total_seconds(),
                t_stop=(t_stim_stop - self.experiment.t0).total_seconds(),
            )
        except TypeError:  # if time is None stimulus was not run
            new_dict = dict()
            logging.getLogger().info("Stimulus times incorrect, state not saved")
        self.log.append(new_dict)

    def update_dynamic_log(self):
        """Add the dynamic state of the current stimulus, stamped with the
        current protocol time, to the dynamic log.
        """
        self.dynamic_log.update_list(self.t, self.current_stimulus.get_dynamic_state())

    @property
    def duration(self):
        """Get total duration of the protocol in sec, calculated from stimuli
        durations.

        Returns
        -------
        float :
            protocol length in seconds.

        """
        return sum(stim.duration for stim in self.stimuli)

    def print(self):
        """Print protocol sequence.
        """
        print("".join("-" + stim.name for stim in self.stimuli))
class Protocol(ParametrizedQt):
    """Describes a dynamically-generated sequence of Stimuli and their
    parameters.

    Protocol objects generate a list of stimuli, that can optionally be
    parametrized, so that variants can be made by changing the protocol
    parameters in the Stytra GUI.

    The method :meth:`Protocol.get_stim_sequence()
    <~stytra.stimulation.Protocol.get_stim_sequence>` should be redefined
    for each experimental protocol: it is called by the ProtocolRunner
    (through ``_get_stimulus_list()``) and it generates a list of stimuli
    that will be presented during the protocol. When defining new protocols,
    subclass this class and redefine
    :meth:`~stytra.stimulation.Protocol.get_stim_sequence`.

    By default, all protocols have an optional initial and final pause
    (``pre_pause``, ``post_pause``) and a parameter ``n_repeats`` that
    specifies the number of times the sequence from
    :meth:`Protocol.get_stim_sequence()
    <~stytra.stimulation.Protocol.get_stim_sequence>` will be repeated.

    Note
    ----
    Everything concerning calibration, or asset directories that have to
    be passed to the stimulus is handled in the ProtocolRunner class to
    leave this class as light as possible.

    """

    name = ""
    """Name of the protocol."""

    def __init__(self):
        """Add standard parameters common to all kind of protocols.

        Raises
        ------
        ValueError
            if the subclass did not define a non-empty ``name``.

        """
        # Explicit check instead of `assert`: assertions are removed when
        # Python runs with -O, which would let a nameless protocol through.
        if not self.__class__.name:
            raise ValueError("Protocol does not have a specified name")

        super().__init__(name="stimulus/protocol/" + self.__class__.name)

        self.pre_pause = Param(0.0, limits=(0.0, 10000.0))
        self.post_pause = Param(0.0, limits=(0.0, 10000.0))
        self.n_repeats = Param(1, limits=(1, 10000))

    def _get_stimulus_list(self):
        """Generate protocol from specified parameters. Called by the
        ProtocolRunner class where the Protocol instance is defined.

        The sequence returned by get_stim_sequence() is repeated
        ``n_repeats`` times (each repetition is a deep copy, so no stimulus
        object appears twice), preceded by a single initial pause and
        followed by a single final pause when those are larger than zero.
        It should not change in subclasses.

        Returns
        -------
        list :
            list of stimuli

        """
        main_stimuli = self.get_stim_sequence()
        stimuli = []

        if self.pre_pause > 0:
            stimuli.append(Pause(duration=self.pre_pause))

        for _ in range(self.n_repeats):
            stimuli.extend(deepcopy(main_stimuli))

        if self.post_pause > 0:
            stimuli.append(Pause(duration=self.post_pause))

        return stimuli

    def get_stim_sequence(self):
        """To be specified in each child class to return the proper list of
        stimuli.

        Returns
        -------
        list :
            the stimuli of one repetition; empty in this base class.

        """
        return []