Я написал этот код с данными чтения из s3 и записью в s3 на AWS Glue.
Это настраиваемый код pyspark, и я не использую сгенерированные сценарии.
Вот сценарий
from pyspark import SparkConf,SparkContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
import sys
args = getResolvedOptions(sys.argv,['TempDir','JOB_NAME'])
conf = SparkConf()
conf.set("spark.sql.parquet.compression.codec","snappy")
conf.set("spark.sql.parquet.writeLegacyFormat","true")
output_dir_path="s3://mygluecrawler/pysparkGlueData/"
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
input_file = "s3://mygluecrawler/pysparkGlueData/store.csv"
#print(" Dropping the malformed data")
sparkDF = spark.read.format("csv").option("header","true").option("mode",'DROPMALFORMED').option("mode","FAILFAST").load(input_file)
sparkDF = sparkDF.withColumn("CompetitionDistance",sparkDF.CompetitionDistance.cast('float'))
sparkDF = sparkDF.withColumn("CompetitionOpenSinceMonth",sparkDF.CompetitionOpenSinceMonth.cast('int'))
sparkDF = sparkDF.withColumn("CompetitionOpenSinceYear",sparkDF.CompetitionOpenSinceYear.cast('int'))
sparkDF = sparkDF.withColumn("Promo2",sparkDF.Promo2.cast('int'))
sparkDF = sparkDF.withColumn("Promo2SinceWeek",sparkDF.Promo2SinceWeek.cast('int'))
sparkDF = sparkDF.withColumn("Promo2SinceYear",sparkDF.Promo2SinceYear.cast('int'))
#sparkDF = sparkDF.fillna(value=0)
#Replaces anything which is null with the value , here its replacing null with -99 in the array of two columns
#mentioned in the subset
sparkDF = sparkDF.fillna(value=-99,subset=["Promo2SinceWeek","Promo2SinceYear"])
sparkColumns = sparkDF.select('StoreType','CompetitionDistance','CompetitionOpenSinceYear','Promo2SinceWeek','Promo2')
sparkColumns.write.format('parquet').partitionBy(['StoreType','Promo2']).mode('append').option("path",output_dir_path).save()
Приведенный выше сценарий работает нормально.
Теперь я делаю то же самое, но вместо того, чтобы читать файл из s3 и писать в s3, я хочу прочитать его из красного смещения в клее и записать его в красном смещении
Здесь я читаю данные из красного смещения и записываю их в красное смещение, пожалуйста, проверьте сценарий и скажите мне, правильный ли это путь. Мне нужна помощь.
from pyspark.context import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import functions as f
from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.job import Job
import sys
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
conf = SparkConf()
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
jdbcURL = "jdbc:redshift://my-redshift-database.cealcs9iyaaz.us-****-2.redshift.amazonaws.com:5439/dev?user=username&password=password"
#sparkDF = spark.read.format("csv").option("header","true").option("mode",'DROPMALFORMED').option("mode","FAILFAST").load(input_file)
sparkDf = spark.read.option("url", jdbcURL) \
.option("dbtable", "glue_poc.s3toredshift")\
.option("tempdir", "s3://mygluecrawler/sparkLogs/")\
.load()
sparkDf.createOrReplaceTempView("people")
newdata = spark.sql("select * from people")
dynamic_df = DynamicFrame.fromDF(newdata,glueContext, "dynamic_df")
mapped_df = ResolveChoice.apply(frame = dynamic_df,choice = "make_cols",transformation_ctx = "mapped_df")
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = mapped_df, catalog_connection = "redshift-new-connection",
connection_options = {"dbtable" : "glue_poc.s3toredshift","database":"dev"},redshift_tmp_dir = args["TempDir"],
transformation_ctx = "datasink"
)
job.commit()
format
функцииspark.read
. Это сообщение SO может помочь - person vmachan   schedule 10.07.2021