добавление Кассандры в качестве приемника в ошибке Flink: все хосты, которые пытались выполнить запрос, не удались

Я следил за примером на https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/cassandra.html, чтобы подключить Кассандру в качестве приемника во Flink.

Мой код для показан ниже

public class writeToCassandra {

    private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE test WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};";
    private static final String createTable = "CREATE TABLE test.cassandraData(id varchar, heart_rate varchar, PRIMARY KEY(id));" ;


    private final static Collection<String> collection = new ArrayList<>(50);

    static {
        for (int i = 1; i <= 50; ++i) {
            collection.add("element " + i);
        }
    }

    public static void main(String[] args) throws Exception {


        //setting the env variable to local
        StreamExecutionEnvironment envrionment = StreamExecutionEnvironment.createLocalEnvironment(1);


        DataStream<Tuple2<String, String>> dataStream = envrionment
                .fromCollection(collection)
                .map(new MapFunction<String, Tuple2<String, String>>() {

                    final String mapped = " mapped ";
                    String[] splitted;

                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        splitted = s.split("\\s+");
                        return Tuple2.of(
                                UUID.randomUUID().toString(),
                                splitted[0] + mapped + splitted[1]
                        );
                    }
                });


        CassandraSink.addSink(dataStream)
                .setQuery("INSERT INTO test.cassandraData(id,heart_rate) values (?,?);")
                .setHost("127.0.0.1")
                .build();


        envrionment.execute();

    } //main




} //writeToCassandra

Я получаю следующую ошибку

Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [/127.0.0.1] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)

person Amarjit Dhillon    schedule 31.08.2017    source источник
comment
Я думаю, что совершенно очевидно, что он не может подключиться к кластеру cassandra. Вы уверены, что на всех узлах диспетчера задач есть узел cassandra?   -  person Dawid Wysakowicz    schedule 31.08.2017


Ответы (2)


Не уверен, всегда ли это требуется, но я настраиваю CassandraSink следующим образом:

CassandraSink
    .addSink(dataStream)
    .setClusterBuilder(new ClusterBuilder() {
        @Override
        protected Cluster buildCluster(Cluster.Builder builder) {
            return Cluster.builder()
                .addContactPoints(myListOfCassandraUrlsString.split(","))
                .withPort(portNumber)
                .build();
        }
    })
    .build();

Я аннотировал объекты POJO, возвращаемые потоком данных, поэтому мне не нужен запрос, но вы должны просто включить «.setQuery (...)» после строки «.addSink (...)».

person Jicaar    schedule 01.09.2017

Исключение просто указывает, что программа-пример не может получить доступ к базе данных C *.

  1. flink-cassandra-connector предлагает потоковый API для подключения к указанной базе данных C *. Таким образом, у вас должен быть запущен экземпляр C *.
  2. Каждое задание потоковой передачи отправляется / сериализуется на узел, на котором работает диспетчер задач. В вашем примере вы предполагаете, что C * работает на том же узле, что и узел TM. Альтернативой является изменение адреса C * с 127.0.0.1 на публичный адрес.
person mcfongtw    schedule 02.10.2017