Почему Spark убивает процесс драйвера в случае исключения?

Я новый пользователь Spark Streaming и Spark. Во время своих тестов я заметил, что одна ошибка в потоке приводит к сбою всего потокового приложения.

Для большей ясности позвольте мне объяснить на примере. Предположим, что представленное приложение потребляет целочисленный поток, например 15, 10, 21, 12, .... Но этот поток может по ошибке содержать некоторые нецелочисленные строки (например, 15, 10, 21, 12, foo, 32, ...). Если в моем коде есть ошибка и предполагается, что все элементы данных являются целыми числами, тогда он вызовет исключение при обработке строки foo в потоке.

В таком случае движок Spark повторяет выполнение задачи по умолчанию 3 раза (вы можете проверить документацию по конфигурации для параметра spark.task.maxFailures). Затем, после всех неудачных попыток, он прерывает процесс драйвера и, следовательно, исполнителей.

Я не уверен, что это правильное поведение. Вместо этого я думал, что текущая задача (т.е. выполнение частичных данных) или пакет (т.е. набор элементов данных, считанных из потока) будет отброшен. Итак, драйвер обработает оставшийся поток.

Есть идеи, почему Spark так себя ведет? Существуют ли какие-либо конфигурации, чтобы заставить двигатель игнорировать сбои и продолжать работу?

Кстати, я использую Spark в автономном режиме. Может ли в этом помочь YARN или Mesos?

Заранее спасибо.


person ovunccetin    schedule 29.10.2015    source источник
comment
Я бы прочитал его как строковый столбец и обработал неверные данные вручную.   -  person Reactormonk    schedule 29.10.2015


Ответы (1)


Если Spark потерпит неудачу тихо, как вы узнаете, что что-то пошло не так? (YARN или Mesos в этом не помогут.)

Как предлагает Reactormonk, вы должны указать, как вы хотите, чтобы сбои обрабатывались как часть функции, которую вы передаете Spark. Если вы просто хотите отбросить ошибочные строки и используете Scala, вы можете сделать что-то вроде следующего:

val strRDD = sc.parallelize(Array("15", "10", "21", "12", "foo", "32"),1)
val intRDD = strRDD.flatMap(x => try{Some(x.toInt)} catch {case e: Exception => None})
intRDD.collect()

Это вернет Array[Int] = Array(15, 10, 21, 12, 32).

person Matthew Graves    schedule 29.10.2015
comment
Хорошо с ручным управлением, но мне нужно узнать, есть ли в Spark встроенная поддержка для тихого пропуска ошибочного выполнения. Он может уведомлять меня о неудачных задачах через API прослушивателя, и я могу сказать, игнорируйте это и продолжайте. - person ovunccetin; 29.10.2015