Мы живем в эпоху, когда аналитика в реальном времени становится необходимостью для многих предприятий. Это позволяет предприятиям реагировать без промедления. Они могут использовать возможности или предотвращать проблемы до того, как они возникнут.

Предисловие

Диплом специалиста Nanyang Poly в области управления большими данными (ITD35X) — мой последний сертификат в моем полуторагодичном путешествии по большим данным и машинному обучению в NYP. Эта статья о последнем 5-недельном проекте по архитектуре больших данных, над которым я работал с моими товарищами по команде.

Предыстория стартапа и цель проекта

Я и трое моих однокурсников (JunQiang Shen, TaiSan Lee и S. Vignesh) создали псевдостартап — компанию DataVar, которая предоставляет клиентам услуги по исследованию фондового рынка и аналитике. Технический анализ акций — это основная услуга DataVar, поскольку мы предвидим, как цены на акции могут колебаться в ближайшем будущем, используя методы машинного обучения и используя исторические данные в качестве эталона. Наш технический анализ акций позволяет нашим клиентам принимать правильные инвестиционные решения на основе проверенных данных, а не предположений.

Огромные объемы данных, поступающие из различных источников, генерируются на фондовом рынке ежедневно. Выполнение наивного прогноза запасов вручную с использованием прогнозирования временных рядов в файлах Excel/CSV и Python не будет масштабироваться, если потребуется вычислить данные о запасах для сотен компаний и источников данных. Ручной подход неприемлем в сфере исследований и анализа фондового рынка, где отслеживание, обработка и анализ данных в режиме реального времени становятся необходимостью. Цель нашего проекта — программно выполнять прогнозы цен на акции для наших клиентов в масштабе.

В этом проекте мы разработали стандартную платформу обработки и прогнозирования данных на облачных виртуальных машинах Microsoft Azure с использованием Apache Hadoop, Kafka, FBprophet и PySpark.

Архитектура экосистемы Hadoop

Мы создали многоузловой кластер в Hadoop/YARN, который содержит Active и Standby NameNode, 3 DataNode и 1 EdgeNode в распределенной среде Hadoop. Это использовалось для хранения и обработки данных в кластере HDFS параллельным распределенным способом. В то же время фактически хранится более одной копии данных для поддержки переключения на другую.

Мы настраиваем Apache Hive как хранилище данных, созданное поверх Hadoop. Hive предоставляет язык запросов типа SQL для целей анализа данных. Apache Kafka и потоковая передача Spark также были настроены для потоковой передачи данных API в режиме реального времени и обработки исторических данных в формате CSV.

Наш специалист по данным будет использовать Apache Zeppelin, который обеспечивает интерактивный анализ данных и машинное обучение.

ZooKeeper — это проект Apache с открытым исходным кодом, который предоставляет централизованную службу для предоставления информации о конфигурации, именования, синхронизации и групповых служб в большом кластере в распределенной системе.

Вот архитектура экосистемы Hadoop/YARN.

Потоковая передача и конвейер данных

Поскольку я играл роль «архитектора ETL» в проекте, в оставшейся части статьи я сосредоточусь на проектировании конвейера данных, который включает в себя потоковую передачу данных, хранение данных, преобразование данных и пакетное прогнозирование.

Вот архитектура сквозного конвейера данных и интеграции пакетного прогнозирования:

Структурированная потоковая передача Kafka и Spark

Чтобы создавать потоки данных, которые могут передавать данные в инструменты аналитики, как только они будут созданы, и получать почти мгновенные результаты аналитики, я решил настроить Kafka в качестве потоковой платформы, которая может принимать огромные объемы записей данных из различных источников, таких как плоские файлы и РЕСТ API. В проекте я получил данные о частоте каждой акции за одну минуту и ​​исторические данные, вызвав API, предоставленные Alpha Vantage. Я также загрузил CSV-файлы с историческими данными об акциях из Kaggle в специальные папки, отслеживаемые приложениями-производителями Kafka.

Потоки данных (Кафка называет это событиями) были организованы и сохранены в виде тем, которые поддерживали нескольких производителей, публикующих события, и нескольких потребителей, которые подписываются на события.

Затем я использовал Spark Structured Streaming, который представлял собой механизм потоковой обработки, созданный поверх Spark SQL, для создания конвейера для приема и преобразования данных из очередей сообщений Kafka, а затем записал набор данных в выходные приемники в проекте, который Улей склад.

ETL данных с PySpark

Я использовал PySpark с Hive для выполнения ряда процессов преобразования промежуточных данных из различных источников в единый окончательный набор данных, который включал фильтрацию недопустимых данных, например, тестовые данные и ненормальные цены закрытия; исключить поля данных, не добавляющие ценности; значения подстроки для удаления специальных символов и строки без добавления значения; агрегировать данные о запасах по дате и доходности: максимальный максимум/минимум минимум/первое открытие/последнее закрытие/суммарный объем; преобразовать тип данных из строки в дату, двойную и длинную; разделите и загрузите биржевые данные по символам тикера и годам биржевой даты.

Пакетное прогнозирование с помощью FBProphet

Чтобы преодолеть ограничения масштабируемости и своевременно предоставлять прогнозы клиентам, помимо потоковой передачи входных данных и распределенной обработки данных, мы также использовали Facebook Prophet, также известную как FBProphet, процедуру прогнозирования, реализованную на R и Python, которая является быстрой и обеспечивает полностью автоматизированные прогнозы, которые могут быть настроены вручную учеными данных.

Модели Prophet могут быть подобраны только один раз, и новая модель должна быть повторно подобрана, когда станут доступны новые данные. В большинстве случаев подгонка модели выполняется достаточно быстро, поэтому не возникает проблем с повторной подгонкой с нуля. Поэтому я решил настроить пакетное прогнозирование. Я настраиваю задания cron, чтобы обновить окончательный набор данных в Hive, затем запускаю прогнозы, чтобы сделать многошаговый прогноз, подбирая лучшую модель FBProphet к данным и выводя результаты на консоль и электронные письма.

Вот примеры автоматических уведомлений по электронной почте о прогнозах цен на акции на следующие n дней для наших клиентов после завершения запланированных заданий ETL.

Заключительные мысли

Создание масштабируемой и распределенной платформы для обработки и прогнозирования биржевых данных на основе Hadoop в течение 5 недель — нетривиальная задача. Я надеюсь, что эта статья даст тем, кто интересуется такой архитектурой, пищу для размышлений и отправную точку для итераций и улучшений. Особая благодарность товарищам по команде DataVar «Руководителю по эксплуатации» Джун Цян Шен, «Менеджеру проекта» Тайсан Ли и «Ученому по данным» С. Вигнешу за их технические знания, время и усилия, а также лекторам NYP г-ну Фу и доктору Оой за руководство.