import datetime
import numpy as np
import deepdish as dd
from stytra.utilities import FrameProcess
from multiprocessing import Event, Queue
from queue import Empty
import os
try:
import av
except ImportError:
print("PyAv not installed, writing videos in formats other than H5 not possible.")
class VideoWriter(FrameProcess):
    """Base process that takes frames from a queue and hands them to a writer.

    This class only implements the acquisition loop and the bookkeeping
    around it: :meth:`ingest_frame` is an empty hook, so subclasses decide
    how (and whether) frames are stored.

    Parameters
    ----------
    input_queue
        queue of incoming ``(timestamp, frame)`` tuples
    finished_signal
        event that, once set, makes :meth:`run` return
    saving_evt
        event that is set for as long as incoming frames must be recorded
    log_format
        stored as ``self.log_format``; not used by the code of this class

    """

    def __init__(self, input_queue, finished_signal, saving_evt, log_format="hdf5"):
        super().__init__()
        # Queue through which the base output path of each recording is
        # delivered; it is consumed by configure().
        self.filename_queue = Queue()
        self.filename_base = None
        self.input_queue = input_queue
        self.finished_signal = finished_signal
        self.saving_evt = saving_evt
        # Setting this event makes run() drop its current state and start over.
        self.reset_signal = Event()
        # Timestamps of the frames ingested since the last reset().
        self.times = []
        self.recording = False
        self.log_format = log_format

    def run(self):
        """Consume frames until ``finished_signal`` is set.

        The outer loop starts a fresh session (state cleared by
        :meth:`reset`); the inner loop polls the input queue. While
        ``saving_evt`` is set every frame is passed to :meth:`ingest_frame`,
        the first one after a call to :meth:`configure`. When ``saving_evt``
        is cleared after at least one frame was ingested, :meth:`complete`
        is called once.

        """
        while True:
            # True between the first ingested frame and the call to complete().
            toggle_save = False
            self.reset()
            while True:
                try:
                    t, current_frame = self.input_queue.get(timeout=0.01)
                    if self.saving_evt.is_set():
                        if not self.recording:
                            # configure() may raise Empty as well (no file
                            # name queued yet). The handler below absorbs it,
                            # so this frame is skipped and configuration is
                            # retried with the next one.
                            self.configure(current_frame.shape)
                            self.recording = True
                        self.ingest_frame(current_frame)
                        self.times.append(t)
                        toggle_save = True
                except Empty:
                    pass

                if not self.saving_evt.is_set() and toggle_save:
                    self.complete()
                    toggle_save = False

                # NOTE(review): a reset/finish request that arrives while a
                # recording is still in progress discards it without calling
                # complete() -- confirm this is intended.
                if self.reset_signal.is_set() or self.finished_signal.is_set():
                    self.reset_signal.clear()
                    self.reset()
                    break

                # framerate_rec is not created in this class; presumably it
                # is provided by FrameProcess -- TODO confirm.
                self.framerate_rec.update_framerate()

            if self.finished_signal.is_set():
                break

    def configure(self, shape):
        """Prepare a new recording.

        Called by :meth:`run` with the shape of the first frame of a
        recording. The base implementation ignores ``shape`` and only
        fetches the base output path of the recording from
        ``filename_queue``.

        Parameters
        ----------
        shape
            shape of the first frame to be recorded

        Raises
        ------
        queue.Empty
            if no file name becomes available within 10 ms

        """
        self.filename_base = self.filename_queue.get(timeout=0.01)

    def ingest_frame(self, frame):
        """Handle one frame to be recorded; a no-op hook for subclasses."""
        pass

    def complete(self):
        """Mark the current recording as finished."""
        self.recording = False

    def reset(self):
        """Clear the recording flag and the collected timestamps."""
        self.recording = False
        self.times = []
class H5VideoWriter(VideoWriter):
    """Video writer that buffers frames in memory and saves them as HDF5.

    All frames of a recording are kept in ``self.frames`` and written in a
    single call when the recording is completed. Constructor arguments are
    forwarded unchanged to :class:`VideoWriter`.

    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Frames of the recording in progress, in arrival order.
        self.frames = []

    def reset(self):
        """Reset the base-class state and discard all buffered frames."""
        super().reset()
        self.frames = []

    def ingest_frame(self, frame):
        """Append ``frame`` to the in-memory buffer."""
        self.frames.append(frame)

    def complete(self):
        """Save the buffered frames and empty the buffer.

        The frames are stacked into a single ``uint8`` array and written
        with deepdish to ``<filename_base>video.hdf5``.

        """
        super().complete()
        dd.io.save(
            self.filename_base + "video.hdf5", np.array(self.frames, dtype=np.uint8)
        )
        # The frames are on disk now. Keeping them would make the next
        # recording (started without an intervening reset()) contain them
        # again, and would keep their memory alive for no reason.
        self.frames = []
class StreamingVideoWriter(VideoWriter):
    """Video writer that encodes frames with PyAV as they arrive.

    Parameters
    ----------
    *args, **kwargs
        forwarded to :class:`VideoWriter`
    extension
        extension of the output file, without the leading dot
    output_framerate
        frame rate given to the encoded video stream
    format
        name of the codec used for the video stream
    kbit_rate
        bitrate of the encoded stream, in kbit/s

    """

    def __init__(
        self,
        *args,
        extension="mp4",
        output_framerate=24,
        format="mpeg4",
        kbit_rate=1000,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.extension = extension
        self.output_framerate = output_framerate
        self.format = format
        self.kbit_rate = kbit_rate
        # PyAV output container and its video stream; both are None while
        # no output file is open.
        self.container = None
        self.stream = None

    def configure(self, shape):
        """Open the output file and set up the video stream for a recording.

        The file is created at ``<filename_base>video.<extension>``.

        Parameters
        ----------
        shape
            shape of the frames to encode; its first two entries are taken
            as ``(height, width)``

        Raises
        ------
        queue.Empty
            if the base file name of the recording is not available yet

        """
        # Use the parent's configure() when the class hierarchy provides
        # one; otherwise take the base file name of this recording from
        # filename_queue directly.
        inherited = getattr(super(), "configure", None)
        if inherited is not None:
            inherited(shape)
        else:
            self.filename_base = self.filename_queue.get(timeout=0.01)

        # Never leave a previously opened file dangling.
        self._close_container()

        height, width = shape[:2]
        out_path = self.filename_base + "video." + self.extension
        self.container = av.open(out_path, mode="w")
        self.stream = self.container.add_stream(
            self.format, rate=self.output_framerate
        )
        codec_context = self.stream.codec_context
        codec_context.width = width
        codec_context.height = height
        # The encoder expects bit/s, the parameter is given in kbit/s.
        codec_context.bit_rate = self.kbit_rate * 1000

    def ingest_frame(self, frame):
        """Encode ``frame`` (a 2D 8-bit grayscale array) and write it out."""
        if self.stream is None:
            self.configure(frame.shape)
        av_frame = av.VideoFrame.from_ndarray(frame, format="gray8")
        for packet in self.stream.encode(av_frame):
            self.container.mux(packet)

    def reset(self):
        """Reset the base-class state and close any file that is still open."""
        super().reset()
        self._close_container()

    def complete(self):
        """Flush the encoder and close the output file."""
        super().complete()
        if self.stream is None:
            # Nothing was opened, so there is nothing to flush or close.
            return
        try:
            # encode() without a frame drains the packets still buffered in
            # the encoder.
            for packet in self.stream.encode():
                self.container.mux(packet)
        finally:
            # Close the file even if flushing fails, and drop the stale
            # references so the next recording starts from a clean state.
            self._close_container()

    def _close_container(self):
        """Close the open container, if any, and forget container and stream."""
        container = getattr(self, "container", None)
        self.container = None
        self.stream = None
        if container is not None:
            container.close()