У вас есть две серьезные проблемы с вашим udf
. Во-первых, все работает не так, как вы планировали. Рассматривайте суть вашего кода как следующую функцию:
from pyspark.ml.linalg import Vectors
def features_expl_non_udf(features):
return Vectors.dense(
features.split(" ")).map(lambda feat: float(str(feat.split(":")[1]))
)
Если вы вызовете его на одной из своих строк:
features_expl_non_udf("a:1 b:2 c:3")
#ValueError: could not convert string to float: a:1
Поскольку features.split(" ")
возвращает ['a:1', 'b:2', 'c:3']
, который вы передаете конструктору Vectors.dense
. В этом нет никакого смысла.
То, что вы намеревались сделать, было сначала разделить на пробел, а затем разбить каждое значение результирующего списка на :
. Затем вы можете преобразовать эти значения в float
и передать список в Vectors.dense
.
Вот правильная реализация вашей логики:
def features_expl_non_udf(features):
return Vectors.dense(map(lambda feat: float(feat.split(":")[1]), features.split()))
features_expl_non_udf("a:1 b:2 c:3")
#DenseVector([1.0, 2.0, 3.0])
Вторая проблема с вашим udf
заключается в том, что вы не указали returnType
. Для DenseVector
вам необходимо используйте VectorUDT
в качестве returnType
.
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT
features_expl = udf(
lambda features: Vectors.dense(
map(lambda feat: float(feat.split(":")[1]), features.split())
),
VectorUDT()
)
features.withColumn("feats", features_expl(features.features)).show()
#+--------------+-----+----------------+
#| features|label| feats|
#+--------------+-----+----------------+
#| a:1 b:2 c:3| 0| [1.0,2.0,3.0]|
#| a:4 b:5 c:6| 0| [4.0,5.0,6.0]|
#| a:7 b:8 c:9| 1| [7.0,8.0,9.0]|
#|a:10 b:11 c:12| 0|[10.0,11.0,12.0]|
#+--------------+-----+----------------+
В качестве альтернативы вы можете выполнить часть обработки строк на стороне искры, используя _ 17_ и _ 18_, но вам все равно придется использовать udf
для преобразования окончательного вывода в DenseVector
.
from pyspark.sql.functions import regexp_replace, split, udf
from pyspark.ml.linalg import Vectors, VectorUDT
toDenseVector = udf(Vectors.dense, VectorUDT())
features.withColumn(
"features",
toDenseVector(
split(regexp_replace("features", r"\w+:", ""), "\s+").cast("array<float>")
)
).show()
#+----------------+-----+
#| features|label|
#+----------------+-----+
#| [1.0,2.0,3.0]| 0|
#| [4.0,5.0,6.0]| 0|
#| [7.0,8.0,9.0]| 1|
#|[10.0,11.0,12.0]| 0|
#+----------------+-----+
person
pault
schedule
19.06.2019