import cv2
import mediapipe as mp
import csv
import os
import pandas as pd
import numpy as np
import math
import shutil
import uuid
import time
from pathlib import Path
# Globals
STANCE = "regular"
STANCE_CONFIDENCE = 0.0
STANCE_LOCKED = False
STANCE_SCORE = {
"regular": 0,
"goofy": 0
}
GHOST_PHASE_ORDER = ["prep", "pop", "air", "land"]
SHOW_GHOST_IN_PHASES = {"prep", "pop", "air", "land"}
GHOST_JOINT_NAMES = [
"LEFT_SHOULDER", "RIGHT_SHOULDER",
"LEFT_ELBOW", "RIGHT_ELBOW",
"LEFT_WRIST", "RIGHT_WRIST",
"LEFT_HIP", "RIGHT_HIP",
"LEFT_KNEE", "RIGHT_KNEE",
"LEFT_ANKLE", "RIGHT_ANKLE",
]
SKELETON_EDGES = [
# Beine
("LEFT_HIP", "LEFT_KNEE"),
("LEFT_KNEE", "LEFT_ANKLE"),
("RIGHT_HIP", "RIGHT_KNEE"),
("RIGHT_KNEE", "RIGHT_ANKLE"),
# Hüfte / Oberkörper
("LEFT_HIP", "RIGHT_HIP"),
("LEFT_SHOULDER", "RIGHT_SHOULDER"),
("LEFT_HIP", "LEFT_SHOULDER"),
("RIGHT_HIP", "RIGHT_SHOULDER"),
# Arme
("LEFT_SHOULDER", "LEFT_ELBOW"),
("LEFT_ELBOW", "LEFT_WRIST"),
("RIGHT_SHOULDER", "RIGHT_ELBOW"),
("RIGHT_ELBOW", "RIGHT_WRIST"),
]
PHASE_ORDER = ["prep", "pop", "air", "land"]
PHASE_INDEX = {p: i for i, p in enumerate(PHASE_ORDER)}
PHASE_LABELS = {
"prep": "PREP – Vorbereitung",
"pop": "POP – Absprung",
"air": "AIR – Flugphase",
"land": "LAND – Landung"
}
KNEE_ANGLE_RULES = {
"prep": {"green": (140, 180), "yellow": (120, 140)},
"pop": {"green": (150, 180), "yellow": (130, 150)},
"air": {"green": (110, 160), "yellow": (90, 110)},
"land": {"green": (130, 170), "yellow": (110, 130)}
}
PHASE_FEEDBACK_RULES = {
"prep": {
"LEFT_ANKLE": ["red", "yellow"], "RIGHT_ANKLE": ["red", "yellow"],
"LEFT_KNEE": ["red", "yellow"], "RIGHT_KNEE": ["red", "yellow"],
"HIP": ["yellow"],
},
"pop": {
"LEFT_ANKLE": ["red", "yellow"], "RIGHT_ANKLE": ["red", "yellow"],
"LEFT_KNEE": ["red"], "RIGHT_KNEE": ["red"],
"HIP": ["red", "yellow"],
},
"air": {
"LEFT_KNEE": ["red", "yellow"], "RIGHT_KNEE": ["red", "yellow"],
"LEFT_SHOULDER": ["red", "yellow"], "RIGHT_SHOULDER": ["red", "yellow"],
"LEFT_ELBOW": ["red", "yellow"], "RIGHT_ELBOW": ["red", "yellow"],
},
"land": {
"LEFT_ANKLE": ["red", "yellow"], "RIGHT_ANKLE": ["red", "yellow"],
"LEFT_KNEE": ["red", "yellow"], "RIGHT_KNEE": ["red", "yellow"],
"HIP": ["yellow"],
}
}
JOINT_FEEDBACK_TEXT = {
"LEFT_KNEE": {
"prep": {"red": "Bend front knee more", "yellow": "Keep front knee bent"},
"pop": {"red": "Front knee straightening too early"},
"air": {"red": "Bring front knee back over the board", "yellow": "Keep front knee with the board"},
"land": {"red": "Bend front knee to absorb landing", "yellow": "Bend front knee on landing"}
},
"RIGHT_KNEE": {
"prep": {"red": "Bend back knee more", "yellow": "Keep back knee bent"},
"pop": {"red": "Back knee straightening too early"},
"air": {"red": "Bring back knee over the board", "yellow": "Keep back knee with the board"},
"land": {"red": "Bend back knee to absorb landing", "yellow": "Bend back knee on landing"}
},
"LEFT_ANKLE": {
"prep": {"red": "Front foot not stable", "yellow": "Set front foot firmer"},
"pop": {"red": "Pull front foot up faster", "yellow": "Pull front foot up more"},
"land": {"red": "Land solid on front foot", "yellow": "Press front foot down"}
},
"RIGHT_ANKLE": {
"prep": {"red": "Back foot not stable", "yellow": "Set back foot firmer"},
"pop": {"red": "Snap tail harder", "yellow": "Snap tail faster"},
"land": {"red": "Land solid on back foot", "yellow": "Press back foot down"}
},
"HIP": {
"prep": {"yellow": "Stay centered over the board"},
"pop": {"red": "Jump straight up with the board", "yellow": "Drive body upward with pop"},
"air": {"red": "Body drifting away from the board", "yellow": "Stay above the board"},
"land": {"yellow": "Stay centered on landing"}
},
"LEFT_SHOULDER": {"air": {"red": "Front shoulder opening too early", "yellow": "Keep front shoulder closed"}},
"RIGHT_SHOULDER": {"air": {"red": "Back shoulder opening too early", "yellow": "Keep back shoulder closed"}},
"LEFT_ELBOW": {"air": {"red": "Front arm moving away", "yellow": "Keep front arm close"}},
"RIGHT_ELBOW": {"air": {"red": "Back arm moving away", "yellow": "Keep back arm close"}}
}
ARROW_VECTOR_MEMORY = {}
ARROW_DEADZONE_PX = 8
ARROW_LAST_VECTOR = {}
# Helper Functions
def map_joint_by_stance(joint, stance):
if stance == "regular":
return joint
if stance == "goofy":
swap = {
"LEFT_ANKLE": "RIGHT_ANKLE", "RIGHT_ANKLE": "LEFT_ANKLE",
"LEFT_KNEE": "RIGHT_KNEE", "RIGHT_KNEE": "LEFT_KNEE",
"LEFT_SHOULDER": "RIGHT_SHOULDER", "RIGHT_SHOULDER": "LEFT_SHOULDER",
"LEFT_ELBOW": "RIGHT_ELBOW", "RIGHT_ELBOW": "LEFT_ELBOW",
}
return swap.get(joint, joint)
return joint
def extract_pose_to_csv(video_path: str, output_csv: str):
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False)
cap = cv2.VideoCapture(video_path)
fieldnames = ["timestamp"] + [f"lm_{i}_{axis}" for i in range(33) for axis in ["x", "y", "z"]]
with open(output_csv, "w", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
while True:
ret, frame = cap.read()
if not ret: break
ts = int(cap.get(cv2.CAP_PROP_POS_MSEC))
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
res = pose.process(rgb)
if not res.pose_landmarks: continue
row = {"timestamp": ts}
for i, lm in enumerate(res.pose_landmarks.landmark):
row[f"lm_{i}_x"], row[f"lm_{i}_y"], row[f"lm_{i}_z"] = lm.x, lm.y, lm.z
writer.writerow(row)
cap.release()
pose.close()
def calculate_angle(a, b, c):
a, b, c = np.array(a), np.array(b), np.array(c)
ba, bc = a - b, c - b
cosine = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
return np.degrees(np.arccos(np.clip(cosine, -1, 1)))
def extract_knee_angle(lm):
def angle_helper(a, b, c):
ba = np.array([a.x - b.x, a.y - b.y])
bc = np.array([c.x - b.x, c.y - b.y])
cosang = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
return np.degrees(np.arccos(np.clip(cosang, -1.0, 1.0)))
left = angle_helper(lm[23], lm[25], lm[27])
right = angle_helper(lm[24], lm[26], lm[28])
return (left + right) / 2
def row_to_keypoints(row, w, h, joint_names):
pts = {}
for j in joint_names:
xk, yk = f"{j}_x", f"{j}_y"
if xk in row and yk in row:
pts[j] = np.array([row[xk] * w, row[yk] * h], dtype=np.float32)
return pts
def get_mid_hip(pts):
if "LEFT_HIP" not in pts or "RIGHT_HIP" not in pts: return None
return 0.5 * (pts["LEFT_HIP"] + pts["RIGHT_HIP"])
def get_mid_hip_from_row(row, w, h):
required = ["LEFT_HIP_x", "LEFT_HIP_y", "RIGHT_HIP_x", "RIGHT_HIP_y"]
for key in required:
if key not in row or pd.isna(row[key]): return None
mid_x, mid_y = (row["LEFT_HIP_x"] + row["RIGHT_HIP_x"]) / 2.0, (row["LEFT_HIP_y"] + row["RIGHT_HIP_y"]) / 2.0
if not (0.0 <= mid_x <= 1.0 and 0.0 <= mid_y <= 1.0): return None
return (int(mid_x * w), int(mid_y * h))
def is_core_visible(row, min_core_joints=2):
core = ["LEFT_HIP", "RIGHT_HIP", "LEFT_KNEE", "RIGHT_KNEE"]
valid = 0
for j in core:
x, y = row.get(f"{j}_x", -1), row.get(f"{j}_y", -1)
if 0.05 < x < 0.95 and 0.05 < y < 0.95: valid += 1
return valid >= min_core_joints
def core_motion_score(prev_row, curr_row):
joints = ["LEFT_HIP", "RIGHT_HIP", "LEFT_KNEE", "RIGHT_KNEE"]
motion = 0.0
for j in joints:
for axis in ("x", "y"):
k = f"{j}_{axis}"
if k in prev_row and k in curr_row: motion += abs(curr_row[k] - prev_row[k])
return motion
# Phase Detection
def detect_ollie_phases(df):
df = df.copy()
for c in ["LEFT_KNEE_y", "RIGHT_KNEE_y", "LEFT_ANKLE_y", "RIGHT_ANKLE_y"]:
df[c + "_s"] = df[c].rolling(5, center=True).mean()
df["lk_v"] = df["LEFT_KNEE_y_s"].diff()
df["rk_v"] = df["RIGHT_KNEE_y_s"].diff()
prep_mask = (df["lk_v"] > 0.002) & (df["rk_v"] > 0.002)
prep_idx = prep_mask[prep_mask].index
prep_start = prep_idx[0] if len(prep_idx) else df.index[0]
pop_candidates = df.loc[prep_start:, "lk_v"]
pop_candidates = pop_candidates[pop_candidates < -0.006].index
pop_idx = pop_candidates[0] if len(pop_candidates) else prep_start + 5
pop_end = min(pop_idx + 2, df.index[-1])
la, ra = df["LEFT_ANKLE_y_s"], df["RIGHT_ANKLE_y_s"]
air_mask = (la < la.shift(1) - 0.002) & (ra < ra.shift(1) - 0.002)
air_candidates = air_mask[air_mask & (df.index > pop_end)].index
air_start = air_candidates[0] if len(air_candidates) else pop_end + 1
ground_la, ground_ra = la.loc[prep_start:prep_start + 5].mean(), ra.loc[prep_start:prep_start + 5].mean()
land_mask = (la > ground_la * 0.97) & (ra > ground_ra * 0.97)
land_candidates = land_mask[land_mask & (df.index > air_start)].index
land_idx = land_candidates[0] if len(land_candidates) else df.index[-1]
df["phase"] = "prep"
df.loc[pop_idx:pop_end, "phase"] = "pop"
df.loc[air_start:land_idx - 1, "phase"] = "air"
df.loc[land_idx:, "phase"] = "land"
current_max = 0
fixed_phases = []
for p in df["phase"]:
idx = PHASE_INDEX.get(p, current_max)
if idx < current_max: idx = current_max
else: current_max = idx
fixed_phases.append(PHASE_ORDER[idx])
df["phase"] = fixed_phases
return df
def generate_feedback_csv_from_pose(pose_csv_path, out_path):
df_pose = pd.read_csv(pose_csv_path)
rows = []
for _, row in df_pose.iterrows():
lm = []
for i in range(33):
lm.append(type("LM", (), {"x": row[f"lm_{i}_x"], "y": row[f"lm_{i}_y"], "z": row.get(f"lm_{i}_z", 0.0)}))
ts = row["timestamp"]
knee_angle = extract_knee_angle(lm)
hip_y = (lm[23].y + lm[24].y) / 2
rows.append({
"timestamp": ts, "time_sec": ts / 1000.0, "knee_angle": knee_angle, "hip_y": hip_y,
"LEFT_HIP_x": lm[23].x, "LEFT_HIP_y": lm[23].y, "LEFT_KNEE_x": lm[25].x, "LEFT_KNEE_y": lm[25].y, "LEFT_ANKLE_x": lm[27].x, "LEFT_ANKLE_y": lm[27].y,
"RIGHT_HIP_x": lm[24].x, "RIGHT_HIP_y": lm[24].y, "RIGHT_KNEE_x": lm[26].x, "RIGHT_KNEE_y": lm[26].y, "RIGHT_ANKLE_x": lm[28].x, "RIGHT_ANKLE_y": lm[28].y,
"LEFT_SHOULDER_x": lm[11].x, "LEFT_SHOULDER_y": lm[11].y, "RIGHT_SHOULDER_x": lm[12].x, "RIGHT_SHOULDER_y": lm[12].y,
"LEFT_ELBOW_x": lm[13].x, "LEFT_ELBOW_y": lm[13].y, "RIGHT_ELBOW_x": lm[14].x, "RIGHT_ELBOW_y": lm[14].y,
"LEFT_WRIST_x": lm[15].x, "LEFT_WRIST_y": lm[15].y, "RIGHT_WRIST_x": lm[16].x, "RIGHT_WRIST_y": lm[16].y,
})
df_feedback = pd.DataFrame(rows)
if "phase" not in df_feedback.columns:
df_feedback = detect_ollie_phases(df_feedback)
df_feedback.to_csv(out_path, index=False)
return out_path
# Feedback Component Drawing
def knee_state_phase(row, side, phase):
hip, knee, ankle = f"{side}_HIP", f"{side}_KNEE", f"{side}_ANKLE"
a, b, c = (row[f"{hip}_x"], row[f"{hip}_y"]), (row[f"{knee}_x"], row[f"{knee}_y"]), (row[f"{ankle}_x"], row[f"{ankle}_y"])
angle = calculate_angle(a, b, c)
rules = KNEE_ANGLE_RULES.get(phase)
if not rules: return "neutral"
g_min, g_max = rules["green"]
y_min, y_max = rules["yellow"]
if g_min <= angle <= g_max: return "green"
elif y_min <= angle <= y_max: return "yellow"
else: return "red"
def draw_knee_feedback(frame, row, w, h, phase):
states = {}
colors = {"green": (0, 200, 0), "yellow": (0, 200, 200), "red": (0, 0, 255), "neutral": (160, 160, 160)}
for side in ["LEFT", "RIGHT"]:
state = knee_state_phase(row, side, phase)
states[f"{side}_KNEE"] = state
if state == "neutral": continue
cv2.circle(frame, (int(row[f"{side}_KNEE_x"] * w), int(row[f"{side}_KNEE_y"] * h)), 8, colors[state], -1)
return states
def hip_state_phase(row, phase):
try:
hx, hy, sx = (row["LEFT_HIP_x"] + row["RIGHT_HIP_x"]) / 2, (row["LEFT_HIP_y"] + row["RIGHT_HIP_y"]) / 2, (row["LEFT_SHOULDER_x"] + row["RIGHT_SHOULDER_x"]) / 2
except: return "neutral", None, None
dx = abs(hx - sx)
phase = str(phase).lower()
if phase in {"prep", "pop"}: gt, yt = 0.025, 0.05
elif phase == "air": gt, yt = 0.02, 0.04
elif phase == "land": gt, yt = 0.022, 0.045
else: gt, yt = 0.025, 0.05
state = "green" if dx <= gt else "yellow" if dx <= yt else "red"
return state, hx, hy
def draw_hip_feedback(frame, row, w, h, phase):
states = {}
state, hx, hy = hip_state_phase(row, phase); states["HIP"] = state
if state == "neutral": return states
cv2.circle(frame, (int(hx * w), int(hy * h)), 10, {"green": (0, 200, 0), "yellow": (0, 200, 200), "red": (0, 0, 255)}[state], -1)
return states
def elbow_state(row, phase):
phase = str(phase).lower()
try:
lh, rh = (row["LEFT_HIP_x"], row["LEFT_HIP_y"]), (row["RIGHT_HIP_x"], row["RIGHT_HIP_y"])
hc = ((lh[0] + rh[0]) / 2, (lh[1] + rh[1]) / 2)
except: return "neutral", None, None
ws, wp = "green", None
for side in ["LEFT", "RIGHT"]:
try:
s, e, wr = (row[f"{side}_SHOULDER_x"], row[f"{side}_SHOULDER_y"]), (row[f"{side}_ELBOW_x"], row[f"{side}_ELBOW_y"]), (row[f"{side}_WRIST_x"], row[f"{side}_WRIST_y"])
angle = calculate_angle(s, e, wr)
dist = ((e[0]-hc[0])**2 + (e[1]-hc[1])**2)**0.5
if phase == "air": ang_ok, dg, dy = (110 < angle < 160), 0.12, 0.18
else: ang_ok, dg, dy = (angle > 130), 0.10, 0.15
st = "green" if ang_ok and dist < dg else "yellow" if dist < dy else "red"
if st == "red" or (st == "yellow" and ws == "green"): ws, wp = st, e
except: continue
return ws, (wp[0] if wp else None), (wp[1] if wp else None)
def draw_elbow_feedback(frame, row, w, h, phase):
states = {}
st, ex, ey = elbow_state(row, phase); states["LEFT_ELBOW"] = states["RIGHT_ELBOW"] = st
if st == "neutral" or ex is None: return states
cv2.circle(frame, (int(ex * w), int(ey * h)), 8, {"green": (0, 200, 0), "yellow": (0, 200, 200), "red": (0, 0, 255)}[st], -1)
return states
def shoulder_state(row):
try:
ls, rs, lh, rh = (row["LEFT_SHOULDER_x"], row["LEFT_SHOULDER_y"]), (row["RIGHT_SHOULDER_x"], row["RIGHT_SHOULDER_y"]), (row["LEFT_HIP_x"], row["LEFT_HIP_y"]), (row["RIGHT_HIP_x"], row["RIGHT_HIP_y"])
sa, ha = math.degrees(math.atan2(rs[1]-ls[1], rs[0]-ls[0])), math.degrees(math.atan2(rh[1]-lh[1], rh[0]-lh[0]))
diff = min(abs(sa-ha), 360-abs(sa-ha))
st = "green" if diff < 10 else "yellow" if diff < 25 else "red"
return st, (ls[0]+rs[0])/2, (ls[1]+rs[1])/2
except: return "neutral", None, None
def draw_shoulder_feedback(frame, row, w, h):
states = {}
st, cx, cy = shoulder_state(row); states["LEFT_SHOULDER"] = states["RIGHT_SHOULDER"] = st
if st == "neutral": return states
cv2.circle(frame, (int(cx * w), int(cy * h)), 10, {"green": (0, 200, 0), "yellow": (0, 200, 200), "red": (0, 0, 255)}[st], -1)
return states
def foot_state(row, side, phase):
if str(phase).lower() == "air": return "neutral"
hip, knee, ankle = f"{side}_HIP", f"{side}_KNEE", f"{side}_ANKLE"
a, b, c = (row[f"{hip}_x"], row[f"{hip}_y"]), (row[f"{knee}_x"], row[f"{knee}_y"]), (row[f"{ankle}_x"], row[f"{ankle}_y"])
angle = calculate_angle(a, b, c)
return "green" if angle < 140 else "yellow" if angle < 160 else "red"
def draw_foot_feedback(frame, row, w, h, phase):
states = {}
colors = {"green": (0, 200, 0), "yellow": (0, 200, 200), "red": (0, 0, 255)}
for side in ["LEFT", "RIGHT"]:
state = foot_state(row, side, phase); states[f"{side}_ANKLE"] = state
if state == "neutral": continue
cv2.circle(frame, (int(row[f"{side}_ANKLE_x"] * w), int(row[f"{side}_ANKLE_y"] * h)), 8, colors[state], -1)
return states
# HUD and Correction
def get_phase_ghost_row(ghost_df, phase):
rows = ghost_df[ghost_df["phase"] == str(phase).lower()]
return rows.iloc[0] if len(rows) > 0 else None
def draw_feedback_hud(frame, joint_states, phase, anchor_xy):
h, w = frame.shape[:2]
ax, ay = int(anchor_xy[0]), int(anchor_xy[1])
HUD_WIDTH, SIDE_OFFSET, MARGIN = 380, 120, 20
PHASE_BAR_H, PHASE_TEXT_GAP, LINE_H, SECTION_GAP = 36, 40, 26, 14
if ax + SIDE_OFFSET + HUD_WIDTH < w - MARGIN: hud_x = ax + SIDE_OFFSET
else: hud_x = ax - SIDE_OFFSET - HUD_WIDTH
hud_x = min(max(hud_x, MARGIN), w - HUD_WIDTH - MARGIN)
hud_y = min(max(ay - 120, 40), h - MARGIN)
JOINT_CAT = {"ANKLE": "lower", "KNEE": "lower", "HIP": "core", "SHOULDER": "upper", "ELBOW": "upper"}
PHASE_PRIO = {"prep": ["ANKLE", "KNEE", "HIP"], "pop": ["ANKLE", "KNEE", "HIP"], "air": ["HIP", "SHOULDER", "ELBOW", "KNEE", "ANKLE"], "land": ["KNEE", "ANKLE", "HIP"]}
candidates = []
for joint, state in joint_states.items():
if state not in {"red", "yellow"}: continue
semantic = map_joint_by_stance(joint, STANCE)
txt = JOINT_FEEDBACK_TEXT.get(semantic, {}).get(phase, {}).get(state)
if not txt: continue
base = joint.split("_")[-1]
category = JOINT_CAT.get(base)
if not category: continue
prio = PHASE_PRIO.get(phase, []).index(base) if base in PHASE_PRIO.get(phase, []) else 99
candidates.append({"joint": joint, "category": category, "state": state, "text": txt, "color": (0, 0, 255) if state == "red" else (0, 200, 200), "prio": prio})
candidates.sort(key=lambda c: (0 if c["state"] == "red" else 1, c["prio"]))
selected, used = [], {"lower": 0, "upper": 0, "core": 0}
for c in candidates:
if c["state"] != "red": continue
if len(selected) >= 3: break
cat = c["category"]
if (phase == "air" and used[cat] >= 1): continue
if used[cat] == 0: selected.append(c); used[cat] += 1
for c in candidates:
if c["state"] != "red": continue
if len(selected) >= 3: break
if c in selected: continue
selected.append(c); used[c["category"]] += 1
for c in candidates:
if c["state"] != "yellow": continue
if len(selected) >= 3: break
cat = c["category"]
if phase == "air" and used[cat] >= 1: continue
selected.append(c); used[cat] += 1
wrapped_blocks = []
for c in selected:
words, line, wrapped = c["text"].split(" "), "", []
for word in words:
tw = cv2.getTextSize(line + word + " ", cv2.FONT_HERSHEY_SIMPLEX, 0.72, 2)[0][0]
if tw > HUD_WIDTH - 40: wrapped.append(line); line = word + " "
else: line += word + " "
if line: wrapped.append(line)
wrapped_blocks.append((wrapped, c["color"], c["joint"]))
y = hud_y + PHASE_BAR_H + PHASE_TEXT_GAP
for w_lines, _, _ in wrapped_blocks: y += len(w_lines) * LINE_H + SECTION_GAP
legend_y, ghost_y, stance_y = y, y + 38, y + 68
hud_bottom = stance_y + 18
overlay = frame.copy()
cv2.rectangle(overlay, (hud_x - 12, hud_y - 18), (hud_x + HUD_WIDTH, hud_bottom), (0, 0, 0), -1)
cv2.addWeighted(overlay, 0.65, frame, 0.35, 0, frame)
phases_bar = ["prep", "pop", "air", "land"]
seg_w = HUD_WIDTH // len(phases_bar)
for i, p in enumerate(phases_bar):
x1, x2 = hud_x + i * seg_w, hud_x + (i + 1) * seg_w - 6
if p == phase: color, t_color, thick = (255, 200, 0), (20, 20, 20), 2
elif phases_bar.index(p) < phases_bar.index(phase): color, t_color, thick = (150, 130, 90), (220, 220, 220), 1
else: color, t_color, thick = (70, 70, 70), (200, 200, 200), 1
cv2.rectangle(frame, (x1, hud_y), (x2, hud_y + PHASE_BAR_H), color, -1)
cv2.putText(frame, p.upper(), (x1 + 18, hud_y + 24), cv2.FONT_HERSHEY_DUPLEX, 0.62, t_color, thick, cv2.LINE_AA)
y = hud_y + PHASE_BAR_H + PHASE_TEXT_GAP
used_joints = []
for w_lines, color, joint in wrapped_blocks:
cv2.circle(frame, (hud_x + 8, y - 6), 4, color, -1)
for line in w_lines:
cv2.putText(frame, line, (hud_x + 20, y), cv2.FONT_HERSHEY_SIMPLEX, 0.72, color, 2, cv2.LINE_AA)
y += LINE_H
y += SECTION_GAP
used_joints.append(joint)
lx = hud_x
for lbl, clr in [("GOOD", (0, 200, 0)), ("OK", (0, 200, 200)), ("ADJUST", (0, 0, 255))]:
cv2.rectangle(frame, (lx, legend_y - 14), (lx + 22, legend_y + 6), clr, -1)
cv2.putText(frame, lbl, (lx + 30, legend_y + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.62, (245, 245, 245), 2, cv2.LINE_AA)
lx += 130
cv2.putText(frame, "Ghost pose = reference ollie", (hud_x, ghost_y), cv2.FONT_HERSHEY_SIMPLEX, 0.72, (255, 200, 0), 2, cv2.LINE_AA)
cv2.putText(frame, f"Stance: {STANCE.capitalize()}", (hud_x, stance_y), cv2.FONT_HERSHEY_SIMPLEX, 0.68, (230, 230, 230), 2, cv2.LINE_AA)
return hud_x, hud_y, HUD_WIDTH, used_joints
def draw_dynamic_correction_arrow(frame, joint_name, live_row, ghost_row, w, h, stance, phase, color=(0, 0, 255)):
global ARROW_LAST_VECTOR
lx, ly = live_row.get(f"{joint_name}_x"), live_row.get(f"{joint_name}_y")
if lx is None or ly is None: return
semantic = map_joint_by_stance(joint_name, stance)
gx, gy = ghost_row.get(f"{semantic}_x"), ghost_row.get(f"{semantic}_y")
if gx is None or gy is None: return
x1, y1, x2, y2 = int(lx * w), int(ly * h), int(gx * w), int(gy * h)
dx, dy = x2 - x1, y2 - y1
key = (joint_name, phase)
last = ARROW_LAST_VECTOR.get(key)
if phase == "land" and last is not None:
if dx * last[0] + dy * last[1] < 0: dx, dy = last
else: ARROW_LAST_VECTOR[key] = (dx, dy)
else: ARROW_LAST_VECTOR[key] = (dx, dy)
dist = (dx**2 + dy**2) ** 0.5
if dist < 10: return
scale = min(1.0, 60 / dist)
cv2.arrowedLine(frame, (x1, y1), (int(x1 + dx * scale), int(y1 + dy * scale)), color, 3, tipLength=0.35, line_type=cv2.LINE_AA)
def draw_ghost_pose(frame, ghost_row, live_row, w, h, joint_names, skeleton_edges, alpha=0.6):
ghost_overlay = frame.copy()
ghost_pts = {}
for j in joint_names:
xk, yk = f"{j}_x", f"{j}_y"
if xk in ghost_row and yk in ghost_row: ghost_pts[j] = np.array([ghost_row[xk] * w, ghost_row[yk] * h], dtype=np.float32)
if len(ghost_pts) < 4: return
live_pts = row_to_keypoints(live_row, w, h, joint_names)
if len(live_pts) < 4: return
live_anchor = get_mid_hip(live_pts)
if live_anchor is None or "LEFT_HIP" not in ghost_pts or "RIGHT_HIP" not in ghost_pts: return
ghost_anchor = 0.5 * (ghost_pts["LEFT_HIP"] + ghost_pts["RIGHT_HIP"])
phase, scale = str(live_row.get("phase", "")).lower(), 1.15
if phase == "prep": scale += 0.30
for j in ghost_pts: ghost_pts[j] = (ghost_pts[j] - ghost_anchor) * scale + live_anchor
for a, b in skeleton_edges:
if a in ghost_pts and b in ghost_pts: cv2.line(ghost_overlay, tuple(ghost_pts[a].astype(int)), tuple(ghost_pts[b].astype(int)), (255, 200, 0), 5, cv2.LINE_AA)
cv2.addWeighted(ghost_overlay, alpha, frame, 1 - alpha, 0, frame)
def draw_colored_skeleton(frame, row, w, h, joint_states):
colors = {"green": (0, 200, 0), "yellow": (0, 200, 200), "red": (0, 0, 255), "neutral": (180, 180, 180)}
for a, b in SKELETON_EDGES:
if f"{a}_x" not in row or f"{b}_x" not in row: continue
ax, ay = int(row[f"{a}_x"] * w), int(row[f"{a}_y"] * h)
bx, by = int(row[f"{b}_x"] * w), int(row[f"{b}_y"] * h)
st = joint_states.get(a) or joint_states.get(b)
cv2.line(frame, (ax, ay), (bx, by), colors.get(st, colors["neutral"]), 3)
# Integrated Movie Loop
def live_overlay_video(video_path: str, feedback_csv: str, ghost_df: pd.DataFrame, output_path: str, show_ghost_in_phases=None):
global STANCE, STANCE_CONFIDENCE, STANCE_LOCKED, ARROW_LAST_VECTOR
if show_ghost_in_phases is None: show_ghost_in_phases = {"prep", "pop", "air", "land"}
df = pd.read_csv(feedback_csv)
cap = cv2.VideoCapture(video_path)
if not cap.isOpened(): raise RuntimeError(f"Video fail: {video_path}")
w, h, fps = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)), cap.get(cv2.CAP_PROP_FPS) or 30.0
out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), fps, (w, h))
STANCE, STANCE_CONFIDENCE, STANCE_LOCKED, ARROW_LAST_VECTOR = "regular", 0.0, False, {}
L_X_S, R_X_S, core_app, core_dis, prev_row, m_cnt, e_cnt, f_since, f_idx = [], [], False, False, None, 0, 0, 0, 0
print(f"▶ Start rendering: {video_path}")
while True:
ret, frame = cap.read()
if not ret: break
ts = f_idx / fps
try:
row = df.iloc[(df["time_sec"] - ts).abs().argsort()[:1]].iloc[0]
except Exception as e:
print(f"Row lookup error at index {f_idx}: {e}")
break
phase = str(row.get("phase", "")).lower()
if not core_app and prev_row is not None:
if core_motion_score(prev_row, row) > 0.0013 and is_core_visible(row): m_cnt += 1
else: m_cnt = 0
if m_cnt >= 4: core_app, f_since, e_cnt = True, 0, 0
if core_app and not core_dis and prev_row is not None:
f_since += 1
if f_since >= 6 and (not is_core_visible(row, 1) or core_motion_score(prev_row, row) < 0.00030): e_cnt += 1
else: e_cnt = 0
if e_cnt >= 2: core_dis = True
prev_row = row
if not core_app or core_dis:
out.write(frame); f_idx += 1; continue
if phase == "prep" and not STANCE_LOCKED:
lx, rx = row.get("LEFT_ANKLE_x"), row.get("RIGHT_ANKLE_x")
if lx is not None and rx is not None: L_X_S.append(lx); R_X_S.append(rx)
if len(L_X_S) >= 15:
lm, rm = sum(L_X_S)/len(L_X_S), sum(R_X_S)/len(R_X_S)
STANCE, STANCE_CONFIDENCE, STANCE_LOCKED = ("regular" if lm < rm else "goofy"), abs(rm-lm), True
if phase in show_ghost_in_phases:
g_ref = get_phase_ghost_row(ghost_df, phase)
if g_ref is not None: draw_ghost_pose(frame, g_ref, row, w, h, GHOST_JOINT_NAMES, SKELETON_EDGES)
js = {}
js.update(draw_knee_feedback(frame, row, w, h, phase))
js.update(draw_hip_feedback(frame, row, w, h, phase))
js.update(draw_elbow_feedback(frame, row, w, h, phase))
js.update(draw_shoulder_feedback(frame, row, w, h))
js.update(draw_foot_feedback(frame, row, w, h, phase))
draw_colored_skeleton(frame, row, w, h, js)
anchor = get_mid_hip_from_row(row, w, h)
if anchor:
res = draw_feedback_hud(frame, js, phase, anchor)
if res and len(res) == 4 and phase in show_ghost_in_phases:
g_ref = get_phase_ghost_row(ghost_df, phase)
if g_ref is not None:
for j in res[3]:
if js.get(j) == "red": draw_dynamic_correction_arrow(frame, j, row, g_ref, w, h, STANCE, phase)
out.write(frame); f_idx += 1
print(f"▶ Render finished. Frames: {f_idx}")
cap.release(); out.release()
return output_path
# Entry Pipeline
def analyze_video_pipeline(input_video_path: str, ghost_df: pd.DataFrame, work_dir: str = "temp") -> str:
import subprocess
if not os.path.exists(input_video_path): raise FileNotFoundError(input_video_path)
os.makedirs(work_dir, exist_ok=True)
job_id = uuid.uuid4().hex[:8]
v_path = os.path.join(work_dir, f"{job_id}_input.mp4")
p_csv = os.path.join(work_dir, f"{job_id}_pose.csv")
f_csv = os.path.join(work_dir, f"{job_id}_feedback.csv")
o_video = os.path.join(work_dir, f"{job_id}_analysis.mp4")
shutil.copy(input_video_path, v_path)
print(f"▶ Processing job {job_id}")
print("▶ Extracting pose...")
extract_pose_to_csv(v_path, p_csv)
print("▶ Generating feedback...")
generate_feedback_csv_from_pose(p_csv, f_csv)
print("▶ Overlaying video...")
live_overlay_video(v_path, f_csv, ghost_df, o_video)
# FFmpeg Web Compatibility Fix
print("▶ Applying web compatibility fix (FFmpeg)...")
compatibility_video = os.path.join(work_dir, f"{job_id}_final.mp4")
cmd = [
"ffmpeg", "-y", "-i", o_video,
"-c:v", "libx264", "-pix_fmt", "yuv420p",
"-movflags", "+faststart", compatibility_video
]
subprocess.run(cmd, capture_output=True)
if os.path.exists(compatibility_video):
print("▶ Web fix applied successfully.")
return compatibility_video
else:
print("Web fix failed, returning original overlay.")
return o_video