Обновление модели машинного обучения онлайн и почти в реальном времени с использованием обучающих данных, созданных производителем Kafka.

В последнее время меня все больше интересует машинное обучение онлайн — возможность обновлять веса модели машинного обучения в производственных условиях. Помимо темы, предоставляющей мне забавные архитектурные задачи, этот подход может похвастаться огромным потенциалом выгоды. Это исследование от Grubhub в 2021 году продемонстрировало + 20 % с увеличением показателей и 45-кратную экономию средств за счет использования онлайн-обучения, и я полностью об экономии денег, чтобы заработать деньги.

Однако с практической точки зрения работа с потоками данных и потоковой архитектурой все еще довольно нова для специалистов по машинному обучению. Помимо создания потока обучающих данных в реальном времени, существует довольно мало ресурсов для использования такого источника данных для обновления модели в онлайн-режиме. В этой статье я продемонстрирую:

  • Настройка экземпляра Kafka
  • Создание производителя, который генерирует обучающие данные
  • Создание потребителя, который использует эти обучающие данные для обновления модели машинного обучения.

Запуск Kafka с Docker

Я предпочитаю работать с Kafka локально через docker-compose. Если он еще не установлен в вашей среде, вы можете следовать инструкциям здесь.

Статья на эту тему ​​Шуйи Янга предоставляет общий обзор этого подхода, и мы можем использовать аналогичный файл docker-compose.yaml, который создает локальные экземпляры Kafka и Zookeeper и предоставляет Kafka через порт 9092:

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
     - "2181:2181"
  kafka:
    depends_on: 
      - zookeeper
    image: wurstmeister/kafka
    ports:
     - "9092:9092"
    expose:
     - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "ml_training_data:1:1"
    volumes:
     - /var/run/docker.sock:/var/run/docker.sock

Он также создает тему Kafka под названием ml_training_data, которую мы будем использовать позже. Вы можете запустить файл, изменив каталог с файлом выше и запустив:

docker-compose up

Производитель Kafka для обучающих данных

Во-первых, давайте установим библиотеки Python, которые нам понадобятся:

python -m pip install kafka-python river  

Далее нам нужно создать искусственный источник обучающих данных, который будет записан в нашу тему Kafka. Для этого воспользуемся библиотекой River Python, которая имеет простые в использовании API для потоковой передачи данных:

from time import sleep
from json import dumps
import random

from river import datasets
from kafka import KafkaProducer

# create a kafka product that connects to Kafka on port 9092
producer = KafkaProducer(
    bootstrap_servers=["localhost:9092"],
    value_serializer=lambda x: dumps(x).encode("utf-8"),
)

# Initialize the River phishing dataset.
# This dataset contains features from web pages 
# that are classified as phishing or not.
dataset = datasets.Phishing()

# Send observations to the Kafka topic one-at-a-time with a random sleep
for x, y in dataset:
    print(f"Sending: {x, y}")
    data = {"x": x, "y": y}
    producer.send("ml_training_data", value=data)
    sleep(random.random())

Приведенный выше код использует игрушку Набор данных River Phishing (CC BY 4.0) и отправляет помеченные наблюдения данных в нашу тему Kafka по одному за раз. Этот набор данных содержит функции с веб-страниц, которые классифицируются как фишинговые или нет. Образцы в наборе данных представляют собой кортежи, которые выглядят следующим образом:

[({'empty_server_form_handler': 0.0,
   'popup_window': 0.0,
   'https': 0.0,
   'request_from_other_domain': 0.0,
   'anchor_from_other_domain': 0.0,
   'is_popular': 0.5,
   'long_url': 1.0,
   'age_of_domain': 1,
   'ip_in_url': 1},
  True),
 ({'empty_server_form_handler': 1.0,
   'popup_window': 0.0,
   'https': 0.5,
   'request_from_other_domain': 0.5,
   'anchor_from_other_domain': 0.0,
   'is_popular': 0.5,
   'long_url': 0.0,
   'age_of_domain': 1,
   'ip_in_url': 0},
  True)]

Сначала запустите производителя:

python producer.py

Затем вы должны увидеть в консоли следующее:

Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 0.5, 'https': 1.0, 'request_from_other_domain': 1.0, 'anchor_from_other_domain': 0.5, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 1}, False)
Sending: ({'empty_server_form_handler': 0.0, 'popup_window': 0.5, 'https': 0.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 0.0, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 0}, True)
Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 1.0, 'https': 1.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 1.0, 'is_popular': 0.0, 'long_url': 0.5, 'age_of_domain': 1, 'ip_in_url': 0}, False)
Sending: ({'empty_server_form_handler': 0.5, 'popup_window': 0.0, 'https': 0.0, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 1.0, 'is_popular': 0.5, 'long_url': 1.0, 'age_of_domain': 0, 'ip_in_url': 0}, True)
Sending: ({'empty_server_form_handler': 0.0, 'popup_window': 0.0, 'https': 1.0, 'request_from_other_domain': 1.0, 'anchor_from_other_domain': 0.0, 'is_popular': 1.0, 'long_url': 0.0, 'age_of_domain': 0, 'ip_in_url': 0}, True)
Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 1.0, 'https': 1.0, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 0.0, 'is_popular': 1.0, 'long_url': 1.0, 'age_of_domain': 0, 'ip_in_url': 0}, False)

Потребитель Kafka для обучения модели ML

Написание простого потребителя Kafka позволит нам считывать данные, которые мы отправляем из потока по мере их поступления, и использовать их для обновления весов в нашей модели.

from json import loads
from time import sleep

from kafka import KafkaConsumer

from river import linear_model
from river import compose
from river import preprocessing
from river import metrics

# use rocauc as the metric for evaluation
metric = metrics.ROCAUC()

# create a simple LR model with a scaler
model = compose.Pipeline(
    preprocessing.StandardScaler(), linear_model.LogisticRegression()
)

# create our Kafka consumer
consumer = KafkaConsumer(
    "ml_training_data",
    bootstrap_servers=["localhost:9092"],
    auto_offset_reset="earliest",
    enable_auto_commit=True,
    group_id="my-group-id",
    value_deserializer=lambda x: loads(x.decode("utf-8")),
)

# use each event to update our model and print the metrics
for event in consumer:
    event_data = event.value
    try:
        x = event_data["x"]
        y = event_data["y"]
        y_pred = model.predict_proba_one(x)
        model.learn_one(x, y)
        metric.update(y, y_pred)
        print(metric)
    except:
        print("Processing bad data...")

Приведенный выше код инициализирует простую модель машинного обучения с использованием класса River LogisticRegression. Затем мы непрерывно обрабатываем события и используем их для обновления нашей модели машинного обучения — распечатывая метрику ROCAUC для каждого добавленного образца.

Чтобы начать обучение, запустите:

python consumer.py

Вы должны увидеть что-то вроде следующего в консоли, поскольку модель изучает наблюдение за наблюдением!

ROCAUC: 87.12%
ROCAUC: 87.29%
ROCAUC: 87.42%
ROCAUC: 87.29%
ROCAUC: 87.42%

Заключение

Как непрерывное обучение, так и онлайн-обучение имеют огромный потенциал в тех областях, где размеченные данные в режиме реального времени или почти в реальном времени могут быть доступны для моделей, принимающих решения в реальном времени. Весь код и инструкции доступны в этом репозитории Github. Скоро будет больше!