как подключиться к более чем 1 хостам cassandra с помощью искрового коннектора cassandra

У меня есть искровое приложение, которое считывает данные из одного кластера кассандры и после некоторых вычислений сохраняет данные в другой кластер кассандры. Я могу установить только 1 конфигурацию cassandra в sparkconf. но мне нужно подключиться к еще одному кластеру кассандры.

Я вижу класс CassandraConnector, который используется для подключения к cassandra, но он использует объект CassandraConnectorConf для создания объекта, который принимает множество параметров, которых я не знаю.

Любая помощь будет полезна


person Sorabh Kumar    schedule 11.08.2015    source источник


Ответы (2)


Используйте следующий код:

SparkConf confForCassandra = new SparkConf().setAppName("ConnectToCassandra")
                .setMaster("local[*]")
                .set("spark.cassandra.connection.host", "<cassandraHost>");

CassandraConnector connector = CassandraConnector.apply(confForCassandra);

javaFunctions(rdd).writerBuilder("keyspace", "table", mapToRow(Table.class)).withConnector(connector).saveToCassandra();
person Sorabh Kumar    schedule 21.08.2015
comment
Похоже, вы подключаетесь только к одному хосту cassandra. Куда вы добавляете другой хост? Есть где-нибудь конфигурационный файл? - person Nadine; 02.12.2015
comment
Вы имеете в виду, что вы создаете 2 переменные SparkConf, по одной для каждого хоста cassandra, а затем две разные переменные коннектора? - person Nadine; 02.12.2015

Если вы хотите подключиться к двум кластерам Cassandra с помощью Scala и Spark, вы можете использовать следующий код:

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._

import org.apache.spark.SparkContext


def twoClusterExample ( sc: SparkContext) = {
  val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
  val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))

  val rddFromClusterOne = {
    // Sets connectorToClusterOne as default connection for everything in this code block
    implicit val c = connectorToClusterOne
    sc.cassandraTable("ks","tab")
  }

  {
    //Sets connectorToClusterTwo as the default connection for everything in this code block
    implicit val c = connectorToClusterTwo
    rddFromClusterOne.saveToCassandra("ks","tab")
  }

} 

Исходный код был написан RusselSpitzer здесь: https://gist.github.com/RussellSpitzer/437f57cdae4fd4b >

В настоящее время это невозможно сделать с помощью Python и Spark.

person Nadine    schedule 30.12.2015