Source code for stytra.hardware.video.ring_buffer

import numpy as np


[docs]class RingBuffer: def __init__(self, length): self.length = length self.arr = None self.insert_idx = length - 1 self.read_idx = 0 self.replay_limits = (0, self.length)
[docs] def put(self, item): try: if ( self.arr is None or self.arr.shape[1:] != item.shape or self.arr.dtype != item.dtype ): self.insert_idx = 0 self.arr = np.zeros((self.length,) + item.shape, item.dtype) self.arr[self.insert_idx] = item self.insert_idx = (self.insert_idx + 1) % self.length self.read_idx = 0 except AttributeError as e: print(e)
[docs] def get(self): if self.arr is None: raise ValueError("Trying to get an item from an empty buffer") replay_range = self.replay_limits[1] - self.replay_limits[0] actual_read_idx = ( self.replay_limits[0] + self.read_idx + self.insert_idx ) % self.length out = self.arr[actual_read_idx, :, :] self.read_idx = (self.read_idx + 1) % replay_range return out
[docs] def get_most_recent(self): return self.arr[(self.insert_idx + 1) % self.length]