CiscoTheProot/rpi/blinkingTest.py
CiscoTheWolf ab34ad2455 Moved ProotState to own file.
Moved Point2D to own file.
Added caching for parsed images.
2023-05-29 20:09:29 +02:00

121 lines
No EOL
3.6 KiB
Python

from rgbmatrix import RGBMatrix, RGBMatrixOptions
import paho.mqtt.client as mqtt
import time
from PIL import Image
# Configuration for the matrix
def _build_matrix_options():
    """Return the RGBMatrixOptions describing the attached LED panels.

    The values describe panels of 32 rows by 64 columns, with a chain of
    two panels on a single parallel output.
    """
    panel_options = RGBMatrixOptions()
    panel_options.rows = 32
    panel_options.cols = 64
    panel_options.chain_length = 2
    panel_options.parallel = 1
    # If you have an Adafruit HAT: 'adafruit-hat'
    panel_options.hardware_mapping = 'regular'
    return panel_options


options = _build_matrix_options()
# Driver object for the display; the rest of the script creates canvases
# from it and swaps them onto the panels.
matrix = RGBMatrix(options=options)
def pure_pil_alpha_to_color_v2(image, color=(0, 0, 0)):
    """Flatten *image* onto a solid background and return an RGB image.

    Args:
        image: PIL image to flatten. Its alpha channel, if any, is used as
            the paste mask. Images without an alpha channel are treated as
            fully opaque.
        color: RGB tuple used to fill the background (default black).

    Returns:
        A new ``RGB`` image with the same size as *image*.
    """
    image.load()  # needed for split()

    # Indexing band 3 directly raises IndexError for images that do not have
    # four bands (e.g. plain RGB or palette BMP files). Converting first gives
    # every input a real alpha band; RGBA input is used unchanged.
    if image.mode != "RGBA":
        image = image.convert("RGBA")

    background = Image.new('RGB', image.size, color)
    background.paste(image, mask=image.split()[3])  # 3 is the alpha channel
    return background
def image_interpolation(image1, image2, alpha):
    """Blend two equally sized images and return the result.

    Args:
        image1: Image returned unchanged when the blend factor is 0.
        image2: Image returned unchanged when the blend factor is 1.
        alpha: Blend factor; values outside [0, 1] are clamped into it.

    Returns:
        The image produced by ``Image.blend`` for the clamped factor.

    Raises:
        AssertionError: If the two images differ in size.
    """
    # Both inputs must have identical dimensions.
    assert image1.size == image2.size, "Input images must have the same shape"

    # Clamp in two steps: cap at 1 first, then floor at 0.
    capped = min(1, alpha)
    blend_factor = max(0, capped)

    return Image.blend(image1, image2, blend_factor)
def _load_face(path, max_size=(128, 32)):
    """Open the face bitmap at *path*, shrink it to fit *max_size*, flatten it.

    ``Image.LANCZOS`` is used instead of ``Image.ANTIALIAS``: the latter was
    only an alias for the same filter and was removed in Pillow 10, where
    referencing it raises AttributeError.

    Returns:
        The RGB image produced by ``pure_pil_alpha_to_color_v2``.
    """
    face = Image.open(path)
    # thumbnail() resizes in place and never enlarges the image.
    face.thumbnail(max_size, Image.LANCZOS)
    return pure_pil_alpha_to_color_v2(face)


# First and last frames of the blink animation.
noBlinkImage = _load_face("faces/prootface3.bmp")
fullBlinkImage = _load_face("faces/prootface4.bmp")
# Animation state: update_screen() moves currentBlinkState one step at a time
# towards DesiredBlinkState. Both are indices into blinkFrameCanvases.
DesiredBlinkState = 10
currentBlinkState = 0


def _make_canvas(frame_image):
    """Create an off-screen canvas at brightness 50 showing *frame_image*."""
    canvas = matrix.CreateFrameCanvas()
    canvas.brightness = 50
    canvas.SetImage(frame_image, unsafe=False)
    return canvas


# Pre-rendered animation frames: index 0 is noBlinkImage, index 10 is
# fullBlinkImage, and indices 1-9 are blends of the two in steps of 0.1.
blinkFrameCanvases = []

offscreen_interpolated_canvasA = _make_canvas(noBlinkImage)
blinkFrameCanvases.append(offscreen_interpolated_canvasA)

for alpha in range(1, 10):
    interpolated_image = image_interpolation(noBlinkImage, fullBlinkImage, alpha / 10)
    # Image.ANTIALIAS was removed in Pillow 10; Image.LANCZOS is the same filter.
    interpolated_image.thumbnail((matrix.width, matrix.height), Image.LANCZOS)
    offscreen_interpolated_canvas = _make_canvas(interpolated_image)
    blinkFrameCanvases.append(offscreen_interpolated_canvas)

offscreen_interpolated_canvasB = _make_canvas(fullBlinkImage)
blinkFrameCanvases.append(offscreen_interpolated_canvasB)
def update_screen():
    """Advance the blink animation by one step and display the new frame.

    Moves ``currentBlinkState`` one index towards ``DesiredBlinkState`` and
    swaps the matching entry of ``blinkFrameCanvases`` onto the matrix. When
    the current state already equals the desired one, nothing is drawn.

    Reaching state 10 resets the desired state to 0, so the animation runs
    back to frame 0 on the following calls.
    """
    global DesiredBlinkState, currentBlinkState

    # open eye again after blink
    if currentBlinkState == 10:
        DesiredBlinkState = 0

    if currentBlinkState == DesiredBlinkState:
        return

    # Step first, then draw the frame for the new state. Drawing the frame
    # for the old state and returning early once the target is reached meant
    # the target frame was never shown on the way back: the display came to
    # rest on frame 1 instead of frame 0.
    if currentBlinkState < DesiredBlinkState:
        currentBlinkState += 1
    else:
        currentBlinkState -= 1

    # The canvas returned by SwapOnVSync is not needed here: every frame has
    # its own pre-rendered canvas kept in blinkFrameCanvases.
    matrix.SwapOnVSync(blinkFrameCanvases[currentBlinkState])
# functions called by the MQTT listener
def on_connect(client, userdata, flags, response_code):
    """Log the connection result and subscribe to the "test" topic.

    Args:
        client: MQTT client that connected; used to issue the subscription.
        userdata: Unused.
        flags: Unused.
        response_code: Connection result reported by the broker; only logged.
    """
    print(f"Connected to MQTT broker with result code {response_code!s}")
    client.subscribe("test")
def on_message(client, userdata, message):
    """Log an incoming MQTT message and request a blink.

    Any message triggers the blink, whatever its content: the desired blink
    state is set to 10.

    Args:
        client: Unused.
        userdata: Unused.
        message: Received message; its payload, topic and qos are logged.
    """
    global DesiredBlinkState

    details = (
        f"Received message '{message.payload!s}' on topic '"
        f"{message.topic}' with QoS {message.qos!s}"
    )
    print(details)

    DesiredBlinkState = 10
# MQTT broker configuration
broker_address = "10.1.13.173"  # Replace with your MQTT broker's address
broker_port = 1883
broker_keepalive = 60

# paho-mqtt 2.x requires a callback API version as the first constructor
# argument and rejects a bare mqtt.Client(); 1.x has no such enum. VERSION1
# keeps the (client, userdata, flags, rc) on_connect signature used above.
_callback_api = getattr(mqtt, "CallbackAPIVersion", None)
if _callback_api is None:
    client = mqtt.Client()
else:
    client = mqtt.Client(_callback_api.VERSION1)

client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port, broker_keepalive)
# Handle network traffic on a background thread so the loop below can
# drive the animation.
client.loop_start()

try:
    # One animation step roughly every 50 ms.
    while True:
        time.sleep(0.05)
        update_screen()
finally:
    # Runs on Ctrl-C or any error: close the connection and stop the
    # background thread; the exception itself still propagates.
    client.disconnect()
    client.loop_stop()