Обрезка перегородок на слиянии дельты озера Искра

Я использую дельта-озеро ("io.delta" %% "delta-core"% "0.4.0") и объединяю в foreachBatch, например:

foreachBatch { (s, batchid) =>
        deltaTable.alias("t")
          .merge(
            s.as("s"),
            "s.eventid = t.eventid and t.categories in ('a1', 'a2')")
          .whenMatched("s.eventtime < t.eventtime").updateAll()
          .whenNotMatched().insertAll()
          .execute()
      }

Дельта-таблица разделена на категории. Если я добавлю фильтр разделов, например 'и t.categories в (' a1 ',' a2 ')', из искрового графа я вижу, что ввод - это не вся таблица. Я думаю, что это произошло с обрезкой разделов. Однако, если я сделаю: «s.eventid = t.eventid and t.categories = s.categories», он все равно загрузит все данные из дельта-таблицы. Я ожидаю, что он может автоматически определять, какие разделы следует перейти, чтобы выполнить соединение, своего рода выталкивание. Возможно ли сокращение раздела без указания конкретных значений раздела? Я также пытался добавить ("spark.databricks.optimizer.dynamicPartitionPruning", "true"), но не работал.

Спасибо


person processadd    schedule 13.11.2019    source источник
comment
Та же проблема, с которой я столкнулся и в версии 0.5.   -  person KD157    schedule 10.01.2020
comment
динамическая обрезка фрагментов доступна только в среде выполнения databricks 5.5 lts   -  person murtihash    schedule 09.07.2020


Ответы (1)


Вы можете передать это двумя способами. Один из них - статический способ передачи значений, а другой - вы динамически устанавливаете разделы в операторе слияния.

  1. Статический способ передачи значений раздела.
val categoriesList = List("a1", "a2")  
val catergoryPartitionList  = categoriesList.mkString("','")

foreachBatch { (s, batchid) =>
    deltaTable.alias("t")
      .merge(
        s.as("s"),
        "s.eventid = t.eventid and t.categories in ('$catergoryPartitionList')")
      .whenMatched("s.eventtime < t.eventtime").updateAll()
      .whenNotMatched().insertAll()
      .execute()
  }
  1. Динамический способ передачи категорий в оператор Merge следующий:
val selectedCategories = deltaTable.select("categories").dropDuplicates()
  
val categoriesList = selectedCategories.map(_.getString(0)).collect()

val catergoryPartitionList  = categoriesList.mkString("','")

foreachBatch { (s, batchid) =>
    deltaTable.alias("t")
      .merge(
        s.as("s"),
        "s.eventid = t.eventid and t.categories in ('$catergoryPartitionList')")
      .whenMatched("s.eventtime < t.eventtime").updateAll()
      .whenNotMatched().insertAll()
      .execute()
  }
person Nikunj Kakadiya    schedule 14.08.2020