# Source: CiscoTheProot/rpi/antRender.py (Python, 295 lines, 9 KiB)

from rgbmatrix import RGBMatrix, RGBMatrixOptions
import paho.mqtt.client as mqtt
import time
from PIL import Image
import numpy as np
import math
from scipy.optimize import linear_sum_assignment
class ProotState:
    """Track the blink animation as an integer frame index.

    ``current_blink_state`` is the frame most recently stepped to and
    ``desired_blink_state`` is the frame the animation is moving towards.
    Both start at 0; ``startBlink`` requests frame 10, and once frame 10 has
    been reached the target is reset to 0 so the animation runs back again.
    """

    def __init__(self):
        self.current_blink_state = 0
        self.desired_blink_state = 0

    def startBlink(self):
        """Request a blink by setting the desired state to 10."""
        self.desired_blink_state = 10

    def next_blink_state(self) -> int:
        """Advance the animation by at most one frame and return the frame index.

        Returns:
            The new ``current_blink_state``. When the state already equals the
            desired state it is returned unchanged.
        """
        if self.current_blink_state == self.desired_blink_state:
            # Reaching frame 10 flips the target back to 0 so the next calls
            # count down again.
            if self.current_blink_state == 10:
                self.desired_blink_state = 0
            # Hold position when already at the target. Without this guard the
            # resting state (0 == 0) fell through to the decrement below and
            # oscillated between -1 and 0.
            return self.current_blink_state

        if self.current_blink_state < self.desired_blink_state:
            self.current_blink_state += 1
        else:
            self.current_blink_state -= 1
        return self.current_blink_state
class Point2D:
    """A mutable 2D point with an RGB color.

    Equality compares only the ``x`` and ``y`` coordinates; the color is
    ignored. Instances are mutable (see ``round``) and therefore unhashable.
    """

    # Class-level defaults; __init__ always overwrites them per instance.
    x = 0
    y = 0
    color: tuple[int, int, int] = (0, 0, 0)

    def __init__(self, x, y, color: tuple[int, int, int] = (0, 0, 0)):
        self.x = x
        self.y = y
        self.color = color

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.x!r}, {self.y!r}, {self.color!r})"

    def round(self):
        """Round both coordinates in place and return this same instance."""
        # Inside a method body ``round`` resolves to the builtin, not this method.
        self.x = round(self.x)
        self.y = round(self.y)
        return self

    def distance(self, other) -> float:
        """Return the Euclidean distance between this point and *other*."""
        return math.hypot(self.x - other.x, self.y - other.y)

    def interpolate(self, other, percentage):
        """Return a new point on the line from this point towards *other*.

        Args:
            other: The end point of the interpolation.
            percentage: Fraction of the way to travel; 0 yields this point's
                coordinates and 1 yields *other*'s.

        Returns:
            A new ``Point2D`` with the interpolated coordinates. The color is
            not carried over; the new point gets the default color.
        """
        new_x = self.x + (other.x - self.x) * percentage
        new_y = self.y + (other.y - self.y) * percentage
        return Point2D(new_x, new_y)

    def __eq__(self, other):
        """Compare coordinates with any object exposing ``x`` and ``y``."""
        try:
            return (self.x, self.y) == (other.x, other.y)
        except AttributeError:
            # Let Python fall back to its default handling instead of crashing
            # on comparisons such as ``point == None``.
            return NotImplemented

    # Defining __eq__ already disables hashing; stated explicitly because the
    # coordinates are mutable.
    __hash__ = None
def mirror_points(points: list[Point2D]) -> list[Point2D]:
    """Return new points reflected horizontally, mapping each x to ``128 - x``.

    The input points are left untouched. Only the coordinates are copied, so
    every mirrored point carries the default color.
    """
    # NOTE(review): x == 0 maps to 128, which would fall outside a canvas whose
    # columns run 0..127 -- confirm whether 127 - x was intended.
    return [Point2D(128 - source.x, source.y) for source in points]
def generate_point_array_from_image(image: Image) -> list[Point2D]:
    """Collect a ``Point2D`` for every non-black pixel of *image*.

    The image is first converted to RGB. Pixels are visited in increasing y
    and, within each row, increasing x, so the returned list follows that
    order. Any pixel that differs from ``(0, 0, 0)`` produces a point.
    """
    rgb_image = image.convert("RGB")
    columns, rows = rgb_image.size
    return [
        Point2D(col, row)
        for row in range(rows)
        for col in range(columns)
        if rgb_image.getpixel((col, row)) != (0, 0, 0)
    ]
def generate_image_from_point_array(points: list[Point2D], width: int, height: int) -> Image:
    """Render *points* as white pixels on a new black RGB image.

    Args:
        points: Points to draw; coordinates may be fractional and are rounded
            to the nearest pixel.
        width: Width of the created image in pixels.
        height: Height of the created image in pixels.

    Returns:
        The newly created image. Coordinates are not bounds-checked here, so
        any out-of-range handling is left to the pixel-access object.
    """
    image = Image.new("RGB", (width, height), "black")
    pixels = image.load()
    for point in points:
        # Round into locals rather than calling point.round(), which would
        # overwrite the caller's point coordinates as a side effect.
        pixels[round(point.x), round(point.y)] = (255, 255, 255)
    return image
def pair_points(points1: list[Point2D], points2: list[Point2D]) -> list[tuple[Point2D, Point2D]]:
    """Pair every point of one list with a point of the other, minimising total distance.

    When the lists differ in length, the shorter one is padded with randomly
    chosen duplicates of its own points so that both sides have equal size.
    The padded lists are then matched one-to-one with
    ``linear_sum_assignment`` using Euclidean distance as the cost.

    Args:
        points1: First set of points; not modified.
        points2: Second set of points; not modified.

    Returns:
        A list of ``(point_from_points1, point_from_points2)`` tuples, one per
        element of the longer input list. Empty when both inputs are empty.

    Raises:
        ValueError: If exactly one of the lists is empty, because there is
            nothing to duplicate on the empty side.
    """
    if not points1 and not points2:
        return []
    if not points1 or not points2:
        raise ValueError("cannot pair points when exactly one of the point lists is empty")

    # Work on copies so the callers' lists are never extended with duplicates.
    padded1 = list(points1)
    padded2 = list(points2)
    shortfall = len(padded1) - len(padded2)
    if shortfall > 0:
        extra = np.random.choice(len(points2), size=shortfall)
        padded2.extend(points2[index] for index in extra)
    elif shortfall < 0:
        extra = np.random.choice(len(points1), size=-shortfall)
        padded1.extend(points1[index] for index in extra)

    # Build the full distance matrix once, after padding, with broadcasting:
    # deltas[i, j] is the (dx, dy) offset between padded1[i] and padded2[j].
    coords1 = np.array([(p.x, p.y) for p in padded1], dtype=float)
    coords2 = np.array([(p.x, p.y) for p in padded2], dtype=float)
    deltas = coords1[:, np.newaxis, :] - coords2[np.newaxis, :, :]
    cost_matrix = np.hypot(deltas[..., 0], deltas[..., 1])

    row_ind, col_ind = linear_sum_assignment(cost_matrix)
    return [(padded1[i], padded2[j]) for i, j in zip(row_ind, col_ind)]
def interpolate_point_pairs(pairs: list[tuple[Point2D, Point2D]], percentage: float) -> list[Point2D]:
    """Interpolate each pair of points by the same fraction.

    For every ``(start, end)`` pair this calls ``start.interpolate(end,
    percentage)`` and returns the results in the order of *pairs*.
    """
    return [start.interpolate(end, percentage) for start, end in pairs]
def _now_ms() -> int:
    """Return the current wall-clock time rounded to whole milliseconds."""
    return round(time.time() * 1000)


# --- Configure the LED matrix -------------------------------------------------
print("start configuring matrix")
startT = _now_ms()

# Configuration for the matrix
options = RGBMatrixOptions()
options.rows = 32
options.cols = 64
options.chain_length = 2
options.parallel = 1
options.hardware_mapping = 'regular'  # If you have an Adafruit HAT: 'adafruit-hat'
matrix = RGBMatrix(options=options)

endT = _now_ms()
print("configuring matrix took: " + str(endT - startT) + " ms")


# --- Load the face images -----------------------------------------------------
print("start loading images")
startT = _now_ms()

# Loading all images
# TODO looking into storing and loading lists of points
image_left_eye_open = Image.open("faces/eyeLeftOpen.png")
image_left_eye_closed = Image.open("faces/eyeLeftClosed.png")
image_left_nose = Image.open("faces/noseLeft.png")
image_left_mouth = Image.open("faces/mouthLeft.png")

endT = _now_ms()
print("loading images took: " + str(endT - startT) + " ms")


# --- Convert images to point lists --------------------------------------------
print("start generating pixel array")
startT = _now_ms()

# generate pixel arrays from each image
# TODO ^ storing and loading lists of points will take away this step. (it will require a dedicated script to precompute these)
points_left_eye_open = generate_point_array_from_image(image_left_eye_open)
points_left_eye_closed = generate_point_array_from_image(image_left_eye_closed)
points_left_nose = generate_point_array_from_image(image_left_nose)
points_left_mouth = generate_point_array_from_image(image_left_mouth)

endT = _now_ms()
print("generating pixel array took: " + str(endT - startT) + " ms")


# --- Pair open-eye points with closed-eye points ------------------------------
print("start pairing points for one eye")
startT = _now_ms()

# calculate the point pairs between the open and closed left eye
# TODO look into precomputing and storing these animations before runtime
left_eye_blink_pairs = pair_points(points_left_eye_open, points_left_eye_closed)

endT = _now_ms()
print("pairing points for one eye took: " + str(endT - startT) + " ms")


# --- Pre-render one canvas per blink frame ------------------------------------
# Blink animation state shared with update_screen() and on_message():
# index 0 is the open-eye frame and index 10 the closed-eye frame.
DesiredBlinkState = 10
currentBlinkState = 0
blinkFrameCanvases = []

print("start populating matrices for each blink frame")
startT = _now_ms()

# The nose and mouth are the same in every frame, so build them once instead
# of re-mirroring them on each loop iteration.
nose = points_left_nose + mirror_points(points_left_nose)
mouth = points_left_mouth + mirror_points(points_left_mouth)

# TODO look into the possibility of precomputing and more importantly storing the matrix objects
for alpha in range(0, 11):
    offscreen_interpolated_canvas = matrix.CreateFrameCanvas()

    # alpha / 10 runs from 0.0 (open eye) to 1.0 (closed eye).
    left_eye = interpolate_point_pairs(left_eye_blink_pairs, alpha/10)
    right_eye = mirror_points(left_eye)
    face = left_eye + right_eye + nose + mouth

    interpolated_face_image = generate_image_from_point_array(face, 128, 32)
    offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False)
    blinkFrameCanvases.append(offscreen_interpolated_canvas)

endT = _now_ms()
print("populating matrices for each blink frame took: " + str(endT - startT) + " ms")
def update_screen():
    """Show the canvas for the current blink frame, then step towards the target.

    Reads and updates the module-level ``currentBlinkState`` and
    ``DesiredBlinkState``. The canvas shown is the one for the state *before*
    stepping; the state then moves one frame towards the desired state, or
    stays put when it is already there.
    """
    # TODO move blinking animation logic to the ProotState class
    global DesiredBlinkState, currentBlinkState

    # open eye again after blink
    if currentBlinkState == 10:
        DesiredBlinkState = 0

    frame = blinkFrameCanvases[currentBlinkState]
    target = DesiredBlinkState
    if currentBlinkState < target:
        currentBlinkState += 1
    elif currentBlinkState > target:
        currentBlinkState -= 1

    # The canvas returned by SwapOnVSync is not needed here.
    matrix.SwapOnVSync(frame)
# functions called by the MQTT listener
def on_connect(client, userdata, flags, response_code):
    """MQTT connect callback: log the result code and subscribe to ``test``."""
    print(f"Connected to MQTT broker with result code {response_code!s}")
    client.subscribe("test")
def on_message(client, userdata, message):
    """MQTT message callback: log the message and request a blink.

    Any received message sets the module-level ``DesiredBlinkState`` to 10,
    regardless of its payload.
    """
    global DesiredBlinkState
    print(f"Received message '{message.payload!s}' on topic '{message.topic}' with QoS {message.qos!s}")
    DesiredBlinkState = 10
# MQTT broker configuration
broker_address = "10.1.13.173"  # Replace with your MQTT broker's address
broker_port = 1883
broker_keepalive = 60

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port, broker_keepalive)
# Per the paho-mqtt documentation, loop_start() services the connection from a
# background thread, leaving this thread free to drive the display.
client.loop_start()

try:
    # Render loop: refresh the display roughly every 10 ms (plus swap time).
    while True:
        time.sleep(0.01)
        update_screen()
finally:
    # Runs on Ctrl-C or any error in the loop (the exception still propagates):
    # close the broker connection cleanly and stop the network thread.
    client.disconnect()
    client.loop_stop()