Как записать фрейм данных искры в один файл в локальной системе без использования объединения

Я хочу создать файл avro из кадра данных pyspark, и в настоящее время я делаю coalesce, как показано ниже.

df = df.coalesce(1)
df.write.format('avro').save('file:///mypath')

Но теперь это приводит к проблемам с памятью, поскольку все данные будут загружаться в память перед записью, а размер моих данных постоянно растет с каждым днем. Поэтому я хочу записать данные по каждому разделу, чтобы данные записывались на диск кусками и не вызывали проблем с OOM. Я обнаружил, что toLocalIterator помогает в достижении этого. Но я не уверен, как его использовать. Я пробовал использовать ниже, и он возвращает все строки

iter = df.toLocalIterator()
for i in iter:
    print('writing some data')
    # write the data into disk/file

Iter выполняет итерацию по каждой строке, а не по каждому разделу. Как мне это сделать?


person newbie    schedule 25.08.2020    source источник
comment
вы пробовали repartition вместо coalesce ? У меня уже были некоторые проблемы с памятью, которые были решены с помощью перераспределения вместо объединения.   -  person Steven    schedule 25.08.2020
comment
.repartition(n) выполняет равное разбиение, а coalsce(n) выполняет приблизительно одинаковое разбиение, чтобы свести к минимуму перемешивание. Если вы хотите записывать/читать данные по разделам, почему бы вам не сделать df.repartition(partitionColumns).write.partitionBy(partitionColumns)...? Разве avro не поддерживает разбиение?   -  person Samir Vyas    schedule 25.08.2020


Ответы (1)


когда вы делаете df = df.coalesce(1), все данные собираются в один из рабочих узлов. если этот узел не может справиться с таким огромным из-за ограничений ресурсов на узле, задание завершится ошибкой OOM.

Согласно документации искры toLocalIterator Возвращает итератор, который содержит все строки в этом текущем наборе данных и Максимальное количество памяти, которое он может использовать, эквивалентно наибольшему разделу в этом наборе данных

Как работает toLocalIterator?

Первый раздел отправляется драйверу. Если вы продолжите итерацию и достигнете конца первого раздела, второй раздел будет отправлен на узел драйвера и так далее до последнего раздела... поэтому (максимальная память, которую он может занимать = самый большой раздел) убедитесь, что ваш главный узел имеет достаточно оперативной памяти и диска.

Метод toLocalIterator.next() обеспечивает извлечение следующих записей раздела, если обработка предыдущего раздела выполнена.

what you can do is 

    //batch objects like 1000 per batch
    df.toLocalIterator().foreach(obj => {
      //add object in array
      //if batch size is reached ... 
        //then serialize them and use FileOutputStream and save in local location 
    })

примечание: обязательно кешируйте свой parentDF .. иначе в некоторых сценариях каждый раздел необходимо пересчитывать.

person kavetiraviteja    schedule 25.08.2020