#!/usr/bin/python3
"""MediaPipe face and hand tracker that streams its results over UDP.

A background worker thread captures frames from an OpenCV video device,
feeds them to MediaPipe's face and hand landmarkers (live-stream mode) and
sends the latest tracking data as a JSON datagram to 127.0.0.1 on the
configured UDP port. Log/status messages are sent to the same port as
``{"status": ...}`` packets.

The module-level functions at the bottom of the file are the external
interface (called from Godot).
"""

import copy
import gc
import json
import os
import re
import socket
import sys
import threading
import time
import traceback

import cv2
import mediapipe
import numpy

BaseOptions = mediapipe.tasks.BaseOptions
FaceLandmarker = mediapipe.tasks.vision.FaceLandmarker
FaceLandmarkerOptions = mediapipe.tasks.vision.FaceLandmarkerOptions
FaceLandmarkerResult = mediapipe.tasks.vision.FaceLandmarkerResult
# PoseLandmarker = mediapipe.tasks.vision.PoseLandmarker
# PoseLandmarkerOptions = mediapipe.tasks.vision.PoseLandmarkerOptions
HandLandmarker = mediapipe.tasks.vision.HandLandmarker
HandLandmarkerOptions = mediapipe.tasks.vision.HandLandmarkerOptions
HandLandmarkerResult = mediapipe.tasks.vision.HandLandmarkerResult
RunningMode = mediapipe.tasks.vision.RunningMode

# Indices of hand landmarks. The hand model has 21 landmarks, numbered
# 0 through 20 with no gaps.
WRIST = 0
THUMB_CMC = 1
THUMB_MCP = 2
THUMB_IP = 3
THUMB_TIP = 4
INDEX_FINGER_MCP = 5
INDEX_FINGER_PIP = 6
INDEX_FINGER_DIP = 7
INDEX_FINGER_TIP = 8
MIDDLE_FINGER_MCP = 9
MIDDLE_FINGER_PIP = 10
MIDDLE_FINGER_DIP = 11
MIDDLE_FINGER_TIP = 12
RING_FINGER_MCP = 13
RING_FINGER_PIP = 14
RING_FINGER_DIP = 15
RING_FINGER_TIP = 16
PINKY_MCP = 17
PINKY_PIP = 18
PINKY_DIP = 19
PINKY_TIP = 20

# Number of landmarks reported per hand.
_HAND_LANDMARK_COUNT = 21

# Maximum number of frames allowed to be in flight per landmarker before
# we restart it instead of queueing more.
_MAX_QUEUED_FRAMES = 5

# How far (in milliseconds) the hand landmarker's results may lag behind
# the frames we queued before we stop feeding it.
# FIXME: Make configurable.
_MAX_HAND_RESULT_SKEW_MS = 50

# Every tracking packet is sent to this host.
_UDP_HOST = "127.0.0.1"

# Blendshape keys present in every outgoing packet, all defaulting to 0.0.
_BLENDSHAPE_NAMES = (
    "_neutral",
    "browDownLeft",
    "browDownRight",
    "browInnerUp",
    "browOuterUpLeft",
    "browOuterUpRight",
    "cheekPuff",
    "cheekSquintLeft",
    "cheekSquintRight",
    "eyeBlinkLeft",
    "eyeBlinkRight",
    "eyeLookDownLeft",
    "eyeLookDownRight",
    "eyeLookInLeft",
    "eyeLookInRight",
    "eyeLookOutLeft",
    "eyeLookOutRight",
    "eyeLookUpLeft",
    "eyeLookUpRight",
    "eyeSquintLeft",
    "eyeSquintRight",
    "eyeWideLeft",
    "eyeWideRight",
    "jawForward",
    "jawLeft",
    "jawOpen",
    "jawRight",
    "mouthClose",
    "mouthDimpleLeft",
    "mouthDimpleRight",
    "mouthFrownLeft",
    "mouthFrownRight",
    "mouthFunnel",
    "mouthLeft",
    "mouthLowerDownLeft",
    "mouthLowerDownRight",
    "mouthPressLeft",
    "mouthPressRight",
    "mouthPucker",
    "mouthRight",
    "mouthRollLower",
    "mouthRollUpper",
    "mouthShrugLower",
    "mouthShrugUpper",
    "mouthSmileLeft",
    "mouthSmileRight",
    "mouthStretchLeft",
    "mouthStretchRight",
    "mouthUpperUpLeft",
    "mouthUpperUpRight",
    "noseSneerLeft",
    "noseSneerRight",
)


def _zero_landmarks():
    """Return a fresh list of 21 independent ``[0.0, 0.0, 0.0]`` rows."""
    # A comprehension (rather than ``[[...]] * 21``) so that the rows are
    # distinct list objects and cannot be mutated through one another.
    return [[0.0, 0.0, 0.0] for _ in range(_HAND_LANDMARK_COUNT)]


def _default_hand_data():
    """Return the default (undetected) tracking entry for one hand."""
    return {
        "confidence": 0.0,
        "image_landmarks": _zero_landmarks(),
        "world_landmarks": _zero_landmarks(),
    }


# Template for the outgoing tracking packet. Always deep-copy before use.
DEFAULT_TRACKING_DATA = {
    "face": {
        "confidence": 0.0,  # Currently either 0.0 or 1.0.
        "transform": [
            [1.0, 0.0, 0.0, 0.0],
            [0.0, 1.0, 0.0, 0.0],
            [0.0, 0.0, 1.0, 0.0],
            [0.0, 0.0, 0.0, 1.0],
        ],
        "blendshapes": {name: 0.0 for name in _BLENDSHAPE_NAMES},
    },
    "hands": {
        "left": _default_hand_data(),
        "right": _default_hand_data(),
    },
}


class MediaPipeTracker:
    """Owns the camera, the MediaPipe landmarkers and the worker thread.

    Locking rules:

    * ``the_big_ugly_mutex`` is a plain (non-reentrant) ``threading.Lock``.
      ``_open_video_device`` and ``_close_video_device`` acquire it
      themselves, so they must never be called while it is already held.
    * ``frames_queued_mutex`` guards ``frames_queued_face`` and
      ``frames_queued_hands``, which are also modified from the MediaPipe
      result callbacks.
    """

    def __init__(self):
        self.the_big_ugly_mutex = threading.Lock()
        self._tracker_worker_thread = None

        # We need these to avoid deadlocks. If we're queueing frames
        # faster than they can process, we'll hit a deadlock in
        # MediaPipe.
        self.frames_queued_face = 0
        self.frames_queued_hands = 0
        self.frames_queued_mutex = threading.Lock()

        self.should_quit_threads = False

        # Open the socket immediately so we can start sending error
        # and status stuff to the hosting application.
        self._udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.udp_port_number = 7098

        # FIXME: Make this editable.
        # Minimum time, in seconds, between two capture loop iterations.
        self.minimum_frame_time = 0.016

        # -1 means "no video device selected".
        self.video_device_index = -1
        self.video_device_capture = None

        self.landmarker = None
        # self.landmarker_pose = None
        self.landmarker_hands = None

        # These are for more deadlock avoidance, so we can keep track
        # of how behind the hand tracker is. Both are in milliseconds.
        self._last_hand_result_timestamp = (time.time() * 1000)
        self._last_hand_detect_timestamp = (time.time() * 1000)

        self.output_data = copy.deepcopy(DEFAULT_TRACKING_DATA)

    def _release_capture_locked(self):
        """Release and forget the capture. Caller must hold the big mutex."""
        capture = self.video_device_capture
        self.video_device_capture = None
        if capture is not None:
            # Release explicitly rather than waiting for garbage
            # collection, so the device is freed right away.
            capture.release()

    def _close_video_device(self):
        """Close the current video device, if any.

        Acquires ``the_big_ugly_mutex``; do not call with it held.
        """
        with self.the_big_ugly_mutex:
            self._release_capture_locked()

    def _open_video_device(self):
        """Open the selected video device unless it is already open.

        Does nothing beyond dropping any open capture when
        ``video_device_index`` is -1. On failure to open,
        ``video_device_capture`` is left as ``None`` and the failure is
        logged. Acquires ``the_big_ugly_mutex``; do not call with it held.
        """
        with self.the_big_ugly_mutex:

            if self.video_device_index == -1:
                self._release_capture_locked()
                return

            # Check to make sure we don't already have the device open.
            if self.video_device_capture is not None:
                return

            # Try opening it!
            self._write_log("Opening a video device!")
            capture = cv2.VideoCapture(self.video_device_index)

            # Enforce low-res capture for performance reasons.
            try:
                capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
                capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
            except Exception:
                # Failed? Whatever. Just use the resolution it's stuck
                # with. (Deliberately best-effort.)
                pass

            if capture.isOpened():
                self.video_device_capture = capture
                self._write_log("Video device acquired")
            else:
                capture.release()
                self.video_device_capture = None
                self._write_log(
                    "Failed to open video device: %s"
                    % str(self.video_device_index))

    def _init_mediapipe(self):
        """(Re)create the face and hand landmarkers in live-stream mode.

        The model files ``face_landmarker.task`` and
        ``hand_landmarker.task`` are loaded from the directory containing
        this file. Any existing landmarkers are shut down first.
        """
        asset_path = os.path.abspath(os.path.dirname(__file__))
        face_landmarker_path = os.path.join(asset_path, "face_landmarker.task")
        # FIXME: Last minute breakages.
        # pose_landmarker_path = os.path.join(asset_path, "pose_landmarker.task")
        hand_landmarker_path = os.path.join(asset_path, "hand_landmarker.task")

        options = FaceLandmarkerOptions(
            base_options=BaseOptions(model_asset_path=face_landmarker_path),
            running_mode=RunningMode.LIVE_STREAM,
            output_face_blendshapes=True,
            output_facial_transformation_matrixes=True,
            result_callback=self._handle_result_face)

        # FIXME: Last minute breakages.
        # options_pose = PoseLandmarkerOptions(
        #     base_options = BaseOptions(model_asset_path = pose_landmarker_path),
        #     running_mode = RunningMode.LIVE_STREAM,
        #     output_segmentation_masks = False,
        #     result_callback = self._handle_result_pose)

        options_hands = HandLandmarkerOptions(
            base_options=BaseOptions(model_asset_path=hand_landmarker_path),
            running_mode=RunningMode.LIVE_STREAM,
            num_hands=2,

            # FIXME: Make these adjustable.
            # Were working in the 4.1 version.
            min_hand_detection_confidence=0.75,
            min_tracking_confidence=0.75,
            min_hand_presence_confidence=0.9,

            result_callback=self._handle_result_hands)

        self._shutdown_mediapipe()

        self._write_log("Init face landmarker...")
        self.landmarker = FaceLandmarker.create_from_options(options)
        # self._write_log("Init pose landmarker...")
        # self.landmarker_pose = PoseLandmarker.create_from_options(options_pose)
        self._write_log("Init hand landmarker...")
        self.landmarker_hands = HandLandmarker.create_from_options(options_hands)
        self._write_log("Init done")

    def _write_log(self, *args):
        """Print ``args`` and also send them to the host as a status packet.

        Logging is strictly best-effort: failures of either output are
        swallowed on purpose so that logging can never take the tracker
        down.
        """
        try:
            print(*args)
        except Exception:
            # Concerning...
            pass

        try:
            self._send_status_packet(" ".join(str(s) for s in args))
        except Exception:
            pass

    def _send_status_packet(self, status_str):
        """Send ``{"status": status_str}`` as JSON to the host over UDP."""
        output_data = {"status": status_str}
        output_data_json = json.dumps(output_data, indent=4).encode("utf-8")
        self._udp_socket.sendto(
            output_data_json, (_UDP_HOST, self.udp_port_number))

    def _handle_result_face(
            self,
            result: FaceLandmarkerResult,
            output_image: mediapipe.Image,
            timestamp_ms: int,
    ):
        """Result callback for the face landmarker.

        Marks one queued face frame as done and copies the first detected
        face's transform and blendshape scores into ``output_data``.
        Confidence is 1.0 if either was present, otherwise 0.0.
        """
        with self.frames_queued_mutex:
            self.frames_queued_face -= 1

        face = self.output_data["face"]
        face["confidence"] = 0.0

        if len(result.facial_transformation_matrixes) > 0:
            face["confidence"] = 1.0
            face["transform"] = result.facial_transformation_matrixes[0].tolist()

        if len(result.face_blendshapes) > 0:
            face["confidence"] = 1.0
            blendshapes = face["blendshapes"]
            for shape in result.face_blendshapes[0]:
                blendshapes[shape.category_name] = shape.score

    # FIXME: If we ever come back to it, finish this.
    def _handle_result_pose(
            self,
            x,
            output_image: mediapipe.Image,
            timestamp_ms: int
    ):
        """Unfinished pose landmarker callback; currently does nothing."""
        for y in x.pose_world_landmarks:
            pass

    def _handle_result_hands(
            self,
            result: HandLandmarkerResult,
            output_image: mediapipe.Image,
            timestamp_ms: int,
    ):
        """Result callback for the hand landmarker.

        Marks one queued hand frame as done, records ``timestamp_ms`` as
        the latest result time, and copies each detected hand's landmarks
        into the ``"left"``/``"right"`` entry named by its handedness
        category. Hands that were not detected get confidence 0.0 but keep
        their previous landmark values.
        """
        with self.frames_queued_mutex:
            self.frames_queued_hands -= 1

        self._last_hand_result_timestamp = timestamp_ms

        hands = self.output_data["hands"]
        hands["left"]["confidence"] = 0.0
        hands["right"]["confidence"] = 0.0

        detections = zip(
            result.handedness,
            result.hand_landmarks,
            result.hand_world_landmarks)

        for handedness_list, image_landmarks, world_landmarks in detections:
            handedness = handedness_list[0]

            hand = hands[handedness.category_name.lower()]
            hand["confidence"] = handedness.score

            # Assign by index so each entry is replaced with a fresh list.
            for i, image_landmark in enumerate(image_landmarks):
                world_landmark = world_landmarks[i]
                hand["image_landmarks"][i] = [
                    image_landmark.x, image_landmark.y, image_landmark.z]
                hand["world_landmarks"][i] = [
                    world_landmark.x, world_landmark.y, world_landmark.z]

    def _submit_face_frame(self, mp_image, timestamp_ms):
        """Queue a frame on the face landmarker, or restart it if clogged."""
        # Check to see if we have too many face tracking frames queued.
        with self.frames_queued_mutex:
            need_reset = self.frames_queued_face > _MAX_QUEUED_FRAMES
            if not need_reset:
                self.frames_queued_face += 1

        if need_reset:
            # Too many face frames queued. Reset to avoid a deadlock.
            self.landmarker._runner.restart()
            with self.frames_queued_mutex:
                self.frames_queued_face = 0
        else:
            self.landmarker.detect_async(mp_image, timestamp_ms)

    def _submit_hand_frame(self, mp_image, timestamp_ms):
        """Queue a frame on the hand landmarker unless it is lagging.

        If the last result we got back was too much time since the last
        one we queued up, then wait until some amount of time (which we
        guess in the most convoluted way possible) has passed.

        FIXME: Make this less stupid. Make it make sense. Then apply it
        to the face tracking.
        """
        hand_landmarker_time_skew = (
            self._last_hand_detect_timestamp
            - self._last_hand_result_timestamp)

        if hand_landmarker_time_skew > _MAX_HAND_RESULT_SKEW_MS:
            # Don't queue anything; instead pretend the result timestamp
            # advanced, so that the skew eventually drops back under the
            # limit even if no result ever arrives.
            self._last_hand_result_timestamp += (
                timestamp_ms - self._last_hand_detect_timestamp)
            return

        # Check to see if we have too many hand tracking frames queued.
        with self.frames_queued_mutex:
            need_reset = self.frames_queued_hands > _MAX_QUEUED_FRAMES
            if not need_reset:
                self.frames_queued_hands += 1

        if need_reset:
            # If we do have too many frames queued, just reset the
            # tracker to avoid a deadlock.
            self.landmarker_hands._runner.restart()
            with self.frames_queued_mutex:
                self.frames_queued_hands = 0
        else:
            self.landmarker_hands.detect_async(mp_image, timestamp_ms)
            self._last_hand_detect_timestamp = timestamp_ms

    def _tracker_worker_thread_func(self):
        """Worker thread body: capture, track and send until told to quit.

        Runs until ``should_quit_threads`` becomes true. Any exception is
        logged (with traceback) and ends the thread.
        """
        try:

            # Deadlock-avoidance.
            self._write_log("locking mutex before init mediapipe")
            with self.the_big_ugly_mutex:
                self._init_mediapipe()

            self._write_log("Initializing MediaPipe")

            self.output_data = copy.deepcopy(DEFAULT_TRACKING_DATA)

            # Fed to the landmarkers while no camera is connected.
            blank_image_cv2 = numpy.zeros((1, 1, 3), dtype=numpy.uint8)

            # Timestamps fed to MediaPipe have to keep monotonically
            # increasing and we can't send the same one twice, so this
            # must persist across loop iterations.
            last_timestamp_used = 0

            # Main capturing loop.
            last_frame_time = 0
            while not self.should_quit_threads:

                # Wait for the minimum frame time.
                time_to_sleep = self.minimum_frame_time - (
                    time.time() - last_frame_time)
                if time_to_sleep > 0.0:
                    time.sleep(time_to_sleep)

                # If the video device got disconnected, reconnect it.
                # (Takes the big mutex itself, so call it before locking.)
                self._open_video_device()

                with self.the_big_ugly_mutex:

                    last_frame_time = time.time()

                    # Capture a frame.
                    if self.video_device_capture is not None:
                        success, image = self.video_device_capture.read()
                    else:
                        # No camera connected at the moment. Just feed
                        # in blank images.
                        success = True
                        image = blank_image_cv2.copy()

                    if success:

                        # Convert image to MediaPipe.
                        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
                        # FIXME: Find out why we do this. I think it was
                        # mentioned in the MediaPipe tutorial.
                        image.flags.writeable = False
                        mp_image = mediapipe.Image(
                            image_format=mediapipe.ImageFormat.SRGB,
                            data=image)

                        # Generate a timestamp to feed into the MediaPipe
                        # system. If we're still somehow inside the same
                        # millisecond as the last processed image, then
                        # skip this frame.
                        this_time = int(time.time() * 1000)
                        if this_time <= last_timestamp_used:
                            continue

                        self._submit_face_frame(mp_image, this_time)
                        self._submit_hand_frame(mp_image, this_time)

                        last_timestamp_used = this_time

                    output_data_json = json.dumps(
                        self.output_data, indent=4).encode("utf-8")

                    with self.frames_queued_mutex:
                        status_packet_str = (
                            "Tracking data sending. (Queue: %2d hand, %2d face)"
                            % (self.frames_queued_hands,
                               self.frames_queued_face))
                    self._write_log(status_packet_str)

                    # Output the packet.
                    self._udp_socket.sendto(
                        output_data_json, (_UDP_HOST, self.udp_port_number))

            self._write_log("Quitting")

        except Exception as e:
            exception_string_generator = (
                traceback.TracebackException.from_exception(e))
            exception_string = "".join(exception_string_generator.format())
            self._write_log(exception_string)

    def start_tracker(self):
        """Start the worker thread, stopping any existing one first."""
        if self._tracker_worker_thread:
            # Stop *this* instance's worker (not the module-level
            # singleton's).
            self.stop_tracker()

        self._write_log("Starting worker thread.")
        self._tracker_worker_thread = threading.Thread(
            target=self._tracker_worker_thread_func,
            daemon=True)
        self._tracker_worker_thread.start()
        self._write_log("Starting worker thread done.")

    def stop_tracker(self):
        """Ask the worker thread to quit and wait for it to finish.

        Does nothing if no worker thread is running.
        """
        worker = self._tracker_worker_thread
        if worker is None:
            return

        self.should_quit_threads = True
        try:
            self._write_log("Waiting for worker thread to join.")
            worker.join()
            self._write_log("Worker thread joined.")
        finally:
            self._tracker_worker_thread = None
            self.should_quit_threads = False

    # Set to -1 to just release all devices.
    def set_video_device_number(self, new_number):
        """Select the video device index to capture from and reopen it."""
        if self.video_device_index == new_number:
            return

        with self.the_big_ugly_mutex:
            self.video_device_index = new_number

        # These take the (non-reentrant) big mutex themselves, so they
        # have to be called after it has been released.
        self._close_video_device()
        self._open_video_device()

    def set_udp_port_number(self, new_number):
        """Set the UDP port that tracking and status packets are sent to."""
        with self.the_big_ugly_mutex:
            self.udp_port_number = new_number

    def _shutdown_mediapipe(self):
        """Close and drop both landmarkers, then force a GC pass."""
        if self.landmarker:
            self.landmarker.close()
        # if self.landmarker_pose: self.landmarker_pose.close()
        if self.landmarker_hands:
            self.landmarker_hands.close()

        self.landmarker = None
        # self.landmarker_pose = None
        self.landmarker_hands = None

        # Grumblegrumblegrumble...
        gc.collect()

    def __del__(self):
        # _close_video_device takes the big mutex itself, so it must run
        # before we take the mutex for the MediaPipe shutdown.
        self._close_video_device()
        with self.the_big_ugly_mutex:
            self._shutdown_mediapipe()


# ----------------------------------------------------------------------

mediapipe_controller = MediaPipeTracker()

# ----------------------------------------------------------------------
# External interface (called from Godot)


def start_tracker():
    """Start (or restart) the shared tracker's worker thread."""
    mediapipe_controller.start_tracker()


def stop_tracker():
    """Stop the shared tracker's worker thread, if it is running."""
    mediapipe_controller.stop_tracker()


# Set to -1 to just release all devices.
def set_video_device_number(new_number):
    """Select the video device index used by the shared tracker."""
    mediapipe_controller.set_video_device_number(new_number)


def set_udp_port_number(new_number):
    """Set the UDP port the shared tracker sends its packets to."""
    mediapipe_controller.set_udp_port_number(new_number)


def enumerate_camera_devices():
    """Return a list of dicts describing the available camera devices.

    Each dict has the keys ``"name"``, ``"backend"``, ``"path"`` and
    ``"index"``.
    """
    from cv2_enumerate_cameras import enumerate_cameras

    on_linux = sys.platform == "linux"

    # Having issues with GSTREAMER sources, so let's just use V4L only.
    capture_api_preference = cv2.CAP_V4L2 if on_linux else cv2.CAP_ANY

    # On Linux, we sometimes see stuff showing up as just "video#", so
    # let's at least try to correlate paths and IDs from
    # /dev/v4l/by-id .
    path_to_name_mappings = {}
    if on_linux:
        by_id_dir = "/dev/v4l/by-id"
        try:
            device_id_list = os.listdir(by_id_dir)
        except OSError:
            # Directory missing or unreadable: no friendly names.
            device_id_list = []

        for device_id in device_id_list:
            full_link_path = os.path.join(by_id_dir, device_id)
            try:
                link_target = os.readlink(full_link_path)
            except OSError:
                # Not a symlink (or it vanished); skip just this entry.
                continue
            actual_dev_file = os.path.abspath(
                os.path.join(by_id_dir, link_target))
            path_to_name_mappings[actual_dev_file] = device_id

    all_camera_data = []

    for camera_info in enumerate_cameras(apiPreference=capture_api_preference):

        camera_name = camera_info.name
        if re.match(r"video[0-9]+", camera_info.name):
            camera_name = path_to_name_mappings.get(
                camera_info.path, camera_name)

        # Figure out the backend.
        backend_index = camera_info.backend
        if on_linux:
            # For some reason, in Linux the backend is stored in the
            # index and not the backend field.
            backend_index = camera_info.index - (camera_info.index % 100)
        backend_name = cv2.videoio_registry.getBackendName(backend_index)

        all_camera_data.append({
            "name": camera_name,
            "backend": backend_name,
            "path": camera_info.path,
            "index": camera_info.index,
        })

    return all_camera_data