AWS Glue с PySpark - при экспорте DynamicFrame в S3 возникает ошибка UnsupportedOperationException.

В начале я должен сказать, что использовал AWS Glue Studio, чтобы узнать, как использовать Glue с PySpark, и пока все идет очень хорошо. Так было до тех пор, пока я не столкнулся с ошибкой, которую не могу понять (не говоря уже о решении). Пример данных можно найти внизу.

Контекст

Все, что я делал, это простое преобразование данных. Input S3 Bucket --> CustomTransform --> Output S3. Но программа продолжает вылетать после экспорта некоторых данных. Я тоже упомянул об этом позже, но я даже попытался удалить CustomTransformation, но экспорт данных S3 все еще не удался, даже при переходе от одного ведра к другому.

Ошибка

Вот часть ошибки Python, которую я получаю (скопировано из CloudWatch):

2021-03-26 09:03:09,200 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(70)): Error from Python:Traceback (most recent call last):
  File "/tmp/GlueTest.py", line 69, in <module>
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "glueparquet", connection_options = {
    "path": "s3://example-bucket-here/data/",
    "compression": "snappy",
    "partitionKeys": []
}, transformation_ctx = "DataSink0")
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
    format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
    format, format_options, transformation_ctx)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
    return sink.write(frame_or_dfc)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
    return self.writeFrame(dynamic_frame_or_dfc, info)
  File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
    return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o85.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 1.0 failed 4 times, most recent failure: Lost task 4.3 in stage 1.0 (TID 76, 172.36.109.34, executor 6): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary

Настоящая головоломка

Что меня больше всего сбивает с толку, так это то, что этот сбой происходит после, когда большая часть данных уже экспортирована в S3. Сразу же это может означать, что с данными что-то не так, поскольку они попадают в некоторые поврежденные (или плохо отформатированные) данные, а затем вылетает.

Итак, я посмотрел на разницу между успешно экспортированными данными и входными данными и нашел все строки, которые не были экспортированы. Ничто не показалось мне странным или причиной неудачного экспорта.

Когда я выбираю S3 Bucket в качестве входного источника, может быть полезно знать, что схема определяется AWS Glue.

Что я пробовал

Поэтому я попытался экспортировать данные во всех форматах, поддерживаемых Glue, но ни один из них не работал. Я также попытался пропустить все преобразования данных и просто взять Input S3 Bucket и экспортировать прямо в Output S3 Bucket, но он все равно вылетел с той же ошибкой (на самом деле это сообщение об ошибке, которое я включил выше!).

Опять же, все это говорит о том, что с данными что-то не так, но я просмотрел все данные, которые не прошли через процесс (всего около 180 записей), и все они выглядят так же, как данные, которые прошли.

И для проверки работоспособности я использовал метод Input S3 - ›Output S3 для некоторых других (очень похожих) данных, и он работал нормально, в основном действовал как копипаст.

Я также наткнулся на эту статью. Но это не очень помогло, когда я попытался изменить выходной формат, чтобы получить больше информации, я столкнулся с той же ошибкой - без дополнительной информации.

Может ли кто-нибудь помочь здесь определить проблему? Ничто не говорит о том, что это должно привести к сбою. Я рад предоставить остальную часть ошибки Java, если это поможет людям.

Пример данных

Вот как выглядят мои данные:

Date        ticker_name  currency exchange_name instrument_type first_trade_date Amount
1612229400  0382.HK      HKD      HKG           EQUITY          1563240600       0.049
1613140200  SO           USD      NYQ           EQUITY          378657000        0.64
1613053800  SIGI         USD      NMS           EQUITY          322151400        0.25
1614240000  SIGT.L       GBp      LSE           EQUITY          828601200        1.68
1612249200  SIH.BE       EUR      BER           EQUITY          1252044000       0.038

Все поля представляют собой строки, кроме Date (long), first_trade_date (long) и Amount (double).

Когда я звоню .printSchema(), я получаю следующее:

root
|-- Date: long
|-- currency: string
|-- exchange_name: string
|-- instrument_type: string
|-- first_trade_date: long
|-- Amount: double

person Jamie    schedule 30.03.2021    source источник
comment
Было бы полезно узнать, как выглядит структура данных вашего источника данных.   -  person Robert Kossendey    schedule 01.04.2021
comment
@RobertKossendey Обновлено, чтобы теперь включить пример данных.   -  person Jamie    schedule 01.04.2021
comment
Можете ли вы попробовать писать в S3, но раньше не опускали столбец ticker_name?   -  person Robert Kossendey    schedule 01.04.2021
comment
@RobertKossendey Просто попробовал, но, к сожалению, все равно не удалось с той же ошибкой.   -  person Jamie    schedule 01.04.2021
comment
Можете ли вы вызвать .printSchema () в своем динамическом фрейме?   -  person Robert Kossendey    schedule 01.04.2021
comment
@RobertKossendey Обновлено сообщение, чтобы включить вывод   -  person Jamie    schedule 06.04.2021
comment
Хорошо, вы можете снова включить ticker_name, теперь отпустите Amount, так как в ошибке написано 'PlainDoubleDictionary'   -  person Robert Kossendey    schedule 06.04.2021
comment
@RobertKossendey Я сбросил Amount, но все равно получаю ту же ошибку.   -  person Jamie    schedule 06.04.2021


Ответы (1)


Решение

Поэтому, если у кого-то возникла эта проблема, это может расстраивать, потому что эта ошибка, похоже, не дает никакой информации о том, что на самом деле идет не так. Одной из немногих подсказок, которые у меня были, была эта статья. Это говорит о том, что с моей схемой что-то не так.

Мне пришлось очень внимательно изучить свои данные и в конце концов заметил, что я получаю эту ошибку только тогда, когда я запускаю ее с определенными файлами в сочетании с другими файлами.

Оказалось, что некоторые из моих паркетных файлов имели дату в формате int, а в других случаях это было float. Эти данные были созданы из Pandas DataFrame с использованием .to_parquet() в другой функции, поэтому я не уверен, почему возникла несогласованность в типах данных.

Что меня больше всего озадачило, так это то, почему, когда я попытался привести тип даты к int (как показано здесь), я все равно получил ошибка.

В любом случае, мое решение состояло в том, чтобы исправить способ вывода данных Pandas и убедиться, что он всегда выводит дату в виде целого числа до того, как Glue обработает данные.

person Jamie    schedule 12.04.2021