Сериализуйте MinMaxPriorityQueue Guava

После нескольких дней исследования, почему мое приложение Flink не работает должным образом, я Я пришел к выводу, что проблема заключается в MinMaxPriorityQueue, который я использую.

Кажется, что эта структура не сериализуема. Я пробовал несколько способов сериализовать его:

env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], classOf[ProtobufSerializer]);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

всем им не повезло.

Однако я нашел это: Сериализация неизменяемой таблицы Guava

Есть ли эквивалент MinMaxPriorityQueue или способ его сериализации?

Обновлять

Я перевел Томаша на scala:

class MinMaxPriorityQueueSerializer extends Serializer[MinMaxPriorityQueue[Object]] {
  private[this] val log = LoggerFactory.getLogger(this.getClass)
  setImmutable(false)
  setAcceptsNull(false)

  val OPTIMIZE_POSITIVE = true

  override def read(kryo: Kryo, input: Input, aClass: Class[MinMaxPriorityQueue[Object]]): MinMaxPriorityQueue[Object] = {
    log.error("Kryo READ")
    val comparator: Ordering[Object] = kryo.readClassAndObject(input).asInstanceOf[Ordering[Object]]
    val size = input.readInt(OPTIMIZE_POSITIVE)

    val queue: MinMaxPriorityQueue[Object] = MinMaxPriorityQueue.orderedBy(comparator)
      .expectedSize(size)
      .create()

    (0 to size).foreach(_ => queue.offer(kryo.readClassAndObject(input)))

    queue
  }

  override def write(kryo: Kryo, output: Output, queue: MinMaxPriorityQueue[Object]): Unit = {
    log.error("Kryo WRITE")
    kryo.writeClassAndObject(output, queue.comparator)

    val declaredSize = queue.size
    output.writeInt(declaredSize, OPTIMIZE_POSITIVE)

    val actualSize = queue.toArray.foldLeft(0) {
      case (z, q) =>
        kryo.writeClassAndObject(output, q)
        z + 1
    }

    Preconditions.checkState(
      declaredSize == actualSize,
      "Declared size (%s) different than actual size (%s)", declaredSize, actualSize)
  }
}

И установите kryo in flink для использования этого сериализатора:

env.getConfig.addDefaultKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])       

env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])

Однако кажется, что он никогда не вызывается, поскольку я нигде в журналах не вижу выходных данных log.error("Kryo READ") и log.error("Kryo WRITE").

И преобразование по-прежнему возвращает пустой MinMaxPriorityQueue, даже если я его обновляю.

Обновление 2

Я реализовал SerializerTester, но получаю bufferUnderflow:

object Main {

  def main(args: Array[String]) {

    val tester = new MinMaxPriorityQueueSerializerTester()

    val inQueue: MinMaxPriorityQueue[java.lang.Double] = MinMaxPriorityQueue.create()
    inQueue.add(1.0)

    val outputStream = new ByteArrayOutputStream()
    tester.serialize(outputStream, inQueue)

    val inputStream = new ByteArrayInputStream(outputStream.toByteArray())
    val outQueue: MinMaxPriorityQueue[java.lang.Double] = tester.deserialize(inputStream);

    System.out.println(inQueue);
    System.out.println(outQueue);

  }

  class MinMaxPriorityQueueSerializerTester {
    val kryo = new Kryo
    kryo.setInstantiatorStrategy(new StdInstantiatorStrategy)
    registerMinMaxSerializer();
    //  allowForClassesWithoutNoArgConstructor(); // needed to serialize Ordering

    def registerMinMaxSerializer() {
      kryo.addDefaultSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], new MinMaxPriorityQueueSerializer());
    }

    def serialize(out: OutputStream, queue: MinMaxPriorityQueue[java.lang.Double]) {
      // try (Output output = new Output(out)) {
      val output = new Output(out)
      kryo.writeClassAndObject(output, queue)
      //      kryo.writeObject(output, queue)
      //}
      output.flush
    }

    def deserialize(in: InputStream): MinMaxPriorityQueue[java.lang.Double] = {
      //try (Input input = new Input(in)) {
      val input = new Input(in)
      //kryo.readObject(input, classOf[MinMaxPriorityQueue[java.lang.Double]])
      kryo.readClassAndObject(input).asInstanceOf[MinMaxPriorityQueue[java.lang.Double]]
      //p}
    }
  }

person ElBaulP    schedule 28.06.2018    source источник
comment
Вы можете создать проблему в репозитории Guava на Github и запросить эту функцию. Была проблема № 615, в которой упоминалось MinMaxPriorityQueue, но она не была включена в объем билета в конце.   -  person Xaerxess    schedule 28.06.2018
comment
@Xaerxess, спасибо, я открыл его: github.com/google/guava/issues/3192   -  person ElBaulP    schedule 28.06.2018
comment
@elbaulp Не могли бы вы опубликовать трассировку стека bufferUnderflow?   -  person Tomasz Linkowski    schedule 29.06.2018
comment
Nevermind, я решил использовать эту реализацию intervalHeap: github .com / allenbh / gkutil_java / blob / master / src / gkimfl / util /, поскольку он работает из коробки. Спасибо за помощь, я принимаю ваш ответ.   -  person ElBaulP    schedule 29.06.2018
comment
@elbaulp Спасибо, хотя мой ответ на самом деле не решил вашу проблему. Вы можете опубликовать отдельный ответ с упоминанием того, как вы ее решили, на случай, если у кого-то еще есть аналогичная проблема.   -  person Tomasz Linkowski    schedule 29.06.2018


Ответы (2)


Вы можете использовать собственный Kryo Serializer.

Вот пример (на Java):

class MinMaxPriorityQueueSerializer extends Serializer<MinMaxPriorityQueue<Object>> {

    private static final boolean OPTIMIZE_POSITIVE = true;

    protected MinMaxPriorityQueueSerializer() {
        setAcceptsNull(false);
        setImmutable(false);
    }

    @Override
    public void write(Kryo kryo, Output output, MinMaxPriorityQueue<Object> queue) {
        kryo.writeClassAndObject(output, queue.comparator());

        int declaredSize = queue.size();
        output.writeInt(declaredSize, OPTIMIZE_POSITIVE);

        int actualSize = 0;
        for (Object element : queue) {
            kryo.writeClassAndObject(output, element);
            actualSize++;
        }

        Preconditions.checkState(
                declaredSize == actualSize,
                "Declared size (%s) different than actual size (%s)", declaredSize, actualSize
        );
    }

    @Override
    public MinMaxPriorityQueue<Object> read(Kryo kryo, Input input, Class<MinMaxPriorityQueue<Object>> type) {
        @SuppressWarnings("unchecked")
        Comparator<Object> comparator = (Comparator<Object>) kryo.readClassAndObject(input);
        int size = input.readInt(OPTIMIZE_POSITIVE);

        MinMaxPriorityQueue<Object> queue = MinMaxPriorityQueue.orderedBy(comparator)
                .expectedSize(size)
                .create();

        for (int i = 0; i < size; ++i) {
            queue.offer(kryo.readClassAndObject(input));
        }
        return queue;
    }
}

Вот как вы могли бы это использовать:

class MinMaxPriorityQueueSerializerTester {

    public static void main(String[] args) {
        MinMaxPriorityQueueSerializerTester tester = new MinMaxPriorityQueueSerializerTester();

        MinMaxPriorityQueue<Integer> inQueue = MinMaxPriorityQueue.<Integer>orderedBy(Comparator.reverseOrder())
                .create(Arrays.asList(5, 2, 7, 2, 4));

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        tester.serialize(outputStream, inQueue);

        ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
        @SuppressWarnings("unchecked")
        MinMaxPriorityQueue<Integer> outQueue = (MinMaxPriorityQueue<Integer>) tester.deserialize(inputStream);

        System.out.println(inQueue);
        System.out.println(outQueue);
    }

    private final Kryo kryo;

    public MinMaxPriorityQueueSerializerTester() {
        this.kryo = new Kryo();
        registerMinMaxSerializer();
        allowForClassesWithoutNoArgConstructor(); // needed to serialize Ordering
    }

    private void registerMinMaxSerializer() {
        kryo.addDefaultSerializer(MinMaxPriorityQueue.class, new MinMaxPriorityQueueSerializer());
    }

    private void allowForClassesWithoutNoArgConstructor() {
        ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
                .setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
    }


    public void serialize(OutputStream out, MinMaxPriorityQueue<?> queue) {
        try (Output output = new Output(out)) {
            kryo.writeObject(output, queue);
        }
    }

    public MinMaxPriorityQueue<?> deserialize(InputStream in) {
        try (Input input = new Input(in)) {
            return kryo.readObject(input, MinMaxPriorityQueue.class);
        }
    }
}
person Tomasz Linkowski    schedule 28.06.2018
comment
Спасибо, попробовал, безуспешно :-( - person ElBaulP; 29.06.2018
comment
@elbaulp Я тестировал этот код на Java, и он работал. Вы тестировали сериализацию в Scala (вне Flink)? Если нет, сделайте это (как я сделал в MinMaxPriorityQueueSerializerTester), и тогда мы узнаем больше. - person Tomasz Linkowski; 29.06.2018

Я наконец сдался и попытался использовать другую структуру данных и сделать ее сериализуемой с помощью java.io.Serializable.

Эта структура данных представляет собой IntervalHeap, реализованный здесь , Я просто сделал его сериализуемым в мой проект.

Теперь все работает правильно.

person ElBaulP    schedule 29.06.2018