"""
Module to interact with video surces as cameras or video files. It also
implement video saving
"""
import numpy as np
from multiprocessing import Queue, Event
from queue import Empty, Full
from lightparam import Param
from lightparam.param_qt import ParametrizedQt
from stytra.utilities import FrameProcess
from arrayqueues.shared_arrays import IndexedArrayQueue
import deepdish as dd
from stytra.hardware.video.cameras import camera_class_dict
from stytra.hardware.video.write import VideoWriter
from stytra.hardware.video.ring_buffer import RingBuffer
import time
[docs]class VideoSource(FrameProcess):
"""Abstract class for a process that generates frames, being it a camera
or a file source. A maximum size of the memory used by the process can be
set.
**Input Queues:**
self.control_queue :
queue with control parameters for the source, e.g. from a
:class:`CameraControlParameters <.interfaces.CameraControlParameters>`
object.
**Output Queues**
self.frame_queue :
TimestampedArrayQueue from the arrayqueues module
where the frames read from the camera are sent.
**Events**
self.kill_signal :
When set kill the process.
Parameters
----------
rotation : int
n of times image should be rotated of 90 degrees
max_mbytes_queue : int
maximum size of camera queue (Mbytes)
Returns
-------
"""
def __init__(self, rotation=False, max_mbytes_queue=200, n_consumers=1):
""" """
super().__init__(name="camera")
self.rotation = rotation
self.control_queue = Queue()
self.frame_queue = IndexedArrayQueue(max_mbytes=max_mbytes_queue)
self.kill_event = Event()
self.n_consumers = 1
self.state = None
[docs] def put_frame(self, frame, messages):
# If the queue is full, arrayqueues should print a warning!
try:
if self.frame_queue.queue.qsize() < self.n_consumers + 2:
self.frame_queue.put(frame)
else:
messages.append("W:Dropped frame")
except NotImplementedError:
try:
self.frame_queue.put(frame)
except Full:
messages.append("W:Dropped frame")
self.update_framerate()
[docs]class CameraSource(VideoSource):
"""Process for controlling a camera.
Cameras currently implemented:
======== ===========================================
Ximea Add some info
Avt Add some info
======== ===========================================
Parameters
----------
camera_type : str
specifies type of the camera (currently supported: 'ximea', 'avt')
downsampling : int
specifies downsampling factor for the camera.
Returns
-------
"""
""" dictionary listing classes used to instantiate camera object."""
def __init__(
self,
camera_type,
*args,
downsampling=1,
roi=(-1, -1, -1, -1),
max_buffer_length=1000,
camera_params=dict(),
**kwargs
):
""" """
super().__init__(*args, **kwargs)
self.cam = None
self.camera_type = camera_type
self.downsampling = downsampling
self.roi = roi
self.camera_params = camera_params
self.max_buffer_length = max_buffer_length
self.state = None
self.ring_buffer = None
[docs] def retrieve_params(self, messages):
while True:
try:
param_dict = self.control_queue.get(timeout=0.0001)
self.state.params.values = param_dict
for param, value in param_dict.items():
ms = self.cam.set(param, value)
try:
messages.extend(list(ms))
except TypeError:
pass
except Empty:
break
[docs] def run(self):
"""
After initializing the camera, the process constantly does the
following:
- read control parameters from the control_queue and set them;
- read frames from the camera and put them in the frame_queue.
"""
if self.state is None:
self.state = CameraControlParameters()
try:
CameraClass = camera_class_dict[self.camera_type]
self.cam = CameraClass(
downsampling=self.downsampling, roi=self.roi, **self.camera_params
)
except KeyError:
raise Exception("{} is not a valid camera type!".format(self.camera_type))
camera_messages = list(self.cam.open_camera())
[self.message_queue.put(m) for m in camera_messages]
prt = None
while True:
# Kill if signal is set:
self.kill_event.wait(0.0001)
if self.kill_event.is_set():
break
# Try to get new parameters from the control queue:
messages = []
if self.control_queue is not None:
self.retrieve_params(messages)
# Grab the new frame, and put it in the queue if valid:
arr = self.cam.read()
if self.rotation:
arr = np.rot90(arr, self.rotation)
res_len = int(round(self.state.framerate * self.state.ring_buffer_length))
if res_len > self.max_buffer_length:
res_len = self.max_buffer_length
self.message_queue.put(
"W:Replay buffer too big, make the plot"
" time range smaller for full replay"
" capabilities"
)
if self.ring_buffer is None or res_len != self.ring_buffer.length:
self.ring_buffer = RingBuffer(res_len)
if self.state.paused:
self.message_queue.put(
"I:Ring_buffer_size:" + str(self.ring_buffer.length)
)
if self.ring_buffer.arr is not None:
self.frame_queue.put(self.ring_buffer.get_most_recent())
else:
self.message_queue.put("E:camera paused before any frames acquired")
prt = None
elif self.state.replay and self.state.replay_fps > 0:
messages.append(
"I:Replaying between {} and {} of {}".format(
*self.state.replay_limits, self.ring_buffer.length
)
)
old_fps = self.framerate_rec.current_framerate
if old_fps is not None:
self.ring_buffer.replay_limits = (
int(round(self.state.replay_limits[0] * old_fps)),
int(round(self.state.replay_limits[1] * old_fps)),
)
try:
self.frame_queue.put(self.ring_buffer.get())
except ValueError:
pass
delta_t = 1 / self.state.replay_fps
if prt is not None:
extrat = delta_t - (time.process_time() - prt)
if extrat > 0:
time.sleep(extrat)
prt = time.process_time()
else:
prt = None
if arr is not None:
try:
self.ring_buffer.put(arr)
except AttributeError:
pass
self.put_frame(arr, messages)
for m in messages:
self.message_queue.put(m)
self.cam.release()
[docs]class VideoFileSource(VideoSource):
"""A class to stream videos from a file to test parts of
stytra without a camera available, or do offline analysis
Parameters
----------
source_file
path of the video file
loop : bool
continue video from the beginning if the end is reached
Returns
-------
"""
def __init__(self, source_file=None, loop=True, **kwargs):
super().__init__(**kwargs)
self.source_file = source_file
self.loop = loop
self.state = None
self.offset = 0
self.paused = False
self.old_frame = None
self.offset = 0
[docs] def inner_loop(self):
pass
[docs] def update_params(self):
while True:
try:
param_dict = self.control_queue.get(timeout=0.0001)
self.state.params.values = param_dict
except Empty:
break
[docs] def run(self):
if self.state is None:
self.state = VideoControlParameters()
if self.source_file.endswith("h5") or self.source_file.endswith("hdf5"):
framedata = dd.io.load(self.source_file)
if isinstance(framedata, np.ndarray):
frames = framedata
else:
frames = framedata["video"]
i_frame = self.offset
prt = None
while not self.kill_event.is_set():
messages = []
# Try to get new parameters from the control queue:
message = ""
if self.control_queue is not None:
self.update_params()
# we adjust the framerate
delta_t = 1 / self.state.framerate
if prt is not None:
extrat = delta_t - (time.process_time() - prt)
if extrat > 0:
time.sleep(extrat)
self.put_frame(frames[i_frame, :, :], messages)
if not self.state.paused:
i_frame += 1
if i_frame == frames.shape[0]:
if self.loop:
i_frame = self.offset
else:
break
for m in messages:
self.message_queue.put(m)
prt = time.process_time()
else:
import av
container = av.open(self.source_file)
container.streams.video[0].thread_type = "AUTO"
container.streams.video[0].thread_count = 1
prt = None
while self.loop:
for framedata in container.decode(video=0):
messages = []
if self.paused:
frame = self.old_frame
else:
frame = framedata.to_ndarray(format="rgb24")
# adjust the frame rate by adding extra time if the processing
# is quicker than the specified framerate
if self.control_queue is not None:
self.update_params()
delta_t = 1 / self.state.framerate
if prt is not None:
extrat = delta_t - (time.process_time() - prt)
if extrat > 0:
time.sleep(extrat)
self.put_frame(frame[:, :, 0], messages)
prt = time.process_time()
self.old_frame = frame
for m in messages:
self.message_queue.put(m)
container.seek(0, whence="frame")
return
[docs]class VideoControlParameters(ParametrizedQt):
def __init__(self, **kwargs):
super().__init__(name="video_params", **kwargs)
self.framerate = Param(
100.0, limits=(10, 700), unit="Hz", desc="Framerate (Hz)"
)
self.offset = Param(50)
self.paused = Param(False)
[docs]class CameraControlParameters(ParametrizedQt):
"""HasPyQtGraphParams class for controlling the camera params.
Ideally, methods to automatically set dynamic boundaries on frame rate and
exposure time can be implemented. Currently not implemented.
Parameters
----------
Returns
-------
"""
def __init__(self, **kwargs):
super().__init__(name="camera_params", **kwargs)
self.exposure = Param(1.0, limits=(0.1, 1000), unit="ms", desc="Exposure (ms)")
self.framerate = Param(
150.0, limits=(1, 700), unit=" Hz", desc="Framerate (Hz)"
)
self.gain = Param(1.0, limits=(0.1, 12), desc="Camera amplification gain")
self.ring_buffer_length = Param(
300, (1, 2000), desc="Rolling buffer that saves the last items", gui=False
)
self.paused = Param(False)
self.replay = Param(True, desc="Replaying", gui=False)
self.replay_fps = Param(
15,
(0, 500),
desc="If bigger than 0, the rolling buffer will be replayed at the given framerate",
)
self.replay_limits = Param((0, 600), gui=False)