網(wǎng)站添加鏈接網(wǎng)站申請流程
Linux——線程條件變量(同步)-CSDN博客
文章目錄
目錄
文章目錄
前言
一、信號量是什么?
二、信號量
1、主要類型
2、操作
3、應(yīng)用場景
三、信號量函數(shù)
1、sem_init 函數(shù)
2、sem_wait 函數(shù)
3、sem_post 函數(shù)
4、sem_destroy 函數(shù)
???????5、sem_getvalue 函數(shù)
6、sem_trywait 函數(shù)
7、sem_timedwait 函數(shù)
四、環(huán)形隊列
1、定義與原理
2、操作
五、線程池
基本原理
主要功能
實(shí)現(xiàn)方式
六、基于環(huán)形隊列的消費(fèi)者模型
1、main函數(shù)
2、RingQueue.hpp?
3、Task.hpp?
?編輯
前言
信號量(Semaphore)是一種用于多線程或多進(jìn)程環(huán)境下實(shí)現(xiàn)同步和互斥的機(jī)制。
一、信號量是什么?
信號量本質(zhì)上是一個計數(shù)器,用于控制對共享資源的訪問。它的值表示當(dāng)前可用的資源數(shù)量。當(dāng)一個線程或進(jìn)程想要訪問某個共享資源時,它需要先檢查信號量的值。如果信號量的值大于 0,則表示有可用資源,該線程或進(jìn)程可以獲取資源并將信號量的值減 1;如果信號量的值為 0,則表示沒有可用資源,該線程或進(jìn)程需要等待,直到其他線程或進(jìn)程釋放資源,使信號量的值大于 0。
二、信號量
1、主要類型
- 二進(jìn)制信號量:也稱為互斥信號量,它的值只能是 0 或 1。主要用于實(shí)現(xiàn)互斥訪問,確保在任何時刻只有一個線程或進(jìn)程能夠訪問共享資源,就像一個房間只有一把鑰匙,誰拿到鑰匙誰才能進(jìn)入房間使用里面的資源,使用完后把鑰匙放回,其他人才有機(jī)會拿到鑰匙進(jìn)入。
- 計數(shù)信號量:其值可以是任意非負(fù)整數(shù),用于控制同時訪問共享資源的線程或進(jìn)程數(shù)量。比如有一個停車場有 10 個停車位,就可以用一個初始值為 10 的計數(shù)信號量來表示,每有一輛車進(jìn)入停車場,信號量的值就減 1,當(dāng)信號量的值為 0 時,表示停車場已滿,后續(xù)車輛需要等待。
2、操作
- P 操作:也稱為 wait 操作或 down 操作。當(dāng)一個進(jìn)程或線程執(zhí)行 P 操作時,它會檢查信號量的值。如果信號量的值大于 0,則將信號量的值減 1,然后進(jìn)程或線程可以繼續(xù)執(zhí)行;如果信號量的值為 0,則進(jìn)程或線程會被阻塞,放入等待隊列,直到信號量的值大于 0。
- V 操作:也稱為 signal 操作或 up 操作。當(dāng)一個進(jìn)程或線程執(zhí)行 V 操作時,它會將信號量的值加 1。如果此時有其他進(jìn)程或線程正在等待該信號量(即信號量的值為 0 且有進(jìn)程在等待隊列中),則系統(tǒng)會從等待隊列中喚醒一個進(jìn)程或線程,使其能夠執(zhí)行 P 操作并獲取資源。
3、應(yīng)用場景
- 資源管理:可以用于管理系統(tǒng)中的各種資源,如內(nèi)存、文件、網(wǎng)絡(luò)連接等。通過信號量可以確保資源的合理分配和使用,避免資源沖突和過度使用。
- 進(jìn)程同步:在多個進(jìn)程或線程協(xié)同工作的場景中,信號量可以用于實(shí)現(xiàn)進(jìn)程之間的同步。例如,一個進(jìn)程需要等待另一個進(jìn)程完成某個任務(wù)后才能繼續(xù)執(zhí)行,就可以使用信號量來實(shí)現(xiàn)這種等待和喚醒機(jī)制。
- 生產(chǎn)者 - 消費(fèi)者問題:是信號量應(yīng)用的經(jīng)典場景。生產(chǎn)者進(jìn)程生產(chǎn)數(shù)據(jù)并將其放入緩沖區(qū),消費(fèi)者進(jìn)程從緩沖區(qū)中取出數(shù)據(jù)進(jìn)行消費(fèi)。通過信號量可以控制生產(chǎn)者和消費(fèi)者的行為,確保緩沖區(qū)不會被過度寫入或讀取。
三、信號量函數(shù)
1、sem_init 函數(shù)
- 功能:用于初始化一個信號量。
- 原型:
int sem_init(sem_t *sem, int pshared, unsigned int value);
- 參數(shù):
sem
是指向要初始化的信號量的指針;pshared
指定信號量是否在進(jìn)程間共享,0 表示僅在線程間共享,非 0 表示在進(jìn)程間共享;value
是信號量的初始值。 - 返回值:成功時返回 0,失敗時返回 - 1,并設(shè)置
errno
以指示錯誤原因。
2、sem_wait 函數(shù)
- 功能:對信號量執(zhí)行 P 操作,即等待信號量變?yōu)榭捎谩?/strong>
- 原型:
int sem_wait(sem_t *sem);
- 參數(shù):
sem
是指向要操作的信號量的指針。 - 返回值:成功時返回 0,若信號量的值為 0,則線程會阻塞直到信號量可用;失敗時返回 - 1,并設(shè)置
errno
。
3、sem_post 函數(shù)
- 功能:對信號量執(zhí)行 V 操作,釋放信號量,使信號量的值加 1。
- 原型:
int sem_post(sem_t *sem);
- 參數(shù):
sem
是指向要操作的信號量的指針。 - 返回值:成功時返回 0,失敗時返回 - 1,并設(shè)置
errno
。
4、sem_destroy 函數(shù)
- 功能:銷毀一個信號量,釋放相關(guān)資源。
- 原型:
int sem_destroy(sem_t *sem);
- 參數(shù):
sem
是指向要銷毀的信號量的指針。 - 返回值:成功時返回 0,失敗時返回 - 1,并設(shè)置
errno
。
???????5、sem_getvalue 函數(shù)
- 功能:獲取信號量的當(dāng)前值。
- 原型:
int sem_getvalue(sem_t *sem, int *sval);
- 參數(shù):
sem
是指向要查詢的信號量的指針;sval
是一個整數(shù)指針,用于存儲信號量的當(dāng)前值。??????? - 返回值:成功時返回 0,并將信號量的當(dāng)前值存儲在
sval
指向的位置;失敗時返回 - 1,并設(shè)置errno
以指示錯誤原因。
6、sem_trywait 函數(shù)
- 功能:嘗試對信號量執(zhí)行 P 操作,但不會阻塞線程。如果信號量的值大于 0,則將信號量的值減 1 并立即返回;如果信號量的值為 0,則立即返回錯誤,而不會阻塞線程。
- 原型:
int sem_trywait(sem_t *sem);
- 參數(shù):
sem
是指向要操作的信號量的指針。 - 返回值:成功時返回 0,此時表示成功獲取信號量并將其值減 1;如果信號量的值為 0,無法獲取信號量,則返回 - 1,并將
errno
設(shè)置為EAGAIN
。
7、sem_timedwait 函數(shù)
- 功能:對信號量執(zhí)行 P 操作,但會設(shè)置一個超時時間。如果在超時時間內(nèi)信號量變?yōu)榭捎?#xff0c;則獲取信號量并返回;如果超時時間已過,信號量仍不可用,則返回錯誤。
- 原型:
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
- 參數(shù):
sem
是指向要操作的信號量的指針;abs_timeout
是一個指向struct timespec
結(jié)構(gòu)體的指針,用于指定絕對超時時間。 - 返回值:成功時返回 0,若在超時時間內(nèi)未獲取到信號量,則返回 - 1,并將
errno
設(shè)置為ETIMEDOUT
。
四、環(huán)形隊列
1、定義與原理
- 環(huán)形隊列是一種基于隊列的數(shù)據(jù)結(jié)構(gòu),它將隊列的首尾相連,形成一個環(huán)形的存儲空間。與普通隊列不同,環(huán)形隊列可以更有效地利用存儲空間,避免了普通隊列在元素出隊后出現(xiàn)的前端空閑空間無法利用的問題。
- 它通過使用兩個指針,即隊頭指針(front)和隊尾指針(rear)來管理隊列中的元素。當(dāng)隊尾指針到達(dá)隊列的末尾時,它會重新回到隊列的開頭,繼續(xù)存儲新元素,從而實(shí)現(xiàn)了循環(huán)利用空間的功能。
2、操作
- 初始化:創(chuàng)建一個指定大小的數(shù)組來存儲隊列元素,并將隊頭指針和隊尾指針都初始化為 0,表示隊列為空。
- 入隊操作:當(dāng)要將一個新元素插入到環(huán)形隊列中時,首先檢查隊列是否已滿。如果未滿,將新元素存儲在隊尾指針?biāo)赶虻奈恢?#xff0c;然后將隊尾指針向后移動一位。如果隊尾指針已經(jīng)到達(dá)數(shù)組的末尾,則將其重新設(shè)置為數(shù)組的開頭位置。
- 出隊操作:從環(huán)形隊列中刪除元素時,首先檢查隊列是否為空。如果不為空,取出隊頭指針?biāo)赶虻脑?#xff0c;然后將隊頭指針向后移動一位。同樣,如果隊頭指針到達(dá)數(shù)組的末尾,也需要將其重新設(shè)置為數(shù)組的開頭位置。
- 判斷隊列空滿
- 一般采用犧牲一個存儲空間的方法來區(qū)分隊列空和滿的情況,即當(dāng)
(rear + 1) % maxSize == front
時,認(rèn)為隊列已滿,其中maxSize
是隊列的最大容量;當(dāng)front == rear
時,認(rèn)為隊列是空的。 - 也可以使用一個計數(shù)器來記錄隊列中元素的個數(shù),當(dāng)計數(shù)器的值為 0 時表示隊列為空,當(dāng)計數(shù)器的值等于
maxSize
時表示隊列已滿。
- 一般采用犧牲一個存儲空間的方法來區(qū)分隊列空和滿的情況,即當(dāng)
五、線程池
線程池是一種多線程處理形式,它將多個線程預(yù)先創(chuàng)建并放入一個池中,以方便對線程進(jìn)行管理和重復(fù)利用,從而提高系統(tǒng)性能和資源利用率。以下是關(guān)于線程池的詳細(xì)介紹:
基本原理
- 線程創(chuàng)建與管理:線程池在初始化時會創(chuàng)建一定數(shù)量的線程,并將它們放入線程池中。這些線程在創(chuàng)建后不會立即執(zhí)行具體任務(wù),而是處于等待狀態(tài),等待接收任務(wù)并執(zhí)行。
- 任務(wù)隊列:線程池通常會維護(hù)一個任務(wù)隊列,用于存儲待執(zhí)行的任務(wù)。當(dāng)有新任務(wù)到來時,會將任務(wù)添加到任務(wù)隊列中。線程池中的線程會不斷從任務(wù)隊列中獲取任務(wù),并執(zhí)行相應(yīng)的操作。
- 線程復(fù)用:線程執(zhí)行完一個任務(wù)后,不會立即銷毀,而是返回到線程池中,繼續(xù)等待下一個任務(wù)。這樣可以避免頻繁地創(chuàng)建和銷毀線程,減少了線程創(chuàng)建和銷毀所帶來的開銷,提高了系統(tǒng)的性能和響應(yīng)速度。
主要功能
- 提高資源利用率:通過復(fù)用線程,避免了因頻繁創(chuàng)建和銷毀線程而帶來的資源浪費(fèi),尤其是在處理大量短時間任務(wù)時,能顯著提高系統(tǒng)資源的利用率。
- 控制并發(fā)度:可以限制同時執(zhí)行的線程數(shù)量,避免過多線程同時運(yùn)行導(dǎo)致系統(tǒng)資源過度消耗,從而保證系統(tǒng)的穩(wěn)定性和響應(yīng)能力。
- 簡化線程管理:將線程的創(chuàng)建、調(diào)度和管理等工作封裝在一個線程池中,使得開發(fā)者無需直接管理大量的線程,降低了多線程編程的復(fù)雜性,提高了代碼的可維護(hù)性和可讀性。
實(shí)現(xiàn)方式
- 線程池的組成部分
- 線程集合:存儲線程池中的所有線程,一般使用線程數(shù)組或線程列表來實(shí)現(xiàn)。
- 任務(wù)隊列:用于存放待執(zhí)行的任務(wù),通常使用隊列數(shù)據(jù)結(jié)構(gòu),如阻塞隊列來實(shí)現(xiàn)。當(dāng)任務(wù)隊列滿時,新任務(wù)可能會被阻塞或根據(jù)特定的策略進(jìn)行處理。
- 線程池管理模塊:負(fù)責(zé)線程池的初始化、線程的創(chuàng)建與銷毀、任務(wù)的分配與調(diào)度等管理工作。它根據(jù)任務(wù)隊列的狀態(tài)和線程池的配置參數(shù),決定是否需要創(chuàng)建新的線程或回收空閑線程。
- 工作流程
- 任務(wù)提交:用戶將任務(wù)提交到線程池,任務(wù)會被放入任務(wù)隊列中。
- 任務(wù)分配:線程池中的線程會不斷從任務(wù)隊列中獲取任務(wù)。當(dāng)線程獲取到任務(wù)后,就開始執(zhí)行任務(wù)。
- 線程管理:線程池管理模塊會監(jiān)控線程的狀態(tài),當(dāng)線程執(zhí)行完任務(wù)后,會將其重新放回線程池中,使其可以繼續(xù)執(zhí)行下一個任務(wù)。如果線程池中的線程數(shù)量超過了最大線程數(shù),或者有空閑線程超過了一定的空閑時間,線程池管理模塊會負(fù)責(zé)銷毀這些線程,以釋放資源。
六、基于環(huán)形隊列的消費(fèi)者模型
1、main函數(shù)
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <ctime>
#include "RingQueue.hpp"
#include "Task.hpp"using namespace std;struct ThreadData
{RingQueue<Task> *rq;std::string threadname;
};void *Productor(void *args)
{// sleep(3);ThreadData *td = static_cast<ThreadData*>(args);RingQueue<Task> *rq = td->rq;std::string name = td->threadname;int len = opers.size();while (true){// 1. 獲取數(shù)據(jù)int data1 = rand() % 10 + 1;usleep(10);int data2 = rand() % 10;char op = opers[rand() % len];Task t(data1, data2, op);// 2. 生產(chǎn)數(shù)據(jù)rq->Push(t);cout << "Productor task done, task is : " << t.GetTask() << " who: " << name << endl;sleep(1);}return nullptr;
}void *Consumer(void *args)
{ThreadData *td = static_cast<ThreadData*>(args);RingQueue<Task> *rq = td->rq;std::string name = td->threadname;while (true){// 1. 消費(fèi)數(shù)據(jù)Task t;rq->Pop(&t);// 2. 處理數(shù)據(jù)t();cout << "Consumer get task, task is : " << t.GetTask() << " who: " << name << " result: " << t.GetResult() << endl;// sleep(1);}return nullptr;
}int main()
{srand(time(nullptr) ^ getpid());RingQueue<Task> *rq = new RingQueue<Task>(50);pthread_t c[5], p[3];for (int i = 0; i < 1; i++){ThreadData *td = new ThreadData();td->rq = rq;td->threadname = "Productor-" + std::to_string(i);pthread_create(p + i, nullptr, Productor, td);}for (int i = 0; i < 1; i++){ThreadData *td = new ThreadData();td->rq = rq;td->threadname = "Consumer-" + std::to_string(i);pthread_create(c + i, nullptr, Consumer, td);}for (int i = 0; i < 1; i++){pthread_join(p[i], nullptr);}for (int i = 0; i < 1; i++){pthread_join(c[i], nullptr);}return 0;
}
2、RingQueue.hpp?
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>const static int defaultcap = 5;template<class T>
class RingQueue{
private:void P(sem_t &sem){sem_wait(&sem);}void V(sem_t &sem){sem_post(&sem);}void Lock(pthread_mutex_t &mutex){pthread_mutex_lock(&mutex);}void Unlock(pthread_mutex_t &mutex){pthread_mutex_unlock(&mutex);}
public:RingQueue(int cap = defaultcap):ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0){sem_init(&cdata_sem_, 0, 0);sem_init(&pspace_sem_, 0, cap);pthread_mutex_init(&c_mutex_, nullptr);pthread_mutex_init(&p_mutex_, nullptr);}void Push(const T &in) // 生產(chǎn){P(pspace_sem_);Lock(p_mutex_); // ?ringqueue_[p_step_] = in;// 位置后移,維持環(huán)形特性p_step_++;p_step_ %= cap_;Unlock(p_mutex_); V(cdata_sem_);}void Pop(T *out) // 消費(fèi){P(cdata_sem_);Lock(c_mutex_); // ?*out = ringqueue_[c_step_];// 位置后移,維持環(huán)形特性c_step_++;c_step_ %= cap_;Unlock(c_mutex_); V(pspace_sem_);}~RingQueue(){sem_destroy(&cdata_sem_);sem_destroy(&pspace_sem_);pthread_mutex_destroy(&c_mutex_);pthread_mutex_destroy(&p_mutex_);}
private:std::vector<T> ringqueue_;int cap_;int c_step_; // 消費(fèi)者下標(biāo)int p_step_; // 生產(chǎn)者下標(biāo)sem_t cdata_sem_; // 消費(fèi)者關(guān)注的數(shù)據(jù)資源sem_t pspace_sem_; // 生產(chǎn)者關(guān)注的空間資源pthread_mutex_t c_mutex_;pthread_mutex_t p_mutex_;
};
3、Task.hpp?
#pragma once
#include <iostream>
#include <string>std::string opers="+-*/%";enum{DivZero=1,ModZero,Unknown
};class Task
{
public:Task(){}Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0){}void run(){switch (oper_){case '+':result_ = data1_ + data2_;break;case '-':result_ = data1_ - data2_;break;case '*':result_ = data1_ * data2_;break;case '/':{if(data2_ == 0) exitcode_ = DivZero;else result_ = data1_ / data2_;}break;case '%':{if(data2_ == 0) exitcode_ = ModZero;else result_ = data1_ % data2_;} break;default:exitcode_ = Unknown;break;}}void operator ()(){run();}std::string GetResult(){std::string r = std::to_string(data1_);r += oper_;r += std::to_string(data2_);r += "=";r += std::to_string(result_);r += "[code: ";r += std::to_string(exitcode_);r += "]";return r;}std::string GetTask(){std::string r = std::to_string(data1_);r += oper_;r += std::to_string(data2_);r += "=?";return r;}~Task(){}private:int data1_;int data2_;char oper_;int result_;int exitcode_;
};