Moved ProotState to own file.

Moved Point2D to own file.
Added caching for parsed images.
This commit is contained in:
CiscoTheWolf 2023-05-29 20:09:29 +02:00
parent bdcb648765
commit ab34ad2455
6 changed files with 340 additions and 259 deletions

View file

@ -2,15 +2,24 @@ import math
from PIL import Image
import numpy as np
from scipy.optimize import linear_sum_assignment
import json
import os.path
import hashlib
# location to store array cache
CACHE_FILE_PATH = "point_array_cache.json"
class Point2D:
x = 0
y = 0
color: tuple[int, int, int] = (0, 0, 0)
def __init__(self, x, y):
def __init__(self, x, y, color: tuple[int, int, int] = (0, 0, 0)):
self.x = x
self.y = y
self.color = color
def round(self):
self.x = round(self.x)
@ -25,27 +34,65 @@ class Point2D:
def interpolate(self, other, percentage):
new_x = self.x + (other.x - self.x) * percentage
new_y = self.y + (other.y - self.y) * percentage
return Point2D(new_x, new_y)
new_color = tuple(int((1 - percentage) * self.color[i] + percentage * other.color[i]) for i in range(3))
return Point2D(new_x, new_y, new_color)
def __eq__(self, other):
return (self.x, self.y) == (other.x, other.y)
def mirror_points(points: list[Point2D]) -> list[Point2D]:
mirrored_points = []
for point in points:
mirrored_x = 128 - point.x # Calculate the mirrored x-coordinate
mirrored_point = Point2D(mirrored_x, point.y)
mirrored_points.append(mirrored_point)
return mirrored_points
def get_image_hash(image):
image_hash = hashlib.sha1(image.tobytes()).hexdigest()
return image_hash
def load_cached_point_arrays():
cached_point_arrays = {}
if os.path.isfile(CACHE_FILE_PATH):
with open(CACHE_FILE_PATH, "r") as file:
cached_point_arrays = json.load(file)
return cached_point_arrays
def save_cached_point_arrays(cached_point_arrays):
with open(CACHE_FILE_PATH, "w") as file:
json.dump(cached_point_arrays, file)
def generate_point_array_from_image(image):
image = image.convert("RGB") # Convert image to RGB color mode
image_hash = get_image_hash(image)
cached_point_arrays = load_cached_point_arrays()
if image_hash in cached_point_arrays:
return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in cached_point_arrays[image_hash]]
width, height = image.size
point_array = []
pixel_array = []
# Iterate over the pixels and generate Point2D instances
for y in range(height):
for x in range(width):
pixel = image.getpixel((x, y))
if pixel != (0, 0, 0): # Assuming white pixels
point = Point2D(x, y)
point_array.append(point)
if pixel != (0, 0, 0): # any non-white pixels
point = {"x": x, "y": y, "color": pixel}
pixel_array.append(point)
return point_array
cached_point_arrays[image_hash] = pixel_array
save_cached_point_arrays(cached_point_arrays)
return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in pixel_array]
def generate_image_from_point_array(points, width, height):
@ -114,8 +161,8 @@ def interpolate_point_pairs(pairs, percentage):
return interpolated_points
Image1 = Image.open("CiscoTheProot/faces/prootface3.png")
Image2 = Image.open("CiscoTheProot/faces/prootface4.png")
Image1 = Image.open("faces/prootface3.png")
Image2 = Image.open("faces/prootface4.png")
pixelArray1 = generate_point_array_from_image(Image1)
pixelArray2 = generate_point_array_from_image(Image2)

144
rpi/Point2D.py Normal file
View file

@ -0,0 +1,144 @@
import hashlib
import json
import os.path
import math
from scipy.optimize import linear_sum_assignment
import numpy as np
from PIL import Image
# location to store array cache
CACHE_FILE_PATH = "point_array_cache.json"
class Point2D:
x = 0
y = 0
color: tuple[int, int, int] = (0, 0, 0)
def __init__(self, x, y, color: tuple[int, int, int] = (0, 0, 0)):
self.x = x
self.y = y
self.color = color
def round(self):
self.x = round(self.x)
self.y = round(self.y)
return self
def distance(self, other):
dx = self.x - other.x
dy = self.y - other.y
return math.sqrt(dx ** 2 + dy ** 2)
def interpolate(self, other, percentage):
new_x = self.x + (other.x - self.x) * percentage
new_y = self.y + (other.y - self.y) * percentage
new_color = tuple(int((1 - percentage) * self.color[i] + percentage * other.color[i]) for i in range(3))
return Point2D(new_x, new_y, new_color)
def __eq__(self, other):
return (self.x, self.y) == (other.x, other.y)
def mirror_points(points: list[Point2D]) -> list[Point2D]:
mirrored_points = []
for point in points:
mirrored_x = 128 - point.x # Calculate the mirrored x-coordinate
mirrored_point = Point2D(mirrored_x, point.y)
mirrored_points.append(mirrored_point)
return mirrored_points
def get_image_hash(image):
image_hash = hashlib.sha1(image.tobytes()).hexdigest()
return image_hash
def load_cached_point_arrays():
cached_point_arrays = {}
if os.path.isfile(CACHE_FILE_PATH):
with open(CACHE_FILE_PATH, "r") as file:
cached_point_arrays = json.load(file)
return cached_point_arrays
def save_cached_point_arrays(cached_point_arrays):
with open(CACHE_FILE_PATH, "w") as file:
json.dump(cached_point_arrays, file)
def generate_point_array_from_image(image):
image = image.convert("RGB") # Convert image to RGB color mode
image_hash = get_image_hash(image)
cached_point_arrays = load_cached_point_arrays()
if image_hash in cached_point_arrays:
return [Point2D(point["x"], point["y"], tuple(point["color"])) for point in cached_point_arrays[image_hash]]
def generate_image_from_point_array(points: list[Point2D], width: int, height: int) -> Image:
# Create a new blank image
image = Image.new("RGB", (width, height), "black")
# Set the pixels corresponding to the points as white
pixels = image.load()
for point in points:
point = point.round()
x = point.x
y = point.y
pixels[x, y] = (255, 255, 255)
return image
def interpolate_point_pairs(pairs: list[tuple[Point2D, Point2D]], percentage: float) -> list[Point2D]:
interpolated_points:list[Point2D] = []
for pair in pairs:
point1, point2 = pair
interpolated_point = point1.interpolate(point2, percentage)
interpolated_points.append(interpolated_point)
return interpolated_points
def pair_points(points1: list[Point2D], points2: list[Point2D]) -> list[tuple[Point2D, Point2D]]:
# Determine the size of the point arrays
size1 = len(points1)
size2 = len(points2)
# Create a cost matrix based on the distances between points
cost_matrix = np.zeros((size1, size2))
for i in range(size1):
for j in range(size2):
cost_matrix[i, j] = points1[i].distance(points2[j])
# Duplicate points in the smaller array to match the size of the larger array
if size1 > size2:
num_duplicates = size1 - size2
duplicated_points = np.random.choice(points2, size=num_duplicates).tolist()
points2 += duplicated_points
elif size2 > size1:
num_duplicates = size2 - size1
duplicated_points = np.random.choice(points1, size=num_duplicates).tolist()
points1 += duplicated_points
# Update the size of the point arrays
size1 = len(points1)
size2 = len(points2)
# Create a new cost matrix with the updated sizes
cost_matrix = np.zeros((size1, size2))
for i in range(size1):
for j in range(size2):
cost_matrix[i, j] = points1[i].distance(points2[j])
# Solve the assignment problem using the Hungarian algorithm
row_ind, col_ind = linear_sum_assignment(cost_matrix)
# Create pairs of points based on the optimal assignment
pairs = []
for i, j in zip(row_ind, col_ind):
pairs.append((points1[i], points2[j]))
return pairs

105
rpi/ProotState.py Normal file
View file

@ -0,0 +1,105 @@
import time
import random
class ProotState:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.current_blink_state = 0
cls._instance.desired_blink_state = 0
cls._instance.blinks_frames_ready = False
cls._instance.loading_screen = True
cls._instance.loading_time = 0
cls._instance.frame_canvas_prootScreen_1 = False
cls._instance.frame_canvas_prootScreen_2 = False
cls._instance.frame_canvas_prootScreen_3 = False
return cls._instance
def next_blink_frame_number(self) -> int:
if self.current_blink_state == self.desired_blink_state == 10:
self.desired_blink_state = 0
return self.current_blink_state
if self.current_blink_state == self.desired_blink_state == 0:
return self.current_blink_state
if self.current_blink_state < self.desired_blink_state:
self.current_blink_state += 1
else:
self.current_blink_state -= 1
return self.current_blink_state
def blink(self, state = 10):
self.set_desired_blink_state(state)
def set_desired_blink_state(self, state: int):
self.desired_blink_state = state
def set_blinks_frames_ready(self, ready: bool):
self.blinks_frames_ready = ready
def get_desired_blink_state(self) -> int:
return self.desired_blink_state
def get_blinks_frames_ready(self) -> bool:
return self.blinks_frames_ready
# This function animates the loading screen. It asumes that the function gets called frequently(every frame update)
def set_ProotScreen(self, matrix):
self.loading_time += 1
self.loading_time = self.loading_time % 75
if not self.frame_canvas_prootScreen_1:
self.frame_canvas_prootScreen_1 = matrix.CreateFrameCanvas()
image_proot_screen_1 = Image.open("faces/ProotScreen1.png").convert('RGB')
self.frame_canvas_prootScreen_1.SetImage(image_proot_screen_1, unsafe=False)
matrix.SwapOnVSync(self.frame_canvas_prootScreen_1)
if not self.frame_canvas_prootScreen_2:
self.frame_canvas_prootScreen_2 = matrix.CreateFrameCanvas()
image_proot_screen_2 = Image.open("faces/ProotScreen2.png").convert('RGB')
self.frame_canvas_prootScreen_2.SetImage(image_proot_screen_2, unsafe=False)
matrix.SwapOnVSync(self.frame_canvas_prootScreen_2)
if not self.frame_canvas_prootScreen_3:
self.frame_canvas_prootScreen_3 = matrix.CreateFrameCanvas()
image_proot_screen_3 = Image.open("faces/ProotScreen3.png").convert('RGB')
self.frame_canvas_prootScreen_3.SetImage(image_proot_screen_3, unsafe=False)
matrix.SwapOnVSync(self.frame_canvas_prootScreen_3)
if self.loading_time < 25:
matrix.SwapOnVSync(self.frame_canvas_prootScreen_1)
elif self.loading_time < 50:
matrix.SwapOnVSync(self.frame_canvas_prootScreen_2)
else:
matrix.SwapOnVSync(self.frame_canvas_prootScreen_3)
def update_screen():
global blinkFrameCanvases, matrix
proot_state = ProotState()
if proot_state.get_blinks_frames_ready():
# TODO move blinking animation writing logic to the ProotState class
matrix.SwapOnVSync(blinkFrameCanvases[proot_state.next_blink_frame_number()])
else:
proot_state.set_ProotScreen(matrix)
def interrupt_timer():
while True:
update_screen()
time.sleep(0.01)
def random_blinks():
while True:
time.sleep(random.randint(3, 5))
proot_state = ProotState()
if proot_state.get_blinks_frames_ready():
proot_state.blink()

View file

@ -1,12 +1,11 @@
from rgbmatrix import RGBMatrix, RGBMatrixOptions
from Point2D import Point2D
from ProotState import ProotState
from PIL import Image
import paho.mqtt.client as mqtt
import time
from PIL import Image
import numpy as np
import math
from scipy.optimize import linear_sum_assignment
import threading
import random
print("start configuring matrix")
@ -29,237 +28,12 @@ print("configuring matrix took: " + str(endT - startT) + " ms")
blinkFrameCanvases = []
class ProotState:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.current_blink_state = 0
cls._instance.desired_blink_state = 0
cls._instance.blinks_frames_ready = False
cls._instance.loading_screen = True
cls._instance.loading_time = 0
cls._instance.frame_canvas_prootScreen_1 = False
cls._instance.frame_canvas_prootScreen_2 = False
cls._instance.frame_canvas_prootScreen_3 = False
return cls._instance
def next_blink_frame_number(self) -> int:
if self.current_blink_state == self.desired_blink_state == 10:
self.desired_blink_state = 0
return self.current_blink_state
if self.current_blink_state == self.desired_blink_state == 0:
return self.current_blink_state
if self.current_blink_state < self.desired_blink_state:
self.current_blink_state += 1
else:
self.current_blink_state -= 1
return self.current_blink_state
def blink(self):
self.set_desired_blink_state(10)
def set_desired_blink_state(self, state: int):
self.desired_blink_state = state
def set_blinks_frames_ready(self, ready: bool):
self.blinks_frames_ready = ready
def get_desired_blink_state(self) -> int:
return self.desired_blink_state
def get_blinks_frames_ready(self) -> bool:
return self.blinks_frames_ready
def set_ProotScreen(self, matrix):
self.loading_time += 1
self.loading_time = self.loading_time % 75
if not self.frame_canvas_prootScreen_1:
self.frame_canvas_prootScreen_1 = matrix.CreateFrameCanvas()
image_proot_screen_1 = Image.open("faces/ProotScreen1.png").convert('RGB')
self.frame_canvas_prootScreen_1.SetImage(image_proot_screen_1, unsafe=False)
matrix.SwapOnVSync(self.frame_canvas_prootScreen_1)
if not self.frame_canvas_prootScreen_2:
self.frame_canvas_prootScreen_2 = matrix.CreateFrameCanvas()
image_proot_screen_2 = Image.open("faces/ProotScreen2.png").convert('RGB')
self.frame_canvas_prootScreen_2.SetImage(image_proot_screen_2, unsafe=False)
matrix.SwapOnVSync(self.frame_canvas_prootScreen_2)
if not self.frame_canvas_prootScreen_3:
self.frame_canvas_prootScreen_3 = matrix.CreateFrameCanvas()
image_proot_screen_3 = Image.open("faces/ProotScreen3.png").convert('RGB')
self.frame_canvas_prootScreen_3.SetImage(image_proot_screen_3, unsafe=False)
matrix.SwapOnVSync(self.frame_canvas_prootScreen_3)
if self.loading_time < 25:
matrix.SwapOnVSync(self.frame_canvas_prootScreen_1)
elif self.loading_time < 50:
matrix.SwapOnVSync(self.frame_canvas_prootScreen_2)
else:
matrix.SwapOnVSync(self.frame_canvas_prootScreen_3)
class Point2D:
x = 0
y = 0
color:tuple[int, int, int] = (0, 0, 0)
def __init__(self, x, y, color: tuple[int, int, int] = (0, 0, 0)):
self.x = x
self.y = y
self.color = color
def round(self):
self.x = round(self.x)
self.y = round(self.y)
return self
def distance(self, other):
dx = self.x - other.x
dy = self.y - other.y
return math.sqrt(dx**2 + dy**2)
def interpolate(self, other, percentage):
new_x = self.x + (other.x - self.x) * percentage
new_y = self.y + (other.y - self.y) * percentage
return Point2D(new_x, new_y)
def __eq__(self, other):
return (self.x, self.y) == (other.x, other.y)
def mirror_points(points: list[Point2D]) -> list[Point2D]:
mirrored_points = []
for point in points:
mirrored_x = 128 - point.x # Calculate the mirrored x-coordinate
mirrored_point = Point2D(mirrored_x, point.y)
mirrored_points.append(mirrored_point)
return mirrored_points
def generate_point_array_from_image(image: Image) -> list[Point2D]:
image = image.convert("RGB") # Convert image to RGB color mode
width, height = image.size
point_array = []
# Iterate over the pixels and generate Point2D instances
for y in range(height):
for x in range(width):
pixel = image.getpixel((x, y))
if pixel != (0, 0, 0): # Assuming white pixels
point = Point2D(x, y)
point_array.append(point)
return point_array
def generate_image_from_point_array(points: list[Point2D], width: int, height: int) -> Image:
# Create a new blank image
image = Image.new("RGB", (width, height), "black")
# Set the pixels corresponding to the points as white
pixels = image.load()
for point in points:
point = point.round()
x = point.x
y = point.y
pixels[x, y] = (255, 255, 255)
return image
def pair_points(points1: list[Point2D], points2: list[Point2D]) -> list[tuple[Point2D, Point2D]]:
# Determine the size of the point arrays
size1 = len(points1)
size2 = len(points2)
# Create a cost matrix based on the distances between points
cost_matrix = np.zeros((size1, size2))
for i in range(size1):
for j in range(size2):
cost_matrix[i, j] = points1[i].distance(points2[j])
# Duplicate points in the smaller array to match the size of the larger array
if size1 > size2:
num_duplicates = size1 - size2
duplicated_points = np.random.choice(points2, size=num_duplicates).tolist()
points2 += duplicated_points
elif size2 > size1:
num_duplicates = size2 - size1
duplicated_points = np.random.choice(points1, size=num_duplicates).tolist()
points1 += duplicated_points
# Update the size of the point arrays
size1 = len(points1)
size2 = len(points2)
# Create a new cost matrix with the updated sizes
cost_matrix = np.zeros((size1, size2))
for i in range(size1):
for j in range(size2):
cost_matrix[i, j] = points1[i].distance(points2[j])
# Solve the assignment problem using the Hungarian algorithm
row_ind, col_ind = linear_sum_assignment(cost_matrix)
# Create pairs of points based on the optimal assignment
pairs = []
for i, j in zip(row_ind, col_ind):
pairs.append((points1[i], points2[j]))
return pairs
def interpolate_point_pairs(pairs: list[tuple[Point2D, Point2D]], percentage: float) -> list[Point2D]:
interpolated_points:list[Point2D] = []
for pair in pairs:
point1, point2 = pair
interpolated_point = point1.interpolate(point2, percentage)
interpolated_points.append(interpolated_point)
return interpolated_points
def update_screen():
global blinkFrameCanvases, matrix
proot_state = ProotState()
if proot_state.get_blinks_frames_ready():
# TODO move blinking animation writing logic to the ProotState class
matrix.SwapOnVSync(blinkFrameCanvases[proot_state.next_blink_frame_number()])
else:
proot_state.set_ProotScreen(matrix)
def interrupt_timer():
while True:
update_screen()
time.sleep(0.01)
def random_blinks():
while True:
time.sleep(random.randint(3, 5))
proot_state = ProotState()
if proot_state.get_blinks_frames_ready():
proot_state.blink()
# Create and start screen update interrupts
screen_update_thread = threading.Thread(target=interrupt_timer)
screen_update_thread = threading.Thread(target=ProotState.interrupt_timer)
screen_update_thread.start()
# Create and start random blinks interrupts
screen_update_thread = threading.Thread(target=random_blinks)
screen_update_thread = threading.Thread(target=ProotState.random_blinks)
screen_update_thread.start()
@ -283,10 +57,10 @@ startT = curr_time = round(time.time()*1000)
# generate pixel arrays from each image
# TODO ^ storing and loading lists of points will take away this step. (it will require a dedicated script to precompute these)
points_left_eye_open = generate_point_array_from_image(image_left_eye_open)
points_left_eye_closed = generate_point_array_from_image(image_left_eye_closed)
points_left_nose = generate_point_array_from_image(image_left_nose)
points_left_mouth = generate_point_array_from_image(image_left_mouth)
points_left_eye_open = Point2D.generate_point_array_from_image(image_left_eye_open)
points_left_eye_closed = Point2D.generate_point_array_from_image(image_left_eye_closed)
points_left_nose = Point2D.generate_point_array_from_image(image_left_nose)
points_left_mouth = Point2D.generate_point_array_from_image(image_left_mouth)
endT = curr_time = round(time.time()*1000)
print("generating pixel array took: " + str(endT - startT) + " ms")
@ -298,7 +72,7 @@ startT = curr_time = round(time.time()*1000)
#calculate the point pairs between the open and closed left eye
# TODO look into precomputing and storing these animations before runtime
left_eye_blink_pairs = pair_points(points_left_eye_open, points_left_eye_closed)
left_eye_blink_pairs = Point2D.pair_points(points_left_eye_open, points_left_eye_closed)
endT = curr_time = round(time.time()*1000)
print("pairing points for one eye took: " + str(endT - startT) + " ms")
@ -312,13 +86,13 @@ startT = curr_time = round(time.time()*1000)
for alpha in range(0,11):
offscreen_interpolated_canvas = matrix.CreateFrameCanvas()
left_eye = interpolate_point_pairs(left_eye_blink_pairs, alpha/10)
right_eye = mirror_points(left_eye)
nose = points_left_nose + mirror_points(points_left_nose)
mouth = points_left_mouth + mirror_points(points_left_mouth)
left_eye = Point2D.interpolate_point_pairs(left_eye_blink_pairs, alpha/10)
right_eye = Point2D.mirror_points(left_eye)
nose = points_left_nose + Point2D.mirror_points(points_left_nose)
mouth = points_left_mouth + Point2D.mirror_points(points_left_mouth)
face = left_eye + right_eye + nose + mouth
interpolated_face_image = generate_image_from_point_array(face, 128, 32)
interpolated_face_image = Point2D.generate_image_from_point_array(face, 128, 32)
offscreen_interpolated_canvas.SetImage(interpolated_face_image, unsafe=False)
blinkFrameCanvases.append(offscreen_interpolated_canvas)

View file

@ -2,7 +2,6 @@ from rgbmatrix import RGBMatrix, RGBMatrixOptions
import paho.mqtt.client as mqtt
import time
from PIL import Image
import numpy as np
# Configuration for the matrix

12
rpi/micTest.py Normal file
View file

@ -0,0 +1,12 @@
# Print out realtime audio volume as ascii bars
import sounddevice as sd
import numpy as np
def callback(indata: np.ndarray, outdata: np.ndarray, frames: int, time, status) -> None:
print(indata.shape)
volume_norm = np.linalg.norm(indata)*10
print ("|" * int(volume_norm))
with sd.Stream(callback=callback):
sd.sleep(10000)