PySpark реверсирует StringIndexer во вложенном массиве

Я использую PySpark для совместной фильтрации с помощью ALS. Мои исходные идентификаторы пользователя и элемента являются строками, поэтому я использовал StringIndexer, чтобы преобразовать их в числовые индексы (модель ALS PySpark обязывает нас делать это).

После того, как я подогнал модель, я могу получить 3 основных рекомендации для каждого пользователя:

recs = (
    model
    .recommendForAllUsers(3)
)

Фрейм данных recs выглядит так:

+-----------+--------------------+
|userIdIndex|     recommendations|
+-----------+--------------------+
|       1580|[[10096,3.6725707...|
|       4900|[[10096,3.0137873...|
|       5300|[[10096,2.7274625...|
|       6620|[[10096,2.4493625...|
|       7240|[[10096,2.4928937...|
+-----------+--------------------+
only showing top 5 rows

root
 |-- userIdIndex: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productIdIndex: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

Я хочу создать огромный дамп JSOM с этим фреймом данных, и мне это нравится:

(
    recs
    .toJSON()
    .saveAsTextFile("name_i_must_hide.recs")
)

и образец этих jsons:

{
  "userIdIndex": 1580,
  "recommendations": [
    {
      "productIdIndex": 10096,
      "rating": 3.6725707
    },
    {
      "productIdIndex": 10141,
      "rating": 3.61542
    },
    {
      "productIdIndex": 11591,
      "rating": 3.536216
    }
  ]
}

Клавиши userIdIndex и productIdIndex связаны с преобразованием StringIndexer.

Как мне вернуть исходное значение этих столбцов? Я подозреваю, что должен использовать IndexToString трансформатор, но я не могу понять, как это сделать, поскольку данные вложены в массив внутри recs Dataframe.

Я пытался использовать оценщик Pipeline (stages=[StringIndexer, ALS, IndexToString]), но похоже, что этот оценщик не поддерживает эти индексаторы.

Ваше здоровье!


person Daniel Severo    schedule 20.08.2017    source источник


Ответы (1)


В обоих случаях вам понадобится доступ к списку ярлыков. Доступ к нему можно получить с помощью StringIndexerModel

user_indexer_model = ...  # type: StringIndexerModel
user_labels = user_indexer_model.labels

product_indexer_model = ...  # type: StringIndexerModel
product_labels = product_indexer_model.labels

или метаданные столбца.

Для userIdIndex вы можете просто подать IndexToString:

from pyspark.ml.feature import IndexToString

user_id_to_label = IndexToString(
    inputCol="userIdIndex", outputCol="userId", labels=user_labels)
user_id_to_label.transform(recs)

Для рекомендаций вам понадобится udf или такое выражение:

from pyspark.sql.functions import array, col, lit, struct

n = 3  # Same as numItems

product_labels_ = array(*[lit(x) for x in product_labels])
recommendations = array(*[struct(
    product_labels_[col("recommendations")[i]["productIdIndex"]].alias("productId"),
    col("recommendations")[i]["rating"].alias("rating")
) for i in range(n)])

recs.withColumn("recommendations", recommendations)
person zero323    schedule 21.08.2017
comment
Потрясающие! Работал :) - person Daniel Severo; 06.10.2017