Я пытаюсь создать искровой фреймворк из Cosmos DB (MongoDB API). Но я получаю исключение:
* com.mongodb.MongoCommandException: команда завершилась ошибкой 16501: «Запрос превысил максимально допустимое использование памяти, равное 40 МБ. Пожалуйста, подумайте о добавлении дополнительных фильтров, чтобы уменьшить размер ответа на запрос ». на сервере ***** .documents.azure.com: 10255. Полный ответ: {_t: OKMongoResponse, ok: 0, code: 16501, errmsg: Запрос превышает максимально допустимое использование памяти 40 МБ. Пожалуйста, подумайте о добавлении дополнительных фильтров, чтобы уменьшить размер ответа на запрос., $ Err: Запрос превысил максимально допустимое использование памяти, равное 40 МБ. Рассмотрите возможность добавления дополнительных фильтров, чтобы уменьшить размер ответа на запрос.}
Есть ли способ справиться с этим: у меня нет контроля над MongoDB. Поэтому я не могу увеличить RU.
Ниже мой код для чтения данных из mongo db.
sparkSession = SparkSession.builder()
.appName(APP_NAME)
.master(sparkMaster)
.config("spark.mongodb.input.uri", uri)
.config("spark.mongodb.input.database", database_name)
.config("spark.mongodb.input.collection", collection_name).getOrCreate()
def getReadConfig(uri: String, database: String, collection: String): ReadConfig = {
ReadConfig(Map(
"spark.mongodb.input.uri" -> uri,
"spark.mongodb.input.database" -> database,
"spark.mongodb.input.collection" -> collection,
"readPreference.name" -> "secondaryPreferred",
"spark.mongodb.input.partitionerOptions.shardkey" -> MONGO_INPUT_SHARDKEY_VALUE
), Some(ReadConfig(getSparkSession.sparkContext)))
}
val readConfig = getReadConfig("uri", "database_name", "collection_name")
def getSubmissionsByIdRangePipeline(surveyId: String): Seq[Document] = {
Seq(Document.parse("{ $match: { partition_key: " + "partition_value" + ", id: '" + "id" + "' } }"))}
case class Submission(survey_id: String,
submitter_win: String,
answers: List[SurveyAnswer],
submitted_on: Timestamp)
val submissionsChunkRDD: MongoRDD[Document] = MongoSpark
.load(sparkSession, readConfig)
.withPipeline(getSubmissionsByIdRangePipeline(surveyId))
val df = submissionsChunkRDD.toDF[Submission]()
df.show()