import pickle
from os import PathLike
from pathlib import Path
from typing import List, Union, Optional, Dict, Any, Tuple
import cv2
import numpy as np
import tqdm
from .dlc_importer import auxiliaryfunctions
import pandas as pd
from diplomat.processing import *
import diplomat.processing.type_casters as tc
from diplomat.utils.colormaps import to_colormap, iter_colormap
from diplomat.utils.cli_tools import extra_cli_args
from matplotlib import colors as mpl_colors
from diplomat.utils.shapes import shape_iterator, CV2DotShapeDrawer
def cv2_fourcc_string(val) -> int:
return int(cv2.VideoWriter_fourcc(*val))
Pathy = Union[PathLike, str]
LABELED_VIDEO_SETTINGS = {
"skeleton_color": ("black", mpl_colors.to_rgba, "Color of the skeleton."),
"pcutoff": (0.1, tc.RangedFloat(0, 1), "The probability to cutoff results below."),
"dotsize": (4, int, "The size of the dots."),
"alphavalue": (0.7, tc.RangedFloat(0, 1), "The alpha value of the dots."),
"colormap": (None, to_colormap, "The colormap to use for tracked points in the video. Can be a matplotlib colormap or a list of matplotlib colors."),
"shape_list": (None, tc.Optional(tc.List(str)), "A list of shape names, shapes to use for drawing each individual's dots."),
"line_thickness": (1, int, "Thickness of lines drawn."),
"antialiasing": (True, bool, "Use antialiasing when drawing points."),
"draw_hidden_tracks": (True, bool, "Whether or not to draw locations under the pcutoff value."),
"output_codec": ("mp4v", cv2_fourcc_string, "The codec to use for the labeled video..."),
"upscale_factor": (None, tc.Optional(tc.RangedFloat(0, 1000)), "An optional float multiplier to use for "
"increasing the size of the output video.")
}
class EverythingSet:
def __contains__(self, item):
return True
def _to_str_list(path_list):
if(isinstance(path_list, (list, tuple))):
return [str(path) for path in path_list]
return str(path_list)
[docs]
@extra_cli_args(LABELED_VIDEO_SETTINGS, auto_cast=False)
@tc.typecaster_function
def label_videos(
config: tc.PathLike,
videos: tc.Union[tc.List[tc.PathLike], tc.PathLike],
body_parts_to_plot: tc.Optional[tc.List[str]] = None,
shuffle: int = 1,
training_set_index: int = 0,
tracker: str = "",
video_type: str = "",
**kwargs
):
"""
Create labeled videos using results generated by DIPLOMAT's DEEPLABCUT frontend.
:param config: The path to the DLC config.yaml. The visual settings from the DLC config are used.
:param videos: A single path or list of paths, to the location of the video files to annotate. Can also be a directory.
:param body_parts_to_plot: A list of strings, specifying which body parts to plot. Defaults to None, which plots points for all body parts.
:param shuffle: int, optional. An integer specifying the shuffle index of the training dataset used for training
the network. The default is 1.
:param training_set_index: int, optional. Integer specifying which TrainingsetFraction to use. By default, the first
(note that TrainingFraction is a list in config.yaml).
:param tracker: String, the extension of the deeplabcut tracker used, used to find the h5 file. Doesn't need to be set for DIPLOMAT files.
:param video_type: Optional string, the video extension to search for if the 'videos' argument is a directory
to search inside ('.avi', '.mp4', ...).
:param kwargs: The following additional arguments are supported:
{extra_cli_args}
"""
cfg = auxiliaryfunctions.read_config(config)
train_frac = cfg["TrainingFraction"][training_set_index]
dlc_scorer, __ = auxiliaryfunctions.GetScorerName(
cfg, shuffle, train_frac
)
plotting_settings = Config({}, LABELED_VIDEO_SETTINGS)
plotting_settings.extract(cfg)
plotting_settings.colormap = cfg.get("diplomat_colormap", cfg["colormap"])
plotting_settings.update(kwargs)
video_list = auxiliaryfunctions.get_list_of_videos(_to_str_list(videos), video_type)
for video in video_list:
try:
loc_data, metadata, out_path, h5_path = _get_video_info(video, dlc_scorer, tracker)
if(Path(out_path).exists()):
print(f"Labeled video {Path(video).name} already exists...")
continue
except FileNotFoundError:
print(f"Unable to find h5 file for video {Path(video).name}. Make sure to run analysis first!")
continue
_create_video_single(
Path(video),
loc_data,
metadata,
out_path,
plotting_settings,
body_parts_to_plot
)
def _convert_new_ma_dlc_hdf5(df: pd.DataFrame) -> pd.DataFrame:
lvls = df.columns.levels
names = [lvl.name for lvl in lvls]
scs, indvs, bps, coords = [list(df.columns.unique(name)) for name in names]
iterator = [(sc, i, ind, bp, coord) for sc in scs for bp in bps for i, ind in enumerate(indvs) for coord in coords]
new_cols = [(sc, bp, coord + ("" if (i == 0) else f"{i + 1}")) for sc, i, ind, bp, coord in iterator]
old_cols = [(sc, ind, bp, coord) for sc, i, ind, bp, coord in iterator]
df2 = pd.DataFrame(index=df.index, columns=pd.MultiIndex.from_tuples(new_cols, names=[names[0], *names[2:]]))
for old_col, new_col in zip(old_cols, new_cols):
df2[new_col] = df[old_col]
return df2
def _get_video_info(video: Pathy, dlc_scorer: str, tracker_ext: str = "") -> Tuple[pd.DataFrame, Dict[str, Any], PathLike, PathLike]:
video = Path(video)
parent_folder = video.resolve().parent
tracker_ext = f"_{tracker_ext}" if(tracker_ext != "") else tracker_ext
h5_path = parent_folder / (video.stem + dlc_scorer + tracker_ext + ".h5")
try:
df_data = pd.read_hdf(str(h5_path), "df_with_missing")
except KeyError:
df_data = pd.read_hdf(str(h5_path))
if(len(df_data.columns.levels) == 4):
# We have a thing in the new MA-DLC format, convert to older MA-DLC format...
df_data = _convert_new_ma_dlc_hdf5(df_data)
with (parent_folder / (video.stem + dlc_scorer + "_meta.pickle")).open("rb") as f:
metadata = pickle.load(f)["data"]
final_video_path = parent_folder / (video.stem + dlc_scorer + "_labeled.mp4")
return (df_data, metadata, final_video_path, h5_path)
def _to_cv2_color(color: Tuple[float, float, float, float]) -> Tuple[int, int, int, int]:
r, g, b, a = [min(255, max(0, int(val * 256))) for val in color]
return (b, g, r, a)
def _create_video_single(
video_path: Pathy,
location_data: pd.DataFrame,
pickle_info: Dict[str, Any],
export_path: Pathy,
plotting_settings: Config,
body_parts_to_plot: Optional[List[str]] = None,
) -> None:
print(f"Creating labeled video: {Path(video_path).name}")
unlabeled_video = cv2.VideoCapture(str(video_path))
x_off, y_off = 0, 0
x_off2 = int(unlabeled_video.get(cv2.CAP_PROP_FRAME_WIDTH))
y_off2 = int(unlabeled_video.get(cv2.CAP_PROP_FRAME_HEIGHT))
if(pickle_info["cropping"]):
x_off, x_off2, y_off, y_off2 = [int(v) for v in pickle_info["cropping_parameters"]]
upscale = 1 if(plotting_settings.upscale_factor is None) else float(plotting_settings.upscale_factor)
labeled_video = cv2.VideoWriter(
str(export_path),
plotting_settings.output_codec,
float(unlabeled_video.get(cv2.CAP_PROP_FPS)),
(int((x_off2 - x_off) * upscale), int((y_off2 - y_off) * upscale))
)
if(not labeled_video.isOpened()):
raise IOError("Unable to create labeled video with the specified location and codec.")
# Compute the body parts to look up...
body_parts_to_plot = EverythingSet() if(body_parts_to_plot is None) else set(body_parts_to_plot)
body_part_data = []
counts = {}
for col in location_data:
scorer, body_part, coord = col
if(body_part in body_parts_to_plot and coord.startswith("x")):
counts[body_part] = counts.get(body_part, 0) + 1
# Views into the data_frame...
body_part_data.append([
location_data[scorer, body_part, coord],
location_data[scorer, body_part, "y" + coord[1:]],
location_data[scorer, body_part, "likelihood" + coord[1:]]
])
num_outputs = max(counts.values())
progress = tqdm.tqdm(total=location_data.shape[0])
i = 0
# Now begin writing frames...
while(unlabeled_video.isOpened() and labeled_video.isOpened() and i < location_data.shape[0]):
got_frame, frame = unlabeled_video.read()
if(not got_frame):
break
frame = frame[y_off:y_off2, x_off:x_off2]
if(plotting_settings.upscale_factor is not None):
h, w = frame.shape[:-1]
frame = cv2.resize(frame, (int(w * upscale), int(h * upscale)), interpolation=cv2.INTER_NEAREST)
overlay = frame.copy()
colors = iter_colormap(plotting_settings.colormap, len(body_part_data))
shapes = shape_iterator(plotting_settings.shape_list, num_outputs)
for bp_idx, ((bp_x, bp_y, bp_p), color, shape) in enumerate(zip(body_part_data, colors, shapes)):
if(not (np.isfinite(bp_x[i]) and np.isfinite(bp_y[i]))):
continue
shape_drawer = CV2DotShapeDrawer(
overlay,
_to_cv2_color(tuple(color[:3]) + (1,)),
-1 if(bp_p[i] > plotting_settings.pcutoff) else plotting_settings.line_thickness,
cv2.LINE_AA if(plotting_settings.antialiasing) else None
)[shape]
if(bp_p[i] > plotting_settings.pcutoff or plotting_settings.draw_hidden_tracks):
shape_drawer(int(bp_x[i] * upscale), int(bp_y[i] * upscale), int(plotting_settings.dotsize * upscale))
labeled_video.write(cv2.addWeighted(
overlay, plotting_settings.alphavalue, frame, 1 - plotting_settings.alphavalue, 0
))
progress.update()
i += 1
unlabeled_video.release()
labeled_video.release()