from rgbmatrix import RGBMatrix, RGBMatrixOptions import paho.mqtt.client as mqtt import time from PIL import Image import numpy as np # Configuration for the matrix options = RGBMatrixOptions() options.rows = 32 options.cols = 64 options.chain_length = 2 options.parallel = 1 options.hardware_mapping = 'regular' # If you have an Adafruit HAT: 'adafruit-hat' matrix = RGBMatrix(options=options) import math from PIL import Image import numpy as np from scipy.optimize import linear_sum_assignment class Point2D: x = 0 y = 0 def __init__(self, x, y): self.x = x self.y = y def round(self): self.x = round(self.x) self.y = round(self.y) return self def distance(self, other): dx = self.x - other.x dy = self.y - other.y return math.sqrt(dx**2 + dy**2) def interpolate(self, other, percentage): new_x = self.x + (other.x - self.x) * percentage new_y = self.y + (other.y - self.y) * percentage return Point2D(new_x, new_y) def __eq__(self, other): return (self.x, self.y) == (other.x, other.y) def generate_point_array_from_image(image): image = image.convert("RGB") # Convert image to RGB color mode width, height = image.size point_array = [] # Iterate over the pixels and generate Point2D instances for y in range(height): for x in range(width): pixel = image.getpixel((x, y)) if pixel != (0, 0, 0): # Assuming white pixels point = Point2D(x, y) point_array.append(point) return point_array def generate_image_from_point_array(points, width, height): # Create a new blank image image = Image.new("RGB", (width, height), "black") # Set the pixels corresponding to the points as white pixels = image.load() for point in points: point = point.round() x = point.x y = point.y pixels[x, y] = (255, 255, 255) return image def pair_points(points1, points2): # Determine the size of the point arrays size1 = len(points1) size2 = len(points2) # Create a cost matrix based on the distances between points cost_matrix = np.zeros((size1, size2)) for i in range(size1): for j in range(size2): cost_matrix[i, j] = points1[i].distance(points2[j]) # Duplicate points in the smaller array to match the size of the larger array if size1 > size2: num_duplicates = size1 - size2 duplicated_points = np.random.choice(points2, size=num_duplicates).tolist() points2 += duplicated_points elif size2 > size1: num_duplicates = size2 - size1 duplicated_points = np.random.choice(points1, size=num_duplicates).tolist() points1 += duplicated_points # Update the size of the point arrays size1 = len(points1) size2 = len(points2) # Create a new cost matrix with the updated sizes cost_matrix = np.zeros((size1, size2)) for i in range(size1): for j in range(size2): cost_matrix[i, j] = points1[i].distance(points2[j]) # Solve the assignment problem using the Hungarian algorithm row_ind, col_ind = linear_sum_assignment(cost_matrix) # Create pairs of points based on the optimal assignment pairs = [] for i, j in zip(row_ind, col_ind): pairs.append((points1[i], points2[j])) return pairs def interpolate_point_pairs(pairs, percentage): interpolated_points = [] for pair in pairs: point1, point2 = pair interpolated_point = point1.interpolate(point2, percentage) interpolated_points.append(interpolated_point) return interpolated_points Image1 = Image.open("faces/prootface3.png") Image2 = Image.open("faces/prootface4.png") pixelArray1 = generate_point_array_from_image(Image1) pixelArray2 = generate_point_array_from_image(Image2) pairs = pair_points(pixelArray1, pixelArray2) DesiredBlinkState = 10 currentBlinkState = 0 blinkFrameCanvases = [] offscreen_interpolated_canvasA = matrix.CreateFrameCanvas() offscreen_interpolated_canvasA.SetImage(generate_image_from_point_array(interpolate_point_pairs(pairs, 0), 128, 32), unsafe=False) blinkFrameCanvases.append(offscreen_interpolated_canvasA) for alpha in range(1,10): offscreen_interpolated_canvas = matrix.CreateFrameCanvas() interpolated_image = generate_image_from_point_array(interpolate_point_pairs(pairs, alpha/10), 128, 32) offscreen_interpolated_canvas.SetImage(interpolated_image, unsafe=False) blinkFrameCanvases.append(offscreen_interpolated_canvas) offscreen_interpolated_canvasB = matrix.CreateFrameCanvas() offscreen_interpolated_canvasB.SetImage(generate_image_from_point_array(interpolate_point_pairs(pairs, 1), 128, 32), unsafe=False) blinkFrameCanvases.append(offscreen_interpolated_canvasB) def update_screen(): global DesiredBlinkState, currentBlinkState, blinkFrameCanvases, matrix, offscreen_interpolated_canvasA # open eye again after blink if currentBlinkState == 10: DesiredBlinkState = 0 if currentBlinkState == DesiredBlinkState: next_canvas = blinkFrameCanvases[currentBlinkState] next_canvas = matrix.SwapOnVSync(next_canvas) return next_canvas = blinkFrameCanvases[currentBlinkState] if currentBlinkState < DesiredBlinkState: currentBlinkState += 1 else: currentBlinkState -= 1 next_canvas = matrix.SwapOnVSync(next_canvas) # functions called by the MQTT listener def on_connect(client, userdata, flags, response_code): print("Connected to MQTT broker with result code " + str(response_code)) client.subscribe("test") def on_message(client, userdata, message): print("Received message '" + str(message.payload) + "' on topic '" + message.topic + "' with QoS " + str(message.qos)) global DesiredBlinkState DesiredBlinkState = 10 # MQTT broker configuration broker_address = "10.1.13.173" # Replace with your MQTT broker's address broker_port = 1883 broker_keepalive = 60 client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message client.connect(broker_address, broker_port, broker_keepalive) client.loop_start() while True: time.sleep(0.05) update_screen()