Как изменить количество разделов по умолчанию в Flink DataSet?

Вот требование: набор данных слишком велик, нам нужно разделить данные, вычислить локальный результат в каждом разделе, а затем объединить. Например, если имеется 1 миллион фрагментов данных, разделенных на 100 разделов, каждая копия будет содержать только около 10 000 фрагментов данных. Поскольку для настройки необходимо использовать количество разделов, количество разделов должно быть переменным. Кроме того, все данные раздела должны вычисляться партиями и не могут рассчитываться по одному.

Реализация выглядит следующим образом: после фазы разделения каждый фрагмент данных будет иметь ключ, представляющий раздел, к которому он принадлежит. И теперь данные должны выглядеть так: afterPartitionedData=[(0,data1),(0,data2)…(1,data3),(1,data4),…,(99,datan)]。 Затем используйте операторы Flink partitionCustom и mapPartition.

  dataSet = env. fromCollection(afterPartitionedData)
  dataset
      .partitionCustom(new myPartitioner(),0)
      .mapPartition(new myMapPartitionFunction[(Int,String),Int]())
…
…
  class myPartitioner extends  Partitioner[Int]{
    override def partition(key: Int, numPartitions: Int) = {
      println("numPartitions="+numPartitions) // 6 , CPU number
      key // just return the partitionID
    }
  }

Однако сообщается об ошибке:

...
Caused by: java.lang.ArrayIndexOutOfBoundsException: 6
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:226)
...

Похоже, это связано с тем, что количество разделов Flink по умолчанию DataSet - это количество процессоров, которое на моем компьютере равно 6, поэтому об этом будет сообщено java.lang.ArrayIndexOutOfBoundsException : 6.

Итак, мой вопрос: есть ли способ изменить количество разделов по желанию? Я нашел этот параметр в методе Partition (key: int, numpartitions: int) в API Partitioner, но не знал, как его изменить.

Есть ли способ изменить количество DataSet разделов?

Версия Flink - 1.6, а тестовый код:

object SimpleFlinkFromBlog {

  def main(args: Array[String]): Unit = {
    val  env  =  ExecutionEnvironment.getExecutionEnvironment
    val afterPartitionedData = new mutable.MutableList[(Int,  String)]
    afterPartitionedData.+=((0,  "0"))

    afterPartitionedData.+=((1,  "1"))

    afterPartitionedData.+=((2, "2"))
    afterPartitionedData.+=((2, "2"))

    afterPartitionedData.+=((3,  "3"))
    afterPartitionedData.+=((3,  "3"))
    afterPartitionedData.+=((3,  "3"))

    afterPartitionedData.+=((4,  "4"))

    afterPartitionedData.+=((5,  "5"))
    afterPartitionedData.+=((5,  "5"))
    afterPartitionedData.+=((5,  "5"))

    // Comment this line will not report an error.
    // java.lang.ArrayIndexOutOfBoundsException : 6
    afterPartitionedData.+=((6,  "will wrong"))

    val dataSet = env.fromCollection( afterPartitionedData )
    val localRes = dataSet
      .partitionCustom(new myPartitioner(),0)
      .mapPartition(new MapPartitionFunction[(Int,String),Int] {
        override def mapPartition(values: lang.Iterable[(Int, String)], out: Collector[Int]) = {
          var count = 0;
          values.forEach(new Consumer[(Int, String)] {
            override def accept(t: (Int, String)): Unit = {
              count=count+1;
              print("current count is " + count + "   tuple is " + t + "\n");
            }
          })
          out.collect(count)
        }
      })

    localRes.collect().foreach(println)
  }

  class myPartitioner extends  Partitioner[Int]{
    override def partition(key: Int, numPartitions: Int) = {
//      println("numPartitions="+numPartitions)
      key
    }
  }
}

Спасибо!


person shg    schedule 06.04.2021    source источник


Ответы (1)


Количество разделов - это параллелизм, который вы можете установить в командной строке при отправке задания или в flink-conf.yaml.

person David Anderson    schedule 06.04.2021