线程安全的消息队列简单实现
线程安全的消息队列简单实现

前置知识

在实现线程安全的消息队列前,需要了解一些线程安全相关的库函数用法。

消息队列实现

1. 不考虑线程安全的消息队列实现

以下是一个不考虑线程安全的简单消息队列实现。这个实现中没有使用任何锁或条件变量,因此不适用于多线程环境,但在单线程环境或明确控制访问时可以使用。

  1#include <stdio.h>
  2#include <stdlib.h>
  3
  4#define QUEUE_MAX_SIZE 10
  5#define QUEUE_OK 0
  6#define QUEUE_ERROR -1
  7#define QUEUE_FULL 1
  8#define QUEUE_EMPTY 2
  9
 10typedef struct {
 11    void **data;            // 存储消息数据的指针数组
 12    unsigned int nelts;     // 当前队列中的元素数量
 13    unsigned int in;        // 下一次插入的位置
 14    unsigned int out;       // 下一次弹出的位置
 15    unsigned int bounds;    // 队列的最大容量
 16} message_queue_t;
 17
 18// 初始化队列
 19message_queue_t *queue_create(unsigned int max_size) {
 20    message_queue_t *queue = (message_queue_t *)malloc(sizeof(message_queue_t));
 21    if (!queue) return NULL;
 22
 23    queue->data = (void **)malloc(max_size * sizeof(void *));
 24    if (!queue->data) {
 25        free(queue);
 26        return NULL;
 27    }
 28
 29    queue->nelts = 0;
 30    queue->in = 0;
 31    queue->out = 0;
 32    queue->bounds = max_size;
 33
 34    return queue;
 35}
 36
 37// 销毁队列
 38void queue_destroy(message_queue_t *queue) {
 39    free(queue->data);
 40    free(queue);
 41}
 42
 43// 插入消息到队列
 44int queue_push(message_queue_t *queue, void *data) {
 45    if (queue->nelts == queue->bounds) {
 46        return QUEUE_FULL; // 队列已满
 47    }
 48
 49    queue->data[queue->in] = data;
 50    queue->in = (queue->in + 1) % queue->bounds;
 51    queue->nelts++;
 52
 53    return QUEUE_OK;
 54}
 55
 56// 从队列中弹出消息
 57int queue_pop(message_queue_t *queue, void **data) {
 58    if (queue->nelts == 0) {
 59        return QUEUE_EMPTY; // 队列为空
 60    }
 61
 62    *data = queue->data[queue->out];
 63    queue->out = (queue->out + 1) % queue->bounds;
 64    queue->nelts--;
 65
 66    return QUEUE_OK;
 67}
 68
 69// 测试代码
 70int main() {
 71    message_queue_t *queue = queue_create(QUEUE_MAX_SIZE);
 72    if (!queue) {
 73        printf("Failed to create queue\n");
 74        return QUEUE_ERROR;
 75    }
 76
 77    // 插入消息
 78    for (int i = 0; i < 15; ++i) {
 79        int *message = (int *)malloc(sizeof(int));
 80        *message = i;
 81        int result = queue_push(queue, message);
 82        if (result == QUEUE_FULL) {
 83            printf("Queue is full, cannot push message %d\n", *message);
 84            free(message); // 队列已满时释放消息
 85        } else {
 86            printf("Produced: %d\n", *message);
 87        }
 88    }
 89
 90    // 弹出消息
 91    void *data;
 92    while (queue_pop(queue, &data) == QUEUE_OK) {
 93        int *message = (int *)data;
 94        printf("Consumed: %d\n", *message);
 95        free(message);
 96    }
 97
 98    queue_destroy(queue);
 99    return 0;
100}

实现说明

  1. queue_create:

    • 初始化一个新的消息队列,分配内存,并设置初始值。
  2. queue_destroy:

    • 销毁队列,释放已分配的内存。
  3. queue_push:

    • 将消息插入到队列中。如果队列已满,返回 QUEUE_FULL
  4. queue_pop:

    • 从队列中弹出消息。如果队列为空,返回 QUEUE_EMPTY
  5. main 函数测试:

    • 创建一个消息队列,插入 15 条消息,超出队列最大容量时停止插入。
    • 然后依次从队列中弹出消息,打印出结果,并释放消息内存。

2. 考虑线程安全的消息队列实现

基本的线程安全的消息队列

以下是一个使用互斥锁 (pthread_mutex_t) 和条件变量 (pthread_cond_t) 来实现的简单的线程安全消息队列,包括创建队列、推送消息、弹出消息,以及销毁队列的功能。

  1#include <stdio.h>
  2#include <stdlib.h>
  3#include <pthread.h>
  4
  5#define QUEUE_MAX_SIZE 10
  6
  7typedef struct {
  8    void **data;                 // 存储消息数据的指针数组
  9    unsigned int nelts;          // 当前队列中的元素数量
 10    unsigned int in;             // 下一次插入的位置
 11    unsigned int out;            // 下一次弹出的位置
 12    unsigned int bounds;         // 队列的最大容量
 13    pthread_mutex_t mutex;       // 用于保护队列的互斥锁
 14    pthread_cond_t not_empty;    // 队列不为空的条件变量
 15    pthread_cond_t not_full;     // 队列不满的条件变量
 16    int terminated;              // 队列终止标志
 17} message_queue_t;
 18
 19// 初始化队列
 20message_queue_t *queue_create(unsigned int max_size) {
 21    message_queue_t *queue = (message_queue_t *)malloc(sizeof(message_queue_t));
 22    if (!queue) return NULL;
 23
 24    queue->data = (void **)malloc(max_size * sizeof(void *));
 25    if (!queue->data) {
 26        free(queue);
 27        return NULL;
 28    }
 29
 30    queue->nelts = 0;
 31    queue->in = 0;
 32    queue->out = 0;
 33    queue->bounds = max_size;
 34    queue->terminated = 0;
 35
 36    pthread_mutex_init(&queue->mutex, NULL);
 37    pthread_cond_init(&queue->not_empty, NULL);
 38    pthread_cond_init(&queue->not_full, NULL);
 39
 40    return queue;
 41}
 42
 43// 销毁队列
 44void queue_destroy(message_queue_t *queue) {
 45    pthread_mutex_destroy(&queue->mutex);
 46    pthread_cond_destroy(&queue->not_empty);
 47    pthread_cond_destroy(&queue->not_full);
 48    free(queue->data);
 49    free(queue);
 50}
 51
 52// 插入消息到队列
 53int queue_push(message_queue_t *queue, void *data) {
 54    pthread_mutex_lock(&queue->mutex);
 55
 56    while (queue->nelts == queue->bounds && !queue->terminated) {
 57        pthread_cond_wait(&queue->not_full, &queue->mutex); // 等待队列不满
 58    }
 59
 60    if (queue->terminated) {
 61        pthread_mutex_unlock(&queue->mutex);
 62        return -1; // 队列已终止
 63    }
 64
 65    queue->data[queue->in] = data;
 66    queue->in = (queue->in + 1) % queue->bounds;
 67    queue->nelts++;
 68
 69    pthread_cond_signal(&queue->not_empty); // 通知队列不为空
 70    pthread_mutex_unlock(&queue->mutex);
 71    return 0;
 72}
 73
 74// 从队列中弹出消息
 75int queue_pop(message_queue_t *queue, void **data) {
 76    pthread_mutex_lock(&queue->mutex);
 77
 78    while (queue->nelts == 0 && !queue->terminated) {
 79        pthread_cond_wait(&queue->not_empty, &queue->mutex); // 等待队列不为空
 80    }
 81
 82    if (queue->terminated) {
 83        pthread_mutex_unlock(&queue->mutex);
 84        return -1; // 队列已终止
 85    }
 86
 87    *data = queue->data[queue->out];
 88    queue->out = (queue->out + 1) % queue->bounds;
 89    queue->nelts--;
 90
 91    pthread_cond_signal(&queue->not_full); // 通知队列不满
 92    pthread_mutex_unlock(&queue->mutex);
 93    return 0;
 94}
 95
 96// 终止队列
 97void queue_terminate(message_queue_t *queue) {
 98    pthread_mutex_lock(&queue->mutex);
 99    queue->terminated = 1;
100    pthread_cond_broadcast(&queue->not_empty); // 唤醒所有等待线程
101    pthread_cond_broadcast(&queue->not_full);  // 唤醒所有等待线程
102    pthread_mutex_unlock(&queue->mutex);
103}
104
105// 测试代码
106void *producer(void *arg) {
107    message_queue_t *queue = (message_queue_t *)arg;
108    for (int i = 0; i < 20; ++i) {
109        int *message = (int *)malloc(sizeof(int));
110        *message = i;
111        queue_push(queue, message);
112        printf("Produced: %d\n", *message);
113    }
114    queue_terminate(queue); // 生产者结束后终止队列
115    return NULL;
116}
117
118void *consumer(void *arg) {
119    message_queue_t *queue = (message_queue_t *)arg;
120    void *data;
121    while (queue_pop(queue, &data) == 0) {
122        int *message = (int *)data;
123        printf("Consumed: %d\n", *message);
124        free(message);
125    }
126    return NULL;
127}
128
129int main() {
130    message_queue_t *queue = queue_create(QUEUE_MAX_SIZE);
131    pthread_t prod_thread, cons_thread;
132
133    pthread_create(&prod_thread, NULL, producer, queue);
134    pthread_create(&cons_thread, NULL, consumer, queue);
135
136    pthread_join(prod_thread, NULL);
137    pthread_join(cons_thread, NULL);
138
139    queue_destroy(queue);
140    return 0;
141}

实现说明

  1. 创建和销毁队列

    • queue_create 函数分配队列结构并初始化互斥锁和条件变量。
    • queue_destroy 函数释放队列和条件变量所占用的资源。
  2. 消息的推送和弹出

    • queue_push 函数在队列满时等待,并在插入数据后发出不为空的信号。
    • queue_pop 函数在队列空时等待,并在弹出数据后发出不满的信号。
  3. 终止队列

    • queue_terminate 函数设置终止标志并唤醒所有等待的线程,避免死锁。
  4. 生产者-消费者测试

    • producerconsumer 函数模拟生产者和消费者线程,将数据插入和弹出队列,验证线程安全性。

考虑消息时效性的线程安全的消息队列

如果消息具有时效性(即消息只能在一定的时间范围内有效),需要在消息队列中增加超时机制,以确保过期的消息不会被处理。以下是一个简单的处理时效性消息的方法:

在插入和弹出时检查消息的超时时间

  1. 在消息结构中添加时间戳: 每条消息应带有一个时间戳,记录它的生成或有效期的结束时间。

  2. 在插入消息时检查队列的可用性: 如果队列已满,可以选择丢弃超时的旧消息,或不插入新消息,或者替换已有的超时消息。

  3. 在弹出消息时检查超时时间: 当消费者弹出消息时,检查消息的时间戳是否在当前时间范围内有效。如果消息已超时,则丢弃该消息并继续弹出下一条消息,直到找到一条有效的消息或队列为空。

下面是改进的线程安全消息队列实现,加入了消息时效性的处理:

  1#include <stdio.h>
  2#include <stdlib.h>
  3#include <pthread.h>
  4#include <time.h>
  5#include <unistd.h> // for sleep
  6
  7#define QUEUE_MAX_SIZE 10
  8
  9typedef struct {
 10    void *data;                 // 存储消息数据的指针
 11    time_t expiration_time;     // 消息的过期时间
 12} message_t;
 13
 14typedef struct {
 15    message_t *data;            // 存储消息的结构数组
 16    unsigned int nelts;         // 当前队列中的元素数量
 17    unsigned int in;            // 下一次插入的位置
 18    unsigned int out;           // 下一次弹出的位置
 19    unsigned int bounds;        // 队列的最大容量
 20    pthread_mutex_t mutex;      // 用于保护队列的互斥锁
 21    pthread_cond_t not_empty;   // 队列不为空的条件变量
 22    pthread_cond_t not_full;    // 队列不满的条件变量
 23    int terminated;             // 队列终止标志
 24} message_queue_t;
 25
 26// 初始化队列
 27message_queue_t *queue_create(unsigned int max_size) {
 28    message_queue_t *queue = (message_queue_t *)malloc(sizeof(message_queue_t));
 29    if (!queue) return NULL;
 30
 31    queue->data = (message_t *)malloc(max_size * sizeof(message_t));
 32    if (!queue->data) {
 33        free(queue);
 34        return NULL;
 35    }
 36
 37    queue->nelts = 0;
 38    queue->in = 0;
 39    queue->out = 0;
 40    queue->bounds = max_size;
 41    queue->terminated = 0;
 42
 43    pthread_mutex_init(&queue->mutex, NULL);
 44    pthread_cond_init(&queue->not_empty, NULL);
 45    pthread_cond_init(&queue->not_full, NULL);
 46
 47    return queue;
 48}
 49
 50// 销毁队列
 51void queue_destroy(message_queue_t *queue) {
 52    pthread_mutex_destroy(&queue->mutex);
 53    pthread_cond_destroy(&queue->not_empty);
 54    pthread_cond_destroy(&queue->not_full);
 55    free(queue->data);
 56    free(queue);
 57}
 58
 59// 插入消息到队列
 60int queue_push(message_queue_t *queue, void *data, time_t expiration_time) {
 61    pthread_mutex_lock(&queue->mutex);
 62
 63    while (queue->nelts == queue->bounds && !queue->terminated) {
 64        pthread_cond_wait(&queue->not_full, &queue->mutex); // 等待队列不满
 65    }
 66
 67    if (queue->terminated) {
 68        pthread_mutex_unlock(&queue->mutex);
 69        return -1; // 队列已终止
 70    }
 71
 72    queue->data[queue->in].data = data;
 73    queue->data[queue->in].expiration_time = expiration_time;
 74    queue->in = (queue->in + 1) % queue->bounds;
 75    queue->nelts++;
 76
 77    pthread_cond_signal(&queue->not_empty); // 通知队列不为空
 78    pthread_mutex_unlock(&queue->mutex);
 79    return 0;
 80}
 81
 82// 从队列中弹出消息
 83int queue_pop(message_queue_t *queue, void **data) {
 84    pthread_mutex_lock(&queue->mutex);
 85
 86    while (queue->nelts == 0 && !queue->terminated) {
 87        pthread_cond_wait(&queue->not_empty, &queue->mutex); // 等待队列不为空
 88    }
 89
 90    if (queue->terminated) {
 91        pthread_mutex_unlock(&queue->mutex);
 92        return -1; // 队列已终止
 93    }
 94
 95    // 检查并丢弃过期消息
 96    time_t current_time = time(NULL);
 97    while (queue->nelts > 0 && queue->data[queue->out].expiration_time <= current_time) {
 98        printf("Discarding expired message\n");
 99        queue->out = (queue->out + 1) % queue->bounds;
100        queue->nelts--;
101    }
102
103    if (queue->nelts == 0) {
104        pthread_mutex_unlock(&queue->mutex);
105        return -1; // 所有消息已过期
106    }
107
108    *data = queue->data[queue->out].data;
109    queue->out = (queue->out + 1) % queue->bounds;
110    queue->nelts--;
111
112    pthread_cond_signal(&queue->not_full); // 通知队列不满
113    pthread_mutex_unlock(&queue->mutex);
114    return 0;
115}
116
117// 终止队列
118void queue_terminate(message_queue_t *queue) {
119    pthread_mutex_lock(&queue->mutex);
120    queue->terminated = 1;
121    pthread_cond_broadcast(&queue->not_empty); // 唤醒所有等待线程
122    pthread_cond_broadcast(&queue->not_full);  // 唤醒所有等待线程
123    pthread_mutex_unlock(&queue->mutex);
124}
125
126// 测试代码
127void *producer(void *arg) {
128    message_queue_t *queue = (message_queue_t *)arg;
129    for (int i = 0; i < 20; ++i) {
130        int *message = (int *)malloc(sizeof(int));
131        *message = i;
132        time_t expiration_time = time(NULL) + 5; // 设置消息5秒后过期
133        queue_push(queue, message, expiration_time);
134        printf("Produced: %d\n", *message);
135        sleep(1); // 模拟生产时间
136    }
137    queue_terminate(queue); // 生产者结束后终止队列
138    return NULL;
139}
140
141void *consumer(void *arg) {
142    message_queue_t *queue = (message_queue_t *)arg;
143    void *data;
144    while (queue_pop(queue, &data) == 0) {
145        int *message = (int *)data;
146        printf("Consumed: %d\n", *message);
147        free(message);
148    }
149    return NULL;
150}
151
152int main() {
153    message_queue_t *queue = queue_create(QUEUE_MAX_SIZE);
154    pthread_t prod_thread, cons_thread;
155
156    pthread_create(&prod_thread, NULL, producer, queue);
157    pthread_create(&cons_thread, NULL, consumer, queue);
158
159    pthread_join(prod_thread, NULL);
160    pthread_join(cons_thread, NULL);
161
162    queue_destroy(queue);
163    return 0;
164}

改进说明

  1. 增加消息时效性

    • 使用 message_t 结构来存储消息数据及其过期时间 (expiration_time)。
    • queue_push 函数中,将生成的消息及其过期时间一起插入队列。
    • queue_pop 函数中,检查消息的过期时间。如果消息已过期,则丢弃该消息并继续检查下一条。
  2. 插入和弹出时的过期处理

    • 在插入时不做过多检查,只要队列未满就插入。
    • 在弹出时,使用当前时间 (current_time) 检查消息的有效性,丢弃所有已过期的消息,直到找到有效的消息或队列为空。
  3. 消息的时效性管理

    • 使用 time(NULL) 获取当前时间,与消息的 expiration_time 比较,判断消息是否过期。
    • 采用循环机制丢弃所有过期的消息,确保队列中只有有效的消息被处理。

还考虑队列操作超时的消息队列

queue_pushqueue_pop 两个函数分别改为带有超时的版本。在这些函数中,当队列满或者为空时,将会使用 own_thread_cond_timedwait 来实现有超时机制的等待。

以下是改进后的线程安全消息队列实现

  1#include <stdio.h>
  2#include <stdlib.h>
  3#include <pthread.h>
  4#include <time.h>
  5#include <unistd.h> // for sleep
  6
  7#define QUEUE_MAX_SIZE 10
  8#define OWN_OK 0
  9#define OWN_ERROR -1
 10#define OWN_TIMEUP 1
 11
 12typedef long long own_time_t;
 13
 14// 时间相关的辅助函数
 15own_time_t own_time_from_sec(long seconds) {
 16    return seconds * 1000000LL;
 17}
 18
 19long own_time_sec(own_time_t usec) {
 20    return usec / 1000000LL;
 21}
 22
 23long own_time_usec(own_time_t usec) {
 24    return usec % 1000000LL;
 25}
 26
 27void own_gettimeofday(struct timeval *tv) {
 28    gettimeofday(tv, NULL);
 29}
 30
 31static int own_thread_cond_timedwait(
 32        pthread_cond_t *cond, pthread_mutex_t *mutex, own_time_t timeout) {
 33    int r;
 34    struct timespec to;
 35    struct timeval tv;
 36    own_time_t usec;
 37
 38    own_gettimeofday(&tv);
 39    usec = own_time_from_sec(tv.tv_sec) + tv.tv_usec + timeout;
 40    to.tv_sec = own_time_sec(usec);
 41    to.tv_nsec = own_time_usec(usec) * 1000;
 42
 43    r = pthread_cond_timedwait(cond, mutex, &to);
 44    if (r == 0)
 45        return OWN_OK; 
 46    else if (r == ETIMEDOUT)
 47        return OWN_TIMEUP;
 48    else 
 49        return OWN_ERROR;
 50}
 51
 52typedef struct {
 53    void *data;                 // 存储消息数据的指针
 54    time_t expiration_time;     // 消息的过期时间
 55} message_t;
 56
 57typedef struct {
 58    message_t *data;            // 存储消息的结构数组
 59    unsigned int nelts;         // 当前队列中的元素数量
 60    unsigned int in;            // 下一次插入的位置
 61    unsigned int out;           // 下一次弹出的位置
 62    unsigned int bounds;        // 队列的最大容量
 63    pthread_mutex_t mutex;      // 用于保护队列的互斥锁
 64    pthread_cond_t not_empty;   // 队列不为空的条件变量
 65    pthread_cond_t not_full;    // 队列不满的条件变量
 66    int terminated;             // 队列终止标志
 67} message_queue_t;
 68
 69// 初始化队列
 70message_queue_t *queue_create(unsigned int max_size) {
 71    message_queue_t *queue = (message_queue_t *)malloc(sizeof(message_queue_t));
 72    if (!queue) return NULL;
 73
 74    queue->data = (message_t *)malloc(max_size * sizeof(message_t));
 75    if (!queue->data) {
 76        free(queue);
 77        return NULL;
 78    }
 79
 80    queue->nelts = 0;
 81    queue->in = 0;
 82    queue->out = 0;
 83    queue->bounds = max_size;
 84    queue->terminated = 0;
 85
 86    pthread_mutex_init(&queue->mutex, NULL);
 87    pthread_cond_init(&queue->not_empty, NULL);
 88    pthread_cond_init(&queue->not_full, NULL);
 89
 90    return queue;
 91}
 92
 93// 销毁队列
 94void queue_destroy(message_queue_t *queue) {
 95    pthread_mutex_destroy(&queue->mutex);
 96    pthread_cond_destroy(&queue->not_empty);
 97    pthread_cond_destroy(&queue->not_full);
 98    free(queue->data);
 99    free(queue);
100}
101
102// 插入消息到队列(带超时机制)
103int queue_push(message_queue_t *queue, void *data, time_t expiration_time, own_time_t timeout) {
104    pthread_mutex_lock(&queue->mutex);
105
106    while (queue->nelts == queue->bounds && !queue->terminated) {
107        int wait_result = own_thread_cond_timedwait(&queue->not_full, &queue->mutex, timeout);
108        if (wait_result == OWN_TIMEUP) {
109            pthread_mutex_unlock(&queue->mutex);
110            return OWN_TIMEUP; // 等待超时
111        } else if (wait_result != OWN_OK) {
112            pthread_mutex_unlock(&queue->mutex);
113            return OWN_ERROR; // 其他错误
114        }
115    }
116
117    if (queue->terminated) {
118        pthread_mutex_unlock(&queue->mutex);
119        return OWN_ERROR; // 队列已终止
120    }
121
122    queue->data[queue->in].data = data;
123    queue->data[queue->in].expiration_time = expiration_time;
124    queue->in = (queue->in + 1) % queue->bounds;
125    queue->nelts++;
126
127    pthread_cond_signal(&queue->not_empty); // 通知队列不为空
128    pthread_mutex_unlock(&queue->mutex);
129    return OWN_OK;
130}
131
132// 从队列中弹出消息(带超时机制)
133int queue_pop(message_queue_t *queue, void **data, own_time_t timeout) {
134    pthread_mutex_lock(&queue->mutex);
135
136    while (queue->nelts == 0 && !queue->terminated) {
137        int wait_result = own_thread_cond_timedwait(&queue->not_empty, &queue->mutex, timeout);
138        if (wait_result == OWN_TIMEUP) {
139            pthread_mutex_unlock(&queue->mutex);
140            return OWN_TIMEUP; // 等待超时
141        } else if (wait_result != OWN_OK) {
142            pthread_mutex_unlock(&queue->mutex);
143            return OWN_ERROR; // 其他错误
144        }
145    }
146
147    if (queue->terminated) {
148        pthread_mutex_unlock(&queue->mutex);
149        return OWN_ERROR; // 队列已终止
150    }
151
152    // 检查并丢弃过期消息
153    time_t current_time = time(NULL);
154    while (queue->nelts > 0 && queue->data[queue->out].expiration_time <= current_time) {
155        printf("Discarding expired message\n");
156        queue->out = (queue->out + 1) % queue->bounds;
157        queue->nelts--;
158    }
159
160    if (queue->nelts == 0) {
161        pthread_mutex_unlock(&queue->mutex);
162        return OWN_ERROR; // 所有消息已过期
163    }
164
165    *data = queue->data[queue->out].data;
166    queue->out = (queue->out + 1) % queue->bounds;
167    queue->nelts--;
168
169    pthread_cond_signal(&queue->not_full); // 通知队列不满
170    pthread_mutex_unlock(&queue->mutex);
171    return OWN_OK;
172}
173
174// 终止队列
175void queue_terminate(message_queue_t *queue) {
176    pthread_mutex_lock(&queue->mutex);
177    queue->terminated = 1;
178    pthread_cond_broadcast(&queue->not_empty); // 唤醒所有等待线程
179    pthread_cond_broadcast(&queue->not_full);  // 唤醒所有等待线程
180    pthread_mutex_unlock(&queue->mutex);
181}
182
183// 测试代码
184void *producer(void *arg) {
185    message_queue_t *queue = (message_queue_t *)arg;
186    for (int i = 0; i < 20; ++i) {
187        int *message = (int *)malloc(sizeof(int));
188        *message = i;
189        time_t expiration_time = time(NULL) + 5; // 设置消息5秒后过期
190        if (queue_push(queue, message, expiration_time, 2000000) == OWN_TIMEUP) {
191            printf("Producer timed out waiting to push message %d\n", *message);
192            free(message);
193        } else {
194            printf("Produced: %d\n", *message);
195        }
196        sleep(1); // 模拟生产时间
197    }
198    queue_terminate(queue); // 生产者结束后终止队列
199    return NULL;
200}
201
202void *consumer(void *arg) {
203    message_queue_t *queue = (message_queue_t *)arg;
204    void *data;
205    while (queue_pop(queue, &data, 2000000) == OWN_OK) {
206        int *message = (int *)data;
207        printf("Consumed: %d\n", *message);
208        free(message);
209    }
210    return NULL;
211}
212
213int main() {
214    message_queue_t *queue = queue_create(QUEUE_MAX_SIZE);
215    pthread_t prod_thread, cons_thread;
216
217    pthread_create(&prod_thread, NULL, producer, queue);
218    pthread_create(&cons_thread, NULL, consumer, queue);
219
220    pthread_join(prod_thread, NULL);
221    pthread_join(cons_thread, NULL);
222
223    queue_destroy(queue);
224    return 0;
225}

改进点说明

  1. 在插入和弹出操作中加入了超时机制

    • 使用 own_thread_cond_timedwait 替代原来的 pthread_cond_wait,在等待条件变量时设定超时时间。
    • 如果等待超时,则立即返回,并不会无限期地阻塞。
  2. 提供超时和错误的处理

    • 如果 queue_pushqueue_pop 函数等待超时,返回 OWN_TIMEUP,并根据具体情况进行相应的处理。
  3. 处理消息的时效性

    • queue_pop 中,弹出消息前检查消息的过期时间,丢弃已过期的消息。

通过这样的改进,可以让消息队列在并发环境中更加健壮,既避免了无限等待,又能根据消息的时效性及时处理过期的消息。

附录. 确保消息时效性的方法

确保消息时效性的方法有很多种,具体方法取决于应用场景和需求。以下是几种常见的确保消息时效性的方式:

1. 使用超时机制

  • 描述: 在消息队列中为每条消息设置一个超时时间,消息到达此时间后会被认为过期。
  • 实现: 每次尝试从队列中读取消息时,检查当前时间与消息的超时时间。如果消息过期,则丢弃它。
  • 优点: 直接在队列层面控制消息的时效性,简单有效。
  • 缺点: 每次读取消息都需要额外的时间检查,可能影响性能。

2. 消息到期检查(定时任务)

  • 描述: 使用后台线程或定时器,定期检查队列中的消息是否过期,并移除所有过期的消息。
  • 实现: 可以在服务器上设置定时任务,定期遍历队列,移除过期消息。
  • 优点: 不影响队列的正常读取操作,过期消息能被及时清理。
  • 缺点: 在消息量较大时,定时检查可能导致一定的性能开销。

3. 消息优先级队列

  • 描述: 使用优先级队列来处理消息,根据消息的到期时间或其他优先级因素来确定处理顺序。
  • 实现: 将消息按超时时间或优先级进行排序,先处理即将过期或优先级较高的消息。
  • 优点: 提高重要消息的处理优先级,减少低优先级或即将过期消息占用队列。
  • 缺点: 需要更多的内存和计算资源来维护优先级,复杂度增加。

4. 使用TTL(Time-To-Live)机制

  • 描述: 为消息设置一个TTL(生存时间),当消息到达队列时,开始计时。超过TTL时间后,消息自动丢弃。
  • 实现: 在消息生产者发送消息时附带TTL字段,消费者或队列管理器根据TTL字段自动丢弃过期消息。
  • 优点: 自动化程度高,消息过期处理由系统负责,使用广泛。
  • 缺点: 需要消息传递的中间件或队列系统支持TTL功能(如RabbitMQ、Redis等)。

5. 在消息生产者端控制时效

  • 描述: 生产者在发送消息时检查时效性,如果消息超时则不发送。
  • 实现: 生产者端逻辑中,加入时间戳检查,确保只发送未过期的消息。
  • 优点: 降低了网络和队列的负担,因为过期消息不会被发送。
  • 缺点: 需要生产者具备时效性检查的逻辑,增加了生产者端的复杂度。

6. 消息版本控制

  • 描述: 使用消息的版本号或唯一标识符(如时间戳)来跟踪消息的有效性。
  • 实现: 在接收消息时,只处理最新版本的消息或指定时间范围内的消息。
  • 优点: 可以灵活控制消息的时效性,特别适用于更新频率较高的场景。
  • 缺点: 需要维护版本号和额外的逻辑来识别过期消息。

7. 使用队列中间件的过期策略

  • 描述: 利用消息队列中间件(如RabbitMQ、Kafka、Redis等)提供的内置过期策略。
  • 实现: 设置消息的过期时间(TTL),中间件在消息过期后自动将其删除或移动到死信队列(DLQ)。
  • 优点: 利用已有中间件的功能,简单有效,自动化程度高。
  • 缺点: 需要依赖特定中间件的特性和配置。

8. 使用缓存来临时存储消息

  • 描述: 使用高速缓存(如Redis或Memcached)存储短时效消息,消息过期自动删除。
  • 实现: 将消息放入缓存系统中,并设置过期时间,消息到期后由缓存系统自动删除。
  • 优点: 高效,适合短时效、高并发的场景。
  • 缺点: 不适合长时效消息,因为缓存系统通常适用于短期数据存储。

9. 消息消费者端的时效性处理

  • 描述: 消费者在处理消息时检查消息的时间戳或时效信息,确定是否处理或丢弃。
  • 实现: 消费者收到消息后,根据消息的时间戳和当前时间判断是否已过期,决定是否处理。
  • 优点: 使消费者有更多的控制权来决定如何处理过期的消息。
  • 缺点: 每个消费者都需要实现时效性检查逻辑,增加了复杂性。

10. 使用数据库来跟踪消息状态

  • 描述: 将消息及其状态(有效或已过期)存储在数据库中,定期清理或更新过期消息。
  • 实现: 使用数据库表记录消息的创建时间和状态字段,利用数据库查询或定时任务清理过期消息。
  • 优点: 易于持久化,适合长时效的消息。
  • 缺点: 数据库操作相对较慢,处理大批量消息时可能产生性能瓶颈。

总结

根据应用场景、消息数量和系统架构,可以选择一种或多种上述方法结合使用。例如,对于高并发场景,使用TTL机制和缓存可能更合适;对于需要更灵活控制的场景,可以使用消息优先级队列和消费者端检查策略。


最后修改于 2024-09-11 18:16