Обновить QProgressBar из нескольких QThreads

Я нашел в Интернете несколько руководств, в которых объясняется, как обновить QProgressBar во время долгих вычислений. Один из них: использовать QThread для вычисления, а затем испустить сигнал, который подключен к progressBar.setValue(int).

Я думал, что это также должно работать для нескольких QThread, которые работают одновременно, но что-то работает неправильно.

Итак, вот что я делаю: я хочу вычислить траектории нескольких частиц (каждая с длинным циклом). Чтобы использовать многоядерную обработку, я создаю QThread для каждой из этих частиц и позволяю ей вызывать соответствующий метод расчета. Это работает нормально, используются все ядра, и расчет завершается примерно за четверть времени, чем раньше.

Я написал класс Worker на основе этого руководства http://mayaposch.wordpress.com/2011/11/01/how-to-really-truly-use-qthreads-the-full-explanation/. Заголовок выглядит так: (worker.h)

#include <world.h>

class Worker: public QObject
{
    Q_OBJECT

public:
    explicit Worker(World *world = 0, double deltaTau = 0., double maxDist = 0., double iterations = 0., double index = 0);

public slots:
    void process();

signals:
    void finished();
    void eror(QString err);

private:
    World *w;
    double my_deltaTau;
    double my_maxDist;
    int my_iterations;
    int my_index;
};

И источник вроде этого: (worker.cpp)

#include "worker.h"

Worker::Worker(World *world, double deltaTau, double maxDist, double iterations, double index)
{
    w = world;
    my_deltaTau = deltaTau;
    my_maxDist = maxDist;
    my_iterations = iterations;
    my_index = index;
}

void Worker::process()
{
    w->runParticle(my_deltaTau, my_maxDist, my_iterations, my_index);
    emit finished();
}

В world.cpp у меня есть функция run, которая запускает все потоки, и функцию runParticle, которая вызывается Worker:

void World::run(double deltaTau, double maxDist, int iterations)
{
    globalProgress = 0;
    for (int j = 0; j < particles->size(); j++) { //loop over all particles
        QThread *thread = new QThread;
        Worker *worker = new Worker(this, deltaTau, maxDist, iterations, j);
        worker->moveToThread(thread);
        connect(thread, SIGNAL(started()), worker, SLOT(process()));
        connect(worker, SIGNAL(finished()), thread, SLOT(quit()));
        connect(worker, SIGNAL(finished()), thread, SLOT(deleteLater()));
        connect(thread, SIGNAL(finished()), worker, SLOT(deleteLater()));
        thread->start();
    }
}

void World::runParticle(double deltaTau, double maxDist, int iterations, int index)
{
    for (int i = 0; i < iterations; i++) { //loop over iteration steps
        if (i % 1000 == 0) { //only update the progress bar every 1000th iteration
            emit updateProgress(++globalProgress);
            qApp->processEvents(); // <--- I added this line, no effect!
        }
        [...] // <--- do my calculations for the particle's trajectories
    }
}

Публичный слот updateProgress(int) вызывается здесь каждую 1000-ю итерацию. Он подключен к QProgressBar в моем MainWindow следующим образом:

progressBar->setValue(0);
progressBar->setMaximum(nrPart * iter / 1000); //number of particles * number of iteration steps / 1000
connect(world, SIGNAL(updateProgress(int)), progressBar, SLOT(setValue(int)));
world->run(timeStep, dist, iter);

Моя проблема в том, что индикатор выполнения не перемещается, пока все вычисления не будут завершены, а затем я вижу, что он довольно быстро переходит на 100%.

Кто-нибудь видит мою ошибку или знает как правильно это сделать?

ИЗМЕНИТЬ

Я внес следующие изменения:

(worker.h)

#include "world.h"

class Worker: public QObject
{
    Q_OBJECT

public:
    explicit Worker(World *world = 0, Particle *particle = 0, QList<MagneticField> *bfields = 0, double deltaTau = 0., double maxDist = 0., int iterations = 0);

public slots:
    void process();

signals:
    void finished();
    void updateProgress(int value);
    void ProcessParticle();
    void eror(QString err);

private:
    int i;
    Particle *p;
    QList<MagneticField> *magneticFields;
    double my_deltaTau;
    double my_maxDist;
    int my_iterations;
};

(worker.cpp)

#include "worker.h"

Worker::Worker(World *world, Particle *particle, QList<MagneticField> *bfields, double deltaTau, double maxDist, int iterations)
{
    i = 0;
    const World *w = world;
    p = particle;
    magneticFields = bfields;
    my_deltaTau = deltaTau;
    my_maxDist = maxDist;
    my_iterations = iterations;
    connect(this, SIGNAL(updateProgress(int)), w, SLOT(updateTheProgress(int)));
    connect(this, SIGNAL(ProcessParticle()), this, SLOT(process()), Qt::QueuedConnection);
}

void Worker::process()
{
    const int modNr = my_iterations / 1000;
    QDateTime start = QDateTime::currentDateTime();
    while (i < my_iterations) { //loop over iteration steps
        [...] // <--- do my calculations
        //handle progress
        emit updateProgress(1);
        if (QDateTime::currentDateTime() > start.addMSecs(300)) {
            emit ProcessParticle();
            ++i; //ensure we return to the next iteration
            return;
        }
        i++;
    }
    qDebug() << "FINISHED"; // <--- I can see this, so finished() should be emitted now...
    emit finished();
}

(часть world.h)

public slots:
    void threadFinished();
    void updateTheProgress(int value);

signals:
    void updateProgress(int value);

(часть world.cpp)

void World::threadFinished()
{
    particleCounter++;
    qDebug() << "particles finished: " << particleCounter; // <--- this is NEVER called !?!?
    if (particleCounter == particles->size()) {
        hasRun = true;
    }
}

void World::updateTheProgress(int value)
{
    globalProgress += value;
    emit updateProgress(globalProgress);
}

void World::run(double deltaTau, double maxDist, int iterations)
{
    globalProgress = 0;
    particleCounter = 0;
    hasRun = false;
    for (int i = 0; i < particles->size(); i++) { //loop over all particles
        QThread *thread = new QThread;
        Worker *worker = new Worker(this, &(*particles)[i], bfields, deltaTau, maxDist, iterations);
        worker->moveToThread(thread);
        connect(thread, SIGNAL(started()), worker, SLOT(process()));
        connect(worker, SIGNAL(finished()), thread, SLOT(quit()));
        connect(worker, SIGNAL(finished()), thread, SLOT(deleteLater()));
        connect(worker, SIGNAL(finished()), this, SLOT(threadFinished())); // <--- this connection SHOULD make sure, I count the finished threads
        connect(thread, SIGNAL(finished()), worker, SLOT(deleteLater()));
        thread->start();
    }
}

(где-то в MainWindow.cpp)

progressBar->setValue(0);
progressBar->setMaximum(nrPart * iter);
connect(world, SIGNAL(updateProgress(int)), progressBar, SLOT(setValue(int)));
world->run(timeStep, dist, iter);
while (!world->hasBeenRunning()) {} //wait for all threads to finish

Как я отметил в приведенном выше коде, я никогда не получаю уведомления, когда потоки завершаются, и я оказываюсь в бесконечном цикле в MainWindow. Что-то не так со связями World ‹-> Worker?


person Bianfable    schedule 11.06.2014    source источник
comment
Тот факт, что Worker, который живет в отдельном потоке, вызывает метод World, который живет в основном потоке и пытается послать сигнал, заставляет меня нервничать. Сигнал updateProgress действительно должен быть сигналом для рабочего, чтобы вызов connect распознал его как межпоточный сигнал. Не уверен, что это проблема, но из того, что я знаю о потоковой модели Qt, ваш код не кошерный.   -  person Sebastian Redl    schedule 11.06.2014
comment
Ваши классы World и Worker кажутся очень тесно связанными. Подумайте о перепроектировании вашего World класса, чтобы он ничего не знал о вашем Worker классе и ничего не об итерациях. Переместите итерации и излучение сигнала в класс Worker, а затем подключите его к индикатору выполнения.   -  person RobbieE    schedule 11.06.2014


Ответы (2)


ИМО, это неправильно. Создание цепочки дорого обходится, и вы хотите создать их много. Прежде всего, вы должны использовать QThreadPool, ваш случай точно соответствует функциональности этого класс.

Существуют также методы QtConcurrent, которые резко сокращают код стандартной пластины ( это заброшенная функция Qt, поэтому рекомендуется использовать QThreadPool, но вы можете попробовать, что она работает достаточно хорошо).

person Marek R    schedule 12.06.2014
comment
Никогда раньше не слышал о QThreadPool, но мне кажется, что я должен использовать его здесь. Пока я моделирую только 100 частиц, то есть создаю 100 потоков за раз, и это еще не дает сбоев. Если я попробую 1000 частиц, он сразу вылетит. Надеюсь, QThreadPool решит эту проблему ... - person Bianfable; 12.06.2014
comment
@Bianfable, я только что это заметил. Проблема здесь не в использовании QThread, а в том, как вы его используете. Вам не нужно больше потоков, чем ядер процессора, и от этого не будет никакой пользы. Не создавайте по одному потоку на частицу. Вместо этого переместите несколько рабочих объектов частиц в один поток. - person TheDarkKnight; 13.06.2014
comment
Хорошо, теперь я переписал в основном весь свой код, чтобы сделать следующее: из MainWindow я вызываю world- ›run (...), который затем использует QtConcurrent :: map (...), чтобы начать мои вычисления в правильном количество потоков. Затем я использую QFutureWatcher, чтобы получить прогресс и показать его в QProgressDialog, как описано здесь: - person Bianfable; 13.06.2014

Проблема в том, что для обработки излучаемого сигнала необходимо разрешить запуск цикла событий в новом потоке. Если оставаться в цикле for в runParticle, этого не произойдет, пока функция не завершится.

Есть грубый способ исправить это - вызывать QApplication :: processEvents время от времени во время вашего цикла.

Лучшим способом было бы изменить дизайн объекта так, чтобы обрабатывать несколько итераций, прежде чем выйти и позволить циклу событий работать естественным образом.

Итак, чтобы создать временной интервал для обработки, в вашем цикле for вы должны указать, сколько времени занимают итерации. Если время превысило 1/30 секунды, вызовите сигнал с типом QueuedConnection, чтобы снова вызвать функцию вашего слота и выйти из цикла for.

QueuedConnection гарантирует, что все события будут обработаны, а затем ваша функция будет вызвана снова.

Предполагая, что runParticle - это слот: -

void Worker::runParticle(...)
{
    static int i = 0;

    QDateTime start = QDateTime::currentDateTime();

    for(i; i < iteration; ++i)
    {
        // do processing


        emit updateProgress(++globalProgress);

        // check if we've been here too long
        if(QDateTime::currentDateTime() > start.addMSecs(300))
        {
          emit ProcessParticle(); // assuming this is connected to runParticle with a Queued Connection
          ++i; // ensure we return to the next iteration
          return;
        }
    }
}

С другой стороны, когда объект перемещается в новый поток, все его дочерние элементы также перемещаются, где дочерний элемент является частью иерархии QObject.

Если объект Worker содержит указатель на World, он напрямую вызывает функцию runParticle World, которая все еще находится в основном потоке. Хотя это небезопасно, это также означает, что функция runParticle обрабатывается в основном потоке.

Вам нужно переместить функцию runParticle в объект Worker, который находится в новом потоке.

person TheDarkKnight    schedule 11.06.2014
comment
Как видите, я уже пробовал использовать qApp- ›processEvents (), а QApplication :: processEvents () ничего не меняет в цикле. Другое ваше предложение звучит интересно, но я не знаю, как установить соединение в очереди !? - person Bianfable; 11.06.2014
comment
Qt :: QueuedConnection - это параметр, который вы передаете для вызова для соединения. См. qt-project.org/doc/qt-4.8/qobject .html # connect-3. - person TheDarkKnight; 11.06.2014
comment
Итак, я делаю runParticle слотом и добавляю новый сигнал ProcessParticle, связанный с этим аргументом? И где мне их подключить? - person Bianfable; 11.06.2014
comment
Я отредактировал свой ответ. Пожалуйста, прочтите ниже пример кода. Что касается соединения, вы должны сделать это непосредственно в конструкторе класса Worker: connect (this, & Worker :: ProcessParticles, this, & Worker :: runParticle, Qt :: QueuedConnection); - person TheDarkKnight; 11.06.2014
comment
Ах, хорошо, но это немного раздражает, так как вычисление требует доступа к множеству вещей из класса Particle (который я не показал), у которого у меня есть QList в World. Тогда я попробую переместить это в Worker ... - person Bianfable; 11.06.2014
comment
1. Нет необходимости указывать Qt::QueuedConnection при установлении соединений между объектами, живущими в разных потоках. Qt сделает это за вас автоматически. 2. Передача сигнала в данном потоке не требует выполнения цикла обработки событий в том же потоке. В этом весь смысл сигналов. Вы можете испускать их из собственных потоков, через прокладки из кода C и т. Д. Только получающие объекты должны жить в потоке с запущенным циклом событий iff поток отличается от потока отправителя. - person Kuba hasn't forgotten Monica; 11.06.2014
comment
@KubaOber, это адресовано мне или OP? Я указал QueuedConnection, потому что объект отправляет сигнал самому себе и должен вернуться к событиям обработки, прежде чем продолжить. - person TheDarkKnight; 12.06.2014
comment
Вы подключаете слот finished () к слоту quit () потока. Если это вызывается до threadFinished (), тогда нет цикла событий для обработки функции threadFinished. Я предлагаю вам вызвать threadFinished в конце Process (), а затем передать finished () в threadFinished (). - person TheDarkKnight; 12.06.2014
comment
Хорошо, сейчас я вызываю threadFinished в конце процесса (), и он работает. Спасибо за это;) Однако индикатор выполнения по-прежнему не перемещается, пока не завершатся все потоки!?! - person Bianfable; 12.06.2014
comment
В Worker :: process вы испустили updateProgress (1); Я не думаю, что постоянный аргумент 1 здесь поможет. - person TheDarkKnight; 13.06.2014
comment
UpdateProgress в Worker связан с updateTheProgress в World, который затем подсчитывает переменную globalProgress для World и выдает updateProgress с этим globalProgress. Может быть, поэтому он снова не работает, потому что я выбираю этот обходной путь с классом World, но как еще я могу узнать globalProgress в Worker (есть несколько потоков, работающих одновременно)? - person Bianfable; 13.06.2014