# CiscoTheProot — "yaml parse test/prootOS.py"
# (422 lines, 18 KiB, Python — metadata left over from the original page scrape)

import yaml
import paho.mqtt.client as mqtt
import threading
from Point2D import divide_points_into_groups, generate_image_from_point_array, generate_point_array_from_image, interpolate_point_pairs, mirror_points, pair_groups, pair_points
from PIL import Image
import time
import random
from rgbmatrix import RGBMatrix, RGBMatrixOptions
# Configure the two chained 32x64 LED panels and open the matrix device.
options = RGBMatrixOptions()
for _attr, _value in (('rows', 32),
                      ('cols', 64),
                      ('chain_length', 2),
                      ('parallel', 1),
                      ('hardware_mapping', 'regular')):
    setattr(options, _attr, _value)
matrix = RGBMatrix(options=options)
class DuplicateTriggerError(Exception):
    """Raised when two or more animations declare the same trigger string."""
def load_animations(yaml_file):
    """Parse *yaml_file* and return its list of animation definitions.

    For every file-backed graphic, the source image is pre-rendered into a
    point array and stored back on the graphic dict (``point_array`` for
    image/animation graphics, ``to_point_array`` for transitions) so the
    render loop never touches the filesystem.

    Args:
        yaml_file (str): path to the animations YAML file.

    Returns:
        list: the parsed animation dicts, or [] if the file cannot be parsed.

    Raises:
        DuplicateTriggerError: if two animations declare the same trigger.
    """
    try:
        with open(yaml_file, 'r') as file:
            animations_data = yaml.safe_load(file)
        animations = animations_data.get('animations', [])
        # Map each trigger to the names of the animations claiming it so
        # duplicates can be reported with their animation names.
        trigger_animations = {}
        for animation in animations:
            trigger = animation.get('trigger')
            name = animation.get('name', 'unnamed animation')
            trigger_animations.setdefault(trigger, []).append(name)
            # Pre-render point arrays for every graphic that names a file.
            for graphic in animation.get('graphics', []):
                graphic_type = graphic.get('type')
                if graphic_type == 'transition':
                    to_file = graphic.get('to_file')
                    # Store the point array for the transition target.
                    graphic['to_point_array'] = generate_point_array_from_image(Image.open(to_file))
                elif graphic_type in ('image', 'animation'):
                    # 'current' means "start from whatever is on screen",
                    # so only graphics with a real source file are loaded.
                    # (The original also had a no-op self-assignment of
                    # 'source_file' here, removed.)
                    if graphic.get('source') != 'current':
                        source_file = graphic.get('source_file')
                        graphic['point_array'] = generate_point_array_from_image(Image.open(source_file))
        # Fail loudly when any trigger is claimed by more than one animation.
        duplicated = {t: names for t, names in trigger_animations.items() if len(names) > 1}
        if duplicated:
            error_message = "Duplicated trigger(s) found:\n"
            for trigger, names in duplicated.items():
                error_message += f"Trigger: {trigger}, Animations: {', '.join(names)}\n"
            raise DuplicateTriggerError(error_message)
        return animations
    except DuplicateTriggerError as e:
        print(e)  # Print the error message with duplicated triggers
        raise  # bare re-raise preserves the original traceback
    except Exception as e:
        # Best-effort: a malformed file yields no animations instead of a crash.
        print(f"Error parsing YAML file: {e}")
        return []
# Simulation stub: report which animation would be played.
def play_animation(animation):
    """Log that *animation* (a parsed animation dict with a 'name') is playing."""
    name = animation['name']
    print(f"Playing animation: {name}")
# Decision-Making Layer
class DecisionLayer:
    """Turns incoming events (boot, blink timer, MQTT sensor messages) into
    animation triggers and hands the matching animation to the rendering layer."""

    def __init__(self, animations, rendering_layer):
        self.animations = animations            # parsed animation dicts from the YAML file
        self.rendering_layer = rendering_layer  # RenderingLayer that actually plays them

    def get_animation_by_trigger(self, trigger):
        """
        Get an animation from the list of parsed animations based on the specified trigger.
        Args:
            trigger (str): The trigger to match.
        Returns:
            dict: The animation dictionary matching the trigger, or None if not found.
        """
        for animation in self.animations:
            if animation.get('trigger') == trigger:
                return animation
        return None  # Return None if no animation with the specified trigger is found

    def handle_trigger(self, source, payload):
        """Resolve *source*/*payload* to a trigger and play the matching animation.

        Args:
            source (str): one of "boot", "blinkTimer" or "mqtt".
            payload: raw MQTT payload (bytes); ignored for other sources.
        """
        trigger = None
        if source == "boot":
            trigger = "boot"
        elif source == "blinkTimer":
            # Only blink when idle so a running animation is never interrupted.
            if len(self.rendering_layer.animation_queue) == 0:
                trigger = "blinkTimer"
        elif source == "mqtt":
            trigger = self._decode_sensor_payload(payload)
        if trigger is None:
            # No usable trigger (idle blink suppressed, bad payload, unknown
            # source). Returning here also prevents accidentally matching
            # animations that have no 'trigger' key at all.
            return
        animation = self.get_animation_by_trigger(trigger)
        if animation is not None:
            self.rendering_layer.play_animation(animation)

    def _decode_sensor_payload(self, payload):
        """Log a sensor message and return its trigger slice.

        The payload is inspected via str(); for bytes such as
        b'Bean:0001:0000' this yields "b'Bean:0001:0000'", so the fixed
        slices below skip the leading "b'" marker.

        Returns:
            str: the trigger (characters 2..16 of str(payload)), or None
            when the message is too short to hold a full reading.
        """
        print("Received message '" + str(payload))
        if len(str(payload)) < 17:
            print("received massage too short to be valid")
            # Bug fix: the original fell through and sliced a garbage trigger
            # out of an invalid message instead of rejecting it.
            return None
        sensor_type = str(payload)[2:6]
        sensor_specifier = str(payload)[7:11]
        sensor_state = str(payload)[12:16]
        if sensor_type == "Bean":
            if sensor_state == "0000":
                print("Bean ID:" + sensor_specifier + " Fell.")
            elif sensor_state == "0001":
                print("Bean ID:" + sensor_specifier + " Rose.")
            else:
                print("Bean ID:" + sensor_specifier + " in illegal state.")
        elif sensor_type == "Butn":
            if sensor_state == "0000":
                print("Button ID:" + sensor_specifier + " Fell.")
            elif sensor_state == "0001":
                print("Button ID:" + sensor_specifier + " Rose.")
            else:
                print("Received illegal state: " + sensor_state + " for Button ID:" + sensor_specifier)
        elif sensor_type == "Move":
            if sensor_specifier == "000X":
                print("Movement in X axis: " + sensor_state)
            elif sensor_specifier == "000Y":
                print("Movement in Y axis: " + sensor_state)
            elif sensor_specifier == "000Z":
                print("Movement in Z axis: " + sensor_state)
            else:
                print("Received illegal movement axis.")
        elif sensor_type == "Rott":
            if sensor_specifier == "000X":
                print("Rotation in X axis: " + sensor_state)
            elif sensor_specifier == "000Y":
                print("Rotation in Y axis: " + sensor_state)
            elif sensor_specifier == "000Z":
                print("Rotation in Z axis: " + sensor_state)
            else:
                print("Received illegal Rotation axis.")
        elif sensor_type == "Gest":
            if sensor_specifier == "XXXX":
                print("Gesture received: " + sensor_state)
            else:
                print("Received illegal gesture")
        else:
            print("received illegal sensor type: " + sensor_type)
        # Even unknown-but-well-formed sensor types become a trigger, matching
        # the original behavior.
        return str(payload)[2:16]
class RenderingLayer:
    """Owns the frame queue and draws each frame onto the LED matrix.

    play_animation()/append_animation() expand an animation's graphics into
    per-frame actions on ``animation_queue``; start_rendering() consumes that
    queue forever at a fixed frame rate and is intended to run on its own
    thread (the queue is shared with the DecisionLayer thread).
    """

    def __init__(self, animations, frame_rate=40):
        self.animations = animations
        self.current_point_array = []  # points currently shown on screen
        self.current_animation_action = {}
        self.frame_rate = frame_rate  # Set the desired frame rate
        self.frame_duration = 1.0 / frame_rate  # Calculate the frame duration
        self.animation_queue = []  # Initialize the animation queue
        # Cache of the last transition target and its point pairings, so that
        # consecutive frames of the same transition skip the expensive
        # grouping/pairing steps.
        self.previous_to = []
        self.previous_point_pairs_groups = []

    def play_animation_by_name(self, animation_name):
        # Look up an animation by its 'name' field and play it immediately.
        for animation in self.animations:
            if animation.get('name') == animation_name:
                self.play_animation(animation)

    def play_animation(self, animation):
        # Replace whatever is queued with the frames of *animation*.
        if len(self.animation_queue) > 0:
            print("Stopping current animation...")
        # Replace the currently playing animation with the new one
        self.animation_queue = self.generate_animation_queue(animation)  # Add frames to the queue

    def append_animation(self, animation):
        # Queue *animation* to play after everything already queued.
        self.animation_queue = self.animation_queue + self.generate_animation_queue(animation)  # Add frames to the queue

    def generate_animation_queue(self, animation):
        """Expand an animation's graphics into a flat list of per-frame actions.

        Each 'image' graphic becomes *duration* identical image actions; each
        'transition' graphic becomes *duration* transition actions carrying
        both an absolute progress fraction ('stepPercentage', (i+1)/duration)
        and the fraction of the remaining distance to cover this frame
        ('additionalStepPercentage', 1/(duration-i)).
        """
        animation_queue = []
        graphics = animation.get('graphics', [])
        for graphic in graphics:
            if graphic.get('type') == 'image':
                point_array = graphic.get('point_array')
                duration = graphic.get('duration', 1)  # Default duration is 1 frame if not specified
                # Add frames to the queue based on the specified duration
                for _ in range(int(duration)):
                    animation_queue.append({'type': graphic.get('type'), 'point_array': point_array})
            if graphic.get('type') == 'transition':
                to_file_name = graphic.get('to_file')
                to_point_array = graphic.get('to_point_array')
                duration = graphic.get('duration', 1)  # Default duration is 1 frame if not specified
                # Add frames to the queue based on the specified duration
                for i in range(int(duration)):
                    animation_queue.append({'type': graphic.get('type'), 'to_point_array': to_point_array, 'to_file_name': to_file_name, 'additionalStepPercentage' : (1/(int(duration)-i)), 'stepPercentage' : ((i+1)/duration)})
        return animation_queue

    def start_rendering(self):
        """Main render loop: pop one action per frame, draw it, keep the rate.

        Runs forever; meant for a background thread. Pushes frames to the
        panels through the module-global ``matrix``. The *time counters below
        accumulate profiling stats for the (commented-out) summary prints.
        """
        frameCount = 0
        new_image = Image.new("RGB", (128, 32), "black")
        devisiontime = 0
        pairgrouptime = 0
        pairpointtime = 0
        imagingtime = 0
        interpolatetime = 0
        transitionFrameCount = 1
        while True:
            start_time = time.time()  # Get the current time before rendering
            if len(self.animation_queue) > 0:
                current_animation_action = self.animation_queue.pop(0)
                print("update action is: " + current_animation_action.get('type'))
                # Render the next frame in the queue
                if current_animation_action.get('type') == "image":
                    new_image = generate_image_from_point_array(current_animation_action.get('point_array'), 128, 32)
                    self.current_point_array = current_animation_action.get('point_array')
                    print("image generated")
                elif current_animation_action.get('type') == "transition":
                    frame_time_start = time.time()
                    if self.previous_to == str(current_animation_action.get('to_file_name')):
                        # Same target as the previous frame: reuse the cached
                        # pairings and interpolate by absolute progress.
                        point_pairs_groups = self.previous_point_pairs_groups
                        new_point_array = []
                        for point_pairs in point_pairs_groups:
                            interpolatetime_start = time.time()
                            new_point_array += interpolate_point_pairs(point_pairs, current_animation_action.get('stepPercentage'))
                            print("interpolationg took: " + str(time.time() - interpolatetime_start) + " sec. (cached)")
                        print("step percentate for this transition step is: " + str(current_animation_action.get('stepPercentage')))
                    else:
                        # New target: group, pair and interpolate from scratch,
                        # caching the pairings for the following frames.
                        print("starting transition generation")
                        transitionFrameCount += 1
                        divtime_start = time.time()
                        groupsa = divide_points_into_groups(self.current_point_array)
                        groupsb = divide_points_into_groups(current_animation_action.get('to_point_array'))
                        devisiontime += time.time() - divtime_start
                        print("groups divided, len(groupsa), len(groups(b)=", len(groupsa), len(groupsb))
                        pairgrouptime_start = time.time()
                        paired_groups = pair_groups(groupsa, groupsb)
                        pairgrouptime += time.time() - pairgrouptime_start
                        print("paired_groups generated, len(paired_groups)=", len(paired_groups))
                        self.previous_point_pairs_groups = []
                        new_point_array = []
                        for pair in paired_groups:
                            pairpointtime_start = time.time()
                            point_pairs = pair_points(pair[0], pair[1])
                            self.previous_point_pairs_groups.append(point_pairs)
                            pairpointtime += time.time() - pairpointtime_start
                            print("step percentate for this transition stepp is: " + str(current_animation_action.get('additionalStepPercentage')))
                            interpolatetime_start = time.time()
                            new_point_array += interpolate_point_pairs(point_pairs, current_animation_action.get('additionalStepPercentage'))
                            print("interpolationg took: " + str(time.time() - interpolatetime_start) + " sec.")
                            print("face feature interpolated len(new_point_array) =", len(new_point_array))
                    # set previous "to" screen. This is used
                    # to detect a repeated target on the next frame.
                    self.previous_to = str(current_animation_action.get('to_file_name'))
                    imagingtime_start = time.time()
                    # The right half of the face is a mirror of the computed left half.
                    new_image = generate_image_from_point_array(new_point_array + mirror_points(new_point_array), 128, 32)
                    #new_image.save("output/frameNumber"+str(frameCount)+".png")
                    imagingtime += time.time() - imagingtime_start
                    self.current_point_array = new_point_array
                    print("transition generated, len(new_point_array) =", len(new_point_array))
                    print("creating transition frame took: " + str(time.time() - frame_time_start) + " sec.")
                    print("================== end frame ==================")
                else:
                    print("unknown action: ", current_animation_action)
                print("setting image to canvas")
                offscreen_canvas = matrix.CreateFrameCanvas()
                offscreen_canvas.SetImage(new_image, unsafe=False)
                print("pushing image to matrix")
                matrix.SwapOnVSync(offscreen_canvas)
                print("pushing image done")
                # Save the image to a file with the desired format and file name
                # new_image.save("output/frameNumber"+str(frameCount)+".png")
                frameCount += 1
            # new_image.save("output/frameNumber"+str(frameCount)+".png")
            elapsed_time = time.time() - start_time  # Calculate time elapsed during rendering
            # Calculate the time to sleep to achieve the desired frame rate
            sleep_time = self.frame_duration - elapsed_time
            print("remaining time in frame: " + str(sleep_time))
            if sleep_time > 0:
                time.sleep(sleep_time)
            # print("average time cost per part for transition frames:")
            # print("devisiontime :" + str(devisiontime /transitionFrameCount ))
            # print("pairgrouptime :" + str(pairgrouptime /transitionFrameCount))
            # print("pairpointtime :" + str(pairpointtime /transitionFrameCount))
            # print("interpolatetime :" + str(interpolatetime /transitionFrameCount))
            # print("imagingtime :" + str(imagingtime /transitionFrameCount))
            # NOTE(review): the stats reset every frame, so the averages above
            # would only ever cover the last frame — confirm intent.
            devisiontime = 0
            pairgrouptime = 0
            pairpointtime = 0
            imagingtime = 0
            transitionFrameCount = 0
# Idle behaviour: periodically nudge the decision layer to blink.
def random_blinks():
    """Fire the "blinkTimer" trigger every 5-7 seconds, forever."""
    while True:
        delay = random.randint(5, 7)
        time.sleep(delay)
        decision_layer.handle_trigger("blinkTimer", "")
# Callbacks wired into the MQTT listener.
def on_connect(client, userdata, flags, response_code):
    """MQTT connect callback: report the result code and subscribe to "test"."""
    code_text = str(response_code)
    print("Connected to MQTT broker with result code " + code_text)
    client.subscribe("test")
# Function to handle MQTT message reception
def on_message(client, userdata, message):
    """MQTT message callback: forward the raw payload to the decision layer."""
    # The decision layer decides whether the payload maps to an animation.
    decision_layer.handle_trigger("mqtt", message.payload)
def main():
    """Wire together YAML parsing, MQTT input, rendering, and decisions."""
    yaml_file = 'testAnimationYaml.yaml'  # Replace with the path to your YAML file
    # Parse the YAML file to get animations
    animations = load_animations(yaml_file)
    # Initialize the rendering layer
    rendering_layer = RenderingLayer(animations, frame_rate=40)
    # Initialize the decision-making layer BEFORE the MQTT loop and the
    # rendering thread start: on_message reads the global decision_layer, so
    # a message arriving early would otherwise raise a NameError.
    global decision_layer
    decision_layer = DecisionLayer(animations, rendering_layer)
    # MQTT broker configuration
    broker_address = "localhost"
    broker_port = 1883
    broker_keepalive = 60
    mqtt_client = mqtt.Client()
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message
    mqtt_client.connect(broker_address, broker_port, broker_keepalive)
    mqtt_client.loop_start()
    # Run the render loop on a background thread.
    rendering_thread = threading.Thread(target=rendering_layer.start_rendering)
    rendering_thread.start()
    # Create and start random blinks interrupts
    # screen_update_thread = threading.Thread(target=random_blinks)
    # screen_update_thread.start()
    decision_layer.handle_trigger("boot", "")


if __name__ == "__main__":
    main()