1 什么是協(xié)程
協(xié)程是與其他函數(shù)或方法一起并發(fā)運(yùn)行的函數(shù)或方法。Go協(xié)程可以看作是輕量級(jí)線(xiàn)程,與線(xiàn)程相比,創(chuàng)建一個(gè) Go 協(xié)程的成本很小。
1.1 協(xié)程與線(xiàn)程的對(duì)比
協(xié)程的成本極低。堆棧大小只有若干KB(2或4KB),并且可以根據(jù)應(yīng)用的需求進(jìn)行增減。而線(xiàn)程必須指定堆棧的大小,其堆棧是固定不變的(一般默認(rèn)2MB)。固定了棧的大小導(dǎo)致兩個(gè)問(wèn)題:
一是對(duì)于很多只需要很小的??臻g的線(xiàn)程來(lái)說(shuō)是一個(gè)巨大的浪費(fèi)
二是對(duì)于少數(shù)需要巨大棧空間的線(xiàn)程來(lái)說(shuō)又面臨棧溢出的風(fēng)險(xiǎn)
協(xié)程會(huì)復(fù)用(Multiplex)數(shù)量更少的 OS 線(xiàn)程。即使程序有數(shù)以千計(jì)的協(xié)程,也可能只有一個(gè)線(xiàn)程。
如果該線(xiàn)程中的某一Go協(xié)程發(fā)生了阻塞(比如說(shuō)等待用戶(hù)輸入),那么系統(tǒng)會(huì)再創(chuàng)建一個(gè)OS線(xiàn)程,并把其余協(xié)程都移動(dòng)到這個(gè)新的OS線(xiàn)程。所有這一切都在運(yùn)行時(shí)進(jìn)行,作為程序員,我們沒(méi)有直接面臨這些復(fù)雜的細(xì)節(jié),而是有一個(gè)簡(jiǎn)潔的 API 來(lái)處理并發(fā)。
Go內(nèi)置半搶占式的協(xié)作調(diào)度器,在用戶(hù)態(tài)進(jìn)行協(xié)程的調(diào)度。
Go協(xié)程使用信道(Channel)來(lái)進(jìn)行通信。信道用于防止多個(gè)協(xié)程訪(fǎng)問(wèn)共享內(nèi)存時(shí)發(fā)生競(jìng)態(tài)條件(Race Condition)。信道可以看作是協(xié)程之間通信的管道。
1.2 啟動(dòng)協(xié)程
調(diào)用函數(shù)或者方法時(shí),在前面加上關(guān)鍵字go,可以讓一個(gè)新的Go協(xié)程并發(fā)地運(yùn)行。需要注意:
啟動(dòng)一個(gè)新的協(xié)程時(shí),協(xié)程的調(diào)用會(huì)立即返回。程序控制不會(huì)去等待Go協(xié)程執(zhí)行完畢。在調(diào)用Go協(xié)程之后,程序控制會(huì)立即返回到代碼的下一行,忽略該協(xié)程的任何返回值。
如果希望運(yùn)行其他Go協(xié)程,Go 主協(xié)程必須繼續(xù)運(yùn)行著。如果Go主協(xié)程終止,則程序終止,于是其他協(xié)程也不會(huì)繼續(xù)運(yùn)行。
使用示例如下:
package?main import?(?? ????"fmt" ????"time" ) func?numbers()?{?? ????for?i?:=?1;?i?<=?5;?i++?{ ????????time.Sleep(250?*?time.Millisecond) ????????fmt.Printf("%d?",?i) ????} } func?alphabets()?{?? ????for?i?:=?'a';?i?<=?'e';?i++?{ ????????time.Sleep(400?*?time.Millisecond) ????????fmt.Printf("%c?",?i) ????} } func?main()?{?? ????go?numbers()?//啟動(dòng)協(xié)程 ????go?alphabets()?//啟動(dòng)協(xié)程 ????//等待子協(xié)程允許完畢,后面介紹更高級(jí)的信道方式,這里就簡(jiǎn)單的等待 ????time.Sleep(3000?*?time.Millisecond) ????fmt.Println("main?terminated") } //輸出:1?a?2?3?b?4?c?5?d?e?main?terminated
下圖可以清晰的看到三個(gè)協(xié)程的運(yùn)行關(guān)系:

2 信道
2.1 信道的創(chuàng)建
信道可以想像成協(xié)程之間通信的管道。如同管道中的水會(huì)從一端流到另一端,通過(guò)使用信道,數(shù)據(jù)也可以從一端發(fā)送,在另一端接收。所有信道都關(guān)聯(lián)了一個(gè)類(lèi)型。信道只能運(yùn)輸這種類(lèi)型的數(shù)據(jù),而運(yùn)輸其他類(lèi)型的數(shù)據(jù)都是非法的。chan T表示T類(lèi)型的信道,使用make函數(shù)進(jìn)行初始化。例如:
a?:=?make(chan?int)
2.2 信道的收發(fā)
信道旁的箭頭方向指定了是發(fā)送數(shù)據(jù)還是接收數(shù)據(jù)
data?:=?<-?a?//?讀取信道a,保存值到data a?<-?data?//?寫(xiě)入信道a
發(fā)送與接收默認(rèn)是阻塞的。當(dāng)把數(shù)據(jù)發(fā)送到信道時(shí),程序控制會(huì)在發(fā)送數(shù)據(jù)的語(yǔ)句處發(fā)生阻塞,直到有其它協(xié)程從信道讀取到數(shù)據(jù),才會(huì)解除阻塞。與此類(lèi)似,當(dāng)讀取信道的數(shù)據(jù)時(shí),如果沒(méi)有其它的協(xié)程把數(shù)據(jù)寫(xiě)入到這個(gè)信道,那么讀取過(guò)程就會(huì)一直阻塞著。**信道的這種特性能夠幫助Go協(xié)程之間進(jìn)行高效的通信,不需要用到其他編程語(yǔ)言常見(jiàn)的顯式鎖或條件變量。 借助阻塞這個(gè)特性,我們可以用一個(gè)讀操作等待子協(xié)程結(jié)束,而不是使用sleep:
func?hello(done?chan?bool)?{??
????fmt.Println("Hello?world?goroutine")
????done?<-?true//子協(xié)程結(jié)束,寫(xiě)入數(shù)據(jù)
}
func?main()?{??
????done?:=?make(chan?bool)//創(chuàng)建bool信道
????go?hello(done)
????<-done?//讀操作,一直阻塞直到子協(xié)程結(jié)束
????fmt.Println("main?function")
}
2.3 小心死鎖
使用信道需要考慮的一個(gè)重點(diǎn)是死鎖。
當(dāng)Go協(xié)程給一個(gè)信道發(fā)送數(shù)據(jù)時(shí),照理說(shuō)會(huì)有其他Go協(xié)程來(lái)接收數(shù)據(jù)。如果沒(méi)有的話(huà),程序就會(huì)在運(yùn)行時(shí)觸發(fā) panic,形成死鎖。
當(dāng)有Go協(xié)程等著從一個(gè)信道接收數(shù)據(jù)時(shí),我們期望其他的Go協(xié)程會(huì)向該信道寫(xiě)入數(shù)據(jù),要不然程序就會(huì)觸發(fā) panic。
2.4 關(guān)閉信道和range遍歷
數(shù)據(jù)發(fā)送方可以關(guān)閉信道,通知接收方這個(gè)信道不再有數(shù)據(jù)發(fā)送過(guò)來(lái)。當(dāng)從信道接收數(shù)據(jù)時(shí),接收方可以多用一個(gè)變量來(lái)檢查信道是否已經(jīng)關(guān)閉。
func?producer(chnl?chan?int)?{??
????for?i?:=?0;?i?10;?i++?{
????????chnl?<-?i
????}
????close(chnl)//關(guān)閉信道
}
func?main()?{??
????ch?:=?make(chan?int)
????go?producer(ch)
????for?{
????????v,?ok?:=?<-ch?//判斷信道是否關(guān)閉
????????if?ok?==?false?{
????????????break
????????}
????????fmt.Println("Received?",?v,?ok)
????}
}
上面的語(yǔ)句里,如果成功接收信道所發(fā)送的數(shù)據(jù),那么 ok 等于 true。而如果 ok 等于 false,說(shuō)明我們?cè)噲D讀取一個(gè)關(guān)閉的通道。從關(guān)閉的信道讀取到的值會(huì)是該信道類(lèi)型的零值。 或者我們可以用range遍歷信道,代替上面示例中的for循環(huán):
func?main()?{??
????ch?:=?make(chan?int)
????go?producer(ch)
????for?v?:=?range?ch?{//range可以在信道關(guān)閉后自動(dòng)結(jié)束,不用顯示的判斷
????????fmt.Println("Received?",v)
????}
}
2.5 緩沖信道
上面無(wú)緩沖信道的發(fā)送和接收過(guò)程是阻塞的,讀寫(xiě)操作會(huì)一直阻塞。我們還可以創(chuàng)建一個(gè)有緩沖的信道(Buffered Channel)。只在緩沖已滿(mǎn)的情況,才會(huì)阻塞向緩沖信道發(fā)送數(shù)據(jù)。同樣,只有在緩沖為空的時(shí)候,才會(huì)阻塞從緩沖信道接收數(shù)據(jù)。 通過(guò)向 make 函數(shù)時(shí)再傳遞一個(gè)表示容量的參數(shù)(指定緩沖的大小,sizeof(type) * capacity),就可以創(chuàng)建緩沖信道。
ch?:=?make(chan?type,?capacity)//capacity?應(yīng)該大于?0。無(wú)緩沖信道的容量默認(rèn)為?0
緩沖區(qū)容量和長(zhǎng)度的區(qū)別:
容量是指信道可以存儲(chǔ)的值的數(shù)量(總的大?。?。我們?cè)谑褂胢ake函數(shù)創(chuàng)建緩沖信道的時(shí)候會(huì)指定容量大小。
長(zhǎng)度是指信道中當(dāng)前排隊(duì)的元素個(gè)數(shù)(當(dāng)前保存的大小)。
使用示例如下:
func?write(ch?chan?int)?{??
????for?i?:=?0;?i?5;?i++?{
????????ch?<-?i?//寫(xiě)入兩個(gè)值之后緩沖區(qū)滿(mǎn),阻塞等待緩沖區(qū)空閑
????????fmt.Println("successfully?wrote",?i,?"to?ch")
????}
????close(ch)
}
func?main()?{??
????ch?:=?make(chan?int,?2)//緩沖大小為2
????go?write(ch)
????time.Sleep(2?*?time.Second)
????for?v?:=?range?ch?{
????????fmt.Println("read?value",?v,"from?ch")
????????time.Sleep(2?*?time.Second)
????}
}
2.6 select
select 語(yǔ)句用于在多個(gè)發(fā)送/接收信道操作中進(jìn)行選擇。該語(yǔ)法與 switch 類(lèi)似,所不同的是,這里的每個(gè) case 語(yǔ)句都是信道操作。
select 語(yǔ)句會(huì)一直阻塞,直到發(fā)送/接收操作準(zhǔn)備就緒。如果有多個(gè)信道操作準(zhǔn)備完畢,select 會(huì)隨機(jī)地選取其中之一執(zhí)行。
在沒(méi)有case準(zhǔn)備就緒時(shí),可以執(zhí)行select語(yǔ)句中的默認(rèn)情況(Default Case),這通常用于防止select語(yǔ)句一直阻塞,沒(méi)有信道可用時(shí)會(huì)立刻返回。
使用示例:
func?server1(ch?chan?string)?{??
????time.Sleep(6?*?time.Second)
????ch?<-?"from?server1"
}
func?server2(ch?chan?string)?{??
????time.Sleep(3?*?time.Second)
????ch?<-?"from?server2"
}
func?main()?{??
????output1?:=?make(chan?string)
????output2?:=?make(chan?string)
????go?server1(output1)
????go?server2(output2)
????select?{//一直阻塞,直到某個(gè)信道可用
????case?s1?:=?<-output1:
????????fmt.Println(s1)
????case?s2?:=?<-output2:
????????fmt.Println(s2)
????}
}
3 WaitGroup
3.1 WaitGroup的使用
WaitGroup可以用來(lái)等待一批go協(xié)程執(zhí)行結(jié)束,類(lèi)似于C++的std::join。使用示例如下:
import?(
????"fmt"
????"sync"
????"time"
)
func?process(i?int,?wg?*sync.WaitGroup)?{//waitgroup參數(shù)指針,因?yàn)橐薷膬?nèi)部的值,不能是值傳遞
????fmt.Println("started?Goroutine?",?i)
????time.Sleep(2?*?time.Second)
????fmt.Printf("Goroutine?%d?ended
",?i)
????wg.Done()//子協(xié)程結(jié)束,調(diào)用done減少計(jì)數(shù)器
}
func?main()?{
????no?:=?3
????var?wg?sync.WaitGroup?//定義waitgroup
????for?i?:=?0;?i?
3.2 實(shí)現(xiàn)一個(gè)協(xié)程池
基本思路:
創(chuàng)建一個(gè)Go協(xié)程池,監(jiān)聽(tīng)一個(gè)等待作業(yè)分配的輸入型緩沖信道
將作業(yè)添加到該輸入型緩沖信道中
作業(yè)完成后,再將結(jié)果寫(xiě)入一個(gè)輸出型緩沖信道
從輸出型緩沖信道讀取并打印結(jié)果
代碼和解析如下:
package?main
import?(??
????"fmt"
????"math/rand"
????"sync"
????"time"
)
//定義任務(wù)和結(jié)果兩個(gè)結(jié)構(gòu)體
type?Job?struct?{??
????id???????int
????randomno?int
}
type?Result?struct?{??
????job?????????Job?//包含job結(jié)構(gòu)體
????sumofdigits?int
}
//創(chuàng)建任務(wù)和結(jié)果的兩個(gè)緩沖信道
var?jobs?=?make(chan?Job,?10)??
var?results?=?make(chan?Result,?10)
//計(jì)算一個(gè)整數(shù)每一位相加的和
func?digits(number?int)?int?{??
????sum?:=?0
????no?:=?number
????for?no?!=?0?{
????????digit?:=?no?%?10
????????sum?+=?digit
????????no?/=?10
????}
????time.Sleep(2?*?time.Second)
????return?sum
}
//遍歷job信道,計(jì)算后每個(gè)job的數(shù)字并將結(jié)果寫(xiě)入reslut信道
func?worker(wg?*sync.WaitGroup)?{??
????for?job?:=?range?jobs?{
????????output?:=?Result{job,?digits(job.randomno)}
????????results?<-?output
????}
????wg.Done()
}
//初始化waitgroup,并開(kāi)啟多個(gè)協(xié)程開(kāi)始計(jì)算
func?createWorkerPool(noOfWorkers?int)?{??
????var?wg?sync.WaitGroup
????for?i?:=?0;?i?4 協(xié)程的同步手段
4.1 互斥與Mutex
Mutex用于提供一種加鎖機(jī)制(Locking Mechanism),可確保在某時(shí)刻只有一個(gè)協(xié)程在臨界區(qū)運(yùn)行,以防止出現(xiàn)競(jìng)態(tài)條件。Mutex可以在sync包內(nèi)找到。Mutex 定義了兩個(gè)方法:Lock和Unlock。所有在 Lock 和 Unlock 之間的代碼,都只能由一個(gè)Go協(xié)程執(zhí)行,于是就可以避免競(jìng)態(tài)條件。
mutex.Lock()
x?=?x?+?1??
mutex.Unlock()
使用示例:
//互斥鎖保證線(xiàn)程同步
package?main
import?(
?"fmt"
?"sync"
)
var?total?struct?{?//全局的結(jié)構(gòu)體變量
?sync.Mutex?//互斥鎖
?value??????int
}
func?worker(wg?*sync.WaitGroup)?{
?defer?wg.Done()
?for?i?:=?0;?i?<=?100;?i++?{
??total.Lock()?//加鎖
??total.value++
??total.Unlock()?//解鎖
?}
}
func?main()?{
?var?wg?sync.WaitGroup
?wg.Add(2)
?go?worker(&wg)
?go?worker(&wg)
?wg.Wait()
?fmt.Println(total.value)
}
4.2 原子操作
用互斥鎖來(lái)保護(hù)一個(gè)數(shù)值型的共享資源,麻煩且效率低下。標(biāo)準(zhǔn)庫(kù)的sync/atomic包對(duì)原子操作提供了豐富的支持:sync/atomic包對(duì)基本的數(shù)值類(lèi)型及復(fù)雜對(duì)象的讀寫(xiě)都提供了原子操作的支持。atomic.Value原子對(duì)象提供了Load和Store兩個(gè)原子方法,分別用于加載和保存數(shù)據(jù),返回值和參數(shù)都是interface{}類(lèi)型。
//原子操作實(shí)現(xiàn)線(xiàn)程同步
package?main
import?(
?"fmt"
?"sync"
?"sync/atomic"
)
var?total?uint64
func?worker(wg?*sync.WaitGroup)?{
?defer?wg.Done()
?var?i?uint64
?for?i?=?0;?i?<=?100;?i++?{
??atomic.AddUint64(&total,?1)?//原子操作,線(xiàn)程安全的
?}
}
func?main()?{
?var?wg?sync.WaitGroup
?wg.Add(2)
?go?worker(&wg)
?go?worker(&wg)
?wg.Wait()
?fmt.Println(atomic.LoadUint64(&total))?//讀取值
}
4.3 阻塞信道
上面的示例我們也可以用信道來(lái)實(shí)現(xiàn)互斥(還是推薦實(shí)際中使用Mutex),使用大小為1的緩沖信道可以導(dǎo)致可寫(xiě)阻塞,這樣其他協(xié)程就不能繼續(xù)執(zhí)行,只能等待阻塞結(jié)束。在并發(fā)編程中,對(duì)共享資源的正確訪(fǎng)問(wèn)需要精確的控制,在目前的絕大多數(shù)語(yǔ)言中,都是通過(guò)加鎖等線(xiàn)程同步方案來(lái)解決這一困難問(wèn)題,而Go語(yǔ)言卻另辟蹊徑,它將共享的值通過(guò)Channel傳遞(實(shí)際上多個(gè)獨(dú)立執(zhí)行的線(xiàn)程很少主動(dòng)共享資源)。在任意給定的時(shí)刻,最好只有一個(gè)Goroutine能夠擁有該資源。
//使用channel實(shí)現(xiàn)線(xiàn)程同步
package?main
import?(
?"fmt"
?"sync"
)
var?total?uint64
func?worker(wg?*sync.WaitGroup,?ch?chan?bool)?{
?defer?wg.Done()
?var?i?uint64
?for?i?=?0;?i?<=?100;?i++?{
??ch?<-?true?//信道被寫(xiě)入值,其他協(xié)程到這一句也想寫(xiě)入值,就會(huì)阻塞等待信道可寫(xiě)
??total++
??<-ch?//本協(xié)程讀取信道,信道空了,其他協(xié)程可以寫(xiě)入了
?}
}
func?main()?{
?ch?:=?make(chan?bool,?1)?//?創(chuàng)建大小為1的chan
?var?wg?sync.WaitGroup
?wg.Add(2)
?go?worker(&wg,?ch)
?go?worker(&wg,?ch)
?wg.Wait()
?fmt.Println(total)?//讀取值
}不僅如此,我們還可以通過(guò)設(shè)置chan的緩存大小來(lái)控制最大并發(fā)數(shù)。
5 常見(jiàn)并發(fā)模型
5.1 生產(chǎn)者消費(fèi)者模型
通過(guò)平衡生產(chǎn)線(xiàn)程和消費(fèi)線(xiàn)程的工作能力來(lái)提高程序的整體處理數(shù)據(jù)的速度。簡(jiǎn)單地說(shuō),就是生產(chǎn)者生產(chǎn)一些數(shù)據(jù),然后放到成果隊(duì)列中,同時(shí)消費(fèi)者從成果隊(duì)列中來(lái)取這些數(shù)據(jù)。這樣就讓生產(chǎn)消費(fèi)變成了異步的兩個(gè)過(guò)程。當(dāng)成果隊(duì)列中沒(méi)有數(shù)據(jù)時(shí),消費(fèi)者就進(jìn)入饑餓的等待中;而當(dāng)成果隊(duì)列中數(shù)據(jù)已滿(mǎn)時(shí),生產(chǎn)者則面臨因產(chǎn)品擠壓導(dǎo)致CPU被剝奪的下崗問(wèn)題。 Go可以使用帶緩沖區(qū)的chan作為成功隊(duì)列,由不同的協(xié)程負(fù)責(zé)接入和讀取,很簡(jiǎn)單的實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型:
package?main
import?(
?"fmt"
?"os"
?"os/signal"
?"syscall"
)
//?生產(chǎn)者:?生成?factor?整數(shù)倍的序列
func?Producer(factor?int,?out?chan<-?int)?{
?for?i?:=?0;?;?i++?{
??out?<-?i?*?factor?//往信道緩沖區(qū)內(nèi)寫(xiě)入數(shù)據(jù)
?}
}
//?消費(fèi)者
func?Consumer(in?<-chan?int)?{
?for?v?:=?range?in?{
??fmt.Println(v)?//從信道讀取數(shù)據(jù)打印
?}
}
func?main()?{
?ch?:=?make(chan?int,?64)?//?成果隊(duì)列,大小為64
?//開(kāi)啟了2個(gè)Producer生產(chǎn)流水線(xiàn),分別用于生成3和5的倍數(shù)的序列
?//然后開(kāi)啟1個(gè)Consumer消費(fèi)者線(xiàn)程,打印獲取的結(jié)果
?go?Producer(3,?ch)?//?生成?3?的倍數(shù)的序列
?go?Producer(5,?ch)?//?生成?5?的倍數(shù)的序列
?go?Consumer(ch)????//?消費(fèi)?生成的隊(duì)列
?//?Ctrl+C?退出
?sig?:=?make(chan?os.Signal,?1)
?signal.Notify(sig,?syscall.SIGINT,?syscall.SIGTERM)
?fmt.Printf("quit?(%v)
",?<-sig)
}
5.2 發(fā)布訂閱模型
發(fā)布訂閱(publish/subscribe)模型通常被簡(jiǎn)寫(xiě)為pub/sub模型。在這個(gè)模型中,消息生產(chǎn)者成為發(fā)布者(publisher),而消息消費(fèi)者則成為訂閱者(subscriber),生產(chǎn)者和消費(fèi)者是M:N的關(guān)系。在傳統(tǒng)生產(chǎn)者和消費(fèi)者模型中,是將消息發(fā)送到一個(gè)隊(duì)列中,而發(fā)布訂閱模型則是將消息發(fā)布給一個(gè)主題。在發(fā)布訂閱模型中,每條消息都會(huì)傳送給多個(gè)訂閱者。發(fā)布者通常不會(huì)知道、也不關(guān)心哪一個(gè)訂閱者正在接收主題消息。訂閱者和發(fā)布者可以在運(yùn)行時(shí)動(dòng)態(tài)添加,是一種松散的耦合關(guān)系,這使得系統(tǒng)的復(fù)雜性可以隨時(shí)間的推移而增長(zhǎng)。在現(xiàn)實(shí)生活中,像天氣預(yù)報(bào)之類(lèi)的應(yīng)用就可以應(yīng)用這個(gè)并發(fā)模式。 示例代碼如下:
//?發(fā)布訂閱模型實(shí)現(xiàn)
package?pubsub
import?(
?"sync"
?"time"
)
type?(
?subscriber?chan?interface{}?????????//?訂閱者為一個(gè)管道
?topicFunc??func(v?interface{})?bool?//?主題為一個(gè)過(guò)濾器
)
//?發(fā)布者對(duì)象
type?Publisher?struct?{
?m???????????sync.RWMutex?????????????//?讀寫(xiě)鎖,保護(hù)訂閱者map
?buffer??????int??????????????????????//?訂閱隊(duì)列的緩存大小
?timeout?????time.Duration????????????//?發(fā)布超時(shí)時(shí)間
?subscribers?map[subscriber]topicFunc?//?訂閱者信息
}
//?構(gòu)建一個(gè)發(fā)布者對(duì)象,?可以設(shè)置發(fā)布超時(shí)時(shí)間和緩存隊(duì)列的長(zhǎng)度
func?NewPublisher(publishTimeout?time.Duration,?buffer?int)?*Publisher?{
?return?&Publisher{?//返回對(duì)象指針
??buffer:??????buffer,
??timeout:?????publishTimeout,
??subscribers:?make(map[subscriber]topicFunc),?//創(chuàng)建訂閱者map
?}
}
//?添加一個(gè)新的訂閱者,訂閱全部主題
func?(p?*Publisher)?Subscribe()?chan?interface{}?{
?return?p.SubscribeTopic(nil)
}
//?添加一個(gè)新的訂閱者,訂閱過(guò)濾器篩選后的主題
func?(p?*Publisher)?SubscribeTopic(topic?topicFunc)?chan?interface{}?{
?ch?:=?make(chan?interface{},?p.buffer)
?p.m.Lock()
?p.subscribers[ch]?=?topic
?p.m.Unlock()
?return?ch
}
//?退出訂閱
func?(p?*Publisher)?Evict(sub?chan?interface{})?{
?p.m.Lock()
?defer?p.m.Unlock()?//函數(shù)退出時(shí)解鎖
?delete(p.subscribers,?sub)?//根據(jù)key刪除map中一項(xiàng)
?close(sub)?????????????????//關(guān)閉chan
}
//?發(fā)布一個(gè)主題
func?(p?*Publisher)?Publish(v?interface{})?{
?p.m.RLock()
?defer?p.m.RUnlock()
?var?wg?sync.WaitGroup
?for?sub,?topic?:=?range?p.subscribers?{
??wg.Add(1)
??go?p.sendTopic(sub,?topic,?v,?&wg)
?}
?wg.Wait()
}
//?關(guān)閉發(fā)布者對(duì)象,同時(shí)關(guān)閉所有的訂閱者管道。
func?(p?*Publisher)?Close()?{
?p.m.Lock()
?defer?p.m.Unlock()
?for?sub?:=?range?p.subscribers?{
??delete(p.subscribers,?sub)
??close(sub)
?}
}
//?發(fā)送主題,可以容忍一定的超時(shí)
func?(p?*Publisher)?sendTopic(sub?subscriber,?topic?topicFunc,?v?interface{},?wg?*sync.WaitGroup)?{
?defer?wg.Done()
?if?topic?!=?nil?&&?!topic(v)?{
??return
?}
?//監(jiān)聽(tīng)sub?chan寫(xiě)入成功或超時(shí)
?select?{
?case?sub?<-?v:
?case?<-time.After(p.timeout):
?}
}
我們可以選擇訂閱全部,或指定自定義函數(shù)只訂閱符合要求的消息,返回chan對(duì)象:
all?:=?p.Subscribe()?//添加一個(gè)訂閱者,訂閱全部消息
//添加一個(gè)訂閱者,只關(guān)系有g(shù)olang字符串的內(nèi)容
golang?:=?p.SubscribeTopic(func(v?interface{})?bool?{
?if?s,?ok?:=?v.(string);?ok?{
??return?strings.Contains(s,?"golang")
?}
?return?false
})
編輯:黃飛
?
?
?
電子發(fā)燒友App





評(píng)論