Как преобразовать RDD плотного вектора в DataFrame в pyspark?

У меня есть DenseVector RDD вот так

>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]

Я хочу преобразовать это в Dataframe. я пробовал вот так

>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()

Выдает такую ​​ошибку

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
    schema = _infer_schema(first)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>

старое решение

frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))

Редактировать 1 — Воспроизводимый код

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split

from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector

sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')

sentenceData = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", "\s+"))
sentenceData.show()

vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")

idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0],DenseVector(vector[1].toArray())])
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

person Hardik Gupta    schedule 26.12.2016    source источник


Ответы (2)


Вы не можете конвертировать RDD[Vector] напрямую. Он должен быть сопоставлен с RDD объектами, которые можно интерпретировать как structs, например RDD[Tuple[Vector]]:

frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

В противном случае Spark попытается преобразовать объект __dict__ и создать неподдерживаемый массив NumPy в качестве поля.

from pyspark.ml.linalg import DenseVector  
from pyspark.sql.types import _infer_schema

v = DenseVector([1, 2, 3])
_infer_schema(v)
TypeError                                 Traceback (most recent call last)
... 
TypeError: not supported type: <class 'numpy.ndarray'>

vs.

_infer_schema((v, ))
StructType(List(StructField(_1,VectorUDT,true)))

Примечания:

  • В Spark 2.0 вы должны использовать правильные локальные типы:

    • pyspark.ml.linalg when working DataFrame based pyspark.ml API.
    • pyspark.mllib.linalg при работе RDD на основе pyspark.mllib API.

    Эти два пространства имен больше не могут быть совместимы и требуют явных преобразований (например, Как преобразовать из org.apache.spark.mllib.linalg.VectorUDT в мл.linalg.VectorUDT).

  • Код, указанный в редактировании, не эквивалентен коду из исходного вопроса. Вы должны знать, что tuple и list имеют разную семантику. Если вы сопоставляете вектор с парой, используйте tuple и преобразуйте непосредственно в DataFrame:

    tfidf.rdd.map(
        lambda row: (row[0], DenseVector(row[1].toArray()))
    ).toDF()
    

    использование tuple (тип продукта) также будет работать для вложенной структуры, но я сомневаюсь, что это то, что вы хотите:

    (tfidf.rdd
        .map(lambda row: (row[0], DenseVector(row[1].toArray())))
        .map(lambda x: (x, ))
        .toDF())
    

    list в любом другом месте, кроме верхнего уровня row, интерпретируется как ArrayType.

  • Гораздо чище использовать UDF для преобразования (Spark Python: стандартная ошибка масштабирования Не поддерживать... SparseVector).

person zero323    schedule 26.12.2016

Я считаю, что проблема здесь в том, что createDataframe не принимает в качестве аргумента плотноVactor Пожалуйста, попробуйте преобразовать плотноВектор в соответствующую коллекцию [т.е. Массив или список]. В скале и джаве

массив()

метод доступен, вы можете преобразовать плотный вектор в массив или список, а затем попытаться создать dataFrame.

person Akash Sethi    schedule 26.12.2016