前置知识
在实现线程安全的消息队列前,需要了解一些线程安全相关的库函数用法。
消息队列实现
1. 不考虑线程安全的消息队列实现
以下是一个不考虑线程安全的简单消息队列实现。这个实现中没有使用任何锁或条件变量,因此不适用于多线程环境,但在单线程环境或明确控制访问时可以使用。
1#include <stdio.h>
2#include <stdlib.h>
3
4#define QUEUE_MAX_SIZE 10
5#define QUEUE_OK 0
6#define QUEUE_ERROR -1
7#define QUEUE_FULL 1
8#define QUEUE_EMPTY 2
9
10typedef struct {
11 void **data; // 存储消息数据的指针数组
12 unsigned int nelts; // 当前队列中的元素数量
13 unsigned int in; // 下一次插入的位置
14 unsigned int out; // 下一次弹出的位置
15 unsigned int bounds; // 队列的最大容量
16} message_queue_t;
17
18// 初始化队列
19message_queue_t *queue_create(unsigned int max_size) {
20 message_queue_t *queue = (message_queue_t *)malloc(sizeof(message_queue_t));
21 if (!queue) return NULL;
22
23 queue->data = (void **)malloc(max_size * sizeof(void *));
24 if (!queue->data) {
25 free(queue);
26 return NULL;
27 }
28
29 queue->nelts = 0;
30 queue->in = 0;
31 queue->out = 0;
32 queue->bounds = max_size;
33
34 return queue;
35}
36
37// 销毁队列
38void queue_destroy(message_queue_t *queue) {
39 free(queue->data);
40 free(queue);
41}
42
43// 插入消息到队列
44int queue_push(message_queue_t *queue, void *data) {
45 if (queue->nelts == queue->bounds) {
46 return QUEUE_FULL; // 队列已满
47 }
48
49 queue->data[queue->in] = data;
50 queue->in = (queue->in + 1) % queue->bounds;
51 queue->nelts++;
52
53 return QUEUE_OK;
54}
55
56// 从队列中弹出消息
57int queue_pop(message_queue_t *queue, void **data) {
58 if (queue->nelts == 0) {
59 return QUEUE_EMPTY; // 队列为空
60 }
61
62 *data = queue->data[queue->out];
63 queue->out = (queue->out + 1) % queue->bounds;
64 queue->nelts--;
65
66 return QUEUE_OK;
67}
68
69// 测试代码
70int main() {
71 message_queue_t *queue = queue_create(QUEUE_MAX_SIZE);
72 if (!queue) {
73 printf("Failed to create queue\n");
74 return QUEUE_ERROR;
75 }
76
77 // 插入消息
78 for (int i = 0; i < 15; ++i) {
79 int *message = (int *)malloc(sizeof(int));
80 *message = i;
81 int result = queue_push(queue, message);
82 if (result == QUEUE_FULL) {
83 printf("Queue is full, cannot push message %d\n", *message);
84 free(message); // 队列已满时释放消息
85 } else {
86 printf("Produced: %d\n", *message);
87 }
88 }
89
90 // 弹出消息
91 void *data;
92 while (queue_pop(queue, &data) == QUEUE_OK) {
93 int *message = (int *)data;
94 printf("Consumed: %d\n", *message);
95 free(message);
96 }
97
98 queue_destroy(queue);
99 return 0;
100}
实现说明
queue_create:- 初始化一个新的消息队列,分配内存,并设置初始值。
queue_destroy:- 销毁队列,释放已分配的内存。
queue_push:- 将消息插入到队列中。如果队列已满,返回
QUEUE_FULL。
- 将消息插入到队列中。如果队列已满,返回
queue_pop:- 从队列中弹出消息。如果队列为空,返回
QUEUE_EMPTY。
- 从队列中弹出消息。如果队列为空,返回
main函数测试:- 创建一个消息队列,插入 15 条消息,超出队列最大容量时停止插入。
- 然后依次从队列中弹出消息,打印出结果,并释放消息内存。
2. 考虑线程安全的消息队列实现
基本的线程安全的消息队列
以下是一个使用互斥锁 (pthread_mutex_t) 和条件变量 (pthread_cond_t) 来实现的简单的线程安全消息队列,包括创建队列、推送消息、弹出消息,以及销毁队列的功能。
1#include <stdio.h>
2#include <stdlib.h>
3#include <pthread.h>
4
5#define QUEUE_MAX_SIZE 10
6
7typedef struct {
8 void **data; // 存储消息数据的指针数组
9 unsigned int nelts; // 当前队列中的元素数量
10 unsigned int in; // 下一次插入的位置
11 unsigned int out; // 下一次弹出的位置
12 unsigned int bounds; // 队列的最大容量
13 pthread_mutex_t mutex; // 用于保护队列的互斥锁
14 pthread_cond_t not_empty; // 队列不为空的条件变量
15 pthread_cond_t not_full; // 队列不满的条件变量
16 int terminated; // 队列终止标志
17} message_queue_t;
18
19// 初始化队列
20message_queue_t *queue_create(unsigned int max_size) {
21 message_queue_t *queue = (message_queue_t *)malloc(sizeof(message_queue_t));
22 if (!queue) return NULL;
23
24 queue->data = (void **)malloc(max_size * sizeof(void *));
25 if (!queue->data) {
26 free(queue);
27 return NULL;
28 }
29
30 queue->nelts = 0;
31 queue->in = 0;
32 queue->out = 0;
33 queue->bounds = max_size;
34 queue->terminated = 0;
35
36 pthread_mutex_init(&queue->mutex, NULL);
37 pthread_cond_init(&queue->not_empty, NULL);
38 pthread_cond_init(&queue->not_full, NULL);
39
40 return queue;
41}
42
43// 销毁队列
44void queue_destroy(message_queue_t *queue) {
45 pthread_mutex_destroy(&queue->mutex);
46 pthread_cond_destroy(&queue->not_empty);
47 pthread_cond_destroy(&queue->not_full);
48 free(queue->data);
49 free(queue);
50}
51
52// 插入消息到队列
53int queue_push(message_queue_t *queue, void *data) {
54 pthread_mutex_lock(&queue->mutex);
55
56 while (queue->nelts == queue->bounds && !queue->terminated) {
57 pthread_cond_wait(&queue->not_full, &queue->mutex); // 等待队列不满
58 }
59
60 if (queue->terminated) {
61 pthread_mutex_unlock(&queue->mutex);
62 return -1; // 队列已终止
63 }
64
65 queue->data[queue->in] = data;
66 queue->in = (queue->in + 1) % queue->bounds;
67 queue->nelts++;
68
69 pthread_cond_signal(&queue->not_empty); // 通知队列不为空
70 pthread_mutex_unlock(&queue->mutex);
71 return 0;
72}
73
74// 从队列中弹出消息
75int queue_pop(message_queue_t *queue, void **data) {
76 pthread_mutex_lock(&queue->mutex);
77
78 while (queue->nelts == 0 && !queue->terminated) {
79 pthread_cond_wait(&queue->not_empty, &queue->mutex); // 等待队列不为空
80 }
81
82 if (queue->terminated) {
83 pthread_mutex_unlock(&queue->mutex);
84 return -1; // 队列已终止
85 }
86
87 *data = queue->data[queue->out];
88 queue->out = (queue->out + 1) % queue->bounds;
89 queue->nelts--;
90
91 pthread_cond_signal(&queue->not_full); // 通知队列不满
92 pthread_mutex_unlock(&queue->mutex);
93 return 0;
94}
95
96// 终止队列
97void queue_terminate(message_queue_t *queue) {
98 pthread_mutex_lock(&queue->mutex);
99 queue->terminated = 1;
100 pthread_cond_broadcast(&queue->not_empty); // 唤醒所有等待线程
101 pthread_cond_broadcast(&queue->not_full); // 唤醒所有等待线程
102 pthread_mutex_unlock(&queue->mutex);
103}
104
105// 测试代码
106void *producer(void *arg) {
107 message_queue_t *queue = (message_queue_t *)arg;
108 for (int i = 0; i < 20; ++i) {
109 int *message = (int *)malloc(sizeof(int));
110 *message = i;
111 queue_push(queue, message);
112 printf("Produced: %d\n", *message);
113 }
114 queue_terminate(queue); // 生产者结束后终止队列
115 return NULL;
116}
117
118void *consumer(void *arg) {
119 message_queue_t *queue = (message_queue_t *)arg;
120 void *data;
121 while (queue_pop(queue, &data) == 0) {
122 int *message = (int *)data;
123 printf("Consumed: %d\n", *message);
124 free(message);
125 }
126 return NULL;
127}
128
129int main() {
130 message_queue_t *queue = queue_create(QUEUE_MAX_SIZE);
131 pthread_t prod_thread, cons_thread;
132
133 pthread_create(&prod_thread, NULL, producer, queue);
134 pthread_create(&cons_thread, NULL, consumer, queue);
135
136 pthread_join(prod_thread, NULL);
137 pthread_join(cons_thread, NULL);
138
139 queue_destroy(queue);
140 return 0;
141}
实现说明
创建和销毁队列:
queue_create函数分配队列结构并初始化互斥锁和条件变量。queue_destroy函数释放队列和条件变量所占用的资源。
消息的推送和弹出:
queue_push函数在队列满时等待,并在插入数据后发出不为空的信号。queue_pop函数在队列空时等待,并在弹出数据后发出不满的信号。
终止队列:
queue_terminate函数设置终止标志并唤醒所有等待的线程,避免死锁。
生产者-消费者测试:
producer和consumer函数模拟生产者和消费者线程,将数据插入和弹出队列,验证线程安全性。
考虑消息时效性的线程安全的消息队列
如果消息具有时效性(即消息只能在一定的时间范围内有效),需要在消息队列中增加超时机制,以确保过期的消息不会被处理。以下是一个简单的处理时效性消息的方法:
在插入和弹出时检查消息的超时时间
在消息结构中添加时间戳: 每条消息应带有一个时间戳,记录它的生成或有效期的结束时间。
在插入消息时检查队列的可用性: 如果队列已满,可以选择丢弃超时的旧消息,或不插入新消息,或者替换已有的超时消息。
在弹出消息时检查超时时间: 当消费者弹出消息时,检查消息的时间戳是否在当前时间范围内有效。如果消息已超时,则丢弃该消息并继续弹出下一条消息,直到找到一条有效的消息或队列为空。
下面是改进的线程安全消息队列实现,加入了消息时效性的处理:
1#include <stdio.h>
2#include <stdlib.h>
3#include <pthread.h>
4#include <time.h>
5#include <unistd.h> // for sleep
6
7#define QUEUE_MAX_SIZE 10
8
9typedef struct {
10 void *data; // 存储消息数据的指针
11 time_t expiration_time; // 消息的过期时间
12} message_t;
13
14typedef struct {
15 message_t *data; // 存储消息的结构数组
16 unsigned int nelts; // 当前队列中的元素数量
17 unsigned int in; // 下一次插入的位置
18 unsigned int out; // 下一次弹出的位置
19 unsigned int bounds; // 队列的最大容量
20 pthread_mutex_t mutex; // 用于保护队列的互斥锁
21 pthread_cond_t not_empty; // 队列不为空的条件变量
22 pthread_cond_t not_full; // 队列不满的条件变量
23 int terminated; // 队列终止标志
24} message_queue_t;
25
26// 初始化队列
27message_queue_t *queue_create(unsigned int max_size) {
28 message_queue_t *queue = (message_queue_t *)malloc(sizeof(message_queue_t));
29 if (!queue) return NULL;
30
31 queue->data = (message_t *)malloc(max_size * sizeof(message_t));
32 if (!queue->data) {
33 free(queue);
34 return NULL;
35 }
36
37 queue->nelts = 0;
38 queue->in = 0;
39 queue->out = 0;
40 queue->bounds = max_size;
41 queue->terminated = 0;
42
43 pthread_mutex_init(&queue->mutex, NULL);
44 pthread_cond_init(&queue->not_empty, NULL);
45 pthread_cond_init(&queue->not_full, NULL);
46
47 return queue;
48}
49
50// 销毁队列
51void queue_destroy(message_queue_t *queue) {
52 pthread_mutex_destroy(&queue->mutex);
53 pthread_cond_destroy(&queue->not_empty);
54 pthread_cond_destroy(&queue->not_full);
55 free(queue->data);
56 free(queue);
57}
58
59// 插入消息到队列
60int queue_push(message_queue_t *queue, void *data, time_t expiration_time) {
61 pthread_mutex_lock(&queue->mutex);
62
63 while (queue->nelts == queue->bounds && !queue->terminated) {
64 pthread_cond_wait(&queue->not_full, &queue->mutex); // 等待队列不满
65 }
66
67 if (queue->terminated) {
68 pthread_mutex_unlock(&queue->mutex);
69 return -1; // 队列已终止
70 }
71
72 queue->data[queue->in].data = data;
73 queue->data[queue->in].expiration_time = expiration_time;
74 queue->in = (queue->in + 1) % queue->bounds;
75 queue->nelts++;
76
77 pthread_cond_signal(&queue->not_empty); // 通知队列不为空
78 pthread_mutex_unlock(&queue->mutex);
79 return 0;
80}
81
82// 从队列中弹出消息
83int queue_pop(message_queue_t *queue, void **data) {
84 pthread_mutex_lock(&queue->mutex);
85
86 while (queue->nelts == 0 && !queue->terminated) {
87 pthread_cond_wait(&queue->not_empty, &queue->mutex); // 等待队列不为空
88 }
89
90 if (queue->terminated) {
91 pthread_mutex_unlock(&queue->mutex);
92 return -1; // 队列已终止
93 }
94
95 // 检查并丢弃过期消息
96 time_t current_time = time(NULL);
97 while (queue->nelts > 0 && queue->data[queue->out].expiration_time <= current_time) {
98 printf("Discarding expired message\n");
99 queue->out = (queue->out + 1) % queue->bounds;
100 queue->nelts--;
101 }
102
103 if (queue->nelts == 0) {
104 pthread_mutex_unlock(&queue->mutex);
105 return -1; // 所有消息已过期
106 }
107
108 *data = queue->data[queue->out].data;
109 queue->out = (queue->out + 1) % queue->bounds;
110 queue->nelts--;
111
112 pthread_cond_signal(&queue->not_full); // 通知队列不满
113 pthread_mutex_unlock(&queue->mutex);
114 return 0;
115}
116
117// 终止队列
118void queue_terminate(message_queue_t *queue) {
119 pthread_mutex_lock(&queue->mutex);
120 queue->terminated = 1;
121 pthread_cond_broadcast(&queue->not_empty); // 唤醒所有等待线程
122 pthread_cond_broadcast(&queue->not_full); // 唤醒所有等待线程
123 pthread_mutex_unlock(&queue->mutex);
124}
125
126// 测试代码
127void *producer(void *arg) {
128 message_queue_t *queue = (message_queue_t *)arg;
129 for (int i = 0; i < 20; ++i) {
130 int *message = (int *)malloc(sizeof(int));
131 *message = i;
132 time_t expiration_time = time(NULL) + 5; // 设置消息5秒后过期
133 queue_push(queue, message, expiration_time);
134 printf("Produced: %d\n", *message);
135 sleep(1); // 模拟生产时间
136 }
137 queue_terminate(queue); // 生产者结束后终止队列
138 return NULL;
139}
140
141void *consumer(void *arg) {
142 message_queue_t *queue = (message_queue_t *)arg;
143 void *data;
144 while (queue_pop(queue, &data) == 0) {
145 int *message = (int *)data;
146 printf("Consumed: %d\n", *message);
147 free(message);
148 }
149 return NULL;
150}
151
152int main() {
153 message_queue_t *queue = queue_create(QUEUE_MAX_SIZE);
154 pthread_t prod_thread, cons_thread;
155
156 pthread_create(&prod_thread, NULL, producer, queue);
157 pthread_create(&cons_thread, NULL, consumer, queue);
158
159 pthread_join(prod_thread, NULL);
160 pthread_join(cons_thread, NULL);
161
162 queue_destroy(queue);
163 return 0;
164}
改进说明
增加消息时效性:
- 使用
message_t结构来存储消息数据及其过期时间 (expiration_time)。 - 在
queue_push函数中,将生成的消息及其过期时间一起插入队列。 - 在
queue_pop函数中,检查消息的过期时间。如果消息已过期,则丢弃该消息并继续检查下一条。
- 使用
插入和弹出时的过期处理:
- 在插入时不做过多检查,只要队列未满就插入。
- 在弹出时,使用当前时间 (
current_time) 检查消息的有效性,丢弃所有已过期的消息,直到找到有效的消息或队列为空。
消息的时效性管理:
- 使用
time(NULL)获取当前时间,与消息的expiration_time比较,判断消息是否过期。 - 采用循环机制丢弃所有过期的消息,确保队列中只有有效的消息被处理。
- 使用
还考虑队列操作超时的消息队列
将 queue_push 和 queue_pop 两个函数分别改为带有超时的版本。在这些函数中,当队列满或者为空时,将会使用 own_thread_cond_timedwait 来实现有超时机制的等待。
以下是改进后的线程安全消息队列实现
1#include <stdio.h>
2#include <stdlib.h>
3#include <pthread.h>
4#include <time.h>
5#include <unistd.h> // for sleep
6
7#define QUEUE_MAX_SIZE 10
8#define OWN_OK 0
9#define OWN_ERROR -1
10#define OWN_TIMEUP 1
11
12typedef long long own_time_t;
13
14// 时间相关的辅助函数
15own_time_t own_time_from_sec(long seconds) {
16 return seconds * 1000000LL;
17}
18
19long own_time_sec(own_time_t usec) {
20 return usec / 1000000LL;
21}
22
23long own_time_usec(own_time_t usec) {
24 return usec % 1000000LL;
25}
26
27void own_gettimeofday(struct timeval *tv) {
28 gettimeofday(tv, NULL);
29}
30
31static int own_thread_cond_timedwait(
32 pthread_cond_t *cond, pthread_mutex_t *mutex, own_time_t timeout) {
33 int r;
34 struct timespec to;
35 struct timeval tv;
36 own_time_t usec;
37
38 own_gettimeofday(&tv);
39 usec = own_time_from_sec(tv.tv_sec) + tv.tv_usec + timeout;
40 to.tv_sec = own_time_sec(usec);
41 to.tv_nsec = own_time_usec(usec) * 1000;
42
43 r = pthread_cond_timedwait(cond, mutex, &to);
44 if (r == 0)
45 return OWN_OK;
46 else if (r == ETIMEDOUT)
47 return OWN_TIMEUP;
48 else
49 return OWN_ERROR;
50}
51
52typedef struct {
53 void *data; // 存储消息数据的指针
54 time_t expiration_time; // 消息的过期时间
55} message_t;
56
57typedef struct {
58 message_t *data; // 存储消息的结构数组
59 unsigned int nelts; // 当前队列中的元素数量
60 unsigned int in; // 下一次插入的位置
61 unsigned int out; // 下一次弹出的位置
62 unsigned int bounds; // 队列的最大容量
63 pthread_mutex_t mutex; // 用于保护队列的互斥锁
64 pthread_cond_t not_empty; // 队列不为空的条件变量
65 pthread_cond_t not_full; // 队列不满的条件变量
66 int terminated; // 队列终止标志
67} message_queue_t;
68
69// 初始化队列
70message_queue_t *queue_create(unsigned int max_size) {
71 message_queue_t *queue = (message_queue_t *)malloc(sizeof(message_queue_t));
72 if (!queue) return NULL;
73
74 queue->data = (message_t *)malloc(max_size * sizeof(message_t));
75 if (!queue->data) {
76 free(queue);
77 return NULL;
78 }
79
80 queue->nelts = 0;
81 queue->in = 0;
82 queue->out = 0;
83 queue->bounds = max_size;
84 queue->terminated = 0;
85
86 pthread_mutex_init(&queue->mutex, NULL);
87 pthread_cond_init(&queue->not_empty, NULL);
88 pthread_cond_init(&queue->not_full, NULL);
89
90 return queue;
91}
92
93// 销毁队列
94void queue_destroy(message_queue_t *queue) {
95 pthread_mutex_destroy(&queue->mutex);
96 pthread_cond_destroy(&queue->not_empty);
97 pthread_cond_destroy(&queue->not_full);
98 free(queue->data);
99 free(queue);
100}
101
102// 插入消息到队列(带超时机制)
103int queue_push(message_queue_t *queue, void *data, time_t expiration_time, own_time_t timeout) {
104 pthread_mutex_lock(&queue->mutex);
105
106 while (queue->nelts == queue->bounds && !queue->terminated) {
107 int wait_result = own_thread_cond_timedwait(&queue->not_full, &queue->mutex, timeout);
108 if (wait_result == OWN_TIMEUP) {
109 pthread_mutex_unlock(&queue->mutex);
110 return OWN_TIMEUP; // 等待超时
111 } else if (wait_result != OWN_OK) {
112 pthread_mutex_unlock(&queue->mutex);
113 return OWN_ERROR; // 其他错误
114 }
115 }
116
117 if (queue->terminated) {
118 pthread_mutex_unlock(&queue->mutex);
119 return OWN_ERROR; // 队列已终止
120 }
121
122 queue->data[queue->in].data = data;
123 queue->data[queue->in].expiration_time = expiration_time;
124 queue->in = (queue->in + 1) % queue->bounds;
125 queue->nelts++;
126
127 pthread_cond_signal(&queue->not_empty); // 通知队列不为空
128 pthread_mutex_unlock(&queue->mutex);
129 return OWN_OK;
130}
131
132// 从队列中弹出消息(带超时机制)
133int queue_pop(message_queue_t *queue, void **data, own_time_t timeout) {
134 pthread_mutex_lock(&queue->mutex);
135
136 while (queue->nelts == 0 && !queue->terminated) {
137 int wait_result = own_thread_cond_timedwait(&queue->not_empty, &queue->mutex, timeout);
138 if (wait_result == OWN_TIMEUP) {
139 pthread_mutex_unlock(&queue->mutex);
140 return OWN_TIMEUP; // 等待超时
141 } else if (wait_result != OWN_OK) {
142 pthread_mutex_unlock(&queue->mutex);
143 return OWN_ERROR; // 其他错误
144 }
145 }
146
147 if (queue->terminated) {
148 pthread_mutex_unlock(&queue->mutex);
149 return OWN_ERROR; // 队列已终止
150 }
151
152 // 检查并丢弃过期消息
153 time_t current_time = time(NULL);
154 while (queue->nelts > 0 && queue->data[queue->out].expiration_time <= current_time) {
155 printf("Discarding expired message\n");
156 queue->out = (queue->out + 1) % queue->bounds;
157 queue->nelts--;
158 }
159
160 if (queue->nelts == 0) {
161 pthread_mutex_unlock(&queue->mutex);
162 return OWN_ERROR; // 所有消息已过期
163 }
164
165 *data = queue->data[queue->out].data;
166 queue->out = (queue->out + 1) % queue->bounds;
167 queue->nelts--;
168
169 pthread_cond_signal(&queue->not_full); // 通知队列不满
170 pthread_mutex_unlock(&queue->mutex);
171 return OWN_OK;
172}
173
174// 终止队列
175void queue_terminate(message_queue_t *queue) {
176 pthread_mutex_lock(&queue->mutex);
177 queue->terminated = 1;
178 pthread_cond_broadcast(&queue->not_empty); // 唤醒所有等待线程
179 pthread_cond_broadcast(&queue->not_full); // 唤醒所有等待线程
180 pthread_mutex_unlock(&queue->mutex);
181}
182
183// 测试代码
184void *producer(void *arg) {
185 message_queue_t *queue = (message_queue_t *)arg;
186 for (int i = 0; i < 20; ++i) {
187 int *message = (int *)malloc(sizeof(int));
188 *message = i;
189 time_t expiration_time = time(NULL) + 5; // 设置消息5秒后过期
190 if (queue_push(queue, message, expiration_time, 2000000) == OWN_TIMEUP) {
191 printf("Producer timed out waiting to push message %d\n", *message);
192 free(message);
193 } else {
194 printf("Produced: %d\n", *message);
195 }
196 sleep(1); // 模拟生产时间
197 }
198 queue_terminate(queue); // 生产者结束后终止队列
199 return NULL;
200}
201
202void *consumer(void *arg) {
203 message_queue_t *queue = (message_queue_t *)arg;
204 void *data;
205 while (queue_pop(queue, &data, 2000000) == OWN_OK) {
206 int *message = (int *)data;
207 printf("Consumed: %d\n", *message);
208 free(message);
209 }
210 return NULL;
211}
212
213int main() {
214 message_queue_t *queue = queue_create(QUEUE_MAX_SIZE);
215 pthread_t prod_thread, cons_thread;
216
217 pthread_create(&prod_thread, NULL, producer, queue);
218 pthread_create(&cons_thread, NULL, consumer, queue);
219
220 pthread_join(prod_thread, NULL);
221 pthread_join(cons_thread, NULL);
222
223 queue_destroy(queue);
224 return 0;
225}
改进点说明
在插入和弹出操作中加入了超时机制:
- 使用
own_thread_cond_timedwait替代原来的pthread_cond_wait,在等待条件变量时设定超时时间。 - 如果等待超时,则立即返回,并不会无限期地阻塞。
- 使用
提供超时和错误的处理:
- 如果
queue_push或queue_pop函数等待超时,返回OWN_TIMEUP,并根据具体情况进行相应的处理。
- 如果
处理消息的时效性:
- 在
queue_pop中,弹出消息前检查消息的过期时间,丢弃已过期的消息。
- 在
通过这样的改进,可以让消息队列在并发环境中更加健壮,既避免了无限等待,又能根据消息的时效性及时处理过期的消息。
附录. 确保消息时效性的方法
确保消息时效性的方法有很多种,具体方法取决于应用场景和需求。以下是几种常见的确保消息时效性的方式:
1. 使用超时机制
- 描述: 在消息队列中为每条消息设置一个超时时间,消息到达此时间后会被认为过期。
- 实现: 每次尝试从队列中读取消息时,检查当前时间与消息的超时时间。如果消息过期,则丢弃它。
- 优点: 直接在队列层面控制消息的时效性,简单有效。
- 缺点: 每次读取消息都需要额外的时间检查,可能影响性能。
2. 消息到期检查(定时任务)
- 描述: 使用后台线程或定时器,定期检查队列中的消息是否过期,并移除所有过期的消息。
- 实现: 可以在服务器上设置定时任务,定期遍历队列,移除过期消息。
- 优点: 不影响队列的正常读取操作,过期消息能被及时清理。
- 缺点: 在消息量较大时,定时检查可能导致一定的性能开销。
3. 消息优先级队列
- 描述: 使用优先级队列来处理消息,根据消息的到期时间或其他优先级因素来确定处理顺序。
- 实现: 将消息按超时时间或优先级进行排序,先处理即将过期或优先级较高的消息。
- 优点: 提高重要消息的处理优先级,减少低优先级或即将过期消息占用队列。
- 缺点: 需要更多的内存和计算资源来维护优先级,复杂度增加。
4. 使用TTL(Time-To-Live)机制
- 描述: 为消息设置一个TTL(生存时间),当消息到达队列时,开始计时。超过TTL时间后,消息自动丢弃。
- 实现: 在消息生产者发送消息时附带TTL字段,消费者或队列管理器根据TTL字段自动丢弃过期消息。
- 优点: 自动化程度高,消息过期处理由系统负责,使用广泛。
- 缺点: 需要消息传递的中间件或队列系统支持TTL功能(如RabbitMQ、Redis等)。
5. 在消息生产者端控制时效
- 描述: 生产者在发送消息时检查时效性,如果消息超时则不发送。
- 实现: 生产者端逻辑中,加入时间戳检查,确保只发送未过期的消息。
- 优点: 降低了网络和队列的负担,因为过期消息不会被发送。
- 缺点: 需要生产者具备时效性检查的逻辑,增加了生产者端的复杂度。
6. 消息版本控制
- 描述: 使用消息的版本号或唯一标识符(如时间戳)来跟踪消息的有效性。
- 实现: 在接收消息时,只处理最新版本的消息或指定时间范围内的消息。
- 优点: 可以灵活控制消息的时效性,特别适用于更新频率较高的场景。
- 缺点: 需要维护版本号和额外的逻辑来识别过期消息。
7. 使用队列中间件的过期策略
- 描述: 利用消息队列中间件(如RabbitMQ、Kafka、Redis等)提供的内置过期策略。
- 实现: 设置消息的过期时间(TTL),中间件在消息过期后自动将其删除或移动到死信队列(DLQ)。
- 优点: 利用已有中间件的功能,简单有效,自动化程度高。
- 缺点: 需要依赖特定中间件的特性和配置。
8. 使用缓存来临时存储消息
- 描述: 使用高速缓存(如Redis或Memcached)存储短时效消息,消息过期自动删除。
- 实现: 将消息放入缓存系统中,并设置过期时间,消息到期后由缓存系统自动删除。
- 优点: 高效,适合短时效、高并发的场景。
- 缺点: 不适合长时效消息,因为缓存系统通常适用于短期数据存储。
9. 消息消费者端的时效性处理
- 描述: 消费者在处理消息时检查消息的时间戳或时效信息,确定是否处理或丢弃。
- 实现: 消费者收到消息后,根据消息的时间戳和当前时间判断是否已过期,决定是否处理。
- 优点: 使消费者有更多的控制权来决定如何处理过期的消息。
- 缺点: 每个消费者都需要实现时效性检查逻辑,增加了复杂性。
10. 使用数据库来跟踪消息状态
- 描述: 将消息及其状态(有效或已过期)存储在数据库中,定期清理或更新过期消息。
- 实现: 使用数据库表记录消息的创建时间和状态字段,利用数据库查询或定时任务清理过期消息。
- 优点: 易于持久化,适合长时效的消息。
- 缺点: 数据库操作相对较慢,处理大批量消息时可能产生性能瓶颈。
总结
根据应用场景、消息数量和系统架构,可以选择一种或多种上述方法结合使用。例如,对于高并发场景,使用TTL机制和缓存可能更合适;对于需要更灵活控制的场景,可以使用消息优先级队列和消费者端检查策略。
最后修改于 2024-09-11 18:16