本文概览
目标
之前虽然能够解析udp报文,但无法承载上面的应用程序。换句话说,我们希望udp_server程序使用我们提供的API,API的函数内部是对于报文的解析。
解释server:UDP 不同于 TCP,不存在请求连接和受理过程,因此在某种意义上无法明确区分服务器端和客户端,只是因为其提供服务而称为服务器端。1
详细的,请看如下的一个使用内核api的udp程序 unix_udp.c
:
// my_code/0201udp/unix_udp.c
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
#define UDP_APP_RECV_BUFFER_SIZE 128
int main(int argc, char *argv[]) {
int connfd = socket(AF_INET, SOCK_DGRAM, 0);
if (connfd == -1) {
printf("sockfd(套接字描述符)分配失败\n");
return -1;
}
struct sockaddr_in localaddr, clientaddr; // struct sockaddr
memset(&localaddr, 0, sizeof(struct sockaddr_in));
localaddr.sin_port = htons(8889);
localaddr.sin_family = AF_INET;
localaddr.sin_addr.s_addr = inet_addr("192.168.43.130"); // 0.0.0.0
bind(connfd, (struct sockaddr*)&localaddr, sizeof(localaddr));
char buffer[UDP_APP_RECV_BUFFER_SIZE] = {0};
socklen_t addrlen = sizeof(clientaddr);
while(1) {
if (recvfrom(connfd, buffer, UDP_APP_RECV_BUFFER_SIZE, 0, (struct sockaddr*)&clientaddr, &addrlen) < 0) {
continue;
} else {
printf("recv from %s:%d, data:%s\n", inet_ntoa(clientaddr.sin_addr),ntohs(clientaddr.sin_port), buffer);
sendto(connfd, buffer, strlen(buffer), 0, (struct sockaddr*)&clientaddr, sizeof(clientaddr));
}
}
close(connfd);
}
使用gcc -o unix_udp unix_udp.c
命令编译运行后,在命令行中输入netstat -anop | grep 8889
,可以发现
- 端口
8889
被一个名为unix_udp
的进程(进程 ID 为1539
)监听。 - 该端口使用的是 UDP 协议。
本次目标是将unix_udp的代码完整复制到udp.c中,使用我们手工提供的接口能够运行起来且实现相同功能。架构如下图所示。
TODO
了解当前整个程序的架构
三个线程
TODO
- 在udp.c中,在main中启动udp_server线程
-
send buffer和 recv buffer的定义(在结构体connfd中)
-
udp应用所需接口
列出要实现的接口
可通过man 函数名
命令查询内核API。整理全部接口(五个)如下:
int socket(int domain, int type, int protocol);
int bind(int sockfd, const struct sockaddr *addr,
socklen_t addrlen);
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen);
ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen);
int close(int fd);
当然man查询时还能查到其他,比如sendmsg,什么时候用到再去实现。
了解每个接口的任务
一切都围绕着connfd而来,这是一个int类型的数值。socket进行connfd的分配,分配一个别人没有用过的int数值。bind就是绑定,一个connfd对应着一对”local ip : local port”。所以在socket时也要进行ip地址和port号的空间分配。recvfrom/sendto根据connfd查询到”local ip : local port”。close对connfd进行释放。
实现
在main中启动udp_server_entry线程
请注意dpdk启动线程时要求绑定core,所以这么写:
#if ENABLE_MULTHREAD
+ lcore_id = rte_get_next_lcore(lcore_id, 1, 0);
- rte_eal_remote_launch(pkt_process, mbuf_pool, rte_get_next_lcore(lcore_id, 1, 0););
+ rte_eal_remote_launch(pkt_process, mbuf_pool, lcore_id);
#endif
+#if ENABLE_UDP_APP
+
+ lcore_id = rte_get_next_lcore(lcore_id, 1, 0);
+ rte_eal_remote_launch(udp_server_entry, mbuf_pool, lcore_id);
+
+#endif
定义struct localhost
#if ENABLE_UDP_APP
struct localhost {
int fd;
uint32_t localip; //localip与localmac一一对应,在此处记录localmac是为了sendto的时候要写源mac地址,到时从这里找
uint8_t localmac[RTE_ETHER_ADDR_LEN];
uint16_t localport;
int protocal;
struct rte_ring *sendbuf, *recvbuf;
struct localhost* prev, *next; //双向链表。如果连接成千上万,那么就要用红黑树了
}
#endif
接下来实现接口:socket的实现
至于为什么不把内核的socket实现粘贴过来:就算原封不动的移植还是比自己重写要麻烦。此外我们已经实现了一些内容,和内核协议栈不完全一致,在此基础上照搬只会更难。
#if ENABLE_UDP_APP
static struct localhost *lhost = NULL; //链表
#define DEFAULT_FD_NUM 3
int get_fd_from_bitmap(void){
int fd = DEFAULT_FD_NUM;
return fd;
}
struct localhost * get_hostinfo_from_fd(int sockfd) {
struct localhost *host;
for (host = lhost; host !- NULL; host = host->next){
if (sockfd == host->fd)
return host;
}
return NULL;
}
int socket(int domain, int type, int protocol){
int fd = get_fd_from_bitmap();// 0(标准输入),1(标准输出),2(标准输出错误)已被系统使用
struct localhost *host = rte_malloc("localhost", sizeof(struct localhost), 0);
if (host == NULL) {
printf("rte_malloc localhost 失败\n");
return -1;
}
memset(host, 0, sizeof(struct localhost *));
host->fd = fd;
if (type == SOCK_DGRAM)
host->protocol = IPPROTO_UDP;
else if (type == SOCK_STGREAM)
host->protocol = IPPROTO_TCP;
host->recvbuf = rte_ring_create("recvbuf", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
if (host->recvbuf == NULL) {
rte_free(host);
printf("recvbuf ring 创建失败\n");
return -1;
}
host->sendbuf = rte_ring_create("sendbuf", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
if (host->sendbuf == NULL) {
rte_ring_free(host->recvbuf);
rte_free(host);
printf("sendbuf ring 创建失败\n");
return -1;
}
LL_ADD(host, lhost);
return fd;
}
#endif
接口bind的实现
#if ENABLE_UDP_APP
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen){
struct localhost *host = get_hostinfo_from_fd(sockfd);
if (host == NULL) {
printf("绑定失败\n");
return -1;
}
// 绑定,即写入struct localhost中的localip/mac/port三项
struct sockaddr_in * laddr = (struct sockaddr_in *)addr;
host->localport = laddr->sin_port;
rte_memcpy(&host->localip, laddr->sin_addr.s_addr, sizeof(uint32_t));
rte_memspy(host->localmac, gSrcMac, RTE_ETHER_ADDR_LEN);// gSrcMac是全局变量,main中一上来通过dpdk函数取得的。稳妥做法是开启dpdk时先维护一些ip地址和mac地址的数值对。这个全局变量一看就不保险。
return 0;
}
#endif
接口close的实现
与socked是对应的——那里申请,这里释放;那里添加,这里移除。
“`c
#if ENABLE_UDP_APP
int close(int fd){
struct localhost *host = get_host_info_from_fd(sockfd);
if (host == NULL) return -1;
<pre><code>LL_REMOVE(host, lhost);
if (host->recvbuf) {
rte_ring_free(host->recvbuf);
}
if (host->sendbuf) {
rte_ring_free(host->sendbuf);
}
rte_free(host);
return 0;
</code></pre>
}
#endif
“`
udp_process函数的实现:在用户态协议栈中把数据抛到recv buffer
recvfrom要做的是从recvbuf中取出数据包继续向上提交,所以在实现recvfrom之前,先要在pkt_process(用户协议栈)中把udp包放入recvbuf。
为了每个函数不太长,现在将pkt_process中处理udp协议报文的部分单独提出,命名为udp_process。为了好理解,可以先把原来的那部分直接剪切到udp_process函数中(但稍后udp_process函数中大部分要被删改)。
static int udp_process(struct rte_mbuf * udpmbuf){
struct rte_ipv4_hdr *iphdr = rte_pktmbuf_mtod_offset(udpmbuf, struct rte_ipv4_hdr *,
sizeof(struct rte_ether_hdr));
struct rte_udp_hdr *udphdr = (struct rte_udp_hdr *)(iphdr + 1);
#ifdef ENABLE_SEND
rte_memcpy(gDstMac, ehdr->s_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
rte_memcpy(&gSrcIp, &iphdr->dst_addr, sizeof(uint32_t));
rte_memcpy(&gDstIp, &iphdr->src_addr, sizeof(uint32_t));
rte_memcpy(&gSrcPort, &udphdr->dst_port, sizeof(uint16_t));
rte_memcpy(&gDstPort, &udphdr->src_port, sizeof(uint16_t));
#endif
uint16_t length = ntohs(udphdr->dgram_len); //经验:网络字节序2字节以上必须转,不管实际是什么序
*((char*)udphdr + length) = '\0';
struct in_addr addr;
addr.s_addr = iphdr->src_addr;
printf("[recv][udp协议]src: %s:%d, ", inet_ntoa(addr), ntohs(udphdr->src_port));
addr.s_addr = iphdr->dst_addr;
printf("dst: %s:%d, 内容:%s length=%d\n", inet_ntoa(addr), ntohs(udphdr->dst_port),
(ntohs(udphdr->dst_port) == 8888) ? (char *)(udphdr+1) : "略", length);
}
#endif
接下来进行修改。首先重构包需要一个新的结构体,offload。
应用层包类型:struct offload的定义
作用:组装udp数据
struct offload{
uint32_t sip, dip;
uint16_t aport, dport;
uint8_t protocol;
unsigned char *data;
uint16_t length;
}
get_hostinfo_from_ipport函数的实现:监听的真正含义
通过端口找fd
struct localhost * get_hostinfo_from_ipport(uint32_t dip, uint16_t port, itn proto) {
struct localhost *host;
for (host = lhost; host !- NULL; host = host->next){
if (dip == host->localip && port == host->localport && proto == host->protocal)
return host;
}
return NULL;
}
至此梳理出udp_process做三件事:
- 解析
- 转换包类型,从rte_mbuf到struct offload
- 将offload入队(enqueue)到recvbuf
最终udp_process的实现:
#if ENABLE_MULTHREAD
#if ENABLE_UDP_APP
static int udp_process(struct rte_mbuf * udpmbuf){
struct rte_ipv4_hdr *iphdr = rte_pktmbuf_mtod_offset(udpmbuf, struct rte_ipv4_hdr *,
sizeof(struct rte_ether_hdr));
struct rte_udp_hdr *udphdr = (struct rte_udp_hdr *)(iphdr + 1);
struct locolhost *host = get_hostinfo_from_ipport(iphdr->dst_addr, udphdr->dst_port, iphdr->next_proto_id);
if (host == NULL) {
rte_pktmbuf_free(udpmbuf);
return -3;
}
struct offload *ol = rte_malloc("offload", sizeof(struct offload), 0);
if (ol == NULL) {
rte_pktmbuf_free(udpmbuf);
return -1;
}
ol->dip = iphdr->dst_addr;
ol->sip = iphdr->src_addr;
ol->sport = udphdr->src_port;
ol->dport = udphdr->dst_port;
ol->protocol = IPPROTO_UDP;
ol->length = ntohs(udphdr->dgram_len);
ol->data = rte_malloc("unsigned char*", ol->length - sizeof(struct rte_udp_hdr), 0);
if (ol->data == NULL) {
rte_pktmbuf_free(udpmbuf);
rte_free(ol);
return -2;
}
rte_ring_mp_enqueue(host->recvbuf, ol);
rte_pktmbuf_free(udpmbuf);
}
#endif
static int pkt_process(void *arg) {…}
#endif
接口recvfrom的实现
包已经在recv buf中等待我们取出了,此时终于可以写recvfrom了:
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen) {
struct localhost *host = get_host_info_from_fd(sockfd);
if (host == NULL) return -1;
struct offload *ol = NULL;
int nb == rte_ring_mc_dequeue(host->sendbuf, &ol);
if (nb < 0){
//阻塞
}
struct sockaddr_in *saddr = (struct sockaddr_in *)arc_addr;
asddr->sin_port = ol->dport;
rte_memcpy(&saddr->sin_addr.s_addr, &ol->sip, sizeof(uint32_t));
}
条件变量的使用:如果当下无可收的包,进入阻塞态
定义
struct localhost {
…
+ pthread_cond_t codn;
+ pthread_mutex_t mutex;
}
初始化,在socked中
int socket(int domain, int type, int protocol){
…
+ pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
+ rte_memcpy(&host->cond, &blank_cond, sizeof(pthread_cond_t));
+ pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
+ rte_memcpy(&host->mutex, &blank_mutex, sizeof(pthread_mutex_t));
LL_ADD(host, lhost);
return fd;
}
阻塞
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen) {
…
struct offload *ol = NULL;
+ int nb = -1;
+ pthread_mutex_lock(&host->mutex);
+ while ((nb = rte_ring_mc_dequeue(host->sendbuf, &ol)) < 0){
+ pthread_cond_wait(&host->cond, &host->mutex);
+ }
+ pthread_mutex_unlock(&host->mutex);
…
}
唤醒
在udp_process将包放入队列时
static int udp_process(struct rte_mbuf * udpmbuf){
…
rte_ring_mp_enqueue(host->recvbuf, ol);
+ pthread_mutex_lock(&host->mutex);
+ pthread_cond_signal(&host->cond);
+ pthread_mutex_unlock(&host->mutex);
…
}
接口sendto的实现
准备一个offload把它enqueue到send buf中
#if ENABLE_UDP_APP
static ssize_t sendto(int sockfd, const void *buf, size_t len, __attribute__((unused)) int flags,
const struct sockaddr *dest_addr, __attribute__((unused)) socklen_t addrlen){
struct localhost *host = get_hostinfo_from_fd(sockfd);
if (host == NULL) return -1;
const struct sockaddr_in *daddr = (const struct sockaddr_in *)dest_addr;
struct offload *ol = rte_malloc("offload", sizeof(struct offload), 0);
if (ol == NULL) return -1;
ol->dip = daddr->sin_addr.s_addr;
ol->dport = daddr->sin_port;
ol->sip = host->localip;
ol->sport = host->localport;
ol->length = len;
ol->data = rte_malloc("unsigned char *", len, 0);
if (ol->data == NULL) {
rte_free(ol);
return -1;
}
rte_memcpy(ol->data, buf, len);
rte_ring_mp_enqueue(host->sendbuf, ol);
return len;
}
#endif
}
udp_out的实现
从send buf中取出(类型为offload),放入out buf(类型为rte_mbuf),中间要做个类型转换(重新组包)。且它不监听,而是遍历全部fd对应的send buf,有就发。
回忆:从rte_mbuf到struct offload的转换是由udp_process做的。
static int udp_out(struct rte_mempool *mbuf_pool) {
struct localhost *host;
for (host = lhost; host != NULL; host = host->next) {
struct offload * ol;
int nb_snd = rte_ring_mc_dequeue(host->sendbuf, &ol);
if (nb_snd < 0) continue; //此处就没必要像接收那里做阻塞
uint8_t * dstmac = jr_get_dst_macaddr(ol->dip);
if (dstmac == NULL) {
struct rte_mbuf *arpbuf = jr_send_arp(mbuf_pool, RTE_ARP_OP_REQUEST, gDefaultArpMac, ol->sip, ol->dip);
// 组织完arp包后不能直接发,而是放入out buffer
struct inout_ring *ring = ringInstance();
rte_ring_mp_enqueue_burst(ring->out, (void **) &arpbuf, 1, NULL);
rte_ring_mp_enqueue(host->sendbuf, ol);
} else {
struct rte_mbuf *udpbuf = jr_udp_pkt(mbuf_pool, ol->sip, ol->dip, ol->sport, ol->dport, host->localmac, dstmac, ol->data, ol->length);
struct inout_ring *ring = ringInstance();
rte_ring_mp_enqueue_burst(ring->out, (void **) udpbuf, 1, NULL);
}
}
return 0;
}