В начале я должен сказать, что использовал AWS Glue Studio, чтобы узнать, как использовать Glue с PySpark, и пока все идет очень хорошо. Так было до тех пор, пока я не столкнулся с ошибкой, которую не могу понять (не говоря уже о решении). Пример данных можно найти внизу.
Контекст
Все, что я делал, это простое преобразование данных. Input S3 Bucket --> CustomTransform --> Output S3
. Но программа продолжает вылетать после экспорта некоторых данных. Я тоже упомянул об этом позже, но я даже попытался удалить CustomTransformation, но экспорт данных S3 все еще не удался, даже при переходе от одного ведра к другому.
Ошибка
Вот часть ошибки Python, которую я получаю (скопировано из CloudWatch):
2021-03-26 09:03:09,200 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
File "/tmp/GlueTest.py", line 69, in <module>
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "glueparquet", connection_options = {
"path": "s3://example-bucket-here/data/",
"compression": "snappy",
"partitionKeys": []
}, transformation_ctx = "DataSink0")
File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
format, format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
return sink.write(frame_or_dfc)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
return self.writeFrame(dynamic_frame_or_dfc, info)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o85.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 76, 172.36.109.34, executor 6): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
Настоящая головоломка
Что меня больше всего сбивает с толку, так это то, что этот сбой происходит после, когда большая часть данных уже экспортирована в S3. Сразу же это может означать, что с данными что-то не так, поскольку они попадают в некоторые поврежденные (или плохо отформатированные) данные, а затем вылетает.
Итак, я посмотрел на разницу между успешно экспортированными данными и входными данными и нашел все строки, которые не были экспортированы. Ничто не показалось мне странным или причиной неудачного экспорта.
Когда я выбираю S3 Bucket в качестве входного источника, может быть полезно знать, что схема определяется AWS Glue.
Что я пробовал
Поэтому я попытался экспортировать данные во всех форматах, поддерживаемых Glue, но ни один из них не работал. Я также попытался пропустить все преобразования данных и просто взять Input S3 Bucket и экспортировать прямо в Output S3 Bucket, но он все равно вылетел с той же ошибкой (на самом деле это сообщение об ошибке, которое я включил выше!).
Опять же, все это говорит о том, что с данными что-то не так, но я просмотрел все данные, которые не прошли через процесс (всего около 180 записей), и все они выглядят так же, как данные, которые прошли.
И для проверки работоспособности я использовал метод Input S3 - ›Output S3 для некоторых других (очень похожих) данных, и он работал нормально, в основном действовал как копипаст.
Я также наткнулся на эту статью. Но это не очень помогло, когда я попытался изменить выходной формат, чтобы получить больше информации, я столкнулся с той же ошибкой - без дополнительной информации.
Может ли кто-нибудь помочь здесь определить проблему? Ничто не говорит о том, что это должно привести к сбою. Я рад предоставить остальную часть ошибки Java, если это поможет людям.
Пример данных
Вот как выглядят мои данные:
Date ticker_name currency exchange_name instrument_type first_trade_date Amount
1612229400 0382.HK HKD HKG EQUITY 1563240600 0.049
1613140200 SO USD NYQ EQUITY 378657000 0.64
1613053800 SIGI USD NMS EQUITY 322151400 0.25
1614240000 SIGT.L GBp LSE EQUITY 828601200 1.68
1612249200 SIH.BE EUR BER EQUITY 1252044000 0.038
Все поля представляют собой строки, кроме Date (long), first_trade_date (long) и Amount (double).
Когда я звоню .printSchema()
, я получаю следующее:
root
|-- Date: long
|-- currency: string
|-- exchange_name: string
|-- instrument_type: string
|-- first_trade_date: long
|-- Amount: double