Получение `` Запрос превысил максимально допустимое использование памяти в 40 МБ '' при загрузке огромных данных из MongoDB в искровой фрейм данных

Я пытаюсь создать искровой фреймворк из Cosmos DB (MongoDB API). Но я получаю исключение:

* com.mongodb.MongoCommandException: команда завершилась ошибкой 16501: «Запрос превысил максимально допустимое использование памяти, равное 40 МБ. Пожалуйста, подумайте о добавлении дополнительных фильтров, чтобы уменьшить размер ответа на запрос ». на сервере ***** .documents.azure.com: 10255. Полный ответ: {_t: OKMongoResponse, ok: 0, code: 16501, errmsg: Запрос превышает максимально допустимое использование памяти 40 МБ. Пожалуйста, подумайте о добавлении дополнительных фильтров, чтобы уменьшить размер ответа на запрос., $ Err: Запрос превысил максимально допустимое использование памяти, равное 40 МБ. Рассмотрите возможность добавления дополнительных фильтров, чтобы уменьшить размер ответа на запрос.}

Есть ли способ справиться с этим: у меня нет контроля над MongoDB. Поэтому я не могу увеличить RU.

Ниже мой код для чтения данных из mongo db.

sparkSession = SparkSession.builder()
      .appName(APP_NAME)
      .master(sparkMaster)
      .config("spark.mongodb.input.uri", uri)
      .config("spark.mongodb.input.database", database_name)
      .config("spark.mongodb.input.collection", collection_name).getOrCreate()

def getReadConfig(uri: String, database: String, collection: String): ReadConfig = {
    ReadConfig(Map(
      "spark.mongodb.input.uri" -> uri,
      "spark.mongodb.input.database" -> database,
      "spark.mongodb.input.collection" -> collection,
      "readPreference.name" -> "secondaryPreferred",
      "spark.mongodb.input.partitionerOptions.shardkey" -> MONGO_INPUT_SHARDKEY_VALUE
    ), Some(ReadConfig(getSparkSession.sparkContext)))
  }

val readConfig = getReadConfig("uri", "database_name", "collection_name")

def getSubmissionsByIdRangePipeline(surveyId: String): Seq[Document] = {
    Seq(Document.parse("{ $match: { partition_key: " + "partition_value" + ", id: '" + "id" + "' } }"))}

case class Submission(survey_id: String,
                      submitter_win: String,
                      answers: List[SurveyAnswer],
                      submitted_on: Timestamp)

val submissionsChunkRDD: MongoRDD[Document] = MongoSpark
          .load(sparkSession, readConfig)
          .withPipeline(getSubmissionsByIdRangePipeline(surveyId))

val df = submissionsChunkRDD.toDF[Submission]()

df.show()


person Basant Jain    schedule 23.12.2020    source источник


Ответы (1)


Эта проблема была связана с Cosmos DB, использующей протокол Mongo Wire версии 3.2. Это ссылка на проблему GitHub.

Чтобы решить эту проблему, вам не нужно увеличивать количество единиц RU, а вместо этого более тщательно отфильтровать выходной набор данных. Для этого может быть два способа:

  1. Логика фильтрации кажется частью вашего кода. Поэтому лучше добавить к запросу дополнительный фильтр (возможно, диапазон дат отправки) и получить уменьшенный набор результатов.

  2. Кроме того, если есть какие-либо конкретные свойства, которые вам нужны из набора результатов, вы можете проецировать только те во время запроса, чтобы размер документа в наборе результатов уменьшился.

Согласно этой проблеме GitHub, Cosmos DB для обновления до проводного протокола v3.6, эта проблема должна быть решена. Эти шаги из документа MSFT можно выполнить. для обновления.

person akg179    schedule 31.12.2020