Создал собственный скрипт склейки pyspark для чтения данных из красного смещения

Я написал этот код с данными чтения из s3 и записью в s3 на AWS Glue.

Это настраиваемый код pyspark, и я не использую сгенерированные сценарии.

Вот сценарий

from pyspark import SparkConf,SparkContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import *

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
import sys

args = getResolvedOptions(sys.argv,['TempDir','JOB_NAME'])

conf = SparkConf()
    
conf.set("spark.sql.parquet.compression.codec","snappy")
conf.set("spark.sql.parquet.writeLegacyFormat","true")

output_dir_path="s3://mygluecrawler/pysparkGlueData/"

sc = SparkContext()

glueContext = GlueContext(sc)

spark = glueContext.spark_session

job = Job(glueContext)

job.init(args['JOB_NAME'],args)

input_file = "s3://mygluecrawler/pysparkGlueData/store.csv"

#print(" Dropping the malformed data")

sparkDF = spark.read.format("csv").option("header","true").option("mode",'DROPMALFORMED').option("mode","FAILFAST").load(input_file)

sparkDF = sparkDF.withColumn("CompetitionDistance",sparkDF.CompetitionDistance.cast('float'))
sparkDF = sparkDF.withColumn("CompetitionOpenSinceMonth",sparkDF.CompetitionOpenSinceMonth.cast('int'))
sparkDF = sparkDF.withColumn("CompetitionOpenSinceYear",sparkDF.CompetitionOpenSinceYear.cast('int'))
sparkDF = sparkDF.withColumn("Promo2",sparkDF.Promo2.cast('int'))
sparkDF = sparkDF.withColumn("Promo2SinceWeek",sparkDF.Promo2SinceWeek.cast('int'))
sparkDF = sparkDF.withColumn("Promo2SinceYear",sparkDF.Promo2SinceYear.cast('int'))


#sparkDF = sparkDF.fillna(value=0)

#Replaces anything which is null with the value , here its replacing null with -99 in the array of two columns 
#mentioned in the subset

sparkDF = sparkDF.fillna(value=-99,subset=["Promo2SinceWeek","Promo2SinceYear"])


sparkColumns = sparkDF.select('StoreType','CompetitionDistance','CompetitionOpenSinceYear','Promo2SinceWeek','Promo2')

sparkColumns.write.format('parquet').partitionBy(['StoreType','Promo2']).mode('append').option("path",output_dir_path).save()

Приведенный выше сценарий работает нормально.

Теперь я делаю то же самое, но вместо того, чтобы читать файл из s3 и писать в s3, я хочу прочитать его из красного смещения в клее и записать его в красном смещении

Здесь я читаю данные из красного смещения и записываю их в красное смещение, пожалуйста, проверьте сценарий и скажите мне, правильный ли это путь. Мне нужна помощь.

from pyspark.context import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import functions as f

from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *

from awsglue.context import GlueContext
from awsglue.job import Job
import sys

args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

conf = SparkConf()

sc = SparkContext()

glueContext = GlueContext(sc)

spark = glueContext.spark_session

job = Job(glueContext)

job.init(args['JOB_NAME'], args)

jdbcURL = "jdbc:redshift://my-redshift-database.cealcs9iyaaz.us-****-2.redshift.amazonaws.com:5439/dev?user=username&password=password"

#sparkDF = spark.read.format("csv").option("header","true").option("mode",'DROPMALFORMED').option("mode","FAILFAST").load(input_file)
sparkDf = spark.read.option("url", jdbcURL) \
            .option("dbtable", "glue_poc.s3toredshift")\
            .option("tempdir", "s3://mygluecrawler/sparkLogs/")\
            .load()

sparkDf.createOrReplaceTempView("people")

newdata = spark.sql("select * from people")


dynamic_df = DynamicFrame.fromDF(newdata,glueContext, "dynamic_df")


mapped_df = ResolveChoice.apply(frame = dynamic_df,choice = "make_cols",transformation_ctx = "mapped_df")

datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = mapped_df, catalog_connection = "redshift-new-connection", 
    connection_options = {"dbtable" : "glue_poc.s3toredshift","database":"dev"},redshift_tmp_dir = args["TempDir"], 
    transformation_ctx = "datasink"
    )

job.commit()

person bigDataArtist    schedule 08.07.2021    source источник
comment
В чем проблема ?   -  person Hansanho    schedule 08.07.2021
comment
2021-07-08 10: 36: 26,307 ОШИБКА [основная] glue.ProcessLauncher (Logging.scala: logError (70)): Ошибка от Python: Traceback (последний вызов последний): File / opt / amazon / spark / python / lib / pyspark.zip / pyspark / sql / utils.py, строка 63, в деко return f (* a, ** kw) Файл /opt/amazon/spark/python/lib/py4j-0.10.7-src.zip /py4j/protocol.py, строка 328, в формате get_return_value (target_id,., name), value) py4j.protocol.Py4JJavaError: произошла ошибка при вызове o72.load. : org.apache.spark.sql.AnalysisException: невозможно вывести схему для Parquet. Его нужно указывать вручную   -  person bigDataArtist    schedule 08.07.2021
comment
@Hansanho, вы пишете собственные скрипты на клей?   -  person bigDataArtist    schedule 08.07.2021
comment
Я считаю, что вам нужно предоставить format функции spark.read. Это сообщение SO может помочь   -  person vmachan    schedule 10.07.2021
comment
@vmachan Я сделал это, но он снова показал мне ошибку. Невозможно вывести схему для csv, я дал ей формат csv   -  person bigDataArtist    schedule 11.07.2021
comment
@bigDataArtist, поскольку вы читаете Redshift, я думаю, вам нужно будет использовать формат драйвера Redshift, как показано в сообщении SO, которым я поделился в моем предыдущем комментарии.   -  person vmachan    schedule 12.07.2021
comment
@vmachan На самом деле я пытался это сделать, но мне не удалось вызвать банку из ноутбука jupyter. Может быть, мне придется развернуть эту банку на пути клея, а затем запустить ее. так что в основном эта банка должна быть на пути клея, когда я запускаю искровую работу   -  person bigDataArtist    schedule 12.07.2021