NodeJS: последовательный и асинхронный анализ данных

Я пишу приложение NodeJS, которое будет загружать данные из базы данных, анализировать их, а затем сохранять проанализированный результат в другой таблице базы данных. Вот что у меня есть на данный момент:

parse(index, from, to) {
    var collection = this.getCollectionName();
    var interval = global.Settings.Parser.ParseInterval;

    var promises = [];
    console.log('%d - %d', from, from + interval);
    for(from; from < to; from += interval) {
        promises.push(new Promise((resolve, reject) => {
            var scoped = from;
            this.data.query(collection, {[index]: { $gte: from, $lte: from + interval}, (result) => {
                for(var i = 0; i < result.length; i++)
                    this.sendToBuilder(result[i]);

                resolve();
            });
        }));
    }

    promises.reduce((promise) => {
        Promise.resolve()
    });
}

Код, кажется, делает то, что должен, но поскольку запрос к базе данных является асинхронным, кажется, что нарушение порядка является обычным явлением. Я не хочу, чтобы это произошло. Я хочу, чтобы каждый запрос и обещание выполнялись последовательно, чтобы поддерживать порядок данных.

Я пытаюсь использовать метод array.reduce(), чтобы попытаться связать каждое обещание для последовательного выполнения, но из-за природы обещаний он просто запускает обещание и продолжает работу, заставляя их все срабатывать одновременно.

Как я могу гарантировать, что он будет выполняться последовательно? Я не возражаю против задержек между каждым обещанием, если это не блокирует фактический поток.


person w0f    schedule 09.02.2018    source источник
comment
Вы просто хотите знать, когда все ваши запросы к базе данных будут выполнены, или вы действительно хотите запускать их в последовательном порядке (ожидая завершения одного перед запуском следующего)?   -  person jfriend00    schedule 10.02.2018
comment
@jfriend00 Предполагая, что для interval установлено значение 1000, я бы хотел, чтобы он запускал запрос, вызывал sendToBuilder(), а затем запускал следующий.   -  person w0f    schedule 10.02.2018
comment
Ну, самый простой способ сделать это - использовать await внутри вашего цикла, но мне нужно знать, какую базу данных вы используете (чтобы использовать для нее интерфейс на основе обещаний), и является ли this.sendToBuilder() асинхронным или нет, и если это асинхронный, как вы узнаете, когда это будет сделано.   -  person jfriend00    schedule 10.02.2018
comment
@ jfriend00 Я использую MongoDB с коннектором NodeJS. Я написал для него оболочку под названием DataClient, в которой есть очередь, в которой хранятся запросы и другие операции во время подключения к базе данных. Когда соединение завершено, очередь выполняется.   -  person w0f    schedule 10.02.2018
comment
А как насчет другой части моего вопроса? Является ли this.sendToBuilder() асинхронным? Если да, то как узнать, что он завершен?   -  person jfriend00    schedule 10.02.2018
comment
Кроме того, вы this.data.query() не можете сообщить об ошибке.   -  person jfriend00    schedule 10.02.2018
comment
@ jfriend00 sendToBuilder() - это просто функция форматирования, которая допускает полиморфизм класса ES6, когда данные из нескольких источников могут быть проанализированы с помощью наследующего класса, форматирующего данные в общем виде. this.data.query() — это функция-оболочка, которая выдает ошибку, если она присутствует.   -  person w0f    schedule 10.02.2018
comment
Как он может выдать ошибку, если он асинхронный? Извините, но я не могу предоставить вам хороший способ сделать то, что вы пытаетесь сделать, не разбираясь в асинхронных интерфейсах, которые вы пытаетесь использовать. Это было бы намного проще, если бы я мог напрямую использовать интерфейс обещания mogodb.   -  person jfriend00    schedule 10.02.2018
comment
@jfriend00 jfriend00 Я действительно могу использовать интерфейс MongoDB напрямую, если у вас есть решение сделать это таким образом. Мне просто нужно было бы включить способ убедиться, что он уже подключен, прежде чем выполнять что-либо, что было бы легко сделать всего несколькими строками.   -  person w0f    schedule 10.02.2018


Ответы (1)


Вот рабочий пример в духе вашего кода с использованием async/await:

function getData(reqId, collection, index, gte, lte) {
    return new Promise((resolve, reject) => {
        const delay = Math.floor(Math.random() * 2000) + 1;
        console.log(`[getData ${reqId}] delay: ${delay}`);

        const params = { collection, [index]: { $gte: gte, $lte: lte } };
        const results = [0, 1, 2].map(r => `result ${reqId}.${r}`);

        setTimeout(() => {
            console.log(`[getData ${reqId}] this.data.query(${JSON.stringify(params)})`);
            resolve(results);
        }, delay);
    });
}

async function parse(index, from, to) {
    const collection = 'My Collection';
    const interval = 10;

    console.log(`Processing ${from} to ${to} by ${interval}:`);

    for (from; from <= to; from += interval) {
        const reqId = from;
        console.log(`[parse] BEGIN ${reqId}`);
        const results = await getData(reqId, collection, index, from, from + interval);
        results.forEach(result => {
            console.log(`[parse - awaited ${reqId}] this.sendToBuilder(${result})`);
        });
        console.log(`[parse] END ${reqId}`);
    }
}

parse('idx', 200, 250);

person rob3c    schedule 09.02.2018
comment
Обещание — это просто монитор для уже запущенной асинхронной операции. Если у вас уже есть массив промисов (что подразумевает ваш код), то операции не выполняются последовательно, поскольку все они уже запущены. - person jfriend00; 10.02.2018
comment
lol oops - я ответил слишком быстро сегодня днем. Я хорошо разбираюсь в промисах и постоянно использую эту конструкцию (правильно для of с функциями async/await и promise, а не с уже запущенными). Спасибо, что указали на это. - person rob3c; 10.02.2018
comment
@jfriend00 Код был обновлен до рабочего фрагмента. Вы видите, что еще чего-то не хватает? - person rob3c; 11.02.2018