Я использую отличную библиотеку Papa Parse в режиме nodejs для потоковой передачи большого (500 МБ) CSV-файла с более чем 1 миллионом строк в API с медленным сохранением, который может принимать только один запрос за раз. API постоянства основан на Promise
s, но от Papa Parse я получаю каждую проанализированную строку CSV в синхронном событии, например так: parseStream.on("data", row => { ... }
Проблема, с которой я сталкиваюсь, заключается в том, что Papa Parse выгружает свои строки CSV из потока так быстро, что мой медленный API сохранения не может угнаться за ним. Поскольку Papa синхронный, а мой API основан на Promise, я не могу просто вызвать await doDirtyWork(row)
в обработчике событий on
, потому что синхронный и асинхронный код не смешиваются.
Или они могут смешиваться, а я просто не знаю, как?
Мой вопрос: могу ли я заставить обработчик событий Papa ждать завершения моего вызова API? Что-то вроде выполнения запроса к API сохранения непосредственно в событии on("data")
, заставляя функцию on()
каким-то образом задерживаться до тех пор, пока не будет выполнена грязная работа API?
Решение, которое у меня есть до сих пор, не намного лучше, чем использование папиного непотокового режима с точки зрения объема памяти. На самом деле мне нужно поставить в очередь поток событий on("data")
в форме итераций функции генератора. Я мог бы также поставить в очередь фабрики обещаний в массиве и отработать их в цикле. В любом случае, я в конечном итоге сохраняю почти весь файл CSV в виде огромной коллекции будущих обещаний (фабрик обещаний) в памяти, пока мои медленные вызовы API не отработают полностью.
async importCSV(filePath) {
let parsedNum = 0, processedNum = 0;
async function* gen() {
let pf = yield;
do {
pf = yield await pf();
} while (typeof pf === "function");
};
var g = gen();
g.next();
await new Promise((resolve, reject) => {
try {
const dataStream = fs.createReadStream(filePath);
const parseStream = Papa.parse(Papa.NODE_STREAM_INPUT, {delimiter: ",", header: false});
dataStream.pipe(parseStream);
parseStream.on("data", row => {
// Received a CSV row from Papa.parse()
try {
console.log("PA#", parsedNum, ": parsed", row.filter((e, i) => i <= 2 ? e : undefined)
);
parsedNum++;
// Simulate some really slow async/await dirty work here, for example
// send requests to a one-at-a-time persistence API
g.next(() => { // don't execute now, call in sequence via the generator above
return new Promise((res, rej) => {
console.log(
"DW#", processedNum, ": dirty work START",
row.filter((e, i) => i <= 2 ? e : undefined)
);
setTimeout(() => {
console.log(
"DW#", processedNum, ": dirty work STOP ",
row.filter((e, i) => i <= 2 ? e : undefined)
);
processedNum++;
res();
}, 1000)
})
});
} catch (err) {
console.log(err.stack);
reject(err);
}
});
parseStream.on("finish", () => {
console.log(`Parsed ${parsedNum} rows`);
resolve();
});
} catch (err) {
console.log(err.stack);
reject(err);
}
});
while(!(await g.next()).done);
}
Так зачем торопиться, папа? Почему бы не позволить мне работать с файлом немного медленнее — данные в исходном CSV-файле не исчезнут, у нас есть часы, чтобы закончить потоковую передачу, зачем забивать меня on("data")
событиями, которые я не могу замедлить вниз?
Итак, что мне действительно нужно, так это чтобы Папа стал больше дедушкой и минимизировал или устранил любые очереди или буферизацию строк CSV. В идеале я мог бы полностью синхронизировать события парсинга Papa со скоростью (или ее отсутствием) моего API. Поэтому, если бы не догма о том, что асинхронный код не может заставить код синхронизации спать, я бы в идеале отправлял каждую строку CSV в API внутри события Papa, и только затем strong> вернуть контроль папе.
Предложения? Какая-то слабая связь обработчика событий с медлительностью моего асинхронного API — это тоже нормально. Я не возражаю, если несколько сотен строк будут поставлены в очередь. Но когда накапливаются десятки тысяч, у меня быстро кончается куча.
fs.writeFileSync()
для промежуточных файлов JSON, которые я буду медленно импортировать позже. Довольно акробатика для того, что должно быть простым потоковым импортом CSV. LMK, если я правильно понял ваше предложение. - person blitter   schedule 05.09.2020dirtyWork
. Вы можете возобновить работу с того места, где вы остановились, в случае ошибок, чтобы вы могли исправить ошибки. Это разделяет вашу логику приема и отправки, что имеет свои преимущества. Согласен, что это не проще реализовать, и поэтому ничего надежного. - person Abhishek chandel   schedule 06.09.2020