Практический пример того, как привнести ценности в ваш бизнес с помощью науки о данных

Предположить. Что сейчас является самым ценным активом в мире?

Это не золото, не сырая нефть… это данные. Вы, должно быть, слышали о популярном модном слове «большие данные» и задаетесь вопросом, что именно означает этот термин. Подумайте о своих любимых сервисах потоковой передачи музыки - Spotify, Pandora… и т. Д. Каждую секунду во всем мире множество разных пользователей входят в сервис, имеют свою долю взаимодействий с сервисом. Поскольку каждое движение соответствует точке собираемых данных, вы можете представить себе проблемы, связанные с хранением таких больших данных.

К счастью, эти большие наборы данных могут принести реальную пользу бизнесу, если они будут использоваться правильно. В этом сообщении блога мы обсудим типичный вариант использования этих больших наборов данных - прогнозирование ухода клиентов.

Что такое отток

Отбрасывание относится к поведению незаметной потери клиентов из-за большого количества клиентов. Некоторые из оттесненных клиентов могут обратиться к конкурентам, а некоторые из них могут просто покинуть сервисы навсегда.

Вначале это может быть незаметно, но эффект от таких потерь со временем накапливается. Как лица, принимающие решения в этих поставщиках услуг потоковой передачи музыки, мы хотели бы понять факторы, способствующие такому поведению, которое будет служить основной целью нашего проекта сегодня.

Мы начнем с изучения доступного набора данных.

Полный набор данных - это журнал пользователя, который хранится в файле .json размером 12 ГБ - размер, который может быть обработан только некоторыми инструментами для работы с большими данными, такими как Spark. Чтобы полностью понять доступные поля, мы начнем с получения небольшого подмножества данных (~ 128 МБ) для исследовательского анализа данных на одной машине. Мы загрузим наш набор данных с помощью следующих команд:

# create a Spark session
spark = (SparkSession.builder 
                    .master("local") 
                    .appName("Creating Features") 
                    .getOrCreate())
# Read in .json file as events
events = spark.read.json('mini_sparkify_event_data.json')
events.persist()

В Spark есть отличный однострочный ярлык для отображения всех полей и соответствующих им типов данных.

events.printSchema()
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

Посмотрев на заголовок набора данных, мы узнаем, что все действия пользователя записываются в столбце page.

В то время как большинство пользователей продолжают играть следующую песню, некоторые пользователи переходят на страницу «Отмена» - и все они подтвердили свою отмену на странице «Подтверждение отмены» (см. Такое же количество «Отмена» и «Подтверждение отмены» S. Мы определим активность по замене, в частности, как количество подтверждений отмены. Пользователи со столбцом page с надписью "Подтверждение отмены" будут отталкиваемыми пользователями, которыми мы конкретно являемся. увлекающийся.

Разработка функций

Мы успешно определили отток пользователей, теперь пришло время использовать наш деловой образ мышления, чтобы подумать о факторах, способствующих оттоку пользователей. Что бы вы сделали, если бы действительно ненавидели свои сервисы потоковой передачи музыки? Я придумал следующие 7 функций,

  1. Типичное взаимодействие с пользователем

Мы могли бы разумно ожидать, что некоторые другие взаимодействия с пользователем в конечном итоге приведут к оттоку нашего пользователя. Мы могли бы использовать коробчатую диаграмму для выполнения фильтрации первого уровня. Коробчатые диаграммы эффективно помогают нам визуализировать минимум, 25-й процентиль, среднее, 75-й процентиль и максимум для определенного распределения данных. Построив ящичковые диаграммы как для уволенных, так и для непереключенных пользователей, мы могли четко интерпретировать разницу между двумя типами пользователей для конкретного взаимодействия.

Следующие взаимодействия показали значительные различия в распределении между двумя группами,

  • Добавить друзей - уволенные пользователи с меньшей вероятностью добавят друзей.
  • Добавить в плейлист: уволенные пользователи с меньшей вероятностью добавят в плейлисты.
  • Обновление. У уволенных пользователей есть широкий спектр действий по обновлению.
  • NextSong - уволенные пользователи с меньшей вероятностью будут играть следующую песню.
  • ThumbsUp - уволенные пользователи реже поднимают палец вверх.
  • Ролл-реклама - уволенные пользователи имеют более широкий разброс по рулонной рекламе.
  • Настройки: уволенные пользователи реже посещают страницу настроек.
  • Выход: уволенные пользователи с меньшей вероятностью будут выходить из системы (из-за меньшего количества входов в систему).
  • Справка - пользователи, которые не были уволены, чаще обращаются за помощью.
  • Главная - уволенные пользователи реже посещают главную страницу.

Обратите внимание, что все наши взаимодействия с пользователем находятся в одном столбце, нам нужно будет развернуть и агрегировать общее количество определенного взаимодействия для каждого клиента. Основываясь на вышеупомянутой фильтрации, мы отбросим некоторые из менее значимых взаимодействий.

events = events.drop('firstName', 'lastName', 'auth',
                      'gender', 'song','artist',
                      'status', 'method', 'location', 
                      'registration', 'itemInSession')
events_pivot = (events.groupby(["userId"])
                      .pivot("page")
                      .count()
                      .fillna(0))
events_pivot = events_pivot.drop('About', 'Cancel', 'Login',  
                                 'Submit Registration','Register',
                                 'Save Settings')

2. Среднее время воспроизведения музыки

Для себя я, вероятно, использовал бы его короче, чем обычные пользователи. Таким образом, средняя продолжительность времени, которое пользователь потратил на воспроизведение музыки, будет действительно важным фактором. Простая визуализация показала подтверждающие результаты ниже.

Мы добавим эту функцию в нашу таблицу events_pivot,

# filter events log to contain only next song
events_songs = events.filter(events.page == 'NextSong')
# Total songs length played
total_length = (events_songs.groupby(events_songs.userId)
                           .agg(sum('length')))
# join events pivot
events_pivot = (events_pivot.join(total_length, on = 'userId', 
                                  how = 'left')
                     .withColumnRenamed("Cancellation Confirmation",
                                        "Churn")
                     .withColumnRenamed("sum(length)", 
                                        "total_length"))

3. Количество активных дней

Мы также ожидаем разницы в количестве активных дней между оттоком / неотбранными группами. Поскольку столбец datetime содержит только единицы измерения в секундах, нам нужно будет использовать оконную функцию, чтобы агрегировать общее время активности для каждого клиента и преобразовать значение в дни. Мы добавим эту функцию в events_pivot.

convert = 1000*60*60*24 # conversion factor to days
# Find minimum/maximum time stamp of each user
min_timestmp = events.select(["userId", "ts"])
                     .groupby("userId")
                     .min("ts")
max_timestmp = events.select(["userId", "ts"])
                     .groupby("userId")
                     .max("ts")
# Find days active of each user
daysActive = min_timestmp.join(max_timestmp, on="userId")
daysActive = (daysActive.withColumn("days_active", 
                      (col("max(ts)")-col("min(ts)")) / convert))
daysActive = daysActive.select(["userId", "days_active"])
# join events pivot
events_pivot = events_pivot.join(daysActive, 
                                 on = 'userId',
                                 how = 'left')

4. Количество дней в качестве платных пользователей

Точно так же мы могли бы рассчитать количество дней в качестве платных пользователей с помощью оконной функции, нам нужно только добавить фильтр, чтобы покупатель был платным пользователем.

# Find minimum/maximum time stamp of each user as paid user
paid_min_ts = events.filter(events.level == 'paid')
                    .groupby("userId").min("ts")
paid_max_ts = events.filter(events.level == 'paid')
                    .groupby("userId").max("ts")
# Find days as paid user of each user
daysPaid = paid_min_ts.join(paid_max_ts, on="userId")
daysPaid = (daysPaid.withColumn("days_paid", 
                    (col("max(ts)")-col("min(ts)")) / convert))
daysPaid = daysPaid.select(["userId", "days_paid"])
# join events pivot
events_pivot = events_pivot.join(daysPaid,
                                 on = 'userId', 
                                 how='left')

5. Количество дней бесплатного пользования.

Теперь, используя бесплатный фильтр пользователей, мы можем найти количество дней в качестве бесплатного пользователя для каждого клиента,

# Find minimum/maximum time stamp of each user as paid user
free_min_ts = events.filter(events.level == 'free')
                    .groupby("userId").min("ts")
free_max_ts = events.filter(events.level == 'free')
                    .groupby("userId").max("ts")
# Find days as paid user of each user
daysFree = free_min_ts.join(free_max_ts, on="userId")
daysFree = (daysFree.withColumn("days_free", 
                        (col("max(ts)")-col("min(ts)")) / convert))
daysFree = daysFree.select(["userId", "days_free"])
# join events pivot
events_pivot = events_pivot.join(daysFree, 
                                on = 'userId', 
                                how='left')

6. Количество сеансов

Количество музыкальных сессий также могло быть фактором. Поскольку sessionId доступен в этом наборе данных, мы можем напрямую подсчитать количество уникальных идентификаторов для каждого пользователя с помощью предложения groupby.

# count the number of sessions
numSessions = (events.select(["userId", "sessionId"])
                      .distinct()
                      .groupby("userId")
                       .count()
                      .withColumnRenamed("count", "num_sessions"))
# join events pivot
events_pivot = events_pivot.join(numSessions,
                                 on = 'userId',
                                 how = 'left')

7. Агент доступа пользователей

Служба потоковой передачи может иметь разную производительность на разных пользовательских агентах. Мы постараемся добавить этот фактор в модель. Поскольку существует 56 различных пользовательских агентов, мы будем использовать горячий кодировщик Spark, чтобы превратить эти разные пользовательские агенты в вектор.

# find user access agents, and perform one-hot encoding on the user 
userAgents = events.select(['userId', 'userAgent']).distinct()
userAgents = userAgents.fillna('Unknown')
# build string indexer
stringIndexer = StringIndexer(inputCol="userAgent",      
                              outputCol="userAgentIndex")
model = stringIndexer.fit(userAgents)
userAgents = model.transform(userAgents)
# one hot encode userAgent column
encoder = OneHotEncoder(inputCol="userAgentIndex", 
                        outputCol="userAgentVec")
userAgents = encoder.transform(userAgents)
                    .select(['userId', 'userAgentVec'])
# join events pivot
events_pivot = events_pivot.join(userAgents, 
                                 on = 'userId',
                                 how ='left')

Построение модели

После того, как мы разработали соответствующие функции, мы построим три модели - логистическую регрессию, случайный лес и деревья градиентного повышения. Чтобы избежать написания избыточного кода, мы будем создавать объекты stage и создавать конвейеры с другим классификатором в конце конвейера.

# Split data into train and test set
events_pivot = events_pivot.withColumnRenamed('Churn', 'label')
training, test = events_pivot.randomSplit([0.8, 0.2])
# Create vector from feature data
feature_names = events_pivot.drop('label', 'userId').schema.names
vec_asembler = VectorAssembler(inputCols = feature_names,
                               outputCol = "Features")
# Scale each column
scalar = MinMaxScaler(inputCol="Features", 
                      outputCol="ScaledFeatures")
# Build classifiers
rf = RandomForestClassifier(featuresCol="ScaledFeatures", 
                            labelCol="label",
                            numTrees = 50, 
                            featureSubsetStrategy='sqrt')
lr = LogisticRegression(featuresCol="ScaledFeatures",  
                        labelCol="label", 
                        maxIter=10,
                        regParam=0.01)
gbt = GBTClassifier(featuresCol="ScaledFeatures", 
                    labelCol="label")
# Consturct 3 pipelines
pipeline_rf = Pipeline(stages=[vec_asembler, scalar, rf])
pipeline_lr = Pipeline(stages=[vec_asembler, scalar, lr])
pipeline_gbt = Pipeline(stages=[vec_asembler, scalar, gbt])
# Fit the models
rf_model = pipeline_rf.fit(training)
lr_model = pipeline_lr.fit(training)
gbt_model = pipeline_gbt.fit(training)

теперь три объекта rf_model, lr_model, gbt_model представляют 3 разные подогнанные модели.

Оценка модели

Мы проверим характеристики подогнанных моделей и выберем в качестве окончательной модели ту, которая имеет наилучшие характеристики. Мы начнем с создания функции специально для этой цели,

def modelEvaluations(model, metric, data):
    """ Evaluate a machine learning model's performance 
        Input: 
            model - pipeline object
            metric - the metric of the evaluations
            data - data being evaluated
        Output:
            [score, confusion matrix]
    """
    # generate predictions
    evaluator = MulticlassClassificationEvaluator(
                metricName = metric)
    predictions = model.transform(data)
    
    # calcualte score
    score = evaluator.evaluate(predictions)
    confusion_matrix = (predictions.groupby("label")
                                   .pivot("prediction")
                                   .count()
                                   .toPandas())
    return [score, confusion_matrix]

Мы вызовем указанную выше функцию для оценки вышеуказанных моделей.

f1_rf, conf_mtx_rf = modelEvaluations(rf_model, 'f1', test)
f1_lr, conf_mtx_lr = modelEvaluations(lr_model, 'f1', test)
f1_gbt, conf_mtx_gbt = modelEvaluations(gbt_model, 'f1', test)

Модель повышения градиента показала лучшую производительность (F1 Score) в тестовом наборе. Оценка F1 определяется как среднее гармоническое значение точности и запоминания, рассчитываемое по следующей формуле:

Точность - это вычисление доли положительных определений классов, являющихся правильными, в математических выражениях это

точность = tp / (tp + fp)

Напоминание - это вычисление доли фактических положительных образцов класса, идентифицируемых правильно, в математических выражениях это:

отзыв = tp / (tp + fn)

Вам может быть интересно, почему мы предпочитаем гораздо более сложную метрику, чем наиболее интуитивно понятную точность - это связано с существованием распределений классов дисбаланса в наборе данных. Поскольку в конечном итоге отток пользователей произойдет лишь у небольшой части пользователей, мы хотели бы, чтобы наша модель правильно их идентифицировала, а не добивалась высокой общей эффективности. Представьте себе, что если только 6% клиентов будут оттока в рамках истинного распределения населения, прогнозирование всех как неоттока все равно даст нам точность 94%. С другой стороны, оценка F1 будет штрафовать за плохие результаты в одном классе, что эффективно смягчит такие проблемы. Характеристика класса дисбаланса будет присутствовать в каждой задаче прогнозирования оттока - F1 всегда будет метрикой для использования в будущем.

Важность функции

Мы будем использовать функцию важности функции и визуализировать относительный рейтинг важности каждой функции, которую мы создали. Поскольку последняя функция userAgentVec фактически является вектором с горячим кодированием, мы будем рассматривать функцию userAgentVec как единое целое. В приведенном ниже коде будут суммированы все значения важности функций для всех подфункций, полученных из вектора с горячим кодированием.

feature_importances = np.array(gbt_model.stages[-1]
                               .featureImportances)
userAgentVec = feature_importances[len(feature_names) :].sum()
feature_importances = feature_importances[:len(feature_names)]
                      + [userAgentVec]

Теперь мы наносим на график важность функции для деревьев с усилением градиента.

Большинство созданных нами функций являются очень важными факторами, способствующими оттоку пользователей, причем наиболее важным фактором является days_active.

Запуск полного набора данных

Мы создали соответствующую структуру - мы готовы выполнить те же шаги, что и выше, чтобы позволить модели работать с полным набором данных 12 ГБ с использованием сервиса AWS EMR. Инициализируем сессию следующим образом

# Create spark session
spark = (SparkSession 
        .builder 
        .appName("Sparkify") 
        .getOrCreate())
# Read in full sparkify dataset
event_data = "s3n://dsnd-sparkify/sparkify_event_data.json"
events = spark.read.json(event_data)

Повторять шаги снова не будем - полный скрипт я прикрепил на сайте nbviewer.

В конце концов, модель повышения градиента дала F1 балл 0,8896, что является отличной производительностью.

+-----+----+---+
|label| 0.0|1.0|
+-----+----+---+
|    0|1612| 70|
|    1| 163|344|
+-----+----+---+

Бизнес-стратегия

Какое путешествие мы прошли - но еще не завершили нашу миссию. В мире науки о данных за каждой моделью стоит бизнес-намерение. С учетом важности функций, которые мы разработали, мы определенно могли бы придумать некоторые бизнес-стратегии для противодействия оттоку клиентов. Мы кратко обсудим две возможные стратегии, которые действительно будут определять некоторые ценности для наших поставщиков.

Мы знаем, что количество активных дней является наиболее важным фактором, мы могли бы посоветовать высшему руководству создать систему вознаграждений, чтобы побуждать малоактивных пользователей оставаться в сети в течение длительных периодов времени .

Кроме того, поскольку количество агентов, которые пользователи использовали для доступа к службе, также весьма значительны, мы также можем выявить неэффективного агента и заставить нашу команду инженеров работать специально для решения проблемы. .

Надеюсь, вам понравилось читать этот пост так же, как мне нравится его писать. Вместе мы использовали Spark, платформу для анализа больших данных, чтобы построить непрерывный рабочий процесс машинного обучения для выявления потенциальных клиентов, которые предпочитают сервисы потоковой передачи музыки, из пользовательских журналов. Мы выполнили следующие шаги,

  1. Исследовательский анализ данных
  2. Функциональная инженерия
  3. Построение модели
  4. Оценка модели
  5. Увеличить масштаб модели
  6. Бизнес-стратегии

Из-за огромного размера данных, с которыми мы работаем, выполнение поиска по гиперпараметрам было особенно трудным. Можно представить, что оставлять блокнот открытым на длительное время - не самый эффективный способ использовать наши ресурсы. Некоторые из хороших приемов использования ssh для обеспечения работоспособности программы определенно могут помочь в решении этой проблемы.

Если есть что-то, что мы должны вынести из этого фрагмента - это возвращается к тому моменту, который мы упомянули в начале рассказа. Как блестящие инженеры, нас часто тянет к созданию лучших высокопроизводительных моделей с помощью различных методов, но наша модель должна решать бизнес-задачи, чтобы быть полезной.