You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
595 lines
22 KiB
595 lines
22 KiB
4 weeks ago
|
#!/usr/bin/python3
|
||
|
|
||
|
import copy
|
||
|
import gc
|
||
|
import json
|
||
|
import os
|
||
|
import re
|
||
|
import socket
|
||
|
import sys
|
||
|
import threading
|
||
|
import time
|
||
|
import traceback
|
||
|
|
||
|
import cv2
|
||
|
import mediapipe
|
||
|
import numpy
|
||
|
|
||
|
BaseOptions = mediapipe.tasks.BaseOptions
|
||
|
FaceLandmarker = mediapipe.tasks.vision.FaceLandmarker
|
||
|
FaceLandmarkerOptions = mediapipe.tasks.vision.FaceLandmarkerOptions
|
||
|
FaceLandmarkerResult = mediapipe.tasks.vision.FaceLandmarkerResult
|
||
|
# PoseLandmarker = mediapipe.tasks.vision.PoseLandmarker
|
||
|
# PoseLandmarkerOptions = mediapipe.tasks.vision.PoseLandmarkerOptions
|
||
|
HandLandmarker = mediapipe.tasks.vision.HandLandmarker
|
||
|
HandLandmarkerOptions = mediapipe.tasks.vision.HandLandmarkerOptions
|
||
|
HandLandmarkerResult = mediapipe.tasks.vision.HandLandmarkerResult
|
||
|
RunningMode = mediapipe.tasks.vision.RunningMode
|
||
|
|
||
|
# Indices of hand landmarks.
|
||
|
WRIST = 0
|
||
|
THUMB_CMC = 1
|
||
|
THUMB_MCP = 2
|
||
|
THUMB_IP = 3
|
||
|
THUMB_TIP = 4
|
||
|
INDEX_FINGER_MCP = 5
|
||
|
INDEX_FINGER_PIP = 6
|
||
|
INDEX_FINGER_DIP = 7
|
||
|
INDEX_FINGER_TIP = 8
|
||
|
MIDDLE_FINGER_MCP = 9
|
||
|
MIDDLE_FINGER_PIP = 10
|
||
|
MIDDLE_FINGER_DIP = 12
|
||
|
MIDDLE_FINGER_TIP = 13
|
||
|
RING_FINGER_MCP = 14
|
||
|
RING_FINGER_PIP = 15
|
||
|
RING_FINGER_DIP = 16
|
||
|
RING_FINGER_TIP = 17
|
||
|
PINKY_MCP = 18
|
||
|
PINKY_PIP = 19
|
||
|
PINKY_DIP = 20
|
||
|
PINKY_TIP = 21
|
||
|
|
||
|
DEFAULT_TRACKING_DATA = {
|
||
|
"face" : {
|
||
|
"confidence" : 0.0, # Currently either 0.0 or 1.0.
|
||
|
"transform" : [ [ 1.0, 0.0, 0.0, 0.0 ],
|
||
|
[ 0.0, 1.0, 0.0, 0.0 ],
|
||
|
[ 0.0, 0.0, 1.0, 0.0 ],
|
||
|
[ 0.0, 0.0, 0.0, 1.0 ], ],
|
||
|
"blendshapes" : {
|
||
|
"_neutral" : 0.0,
|
||
|
"browDownLeft" : 0.0,
|
||
|
"browDownRight" : 0.0,
|
||
|
"browInnerUp" : 0.0,
|
||
|
"browOuterUpLeft" : 0.0,
|
||
|
"browOuterUpRight" : 0.0,
|
||
|
"cheekPuff" : 0.0,
|
||
|
"cheekSquintLeft" : 0.0,
|
||
|
"cheekSquintRight" : 0.0,
|
||
|
"eyeBlinkLeft" : 0.0,
|
||
|
"eyeBlinkRight" : 0.0,
|
||
|
"eyeLookDownLeft" : 0.0,
|
||
|
"eyeLookDownRight" : 0.0,
|
||
|
"eyeLookInLeft" : 0.0,
|
||
|
"eyeLookInRight" : 0.0,
|
||
|
"eyeLookOutLeft" : 0.0,
|
||
|
"eyeLookOutRight" : 0.0,
|
||
|
"eyeLookUpLeft" : 0.0,
|
||
|
"eyeLookUpRight" : 0.0,
|
||
|
"eyeSquintLeft" : 0.0,
|
||
|
"eyeSquintRight" : 0.0,
|
||
|
"eyeWideLeft" : 0.0,
|
||
|
"eyeWideRight" : 0.0,
|
||
|
"jawForward" : 0.0,
|
||
|
"jawLeft" : 0.0,
|
||
|
"jawOpen" : 0.0,
|
||
|
"jawRight" : 0.0,
|
||
|
"mouthClose" : 0.0,
|
||
|
"mouthDimpleLeft" : 0.0,
|
||
|
"mouthDimpleRight" : 0.0,
|
||
|
"mouthFrownLeft" : 0.0,
|
||
|
"mouthFrownRight" : 0.0,
|
||
|
"mouthFunnel" : 0.0,
|
||
|
"mouthLeft" : 0.0,
|
||
|
"mouthLowerDownLeft" : 0.0,
|
||
|
"mouthLowerDownRight" : 0.0,
|
||
|
"mouthPressLeft" : 0.0,
|
||
|
"mouthPressRight" : 0.0,
|
||
|
"mouthPucker" : 0.0,
|
||
|
"mouthRight" : 0.0,
|
||
|
"mouthRollLower" : 0.0,
|
||
|
"mouthRollUpper" : 0.0,
|
||
|
"mouthShrugLower" : 0.0,
|
||
|
"mouthShrugUpper" : 0.0,
|
||
|
"mouthSmileLeft" : 0.0,
|
||
|
"mouthSmileRight" : 0.0,
|
||
|
"mouthStretchLeft" : 0.0,
|
||
|
"mouthStretchRight" : 0.0,
|
||
|
"mouthUpperUpLeft" : 0.0,
|
||
|
"mouthUpperUpRight" : 0.0,
|
||
|
"noseSneerLeft" : 0.0,
|
||
|
"noseSneerRight" : 0.0,
|
||
|
},
|
||
|
},
|
||
|
"hands" : {
|
||
|
"left" : {
|
||
|
"confidence" : 0.0,
|
||
|
"image_landmarks" : [ [ 0.0, 0.0, 0.0 ] ] * 21,
|
||
|
"world_landmarks" : [ [ 0.0, 0.0, 0.0 ] ] * 21,
|
||
|
},
|
||
|
"right" : {
|
||
|
"confidence" : 0.0,
|
||
|
"image_landmarks" : [ [ 0.0, 0.0, 0.0 ] ] * 21,
|
||
|
"world_landmarks" : [ [ 0.0, 0.0, 0.0 ] ] * 21,
|
||
|
},
|
||
|
},
|
||
|
}
|
||
|
|
||
|
class MediaPipeTracker:
|
||
|
|
||
|
def __init__(self):
|
||
|
self.the_big_ugly_mutex = threading.Lock()
|
||
|
self._tracker_worker_thread = None
|
||
|
|
||
|
# We need these to avoid deadlocks. If we're queueing frames
|
||
|
# faster than they can process, we'll hit a deadlock in
|
||
|
# MediaPipe.
|
||
|
self.frames_queued_face = 0
|
||
|
self.frames_queued_hands = 0
|
||
|
self.frames_queued_mutex = threading.Lock()
|
||
|
self.should_quit_threads = False
|
||
|
|
||
|
# Open the socket immediately so we can start sending error
|
||
|
# and status stuff to the hosting application.
|
||
|
self._udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
|
self.udp_port_number = 7098
|
||
|
|
||
|
# FIXME: Make this editable.
|
||
|
self.minimum_frame_time = 0.016
|
||
|
|
||
|
self.video_device_index = -1
|
||
|
self.video_device_capture = None
|
||
|
|
||
|
self.landmarker = None
|
||
|
# self.landmarker_pose = None
|
||
|
self.landmarker_hands = None
|
||
|
|
||
|
# These are for more deadlock avoidance, so we can keep track
|
||
|
# of how behind the hand tracker is.
|
||
|
self._last_hand_result_timestamp = (time.time() * 1000)
|
||
|
self._last_hand_detect_timestamp = (time.time() * 1000)
|
||
|
|
||
|
self.output_data = copy.deepcopy(DEFAULT_TRACKING_DATA)
|
||
|
|
||
|
def _close_video_device(self):
|
||
|
with self.the_big_ugly_mutex:
|
||
|
self.video_device_capture = None
|
||
|
|
||
|
def _open_video_device(self):
|
||
|
with self.the_big_ugly_mutex:
|
||
|
|
||
|
if self.video_device_index == -1:
|
||
|
self.video_device_capture = None
|
||
|
return
|
||
|
|
||
|
# Check to make sure we don't already have the device open.
|
||
|
if self.video_device_capture != None:
|
||
|
return
|
||
|
|
||
|
# Try opening it!
|
||
|
self._write_log("Opening a video device!")
|
||
|
|
||
|
self.video_device_capture = cv2.VideoCapture(self.video_device_index)
|
||
|
|
||
|
# Enforce low-res capture for performance reasons.
|
||
|
try:
|
||
|
self.video_device_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
|
||
|
self.video_device_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
|
||
|
except Exception as e:
|
||
|
# Failed? Whatever. Just use the resolution it's stuck with.
|
||
|
pass
|
||
|
|
||
|
if self.video_device_capture.isOpened():
|
||
|
self._write_log("Video device acquired")
|
||
|
else:
|
||
|
self.video_device_capture = None
|
||
|
self._write_log("Failed to open video device: %s" % str(self.video_device_index))
|
||
|
|
||
|
def _init_mediapipe(self):
|
||
|
asset_path = os.path.abspath(os.path.dirname(__file__))
|
||
|
|
||
|
face_landmarker_path = os.path.join(asset_path, "face_landmarker.task")
|
||
|
# FIXME: Last minute breakages.
|
||
|
# pose_landmarker_path = os.path.join(asset_path, "pose_landmarker.task")
|
||
|
hand_landmarker_path = os.path.join(asset_path, "hand_landmarker.task")
|
||
|
|
||
|
options = FaceLandmarkerOptions(
|
||
|
base_options = BaseOptions(model_asset_path = face_landmarker_path),
|
||
|
running_mode = RunningMode.LIVE_STREAM,
|
||
|
output_face_blendshapes = True,
|
||
|
output_facial_transformation_matrixes = True,
|
||
|
result_callback = self._handle_result_face)
|
||
|
|
||
|
# FIXME: Last minute breakages.
|
||
|
# options_pose = PoseLandmarkerOptions(
|
||
|
# base_options = BaseOptions(model_asset_path = pose_landmarker_path),
|
||
|
# running_mode = RunningMode.LIVE_STREAM,
|
||
|
# output_segmentation_masks = False,
|
||
|
# result_callback = self._handle_result_pose)
|
||
|
|
||
|
options_hands = HandLandmarkerOptions(
|
||
|
base_options = BaseOptions(model_asset_path = hand_landmarker_path),
|
||
|
running_mode = RunningMode.LIVE_STREAM,
|
||
|
num_hands = 2,
|
||
|
|
||
|
# FIXME: Make these adjustable.
|
||
|
# Were working in the 4.1 version.
|
||
|
min_hand_detection_confidence = 0.75,
|
||
|
min_tracking_confidence = 0.75,
|
||
|
min_hand_presence_confidence = 0.9,
|
||
|
|
||
|
result_callback = self._handle_result_hands)
|
||
|
|
||
|
self._shutdown_mediapipe()
|
||
|
|
||
|
self._write_log("Init face landmarker...")
|
||
|
self.landmarker = FaceLandmarker.create_from_options(options)
|
||
|
|
||
|
# self._write_log("Init pose landmarker...")
|
||
|
# self.landmarker_pose = PoseLandmarker.create_from_options(options_pose)
|
||
|
|
||
|
self._write_log("Init hand landmarker...")
|
||
|
self.landmarker_hands = HandLandmarker.create_from_options(options_hands)
|
||
|
|
||
|
self._write_log("Init done")
|
||
|
|
||
|
def _write_log(self, *args):
|
||
|
try:
|
||
|
print(*args)
|
||
|
except Exception as e:
|
||
|
# Concerning...
|
||
|
pass
|
||
|
try:
|
||
|
self._send_status_packet(" ".join(str(s) for s in args))
|
||
|
except Exception as e:
|
||
|
pass
|
||
|
|
||
|
def _send_status_packet(self, status_str):
|
||
|
output_data = { "status" : status_str }
|
||
|
output_data_json = json.dumps(output_data, indent=4).encode("utf-8")
|
||
|
self._udp_socket.sendto(output_data_json, ("127.0.0.1", self.udp_port_number))
|
||
|
|
||
|
# Create a face landmarker instance with the live stream mode:
|
||
|
def _handle_result_face(
|
||
|
self,
|
||
|
result: FaceLandmarkerResult,
|
||
|
output_image: mediapipe.Image,
|
||
|
timestamp_ms: int,
|
||
|
):
|
||
|
with self.frames_queued_mutex:
|
||
|
self.frames_queued_face -= 1
|
||
|
|
||
|
face = self.output_data["face"]
|
||
|
face["confidence"] = 0.0
|
||
|
|
||
|
if len(result.facial_transformation_matrixes) > 0:
|
||
|
face["confidence"] = 1.0
|
||
|
face["transform"] = result.facial_transformation_matrixes[0].tolist()
|
||
|
|
||
|
if len(result.face_blendshapes) > 0:
|
||
|
face["confidence"] = 1.0
|
||
|
for shape in result.face_blendshapes[0]:
|
||
|
face["blendshapes"][shape.category_name] = shape.score
|
||
|
|
||
|
# FIXME: If we ever come back to it, finish this.
|
||
|
def _handle_result_pose(
|
||
|
self,
|
||
|
x,
|
||
|
output_image: mediapipe.Image,
|
||
|
timestamp_ms: int
|
||
|
):
|
||
|
for y in x.pose_world_landmarks:
|
||
|
pass
|
||
|
|
||
|
def _handle_result_hands(
|
||
|
self,
|
||
|
result: HandLandmarkerResult,
|
||
|
output_image: mediapipe.Image,
|
||
|
timestamp_ms: int,
|
||
|
):
|
||
|
with self.frames_queued_mutex:
|
||
|
self.frames_queued_hands -= 1
|
||
|
|
||
|
self._last_hand_result_timestamp = timestamp_ms
|
||
|
|
||
|
self.output_data["hands"]["left"]["confidence"] = 0.0
|
||
|
self.output_data["hands"]["right"]["confidence"] = 0.0
|
||
|
|
||
|
# TODO: Get actual dimensions of the camera?
|
||
|
frame_height, frame_width = (640, 480)
|
||
|
focal_length = frame_width * 0.75
|
||
|
center = (frame_width / 2, frame_height / 2)
|
||
|
camera_matrix = numpy.array([
|
||
|
[ focal_length, 0, center[0] ],
|
||
|
[ 0, focal_length, center[1] ],
|
||
|
[ 0, 0, 1 ]
|
||
|
], dtype = "double")
|
||
|
distortion = numpy.zeros((4, 1))
|
||
|
|
||
|
for index in range(len(result.hand_landmarks)):
|
||
|
handedness = result.handedness[index][0]
|
||
|
image_landmarks = result.hand_landmarks[index]
|
||
|
world_landmarks = result.hand_world_landmarks[index]
|
||
|
|
||
|
side = handedness.category_name.lower()
|
||
|
hand = self.output_data["hands"][side]
|
||
|
|
||
|
hand["confidence"] = handedness.score
|
||
|
for [i, image_landmark] in enumerate(image_landmarks):
|
||
|
world_landmark = world_landmarks[i]
|
||
|
hand["image_landmarks"][i] = [ image_landmark.x, image_landmark.y, image_landmark.z ]
|
||
|
hand["world_landmarks"][i] = [ world_landmark.x, world_landmark.y, world_landmark.z ]
|
||
|
|
||
|
def _tracker_worker_thread_func(self):
|
||
|
try:
|
||
|
|
||
|
# Deadlock-avoidance.
|
||
|
|
||
|
self._write_log("locking mutex before init mediapipe")
|
||
|
with self.the_big_ugly_mutex:
|
||
|
self._init_mediapipe()
|
||
|
self._write_log("Initializing MediaPipe")
|
||
|
self.output_data = copy.deepcopy(DEFAULT_TRACKING_DATA)
|
||
|
|
||
|
input_image = None
|
||
|
success = True
|
||
|
start_time = time.time()
|
||
|
frame_count = 0
|
||
|
|
||
|
# We'll send this when we're panicking from too many frames queued, as
|
||
|
# a last-ditch attempt to un-clog the queue before we get a deadlock
|
||
|
# thanks to the MediaPipe bug.
|
||
|
blank_image_cv2 = numpy.zeros((1,1,3), dtype=numpy.uint8)
|
||
|
blank_image_mp = mediapipe.Image(mediapipe.ImageFormat.SRGB, data=blank_image_cv2)
|
||
|
|
||
|
# Main capturing loop.
|
||
|
last_frame_time = 0
|
||
|
while not self.should_quit_threads:
|
||
|
|
||
|
# Wait for the minimum frame time.
|
||
|
time_to_sleep = self.minimum_frame_time - (time.time() - last_frame_time)
|
||
|
if time_to_sleep > 0.0:
|
||
|
time.sleep(time_to_sleep)
|
||
|
|
||
|
# If the video device got disconnected, reconnect it.
|
||
|
self._open_video_device()
|
||
|
|
||
|
with self.the_big_ugly_mutex:
|
||
|
|
||
|
last_frame_time = time.time()
|
||
|
last_timestamp_used = int(time.time() * 1000)
|
||
|
|
||
|
# Capture a frame.
|
||
|
if self.video_device_capture:
|
||
|
success, image = self.video_device_capture.read()
|
||
|
else:
|
||
|
# No camera connected at the moment. Just feed in
|
||
|
# blank images.
|
||
|
success = True
|
||
|
image = blank_image_cv2.copy()
|
||
|
|
||
|
if success:
|
||
|
|
||
|
# Convert image to MediaPipe.
|
||
|
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
||
|
# FIXME: Find out why we do this. I think it was
|
||
|
# mentioned in the MediaPipe tutorial.
|
||
|
image.flags.writeable = False
|
||
|
mp_image = mediapipe.Image(
|
||
|
image_format=mediapipe.ImageFormat.SRGB,
|
||
|
data=image)
|
||
|
|
||
|
# Generate a timestamp to feed into the MediaPipe
|
||
|
# system. If we're still somehow inside the same
|
||
|
# millisecond as the last processed image, then skip
|
||
|
# this frame.
|
||
|
this_time = int(time.time() * 1000)
|
||
|
if this_time <= last_timestamp_used:
|
||
|
continue
|
||
|
|
||
|
# Check to see if we have too many face tracking
|
||
|
# frames queued.
|
||
|
need_reset = False
|
||
|
with self.frames_queued_mutex:
|
||
|
if self.frames_queued_face > 5:
|
||
|
need_reset = True
|
||
|
else:
|
||
|
self.frames_queued_face += 1
|
||
|
|
||
|
# Reset if we have too face frames queued. Avoid a
|
||
|
# deadlock.
|
||
|
if need_reset:
|
||
|
# Deadlock-avoidance.
|
||
|
self.landmarker._runner.restart()
|
||
|
self.frames_queued_face = 0
|
||
|
else:
|
||
|
self.landmarker.detect_async(mp_image, this_time)
|
||
|
|
||
|
# Hands
|
||
|
|
||
|
# If the last result we got back was too much time
|
||
|
# since the last one we queued up, then wait until
|
||
|
# some amount of time (which we guess in the most
|
||
|
# convoluted way possible) has passed.
|
||
|
#
|
||
|
# FIXME: Make this less stupid. Make it make
|
||
|
# sense. Then apply it to the face tracking.
|
||
|
hand_landmarker_time_skew = self._last_hand_detect_timestamp - self._last_hand_result_timestamp
|
||
|
if hand_landmarker_time_skew > 50: # FIXME: Make configurable (milliseconds)
|
||
|
self._last_hand_result_timestamp += this_time - self._last_hand_detect_timestamp
|
||
|
else:
|
||
|
# Check to see if we have too many hand tracking
|
||
|
# frames queued.
|
||
|
need_reset = False
|
||
|
with self.frames_queued_mutex:
|
||
|
if self.frames_queued_face > 5:
|
||
|
need_reset = True
|
||
|
else:
|
||
|
self.frames_queued_hands += 1
|
||
|
|
||
|
# If we do have too many frames queued, just reset
|
||
|
# the tracker to avoid a deadlock.
|
||
|
if need_reset:
|
||
|
self.landmarker_hands._runner.restart()
|
||
|
self.frames_queued_hands = 0
|
||
|
else:
|
||
|
self.landmarker_hands.detect_async(mp_image, this_time)
|
||
|
self._last_hand_detect_timestamp = this_time
|
||
|
|
||
|
# Track the last timestamp because we have to keep
|
||
|
# these monotonically increasing and we can't send
|
||
|
# the same timestamp twice.
|
||
|
last_timestamp_used = this_time
|
||
|
|
||
|
output_data_json = json.dumps(self.output_data, indent=4).encode("utf-8")
|
||
|
|
||
|
with self.frames_queued_mutex:
|
||
|
status_packet_str = "Tracking data sending. (Queue: %2d hand, %2d face)" % (self.frames_queued_hands, self.frames_queued_face)
|
||
|
self._write_log(status_packet_str)
|
||
|
|
||
|
# Output the packet.
|
||
|
self._udp_socket.sendto(output_data_json, ("127.0.0.1", self.udp_port_number))
|
||
|
|
||
|
self._write_log("Quitting")
|
||
|
|
||
|
except Exception as e:
|
||
|
|
||
|
exception_string_generator = traceback.TracebackException.from_exception(e)
|
||
|
exception_string = "".join(exception_string_generator.format())
|
||
|
self._write_log(exception_string)
|
||
|
|
||
|
|
||
|
def start_tracker(self):
|
||
|
if self._tracker_worker_thread:
|
||
|
stop_tracker()
|
||
|
|
||
|
assert(not self._tracker_worker_thread)
|
||
|
self._write_log("Starting worker thread.")
|
||
|
self._tracker_worker_thread = threading.Thread(
|
||
|
target=self._tracker_worker_thread_func,
|
||
|
daemon=True)
|
||
|
self._tracker_worker_thread.start()
|
||
|
self._write_log("Starting worker thread done.")
|
||
|
|
||
|
def stop_tracker(self):
|
||
|
assert(self._tracker_worker_thread)
|
||
|
self.should_quit_threads = True
|
||
|
self._write_log("Waiting for worker thread to join.")
|
||
|
self._tracker_worker_thread.join()
|
||
|
self._write_log("Worker thread joined.")
|
||
|
self._tracker_worker_thread = None
|
||
|
self.should_quit_threads = False
|
||
|
|
||
|
|
||
|
# Set to -1 to just release all devices.
|
||
|
def set_video_device_number(self, new_number):
|
||
|
if self.video_device_index != new_number:
|
||
|
with self.the_big_ugly_mutex:
|
||
|
self.video_device_index = new_number
|
||
|
self._close_video_device()
|
||
|
self._open_video_device()
|
||
|
|
||
|
def set_udp_port_number(self, new_number):
|
||
|
with self.the_big_ugly_mutex:
|
||
|
self.udp_port_number = new_number
|
||
|
|
||
|
def _shutdown_mediapipe(self):
|
||
|
if self.landmarker: self.landmarker.close()
|
||
|
# if self.landmarker_pose: self.landmarker_pose.close()
|
||
|
if self.landmarker_hands: self.landmarker_hands.close()
|
||
|
|
||
|
self.landmarker = None
|
||
|
# self.landmarker_pose = None
|
||
|
self.landmarker_hands = None
|
||
|
|
||
|
# Grumblegrumblegrumble...
|
||
|
gc.collect()
|
||
|
|
||
|
def __del__(self):
|
||
|
with self.the_big_ugly_mutex:
|
||
|
self._close_video_device()
|
||
|
self._shutdown_mediapipe()
|
||
|
|
||
|
|
||
|
|
||
|
# ----------------------------------------------------------------------
|
||
|
mediapipe_controller = MediaPipeTracker()
|
||
|
|
||
|
# ----------------------------------------------------------------------
|
||
|
# External interface (called from Godot)
|
||
|
|
||
|
def start_tracker():
|
||
|
global mediapipe_controller
|
||
|
mediapipe_controller.start_tracker()
|
||
|
|
||
|
def stop_tracker():
|
||
|
global mediapipe_controller
|
||
|
mediapipe_controller.stop_tracker()
|
||
|
|
||
|
# Set to -1 to just release all devices.
|
||
|
def set_video_device_number(new_number):
|
||
|
global mediapipe_controller
|
||
|
mediapipe_controller.set_video_device_number(new_number)
|
||
|
|
||
|
def set_udp_port_number(new_number):
|
||
|
global mediapipe_controller
|
||
|
mediapipe_controller.set_udp_port_number(new_number)
|
||
|
|
||
|
def enumerate_camera_devices():
|
||
|
from cv2_enumerate_cameras import enumerate_cameras
|
||
|
|
||
|
capture_api_preference=cv2.CAP_ANY
|
||
|
# Having issues with GSTREAMER sources, so let's just use V4L only.
|
||
|
if sys.platform == "linux": capture_api_preference = cv2.CAP_V4L2
|
||
|
|
||
|
# On Linux, we sometimes see stuff showing up as just "video#", so
|
||
|
# let's at least try to correlate paths and IDs from
|
||
|
# /dev/v4l/by-id .
|
||
|
path_to_name_mappings = {}
|
||
|
if sys.platform == "linux":
|
||
|
try:
|
||
|
device_id_list = os.listdir("/dev/v4l/by-id")
|
||
|
for device_id in device_id_list:
|
||
|
full_link_path = os.path.join("/dev/v4l/by-id", device_id)
|
||
|
actual_dev_file = os.path.abspath(os.path.join("/dev/v4l/by-id", os.readlink(full_link_path)))
|
||
|
path_to_name_mappings[actual_dev_file] = device_id
|
||
|
except IOError:
|
||
|
pass
|
||
|
|
||
|
all_camera_data = []
|
||
|
for camera_info in enumerate_cameras(apiPreference=capture_api_preference):
|
||
|
camera_name = camera_info.name
|
||
|
|
||
|
if re.match("video[0-9]+", camera_info.name):
|
||
|
if camera_info.path in path_to_name_mappings:
|
||
|
camera_name = path_to_name_mappings[camera_info.path]
|
||
|
|
||
|
# Figure out the backend.
|
||
|
backend_index = camera_info.backend
|
||
|
if sys.platform == "linux":
|
||
|
# For some reason, in Linux the backend is stored in the
|
||
|
# index and not the backend field.
|
||
|
backend_index = camera_info.index - (camera_info.index % 100)
|
||
|
backend_name = cv2.videoio_registry.getBackendName(backend_index)
|
||
|
|
||
|
camera_data = {
|
||
|
"name" : camera_name,
|
||
|
"backend" : backend_name,
|
||
|
"path" : camera_info.path,
|
||
|
"index" : camera_info.index,
|
||
|
}
|
||
|
|
||
|
all_camera_data.append(camera_data)
|
||
|
|
||
|
return all_camera_data
|