Major refactoring now using State instead of ProotState.

Expect runtime bugs
This commit is contained in:
CiscoTheWolf 2023-06-05 21:44:27 +02:00
parent 18cf352d1a
commit 56b34cd083
3 changed files with 225 additions and 92 deletions

View file

@ -76,8 +76,10 @@ def generate_point_array_from_image(image):
cached_point_arrays = load_cached_point_arrays() cached_point_arrays = load_cached_point_arrays()
if image_hash in cached_point_arrays: if image_hash in cached_point_arrays:
print("Found existing point array matching png: " + image.filename + " using existing array.")
return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in cached_point_arrays[image_hash]] return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in cached_point_arrays[image_hash]]
print("No existing point array matching png: " + image.filename + " found. Generating now.")
width, height = image.size width, height = image.size
pixel_array = [] pixel_array = []
@ -91,6 +93,7 @@ def generate_point_array_from_image(image):
cached_point_arrays[image_hash] = pixel_array cached_point_arrays[image_hash] = pixel_array
save_cached_point_arrays(cached_point_arrays) save_cached_point_arrays(cached_point_arrays)
print("Point array generated and stored.")
return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in pixel_array] return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in pixel_array]

111
rpi/State.py Normal file
View file

@ -0,0 +1,111 @@
import time
class StateSingleton:
_instance = None
def __new__(cls):
if not cls._instance:
cls._instance = super(StateSingleton, cls).__new__(cls)
return cls._instance
def __init__(self):
self.states = {
0: "open",
1: "blink",
2: "owo",
3: "angry"
}
self.matrix = False
self.current_expression = self.states[0]
self.desired_expression = self.states[0]
self.new_desired_expression = self.states[0]
self.transition_steps = 10 # Number of steps to transition between states
self.transition_count = 0 # Counter to keep track of transition progress
self.animations_ready = False
self.blink_animation_FrameCanvases = []
self.angry_animation_FrameCanvases = []
def set_blink_animation_frames(self, blink_animation_FrameCanvases):
self.blink_animation_FrameCanvases = blink_animation_FrameCanvases
def set_angry_animation_frames(self, angry_animation_FrameCanvases):
self.angry_animation_FrameCanvases = angry_animation_FrameCanvases
def set_desired_expression(self, state):
if state in self.states:
self.desired_expression = self.states[state]
self.new_desired_expression = state
else:
print("Invalid state.")
def update_step(self):
if self.current_expression == self.desired_expression: # already at desire expression
return
if self.current_expression == self.states[0]: # Transition from "open" state to another state
if self.transition_count < self.transition_steps: # still transitioning
self.transition_count += 1
return
self.current_expression = self.desired_expression
self.transition_count = 10
return
else: # Transition from another state to "open" state
if 0 < self.transition_count: # still transitioning
self.transition_count -= 1
return
self.current_expression = self.states[0] # Complete transition to "open" state
self.transition_count = 0
def draw_face(self):
#default face
if self.current_expression == self.desired_expression == self.states[0]:
self.matrix.SwapOnVSync(self.blink_animation_FrameCanvases[0])
# blinky faces
elif self.current_expression == self.desired_expression == self.states[1]:
self.matrix.SwapOnVSync(self.blink_animation_FrameCanvases[10])
elif self.current_expression == self.states[1] or self.desired_expression == self.states[1]:
self.matrix.SwapOnVSync(self.blink_animation_FrameCanvases[self.transition_count])
# angry faces
elif self.current_expression == self.desired_expression == self.states[3]:
self.matrix.SwapOnVSync(self.angry_animation_FrameCanvases[10])
elif self.current_expression == self.states[3] or self.desired_expression == self.states[3]:
self.matrix.SwapOnVSync(self.angry_animation_FrameCanvases[self.transition_count])
def update(self):
self.update_step(self)
print("at step: " + self.transition_count + " from default in expression: " + self.desired_expression)
self.draw_face()
def get_current_state(self):
for state, value in self.states.items():
if value == self.current_state:
return state
def get_animations_ready(self):
return self.animations_ready
def set_animations_ready(self, state):
self.animations_ready = state
def set_matrix(self, matrix):
self.matrix = matrix
# Example usage:
# singleton = StateSingleton()
# singleton.set_desired_expression(3)
#for _ in range(100):
# singleton.update()
# time.sleep(0.1)

View file

@ -1,6 +1,6 @@
from rgbmatrix import RGBMatrix, RGBMatrixOptions from rgbmatrix import RGBMatrix, RGBMatrixOptions
from Point2D import interpolate_point_pairs, mirror_points, generate_image_from_point_array, generate_point_array_from_image, pair_points from Point2D import interpolate_point_pairs, mirror_points, generate_image_from_point_array, generate_point_array_from_image, pair_points
from ProotState import ProotState from State import StateSingleton
import time import time
import random import random
@ -23,21 +23,99 @@ options.chain_length = 2
options.parallel = 1 options.parallel = 1
options.hardware_mapping = 'regular' # If you have an Adafruit HAT: 'adafruit-hat' options.hardware_mapping = 'regular' # If you have an Adafruit HAT: 'adafruit-hat'
matrix = RGBMatrix(options=options) matrix = RGBMatrix(options=options)
prootState = StateSingleton()
prootState.set_matrix(matrix)
endT = curr_time = round(time.time()*1000) endT = curr_time = round(time.time()*1000)
print("configuring matrix took: " + str(endT - startT) + " ms") print("configuring matrix took: " + str(endT - startT) + " ms")
# array to hold the blink animation frames def generate_eye_frames(emote_eye_png):
blinkFrameCanvases = [] eye_frames = []
points_left_eye_open = generate_point_array_from_image(Image.open("faces/eyeLeftOpen.png"))
if emote_eye_png != "faces/eyeLeftOpen.png":
left_eye_pairs = pair_points(points_left_eye_open, generate_point_array_from_image(Image.open(emote_eye_png)))
for i in range(11):
eye_frames.append(interpolate_point_pairs(left_eye_pairs, i/10))
else:
for i in range(11):
eye_frames.append(points_left_eye_open)
return eye_frames
def generate_mouth_frames(emote_mouth_png):
mouth_frames = []
points_left_mouth = generate_point_array_from_image(Image.open("faces/mouthLeft.png"))
if emote_mouth_png != "faces/mouthLeft.png":
left_mouth_pairs = pair_points(points_left_mouth, generate_point_array_from_image(Image.open(emote_mouth_png)))
for i in range(11):
mouth_frames.append(interpolate_point_pairs(left_mouth_pairs, i/10))
else:
for i in range(11):
mouth_frames.append(points_left_mouth)
return mouth_frames
def generate_nose_frames(emote_nose_png):
nose_frames = []
points_left_nose = generate_point_array_from_image(Image.open("faces/noseLeft.png"))
if emote_nose_png != "faces/noseLeft.png":
left_nose_pairs = pair_points(points_left_nose, generate_point_array_from_image(Image.open(emote_nose_png)))
for i in range(11):
nose_frames.append(interpolate_point_pairs(left_nose_pairs, i/10))
else:
for i in range(11):
nose_frames.append(points_left_nose)
return nose_frames
def generate_face_frames(emote_eye_png, emote_mouth_png, emote_nose_png):
eye_frames = generate_eye_frames(emote_eye_png)
mouth_frames = generate_mouth_frames(emote_mouth_png)
nose_frames = generate_nose_frames(emote_nose_png)
face_frames = []
for frame_number in range(11):
eyes = eye_frames[frame_number] + mirror_points(eye_frames[frame_number])
mouth = mouth_frames[frame_number] + mirror_points(mouth_frames[frame_number])
nose = nose_frames[frame_number] + mirror_points(nose_frames[frame_number])
face = eyes + mouth + nose
face_image = generate_image_from_point_array(face, 128, 32)
offscreen_canvas = matrix.CreateFrameCanvas()
offscreen_canvas.SetImage(face_image, unsafe=False)
face_frames.append(offscreen_canvas)
return face_frames
def animate():
blink_animation_FrameCanvases = []
angry_animation_FrameCanvases = []
for emote_FrameCanvasses, emote_eye_png, emote_mouth_png, emote_nose_png in [
(blink_animation_FrameCanvases, "faces/eyeLeftClosed.png", "faces/mouthLeft.png", "faces/noseLeft.png"),
(angry_animation_FrameCanvases, "faces/eyeLeftAngry.png", "faces/mouthLeft.png", "faces/noseLeft.png")
]:
face_frames = generate_face_frames(emote_eye_png, emote_mouth_png, emote_nose_png)
emote_FrameCanvasses.extend(face_frames)
state = StateSingleton()
state.set_blink_animation_frames(blink_animation_FrameCanvases)
state.set_angry_animation_frames(angry_animation_FrameCanvases)
state.set_animations_ready(True)
animate()
def interrupt_timer(): def interrupt_timer():
global blinkFrameCanvases, matrix proot_state = StateSingleton()
proot_state = ProotState()
while True: while True:
proot_state.update_screen(blinkFrameCanvases, matrix) proot_state.update()
time.sleep(0.01) time.sleep(0.01)
@ -45,10 +123,13 @@ def random_blinks():
while True: while True:
time.sleep(random.randint(3, 5)) time.sleep(random.randint(3, 5))
proot_state = ProotState() proot_state = StateSingleton()
if proot_state.get_blinks_frames_ready(): if proot_state.get_animations_ready():
proot_state.blink() proot_state.set_desired_expression(1)
time.sleep(random.randint(0.1))
proot_state.set_desired_expression(0)
# Create and start screen update interrupts # Create and start screen update interrupts
@ -60,91 +141,21 @@ screen_update_thread = threading.Thread(target=random_blinks)
screen_update_thread.start() screen_update_thread.start()
print("start loading images")
startT = curr_time = round(time.time()*1000)
# Loading all images ## Load the object from disk
# TODO looking into storing and loading lists of points #with open('my_object.pickle', 'rb') as file:
image_left_eye_open = Image.open("faces/eyeLeftOpen.png") # interpolated_faces = pickle.load(file)
image_left_eye_closed = Image.open("faces/eyeLeftClosed.png") #
image_left_nose = Image.open("faces/noseLeft.png") #for interpolated_face_image in interpolated_faces:
image_left_mouth = Image.open("faces/mouthLeft.png")
endT = curr_time = round(time.time()*1000)
print("loading images took: " + str(endT - startT) + " ms")
print("start generating pixel array")
startT = curr_time = round(time.time()*1000)
# generate pixel arrays from each image
# TODO ^ storing and loading lists of points will take away this step. (it will require a dedicated script to precompute these)
points_left_eye_open = generate_point_array_from_image(image_left_eye_open)
points_left_eye_closed = generate_point_array_from_image(image_left_eye_closed)
points_left_nose = generate_point_array_from_image(image_left_nose)
points_left_mouth = generate_point_array_from_image(image_left_mouth)
endT = curr_time = round(time.time()*1000)
print("generating pixel array took: " + str(endT - startT) + " ms")
print("start pairing points for one eye")
startT = curr_time = round(time.time()*1000)
#calculate the point pairs between the open and closed left eye
# TODO look into precomputing and storing these animations before runtime
left_eye_blink_pairs = pair_points(points_left_eye_open, points_left_eye_closed)
endT = curr_time = round(time.time()*1000)
print("pairing points for one eye took: " + str(endT - startT) + " ms")
print("start populating matrices for each blink frame")
startT = curr_time = round(time.time()*1000)
#
## TODO look into the possibility of precomputing and more importantly storing the matrix objects
#for alpha in range(0,11):
# offscreen_interpolated_canvas = matrix.CreateFrameCanvas() # offscreen_interpolated_canvas = matrix.CreateFrameCanvas()
#
# left_eye = interpolate_point_pairs(left_eye_blink_pairs, alpha/10)
# right_eye = mirror_points(left_eye)
# nose = points_left_nose + mirror_points(points_left_nose)
# mouth = points_left_mouth + mirror_points(points_left_mouth)
# face = left_eye + right_eye + nose + mouth
#
# interpolated_face_image = generate_image_from_point_array(face, 128, 32)
# interpolated_faces.append(interpolated_face_image)
#
# offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False) # offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False)
# blinkFrameCanvases.append(offscreen_interpolated_canvas) # blink_animation_FrameCanvases.append(offscreen_interpolated_canvas)
# #
#
## Store the object to disk
#with open('my_object.pickle', 'wb') as file:
# pickle.dump(interpolated_faces, file)
# Load the object from disk
with open('my_object.pickle', 'rb') as file:
interpolated_faces = pickle.load(file)
for interpolated_face_image in interpolated_faces:
offscreen_interpolated_canvas = matrix.CreateFrameCanvas()
offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False)
blinkFrameCanvases.append(offscreen_interpolated_canvas)
endT = curr_time = round(time.time()*1000)
print("populating matrices for each blink frame took: " + str(endT - startT) + " ms")
# Store the object to disk
with open('my_object.pickle', 'wb') as file:
pickle.dump(interpolated_faces, file)
proot_state = ProotState()
proot_state.set_matrix(matrix)
proot_state.set_blinks_frames_ready(True)
proot_state.blink()
@ -160,10 +171,18 @@ def on_message(client, userdata, message):
+ message.topic + "' with QoS " + str(message.qos)) + message.topic + "' with QoS " + str(message.qos))
bean_number = str(message.payload)[12:13] bean_number = str(message.payload)[12:13]
bean_state = str(message.payload)[6:10] bean_state = str(message.payload)[6:10]
print("pin number: " + bean_number + " pin state: " + bean_state) print("pin number: " + bean_number + " pin state: " + bean_state)
proot_state = ProotState()
proot_state.blink() proot_state = StateSingleton()
proot_state.set_bean(int(bean_number), bean_state) if not proot_state.get_animations_ready():
print("animation not yet ready.")
return
if bean_state == "rose":
proot_state.set_desired_expression(0)
elif bean_state == "fell":
proot_state.set_desired_expression(bean_number)
# MQTT broker configuration # MQTT broker configuration
broker_address = "10.1.13.173" # Replace with your MQTT broker's address broker_address = "10.1.13.173" # Replace with your MQTT broker's address