目标
之前是在 main 函数的 while(1) 中实现了全部功能,本次把之前流水账式的代码改为层次化结构。层次化需要不同模块协同运作,模块之间依赖环形队列传递数据,这就叫做“异步解耦”。如图所示。
实现步骤
环形队列
先定义一对环形队列,把它俩封装到一个struct中,命名为inout_ring
#if ENABLE_RINGBUFFER
/*
 * The pair of DPDK rings that decouples the RX/TX loop in main from the
 * protocol-stack worker (pkt_process).
 */
struct inout_ring {
    struct rte_ring *in;  /* received frames: main RX loop -> pkt_process */
    struct rte_ring *out; /* frames to transmit: protocol stack -> main TX loop */
};
#endif
in 和 out 在全局只有一对,所以设计成单例。单例模式在之前的 arp 表中已经使用过。
#if ENABLE_RINGBUFFER
/* Process-wide singleton holding the in/out ring pair. */
static struct inout_ring *rInst = NULL;

/*
 * Return the ring-pair singleton, allocating a zero-filled instance on the
 * first call (both ring pointers start as NULL and are created by main).
 *
 * Returns NULL if rte_malloc fails; the singleton is left unset in that case,
 * so a later call will retry the allocation.
 *
 * NOTE(review): the lazy initialisation is not synchronised; it assumes the
 * first call happens on one lcore before any worker lcore calls it — confirm
 * the launch order in main.
 */
static struct inout_ring *ringInstance(void) {
    if (rInst == NULL) {
        struct inout_ring *inst = rte_malloc("in out ring", sizeof(struct inout_ring), 0);
        if (inst == NULL) {
            /* Do not memset a NULL pointer; let the caller report the failure. */
            return NULL;
        }
        memset(inst, 0, sizeof(*inst));
        rInst = inst;
    }
    return rInst;
}
#endif
main的改造
环形队列初始化
#if ENABLE_RINGBUFFER
/* Create the ring pair shared by the RX/TX loop and the protocol-stack worker. */
struct inout_ring *ring = ringInstance();
if (ring == NULL) {
    rte_exit(EXIT_FAILURE, "环形缓冲区初始化失败\n");
}
if (ring->in == NULL) {
    /* in: filled only by the RX loop below, drained only by pkt_process,
     * so single-producer / single-consumer is correct. */
    ring->in = rte_ring_create("in ring", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
}
if (ring->out == NULL) {
    /* out: pkt_process AND the ARP timer callback (run on the main lcore by
     * rte_timer_manage) both enqueue, so it must NOT be flagged
     * single-producer; only the main loop dequeues, so single-consumer is fine. */
    ring->out = rte_ring_create("out ring", RING_SIZE, rte_socket_id(), RING_F_SC_DEQ);
}
/* rte_ring_create returns NULL on failure (e.g. RING_SIZE not a power of two,
 * name already in use, out of memory); continuing would dereference NULL. */
if (ring->in == NULL || ring->out == NULL) {
    rte_exit(EXIT_FAILURE, "环形队列创建失败\n");
}
#endif
while(1) 的简化
while (1) {
    /* RX: pull Ethernet frames from NIC port gDpdkPortId, RX queue 0, and hand
     * them to the protocol-stack worker through the in ring. */
    struct rte_mbuf *rx_mbufs[BURST_SIZE]; /* pointer array only; the mbufs come from the mempool */
    /* args: NIC port id (not a TCP/UDP port), RX queue index, destination array, capacity */
    unsigned num_recvd = rte_eth_rx_burst(gDpdkPortId, 0, rx_mbufs, BURST_SIZE);
    if (num_recvd > BURST_SIZE) {
        rte_exit(EXIT_FAILURE, "rte_eth_rx_burst接收出错,溢出\n");
    } else if (num_recvd > 0) {
        unsigned nb_queued = rte_ring_sp_enqueue_burst(ring->in, (void **)rx_mbufs, num_recvd, NULL);
        /* A full in ring accepts only part of the burst; the rest are still
         * owned here and must be dropped, otherwise they leak from the pool. */
        for (unsigned i = nb_queued; i < num_recvd; i++) {
            rte_pktmbuf_free(rx_mbufs[i]);
        }
    }

    /* TX: drain packets produced by the protocol stack and transmit them. */
    struct rte_mbuf *tx_mbufs[BURST_SIZE];
    unsigned nb_tx = rte_ring_dequeue_burst(ring->out, (void **)tx_mbufs, BURST_SIZE, NULL);
    if (nb_tx > 0) {
        /* Ownership: in-ring mbufs are allocated by RX here and freed by the
         * protocol stack; out-ring mbufs are allocated by the protocol stack.
         * rte_eth_tx_burst takes over every mbuf it accepts (the driver frees
         * it after transmission), so freeing those here as well would be a
         * double free. Only the mbufs it did not accept are freed here. */
        uint16_t nb_sent = rte_eth_tx_burst(gDpdkPortId, 0, tx_mbufs, (uint16_t)nb_tx);
        for (unsigned i = nb_sent; i < nb_tx; i++) {
            rte_pktmbuf_free(tx_mbufs[i]);
        }
    }

    /* Run expired rte_timer callbacks once more than TIMER_RESOLUTION_CYCLES
     * TSC ticks have elapsed since the last run. */
    static uint64_t prev_tsc = 0;
    uint64_t cur_tsc = rte_rdtsc();
    if (cur_tsc - prev_tsc > TIMER_RESOLUTION_CYCLES) {
        rte_timer_manage();
        prev_tsc = cur_tsc;
    }
}
创建pkt_process线程
位置:main 中 while 之前,并且要放在环形队列初始化之后——pkt_process 一启动就会通过 ringInstance() 取得队列并对 ring->in 出队,若此时队列尚未创建就会访问空指针。
#if ENABLE_MULTHREAD
/* Launch the protocol-stack worker on the next enabled lcore after lcore_id
 * (skip_main = 1, wrap = 0).
 * NOTE(review): assumes lcore_id holds the main lcore id (e.g. from
 * rte_lcore_id()); its initialisation is not shown here. */
{
    unsigned worker_lcore = rte_get_next_lcore(lcore_id, 1, 0);
    /* rte_get_next_lcore yields RTE_MAX_LCORE when no further lcore is enabled
     * (e.g. EAL started with a single core); the launch itself can also fail.
     * Either way the stack would silently never run, so stop here. */
    if (worker_lcore >= RTE_MAX_LCORE ||
        rte_eal_remote_launch(pkt_process, mbuf_pool, worker_lcore) != 0) {
        rte_exit(EXIT_FAILURE, "pkt_process线程启动失败\n");
    }
}
#endif
函数定义:
#if ENABLE_MULTHREAD
/* Skeleton of the worker started with rte_eal_remote_launch; the complete
 * body is given later in this article. */
static int pkt_process(__attribute__((unused)) void *arg) {
    /* A non-void function must return a value: the lcore function's return
     * code is what rte_eal_wait_lcore() reports back to main. */
    return 0;
}
#endif
依次实现图上的四个箭头
在main的while(1)中已经实现两个了。另外两个就是从in buf->pkt_process,以及pkt_process->out buf
我们先把原来 while(1) 中与协议处理相关的全部内容移到 pkt_process 中,然后做两处修改:
接收改为从 in ring 出队(dequeue);
全部的发送(rte_eth_tx_burst)改为向 out ring 入队,即把代码中的:
rte_eth_tx_burst(gDpdkPortId, 0, &arpbuf, 1);
rte_pktmbuf_free(arpbuf);
全部改为:
rte_ring_mp_enqueue_burst(ring->out, (void **)&arpbuf, 1, NULL);
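别忘了 timer 回调函数中也有 arp 报文的发送,同样要改为入队。注意:timer 回调由 main 循环里的 rte_timer_manage() 触发,运行在主 lcore 上,而 pkt_process 运行在另一个 lcore 上,因此 out ring 有两个生产者,入队必须使用多生产者安全的接口(如 rte_ring_mp_enqueue_burst)。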
完整的pkt_process
// 用户态协议栈
static int pkt_process(__attribute__((unused)) void *arg) {
// 传参mbuf
// 取出in buf的东西,处理,释放,放入out buf
// 之前的协议栈代码复制过来
struct rte_mempool *mbuf_pool = (struct rte_mempool *)arg;
struct inout_ring *ring = ringInstance();
while (1) {
// dequeue
struct rte_mbuf *mbufs[BURST_SIZE];
unsigned num_recvd = rte_ring_mc_dequeue_burst(ring->in, (void **)mbufs, BURST_SIZE, NULL);
// 之前的协议栈代码复制过来
unsigned i = 0;
for (i = 0;i < num_recvd;i++) {
struct rte_ether_hdr *ehdr = rte_pktmbuf_mtod(mbufs[i], struct rte_ether_hdr*);// rte_pktmbuf_mtod是宏定义。参数1:内存地址 参数2:转换成什么类型
//// ehdr就是拿出的以太网数据
#ifdef ENABLE_ARP
if (ehdr->ether_type == rte_cpu_to_be_16(RTE_ETHER_TYPE_ARP)) {
// 解析这个arp报文并回复
struct rte_arp_hdr *ahdr = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_arp_hdr *, sizeof(struct rte_ether_hdr));
// 调试
struct in_addr addr_source, addr_target;
addr_source.s_addr = ahdr->arp_data.arp_sip;
printf("[recv][arp协议] src: %s ", inet_ntoa(addr_source));
addr_target.s_addr = ahdr->arp_data.arp_tip;
printf("target:%s\n",inet_ntoa(addr_target));
if (ahdr->arp_data.arp_tip == gLocalIp) { // 如果arp问的就是自己的p地址
if (ahdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REQUEST)) {
printf("\t[是向本机的请求报文]\n");
struct rte_mbuf * arpbuf = jr_arp_send(mbuf_pool, RTE_ARP_OP_REPLY, ahdr->arp_data.arp_sha.addr_bytes, ahdr->arp_data.arp_tip, ahdr->arp_data.arp_sip);
//rte_eth_tx_burst(gDpdkPortId, 0, &arpbuf, 1);// 真正的发送操作,第四个参数:发一个包,第二个参数:使用0号队列发送
//rte_pktmbuf_free(arpbuf);
rte_ring_mp_enqueue_burst(ring->out, (void **)&arpbuf, 1, NULL);
}
else if (ahdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REPLY)) {
printf("\t[是给本机的应答报文]\n");
struct arp_table *table = arp_table_instance();
uint8_t *hardware_addr = jr_get_dst_macaddr(ahdr->arp_data.arp_sip);
if (hardware_addr == NULL){
// 解析mac地址,如果没存过,则存到arp表中
struct arp_entry *entry = rte_malloc("arp entry", sizeof(struct arp_entry), 0);
if (entry) { //注意此处若失败就失败了,不用终止程序
// 在entry中填上内容并加到链表中
entry->ip = ahdr->arp_data.arp_sip;
rte_memcpy(entry->hwaddr, ahdr->arp_data.arp_sha.addr_bytes, RTE_ETHER_ADDR_LEN);
entry->type = ARP_ENTRY_STATUS_DYNAMIC;
LL_ADD(entry, table->entries);
table->count++;
}
}
#if ENABLE_DEBUG
// 打印arp表
struct arp_entry *iter;
for (iter = table->entries;iter != NULL;iter = iter->next) {
// mac
jr_print_ether_addr("", (const struct rte_ether_addr *)iter->hwaddr);
// ip
struct in_addr addr;
addr.s_addr = iter->ip;
printf("\t\t%s", inet_ntoa(addr));
// type
printf("\t\t%s\n", iter->type == ARP_ENTRY_STATUS_DYNAMIC ? "dynamic" : "static");
}
#endif
}
rte_pktmbuf_free(mbufs[i]);
}
continue;
}
#endif
// 此处可看到对接口处理函数封装的已经很完善了
if (ehdr->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4)) {
// 如果不是ip协议,跳过
rte_pktmbuf_free(mbufs[i]);
continue;
}
// 如果是ip协议,取出
struct rte_ipv4_hdr *iphdr = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_ipv4_hdr *,
sizeof(struct rte_ether_hdr));
if (iphdr->next_proto_id == IPPROTO_UDP) {
struct rte_udp_hdr *udphdr = (struct rte_udp_hdr *)(iphdr + 1);
#ifdef ENABLE_SEND
rte_memcpy(gDstMac, ehdr->s_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
rte_memcpy(&gSrcIp, &iphdr->dst_addr, sizeof(uint32_t));
rte_memcpy(&gDstIp, &iphdr->src_addr, sizeof(uint32_t));
rte_memcpy(&gSrcPort, &udphdr->dst_port, sizeof(uint16_t));
rte_memcpy(&gDstPort, &udphdr->src_port, sizeof(uint16_t));
#endif
uint16_t length = ntohs(udphdr->dgram_len); //经验:网络字节序2字节以上必须转,不管实际是什么序
*((char*)udphdr + length) = '\0';
struct in_addr addr;
addr.s_addr = iphdr->src_addr;
printf("[recv][udp协议]src: %s:%d, ", inet_ntoa(addr), ntohs(udphdr->src_port));
addr.s_addr = iphdr->dst_addr;
printf("dst: %s:%d, 内容:%s length=%d\n", inet_ntoa(addr), ntohs(udphdr->dst_port),
(ntohs(udphdr->dst_port) == 8888) ? (char *)(udphdr+1) : "略", length);
#ifdef ENABLE_SEND
if (gSrcIp == gLocalIp){
struct rte_mbuf * txbuf = jr_send(mbuf_pool, (uint8_t *)(udphdr+1), length);
//rte_eth_tx_burst(gDpdkPortId, 0, &txbuf, 1);// 真正的发送操作,第四个参数:发一个包,第二个参数:使用0号队列发送
//rte_pktmbuf_free(txbuf);
rte_ring_mp_enqueue_burst(ring->out, (void **)&txbuf, 1, NULL);
}
#endif
rte_pktmbuf_free(mbufs[i]);
}
#ifdef ENABLE_ICMP
if (iphdr->next_proto_id == IPPROTO_ICMP) {
printf("[recv][icmp]");
struct rte_icmp_hdr *icmphdr = (struct rte_icmp_hdr *)(iphdr + 1);
// 在此只实现echo请求报文的回复,类型是08(RTE_IP_ICMP_ECHO_REQUEST)
if (icmphdr->icmp_type == RTE_IP_ICMP_ECHO_REQUEST) {
printf("[request]\n");
struct rte_mbuf *txbuf = jr_send_icmp(
mbuf_pool, ehdr->s_addr.addr_bytes,
iphdr->dst_addr, iphdr->src_addr,
icmphdr->icmp_ident, icmphdr->icmp_seq_nb
);
//rte_eth_tx_burst(gDpdkPortId, 0, &txbuf, 1);// 真正的发送操作,第四个参数:发一个包,第二个参数:使用0号队列发送
//rte_pktmbuf_free(txbuf);
rte_ring_mp_enqueue_burst(ring->out, (void **)&txbuf, 1, NULL);
}
rte_pktmbuf_free(mbufs[i]);
}
#endif
}
}
return 0;
}
#endif
结果
完成了跟上一集一样的效果。