CiscoTheProot/rpi/antRender.py
2023-05-29 20:23:33 +02:00

162 lines
5 KiB
Python

from rgbmatrix import RGBMatrix, RGBMatrixOptions
import Point2D
import ProotState
import time
import random
from PIL import Image
import paho.mqtt.client as mqtt
import time
import threading
# --- LED matrix bring-up (timed so slow start-up steps are easy to spot) ---
print("start configuring matrix")
startT = curr_time = round(time.time() * 1000)

# Configuration for the matrix screens: 32 rows x 64 columns per panel,
# chain length 2, one parallel chain.
options = RGBMatrixOptions()
options.hardware_mapping = 'regular'  # If you have an Adafruit HAT: 'adafruit-hat'
options.parallel = 1
options.chain_length = 2
options.cols = 64
options.rows = 32
matrix = RGBMatrix(options=options)

endT = curr_time = round(time.time() * 1000)
print(f"configuring matrix took: {endT - startT} ms")

# Holds one canvas per frame of the blink animation.
blinkFrameCanvases = []
def interrupt_timer():
    """Refresh the screen in an endless loop, pausing 10 ms between updates.

    Meant to be used as a thread target; it never returns. It reads the
    module-level ``blinkFrameCanvases`` and ``matrix`` and hands them to
    ``ProotState.update_screen`` on every pass.
    """
    # `import ProotState` binds the module, so the class must be reached as
    # ProotState.ProotState (calling the module itself raises TypeError).
    proot_state = ProotState.ProotState()
    while True:
        proot_state.update_screen(blinkFrameCanvases, matrix)
        time.sleep(0.01)


def random_blinks():
    """Trigger a blink at a random interval of 3 to 5 seconds, forever.

    Meant to be used as a thread target; it never returns. A blink is only
    requested once the state object reports that the blink frames are ready.
    """
    while True:
        time.sleep(random.randint(3, 5))
        # NOTE(review): a fresh ProotState is requested on every pass, as in
        # the original; presumably the class hands back shared state -
        # confirm against ProotState.py.
        proot_state = ProotState.ProotState()
        if proot_state.get_blinks_frames_ready():
            proot_state.blink()


# Create and start screen update interrupts.
# The targets are the functions defined above in this file: they are the
# ones that use this module's blinkFrameCanvases and matrix.
# daemon=True lets the process exit if the main thread stops (for example
# on an exception while loading assets) instead of hanging on these loops.
screen_update_thread = threading.Thread(target=interrupt_timer, daemon=True)
screen_update_thread.start()

# Create and start random blinks interrupts (kept in its own variable so the
# screen update thread handle is not overwritten).
random_blink_thread = threading.Thread(target=random_blinks, daemon=True)
random_blink_thread.start()
# --- Step 1: open the source artwork ---------------------------------------
print("start loading images")
startT = curr_time = round(time.time() * 1000)

# Loading all images (only the left half of the face is stored on disk).
# TODO looking into storing and loading lists of points
image_left_eye_open = Image.open("faces/eyeLeftOpen.png")
image_left_eye_closed = Image.open("faces/eyeLeftClosed.png")
image_left_nose = Image.open("faces/noseLeft.png")
image_left_mouth = Image.open("faces/mouthLeft.png")

endT = curr_time = round(time.time() * 1000)
print(f"loading images took: {endT - startT} ms")

# --- Step 2: turn every image into a point array ---------------------------
print("start generating pixel array")
startT = curr_time = round(time.time() * 1000)

# TODO ^ storing and loading lists of points will take away this step.
# (it will require a dedicated script to precompute these)
points_left_eye_open = Point2D.generate_point_array_from_image(image_left_eye_open)
points_left_eye_closed = Point2D.generate_point_array_from_image(image_left_eye_closed)
points_left_nose = Point2D.generate_point_array_from_image(image_left_nose)
points_left_mouth = Point2D.generate_point_array_from_image(image_left_mouth)

endT = curr_time = round(time.time() * 1000)
print(f"generating pixel array took: {endT - startT} ms")

# --- Step 3: pair the open-eye points with the closed-eye points -----------
print("start pairing points for one eye")
startT = curr_time = round(time.time() * 1000)

# The pairs are what the blink animation interpolates between.
# TODO look into precomputing and storing these animations before runtime
left_eye_blink_pairs = Point2D.pair_points(points_left_eye_open, points_left_eye_closed)

endT = curr_time = round(time.time() * 1000)
print(f"pairing points for one eye took: {endT - startT} ms")
# --- Build one off-screen canvas per blink frame ---------------------------
print("start populating matrices for each blink frame")
startT = curr_time = round(time.time() * 1000)

# TODO look into the possibility of precomputing and more importantly storing the matrix objects

# The nose and mouth are the same in every frame, so mirror them once here
# instead of repeating the work on each of the 11 loop iterations.
nose = points_left_nose + Point2D.mirror_points(points_left_nose)
mouth = points_left_mouth + Point2D.mirror_points(points_left_mouth)

# alpha / 10 sweeps the interpolation factor from 0.0 to 1.0 in 11 steps.
for alpha in range(0, 11):
    offscreen_interpolated_canvas = matrix.CreateFrameCanvas()

    # Only the eyes change between frames; the right eye is the mirrored left eye.
    left_eye = Point2D.interpolate_point_pairs(left_eye_blink_pairs, alpha / 10)
    right_eye = Point2D.mirror_points(left_eye)

    face = left_eye + right_eye + nose + mouth

    # Render the whole face into a 128x32 image and store it on the canvas.
    interpolated_face_image = Point2D.generate_image_from_point_array(face, 128, 32)
    offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False)
    blinkFrameCanvases.append(offscreen_interpolated_canvas)

endT = curr_time = round(time.time() * 1000)
print(f"populating matrices for each blink frame took: {endT - startT} ms")

# Hand the matrix to the state object, mark the blink frames as ready and
# request a first blink.
proot_state = ProotState.ProotState()
proot_state.set_matrix(matrix)
proot_state.set_blinks_frames_ready(True)
proot_state.blink()
# functions called by the MQTT listener
def on_connect(client, userdata, flags, response_code):
    """MQTT connect callback: print the result code and subscribe to "test".

    ``userdata`` and ``flags`` are accepted for the callback signature but
    are not used.
    """
    print(f"Connected to MQTT broker with result code {response_code!s}")
    client.subscribe("test")
def on_message(client, userdata, message):
    """MQTT message callback: print the received message, then request a blink.

    ``client`` and ``userdata`` are accepted for the callback signature but
    are not used. The message content is only printed; any message on a
    subscribed topic requests a blink.
    """
    payload_text = str(message.payload)
    topic_text = message.topic
    qos_text = str(message.qos)
    print("".join((
        "Received message '", payload_text,
        "' on topic '", topic_text,
        "' with QoS ", qos_text,
    )))
    ProotState.ProotState().blink()
# MQTT broker configuration
broker_address = "10.1.13.173"  # Replace with your MQTT broker's address
broker_port = 1883
broker_keepalive = 60

# Register the callbacks before connecting so the connect event is handled.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port, broker_keepalive)

# loop_start() services the MQTT connection on a background thread, so the
# main thread only has to stay alive.
client.loop_start()

while True:
    # Sleep instead of spinning on `pass`: a busy loop keeps one CPU core at
    # 100% and competes for the interpreter lock with the screen-update,
    # blink and MQTT threads.
    time.sleep(1)
# TODO create a splash screen that is displayed right away while the rest of the assets are loading