— ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ. Эта концепция находится в стадии разработки и является чисто экспериментальной, своего рода «кошмары», о которых я мечтаю, и я просто хотел записать ее (включая рабочий код)

Введение

При создании распределенных систем и микрослужб платформы обмена сообщениями и потоковой передачи событий, такие как Kafka, являются популярным выбором для обработки источников событий и передачи сообщений между службами. Однако в некоторых случаях может потребоваться более легкое и гибкое решение. В этой статье мы обсудим альтернативный подход к потоковой передаче событий с использованием Redis и модуля item-store-redis.

import { SortedItemRepository, IItem } from 'item-store-redis'
import Redis from 'ioredis'

const redis = new Redis({ host: 'localhost', port: 6379 })
const repository = new SortedItemRepository<T>('my-sorted-repo', redis)

Использование Redis для потоковой передачи событий

Redis — это хранилище структур данных в памяти, которое поддерживает различные структуры данных, включая отсортированные наборы. Сортированные наборы в Redis позволяют нам хранить и извлекать данные в отсортированном порядке на основе оценки, связанной с каждым элементом. Эта функция делает Redis подходящим для реализации хранилища событий, в котором события можно упорядочивать на основе их метки времени или любых других критериев.

SortedItemRepository

Класс SortedItemRepository реализует интерфейс ISortedItemRepository. Этот класс позволяет нам взаимодействовать с репозиторием отсортированных элементов, который использует Redis в качестве системы хранения. Используя класс SortedItemRepository, мы можем легко создать хранилище событий, которое можно использовать для поиска событий и передачи сообщений между микрослужбами.

/**
 * ISortedItemRepository is an interface that defines the methods for interacting with a
 * sorted item repository that uses Redis as the storage system.
 */
export interface ISortedItemRepository<T> {
  name: string
  redis: Redis | Cluster

  /**
   * Sets an item in the repository.
   * @param item - The item to be added.
   */
  set: (item: IItem<T>) => Promise<void>

  /**
   * Retrieves an item from the repository by its ID.
   * @param id - The ID of the item to retrieve.
   * @returns The item if found, null otherwise.
   */
  getById: (id: string) => Promise<IItem<T> | null>

  /**
   * Retrieves all items from the repository.
   * @returns An array of all items in the repository.
   */
  getAll: () => Promise<IItem<T>[]>

  /**
   * Retrieves a paginated set of items from the repository.
   * @param page - The page number to retrieve.
   * @param pageSize - The number of items per page.
   * @returns An object containing the paginated items and the total count of items.
   */
  getPaginated: (page: number, pageSize: number) => Promise<IPaginatedItems<T>>

  /**
   * Retrieves the score of an item by its ID.
   * @param id - The ID of the item.
   * @returns The score of the item if found, null otherwise.
   */
  getItemScoreById: (id: string) => Promise<number | null>

  /**
   * Retrieves items by their scores within a specified range.
   * @param min - The minimum score value.
   * @param max - The maximum score value.
   * @returns An array of items with scores within the specified range.
   */
  getItemsByScore: (min: number, max: number) => Promise<IItem<T>[]>

  /**
   * Deletes a page of items from the repository.
   * @param page - The page number of items to delete.
   * @param pageSize - The number of items per page.
   */
  deletePage: (page: number, pageSize: number) => Promise<void>

  /**
   * Checks if an item exists in the repository by its ID.
   * @param id - The ID of the item.
   * @returns A boolean indicating whether the item exists.
   */
  hasItem: (id: string) => Promise<boolean>

  /**
   * Deletes an item from the repository by its ID.
   * @param id - The ID of the item to delete.
   */
  deleteById: (id: string) => Promise<void>

  /**
   * Deletes all items from the repository.
   */
  deleteAll: () => Promise<void>

  /**
   * Retrieves the count of items in the repository.
   * @returns The count of items.
   */
  count: () => Promise<number>

  /**
   * Retrieves the first N items from the repository.
   * @param n - The number of items to retrieve.
   * @returns An array of the first N items.
   */
  getFirstNItems: (n: number) => Promise<IItem<T>[]>

  /**
   * Retrieves the last N items from the repository.
   * @param n - The number of items to retrieve.
   * @returns An array of the last N items.
   */
  getLastNItems: (n: number) => Promise<IItem<T>[]>

  /**
   * Retrieves items within a specified index range.
   * @param start - The starting index.
   * @param end - The ending index.
   *
   * @returns An array of items within the specified index range.
   */
  getItemsInRange: (start: number, end: number) => Promise<IItem<T>[]>

  /**
   *
   * Checks if any items exist within a specified score range.
   * @param min - The minimum score value.
   * @param max - The maximum score value.
   * @returns A boolean indicating whether any items exist within the specified score range.
   */
  existsInRange: (min: number, max: number) => Promise<boolean>

  /**
   *
   * Retrieves the next N items with scores greater than a specified score.
   * @param score - The score to compare.
   * @param n - The number of items to retrieve.
   * @returns An array of the next N items with scores greater than the specified score.
   */
  getNextNItemsGreaterThanScore: (score: number, n: number) => Promise<IItem<T>[]>
}


Публикация событий

Микросервисы могут использовать класс SortedItemRepository для публикации событий, вызывая метод set:

import { Redis } from 'ioredis'
import { SortedItemRepository, IItem } from 'item-store-redis'

const redis = new Redis(/* Redis configuration */)
const sortedRepository = new SortedItemRepository<EventData>('my-service-tx-logs', redis)

async function publishEvent(event: IItem<EventData>): Promise<void> {
  await sortedRepository.set(event)
}

Подписка и обработка событий

Микросервисы могут подписываться на события, используя метод getNextNItemsGreaterThanScore класса SortedItemRepository. Это позволяет им извлекать события с оценкой выше указанного значения, что можно использовать для реализации источника событий:

async function pollEvents(lastSeenScore: number): Promise<IItem<EventData>[]> {
  const events = await sortedRepository.getNextNItemsGreaterThanScore(lastSeenScore, 10)

  for (const event of events) {
    // Process the event
    console.log(`Processing event: ${event.id}`)

    // Update the last seen score
    lastSeenScore = await sortedRepository.getItemScoreById(event.id)
  }

  // Continue polling for new events
  setTimeout(() => pollEvents(lastSeenScore), 1000)
}

// Start polling for events
pollEvents(0)

Этот пример демонстрирует простой механизм опроса для подписки на события.

В реальном сценарии вам может понадобиться реализовать более продвинутый механизм подписки с использованием Redis Pub/Sub или других шаблонов обмена сообщениями. Мы также не рассматриваем здесь, как сохранить смещение (последнее обработанное событие для группы подписчиков).

Также обратите внимание, что для работы этого подхода нам необходимо убедиться, что работает только один подписчик (на группу), чтобы избежать дублирования обработки событий. Дополнительные возможности, такие как «распределенная блокировка», будут введены для поддержки нескольких работающих абонентов.

Очистка репозитория (также известного как журнал транзакций)

Со временем объем хранимых событий в вашем репозитории может вырасти, что потребует регулярной очистки. Класс SortedItemRepository предоставляет для этой цели удобный метод deletePage. Этот метод позволяет вам удалить указанную страницу элементов из репозитория, предоставляя простой способ управления размером ваших сохраненных данных.

Вот как вы можете использовать метод deletePage для очистки репозитория:

const pageNumber = 1;
const pageSize = 10;

// Delete a page of items from the repository
await sortedRepository.deletePage(pageNumber, pageSize);

В приведенном выше примере мы удаляем первую страницу элементов с размером страницы 10 элементов. Вы можете настроить параметры pageNumber и pageSize в соответствии с вашими требованиями к очистке.

Важно тщательно спланировать стратегию очистки с учетом таких факторов, как политики хранения данных, емкость хранилища и влияние на производительность. Вы также можете автоматизировать процесс очистки, запланировав периодическую очистку или инициировав очистку на основе определенных условий, таких как достижение определенного размера репозитория или по прошествии определенного времени с момента сохранения событий. Правильно управляя размером репозитория, вы можете оптимизировать использование хранилища и поддерживать производительность системы.

Выводы

Используя класс SortedItemRepository и Redis, мы продемонстрировали, как микросервисы могут выполнять источник событий в архитектуре на основе Redis. Этот подход предлагает более легкую и гибкую альтернативу традиционным решениям для поиска событий, таким как Kafka, особенно для небольших наборов данных.

Одним из ключевых преимуществ использования Redis является его простота. Как хранилище структуры данных в памяти, Redis прост в настройке и управлении, что делает его отличным выбором для разработчиков, которым необходимо быстро создавать и развертывать решения для источников событий. Кроме того, класс SortedItemRepository абстрагирует сложность взаимодействия с Redis, предоставляя простой и удобный для разработчиков интерфейс для работы с отсортированными наборами.

С точки зрения производительности Redis отлично справляется с обработкой небольших наборов данных, поскольку хранит все данные в памяти. Это приводит к невероятно быстрым операциям чтения и записи, что может привести к значительному повышению производительности по сравнению с дисковыми системами хранения, такими как Kafka. Это делает Redis привлекательным вариантом для сценариев, в которых обработка событий с малой задержкой является приоритетом.

Важно отметить, что, хотя в некоторых случаях Redis может быть мощным инструментом для поиска событий, он может быть не лучшим выбором для каждой ситуации. Операторы должны убедиться, что Redis работает в режиме высокой доступности (HA) или в постоянном режиме, чтобы избежать потери данных. Таким образом, они могут использовать возможности Redis, сохраняя при этом целостность и надежность данных.
Для крупномасштабных приложений с высокой пропускной способностью Kafka и другие распределенные системы обмена сообщениями все еще могут быть более подходящими из-за их надежных возможностей разделения и отказоустойчивости.

Однако для небольших и средних наборов данных простота и преимущества производительности источника событий на основе Redis с помощью SortedItemRepository делают его привлекательной альтернативой, которую стоит поэкспериментировать.