Преобразование строки формата libsvm (поле1: значение, поле2: значение) в DenseVector значений

У меня есть столбец в формате libsvm (библиотека искры ml) field1:value field2:value ...

+--------------+-----+
|      features|label|
+--------------+-----+
|   a:1 b:2 c:3|    0|
|   a:4 b:5 c:6|    0|
|   a:7 b:8 c:9|    1|
|a:10 b:11 c:12|    0|
+--------------+-----+

Я хочу извлечь значения и сохранить их в массивах для каждой строки в pyspark

features.printSchema()

root
 |-- features: string (nullable = false)
 |-- label: integer (nullable = true)

Я использую следующий udf, потому что затронутый столбец является частью фрейма данных

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors

features_expl = udf(lambda features: Vectors.dense(features.split(" ")).map(lambda feat: float(str(feat.split(":")[1]))))
features=features.withColumn("feats", features_expl(features.features))

В результате я получаю следующее: ValueError: не удалось преобразовать строку в float: mobile: 0.0 Кажется, что он не выполняет второе разбиение и вызывает float () для строки.

Я бы хотел получить:

+--------------+-----+
|      features|label|
+--------------+-----+
|     [1, 2, 3]|    0|
|     [4, 5, 6]|    0|
|     [7, 8, 9]|    1|
|  [10, 11, 12]|    0|
+--------------+-----+

person Georgios Kourogiorgas    schedule 19.06.2019    source источник


Ответы (1)


У вас есть две серьезные проблемы с вашим udf. Во-первых, все работает не так, как вы планировали. Рассматривайте суть вашего кода как следующую функцию:

from pyspark.ml.linalg import Vectors
def features_expl_non_udf(features): 
    return Vectors.dense(
        features.split(" ")).map(lambda feat: float(str(feat.split(":")[1]))
    )

Если вы вызовете его на одной из своих строк:

features_expl_non_udf("a:1 b:2 c:3")
#ValueError: could not convert string to float: a:1

Поскольку features.split(" ") возвращает ['a:1', 'b:2', 'c:3'], который вы передаете конструктору Vectors.dense. В этом нет никакого смысла.

То, что вы намеревались сделать, было сначала разделить на пробел, а затем разбить каждое значение результирующего списка на :. Затем вы можете преобразовать эти значения в float и передать список в Vectors.dense.

Вот правильная реализация вашей логики:

def features_expl_non_udf(features): 
    return Vectors.dense(map(lambda feat: float(feat.split(":")[1]), features.split()))
features_expl_non_udf("a:1 b:2 c:3")
#DenseVector([1.0, 2.0, 3.0])

Вторая проблема с вашим udf заключается в том, что вы не указали returnType. Для DenseVector вам необходимо используйте VectorUDT в качестве returnType.

from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT

features_expl = udf(
    lambda features: Vectors.dense(
        map(lambda feat: float(feat.split(":")[1]), features.split())
    ),
    VectorUDT()
)
features.withColumn("feats", features_expl(features.features)).show()
#+--------------+-----+----------------+
#|      features|label|           feats|
#+--------------+-----+----------------+
#|   a:1 b:2 c:3|    0|   [1.0,2.0,3.0]|
#|   a:4 b:5 c:6|    0|   [4.0,5.0,6.0]|
#|   a:7 b:8 c:9|    1|   [7.0,8.0,9.0]|
#|a:10 b:11 c:12|    0|[10.0,11.0,12.0]|
#+--------------+-----+----------------+

В качестве альтернативы вы можете выполнить часть обработки строк на стороне искры, используя _ 17_ и _ 18_, но вам все равно придется использовать udf для преобразования окончательного вывода в DenseVector.

from pyspark.sql.functions import regexp_replace, split, udf
from pyspark.ml.linalg import Vectors, VectorUDT

toDenseVector = udf(Vectors.dense, VectorUDT())

features.withColumn(
    "features",
    toDenseVector(
        split(regexp_replace("features", r"\w+:", ""), "\s+").cast("array<float>")
    )
).show()
#+----------------+-----+
#|        features|label|
#+----------------+-----+
#|   [1.0,2.0,3.0]|    0|
#|   [4.0,5.0,6.0]|    0|
#|   [7.0,8.0,9.0]|    1|
#|[10.0,11.0,12.0]|    0|
#+----------------+-----+
person pault    schedule 19.06.2019
comment
Большое спасибо. Итак, тип возврата нужен только для UDF? Потому что у вас его нет в коде def ... - person Georgios Kourogiorgas; 19.06.2019
comment
@GeorgiosKourogiorgas да, def - это чистая функция Python. Для udfs, которые будут использоваться в Spark, вам понадобится returnType. Если вы не укажете, по умолчанию будет StringType. - person pault; 19.06.2019
comment
Какие исходные функции содержат значение None, например ['"b":2','"a":1 "b":2 "c":3'] - person rosefun; 29.07.2020