Во всех хороших программах наступает время, когда компоненты или подсистемы должны перестать напрямую взаимодействовать друг с другом.

Перевозка должна стать первоклассной.

- Рич Хикки: core.async дебют

Зависимости

(ns core-async.core
  (:require  [cljs.core.async :refer [chan put! take! >! <! buffer dropping-buffer sliding-buffer timeout close! alts!]]
             [cljs.core.async :refer-macros [go go-loop alt!]])
  (:use [clojure.repl :only (source)]))

Канонический пост

Это кросс-пост от GitHub. Если подсветка синтаксиса Medium вас разочаровывает, ознакомьтесь с каноническим постом:



Репозиторий с примером кода:

Вы можете найти весь код рабочих примеров в этом репо.

Вступление

Каналы - это основа core.async. Как и в Clojure (Script), неизменяемые коллекции данных являются основой, каналы можно рассматривать как особый вид сбора данных. Канал в некотором смысле похож на вектор (или массив в JavaScript). Как и векторы, каналы представляют собой совокупности, которые принимают значения с одного конца (восходящий поток) и выгружают их с другого (нисходящего). Это также известно как обработка по принципу «первым пришел - первым ушел» (FIFO). Однако, в отличие от векторов (или массивов), каналы передают только одно значение за раз (асинхронно).

В моем представлении я вижу вектор как двумерный блок, содержимое которого (существующее синхронно) растягивает прямоугольник по оси x. В то время как с очередью (каналом) это трехмерный блок с новыми значениями, растягивающий прямоугольник по оси z (асинхронно во времени). Однако, в отличие от векторов, core.async каналы могут не только содержать значения внутри своего «бокса» (буфера), они могут выполнять операции, стремящиеся поместить или взять из канала, ждать в строке - вне бокса в очереди - вдоль этого z- оси без необходимости разработчика использовать обратные вызовы, чтобы выровнять их! Это обеспечивает огромное удобство для асинхронного программирования. Как мы увидим в следующих примерах, эти особенности core.async абстрагируют сложность обратных вызовов, позволяя вам писать асинхронные программы в блаженно синхронной манере.

Не поймите меня неправильно. Это блаженство имеет свою цену. Это нетривиальная кривая обучения. Но я надеюсь, что после ознакомления с приведенными здесь примерами и, что немаловажно, пробуждения, у вас будут основы, которые помогут вам подготовиться к их использованию.

core.asyc превосходит другие асинхронные средства во многих отношениях, но вот лишь несколько основных моментов:

  • Пишите асинхронный код, как если бы он был синхронным
  • Они быстрее обещаний.
  • Процессы первоклассные: вы можете использовать core.async каналов не только для передачи данных / значений, но и для передачи каналов! Эта функция позволяет вам брать процессы и декомпозировать их для выполнения невероятно сложных вещей всего в нескольких строках кода.
  • Обращайтесь с данными ввода-вывода канала так же, как с любой другой коллекцией в ClojureScript. Позволяет вам изучить один набор операций (например, map, filter и т. Д ... с использованием преобразователей), чтобы управлять ими всеми!

Этот последний аргумент был для меня решающим аргументом. Идея изучения одного набора операций, которые можно было бы использовать во всем моем коде, как асинхронном, так и синхронном, таким образом, чтобы улучшить мои навыки работы с языком (а не только с библиотекой), кажется более эффективной ИМХО .

Кроме того, если вы выходите на свет через JavaScript, прочтите эти две статьи из великого Бобби Шульца:

Различия в core.async между Clojure и ClojureScript

Есть несколько различий между функциями core.async в Clojure и ClojureScript по необходимости. Хотя Clojure имеет потоки, Node / JavaScript имеет только один поток, поэтому любая операция блокировки потоков по умолчанию не будет доступна в реализации ClojureScript.

Грубо говоря, все, что имеет «блокирующую» семантику в clojure.core.async (например, что-либо с двумя !!), не будет доступно в cljs.core.async. Не волнуйтесь, это все еще волшебно для виртуальной машины JavaScript. Вот увидишь!

Хватит разговоров. Давайте начнем.

Основы работы с каналом

В этих примерах мы воспользуемся аналогией, чтобы помочь в объяснении. Допустим, у нас есть суши-бар. Для начала мы будем использовать core.async только для обработки заказов от клиентов ...

Одноразовые асинхронные заказы на chan с put! и take!

Давайте посмотрим на слайд о канале Интерфейс поставщика услуг (SPI) из презентации Рича Хики и его взаимосвязи между другими частями вашей программы (SPI = интерфейс поставщика услуг):

Начнем с трех частей SPI: chan, put! и take!.

Документы:

(source chan)
(defn chan
  "Creates a channel with an optional buffer, an optional transducer (like `(map f)`, `(filter p)` etc or a composition thereof), and an optional exception handler. If `buf-or-n` is a number, will create and use a fixed buffer of that size. If a transducer is supplied a buffer must be specified. `ex-handler` must be a fn of one argument - if an exception occurs during transformation it will be called with the thrown value as an argument, and any non-nil return value will be placed in the channel."
  ([] (chan nil))
  ([buf-or-n] (chan buf-or-n nil nil))
  ([buf-or-n xform] (chan buf-or-n xform nil))
  ([buf-or-n xform ex-handler]
   (let [buf-or-n (if (= buf-or-n 0)
                    nil
                    buf-or-n)]
     (when xform (assert buf-or-n "buffer must be supplied when transducer is"))
     (channels/chan (if (number? buf-or-n)
                      (buffer buf-or-n)
                      buf-or-n)
                    xform
                    ex-handler))))

Канал core.async (chan) может принимать от 0 до 3 аргументов. Для начала мы будем использовать простой chan, который создает безбуферный канал без преобразований значений или обработчиков исключений, который заставляет канал ставить в очередь любые отложенные операции вставки или приема вместо того, чтобы разрешать операциям размещения передавать значения в канал. Такой канал лучше всего использовать для простых транзакций. Давайте воспользуемся аналогией с приемом заказов по телефону в нашем суши-баре.

Создайте базовый канал:

(def bufferless-chan (chan))

оценка

(put!
 bufferless-chan ; channel
 "Futo Maki" ; order (data)
 #(prn (str "order put? " %))) ; put! callback

положите 1) = ›

true

eval:

(put!
 bufferless-chan
 "Vegan Spider"
 #(prn (str "order put? " %)))

положите 2) = ›

true

eval:

(take!
  bufferless-chan
  #(prn (str "order taken: " %))) ; take! callback

возьмите 1) = ›

"order taken: Futo Maki"
"order put? true"
nil

возьмите 2) = ›

"order taken: Vegan Spider"
"order put? true"
nil

возьмите 3) = ›

nil

eval последний раз! снова функция - положите 3) = ›

"order put? true"
"order taken: Vegan Spider"
true

Обратите внимание на организацию журналов обратных вызовов для наших заказов. Давайте покопаемся в источнике для put! и take!, чтобы узнать, что здесь происходит:

Документы:

(source put!)
(defn put!
  "Asynchronously puts a `val` into `port`, calling `fn1` (if supplied) when complete. `nil` values are not allowed. Will throw if closed. If `on-caller?` (default `true`) is `true`, and the put is immediately accepted, will call `fn1` on calling thread.  Returns `nil`."
  ([port val
     (if-let [ret (impl/put! port val fhnop)]
       @ret
       true)])
  ([port val fn1] (put! port val fn1 true))
  ([port val fn1 on-caller?
     (if-let [retb (impl/put! port val (fn-handler fn1))]
       (let [ret @retb]
         (if on-caller?
           (fn1 ret)
           (dispatch/run #(fn1 ret)))
         ret)
       true)]))
(source take!)
(defn take!
 "Asynchronously takes a `val` from `port`, passing to `fn1`. Will pass `nil` if closed. If `on-caller?` (default `true`) is `true`, and value is immediately available, will call `fn1` on calling thread. Returns `nil.`"
 ([port fn1] (take! port fn1 true))
 ([port fn1 on-caller?
    (let [ret (impl/take! port (fn-handler fn1))]
      (when ret
        (let [val @ret]
          (if on-caller?
            (fn1 val)
            (dispatch/run #(fn1 val)))))
      nil)]))

Такое же поведение обратного вызова присутствует в обеих асинхронных операциях. Если on-caller? (по умолчанию true) - true, и значение сразу же принято (для put!) Или доступно (для take!), Вызовет fn1 в вызывающем потоке.

Когда put! совпадает с ожидающим take! (звонки клиентов - ›повар отвечает на телефон), сначала запускается обратный вызов take! (« Спасибо, что позвонили в Conveyor Sushi! Могу я принять ваш заказ? »-› заказ передан) и nil возвращен (подтверждающий, что заказ был доставлен take!). И наоборот - когда take! встречается с ожидающим put!, первым запускается обратный вызов put! (повар звонит клиенту - ›клиент отвечает на телефонный звонок« Привет? »-› заказ передается) и возвращается true (подтверждая, что заказ был выполнен на put!).

За один раз передается только один заказ. Всем остальным придется подождать.

Это объясняет поведение обратного вызова наших операций put и take, описанных выше.

Если мы хотим изменить поведение этих обратных вызовов, мы можем установить для параметра on-caller значение false.

Теперь, когда мы знаем, как и когда запускаются эти обратные вызовы, давайте создадим несколько вспомогательных утилит, чтобы абстрагироваться от них. Мы будем использовать их в оставшихся примерах.

Заказы Utils

Создайте несколько обратных вызовов журналирования, чтобы включить их в наши операции ввода и вывода:

(defn take-logger [order]
  (prn (str "order taken: " order)))
(defn put-logger [boolean]
  (prn (str "order put? " boolean)))

Затем мы создадим ярлык для функций put и take, которым будет назначен канал и обратный вызов журналирования.

(defn put!-order [channel order]
  (put! channel order put-logger))
(defn take!-order [channel]
  (take! channel take-logger))

Переполнение робо-заказов

Что произойдет, если мы получим один из этих печально известных роботов-дозвонщиков, который просто рассылает спам по телефонной линии, пытаясь продать нам таймшер? Что ж, с нашим простым chan наша строка будет переполняться.

(def bufferless-chan (chan))
(defn bot-orders [channel order]
  (dotimes [x 1030]
    (put!-order channel order)))

оценка

(bot-orders bufferless-chan "Sushi!")

=>

Error: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer. (< (.-length puts) impl/MAX-QUEUE-SIZE)

Давайте посмотрим на еще один слайд об анатомии канала из презентации Рича Хикки, который фокусируется на внутренней работе деталей реализации core.async канала:

Вы видите там три маленьких столика:

  1. Очередь обработчиков puts с соответствующим data
  2. Необязательный buffer из data
  3. Очередь обработчиков takes

До сих пор мы использовали безбуферный канал, поэтому все наши операции put выстраивались «вне» нашего канала - таблица (1) - ожидая передачи получателями на другой стороне.

Создатели core.async сочли благоразумным (и мне кажется разумным) ограничить каналы в том, сколько ожидающих операций ввода / вывода разрешено ставить в очередь (макс. = 1024 ожидающих операции). Итак, как мы можем справиться с таким трафиком? Один из способов - использовать буферизованный канал.

Буферы

Робо-заказы с фиксированным буфером

Документы:

(source buffer)
(defn buffer
  "Returns a fixed buffer of size `n`. When full, puts will block/park."
  [n]
  (buffers/fixed-buffer n))

Есть два способа создать канал с фиксированным буфером. Один из них - явно использовать функцию buffer. Другой - просто передать целое число в качестве аргумента базовому каналу, например:

(def fixed-chan (chan 10)) ; buffer = 10 values

eval по желанию:

(bot-orders fixed-chan "Sushi!")
;=> "order put? true"
;=> "order put? true"
;=> "order put? true"
...7 more
nil

Мы видим, что 10 из наших пут были немедленно приняты каналом (в результате чего их обратные вызовы немедленно запускались здесь), в то время как остальным придется ждать в очереди помещений для будущих (асинхронных) приемов. Мы смогли сделать это в данном конкретном случае, потому что мы получили ордер 1030 и у нас был буфер, в котором хранилось 10 значений, что позволяло обрабатывать и завершать эти 10 операций ввода (1030 операций ввода - 10 операций ввода, потребленных буфером = 1020 отложенных операций ввода ‹1024 макс. ).

Вы можете думать о фиксированном буфере как о службе голосовой почты для наших заказов на суши. Или - если хотите - систему онлайн-заказов, которая сохраняет заказы (в данном случае с объемом памяти 10 заказов). Вместо того, чтобы заставлять всех клиентов ждать совпадения на другой стороне, буфер позволяет хранить первые 10 заказов «внутри» канала. Это позволит первым 10 клиентам сделать заказ и вернуться к своей жизни. Эти удачливые ублюдки.

eval по желанию:

(take!-order fixed-chan)

занимает 1–1020) = ›

"order taken: Sushi!"
"order put? true"
nil

возьмите 1021) = ›

"order taken: Sushi!"
nil

Обратные вызовы последних 10 пут уже активированы.

Теперь это работает, но давайте будем франшизой, заказы которой поступают со всего мира! Международный Дом Суши! Звучит ... ужасно на самом деле, но - для наглядности давайте сделаем это.

Чтобы мы могли отслеживать, что происходит с каналом в следующих примерах, давайте создадим новую put!-n-order функцию для включения номера заказа:

(defn put!-n-order [channel order n]
  (put! channel (str "#: " n " order: " order) put-logger))
(defn IHOS-orders [channel order]
  (dotimes [x 2100] lots-o'-orders
    (put!-n-order channel order x)))

обновить (переоценить) наш фиксированный чан

(def fixed-chan (chan 10)) ; buffer = 10 put values

eval:

(IHOS-orders fixed-chan "Nigiri!")

=>

"order put? true"
"order put? true"
"order put? true"
"order put? true"
"order put? true"
"order put? true"
"order put? true"
"order put? true"
"order put? true"
"order put? true"
...
Error: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer. (< (.-length puts) impl/MAX-QUEUE-SIZE) ...

物 の 哀 れ Произошло то, что мы достигли предела ожидающих операций на нашем канале за вычетом емкости нашего буфера (2100 пут - 10 буферов = 2090 отложенных операций> 1024 макс.).

eval:

(take!-order fixed-chan)

=>

"order taken: #: 0 order: Nigiri!"
"order put? true"
...

Итак, мы можем перейти к первым 10 ордерам в буфере и другим, которые не переполнили очередь отложенных размещений, но - с остальными - мы находимся в том же цукэмоно (рассол), что и раньше. Наш очаровательный маленький почтовый ящик, который, возможно, сослужил нам хорошую службу, прежде чем стать крупнейшей сетью суши на планете, переполнен, и мы получаем сообщение об ошибке, которое может создать хаос в нашей системе.

Что нам теперь делать? Одна вещь, которую мы могли бы сделать, - это использовать «оконный буфер», который отбрасывает некоторые заказы. Есть два типа оконных буферов: sliding-buffer и dropping-buffer.

Обработка заказов с помощью sliding-buffer (сбросить самые старые путы)

Документы:

(source sliding-buffer)
(defn sliding-buffer)
  "Returns a `buffer` of size `n`. When full, puts will complete, and be buffered, but oldest elements in buffer will be dropped (not transferred)."
  [n]
  (buffers/sliding-buffer n)

«Оконные» буферы служат контрактами или политиками для обработки входящих данных. Это может быть полезно в ряде сценариев (например, для ограничения скорости). Давайте покажем пару примеров того, как мы можем использовать оконный буфер. Во-первых, давайте используем sliding-buffer, который отбрасывает самые старые путы:

(def slide-chan (chan (sliding-buffer 10))) ; buffer = 10 put values

Предупреждение: впереди большое бревно (и я не имею в виду песню Роберта Планта. Это было бы круто…)!

(IHOS-orders slide-chan "Sashimi")

=>

"order put? true"
"order put? true"
... **2088 more**

eval:

(take!-order slide-chan)

возьмите 1) = ›

"order taken: #: 2090 order: Nigiri!"
nil

возьмите 2) = ›

"order taken: #: 2091 order: Nigiri!"
nil
...

возьмите 10) = ›

"order taken: #: 2099 order: Nigiri!"
nil

возьмите 11) = ›

nil

Примечание. Причина, по которой мы получили 2099 вместо 2100, связана с тем, что индекс начинается с 0

Теперь мы не получаем эту досадную ошибку, у нас есть последние 10 заказов, которые были сохранены в sliding-buffer, но все остальные заказы отбрасываются. В некоторых случаях это может быть то, что вам нужно. Например, для потоков данных, где самые последние / самые новые значения являются важными. Допустим, у нас есть витрина в каждом из наших ресторанов, демонстрирующая последние заказы суши, чтобы дать посетителям понять синдром «я хочу то, что у них есть».

Другая политика, которую мы можем захотеть встроить в наш источник входящих данных, - это отбрасывать самые свежие данные при достижении некоторого порогового значения. Мы можем сделать это с помощью dropping-buffer.

Обработка трафика с dropping-buffer (отбрасывать последние путы)

Документы:

(source dropping-buffer)
(defn dropping-buffer
  "Returns a buffer of size `n`. When full, puts will complete but val will be dropped (no transfer)."
  [n]
  (buffers/dropping-buffer n))

Давайте создадим канал с отбрасывающим буфером:

(def drop-chan (chan (dropping-buffer 10)))

Затем используйте на нем нашу операцию заказа на всплеск:

(IHOS-orders drop-chan "Tofu Katsu")

=>

"order put? true"
"order put? true"
... **2088 more**

eval:

(take!-order drop-chan)

возьмите 1) = ›

"order taken: #: 0 order: Tofu Katsu"
nil

возьмите 2) = ›

"order taken: #: 1 order: Tofu Katsu"
nil

возьмите 10) = ›

"order taken: #: 9 order: Tofu Katsu"

Сейчас мы храним самые старые данные. Как только наш буфер заполнится (10 значений), все оставшиеся put будут завершены (их обратные вызовы будут запущены, если они предоставлены), но будут сохранены только первые 10 значений. Остальные отброшены. Иногда такое поведение полезно при работе с исходным (размещающим) источником данных.

Для нашего суши-ресторана это все равно, что создать специальную секцию бара под названием «Рулетка» (слоган: «Вот так мы и катимся» ™). Однако это не ваша обычная гостиная в баре. Он светится в темном аквариуме с медузами и знаменитыми поварами со всего мира, поэтому люди выстраиваются в очередь, чтобы занять место. Но есть загвоздка. Если место открыто, вы можете его занять, но если нет, вас отсылают, и вы теряете место в очереди. Хотя вы можете вернуться в очередь (сделать еще одну остановку), вы не можете ждать («припарковаться») в очереди (это было бы грубо ‡ для сидящих гостей). В нашем примере выше все ожидающие и будущие путы отправляются как завершенные, но их значения отбрасываются до тех пор, пока в буфере не откроется место, когда будут приняты новые путы.

Это работает для The Roulette Room, но не подходит для нашей системы заказов. Мы не хотим отказываться от заказов (это было бы грубо, не говоря уже о вреде для бизнеса). Итак, как мы можем справиться с нашим потоком заказов, не теряя ни одного?

‡ В японском языке 16 слов означает «грубый»

Обратное давление

Пакетные заказы с (go) противодавлением в восходящем направлении (>!) и асинхронным нисходящим потоком (take!)

Один из способов, о котором вы можете позаботиться (не отбрасывать путы), - это реализовать противодавление с помощью блокирующей конструкции в точке входа.

— RH

Документы:

(source go)
(defmacro go
  "Asynchronously executes the `body`, returning immediately to the calling thread. Additionally, any visible calls to `<!`, `>!` and `alt!/alts!` channel operations within the body will block (if necessary) by 'parking' the calling thread rather than tying up an OS thread (or the only JS thread when in ClojureScript). Upon completion of the operation, the `body` will be resumed.
  Returns a channel which will receive the result of the `body` when completed"
  [& body]
  `(let [c# (cljs.core.async/chan 1)]
     (cljs.core.async.impl.dispatch/run
      (fn []
        (let [f# ~(ioc/state-machine body 1 &env ioc/async-custom-terminators)
              state# (-> (f#)
                         (ioc/aset-all! cljs.core.async.impl.ioc-helpers/USER-START-IDX c#))]
          (cljs.core.async.impl.ioc-helpers/run-state-machine-wrapped state#))))
     c#))

Здесь где происходит волшебство. go блоки создают среду, в которой мы можем избежать ада обратных вызовов. Как объясняет Стюарт Хэллоуэй в своем большом разговоре о core.async (перефразируя):

go - это абстракция процесса первого класса, которая будет либо использовать реальные потоки (виртуальная машина Java), либо использовать «волшебный обратный вызов-ад-за-кулисами-потоками» (виртуальная машина JavaScript), предоставляя пользователям, у которых нет реальных потоков для работы (т.е. пользователи ClojureScript / JavaScript) возможность писать код, как если бы они это делали. go использует конечный автомат с «блокировкой» / «парковкой», чтобы сделать это возможным.

Мы передаем работу, выполненную в блоке go, другой части нашей программы (или наоборот) через каналы. и способ, которым мы помещаем значения в канал, находясь внутри блока go, - это синтаксис размещения "парковка" >!

Документы:

(source >!)
(defn >!
  "puts a `val` into `port`. nil values are not allowed. Must be called inside a `(go ...)` block. Will park if no buffer space is available. Returns `true` unless `port` is already closed."
  [port val]
  (throw (js/Error. ">! used not in (go ...) block")))

Обратите внимание, что функция «парковки» (>!) не имеет аргумента обратного вызова (как и put!). Поскольку go позволит нам писать наш асинхронный код в синхронном стиле, они нам не понадобятся. Асинхронная обратная связь между функциями для передачи данных с течением времени ушла в прошлое, и вы можете думать, что данные, проходящие через операции в / из каналов в блоке go, доступны синхронно. Однако в иллюстративных целях нам нужны наши журналы, поэтому мы должны вывести наш put-logger за пределы операции канала, чтобы получить их.

(defn backpressured-orders [channel order]
  (go
    (dotimes [x 2100] ; increase number of bot orders
      (put-logger (>! channel (str "#: " x " order: " order))))))
(def burst-chan (chan 50))

Буферы с противодавлением не только позволяют обрабатывать большие потоки данных, они также позволяют балансировать поставленные (восходящие) и принимаемые (нисходящие) источники данных, которые могут передаваться непоследовательно. Например, если наши суши-повара получают онлайн-заказы только тогда, когда у них есть время сделать 50 заказов («пакетами» по 50), фиксированный буфер из 50 с противодавлением вверх по потоку позволит им использовать свою емкость, а затем открыть этот буфер на следующие 50, чтобы заполнить для следующего запуска. Давайте посмотрим на это в действии.

(defn burst-take! [channel]
  (dotimes [x 50] ; take 50 orders at a time
    (take!-order channel)))

eval:

(backpressured-orders burst-chan "Umami Tamago")

=>

"order put? true"
"order put? true"
... 48 more

eval:

(burst-take! burst-chan)

возьмите 1) = ›

"order taken: #: 0 order: Umami Tamago"
...
"order taken: #: 49 order: Umami Tamago"
"order put? true"
"order put? true"
... 48 more

возьмите 2) = ›

"order taken: #: 50 order: Umami Tamago"
...
"order taken: #: 99 order: Umami Tamago"
"order put? true"
"order put? true"
... 48 more

Как видите, при оценке backpressured-orders мы получаем 50 заказов, помещенных в фиксированный буфер. Затем - с каждой оценкой burst-take! - мы получаем набор из 50 дублей (исчерпывающий буфер) и еще 50 операций ввода, которые снова заполняют буфер.

Также обратите внимание, что мы значительно превышаем лимит очереди на другой стороне нашего буфера. Однако мы не получили (MAX-QUEUE-SIZE) ошибку, как в случае с put!.

То, что позволило нам поставить в очередь оставшиеся отложенные ордера (после того, как наш буфер 50 заполнен) - даже при том, что у нас превышено 1024 допущенных к отложенным путям ордеров - это противодавление. Обернув наши заказы в блок (go...) и изменив put! на соответствующий ему синтаксис парковки (>!), мы создали поток (в JavaScript это машина состояния« Инверсия управления »), который выполняет некоторые действия. magic-callback-hell-за кулисами »для регистрации и отслеживания ожидающих пут без переполнения канала. Фактически, мы можем сделать операции put ленивыми, откладывая их оценку до тех пор, пока не будут найдены совпадающие дубли и / или место в буфере.

Доработка: обратное давление предотвращает регистрацию операций помещения в очередь обработчиков в канале. Вместо использования очереди размещения канала, «блокировка» (go...) создает конечный автомат, который отслеживает состояние функции dotimes (в данном случае) и эффективно приостанавливает ее, когда в канале нет возможности для размещения. Это позволяет вышестоящим «производителям» данных управлять темпами производства в соответствии с мощностью последующих потребителей.

go Предостережения относительно блоков

Важно отметить, что макрос go перестает транслировать на границах создания функции. Так, например, если бы мы использовали этот код вместо нашей другой backpressured-orders функции, описанной выше ...

(defn >!-order [channel order count]
  (put-logger (>! channel (str "#: " count " order: " order))))
(defn backpressured-orders [channel order]
  (go
    (dotimes [x 2100] ; increase number of bot orders
      (>!-order channel order x))))

Получим ошибку:

...Error: >! used not in (go ...) block ...

Узнайте больше в Руководстве по передовой практике Алекса Миллера.

Пакетные заказы с противодавлением вверх по потоку (>!) и вниз по потоку (<!)

Документы:

(source <!)
(defn <!
  "takes a val from `port`. Must be called inside a `(go ...)` block. Will return nil if closed. Will park if nothing is available. Returns `true` unless port is already closed"
  [port]
  (throw (js/Error. "<! used not in (go ...) block")))
(def burst-chan (chan 50))
(defn burst-<! [channel]
  (go
    (dotimes [x 50]
      (take-logger (<! channel)))))
(backpressured-orders burst-chan "Umami Tamago")

так же, как и раньше

(burst-<! burst-chan)

так же, как и раньше

Пакетные заказы с асинхронным восходящим потоком (put!) Нисходящим потоком «Парковка» (<!)

Однако, если бы мы изменили обратное давление, то есть использовали синтаксис асинхронного помещения (put!) в восходящем направлении с синтаксисом «парковки» (<!) в нисходящем направлении ...

(def burst-chan (chan 50))

eval:

(IHOS-orders burst-chan "Miso Soup!")

Старый Верный:

Error: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer. (< (.-length puts) impl/MAX-QUEUE-SIZE)

eval:

(burst-<! burst-chan)

=>

No bueno

Ключевые выводы

Мы получаем несущественную разницу при использовании синтаксиса «блокировки» как в восходящем, так и в нисходящем потоке из-за большого количества операций, идущих с производящей стороны канала. Если бы наша потребительская сторона (возьмите!) Была чрезмерно усердной, этот пример мог бы сработать. Вывод состоит в том, что противодавление должно применяться к агрессивному концу канала. Если обе стороны агрессивны и несколько непостоянны в своей доступности, вам может потребоваться противодавление на обоих концах.

Теперь в этом случае ни один заказ не будет сброшен. Но что, если мы не справимся со всеми этими заказами? Мы не хотим сбрасывать заказы. Мы также не хотим, чтобы клиенты размещали заказ, если нет надежды, что они когда-либо будут выполнены. Так что же нам делать? Что ж, есть много способов контролировать поведение наших каналов, но давайте начнем с нескольких основных структур управления.

Теперь, когда мы знаем, как справляться с потоком данных восходящего потока (то есть от пут), и познакомились с ограничениями каналов ожидающих очередей операций, мы можем уменьшить количество операций ввода-вывода (и протоколирования), чтобы сосредоточиться на контроле качества поведение (а не количество трафика) наших каналов.

Заказы на отключение с close!

Документы:

(source close!)
(defn close!
   "Closes a channel. The channel will no longer accept any puts (they will be ignored). Data in the channel remains available for taking, until exhausted, after which takes will return nil. If there are any pending takes, they will be dispatched with nil. Closing a closed channel is a no-op. Returns nil. Logically closing happens after all puts have been delivered. Therefore, any blocked or parked puts will remain blocked/parked until a taker releases them."
   ([port]
    (impl/close! port)))

Функция close! позволяет отключить канал «извне». При передаче канала в функцию close! все последующие путы будут отклонены (возвращено ложь), но любые значения в буфере будут доступны для приема, а также любые припаркованные или ожидающие путы, отправленные в канал до close!.

Давайте сначала отправим несколько путей в канал с небольшим буфером достаточного количества, который будет передавать приказы в буфер, а также ставить в очередь некоторые ожидающие пути для канала.

(defn max-order [channel order]
  (go
    (dotimes [x 12]
      (put-logger (>! channel (str "#: " x " order: " order))))
    (close! channel)))
(def capacity (chan 5))
(defn take!-til-closed [channel]
  (dotimes [x 5]
    (take!-order channel)))

eval:

(max-order capacity "Wildcard")

=>

"order put? true"
"order put? true"
"order put? true"
"order put? true"
"order put? true"

eval:

(take!-til-closed capacity)

возьмите 1) = ›

"order taken: #: 0 order: Wildcard"
...
"order put? true"
...
nil

возьмите 2) = ›

"order taken: #: 5 order: Wildcard"
...
"order put? true"
...
nil

возьмите 3) = ›

"order taken: #: 10 order: Wildcard"
"order taken: #: 11 order: Wildcard"
"order taken: "
"order taken: "
"order taken: "
nil

После 12 пут закрыли канал. Таким образом, 5 заказов попали в фиксированный буфер и 7 были поставлены в очередь. Никто не был потерян. Однако, если мы попытаемся разместить больше заказов в закрытом канале…

(put! capacity "Overflow" put-logger)

=>

"order put? false"
false

Это простой способ контролировать, сколько заказов мы готовы обработать. Для начинающего суши-бара этого может быть достаточно. Давайте рассмотрим еще несколько изощренных способов работы с каналами.

Установление времени закрытия с alts! и timeout

Одна из самых важных управляющих структур в core.async: alts!

На самом деле Alt - сложная операция. Почти все, что встроено в эту реализацию канала, предназначено для поддержки alt, потому что alt - самая сложная часть.

RH

Документы:

(source alts!)
(defn alts!
  "Completes at most one of several channel operations. Must be called inside a `(go ...) block`. `ports` is a vector of channel endpoints, which can be either a channel to take from or a vector of `[channel-to-put-to val-to-put]`, in any combination. Takes will be made as if by `<!`, and puts will be made as if by `>!`. Unless the `:priority` option is `true`, if more than one port operation is ready a non-deterministic choice will be made. If no operation is ready and a `:default` value is supplied, `[default-val :default]` will be returned, otherwise `alts!` will park until the first operation to become ready completes. Returns `[val port]` of the completed operation, where `val` is the value taken for takes, and a boolean (`true` unless already closed, as per `put!`) for puts.
  Supported options: (`opts` are passed as `:key val`)
  `:default val` - the value to use if none of the operations are immediately ready
  `:priority true` - (default nil) when `true`, the operations will be tried in order.
  Note: there is no guarantee that the port exps or val exprs will be used, nor in what order should they be, so they should not be depended upon for side effects."
  [ports & {:as opts}]
  (throw (js/Error. "alts! used not in (go ...) block")))
(source timeout)
(defn timeout
  "Returns a channel that will close after msecs"
  [msecs]
  (timers/timeout msecs))

Сделаем наш пример немного интереснее. Допустим, у нас есть по три бара в каждом из наших суши-ресторанов, каждый принимает заказы (одновременно!) С разным интервалом (одни популярнее других). Допустим также, что наш ресторан перестает принимать заказы после 3000 миллисекунд работы («Самые быстрые суши на планете» ™).

(defn timeout-chan [channel]
  (let [closer (timeout 3000)]
    (go (while true (<! (timeout 250)) (>! channel "Main Bar")))
    (go (while true (<! (timeout 500)) (>! channel "Online Order")))
    (go (while true (<! (timeout 750)) (>! channel "Roulette Room")))
    (go-loop [_ []]
      (let [[val ch] (alts! [channel closer])] ; <- `alts!`
        (cond
          (= ch closer) (do
                          (close! channel)
                          (.log js/console (str "No more orders. Domo Arigatogozaimashita.")))
          :else
          (recur (.log js/console (str "Order up: " (<! channel)))))))))
(def capacity (chan 5))

eval:

(timeout-chan capacity)

=>

Order up: Online Order
Order up: Roulette Room
Order up: Online Order
Order up: Main Bar
Order up: Online Order
Order up: Main Bar
Order up: Main Bar
Order up: Main Bar
Order up: Main Bar
Order up: Roulette Room
No more orders. Domo Arigatogozaimashita.

Мы использовали timeout выше, чтобы завершить наш процесс. Важно отметить, что хотя timeout действительно возвращает канал, этот канал существует только для того, чтобы сигнализировать вам, когда истекло заданное количество миллисекунд, и он сигнализирует об этом, давая вам закрытый chan. Это один из методов правильного завершения go зацикливания: прекратить прием или подачу к цели chan, переключившись на закрытый канал (через alts!).

Но что более важно ...

Мы только что раскрутили троих. независимый. асинхронный. процессы.

Да, мы просто сделали это тремя строчками кода.

Это не раздел. Я просто подумал, что кстати большие буквы.

Теперь мы можем слить отложенные заказы из буфера и позволить поварам заняться более важными делами… Ямадзаки.

(take! capacity take-logger)
;;=> "order taken: Online Order"
;;=> "order taken: "

Готово!

Это был головокружительный тур по основам core.async. Я надеюсь - в будущих статьях я расскажу более подробно и расскажу о еще более интересных вещах, которые вы можете сделать с его помощью.

Получать помощь

Присоединяйтесь к каналу Slack от Clojurian

При создании этого руководства я задействовал замечательное сообщество пользователей Clojure (Script) на Clojurians’ Slack .

  • Если вы новичок в Clojure (Script), я настоятельно рекомендую #beginners channel
  • Если вы новичок в core.async, есть еще # core-async channel!

Я думаю, вы обнаружите, что члены сообщества - одни из самых щедрых людей, которых вы когда-либо встречали.

Специальная благодарность

Отличную помощь от этих парней из Slack Clojurians:

Дэвиду Нолену за его потрясающие сообщения в блогах и вебинары и, конечно же, за ClojureScript!

Тимоти Болдридж и Алекс Миллер для core.async. У Тима также есть отличные core.async уроки!

Дополнительные ресурсы

Если вы еще этого не сделали, взгляните на clojure.core.async (Справочник по API или docs), сделайте это!

Ознакомьтесь с этим сообщением в блоге от Говарда М. Льюиса (основного автора фантастического Clojure GraphQL Server), в котором рассказывается, о чем следует думать при развертывании потоков.

Реализация core.async Презентация Rich Hickey

Презентация: core.async дебют с Ричем Хикки

core.async: Параллелизм без обратных вызовов от Стюарта Хэллоуэя

Тайм-ауты и работа с несколькими каналами через Parking и alts! от Will Fleming.

Другие примеры из Тима Болдриджа.

Дэвид Нолен Примеры вебинаров.

Интервью с Тимом Болдриджем Обсуждая core.async.

Примеры core.async

Ознакомьтесь с некоторыми расширенными примерами core.async в действии в реальных проектах

  1. NetRunner: Игра
  2. Гойя: редактор пикселей
  3. Редактор разметки Wordsmith

Для читателя-теоретика:

Узнайте больше о концепциях, лежащих в основе core.async: Взаимодействие с последовательными процессами: CSP).