создать новый столбец в фрейме данных pyspark, используя существующие столбцы

Я пытаюсь работать с фреймами данных pyspark, и я хотел бы знать, как я могу создать и заполнить новый столбец, используя существующие столбцы.

Допустим, у меня есть фрейм данных, который выглядит так:

+-----+---+---+
|   _1| _2| _3|
+-----+---+---+
|x1-y1|  3| z1|
|x2-y2|  2| z2|
|x3-y3|  1| z3|
+-----+---+---+

Я ищу способ создать фрейм данных, который выглядит так:

+-----+---+---+----+--------+
|   _1| _2| _3|  _4|      _5|
+-----+---+---+----+--------+
|x1-y1|  3| z1|x1y1|x1=y1=z1|
|x2-y2|  2| z2|x2y2|x2=y2=z2|
|x3-y3|  1| z3|x3y3|x3=y3=z3|
+-----+---+---+----+--------+

_4 просто удаляется из _1, а _5 использует значения из _1 и _3

  • Я использую spark-2.3.3 и python 2.7

Спасибо!


person Shashank BR    schedule 15.03.2019    source источник


Ответы (1)


Вы можете использовать pyspark.sql.functions для этого.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

sqlContext = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate()
data = [('x1-y1', 3,'z1'),
        ('x2-y2', 2,'z2'),
        ('x3-y3', 1,'z3')]
test_df = sqlContext.createDataFrame(data, schema=['_1', '_2', '_3'])

test_df = test_df.withColumn('_4', F.regexp_replace('_1', '-', ''))
test_df = test_df.withColumn('_5', F.concat(F.regexp_replace('_1', '-', '='),F.lit('='),F.col('_3')))
test_df.show()

+-----+---+---+----+--------+
|   _1| _2| _3|  _4|      _5|
+-----+---+---+----+--------+
|x1-y1|  3| z1|x1y1|x1=y1=z1|
|x2-y2|  2| z1|x2y2|x2=y2=z2|
|x3-y3|  1| z1|x3y3|x3=y3=z3|
+-----+---+---+----+--------+
person giser_yugang    schedule 15.03.2019
comment
Спасибо за быстрый ответ, но я все равно получаю эту ошибку, когда пытаюсь использовать ваше решение: AttributeError: объект «PipelinedRDD» не имеет атрибута «withColumn» - person Shashank BR; 15.03.2019
comment
@ShanbogShashank Это так странно. Можете ли вы просто правильно запустить мой пример? - person giser_yugang; 15.03.2019
comment
Когда я копирую и вставляю ваш код, он работает. Я пытаюсь адаптировать его к своему коду, но получаю сообщение об ошибке. rdd1 = sc.parallelize([('x1-y1', 3, 'z1'), ('x2-y2', 2, 'z2'), ('x3-y3', 1, 'z3') ]) df1 = sqlContext.createDataFrame(rdd1) df2 = df2.withColumn('_4', F.regexp_replace('_1', '-', '')) df2 = df2.withColumn('_5', F.concat(F.regexp_replace('_1', '-', '='),F.lit('='),F.col('_3'))) df2.show() - person Shashank BR; 15.03.2019
comment
@ShanbogShashank Может проблема в инициализации Spark. Поскольку SparkSession новее, я рекомендовал вам этот способ. - person giser_yugang; 15.03.2019
comment
Здорово! Это сработало, когда я изменил sc = pyspark.SparkContext.getOrCreate() sqlContext = SQLContext(sc) на: sqlContext = SparkSession.builder.appName("test").enableHiveSupport().getOrCreate() Большое спасибо! - person Shashank BR; 15.03.2019