dpdk-udp socket的实现(三线程+两组ring)

目标

之前虽然能够解析udp报文,但还无法在其上承载应用程序。换句话说,我们希望udp_server程序调用我们提供的API,而这些API的内部实现正是对报文的解析与组装。

解释server:UDP 不同于 TCP,不存在请求连接和受理过程,因此在某种意义上无法明确区分服务器端和客户端,只是因为其提供服务而称为服务器端[1]。

详细的,请看如下的一个使用内核api的udp程序 unix_udp.c:

// my_code/0201udp/unix_udp.c
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>

#define UDP_APP_RECV_BUFFER_SIZE 128

/*
 * UDP echo server on the kernel socket API: receive datagrams on
 * 192.168.43.130:8889, print each one, and send it back to its sender.
 */
int main(int argc, char *argv[]) {
    (void)argc;
    (void)argv;

    int connfd = socket(AF_INET, SOCK_DGRAM, 0);
    if (connfd == -1) {
        printf("sockfd(套接字描述符)分配失败\n");
        return -1;
    }

    struct sockaddr_in localaddr, clientaddr;
    memset(&localaddr, 0, sizeof(localaddr));
    localaddr.sin_port = htons(8889);
    localaddr.sin_family = AF_INET;
    localaddr.sin_addr.s_addr = inet_addr("192.168.43.130"); // htonl(INADDR_ANY) would listen on 0.0.0.0

    if (bind(connfd, (struct sockaddr *)&localaddr, sizeof(localaddr)) < 0) {
        printf("bind失败\n");
        close(connfd);
        return -1;
    }

    char buffer[UDP_APP_RECV_BUFFER_SIZE];
    while (1) {
        // addrlen is a value-result argument: reset it before every call.
        socklen_t addrlen = sizeof(clientaddr);
        // Keep one byte free for the terminating NUL added below.
        ssize_t n = recvfrom(connfd, buffer, sizeof(buffer) - 1, 0,
                             (struct sockaddr *)&clientaddr, &addrlen);
        if (n < 0)
            continue;

        // Terminate after exactly n bytes so no bytes from an earlier, longer datagram leak through.
        buffer[n] = '\0';
        printf("recv from %s:%d, data:%s\n", inet_ntoa(clientaddr.sin_addr),
               ntohs(clientaddr.sin_port), buffer);
        // Echo exactly the bytes received (strlen would stop at an embedded NUL).
        sendto(connfd, buffer, (size_t)n, 0, (struct sockaddr *)&clientaddr, sizeof(clientaddr));
    }

    close(connfd); // not reached: the loop above never exits
    return 0;
}

使用gcc -o unix_udp unix_udp.c命令编译运行后,在命令行中输入netstat -anop | grep 8889,可以发现

  • 端口 8889 被一个名为 unix_udp 的进程(进程 ID 为 1539)监听。
  • 该端口使用的是 UDP 协议。

本次目标是将unix_udp的代码完整复制到udp.c中,使用我们手工提供的接口能够运行起来且实现相同功能。架构如下图所示。

TODO

了解当前整个程序的架构

三个线程

TODO
  1. 在udp.c中,在main中启动udp_server线程

  2. send buffer 和 recv buffer 的定义(放在与connfd一一对应的结构体 struct localhost 中)

  3. udp应用所需接口

列出要实现的接口

可通过man 函数名命令查询内核API。整理全部接口(五个)如下:

int socket(int domain, int type, int protocol);
int bind(int sockfd, const struct sockaddr *addr,
                socklen_t addrlen);
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
                        struct sockaddr *src_addr, socklen_t *addrlen);
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
                      const struct sockaddr *dest_addr, socklen_t addrlen);
int close(int fd); 

当然man查询时还能查到其他,比如sendmsg,什么时候用到再去实现。

了解每个接口的任务

一切都围绕着connfd而来,这是一个int类型的数值。socket负责分配connfd,即分配一个尚未被使用的int数值;bind负责绑定,使一个connfd对应一组“local ip : local port”,所以在socket时也要为ip地址和port号预留存储空间;recvfrom/sendto根据connfd查询到对应的“local ip : local port”;close负责释放connfd。

实现

在main中启动udp_server_entry线程

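请注意dpdk启动线程时要求绑定core(lcore),并且每个线程要占用一个不同的lcore。所以每启动一个线程之前,都要先用rte_get_next_lcore把lcore_id更新为下一个可用的lcore,再把它传给rte_eal_remote_launch: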

#if ENABLE_MULTHREAD

+   lcore_id = rte_get_next_lcore(lcore_id, 1, 0); // advance lcore_id itself so the next launch picks a different core
-   rte_eal_remote_launch(pkt_process, mbuf_pool, rte_get_next_lcore(lcore_id, 1, 0));
+   rte_eal_remote_launch(pkt_process, mbuf_pool, lcore_id);

#endif

+#if ENABLE_UDP_APP
+
+   lcore_id = rte_get_next_lcore(lcore_id, 1, 0); // the core after the one pkt_process runs on
+   rte_eal_remote_launch(udp_server_entry, mbuf_pool, lcore_id);
+
+#endif
定义struct localhost
#if ENABLE_UDP_APP
/*
 * struct localhost - state behind one descriptor returned by socket().
 * All sockets are chained into a doubly-linked list that is searched linearly.
 */
struct localhost {
    int fd;
    uint32_t localip; // bound IPv4 address, network byte order (copied from sin_addr by bind())
    // localip and localmac correspond one-to-one; the MAC is kept here because sending
    // needs it as the source MAC of outgoing frames.
    uint8_t localmac[RTE_ETHER_ADDR_LEN];
    uint16_t localport; // bound port, network byte order (copied from sin_port by bind())
    union {
        int protocol; // IPPROTO_UDP or IPPROTO_TCP, set by socket()
        int protocal; // legacy misspelling; aliases protocol so older code keeps compiling
    };

    struct rte_ring *sendbuf, *recvbuf; // queues of struct offload * between the app and the stack
    struct localhost *prev, *next;      // doubly-linked list; thousands of connections would call for a red-black tree
};
#endif
接下来实现接口:socket的实现

至于为什么不把内核的socket实现粘贴过来:就算原封不动的移植还是比自己重写要麻烦。此外我们已经实现了一些内容,和内核协议栈不完全一致,在此基础上照搬只会更难。

#if ENABLE_UDP_APP

static struct localhost *lhost = NULL; // head of the list of all open sockets

#define DEFAULT_FD_NUM 3 // 0 (stdin), 1 (stdout) and 2 (stderr) are already taken

/*
 * get_hostinfo_from_fd - look up the socket state for a descriptor.
 * Returns NULL if no socket in the list has this fd.
 */
struct localhost *get_hostinfo_from_fd(int sockfd) {
    struct localhost *host;
    for (host = lhost; host != NULL; host = host->next) {
        if (sockfd == host->fd)
            return host;
    }
    return NULL;
}

/*
 * get_fd_from_bitmap - pick a descriptor that no open socket is using.
 *
 * Returns the smallest fd >= DEFAULT_FD_NUM not present in the list. There is no bitmap
 * yet (the name anticipates one). Because close() unlinks a socket from the list, its fd
 * becomes available again automatically.
 */
int get_fd_from_bitmap(void) {
    int fd = DEFAULT_FD_NUM;
    while (get_hostinfo_from_fd(fd) != NULL)
        fd++;
    return fd;
}

/*
 * socket - create a user-space socket: allocate a descriptor, its state and its rings.
 *
 * type SOCK_DGRAM selects UDP and SOCK_STREAM selects TCP; domain and protocol are ignored.
 * Returns the new descriptor, or -1 if memory or a ring could not be allocated.
 *
 * NOTE(review): lhost is also walked by other lcores (e.g. udp_out) without a lock.
 */
int socket(int domain, int type, int protocol) {
    (void)domain;
    (void)protocol;

    int fd = get_fd_from_bitmap();
    struct localhost *host = rte_malloc("localhost", sizeof(*host), 0);
    if (host == NULL) {
        printf("rte_malloc localhost 失败\n");
        return -1;
    }
    memset(host, 0, sizeof(*host)); // clear the whole struct, not just a pointer's worth

    host->fd = fd;

    if (type == SOCK_DGRAM)
        host->protocol = IPPROTO_UDP;
    else if (type == SOCK_STREAM)
        host->protocol = IPPROTO_TCP;

    // DPDK ring names must be unique, so derive them from the fd.
    char ring_name[RTE_RING_NAMESIZE];

    snprintf(ring_name, sizeof(ring_name), "recvbuf_%d", fd);
    host->recvbuf = rte_ring_create(ring_name, RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
    if (host->recvbuf == NULL) {
        rte_free(host);
        printf("recvbuf ring 创建失败\n");
        return -1;
    }

    snprintf(ring_name, sizeof(ring_name), "sendbuf_%d", fd);
    host->sendbuf = rte_ring_create(ring_name, RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
    if (host->sendbuf == NULL) {
        rte_ring_free(host->recvbuf);
        rte_free(host);
        printf("sendbuf ring 创建失败\n");
        return -1;
    }

    LL_ADD(host, lhost);
    return fd;
}
#endif
接口bind的实现
#if ENABLE_UDP_APP

/*
 * bind - attach a local IPv4 address and port to a socket created by socket().
 *
 * The address and port are stored exactly as given in the sockaddr_in (network byte order).
 * The source MAC is taken from gSrcMac.
 * Returns 0 on success, or -1 if sockfd is unknown or addr is missing or too short.
 */
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) {
    struct localhost *host = get_hostinfo_from_fd(sockfd);
    if (host == NULL || addr == NULL || addrlen < sizeof(struct sockaddr_in)) {
        printf("绑定失败\n");
        return -1;
    }

    // Binding means filling in localip / localmac / localport of struct localhost.
    const struct sockaddr_in *laddr = (const struct sockaddr_in *)addr;
    host->localport = laddr->sin_port;
    rte_memcpy(&host->localip, &laddr->sin_addr.s_addr, sizeof(uint32_t));
    // gSrcMac is a global read via DPDK at startup in main. A safer design would keep a
    // table of (ip, mac) pairs when DPDK starts instead of relying on this single global.
    rte_memcpy(host->localmac, gSrcMac, RTE_ETHER_ADDR_LEN);
    return 0;
}

#endif
接口close的实现

与socket是对应的——那里申请,这里释放;那里添加,这里移除。

#if ENABLE_UDP_APP
/*
 * close - release a socket created by socket(): unlink it, free its rings and its state.
 * This mirrors socket(): what was allocated there is freed here, and what was added is removed.
 * Returns 0 on success, or -1 if fd does not belong to a socket created by socket().
 *
 * NOTE(review): this definition replaces libc close() for the whole program, so closing a
 * kernel descriptor now returns -1 without closing it.
 * NOTE(review): offloads still queued in the rings are not freed here, and nothing stops
 * another lcore from using host while it is being freed.
 */
int close(int fd) {
    struct localhost *host = get_hostinfo_from_fd(fd);
    if (host == NULL)
        return -1;

    LL_REMOVE(host, lhost);
    rte_ring_free(host->recvbuf); // rte_ring_free(NULL) is a no-op
    rte_ring_free(host->sendbuf);
    rte_free(host);
    return 0;
}
#endif


udp_process函数的实现:在用户态协议栈中把数据抛到recv buffer

recvfrom要做的是从recvbuf中取出数据包继续向上提交,所以在实现recvfrom之前,先要在pkt_process(用户协议栈)中把udp包放入recvbuf。

为了每个函数不太长,现在将pkt_process中处理udp协议报文的部分单独提出,命名为udp_process。为了好理解,可以先把原来的那部分直接剪切到udp_process函数中(但稍后udp_process函数中大部分要被删改)。

static int udp_process(struct rte_mbuf * udpmbuf){
    struct rte_ipv4_hdr *iphdr =  rte_pktmbuf_mtod_offset(udpmbuf, struct rte_ipv4_hdr *, 
            sizeof(struct rte_ether_hdr));
    struct rte_udp_hdr *udphdr = (struct rte_udp_hdr *)(iphdr + 1);

#ifdef ENABLE_SEND
    rte_memcpy(gDstMac, ehdr->s_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
    rte_memcpy(&gSrcIp, &iphdr->dst_addr, sizeof(uint32_t));
    rte_memcpy(&gDstIp, &iphdr->src_addr, sizeof(uint32_t));
    rte_memcpy(&gSrcPort, &udphdr->dst_port, sizeof(uint16_t));
    rte_memcpy(&gDstPort, &udphdr->src_port, sizeof(uint16_t));
#endif
    uint16_t length = ntohs(udphdr->dgram_len); //经验:网络字节序2字节以上必须转,不管实际是什么序
    *((char*)udphdr + length) = '\0';

    struct in_addr addr;
    addr.s_addr = iphdr->src_addr;
    printf("[recv][udp协议]src: %s:%d, ", inet_ntoa(addr), ntohs(udphdr->src_port));

    addr.s_addr = iphdr->dst_addr;
    printf("dst: %s:%d, 内容:%s  length=%d\n", inet_ntoa(addr), ntohs(udphdr->dst_port), 
        (ntohs(udphdr->dst_port) == 8888) ? (char *)(udphdr+1) : "略", length);  
}

#endif

接下来进行修改。首先重构包需要一个新的结构体,offload。

应用层包类型:struct offload的定义

作用:组装udp数据

/*
 * struct offload - one UDP datagram in transit between the protocol stack and the socket API.
 * Addresses and ports are kept in network byte order, exactly as on the wire / in sockaddr_in.
 */
struct offload {
    uint32_t sip, dip;     /* source / destination IPv4 address */
    uint16_t sport, dport; /* source / destination UDP port */
    uint8_t protocol;      /* IPPROTO_UDP */
    unsigned char *data;   /* payload buffer, allocated with rte_malloc */
    uint16_t length;       /* number of payload bytes in data (UDP header excluded) */
};
get_hostinfo_from_ipport函数的实现:监听的真正含义

根据报文的目的ip、目的端口和协议,找到与之绑定的socket(即struct localhost,其中记录了fd)。

struct localhost * get_hostinfo_from_ipport(uint32_t dip, uint16_t port, itn proto) {
    struct localhost *host;
    for (host = lhost; host !- NULL; host = host->next){
        if (dip == host->localip && port == host->localport && proto == host->protocal)
            return host;
    }
    return NULL;
}

至此梳理出udp_process做三件事:

  1. 解析
  2. 转换包类型,从rte_mbuf到struct offload
  3. 将offload入队(enqueue)到recvbuf

最终udp_process的实现:

#if ENABLE_MULTHREAD

#if ENABLE_UDP_APP

/*
 * udp_process - deliver one received UDP datagram to the socket bound to its destination.
 *
 * Assumes Ethernet + 20-byte IPv4 header (no options, as `iphdr + 1` implies) + UDP. Looks up
 * the socket by destination ip/port/protocol, copies the payload into a new struct offload
 * and enqueues it on that socket's recvbuf for recvfrom().
 *
 * Always frees udpmbuf.
 * Returns 0 on success, -1/-2 on allocation failure, -3 if no socket matches, -4 if the
 * packet is truncated or its UDP length field is inconsistent, -5 if recvbuf is full.
 */
static int udp_process(struct rte_mbuf *udpmbuf) {
    const uint32_t hdr_off = sizeof(struct rte_ether_hdr) + sizeof(struct rte_ipv4_hdr);
    const uint32_t data_len = rte_pktmbuf_data_len(udpmbuf);

    // Make sure the headers are actually present before reading them.
    if (data_len < hdr_off + sizeof(struct rte_udp_hdr)) {
        rte_pktmbuf_free(udpmbuf);
        return -4;
    }

    struct rte_ipv4_hdr *iphdr = rte_pktmbuf_mtod_offset(udpmbuf, struct rte_ipv4_hdr *,
            sizeof(struct rte_ether_hdr));
    struct rte_udp_hdr *udphdr = (struct rte_udp_hdr *)(iphdr + 1);

    struct localhost *host = get_hostinfo_from_ipport(iphdr->dst_addr, udphdr->dst_port, iphdr->next_proto_id);
    if (host == NULL) { // nobody is listening on this ip:port
        rte_pktmbuf_free(udpmbuf);
        return -3;
    }

    // dgram_len comes from the wire: it must cover the UDP header and fit inside the mbuf.
    const uint16_t dgram_len = ntohs(udphdr->dgram_len);
    if (dgram_len < sizeof(struct rte_udp_hdr) || dgram_len > data_len - hdr_off) {
        rte_pktmbuf_free(udpmbuf);
        return -4;
    }
    const uint16_t payload_len = (uint16_t)(dgram_len - sizeof(struct rte_udp_hdr));

    struct offload *ol = rte_malloc("offload", sizeof(struct offload), 0);
    if (ol == NULL) {
        rte_pktmbuf_free(udpmbuf);
        return -1;
    }
    ol->dip = iphdr->dst_addr;
    ol->sip = iphdr->src_addr;
    ol->sport = udphdr->src_port;
    ol->dport = udphdr->dst_port;
    ol->protocol = IPPROTO_UDP;
    ol->length = payload_len; // payload bytes only, the same meaning sendto() uses

    // rte_malloc returns NULL for size 0, so reserve one byte for empty datagrams.
    ol->data = rte_malloc("unsigned char*", payload_len > 0 ? payload_len : 1, 0);
    if (ol->data == NULL) {
        rte_pktmbuf_free(udpmbuf);
        rte_free(ol);
        return -2;
    }
    rte_memcpy(ol->data, udphdr + 1, payload_len);

    if (rte_ring_mp_enqueue(host->recvbuf, ol) < 0) { // ring full: drop the datagram
        rte_free(ol->data);
        rte_free(ol);
        rte_pktmbuf_free(udpmbuf);
        return -5;
    }

    rte_pktmbuf_free(udpmbuf); // the payload has been copied; the mbuf is no longer needed
    return 0;
}

#endif

static int pkt_process(void *arg) {…}

#endif
接口recvfrom的实现

包已经在recv buf中等待我们取出了,此时终于可以写recvfrom了:

ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, 
                 struct sockaddr *src_addr, socklen_t *addrlen) {

    struct localhost *host = get_host_info_from_fd(sockfd);
    if (host == NULL)  return -1;

    struct offload *ol = NULL;
    int nb == rte_ring_mc_dequeue(host->sendbuf, &ol);
    if (nb < 0){
        //阻塞
    }

    struct sockaddr_in *saddr = (struct sockaddr_in *)arc_addr;
    asddr->sin_port = ol->dport;
    rte_memcpy(&saddr->sin_addr.s_addr, &ol->sip, sizeof(uint32_t));

}
条件变量的使用:如果当下无可收的包,进入阻塞态

定义

struct localhost {
    …
+   pthread_cond_t cond;   // signalled by udp_process() after it enqueues into recvbuf
+   pthread_mutex_t mutex; // held by recvfrom() while it checks recvbuf and waits on cond
};

初始化,在socket中

int socket(int domain, int type, int protocol){
    …

+   // host lives in rte_malloc'd memory, so initialize at run time: the PTHREAD_*_INITIALIZER
+   // macros are only specified for statically allocated objects.
+   // (pair with pthread_cond_destroy / pthread_mutex_destroy in close())
+   pthread_cond_init(&host->cond, NULL);
+   pthread_mutex_init(&host->mutex, NULL);

    LL_ADD(host, lhost);
    return fd;
}

阻塞

ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags, 
                 struct sockaddr *src_addr, socklen_t *addrlen) {
    …
    struct offload *ol = NULL;
+   int nb = -1;
+   pthread_mutex_lock(&host->mutex);
+   // Incoming datagrams are in recvbuf (sendbuf holds outgoing ones); sleep until udp_process signals.
+   while ((nb = rte_ring_mc_dequeue(host->recvbuf, (void **)&ol)) < 0){
+       pthread_cond_wait(&host->cond, &host->mutex);
+   }
+   pthread_mutex_unlock(&host->mutex);
    …    
}

唤醒

在udp_process将包放入队列时

/* Wakeup step: after queueing into recvbuf, signal the condition variable recvfrom() waits on. */
static int udp_process(struct rte_mbuf * udpmbuf){
    …    
    rte_ring_mp_enqueue(host->recvbuf, ol);
+   pthread_mutex_lock(&host->mutex); // recvfrom() holds this mutex from its ring check until pthread_cond_wait, so the signal cannot slip in between and be lost
+   pthread_cond_signal(&host->cond); // wake one recvfrom() blocked on this socket
+   pthread_mutex_unlock(&host->mutex);
    …        
}
接口sendto的实现

准备一个offload把它enqueue到send buf中

#if ENABLE_UDP_APP
/*
 * sendto - queue one datagram on the socket's send ring; udp_out() builds and sends the packet.
 *
 * The source address/port come from bind(); the destination comes from dest_addr (an IPv4
 * sockaddr_in, network byte order). The payload is copied, so buf may be reused immediately.
 * Returns len on success, or -1 if:
 *   - sockfd is unknown,
 *   - dest_addr is missing or too short,
 *   - len does not fit in one IPv4 UDP datagram,
 *   - allocation fails, or
 *   - the send ring is full.
 */
ssize_t sendto(int sockfd, const void *buf, size_t len, __attribute__((unused)) int flags,
               const struct sockaddr *dest_addr, socklen_t addrlen) {

    struct localhost *host = get_hostinfo_from_fd(sockfd);
    if (host == NULL)
        return -1;
    if (dest_addr == NULL || addrlen < sizeof(struct sockaddr_in))
        return -1;
    // ol->length is 16 bits and the IPv4 total length must also fit in 16 bits.
    if (len > UINT16_MAX - sizeof(struct rte_ipv4_hdr) - sizeof(struct rte_udp_hdr))
        return -1;
    if (buf == NULL && len > 0)
        return -1;

    const struct sockaddr_in *daddr = (const struct sockaddr_in *)dest_addr;

    struct offload *ol = rte_malloc("offload", sizeof(struct offload), 0);
    if (ol == NULL)
        return -1;

    ol->dip = daddr->sin_addr.s_addr;
    ol->dport = daddr->sin_port;
    ol->sip = host->localip;
    ol->sport = host->localport;
    ol->protocol = IPPROTO_UDP;
    ol->length = (uint16_t)len;

    // rte_malloc returns NULL for size 0, so reserve one byte for empty datagrams.
    ol->data = rte_malloc("unsigned char *", len > 0 ? len : 1, 0);
    if (ol->data == NULL) {
        rte_free(ol);
        return -1;
    }
    if (len > 0)
        rte_memcpy(ol->data, buf, len);

    if (rte_ring_mp_enqueue(host->sendbuf, ol) < 0) { // ring full
        rte_free(ol->data);
        rte_free(ol);
        return -1;
    }

    return (ssize_t)len;
}

#endif
udp_out的实现

从send buf中取出(类型为offload),放入out buf(类型为rte_mbuf),中间要做个类型转换(重新组包)。且它不监听,而是遍历全部fd对应的send buf,有就发。

回忆:从rte_mbuf到struct offload的转换是由udp_process做的。

/*
 * udp_out - move pending datagrams from every socket's send ring to the shared out ring.
 *
 * Takes at most one offload per socket per call. If the destination MAC is not known yet, an
 * ARP request is queued instead and the offload is put back to be retried on a later call.
 * Otherwise a UDP packet is built and queued on the out ring, and the offload is freed.
 * Returns 0.
 *
 * NOTE(review): while the MAC stays unresolved, an ARP request is re-sent on every call.
 */
static int udp_out(struct rte_mempool *mbuf_pool) {
    struct inout_ring *ring = ringInstance();
    struct localhost *host;

    for (host = lhost; host != NULL; host = host->next) {
        struct offload *ol = NULL;
        if (rte_ring_mc_dequeue(host->sendbuf, (void **)&ol) < 0)
            continue; // nothing to send; unlike the receive side there is no need to block here

        uint8_t *dstmac = jr_get_dst_macaddr(ol->dip);
        if (dstmac == NULL) {
            struct rte_mbuf *arpbuf = jr_send_arp(mbuf_pool, RTE_ARP_OP_REQUEST, gDefaultArpMac, ol->sip, ol->dip);
            // Do not transmit directly: hand the ARP request to the out ring.
            if (arpbuf != NULL && rte_ring_mp_enqueue_burst(ring->out, (void **)&arpbuf, 1, NULL) == 0)
                rte_pktmbuf_free(arpbuf);
            // Retry this datagram once the ARP reply has filled in the MAC.
            if (rte_ring_mp_enqueue(host->sendbuf, ol) < 0) {
                rte_free(ol->data);
                rte_free(ol);
            }
        } else {
            struct rte_mbuf *udpbuf = jr_udp_pkt(mbuf_pool, ol->sip, ol->dip, ol->sport, ol->dport,
                                                 host->localmac, dstmac, ol->data, ol->length);
            if (udpbuf != NULL && rte_ring_mp_enqueue_burst(ring->out, (void **)&udpbuf, 1, NULL) == 0)
                rte_pktmbuf_free(udpbuf); // out ring full: drop instead of leaking the mbuf

            // NOTE(review): assumes jr_udp_pkt copies ol->data into the mbuf (confirm), so the
            // offload is no longer needed once the packet is built.
            rte_free(ol->data);
            rte_free(ol);
        }
    }
    return 0;
}