CiscoTheProot/rpi/antRender.py

215 lines
7.5 KiB
Python

from rgbmatrix import RGBMatrix, RGBMatrixOptions
from Point2D import interpolate_point_pairs, mirror_points, generate_image_from_point_array, generate_point_array_from_image, pair_points
from State import StateSingleton
import time
import random
from PIL import Image
import paho.mqtt.client as mqtt
import time
import threading
# Configuration for the matrix screens
options = RGBMatrixOptions()
options.rows = 32
options.cols = 64
options.chain_length = 2
options.parallel = 1
options.hardware_mapping = 'regular'
matrix = RGBMatrix(options=options)
prootState = StateSingleton()
prootState.set_matrix(matrix)
def generate_eye_frames(emote_eye_png):
eye_frames = []
points_left_eye_open = generate_point_array_from_image(Image.open("faces/eyeLeftOpen.png"))
if emote_eye_png != "faces/eyeLeftOpen.png":
left_eye_pairs = pair_points(points_left_eye_open, generate_point_array_from_image(Image.open(emote_eye_png)))
for i in range(11):
eye_frames.append(interpolate_point_pairs(left_eye_pairs, i/10))
else:
for i in range(11):
eye_frames.append(points_left_eye_open)
return eye_frames
def generate_mouth_frames(emote_mouth_png):
mouth_frames = []
points_left_mouth = generate_point_array_from_image(Image.open("faces/mouthLeft.png"))
if emote_mouth_png != "faces/mouthLeft.png":
left_mouth_pairs = pair_points(points_left_mouth, generate_point_array_from_image(Image.open(emote_mouth_png)))
for i in range(11):
mouth_frames.append(interpolate_point_pairs(left_mouth_pairs, i/10))
else:
for i in range(11):
mouth_frames.append(points_left_mouth)
return mouth_frames
def generate_nose_frames(emote_nose_png):
nose_frames = []
points_left_nose = generate_point_array_from_image(Image.open("faces/noseLeft.png"))
if emote_nose_png != "faces/noseLeft.png":
left_nose_pairs = pair_points(points_left_nose, generate_point_array_from_image(Image.open(emote_nose_png)))
for i in range(11):
nose_frames.append(interpolate_point_pairs(left_nose_pairs, i/10))
else:
for i in range(11):
nose_frames.append(points_left_nose)
return nose_frames
def generate_face_frames_canvases(emote_eye_png, emote_mouth_png, emote_nose_png):
eye_frames = generate_eye_frames(emote_eye_png)
mouth_frames = generate_mouth_frames(emote_mouth_png)
nose_frames = generate_nose_frames(emote_nose_png)
face_frames_canvases = []
for frame_number in range(11):
eyes = eye_frames[frame_number] + mirror_points(eye_frames[frame_number])
mouth = mouth_frames[frame_number] + mirror_points(mouth_frames[frame_number])
nose = nose_frames[frame_number] + mirror_points(nose_frames[frame_number])
face = eyes + mouth + nose
face_image = generate_image_from_point_array(face, 128, 32)
offscreen_canvas = matrix.CreateFrameCanvas()
offscreen_canvas.SetImage(face_image, unsafe=False)
face_frames_canvases.append(offscreen_canvas)
return face_frames_canvases
# Function that pre-computes all the transition frames
def animate():
blink_animation_FrameCanvases = []
angry_animation_FrameCanvases = []
stun_animation_FrameCanvases = []
love_animation_FrameCanvases = []
for emote_FrameCanvasses, emote_eye_png, emote_mouth_png, emote_nose_png in [
(blink_animation_FrameCanvases, "faces/eyeLeftClosed.png", "faces/mouthLeft.png", "faces/noseLeft.png"),
(angry_animation_FrameCanvases, "faces/eyeLeftAngry.png", "faces/mouthLeft.png", "faces/noseLeft.png"),
(stun_animation_FrameCanvases, "faces/eyeLeftStunned.png", "faces/mouthLeftSad.png", "faces/noseLeft.png"),
(love_animation_FrameCanvases, "faces/eyeLeftLove.png", "faces/mouthLeft.png", "faces/noseLeft.png")
]:
print("start generating ten face frames for " + emote_eye_png)
startT = round(time.time()*1000)
print("generating face with features: " + emote_eye_png +" "+ emote_mouth_png +" "+ emote_nose_png)
face_frames_canvases = generate_face_frames_canvases(emote_eye_png, emote_mouth_png, emote_nose_png)
emote_FrameCanvasses.extend(face_frames_canvases)
endT = round(time.time()*1000)
print("generating ten face frames took: " + str(endT - startT) + " ms")
state = StateSingleton()
state.set_blink_animation_frames(blink_animation_FrameCanvases)
state.set_angry_animation_frames(angry_animation_FrameCanvases)
state.set_stun_animation_frames(stun_animation_FrameCanvases)
state.set_love_animation_frames(love_animation_FrameCanvases)
state.set_animations_ready()
animate()
# The interupt time is responsible for the (roughly) 100 hz frame rate
def interrupt_timer():
proot_state = StateSingleton()
while True:
if proot_state.get_animations_ready():
proot_state.update()
time.sleep(0.01)
# Function responsible for the blinking behaviour when Idle
def random_blinks():
while True:
time.sleep(random.randint(3, 5))
proot_state = StateSingleton()
if proot_state.get_animations_ready():
if proot_state.current_expression == proot_state.states[0]:
proot_state.set_desired_expression(1)
time.sleep(0.25)
proot_state.set_desired_expression(0)
# Create and start screen update interrupts
screen_update_thread = threading.Thread(target=interrupt_timer)
screen_update_thread.start()
# Create and start random blinks interrupts
screen_update_thread = threading.Thread(target=random_blinks)
screen_update_thread.start()
## Load the object from disk
#with open('my_object.pickle', 'rb') as file:
# interpolated_faces = pickle.load(file)
#
#for interpolated_face_image in interpolated_faces:
# offscreen_interpolated_canvas = matrix.CreateFrameCanvas()
# offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False)
# blink_animation_FrameCanvases.append(offscreen_interpolated_canvas)
#
#
## Store the object to disk
#with open('my_object.pickle', 'wb') as file:
# pickle.dump(interpolated_faces, file)
# functions called by the MQTT listener
def on_connect(client, userdata, flags, response_code):
print("Connected to MQTT broker with result code " + str(response_code))
client.subscribe("test")
def on_message(client, userdata, message):
print("Received message '" + str(message.payload) + "' on topic '"
+ message.topic + "' with QoS " + str(message.qos))
bean_number = str(message.payload)[12:13]
bean_state = str(message.payload)[6:10]
print("pin number: " + bean_number + " pin state: " + bean_state)
proot_state = StateSingleton()
if not proot_state.get_animations_ready():
print("animation not yet ready.")
return
if bean_state == "rose":
proot_state.set_desired_expression(0)
elif bean_state == "fell":
proot_state.set_desired_expression(int(bean_number))
# MQTT broker configuration
broker_address = "localhost" # Replace with your MQTT broker's address
broker_port = 1883
broker_keepalive = 60
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port, broker_keepalive)
client.loop_start()
while True:
# this sleep sets the time between finishing one screen update and the next starting
pass