Невозможно аутентифицировать кластер cassandra через программу Spark Scala

Пожалуйста, предложите мне решить проблему, указанную ниже, или предложите другой подход для решения моей проблемы. Я получаю данные откуда-то и вставляю их в кассандру ежедневно, тогда мне нужно получить данные из кассандры в течение целой недели, выполнить некоторую обработку и вставить результат обратно в кассандру.

У меня много записей, каждая из которых выполняет большинство из перечисленных ниже операций. Согласно моему предыдущему сообщению, предложение Представление предупреждения о готовом заявлении, чтобы избежать повторного представления подготовленного оператора, попыталось сохранить карту строки запроса vs подготовленные заявления.

Я попытался написать следующую программу Spark Scala, я проверил данные хоста cassandra из cqlsh, я могу подключиться к нему. Но через программу, когда я пытаюсь, я получаю ошибку.

class StatementCache {
  val acluster = CassandraUtils.initialize(nodes,user,pass, cassport,racdc)

  val session = acluster.connect("keyspacename");

      val statementCache: ConcurrentHashMap[String,PreparedStatement] = new ConcurrentHashMap


      def getStatement(cql : String): BoundStatement = {
    var ps : PreparedStatement = statementCache.get(cql);
     if (ps == null) {
                ps = session.prepare(cql);
                statementCache.put(cql, ps);
            }
            return ps.bind();
        }
    }


object CassandraUtils {
  println("##########entered cassandrutils")
   //val st=new STMT();
 private val psCache  : StatementCache = new StatementCache();
 val selectQuery = "select * from k1.table1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt >= ? and summ_dt <= ?;"
  val selectTripQuery = "select * from k1.tale1 where s_id = ? and a_id = ? and summ_typ = ? and summ_dt = ? and t_summ_id = ?;"

  val insertQuery = "insert into k1.table1 (s_id, a_id, summ_typ, summ_dt, t_summ_id, a_s_no, avg_sp, c_dist, c_epa, c_gal, c_mil, d_id, d_s_no, dist, en_dt, en_lat, en_long, epa, gal, h_dist, h_epa,h_gal, h_mil, id_tm, max_sp, mil, rec_crt_dt, st_lat, st_long, tr_dis, tr_dt, tr_dur,st_addr,en_addr) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?,?);"
  val updateQuery = "update k1.table1 set tr_dur=?,id_tm=?,max_sp=?,c_dist=?,h_dist=?,dist=?,c_gal=?,c_mil=?,h_gal=?,h_mil=?,c_epa=?,h_epa=?,epa=?,gal=?,rec_crt_dt=?,mil=?,avg_sp=?,tr_dis=?,en_lat=?,en_long=? where s_id= ? and a_id= ? and summ_typ= ? and  summ_dt= ? and t_summ_id=?; "

  def insert(session: Session, data: TripHistoryData, batch: BatchStatement) {
   batch.add(psCache.getStatement(insertQuery));
  }

  def update(session: Session, data: TripHistoryData, batch: BatchStatement) {
    batch.add(psCache.getStatement(updateQuery));
    }

     def initialize(clusterNodes: String, uid: String, pwd: String, port: Int, racdc:String): Cluster = {

    val builder = Cluster.builder().addContactPoints(InetAddress.getByName(clusterNodes))
      .withRetryPolicy(DefaultRetryPolicy.INSTANCE)
      .withLoadBalancingPolicy(
        new TokenAwarePolicy(
          DCAwareRoundRobinPolicy.builder() //You can directly use the DCaware without TokenAware as well
            .withLocalDc(racdc) //This is case sensitive as defined in rac-dc properties file
            //.withUsedHostsPerRemoteDc(2) //Try at most 2 remote DC nodes in case all local nodes are dead in the current DC
            //.allowRemoteDCsForLocalConsistencyLevel()
            .build()))

    if (StringUtils.isNotEmpty(uid)) {
      builder.withCredentials(uid, pwd)
    }

    val cluster: Cluster = builder.build()
    cluster
  }
}

-----------------------------------------------------------------------------------------------------------------

Я получаю следующую ошибку:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ExceptionInInitializerError
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:91)
    at com.vzt.afm.hum.dh.app.ProcessMPacket$$anonfun$1.apply(ProcessMPacket.scala:45)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host hostname1: Host hostname1 requires authentication, but no authenticator found in Cluster configuration
    at com.datastax.driver.core.AuthProvider$1.newAuthenticator(AuthProvider.java:40)
    at com.datastax.driver.core.Connection$5.apply(Connection.java:261)
    at com.datastax.driver.core.Connection$5.apply(Connection.java:243)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutorService.execute(MoreExecutors.java:299)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1.run(Futures.java:632)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:457)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
    at shade.com.datastax.spark.connector.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:185)
    at com.datastax.driver.core.Connection$Future.onSet(Connection.java:1288)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1070)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:993)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    ... 1 more

person shantha ramadurga    schedule 06.09.2018    source источник


Ответы (3)


вам необходимо обеспечить аутентификацию в вашем экземпляре кластера .withCredentials (username.trim (), password.trim ())

Или вам нужно отключить аутентификацию на уровне cassandra, изменив значение ключа аутентификатора на AllowAllAuthenticator в cassandra.yaml ..

Примечание: изменение yaml требует перезапуска cassandra

person Laxmikant    schedule 06.09.2018
comment
нет, я попробовал первый вариант, у меня нет прав на изменение на уровне cassandra. но предыдущий код работал нормально до изменения кода. - person shantha ramadurga; 06.09.2018
comment
Правильный ли синтаксис класса statementCache и метода вставки cassandraUtil? Я попытался реализовать вашу идею. - person shantha ramadurga; 06.09.2018
comment
Привет, laxmikant, я решил эту ошибку, но я все еще получаю подготовленное заявление, влияющее на предупреждение о производительности. - person shantha ramadurga; 06.09.2018
comment
Как ты решил? Вы должны получить это предупреждение один раз за запрос, если ваш кеш глобальный. - person Laxmikant; 06.09.2018
comment
как сделать кеш глобальным в искровом коде Scala здесь? - person shantha ramadurga; 06.09.2018

я решил проблему. Поместив следующие строки внутри метода getStatement вместо внешнего метода.

val acluster = CassandraUtils.initialize(nodes,user,pass, cassport,racdc)

val session = acluster.connect("keyspacename");
person shantha ramadurga    schedule 06.09.2018

Ваша проблема в том, что вы пытаетесь выполнить «ручное» управление подключением - это не работает со Spark - экземпляры _1 _ / _ 2_ должны быть отправлены исполнителям, но они не будут работать правильно, как эти экземпляры создан в драйвере. Конечно, вы можете использовать «типичный» шаблон выполнения foreachPartition и т. Д., Как описано в этот вопрос.

Лучший способ работать с Cassandra из Spark - использовать Cassandra Spark Connector - он будет автоматически распределяет нагрузку между узлами и выполняет правильную вставку и обновление данных. В этом случае вы настраиваете параметры подключения, включая аутентификацию, через свойства Spark (spark.cassandra.auth.username & spark.cassandra.auth.password). Дополнительная информация о подключении находится в документации. .

person Alex Ott    schedule 06.09.2018
comment
Алекс, я попытался использовать разъем Spark Cassandra, но запутался, не могли бы вы проверить эту ссылку stackoverflow.com/q/52225041/10246547 и скажи мне, в чем я ошибаюсь .. пожалуйста, помогите мне. - person shantha ramadurga; 07.09.2018