загрузка mongodb oplog.rs в искру фрейма данных

Я пытаюсь загрузить oplog.rs из MongoDB в искровой DataFrame, он загружает метаданные, и я проверил их с помощью функции printSchema, но когда я пытаюсь выполнить такое действие, как show или count, он выдает мне эту ошибку scala.MatchError: ((BsonMinKey,null),0) (of class scala.Tuple2). Я также попытался зарегистрировать его как соблазнительный, но все равно дает ту же ошибку.

val customReadConfig = ReadConfig(Map(
  "uri" -> 
    "mongodb://username:password@host_name:port/local.oplog.rs?authSource=xxxxx"
))

val dataframe = sqlContext.read.format("com.mongodb.spark.sql").
  options(customReadConfig.asOptions).load

person Hemaraj ku    schedule 03.03.2017    source источник


Ответы (1)


Для потомков:

Разделителем по умолчанию для версий Mongo> = 3.2 является MongoSamplePartitioner, он использует (как и все другие разделители) partitionKey, а когда он создает разделы, он использует BsonMinKey и BsonMaxKey для определения границ для каждого раздела. Ошибка совпадения, с которой вы столкнулись, скорее всего, происходит здесь:

  def createPartitions(partitionKey: String, splitKeys: Seq[BsonValue], 
      locations: Seq[String] = Nil, addMinMax: Boolean = true): 
      Array[MongoPartition] = {
        val minKeyMaxKeys = (new BsonMinKey(), new BsonMaxKey())
        val minToMaxSplitKeys: Seq[BsonValue] = if (addMinMax) minKeyMaxKeys._1 +: splitKeys :+ minKeyMaxKeys._2 else splitKeys
        val minToMaxKeysToPartition = if (minToMaxSplitKeys.length == 1) minToMaxSplitKeys else minToMaxSplitKeys.tail
        val partitionPairs: Seq[(BsonValue, BsonValue)] = minToMaxSplitKeys zip minToMaxKeysToPartition
        partitionPairs.zipWithIndex.map({
           case ((min: BsonValue, max: BsonValue), i: Int) => MongoPartition(i, createBoundaryQuery(partitionKey, min, max), locations)
      }).toArray
}

Эта ошибка сообщает вам, что для вашего max установлено значение null, и, как вы можете видеть в коде, обрабатывается только один случай. Если вы не установили partitionKey для использования, разделитель будет использовать _id по умолчанию, вы можете прочитать об этом здесь

Коллекция oplog.rs по умолчанию не имеет ключа _id, уникальный идентификатор записи oplog - удивительно названный h, это число. Итак, чтобы разделитель работал правильно, вам нужно установить в вашем SparkConf или ReadConfig spark.mongodb.input.partitionerOptions.partitionKey на h.

 new SparkConf()
   //all of your other settings
   .set("spark.mongodb.input.partitionerOptions.partitionKey", "h")
person Ahmad Ragab    schedule 06.07.2018