Initial commit
main · copygirl · 2 weeks ago · commit ed9a63596f

Files changed:
  Resources/debug_visuals.tscn            +102
  _tracker/Project/.kiri_export_python    +0
  _tracker/Project/face_landmarker.task   BIN
  _tracker/Project/hand_landmarker.task   BIN
  _tracker/Project/new_new_tracker.py     +594
  _tracker/Project/requirements.txt       +3
  copyMediaPipe.gd                        +335
  copyMediaPipe.tscn                      +47

@@ -0,0 +1,102 @@ Resources/debug_visuals.tscn
[gd_scene load_steps=11 format=3 uid="uid://bsldjkd051hfj"]
[sub_resource type="StandardMaterial3D" id="StandardMaterial3D_prsl8"]
shading_mode = 0
albedo_color = Color(0.960784, 0.239216, 0.388235, 1)
[sub_resource type="CylinderMesh" id="CylinderMesh_15vc6"]
material = SubResource("StandardMaterial3D_prsl8")
top_radius = 0.003
bottom_radius = 0.003
height = 0.25
radial_segments = 8
cap_top = false
cap_bottom = false
[sub_resource type="CylinderMesh" id="CylinderMesh_qyx1w"]
material = SubResource("StandardMaterial3D_prsl8")
top_radius = 0.0
bottom_radius = 0.01
height = 0.05
radial_segments = 8
cap_top = false
[sub_resource type="StandardMaterial3D" id="StandardMaterial3D_3ajew"]
shading_mode = 0
albedo_color = Color(0.670588, 0.905882, 0.227451, 1)
[sub_resource type="CylinderMesh" id="CylinderMesh_3buap"]
material = SubResource("StandardMaterial3D_3ajew")
top_radius = 0.003
bottom_radius = 0.003
height = 0.25
radial_segments = 8
cap_top = false
cap_bottom = false
[sub_resource type="CylinderMesh" id="CylinderMesh_81e2y"]
material = SubResource("StandardMaterial3D_3ajew")
top_radius = 0.0
bottom_radius = 0.01
height = 0.05
radial_segments = 8
cap_top = false
[sub_resource type="StandardMaterial3D" id="StandardMaterial3D_mioyw"]
shading_mode = 0
albedo_color = Color(0.2, 0.662745, 0.960784, 1)
[sub_resource type="CylinderMesh" id="CylinderMesh_kpq33"]
material = SubResource("StandardMaterial3D_mioyw")
top_radius = 0.003
bottom_radius = 0.003
height = 0.25
radial_segments = 8
cap_top = false
cap_bottom = false
[sub_resource type="StandardMaterial3D" id="StandardMaterial3D_u1ptn"]
shading_mode = 0
albedo_color = Color(0.2, 0.662745, 0.960784, 1)
[sub_resource type="CylinderMesh" id="CylinderMesh_7wtlq"]
material = SubResource("StandardMaterial3D_u1ptn")
top_radius = 0.0
bottom_radius = 0.01
height = 0.05
radial_segments = 8
cap_top = false
[node name="DebugVisuals" type="Node3D"]
[node name="XArrow" type="Node3D" parent="."]
transform = Transform3D(-4.37114e-08, 1, 0, -1, -4.37114e-08, 0, 0, 0, 1, 0, 0, 0)
[node name="Shaft" type="MeshInstance3D" parent="XArrow"]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0.125, 0)
mesh = SubResource("CylinderMesh_15vc6")
[node name="Tip" type="MeshInstance3D" parent="XArrow"]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0.275, 0)
mesh = SubResource("CylinderMesh_qyx1w")
[node name="YArrow" type="Node3D" parent="."]
[node name="Shaft" type="MeshInstance3D" parent="YArrow"]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0.125, 0)
mesh = SubResource("CylinderMesh_3buap")
[node name="Tip" type="MeshInstance3D" parent="YArrow"]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0.275, 0)
mesh = SubResource("CylinderMesh_81e2y")
[node name="ZArrow" type="Node3D" parent="."]
transform = Transform3D(1, 0, 0, 0, -4.37114e-08, -1, 0, 1, -4.37114e-08, 0, 0, 0)
[node name="Shaft" type="MeshInstance3D" parent="ZArrow"]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0.125, 0)
mesh = SubResource("CylinderMesh_kpq33")
[node name="Tip" type="MeshInstance3D" parent="ZArrow"]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0.275, 0)
mesh = SubResource("CylinderMesh_7wtlq")

@@ -0,0 +1,594 @@ _tracker/Project/new_new_tracker.py
#!/usr/bin/python3
import copy
import gc
import json
import os
import re
import socket
import sys
import threading
import time
import traceback
import cv2
import mediapipe
import numpy
BaseOptions = mediapipe.tasks.BaseOptions
FaceLandmarker = mediapipe.tasks.vision.FaceLandmarker
FaceLandmarkerOptions = mediapipe.tasks.vision.FaceLandmarkerOptions
FaceLandmarkerResult = mediapipe.tasks.vision.FaceLandmarkerResult
# PoseLandmarker = mediapipe.tasks.vision.PoseLandmarker
# PoseLandmarkerOptions = mediapipe.tasks.vision.PoseLandmarkerOptions
HandLandmarker = mediapipe.tasks.vision.HandLandmarker
HandLandmarkerOptions = mediapipe.tasks.vision.HandLandmarkerOptions
HandLandmarkerResult = mediapipe.tasks.vision.HandLandmarkerResult
RunningMode = mediapipe.tasks.vision.RunningMode
# Indices of hand landmarks.
WRIST = 0
THUMB_CMC = 1
THUMB_MCP = 2
THUMB_IP = 3
THUMB_TIP = 4
INDEX_FINGER_MCP = 5
INDEX_FINGER_PIP = 6
INDEX_FINGER_DIP = 7
INDEX_FINGER_TIP = 8
MIDDLE_FINGER_MCP = 9
MIDDLE_FINGER_PIP = 10
MIDDLE_FINGER_DIP = 11
MIDDLE_FINGER_TIP = 12
RING_FINGER_MCP = 13
RING_FINGER_PIP = 14
RING_FINGER_DIP = 15
RING_FINGER_TIP = 16
PINKY_MCP = 17
PINKY_PIP = 18
PINKY_DIP = 19
PINKY_TIP = 20
DEFAULT_TRACKING_DATA = {
"face" : {
"confidence" : 0.0, # Currently either 0.0 or 1.0.
"transform" : [ [ 1.0, 0.0, 0.0, 0.0 ],
[ 0.0, 1.0, 0.0, 0.0 ],
[ 0.0, 0.0, 1.0, 0.0 ],
[ 0.0, 0.0, 0.0, 1.0 ], ],
"blendshapes" : {
"_neutral" : 0.0,
"browDownLeft" : 0.0,
"browDownRight" : 0.0,
"browInnerUp" : 0.0,
"browOuterUpLeft" : 0.0,
"browOuterUpRight" : 0.0,
"cheekPuff" : 0.0,
"cheekSquintLeft" : 0.0,
"cheekSquintRight" : 0.0,
"eyeBlinkLeft" : 0.0,
"eyeBlinkRight" : 0.0,
"eyeLookDownLeft" : 0.0,
"eyeLookDownRight" : 0.0,
"eyeLookInLeft" : 0.0,
"eyeLookInRight" : 0.0,
"eyeLookOutLeft" : 0.0,
"eyeLookOutRight" : 0.0,
"eyeLookUpLeft" : 0.0,
"eyeLookUpRight" : 0.0,
"eyeSquintLeft" : 0.0,
"eyeSquintRight" : 0.0,
"eyeWideLeft" : 0.0,
"eyeWideRight" : 0.0,
"jawForward" : 0.0,
"jawLeft" : 0.0,
"jawOpen" : 0.0,
"jawRight" : 0.0,
"mouthClose" : 0.0,
"mouthDimpleLeft" : 0.0,
"mouthDimpleRight" : 0.0,
"mouthFrownLeft" : 0.0,
"mouthFrownRight" : 0.0,
"mouthFunnel" : 0.0,
"mouthLeft" : 0.0,
"mouthLowerDownLeft" : 0.0,
"mouthLowerDownRight" : 0.0,
"mouthPressLeft" : 0.0,
"mouthPressRight" : 0.0,
"mouthPucker" : 0.0,
"mouthRight" : 0.0,
"mouthRollLower" : 0.0,
"mouthRollUpper" : 0.0,
"mouthShrugLower" : 0.0,
"mouthShrugUpper" : 0.0,
"mouthSmileLeft" : 0.0,
"mouthSmileRight" : 0.0,
"mouthStretchLeft" : 0.0,
"mouthStretchRight" : 0.0,
"mouthUpperUpLeft" : 0.0,
"mouthUpperUpRight" : 0.0,
"noseSneerLeft" : 0.0,
"noseSneerRight" : 0.0,
},
},
"hands" : {
"left" : {
"confidence" : 0.0,
"image_landmarks" : [ [ 0.0, 0.0, 0.0 ] ] * 21,
"world_landmarks" : [ [ 0.0, 0.0, 0.0 ] ] * 21,
},
"right" : {
"confidence" : 0.0,
"image_landmarks" : [ [ 0.0, 0.0, 0.0 ] ] * 21,
"world_landmarks" : [ [ 0.0, 0.0, 0.0 ] ] * 21,
},
},
}
class MediaPipeTracker:
def __init__(self):
self.the_big_ugly_mutex = threading.Lock()
self._tracker_worker_thread = None
# We need these to avoid deadlocks. If we're queueing frames
# faster than they can be processed, we'll hit a deadlock in
# MediaPipe.
self.frames_queued_face = 0
self.frames_queued_hands = 0
self.frames_queued_mutex = threading.Lock()
self.should_quit_threads = False
# Open the socket immediately so we can start sending error
# and status stuff to the hosting application.
self._udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_port_number = 7098
# FIXME: Make this editable.
self.minimum_frame_time = 0.016
self.video_device_index = -1
self.video_device_capture = None
self.landmarker = None
# self.landmarker_pose = None
self.landmarker_hands = None
# These are for more deadlock avoidance, so we can keep track
# of how far behind the hand tracker is.
self._last_hand_result_timestamp = (time.time() * 1000)
self._last_hand_detect_timestamp = (time.time() * 1000)
self.output_data = copy.deepcopy(DEFAULT_TRACKING_DATA)
def _close_video_device(self):
with self.the_big_ugly_mutex:
self.video_device_capture = None
def _open_video_device(self):
with self.the_big_ugly_mutex:
if self.video_device_index == -1:
self.video_device_capture = None
return
# Check to make sure we don't already have the device open.
if self.video_device_capture != None:
return
# Try opening it!
self._write_log("Opening a video device!")
self.video_device_capture = cv2.VideoCapture(self.video_device_index)
# Enforce low-res capture for performance reasons.
try:
self.video_device_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
self.video_device_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
except Exception as e:
# Failed? Whatever. Just use the resolution it's stuck with.
pass
if self.video_device_capture.isOpened():
self._write_log("Video device acquired")
else:
self.video_device_capture = None
self._write_log("Failed to open video device: %s" % str(self.video_device_index))
def _init_mediapipe(self):
asset_path = os.path.abspath(os.path.dirname(__file__))
face_landmarker_path = os.path.join(asset_path, "face_landmarker.task")
# FIXME: Last minute breakages.
# pose_landmarker_path = os.path.join(asset_path, "pose_landmarker.task")
hand_landmarker_path = os.path.join(asset_path, "hand_landmarker.task")
options = FaceLandmarkerOptions(
base_options = BaseOptions(model_asset_path = face_landmarker_path),
running_mode = RunningMode.LIVE_STREAM,
output_face_blendshapes = True,
output_facial_transformation_matrixes = True,
result_callback = self._handle_result_face)
# FIXME: Last minute breakages.
# options_pose = PoseLandmarkerOptions(
# base_options = BaseOptions(model_asset_path = pose_landmarker_path),
# running_mode = RunningMode.LIVE_STREAM,
# output_segmentation_masks = False,
# result_callback = self._handle_result_pose)
options_hands = HandLandmarkerOptions(
base_options = BaseOptions(model_asset_path = hand_landmarker_path),
running_mode = RunningMode.LIVE_STREAM,
num_hands = 2,
# FIXME: Make these adjustable.
# These were working in the 4.1 version.
min_hand_detection_confidence = 0.75,
min_tracking_confidence = 0.75,
min_hand_presence_confidence = 0.9,
result_callback = self._handle_result_hands)
self._shutdown_mediapipe()
self._write_log("Init face landmarker...")
self.landmarker = FaceLandmarker.create_from_options(options)
# self._write_log("Init pose landmarker...")
# self.landmarker_pose = PoseLandmarker.create_from_options(options_pose)
self._write_log("Init hand landmarker...")
self.landmarker_hands = HandLandmarker.create_from_options(options_hands)
self._write_log("Init done")
def _write_log(self, *args):
try:
print(*args)
except Exception as e:
# Concerning...
pass
try:
self._send_status_packet(" ".join(str(s) for s in args))
except Exception as e:
pass
def _send_status_packet(self, status_str):
output_data = { "status" : status_str }
output_data_json = json.dumps(output_data, indent=4).encode("utf-8")
self._udp_socket.sendto(output_data_json, ("127.0.0.1", self.udp_port_number))
# Result callback for the face landmarker (live stream mode).
def _handle_result_face(
self,
result: FaceLandmarkerResult,
output_image: mediapipe.Image,
timestamp_ms: int,
):
with self.frames_queued_mutex:
self.frames_queued_face -= 1
face = self.output_data["face"]
face["confidence"] = 0.0
if len(result.facial_transformation_matrixes) > 0:
face["confidence"] = 1.0
face["transform"] = result.facial_transformation_matrixes[0].tolist()
if len(result.face_blendshapes) > 0:
face["confidence"] = 1.0
for shape in result.face_blendshapes[0]:
face["blendshapes"][shape.category_name] = shape.score
# FIXME: If we ever come back to it, finish this.
def _handle_result_pose(
self,
x,
output_image: mediapipe.Image,
timestamp_ms: int
):
for y in x.pose_world_landmarks:
pass
def _handle_result_hands(
self,
result: HandLandmarkerResult,
output_image: mediapipe.Image,
timestamp_ms: int,
):
with self.frames_queued_mutex:
self.frames_queued_hands -= 1
self._last_hand_result_timestamp = timestamp_ms
self.output_data["hands"]["left"]["confidence"] = 0.0
self.output_data["hands"]["right"]["confidence"] = 0.0
# TODO: Get actual dimensions of the camera?
frame_width, frame_height = (640, 480)
focal_length = frame_width * 0.75
center = (frame_width / 2, frame_height / 2)
camera_matrix = numpy.array([
[ focal_length, 0, center[0] ],
[ 0, focal_length, center[1] ],
[ 0, 0, 1 ]
], dtype = "double")
distortion = numpy.zeros((4, 1))
for index in range(len(result.hand_landmarks)):
handedness = result.handedness[index][0]
image_landmarks = result.hand_landmarks[index]
world_landmarks = result.hand_world_landmarks[index]
side = handedness.category_name.lower()
hand = self.output_data["hands"][side]
hand["confidence"] = handedness.score
for [i, image_landmark] in enumerate(image_landmarks):
world_landmark = world_landmarks[i]
hand["image_landmarks"][i] = [ image_landmark.x, image_landmark.y, image_landmark.z ]
hand["world_landmarks"][i] = [ world_landmark.x, world_landmark.y, world_landmark.z ]
def _tracker_worker_thread_func(self):
try:
# Deadlock-avoidance.
self._write_log("locking mutex before init mediapipe")
with self.the_big_ugly_mutex:
self._init_mediapipe()
self._write_log("Initializing MediaPipe")
self.output_data = copy.deepcopy(DEFAULT_TRACKING_DATA)
input_image = None
success = True
start_time = time.time()
frame_count = 0
# We'll send this when we're panicking from too many frames queued, as
# a last-ditch attempt to un-clog the queue before we get a deadlock
# thanks to the MediaPipe bug.
blank_image_cv2 = numpy.zeros((1,1,3), dtype=numpy.uint8)
blank_image_mp = mediapipe.Image(mediapipe.ImageFormat.SRGB, data=blank_image_cv2)
# Main capturing loop.
last_frame_time = 0
while not self.should_quit_threads:
# Wait for the minimum frame time.
time_to_sleep = self.minimum_frame_time - (time.time() - last_frame_time)
if time_to_sleep > 0.0:
time.sleep(time_to_sleep)
# If the video device got disconnected, reconnect it.
self._open_video_device()
with self.the_big_ugly_mutex:
last_frame_time = time.time()
last_timestamp_used = int(time.time() * 1000)
# Capture a frame.
if self.video_device_capture:
success, image = self.video_device_capture.read()
else:
# No camera connected at the moment. Just feed in
# blank images.
success = True
image = blank_image_cv2.copy()
if success:
# Convert image to MediaPipe.
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# FIXME: Find out why we do this. I think it was
# mentioned in the MediaPipe tutorial.
image.flags.writeable = False
mp_image = mediapipe.Image(
image_format=mediapipe.ImageFormat.SRGB,
data=image)
# Generate a timestamp to feed into the MediaPipe
# system. If we're still somehow inside the same
# millisecond as the last processed image, then skip
# this frame.
this_time = int(time.time() * 1000)
if this_time <= last_timestamp_used:
continue
# Check to see if we have too many face tracking
# frames queued.
need_reset = False
with self.frames_queued_mutex:
if self.frames_queued_face > 5:
need_reset = True
else:
self.frames_queued_face += 1
# Reset if we have too many face frames queued, to avoid
# a deadlock.
if need_reset:
# Deadlock-avoidance.
self.landmarker._runner.restart()
self.frames_queued_face = 0
else:
self.landmarker.detect_async(mp_image, this_time)
# Hands
# If the last result we got back came too long after
# the last one we queued up, then wait until some
# amount of time (which we guess in the most
# convoluted way possible) has passed.
#
# FIXME: Make this less stupid. Make it make
# sense. Then apply it to the face tracking.
hand_landmarker_time_skew = self._last_hand_detect_timestamp - self._last_hand_result_timestamp
if hand_landmarker_time_skew > 50: # FIXME: Make configurable (milliseconds)
self._last_hand_result_timestamp += this_time - self._last_hand_detect_timestamp
else:
# Check to see if we have too many hand tracking
# frames queued.
need_reset = False
with self.frames_queued_mutex:
if self.frames_queued_hands > 5:
need_reset = True
else:
self.frames_queued_hands += 1
# If we do have too many frames queued, just reset
# the tracker to avoid a deadlock.
if need_reset:
self.landmarker_hands._runner.restart()
self.frames_queued_hands = 0
else:
self.landmarker_hands.detect_async(mp_image, this_time)
self._last_hand_detect_timestamp = this_time
# Track the last timestamp because we have to keep
# these monotonically increasing and we can't send
# the same timestamp twice.
last_timestamp_used = this_time
output_data_json = json.dumps(self.output_data, indent=4).encode("utf-8")
with self.frames_queued_mutex:
status_packet_str = "Tracking data sending. (Queue: %2d hand, %2d face)" % (self.frames_queued_hands, self.frames_queued_face)
self._write_log(status_packet_str)
# Output the packet.
self._udp_socket.sendto(output_data_json, ("127.0.0.1", self.udp_port_number))
self._write_log("Quitting")
except Exception as e:
exception_string_generator = traceback.TracebackException.from_exception(e)
exception_string = "".join(exception_string_generator.format())
self._write_log(exception_string)
def start_tracker(self):
if self._tracker_worker_thread:
self.stop_tracker()
assert(not self._tracker_worker_thread)
self._write_log("Starting worker thread.")
self._tracker_worker_thread = threading.Thread(
target=self._tracker_worker_thread_func,
daemon=True)
self._tracker_worker_thread.start()
self._write_log("Starting worker thread done.")
def stop_tracker(self):
assert(self._tracker_worker_thread)
self.should_quit_threads = True
self._write_log("Waiting for worker thread to join.")
self._tracker_worker_thread.join()
self._write_log("Worker thread joined.")
self._tracker_worker_thread = None
self.should_quit_threads = False
# Set to -1 to just release all devices.
def set_video_device_number(self, new_number):
if self.video_device_index != new_number:
with self.the_big_ugly_mutex:
self.video_device_index = new_number
self._close_video_device()
self._open_video_device()
def set_udp_port_number(self, new_number):
with self.the_big_ugly_mutex:
self.udp_port_number = new_number
def _shutdown_mediapipe(self):
if self.landmarker: self.landmarker.close()
# if self.landmarker_pose: self.landmarker_pose.close()
if self.landmarker_hands: self.landmarker_hands.close()
self.landmarker = None
# self.landmarker_pose = None
self.landmarker_hands = None
# Grumblegrumblegrumble...
gc.collect()
def __del__(self):
with self.the_big_ugly_mutex:
self._close_video_device()
self._shutdown_mediapipe()
# ----------------------------------------------------------------------
mediapipe_controller = MediaPipeTracker()
# ----------------------------------------------------------------------
# External interface (called from Godot)
def start_tracker():
global mediapipe_controller
mediapipe_controller.start_tracker()
def stop_tracker():
global mediapipe_controller
mediapipe_controller.stop_tracker()
# Set to -1 to just release all devices.
def set_video_device_number(new_number):
global mediapipe_controller
mediapipe_controller.set_video_device_number(new_number)
def set_udp_port_number(new_number):
global mediapipe_controller
mediapipe_controller.set_udp_port_number(new_number)
def enumerate_camera_devices():
from cv2_enumerate_cameras import enumerate_cameras
capture_api_preference=cv2.CAP_ANY
# Having issues with GSTREAMER sources, so let's just use V4L only.
if sys.platform == "linux": capture_api_preference = cv2.CAP_V4L2
# On Linux, we sometimes see stuff showing up as just "video#", so
# let's at least try to correlate paths and IDs from
# /dev/v4l/by-id .
path_to_name_mappings = {}
if sys.platform == "linux":
try:
device_id_list = os.listdir("/dev/v4l/by-id")
for device_id in device_id_list:
full_link_path = os.path.join("/dev/v4l/by-id", device_id)
actual_dev_file = os.path.abspath(os.path.join("/dev/v4l/by-id", os.readlink(full_link_path)))
path_to_name_mappings[actual_dev_file] = device_id
except IOError:
pass
all_camera_data = []
for camera_info in enumerate_cameras(apiPreference=capture_api_preference):
camera_name = camera_info.name
if re.match("video[0-9]+", camera_info.name):
if camera_info.path in path_to_name_mappings:
camera_name = path_to_name_mappings[camera_info.path]
# Figure out the backend.
backend_index = camera_info.backend
if sys.platform == "linux":
# For some reason, in Linux the backend is stored in the
# index and not the backend field.
backend_index = camera_info.index - (camera_info.index % 100)
backend_name = cv2.videoio_registry.getBackendName(backend_index)
camera_data = {
"name" : camera_name,
"backend" : backend_name,
"path" : camera_info.path,
"index" : camera_info.index,
}
all_camera_data.append(camera_data)
return all_camera_data

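The worker loop above guards against a known MediaPipe live-stream pitfall: if frames are handed to `detect_async()` faster than results come back through the callbacks, the pipeline can deadlock, so the script counts frames in flight per landmarker and, past a threshold, restarts the runner instead of queueing more. Below is a minimal sketch of that pattern in isolation; names such as `MAX_IN_FLIGHT`, `on_result`, and `submit_frame` are illustrative and not part of the file above, while `_runner.restart()` is the same private API the script itself falls back on.

import threading
import time

MAX_IN_FLIGHT = 5   # illustrative threshold; the worker loop above also uses 5

in_flight = 0
in_flight_lock = threading.Lock()

def on_result(result, output_image, timestamp_ms):
    # Result callback: one fewer frame is in flight now.
    global in_flight
    with in_flight_lock:
        in_flight -= 1

def submit_frame(landmarker, mp_image, last_timestamp_ms):
    # Timestamps passed to detect_async() must be strictly increasing,
    # so a frame landing in the same millisecond as the last one is skipped.
    now_ms = int(time.time() * 1000)
    if now_ms <= last_timestamp_ms:
        return last_timestamp_ms
    global in_flight
    with in_flight_lock:
        too_many = in_flight > MAX_IN_FLIGHT
        if not too_many:
            in_flight += 1
    if too_many:
        # Too much queued: restart the runner instead of queueing another frame.
        landmarker._runner.restart()
        with in_flight_lock:
            in_flight = 0
    else:
        landmarker.detect_async(mp_image, now_ms)
    return now_ms
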
@@ -0,0 +1,3 @@ _tracker/Project/requirements.txt
mediapipe==0.10.14
cv2-enumerate-cameras==1.1.10
numpy==1.26.0

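Everything the tracker sends is plain JSON over UDP to 127.0.0.1: either a small `{"status": ...}` message or a full copy of the tracking dictionary (the `face` transform and blendshapes plus the `hands.left` / `hands.right` landmark lists). The Godot mod below is the real consumer; as a rough sketch of the same contract, a standalone listener could look like the following. It assumes the default port 7098; in practice the host binds a free port and pushes it to the tracker via `set_udp_port_number`.

import json
import socket

# Bind the loopback port the tracker was told to send to.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("127.0.0.1", 7098))

while True:
    packet, _addr = sock.recvfrom(65535)
    data = json.loads(packet.decode("utf-8"))
    if "status" in data:
        print("tracker status:", data["status"])
        continue
    face = data["face"]
    left_hand = data["hands"]["left"]
    if face["confidence"] > 0.0:
        # 4x4 row-major matrix; the translation sits in the last column, in centimeters.
        print("head position (cm):", [row[3] for row in face["transform"][:3]])
    if left_hand["confidence"] > 0.0:
        print("left wrist (world):", left_hand["world_landmarks"][0])
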
@@ -0,0 +1,335 @@ copyMediaPipe.gd
class_name copyMediaPipe
extends Mod_Base
# FIXME: Best to get this from the tracker process (if possible).
var camera_aspect_ratio := 4.0 / 3.0 # Logitech C920 default?
@onready var tracker_head : Node3D = $TrackingRoot/Head
@onready var tracker_hand_left : Node3D = $TrackingRoot/LeftHand
@onready var tracker_hand_right : Node3D = $TrackingRoot/RightHand
@onready var landmark_template : MeshInstance3D = $TrackingRoot/LandmarkTemplate
@onready var landmarks_hand_left : Array[MeshInstance3D] = []
@onready var landmarks_hand_right : Array[MeshInstance3D] = []
@onready var hands := {
left = {
tracker = tracker_hand_left,
landmarks = landmarks_hand_left,
},
right = {
tracker = tracker_hand_right,
landmarks = landmarks_hand_right,
},
}
func _ready() -> void:
setup_hand_landmarks()
var dir = get_script().get_path().get_base_dir()
var path = dir.path_join("_tracker/Project/new_new_tracker.py")
python_process = KiriPythonWrapperInstance.new(path)
if not python_process.setup_python(false):
OS.alert("Failed to setup tracker dependencies!")
start_process()
# FIXME: Don't hardcode the video device.
set_video_device(get_video_devices()[0])
start_tracker()
func _exit_tree() -> void:
stop_tracker()
stop_process()
# Called after mod is initialized or model is changed.
func scene_init():
pass
# Called before mod is removed, model is changed or application is shut down.
func scene_shutdown():
pass
func _process(_delta: float) -> void:
if is_tracker_running():
receive_tracker_packets()
func setup_hand_landmarks() -> void:
for side in hands:
var hand = hands[side]
for i in 21:
var landmark: MeshInstance3D = landmark_template.duplicate(0)
landmark.position = Vector3.ZERO
landmark.visible = true
hand.tracker.add_child(landmark)
hand.landmarks.append(landmark)
# -----------------------------------------------------------------------------
# Functions to start/stop the PYTHON TRACKER PROCESS and communicate with it.
# -----------------------------------------------------------------------------
var python_process: KiriPythonWrapperInstance
func start_process() -> void:
python_process.start_process(false)
func stop_process() -> void:
python_process.stop_process()
func is_process_running() -> bool:
return python_process.get_status() == KiriPythonWrapperInstance.KiriPythonWrapperStatus.STATUS_RUNNING
# [{ name: String, backend: String, path: String, index: int }]
func get_video_devices() -> Array:
assert(is_process_running())
var devices = python_process.call_rpc_sync("enumerate_camera_devices", [])
return devices if devices is Array else []
func set_video_device(device) -> void:
assert(is_process_running())
var index: int = device.index if device else -1
python_process.call_rpc_sync("set_video_device_number", [ index ])
# -----------------------------------------------------------------------------
# Functions to start/stop the TRACKER and receive packets coming from it.
# -----------------------------------------------------------------------------
var base_port := 7098
var udp_server: PacketPeerUDP
var udp_server_port: int
func start_tracker() -> void:
assert(!is_tracker_running())
udp_server = PacketPeerUDP.new()
# Find a port number that's open to use.
udp_server_port = base_port
while udp_server.bind(udp_server_port, "127.0.0.1") != OK:
udp_server_port += 1
python_process.call_rpc_sync("set_udp_port_number", [ udp_server_port ])
python_process.call_rpc_sync("start_tracker", [])
func stop_tracker() -> void:
if !is_tracker_running(): return # Do nothing if tracker isn't running.
python_process.call_rpc_sync("stop_tracker", [])
udp_server.close()
udp_server = null
func is_tracker_running() -> bool:
return udp_server != null
func receive_tracker_packets() -> void:
assert(is_tracker_running())
while true:
var bytes := udp_server.get_packet()
if bytes.size() == 0: break
var data = JSON.parse_string(bytes.get_string_from_utf8())
if data is Dictionary: process_tracker_data(data)
# -----------------------------------------------------------------------------
# Functions to PROCESS the incoming TRACKER DATA, and update tracker objects.
# -----------------------------------------------------------------------------
func process_tracker_data(data: Dictionary) -> void:
if "error" in data: on_tracker_error(data.error); return
if "status" in data: on_tracker_status(data.status); return
convert_tracker_data(data)
# MediaPipe reports hands from a viewer's perspective, not the
# person's own actual left and right hand, so swap them out here.
var left = data["hands"]["left"]
var right = data["hands"]["right"]
data["hands"]["left"] = right
data["hands"]["right"] = left
tracker_head.transform = data["face"]["transform"]
tracker_head.position /= 100 # Centimeters to meters.
# TODO: Actually use this.
var num_hands_detected := 0
for side in hands:
var hand = hands[side]
var tracker: Node3D = hand.tracker
# TODO: Don't automatically trust the handedness of the input data.
var hand_data = data["hands"][side]
var image_landmarks: Array[Vector3] = hand_data["image_landmarks"]
var world_landmarks: Array[Vector3] = hand_data["world_landmarks"]
# FIXME: Make this configurable.
var min_confidence_threshold := 0.85
if hand_data["confidence"] < min_confidence_threshold: continue
num_hands_detected += 1
# Mirror position on the X axis, since image landmarks are in view space.
for i in image_landmarks.size(): image_landmarks[i].x = (1 - image_landmarks[i].x)
tracker.basis = get_hand_rotation(world_landmarks)
tracker.position = get_hand_viewspace_origin(image_landmarks, world_landmarks, 2.0) \
* Vector3(7.0, 7.0, 3.5) # FIXME: Fudge factor to match better with world space.
# Translate landmarks so the origin is at the wrist.
var wrist_position := world_landmarks[0]
# World landmarks are in world space, so we have to "subtract" the hand rotation.
# Also, the rotation is all wrong, so apply that here as well.
var hand_rotation := tracker.basis.inverse() * Basis.from_euler(Vector3(TAU / 2, 0, 0))
for i in world_landmarks.size():
var pos := world_landmarks[i] - wrist_position
hand.landmarks[i].position = hand_rotation * pos
# TODO: Interpolation needs to be done outside of this function,
# as it could be called multiple times a frame, or not at all.
# Smoothly interpolate tracker transforms (in a framerate-independent way).
# var f := 0.0000000001 # Yes this value needs to be THAT small.
# tracker_head .transform = tracker_head .transform.interpolate_with(head_transform , 1 - f ** delta)
# tracker_hand_left .transform = tracker_hand_left .transform.interpolate_with(hand_left_transform , 1 - f ** delta)
# tracker_hand_right.transform = tracker_hand_right.transform.interpolate_with(hand_right_transform, 1 - f ** delta)
func on_tracker_status(status: String) -> void:
set_status(status)
func on_tracker_error(error: String) -> void:
print_log("Error: " + error)
# -----------------------------------------------------------------------------
# Functions that deal with CONVERTING the TRACKER DATA to Godot types.
# -----------------------------------------------------------------------------
## Converts the arrays inside data to known data types like Vector3 and Transform3D.
func convert_tracker_data(data: Dictionary) -> void:
data["face"]["transform"] = to_transform(data["face"]["transform"])
for side in data["hands"]:
var hand = data["hands"][side]
# Convert untyped array of arrays to typed Array[Vector3].
var image_landmarks = hand["image_landmarks"].map(to_vector)
var world_landmarks = hand["world_landmarks"].map(to_vector)
hand["image_landmarks"] = Array(image_landmarks, TYPE_VECTOR3, "", null)
hand["world_landmarks"] = Array(world_landmarks, TYPE_VECTOR3, "", null)
func to_vector(array) -> Vector3:
return Vector3(array[0], array[1], array[2])
func to_transform(matrix) -> Transform3D:
return Transform3D(
Basis(Vector3(matrix[0][0], matrix[1][0], matrix[2][0]),
Vector3(matrix[0][1], matrix[1][1], matrix[2][1]),
Vector3(matrix[0][2], matrix[1][2], matrix[2][2])),
Vector3(matrix[0][3], matrix[1][3], matrix[2][3]))
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
const WRIST := 0
const THUMB_CMC := 1
const THUMB_MCP := 2
const THUMB_IP := 3
const THUMB_TIP := 4
const INDEX_FINGER_MCP := 5
const INDEX_FINGER_PIP := 6
const INDEX_FINGER_DIP := 7
const INDEX_FINGER_TIP := 8
const MIDDLE_FINGER_MCP := 9
const MIDDLE_FINGER_PIP := 10
const MIDDLE_FINGER_DIP := 11
const MIDDLE_FINGER_TIP := 12
const RING_FINGER_MCP := 13
const RING_FINGER_PIP := 14
const RING_FINGER_DIP := 15
const RING_FINGER_TIP := 16
const PINKY_MCP := 17
const PINKY_PIP := 18
const PINKY_DIP := 19
const PINKY_TIP := 20
# FIXME: I changed the way this was calculated and it doesn't quite fit the data right?
func get_hand_rotation(landmarks: Array[Vector3]) -> Basis:
var knuckles_center := (landmarks[INDEX_FINGER_MCP] + landmarks[PINKY_MCP]) / 2
var wrist_to_knuckles := landmarks[WRIST].direction_to(knuckles_center)
var towards_thumb := landmarks[PINKY_MCP].direction_to(landmarks[INDEX_FINGER_MCP])
var up := wrist_to_knuckles.cross(towards_thumb)
return Basis.looking_at(wrist_to_knuckles, up, true)
## Attempt to figure out the hand origin in viewspace.
## `hand_to_head_scale` is a fudge value so that we can attempt
## to force the hand and head into the same scale range, roughly.
func get_hand_viewspace_origin(
image_landmarks: Array[Vector3],
_world_landmarks: Array[Vector3],
hand_to_head_scale: float,
) -> Vector3:
# Values found through experimentation.
var known_distances := [
[ WRIST , THUMB_CMC , 0.053861 ],
[ THUMB_CMC , THUMB_MCP , 0.057096 ],
[ THUMB_MCP , THUMB_IP , 0.048795 ],
[ THUMB_IP , THUMB_TIP , 0.039851 ],
[ WRIST , INDEX_FINGER_MCP , 0.152538 ],
[ WRIST , PINKY_MCP , 0.138711 ],
[ INDEX_FINGER_MCP , MIDDLE_FINGER_MCP , 0.029368 ],
[ MIDDLE_FINGER_MCP , RING_FINGER_MCP , 0.027699 ],
[ RING_FINGER_MCP , PINKY_MCP , 0.032673 ],
]
# FIXME: Hardcoded fudge-factor
for d in known_distances: d[2] *= 0.25
# Iterate through known distances and add up the weighted average.
var fake_z_avg := 0.0
var total_avg_weight := 0.0
for d in known_distances:
var pt0 := image_landmarks[d[0]]
var pt1 := image_landmarks[d[1]]
# Figure out a weighted average based on how much the vector
# is facing the camera Z axis. Stuff facing into the camera
# has less accurate results, so weight it lower.
var normvec := (pt0 - pt1).normalized()
var weight := clampf(1.0 - 2.0 * abs(normvec[2]), 0.0, 1.0)
# Add to the average.
fake_z_avg += guess_depth_from_known_distance(
pt0, pt1, d[2] / hand_to_head_scale) * weight
total_avg_weight += weight
if abs(total_avg_weight) < 0.000001:
print("HEY THE THING HAPPENED", total_avg_weight)
# FIXME: Fudge value because I'm tired of this thing throwing
# exceptions all the time. Do an actual fix later.
total_avg_weight = 0.01
# Finish the average.
fake_z_avg = fake_z_avg / total_avg_weight
return ndc_to_viewspace(image_landmarks[0], -fake_z_avg)
## Figure out a depth value based on the distance between known
## normalized (clip-space) coordinates of landmarks, compared to what
## we would expect the average distance between those points to be.
func guess_depth_from_known_distance(left: Vector3, right: Vector3, distance: float) -> float:
var dist_clip := left - right
dist_clip.x *= camera_aspect_ratio # FIXME: Fudge factor
return 1.0 / (dist_clip.length() / distance)
func ndc_to_viewspace(v: Vector3, z_offset: float) -> Vector3:
# This (px, py) is pretty important and Google's
# documentation didn't give much useful info about it.
var px := 0.5
var py := 0.5
# These default to 1.0, 1.0 according to Google's docs.
# I guess that's probably fine for default camera stuff.
var fx := 1.0
var fy := camera_aspect_ratio
# Inverse equation from the section on NDC space here
# https://google.github.io/mediapipe/solutions/objectron.html#coordinate-systems
# https://web.archive.org/web/20220727063132/https://google.github.io/mediapipe/solutions/objectron.html#coordinate-systems
# which describes going from camera coordinates to NDC space. It's kinda
# ambiguous on terms, but this seems to work to get view space coordinates.
# With this, coordinates seem to be evenly scaled (between x/y and z) and in view space.
var z_scale := 1.0
var z := 1.0 / (-v[2] + (1.0 / z_offset) * z_scale)
var x := (v[0] - px) * z / fx
var y := (v[1] - py) * z / fy
return Vector3(x, y, z)
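
For reference, `to_transform()` in the mod above reads the 4x4 row-major matrix MediaPipe reports for the face: the first three columns become the basis axes and the fourth column the origin, which `process_tracker_data()` then divides by 100 (centimeters to meters). A small numpy sketch of the same unpacking, handy for checking packet values outside Godot; the helper name is made up for illustration.

import numpy as np

def split_face_transform(matrix_rows):
    # matrix_rows: the 4x4 "face.transform" list straight out of the JSON packet.
    m = np.array(matrix_rows, dtype=float)  # row-major, as MediaPipe reports it
    basis_x = m[:3, 0]                      # first column  -> X axis
    basis_y = m[:3, 1]                      # second column -> Y axis
    basis_z = m[:3, 2]                      # third column  -> Z axis
    origin = m[:3, 3] / 100.0               # fourth column, centimeters to meters
    return basis_x, basis_y, basis_z, origin

# The identity matrix from DEFAULT_TRACKING_DATA unpacks to unit axes and a zero origin.
print(split_face_transform(np.eye(4).tolist()))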

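The depth estimate in `get_hand_viewspace_origin()` above rests on two ideas: the apparent (clip-space) length of a hand segment shrinks in proportion to its distance, so depth is roughly known_length / apparent_length, and segments pointing into the camera get a lower weight because foreshortening makes their apparent length unreliable. `ndc_to_viewspace()` then inverts the Objectron-style NDC projection (roughly x_ndc = fx * x / z + px, and likewise for y) to recover view-space coordinates. A small numpy sketch of the same math, using the script's own px = py = 0.5 and fx = 1 assumptions:

import numpy as np

def guess_depth(pt0, pt1, known_length, aspect_ratio=4.0 / 3.0):
    # Apparent segment length in clip space, with x stretched by the aspect ratio.
    d = pt0 - pt1
    d[0] *= aspect_ratio
    apparent = np.linalg.norm(d)
    return known_length / apparent  # the longer it looks, the closer it is

def ndc_to_viewspace(v, z_offset, px=0.5, py=0.5, fx=1.0, fy=4.0 / 3.0):
    # Inverse of the forward projection x_ndc = fx * x / z + px (and likewise for y).
    z = 1.0 / (-v[2] + 1.0 / z_offset)
    x = (v[0] - px) * z / fx
    y = (v[1] - py) * z / fy
    return np.array([x, y, z])

# A 0.1-long segment spanning 0.05 of clip-space x (stretched by 4:3) reads as depth 1.5.
print(guess_depth(np.array([0.5, 0.5, 0.0]), np.array([0.55, 0.5, 0.0]), 0.1))
print(ndc_to_viewspace(np.array([0.5, 0.5, 0.0]), 1.5))
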
@@ -0,0 +1,47 @@ copyMediaPipe.tscn
[gd_scene load_steps=6 format=3 uid="uid://dykgejoidme3d"]
[ext_resource type="Script" path="res://Mods/copyMediaPipe/copyMediaPipe.gd" id="1_0kpr8"]
[ext_resource type="PackedScene" uid="uid://bsldjkd051hfj" path="res://Mods/copyMediaPipe/Resources/debug_visuals.tscn" id="2_8wmot"]
[sub_resource type="BoxMesh" id="BoxMesh_wtdv4"]
size = Vector3(0.2, 0.2, 0.2)
[sub_resource type="StandardMaterial3D" id="StandardMaterial3D_wrvph"]
shading_mode = 0
albedo_color = Color(0, 1, 0, 1)
[sub_resource type="SphereMesh" id="SphereMesh_xb663"]
material = SubResource("StandardMaterial3D_wrvph")
radius = 0.005
height = 0.01
radial_segments = 6
rings = 3
[node name="copyMediaPipe" type="Node"]
script = ExtResource("1_0kpr8")
[node name="TrackingRoot" type="Node3D" parent="."]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1.5, 0.5)
[node name="DebugVisuals" parent="TrackingRoot" instance=ExtResource("2_8wmot")]
[node name="Head" type="MeshInstance3D" parent="TrackingRoot"]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, -0.3)
mesh = SubResource("BoxMesh_wtdv4")
[node name="DebugVisuals" parent="TrackingRoot/Head" instance=ExtResource("2_8wmot")]
[node name="LeftHand" type="Node3D" parent="TrackingRoot"]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, -0.5, 0, -0.3)
[node name="DebugVisuals" parent="TrackingRoot/LeftHand" instance=ExtResource("2_8wmot")]
[node name="RightHand" type="Node3D" parent="TrackingRoot"]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, 0.5, 0, -0.3)
[node name="DebugVisuals" parent="TrackingRoot/RightHand" instance=ExtResource("2_8wmot")]
[node name="LandmarkTemplate" type="MeshInstance3D" parent="TrackingRoot"]
transform = Transform3D(1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0.5, 0)
visible = false
mesh = SubResource("SphereMesh_xb663")