# Source code for the video-writer process classes (VideoWriter, H5VideoWriter, StreamingVideoWriter).

import datetime
import os
from multiprocessing import Event, Queue
from queue import Empty

import deepdish as dd
import numpy as np

from stytra.utilities import FrameProcess

# PyAV is optional: only the streaming (non-HDF5) writer needs it.
try:
    import av
except ImportError:
    print("PyAv not installed, writing videos in formats other than H5 not possible.")

class VideoWriter(FrameProcess):
    """Base process that records incoming frames while a saving event is set.

    Frames are read from ``input_queue`` as ``(timestamp, frame)`` tuples.
    While ``saving_evt`` is set, each frame is handed to :meth:`ingest_frame`
    and its timestamp is appended to ``self.times``. When ``saving_evt`` is
    cleared after at least one frame was ingested, :meth:`complete` is called.
    This base class stores nothing itself; subclasses override
    :meth:`configure`, :meth:`ingest_frame`, :meth:`complete` and
    :meth:`reset` to implement an actual output format.

    Parameters
    ----------
    input_queue
        queue of incoming ``(timestamp, frame)`` tuples
    finished_signal
        event that terminates :meth:`run` when set
    saving_evt
        event that is set for as long as frames should be recorded
    log_format
        stored as ``self.log_format``; not used by this class itself

    """

    def __init__(self, input_queue, finished_signal, saving_evt, log_format="hdf5"):
        super().__init__()
        # Base names (path prefixes) for output files; one is consumed by
        # configure() at the start of each recording.
        self.filename_queue = Queue()
        self.filename_base = None
        self.input_queue = input_queue
        self.finished_signal = finished_signal
        self.saving_evt = saving_evt
        # Setting this event makes run() drop its state and start over.
        self.reset_signal = Event()
        self.times = []
        self.recording = False
        self.log_format = log_format

    def run(self):
        """Consume frames until ``finished_signal`` is set.

        The outer loop restarts the writer after every reset; the inner loop
        polls the input queue and drives configure/ingest/complete.
        """
        while True:
            # True once at least one frame of the current recording was ingested.
            toggle_save = False
            self.reset()
            while True:
                try:
                    t, current_frame = self.input_queue.get(timeout=0.01)
                    if self.saving_evt.is_set():
                        if not self.recording:
                            self.configure(current_frame.shape)
                            self.recording = True
                        self.ingest_frame(current_frame)
                        self.times.append(t)
                        toggle_save = True
                except Empty:
                    # Raised when no frame arrived within the timeout, and also
                    # when configure() found no filename queued yet; in the
                    # latter case the frame is dropped and configuration is
                    # retried on the next frame.
                    pass

                # Saving was switched off after frames were ingested: finalize.
                if not self.saving_evt.is_set() and toggle_save:
                    self.complete()
                    toggle_save = False

                # NOTE(review): a reset or finish request that arrives while a
                # recording is still active discards it without calling
                # complete() -- confirm that this is intended.
                if self.reset_signal.is_set() or self.finished_signal.is_set():
                    self.reset_signal.clear()
                    self.reset()
                    break

                # framerate_rec is not defined in this class; presumably it is
                # provided by FrameProcess.
                self.framerate_rec.update_framerate()

            if self.finished_signal.is_set():
                break

    def configure(self, size):
        """Fetch the base filename for a new recording.

        Parameters
        ----------
        size
            shape of the first frame; unused here, available to subclasses

        Raises
        ------
        queue.Empty
            if no filename is available in ``filename_queue`` within 10 ms
        """
        self.filename_base = self.filename_queue.get(timeout=0.01)

    def ingest_frame(self, frame):
        """Handle one frame of the active recording (no-op in the base class)."""
        pass

    def complete(self):
        """Mark the active recording as finished."""
        self.recording = False

    def reset(self):
        """Drop the recording state and the collected timestamps."""
        self.recording = False
        self.times = []
class H5VideoWriter(VideoWriter):
    """Video writer that buffers frames in memory and saves them as HDF5.

    All frames of a recording are kept in ``self.frames`` and written in one
    go to ``<filename_base>video.hdf5`` when the recording is completed.
    Constructor arguments are forwarded unchanged to :class:`VideoWriter`.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Frames of the current recording, in arrival order.
        self.frames = []

    def reset(self):
        """Drop the recording state and all buffered frames."""
        super().reset()
        self.frames = []

    def ingest_frame(self, frame):
        """Append ``frame`` to the in-memory buffer."""
        self.frames.append(frame)

    def complete(self):
        """Finish the recording and write the buffered frames to disk.

        The frames are stacked into a single ``uint8`` array and stored with
        deepdish. The buffer itself is not cleared here; that happens in
        :meth:`reset`.
        """
        super().complete()
        dd.io.save(
            self.filename_base + "video.hdf5",
            np.array(self.frames, dtype=np.uint8),
        )
class StreamingVideoWriter(VideoWriter):
    """Video writer that encodes frames on the fly into a movie file via PyAV.

    Each recording is written to ``<filename_base>video.<extension>``.

    Parameters
    ----------
    *args, **kwargs
        forwarded unchanged to :class:`VideoWriter`
    extension
        file extension of the output movie (without the dot)
    output_framerate
        frame rate passed to the output stream
    format
        codec name passed to ``container.add_stream``
    kbit_rate
        output movie bitrate in kbit/s

    """

    def __init__(
        self,
        *args,
        extension="mp4",
        output_framerate=24,
        format="mpeg4",
        kbit_rate=1000,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.extension = extension
        self.output_framerate = output_framerate
        self.format = format
        self.kbit_rate = kbit_rate
        # Output container and its video stream; both are created in
        # configure() and are None while no output file has been opened.
        # NOTE(review): the attribute name `stream` is inferred from how the
        # value is used -- confirm against callers that may read it.
        self.container = None
        self.stream = None

    def configure(self, shape):
        """Open the output file and set up the encoder stream.

        Parameters
        ----------
        shape
            frame shape, unpacked as ``(height, width)``; assumes 2-D
            grayscale frames, matching the ``gray8`` format used in
            :meth:`ingest_frame`
        """
        super().configure(shape)
        filename = self.filename_base + "video." + self.extension
        self.container = av.open(filename, mode="w")
        self.stream = self.container.add_stream(
            self.format, rate=self.output_framerate
        )
        self.stream.height, self.stream.width = shape
        self.stream.codec_context.thread_type = "AUTO"
        # kbit/s -> bit/s; the tolerance is 20% of the target bitrate.
        self.stream.codec_context.bit_rate = self.kbit_rate * 1000
        self.stream.codec_context.bit_rate_tolerance = self.kbit_rate * 200
        self.stream.pix_fmt = "yuv420p"

    def ingest_frame(self, frame):
        """Encode one grayscale frame and write the resulting packets."""
        # Make sure an output stream exists before encoding.
        if self.stream is None:
            self.configure(frame.shape)
        av_frame = av.VideoFrame.from_ndarray(frame, format="gray8")
        for packet in self.stream.encode(av_frame):
            self.container.mux(packet)

    def reset(self):
        """Drop the recording state and forget the container and stream."""
        super().reset()
        self.container = None
        self.stream = None

    def complete(self):
        """Flush the encoder and close the output file."""
        super().complete()
        # Calling encode() without a frame flushes packets still buffered
        # inside the encoder.
        for packet in self.stream.encode():
            self.container.mux(packet)

        # Close the file
        self.container.close()