import random
import threading
import time

import paho.mqtt.client as mqtt
import yaml
from PIL import Image
from rgbmatrix import RGBMatrix, RGBMatrixOptions

from Point2D import (
    divide_points_into_groups,
    generate_image_from_point_array,
    generate_point_array_from_image,
    interpolate_point_pairs,
    mirror_points,
    pair_groups,
    pair_points,
)

# Configuration for the matrix screens
options = RGBMatrixOptions()
options.rows = 32
options.cols = 64
options.chain_length = 2
options.parallel = 1
options.hardware_mapping = 'regular'
matrix = RGBMatrix(options=options)


class DuplicateTriggerError(Exception):
    """Custom exception for duplicated triggers."""


def _load_point_array(image_path):
    """Open the image at *image_path* and convert it to a point array.

    The image is closed as soon as the conversion returns.
    NOTE(review): assumes generate_point_array_from_image reads the pixels
    eagerly and does not keep a reference to the image - confirm in Point2D.
    """
    with Image.open(image_path) as image:
        return generate_point_array_from_image(image)


def load_animations(yaml_file):
    """Load the animation definitions from a YAML file.

    Every graphic of type 'transition' gets a 'to_point_array' entry built
    from its 'to_file'; every graphic of type 'image' or 'animation' whose
    'source' is not 'current' gets a 'point_array' entry built from its
    'source_file'.

    Args:
        yaml_file: Path of the YAML file; its top-level 'animations' key
            holds the list of animation dictionaries.

    Returns:
        The list of animation dictionaries, or an empty list when reading
        the file or preparing the graphics fails (the error is printed).

    Raises:
        DuplicateTriggerError: If two or more animations share a trigger.
            Animations without a 'trigger' key all share the trigger None.
    """
    try:
        with open(yaml_file, 'r') as file:
            animations_data = yaml.safe_load(file)
        animations = animations_data.get('animations', [])

        # Names of the animations registered for each trigger
        trigger_animations = {}

        for animation in animations:
            name = animation.get('name', 'unnamed animation')
            trigger_animations.setdefault(animation.get('trigger'), []).append(name)

            for graphic in animation.get('graphics', []):
                graphic_type = graphic.get('type')
                if graphic_type == 'transition':
                    graphic['to_point_array'] = _load_point_array(graphic.get('to_file'))
                elif graphic_type in ('image', 'animation'):
                    # Graphics that reuse the current screen have no file to load
                    if graphic.get('source') != 'current':
                        graphic['point_array'] = _load_point_array(graphic.get('source_file'))

        # Check for duplicated triggers
        duplicated_triggers = [
            trigger for trigger, names in trigger_animations.items() if len(names) > 1
        ]
        if duplicated_triggers:
            error_message = "Duplicated trigger(s) found:\n" + "".join(
                f"Trigger: {trigger}, Animations: {', '.join(trigger_animations[trigger])}\n"
                for trigger in duplicated_triggers
            )
            raise DuplicateTriggerError(error_message)

        return animations
    except DuplicateTriggerError as e:
        print(e)  # Print the error message with duplicated triggers
        raise
    except Exception as e:
        # Best effort: any other failure is reported and yields no animations
        print(f"Error parsing YAML file: {e}")
        return []


# Function to simulate playing an animation
def play_animation(animation):
    """Print the name of *animation* (stand-in for real playback)."""
    print(f"Playing animation: {animation['name']}")


# Decision-Making Layer
class DecisionLayer:
    """Translates trigger sources (boot, blink timer, MQTT) into animations."""

    def __init__(self, animations, rendering_layer):
        """Store the parsed animations and the rendering layer that plays them."""
        self.animations = animations
        self.rendering_layer = rendering_layer

    def get_animation_by_trigger(self, trigger):
        """
        Get an animation from the list of parsed animations based on the specified trigger.

        Args:
            trigger (str): The trigger to match.

        Returns:
            dict: The first animation dictionary matching the trigger, or None if not found.
        """
        for animation in self.animations:
            if animation.get('trigger') == trigger:
                return animation
        return None

    @staticmethod
    def _log_sensor_message(sensor_type, sensor_specifier, sensor_state):
        """Print a human-readable description of a decoded sensor message."""
        axes = {"000X": "X", "000Y": "Y", "000Z": "Z"}
        if sensor_type == "Bean":
            if sensor_state == "0000":
                print("Bean ID:" + sensor_specifier + " Fell.")
            elif sensor_state == "0001":
                print("Bean ID:" + sensor_specifier + " Rose.")
            else:
                print("Bean ID:" + sensor_specifier + " in illegal state.")
        elif sensor_type == "Butn":
            if sensor_state == "0000":
                print("Button ID:" + sensor_specifier + " Fell.")
            elif sensor_state == "0001":
                print("Button ID:" + sensor_specifier + " Rose.")
            else:
                print("Received illegal state: " + sensor_state + " for Button ID:" + sensor_specifier)
        elif sensor_type == "Move":
            if sensor_specifier in axes:
                print("Movement in " + axes[sensor_specifier] + " axis: " + sensor_state)
            else:
                print("Received illegal movement axis.")
        elif sensor_type == "Rott":
            if sensor_specifier in axes:
                print("Rotation in " + axes[sensor_specifier] + " axis: " + sensor_state)
            else:
                print("Received illegal Rotation axis.")
        elif sensor_type == "Gest":
            if sensor_specifier == "XXXX":
                print("Gesture received: " + sensor_state)
            else:
                print("Received illegal gesture")
        else:
            print("received illegal sensor type: " + sensor_type)

    def handle_trigger(self, source, payload):
        """Derive a trigger from *source* / *payload* and play its animation(s).

        Args:
            source: "boot", "blinkTimer" or "mqtt"; any other value is ignored.
            payload: Only used for "mqtt". The message is read from
                str(payload) at fixed offsets: characters 2-5 are the sensor
                type, 7-10 the specifier, 12-15 the state and 2-15 the
                trigger. The offset of 2 skips the leading b' that str()
                produces for a bytes object.
        """
        trigger = None
        if source == "boot":
            trigger = "boot"
        elif source == "blinkTimer":
            # Blink only while idle, never on top of a running animation
            if not self.rendering_layer.animation_queue:
                trigger = "blinkTimer"
        elif source == "mqtt":
            message = str(payload)
            print("Received message '" + message)
            if len(message) < 17:
                print("received massage too short to be valid")
                return
            self._log_sensor_message(message[2:6], message[7:11], message[12:16])
            trigger = message[2:16]

        # Without this guard a None trigger would match every animation that
        # has no 'trigger' key.
        if trigger is None:
            return

        for animation in self.animations:
            if animation.get('trigger') == trigger:
                self.rendering_layer.play_animation(animation)


class RenderingLayer:
    """Turns animations into a queue of frames and renders them on the matrix."""

    def __init__(self, animations, frame_rate=40):
        """Set up the render state.

        Args:
            animations: Parsed animation dictionaries (used for lookup by name).
            frame_rate: Target number of frames per second.
        """
        self.animations = animations
        self.current_point_array = []
        self.current_animation = {}
        self.current_animation_action = {}
        self.frame_rate = frame_rate  # Set the desired frame rate
        self.frame_duration = 1.0 / frame_rate  # Calculate the frame duration
        self.animation_queue = []  # Initialize the animation queue
        # Target of the last generated transition and its point pairs, reused
        # by the following frames of the same transition
        self.previous_to = []
        self.previous_point_pairs_groups = []

    def play_animation_by_name(self, animation_name):
        """Play every animation whose 'name' equals *animation_name*."""
        for animation in self.animations:
            if animation.get('name') == animation_name:
                self.play_animation(animation)

    def play_animation(self, animation):
        """Replace whatever is queued with the frames of *animation*."""
        self.current_animation = animation
        if self.animation_queue:
            print("Stopping current animation...")
        # Replace the currently playing animation with the new one
        self.animation_queue = self.generate_animation_queue(animation)

    def append_animation(self, animation):
        """Queue the frames of *animation* after the frames already queued."""
        self.animation_queue = self.animation_queue + self.generate_animation_queue(animation)

    def generate_animation_queue(self, animation):
        """Expand the graphics of *animation* into one queue entry per frame.

        Graphics of type 'image' and 'transition' produce int(duration)
        entries each (default duration: 1 frame); other types produce none.

        Returns:
            list: The frame dictionaries in playback order.
        """
        animation_queue = []
        for graphic in animation.get('graphics', []):
            graphic_type = graphic.get('type')
            if graphic_type not in ('image', 'transition'):
                continue

            # Use one integer frame count everywhere so the last transition
            # frame lands on exactly 100 %
            frames = int(graphic.get('duration', 1))
            if graphic_type == 'image':
                point_array = graphic.get('point_array')
                for _ in range(frames):
                    animation_queue.append({'type': graphic_type, 'point_array': point_array})
            else:
                to_file_name = graphic.get('to_file')
                to_point_array = graphic.get('to_point_array')
                for i in range(frames):
                    animation_queue.append({
                        'type': graphic_type,
                        'to_point_array': to_point_array,
                        'to_file_name': to_file_name,
                        # Fraction of the remaining distance to cover this frame
                        'additionalStepPercentage': 1 / (frames - i),
                        # Fraction of the whole transition reached after this frame
                        'stepPercentage': (i + 1) / frames,
                    })
        return animation_queue

    def _render_transition_frame(self, action):
        """Compute and return the image for one queued transition frame.

        Updates current_point_array and the cached point pairs as a side
        effect.
        NOTE(review): the cache is keyed on the target file name only, so a
        later transition to the same target reuses the pairs computed from
        the earlier starting points - confirm this is intended.
        """
        frame_time_start = time.time()
        target_name = str(action.get('to_file_name'))
        new_point_array = []

        if self.previous_to == target_name:
            # Same transition as the previous frame: reuse the cached pairs
            step = action.get('stepPercentage')
            for point_pairs in self.previous_point_pairs_groups:
                interpolatetime_start = time.time()
                new_point_array += interpolate_point_pairs(point_pairs, step)
                print("interpolationg took: " + str(time.time() - interpolatetime_start) + " sec. (cached)")
                print("step percentate for this transition step is: " + str(step))
        else:
            print("starting transition generation")
            groupsa = divide_points_into_groups(self.current_point_array)
            groupsb = divide_points_into_groups(action.get('to_point_array'))
            print("groups divided, len(groupsa), len(groups(b)=", len(groupsa), len(groupsb))
            paired_groups = pair_groups(groupsa, groupsb)
            print("paired_groups generated, len(paired_groups)=", len(paired_groups))

            step = action.get('additionalStepPercentage')
            self.previous_point_pairs_groups = []
            for pair in paired_groups:
                point_pairs = pair_points(pair[0], pair[1])
                self.previous_point_pairs_groups.append(point_pairs)
                print("step percentate for this transition stepp is: " + str(step))
                interpolatetime_start = time.time()
                new_point_array += interpolate_point_pairs(point_pairs, step)
                print("interpolationg took: " + str(time.time() - interpolatetime_start) + " sec.")
                print("face feature interpolated len(new_point_array) =", len(new_point_array))
            # Remember the target so the next frames of this transition hit the cache
            self.previous_to = target_name

        new_image = generate_image_from_point_array(
            new_point_array + mirror_points(new_point_array), 128, 32
        )
        self.current_point_array = new_point_array
        print("transition generated, len(new_point_array) =", len(new_point_array))
        print("creating transition frame took: " + str(time.time() - frame_time_start) + " sec.")
        print("================== end frame ==================")
        return new_image

    def start_rendering(self):
        """Render queued frames forever at the configured frame rate.

        Never returns; intended to run on its own thread. When the queue is
        empty and the current animation has a truthy 'repeat', the animation
        is queued again.
        """
        new_image = Image.new("RGB", (128, 32), "black")
        offscreen_canvas = matrix.CreateFrameCanvas()

        while True:
            start_time = time.time()  # Get the current time before rendering

            # Take one reference: other threads may swap in a new queue at
            # any moment, and checking and popping must hit the same list
            queue = self.animation_queue
            if queue:
                action = queue.pop(0)
                action_type = action.get('type')
                if action_type == "image":
                    point_array = action.get('point_array')
                    new_image = generate_image_from_point_array(point_array, 128, 32)
                    self.current_point_array = point_array
                elif action_type == "transition":
                    new_image = self._render_transition_frame(action)
                else:
                    print("unknown action: ", action)

                offscreen_canvas.SetImage(new_image, unsafe=False)
                # SwapOnVSync hands back the canvas that is no longer shown;
                # draw the next frame on that one
                offscreen_canvas = matrix.SwapOnVSync(offscreen_canvas)
            elif self.current_animation.get('repeat', False):
                self.play_animation(self.current_animation)

            # Sleep for whatever is left of this frame's time budget
            sleep_time = self.frame_duration - (time.time() - start_time)
            print("remaining time in frame: " + str(sleep_time))
            if sleep_time > 0:
                time.sleep(sleep_time)
# Decision layer shared with the MQTT and blink callbacks; main() assigns it
# before any thread that uses it is started.
decision_layer = None


# Function responsible for the blinking behaviour when idle
def random_blinks():
    """Send a "blinkTimer" trigger every 5 to 7 seconds, forever."""
    while True:
        time.sleep(random.randint(5, 7))
        if decision_layer is not None:
            decision_layer.handle_trigger("blinkTimer", "")


# functions called by the MQTT listener
def on_connect(client, userdata, flags, response_code):
    """Report the connection result and subscribe to the "test" topic."""
    print("Connected to MQTT broker with result code " + str(response_code))
    client.subscribe("test")


# Function to handle MQTT message reception
def on_message(client, userdata, message):
    """Pass the payload of a received message to the decision-making layer."""
    if decision_layer is None:
        # Nothing can handle the message yet; drop it instead of raising
        # inside the MQTT callback thread
        print("MQTT message received before the decision layer was ready, ignoring it")
        return
    decision_layer.handle_trigger("mqtt", message.payload)


def main():
    """Load the animations, start rendering and MQTT, then play the boot animation."""
    global decision_layer

    yaml_file = 'testAnimationYaml.yaml'  # Replace with the path to your YAML file

    # Parse the YAML file to get animations
    animations = load_animations(yaml_file)

    # Build both layers before any callback thread exists, so on_message can
    # never run against an unset decision layer
    rendering_layer = RenderingLayer(animations, frame_rate=40)
    decision_layer = DecisionLayer(animations, rendering_layer)

    rendering_thread = threading.Thread(target=rendering_layer.start_rendering)
    rendering_thread.start()

    # MQTT broker configuration
    broker_address = "localhost"
    broker_port = 1883
    broker_keepalive = 60

    mqtt_client = mqtt.Client()
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message

    mqtt_client.connect(broker_address, broker_port, broker_keepalive)
    mqtt_client.loop_start()

    # Create and start random blinks interrupts (currently disabled)
    # screen_update_thread = threading.Thread(target=random_blinks)
    # screen_update_thread.start()

    decision_layer.handle_trigger("boot", "")


if __name__ == "__main__":
    main()