from rgbmatrix import RGBMatrix, RGBMatrixOptions import paho.mqtt.client as mqtt import time from PIL import Image import numpy as np import math from scipy.optimize import linear_sum_assignment class ProotState: def __init__(self): self.current_blink_state = 0 self.desired_blink_state = 0 def startBlink(self): self.desired_blink_state = 10 def next_blink_state(self) -> int: if self.current_blink_state == self.desired_blink_state and self.current_blink_state == 10: self.desired_blink_state = 0 return 10 if self.current_blink_state < self.desired_blink_state: self.current_blink_state += 1 else: self.current_blink_state -= 1 return self.current_blink_state class Point2D: x = 0 y = 0 color:tuple[int, int, int] = (0, 0, 0) def __init__(self, x, y, color: tuple[int, int, int] = (0, 0, 0)): self.x = x self.y = y self.color = color def round(self): self.x = round(self.x) self.y = round(self.y) return self def distance(self, other): dx = self.x - other.x dy = self.y - other.y return math.sqrt(dx**2 + dy**2) def interpolate(self, other, percentage): new_x = self.x + (other.x - self.x) * percentage new_y = self.y + (other.y - self.y) * percentage return Point2D(new_x, new_y) def __eq__(self, other): return (self.x, self.y) == (other.x, other.y) def set_ProotScreen(matrix): prootScreen_frame_canvas = matrix.CreateFrameCanvas() image_prootScreen = Image.open("faces/ProotScreen.png").convert('RGB') prootScreen_frame_canvas.SetImage(image_prootScreen, unsafe=False) matrix.SwapOnVSync(prootScreen_frame_canvas) def mirror_points(points: list[Point2D]) -> list[Point2D]: mirrored_points = [] for point in points: mirrored_x = 128 - point.x # Calculate the mirrored x-coordinate mirrored_point = Point2D(mirrored_x, point.y) mirrored_points.append(mirrored_point) return mirrored_points def generate_point_array_from_image(image: Image) -> list[Point2D]: image = image.convert("RGB") # Convert image to RGB color mode width, height = image.size point_array = [] # Iterate over the pixels and generate Point2D instances for y in range(height): for x in range(width): pixel = image.getpixel((x, y)) if pixel != (0, 0, 0): # Assuming white pixels point = Point2D(x, y) point_array.append(point) return point_array def generate_image_from_point_array(points: list[Point2D], width: int, height: int) -> Image: # Create a new blank image image = Image.new("RGB", (width, height), "black") # Set the pixels corresponding to the points as white pixels = image.load() for point in points: point = point.round() x = point.x y = point.y pixels[x, y] = (255, 255, 255) return image def pair_points(points1: list[Point2D], points2: list[Point2D]) -> list[tuple[Point2D, Point2D]]: # Determine the size of the point arrays size1 = len(points1) size2 = len(points2) # Create a cost matrix based on the distances between points cost_matrix = np.zeros((size1, size2)) for i in range(size1): for j in range(size2): cost_matrix[i, j] = points1[i].distance(points2[j]) # Duplicate points in the smaller array to match the size of the larger array if size1 > size2: num_duplicates = size1 - size2 duplicated_points = np.random.choice(points2, size=num_duplicates).tolist() points2 += duplicated_points elif size2 > size1: num_duplicates = size2 - size1 duplicated_points = np.random.choice(points1, size=num_duplicates).tolist() points1 += duplicated_points # Update the size of the point arrays size1 = len(points1) size2 = len(points2) # Create a new cost matrix with the updated sizes cost_matrix = np.zeros((size1, size2)) for i in range(size1): for j in range(size2): cost_matrix[i, j] = points1[i].distance(points2[j]) # Solve the assignment problem using the Hungarian algorithm row_ind, col_ind = linear_sum_assignment(cost_matrix) # Create pairs of points based on the optimal assignment pairs = [] for i, j in zip(row_ind, col_ind): pairs.append((points1[i], points2[j])) return pairs def interpolate_point_pairs(pairs: list[tuple[Point2D, Point2D]], percentage: float) -> list[Point2D]: interpolated_points:list[Point2D] = [] for pair in pairs: point1, point2 = pair interpolated_point = point1.interpolate(point2, percentage) interpolated_points.append(interpolated_point) return interpolated_points print("start configuring matrix") startT = curr_time = round(time.time()*1000) # Configuration for the matrix options = RGBMatrixOptions() options.rows = 32 options.cols = 64 options.chain_length = 2 options.parallel = 1 options.hardware_mapping = 'regular' # If you have an Adafruit HAT: 'adafruit-hat' matrix = RGBMatrix(options=options) endT = curr_time = round(time.time()*1000) print("configuring matrix took: " + str(endT - startT) + " ms") print("start setting ProotScreen") startT = curr_time = round(time.time()*1000) # setting ProotScreen, This is the loading splash screen set_ProotScreen(matrix) endT = curr_time = round(time.time()*1000) print("setting ProotScreen took: " + str(endT - startT) + " ms") print("start loading images") startT = curr_time = round(time.time()*1000) # Loading all images # TODO looking into storing and loading lists of points image_left_eye_open = Image.open("faces/eyeLeftOpen.png") image_left_eye_closed = Image.open("faces/eyeLeftClosed.png") image_left_nose = Image.open("faces/noseLeft.png") image_left_mouth = Image.open("faces/mouthLeft.png") endT = curr_time = round(time.time()*1000) print("loading images took: " + str(endT - startT) + " ms") print("start generating pixel array") startT = curr_time = round(time.time()*1000) # generate pixel arrays from each image # TODO ^ storing and loading lists of points will take away this step. (it will require a dedicated script to precompute these) points_left_eye_open = generate_point_array_from_image(image_left_eye_open) points_left_eye_closed = generate_point_array_from_image(image_left_eye_closed) points_left_nose = generate_point_array_from_image(image_left_nose) points_left_mouth = generate_point_array_from_image(image_left_mouth) endT = curr_time = round(time.time()*1000) print("generating pixel array took: " + str(endT - startT) + " ms") print("start pairing points for one eye") startT = curr_time = round(time.time()*1000) #calculate the point pairs between the open and closed left eye # TODO look into precomputing and storing these animations before runtime left_eye_blink_pairs = pair_points(points_left_eye_open, points_left_eye_closed) endT = curr_time = round(time.time()*1000) print("pairing points for one eye took: " + str(endT - startT) + " ms") DesiredBlinkState = 10 currentBlinkState = 0 blinkFrameCanvases = [] print("start populating matrices for each blink frame") startT = curr_time = round(time.time()*1000) # TODO look into the possibility of precomputing and more importantly storing the matrix objects for alpha in range(0,11): offscreen_interpolated_canvas = matrix.CreateFrameCanvas() left_eye = interpolate_point_pairs(left_eye_blink_pairs, alpha/10) right_eye = mirror_points(left_eye) nose = points_left_nose + mirror_points(points_left_nose) mouth = points_left_mouth + mirror_points(points_left_mouth) face = left_eye + right_eye + nose + mouth interpolated_face_image = generate_image_from_point_array(face, 128, 32) offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False) blinkFrameCanvases.append(offscreen_interpolated_canvas) endT = curr_time = round(time.time()*1000) print("populating matrices for each blink frame took: " + str(endT - startT) + " ms") def update_screen(): # TODO move blinking animation logic to the ProotState class global DesiredBlinkState, currentBlinkState, blinkFrameCanvases, matrix # open eye again after blink if currentBlinkState == 10: DesiredBlinkState = 0 if currentBlinkState == DesiredBlinkState: next_canvas = blinkFrameCanvases[currentBlinkState] next_canvas = matrix.SwapOnVSync(next_canvas) return next_canvas = blinkFrameCanvases[currentBlinkState] if currentBlinkState < DesiredBlinkState: currentBlinkState += 1 else: currentBlinkState -= 1 next_canvas = matrix.SwapOnVSync(next_canvas) # functions called by the MQTT listener def on_connect(client, userdata, flags, response_code): print("Connected to MQTT broker with result code " + str(response_code)) client.subscribe("test") def on_message(client, userdata, message): print("Received message '" + str(message.payload) + "' on topic '" + message.topic + "' with QoS " + str(message.qos)) global DesiredBlinkState DesiredBlinkState = 10 # MQTT broker configuration broker_address = "10.1.13.173" # Replace with your MQTT broker's address broker_port = 1883 broker_keepalive = 60 client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect(broker_address, broker_port, broker_keepalive) client.loop_start() while True: # this sleep sets the time between finishing one screen update and the next starting # TODO replace this mechanism with an interupt to use the cpu time between frame updates. time.sleep(0.01) update_screen() # TODO create a splash screen to display super quick before the rest of the assets are loading