diff --git a/rpi/Point2D.py b/rpi/Point2D.py index 6975ab4..50fdd2d 100644 --- a/rpi/Point2D.py +++ b/rpi/Point2D.py @@ -76,8 +76,10 @@ def generate_point_array_from_image(image): cached_point_arrays = load_cached_point_arrays() if image_hash in cached_point_arrays: + print("Found existing point array matching png: " + image.filename + " using existing array.") return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in cached_point_arrays[image_hash]] + print("No existing point array matching png: " + image.filename + " found. Generating now.") width, height = image.size pixel_array = [] @@ -91,6 +93,7 @@ def generate_point_array_from_image(image): cached_point_arrays[image_hash] = pixel_array save_cached_point_arrays(cached_point_arrays) + print("Point array generated and stored.") return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in pixel_array] diff --git a/rpi/State.py b/rpi/State.py new file mode 100644 index 0000000..900ddf2 --- /dev/null +++ b/rpi/State.py @@ -0,0 +1,111 @@ +import time + + +class StateSingleton: + _instance = None + + def __new__(cls): + if not cls._instance: + cls._instance = super(StateSingleton, cls).__new__(cls) + return cls._instance + + def __init__(self): + self.states = { + 0: "open", + 1: "blink", + 2: "owo", + 3: "angry" + } + self.matrix = False + self.current_expression = self.states[0] + self.desired_expression = self.states[0] + self.new_desired_expression = self.states[0] + self.transition_steps = 10 # Number of steps to transition between states + self.transition_count = 0 # Counter to keep track of transition progress + self.animations_ready = False + self.blink_animation_FrameCanvases = [] + self.angry_animation_FrameCanvases = [] + + def set_blink_animation_frames(self, blink_animation_FrameCanvases): + self.blink_animation_FrameCanvases = blink_animation_FrameCanvases + + def set_angry_animation_frames(self, angry_animation_FrameCanvases): + self.angry_animation_FrameCanvases = angry_animation_FrameCanvases + + def set_desired_expression(self, state): + if state in self.states: + self.desired_expression = self.states[state] + self.new_desired_expression = state + else: + print("Invalid state.") + + + def update_step(self): + if self.current_expression == self.desired_expression: # already at desire expression + return + + if self.current_expression == self.states[0]: # Transition from "open" state to another state + if self.transition_count < self.transition_steps: # still transitioning + self.transition_count += 1 + return + + self.current_expression = self.desired_expression + self.transition_count = 10 + return + + else: # Transition from another state to "open" state + if 0 < self.transition_count: # still transitioning + self.transition_count -= 1 + return + self.current_expression = self.states[0] # Complete transition to "open" state + self.transition_count = 0 + + def draw_face(self): + #default face + if self.current_expression == self.desired_expression == self.states[0]: + self.matrix.SwapOnVSync(self.blink_animation_FrameCanvases[0]) + + # blinky faces + elif self.current_expression == self.desired_expression == self.states[1]: + self.matrix.SwapOnVSync(self.blink_animation_FrameCanvases[10]) + + elif self.current_expression == self.states[1] or self.desired_expression == self.states[1]: + self.matrix.SwapOnVSync(self.blink_animation_FrameCanvases[self.transition_count]) + + # angry faces + elif self.current_expression == self.desired_expression == self.states[3]: + self.matrix.SwapOnVSync(self.angry_animation_FrameCanvases[10]) + + elif self.current_expression == self.states[3] or self.desired_expression == self.states[3]: + self.matrix.SwapOnVSync(self.angry_animation_FrameCanvases[self.transition_count]) + + + def update(self): + self.update_step(self) + print("at step: " + self.transition_count + " from default in expression: " + self.desired_expression) + self.draw_face() + + + def get_current_state(self): + for state, value in self.states.items(): + if value == self.current_state: + return state + + def get_animations_ready(self): + return self.animations_ready + + def set_animations_ready(self, state): + self.animations_ready = state + + def set_matrix(self, matrix): + self.matrix = matrix + + +# Example usage: +# singleton = StateSingleton() +# singleton.set_desired_expression(3) + +#for _ in range(100): +# singleton.update() +# time.sleep(0.1) + diff --git a/rpi/antRender.py b/rpi/antRender.py index c52470b..d8075dc 100644 --- a/rpi/antRender.py +++ b/rpi/antRender.py @@ -1,6 +1,6 @@ from rgbmatrix import RGBMatrix, RGBMatrixOptions from Point2D import interpolate_point_pairs, mirror_points, generate_image_from_point_array, generate_point_array_from_image, pair_points -from ProotState import ProotState +from State import StateSingleton import time import random @@ -23,21 +23,99 @@ options.chain_length = 2 options.parallel = 1 options.hardware_mapping = 'regular' # If you have an Adafruit HAT: 'adafruit-hat' matrix = RGBMatrix(options=options) +prootState = StateSingleton() +prootState.set_matrix(matrix) endT = curr_time = round(time.time()*1000) print("configuring matrix took: " + str(endT - startT) + " ms") -# array to hold the blink animation frames -blinkFrameCanvases = [] +def generate_eye_frames(emote_eye_png): + eye_frames = [] + points_left_eye_open = generate_point_array_from_image(Image.open("faces/eyeLeftOpen.png")) + if emote_eye_png != "faces/eyeLeftOpen.png": + left_eye_pairs = pair_points(points_left_eye_open, generate_point_array_from_image(Image.open(emote_eye_png))) + for i in range(11): + eye_frames.append(interpolate_point_pairs(left_eye_pairs, i/10)) + else: + for i in range(11): + eye_frames.append(points_left_eye_open) + return eye_frames + + +def generate_mouth_frames(emote_mouth_png): + mouth_frames = [] + points_left_mouth = generate_point_array_from_image(Image.open("faces/mouthLeft.png")) + if emote_mouth_png != "faces/mouthLeft.png": + left_mouth_pairs = pair_points(points_left_mouth, generate_point_array_from_image(Image.open(emote_mouth_png))) + for i in range(11): + mouth_frames.append(interpolate_point_pairs(left_mouth_pairs, i/10)) + else: + for i in range(11): + mouth_frames.append(points_left_mouth) + return mouth_frames + + +def generate_nose_frames(emote_nose_png): + nose_frames = [] + points_left_nose = generate_point_array_from_image(Image.open("faces/noseLeft.png")) + if emote_nose_png != "faces/noseLeft.png": + left_nose_pairs = pair_points(points_left_nose, generate_point_array_from_image(Image.open(emote_nose_png))) + for i in range(11): + nose_frames.append(interpolate_point_pairs(left_nose_pairs, i/10)) + else: + for i in range(11): + nose_frames.append(points_left_nose) + return nose_frames + + +def generate_face_frames(emote_eye_png, emote_mouth_png, emote_nose_png): + eye_frames = generate_eye_frames(emote_eye_png) + mouth_frames = generate_mouth_frames(emote_mouth_png) + nose_frames = generate_nose_frames(emote_nose_png) + face_frames = [] + + for frame_number in range(11): + eyes = eye_frames[frame_number] + mirror_points(eye_frames[frame_number]) + mouth = mouth_frames[frame_number] + mirror_points(mouth_frames[frame_number]) + nose = nose_frames[frame_number] + mirror_points(nose_frames[frame_number]) + face = eyes + mouth + nose + + face_image = generate_image_from_point_array(face, 128, 32) + + offscreen_canvas = matrix.CreateFrameCanvas() + offscreen_canvas.SetImage(face_image, unsafe=False) + + face_frames.append(offscreen_canvas) + + return face_frames + +def animate(): + blink_animation_FrameCanvases = [] + angry_animation_FrameCanvases = [] + + for emote_FrameCanvasses, emote_eye_png, emote_mouth_png, emote_nose_png in [ + (blink_animation_FrameCanvases, "faces/eyeLeftClosed.png", "faces/mouthLeft.png", "faces/noseLeft.png"), + (angry_animation_FrameCanvases, "faces/eyeLeftAngry.png", "faces/mouthLeft.png", "faces/noseLeft.png") + ]: + face_frames = generate_face_frames(emote_eye_png, emote_mouth_png, emote_nose_png) + emote_FrameCanvasses.extend(face_frames) + + state = StateSingleton() + state.set_blink_animation_frames(blink_animation_FrameCanvases) + state.set_angry_animation_frames(angry_animation_FrameCanvases) + + state.set_animations_ready(True) + +animate() + def interrupt_timer(): - global blinkFrameCanvases, matrix - proot_state = ProotState() + proot_state = StateSingleton() while True: - proot_state.update_screen(blinkFrameCanvases, matrix) + proot_state.update() time.sleep(0.01) @@ -45,10 +123,13 @@ def random_blinks(): while True: time.sleep(random.randint(3, 5)) - proot_state = ProotState() + proot_state = StateSingleton() - if proot_state.get_blinks_frames_ready(): - proot_state.blink() + if proot_state.get_animations_ready(): + proot_state.set_desired_expression(1) + time.sleep(random.randint(0.1)) + proot_state.set_desired_expression(0) + # Create and start screen update interrupts @@ -60,91 +141,21 @@ screen_update_thread = threading.Thread(target=random_blinks) screen_update_thread.start() -print("start loading images") -startT = curr_time = round(time.time()*1000) -# Loading all images -# TODO looking into storing and loading lists of points -image_left_eye_open = Image.open("faces/eyeLeftOpen.png") -image_left_eye_closed = Image.open("faces/eyeLeftClosed.png") -image_left_nose = Image.open("faces/noseLeft.png") -image_left_mouth = Image.open("faces/mouthLeft.png") - -endT = curr_time = round(time.time()*1000) -print("loading images took: " + str(endT - startT) + " ms") - - - -print("start generating pixel array") -startT = curr_time = round(time.time()*1000) - -# generate pixel arrays from each image -# TODO ^ storing and loading lists of points will take away this step. (it will require a dedicated script to precompute these) -points_left_eye_open = generate_point_array_from_image(image_left_eye_open) -points_left_eye_closed = generate_point_array_from_image(image_left_eye_closed) -points_left_nose = generate_point_array_from_image(image_left_nose) -points_left_mouth = generate_point_array_from_image(image_left_mouth) - -endT = curr_time = round(time.time()*1000) -print("generating pixel array took: " + str(endT - startT) + " ms") - - - -print("start pairing points for one eye") -startT = curr_time = round(time.time()*1000) - -#calculate the point pairs between the open and closed left eye -# TODO look into precomputing and storing these animations before runtime -left_eye_blink_pairs = pair_points(points_left_eye_open, points_left_eye_closed) - -endT = curr_time = round(time.time()*1000) -print("pairing points for one eye took: " + str(endT - startT) + " ms") - - - -print("start populating matrices for each blink frame") -startT = curr_time = round(time.time()*1000) - -# -## TODO look into the possibility of precomputing and more importantly storing the matrix objects -#for alpha in range(0,11): +## Load the object from disk +#with open('my_object.pickle', 'rb') as file: +# interpolated_faces = pickle.load(file) +# +#for interpolated_face_image in interpolated_faces: # offscreen_interpolated_canvas = matrix.CreateFrameCanvas() -# -# left_eye = interpolate_point_pairs(left_eye_blink_pairs, alpha/10) -# right_eye = mirror_points(left_eye) -# nose = points_left_nose + mirror_points(points_left_nose) -# mouth = points_left_mouth + mirror_points(points_left_mouth) -# face = left_eye + right_eye + nose + mouth -# -# interpolated_face_image = generate_image_from_point_array(face, 128, 32) -# interpolated_faces.append(interpolated_face_image) -# # offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False) -# blinkFrameCanvases.append(offscreen_interpolated_canvas) +# blink_animation_FrameCanvases.append(offscreen_interpolated_canvas) # +# +## Store the object to disk +#with open('my_object.pickle', 'wb') as file: +# pickle.dump(interpolated_faces, file) - -# Load the object from disk -with open('my_object.pickle', 'rb') as file: - interpolated_faces = pickle.load(file) - -for interpolated_face_image in interpolated_faces: - offscreen_interpolated_canvas = matrix.CreateFrameCanvas() - offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False) - blinkFrameCanvases.append(offscreen_interpolated_canvas) - - -endT = curr_time = round(time.time()*1000) -print("populating matrices for each blink frame took: " + str(endT - startT) + " ms") - -# Store the object to disk -with open('my_object.pickle', 'wb') as file: - pickle.dump(interpolated_faces, file) - -proot_state = ProotState() -proot_state.set_matrix(matrix) -proot_state.set_blinks_frames_ready(True) -proot_state.blink() @@ -160,10 +171,18 @@ def on_message(client, userdata, message): + message.topic + "' with QoS " + str(message.qos)) bean_number = str(message.payload)[12:13] bean_state = str(message.payload)[6:10] + print("pin number: " + bean_number + " pin state: " + bean_state) - proot_state = ProotState() - proot_state.blink() - proot_state.set_bean(int(bean_number), bean_state) + + proot_state = StateSingleton() + if not proot_state.get_animations_ready(): + print("animation not yet ready.") + return + if bean_state == "rose": + proot_state.set_desired_expression(0) + elif bean_state == "fell": + proot_state.set_desired_expression(bean_number) + # MQTT broker configuration broker_address = "10.1.13.173" # Replace with your MQTT broker's address