diff --git a/minDistance.py b/minDistance.py index 0bee9b6..82c92a3 100644 --- a/minDistance.py +++ b/minDistance.py @@ -2,50 +2,97 @@ import math from PIL import Image import numpy as np from scipy.optimize import linear_sum_assignment +import json +import os.path +import hashlib + + +# location to store array cache +CACHE_FILE_PATH = "point_array_cache.json" class Point2D: x = 0 y = 0 - - def __init__(self, x, y): + color: tuple[int, int, int] = (0, 0, 0) + + def __init__(self, x, y, color: tuple[int, int, int] = (0, 0, 0)): self.x = x self.y = y - + self.color = color + def round(self): self.x = round(self.x) self.y = round(self.y) return self - + def distance(self, other): dx = self.x - other.x dy = self.y - other.y - return math.sqrt(dx**2 + dy**2) - + return math.sqrt(dx ** 2 + dy ** 2) + def interpolate(self, other, percentage): new_x = self.x + (other.x - self.x) * percentage new_y = self.y + (other.y - self.y) * percentage - return Point2D(new_x, new_y) - + new_color = tuple(int((1 - percentage) * self.color[i] + percentage * other.color[i]) for i in range(3)) + return Point2D(new_x, new_y, new_color) + def __eq__(self, other): return (self.x, self.y) == (other.x, other.y) +def mirror_points(points: list[Point2D]) -> list[Point2D]: + mirrored_points = [] + for point in points: + mirrored_x = 128 - point.x # Calculate the mirrored x-coordinate + mirrored_point = Point2D(mirrored_x, point.y) + mirrored_points.append(mirrored_point) + return mirrored_points + + + +def get_image_hash(image): + image_hash = hashlib.sha1(image.tobytes()).hexdigest() + return image_hash + + +def load_cached_point_arrays(): + cached_point_arrays = {} + + if os.path.isfile(CACHE_FILE_PATH): + with open(CACHE_FILE_PATH, "r") as file: + cached_point_arrays = json.load(file) + + return cached_point_arrays + + +def save_cached_point_arrays(cached_point_arrays): + with open(CACHE_FILE_PATH, "w") as file: + json.dump(cached_point_arrays, file) + + def generate_point_array_from_image(image): image = image.convert("RGB") # Convert image to RGB color mode + image_hash = get_image_hash(image) + cached_point_arrays = load_cached_point_arrays() + + if image_hash in cached_point_arrays: + return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in cached_point_arrays[image_hash]] width, height = image.size - point_array = [] + pixel_array = [] - # Iterate over the pixels and generate Point2D instances for y in range(height): for x in range(width): pixel = image.getpixel((x, y)) - if pixel != (0, 0, 0): # Assuming white pixels - point = Point2D(x, y) - point_array.append(point) + if pixel != (0, 0, 0): # any non-white pixels + point = {"x": x, "y": y, "color": pixel} + pixel_array.append(point) - return point_array + cached_point_arrays[image_hash] = pixel_array + save_cached_point_arrays(cached_point_arrays) + + return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in pixel_array] def generate_image_from_point_array(points, width, height): @@ -114,8 +161,8 @@ def interpolate_point_pairs(pairs, percentage): return interpolated_points -Image1 = Image.open("CiscoTheProot/faces/prootface3.png") -Image2 = Image.open("CiscoTheProot/faces/prootface4.png") +Image1 = Image.open("faces/prootface3.png") +Image2 = Image.open("faces/prootface4.png") pixelArray1 = generate_point_array_from_image(Image1) pixelArray2 = generate_point_array_from_image(Image2) diff --git a/rpi/Point2D.py b/rpi/Point2D.py new file mode 100644 index 0000000..61e2d16 --- /dev/null +++ b/rpi/Point2D.py @@ -0,0 +1,144 @@ +import hashlib +import json +import os.path +import math +from scipy.optimize import linear_sum_assignment +import numpy as np +from PIL import Image + + +# location to store array cache +CACHE_FILE_PATH = "point_array_cache.json" + + +class Point2D: + x = 0 + y = 0 + color: tuple[int, int, int] = (0, 0, 0) + + def __init__(self, x, y, color: tuple[int, int, int] = (0, 0, 0)): + self.x = x + self.y = y + self.color = color + + def round(self): + self.x = round(self.x) + self.y = round(self.y) + return self + + def distance(self, other): + dx = self.x - other.x + dy = self.y - other.y + return math.sqrt(dx ** 2 + dy ** 2) + + def interpolate(self, other, percentage): + new_x = self.x + (other.x - self.x) * percentage + new_y = self.y + (other.y - self.y) * percentage + new_color = tuple(int((1 - percentage) * self.color[i] + percentage * other.color[i]) for i in range(3)) + return Point2D(new_x, new_y, new_color) + + def __eq__(self, other): + return (self.x, self.y) == (other.x, other.y) + + +def mirror_points(points: list[Point2D]) -> list[Point2D]: + mirrored_points = [] + for point in points: + mirrored_x = 128 - point.x # Calculate the mirrored x-coordinate + mirrored_point = Point2D(mirrored_x, point.y) + mirrored_points.append(mirrored_point) + return mirrored_points + + +def get_image_hash(image): + image_hash = hashlib.sha1(image.tobytes()).hexdigest() + return image_hash + + +def load_cached_point_arrays(): + cached_point_arrays = {} + + if os.path.isfile(CACHE_FILE_PATH): + with open(CACHE_FILE_PATH, "r") as file: + cached_point_arrays = json.load(file) + + return cached_point_arrays + + +def save_cached_point_arrays(cached_point_arrays): + with open(CACHE_FILE_PATH, "w") as file: + json.dump(cached_point_arrays, file) + + +def generate_point_array_from_image(image): + image = image.convert("RGB") # Convert image to RGB color mode + image_hash = get_image_hash(image) + cached_point_arrays = load_cached_point_arrays() + + if image_hash in cached_point_arrays: + return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in cached_point_arrays[image_hash]] + + +def generate_image_from_point_array(points: list[Point2D], width: int, height: int) -> Image: + # Create a new blank image + image = Image.new("RGB", (width, height), "black") + + # Set the pixels corresponding to the points as white + pixels = image.load() + for point in points: + point = point.round() + x = point.x + y = point.y + pixels[x, y] = (255, 255, 255) + + return image + + +def interpolate_point_pairs(pairs: list[tuple[Point2D, Point2D]], percentage: float) -> list[Point2D]: + interpolated_points:list[Point2D] = [] + for pair in pairs: + point1, point2 = pair + interpolated_point = point1.interpolate(point2, percentage) + interpolated_points.append(interpolated_point) + return interpolated_points + +def pair_points(points1: list[Point2D], points2: list[Point2D]) -> list[tuple[Point2D, Point2D]]: + # Determine the size of the point arrays + size1 = len(points1) + size2 = len(points2) + + # Create a cost matrix based on the distances between points + cost_matrix = np.zeros((size1, size2)) + for i in range(size1): + for j in range(size2): + cost_matrix[i, j] = points1[i].distance(points2[j]) + + # Duplicate points in the smaller array to match the size of the larger array + if size1 > size2: + num_duplicates = size1 - size2 + duplicated_points = np.random.choice(points2, size=num_duplicates).tolist() + points2 += duplicated_points + elif size2 > size1: + num_duplicates = size2 - size1 + duplicated_points = np.random.choice(points1, size=num_duplicates).tolist() + points1 += duplicated_points + + # Update the size of the point arrays + size1 = len(points1) + size2 = len(points2) + + # Create a new cost matrix with the updated sizes + cost_matrix = np.zeros((size1, size2)) + for i in range(size1): + for j in range(size2): + cost_matrix[i, j] = points1[i].distance(points2[j]) + + # Solve the assignment problem using the Hungarian algorithm + row_ind, col_ind = linear_sum_assignment(cost_matrix) + + # Create pairs of points based on the optimal assignment + pairs = [] + for i, j in zip(row_ind, col_ind): + pairs.append((points1[i], points2[j])) + + return pairs \ No newline at end of file diff --git a/rpi/ProotState.py b/rpi/ProotState.py new file mode 100644 index 0000000..a18e5cf --- /dev/null +++ b/rpi/ProotState.py @@ -0,0 +1,105 @@ +import time +import random + +class ProotState: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance.current_blink_state = 0 + cls._instance.desired_blink_state = 0 + cls._instance.blinks_frames_ready = False + cls._instance.loading_screen = True + cls._instance.loading_time = 0 + cls._instance.frame_canvas_prootScreen_1 = False + cls._instance.frame_canvas_prootScreen_2 = False + cls._instance.frame_canvas_prootScreen_3 = False + return cls._instance + + def next_blink_frame_number(self) -> int: + if self.current_blink_state == self.desired_blink_state == 10: + self.desired_blink_state = 0 + return self.current_blink_state + + if self.current_blink_state == self.desired_blink_state == 0: + return self.current_blink_state + + if self.current_blink_state < self.desired_blink_state: + self.current_blink_state += 1 + else: + self.current_blink_state -= 1 + + return self.current_blink_state + + def blink(self, state = 10): + self.set_desired_blink_state(state) + + def set_desired_blink_state(self, state: int): + self.desired_blink_state = state + + def set_blinks_frames_ready(self, ready: bool): + self.blinks_frames_ready = ready + + def get_desired_blink_state(self) -> int: + return self.desired_blink_state + + def get_blinks_frames_ready(self) -> bool: + return self.blinks_frames_ready + + # This function animates the loading screen. It asumes that the function gets called frequently(every frame update) + def set_ProotScreen(self, matrix): + self.loading_time += 1 + self.loading_time = self.loading_time % 75 + if not self.frame_canvas_prootScreen_1: + self.frame_canvas_prootScreen_1 = matrix.CreateFrameCanvas() + image_proot_screen_1 = Image.open("faces/ProotScreen1.png").convert('RGB') + self.frame_canvas_prootScreen_1.SetImage(image_proot_screen_1, unsafe=False) + matrix.SwapOnVSync(self.frame_canvas_prootScreen_1) + + if not self.frame_canvas_prootScreen_2: + self.frame_canvas_prootScreen_2 = matrix.CreateFrameCanvas() + image_proot_screen_2 = Image.open("faces/ProotScreen2.png").convert('RGB') + self.frame_canvas_prootScreen_2.SetImage(image_proot_screen_2, unsafe=False) + matrix.SwapOnVSync(self.frame_canvas_prootScreen_2) + + if not self.frame_canvas_prootScreen_3: + self.frame_canvas_prootScreen_3 = matrix.CreateFrameCanvas() + image_proot_screen_3 = Image.open("faces/ProotScreen3.png").convert('RGB') + self.frame_canvas_prootScreen_3.SetImage(image_proot_screen_3, unsafe=False) + matrix.SwapOnVSync(self.frame_canvas_prootScreen_3) + + if self.loading_time < 25: + matrix.SwapOnVSync(self.frame_canvas_prootScreen_1) + elif self.loading_time < 50: + matrix.SwapOnVSync(self.frame_canvas_prootScreen_2) + else: + matrix.SwapOnVSync(self.frame_canvas_prootScreen_3) + + +def update_screen(): + global blinkFrameCanvases, matrix + + proot_state = ProotState() + + if proot_state.get_blinks_frames_ready(): + # TODO move blinking animation writing logic to the ProotState class + matrix.SwapOnVSync(blinkFrameCanvases[proot_state.next_blink_frame_number()]) + else: + proot_state.set_ProotScreen(matrix) + + +def interrupt_timer(): + while True: + update_screen() + time.sleep(0.01) + + +def random_blinks(): + while True: + time.sleep(random.randint(3, 5)) + + proot_state = ProotState() + + if proot_state.get_blinks_frames_ready(): + proot_state.blink() \ No newline at end of file diff --git a/rpi/antRender.py b/rpi/antRender.py index 4465776..4d17704 100644 --- a/rpi/antRender.py +++ b/rpi/antRender.py @@ -1,12 +1,11 @@ from rgbmatrix import RGBMatrix, RGBMatrixOptions +from Point2D import Point2D +from ProotState import ProotState + +from PIL import Image import paho.mqtt.client as mqtt import time -from PIL import Image -import numpy as np -import math -from scipy.optimize import linear_sum_assignment import threading -import random print("start configuring matrix") @@ -29,237 +28,12 @@ print("configuring matrix took: " + str(endT - startT) + " ms") blinkFrameCanvases = [] - -class ProotState: - _instance = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance.current_blink_state = 0 - cls._instance.desired_blink_state = 0 - cls._instance.blinks_frames_ready = False - cls._instance.loading_screen = True - cls._instance.loading_time = 0 - cls._instance.frame_canvas_prootScreen_1 = False - cls._instance.frame_canvas_prootScreen_2 = False - cls._instance.frame_canvas_prootScreen_3 = False - return cls._instance - - def next_blink_frame_number(self) -> int: - if self.current_blink_state == self.desired_blink_state == 10: - self.desired_blink_state = 0 - return self.current_blink_state - - if self.current_blink_state == self.desired_blink_state == 0: - return self.current_blink_state - - if self.current_blink_state < self.desired_blink_state: - self.current_blink_state += 1 - else: - self.current_blink_state -= 1 - - return self.current_blink_state - - def blink(self): - self.set_desired_blink_state(10) - - def set_desired_blink_state(self, state: int): - self.desired_blink_state = state - - def set_blinks_frames_ready(self, ready: bool): - self.blinks_frames_ready = ready - - def get_desired_blink_state(self) -> int: - return self.desired_blink_state - - def get_blinks_frames_ready(self) -> bool: - return self.blinks_frames_ready - - def set_ProotScreen(self, matrix): - self.loading_time += 1 - self.loading_time = self.loading_time % 75 - if not self.frame_canvas_prootScreen_1: - self.frame_canvas_prootScreen_1 = matrix.CreateFrameCanvas() - image_proot_screen_1 = Image.open("faces/ProotScreen1.png").convert('RGB') - self.frame_canvas_prootScreen_1.SetImage(image_proot_screen_1, unsafe=False) - matrix.SwapOnVSync(self.frame_canvas_prootScreen_1) - - if not self.frame_canvas_prootScreen_2: - self.frame_canvas_prootScreen_2 = matrix.CreateFrameCanvas() - image_proot_screen_2 = Image.open("faces/ProotScreen2.png").convert('RGB') - self.frame_canvas_prootScreen_2.SetImage(image_proot_screen_2, unsafe=False) - matrix.SwapOnVSync(self.frame_canvas_prootScreen_2) - if not self.frame_canvas_prootScreen_3: - self.frame_canvas_prootScreen_3 = matrix.CreateFrameCanvas() - image_proot_screen_3 = Image.open("faces/ProotScreen3.png").convert('RGB') - self.frame_canvas_prootScreen_3.SetImage(image_proot_screen_3, unsafe=False) - matrix.SwapOnVSync(self.frame_canvas_prootScreen_3) - - if self.loading_time < 25: - matrix.SwapOnVSync(self.frame_canvas_prootScreen_1) - elif self.loading_time < 50: - matrix.SwapOnVSync(self.frame_canvas_prootScreen_2) - else: - matrix.SwapOnVSync(self.frame_canvas_prootScreen_3) - - -class Point2D: - x = 0 - y = 0 - color:tuple[int, int, int] = (0, 0, 0) - - def __init__(self, x, y, color: tuple[int, int, int] = (0, 0, 0)): - self.x = x - self.y = y - self.color = color - - def round(self): - self.x = round(self.x) - self.y = round(self.y) - return self - - def distance(self, other): - dx = self.x - other.x - dy = self.y - other.y - return math.sqrt(dx**2 + dy**2) - - def interpolate(self, other, percentage): - new_x = self.x + (other.x - self.x) * percentage - new_y = self.y + (other.y - self.y) * percentage - return Point2D(new_x, new_y) - - def __eq__(self, other): - return (self.x, self.y) == (other.x, other.y) - - -def mirror_points(points: list[Point2D]) -> list[Point2D]: - mirrored_points = [] - for point in points: - mirrored_x = 128 - point.x # Calculate the mirrored x-coordinate - mirrored_point = Point2D(mirrored_x, point.y) - mirrored_points.append(mirrored_point) - return mirrored_points - - -def generate_point_array_from_image(image: Image) -> list[Point2D]: - image = image.convert("RGB") # Convert image to RGB color mode - - width, height = image.size - point_array = [] - - # Iterate over the pixels and generate Point2D instances - for y in range(height): - for x in range(width): - pixel = image.getpixel((x, y)) - if pixel != (0, 0, 0): # Assuming white pixels - point = Point2D(x, y) - point_array.append(point) - - return point_array - - -def generate_image_from_point_array(points: list[Point2D], width: int, height: int) -> Image: - # Create a new blank image - image = Image.new("RGB", (width, height), "black") - - # Set the pixels corresponding to the points as white - pixels = image.load() - for point in points: - point = point.round() - x = point.x - y = point.y - pixels[x, y] = (255, 255, 255) - - return image - - -def pair_points(points1: list[Point2D], points2: list[Point2D]) -> list[tuple[Point2D, Point2D]]: - # Determine the size of the point arrays - size1 = len(points1) - size2 = len(points2) - - # Create a cost matrix based on the distances between points - cost_matrix = np.zeros((size1, size2)) - for i in range(size1): - for j in range(size2): - cost_matrix[i, j] = points1[i].distance(points2[j]) - - # Duplicate points in the smaller array to match the size of the larger array - if size1 > size2: - num_duplicates = size1 - size2 - duplicated_points = np.random.choice(points2, size=num_duplicates).tolist() - points2 += duplicated_points - elif size2 > size1: - num_duplicates = size2 - size1 - duplicated_points = np.random.choice(points1, size=num_duplicates).tolist() - points1 += duplicated_points - - # Update the size of the point arrays - size1 = len(points1) - size2 = len(points2) - - # Create a new cost matrix with the updated sizes - cost_matrix = np.zeros((size1, size2)) - for i in range(size1): - for j in range(size2): - cost_matrix[i, j] = points1[i].distance(points2[j]) - - # Solve the assignment problem using the Hungarian algorithm - row_ind, col_ind = linear_sum_assignment(cost_matrix) - - # Create pairs of points based on the optimal assignment - pairs = [] - for i, j in zip(row_ind, col_ind): - pairs.append((points1[i], points2[j])) - - return pairs - - -def interpolate_point_pairs(pairs: list[tuple[Point2D, Point2D]], percentage: float) -> list[Point2D]: - interpolated_points:list[Point2D] = [] - for pair in pairs: - point1, point2 = pair - interpolated_point = point1.interpolate(point2, percentage) - interpolated_points.append(interpolated_point) - return interpolated_points - - -def update_screen(): - global blinkFrameCanvases, matrix - - proot_state = ProotState() - - if proot_state.get_blinks_frames_ready(): - # TODO move blinking animation writing logic to the ProotState class - matrix.SwapOnVSync(blinkFrameCanvases[proot_state.next_blink_frame_number()]) - else: - proot_state.set_ProotScreen(matrix) - - - -def interrupt_timer(): - while True: - update_screen() - time.sleep(0.01) - - -def random_blinks(): - while True: - time.sleep(random.randint(3, 5)) - - proot_state = ProotState() - - if proot_state.get_blinks_frames_ready(): - proot_state.blink() - - # Create and start screen update interrupts -screen_update_thread = threading.Thread(target=interrupt_timer) +screen_update_thread = threading.Thread(target=ProotState.interrupt_timer) screen_update_thread.start() # Create and start random blinks interrupts -screen_update_thread = threading.Thread(target=random_blinks) +screen_update_thread = threading.Thread(target=ProotState.random_blinks) screen_update_thread.start() @@ -283,10 +57,10 @@ startT = curr_time = round(time.time()*1000) # generate pixel arrays from each image # TODO ^ storing and loading lists of points will take away this step. (it will require a dedicated script to precompute these) -points_left_eye_open = generate_point_array_from_image(image_left_eye_open) -points_left_eye_closed = generate_point_array_from_image(image_left_eye_closed) -points_left_nose = generate_point_array_from_image(image_left_nose) -points_left_mouth = generate_point_array_from_image(image_left_mouth) +points_left_eye_open = Point2D.generate_point_array_from_image(image_left_eye_open) +points_left_eye_closed = Point2D.generate_point_array_from_image(image_left_eye_closed) +points_left_nose = Point2D.generate_point_array_from_image(image_left_nose) +points_left_mouth = Point2D.generate_point_array_from_image(image_left_mouth) endT = curr_time = round(time.time()*1000) print("generating pixel array took: " + str(endT - startT) + " ms") @@ -298,7 +72,7 @@ startT = curr_time = round(time.time()*1000) #calculate the point pairs between the open and closed left eye # TODO look into precomputing and storing these animations before runtime -left_eye_blink_pairs = pair_points(points_left_eye_open, points_left_eye_closed) +left_eye_blink_pairs = Point2D.pair_points(points_left_eye_open, points_left_eye_closed) endT = curr_time = round(time.time()*1000) print("pairing points for one eye took: " + str(endT - startT) + " ms") @@ -312,13 +86,13 @@ startT = curr_time = round(time.time()*1000) for alpha in range(0,11): offscreen_interpolated_canvas = matrix.CreateFrameCanvas() - left_eye = interpolate_point_pairs(left_eye_blink_pairs, alpha/10) - right_eye = mirror_points(left_eye) - nose = points_left_nose + mirror_points(points_left_nose) - mouth = points_left_mouth + mirror_points(points_left_mouth) + left_eye = Point2D.interpolate_point_pairs(left_eye_blink_pairs, alpha/10) + right_eye = Point2D.mirror_points(left_eye) + nose = points_left_nose + Point2D.mirror_points(points_left_nose) + mouth = points_left_mouth + Point2D.mirror_points(points_left_mouth) face = left_eye + right_eye + nose + mouth - interpolated_face_image = generate_image_from_point_array(face, 128, 32) + interpolated_face_image = Point2D.generate_image_from_point_array(face, 128, 32) offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False) blinkFrameCanvases.append(offscreen_interpolated_canvas) diff --git a/rpi/blinkingTest.py b/rpi/blinkingTest.py index ef2878b..c5ba979 100644 --- a/rpi/blinkingTest.py +++ b/rpi/blinkingTest.py @@ -2,7 +2,6 @@ from rgbmatrix import RGBMatrix, RGBMatrixOptions import paho.mqtt.client as mqtt import time from PIL import Image -import numpy as np # Configuration for the matrix diff --git a/rpi/micTest.py b/rpi/micTest.py new file mode 100644 index 0000000..8f8fcf6 --- /dev/null +++ b/rpi/micTest.py @@ -0,0 +1,12 @@ +# Print out realtime audio volume as ascii bars + +import sounddevice as sd +import numpy as np + +def callback(indata: np.ndarray, outdata: np.ndarray, frames: int, time, status) -> None: + print(indata.shape) + volume_norm = np.linalg.norm(indata)*10 + print ("|" * int(volume_norm)) + +with sd.Stream(callback=callback): + sd.sleep(10000) \ No newline at end of file