Можно ли заставить обработчик событий ждать, пока не будет выполнен асинхронный/промис-код?

Я использую отличную библиотеку Papa Parse в режиме nodejs для потоковой передачи большого (500 МБ) CSV-файла с более чем 1 миллионом строк в API с медленным сохранением, который может принимать только один запрос за раз. API постоянства основан на Promises, но от Papa Parse я получаю каждую проанализированную строку CSV в синхронном событии, например так: parseStream.on("data", row => { ... }

Проблема, с которой я сталкиваюсь, заключается в том, что Papa Parse выгружает свои строки CSV из потока так быстро, что мой медленный API сохранения не может угнаться за ним. Поскольку Papa синхронный, а мой API основан на Promise, я не могу просто вызвать await doDirtyWork(row) в обработчике событий on, потому что синхронный и асинхронный код не смешиваются.

Или они могут смешиваться, а я просто не знаю, как?

Мой вопрос: могу ли я заставить обработчик событий Papa ждать завершения моего вызова API? Что-то вроде выполнения запроса к API сохранения непосредственно в событии on("data"), заставляя функцию on() каким-то образом задерживаться до тех пор, пока не будет выполнена грязная работа API?

Решение, которое у меня есть до сих пор, не намного лучше, чем использование папиного непотокового режима с точки зрения объема памяти. На самом деле мне нужно поставить в очередь поток событий on("data") в форме итераций функции генератора. Я мог бы также поставить в очередь фабрики обещаний в массиве и отработать их в цикле. В любом случае, я в конечном итоге сохраняю почти весь файл CSV в виде огромной коллекции будущих обещаний (фабрик обещаний) в памяти, пока мои медленные вызовы API не отработают полностью.

async importCSV(filePath) {
    let parsedNum = 0, processedNum = 0;

    async function* gen() {
        let pf = yield;
        do {
            pf = yield await pf();
        } while (typeof pf === "function");
    };

    var g = gen();
    g.next();


    await new Promise((resolve, reject) => {
        try {
            const dataStream = fs.createReadStream(filePath);
            const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
            dataStream.pipe(parseStream);

            parseStream.on("data", row => {

                // Received a CSV row from Papa.parse()

                try {
                    console.log("PA#", parsedNum, ": parsed", row.filter((e, i) => i <= 2 ? e : undefined)
                    );
                    parsedNum++;

                    // Simulate some really slow async/await dirty work here, for example 
                    // send requests to a one-at-a-time persistence API

                    g.next(() => {  // don't execute now, call in sequence via the generator above
                        return new Promise((res, rej) => {
                            console.log(
                                "DW#", processedNum, ": dirty work START",
                                row.filter((e, i) => i <= 2 ? e : undefined)
                            );
                            setTimeout(() => {
                                console.log(
                                    "DW#", processedNum, ": dirty work STOP ",
                                    row.filter((e, i) => i <= 2 ? e : undefined)
                                );
                                processedNum++;
                                res();
                            }, 1000)
                        })
                    
                    });
                } catch (err) {
                    console.log(err.stack);
                    reject(err);                    
                }
            });
            parseStream.on("finish", () => {
                console.log(`Parsed ${parsedNum} rows`);
                resolve();
            });

        } catch (err) {
            console.log(err.stack);
            reject(err);                    
        }
    });
    while(!(await g.next()).done);
}

Так зачем торопиться, папа? Почему бы не позволить мне работать с файлом немного медленнее — данные в исходном CSV-файле не исчезнут, у нас есть часы, чтобы закончить потоковую передачу, зачем забивать меня on("data") событиями, которые я не могу замедлить вниз?

Итак, что мне действительно нужно, так это чтобы Папа стал больше дедушкой и минимизировал или устранил любые очереди или буферизацию строк CSV. В идеале я мог бы полностью синхронизировать события парсинга Papa со скоростью (или ее отсутствием) моего API. Поэтому, если бы не догма о том, что асинхронный код не может заставить код синхронизации спать, я бы в идеале отправлял каждую строку CSV в API внутри события Papa, и только затем strong> вернуть контроль папе.

Предложения? Какая-то слабая связь обработчика событий с медлительностью моего асинхронного API — это тоже нормально. Я не возражаю, если несколько сотен строк будут поставлены в очередь. Но когда накапливаются десятки тысяч, у меня быстро кончается куча.


person blitter    schedule 05.09.2020    source источник
comment
используйте это, microservices.io/patterns/data/transactional-outbox.html   -  person Abhishek chandel    schedule 05.09.2020
comment
Спасибо, если я правильно понимаю ваше предложение, я должен записать поток строк CSV Папы в какой-нибудь исходящий ящик, например, ключ Redis, который, безусловно, может выполнять 1000 транзакций в секунду. Хотя Redis снова жрет оперативную память. Таким образом, единственный исходящий ящик, который не потребляет память, — это fs.writeFileSync() для промежуточных файлов JSON, которые я буду медленно импортировать позже. Довольно акробатика для того, что должно быть простым потоковым импортом CSV. LMK, если я правильно понял ваше предложение.   -  person blitter    schedule 05.09.2020
comment
transactional-outbox — это шаблон проектирования, обычно используемый для надежной доставки сообщений в несколько систем. Надежная реализация может дать гибкость в управлении тем, как вы делаете dirtyWork. Вы можете возобновить работу с того места, где вы остановились, в случае ошибок, чтобы вы могли исправить ошибки. Это разделяет вашу логику приема и отправки, что имеет свои преимущества. Согласен, что это не проще реализовать, и поэтому ничего надежного.   -  person Abhishek chandel    schedule 06.09.2020


Ответы (2)


Зачем забивать меня on("data") событиями, которые я не могу замедлить?

Можешь, ты просто не просила папу остановиться. Вы можете сделать это, вызвав stream.pause(), а затем stream.resume(), чтобы использовать встроенный противодавление потока Node.

Однако есть гораздо более удобный API, чем заниматься этим самостоятельно в коде на основе обратного вызова: использовать поток как асинхронный итератор! Когда вы await находитесь в теле цикла for await, генератор также должен останавливаться. Итак, вы можете написать

async importCSV(filePath) {
    let parsedNum = 0;

    const dataStream = fs.createReadStream(filePath);
    const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
    dataStream.pipe(parseStream);

    for await (const row of parseStream) {
        // Received a CSV row from Papa.parse()
        const data = row.filter((e, i) => i <= 2 ? e : undefined);
        console.log("PA#", parsedNum, ": parsed", data);
        parsedNum++;
        await dirtyWork(data);
    }
    console.log(`Parsed ${parsedNum} rows`);
}

importCSV('sample.csv').catch(console.error);

let processedNum = 0;
function dirtyWork(data) {
    // Simulate some really slow async/await dirty work here,
    // for example send requests to a one-at-a-time persistence API
    return new Promise((res, rej) => {
        console.log("DW#", processedNum, ": dirty work START", data)
        setTimeout(() => {
             console.log("DW#", processedNum, ": dirty work STOP ", data);
             processedNum++;
             res();
        }, 1000);
    });
}
person Bergi    schedule 05.09.2020
comment
Спасибо, я попробую это прямо сейчас. Если это работает, нам повезло, что мы имеем дело с потоком, который можно приостановить/возобновить. Хотя я сталкивался с этой проблемой раньше, когда события не происходили из потока. Они просто как-то создаются - есть советы, что делать в таких случаях? - person blitter; 05.09.2020
comment
@blitter Если события каким-то образом генерируются и нет способа их заблокировать, все, что вы можете сделать, это буферизовать их или найти лучший API для использования, который поддерживает противодавление. - person Bergi; 05.09.2020
comment
Спасибо — по шкале от 1 до 10, насколько мы уверены, что обратное давление просто не накапливает весь CSV в буфере потока узла, возвращая нас к исходной точке? Потому что некоторые потоки просто нельзя держать открытыми в течение нескольких часов или дней — представьте себе поток, который приходит из Интернета. Знаете ли вы какие-либо документы по этому поводу? Я протестирую ваше решение с файлом размером 2 ГБ и посмотрю, перейдет ли обратное давление в нехватку памяти... - person blitter; 05.09.2020
comment
@blitter Node readFileStream наверняка будет учитывать обратное давление, и я ожидаю, что парсер csv папы будет реализован правильно. Если ваш источник потока не поддерживает обратное давление (обратите внимание, что TCP на самом деле поддерживает) и вы не можете обрабатывать данные так же быстро, как они приходят, вы просто проиграли, и в какой-то момент ваше приложение рухнет. - person Bergi; 05.09.2020
comment
Это работает, чистый гений, спасибо! Ура для потоков. Я протестировал буферизацию 100 из 1000 строк, и использование памяти оставалось около 100 МБ, независимо от того, как долго оно работало. Одна идея, которую я попытаюсь замедлить непотоковые on() события: библиотека Fibers. Это своего рода мошенничество, поскольку оно скомпилировано на C++, но, эй, это лучше, чем создавать миллионы временных файлов и писать еще один цикл очереди для работы с ними... - person blitter; 05.09.2020
comment
@blitter Я не думаю, что использование волокон поможет или даже что-то упростит. Если источник асинхронных событий находится под вашим контролем, вам необходимо изменить его для поддержки обратного давления, если он не находится под вашим контролем, вам необходимо повысить скорость обработки. - person Bergi; 05.09.2020

Асинхронный код в JavaScript иногда может быть немного сложным для понимания. Важно помнить, как Node работает с параллелизмом.

Процесс node является однопоточным, но он использует концепцию, называемую циклом событий. Следствием этого является то, что асинхронный код и обратные вызовы по сути являются эквивалентными представлениями одного и того же.

Конечно, вам нужна асинхронная функция для использования await, но ваш обратный вызов из Papa Parse может быть асинхронной функцией:

parse.on("data", async row => {
  await sync(row)
})

Как только операция await завершится, функция стрелки завершится, и все ссылки на строку будут удалены, поэтому сборщик мусора сможет успешно собрать row, освободив эту память.

Результатом этого является одновременное выполнение sync каждый раз, когда анализируется строка, поэтому, если вы можете синхронизировать только одну запись за раз, я бы рекомендовал обернуть функцию синхронизации в средство устранения дребезга.

person MayorMonty    schedule 05.09.2020
comment
Просто имейте в виду, что с этим решением вы в конечном итоге создадите много обещаний, по 1 для каждого события данных, что резко замедлит ваше приложение. Лучший способ справиться с этим — использовать шаблон исходящих транзакций microservices.io/patterns/data/transactional -outbox.html. - person Abhishek chandel; 05.09.2020
comment
Несмотря ни на что, вы собираетесь создать 1 миллион обещаний - person MayorMonty; 05.09.2020
comment
Создать 1 миллион промисов не проблема, проблема в том, что большинство из них ждут одновременно. Неограниченный код, подобный приведенному выше, должен использовать промисы контролируемым образом. - person Abhishek chandel; 05.09.2020
comment
Во-вторых, простое добавление ключевого слова async и ожидание внутри обработчика не поможет. Потому что ваша потоковая передача не будет ждать разрешения обещания обработчика. - person Abhishek chandel; 05.09.2020
comment
Спасибо за ответ, я попытался реорганизовать обработчик on в async, но, как здесь прокомментировано, это не приводит к тому, что соответствующий await происходит на стороне Papa Parse. Его код высечен в камне. Нам все еще приходится иметь дело с 1 миллионом одновременно выполняемых промисов, которые разрушат мой API сохранения. В моем коде я уже сериализую выполнение промисов с помощью генератора — это мог бы быть и массив фабрик промисов — но независимо от того, весь смысл потокового CSV-процессора в том, что он НЕ загружает всю базу данных в память, будь то массивы или промисы, ожидающие выполнения. Идеи? - person blitter; 05.09.2020
comment
Должен существовать супер общий шаблон проектирования, который мне не хватает, чтобы как-то слабо связать время входящих событий с любой асинхронной медленной грязной работой, которую необходимо выполнять для каждого такого события. Должен быть законный способ остановить поток событий, чтобы у моего обрабатывающего асинхронного кода было время оставаться более или менее на вершине потока. Я не могу быть первым человеком, у которого есть эта проблема...? Я могу придумать неуклюжие обходные пути, такие как запись строки CSV через fs.writeFileSync() в другой (JSON?) буферный файл или, возможно, в сотни файлов меньшего размера для последующей обработки, но тогда зачем вообще использовать Papa? - person blitter; 05.09.2020