定時器屬于基本的基礎(chǔ)組件,不管是用戶空間的程序開發(fā),還是內(nèi)核空間的程序開發(fā),很多時候都需要有定時器作為基礎(chǔ)組件的支持,但使用場景的不同,對定時器的實現(xiàn)考慮也不盡相同,本文討論了在 Linux 環(huán)境下,應(yīng)用層和內(nèi)核層的定時器的各種實現(xiàn)方法,并分析了各種實現(xiàn)方法的利弊以及適宜的使用環(huán)境。
概論
定時器屬于基本的基礎(chǔ)組件,不管是用戶空間的程序開發(fā),還是內(nèi)核空間的程序開發(fā),很多時候都需要有定時器作為基礎(chǔ)組件的支持,但使用場景的不同,對定時器的實現(xiàn)考慮也不盡相同,本文討論了在 Linux 環(huán)境下,應(yīng)用層和內(nèi)核層的定時器的各種實現(xiàn)方法,并分析了各種實現(xiàn)方法的利弊以及適宜的使用環(huán)境。
首先,給出一個基本模型,定時器的實現(xiàn),需要具備以下幾個行為,這也是在后面評判各種定時器實現(xiàn)的一個基本模型 [1]:
StartTimer(Interval, TimerId, ExpiryAction)
注冊一個時間間隔為 Interval 后執(zhí)行 ExpiryAction 的定時器實例,其中,返回 TimerId 以區(qū)分在定時器系統(tǒng)中的其他定時器實例。
StopTimer(TimerId)
根據(jù) TimerId 找到注冊的定時器實例并執(zhí)行 Stop 。
PerTickBookkeeping()
在一個 Tick 內(nèi),定時器系統(tǒng)需要執(zhí)行的動作,它最主要的行為,就是檢查定時器系統(tǒng)中,是否有定時器實例已經(jīng)到期。注意,這里的 Tick 實際上已經(jīng)隱含了一個時間粒度 (granularity) 的概念。
ExpiryProcessing()
在定時器實例到期之后,執(zhí)行預(yù)先注冊好的 ExpiryAction 行為。
上面說了基本的定時器模型,但是針對實際的使用情況,又有以下 2 種基本行為的定時器:
Single-Shot Timer
這種定時器,從注冊到終止,僅僅只執(zhí)行一次。
Repeating Timer
這種定時器,在每次終止之后,會自動重新開始。本質(zhì)上,可以認為 Repeating Timer 是在 Single-Shot Timer 終止之后,再次注冊到定時器系統(tǒng)里的 Single-Shot Timer,因此,在支持 Single-Shot Timer 的基礎(chǔ)上支持 Repeating Timer 并不算特別的復(fù)雜。
基于鏈表和信號實現(xiàn)定時器 (2.4 版內(nèi)核情況下 )
在 2.4 的內(nèi)核中,并沒有提供 POSIX timer [ 2 ]的支持,要在進程環(huán)境中支持多個定時器,只能自己來實現(xiàn),好在 Linux 提供了 setitimer(2) 的接口。它是一個具有間隔功能的定時器 (interval timer),但如果想在進程環(huán)境中支持多個計時器,不得不自己來管理所有的計時器。 setitimer(2) 的定義如下:
清單 1. setitimer 的原型
#include ? setitimer 能夠在 Timer 到期之后,自動再次啟動自己,因此,用它來解決 Single-Shot Timer 和 Repeating Timer 的問題顯得很簡單。該函數(shù)可以工作于 3 種模式: ITIMER_REAL 以實時時間 (real time) 遞減,在到期之后發(fā)送 SIGALRM 信號 ITIMER_VIRTUAL 僅進程在用戶空間執(zhí)行時遞減,在到期之后發(fā)送 SIGVTALRM 信號 ITIMER_PROF 進程在用戶空間執(zhí)行以及內(nèi)核為該進程服務(wù)時 ( 典型如完成一個系統(tǒng)調(diào)用 ) 都會遞減,與 ITIMER_VIRTUAL 共用時可度量該應(yīng)用在內(nèi)核空間和用戶空間的時間消耗情況,在到期之后發(fā)送 SIGPROF 信號 定時器的值由下面的結(jié)構(gòu)定義: struct itimerval { struct timeval it_interval; /* next value */ struct timeval it_value; /* current value */ }; struct timeval { long tv_sec; /* seconds */ long tv_usec; /* microseconds */ }; ? setitimer() 以 new_value 設(shè)置特定的定時器,如果 old_value 非空,則它返回 which 類型時間間隔定時器的前一個值。定時器從 it_value 遞減到零,然后產(chǎn)生一個信號,并重新設(shè)置為 it_interval,如果此時 it_interval 為零,則該定時器停止。任何時候,只要 it_value 設(shè)置為零,該定時器就會停止。 由于 setitimer() 不支持在同一進程中同時使用多次以支持多個定時器,因此,如果需要同時支持多個定時實例的話,需要由實現(xiàn)者來管理所有的實例。用 setitimer() 和鏈表,可以構(gòu)造一個在進程環(huán)境下支持多個定時器實例的 Timer,在一般的實現(xiàn)中的 PerTickBookkeeping 時,會遞增每個定時器的 elapse 值,直到該值遞增到最初設(shè)定的 interval 則表示定時器到期。 基于鏈表實現(xiàn)的定時器可以定義為: typedef int timer_id; /** * The type of callback function to be called by timer scheduler when a timer * has expired. * * @param id The timer id. * @param user_data The user data. * $param len The length of user data. */ typedef int timer_expiry(timer_id id, void *user_data, int len); /** * The type of the timer */ struct timer { LIST_ENTRY(timer) entries;/**< list entry */ timer_id id; /**< timer id */ int interval; /**< timer interval(second) */ int elapse; /**< 0 -> interval */ timer_expiry *cb; /**< call if expiry */ void *user_data; /**< callback arg */ int len; /**< user_data length */ }; ? 定時器的時間間隔以 interval 表示,而 elapse 則在 PerTickBookkeeping() 時遞增,直到 interval 表示定時器中止,此時調(diào)用回調(diào)函數(shù) cb 來執(zhí)行相關(guān)的行為,而 user_data 和 len 為用戶可以傳遞給回調(diào)函數(shù)的參數(shù)。 所有的定時器實例以鏈表來管理: /** * The timer list */ struct timer_list { LIST_HEAD(listheader, timer) header; /**< list header */ int num; /**< timer entry number */ int max_num; /**< max entry number */ void (*old_sigfunc)(int); /**< save previous signal handler */ void (*new_sigfunc)(int); /**< our signal handler */ struct itimerval ovalue; /**< old timer value */ struct itimerval value; /**< our internal timer value */ }; ? 這里關(guān)于鏈表的實現(xiàn)使用了 BSD 風(fēng)格關(guān)于鏈表的一組宏,避免了再造輪子;該結(jié)構(gòu)中,old_sigfunc 在 init_timer 初始定時器鏈表時候用來保存系統(tǒng)對 SIGALRM 的處理函數(shù),在定時器系統(tǒng) destory 時用來恢復(fù)到之前的處理函數(shù); ovalue 的用途與此類似。 /** * Create a timer list. * * @param count The maximum number of timer entries to be supported initially. * * @return 0 means ok, the other means fail. */ int init_timer(int count) { int ret = 0; if(count <=0 || count > MAX_TIMER_NUM) { printf("the timer max number MUST less than %d.\n", MAX_TIMER_NUM); return -1; } memset(&timer_list, 0, sizeof(struct timer_list)); LIST_INIT(&timer_list.header); timer_list.max_num = count; /* Register our internal signal handler and store old signal handler */ if ((timer_list.old_sigfunc = signal(SIGALRM, sig_func)) == SIG_ERR) { return -1; } timer_list.new_sigfunc = sig_func; /*Setting our interval timer for driver our mutil-timer and store old timer value*/ timer_list.value.it_value.tv_sec = TIMER_START; timer_list.value.it_value.tv_usec = 0; timer_list.value.it_interval.tv_sec = TIMER_TICK; timer_list.value.it_interval.tv_usec = 0; ret = setitimer(ITIMER_REAL, &timer_list.value, &timer_list.ovalue); return ret; } /** * Destroy the timer list. * * @return 0 means ok, the other means fail. */ int destroy_timer(void) { struct timer *node = NULL; if ((signal(SIGALRM, timer_list.old_sigfunc)) == SIG_ERR) { return -1; } if((setitimer(ITIMER_REAL, &timer_list.ovalue, &timer_list.value)) < 0) { return -1; } while (!LIST_EMPTY(&timer_list.header)) { /* Delete. */ node = LIST_FIRST(&timer_list.header); LIST_REMOVE(node, entries); /* Free node */ printf("Remove id %d\n", node->id); free(node->user_data); free(node); } memset(&timer_list, 0, sizeof(struct timer_list)); return 0; } ? 添加定時器的動作非常的簡單,本質(zhì)只是一個鏈表的插入而已: /** * Add a timer to timer list. * * @param interval The timer interval(second). * @param cb When cb!= NULL and timer expiry, call it. * @param user_data Callback's param. * @param len The length of the user_data. * * @return The timer ID, if == INVALID_TIMER_ID, add timer fail. */ timer_id add_timer(int interval, timer_expiry *cb, void *user_data, int len) { struct timer *node = NULL; if (cb == NULL || interval <= 0) { return INVALID_TIMER_ID; } if(timer_list.num < timer_list.max_num) { timer_list.num++; } else { return INVALID_TIMER_ID; } if((node = malloc(sizeof(struct timer))) == NULL) { return INVALID_TIMER_ID; } if(user_data != NULL || len != 0) { node->user_data = malloc(len); memcpy(node->user_data, user_data, len); node->len = len; } node->cb = cb; node->interval = interval; node->elapse = 0; node->id = timer_list.num; LIST_INSERT_HEAD(&timer_list.header, node, entries); return node->id; } ? 注冊的信號處理函數(shù)則用來驅(qū)動定時器系統(tǒng): /* Tick Bookkeeping */ static void sig_func(int signo) { struct timer *node = timer_list.header.lh_first; for ( ; node != NULL; node = node->entries.le_next) { node->elapse++; if(node->elapse >= node->interval) { node->elapse = 0; node->cb(node->id, node->user_data, node->len); } } } ? 它主要是在每次收到 SIGALRM 信號時,執(zhí)行定時器鏈表中的每個定時器 elapse 的自增操作,并與 interval 相比較,如果相等,代表注冊的定時器已經(jīng)超時,這時則調(diào)用注冊的回調(diào)函數(shù)。 上面的實現(xiàn),有很多可以優(yōu)化的地方:考慮另外一種思路,在定時器系統(tǒng)內(nèi)部將維護的相對 interval 轉(zhuǎn)換成絕對時間,這樣,在每 PerTickBookkeeping 時,只需將當前時間與定時器的絕對時間相比較,就可以知道是否該定時器是否到期。這種方法,把遞增操作變?yōu)榱吮容^操作。并且上面的實現(xiàn)方式,效率也不高,在執(zhí)行 StartTimer,StopTimer,PerTickBookkeeping 時,算法復(fù)雜度分別為 O(1),O(n),O(n),可以對上面的實現(xiàn)做一個簡單的改進,在 StartTimer 時,即在添加 Timer 實例時,對鏈表進行排序,這樣的改進,可以使得在執(zhí)行 StartTimer,StopTimer,PerTickBookkeeping 時,算法復(fù)雜度分別為 O(n),O(1),O(1) 。改進后的定時器系統(tǒng)如下圖 1: ? 基于 2.6 版本內(nèi)核定時器的實現(xiàn) (Posix 實時定時器 ) Linux 自 2.6 開始,已經(jīng)開始支持 POSIX timer [ 2 ]所定義的定時器,它主要由下面的接口構(gòu)成 : #include ? 這套接口是為了讓操作系統(tǒng)對實時有更好的支持,在鏈接時需要指定 -lrt 。 timer_create(2): 創(chuàng)建了一個定時器。 timer_settime(2): 啟動或者停止一個定時器。 timer_gettime(2): 返回到下一次到期的剩余時間值和定時器定義的時間間隔。出現(xiàn)該接口的原因是,如果用戶定義了一個 1ms 的定時器,可能當時系統(tǒng)負荷很重,導(dǎo)致該定時器實際山 10ms 后才超時,這種情況下,overrun=9ms 。 timer_getoverrun(2): 返回上次定時器到期時超限值。 timer_delete(2): 停止并刪除一個定時器。 上面最重要的接口是 timer_create(2),其中,clockid 表明了要使用的時鐘類型,在 POSIX 中要求必須實現(xiàn) CLOCK_REALTIME 類型的時鐘。 evp 參數(shù)指明了在定時到期后,調(diào)用者被通知的方式。該結(jié)構(gòu)體定義如下 : union sigval { int sival_int; void *sival_ptr; }; struct sigevent { int sigev_notify; /* Notification method */ int sigev_signo; /* Timer expiration signal */ union sigval sigev_value; /* Value accompanying signal or passed to thread function */ void (*sigev_notify_function) (union sigval); /* Function used for thread notifications (SIGEV_THREAD) */ void *sigev_notify_attributes; /* Attributes for notification thread (SIGEV_THREAD) */ pid_t sigev_notify_thread_id; /* ID of thread to signal (SIGEV_THREAD_ID) */ }; ? 其中,sigev_notify 指明了通知的方式 : SIGEV_NONE 當定時器到期時,不發(fā)送異步通知,但該定時器的運行進度可以使用 timer_gettime(2) 監(jiān)測。 SIGEV_SIGNAL 當定時器到期時,發(fā)送 sigev_signo 指定的信號。 SIGEV_THREAD 當定時器到期時,以 sigev_notify_function 開始一個新的線程。該函數(shù)使用 sigev_value 作為其參數(shù),當 sigev_notify_attributes 非空,則制定該線程的屬性。注意,由于 Linux 上線程的特殊性,這個功能實際上是由 glibc 和內(nèi)核一起實現(xiàn)的。 SIGEV_THREAD_ID (Linux-specific) 僅推薦在實現(xiàn)線程庫時候使用。 如果 evp 為空的話,則該函數(shù)的行為等效于:sigev_notify = SIGEV_SIGNAL,sigev_signo = SIGVTALRM,sigev_value.sival_int = timer ID 。 由于 POSIX timer [ 2 ]接口支持在一個進程中同時擁有多個定時器實例,所以在上面的基于 setitimer() 和鏈表的 PerTickBookkeeping 動作就交由 Linux 內(nèi)核來維護,這大大減輕了實現(xiàn)定時器的負擔(dān)。由于 POSIX timer [ 2 ]接口在定時器到期時,有更多的控制能力,因此,可以使用實時信號避免信號的丟失問題,并將 sigev_value.sival_int 值指定為 timer ID,這樣,就可以將多個定時器一起管理了。需要注意的是,POSIX timer [ 2 ]接口只在進程環(huán)境下才有意義 (fork(2) 和 exec(2) 也需要特殊對待 ),并不適合多線程環(huán)境。與此相類似的,Linux 提供了基于文件描述符的相關(guān)定時器接口: #include ? 這樣,由于基于文件描述符,使得該接口可以支持 select(2),poll(2) 等異步接口,使得定時器的實現(xiàn)和使用更加的方便,更重要的是,支持 fork(2),exec(2) 這樣多進程的語義,因此,可以用在多線程環(huán)境之中,它們的使用比 POSIX timer [ 2 ]更加的靈活,其根本原因在于定時器的管理統(tǒng)一到了 unix/linux 基本哲學(xué)之一 ---- “一切皆文件”之下。 最小堆實現(xiàn)的定時器 最小堆指的是滿足除了根節(jié)點以外的每個節(jié)點都不小于其父節(jié)點的堆。這樣,堆中的最小值就存放在根節(jié)點中,并且在以某個結(jié)點為根的子樹中,各節(jié)點的值都不小于該子樹根節(jié)點的值。一個最小堆的例子如下圖 2: 一個最小堆,一般支持以下幾種操作: Insert(TimerHeap, Timer): 在堆中插入一個值,并保持最小堆性質(zhì),具體對應(yīng)于定時器的實現(xiàn),則是把定時器插入到定時器堆中。根據(jù)最小堆的插入算法分析,可以知道該操作的時間復(fù)雜度為 O(lgn) 。 Minimum(TimerHeap): 獲取最小堆的中最小值;在定時器系統(tǒng)中,則是返回定時器堆中最先可能終止的定時器。由于是最小堆,只需返回堆的 root 即可。此時的算法復(fù)雜度為 O(1) 。 ExtractMin(TimerHeap): 在定時器到期后,執(zhí)行相關(guān)的動作,它的算法復(fù)雜度為 O(1) 。 最小堆本質(zhì)上是一種最小優(yōu)先級隊列 (min-priority queue) 。定時可以作為最小優(yōu)先級隊列的一個應(yīng)用,該優(yōu)先級隊列把定時器的時間間隔值轉(zhuǎn)化為一個絕對時間來處理,ExtractMin 操則是在所有等待的定時器中,找出最先超時的定時器。在任何時候,一個新的定時器實例都可通過 Insert 操作加入到定時器隊列中去。 在 pjsip 項目的基礎(chǔ)庫 pjlib 中,有基于最小堆實現(xiàn)的定時器,它主要提供了以下的幾個接口: /** * Create a timer heap. */ PJ_DECL(pj_status_t) pj_timer_heap_create( pj_pool_t *pool, pj_size_t count, pj_timer_heap_t **ht); /** * Destroy the timer heap. */ PJ_DECL(void) pj_timer_heap_destroy( pj_timer_heap_t *ht ); /** * Initialize a timer entry. Application should call this function at least * once before scheduling the entry to the timer heap, to properly initialize * the timer entry. */ PJ_DECL(pj_timer_entry*) pj_timer_entry_init( pj_timer_entry *entry, int id, void *user_data, pj_timer_heap_callback *cb ); /** * Schedule a timer entry which will expire AFTER the specified delay. */ PJ_DECL(pj_status_t) pj_timer_heap_schedule( pj_timer_heap_t *ht, pj_timer_entry *entry, const pj_time_val *delay); /** * Cancel a previously registered timer. */ PJ_DECL(int) pj_timer_heap_cancel( pj_timer_heap_t *ht, pj_timer_entry *entry); /** * Poll the timer heap, check for expired timers and call the callback for * each of the expired timers. */ PJ_DECL(unsigned) pj_timer_heap_poll( pj_timer_heap_t *ht, pj_time_val *next_delay); ? pjlib 中的定時器在內(nèi)部使用數(shù)組的方式實現(xiàn)堆,這樣對于內(nèi)存空間的使用將更加的緊湊;它的實現(xiàn)還可在定時器的數(shù)量超過預(yù)先設(shè)定的最大數(shù)量時會自己增加最大定時器數(shù)量。文件 pjlib/src/pjlib-test/timer.c 是它的一個單元測試。與基于鏈表方式的實現(xiàn)相比較,明顯它的時間復(fù)雜度要低一些,這樣可以支持更多的定時器實例。 基于時間輪 (Timing-Wheel) 方式實現(xiàn)的定時器 時間輪 (Timing-Wheel) 算法類似于一以恒定速度旋轉(zhuǎn)的左輪手槍,槍的撞針則撞擊槍膛,如果槍膛中有子彈,則會被擊發(fā);與之相對應(yīng)的是:對于 PerTickBookkeeping,其最本質(zhì)的工作在于以 Tick 為單位增加時鐘,如果發(fā)現(xiàn)有任何定時器到期,則調(diào)用相應(yīng)的 ExpiryProcessing 。設(shè)定一個循環(huán)為 N 個 Tick 單元,當前時間是在 S 個循環(huán)之后指向元素 i (i>=0 and i<= N - 1),則當前時間 (Current Time)Tc 可以表示為:Tc = S*N + i ;如果此時插入一個時間間隔 (Time Interval) 為 Ti 的定時器,設(shè)定它將會放入元素 n(Next) 中,則 n = (Tc + Ti)mod N = (S*N + i + Ti) mod N = (i + Ti) mod N 。如果我們的 N 足夠的大,顯然 StartTimer,StopTimer,PerTickBookkeeping 時,算法復(fù)雜度分別為 O(1),O(1),O(1) 。在 [5] 中,給出了一個簡單定時器輪實現(xiàn)的定時。下圖 3 是一個簡單的時間輪定時器: 如果需要支持的定時器范圍非常的大,上面的實現(xiàn)方式則不能滿足這樣的需求。因為這樣將消耗非常可觀的內(nèi)存,假設(shè)需要表示的定時器范圍為:0 – 2^3-1ticks,則簡單時間輪需要 2^32 個元素空間,這對于內(nèi)存空間的使用將非常的龐大。也許可以降低定時器的精度,使得每個 Tick 表示的時間更長一些,但這樣的代價是定時器的精度將大打折扣?,F(xiàn)在的問題是,度量定時器的粒度,只能使用唯一粒度嗎?想想日常生活中常遇到的水表,如下圖 4: 在上面的水表中,為了表示度量范圍,分成了不同的單位,比如 1000,100,10 等等,相似的,表示一個 32bits 的范圍,也不需要 2^32 個元素的數(shù)組。實際上,Linux 的內(nèi)核把定時器分為 5 組,每組的粒度分別表示為:1 jiffies,256 jiffies,256*64 jiffies,256*64*64 jiffies,256*64*64*64 jiffies,每組中桶的數(shù)量分別為:256,64,64,64,64,這樣,在 256+64+64+64+64 = 512 個桶中,表示的范圍為 2^32 。有了這樣的實現(xiàn),驅(qū)動內(nèi)核定時器的機制也可以通過水表的例子來理解了,就像水表,每個粒度上都有一個指針指向當前時間,時間以固定 tick 遞增,而當前時間指針則也依次遞增,如果發(fā)現(xiàn)當前指針的位置可以確定為一個注冊的定時器,就觸發(fā)其注冊的回調(diào)函數(shù)。 Linux 內(nèi)核定時器本質(zhì)上是 Single-Shot Timer,如果想成為 Repeating Timer,可以在注冊的回調(diào)函數(shù)中再次的注冊自己。內(nèi)核定時器如下圖 5: 結(jié)論 由上面的分析,可以看到各種定時器實現(xiàn)算法的復(fù)雜度: ? 如果需要能在線程環(huán)境中使用的定時器,對于基于鏈表的定時器,可能需要很小心的處理信號的問題;而 POSIX timer [ 2 ]接口的定時器,只具有進程的語義,如果想在多線程環(huán)境下也 n 能使用,可以使用 Linux 提供的 timerfd_create(2) 接口。如果需要支持的定時器數(shù)量非常的大,可以考慮使用基于最小堆和時間輪的方式來實現(xiàn)
清單 2. setitimer 定時器的值定義
清單 3. 基于鏈表的定時器定義
清單 4. 定時器鏈表
清單 5. 定時器鏈表的創(chuàng)建和 Destroy
清單 6. 向定時器鏈表中添加定時器
清單 7. 信號處理函數(shù)驅(qū)動定時器
圖 1. 基于排序鏈表的定時器
清單 8. POSIX timer 接口
清單 9. POSIX timer 接口中的信號和事件定義
清單 10. Linux 提供的基于文件描述符的定時器接口
圖 2. 最小堆
?
清單 10. pjlib 提供的基于最小堆的定時器接口
圖 3. 簡單時間輪
?
圖 4. 水表
?
圖 5. Linux 時間輪
?
表 1. 定時器實現(xiàn)算法復(fù)雜度
電子發(fā)燒友App




















評論