Обновление модели машинного обучения онлайн и почти в реальном времени с использованием обучающих данных, созданных производителем Kafka.
В последнее время меня все больше интересует машинное обучение онлайн — возможность обновлять веса модели машинного обучения в производственных условиях. Помимо темы, предоставляющей мне забавные архитектурные задачи, этот подход может похвастаться огромным потенциалом выгоды. Это исследование от Grubhub в 2021 году продемонстрировало + 20 % с увеличением показателей и 45-кратную экономию средств за счет использования онлайн-обучения, и я полностью об экономии денег, чтобы заработать деньги.
Однако с практической точки зрения работа с потоками данных и потоковой архитектурой все еще довольно нова для специалистов по машинному обучению. Помимо создания потока обучающих данных в реальном времени, существует довольно мало ресурсов для использования такого источника данных для обновления модели в онлайн-режиме. В этой статье я продемонстрирую:
- Настройка экземпляра Kafka
- Создание производителя, который генерирует обучающие данные
- Создание потребителя, который использует эти обучающие данные для обновления модели машинного обучения.
Запуск Kafka с Docker
Я предпочитаю работать с Kafka локально через docker-compose
. Если он еще не установлен в вашей среде, вы можете следовать инструкциям здесь.
Статья на эту тему Шуйи Янга предоставляет общий обзор этого подхода, и мы можем использовать аналогичный файл docker-compose.yaml
, который создает локальные экземпляры Kafka и Zookeeper и предоставляет Kafka через порт 9092:
version: '3' services: zookeeper: image: wurstmeister/zookeeper:3.4.6 ports: - "2181:2181" kafka: depends_on: - zookeeper image: wurstmeister/kafka ports: - "9092:9092" expose: - "9093" environment: KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092 KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS: "ml_training_data:1:1" volumes: - /var/run/docker.sock:/var/run/docker.sock
Он также создает тему Kafka под названием ml_training_data
, которую мы будем использовать позже. Вы можете запустить файл, изменив каталог с файлом выше и запустив:
docker-compose up
Производитель Kafka для обучающих данных
Во-первых, давайте установим библиотеки Python, которые нам понадобятся:
python -m pip install kafka-python river
Далее нам нужно создать искусственный источник обучающих данных, который будет записан в нашу тему Kafka. Для этого воспользуемся библиотекой River Python, которая имеет простые в использовании API для потоковой передачи данных:
from time import sleep from json import dumps import random from river import datasets from kafka import KafkaProducer # create a kafka product that connects to Kafka on port 9092 producer = KafkaProducer( bootstrap_servers=["localhost:9092"], value_serializer=lambda x: dumps(x).encode("utf-8"), ) # Initialize the River phishing dataset. # This dataset contains features from web pages # that are classified as phishing or not. dataset = datasets.Phishing() # Send observations to the Kafka topic one-at-a-time with a random sleep for x, y in dataset: print(f"Sending: {x, y}") data = {"x": x, "y": y} producer.send("ml_training_data", value=data) sleep(random.random())
Приведенный выше код использует игрушку Набор данных River Phishing (CC BY 4.0) и отправляет помеченные наблюдения данных в нашу тему Kafka по одному за раз. Этот набор данных содержит функции с веб-страниц, которые классифицируются как фишинговые или нет. Образцы в наборе данных представляют собой кортежи, которые выглядят следующим образом:
[({'empty_server_form_handler': 0.0, 'popup_window': 0.0, 'https': 0.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 0.0, 'is_popular': 0.5, 'long_url': 1.0, 'age_of_domain': 1, 'ip_in_url': 1}, True), ({'empty_server_form_handler': 1.0, 'popup_window': 0.0, 'https': 0.5, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 0.0, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 0}, True)]
Сначала запустите производителя:
python producer.py
Затем вы должны увидеть в консоли следующее:
Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 0.5, 'https': 1.0, 'request_from_other_domain': 1.0, 'anchor_from_other_domain': 0.5, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 1}, False) Sending: ({'empty_server_form_handler': 0.0, 'popup_window': 0.5, 'https': 0.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 0.0, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 0}, True) Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 1.0, 'https': 1.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 1.0, 'is_popular': 0.0, 'long_url': 0.5, 'age_of_domain': 1, 'ip_in_url': 0}, False) Sending: ({'empty_server_form_handler': 0.5, 'popup_window': 0.0, 'https': 0.0, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 1.0, 'is_popular': 0.5, 'long_url': 1.0, 'age_of_domain': 0, 'ip_in_url': 0}, True) Sending: ({'empty_server_form_handler': 0.0, 'popup_window': 0.0, 'https': 1.0, 'request_from_other_domain': 1.0, 'anchor_from_other_domain': 0.0, 'is_popular': 1.0, 'long_url': 0.0, 'age_of_domain': 0, 'ip_in_url': 0}, True) Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 1.0, 'https': 1.0, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 0.0, 'is_popular': 1.0, 'long_url': 1.0, 'age_of_domain': 0, 'ip_in_url': 0}, False)
Потребитель Kafka для обучения модели ML
Написание простого потребителя Kafka позволит нам считывать данные, которые мы отправляем из потока по мере их поступления, и использовать их для обновления весов в нашей модели.
from json import loads from time import sleep from kafka import KafkaConsumer from river import linear_model from river import compose from river import preprocessing from river import metrics # use rocauc as the metric for evaluation metric = metrics.ROCAUC() # create a simple LR model with a scaler model = compose.Pipeline( preprocessing.StandardScaler(), linear_model.LogisticRegression() ) # create our Kafka consumer consumer = KafkaConsumer( "ml_training_data", bootstrap_servers=["localhost:9092"], auto_offset_reset="earliest", enable_auto_commit=True, group_id="my-group-id", value_deserializer=lambda x: loads(x.decode("utf-8")), ) # use each event to update our model and print the metrics for event in consumer: event_data = event.value try: x = event_data["x"] y = event_data["y"] y_pred = model.predict_proba_one(x) model.learn_one(x, y) metric.update(y, y_pred) print(metric) except: print("Processing bad data...")
Приведенный выше код инициализирует простую модель машинного обучения с использованием класса River LogisticRegression
. Затем мы непрерывно обрабатываем события и используем их для обновления нашей модели машинного обучения — распечатывая метрику ROCAUC для каждого добавленного образца.
Чтобы начать обучение, запустите:
python consumer.py
Вы должны увидеть что-то вроде следующего в консоли, поскольку модель изучает наблюдение за наблюдением!
ROCAUC: 87.12% ROCAUC: 87.29% ROCAUC: 87.42% ROCAUC: 87.29% ROCAUC: 87.42%
Заключение
Как непрерывное обучение, так и онлайн-обучение имеют огромный потенциал в тех областях, где размеченные данные в режиме реального времени или почти в реальном времени могут быть доступны для моделей, принимающих решения в реальном времени. Весь код и инструкции доступны в этом репозитории Github. Скоро будет больше!