国产亚洲精品福利在线无卡一,国产精久久一区二区三区,亚洲精品无码国模,精品久久久久久无码专区不卡

當(dāng)前位置: 首頁 > news >正文

網(wǎng)站查詢域名訪問網(wǎng)絡(luò)營銷渠道建設(shè)方案

網(wǎng)站查詢域名訪問,網(wǎng)絡(luò)營銷渠道建設(shè)方案,昆山專業(yè)網(wǎng)站建設(shè)公司哪家好,介紹個人網(wǎng)站的ppt怎么做Go語言從入門到實戰(zhàn) — 并發(fā)篇 協(xié)程 Thread vs Groutine 相比之下,協(xié)程的棧大小就小很多了,創(chuàng)建起來也會更快,也更節(jié)省系統(tǒng)資源。 一個 goroutine 的棧,和操作系統(tǒng)線程一樣,會保存其活躍或掛起的函數(shù)調(diào)用的本地變量…

Go語言從入門到實戰(zhàn) — 并發(fā)篇

協(xié)程

Thread vs Groutine

image-20230505105233850

相比之下,協(xié)程的棧大小就小很多了,創(chuàng)建起來也會更快,也更節(jié)省系統(tǒng)資源。

一個 goroutine 的棧,和操作系統(tǒng)線程一樣,會保存其活躍或掛起的函數(shù)調(diào)用的本地變量,但是和OS線程不太一樣的是,一個 goroutine 的棧大小并不是固定的;棧的大小會根據(jù)需要動態(tài)地伸縮,初始化大小為 2KB。而 goroutine 的棧的最大值有1GB,比傳統(tǒng)的固定大小的線程棧要大得多,盡管一般情況下,大多 goroutine 都不需要這么大的棧。

image-20230505105306798

那么這種多對多的對應(yīng)關(guān)系對我們程序有什么意義呢?

如果是1:1,那么我們的線程(Thread)是由我們的內(nèi)核實體直接進行調(diào)度,這種方式,它的調(diào)度效率非常高,但是這里有一個問題,如果線程之間發(fā)生上下文切換時,會牽扯到內(nèi)核對象的相互切換,這將是一個消耗非常大的事。

相對來說如果是多個協(xié)程由同一個內(nèi)核實體來調(diào)度,那么協(xié)程之間的切換不涉及內(nèi)核對象間的切換,在內(nèi)部就能完成,它們之間的切換就會小非常多了,Go 也就是主打這個方面。

image-20230505105449883

Go中的調(diào)度機制

Go 的協(xié)程處理器P(Processor)是掛在系統(tǒng)線程M(System thread)下面的,協(xié)程處理器P下面又掛有準(zhǔn)備運行的協(xié)程隊列(Goroutine), 每個協(xié)程隊列中每次有一個協(xié)程G是是正在運行中的。

如果正在運行的協(xié)程執(zhí)行時間特別長,會不會堵塞住協(xié)程隊列呢?

Go 的處理機制是這樣的,Go 在運行協(xié)程時,會啟動一個守護線程去計數(shù),計每個 Processor 完成的協(xié)程數(shù)量,當(dāng)它發(fā)現(xiàn)一段時間后某個 Processor 完成的協(xié)程數(shù)量沒有任何變化后,它就會往協(xié)程的任務(wù)棧里面插入一個特殊的標(biāo)記,當(dāng)協(xié)程運行遇到非內(nèi)聯(lián)函數(shù)時就會讀到這個標(biāo)記,就會把自己中斷下來,插入到協(xié)程隊列的隊尾,然后切換到下一個協(xié)程繼續(xù)運行。

另一個并發(fā)機制是這樣的,當(dāng)某個協(xié)程被系統(tǒng)中斷了,例如 I/O 需要等待的時候,為了提高整體的并發(fā),Processor 會把自己移動到另一個可使用的系統(tǒng)線程當(dāng)中,繼續(xù)執(zhí)行它所掛的協(xié)程隊列里面其它的協(xié)程。當(dāng)上次中斷的協(xié)程又被重新喚醒后,它會把自己加入其中一個 Processor 的等待隊列,或者全局等待隊列中。協(xié)程中斷期間,它在寄存器中的運行狀態(tài),會保存在協(xié)程對象里,當(dāng)協(xié)程再次有運行機會的時候,這些數(shù)據(jù)又會重新寫入寄存器,然后繼續(xù)運行。

大致我們可以知道這種協(xié)程機制與系統(tǒng)線程是多對多的關(guān)系,以及它是如何高效的利用系統(tǒng)線程,盡量多的運行并發(fā)的協(xié)程任務(wù)。

第一種:channel 阻塞或網(wǎng)絡(luò) I/O 情況下的調(diào)度

如果 G 被阻塞在某個 channel 操作或網(wǎng)絡(luò) I/O 操作上時,G 會被放置到某個等待(wait)隊列中,而 M 會嘗試運行 P 的下一個可運行的 G。如果這個時候 P 沒有可運行的 G 供 M 運行,那么 M 將解綁 P,M 進入掛起狀態(tài)。當(dāng) I/O 操作完成或 channel 操作完成,在等待隊列中的 G 會被喚醒,標(biāo)記為可運行(runnable),并被放入到某 P 的隊列中,綁定一個 M 后繼續(xù)執(zhí)行。

第二種:系統(tǒng)調(diào)用阻塞情況下的調(diào)度

如果 G 被阻塞在某個系統(tǒng)調(diào)用(system call)上,那么不光 G 會阻塞,執(zhí)行這個 G 的 M 也會解綁 P,M 與 G 一起進入掛起狀態(tài)。如果此時有空閑的 M,那么 P 就會和它綁定,并繼續(xù)執(zhí)行其他 G;如果沒有空閑的 M,但仍然有其他 G 要去執(zhí)行,那么 Go 運行時就會創(chuàng)建一個新 M(線程)。

當(dāng)系統(tǒng)調(diào)用返回后,阻塞在這個系統(tǒng)調(diào)用上的 G 會嘗試獲取一個可用的 P,如果沒有可用的 P,那么 G 會被標(biāo)記為 runnable(如果一直沒有可用的 P,經(jīng)過一定輪次后 G 會被放入到全局的 P 中),之前的那個掛起的 M 將再次進入掛起狀態(tài)(M 經(jīng)過一段時間后會進入空閑列表,重新獲取可用的 P)。

image-20230505105523193

詳細(xì)分析可參考該文:Go 協(xié)程(goroutine)調(diào)度原理

Go協(xié)程的使用

Go協(xié)程的使用很簡單,只需要在方法前面加一個 go 關(guān)鍵字即可。

// Go 協(xié)程的使用
func TestGoroutine(t *testing.T) {for i := 0; i < 5; i++ {// 加到匿名函數(shù)前go func(i int) {fmt.Println(i)}(i)}time.Sleep(time.Millisecond * 50) // 讓上面的程序先全部執(zhí)行完
}

image-20230507193018030

運行結(jié)果跟java創(chuàng)建多線程類似,協(xié)程被調(diào)用的順序并不是按照方法的順序來調(diào)度的。

共享內(nèi)存并發(fā)機制

Lock

image-20230507193728581

非線程安全

func TestCounter(t *testing.T) {counter := 0for i := 0; i < 5000; i++ {go func() {counter++ // 創(chuàng)建5000個協(xié)程,對counter自增了5000次 預(yù)期值為5000}()}time.Sleep(1 * time.Second) // 使上面的程序先執(zhí)行完t.Logf("counter = %d", counter)
}

image-20230507194513259

并沒有達到我們5000的預(yù)期,這是因為我們使用的counter在不同的協(xié)程之間競爭,導(dǎo)致出現(xiàn)了并發(fā)競爭,也就是非線程安全的程序,出現(xiàn)了無效的寫操作,如果我們要保證它的線程安全,就需要對這塊共享內(nèi)存加鎖。

線程安全 sync.Mutex

func TestCounterSafe(t *testing.T) {var mut sync.Mutexcounter := 0for i := 0; i < 5000; i++ {go func() {// 鎖的釋放我們一般要寫在defer中,類似java的finally。defer func() {mut.Unlock() // 在這個協(xié)程執(zhí)行完的最后釋放鎖}()mut.Lock() // 加鎖counter++}()}time.Sleep(1 * time.Second) // 使上面的程序先執(zhí)行完t.Logf("counter = %d", counter)
}

image-20230507195002194

達到了我們5000的預(yù)期。

WaitGroup

同步各個線程的方法,相當(dāng)于java中的 join、CountDownLatch。

只有我 wait 的所有內(nèi)容都完成后,程序才可以繼續(xù)向下執(zhí)行。

func TestCounterWaitGroup(t *testing.T) {var mut sync.Mutexvar wg sync.WaitGroupcounter := 0for i := 0; i < 5000; i++ {wg.Add(1) // 每啟動1個協(xié)程,WaitGroup的數(shù)量就+1go func() {// 鎖的釋放我們一般要寫在defer中,類似java的finally。defer func() {mut.Unlock() // 在這個協(xié)程執(zhí)行完的最后釋放鎖}()mut.Lock() // 加鎖counter++wg.Done() // 每執(zhí)行完1個協(xié)程,WaitGroup的數(shù)量就-1}()}wg.Wait() // 如果WaitGroup中的數(shù)量不為0則一直等待t.Logf("counter = %d", counter)
}

image-20230507200751199

那么 WaitGroup 為什么更好呢,可以看一下最后的執(zhí)行時間,如果采用 time.Sleep(),因為我們并不知道5000個協(xié)程要執(zhí)行多久,這個時間不好把控,我們?yōu)榱说玫秸_的結(jié)果,人為預(yù)估了 1 秒,但是實際上只需要 0.00 秒就能執(zhí)行完畢,故用 WaitGroup 即能防止錯誤的預(yù)估協(xié)程的執(zhí)行時間,又能保證線程安全,是上上之選。

RWLock讀寫鎖

它把讀鎖和寫鎖進行了分離,讀不互質(zhì),寫互斥,比 Mutex 完全互斥的效率高一些,更建議使用讀寫鎖。

CSP并發(fā)機制

CSP(Communicating Sequential Processes)通信順序進程,是一種消息傳遞模型,通過通道 channel 在 Goroutine 之間傳遞數(shù)據(jù)來傳遞消息,而不是對數(shù)據(jù)加鎖來實現(xiàn)對數(shù)據(jù)的同步訪問。

CSP VS Actor

Actor Model

image-20230510142119686

  • Actor 的機制是直接進行通訊,CSP 模式則是通過 channel 進行通訊的,更松耦合一些。
  • Actor、Erlang 是通過 mailbox 來進行消息存儲的,mailbox 的容量是無限的,Go 的 channel 是有容量限制的。
  • Actor、Erlang 的接收進程總是被動地處理消息,Go 的協(xié)程會主動去處理從 channel 里面?zhèn)鬟^來的消息。

image-20230510143143131

Channel

典型消息傳輸機制

進行通訊的發(fā)送方和接收方必須同時在 channel 上才能完成這次交互,任何一方不在都會導(dǎo)致另一方阻塞等待。

image-20230510143410224

buffer channel機制

這種機制下,消息的發(fā)送方和接收方是更加松耦合的一種機制,我們可以給 channel 設(shè)定一個容量,只要在這個容量還沒有滿的情況下,放消息的人都是可以把消息放進去的,如果容量滿了,則需要阻塞等待了,等到收消息的人拿走一個消息,放消息的人才能繼續(xù)往里面放。同理,對收消息的人來說呢,只要這個 channel 里面有消息,就可以一直拿到,直到 channel 里面一個消息都沒有了,就會阻塞等待,直到有新的消息進來。

image-20230510143421732

異步返回

當(dāng)我們調(diào)用一個任務(wù),并不需要馬上拿到它的返回結(jié)果,可以先去執(zhí)行其它的邏輯,直到我們需要這個結(jié)果的時候,在去 get 這個結(jié)果。這將大大減少程序的整體運行時間,提升程序的效率。如果我們 get 這個任務(wù)的結(jié)果時,任務(wù)的結(jié)果還沒有出來,就會堵塞在那里,直到拿到結(jié)果為止。

Java代碼

image-20230510143647616

同步(串行執(zhí)行)

func service() string {time.Sleep(time.Millisecond * 50)return "service執(zhí)行完成"
}func otherTask() {fmt.Println("otherTask的各種執(zhí)行邏輯代碼")time.Sleep(time.Millisecond * 100)fmt.Println("otherTask執(zhí)行完成")
}// 測試同步執(zhí)行效果, 先調(diào)用 service() 方法,在調(diào)用 otherTask() 方法,
// 理論上最后程序的執(zhí)行時間為二者相加。
func TestService(t *testing.T) {fmt.Println(service())otherTask()
}

image-20230510144123221

0.15s,符合預(yù)期。

典型 channel 異步返回

func service() string {time.Sleep(time.Millisecond * 50)return "service執(zhí)行完成"
}func otherTask() {fmt.Println("otherTask的各種執(zhí)行邏輯代碼")time.Sleep(time.Millisecond * 100)fmt.Println("otherTask執(zhí)行完成")
}func syncService() chan string {// 聲明一個channel,數(shù)據(jù)只能存放 string 類型resCh := make(chan string)// 創(chuàng)建一個協(xié)程去執(zhí)行service任務(wù)go func() {ret := service()fmt.Println("service 結(jié)果已返回")// 因為不是用的 buffer channel,所以,協(xié)程會被阻塞在這一步的消息傳遞過程中,// 只有接受者拿到了 channel 中的消息,channel 放完消息后面的邏輯才會被執(zhí)行。resCh <- ret // 存數(shù)據(jù),從 channel 里面存放數(shù)據(jù)都用這個 “<-” 符號fmt.Println("channel 放完消息后面的邏輯")}()return resCh
}// 異步返回執(zhí)行結(jié)果,先調(diào)用 SyncService(),把它放入channel,用協(xié)程去執(zhí)行,
// 然后主程序繼續(xù)執(zhí)行 otherTask(),最后把 SyncService() 的返回結(jié)果從 channel 里面取出來。
func TestSyncService(t *testing.T) {resCh := syncService()otherTask()fmt.Println(<-resCh) // 取數(shù)據(jù),從 channel 里面存放數(shù)據(jù)都用這個 “<-” 符號
}

image-20230510145307325

優(yōu)化到 0.1s,說明 otherTask() 執(zhí)行 0.1 秒,service() 因為只需要 0.05 秒,所以就提前執(zhí)行完了,只需要在需要的地方取出結(jié)果就行,極大的減少了程序的整體執(zhí)行時間。

  • 從 channel 里面存放數(shù)據(jù)都用這個 “<-” 符號
  • 聲明 channel:make(chan string)

buffer channel 異步返回

我們會發(fā)現(xiàn)上面這中機制仍然有一個小問題,那就是 service() 執(zhí)行完畢后,往 channel 里面放數(shù)據(jù),此時協(xié)程就阻塞在這里了,需要等到接收者拿到消息,協(xié)程才會繼續(xù)往下走,我們可不可以讓協(xié)程不阻塞呢?當(dāng)service() 執(zhí)行完畢后,我們將消息放入 channel 中,然后繼續(xù)執(zhí)行其它的邏輯。答案是可以的,此時我們的 buffer channel 就派上用場了。

func service() string {time.Sleep(time.Millisecond * 50)return "service執(zhí)行完成"
}func otherTask() {fmt.Println("otherTask的各種執(zhí)行邏輯代碼")time.Sleep(time.Millisecond * 100)fmt.Println("otherTask執(zhí)行完成")
}// 異步執(zhí)行 service(), 并將結(jié)果放入 buffer channel
func syncServiceBufferChannel() chan string {// 聲明一個 channel,數(shù)據(jù)只能存放 string 類型// 后面的數(shù)字表示 buffer 的容量resCh := make(chan string, 1)go func() {ret := service()fmt.Println("service 結(jié)果已返回")// 此時使用的是 buffer channel,所以只要 service() 結(jié)果返回了,buffer容量未滿// channel放完消息后面的邏輯就會被執(zhí)行,不會被阻塞。resCh <- ret // 存數(shù)據(jù),從 channel 里面存放數(shù)據(jù)都用這個 “<-” 符號fmt.Println("channel 放完消息后面的邏輯")}()return resCh
}// 異步返回執(zhí)行結(jié)果,先調(diào)用 SyncService(),把它放入 buffer channel,用協(xié)程去執(zhí)行,
// 此時協(xié)程不會被阻塞,然后主程序繼續(xù)執(zhí)行 otherTask(),
// 最后把 TestSyncServiceBufferChannel() 的返回結(jié)果從 channel 里面取出來。
func TestSyncServiceBufferChannel(t *testing.T) {resCh := syncServiceBufferChannel()otherTask()fmt.Println(<-resCh) // 取數(shù)據(jù),從 channel 里面存放數(shù)據(jù)都用這個 “<-” 符號
}

image-20230510151432831

我們會發(fā)現(xiàn)采用了 buffer channel 后,當(dāng) service() 的返回結(jié)果放入 buffer channel 后,協(xié)程并沒有阻塞,而是繼續(xù)執(zhí)行了 “channel 放完消息后面的邏輯”,其它的結(jié)果和典型 channel 一致。

時間雖然同樣也是 0.1s,但我們要知道如果任務(wù)非常多且執(zhí)行的時間較長,則優(yōu)化肯定是非常明顯的。

多路選擇和超時控制

select多路選擇機制

select 的語法和 switch 的語法很類似,它的執(zhí)行順序并不一定是按照我們代碼的前后關(guān)系來決定的,而是滿足哪個 case ,就執(zhí)行這個 case 的結(jié)果。如果所有的 channel 都處于阻塞中,則走 default。

select {
// 從 channel 上等待一個消息
case ret := <-retCh1:t.Logf("result:%s", ret)
// 從另一個 channel 上等待一個消息
case ret := <-retCh2:t.Logf("result:%s", ret)
// 如果所有的 channel 都處于阻塞中,則走 default
default:t.Error("No more returned")
}

超時控制

利用 select 的多路選擇機制,我們可以實現(xiàn)一個超時機制,例如當(dāng)某個 channel 多久后還沒有消息返回,我們就返回超時。

select {
case ret := <-retCh1:t.Logf("result:%s", ret)
case ret := <-time.After(time.Second * 5):t.Error("time out")
}

time.After() 是在一段時間后, 它特定的 channel 會返回一個消息,當(dāng)沒有達到設(shè)定的時間,這個 case 會被阻塞在這,當(dāng)超過了我們設(shè)定的 duration 后,這個 case 就能從 channel 里面拿到一個消息,這樣就可以用來做超時控制。

func service() string {time.Sleep(time.Millisecond * 50)return "service執(zhí)行完成"
}func otherTask() {fmt.Println("otherTask的各種執(zhí)行邏輯代碼")time.Sleep(time.Millisecond * 100)fmt.Println("otherTask執(zhí)行完成")
}func syncService() chan string {// 聲明一個channel,數(shù)據(jù)只能存放 string 類型resCh := make(chan string)// 創(chuàng)建一個協(xié)程去執(zhí)行service任務(wù)go func() {ret := service()fmt.Println("service 結(jié)果已返回")// 因為不是用的 buffer channel,所以,協(xié)程會被阻塞在這一步的消息傳遞過程中,// 只有接受者拿到了 channel 中的消息,channel 放完消息后面的邏輯才會被執(zhí)行。resCh <- ret // 存數(shù)據(jù),從 channel 里面存放數(shù)據(jù)都用這個 “<-” 符號fmt.Println("channel 放完消息后面的邏輯")}()return resCh
}// 異步返回執(zhí)行結(jié)果,先調(diào)用 SyncService(), 把它放入channel,用協(xié)程去執(zhí)行,
// 然后主程序繼續(xù)執(zhí)行 otherTask(),最后把 SyncService() 的返回結(jié)果 從 channel 里面取出來。
func TestSyncService(t *testing.T) {select {case ret := <-syncService():otherTask()t.Logf("result:%s", ret)case <-time.After(time.Millisecond * 10):t.Error("time out")}
}

image-20230510154712189

因為 service() 需要執(zhí)行 0.05秒,我們設(shè)置了 0.01 秒就超時,所以就走了 time out。

channel的關(guān)閉和廣播

不關(guān)閉 channel 會怎么樣

寫一個數(shù)據(jù)生產(chǎn)者和數(shù)據(jù)消費者的程序,數(shù)據(jù)生產(chǎn)者不斷生產(chǎn)數(shù)據(jù),消費者不斷消費生產(chǎn)者生產(chǎn)的數(shù)據(jù),通過 channel 交互。

// 數(shù)據(jù)生產(chǎn)者
func dataProducer(ch chan int, wg *sync.WaitGroup) chan int {wg.Add(1)go func() {for i := 0; i < 10; i++ {ch <- i}wg.Done()}()return ch
}// 數(shù)據(jù)消費者
func dataConsumer(ch chan int, wg *sync.WaitGroup) {wg.Add(1)go func() {for i := 0; i < 10; i++ {data := <-chfmt.Println(data)}wg.Done()}()
}// 數(shù)據(jù)消費者
func dataConsumer2(ch chan int, wg *sync.WaitGroup) {wg.Add(1)go func() {for i := 0; i < 10; i++ {data := <-chfmt.Println(data)}wg.Done()}()
}// channel還未關(guān)閉的場景
func TestChannelNotClosed(t *testing.T) {ch := make(chan int)var wg sync.WaitGroupdataProducer(ch, &wg)dataConsumer(ch, &wg)wg.Wait()
}

一旦我們生產(chǎn)的數(shù)據(jù)和消費的數(shù)據(jù)不一致時,比如生產(chǎn)者可以生成 11 個數(shù),消費者仍然只消費 10 個數(shù),或者生產(chǎn)者生成 10 個數(shù),而消費者去消費 11 個數(shù)時,就會報下面的錯誤:

image-20230510194033431

為了解決這種問題,Go 急需 channel 具有關(guān)閉功能,且關(guān)閉后會廣播所有的訂閱者。

channel 的關(guān)閉

image-20230510193339574

語法格式

// 關(guān)閉 channel
close(channelName)// ok=true表示正常接收,false表示通道關(guān)閉
if val, ok := <-ch; ok {// other code
}

當(dāng) channel 已正常關(guān)閉,數(shù)據(jù)接收者還繼續(xù)接收數(shù)據(jù),則接收的數(shù)據(jù)為 channel 對應(yīng)數(shù)據(jù)的默認(rèn)值。

// 數(shù)據(jù)生產(chǎn)者
func dataProducer(ch chan int, wg *sync.WaitGroup) chan int {wg.Add(1)go func() {for i := 0; i < 10; i++ {ch <- i}// 關(guān)閉 channelclose(ch)//ch <- 11 // 向關(guān)閉的 channel 發(fā)送消息,會報 panic: send on closed channelwg.Done()}()return ch
}// 數(shù)據(jù)消費者
func dataReceiver(ch chan int, wg *sync.WaitGroup) {wg.Add(1)go func() {// 我們這里多接收一個數(shù)據(jù),看看拿到的值是什么for i := 0; i < 11; i++ {data := <-chfmt.Print(data, " ")}wg.Done()}()
}// 關(guān)閉channel
func TestCloseChannel(t *testing.T) {ch := make(chan int)var wg sync.WaitGroupdataProducer(ch, &wg)dataReceiver(ch, &wg)wg.Wait()
}

image-20230510195707682

我們會發(fā)現(xiàn),當(dāng) channel 已關(guān)閉后,我們多接收了一個值,由于我們 channel 定義的數(shù)據(jù)類型為 int,則拿到的數(shù)據(jù)類型將是 int 型的默認(rèn)值 0

一般我們的 1 個 channel 可能對應(yīng)多個消費者,所以當(dāng)這個 channel 關(guān)閉后,廣播機制就會經(jīng)常被使用,通知所有消費者該 channel 已經(jīng)被關(guān)閉了。

任務(wù)的取消

傳統(tǒng)的方案是,假設(shè)一段任務(wù)在執(zhí)行,我們通過設(shè)置共享內(nèi)存的一個變量的值為 true 或者 false 來進行判斷。現(xiàn)在我們將利用 CSP, select 多路選擇機制和 channel 的關(guān)閉與廣播實現(xiàn)任務(wù)取消功能。

實現(xiàn)原理

  • 通過 CSP 在 channel 上廣播一個消息,告訴所有的協(xié)程,大家現(xiàn)在可以停了。

如何判斷

  • 通過 select 多路選擇機制,如果從 channel 上收到一個消息,代表需要執(zhí)行任務(wù)取消功能,否則不執(zhí)行。

代碼示例

// 任務(wù)是否已被取消
// 實現(xiàn)原理:
// 檢查是否從 channel 收到一個消息,如果收到一個消息,我們就返回 true,代表任務(wù)已經(jīng)被取消了
// 當(dāng)沒有收到消息,channel 會被阻塞,多路選擇機制就會走到 default 分支上去。
func isCanceled(cancelChan chan struct{}) bool {select {case <-cancelChan:return truedefault:return false}
}// 執(zhí)行任務(wù)取消
// 因為 close() 是一個廣播機制,所以所有的協(xié)程都會收到消息
func execCancel(cancelChan chan struct{}) {// close(cancelChan)會使所有處于處于阻塞等待狀態(tài)的消息接收者(<-cancelChan)收到消息close(cancelChan)
}// 利用 CSP,多路選擇機制和 channel 的關(guān)閉與廣播實現(xiàn)任務(wù)取消功能
func TestCancel(t *testing.T) {var wg sync.WaitGroupcancelChan := make(chan struct{}, 0)// 啟動 5 個協(xié)程for i := 0; i < 5; i++ {wg.Add(1)go func(i int, cancelChan chan struct{}, wg *sync.WaitGroup) {// 做一個 while(true) 的循環(huán),一直檢查任務(wù)是否有被取消for {if isCanceled(cancelChan) {fmt.Println(i, "is Canceled")wg.Done()break} else {// 其它正常業(yè)務(wù)邏輯time.Sleep(time.Millisecond * 5)}}}(i, cancelChan, &wg)}// 執(zhí)行任務(wù)取消execCancel(cancelChan)wg.Wait()
}

image-20230510201314157

所有的協(xié)程都被取消了。

close() 是一個廣播機制,會使所有處于處于阻塞等待狀態(tài)的消息接收者收到消息。

Context與任務(wù)取消

關(guān)聯(lián)任務(wù)的取消

場景:當(dāng)我們啟動了多個子任務(wù)的同時,子任務(wù)還有子任務(wù)的時,產(chǎn)生了關(guān)聯(lián):

image-20230510201502220

如果我們只是想要取消掉一個葉子節(jié)點的任務(wù)時,那利用 CSP,select 多路選擇機制和 channel 的關(guān)閉與廣播就可以實現(xiàn)。

image-20230510201653350

但是我們現(xiàn)在的場景是當(dāng)我們?nèi)∠舾腹?jié)點的任務(wù)時,想要把子節(jié)點的全部任務(wù)也一起取消掉,那該如何實現(xiàn)呢?

image-20230510201742833

當(dāng)然我們可以自己來實現(xiàn),但在 Golang 的 1.9 以后就把 Context 正式并入Go的內(nèi)置包里面了,它就是專門來做這件事的。

Context

image-20230510201534133

ctx, cancel := context.WithCancel(context.Background())

context.WithCancel() 方法,把根節(jié)點 context.Background() 傳進去之后,返回的一個是 ctx ,一個是 cancel 方法,調(diào)用 cancel 方法則執(zhí)行取消功能。而 ctx 可傳到子任務(wù)里面,用來取消子任務(wù),從而實現(xiàn)父節(jié)點和子任務(wù)都被取消掉。取消的通知形式是通過 ctx.Done() 來獲得消息,從而判斷是否收到通知。這個 ctx.Done() 就類比 channel 里面 close() 之后,所有的 channel 都會收到一個通知。

代碼實現(xiàn)

// 任務(wù)是否已被取消
// 實現(xiàn)原理:
// 通過 ctx.Done() 接收context的消息,如果收到消息,我們就返回 true,代表任務(wù)已經(jīng)被取消了
// 當(dāng)沒有收到消息,多路選擇機制就會走到 default 分支上去。
func isCanceled(ctx context.Context) bool {select {case <-ctx.Done():return truedefault:return false}
}// 通過context實現(xiàn)任務(wù)取消功能
func TestCancel(t *testing.T) {var wg sync.WaitGroup// ctx傳到子節(jié)點中去,可以取消子節(jié)點,調(diào)用cancel()方法則執(zhí)行取消功能ctx, cancel := context.WithCancel(context.Background())// 啟動 5 個協(xié)程for i := 0; i < 5; i++ {wg.Add(1)go func(i int, ctx context.Context, wg *sync.WaitGroup) {// 做一個 while(true) 的循環(huán),一直檢查任務(wù)是否有被取消for {if isCanceled(ctx) {fmt.Println(i, "is Canceled")wg.Done()break} else {// 其它正常業(yè)務(wù)邏輯time.Sleep(time.Millisecond * 5)}}}(i, ctx, &wg)}// 執(zhí)行任務(wù)取消cancel()wg.Wait()
}

image-20230510203310557

協(xié)程都被取消了,符合預(yù)期。

并發(fā)任務(wù)

只執(zhí)行一次 - 單例模式

  • Java代碼 - 單例模式 - 懶漢式 - 線程安全(double check)

image-20230515204029731

  • Go代碼

    sync.Once() 能確保里面的 Do() 方法在多線程的情況下只會被執(zhí)行一次。

    type Singleton struct {
    }var singleInstance *Singleton
    var once sync.Once// 獲取一個單例對象
    func GetSingletonObj() *Singleton {once.Do(func() {fmt.Println("Create a singleton Obj")singleInstance = new(Singleton)})return singleInstance
    }// 啟動多個協(xié)程,測試我們單例對象是否只創(chuàng)建了一次
    func TestGetSingletonObj(t *testing.T)  {var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func() {obj := GetSingletonObj()fmt.Printf("%x\n", unsafe.Pointer(obj))wg.Done()}()}wg.Wait()
    }
    

    image-20230515205536923

    可以看到 Do() 方法中輸出內(nèi)容只打印了一次,多個協(xié)程拿到的地址值都相同,實現(xiàn)單例模式。

僅需任意任務(wù)完成

當(dāng)我們需要執(zhí)行許多并發(fā)任務(wù),但是只要任意一個任務(wù)執(zhí)行完畢,就可以將結(jié)果返回給用戶。例如我們同時向百度和 google 去搜索某一個搜索詞,任何一個搜索引擎第一個返回,我們就可以把結(jié)果返回給用戶了,不需要所有場景都返回。

  • 這里我們利用 CSP 的機制實現(xiàn)這個模式
// 從網(wǎng)站上執(zhí)行搜索功能
func searchFromWebSite(webSite string) string {time.Sleep(10 * time.Millisecond)return fmt.Sprintf("search from %s", webSite)
}// 收到第一個結(jié)果后立刻返回
func FirstResponse() string {var arr = [2]string{"baidu", "google"}// 防止協(xié)程泄露,這里用 buffer channel 很重要,否則可能導(dǎo)致剩下的協(xié)程會被阻塞在那里,// 當(dāng)阻塞的協(xié)程達到一定量后,最終可能導(dǎo)致服務(wù)器資源耗盡而出現(xiàn)重大故障ch := make(chan string, len(arr))for _, val := range arr {go func(v string) {// 拿到所有結(jié)果放入 channelch <- searchFromWebSite(v)}(val)}// 這里沒有使用 WaitGroup,因為我們的需求是當(dāng) channel 收到第一個消息后就立刻返回return <-ch
}func TestFirstResponse(t *testing.T) {t.Log("Before:", runtime.NumGoroutine()) // 輸出當(dāng)前系統(tǒng)中的協(xié)程數(shù)t.Log(FirstResponse())t.Log("After:", runtime.NumGoroutine()) // 輸出當(dāng)前系統(tǒng)中的協(xié)程數(shù)
}

image-20230516131515928

所有任務(wù)完成

有時候我們需要所有任務(wù)都完成才進入下一個環(huán)節(jié),當(dāng)我們下單成功后,只有積分和優(yōu)惠券都贈送了才顯示所有優(yōu)惠贈送成功。

這個模式當(dāng)然可以用 WaitGroup 實現(xiàn),但我們這里再使用 CSP 機制實現(xiàn)。

// 送豪禮方法
func sendGift(gift string) string {time.Sleep(10 * time.Millisecond)return fmt.Sprintf("送%s", gift)
}// 使用 CSP 拿到所有的結(jié)果才返回
func CspAllResponse() []string {var arr = [2]string{"優(yōu)惠券", "積分"}// 防止協(xié)程泄露,這里用 buffer channel 很重要,否則可能導(dǎo)致剩下的協(xié)程會被阻塞在那里,// 當(dāng)阻塞的協(xié)程達到一定量后,最終可能導(dǎo)致服務(wù)器資源耗盡而出現(xiàn)重大故障ch := make(chan string, len(arr))for _, val := range arr {go func(v string) {// 拿到所有結(jié)果放入 channelch <- sendGift(v)}(val)}var finalRes = make([]string, len(arr), len(arr))// 等到所有的的協(xié)程都執(zhí)行完畢,把結(jié)果一起返回for i := 0; i < len(arr); i++ {finalRes[i] = <-ch}return finalRes
}func TestAllResponse(t *testing.T) {t.Log("Before:", runtime.NumGoroutine())t.Log(CspAllResponse())t.Log("After:", runtime.NumGoroutine())
}

image-20230516132812771

對象池

在我們?nèi)粘5拈_發(fā)中,經(jīng)常會有像數(shù)據(jù)庫連接,網(wǎng)絡(luò)連接等,我們經(jīng)常需要把它們池化,以免對象被重復(fù)創(chuàng)建。在 Go 語言中我們可以使用 buffered channel 實現(xiàn)對象池,通過設(shè)定 buffer 的大小來設(shè)定池的大小,我們可以從這個 buffer 池中拿到一個對象,用完了再還到 channel 上。

// 可重用對象,比如連接等
type Reusable struct {
}// 對象池
type ObjPool struct {bufChan chan *Reusable // 用于緩存可重用對象
}// 創(chuàng)建一個包含多個可重用對象的對象池
func NewObjPool(numOfObj int) *ObjPool {// 聲明對象池objPool := ObjPool{}// 初始化 objPool.bufChan 為一個 channelobjPool.bufChan = make(chan *Reusable, numOfObj)// 往 objPool 對象池里面放多個可重用對象for i := 0; i < numOfObj; i++ {objPool.bufChan <- &Reusable{}}return &objPool
}// 從對象池拿到一個對象
func (objPool *ObjPool) GetObj(timeout time.Duration) (*Reusable, error) {select {case ret := <-objPool.bufChan:return ret, nilcase <-time.After(timeout): // 超時控制return nil, errors.New("time out")}
}// 將可重用對象還回對象池
func (objPool *ObjPool) ReleaseObj(ReusableObj *Reusable) error {select {case objPool.bufChan <- ReusableObj:return nildefault:return errors.New("overflow") // 超出可重用對象池容量}
}// 從對象池里面拿出對象,用完了再放回去
func TestObjPool(t *testing.T) {pool := NewObjPool(3)for i := 0; i < 3; i++ {if obj, err := pool.GetObj(time.Second * 1); err != nil {t.Error(err)} else {fmt.Printf("%T\n", obj)if err := pool.ReleaseObj(obj); err != nil {t.Error(err)}}}t.Log("Done")
}

image-20230516134633855

sync.Pool 對象緩存

其實 sync.Pool 并不是對象池的類,而是個對象緩存,叫 sync.Cache 更貼切。

sync.Pool 有兩個重要的概念,私有對象共享池

  • 私有對象:協(xié)程安全,寫入的時候不需要鎖。
  • 共享池:協(xié)程不安全,寫入的時候需要鎖。

它們兩個存放在我們之前講過的 Processor 中。

sync.Pool 對象獲取

image-20230516135420115

sync.Pool 對象放回

image-20230516135850416

sync.Pool 的生命周期

image-20230516140013906

這也就是為什么不能拿它來當(dāng)對象池用。

使用 sync.Pool

偽代碼

// 使用 New 關(guān)鍵字創(chuàng)建新對象
pool := &sync.Pool{New: func() interface{} {return 0},
}// 從 pool 中獲取一個對象,因為返回的是空接口interface{},所以要自己做斷言
array := pool.Get().(int)// 往 pool 中放入一個對象
pool.Put(10)
基本使用
// 調(diào)試 sync.Pool 對象
func TestSyncPool(t *testing.T) {pool := &sync.Pool{New: func() interface{} {fmt.Println("Create a new object")return 1},}// 第一次從池中獲取對象,我們知道它一定是空的,所有肯定會調(diào)用 New 方法去創(chuàng)建一個新對象v := pool.Get().(int)fmt.Println(v) // 1// 放一個不存在的對象,它會優(yōu)先放入私有對象pool.Put(2)// 此時私有對象已經(jīng)存在了,所以會優(yōu)先拿到私有對象的值v1 := pool.Get().(int)fmt.Println(v1) // 2// 模擬系統(tǒng)調(diào)用GC, GC會清除 sync.pool中緩存的對象//runtime.GC()
}

image-20230516145651073

過程中發(fā)生一次 GC:

// 調(diào)試 sync.Pool 對象
func TestSyncPool2(t *testing.T) {pool := &sync.Pool{New: func() interface{} {fmt.Println("Create a new object")return 1},}// 第一次從池中獲取對象,我們知道它一定是空的,所有肯定會調(diào)用 New 方法去創(chuàng)建一個新對象v := pool.Get().(int)fmt.Println(v) // 1// 放一個不存在的對象,它會優(yōu)先放入私有對象pool.Put(2)// 模擬系統(tǒng)調(diào)用GC, GC會清除 sync.pool中緩存的對象runtime.GC()// 此時私有對象已經(jīng)被GC掉了,所以這里又新建了一次對象v1 := pool.Get().(int)fmt.Println(v1) // 1
}

image-20230516150706206

創(chuàng)建了 2 次新對象,符合預(yù)期。

注意:使用 Get() 方法新創(chuàng)建的對象是不會放入到私有對象中的,只有 Put() 方法才會放到私有對象中。

在多協(xié)程中的應(yīng)用
// 調(diào)試 sync.Pool 在多個協(xié)程中的應(yīng)用場景
func TestSyncPoolInMultiGoroutine(t *testing.T) {pool := sync.Pool{New: func() interface{} {fmt.Println("Create a new object")return 0},}pool.Put(1)pool.Put(2)pool.Put(3)var wg sync.WaitGroupfor i := 0; i < 5; i++ {wg.Add(1)go func() {v, _ := pool.Get().(int)fmt.Println(v)wg.Done()}()}wg.Wait()
}

image-20230516151329903

sync.Pool 總結(jié)

image-20230516151423032

單元測試

內(nèi)置單元測試框架

image-20230517101035997

func TestErrorInCode(t *testing.T) {fmt.Println("Start")t.Error("Error")fmt.Println("End")
}func TestFailInCode(t *testing.T) {fmt.Println("Start")t.Fatal("Error")fmt.Println("End")
}

image-20230517101527274

使用 Error 的測試方法,測試?yán)^續(xù)執(zhí)行,使用 Fatal 的測試方法,測試中斷。

顯示代碼覆蓋率

go test -v -cover

斷言

https://github.com/stretchr/testify

安裝 assert

go get -u github.com/stretchr/testify

image-20230517112432974

// 平方 故意+1計算錯誤,使斷言生效
func square(num int) int {return num * num + 1
}// 表格測試法
func TestSquare(t *testing.T) {// 輸入值inputs := [...]int{1, 2, 3}// 期望值expected := [...]int{2, 4, 9}for i := 0; i< len(inputs); i++ {ret := square(inputs[i])// 調(diào)用 assert 斷言包assert.Equal(t, expected[i], ret)}
}

image-20230517112848737

Benchmark

用途

  • 對程序中某些代碼片段的進行一個性能測評,比較一下哪種寫法會更好一些。
  • 對第三方庫進行一個測評,看哪個庫性能更好一些。

使用示例

image-20230517151428677

b.ResetTimer()b.StopTimer() 來隔離與性能測試無關(guān)的代碼。

代碼測試:比較字符串拼接的性能

// 通過“+=”的方式拼接字符串
func ConcatStringByLink() string {elements := [...]string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10","11", "12", "13", "14", "15", "16", "17", "18", "19", "20"}str := ""for _, elem := range elements {str += elem}return str
}// 通過字節(jié)數(shù)組 bytes.buffer 拼接字符串
func ConcatStringByBytesBuffer() string {elements := [...]string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10","11", "12", "13", "14", "15", "16", "17", "18", "19", "20"}var buf bytes.Bufferfor _, elem := range elements {buf.WriteString(elem)}return buf.String()
}// 用benchmark測試字符串拼接方法的性能
func BenchmarkConcatStringWithLink(b *testing.B) {// 與性能測試無關(guān)的代碼的開始位置b.ResetTimer()for i := 0; i < b.N; i++ {ConcatStringByLink()}// 與性能測試無關(guān)代碼的結(jié)束為止b.StopTimer()
}// 用 benchmark 測試 bytes.buffer 連接字符串的性能
func BenchmarkConcatStringWithByteBuffer(b *testing.B) {// 與性能測試無關(guān)的代碼的開始位置b.ResetTimer()for i := 0; i < b.N; i++ {ConcatStringByBytesBuffer()}// 與性能測試無關(guān)代碼的結(jié)束為止b.StopTimer()
}
方式代碼運行次數(shù)單次運行時間
使用 += 拼接1813815649.9 ns/op
使用 bytes.Buffer 拼接6804018172.6 ns/op

這只是拼接了 20 個字符串,如果拼接的字符串更多,則差距會更加明顯。

原生命令

// -bench= 后面跟方法名,如果是所有方法就寫"."
go test -bench=.// 注意:windows下使用 go test 命令時, -bench=.應(yīng)該寫成 -bench="."// 如果想知道 代碼每一次的內(nèi)存分配情況,這種方案為什么快,那種方案為什么慢,可以加一個-benchmem參數(shù)
go test -bench=. -benchmem

image-20230517152647857

通過 += 的方式我們總共使用 allocs 分配了 19 次空間,而通過 byte.Buffer 只分配了一次,性能提升在這里。

BDD

BDD(Behavior Driven Development),行為驅(qū)動開發(fā)。

為了讓我們和客戶間的溝通更加順暢,我們會用同一種“語言”來描述一個系統(tǒng),避免表達不一致的問題,當(dāng)出現(xiàn)了什么行為,會出現(xiàn)什么結(jié)果。

image-20230517154138740

image-20230517154428331

BDD in Go

goconvey 項目網(wǎng)站:

https://github.com/smartystreets/goconvey/

安裝
go get -u github.com/smartystreets/goconvey/convey
代碼示例
package bddimport ("testing"// 前面這個"."點,表示將import進來的package的方法是在當(dāng)前名字空間的,可以直接使用里面的方法// 例如使用 So()方法,就可以直接用,不用寫成 convey.So(). "github.com/smartystreets/goconvey/convey"
)// BDD框架 convey的使用
func TestSpec(t *testing.T) {Convey("Given 2 even numbers", t, func() {a := 3b := 4Convey("When add the two numbers", func() {c := a + bConvey("Then the result is still even", func() {So(c%2, ShouldEqual, 0) // 判斷c % 2是否為 0})})})
}

image-20230517161022762

啟動 WEB UI
~/go/bin/goconvey 

image-20230517162927075

Web 界面非常友好:

image-20230517163014679

如果端口沖突了,可以這樣解決

~/go/bin/goconvey -port 8081

筆記整理自極客時間視頻教程:Go語言從入門到實戰(zhàn)

http://aloenet.com.cn/news/34968.html

相關(guān)文章:

  • 公司的網(wǎng)站建設(shè)一般需要多少費用sem優(yōu)化怎么做
  • 中英文網(wǎng)站asp怎么做seo推廣軟件費用
  • 找人做網(wǎng)站應(yīng)該注意什么福州seo兼職
  • 打開鏈接的網(wǎng)站網(wǎng)絡(luò)營銷計劃的七個步驟
  • 自制網(wǎng)站的動態(tài)圖怎么做創(chuàng)意廣告
  • 廣州中小企業(yè)網(wǎng)站建設(shè)免費發(fā)帖推廣的平臺
  • 深圳外文網(wǎng)站制作喬拓云智能建站官網(wǎng)
  • 福州企業(yè)網(wǎng)站推廣網(wǎng)絡(luò)營銷推廣方式
  • 馬鞍山制作網(wǎng)站網(wǎng)絡(luò)營銷方式有哪幾種
  • 學(xué)校網(wǎng)站制作2345網(wǎng)址導(dǎo)航大全
  • 做二手房的網(wǎng)站技巧網(wǎng)站做成app
  • 網(wǎng)站設(shè)計價格大概多少谷歌瀏覽器下載手機版
  • 做網(wǎng)站優(yōu)化給業(yè)務(wù)員提成百度資源提交
  • wordpress+admin主題武漢seo招聘信息
  • 揚中網(wǎng)站建設(shè) 優(yōu)幫云站長工具seo查詢5g5g
  • 珠海網(wǎng)站建設(shè)科速互聯(lián)百度知道網(wǎng)頁版進入
  • 徐匯做網(wǎng)站無錫百度推廣公司哪家好
  • 青島做外貿(mào)網(wǎng)站建設(shè)網(wǎng)絡(luò)營銷服務(wù)的特點
  • 微信網(wǎng)站模板免費下載seo免費入門教程
  • wordpress foxseo 關(guān)鍵詞優(yōu)化
  • 快速免費建網(wǎng)站常用的營銷策略
  • 邢臺做網(wǎng)站優(yōu)化百度排名優(yōu)化軟件
  • 淮北做網(wǎng)站的公司百度seo優(yōu)化服務(wù)項目
  • 滎陽網(wǎng)站建設(shè)公司網(wǎng)絡(luò)關(guān)鍵詞優(yōu)化軟件
  • 便宜做網(wǎng)站seo算法優(yōu)化
  • 佛山正規(guī)網(wǎng)站建設(shè)報價優(yōu)化大師app下載安裝
  • 口碑好網(wǎng)站建設(shè)公司seo關(guān)鍵詞優(yōu)化平臺
  • 不同網(wǎng)站對商家做o2o的政策阿里seo排名優(yōu)化軟件
  • 湖南教育平臺網(wǎng)站建設(shè)流量寶
  • 梧州專業(yè)網(wǎng)站推廣官方百度平臺