"""
Provides utility functions for quickly extracting frames from diplomat frame store files, and also printing frame data to the terminal
for debugging and display purposes.
"""
from typing import BinaryIO, Sequence, Callable, Optional, Generator, Union, Tuple, NamedTuple, List
from diplomat.processing import TrackingData
from diplomat.utils import frame_store_fmt
from io import BytesIO
import base64
import numpy as np
import cv2
[docs]
def get_terminal_size(fallback: Tuple[int, int] = (80, 24)):
    """
    Determine the dimensions of the terminal attached to this process.

    File descriptors 0, 1 and 2 are queried in that order, and the first one that reports a size wins.

    :param fallback: The size to return when none of the three descriptors reports a terminal size.

    :return: The terminal size as (columns, lines). This is the value returned by os.get_terminal_size when a
             descriptor succeeds, otherwise the fallback tuple unchanged.
    """
    import os

    for descriptor in (0, 1, 2):
        try:
            size = os.get_terminal_size(descriptor)
        except OSError:
            # This descriptor can't report a size, move on to the next one...
            continue
        else:
            return size

    return fallback
[docs]
class BorderStyle(NamedTuple):
    """
    Describes the strings used to draw a box around a pretty printed frame. Mostly used internally by the pretty
    print functions, which build one from the border tuple of a frame string format.

    The first four fields are the corner strings, the next four are the edge strings, and the last is a callable
    that builds the label embedded in the top edge.
    """
    # Corner strings, listed clockwise starting from the top left.
    top_left: str
    top_right: str
    bottom_right: str
    bottom_left: str
    # Edge strings, listed clockwise starting from the top.
    top: str
    right: str
    bottom: str
    left: str
    # Called with a (width, height) pair, returns the label text. Defaults to "<width>x<height>".
    info_func: Callable[[int, int], str] = lambda w, h: f"{w}x{h}"
[docs]
def pretty_print_frame(
    data: TrackingData,
    frame_idx: int = 0,
    body_part: int = 0,
    dynamic_sz: bool = True,
    size_up: bool = False,
    interpol: int = cv2.INTER_CUBIC,
    format_type: Tuple[str, int, tuple] = FrameStringFormats.REGULAR
):
    """
    Render a single probability frame as text and print it to standard output.

    :param data: The TrackingData object holding the probability frames.
    :param frame_idx: Index of the frame to print, defaults to 0.
    :param body_part: Index of the body part to print, defaults to 0.
    :param dynamic_sz: If True, the current terminal width is passed on as the width limit, so the frame gets
                       resized to fit the window when it is too big. If False, no width limit is applied.
    :param size_up: If True, the frame may also be enlarged when the terminal provides extra room, instead of only
                    being shrunk.
    :param interpol: The cv2 interpolation flag used when the frame is resized. Defaults to cv2.INTER_CUBIC.
    :param format_type: The format or 'font' used for rendering, passed through unchanged to pretty_frame_string.
                        A tuple of the characters to display at given magnitudes, the number of times each
                        character is repeated, and an optional border description.
    """
    # A width limit of 0 tells pretty_frame_string to leave the frame at its original size.
    width_limit = get_terminal_size()[0] if dynamic_sz else 0
    rendered = pretty_frame_string(data, frame_idx, body_part, width_limit, size_up, interpol, format_type)
    print(rendered)
[docs]
def pretty_frame_string(
    data: TrackingData,
    frame_idx: int = 0,
    body_part: int = 0,
    width_limit: int = 0,
    size_up: bool = False,
    interpol: int = cv2.INTER_CUBIC,
    format_type: Tuple[str, int, tuple] = FrameStringFormats.REGULAR
) -> str:
    """
    Return a DeepLabCut Probability Frame in a pretty string for printing to the terminal.

    :param data: The probability frame, a TrackingData object.
    :param frame_idx: Frame index to print, defaults to 0.
    :param body_part: Body part index to print, defaults to 0.
    :param width_limit: Determines the max width the string can be when printing. Defaults to 0, meaning there is
                        no width limit. Limits smaller than twice the character repeat amount are also treated as
                        no limit.
    :param size_up: Determines if the frame should not only be downsized, but also upsized if the width provided
                    gives extra room...
    :param interpol: The interpolation method if a size is specified. Defaults to cv2.INTER_CUBIC, but any cv2
                     interpolation value works.
    :param format_type: The format or 'font' to use for the pretty printed string. A length 3 tuple, containing a
                        sequence of characters being the displayed characters at given magnitudes, an integer being
                        the number of times to repeat the characters when displaying them. ('abcd', 2 with 0 becomes
                        aa), and an optional tuple of 8 strings representing characters for drawing a border (If
                        None, no border is drawn, tuple order is top-left, top right, bottom-right, bottom-left,
                        top, right, bottom, left).

    :returns: A pretty string of the frame, that can be printed to the console. Without a border every row ends in
              a newline. With a border the final (bottom border) line has no trailing newline.
    """
    chars, char_rep_amt, border = format_type
    if border is not None:
        border = BorderStyle(*border)

    # NOTE(review): presumably a 2D numpy array indexed as [y, x]; confirm against TrackingData.get_prob_table.
    frame = data.get_prob_table(frame_idx, body_part)

    # Make range 0-1...
    max_val = np.nanmax(frame)
    if max_val != 0:
        frame = frame / max_val

    if width_limit >= (char_rep_amt * 2):
        new_w = (width_limit / char_rep_amt) - 1
        if not size_up:
            new_w = min(frame.shape[1], new_w)
        # Preserve the aspect ratio, but never let the height truncate to 0 rows for short, wide frames,
        # as a zero-sized target is not a valid output size for cv2.resize.
        new_h = max(1, int(frame.shape[0] * (new_w / frame.shape[1])))
        sized_f = cv2.resize(frame, (int(new_w), new_h), interpolation=interpol)
        # Resizing can change the peak value, so normalize again...
        max_val = np.nanmax(sized_f)
        if max_val != 0:
            sized_f = sized_f / max_val
    else:
        sized_f = frame

    # Interpolation (cubic for example) can undershoot below 0, which would produce negative indexes that silently
    # wrap around to the brightest characters. NaN values can't be converted to an index at all. Force every
    # value into the 0-1 range before mapping it to a character.
    levels = np.clip(np.nan_to_num(sized_f, nan=0.0), 0.0, 1.0)
    # A value of 1 maps to exactly the last character, truncation picks the character for everything below.
    char_indexes = (levels * (len(chars) - 0.5)).astype(int)

    # Build each repeated glyph once instead of once per cell.
    glyphs = [c * char_rep_amt for c in chars]
    inner_width = sized_f.shape[1] * char_rep_amt

    pieces = []

    if border is not None:
        # The label shows the original frame size. Trim it to the drawable width so the top border never ends up
        # wider than the rows below it.
        dim_str = border.info_func(frame.shape[1], frame.shape[0])[:inner_width]
        pieces.append(
            f"{border.top_left}"
            f"{dim_str}{border.top * (inner_width - len(dim_str))}"
            f"{border.top_right}\n"
        )
        row_start = border.left
        row_end = f"{border.right}\n"
    else:
        row_start = ""
        row_end = "\n"

    for row in char_indexes.tolist():
        pieces.append(row_start)
        pieces.extend(glyphs[idx] for idx in row)
        pieces.append(row_end)

    if border is not None:
        pieces.append(
            f"{border.bottom_left}"
            f"{border.bottom * inner_width}"
            f"{border.bottom_right}"
        )

    return "".join(pieces)
[docs]
def unpack_frame_string(
    frame_string: bytes,
    frames_per_iter: int = 0
) -> Tuple[List[str], Union[TrackingData, Generator[TrackingData, None, None]]]:
    """
    Decode a base64 encoded frame store and expose its probability frames as TrackingData.

    :param frame_string: A bytes object containing the base64 encoded frame store file.
    :param frames_per_iter: Number of frames placed in each TrackingData object that gets generated. When this is
                            0 or less, all frames are read at once into a single TrackingData object and no
                            generator is created.

    :returns: A tuple containing:
                - The body part names taken from the frame store header and,
                - A single TrackingData object if frames_per_iter <= 0,
                  or a Generator of TrackingData objects if frames_per_iter > 0.
    """
    # Decode into an in-memory file, so the frame store reader can treat it like a regular file...
    decoded = BytesIO(base64.decodebytes(frame_string))
    reader = frame_store_fmt.DLFSReader(decoded)

    body_parts = reader.get_header().bodypart_names

    if frames_per_iter > 0:
        # Frames get read lazily, a batch at a time, as the generator is consumed.
        return (body_parts, _unpack_frame_string_gen(reader, frames_per_iter))

    # Read every frame the header reports in a single call.
    return (body_parts, reader.read_frames(reader.get_header().number_of_frames))
def _unpack_frame_string_gen(reader: frame_store_fmt.DLFSReader, frames_per_iter: int = 0):
    """
    Generator which reads frames from a frame store reader in batches.

    Full batches of frames_per_iter frames are yielded for as long as the reader reports one is available. After
    that, any frames still left over are yielded as one final, smaller batch.

    :param reader: The frame store reader to pull frames from.
    :param frames_per_iter: The number of frames to read for each full batch.

    :returns: A generator, yielding whatever reader.read_frames returns for each batch.
    """
    while True:
        if not reader.has_next(frames_per_iter):
            break
        yield reader.read_frames(frames_per_iter)

    # NOTE(review): the "+ 1" assumes tell_frame() is the index of the last frame that was read, not the count of
    # frames read so far. Confirm against DLFSReader, otherwise the leftover count is off by one.
    total_frames = reader.get_header().number_of_frames
    frames_consumed = reader.tell_frame() + 1
    leftover = total_frames - frames_consumed

    if leftover > 0:
        yield reader.read_frames(leftover)