Анализируйте транзакции по кредитной карте в режиме реального времени, чтобы определить, достигает ли клиент кредитного лимита карты.

Мы, как потребители, ежедневно пользуемся кредитными картами. Но, к сожалению, иногда наше покупательское поведение приводит к тому, что кредитная карта исчерпывается, а мы не узнаем об этом. Так что было бы неплохо, если бы компания, выпускающая мою кредитную карту, давала мне дружеское предупреждение, когда мои общие расходы достигают определенных пороговых значений моего кредитного лимита.

Например:

  • Отправлять мне электронное письмо, когда достигается 50% кредитного лимита.
  • Отправка SMS при достижении 75% кредитного лимита.
  • Звонит мне, когда он достигает 90%.

Такое предупреждение заставляет меня осознать мои привычки тратить и заставляет с осторожностью относиться к будущим покупкам.

В этой статье я расскажу вам, как реализовать эту логику в типичной компании, выпускающей кредитные карты. Для этого я использую потоковую обработку в реальном времени с WSO2 Streaming Integrator в качестве движка потоковой обработки.

Интегратор потоковой передачи WSO2

Прежде чем продолжить, позвольте мне познакомить вас с нашим потоковым процессором WSO2 Streaming Integrator. Это движок потоковой обработки с открытым исходным кодом, поддерживаемый WSO2.

При этом вы пишете приложения для потоковой обработки, используя Siddhi, язык запросов с открытым исходным кодом для потоковой обработки. Вы обнаружите, что сиддхи очень близок к стандартному диалекту SQL, что позволяет вам выучить его быстрее.

Типичное приложение Сиддхи получает события через источники. Затем вы можете выполнить некоторую обработку в середине, такую ​​как преобразования, агрегаты, корреляции и т. Д. Наконец, обработанные события покидают Streaming Integrator через приемники.

Чтобы узнать больше о языке сиддхи, посетите здесь.

Архитектура решения

Готовое решение будет выглядеть следующим образом.

Во-первых, у нас есть транзакции по кредитным картам, поступающие в Streaming Integrator в виде потока событий. Чтобы упростить задачу, давайте не будем слишком беспокоиться о том, откуда они берутся.

Затем база данных содержит две таблицы; клиенты и прошлые_транзакции. На рисунке ниже показаны их схемы и взаимосвязь.

Когда событие транзакции поступает в Streaming Integrator, оно отправляется в приложение Siddhi, которое содержит логику обработки. Затем событие объединяется с двумя вышеупомянутыми таблицами для дальнейшего обогащения.

Наконец, обогащенное событие будет оцениваться по трем пороговым уровням, чтобы определить, какое действие следует предпринять.

Имея в виду эту высокоуровневую архитектуру, давайте перейдем к ее реализации.

Пошаговое руководство по решению

Мы начинаем с создания нового приложения Siddhi внутри инструментария Streaming Integrator.

Создайте поток для получения событий транзакции

Первым шагом является определение Потока для представления событий входящих транзакций по карте.

Предположим, типичное событие транзакции имеет следующий формат.

Это определение потока представляет собой поток транзакционных событий внутри приложения Siddhi.

define stream transactions (id int, customer_id int, amount double);

Для простоты я не буду заострять внимание на том, где я получаю эти события. Они могут поступать из темы Kafka, очереди сообщений или даже базы данных. Здесь я буду использовать встроенный симулятор для отправки событий в поток транзакций, поскольку я хочу больше сосредоточиться на логике обработки, чем на подключении событий.

При необходимости вы можете аннотировать поток с помощью Source, чтобы указать, где приложение должно читать события. Ниже приведен пример конфигурации источника для чтения из темы Kafka.

@source(type='kafka',
        topic.list='kafka_sample_topic',
        partition.no.list='0',
        threading.option='single.thread',
        group.id="group",
        bootstrap.servers='localhost:9092',
 @map(type='text',fail.on.missing.attribute='true', regex.A='(id):(.*)', regex.B='(amount):([-.0-9]+)',
                @attributes(id = 'A[2]', amount = 'B[2]')))
define stream SweetProductionStream(id string, amount double);

Определите таблицы для объединения потоков и обогащения

Когда поток создан, мне нужно определить две таблицы для чтения из таблиц past_transactions и customers. Таблица представляет собой набор событий, считанных из внешнего хранилища, такого как база данных или поисковый индекс. Здесь я использую аннотации Store, чтобы указать детали подключения к базе данных.

@Store(type="rdbms",
       jdbc.url="jdbc:mysql://localhost:3306/foobank?useSSL=false",
       username="root",
       password="*****" ,
       jdbc.driver.name="com.mysql.jdbc.Driver")
@PrimaryKey("id")
define table past_transactions (id int, customer_id int, amount double);
@Store(type="rdbms",
       jdbc.url="jdbc:mysql://localhost:3306/foobank?useSSL=false",
       username="root",
       password="*****" ,
       jdbc.driver.name="com.mysql.jdbc.Driver")
@PrimaryKey("customer_id")
define table customers (customer_id int, credit_limit double);

Обратите внимание, что вам нужно будет использовать точное имя таблицы базы данных при именовании таблиц в Сиддхи.

Узнайте сумму прошлых транзакций, связанных с этим клиентом.

Затем я присоединяюсь к потоку transactions с таблицей past_transactions, чтобы рассчитать общие расходы клиента на данный момент. Соединение основано на атрибуте customer_id.

@info(name='Calculate the total spend so far.')
from past_transactions as p join transactions as t
on p.customer_id == t.customer_id
select p.customer_id as customer_id, t.amount + sum(p.amount) as total
insert into tempStream;

Приведенный выше запрос сиддхи очень похож на стандартный диалект SQL. Он объединяет поток и таблицу, вычисляет сумму amount и проецирует вывод в промежуточный поток с именем tempStream.

Событие, исходящее из tempStream, теперь имеет следующую структуру.

{customer_id,total}

Обогатите tempStream, присоединившись к таблице клиентов

Теперь у нас есть общие расходы для клиента. Следующим шагом будет пополнение tempStream кредитным лимитом клиента.

Кредитный лимит клиента указан в таблице клиенты. Поэтому я присоединяюсь к таблице tempStream с таблицей customers на основе customer_id.

@info(name='Enrich with the credit limit.')
from tempStream as t join customers as c
on t.customer_id == c.customer_id 
select t.customer_id, t.total, c.credit_limit
insert into alertStream;

Вышеупомянутый запрос создает событие для alertStream, которое содержит customer_id, общие расходы и кредитный лимит.

{customer_id,total,credit_limit}

Проверьте alertStream и выполните необходимые действия

Посмотрев на значения alertStream, мы можем определить, следует ли отправлять электронное письмо, SMS или звонить на мобильный.

Чтобы определить соответствующее действие, мы можем использовать следующее выражение. Пороговые значения могут составлять 50%, 75% и 90%.

Такие действия, как отправка SMS или электронного письма, считаются побочными эффектами. Как правило, логика потоковой обработки не обрабатывает побочные эффекты. Вместо этого он делегируется внешней системе. Это делает логику обработки чистой и независимой.

В Сиддхи мы делаем это, записывая события в раковину. Sinks принимает события из потоков и публикует их на внешние конечные точки по различным транспортным протоколам и форматам. Например, чтобы отправить электронное письмо, вы можете использовать приемник типа email.

Поэтому я определяю три дополнительных потока следующим образом. Для простоты я использую приемник журнала по умолчанию, который выводит содержимое событий на консоль. Но вы можете использовать в Сиддхи другие типы приемников, такие как электронная почта, Kafka, файл и т. Д.

@sink(type='log')
define stream smsStream (customer_id int);
@sink(type='log')
define stream emailStream (customer_id int);
@sink(type='log')
define stream callCenterStream (customer_id int);

Следующие запросы проверяют alertStream на наличие различных пороговых значений и записывают в соответствующие приемники.

--beginning of side effects
@info(name='Add to SMS stream')
from alertStream[total > credit_limit*0.5 and total < credit_limit*0.75]
select customer_id
insert into smsStream;
@info(name='Add to Email stream')
from alertStream[total > credit_limit*0.75 and total < credit_limit*0.9]
select customer_id
insert into emailStream;
@info(name='Add to call center stream')
from alertStream[total > credit_limit*0.9]
select customer_id
insert into callCenterStream;

Последнее приложение Сиддхи выглядит так.

Выводы - что можно было сделать лучше

Моя бизнес-логика - вымышленная. Так что не используйте это в продакшене :)

Хотя эта реализация дает вам хорошее начало, все еще есть возможности для улучшения. Например, вместо расчета суммы прошлых расходов для каждого события следует использовать кэшированную предварительно вычисленную агрегацию. Это хорошо масштабируется по количеству записей транзакций в таблице.

Также, когда вы пишете в стоки, обогатите финальное мероприятие информацией о клиентах, насколько это возможно. Например, при записи в приемник электронной почты укажите адрес электронной почты вместе с событием.

Ключевым выводом здесь является проактивное уведомление клиентов в режиме реального времени. Это избавит их от многих неприятностей, обеспечит безопасность транзакций и повысит лояльность банка к бренду.

Если вам нужна помощь с кодом, свяжитесь со мной через Twitter.