import datetime
import numpy as np
import deepdish as dd

from stytra.utilities import FrameProcess
from multiprocessing import Event, Queue
from queue import Empty
import os

    import av
except ImportError:
    print("PyAv not installed, writing videos in formats other than H5 not possible.")

[docs]class VideoWriter(FrameProcess): """Writes behavior movies into video files using PyAV Parameters ---------- folder output folder input_queue queue of incoming frames finished_signal signal to finish recording kbit_rate ouput movie bitrate """ def __init__(self, input_queue, finished_signal, saving_evt, log_format="hdf5"): super().__init__() self.filename_queue = Queue() self.filename_base = None self.input_queue = input_queue self.finished_signal = finished_signal self.saving_evt = saving_evt self.reset_signal = Event() self.times = [] self.recording = False self.log_format = log_format
[docs] def run(self): while True: toggle_save = False self.reset() while True: try: t, current_frame = self.input_queue.get(timeout=0.01) if self.saving_evt.is_set(): if not self.recording: self.configure(current_frame.shape) self.recording = True self.ingest_frame(current_frame) self.times.append(t) toggle_save = True except Empty: pass if not self.saving_evt.is_set() and toggle_save: self.complete() toggle_save = False if self.reset_signal.is_set() or self.finished_signal.is_set(): self.reset_signal.clear() self.reset() break self.framerate_rec.update_framerate() if self.finished_signal.is_set(): break
[docs] def configure(self, size): self.filename_base = self.filename_queue.get(timeout=0.01)
[docs] def ingest_frame(self, frame): pass
[docs] def complete(self): self.recording = False
[docs] def reset(self): self.recording = False self.times = []
[docs]class H5VideoWriter(VideoWriter): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.frames = []
[docs] def reset(self): super().reset() self.frames = []
[docs] def ingest_frame(self, frame): self.frames.append(frame)
[docs] def complete(self): super().complete() self.filename_base + "video.hdf5", np.array(self.frames, dtype=np.uint8) )
[docs]class StreamingVideoWriter(VideoWriter): def __init__( self, *args, extension="mp4", output_framerate=24, format="mpeg4", kbit_rate=1000, **kwargs ): super().__init__(*args, **kwargs) self.extension = extension self.output_framerate = output_framerate self.format = format self.kbit_rate = kbit_rate self.container = None = None
[docs] def configure(self, shape): super().configure(shape) filename = self.filename_base + "video." + self.extension self.container =, mode="w") = self.container.add_stream(self.format, rate=self.output_framerate), = shape = "AUTO" = self.kbit_rate * 1000 = self.kbit_rate * 200 = "yuv420p"
[docs] def ingest_frame(self, frame): if is None: self.configure(frame.shape) av_frame = av.VideoFrame.from_ndarray(frame, format="gray8") for packet in self.container.mux(packet)
[docs] def reset(self): super().reset() self.container = None = None
[docs] def complete(self): super().complete() for packet in self.container.mux(packet) # Close the file self.container.close()