422 lines
18 KiB
Python
422 lines
18 KiB
Python
import yaml
|
|
import paho.mqtt.client as mqtt
|
|
import threading
|
|
from Point2D import divide_points_into_groups, generate_image_from_point_array, generate_point_array_from_image, interpolate_point_pairs, mirror_points, pair_groups, pair_points
|
|
from PIL import Image
|
|
import time
|
|
import random
|
|
from rgbmatrix import RGBMatrix, RGBMatrixOptions
|
|
|
|
# Configuration for the matrix screens.
# rows/cols describe a single panel; chain_length is the number of panels
# chained one after another (2 x 64 columns matches the 128x32 images that
# the renderer draws).
options = RGBMatrixOptions()
options.hardware_mapping = 'regular'
options.rows, options.cols = 32, 64
options.chain_length, options.parallel = 2, 1

# Module-level handle to the display, used by the rendering loop.
matrix = RGBMatrix(options=options)
|
|
|
|
class DuplicateTriggerError(Exception):
    """Raised when two or more animations are registered for the same trigger."""
|
|
|
|
def _load_point_array(image_path):
    """Open the image at *image_path* and convert it into a point array.

    The image is opened in a ``with`` block so that its file handle is
    released as soon as the points have been extracted.
    """
    with Image.open(image_path) as image:
        return generate_point_array_from_image(image)


def load_animations(yaml_file):
    """Load the animation definitions from a YAML file and pre-compute their point arrays.

    The file is expected to hold a top-level mapping with an ``animations``
    list. For every graphic of every animation:

    * type ``transition``: the image named by ``to_file`` is converted and
      stored under ``to_point_array``;
    * type ``image`` or ``animation`` whose ``source`` is not ``'current'``:
      the image named by ``source_file`` is converted and stored under
      ``point_array``.

    Args:
        yaml_file (str): Path of the YAML file to read.

    Returns:
        list: The animation dictionaries (graphics enriched in place), or an
        empty list if the file is empty or anything other than a duplicate
        trigger goes wrong (the error is printed, not raised).

    Raises:
        DuplicateTriggerError: If several animations share one trigger.
            Animations without a ``trigger`` key all share the trigger
            ``None`` and therefore count as duplicates of each other.
    """
    try:
        with open(yaml_file, 'r') as file:
            # An empty YAML document parses to None; treat it as "no animations".
            animations_data = yaml.safe_load(file) or {}

        animations = animations_data.get('animations', []) or []

        # Names of the animations registered for each trigger.
        trigger_animations = {}

        for animation in animations:
            trigger = animation.get('trigger')
            name = animation.get('name', 'unnamed animation')
            trigger_animations.setdefault(trigger, []).append(name)

            for graphic in animation.get('graphics', []):
                graphic_type = graphic.get('type')

                if graphic_type == 'transition':
                    graphic['to_point_array'] = _load_point_array(graphic.get('to_file'))

                elif graphic_type in ('image', 'animation'):
                    # Graphics that reuse the 'current' screen have no file to load.
                    if graphic.get('source') != 'current':
                        graphic['point_array'] = _load_point_array(graphic.get('source_file'))

        # Check for duplicated triggers
        duplicated_triggers = [
            trigger for trigger, names in trigger_animations.items() if len(names) > 1
        ]
        if duplicated_triggers:
            error_message = "Duplicated trigger(s) found:\n" + "".join(
                f"Trigger: {trigger}, Animations: {', '.join(trigger_animations[trigger])}\n"
                for trigger in duplicated_triggers
            )
            raise DuplicateTriggerError(error_message)

        return animations
    except DuplicateTriggerError as e:
        print(e)  # Print the error message with duplicated triggers
        raise  # Bare raise keeps the original traceback
    except Exception as e:
        # Deliberately broad: any failure (unreadable file, malformed YAML,
        # missing image, ...) is reported and results in "no animations".
        print(f"Error parsing YAML file: {e}")
        return []
|
|
|
|
|
|
|
|
|
|
# Stand-in for real playback: only reports which animation was requested.
def play_animation(animation):
    """Print the name of *animation* to stdout.

    Args:
        animation (dict): Animation description; its 'name' entry is required.
    """
    announcement = "Playing animation: {}".format(animation['name'])
    print(announcement)
|
|
|
|
|
|
# Decision-Making Layer
class DecisionLayer:
    """Decides which animation to play for an incoming event.

    Events come from three sources: ``"boot"``, ``"blinkTimer"`` and
    ``"mqtt"``. Each event is turned into a trigger string, and every
    animation whose ``trigger`` equals that string is handed to the
    rendering layer.
    """

    # A sensor message consists of three 4-character fields (type, specifier,
    # state) with one separator character between them: 4 + 1 + 4 + 1 + 4.
    _MESSAGE_LENGTH = 14

    # Axis specifiers accepted for "Move" and "Rott" messages.
    _AXES = {"000X": "X", "000Y": "Y", "000Z": "Z"}

    def __init__(self, animations, rendering_layer):
        """Store the parsed animations and the rendering layer that plays them."""
        self.animations = animations
        self.rendering_layer = rendering_layer

    def get_animation_by_trigger(self, trigger):
        """
        Get an animation from the list of parsed animations based on the specified trigger.

        Args:
            trigger (str): The trigger to match.

        Returns:
            dict: The first animation dictionary matching the trigger, or None if not found.
        """
        for animation in self.animations:
            if animation.get('trigger') == trigger:
                return animation
        return None  # No animation with the specified trigger was found

    def handle_trigger(self, source, payload):
        """Translate an event into a trigger and play the matching animation(s).

        Args:
            source (str): ``"boot"``, ``"blinkTimer"`` or ``"mqtt"``. Any other
                value is ignored.
            payload: Raw MQTT payload (bytes or str) for ``"mqtt"`` events;
                unused for the other sources.
        """
        trigger = None
        if source == "boot":
            trigger = "boot"

        elif source == "blinkTimer":
            # Only blink while idle so a running animation is not interrupted.
            if len(self.rendering_layer.animation_queue) == 0:
                trigger = "blinkTimer"

        elif source == "mqtt":
            trigger = self._trigger_from_mqtt(payload)

        # No trigger means "do nothing". Without this guard, None would match
        # every animation that simply has no 'trigger' entry.
        if trigger is None:
            return

        for animation in self.animations:
            if animation.get('trigger') == trigger:
                self.rendering_layer.play_animation(animation)

    def _trigger_from_mqtt(self, payload):
        """Log a sensor message and return its trigger, or None if it is too short."""
        print("Received message '" + str(payload))

        # Decode bytes explicitly instead of slicing their repr ("b'...'"),
        # whose character positions shift for escaped or non-ASCII bytes.
        if isinstance(payload, (bytes, bytearray)):
            text = bytes(payload).decode('utf-8', errors='replace')
        else:
            text = str(payload)

        if len(text) < self._MESSAGE_LENGTH:
            print("received massage too short to be valid")
            return None

        sensor_type = text[0:4]
        sensor_specifier = text[5:9]
        sensor_state = text[10:14]
        print(self._describe_sensor_event(sensor_type, sensor_specifier, sensor_state))

        # The trigger is the complete type/specifier/state message.
        return text[0:self._MESSAGE_LENGTH]

    @classmethod
    def _describe_sensor_event(cls, sensor_type, sensor_specifier, sensor_state):
        """Return the log line describing one parsed sensor message."""
        if sensor_type == "Bean":
            if sensor_state == "0000":
                return "Bean ID:" + sensor_specifier + " Fell."
            if sensor_state == "0001":
                return "Bean ID:" + sensor_specifier + " Rose."
            return "Bean ID:" + sensor_specifier + " in illegal state."

        if sensor_type == "Butn":
            if sensor_state == "0000":
                return "Button ID:" + sensor_specifier + " Fell."
            if sensor_state == "0001":
                return "Button ID:" + sensor_specifier + " Rose."
            return "Received illegal state: " + sensor_state + " for Button ID:" + sensor_specifier

        if sensor_type == "Move":
            axis = cls._AXES.get(sensor_specifier)
            if axis is None:
                return "Received illegal movement axis."
            return "Movement in " + axis + " axis: " + sensor_state

        if sensor_type == "Rott":
            axis = cls._AXES.get(sensor_specifier)
            if axis is None:
                return "Received illegal Rotation axis."
            return "Rotation in " + axis + " axis: " + sensor_state

        if sensor_type == "Gest":
            if sensor_specifier == "XXXX":
                return "Gesture received: " + sensor_state
            return "Received illegal gesture"

        return "received illegal sensor type: " + sensor_type
|
|
|
|
|
|
class RenderingLayer:
    """Expands animations into per-frame actions and draws them on the matrix.

    ``animation_queue`` holds one action dictionary per frame; the loop in
    ``start_rendering`` consumes one action per frame period, while other
    threads replace or extend the queue through ``play_animation`` and
    ``append_animation``.
    """

    # Size in pixels of every image that is generated and pushed to the matrix.
    DISPLAY_WIDTH = 128
    DISPLAY_HEIGHT = 32

    def __init__(self, animations, frame_rate=40):
        """
        Args:
            animations (list): Parsed animation dictionaries.
            frame_rate (int): Target number of frames per second.
        """
        self.animations = animations
        self.current_point_array = []
        self.current_animation_action = {}
        self.frame_rate = frame_rate  # Set the desired frame rate
        self.frame_duration = 1.0 / frame_rate  # Seconds available per frame
        self.animation_queue = []  # Per-frame actions still to be rendered

        # Cache for the transition in progress: target file name of the last
        # computed transition and the point pairs computed for it.
        self.previous_to = []
        self.previous_point_pairs_groups = []

    def play_animation_by_name(self, animation_name):
        """Play every animation whose 'name' equals *animation_name*."""
        for animation in self.animations:
            if animation.get('name') == animation_name:
                self.play_animation(animation)

    def play_animation(self, animation):
        """Replace whatever is queued with the frames of *animation*."""
        if len(self.animation_queue) > 0:
            print("Stopping current animation...")
        # Replace the currently playing animation with the new one
        self.animation_queue = self.generate_animation_queue(animation)

    def append_animation(self, animation):
        """Queue the frames of *animation* behind the frames already waiting."""
        self.animation_queue = self.animation_queue + self.generate_animation_queue(animation)

    def generate_animation_queue(self, animation):
        """Expand the graphics of *animation* into one action dict per frame.

        Only graphics of type 'image' and 'transition' produce frames; the
        'duration' of a graphic is its number of frames (default 1).

        Returns:
            list: Action dictionaries. Image actions carry 'point_array'.
            Transition actions carry 'to_point_array', 'to_file_name',
            'stepPercentage' (progress measured from the first frame of the
            transition) and 'additionalStepPercentage' (share of the
            remaining frames covered by this frame).
        """
        animation_queue = []

        for graphic in animation.get('graphics', []):
            graphic_type = graphic.get('type')
            if graphic_type not in ('image', 'transition'):
                continue

            # Convert once so both percentages use the same frame count; a
            # fractional or string duration then still ends at exactly 100 %.
            frame_count = int(graphic.get('duration', 1))

            if graphic_type == 'image':
                point_array = graphic.get('point_array')
                animation_queue.extend(
                    {'type': graphic_type, 'point_array': point_array}
                    for _ in range(frame_count)
                )
            else:
                to_file_name = graphic.get('to_file')
                to_point_array = graphic.get('to_point_array')
                for i in range(frame_count):
                    animation_queue.append({
                        'type': graphic_type,
                        'to_point_array': to_point_array,
                        'to_file_name': to_file_name,
                        'additionalStepPercentage': 1 / (frame_count - i),
                        'stepPercentage': (i + 1) / frame_count,
                    })

        return animation_queue

    def start_rendering(self):
        """Run the render loop forever; intended as the target of a thread.

        Each iteration renders at most one queued action, pushes the
        resulting image to the matrix and then sleeps for the remainder of
        the frame period.
        """
        new_image = Image.new("RGB", (self.DISPLAY_WIDTH, self.DISPLAY_HEIGHT), "black")

        # Allocate the off-screen canvas once. Every CreateFrameCanvas() call
        # makes the matrix allocate another canvas, so calling it per frame
        # grows memory without bound; SwapOnVSync() hands back the canvas
        # that was on display, which is reused for the next frame.
        offscreen_canvas = matrix.CreateFrameCanvas()

        while True:
            start_time = time.time()  # Get the current time before rendering

            # Work on one reference: other threads may swap self.animation_queue
            # for a different (possibly empty) list between the check and the pop.
            queue = self.animation_queue
            if len(queue) > 0:
                current_animation_action = queue.pop(0)
                action_type = current_animation_action.get('type')
                print("update action is: " + action_type)

                # Render the next frame in the queue
                if action_type == "image":
                    point_array = current_animation_action.get('point_array')
                    new_image = generate_image_from_point_array(
                        point_array, self.DISPLAY_WIDTH, self.DISPLAY_HEIGHT)
                    self.current_point_array = point_array
                    print("image generated")

                elif action_type == "transition":
                    new_image = self._render_transition(current_animation_action)

                else:
                    # The previous image is pushed again for unknown actions.
                    print("unknown action: ", current_animation_action)

                print("setting image to canvas")
                offscreen_canvas.SetImage(new_image, unsafe=False)
                print("pushing image to matrix")
                offscreen_canvas = matrix.SwapOnVSync(offscreen_canvas)
                print("pushing image done")

            elapsed_time = time.time() - start_time  # Time spent rendering

            # Calculate the time to sleep to achieve the desired frame rate
            sleep_time = self.frame_duration - elapsed_time
            print("remaining time in frame: " + str(sleep_time))
            if sleep_time > 0:
                time.sleep(sleep_time)

    def _render_transition(self, action):
        """Compute the point array and image for one transition frame.

        The first frame of a transition towards a given target file pairs
        the currently displayed points with the target points and caches
        those pairs; following frames towards the same file reuse the cache.

        NOTE(review): the cache is keyed on the target file name only, so a
        new transition towards the same file as the previous one reuses the
        old point pairs even if the displayed face has changed since.
        Confirm whether that is intended.
        """
        frame_time_start = time.time()
        to_file_name = str(action.get('to_file_name'))

        if self.previous_to == to_file_name:
            # Cached pairs start at the beginning of the transition, so the
            # step is the progress measured from the first frame.
            step = action.get('stepPercentage')
            new_point_array = []
            for point_pairs in self.previous_point_pairs_groups:
                interpolate_start = time.time()
                new_point_array += interpolate_point_pairs(point_pairs, step)
                print("interpolationg took: " + str(time.time() - interpolate_start) + " sec. (cached)")
                print("step percentate for this transition step is: " + str(step))

        else:
            print("starting transition generation")
            groupsa = divide_points_into_groups(self.current_point_array)
            groupsb = divide_points_into_groups(action.get('to_point_array'))
            print("groups divided, len(groupsa), len(groups(b)=", len(groupsa), len(groupsb))

            paired_groups = pair_groups(groupsa, groupsb)
            print("paired_groups generated, len(paired_groups)=", len(paired_groups))

            # Fresh pairs start at the currently displayed points, so the
            # step is the share of the remaining distance.
            step = action.get('additionalStepPercentage')
            self.previous_point_pairs_groups = []
            new_point_array = []
            for pair in paired_groups:
                point_pairs = pair_points(pair[0], pair[1])
                self.previous_point_pairs_groups.append(point_pairs)
                print("step percentate for this transition stepp is: " + str(step))

                interpolate_start = time.time()
                new_point_array += interpolate_point_pairs(point_pairs, step)
                print("interpolationg took: " + str(time.time() - interpolate_start) + " sec.")
                print("face feature interpolated len(new_point_array) =", len(new_point_array))

            # Remember the target so the following frames hit the cache.
            self.previous_to = to_file_name

        # The image is drawn from the interpolated points plus their mirror
        # image; only the un-mirrored points are kept as the current state.
        new_image = generate_image_from_point_array(
            new_point_array + mirror_points(new_point_array),
            self.DISPLAY_WIDTH, self.DISPLAY_HEIGHT)
        self.current_point_array = new_point_array
        print("transition generated, len(new_point_array) =", len(new_point_array))

        print("creating transition frame took: " + str(time.time() - frame_time_start) + " sec.")
        print("================== end frame ==================")
        return new_image
|
|
|
|
|
|
|
|
|
|
# Idle blinking: periodically raises the "blinkTimer" event.
def random_blinks():
    """Fire the "blinkTimer" trigger forever, pausing 5 to 7 seconds before each one.

    Never returns, so it is meant to run on its own thread. The module-level
    ``decision_layer`` is looked up on every iteration.
    """
    while True:
        pause_seconds = random.randint(5, 7)
        time.sleep(pause_seconds)
        decision_layer.handle_trigger("blinkTimer", "")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Callbacks invoked by the MQTT listener
def on_connect(client, userdata, flags, response_code):
    """Connect callback: report the result code and subscribe to the "test" topic.

    Args:
        client: MQTT client that connected; ``subscribe`` is called on it.
        userdata: Unused.
        flags: Unused.
        response_code: Result code of the connection attempt, printed as-is.
    """
    print(f"Connected to MQTT broker with result code {response_code!s}")
    client.subscribe("test")
|
|
|
|
|
|
# MQTT message reception
def on_message(client, userdata, message):
    """Message callback: forward the raw payload to the decision-making layer.

    Args:
        client: Unused.
        userdata: Unused.
        message: Received message; only its ``payload`` attribute is read.
    """
    raw_payload = message.payload
    decision_layer.handle_trigger("mqtt", raw_payload)
|
|
|
|
|
|
def main():
    """Load the animations, wire up MQTT, start the worker threads and boot.

    Start-up order matters: the rendering and decision layers are created
    before the MQTT network loop is started, because ``on_message`` looks up
    the module-level ``decision_layer`` and would fail on a message that
    arrives before it exists.
    """
    # Published at module level for on_message() and random_blinks().
    global decision_layer

    yaml_file = 'testAnimationYaml.yaml'  # Replace with the path to your YAML file

    # Parse the YAML file to get animations
    animations = load_animations(yaml_file)

    # Build both layers first; no thread is running yet at this point.
    rendering_layer = RenderingLayer(animations, frame_rate=40)
    decision_layer = DecisionLayer(animations, rendering_layer)

    # MQTT broker configuration
    broker_address = "localhost"
    broker_port = 1883
    broker_keepalive = 60

    mqtt_client = mqtt.Client()
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message

    # A failing connect raises here, before any worker thread is started.
    mqtt_client.connect(broker_address, broker_port, broker_keepalive)
    mqtt_client.loop_start()

    # Start the render loop
    rendering_thread = threading.Thread(target=rendering_layer.start_rendering)
    rendering_thread.start()

    # Create and start random blinks interrupts
    screen_update_thread = threading.Thread(target=random_blinks)
    screen_update_thread.start()

    decision_layer.handle_trigger("boot", "")
|
|
|
|
|
|
|
|
# Start the program only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|