"""Blink animation for a 128x32 RGB LED matrix, triggered over MQTT.

At start-up the script loads an "open" and a "closed" image of the left eye,
pairs their lit pixels with the Hungarian algorithm, and pre-renders eleven
interpolated frames (the right eye is a horizontal mirror of the left one).
The main loop then steps through those frames whenever an MQTT message
arrives on the subscribed topic.
"""
import math
import time
from contextlib import contextmanager

import numpy as np
import paho.mqtt.client as mqtt
from PIL import Image
from rgbmatrix import RGBMatrix, RGBMatrixOptions
from scipy.optimize import linear_sum_assignment

# Two chained 64-column panels of 32 rows form the drawing surface.
DISPLAY_WIDTH = 128
DISPLAY_HEIGHT = 32

# Blink progress is a frame index: 0 is the fully open eye, 10 fully closed.
BLINK_OPEN = 0
BLINK_CLOSED = 10


class ProotState:
    """Blink progress tracker that moves one frame per call towards a target."""

    def __init__(self):
        self.current_blink_state = BLINK_OPEN
        self.desired_blink_state = BLINK_OPEN

    def startBlink(self):
        """Request a blink by targeting the fully closed frame."""
        self.desired_blink_state = BLINK_CLOSED

    def next_blink_state(self) -> int:
        """Advance one frame towards the target and return the new frame index.

        Reaching the closed frame re-targets the open frame so the eye opens
        again. Once the target is reached the state holds its position; the
        previous implementation kept decrementing below zero while idle.
        """
        if self.current_blink_state == self.desired_blink_state:
            if self.current_blink_state == BLINK_CLOSED:
                self.desired_blink_state = BLINK_OPEN
            return self.current_blink_state

        if self.current_blink_state < self.desired_blink_state:
            self.current_blink_state += 1
        else:
            self.current_blink_state -= 1
        return self.current_blink_state


class Point2D:
    """Mutable 2D point; coordinates may be ints or floats."""

    # Class-level defaults, always shadowed by the instance attributes.
    x = 0
    y = 0

    # Defining __eq__ on a mutable type makes instances unhashable; spell it out.
    __hash__ = None

    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __repr__(self):
        return f"{type(self).__name__}({self.x!r}, {self.y!r})"

    def __eq__(self, other):
        if not isinstance(other, Point2D):
            # Let Python try the reflected comparison instead of raising
            # AttributeError on unrelated types.
            return NotImplemented
        return (self.x, self.y) == (other.x, other.y)

    def round(self):
        """Round both coordinates in place and return this same point."""
        self.x = round(self.x)
        self.y = round(self.y)
        return self

    def distance(self, other):
        """Return the Euclidean distance to ``other``."""
        return math.hypot(self.x - other.x, self.y - other.y)

    def interpolate(self, other, percentage):
        """Return a new point ``percentage`` (0..1) of the way towards ``other``."""
        new_x = self.x + (other.x - self.x) * percentage
        new_y = self.y + (other.y - self.y) * percentage
        return Point2D(new_x, new_y)


def mirror_points(points: list[Point2D], width: int = DISPLAY_WIDTH) -> list[Point2D]:
    """Return new points mirrored horizontally within a ``width``-pixel image.

    Valid columns are ``0 .. width - 1``, so column ``x`` maps to
    ``width - 1 - x``. The earlier ``128 - x`` was off by one and sent
    column 0 to column 128, which lies outside the image.
    """
    return [Point2D(width - 1 - point.x, point.y) for point in points]


def generate_point_array_from_image(image: Image.Image) -> list[Point2D]:
    """Return one point per non-black pixel, scanning rows top to bottom.

    Any pixel whose RGB value differs from (0, 0, 0) counts as lit.
    """
    rgb = np.asarray(image.convert("RGB"))
    lit = rgb.any(axis=2)  # True where at least one channel is non-zero
    # np.nonzero walks the mask in row-major order: y outer, x inner.
    rows, cols = np.nonzero(lit)
    return [Point2D(int(x), int(y)) for y, x in zip(rows, cols)]


def generate_image_from_point_array(points: list[Point2D], width: int, height: int) -> Image.Image:
    """Render ``points`` as white pixels on a black ``width`` x ``height`` image.

    Coordinates are rounded to the nearest pixel without modifying the input
    points. Points that land outside the image are skipped rather than
    raising ``IndexError``.
    """
    image = Image.new("RGB", (width, height), "black")
    pixels = image.load()
    for point in points:
        x = round(point.x)
        y = round(point.y)
        if 0 <= x < width and 0 <= y < height:
            pixels[x, y] = (255, 255, 255)
    return image


def _distance_matrix(points1: list[Point2D], points2: list[Point2D]) -> np.ndarray:
    """Return the matrix of Euclidean distances between two point lists."""
    a = np.array([(p.x, p.y) for p in points1], dtype=float)
    b = np.array([(p.x, p.y) for p in points2], dtype=float)
    dx = a[:, None, 0] - b[None, :, 0]
    dy = a[:, None, 1] - b[None, :, 1]
    return np.sqrt(dx * dx + dy * dy)


def _random_duplicates(points: list[Point2D], count: int) -> list[Point2D]:
    """Return ``count`` points drawn at random (with replacement) from ``points``."""
    picks = np.random.choice(len(points), size=count)
    return [points[i] for i in picks]


def pair_points(points1: list[Point2D], points2: list[Point2D]) -> list[tuple[Point2D, Point2D]]:
    """Pair every point of one list with a point of the other, minimising travel.

    The shorter list is padded with random duplicates of its own points so
    both sides have equal length, then the assignment problem is solved with
    the Hungarian algorithm on the Euclidean distance matrix. The caller's
    lists are left untouched.

    Raises:
        ValueError: if exactly one of the lists is empty, since there is
            nothing to duplicate on that side.
    """
    if not points1 and not points2:
        return []
    if not points1 or not points2:
        raise ValueError("cannot pair points: one of the point lists is empty")

    # Work on copies so the padding never leaks into the caller's lists.
    side1 = list(points1)
    side2 = list(points2)
    shortfall = len(side1) - len(side2)
    if shortfall > 0:
        side2.extend(_random_duplicates(points2, shortfall))
    elif shortfall < 0:
        side1.extend(_random_duplicates(points1, -shortfall))

    row_ind, col_ind = linear_sum_assignment(_distance_matrix(side1, side2))
    return [(side1[i], side2[j]) for i, j in zip(row_ind, col_ind)]


def interpolate_point_pairs(pairs: list[tuple[Point2D, Point2D]], percentage: float) -> list[Point2D]:
    """Return, for each pair, the point ``percentage`` of the way from first to second."""
    return [start.interpolate(end, percentage) for start, end in pairs]


@contextmanager
def _timed(label: str):
    """Print how long the wrapped start-up step took, in seconds."""
    print("start " + label)
    started = time.perf_counter()
    yield
    print(f"{label} took: {time.perf_counter() - started:.3f}s")


def _load_image(path: str) -> Image.Image:
    """Open ``path`` and return an in-memory copy so the file handle is closed."""
    with Image.open(path) as source:
        return source.copy()


# Configuration for the matrix
with _timed("configuring matrix"):
    options = RGBMatrixOptions()
    options.rows = 32
    options.cols = 64
    options.chain_length = 2
    options.parallel = 1
    options.hardware_mapping = 'regular'  # If you have an Adafruit HAT: 'adafruit-hat'
    matrix = RGBMatrix(options=options)

with _timed("loading images"):
    Image1 = _load_image("faces/eyeLeftOpen.png")
    Image2 = _load_image("faces/eyeLeftClosed.png")

with _timed("generating pixel array"):
    pixelArray1 = generate_point_array_from_image(Image1)
    pixelArray2 = generate_point_array_from_image(Image2)

with _timed("pairing points for one eye"):
    LeftEyeBlinkPairs = pair_points(pixelArray1, pixelArray2)

# Starting with a closed target makes the face blink once at start-up.
DesiredBlinkState = BLINK_CLOSED
currentBlinkState = BLINK_OPEN
blinkFrameCanvases = []

with _timed("populating matrices for each blink frame"):
    # One pre-rendered canvas per frame index, from open (0) to closed (10).
    for step in range(BLINK_CLOSED + 1):
        offscreen_interpolated_canvas = matrix.CreateFrameCanvas()
        leftEye = interpolate_point_pairs(LeftEyeBlinkPairs, step / BLINK_CLOSED)
        RightEye = mirror_points(leftEye)
        face = leftEye + RightEye
        interpolated_image = generate_image_from_point_array(face, DISPLAY_WIDTH, DISPLAY_HEIGHT)
        offscreen_interpolated_canvas.SetImage(interpolated_image, unsafe=False)
        blinkFrameCanvases.append(offscreen_interpolated_canvas)


def move_option(current_option: int, desired_option: int) -> int:
    """Return ``current_option`` moved one step towards ``desired_option``."""
    if current_option < desired_option:
        return current_option + 1
    if current_option > desired_option:
        return current_option - 1
    return current_option


def update_screen():
    """Show the frame for the current blink state, then step towards the target."""
    global DesiredBlinkState, currentBlinkState

    # open eye again after blink
    if currentBlinkState == BLINK_CLOSED:
        DesiredBlinkState = BLINK_OPEN

    next_canvas = blinkFrameCanvases[currentBlinkState]
    currentBlinkState = move_option(currentBlinkState, DesiredBlinkState)
    # The swap is issued even when the state did not change, as before.
    matrix.SwapOnVSync(next_canvas)


# functions called by the MQTT listener
def on_connect(client, userdata, flags, response_code):
    """Subscribe to the trigger topic once the broker connection is reported."""
    print("Connected to MQTT broker with result code " + str(response_code))
    client.subscribe("test")


def on_message(client, userdata, message):
    """Log the incoming message and request a blink."""
    print("Received message '" + str(message.payload) + "' on topic '"
          + message.topic + "' with QoS " + str(message.qos))
    global DesiredBlinkState
    DesiredBlinkState = BLINK_CLOSED


# MQTT broker configuration
broker_address = "10.1.13.173"  # Replace with your MQTT broker's address
broker_port = 1883
broker_keepalive = 60

# NOTE(review): paho-mqtt 2.x expects a callback_api_version argument here and
# uses different callback signatures - confirm against the installed version.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect(broker_address, broker_port, broker_keepalive)
client.loop_start()

try:
    while True:
        time.sleep(0.01)
        update_screen()
finally:
    # Stop the background network thread on Ctrl-C or any other exit.
    client.disconnect()
    client.loop_stop()