線程池的應(yīng)用
在我認(rèn)知中,任何網(wǎng)絡(luò)服務(wù)器都是一個(gè)死循環(huán)。這個(gè)死循環(huán)長(zhǎng)下面這個(gè)樣子。
基本上服務(wù)器框架都是基于這個(gè)架構(gòu)而不斷開發(fā)拓展的。
這個(gè)死循環(huán)總共分為四個(gè)步驟,可以涵蓋所有客戶端的需求,然而目前絕大多數(shù)企業(yè)不會(huì)用這樣的架構(gòu)。
問題在于容易產(chǎn)生阻塞。
作為客戶端,我們當(dāng)然希望訪問服務(wù)器的時(shí)候,能夠在短時(shí)間內(nèi)收到回復(fù),意味著自己連接上了該服務(wù)器。但是上述架構(gòu)卻很容易產(chǎn)生響應(yīng)延遲。
當(dāng)某一個(gè)連接的2.3.4時(shí)間過長(zhǎng),也許是因?yàn)榭蛻羯蟼髁撕艽蟮臄?shù)據(jù),也許是因?yàn)闃I(yè)務(wù)處理起來比較麻煩,需要計(jì)算很多東西,也許是客戶需要下載很大的東西,總之只要2,3,4的時(shí)間延遲,意味著下一個(gè)循環(huán)處理其它連接的動(dòng)作也會(huì)被無限延遲。
也就是說,系統(tǒng)需要處理一個(gè)很慢的客戶端的連接,后面的所有連接,哪怕只是耗時(shí)很短的任務(wù),都需要等這個(gè)很慢的任務(wù)完成才能進(jìn)行。
所以除了像redis的服務(wù)器,數(shù)據(jù)庫(kù)都是基于hash的key-value結(jié)構(gòu),業(yè)務(wù)處理起來十分快速,才會(huì)將1,2,3,4都在一個(gè)線程中完成,其它的服務(wù)器若要提供千萬乃至億級(jí)別的客戶接入量,必須更快地處理客戶的連接,解除1和234之間的耦合,這才引入了多線程,我的主線程只負(fù)責(zé)1,然后將2,3,4分發(fā)到其它線程中執(zhí)行。
然而,如果服務(wù)器選擇這種多線程架構(gòu),當(dāng)我們面臨著巨大的客戶端流量,則勢(shì)必需要頻繁地創(chuàng)建和銷毀線程,這個(gè)過程十分浪費(fèi)系統(tǒng)資源,還容易造成系統(tǒng)崩潰,然后老板震驚,被迫畢業(yè),流落街頭,思之令人發(fā)笑。
解決辦法就是線程池。
我們預(yù)先創(chuàng)建好一系列線程,就好比后宮佳麗三千,然后皇上(線程池中樞)來了興致(收到任務(wù)),就去翻一個(gè)妃子(線程池中某個(gè)線程)的牌子。妃子(線程)解決完需求后,回到后宮(線程池),等待下一次召喚。
不用創(chuàng)建和銷毀,而是回收利用,所有池式結(jié)構(gòu)都可以看做是一種對(duì)資源調(diào)度的緩沖,這就是線程池的精髓。
線程池設(shè)計(jì)
我們手撕線程池,目的還是搞懂基本原理,不弄太多花里胡哨的架構(gòu),比如工廠模式之類的。
當(dāng)前這個(gè)版本的線程池是基于互斥鎖和條件變量實(shí)現(xiàn)的。
預(yù)告(畫餅):無鎖線程池后續(xù)也會(huì)手撕。
線程池總體上可以分為三大組件。
- 任務(wù)隊(duì)列(存還沒有執(zhí)行的任務(wù))
- 執(zhí)行隊(duì)列(可以看成就是線程池,存放著可以用來執(zhí)行任務(wù)的線程)
- 線程池管理中樞(負(fù)責(zé)封裝前兩個(gè)類,任務(wù)的分發(fā),線程池的創(chuàng)建,銷毀,等等。對(duì)外提供統(tǒng)一的接口)
其工作流程大概如圖所示
任務(wù)隊(duì)列節(jié)點(diǎn)數(shù)據(jù)結(jié)構(gòu)
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};
任務(wù)隊(duì)列負(fù)責(zé)存還沒有執(zhí)行的業(yè)務(wù),我們可以將每個(gè)業(yè)務(wù)都抽象成一個(gè)函數(shù),每個(gè)函數(shù)自然有可能需要參數(shù)。
所以任務(wù)隊(duì)列的節(jié)點(diǎn)需要兩個(gè)成員:
- taskCallback:函數(shù)回調(diào),執(zhí)行客戶端想要的業(yè)務(wù)。
- user_data:函數(shù)參數(shù),包含客戶端的信息,比如socketfd等。
順便提供了一個(gè)接口,可以修改回調(diào)函數(shù)。
執(zhí)行隊(duì)列節(jié)點(diǎn)數(shù)據(jù)結(jié)構(gòu)
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
- tid:每個(gè)節(jié)點(diǎn)都對(duì)應(yīng)一個(gè)線程,所以需要一個(gè)id成員來存線程id,
- usable:這個(gè)成員非常妙,它代表當(dāng)前線程是否可用,默認(rèn)為true,一旦設(shè)置為false,則該線程會(huì)結(jié)束。
- 使用usable可以在最后銷毀線程池的時(shí)候,以一種優(yōu)雅的方式結(jié)束每個(gè)線程,而代替pthread_cancel這種強(qiáng)制銷毀線程的方式,因?yàn)槟悴恢谰€程中的任務(wù)是否處理完,強(qiáng)制銷毀就會(huì)使某些業(yè)務(wù)中斷。
- pool:這個(gè)成員是指向中樞管理(后面會(huì)講)的指針,主要是為了在每個(gè)線程中通過pool獲取到一個(gè)全局(對(duì)于所有線程池線程共享)的互斥鎖和條件變量。
- start:是線程池對(duì)象執(zhí)行的一個(gè)實(shí)現(xiàn)線程再回收利用的任務(wù)循環(huán)。具體實(shí)現(xiàn)代碼也是在后面會(huì)講。
線程池管理中樞設(shè)計(jì)
總體結(jié)構(gòu):
public:
//-任務(wù)隊(duì)列和執(zhí)行隊(duì)列
deque task_queue;
deque exec_queue;
//-條件變量
pthread_cond_t cont;
//-互斥鎖
pthread_mutex_t mutex;
//-線程池大小
int thread_count;
//-構(gòu)造函數(shù)
ThreadPool(int thread_count):thread_count(thread_count);
//-創(chuàng)建線程池
void createPool();
//-加入任務(wù)
void push_task(void(*tcb)(void* arg),int i);
//-利用析構(gòu)銷毀線程池
~ThreadPool();
};*>*>
關(guān)于數(shù)據(jù)成員:
- task_queue、exec_queue: 任務(wù)隊(duì)列和執(zhí)行隊(duì)列,我使用deque作為容器實(shí)現(xiàn)隊(duì)列。
- cont:所有線程共享的條件變量。
- mutex:所有線程共享的互斥鎖。
- thread_count: 線程池創(chuàng)建的時(shí)候,初始大小
關(guān)于成員方法:
- ThreadPool:構(gòu)造函數(shù)
- createPool:創(chuàng)建線程池
- push_task: 給服務(wù)器主循環(huán)用的,給線程池添加任務(wù)。
- ~ ThreadPool: 銷毀線程池,事實(shí)上應(yīng)該單獨(dú)定義一個(gè)destroyPool的api,我這里為了簡(jiǎn)便合并到析構(gòu)中了。
ExecEle的start函數(shù)實(shí)現(xiàn)
現(xiàn)在對(duì)于ThreadPool對(duì)象有概念以后,可以先將剛剛執(zhí)行隊(duì)列節(jié)點(diǎn)ExecEle的start函數(shù)實(shí)現(xiàn),其代表了每個(gè)線程池的線程始終在跑的循環(huán),在無任務(wù)分配的時(shí)候阻塞在某個(gè)位置。
//-獲得執(zhí)行對(duì)象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加鎖
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任務(wù)隊(duì)列為空,等待新任務(wù)
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解鎖
pthread_mutex_unlock(&(ee->pool -> mutex));
//-執(zhí)行任務(wù)回調(diào)
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-刪除線程執(zhí)行對(duì)象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}
arg參數(shù)指向的是該線程函數(shù)對(duì)應(yīng)的執(zhí)行元素ExecEle本身的指針,我們定義其為ee。然后進(jìn)入死循環(huán),通過ee,我們可以獲得線程池中樞對(duì)象pool。
通過pool。我們可以獲得任務(wù)隊(duì)列的情況,當(dāng)任務(wù)隊(duì)列為空,則線程進(jìn)入阻塞狀態(tài),等待任務(wù)隊(duì)列有任務(wù)進(jìn)來后,通過條件變量通知,再恢復(fù)執(zhí)行。
恢復(fù)執(zhí)行后,從任務(wù)隊(duì)列中取出隊(duì)首的任務(wù),這個(gè)過程需要在mutex的范圍內(nèi),保證獨(dú)占性。
之后解除互斥鎖,開始執(zhí)行任務(wù)的回調(diào)。執(zhí)行完進(jìn)行入下個(gè)循環(huán),嘗試再次獲得互斥鎖。
最后說一說usable,當(dāng)我們銷毀線程池的時(shí)候,設(shè)置每一個(gè)線程的usable為false,那么不會(huì)立刻中斷每個(gè)線程正在執(zhí)行的回調(diào),而是等回調(diào)結(jié)束后,在下一次循環(huán)中如果檢測(cè)到usable為false后,就會(huì)退出整個(gè)大循環(huán),并釋放自己的鎖,喚醒線程池其它休眠的線程。退出大循環(huán)后,線程自然而優(yōu)雅地結(jié)束。
之后是ThreadPool自己的api實(shí)現(xiàn)
構(gòu)造函數(shù)ThreadPool實(shí)現(xiàn):
//-初始化條件變量和互斥鎖
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
主要為了初始化cont,mutex,thread_count。
創(chuàng)建線程池createPool實(shí)現(xiàn):
int ret;
//-初始執(zhí)行隊(duì)列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}*>;++i){
通過pthread_create創(chuàng)建thread_count個(gè)線程,每個(gè)線程執(zhí)行自己的start函數(shù)進(jìn)入等待任務(wù)循環(huán),并阻塞在鎖和條件變量的位置。將exec對(duì)象push進(jìn)執(zhí)行隊(duì)列。
添加任務(wù) push_task實(shí)現(xiàn):
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加鎖
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知執(zhí)行隊(duì)列中的一個(gè)進(jìn)行任務(wù)
pthread_cond_signal(&cont);
//-解鎖
pthread_mutex_unlock(&mutex);
}
主要功能是構(gòu)造TaskEle對(duì)象并加入到執(zhí)行隊(duì)列中。
每個(gè)TaskEle可能需要執(zhí)行不同的業(yè)務(wù),所以push_task需要傳入對(duì)應(yīng)業(yè)務(wù)的回調(diào)tcb(task callback)
i是我加的額外參數(shù),代表主線程中連接的客戶端編號(hào),其意義可以是socketfd。
注意在修改執(zhí)行隊(duì)列(push)的時(shí)候,需要加鎖保證獨(dú)占。
銷毀線程池~ ThreadPool 實(shí)現(xiàn):
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任務(wù)隊(duì)列
task_queue.clear();
//-廣播給每個(gè)執(zhí)行線程令其退出(執(zhí)行線程破開循環(huán)會(huì)free掉堆內(nèi)存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-讓其他線程拿到鎖
//-等待所有線程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空?qǐng)?zhí)行隊(duì)列
exec_queue.clear();
//-銷毀鎖和條件變量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}();>();++i){
先將所有線程的usable設(shè)置為false,之后加鎖,清空任務(wù)隊(duì)列,并通過條件變量通知所有線程,等所有線程退出后,銷毀執(zhí)行隊(duì)列,銷毀鎖和條件變量。
業(yè)務(wù)代碼和服務(wù)器主循環(huán)
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};
int main(){
//-創(chuàng)建線程池
ThreadPool pool(100);
pool.createPool();
//-創(chuàng)建任務(wù)
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}
隨便寫的線程執(zhí)行的業(yè)務(wù),打印一下客戶信息。
主線程創(chuàng)建100大小的線程池,并添加1000個(gè)任務(wù)(連接)。
完整代碼
//-三個(gè)組件:任務(wù)隊(duì)列,執(zhí)行隊(duì)列,線程池(中樞管理)
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
//-打印線程錯(cuò)誤專用,根據(jù)err來識(shí)別錯(cuò)誤信息
static inline void ERR_EXIT_THREAD(int err, const char * msg){
fprintf(stderr,"%s:%sn",strerror(err),msg);
exit(EXIT_FAILURE);
}
class ThreadPool;//-聲明
//- 任務(wù)隊(duì)列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};
//-執(zhí)行隊(duì)列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
//-線程池
class ThreadPool{
public:
//-任務(wù)隊(duì)列和執(zhí)行隊(duì)列
deque task_queue;
deque exec_queue;
//-條件變量
pthread_cond_t cont;
//-互斥鎖
pthread_mutex_t mutex;
//-線程池大小
int thread_count;
//-構(gòu)造函數(shù)
ThreadPool(int thread_count):thread_count(thread_count){
//-初始化條件變量和互斥鎖
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
void createPool(){
int ret;
//-初始執(zhí)行隊(duì)列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}
//-加入任務(wù)
void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加鎖
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知執(zhí)行隊(duì)列中的一個(gè)進(jìn)行任務(wù)
pthread_cond_signal(&cont);
//-解鎖
pthread_mutex_unlock(&mutex);
}
//-銷毀線程池
~ThreadPool() {
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任務(wù)隊(duì)列
task_queue.clear();
//-廣播給每個(gè)執(zhí)行線程令其退出(執(zhí)行線程破開循環(huán)會(huì)free掉堆內(nèi)存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-讓其他線程拿到鎖
//-等待所有線程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空?qǐng)?zhí)行隊(duì)列
exec_queue.clear();
//-銷毀鎖和條件變量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}
};
void* ExecEle::start(void*arg){
//-獲得執(zhí)行對(duì)象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加鎖
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任務(wù)隊(duì)列為空,等待新任務(wù)
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解鎖
pthread_mutex_unlock(&(ee->pool -> mutex));
//-執(zhí)行任務(wù)回調(diào)
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-刪除線程執(zhí)行對(duì)象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}
//-線程執(zhí)行的業(yè)務(wù)函數(shù)
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};
int main(){
//-創(chuàng)建線程池
ThreadPool pool(100);
pool.createPool();
//-創(chuàng)建任務(wù)
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}();>();++i){
*>;++i){
*>*>
-
服務(wù)器
+關(guān)注
關(guān)注
13文章
9796瀏覽量
88015 -
數(shù)據(jù)庫(kù)
+關(guān)注
關(guān)注
7文章
3927瀏覽量
66272 -
線程池
+關(guān)注
關(guān)注
0文章
57瀏覽量
7138
發(fā)布評(píng)論請(qǐng)先 登錄
基于線程池技術(shù)集群接入點(diǎn)的應(yīng)用研究
如何正確關(guān)閉線程池
基于Nacos的簡(jiǎn)單動(dòng)態(tài)化線程池實(shí)現(xiàn)
線程池的兩個(gè)思考

Spring 的線程池應(yīng)用

線程池三大核心參數(shù)的含義 線程池核心線程數(shù)制定策略

評(píng)論