Я пытаюсь создать звуковой сигнал ворот. Я создал и обучил модель CNN, но я понятия не имею, как использовать ее для прогнозирования данных в реальном времени.
Я хочу делать прогнозы живого класса на основе последней секунды звука со встроенного микрофона, чтобы классифицировать самый последний звук как YES_GOAL, YES_WIN или NO_GOAL.
Конечная цель моего проекта - включить звуковой сигнал в iTunes каждый раз, когда я кричу GOAL!
Когда я пытаюсь запустить свой код, я получаю
ValueError: вход 0 плотного слоя несовместим со слоем: ожидаемая ось -1 входной формы должна иметь значение 2200, но получен вход с фигурой [32, 1]
Вот мой код:
import pyaudio
import librosa
import numpy as np
import time
import subprocess
import os
import sys
#import kbHitMod
import tensorflow.keras as keras
MODEL_PATH = "/Users/schoolwork/Documents/Goal_Horn_Project_Stuff/Goal Horn Program/Goal_Model.model"
GOAL_TRACK = "1 New York Islanders Overtime Goal and Win Horn || NYCB Live: Home of the Nassau Veterans Memorial Coliseum"
WIN_TRACK = "2 New York Islanders Win Horn || NYCB Live: Home of the Nassau Veterans Memorial Coliseum"
OT_GOAL_TRACK = "3 New York Islanders Goal Horn || NYCB Live Home of the Nassau Veterans Memorial Coliseum"
QUIET_TRACK = "4 pure silence"
PAUSE_COMMAND = "osascript -e 'tell application \"iTunes\" to pause'"
class RingBuffer:
""" class that implements a not-yet-full buffer """
def __init__(self,size_max):
self.max = size_max
self.data = []
class __Full:
""" class that implements a full buffer """
def append(self, x):
""" Append an element overwriting the oldest one. """
self.data[self.cur] = x
self.cur = (self.cur+1) % self.max
def get(self):
""" return list of elements in correct order """
return self.data[self.cur:]+self.data[:self.cur]
def append(self,x):
"""append an element at the end of the buffer"""
self.data.append(x)
if len(self.data) == self.max:
self.cur = 0
# Permanently change self's class from non-full to full
self.__class__ = self.__Full
def get(self):
""" Return a list of elements from the oldest to the newest. """
return self.data
# ring buffer will keep the last 1 second worth of audio
ringBuffer = RingBuffer(1 * 22050)
overtime = False
print("\nOvertime mode: off\n")
def play(track_name):
subprocess.getoutput("osascript -e 'tell application \"iTunes\" to play (first track of playlist \"Library\" whose name is \"4 pure silence\")'")
subprocess.getoutput("osascript -e 'tell application \"iTunes\" to play (first track of playlist \"Library\" whose name is \"" + track_name + "\")'")
def callback(in_data, frame_count, time_info, flag):
state = subprocess.getoutput("osascript -e 'tell application \"iTunes\" to player state as string'")
model = keras.models.load_model(MODEL_PATH, compile=True)
audio_data = np.fromstring(in_data, dtype=np.float32)
# we trained on audio with a sample rate of 22050 so we need to convert it
audio_data = librosa.resample(audio_data, 44100, 22050)
ringBuffer.append(audio_data)
# machine learning model takes live audio as input and
# decides if the last 1 second of audio contains a goal
if model.predict_classes(ringBuffer.get()) == "YES_GOAL" and state == "paused":
# GOAL!!
if overtime:
play(GOAL_TRACK)
else:
play(OT_GOAL_TRACK)
# decides if the last 1 second of audio contains a win
elif model.predict_classes(ringBuffer.get()) == "YES_WIN" and state == "paused":
play(WIN_TRACK)
return (in_data, pyaudio.paContinue)
pa = pyaudio.PyAudio()
stream = pa.open(format = pyaudio.paFloat32,
channels = 1,
rate = 44100,
output = False,
input = True,
stream_callback=callback)
# start the stream
stream.start_stream()
i = 0 # This is just an alternative to breaking the loop with kbHitMod
while stream.is_active():
time.sleep(0.25)
"""
kb = kbHitMod.KBHit() # detects if a key has been pressed
ot = kb.getch()
if ot == "o":
if overtime == False:
overtime = True
print("Overtime mode: ON\n")
else:
overtime = False
print("Overtime mode: off\n")
elif ot == "q":
print("Quitting... Goodbye!\n")
break
"""
i += 1
if i >= 100:
break
stream.close()
pa.terminate()
play(QUIET_TRACK)
subprocess.getoutput(PAUSE_COMMAND)
print("Program terminated. \n")
И моя модель:
import json
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow.keras as keras
DATA_PATH = "/Users/schoolwork/Documents/Goal_Horn_Project_Stuff/Goal Horn Program/data.json"
MODEL_PATH = "/Users/schoolwork/Documents/Goal_Horn_Project_Stuff/Goal Horn Program/Goal_Model.model"
def load_data(data_path):
"""Loads training dataset from json file.
:param data_path (str): Path to json file containing data
:return X (ndarray): Inputs
:return y (ndarray): Targets
"""
with open(data_path, "r") as fp:
data = json.load(fp)
X = np.array(data["mfcc"])
y = np.array(data["labels"])
return X, y
def prepare_datasets(test_size, validation_size):
# load data
X, y = load_data(DATA_PATH)
# create train/test split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size)
# create train/validation split
X_train, X_validation, y_train, y_validation = train_test_split(X_train, y_train, test_size=validation_size)
# 3d array -> (130, 50, 1)
X_train = X_train[..., np.newaxis] # 4d array -> (num_samples, 130, 50, 1) (I don't know where these numbers are coming from. They might not be right)
X_validation = X_validation[..., np.newaxis]
X_test = X_test[..., np.newaxis]
return X_train, X_validation, X_test, y_train, y_validation, y_test
def build_model(input_shape):
# create model
model = keras.Sequential()
# 1st conv layer
model.add(keras.layers.Conv2D(32, (3, 3), activation="relu", input_shape=input_shape))
model.add(keras.layers.MaxPool2D((3, 3), strides=(2, 2), padding="same"))
model.add(keras.layers.BatchNormalization())
# 2nd conv layer
model.add(keras.layers.Conv2D(32, (3, 3), activation="relu", input_shape=input_shape))
model.add(keras.layers.MaxPool2D((3, 3), strides=(2, 2), padding="same"))
model.add(keras.layers.BatchNormalization())
# 3rd conv layer
model.add(keras.layers.Conv2D(32, (2, 2), activation="relu", input_shape=input_shape))
model.add(keras.layers.MaxPool2D((2, 2), strides=(2, 2), padding="same"))
model.add(keras.layers.BatchNormalization())
# flatten the output and feed it into dense layer
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(64, activation="relu"))
model.add(keras.layers.Dropout(0.3))
# output layer
model.add(keras.layers.Dense(3, activation="softmax"))
return model
def predict(model, X, y):
X = X[np.newaxis, ...]
# prediction = [ [0.1, 0.2, ...] ]
prediction = model.predict(X) # X -> (1, 130, 50, 1)
# extract index with max_value
predicted_index = np.argmax(prediction, axis=-1) # [4]
print("Expected index: {}, Predicted index: {}".format(y, predicted_index))
if __name__ == "__main__":
# create train, validation and test sets
X_train, X_validation, X_test, y_train, y_validation, y_test = prepare_datasets(0.25, 0.2)
# build the CNN net
input_shape = (X_train.shape[1], X_train.shape[2], X_train.shape[3])
model = build_model(input_shape)
# compile the network
optimizer = keras.optimizers.Adam(learning_rate=0.0001)
model.compile(optimizer=optimizer,
loss="sparse_categorical_crossentropy",
metrics=["accuracy"])
# train the CNN
model.fit(X_train, y_train, validation_data=(X_validation, y_validation), batch_size=32, epochs=30)
# evaluate the CNN on the test set
test_error, test_accuracy = model.evaluate(X_test, y_test, verbose=1)
print("Accuracy on test set is: {}".format(test_accuracy))
# make prediction on a sample
X = X_test[2]
y = y_test[2]
print(X_test.shape)
predict(model, X, y)
model.save(MODEL_PATH)