懒惰计数器是并发数据结构,它是性能与准确度折中的结果。

懒惰计数器通过多个局部计数器和一个全局计数器来实现一个逻辑计数器,其中每个CPU核心有一个局部计数器,另外有一个全局的计数器。

基本思想是如果一个核心上的线程想要增加计数器,那就增加它的局部计数器,访问这个局部计数器通过对应的局部锁同步。因为每个CPU有自己的局部计数器,所以不同CPU上的线程不会竞争,但是为了保持全局计数器更新,局部值会定期转移给全局计数器,方法是获取全局锁,让全局计数器加上局部计数器的值,然后局部计数器置零。

这里设定的一个阈值与扩展性成反比,阈值越小,扩展性越差,阈值越大,扩展性越高,相对应精度就越低。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>

#define NUMCPUS 10

typedef struct counter_t {
int global; //global count
pthread_mutex_t glock; //global lock
int local[NUMCPUS]; //local count (per cpu)
pthread_mutex_t llock[NUMCPUS]; // .. and locks
int threshold; // update frequency

} counter_t;

void init(counter_t *c, int threshold) {
c->threshold = threshold;

c->global = 0;
pthread_mutex_init(&c->glock, NULL);

int i;
for(i = 0; i < NUMCPUS; i++) {
c->local[i] = 0;
pthread_mutex_init(&c->llock[i], NULL);
}
}

void update(counter_t *c, int threadID, int amt) {
pthread_mutex_lock(&c->llock[threadID]);
c->local[threadID] += amt;
if(c->local[threadID] >= c->threshold) {
pthread_mutex_lock(&c->glock);
c->global += c->local[threadID];
pthread_mutex_unlock(&c->glock);
c->local[threadID] = 0;
}
pthread_mutex_unlock(&c->llock[threadID]);
}

int get(counter_t *c){
pthread_mutex_lock(&c->glock);
int val = c->global;
pthread_mutex_unlock(&c->glock);
return val;
}

void force_update(counter_t *c){
pthread_mutex_lock(&c->glock);
for(int i = 0; i < NUMCPUS; i++) {
pthread_mutex_lock(&c->llock[i]);
c->global += c->local[i];
c->local[i] = 0;
pthread_mutex_unlock(&c->llock[i]);
}
pthread_mutex_unlock(&c->glock);
}

counter_t c;

void* increment_counter(void* arg){
printf("%s: start\n", (char*) arg);
for(int i = 0; i < 2e5; i++){
for(int j = 0; j < NUMCPUS; j++){
update(&c, j, 1);
}
}
printf("%s: end, counter is %d\n", (char*) arg, get(&c));
}


int main() {
pthread_t p1,p2,p3,p4;
init(&c, 1024);
pthread_create(&p1, NULL, increment_counter, "A");
pthread_create(&p2, NULL, increment_counter, "B");
pthread_create(&p3, NULL, increment_counter, "C");
pthread_create(&p4, NULL, increment_counter, "D");
pthread_join(p1, NULL);
pthread_join(p2, NULL);
pthread_join(p3, NULL);
pthread_join(p4, NULL);
force_update(&c);
printf("finally counter is %d\n", get(&c));
return 0;
}