Как писать и обновлять API kudu в Spark 2.1

Я хочу писать и обновлять Kudu API. Это зависимость maven:

<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-client</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.kudu</groupId>
  <artifactId>kudu-spark2_2.11</artifactId>
  <version>1.1.0</version>
</dependency>

В следующем коде я понятия не имею о параметре KuduContext.

Мой код в spark2-shell:

val kuduContext = new KuduContext("master:7051") 

Также такая же ошибка в потоковой передаче Spark 2.1:

import org.apache.kudu.spark.kudu._
import org.apache.kudu.client._
val sparkConf = new SparkConf().setAppName("DirectKafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val messages = KafkaUtils.createDirectStream("")
messages.foreachRDD(rdd => {
   val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
   import spark.implicits._
   val bb = spark.read.options(Map("kudu.master" -> "master:7051","kudu.table" -> "table")).kudu //good 
   val kuduContext = new KuduContext("master:7051") //error
})

Затем ошибка:

org.apache.spark.SparkException: в этой JVM может работать только один SparkContext (см. SPARK-2243). Чтобы игнорировать эту ошибку, установите spark.driver.allowMultipleContexts = true. Текущий запущенный SparkContext был создан по адресу: org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)


person Autumn    schedule 09.01.2018    source источник
comment
Кажется, у вас уже есть активный SparkContext (поскольку вы получаете конфигурацию от rdd.sparkContext.getConf. Зачем вы создаете новый?   -  person Shaido    schedule 09.01.2018
comment
я запускаю код в spark2-shell, который по умолчанию включает sparksession.   -  person Autumn    schedule 09.01.2018
comment
Если вы используете искровую оболочку, вам не нужна зависимость от maven. Включите банку куду при запуске оболочки.   -  person Shaido    schedule 09.01.2018
comment
я могу ввести вас в заблуждение. я обновил свой вопрос сейчас.   -  person Autumn    schedule 10.01.2018
comment
Вы должны прекратить создавать/получать новый SparkSession и KuduContext для каждого RDD.   -  person OneCricketeer    schedule 10.01.2018
comment
мне нужен SparkSession, который spark.read.json для каждого RDD.   -  person Autumn    schedule 10.01.2018
comment
Хорошо, а почему ты не можешь сделать это за пределами foreach? У вас есть SparkConf, так что вам не нужен этот rdd.sparkContext.getConf   -  person OneCricketeer    schedule 10.01.2018
comment
@cricket_007 Мне нужно определить потоковую передачу, val ssc = new StreamingContext(sparkConf, Seconds(2)   -  person Autumn    schedule 10.01.2018


Ответы (1)


Обновите свою версию Kudu до последней версии (в настоящее время 1.5.0). KuduContext принимает SparkContext в качестве входного параметра в более поздних версиях, и это должно предотвратить эту проблему.

Кроме того, выполните первоначальную инициализацию Spark за пределами файла foreachRDD. В предоставленном вами коде переместите spark и kuduContext из foreach. Также вам не нужно создавать отдельный sparkConf, вы можете использовать только более новый SparkSession.

val spark = SparkSession.builder.appName("DirectKafka").master("local[*]").getOrCreate()
import spark.implicits._

val kuduContext = new KuduContext("master:7051", spark.sparkContext)
val bb = spark.read.options(Map("kudu.master" -> "master:7051", "kudu.table" -> "table")).kudu

val messages = KafkaUtils.createDirectStream("")
messages.foreachRDD(rdd => {   
  // do something with the bb table and messages       
})
person Shaido    schedule 10.01.2018
comment
@крикет_007. с kudu-spark2_2.11_1.1.0 кажется только один параметр KuduContext(org.apache.kudu.spark.kudu) - person Autumn; 10.01.2018
comment
Инициализация искры внутри foreachRDD из-за потокового документа искры. из foreachRD есть val ssc = new StreamingContext(sparkConf, Seconds(2). - person Autumn; 10.01.2018
comment
@Autumn: Никогда не должно быть необходимости в такой инициализации внутри foreach. Где ты это увидел? - person Shaido; 10.01.2018
comment
это документ:spark.apache .org/docs/2.1.0/ - person Autumn; 10.01.2018
comment
@Autumn: Интересно, каждый день узнаешь что-то новое. Хотя я предполагаю, что это должно быть необходимо только в том случае, если конфигурация изменяется между потоковыми кадрами данных, поэтому в большинстве случаев снаружи все должно быть в порядке. Важное различие между документом и тем, что вы использовали, заключается в том, что SparkSession на самом деле не создается в foreachRDD в документах, он извлекает существующий сеанс. Другими словами. - person Shaido; 10.01.2018
comment
@Autumn: Глядя на исходный код, связанный с документацией, они фактически определяют объект SparkSessionSingleton, который они используют внутри цикла. - person Shaido; 10.01.2018
comment
Который определяется в нижней части класса как одна строка - person OneCricketeer; 10.01.2018
comment
@cricket_007: Знаете ли вы, есть ли веская причина, по которой SparkSessionSingleton создается и используется вместо простого использования SparkSession? Что-то с конфигурацией? т.е. что rdd.sparkContext.getConf используется в цикле. - person Shaido; 10.01.2018
comment
Это синглтон, поэтому getOrCreate вызывается только один раз? Я не знаю, почему контекст RDD будет другим - person OneCricketeer; 10.01.2018
comment
@cricket_007: Это будет вызвано только один раз. Однако я думал, что getOrCreate уже позаботится об этом, другими словами, он создаст объект, если он не существует, в противном случае просто извлеките его. Такое ощущение, что это синглтон вокруг синглтона... - person Shaido; 10.01.2018
comment
@ Шайдо, спасибо за ваш терпеливый ответ. приведенный выше источник совпадает с моим кодом. как SparkSessionSingleton, так и spark находятся внутри цикла, ошибка одинакова независимо от того, kuduContext внутри или снаружи. - person Autumn; 10.01.2018
comment
@Autumn: Можете ли вы попробовать добавить дополнительный параметр при создании KuduContext? Я видел здесь: kudu.apache.org/docs/developing.html, что, возможно, это необходим и для версии 1.1.0 (при использовании Spark 2.0+). Добавил его в код в ответе. - person Shaido; 10.01.2018
comment
@ Шайдо, не могу. он предупредит при добавлении 2 параметров. По идее, это указывает на val kuduContext = new KuduContext(org.apache.kudu.spark.kudu) - person Autumn; 10.01.2018
comment
@Осень: понятно. Возможно ли обновление Kudu (1.5.0 — самая новая версия)? - person Shaido; 10.01.2018
comment
@Shaido: 1.5.0 KuduContext также нужен только параметр. кстати, KuduContext отбрасывается; в 1.5.0 (я использую spark2.1 и scala 2.11). теперь я нахожу API, который может создавать, удалять или записывать в таблицы Kudu вместо KuduContext . - person Autumn; 10.01.2018
comment
@Autumn: согласно kudu.apache.org/releases/1.5. .0/docs/ в версии 1.5 должно быть два параметра. Поскольку для ввода используется sparkContext, он должен (надеюсь) решить проблему. - person Shaido; 10.01.2018
comment
@Shaido, о да, в версии 1.5 это два параметра. большое спасибо. не могли бы вы обновить свой ответ, я бы принял его. - person Autumn; 10.01.2018
comment
@Шайдо. к сожалению, другие вопросы, которые отличаются от этого вопроса. поэтому я открываю новый вопрос - person Autumn; 10.01.2018