"""
Module contains a wx video player widget and a wx video controller widget. Uses multi-threading to load frames to a
deque while playing them, allowing for smoother playback...
"""
import dataclasses
import time
from typing import Optional, Tuple
import wx
from wx.lib.newevent import NewCommandEvent
import cv2
from collections import deque
import numpy as np
from diplomat.utils.video_info import get_frame_count_robust_fast
from diplomat.utils.video_io import ContextVideoCapture
[docs]
def read_frame(
video_hdl: cv2.VideoCapture, frame_idx: Optional[int] = None
) -> Tuple[bool, int, np.ndarray]:
valid_frame = False
frame = None
if frame_idx is not None:
if tell_frame(video_hdl) != frame_idx:
seek_frame(video_hdl, frame_idx)
if video_hdl.isOpened():
valid_frame, frame = video_hdl.read()
if not valid_frame:
frame = np.zeros(
(
int(video_hdl.get(cv2.CAP_PROP_FRAME_HEIGHT)),
int(video_hdl.get(cv2.CAP_PROP_FRAME_WIDTH)),
3,
),
dtype=np.uint8,
)
return valid_frame, int(video_hdl.get(cv2.CAP_PROP_POS_FRAMES)), frame
[docs]
def seek_frame(video_hdl: cv2.VideoCapture, new_loc: int):
video_hdl.set(cv2.CAP_PROP_POS_FRAMES, new_loc)
[docs]
def tell_frame(video_hdl: cv2.VideoCapture) -> int:
return int(video_hdl.get(cv2.CAP_PROP_POS_FRAMES))
# Represents (x, y, width, height)
Box = Tuple[int, int, int, int]
Coord = Tuple[float, float]
IntCoord = Tuple[int, int]
[docs]
@dataclasses.dataclass
class ZoomConfig:
key: Optional[wx.KeyCode] = None
min_zoom: float = 1
max_zoom: float = 20
zoom_slow_down: float = 1000
min_move_refresh: float = 10
[docs]
class VideoPlayer(wx.Control):
"""
A video player for wx Widgets, Using cv2 for solid cross-platform video support. Can play video, but no audio.
"""
# The number of frames to store in the backward buffer...
BACK_LOAD_AMT = 50
MAX_FAST_FORWARD_MODE = 10
# Events for the VideoPlayer class, one triggered for every frame change, and one triggered for every change in
# play state (starting, stopping, pausing, etc....)
FrameChangeEvent, EVT_FRAME_CHANGE = NewCommandEvent()
PlayStateChangeEvent, EVT_PLAY_STATE_CHANGE = NewCommandEvent()
[docs]
def __init__(
self,
parent,
w_id=wx.ID_ANY,
video_hdl: cv2.VideoCapture = None,
crop_box: Optional[Box] = None,
zoom_config: Optional[ZoomConfig] = None,
pos=wx.DefaultPosition,
size=wx.DefaultSize,
style=wx.BORDER_DEFAULT,
validator=wx.DefaultValidator,
name="VideoPlayer",
):
"""
Create a new VideoPlayer
:param parent: The wx Control Parent.
:param w_id: The wx ID.
:param video_hdl: The cv2 VideoCapture to play video from. One should avoid never manipulate the video capture
once passed to this constructor, as the handle will be passed to another thread for fast
video loading.
:param crop_box: Tuple of ints, x, y, width, height, being the area of the video to show instead of the entire video.
if set to None, just shows the entire video...
:param pos: The position of the widget.
:param size: The size of the widget.
:param style: The style of the widget.
:param validator: The widgets validator.
:param name: The name of the widget.
"""
super().__init__(
parent, w_id, pos, size, style | wx.FULL_REPAINT_ON_RESIZE, validator, name
)
self.SetBackgroundStyle(wx.BG_STYLE_PAINT)
self._width = int(video_hdl.get(cv2.CAP_PROP_FRAME_WIDTH))
self._height = int(video_hdl.get(cv2.CAP_PROP_FRAME_HEIGHT))
self._fps = video_hdl.get(cv2.CAP_PROP_FPS)
self._crop_box = VideoTransform.check_crop_box(
crop_box, self._width, self._height
)
self._num_frames = get_frame_count(video_hdl)
# Useful indicator variables...
self._playing = False
self._frozen = False
self._prior_frames = deque(maxlen=self.BACK_LOAD_AMT)
self._current_loc = 0
size = self._compute_min_size()
self.SetMinSize(size)
self.SetInitialSize(size)
self._core_timer = wx.Timer(self)
# Create the video loader to start loading frames:
self._video_hdl = video_hdl
self._loaded_frame = None
self._max_video_load_rate = (1 / self._fps) / 2
self._last_frame_read = time.monotonic() - self._max_video_load_rate * 2
self._video_transform = None
self._zoom_config = zoom_config
self._is_pressed = False
self._prior_mouse_location = (0, 0)
self.Bind(wx.EVT_TIMER, self._on_timer)
self.Bind(wx.EVT_PAINT, self.on_paint)
self.Bind(wx.EVT_MOUSEWHEEL, self._on_wheel)
self.Bind(wx.EVT_LEFT_DOWN, self._on_mouse_down)
self.Bind(wx.EVT_LEFT_UP, self._on_mouse_up)
self.Bind(wx.EVT_MOTION, self._on_mouse_move)
self.Bind(wx.EVT_ERASE_BACKGROUND, lambda evt: None)
@property
def video_transform(self) -> VideoTransform:
if self._loaded_frame is None:
raise ValueError("No frame is loaded!")
if self._video_transform is None:
fh, fw = self._loaded_frame[1].shape[:2]
self._video_transform = VideoTransform(
(fw, fh), self.GetSize().Get(), self._crop_box
)
else:
fh, fw = self._loaded_frame[1].shape[:2]
self._video_transform.update((fw, fh), self.GetSize().Get(), self._crop_box)
return self._video_transform
def _on_wheel(self, evt):
if self._zoom_config is None:
evt.Skip()
return
if self._zoom_config.key is not None and not wx.GetKeyState(
self._zoom_config.key
):
evt.Skip()
return
if self._loaded_frame is None:
return
vt = self.video_transform
w, h = self.GetClientSize()
x, y = self.ScreenToClient(wx.GetMousePosition())
scale = vt.scale
offset = vt.offset
ix = (x + (scale * w * offset[0])) / (scale * w)
iy = (y + (scale * h * offset[1])) / (scale * h)
scale = min(
self._zoom_config.max_zoom,
max(
self._zoom_config.min_zoom,
scale + evt.GetWheelRotation() / self._zoom_config.zoom_slow_down,
),
)
offset = (ix - (x / (scale * w)), iy - (y / (scale * h)))
if scale <= 1:
offset = (0, 0)
vt.update(offset=offset, scale=scale)
self.Refresh()
evt.Skip()
def _on_mouse_down(self, evt):
if self._zoom_config is None or self._is_pressed:
evt.Skip()
return
self._is_pressed = True
self._prior_mouse_location = tuple(evt.GetPosition())
self._net_dist = 0
evt.Skip()
def _on_mouse_up(self, evt):
if self._zoom_config is None or not self._is_pressed:
evt.Skip()
return
self._on_mouse_move(evt, True)
self._is_pressed = False
evt.Skip()
def _on_mouse_move(self, evt, force_move=False):
if self._zoom_config is None or not self._is_pressed:
evt.Skip()
return
if self._loaded_frame is None:
return
vt = self.video_transform
nx, ny = evt.GetPosition()
px, py = self._prior_mouse_location
w, h = evt.GetEventObject().GetClientSize()
offset = vt.offset
scale = vt.scale
offset = (
offset[0] + ((px - nx) / (scale * w)),
offset[1] + ((py - ny) / (scale * h)),
)
self._net_dist += np.sqrt((px - nx) ** 2 + (py - ny) ** 2)
if force_move or self._net_dist > self._zoom_config.min_move_refresh:
self._prior_mouse_location = (nx, ny)
self._net_dist = 0
if scale <= 1:
offset = (0, 0)
vt.update(offset=offset)
self.Refresh()
evt.Skip()
def _compute_min_size(self) -> wx.Size:
displays = (wx.Display(i) for i in range(wx.Display.GetCount()))
sizes = [display.GetGeometry().GetSize() for display in displays]
w = int(min(self._width / 2, *(s.GetWidth() / 3 for s in sizes)))
h = int(min(self._height / 2, *(s.GetHeight() / 3 for s in sizes)))
return wx.Size(w, h)
[docs]
def on_paint(self, event):
"""
Run on a paint event, redraws the widget.
"""
painter = (
wx.PaintDC(self) if (self.IsDoubleBuffered()) else wx.BufferedPaintDC(self)
)
self.on_draw(painter)
def _attempt_frame_load(self):
now = time.monotonic()
if now - self._last_frame_read < self._max_video_load_rate:
# self.Refresh() # is this needed?
return
if self._loaded_frame is None:
self._prior_frames.clear()
self._loaded_frame = read_frame(self._video_hdl, self._current_loc)[1:]
return
offset = self._current_loc - self._loaded_frame[0]
if offset == 0:
return
elif offset > 0 and offset <= self.MAX_FAST_FORWARD_MODE:
while self._loaded_frame[0] < self._current_loc:
self._prior_frames.append(self._loaded_frame)
self._loaded_frame = read_frame(self._video_hdl)[1:]
elif offset < 0 and offset >= len(self._prior_frames):
while (
self._loaded_frame[0] > self._current_loc
and len(self._prior_frames) > 0
):
self._loaded_frame = self._prior_frames.pop()
else:
self._prior_frames.clear()
self._loaded_frame = read_frame(self._video_hdl, self._current_loc)[1:]
self._last_frame_read = now
[docs]
def on_draw(self, dc: wx.DC):
"""
Draws the widget.
:param dc: The wx DC to use for drawing.
"""
width, height = self.GetClientSize()
if (not width) or (not height):
return
dc.SetBackground(wx.Brush(self.GetBackgroundColour(), wx.BRUSHSTYLE_SOLID))
dc.Clear()
self._attempt_frame_load()
if self._loaded_frame is None:
return
vt = self.video_transform
resized_frame, (loc_x, loc_y) = vt.transform_image(
vt.get_cropped_image(self._loaded_frame[1]),
img_scale=1,
interpolation=cv2.INTER_LINEAR,
)
# Draw the video background
b_h, b_w = resized_frame.shape[:2]
bitmap = wx.Bitmap.FromBuffer(
b_w, b_h, resized_frame[:, :, ::-1].astype(dtype=np.uint8)
)
dc.DrawBitmap(bitmap, int(loc_x), int(loc_y))
[docs]
def _push_time_change_event(self):
"""PRIVATE: Used to specify how long the event should"""
new_event = self.FrameChangeEvent(id=self.Id, frame=self.get_offset_count())
wx.PostEvent(self, new_event)
[docs]
def _on_timer(self, event, trigger_run=True):
"""
PRIVATE: Executed whenever a timer event occurs, which triggers a video frame update if the video is playing.
"""
if self._playing:
if self._frozen:
self.pause()
return
# If we have reached the end of the video, pause the video and don't perform a frame update as
# we will deadlock the system by waiting for a frame forever...
if self._current_loc >= (self._num_frames - 1):
self.pause()
return
# Get the next frame and set it as the current frame
self._current_loc += 1
# Post a frame change event.
self._push_time_change_event()
# Trigger a redraw on the next pass through the loop and start the timer to play the next frame...
if trigger_run:
self._core_timer.StartOnce(int(1000 / self._fps))
self.Refresh() # Force a redraw....
[docs]
def play(self):
"""
Play the video.
"""
if not self.is_playing():
self._playing = True
wx.PostEvent(
self,
self.PlayStateChangeEvent(
id=self.Id, playing=True, stop_triggered=False
),
)
self._on_timer(None)
[docs]
def stop(self):
"""
Stop the video.
"""
self._playing = False
wx.PostEvent(
self,
self.PlayStateChangeEvent(id=self.Id, playing=False, stop_triggered=True),
)
self.set_offset_frames(0)
[docs]
def pause(self):
"""
Pause the video.
"""
self._playing = False
wx.PostEvent(
self,
self.PlayStateChangeEvent(id=self.Id, playing=False, stop_triggered=False),
)
[docs]
def is_playing(self) -> bool:
"""
Returns whether or not the video is currently playing.
"""
return self._playing
[docs]
def freeze(self):
"""
Freeze the video player, immediately pausing the video and making it unresponsive to play/pause/stop commands,
and also frame changing methods.
"""
self.pause()
self._frozen = True
[docs]
def unfreeze(self):
"""
Unfreeze the video, allowing controls to work again.
"""
self.pause()
self._frozen = False
[docs]
def is_frozen(self) -> bool:
"""
Check whether this video is frozen.
:returns: True is this video is frozen and therefore will not respond to any play/pause/stop commands, or
False otherwise.
"""
return self._frozen
[docs]
def get_offset_count(self):
"""
Get the current frame index we are at in the video.
:returns: An integer, the frame offset.
"""
return self._current_loc
[docs]
def get_total_frames(self):
"""
Get the total number of frames in this video.
:returns: An integer being the total frame count of the video.
"""
return int(self._num_frames)
[docs]
def move_back(self, amount: int = 1):
"""
Move backward a given amount of frames.
:param amount: A non-negative integer. Being how many frames to move backward. Defaults to 1.
Can be 0, does nothing if so.
"""
# Check if movement is valid...
if amount < 0:
raise ValueError("Offset must be positive!")
elif amount == 0:
return
self.set_offset_frames(self._current_loc - amount)
[docs]
def move_forward(self, amount: int = 1):
"""
Move forward a given amount of frames.
:param amount: A non-negative integer. Being how many frames to move forward. Defaults to 1.
Can be 0, does nothing if so.
"""
# Check if movement is valid...
if amount < 0:
raise ValueError("Offset must be positive!")
elif amount == 0:
return
self.set_offset_frames(self._current_loc + amount)
[docs]
def set_offset_frames(self, value: int):
"""
Set the current frame offset location into the video.
:param value: An integer index, being the frame to move to in the video.
"""
# Is this a valid frame value?
if not (0 <= value < self._num_frames):
raise ValueError(
f"Can't set frame index to {value}, there is only {self._num_frames} frames."
)
if self._frozen:
return
# current_state = self._playing
# self._playing = False
self._current_loc = value
# Restore play state prior to frame change...
# self._playing = current_state
self._push_time_change_event()
self.Refresh()
# self._core_timer.StartOnce(int(1000 / self._fps))
[docs]
def __del__(self):
"""
Delete this video player, deleting its video reading thread.
"""
self._prior_frames.clear()
self._video_hdl.release()
[docs]
class VideoController(wx.Panel):
"""
Provides a set of video controls for controlling a VideoPlayer. Provides some play back controls.
"""
PLAY_SYMBOL = "\u25b6"
PAUSE_SYMBOL = "\u23f8"
STOP_SYMBOL = "\u23f9"
FRAME_BACK_SYMBOL = "\u21b6"
FRAME_FORWARD_SYMBOL = "\u21b7"
[docs]
def __init__(
self,
parent,
video_player: VideoPlayer,
w_id=wx.ID_ANY,
pos=wx.DefaultPosition,
size=wx.DefaultSize,
style=wx.TAB_TRAVERSAL,
name="VideoController",
):
"""
Construct a new VideoController.
:param parent: The parent WX widget.
:param video_player: The VideoPlayer to control. Will automatically hook into the video player's events.
:param w_id: The WX ID. Defaults to wx.ID_ANY
:param pos: The WX Position. Defaults to wx.DefaultPosition.
:param size: The WX Size. Defaults to wx.DefaultSize.
:param style: A wx.Panel style. Look at wx.Panel docs to see supported styles. Defaults to wx.TAB_TRAVERSAL.
:param name: The WX internal name.
"""
super().__init__(parent, w_id, pos, size, style, name)
if video_player is None:
raise ValueError("Have to pass a VideoPlayer!!!")
self._video_player = video_player
self._sizer = wx.BoxSizer(wx.HORIZONTAL)
self._back_btn = wx.Button(self, label=self.FRAME_BACK_SYMBOL)
self._play_pause_btn = wx.Button(self, label=self.PLAY_SYMBOL)
self._stop_btn = wx.Button(self, label=self.STOP_SYMBOL)
self._forward_btn = wx.Button(self, label=self.FRAME_FORWARD_SYMBOL)
self._slider_control = wx.Slider(
self,
value=0,
minValue=0,
maxValue=video_player.get_total_frames() - 1,
style=wx.SL_HORIZONTAL | wx.SL_LABELS,
)
self._sizer.Add(self._back_btn, 0, wx.EXPAND | wx.ALL)
self._sizer.Add(self._play_pause_btn, 0, wx.EXPAND | wx.ALL)
self._sizer.Add(self._stop_btn, 0, wx.EXPAND | wx.ALL)
self._sizer.Add(self._forward_btn, 0, wx.EXPAND | wx.ALL)
self._sizer.Add(self._slider_control, 1, wx.EXPAND)
self._sizer.SetSizeHints(self)
self.SetSizer(self._sizer)
self._video_player.Bind(VideoPlayer.EVT_FRAME_CHANGE, self.frame_change)
self._video_player.Bind(VideoPlayer.EVT_PLAY_STATE_CHANGE, self.on_play_switch)
self._slider_control.Bind(wx.EVT_SLIDER, self.on_slide)
self._play_pause_btn.Bind(wx.EVT_BUTTON, self.on_play_pause_press)
self._back_btn.Bind(wx.EVT_BUTTON, self.on_back_press)
self._forward_btn.Bind(wx.EVT_BUTTON, self.on_forward_press)
self._stop_btn.Bind(wx.EVT_BUTTON, lambda evt: self._video_player.stop())
[docs]
def on_char(self, evt: wx.KeyEvent):
"""
PRIVATE: Handles optional keyboard events....
"""
if not self.IsEnabled() and not self._video_player.is_frozen():
return
# Is the control were working with some type of text input?
# If so don't process this event...
window: wx.Window = wx.GetTopLevelParent(self)
foc_widget = window.FindFocus()
from wx.lib.agw.floatspin import FloatSpin
if isinstance(foc_widget, (wx.SpinCtrl, wx.TextEntry, FloatSpin)):
evt.Skip()
return
if evt.GetModifiers() != 0:
evt.Skip()
return
elif evt.GetKeyCode() == wx.WXK_SPACE:
self.on_play_pause_press(None)
# If it was the space key we eat the event, to stop buttons from triggering. User can still use enter key
# to activate buttons in the UI....
return
elif evt.GetKeyCode() == wx.WXK_LEFT:
self.on_back_press(None)
elif evt.GetKeyCode() == wx.WXK_RIGHT:
self.on_forward_press(None)
elif evt.GetKeyCode() == wx.WXK_BACK:
self._video_player.stop()
evt.Skip()
[docs]
def set_keyboard_listener(self, control: wx.Window):
"""
Set the keyboard listener, which enables keyboard shortcuts for this video controller.
:param control: The wx.Window to bind listen for keyboard events from.
"""
control.Bind(wx.EVT_CHAR_HOOK, self.on_char)
[docs]
def frame_change(self, event):
"""
PRIVATE: Triggered when video player frame changes.
"""
frame = event.frame
self._slider_control.SetValue(frame)
self._back_btn.Enable(frame > 0)
self._forward_btn.Enable(frame < (self._video_player.get_total_frames() - 1))
wx.PostEvent(self, event)
[docs]
def on_play_switch(self, event):
"""
PRIVATE: Triggered when video player is paused/played.
"""
self._play_pause_btn.SetLabel(
self.PAUSE_SYMBOL if (event.playing) else self.PLAY_SYMBOL
)
[docs]
def on_slide(self, event):
"""
PRIVATE: Triggered when slider is moved.
"""
self._video_player.set_offset_frames(self._slider_control.GetValue())
[docs]
def on_play_pause_press(self, event):
"""
PRIVATE: Triggered when the play/pause button is pressed.
"""
if self._video_player.is_playing():
self._video_player.pause()
else:
if (
self._video_player.get_offset_count() + 1
== self._video_player.get_total_frames()
):
self._video_player.set_offset_frames(0)
self._video_player.play()
[docs]
def on_back_press(self, event):
"""
PRIVATE: Triggered when go back 1 frame button is pressed.
"""
if self._video_player.get_offset_count() > 0:
self._video_player.move_back()
[docs]
def on_forward_press(self, event):
"""
PRIVATE: Triggered when go forward 1 frame button has been pressed.
"""
if self._video_player.get_offset_count() < (
self._video_player.get_total_frames() - 1
):
self._video_player.move_forward()
get_frame_count = get_frame_count_robust_fast
def _main_test():
from diplomat.wx_gui.probability_displayer import ProbabilityDisplayer
# We test the video player by playing a video with it.
vid_path = input("Enter a video path: ")
print(get_frame_count(ContextVideoCapture(vid_path)))
app = wx.App()
wid_frame = wx.Frame(None, title="Test...")
panel = wx.Panel(parent=wid_frame)
sizer = wx.BoxSizer(wx.VERTICAL)
wid = VideoPlayer(
panel, video_hdl=ContextVideoCapture(vid_path), zoom_config=ZoomConfig()
)
obj3 = ProbabilityDisplayer(
panel,
data=np.random.randint(0, 10, (wid.get_total_frames())),
bad_locations=np.array([], np.uint64),
)
obj2 = VideoController(panel, video_player=wid)
obj2.set_keyboard_listener(wid_frame)
obj2.Bind(wid.EVT_FRAME_CHANGE, lambda evt: obj3.set_location(evt.frame))
sizer.Add(wid, 1, wx.EXPAND)
sizer.Add(obj3, 0, wx.EXPAND)
sizer.Add(obj2, 0, wx.EXPAND)
panel.SetSizerAndFit(sizer)
wid_frame.Fit()
wid_frame.Show(True)
def destroy(evt):
wid_frame.Destroy()
wid_frame.Bind(wx.EVT_CLOSE, destroy)
app.MainLoop()
if __name__ == "__main__":
_main_test()