Утечка памяти Spark off heap в Yarn с прямым потоком Kafka

Я запускаю искровую потоковую передачу 1.4.0 на Yarn (дистрибутив Apache 2.6.0) с java 1.8.0_45, а также с прямым потоком Kafka. Я также использую Spark с поддержкой scala 2.11.

Проблема, которую я вижу, заключается в том, что контейнеры драйвера и исполнителя постепенно увеличивают использование физической памяти до точки, когда контейнер пряжи убивает его. Я настроил до 192M Heap и 384 off heap в моем драйвере, но в конечном итоге он иссякнет.

Память кучи, похоже, подходит для регулярных циклов сборки мусора. Ни в одном из таких прогонов не встречается OutOffMemory.

На самом деле я не генерирую трафик в очередях kafka, но это происходит. Вот код, который я использую

object SimpleSparkStreaming extends App {

val conf = new SparkConf()
val ssc = new StreamingContext(conf,Seconds(conf.getLong("spark.batch.window.size",1L)));
ssc.checkpoint("checkpoint")
val topics = Set(conf.get("spark.kafka.topic.name")); 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))
            val kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)
            kafkaStream.foreachRDD(rdd => {
                rdd.foreach(x => {
                    println(x._2)
                })

            })
    kafkaStream.print()
            ssc.start() 

            ssc.awaitTermination()

}

Я запускаю это на CentOS 7. Команда, используемая для отправки искры, следующая

./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
--conf spark.yarn.executor.memoryOverhead=256 \
--conf spark.yarn.driver.memoryOverhead=384 \
--conf spark.kafka.topic.name=test \
--conf spark.kafka.broker.list=172.31.45.218:9092 \
--conf spark.batch.window.size=1 \
--conf spark.app.name="Simple Spark Kafka application" \
--master yarn-cluster \
--num-executors 1 \
--driver-memory 192m \
--executor-memory 128m \
--executor-cores 1 \
/home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar 

Любая помощь приветствуется

С уважением,

Апурва


person Apoorva Sareen    schedule 13.07.2015    source источник
comment
Я столкнулся с той же проблемой, которую вы нашли решение?   -  person crak    schedule 23.02.2016
comment
У меня аналогичная проблема, но вы не достигли точки насыщения: http://stackoverflow.com/questions/35693211/spark-streaming-application-health   -  person Mohitt    schedule 01.03.2016
comment
дай мне знать, если найдешь какое-то решение   -  person Mohitt    schedule 01.03.2016
comment
Я нахожусь в той же ситуации, вы случайно не нашли причину или обходной путь?   -  person Felix    schedule 31.05.2018


Ответы (3)


Попробуйте увеличить ядра исполнителя. В вашем примере единственное ядро ​​предназначено для использования потоковых данных, не оставляя ядер для обработки входящих данных.

person Praneeth Reddy G    schedule 17.03.2016
comment
это DirectStream, одно ядро ​​исполнителя в порядке spark.apache.org/docs/latest/ - person Artur Bartnik; 08.06.2016

Это может быть утечка памяти ... Вы пробовали использовать conf.set ("spark.executor.extraJavaOptions", "- XX: + UseG1GC")?

person Jorge Machado    schedule 20.07.2016

Это не ответ Kafka, он будет изолирован от Spark и о том, как его система каталогизации некачественная, когда дело доходит до согласованной устойчивости и больших операций. Если вы постоянно выполняете запись на уровне перситентности (т.е. в цикле, повторно сохраняющем DF после большой операции, а затем выполняя ее снова) или выполняете большой запрос (например, inputDF.distinct.count); задание Spark начнет помещать некоторые данные в память и неэффективно удалять устаревшие объекты.

Это означает, что со временем объект, который был способен быстро запускаться один раз, будет постоянно замедляться, пока не останется доступной памяти. Для всех, кто находится дома, запускает AWS EMR с большим DataFrame, загруженным в среду, выполните следующий запрос:

var iterator = 1
val endState = 15
var currentCount = 0
while (iterator <= endState) {
  currentCount = inputDF.distinct.count
  print("The number of unique records are : " + currentCount)
  iterator = iterator + 1
}

Во время выполнения задания следите за управлением памятью пользовательского интерфейса Spark, если DF достаточно велик для сеанса, вы начнете замечать снижение времени выполнения с каждым последующим запуском, в основном блоки становятся устаревшими, но Spark не может идентифицировать когда чистить эти блоки.

Лучший способ найти решение этой проблемы - это написать свой DF локально, очистить слой настойчивости и загрузить данные обратно. Это подход к проблеме «кувалдой», но для моего бизнес-кейса это был Решение, которое легко реализовать, привело к увеличению времени выполнения наших больших таблиц на 90% (от 540 минут до примерно 40 с меньшим объемом памяти).

В настоящее время я использую следующий код:

val interimDF = inputDF.action
val tempDF = interimDF.write.format(...).option("...","...").save("...")
spark.catalog.clearCache
val interimDF = spark.read..format(...).option("...","...").save("...").persist
interimDF.count

Вот производная, если вы не теряете DF в дочерних подпроцессах:

val interimDF = inputDF.action
val tempDF = interimDF.write.format(...).option("...","...").save("...")
for ((k,v) <- sc.getPersistentRDDs) {
  v.unpersist()
}
val interimDF = spark.read..format(...).option("...","...").save("...").persist
interimDF.count
person afeldman    schedule 12.08.2019