import time

import numpy as np
import paho.mqtt.client as mqtt
from PIL import Image
from rgbmatrix import RGBMatrix, RGBMatrixOptions

# Configuration for the matrix
options = RGBMatrixOptions()
options.rows = 32
options.cols = 64
options.chain_length = 2
options.parallel = 1
options.hardware_mapping = 'regular'  # If you have an Adafruit HAT: 'adafruit-hat'
matrix = RGBMatrix(options=options)

# Bounding box the face bitmaps are scaled into: two chained 64x32 panels.
FACE_SIZE = (128, 32)

# Number of interpolation steps between the open-eye and closed-eye faces.
# Frames are indexed 0 (fully open) .. BLINK_STEPS (fully closed).
BLINK_STEPS = 10


def pure_pil_alpha_to_color_v2(image, color=(0, 0, 0)):
    """Flatten *image* onto a solid background and return an RGB image.

    Args:
        image: PIL image, with or without an alpha channel.
        color: RGB background colour used wherever the image is transparent.

    Returns:
        A new 'RGB' image of the same size as *image*.
    """
    # Converting first guarantees that band 3 exists and really is alpha;
    # images without an alpha band (typical for BMP) become fully opaque
    # instead of raising IndexError on split()[3].
    rgba = image.convert('RGBA')
    background = Image.new('RGB', rgba.size, color)
    background.paste(rgba, mask=rgba.split()[3])  # 3 is the alpha channel
    return background


def image_interpolation(image1, image2, alpha):
    """Blend two equally sized images.

    Args:
        image1: Image returned when *alpha* is 0.
        image2: Image returned when *alpha* is 1.
        alpha: Blend factor; values outside [0, 1] are clamped.

    Returns:
        The result of ``Image.blend(image1, image2, alpha)``.
    """
    # Ensure both images have the same size
    assert image1.size == image2.size, "Input images must have the same shape"
    # Normalize the alpha value
    alpha = max(0, min(1, alpha))
    return Image.blend(image1, image2, alpha)


def _load_face(path, size=FACE_SIZE):
    """Open the bitmap at *path*, shrink it to fit *size*, and flatten it to RGB."""
    # The context manager closes the underlying file once pixels are loaded.
    with Image.open(path) as face:
        # Image.ANTIALIAS was removed in Pillow 10; LANCZOS is the same filter.
        face.thumbnail(size, Image.LANCZOS)
        return pure_pil_alpha_to_color_v2(face)


def _render_canvas(image):
    """Create a new frame canvas on the matrix with *image* drawn into it."""
    canvas = matrix.CreateFrameCanvas()
    # SetImage() draws into the canvas; keep the canvas itself, not the
    # call's return value, so it can later be handed to SwapOnVSync().
    canvas.SetImage(image)
    return canvas


noBlinkImage = _load_face("faces/prootface1.bmp")
fullBlinkImage = _load_face("faces/prootface2.bmp")

DesiredBlinkState = 10
currentBlinkState = 0

# One pre-rendered canvas per blink state. Each state gets its own canvas so
# that rendering a later frame cannot overwrite an earlier one.
blinkFrameCanvases = {
    step: _render_canvas(
        image_interpolation(noBlinkImage, fullBlinkImage, step / BLINK_STEPS)
    )
    for step in range(BLINK_STEPS + 1)
}

# offscreen canvas that can be written to and then set to the matrix asynchronously
offscreen_canvas = matrix.CreateFrameCanvas()
offscreen_canvas.brightness = 50
offscreen_text_canvas = matrix.CreateFrameCanvas()
offscreen_text_canvas.brightness = 50
offscreen_text_canvas.SetImage(noBlinkImage)
# Blink state at which the eye is fully closed; this is the highest key
# stored in blinkFrameCanvases.
FULL_BLINK_STATE = 10


def update_screen():
    """Advance the blink animation by one frame and show it on the matrix.

    Moves ``currentBlinkState`` one step towards ``DesiredBlinkState`` and
    swaps in the pre-rendered canvas for the new state. Once the eye is fully
    closed the desired state is reset to 0 so the eye opens again. Does
    nothing when the current state already equals the desired state.
    """
    global DesiredBlinkState, currentBlinkState

    # open eye again after blink
    if currentBlinkState == FULL_BLINK_STATE:
        DesiredBlinkState = 0

    if currentBlinkState == DesiredBlinkState:
        return

    # Step one frame towards the target. The original "=+ 1" / "=- 1"
    # assigned the constants +1 / -1 instead of stepping the state.
    if currentBlinkState < DesiredBlinkState:
        currentBlinkState += 1
    else:
        currentBlinkState -= 1

    # Show the frame of the state just reached, so the animation ends on the
    # target frame (including the fully open frame 0 after a blink).
    matrix.SwapOnVSync(blinkFrameCanvases[currentBlinkState])


# functions called by the MQTT listener
def on_connect(client, userdata, flags, response_code):
    """Log the connection result and subscribe to the "test" topic."""
    print(f"Connected to MQTT broker with result code {response_code}")
    client.subscribe("test")


def on_message(client, userdata, message):
    """Log the received message and request a blink."""
    global DesiredBlinkState
    print(
        f"Received message '{message.payload!s}' on topic '{message.topic}'"
        f" with QoS {message.qos}"
    )
    DesiredBlinkState = 10


# MQTT broker configuration
broker_address = "10.1.13.173"  # Replace with your MQTT broker's address
broker_port = 1883
broker_keepalive = 60

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port, broker_keepalive)

# Handle MQTT traffic on a background thread; the main thread drives the
# animation at roughly 20 updates per second.
client.loop_start()
try:
    while True:
        time.sleep(0.05)
        update_screen()
finally:
    # Shut the MQTT connection and its network thread down on exit
    # (for example on Ctrl-C).
    client.disconnect()
    client.loop_stop()