pub-sub
Table of Contents
Pubbing 和 Subbing
并没有怎么看懂, 先记录下来
首先启动REPL, 并加载async.
(use 'clojure.core.async)
Publishing
(发行) 和 Subscribing
(订阅) 是在处理异步事件时经常用的两个概念.
pub
函数接收一个 channel 和一个 topic 函数, 返回一个 publication
:
(def input-chan (chan)) (def our-pub (pub input-chan :msg-type))
publication
并不是 channel, 不要用put!向他发送消息.
(put! our-pub {:msg-type :greeting :text "hello"}) ;; IllegalArgumentException No implementation of method: :put! of protocol
我们应该直接发送消息给原来的channel:
(>!! input-chan {:msg-type :greeting :text "hello"}) ; true
(通常情况下, 如果没有 input-chan
的消费者, 这里会阻塞. 但是 pub
的实现中, 会为它创建两个 go
块来
代替主线程挂起. )
topic-fn
会作用在所有的传给 publication 的值上, 在上面的情况中, :msg-type
被用来作为这个函数.
并且会返回 :greeting
, topic-fn
用来给消息分类, 但不会修改消息. 如果一个 publication使用 inc
作为topic函数
传递的值也是不会改变的.
我们如何获得这个值, 我们可以用对应 sub
函数:
(def output-chan (chan)) (sub our-pub :greeting output-chan) ; #object[clojure.core.async.impl.channels.ManyToManyChannel
sub
接受一个 publication, 一个topic和一个channel. 这个channel会接受来自publication中的, 所有的满足
(= (topic-fn v) topic)
的值.
(go-loop [] (let [{:keys [text]} (<! output-chan)] (println text) (recur)))
注意, 尽管我们之前已经把一个值放到了 input-chan
, 但是这里什么都不会发生.
因为 publication 会抛弃所有不符合 subscrbe 的 topic. 我们的原始的信息丢弃了.
但是有了 subscribe channel 之后, 我们可以发布新的值.
(>!! input-chan {:msg-type :greeting :text "hi"}) ;true ;hi
如果我们发布了一个频道不接受的值, 整个发布会阻塞.
(def loser-chan (chan)) (sub our-pub :loser loser-chan) (>!! input-chan {:msg-type :loser :text "I won't be accepted"})
小心: 这里看起来返回 true
并且不会阻塞, 因为 pubilcation 实际上是从 input-chan
中取值.
但是在发行中的 go
块是会阻塞的. >!!
一个 :loser
在主线程不会阻塞, 但是在接下来的线程会阻塞.
如果你向 publication 订阅一个channel, 确保他可以接受值. 你可以通过很多方式来实现, 但是最好从 publication 来处理. 因为你可能不知道哪里出问题了, 直到另外一个 topic 也被阻塞了.
有时候你无法这么做, 比如消费的操作是由资源决定的. 这种情况下, 你可以创建 topic 指定的 buffer: pub
函数最后传入一个
buf-fn
作为参数. buf-fn
接受一个topic, 返回一个 buffer 和一个数值. buffer-fn 可以简单的是一个 map:
{:predict-election 100 :flip-a-coin 3} (def pub-central (let [topic-fn #(case (:msg-type %) :db-change :acid :http-request :stateless) buf-fn #(case % :stateless (sliding-buffer 10) :acid (dropping-buffer 1000))] (pub request-source topic-fn buf-fn)))