狗好看の世界

Less words, more attempting.

pub-sub

Table of Contents

Pubbing 和 Subbing

并没有怎么看懂, 先记录下来

首先启动REPL, 并加载async.

(use 'clojure.core.async)

Publishing (发行) 和 Subscribing (订阅) 是在处理异步事件时经常用的两个概念. pub 函数接收一个 channel 和一个 topic 函数, 返回一个 publication:

(def input-chan (chan))
(def our-pub (pub input-chan :msg-type))

publication 并不是 channel, 不要用put!向他发送消息.

(put! our-pub {:msg-type :greeting :text "hello"})
;; IllegalArgumentException No implementation of method: :put! of protocol

我们应该直接发送消息给原来的channel:

(>!! input-chan {:msg-type :greeting :text "hello"})
; true

(通常情况下, 如果没有 input-chan 的消费者, 这里会阻塞. 但是 pub 的实现中, 会为它创建两个 go 块来 代替主线程挂起. )

topic-fn 会作用在所有的传给 publication 的值上, 在上面的情况中, :msg-type 被用来作为这个函数. 并且会返回 :greeting, topic-fn 用来给消息分类, 但不会修改消息. 如果一个 publication使用 inc 作为topic函数 传递的值也是不会改变的. 我们如何获得这个值, 我们可以用对应 sub 函数:

(def output-chan (chan))
(sub our-pub :greeting output-chan)
; #object[clojure.core.async.impl.channels.ManyToManyChannel

sub 接受一个 publication, 一个topic和一个channel. 这个channel会接受来自publication中的, 所有的满足 (= (topic-fn v) topic) 的值.

(go-loop []
  (let [{:keys [text]} (<! output-chan)]
    (println text)
    (recur)))

注意, 尽管我们之前已经把一个值放到了 input-chan, 但是这里什么都不会发生. 因为 publication 会抛弃所有不符合 subscrbe 的 topic. 我们的原始的信息丢弃了. 但是有了 subscribe channel 之后, 我们可以发布新的值.

(>!! input-chan {:msg-type :greeting :text "hi"})
;true
;hi

如果我们发布了一个频道不接受的值, 整个发布会阻塞.

(def loser-chan (chan))
(sub our-pub :loser loser-chan)
(>!! input-chan {:msg-type :loser :text "I won't be accepted"})

小心: 这里看起来返回 true 并且不会阻塞, 因为 pubilcation 实际上是从 input-chan 中取值. 但是在发行中的 go 块是会阻塞的. >!! 一个 :loser 在主线程不会阻塞, 但是在接下来的线程会阻塞.

如果你向 publication 订阅一个channel, 确保他可以接受值. 你可以通过很多方式来实现, 但是最好从 publication 来处理. 因为你可能不知道哪里出问题了, 直到另外一个 topic 也被阻塞了.

有时候你无法这么做, 比如消费的操作是由资源决定的. 这种情况下, 你可以创建 topic 指定的 buffer: pub 函数最后传入一个 buf-fn 作为参数. buf-fn 接受一个topic, 返回一个 buffer 和一个数值. buffer-fn 可以简单的是一个 map:

  {:predict-election 100
   :flip-a-coin 3}

(def pub-central
  (let [topic-fn #(case (:msg-type %) 
                    :db-change :acid
                    :http-request :stateless)
        buf-fn #(case %
                  :stateless (sliding-buffer 10)
                  :acid (dropping-buffer 1000))]
    (pub request-source topic-fn buf-fn)))

Comments