博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
suricata 源码分析之ringbuffer(环形缓冲区)
阅读量:2791 次
发布时间:2019-05-13

本文共 8611 字,大约阅读时间需要 28 分钟。

    环形缓冲区也就是ringbuffer,其性能相对于异步队列是非常高效的,原因 : 1)无锁; 2)cpu cache 友好;3) 内存不会疯涨。

    当然ringbuffer也有其缺点(也是优点, 不会出现内存暴增),就是其长度是固定的,如果满了无法再往里面放入数据,而异步队列没有这个限制,但是有可能因为并发不够,处理不过来,队列太长导致内存被大量占用,程序性能越来越差。

 

    suricata是多线程架构的,所以线程间的数据传递是必不可少的。因此在suricata-0.8中选用的数据结构是异步队列PacketQueue,

而在suricata-2.0.8中使用的是ringbuffer数据结构。在后期的所有版本中都淘汰了异步队列PacketQueue,使用ringbuffer或者

使用函数 pthread_key_create() 用来创建线程私有数据,其性能是高于异步队列的。

 

     为什么ringbuffer的性能高于异步队列?

     ringbuffer的锁粒度是单个节点,并且在生产者的情况下才会加锁(往ringbuffer中放节点),而异步队列是整个队列,并发远远大于异步队列。

     如果只有一个消费者生产者,则ringbuffer不需要加锁,性能更高。

 

    二者在suricata中的数据包使用

 

    2.0.8中

 

void TmqhPacketpoolRegister (void) {    tmqh_table[TMQH_PACKETPOOL].name = "packetpool";    tmqh_table[TMQH_PACKETPOOL].InHandler = TmqhInputPacketpool;    tmqh_table[TMQH_PACKETPOOL].OutHandler = TmqhOutputPacketpool;    ringbuffer = RingBufferInit();    if (ringbuffer == NULL) {        SCLogError(SC_ERR_FATAL, "Error registering Packet pool handler (at ring buffer init)");        exit(EXIT_FAILURE);    }}

    0.8中

void TmqhPacketpoolRegister (void) {    tmqh_table[TMQH_PACKETPOOL].name = "packetpool";    tmqh_table[TMQH_PACKETPOOL].InHandler = TmqhInputPacketpool;    tmqh_table[TMQH_PACKETPOOL].OutHandler = TmqhOutputPacketpool;}

 

     suricata中关于ringbuffer的定义与实现在util-ringbuffer.h和util-ringbuffer.c中。

     以下就以一个生产者和一个消费者的代码实现 

  /* 该环形缓冲区的大小 256 ( unsigned char 的大小) */

#define RING_BUFFER_8_SIZE 256typedef struct RingBuffer8_ {    /* write 和 read 都是 unsigned char 类型,巧妙地利用其值溢出效果来循环, 使用的是原子变量,来实现无锁*/    SC_ATOMIC_DECLARE(unsigned char, write);  /**< idx where we put data */    SC_ATOMIC_DECLARE(unsigned char, read);   /**< idx where we read data */    uint8_t shutdown;#ifdef RINGBUFFER_MUTEX_WAIT    SCCondT wait_cond;    SCMutex wait_mutex;#endif /* RINGBUFFER_MUTEX_WAIT */    SCSpinlock spin; /**< lock protecting writes for multi writer mode*/    /* 环形缓冲区里面存储实际的数据,可以是任何类型 */    void *array[RING_BUFFER_8_SIZE];} RingBuffer8;

 

其使用接口:

 

RingBuffer8Init:初始化,创建RingBuffer8,将读写计数清零,初始化自旋锁。

RingBufferSrSw8Put:生产者,先判断缓冲区是否已满,若满,循环等待直到有空间存储然后存储。

RingBufferSrMw8Get:消费者,先判断缓冲区是否为空,若空,循环等待直到有数据然后取走。

RingBuffer8Destroy:销毁缓冲区。

 

下面针对实际代码进行解释:

 

/* 生产者 */int RingBufferSrSw8Put(RingBuffer8 *rb, void *ptr){    /* buffer is full, wait... */    /* 判断该缓冲区是否已满,如果是,循环等待。这里判断的依据就是 read write的值会不停的从0-255循环 */    while ((unsigned char)(SC_ATOMIC_GET(rb->write) + 1) == SC_ATOMIC_GET(rb->read)) {        /* break out if the engine wants to shutdown */        if (rb->shutdown != 0)            return -1;        RingBuffer8DoWait(rb);    }    /* 预先将空间给生产者使用,无锁 */    rb->array[SC_ATOMIC_GET(rb->write)] = ptr;    (void) SC_ATOMIC_ADD(rb->write, 1);#ifdef RINGBUFFER_MUTEX_WAIT    SCCondSignal(&rb->wait_cond);#endif    return 0;}

 

/* 消费者 */ void *RingBufferSrMw8Get(RingBuffer8 *rb){    void *ptr = NULL;    /* buffer is empty, wait... */    /* 判断该缓冲区是否为空,如果是,循环等待。这里判断的依据就是 read write的值会不停的从0-255循环 */    while (SC_ATOMIC_GET(rb->write) == SC_ATOMIC_GET(rb->read)) {        /* break out if the engine wants to shutdown */        if (rb->shutdown != 0)            return NULL;        RingBuffer8DoWait(rb);    }    /* 如果有数据可读, 直接读取,然后移动写指针 */    ptr = rb->array[SC_ATOMIC_GET(rb->read)];    (void) SC_ATOMIC_ADD(rb->read, 1);#ifdef RINGBUFFER_MUTEX_WAIT    SCCondSignal(&rb->wait_cond);#endif    return ptr;}

 

以上代码是256大小的环形缓冲区,,没有加锁的地方,由此可见,其使用过程中,如果缓冲区未满,在多线程并发访问下(读写线程只有一个),不会发生阻塞,效率高

下面再介绍0.8版本的PacketQueue packet_q:

 

typedef struct PacketQueue_ {    Packet *top;    Packet *bot;    uint16_t len;    /* 队列锁*/    SCMutex mutex_q;    SCCondT cond_q;#ifdef DBG_PERF    uint16_t dbg_maxlen;#endif /* DBG_PERF */} PacketQueue;

初始化过程在main函数中,在程序启动的时候预先分配MAX_PENDING个数据包 

 

int i = 0;    for (i = 0; i < MAX_PENDING; i++) {        /* XXX pkt alloc function */        Packet *p = malloc(sizeof(Packet));        if (p == NULL) {            printf("ERROR: malloc failed: %s\n", strerror(errno));            exit(EXIT_FAILURE);        }        memset(p, 0, sizeof(Packet));        SCMutexInit(&p->mutex_rtv_cnt, NULL);        PacketEnqueue(&packet_q,p);    }

 

/* 从 packet_q队列中取出数据包*/Packet *SetupPkt (void){    Packet *p = NULL;    int r = 0;    /* 此处对packet_q整个队列加锁,其他地方不能访问*/    r = SCMutexLock(&packet_q.mutex_q);    p = PacketDequeue(&packet_q);    r = SCMutexUnlock(&packet_q.mutex_q);    if (p == NULL) {        TmqDebugList();        p = malloc(sizeof(Packet));        if (p == NULL) {            printf("ERROR: malloc failed: %s\n", strerror(errno));            exit(EXIT_FAILURE);        }        memset(p, 0, sizeof(Packet));        r = SCMutexInit(&p->mutex_rtv_cnt, NULL);        SCLogDebug("allocated a new packet...");    }    /* reset the packet csum fields */    RESET_PACKET_CSUMS(p);    return p;}

/*生产者 在调用SetupPkt 将对packet_q整个异步队列进行加锁*/

Packet *TmqhInputPacketpool(ThreadVars *t){    /* XXX */    Packet *p = SetupPkt();    SCMutexLock(&mutex_pending);    pending++;    //printf("PcapFileCallback: pending %" PRIu32 "\n", pending);#ifdef DBG_PERF    if (pending > dbg_maxpending)        dbg_maxpending = pending;#endif /* DBG_PERF */    SCMutexUnlock(&mutex_pending);/* * Disabled because it can enter a 'wait' state, while * keeping the nfq queue locked thus making it impossble * to free packets, the exact condition we are waiting * for. VJ 09-01-16 *    SCMutexLock(&mutex_pending);    if (pending > MAX_PENDING) {        SCondWait(&cond_pending, &mutex_pending);    }    SCMutexUnlock(&mutex_pending);*/    return p;}

/* 消费者 */

void TmqhOutputPacketpool(ThreadVars *t, Packet *p){    PacketQueue *q = &packet_q;    char proot = 0;    if (p == NULL)        return;    if (IS_TUNNEL_PKT(p)) {        //printf("TmqhOutputPacketpool: tunnel packet: %p %s\n", p,p->root ? "upper layer":"root");        /* get a lock */        SCMutex *m = p->root ? &p->root->mutex_rtv_cnt : &p->mutex_rtv_cnt;        SCMutexLock(m);        if (IS_TUNNEL_ROOT_PKT(p)) {            //printf("TmqhOutputPacketpool: IS_TUNNEL_ROOT_PKT\n");            if (TUNNEL_PKT_TPR(p) == 0) {                //printf("TmqhOutputPacketpool: TUNNEL_PKT_TPR(p) == 0\n");                /* if this packet is the root and there are no                 * more tunnel packets, enqueue it */                /* fall through */            } else {                //printf("TmqhOutputPacketpool: TUNNEL_PKT_TPR(p) > 0\n");                /* if this is the root and there are more tunnel                 * packets, don't add this. It's still referenced                 * by the tunnel packets, and we will enqueue it                 * when we handle them */                p->tunnel_verdicted = 1;                SCMutexUnlock(m);                return;            }        } else {            //printf("TmqhOutputPacketpool: NOT IS_TUNNEL_ROOT_PKT\n");            if (p->root->tunnel_verdicted == 1 && TUNNEL_PKT_TPR(p) == 1) {                //printf("TmqhOutputPacketpool: p->root->tunnel_verdicted == 1 && TUNNEL_PKT_TPR(p) == 1\n");                /* the root is ready and we are the last tunnel packet,                 * lets enqueue them both. */                TUNNEL_DECR_PKT_TPR_NOLOCK(p);                /* handle the root */                //printf("TmqhOutputPacketpool: calling PacketEnqueue for root pkt, p->root %p (%p)\n", p->root, p);                proot = 1;                /* fall through */            } else {                //printf("TmqhOutputPacketpool: NOT p->root->tunnel_verdicted == 1 && TUNNEL_PKT_TPR(p) == 1 (%" PRIu32 ")\n", TUNNEL_PKT_TPR(p));                TUNNEL_DECR_PKT_TPR_NOLOCK(p);                 /* fall through */            }        }        SCMutexUnlock(m);        //printf("TmqhOutputPacketpool: tunnel stuff done, move on\n");    }    FlowDecrUsecnt(t,p);    if (proot && p->root != NULL) {        CLEAR_PACKET(p->root);        SCMutexLock(&q->mutex_q);        PacketEnqueue(q, p->root);        SCMutexUnlock(&q->mutex_q);    }    CLEAR_PACKET(p);
/* 对整个异步队列加锁 导致其他线程不能操作,并发下降*/    SCMutexLock(&q->mutex_q);    PacketEnqueue(q, p);    SCMutexUnlock(&q->mutex_q);    SCMutexLock(&mutex_pending);    //printf("TmqhOutputPacketpool: pending %" PRIu32 "\n", pending);    if (pending > 0) {        pending--;        if (proot) {            if (pending > 0) {                pending--;            } else {                printf("TmqhOutputPacketpool: warning, trying to subtract from 0 pending counter (tunnel root).\n");            }        }    } else {        printf("TmqhOutputPacketpool: warning, trying to subtract from 0 pending counter.\n");    }    if (pending <= MAX_PENDING)        SCCondSignal(&cond_pending);    SCMutexUnlock(&mutex_pending);}

 

suricata 使用 unsigned char 和unsigned short 的大小来定义环形缓冲区的大小,非常巧妙。

以上就是本人对环形缓冲区与异步队列的理解、性能分析、对比,并未对环形缓冲区原理进行介绍。

其定义可以参考以下链接:

转载地址:http://pvtmd.baihongyu.com/

你可能感兴趣的文章
Xcode5&4.6.3 + iOS 7&6免证书开发+真机调试+生成ipa全攻略
查看>>
关键帧动画CAKeyframeAnimation,可以用它来控制图像的运动轨迹
查看>>
linux学习之vi编辑器常用命令
查看>>
UML建模之用例图
查看>>
白盒测试之语句覆盖
查看>>
java 23种设计模式的UML类结构
查看>>
StarUML: 插件加载失败的解决方法 (无法逆向工程)
查看>>
android获取手机内存使用情况
查看>>
常用词汇
查看>>
Android事件分发机制完全解析,带你从源码的角度彻底理解(下)
查看>>
Android Volley完全解析(四),带你从源码的角度理解Volley
查看>>
Activity生命周期
查看>>
apache 的工作模式
查看>>
linux shell wget下载远程目录
查看>>
mysql 5.7源码 启动监听过程
查看>>
git pull fetch 更新本地文件区别
查看>>
REQUEST_URL PHP_SELF SCRIPT_NAME区别
查看>>
libevent,libcurl 以及php扩展libevent,php curl_multi_exec区别
查看>>
in_interrupt()和 in_irq、in_softirq介绍
查看>>
mysql8.0版本在配置文件my.ini[mysqld]加上skip-grant-tables后无法启动
查看>>