У меня есть следующий код для тестирования интеграции flink и hive. Подаю заявку через flink run -m yarn-cluster ....
. HiveConfDir - это локальный каталог, который находится на машине, на которую я отправляю приложение, я бы спросил, как flink может читать этот локальный каталог, когда основной класс работает в кластере (yarn-cluster)? Спасибо!
package org.example.app
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.types.Row
object FlinkBatchHiveTableIntegrationTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val name = "myHiveCatalog"
val defaultDatabase = "default"
//how does flink could read this local directory
val hiveConfDir = "/apache-hive-2.3.7-bin/conf"
val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
tenv.registerCatalog(name, hive)
tenv.useCatalog(name)
val sql =
"""
select * from testdb.t1
""".stripMargin(' ')
val table = tenv.sqlQuery(sql)
table.printSchema()
table.toAppendStream[Row].print()
env.execute("FlinkHiveIntegrationTest")
}
}