即使再仔細(xì)的檢查,仍然可能在并發(fā)上犯錯(cuò)。Go 的 runtime 提供了動(dòng)態(tài)分析工具:競(jìng)態(tài)檢測(cè)器(race detectotr)。
在下一節(jié)的示例中會(huì)用到競(jìng)態(tài)檢測(cè)器,所以在用之前,先了解一下這個(gè)工具。
簡(jiǎn)單地把 -race 命令行參數(shù)加到 go build、go run、go test 命令里即可使用該功能。它會(huì)讓編譯器為你的應(yīng)用或測(cè)試構(gòu)建一個(gè)修改后的版本,這個(gè)版本有額外的手法可以高效記錄在執(zhí)行時(shí)對(duì)共享變量的所有訪問(wèn),以及讀寫(xiě)這些變量的 goroutine 標(biāo)識(shí)。除此之外,還會(huì)記錄所有的同步事件、包括 go 語(yǔ)句、通道操作、鎖的調(diào)用等。(完整的同步事件集合可以在語(yǔ)言規(guī)范中的 “The Go Memory Model” 文檔中找到。)
競(jìng)態(tài)檢測(cè)器會(huì)研究事件流,找到那些有問(wèn)題的案例,即一個(gè) goroutine 寫(xiě)入一個(gè)變量后,中間沒(méi)有任何同步的操作,就有另外一個(gè) goroutine 讀寫(xiě)了該變量。這種情況表明有對(duì)共享變量的并發(fā)訪問(wèn),即數(shù)據(jù)競(jìng)態(tài)。工具會(huì)輸出一份報(bào)告,包括變量的標(biāo)識(shí)以及讀寫(xiě) goroutine 當(dāng)時(shí)的調(diào)用棧。通常情況下這些信息足以定位問(wèn)題了,下一章的示例會(huì)應(yīng)用到實(shí)戰(zhàn)中。
競(jìng)態(tài)檢測(cè)器報(bào)告所有實(shí)際運(yùn)行了的數(shù)據(jù)競(jìng)態(tài)。但只能檢測(cè)到那些在運(yùn)行時(shí)發(fā)生的競(jìng)態(tài),無(wú)法用來(lái)保證肯定不發(fā)生競(jìng)態(tài)。所以為了保證效果,需要全部測(cè)試包含了并發(fā)調(diào)用的場(chǎng)景。
由于存在額外的記錄工作,帶競(jìng)態(tài)檢測(cè)功能的程序在執(zhí)行時(shí)需要更長(zhǎng)的時(shí)間和更多的內(nèi)存,但即使對(duì)于生成環(huán)境的任務(wù),這種額外開(kāi)支也是可以接受的。對(duì)于那些偶發(fā)的競(jìng)態(tài)條件,使用競(jìng)態(tài)檢測(cè)器可以節(jié)省很多調(diào)試的時(shí)間。
創(chuàng)建一個(gè)并發(fā)非阻塞的緩存系統(tǒng),它能解決函數(shù)記憶(memoizing)的問(wèn)題,即緩存函數(shù)的結(jié)果,達(dá)到多次調(diào)用但只須計(jì)算一次結(jié)果。這個(gè)問(wèn)題在并發(fā)實(shí)戰(zhàn)中很常見(jiàn)但已有的庫(kù)不能很好地解決這個(gè)問(wèn)題。這里的解決方案將會(huì)是并發(fā)安全的,并且要避免簡(jiǎn)單地對(duì)整個(gè)緩存使用單個(gè)鎖而帶來(lái)的鎖爭(zhēng)奪問(wèn)題。
在做系統(tǒng)之前,先準(zhǔn)備一個(gè)將要被測(cè)試的函數(shù)。這里將使用下面的 httpGetBody 函數(shù)作為示例來(lái)演示函數(shù)記憶。調(diào)用 HTTP 請(qǐng)求相當(dāng)昂貴,所以我希望只在第一次請(qǐng)求的時(shí)候去發(fā)起請(qǐng)求,而之后都可以在緩存中找到結(jié)果直接返回:
func httpGetBody(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
先保證能緩存這個(gè)函數(shù)的執(zhí)行結(jié)果,之后再使用更多個(gè)函數(shù)來(lái)測(cè)試和驗(yàn)證功能。
這是一個(gè)并發(fā)不安全的版本,不過(guò)把基本功能先實(shí)現(xiàn),并發(fā)安全的問(wèn)題之后再進(jìn)行優(yōu)化:
// memo包提供了一個(gè)對(duì)類(lèi)型 Func 并發(fā)不安全的函數(shù)記憶功能
package memo
// Memo 緩存了調(diào)用 Func 的結(jié)果
type Memo struct {
f Func
cache map[string]result
}
// Func 是用于記憶的函數(shù)類(lèi)型
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]result)}
}
// 注意:并發(fā)不安全
func (memo *Memo) Get(key string) (interface{}, error) {
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
return res.value, res.err
}
Memo 實(shí)例包含了被記憶的函數(shù) f (類(lèi)型為Func),以及緩存,類(lèi)型為一個(gè) key 為字符串,value 為 result 的 map。每個(gè) result 都是調(diào)用 f 產(chǎn)生的結(jié)果:一個(gè)值和一個(gè)錯(cuò)誤,在設(shè)計(jì)的推進(jìn)過(guò)程中會(huì)展示 Memo 的幾種變體,但所有變體都會(huì)遵守這些基本概念。
串行測(cè)試
下面的例子展示了如何使用 Memo。下面是完整的測(cè)試源碼文件,包括上一小節(jié)寫(xiě)的被測(cè)試的函數(shù),以及一串 URL。每個(gè) URL 會(huì)發(fā)起兩次請(qǐng)求。對(duì)于每個(gè) URL,首先調(diào)用 Get,打印延時(shí)和返回的數(shù)據(jù)長(zhǎng)度:
package memo
import (
"io/ioutil"
"log"
"net/http"
"sync"
"testing"
"time"
)
func httpGetBody(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
var urls = []string{
"http://docscn.studygolang.com/",
"https://studygolang.com/",
"https://studygolang.com/pkgdoc",
"https://github.com/adonovan/gopl.io/tree/master/ch9",
}
func TestSequential(t *testing.T) { // 串行
m := New(httpGetBody)
urls = append(urls, urls...) // 每個(gè) URL 請(qǐng)求兩次
for _, url := range urls {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}
}
func TestConcurrent(t *testing.T) { // 并行
m := New(httpGetBody)
var n sync.WaitGroup
urls = append(urls, urls...) // 每個(gè) URL 請(qǐng)求兩次
n.Add(len(urls))
for _, url := range urls {
go func(url string) {
defer n.Done()
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}(url)
}
n.Wait()
}
這里使用 testing 包系統(tǒng)的測(cè)試效果。上面有兩個(gè)測(cè)試函數(shù),先只用 TestSequential 進(jìn)行測(cè)試,串行的發(fā)起請(qǐng)求。從下面的測(cè)試結(jié)果看,每一個(gè) URL 第一次調(diào)用都會(huì)消耗一定的時(shí)間,但對(duì) URL 第二次的請(qǐng)求會(huì)立刻返回結(jié)果:
PS H:\Go\src\gopl\output\memo\memo1> go test -run=TestSequential -v
=== RUN TestSequential
http://docscn.studygolang.com/, 87.1978ms, 6612 bytes
https://studygolang.com/, 203.3312ms, 81819 bytes
https://studygolang.com/pkgdoc, 33.0053ms, 1261 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 1.4428937s, 61185 bytes
http://docscn.studygolang.com/, 0s, 6612 bytes
https://studygolang.com/, 0s, 81819 bytes
https://studygolang.com/pkgdoc, 0s, 1261 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 0s, 61185 bytes
--- PASS: TestSequential (1.81s)
PASS
ok gopl/output/memo/memo1 2.063s
PS H:\Go\src\gopl\output\memo\memo1>
默認(rèn)在測(cè)試成功的時(shí)候不打印這類(lèi)日志,不過(guò)可以加上 -v 參數(shù)在成功時(shí)也打印測(cè)試日志。
并行測(cè)試
這次測(cè)試中所有的 Get 都是串行的。因?yàn)?HTTP 請(qǐng)求通過(guò)并發(fā)來(lái)改善的空間很大,所以這次使用 TestConcurrent 進(jìn)行測(cè)試,讓所有的請(qǐng)求并發(fā)進(jìn)行。這個(gè)測(cè)試要使用 sync.WaitGroup 等待所有的請(qǐng)求完成后再返回結(jié)果。
這次的測(cè)試結(jié)果基本上都是緩存無(wú)效的情況,不過(guò)偶爾還會(huì)出現(xiàn)無(wú)法正常運(yùn)行的情況。除了緩存無(wú)效,可能還會(huì)有緩存命中后返回錯(cuò)誤結(jié)果,甚至崩潰:
PS H:\Go\src\gopl\output\memo\memo1> go test -run=TestConcurrent -v
=== RUN TestConcurrent
http://docscn.studygolang.com/, 92.9972ms, 6612 bytes
http://docscn.studygolang.com/, 98.9889ms, 6612 bytes
https://studygolang.com/pkgdoc, 204.8383ms, 1261 bytes
https://studygolang.com/pkgdoc, 205.8387ms, 1261 bytes
https://studygolang.com/, 234.1566ms, 81819 bytes
https://studygolang.com/, 235.1749ms, 81819 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 1.5041445s, 61184 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 2.1051443s, 61184 bytes
--- PASS: TestConcurrent (2.11s)
PASS
ok gopl/output/memo/memo1 2.346s
PS H:\Go\src\gopl\output\memo\memo1>
加上競(jìng)態(tài)檢測(cè)器進(jìn)行并行測(cè)試
更糟糕的是,多數(shù)時(shí)候這樣都能正常運(yùn)行,所以甚至很難注意到這樣并發(fā)調(diào)用是有問(wèn)題的。但是如果加上 -race 標(biāo)志后再運(yùn)行,那么競(jìng)態(tài)檢測(cè)器就會(huì)輸出如下的報(bào)告:
PS H:\Go\src\gopl\output\memo\memo1> go test -run=TestConcurrent -v -race
=== RUN TestConcurrent
==================
WARNING: DATA RACE
Write at 0x00c000062cf0 by goroutine 11:
runtime.mapassign_faststr()
D:/Go/src/runtime/map_faststr.go:190 +0x0
gopl/output/memo/memo1.(*Memo).Get()
H:/Go/src/gopl/output/memo/memo1/memo.go:27 +0x1d8
gopl/output/memo/memo1.TestConcurrent.func1()
H:/Go/src/gopl/output/memo/memo1/memo_test.go:57 +0xc0
Previous write at 0x00c000062cf0 by goroutine 7:
runtime.mapassign_faststr()
D:/Go/src/runtime/map_faststr.go:190 +0x0
gopl/output/memo/memo1.(*Memo).Get()
H:/Go/src/gopl/output/memo/memo1/memo.go:27 +0x1d8
gopl/output/memo/memo1.TestConcurrent.func1()
H:/Go/src/gopl/output/memo/memo1/memo_test.go:57 +0xc0
...
FAIL gopl/output/memo/memo1 2.883s
這里就是因?yàn)閮蓚€(gè) goroutine 在沒(méi)使用同步的情況下更新了 Memo.cache 這個(gè) map。因?yàn)檎麄€(gè) Get 并不是并發(fā)安全的,它存在數(shù)據(jù)競(jìng)態(tài):
// 注意:并發(fā)不安全
func (memo *Memo) Get(key string) (interface{}, error) {
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
return res.value, res.err
}
所以,接下來(lái)就是要改進(jìn),實(shí)現(xiàn)并發(fā)安全。
讓緩存并發(fā)安全最簡(jiǎn)單的方法就是用一個(gè)基于監(jiān)控的同步機(jī)制。需要給 Memo 加一個(gè)互斥量,并在 Get 開(kāi)始就獲取互斥鎖,在返回前釋放互斥鎖,這樣就可以讓 cache 相關(guān)的操作發(fā)生在臨界區(qū)域內(nèi)了:
// Memo 緩存了調(diào)用 Func 的結(jié)果
type Memo struct {
f Func
mu sync.Mutex // 保護(hù) cache
cache map[string]result
}
// Get 是并發(fā)安全的
func (memo *Memo) Get(key string) (interface{}, error) {
memo.mu.Lock()
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
memo.mu.Unlock()
return res.value, res.err
}
加上鎖之后,再運(yùn)行并發(fā)測(cè)試函數(shù),競(jìng)態(tài)檢測(cè)器不報(bào)警了。但是這次的修改后,之前對(duì)性能的優(yōu)化就失效了。由于每次調(diào)用 Memo.f 時(shí)都上鎖,所以現(xiàn)在的 Get 方法運(yùn)行的使用實(shí)際又是串行的了。這里需要一個(gè)非阻塞的緩存,一個(gè)不會(huì)把他需要記憶的函數(shù)串行運(yùn)行的緩存。
調(diào)用 Get 是不需要鎖保護(hù)的。調(diào)用 Get 的判斷依據(jù)是之前的獲取 map 的 key,這個(gè)操作需要加鎖。調(diào)用 Get 返回后,需要把返回結(jié)果更新到 map 中去,這個(gè)操作也需要加鎖。在 map 查詢結(jié)束后,先釋放鎖。不加鎖的情況下調(diào)用 Get。等到結(jié)果返回需要更新 map 的時(shí)候,再加鎖更新 map。具體修改如下:
func (memo *Memo) Get(key string) (interface{}, error) {
memo.mu.Lock()
res, ok := memo.cache[key]
memo.mu.Unlock()
if !ok {
res.value, res.err = memo.f(key)
memo.mu.Lock()
memo.cache[key] = res
memo.mu.Unlock()
}
return res.value, res.err
}
現(xiàn)在,可以安全的并行運(yùn)行了,但是緩存又失效了。某些URL被獲取了兩次。修改一下測(cè)試源碼文件的被測(cè)試函數(shù) httpGetBody,在開(kāi)頭輸出一行日志,可以觀察到每個(gè)URL被調(diào)用的次數(shù):
func httpGetBody(url string) (interface{}, error) {
log.Printf("httpGetBody: %s", url) // 輸出哪些 url 被函數(shù)調(diào)用了,從緩存獲取結(jié)果時(shí)不會(huì)有這個(gè)輸出
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
修改之后,可以用最初的串行版本再測(cè)試一下。那個(gè)版本是一定用到緩存的效果的。而現(xiàn)在的版本,在并發(fā)的情況下無(wú)法用上緩存。
在幾個(gè) goroutine 幾乎同時(shí)調(diào)用的 Get 來(lái)獲取同一個(gè) URL 時(shí),每個(gè) goroutine 都首先查詢緩存,發(fā)現(xiàn)緩存中沒(méi)有需要的數(shù)據(jù),然后就都去執(zhí)行 Get 來(lái)獲取結(jié)果,最后又都用獲得的結(jié)果來(lái)更新 map,其中一個(gè)結(jié)果會(huì)被另外一個(gè)覆蓋。
在理想的情況下,應(yīng)該要避免這種額外的處理。這個(gè)功能有時(shí)稱為重復(fù)抑制(duplicate suppression)。
下面這個(gè)版本,map 的每個(gè)元素是一個(gè)指向 entry 結(jié)構(gòu)的指針。除了與之前一樣包含一個(gè)已經(jīng)記住的函數(shù) f 調(diào)用結(jié)果之外,每個(gè) entry 還新加一個(gè)通道 ready。在設(shè)置了 entry 和 result 字段后,通道會(huì)關(guān)閉,正在等待的 goroutine 會(huì)收到廣播,然后就可以從 entry 字段讀取結(jié)果了:
// memo包提供了一個(gè)對(duì)類(lèi)型 Func 并發(fā)安全的函數(shù)記憶功能
// 并發(fā)、重復(fù)抑制、非阻塞的緩存
package memo
import "sync"
// Func 是用于記憶的函數(shù)類(lèi)型
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // res 準(zhǔn)備好之后會(huì)被關(guān)閉
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]*entry)}
}
type Memo struct {
f Func
mu sync.Mutex // 保護(hù) cache
cache map[string]*entry
}
// Get 是并發(fā)安全的
func (memo *Memo) Get(key string) (interface{}, error) {
memo.mu.Lock()
e := memo.cache[key]
if e == nil {
// 對(duì) key 的第一次訪問(wèn),這個(gè) goroutine 負(fù)責(zé)獲取數(shù)據(jù)和廣播數(shù)據(jù)準(zhǔn)備好了的消息
e = &entry{ready: make(chan struct{})}
memo.cache[key] = e
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(key)
close(e.ready) // 廣播數(shù)據(jù)已經(jīng)準(zhǔn)備好的消息
} else {
// 對(duì)這個(gè) key 的重復(fù)訪問(wèn)
memo.mu.Unlock()
<-e.ready // 等待數(shù)據(jù)準(zhǔn)備完畢
}
return e.res.value, e.res.err
}
關(guān)于這里的 map 是否包含某個(gè)元素的判斷,之前都是返回兩個(gè)值,通過(guò)ok來(lái)判斷。之前的示例中,map的元素是結(jié)構(gòu)體,由于結(jié)構(gòu)體類(lèi)型的零值不是nil,通過(guò)ok來(lái)判斷比較好。這里的元素類(lèi)型是結(jié)構(gòu)體指針,當(dāng)然可以繼續(xù)使用ok來(lái)判斷。不過(guò)現(xiàn)在是指針類(lèi)型了,零值是nil也不會(huì)和非零值的情況搞混,所以也可以直接通過(guò)nil來(lái)判斷。
現(xiàn)在調(diào)用 Get 會(huì)獲取鎖,然后去 map 中查詢,如果沒(méi)有找到,就直接分配并插入一個(gè)新的值,然后釋放鎖。之后其他 goroutine 來(lái)查詢的時(shí)候,會(huì)發(fā)現(xiàn)值存在,那么就直接獲取到 map 的值,然后釋放鎖。
map 里的值并不是 Get 返回的數(shù)據(jù),而是數(shù)據(jù)是否準(zhǔn)備好的通道,和存放數(shù)據(jù)的字段。此時(shí)數(shù)據(jù)可能還沒(méi)準(zhǔn)備好,數(shù)據(jù)是否準(zhǔn)備好,可以從 ready 通道進(jìn)行判斷。對(duì) ready 通道的讀取操作,會(huì)在數(shù)據(jù)沒(méi)有準(zhǔn)備好的時(shí)候一直阻塞。一旦數(shù)據(jù)準(zhǔn)備好了,就會(huì)關(guān)閉 ready 通道,所有從 ready 通道的讀取操作就會(huì)立刻返回。這是利用通道進(jìn)行廣播的方式。所以查詢 map 后獲取值的步驟就是先讀取 ready 通道等待,一旦通道的讀取返回,就表示數(shù)據(jù)已經(jīng)準(zhǔn)備好了,此時(shí)就可以去讀取字段 res 里的內(nèi)容并返回。
注意,entry 中的變量 e.res.value 和 e.res.err 被多個(gè) goroutine 共享。創(chuàng)建 entry 的 goroutine 會(huì)對(duì)這兩個(gè)變量的值進(jìn)行設(shè)置,其他 goroutine 在收到數(shù)據(jù)準(zhǔn)備完畢的廣播后才會(huì)開(kāi)始讀取這兩個(gè)變量。盡管被多個(gè) goroutine 訪問(wèn),但是此處不需要加鎖。ready 通道的關(guān)閉先于其他 goroutine 收到廣播事件,所以第一個(gè) goroutine 對(duì)變量的寫(xiě)入也先于后續(xù)多個(gè) goroutine 的讀取事件。這種情況下數(shù)據(jù)競(jìng)態(tài)不存在。
到此,并發(fā)、重復(fù)抑制、非阻塞緩存就完成了。
上面的示例是使用一個(gè)互斥量來(lái)保護(hù) map 變量的并發(fā)安全。下面是另一種設(shè)計(jì),讓 map 變量限制在一個(gè)監(jiān)控 goroutine 中。
首先是類(lèi)型聲明,New 函數(shù)在創(chuàng)建實(shí)例并返回的同時(shí),還會(huì)啟動(dòng)一個(gè) server 方法。該方法會(huì)集中處理所有的 Get 調(diào)用。我們?cè)讷@取實(shí)例后,依然是調(diào)用 Get 來(lái)獲取結(jié)果:
// memo包提供了一個(gè)對(duì)類(lèi)型 Func 并發(fā)安全的函數(shù)記憶功能
// 并發(fā)、重復(fù)抑制、非阻塞的緩存
// 通過(guò)監(jiān)控 goroutine 來(lái)實(shí)現(xiàn)并發(fā)安全
package memo
// Func 是用于記憶的函數(shù)類(lèi)型
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // res 準(zhǔn)備好之后會(huì)被關(guān)閉
}
// Func、result、entry 的聲明和之前一致
// request 是一條請(qǐng)求消息
type request struct {
key string // 需要 Func 運(yùn)行的參數(shù)
response chan<- result // 每個(gè)客戶端接收結(jié)果的通道
}
type Memo struct{ requests chan request }
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)} // 創(chuàng)建實(shí)例
go memo.server(f) // 啟動(dòng)服務(wù)端 goroutine
return memo // 返回實(shí)例,供客戶端調(diào)用
}
可以先往后看客戶端和服務(wù)端的處理邏輯,在回過(guò)來(lái)看這里聲明的數(shù)據(jù)類(lèi)型已經(jīng)通道的作用。
客戶端
現(xiàn)在 Get 就需要要給監(jiān)控 goroutine 的通道發(fā)送請(qǐng)求和一個(gè)接收返回結(jié)果的通道。服務(wù)端會(huì)在收到處理請(qǐng)求后進(jìn)行處理,之后再通過(guò)客戶端發(fā)來(lái)的通道返回結(jié)果。而客戶端發(fā)送請(qǐng)求之后,只需要從自己創(chuàng)建的這個(gè)通道中接收,直到接收到數(shù)據(jù)后,再返回即可:
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <- response
return res.value, res.err
}
func (memo *Memo) Close() { close(memo.requests) }
客戶端使用完之后,可以調(diào)用 Close 方法關(guān)閉發(fā)送請(qǐng)求的通道。
服務(wù)端
上面的 Get 相當(dāng)于一個(gè)客戶端,還需要一個(gè)服務(wù)端來(lái)處理 Get 發(fā)來(lái)的請(qǐng)求:
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests { // 一次處理收到的請(qǐng)求
e := cache[req.key]
if e == nil {
// 對(duì)這個(gè) key 的第一次請(qǐng)求
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key) // 調(diào)用 f(key)
}
// 無(wú)論是否第一次請(qǐng)求,最后要回復(fù)結(jié)果,都有等待 ready 通道返回后,再去讀取結(jié)果
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, key string) {
// 執(zhí)行函數(shù)
e.res.value, e.res.err = f(key)
// 發(fā)送廣播通知,數(shù)據(jù)已經(jīng)準(zhǔn)備好了
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// 等待數(shù)據(jù)準(zhǔn)備完畢
<-e.ready
// 向客戶端發(fā)送結(jié)果
response <- e.res
}
變量 cache 被限制在監(jiān)控 goroutine 中,就是上面的 server 方法。監(jiān)控 goroutine 從 requests 的通道中讀取請(qǐng)求,直到這個(gè)通道被關(guān)閉。對(duì)于每個(gè)請(qǐng)求,先查詢緩存,如果沒(méi)有找到就插入一個(gè)新的 entry。
這里 call 和 deliver 方法需要在獨(dú)立的 goroutine 中運(yùn)行,以確保監(jiān)控 goroutine 內(nèi)持續(xù)處理新請(qǐng)求。
完整示例代碼
下面貼上這個(gè)實(shí)現(xiàn)方式的完整代碼:
// memo包提供了一個(gè)對(duì)類(lèi)型 Func 并發(fā)安全的函數(shù)記憶功能
// 并發(fā)、重復(fù)抑制、非阻塞的緩存
// 通過(guò)監(jiān)控 goroutine 來(lái)實(shí)現(xiàn)并發(fā)安全
package memo
// Func 是用于記憶的函數(shù)類(lèi)型
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // res 準(zhǔn)備好之后會(huì)被關(guān)閉
}
// Func、result、entry 的聲明和之前一致
// request 是一條請(qǐng)求消息
type request struct {
key string // 需要 Func 運(yùn)行的參數(shù)
response chan<- result // 每個(gè)客戶端接收結(jié)果的通道
}
type Memo struct{ requests chan request }
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)} // 創(chuàng)建實(shí)例
go memo.server(f) // 啟動(dòng)服務(wù)端 goroutine
return memo // 返回實(shí)例,供客戶端調(diào)用
}
func (memo *Memo) Close() { close(memo.requests) }
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <-response
return res.value, res.err
}
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests { // 一次處理收到的請(qǐng)求
e := cache[req.key]
if e == nil {
// 對(duì)這個(gè) key 的第一次請(qǐng)求
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key) // 調(diào)用 f(key)
}
// 無(wú)論是否第一次請(qǐng)求,最后要回復(fù)結(jié)果,都有等待 ready 通道返回后,再去讀取結(jié)果
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, key string) {
// 執(zhí)行函數(shù)
e.res.value, e.res.err = f(key)
// 發(fā)送廣播通知,數(shù)據(jù)已經(jīng)準(zhǔn)備好了
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// 等待數(shù)據(jù)準(zhǔn)備完畢
<-e.ready
// 向客戶端發(fā)送結(jié)果
response <- e.res
}
針對(duì)上面的 memo 包的測(cè)試代碼:
package memo
import (
"io/ioutil"
"log"
"net/http"
"sync"
"testing"
"time"
)
func httpGetBody(url string) (interface{}, error) {
log.Printf("httpGetBody: %s", url)
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
var urls = []string{ // 換一批慢一些的網(wǎng)站,加載時(shí)間1s左右的國(guó)外資源
"https://github.com/adonovan/gopl.io/tree/master/ch9",
"https://www.djangoproject.com/",
"https://getbootstrap.com/",
"https://www.python.org/",
}
func TestSequential(t *testing.T) { // 串行
m := New(httpGetBody)
defer m.Close()
urls = append(urls, urls...) // 每個(gè) URL 請(qǐng)求兩次
for _, url := range urls {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}
}
func TestConcurrent(t *testing.T) { // 并行
m := New(httpGetBody)
defer m.Close()
var n sync.WaitGroup
urls = append(urls, urls...) // 每個(gè) URL 請(qǐng)求兩次
n.Add(len(urls))
for _, url := range urls {
go func(url string) {
defer n.Done()
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}(url)
}
n.Wait()
}
這里的例子展示了可以使用兩種方案來(lái)構(gòu)建并發(fā)結(jié)構(gòu):
第一種是大家普遍認(rèn)知的,也是Java或者C++等語(yǔ)言中的多線程開(kāi)發(fā)。
第二種是 Go 語(yǔ)言特有的,也是 Go 語(yǔ)言推薦的。下面是一句推薦的原話:
Do not communicate by sharing memory; instead, share memory by communicating.
Go 箴言:“不要通過(guò)共享內(nèi)存來(lái)通信,而應(yīng)該通過(guò)通信來(lái)共享內(nèi)存”。
在給定的情況下也許很難判定哪種方案更好,不過(guò)了解他們還是有價(jià)值的。有時(shí)候從一種方案切換到另外一種方案能讓代碼更簡(jiǎn)單。
CSP并發(fā)模型
CSP 是 Communicating Sequential Process 的簡(jiǎn)稱,中文可以叫做通信順序進(jìn)程,是一種并發(fā)編程模型。
CSP 模型由并發(fā)執(zhí)行的實(shí)體(線程或者進(jìn)程)所組成,實(shí)體之間通過(guò)發(fā)送消息進(jìn)行通信,這里發(fā)送消息時(shí)使用的就是通道(channel)。CSP 模型的關(guān)鍵是關(guān)注 channel,而不關(guān)注發(fā)送消息的實(shí)體。Go 語(yǔ)言就是借用 CSP 模型的一些概念為之實(shí)現(xiàn)并發(fā)進(jìn)行理論支持。Go 語(yǔ)言并沒(méi)有完全實(shí)現(xiàn) CSP 模型的所有理論,僅僅是借用了 process 和 channel 這兩個(gè)概念。process 在 Go 語(yǔ)言上的表現(xiàn)就是 goroutine 是實(shí)際并發(fā)執(zhí)行的實(shí)體,每個(gè)實(shí)體之間通過(guò) channel 通訊來(lái)實(shí)現(xiàn)數(shù)據(jù)共享。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無(wú)理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國(guó)服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡(jiǎn)單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專(zhuān)為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。
分享名稱:Go并發(fā)非阻塞緩存-創(chuàng)新互聯(lián)
URL分享:http://m.rwnh.cn/article6/ehdog.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供用戶體驗(yàn)、域名注冊(cè)、響應(yīng)式網(wǎng)站、云服務(wù)器、面包屑導(dǎo)航、靜態(tài)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容