Одним из самых последних дополнений к ECMAScript / JavaScript являются генераторы асинхронности; они построены на основе обычных генераторов, чтобы добавить асинхронную функциональность. Если вы еще не знакомы с ними, генераторы - это специальные функции, которые могут «возвращать» несколько раз, используя инструкцию yield, подобную этой:

// Generator functions are declared with an asterisk (function*)
function* makeRangeGenerator(start, end) {
  let n = 0;
  for (let i = start; i < end; i++) {
    n += 1;
    yield i;
  }
  return n;
}
let rangeGenerator = makeRangeGenerator(0, 3);
console.log(rangeGenerator.next()); // { value: 0, done: false }
console.log(rangeGenerator.next()); // { value: 1, done: false }
console.log(rangeGenerator.next()); // { value: 2, done: false }
console.log(rangeGenerator.next()); // { value: 3, done: true }

Генераторные функции отличаются от обычных функций по крайней мере двумя фундаментальными способами:

  • Функция генератора может выдавать (~ возвращать) значения несколько раз без выхода и потери своего внутреннего состояния.
  • Вызывающий может отправить значения обратно в функцию генератора в точке, где она в последний раз выдала результат - генератор возобновит выполнение с этой точки, пока не встретит следующий результат.

Обратите внимание, что вы можете перебирать выходные данные генератора, используя цикл for-of следующим образом:

for (let integer of rangeGenerator) {
  console.log(integer);
}

^ Это немного чище, чем использование rangeGenerator.next() в обычном for цикле; в этом случае наша переменная integer будет исходным значением вместо объекта {value: ..., done: false}.

Иногда нам может потребоваться передать значения в генератор следующим образом:

function* pushPullGenerator() {
  let n = 0;
  while (true) {
    n = yield n * 2;
  }
}
let gen = pushPullGenerator();
console.log(gen.next());  // { value: 0, done: false }
console.log(gen.next(2)); // { value: 4, done: false }
console.log(gen.next(3)); // { value: 6, done: false }
console.log(gen.next(4)); // { value: 8, done: false }

^ Здесь мы передаем значения в генератор с помощью gen.next(...) вызовов. Обратите внимание, что первый вызов gen.next() не имеет аргументов; это потому, что первый yield в генераторе отличается от других; он больше похож на простой return оператор. Последующие вызовы gen.next(value) будут вытягивать указанное значение в функцию генератора (вместо последнего yield), а затем выталкивать из нее новое значение (в следующих yield ).

Придумывание вариантов использования генераторов поначалу может быть неочевидным. При чрезмерном использовании генераторы могут затруднить отслеживание кода. Однако при правильном использовании генераторы могут предложить некоторые уникальные возможности. В этом руководстве мы рассмотрим конкретный тип генератора; асинхронный генератор.

Асинхронные генераторы (как в async / await) похожи на обычные генераторы, за исключением того, что вместо выдачи нормальных значений они выдают Promise объектов, которые разрешаются асинхронно. Это позволяет нам делать некоторые полезные вещи; в частности, асинхронные генераторы идеальны для представления асинхронных потоков данных.

Если вы раньше использовали RxJS Observables, вы уже должны быть знакомы с идеей использования асинхронных потоков данных декларативным (реактивным) способом. Генераторы async могут использоваться как альтернатива Observables; одно из основных преимуществ использования генераторов async по сравнению с Observables состоит в том, что они сделают ваш код более похожим на обычный синхронный JavaScript; это должно сделать его более лаконичным и читаемым.

Вот пример асинхронного генератора, который передает некоторые целые числа:

function waitForDelay(delay) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, delay);
  });
}
async function* createRandomStream(randomness) {
  let n = 0;
  while (true) {
    let randomDelay = Math.round(Math.random() * randomness);
    await waitForDelay(randomDelay);
    yield n++;
  }
}
async function startConsumingStream() {
  let randomStream = createRandomStream(1000);
  for await (let value of randomStream) {
    console.log(value);
  }
}
startConsumingStream();

^ randomStream в данном случае представляет собой поток целых чисел, который увеличивается на 0, 1, 2, 3, 4,… - он случайен только в том смысле, что период задержки между каждой итерацией генератора будет случайным значением от 0 до 1000. миллисекунды; в результате вы можете видеть, как в журнале происходит случайное ускорение и замедление. Обратите внимание, что в этом случае мы использовали цикл for-await-of для перебора асинхронного генератора; это асинхронный эквивалент цикла for-of, который мы использовали ранее для перебора обычного генератора. Как вызов; вы можете попробовать изменить приведенный выше код, чтобы использовать тригонометрическую функцию, например Math.sin(...), для генерации значения задержки; это должно потребовать только однострочное изменение.

Случайные потоки - это весело, но как насчет чего-нибудь более полезного? Например; возможно, мы хотим, чтобы каждая итерация нашего генератора соответствовала событию; как получение сообщения от пользователя.

Мы можем сделать что-то вроде этого:

// If the receiveMessage function is called before we
// started consuming the data stream, then we don't care about that
// data. In this case the function will be a no-op.
let receiveMessage = function () {};
function waitForNextMessage() {
  return new Promise((resolve) => {
    receiveMessage = resolve;
  });
}
async function* createMessageStream() {
  while (true) {
    yield waitForNextMessage();
  }
}
async function startConsumingMessageStream() {
  let messageStream = createMessageStream();
  for await (let message of messageStream) {
    console.log(message);
  }
}
startConsumingMessageStream();
setTimeout(() => {
  receiveMessage('Hello');
}, 500);
setTimeout(() => {
  receiveMessage('world');
}, 1000);
setTimeout(() => {
  receiveMessage('!!!');
}, 3000);

^ Хитрость здесь в том, чтобы назначить функцию resolve из Promise в функции waitForNextMessage нашей собственной функции receiveMessage; таким образом, его можно вызвать в любое время из любого другого места в нашем коде - сообщение в этом случае запускается setTimeout, но оно также могло быть получено из любого другого источника, такого как событие из HTTP-запроса или сообщения WebSocket. Обратите внимание: поскольку мы используем async / await, бесконечное ожидание не требует затрат процессора.

Небольшая проблема…

Существует потенциальная проблема с описанным выше подходом; что произойдет, если вы попытаетесь вызвать receiveMessage(message) синхронно несколько раз? Нравится:

setTimeout(() => {
  // Call receiveMessage multiple times synchronously.
  receiveMessage('Hello');
  receiveMessage('A');
  receiveMessage('B');
  receiveMessage('C');
}, 500);
setTimeout(() => {
  receiveMessage('world');
}, 1000);
setTimeout(() => {
  receiveMessage('!!!');
}, 3000);

^ В этом случае мы пропустим сообщения «A», «B» и «C» - это потому, что Promise, возвращаемый нашей функцией waitForNextMessage, будет разрешен асинхронно на следующем тике. Поэтому, если мы вызываем receiveMessage несколько раз, находясь в одном стеке вызовов, в конечном итоге он будет пытаться разрешить один и тот же Promise несколько раз (вместо разных обещаний для каждого сообщения). Есть несколько способов обойти эту проблему. Один из подходов - изменить receiveMessage так, чтобы он буферизовал сообщения синхронно и разрешал обещание с буфером; таким образом мы можем продолжать синхронно помещать больше сообщений в буфер до того, как Promise завершит разрешение.

Решение может выглядеть так:

let receiveMessage = function () {};
function waitForNextMessageList() {
  let messageBuffer = [];
  return new Promise((resolve) => {
    receiveMessage = (message) => {
      messageBuffer.push(message);
      // Only resolve the promise for the first message in the
      // current call stack. After that we can keep pushing messages
      // to the buffer.
      if (messageBuffer.length === 1) {
        resolve(messageBuffer);
      }
    };
  });
}
async function* createMessageListStream() {
  while (true) {
    yield waitForNextMessageList();
  }
}
async function* createMessageStream() {
  let messageListStream = createMessageListStream();
  for await (let messageList of messageListStream) {
    for (let message of messageList) {
      yield message;
    }
  }
}
async function startConsumingMessageStream() {
  let messageStream = createMessageStream();
  for await (let messages of messageStream) {
    console.log(messages);
  }
}
startConsumingMessageStream();
setTimeout(() => {
  receiveMessage('Hello');
  receiveMessage('A');
  receiveMessage('B');
  receiveMessage('C');
}, 500);
setTimeout(() => {
  receiveMessage('world');
}, 1000);
setTimeout(() => {
  receiveMessage('!!!');
}, 3000);

Этот код стал немного сложнее после того, как мы добавили буферизацию, но если подумать с точки зрения потребителя потока, он все равно выглядит довольно просто:

async function startConsumingMessageStream() {
  let messageStream = createMessageStream();
  for await (let message of messageStream) {
    console.log(message);
  }
}
startConsumingMessageStream();

Удобное решение

Приведенный выше код может быть полезен в некоторых ситуациях, когда поток имеет только одного потребителя. Если мы вызовем startConsumingMessageStream() несколько раз, потребители будут соревноваться за каждое сообщение, и только одно из них будет работать. Это связано с тем, что наш поток в настоящее время ведет себя как очередь FIFO, поэтому каждый новый элемент, помещенный в него, может иметь только одного потребителя.

Для практических сценариев мы можем захотеть разрешить совместное использование потоков несколькими потребителями. Кроме того, мы должны предоставить эту функциональность через простой объект Iterable.

Опробовав несколько разных подходов, я остановился на решении на основе односвязных списков. Ознакомьтесь с WritableAsyncIterableStream: https://github.com/SocketCluster/writable-async-iterable-stream

Если вы хотите использовать асинхронные итерации в качестве альтернативы объектам EventEmitter, вы можете также проверить StreamDemux: https://github.com/socketcluster/stream-demux