Обучение GAN в keras с помощью .fit_generator ()

Я тренировал условную архитектуру GAN, подобную Pix2Pix, со следующим циклом обучения:

for epoch in range(start_epoch, end_epoch):
    for batch_i, (input_batch, target_batch) in enumerate(dataLoader.load_batch(batch_size)):
                fake_batch= self.generator.predict(input_batch)

                d_loss_real = self.discriminator.train_on_batch(target_batch, valid)
                d_loss_fake = self.discriminator.train_on_batch(fake_batch, invalid)
                d_loss = np.add(d_loss_fake, d_loss_real) * 0.5

                g_loss = self.combined.train_on_batch([target_batch, input_batch], [valid, target_batch])

Теперь это работает хорошо, но не очень эффективно, поскольку загрузчик данных быстро становится узким местом с точки зрения времени. Я изучил функцию .fit_generator (), которую предоставляет keras, которая позволяет генератору работать в рабочем потоке и работает намного быстрее.

self.combined.fit_generator(generator=trainLoader,
                                    validation_data=evalLoader
                                    callbacks=[checkpointCallback, historyCallback],
                                    workers=1,
                                    use_multiprocessing=True)

Мне потребовалось некоторое время, чтобы убедиться, что это было неверно, я больше не тренировал свой генератор и дискриминатор по отдельности, а дискриминатор вообще не обучался, так как он был установлен на trainable = False в комбинированной модели, что по существу разрушило любые виды состязательности. потеря, и я мог бы также обучить свой генератор сам с помощью MSE.

Теперь мой вопрос: есть ли какая-то работа, например, обучение моего дискриминатора внутри настраиваемого обратного вызова, который запускается каждой партией метода .fit_generator ()? Можно реализовать создание пользовательских обратных вызовов, например, вот так:

class MyCustomCallback(tf.keras.callbacks.Callback):
  def on_train_batch_end(self, batch, logs=None):
    discriminator.train_on_batch()

Другой возможностью было бы распараллелить исходный цикл обучения, но я боюсь, что у меня сейчас нет времени на это.


person Ahmad Moussa    schedule 10.11.2019    source источник


Ответы (2)


Обновление: для этого есть встроенные очереди:

Вы можете проверить быстрый способ их использования в этом ответе: https://stackoverflow.com/a/59214794/2097240


Старый ответ:

Именно для этой цели я и создал этот распараллеленный итератор. Использую на тренировках;

Вот как вы его используете:

for epoch, batchIndex, originalBatchIndex, xAndY in ParallelIterator(
                                       generator, 
                                       epochs, 
                                       shuffle_bool, 
                                       use_on_epoch_end_from_generator_bool,
                                       workers = 8, 
                                       queue_size=10):
    #loop content
    x_train_batch, y_train_batch = xAndY
    model.train_on_batch(x_train_batch, y_train_batch)


generator там должен быть ваш dataloader, но он должен быть keras.utils.Sequence, а не просто генератором доходности.

Но при необходимости адаптироваться не так уж и сложно. (Я просто не знаю, будет ли он правильно распараллеливаться, хотя я не знаю, можно ли правильно распараллелить циклы yield)
В приведенном ниже определении итератора вы должны заменить:

  • len(keras_sequence) с steps_per_epoch
  • keras_sequence[i] с next(keras_sequence)
  • use_on_epoch_end = False

А это определение итератора:


import multiprocessing.dummy as mp

#A generator that wraps a Keras Sequence and simulates a `fit_generator` behavior for custom training loops
#It will also work with any iterator that has `__len__` and `__getitem__`.    
def ParallelIterator(keras_sequence, epochs, shuffle, use_on_epoch_end, workers = 4, queue_size = 10):

    sourceQueue = mp.Queue()                     #queue for getting batch indices
    batchQueue = mp.Queue(maxsize = queue_size)  #queue for getting actual batches 
    indices = np.arange(len(keras_sequence))     #array of indices to be shuffled

    use_on_epoch_end = 'on_epoch_end' in dir(keras_sequence) if use_on_epoch_end == True else False
    batchesLeft = 0

#     printQueue = mp.Queue()                      #queue for printing messages
#     import threading
#     screenLock = threading.Semaphore(value=1)
#     totalWorkers= 0

#     def printer():
#         nonlocal printQueue, printing
#         while printing:
#             while not printQueue.empty():
#                 text = printQueue.get(block=True)
#                 screenLock.acquire()
#                 print(text)
#                 screenLock.release()

    #fills the batch indices queue (called when sourceQueue is empty -> a few batches before an epoch ends)
    def fillSource():
        nonlocal batchesLeft

#         printQueue.put("Iterator: fill source - source qsize = " + str(sourceQueue.qsize()))
        if shuffle == True:
            np.random.shuffle(indices)

        #puts the indices in the indices queue
        batchesLeft += len(indices)
#         printQueue.put("Iterator: batches left:" + str(batchesLeft))
        for i in indices:
            sourceQueue.put(i)

    #function that will load batches from the Keras Sequence
    def worker():
        nonlocal sourceQueue, batchQueue, keras_sequence, batchesLeft
#         nonlocal printQueue, totalWorkers
#         totalWorkers += 1
#         thisWorker = totalWorkers

        while True:
#             printQueue.put('Worker: ' + str(thisWorker) + ' will try to get item')
            index = sourceQueue.get(block = True) #get index from the queue
#             printQueue.put('Worker: ' + str(thisWorker) + ' got item ' +  str(index) + " - source q size = " + str(sourceQueue.qsize()))

            if index is None:
                break

            item = keras_sequence[index] #get batch from the sequence
            batchesLeft -= 1
#             printQueue.put('Worker: ' + str(thisWorker) + ' batches left ' + str(batchesLeft))

            batchQueue.put((index,item), block=True) #puts batch in the batch queue
#             printQueue.put('Worker: ' + str(thisWorker) + ' added item ' + str(index) + ' - queue: ' + str(batchQueue.qsize()))

#         printQueue.put("hitting end of worker" + str(thisWorker))

#       #printing pool that will print messages from the print queue
#     printing = True
#     printPool = mp.Pool(1, printer)

    #creates the thread pool that will work automatically as we get from the batch queue
    pool = mp.Pool(workers, worker)    
    fillSource()   #at this point, data starts being taken and stored in the batchQueue

    #generation loop
    for epoch in range(epochs):

        #if not waiting for epoch end synchronization, always keeps 1 epoch filled ahead
        if (use_on_epoch_end == False):
            if epoch + 1 < epochs: #only fill if not last epoch
                fillSource()

        for batch in range(len(keras_sequence)):

            #if waiting for epoch end synchronization, wait for workers to have no batches left to get, then call epoch end and fill
            if use_on_epoch_end == True:
                if batchesLeft == 0:
                    keras_sequence.on_epoch_end()
                    if epoch + 1 < epochs:  #only fill if not last epoch
                        fillSource()
                    else:
                        batchesLeft = -1   #in the last epoch, prevents from calling epoch end again and again

            #yields batches for the outside loop that is using this generator
            originalIndex, batchItems = batchQueue.get(block = True)
            yield epoch, batch, originalIndex, batchItems


#         print("iterator epoch end")
#     printQueue.put("closing threads")

    #terminating the pool - add None to the queue so any blocked worker gets released
    for i in range(workers):
        sourceQueue.put(None)
    pool.terminate()
    pool.close()
    pool.join()
#     printQueue.put("terminated")

#     printing = False
#     printPool.terminate()
#     printPool.close()
#     printPool.join()


    del pool,sourceQueue,batchQueue
#     del printPool, printQueue
person Daniel Möller    schedule 13.11.2019
comment
У меня еще не было времени попробовать ваш ответ, но с таким же успехом могу наградить награду до истечения ее срока. В любом случае спасибо за ответ! - person Ahmad Moussa; 17.11.2019
comment
Спасибо: D - Работает, сейчас использую в своем коде :)) - person Daniel Möller; 17.11.2019
comment
Наконец-то я нашел время проверить ваш итератор, и он супер умный. Спасибо! Ваш ответ определенно стоил 50 повторений: D - person Ahmad Moussa; 27.11.2019
comment
@AhmadMoussa, я обнаружил двух enqueuer'ов, которые выполняют эту работу с меньшим количеством кода :), обновил свой ответ. - person Daniel Möller; 06.12.2019
comment
Привет, спасибо за обновление! Это супер мило с вашей стороны, я бы еще раз проголосовал за вас, если бы мог! - person Ahmad Moussa; 07.12.2019

Хотя решение вашей проблемы уже существует, я хочу ответить на ваш исходный вопрос, можете ли вы обучить свой дискриминатор пользовательскому обратному вызову внутри вашей комбинированной модели.

Простой ответ - Да.

Будьте осторожны при компиляции ваших моделей (Дискриминатор и комбинированная модель) и следуйте инструкциям, указанным здесь: https://github.com/keras-team/keras/issues/8585#issuecomment-385729276

Обратитесь к генератору подгонки или подгонки вашей комбинированной модели:

combined_model.fit_generator(train_loader, epochs, callbacks=[gan_callback])

gan_callback - это настраиваемый класс обратного вызова, перезаписывающий on_batch_end, где вы вызываете (как вы заявили)

def on_batch_end(self, batch_idx, logs=None):
    logs_disc = model_disc.train_on_batch(x, y)

Чтобы получить модель дискриминатора внутри вашего обратного вызова, либо укажите ее во время создания в качестве параметра, либо получите ее через унаследованную переменную self.model (model.layers).

Я считаю, что это элегантное решение, если вы хотите вывести свои потери и показатели в тензорную таблицу.

Внутри вашей функции on_batch_end в gan_callback у вас есть оба журнала (содержащие значения ваших потерь и метрики) прямо под рукой:

  • logs_disc от дискриминатора
  • журналы из генератора, которые являются параметром on_batch_end ()

В зависимости от вашей конфигурации это может привести к появлению предупреждения, которое можно проигнорировать:

UserWarning: Method on_batch_end() is slow compared to the batch update (0.151899).    Check your callbacks.
person SeVe    schedule 23.01.2020