Can't write to the cluster if replication_factor is bigger than 1

I am using Spark 1.6.1, Cassandra 2.2.3 and the Cassandra-Spark connector 1.6.

I have already written to a multi-node cluster, but only with replication_factor: 1. Now I am trying to write to a 6-node cluster with a single seed and a keyspace whose replication_factor > 1, but Spark does not respond and refuses to do it.

As I mentioned, it works when I write to the coordinator with the keyspace's replication_factor set to 1.

This is the log I get: it always stops here, or after half an hour it starts cleaning accumulators and then stops again at the fourth one.

16/08/16 17:07:03 INFO NettyUtil: Found Netty's native epoll transport in  the classpath, using it
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.1:9042 added
16/08/16 17:07:04 INFO LocalNodeFirstLoadBalancingPolicy: Added host 127.0.0.1 (datacenter1)
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.2:9042 added
16/08/16 17:07:04 INFO LocalNodeFirstLoadBalancingPolicy: Added host  127.0.0.2 (datacenter1)
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.3:9042 added
16/08/16 17:07:04 INFO LocalNodeFirstLoadBalancingPolicy: Added host 127.0.0.3 (datacenter1)
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.4:9042 added
16/08/16 17:07:04 INFO LocalNodeFirstLoadBalancingPolicy: Added host 127.0.0.4 (datacenter1)
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.5:9042 added
16/08/16 17:07:04 INFO LocalNodeFirstLoadBalancingPolicy: Added host 127.0.0.5 (datacenter1)
16/08/16 17:07:04 INFO Cluster: New Cassandra host /127.0.0.6:9042 added
16/08/16 17:07:04 INFO CassandraConnector: Connected to Cassandra cluster: Test Cluster
16/08/16 17:07:05 INFO SparkContext: Starting job: take at CassandraRDD.scala:121
16/08/16 17:07:05 INFO DAGScheduler: Got job 3 (take at CassandraRDD.scala:121) with 1 output partitions
16/08/16 17:07:05 INFO DAGScheduler: Final stage: ResultStage 4 (take at CassandraRDD.scala:121)
16/08/16 17:07:05 INFO DAGScheduler: Parents of final stage: List()
16/08/16 17:07:05 INFO DAGScheduler: Missing parents: List()
16/08/16 17:07:05 INFO DAGScheduler: Submitting ResultStage 4 (CassandraTableScanRDD[17] at RDD at CassandraRDD.scala:18), which has no missing parents
16/08/16 17:07:05 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 8.3 KB, free 170.5 KB)
16/08/16 17:07:05 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 4.2 KB, free 174.7 KB)
16/08/16 17:07:05 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:43680 (size: 4.2 KB, free: 756.4 MB)
16/08/16 17:07:05 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006
16/08/16 17:07:05 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 4 (CassandraTableScanRDD[17] at RDD at CassandraRDD.scala:18)
16/08/16 17:07:05 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
16/08/16 17:07:05 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 204, localhost, partition 0,NODE_LOCAL, 22553 bytes)
16/08/16 17:07:05 INFO Executor: Running task 0.0 in stage 4.0 (TID 204)
16/08/16 17:07:06 INFO Executor: Finished task 0.0 in stage 4.0 (TID 204). 2074 bytes result sent to driver
16/08/16 17:07:06 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 204) in 1267 ms on localhost (1/1)
16/08/16 17:07:06 INFO DAGScheduler: ResultStage 4 (take at CassandraRDD.scala:121) finished in 1.276 s
16/08/16 17:07:06 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
16/08/16 17:07:06 INFO DAGScheduler: Job 3 finished: take at CassandraRDD.scala:121, took 1.310929 s
16/08/16 17:07:06 INFO SparkContext: Starting job: take at CassandraRDD.scala:121
16/08/16 17:07:06 INFO DAGScheduler: Got job 4 (take at CassandraRDD.scala:121) with 4 output partitions
16/08/16 17:07:06 INFO DAGScheduler: Final stage: ResultStage 5 (take at CassandraRDD.scala:121)
16/08/16 17:07:06 INFO DAGScheduler: Parents of final stage: List()
16/08/16 17:07:06 INFO DAGScheduler: Missing parents: List()
16/08/16 17:07:06 INFO DAGScheduler: Submitting ResultStage 5 (CassandraTableScanRDD[17] at RDD at CassandraRDD.scala:18), which has no missing parents
16/08/16 17:07:06 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 8.4 KB, free 183.1 KB)
16/08/16 17:07:06 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 4.2 KB, free 187.3 KB)
16/08/16 17:07:06 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:43680 (size: 4.2 KB, free: 756.3 MB)
16/08/16 17:07:06 INFO SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1006
16/08/16 17:07:06 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 5 (CassandraTableScanRDD[17] at RDD at CassandraRDD.scala:18)
16/08/16 17:07:06 INFO TaskSchedulerImpl: Adding task set 5.0 with 4 tasks
16/08/16 17:07:06 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 205, localhost, partition 1,NODE_LOCAL, 22553 bytes)
16/08/16 17:07:06 INFO Executor: Running task 0.0 in stage 5.0 (TID 205)
16/08/16 17:07:07 INFO Executor: Finished task 0.0 in stage 5.0 (TID 205). 2074 bytes result sent to driver
16/08/16 17:07:07 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 205) in 706 ms on localhost (1/4)
16/08/16 17:07:14 INFO CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
16/08/16 17:32:40 INFO BlockManagerInfo: Removed broadcast_…_piece0 on localhost:43680 in memory (size: 4.2 KB, free: 756.4 MB)
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 14
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 13
16/08/16 17:32:40 INFO BlockManagerInfo: Removed broadcast_…_piece0 on localhost:43680 in memory (size: 7.1 KB, free: 756.4 MB)
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 12
16/08/16 17:32:40 INFO ContextCleaner: Cleaned shuffle 0
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 11
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 10
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 9
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 8
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 7
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 6
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 5
16/08/16 17:32:40 INFO ContextCleaner: Cleaned accumulator 4
16/08/16 17:32:40 INFO BlockManagerInfo: Removed broadcast_4_piece0 on localhost:43680 in memory (size: 13.8 KB, free: 756.4 MB)
16/08/16 20:45:06 INFO SparkContext: Invoking stop() from shutdown hook

EDIT

Here is the code snippet showing exactly what I am doing:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.types.{StructType, StructField, DateType, IntegerType}

object ff {
  def main(string: Array[String]) {

    val conf = new SparkConf()
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setMaster("local[4]")
      .setAppName("ff")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Read the CSV into a DataFrame
    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true") // Use first line of all files as header
      .option("inferSchema", "true")
      .load("test.csv")

    df.registerTempTable("ff_table")
    //df.printSchema()

    df.count

    // Write the DataFrame to Cassandra and measure how long the save takes
    time {
      df.write
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "ff_table", "keyspace" -> "traffic"))
        .save()
    }

    def time[A](f: => A) = {
      val s = System.nanoTime
      val ret = f
      println("time: " + (System.nanoTime - s) / 1e6 + "ms")
      ret
    }
  }
}
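
For reference, here is a minimal sketch (not my actual job, just an illustration) of the same read/write with the connector's write consistency level pinned explicitly through SparkConf. The key spark.cassandra.output.consistency.level is a Spark Cassandra Connector setting; the value "ONE" is only an assumption for testing, not a recommendation:

// Sketch only: identical write, but with the output consistency level
// forced to ONE instead of the connector's quorum-based default.
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object ffExplicitConsistency {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.cassandra.connection.host", "127.0.0.1")
      // Spark Cassandra Connector setting; "ONE" is an assumed test value
      .set("spark.cassandra.output.consistency.level", "ONE")
      .setMaster("local[4]")
      .setAppName("ff")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("test.csv")

    df.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "ff_table", "keyspace" -> "traffic"))
      .save()
  }
}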

Also, if I run nodetool describecluster, I get these results:

Cluster Information:
Name: Test Cluster
Snitch: org.apache.cassandra.locator.DynamicEndpointSnitch
Partitioner: org.apache.cassandra.dht.Murmur3Partitioner
Schema versions:
    bf6c3ae7-5c8b-3e5d-9794-8e34bee9278f: [127.0.0.1, 127.0.0.2, 127.0.0.3, 127.0.0.4, 127.0.0.5, 127.0.0.6]

My keyspace configuration:

CREATE KEYSPACE traffic WITH replication = {'class': 'SimpleStrategy',    'replication_factor': '3'}  AND durable_writes = true;

I tried inserting a row via the CLI into the keyspace with replication_factor: 3 and it works, so all the nodes can see each other. Why can't Spark insert anything, does anyone know?
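
In case it helps narrow things down, below is a rough, self-contained sketch of issuing plain CQL through the connector's own driver session, to check what the Spark side actually sees. It assumes only the traffic keyspace and ff_table table mentioned above; everything else is illustrative:

// Sketch only: probe the cluster through the Spark Cassandra Connector's
// session, similar to what the CLI test does, but from the driver JVM.
import org.apache.spark.SparkConf
import com.datastax.spark.connector.cql.CassandraConnector

object CassandraProbe {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.cassandra.connection.host", "127.0.0.1")

    CassandraConnector(conf).withSessionDo { session =>
      // How many peers does the driver see besides the contact point?
      val peers = session.execute("SELECT peer FROM system.peers").all().size()
      println(s"peers visible to the driver: $peers")

      // Can anything be read back from the target table at all?
      val rows = session.execute("SELECT * FROM traffic.ff_table LIMIT 1").all().size()
      println(s"rows read from traffic.ff_table: $rows")
    }
  }
}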


person iMajna    schedule 16.08.2016    source
comment
I had a similar issue where Spark could not write to a single Cassandra node in a keyspace with a replication factor > 1. I believe this happens when Cassandra cannot recognize its own nodes in the cluster. Usually, when Spark fails to write to Cassandra, the log contains an entry like wrote 0 rows to Cassandra.   -  person Saif Charaniya    schedule 16.08.2016
comment
I made an edit and added a snippet of my code. Hmm, but then how come I can write when replication_factor: 1, and with nodetool status all the nodes are up and see each other? How did you solve the problem?   -  person iMajna    schedule 16.08.2016
comment
You are right: I set CONSISTENCY TWO in Spark, all 6 of my servers were online, and I got this message: Not enough replicas available for query at consistency TWO (2 required but only 1 alive). Do you remember how you fixed it?   -  person iMajna    schedule 17.08.2016