dpdk-架构重组:两线程+一组ring

目标

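之前是在main函数的while(1)中实现全部功能,本次把这种流水账式的写法改为层次化结构。层次化需要不同模块协同工作,模块之间依靠环形队列传递数据,这就叫做“异步解耦”。如图所示(文字描述如下),数据流共四步,也就是下文所说的“图上四个箭头”:① main线程从网卡收包后放入in ring;② pkt_process线程从in ring取包处理;③ pkt_process把要发送的包放入out ring;④ main线程从out ring取包并发送到网卡。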

实现步骤

环形队列

先定义一对环形队列,把它俩封装到一个struct中,命名为inout_ring

#if ENABLE_RINGBUFFER
/*
 * The pair of rings that decouples the NIC I/O loop in main from the
 * protocol-stack thread (pkt_process).
 */
struct inout_ring {
    struct rte_ring *in;  /* RX side: main enqueues received mbufs, pkt_process dequeues them */
    struct rte_ring *out; /* TX side: protocol code enqueues mbufs to send, main dequeues and transmits */
};
#endif

in和out在全局只有一对,所以设计成单例(单例模式在ARP表中已经使用过)。注意:这个懒加载的单例没有加锁,第一次调用必须在main中、启动其它线程之前完成。

#if ENABLE_RINGBUFFER
/* Lazily created, process-wide ring pair (singleton). */
static struct inout_ring *rInst = NULL;

/*
 * Return the singleton ring pair, allocating a zero-filled instance on the
 * first call (both ring pointers start out NULL and are created by main).
 *
 * Returns NULL if rte_malloc fails; nothing is cached in that case, so a
 * later call retries the allocation. Previously the failed allocation was
 * passed straight to memset, which dereferenced NULL.
 *
 * NOTE: no locking — the first call is expected to happen on the main lcore
 * before any worker lcore is launched.
 */
static struct inout_ring *ringInstance(void) {
    if (rInst == NULL) {
        struct inout_ring *inst = rte_malloc("in out ring", sizeof(*inst), 0);
        if (inst == NULL) {
            return NULL;
        }
        memset(inst, 0, sizeof(*inst));
        rInst = inst;
    }
    return rInst;
}
#endif
main的改造
环形队列初始化
#if ENABLE_RINGBUFFER

    // Create the ring pair once; any failure here is fatal for the app.
    struct inout_ring *ring = ringInstance();
    if (ring == NULL) {
        rte_exit(EXIT_FAILURE, "环形缓冲区初始化失败\n");
    }
    if (ring->in == NULL) {
        // in ring: one producer (main RX loop) and one consumer (pkt_process).
        ring->in = rte_ring_create("in ring", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (ring->in == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create in ring\n");
        }
    }
    if (ring->out == NULL) {
        // out ring: several producers (pkt_process on the worker lcore and the
        // ARP timer callback run from main's rte_timer_manage), one consumer
        // (main TX loop). It therefore must not be flagged single-producer.
        ring->out = rte_ring_create("out ring", RING_SIZE, rte_socket_id(), RING_F_SC_DEQ);
        if (ring->out == NULL) {
            rte_exit(EXIT_FAILURE, "Cannot create out ring\n");
        }
    }
#endif
while(1) 的简化
    while (1) {

        // RX: take a burst of Ethernet frames from NIC queue 0 and pass them
        // to pkt_process through the in ring.
        struct rte_mbuf *rx_mbufs[BURST_SIZE]; // pointer array only; the mbufs themselves come from the mempool
        unsigned num_recvd = rte_eth_rx_burst(gDpdkPortId, 0, rx_mbufs, BURST_SIZE);
                                    // arg1: NIC port id (an adapter port such as eth0, not a TCP/UDP port)
                                    // arg2: RX queue index
                                    // arg3: array that receives the mbuf pointers

        if (num_recvd > BURST_SIZE) {
            rte_exit(EXIT_FAILURE, "rte_eth_rx_burst接收出错,溢出\n");
        } else if (num_recvd > 0) {
            unsigned nb_enq = rte_ring_sp_enqueue_burst(ring->in, (void **)rx_mbufs, num_recvd, NULL);
            // Frames that did not fit into the in ring are dropped here;
            // otherwise nobody would ever free them.
            for (unsigned i = nb_enq; i < num_recvd; i++) {
                rte_pktmbuf_free(rx_mbufs[i]);
            }
        }

        // TX: drain the out ring and transmit.
        struct rte_mbuf *tx_mbufs[BURST_SIZE];
        unsigned nb_tx = rte_ring_dequeue_burst(ring->out, (void **)tx_mbufs, BURST_SIZE, NULL);
        if (nb_tx > 0) {
            // Ownership: in-ring mbufs are allocated by rx_burst and freed by
            // the protocol stack; out-ring mbufs are allocated by the protocol
            // stack. rte_eth_tx_burst takes ownership of every mbuf it accepts
            // and the driver frees them after transmission, so only the mbufs
            // it did NOT accept may be freed here. Freeing the sent ones too
            // would be a double free while the NIC may still be reading them.
            uint16_t nb_sent = rte_eth_tx_burst(gDpdkPortId, 0, tx_mbufs, (uint16_t)nb_tx);
            for (unsigned i = nb_sent; i < nb_tx; i++) {
                rte_pktmbuf_free(tx_mbufs[i]);
            }
        }

        // Run expired rte_timer callbacks (e.g. ARP) at most once every
        // TIMER_RESOLUTION_CYCLES TSC ticks.
        static uint64_t prev_tsc = 0;
        uint64_t cur_tsc = rte_rdtsc();
        uint64_t diff_tsc = cur_tsc - prev_tsc;
        if (diff_tsc > TIMER_RESOLUTION_CYCLES) {
            rte_timer_manage();
            prev_tsc = cur_tsc;
        }

    }

创建pkt_process线程

位置:main中,环形队列初始化之后、while(1)之前。pkt_process一启动就会调用ringInstance()并访问ring->in,所以ring必须先创建好。

#if ENABLE_MULTHREAD

    // Run the protocol stack (pkt_process) on the next worker lcore; the
    // main lcore keeps doing NIC RX/TX and timers. rte_get_next_lcore
    // returns RTE_MAX_LCORE when no further lcore is enabled.
    unsigned worker_lcore = rte_get_next_lcore(lcore_id, 1, 0);
    if (worker_lcore >= RTE_MAX_LCORE) {
        rte_exit(EXIT_FAILURE, "No worker lcore available for pkt_process\n");
    }
    if (rte_eal_remote_launch(pkt_process, mbuf_pool, worker_lcore) != 0) {
        rte_exit(EXIT_FAILURE, "Failed to launch pkt_process on lcore %u\n", worker_lcore);
    }

#endif

函数定义:

#if ENABLE_MULTHREAD

/*
 * Placeholder for the protocol-stack worker launched with
 * rte_eal_remote_launch; the full implementation is given later in this
 * article. It returns an explicit value because the lcore's return code
 * can be read back (e.g. by rte_eal_wait_lcore), and falling off the end
 * of a non-void function leaves that value undefined.
 */
static int pkt_process(__attribute__((unused)) void *arg) {
    return 0;
}

#endif
图上四个箭头的依次实现

在main的while(1)中已经实现两个了。另外两个就是从in buf->pkt_process,以及pkt_process->out buf

我们先把原来while(1)中与协议处理相关的全部代码移到pkt_process中。

接收改为从in ring出队(dequeue);

全部的发送(rte_eth_tx_burst)改为向out buffer ring 入队,即代码中的:

rte_eth_tx_burst(gDpdkPortId, 0, &arpbuf, 1);
rte_pktmbuf_free(arpbuf);

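全部改为下面这行。入队后不要再调用rte_pktmbuf_free,因为mbuf的所有权已经交给了out ring,由main线程负责发送。这里用mp(多生产者)版本,是因为timer回调所在的main线程也会向out ring入队: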

rte_ring_mp_enqueue_burst(ring->out, (void **)&arpbuf, 1, NULL);

别忘记timer cb函数中也有arp报文的发送,也要改。

完整的pkt_process
// 用户态协议栈
static int pkt_process(__attribute__((unused)) void *arg) {
    // 传参mbuf
    // 取出in buf的东西,处理,释放,放入out buf
    // 之前的协议栈代码复制过来
    struct rte_mempool *mbuf_pool = (struct rte_mempool *)arg;
    struct inout_ring *ring = ringInstance();

    while (1) {
        // dequeue
        struct rte_mbuf *mbufs[BURST_SIZE];
        unsigned num_recvd = rte_ring_mc_dequeue_burst(ring->in, (void **)mbufs, BURST_SIZE, NULL);

        // 之前的协议栈代码复制过来
        unsigned i = 0;
        for (i = 0;i < num_recvd;i++) {

            struct rte_ether_hdr *ehdr = rte_pktmbuf_mtod(mbufs[i], struct rte_ether_hdr*);// rte_pktmbuf_mtod是宏定义。参数1:内存地址 参数2:转换成什么类型
                    //// ehdr就是拿出的以太网数据

#ifdef ENABLE_ARP

            if (ehdr->ether_type == rte_cpu_to_be_16(RTE_ETHER_TYPE_ARP)) {



                // 解析这个arp报文并回复
                struct rte_arp_hdr *ahdr = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_arp_hdr *, sizeof(struct rte_ether_hdr));

                // 调试
                struct in_addr addr_source, addr_target;
                addr_source.s_addr = ahdr->arp_data.arp_sip;

                printf("[recv][arp协议] src: %s ", inet_ntoa(addr_source));

                addr_target.s_addr = ahdr->arp_data.arp_tip;
                printf("target:%s\n",inet_ntoa(addr_target));


                if (ahdr->arp_data.arp_tip == gLocalIp) { // 如果arp问的就是自己的p地址

                    if (ahdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REQUEST)) {
                        printf("\t[是向本机的请求报文]\n");
                        struct rte_mbuf * arpbuf = jr_arp_send(mbuf_pool, RTE_ARP_OP_REPLY, ahdr->arp_data.arp_sha.addr_bytes, ahdr->arp_data.arp_tip, ahdr->arp_data.arp_sip);
                        //rte_eth_tx_burst(gDpdkPortId, 0, &arpbuf, 1);// 真正的发送操作,第四个参数:发一个包,第二个参数:使用0号队列发送
                        //rte_pktmbuf_free(arpbuf);

                        rte_ring_mp_enqueue_burst(ring->out, (void **)&arpbuf, 1, NULL);

                    }
                    else if (ahdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REPLY)) {
                        printf("\t[是给本机的应答报文]\n");
                        struct arp_table *table = arp_table_instance();
                        uint8_t *hardware_addr = jr_get_dst_macaddr(ahdr->arp_data.arp_sip);
                        if (hardware_addr == NULL){
                            // 解析mac地址,如果没存过,则存到arp表中

                            struct arp_entry *entry = rte_malloc("arp entry", sizeof(struct arp_entry), 0);

                            if (entry) { //注意此处若失败就失败了,不用终止程序
                                // 在entry中填上内容并加到链表中
                                entry->ip = ahdr->arp_data.arp_sip;
                                rte_memcpy(entry->hwaddr, ahdr->arp_data.arp_sha.addr_bytes, RTE_ETHER_ADDR_LEN);
                                entry->type = ARP_ENTRY_STATUS_DYNAMIC;

                                LL_ADD(entry, table->entries);
                                table->count++;


                            }


                        }

#if ENABLE_DEBUG
                        // 打印arp表
                        struct arp_entry *iter;
                        for (iter = table->entries;iter != NULL;iter = iter->next) {
                            // mac
                            jr_print_ether_addr("", (const struct rte_ether_addr *)iter->hwaddr);
                            // ip
                            struct in_addr addr;
                            addr.s_addr = iter->ip;
                            printf("\t\t%s", inet_ntoa(addr));                          
                            // type
                            printf("\t\t%s\n", iter->type == ARP_ENTRY_STATUS_DYNAMIC ? "dynamic" : "static");
                        }           
#endif


                    }
                    rte_pktmbuf_free(mbufs[i]);

                }
                continue;
            }

#endif

            // 此处可看到对接口处理函数封装的已经很完善了
            if (ehdr->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4)) {
                // 如果不是ip协议,跳过
                rte_pktmbuf_free(mbufs[i]);
                continue;
            }


            // 如果是ip协议,取出
            struct rte_ipv4_hdr *iphdr =  rte_pktmbuf_mtod_offset(mbufs[i], struct rte_ipv4_hdr *, 
            sizeof(struct rte_ether_hdr));

            if (iphdr->next_proto_id == IPPROTO_UDP) {

                struct rte_udp_hdr *udphdr = (struct rte_udp_hdr *)(iphdr + 1);





#ifdef ENABLE_SEND
                rte_memcpy(gDstMac, ehdr->s_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
                rte_memcpy(&gSrcIp, &iphdr->dst_addr, sizeof(uint32_t));
                rte_memcpy(&gDstIp, &iphdr->src_addr, sizeof(uint32_t));
                rte_memcpy(&gSrcPort, &udphdr->dst_port, sizeof(uint16_t));
                rte_memcpy(&gDstPort, &udphdr->src_port, sizeof(uint16_t));
#endif
                uint16_t length = ntohs(udphdr->dgram_len); //经验:网络字节序2字节以上必须转,不管实际是什么序
                *((char*)udphdr + length) = '\0';

                struct in_addr addr;
                addr.s_addr = iphdr->src_addr;
                printf("[recv][udp协议]src: %s:%d, ", inet_ntoa(addr), ntohs(udphdr->src_port));

                addr.s_addr = iphdr->dst_addr;
                printf("dst: %s:%d, 内容:%s  length=%d\n", inet_ntoa(addr), ntohs(udphdr->dst_port), 
                    (ntohs(udphdr->dst_port) == 8888) ? (char *)(udphdr+1) : "略", length);

#ifdef ENABLE_SEND

                if (gSrcIp == gLocalIp){
                    struct rte_mbuf * txbuf = jr_send(mbuf_pool, (uint8_t *)(udphdr+1), length); 
                    //rte_eth_tx_burst(gDpdkPortId, 0, &txbuf, 1);// 真正的发送操作,第四个参数:发一个包,第二个参数:使用0号队列发送
                    //rte_pktmbuf_free(txbuf);

                    rte_ring_mp_enqueue_burst(ring->out, (void **)&txbuf, 1, NULL);
                }
#endif

                rte_pktmbuf_free(mbufs[i]);
            }

#ifdef ENABLE_ICMP

            if (iphdr->next_proto_id == IPPROTO_ICMP) {
                printf("[recv][icmp]");
                struct rte_icmp_hdr *icmphdr = (struct rte_icmp_hdr *)(iphdr + 1);
                // 在此只实现echo请求报文的回复,类型是08(RTE_IP_ICMP_ECHO_REQUEST)
                if (icmphdr->icmp_type == RTE_IP_ICMP_ECHO_REQUEST) {
                    printf("[request]\n");
                    struct rte_mbuf *txbuf = jr_send_icmp(
                        mbuf_pool, ehdr->s_addr.addr_bytes, 
                        iphdr->dst_addr, iphdr->src_addr, 
                        icmphdr->icmp_ident, icmphdr->icmp_seq_nb
                        );

                    //rte_eth_tx_burst(gDpdkPortId, 0, &txbuf, 1);// 真正的发送操作,第四个参数:发一个包,第二个参数:使用0号队列发送
                    //rte_pktmbuf_free(txbuf);  

                    rte_ring_mp_enqueue_burst(ring->out, (void **)&txbuf, 1, NULL);

                }
                rte_pktmbuf_free(mbufs[i]);
            }
#endif


        }
    }
    return 0;
}

#endif

结果

实现了与上一篇相同的功能,只是代码结构改成了“两线程 + 一组ring”。