WebStorm 国外镜像 Promise java开发环境变量 Python爬虫实战 xpath pdo 虚拟机 jestjs jwt vue路由 vue网站 vue添加class svn默认安装路径 solidworks图库 js控制台打印 python开发界面 python路径设置 java数据库连接 java中继承 java正则表达式匹配 java表达式 java定义变量 java语言是什么 java集合框架图 linuxshell python教程下载 圣骑士装备 typemonkey js添加元素 p6软件 亚索刀光 苹果手机添加邮箱 ps怎么画漫画 ip地址切换器 梦想世界答题器 地图数据采集 华为手机屏保怎么设置 熊猫头表情包制作 dbgview
当前位置: 首页 > 学习教程  > 编程语言

socket的sk_wmem_alloc

2020/10/8 18:12:11 文章标签:

在socket的结构体里有一个sk_wmem_alloc字段,该字段表示已经提交到ip层,但还没有从本机发送出去的skb占用空间大小。 分配时机 当tcp层封装好skb数据后,会调用tcp_transmit_skb,在该函数会根据skb的长度相应增加sk_wmem_alloc的值…

    在socket的结构体里有一个sk_wmem_alloc字段,该字段表示已经提交到ip层,但还没有从本机发送出去的skb占用空间大小。

分配时机

    当tcp层封装好skb数据后,会调用tcp_transmit_skb,在该函数会根据skb的长度相应增加sk_wmem_alloc的值,然后发送给ip层。

static int tcp_transmit_skb(struct sock *sk, struct sk_buff *skb, int clone_it,
			    gfp_t gfp_mask)
{
	...
	//设置skb的释放处理函数
	skb->destructor = skb_is_tcp_pure_ack(skb) ? __sock_wfree : tcp_wfree;
	skb_set_hash_from_sk(skb, sk);
	//增加sk_wmen_alloc大小
	atomic_add(skb->truesize, &sk->sk_wmem_alloc);

	//发送给ip层
	err = icsk->icsk_af_ops->queue_xmit(sk, skb, &inet->cork.fl);

	return net_xmit_eval(err);
}

释放时机

    当驱动发送完skb,收到中断消息后,会进入ixgbe_clean_tx_irq流程,这里会调用napi_consume_skb,最终通过skb_release_head_state调用skb->destructor回收sk_wmem_alloc内存空间,正常非纯ack的tcp数据,destructor执行的是tcp_wfree函数。

void napi_consume_skb(struct sk_buff *skb, int budget)
{
	if (unlikely(!skb))
		return;

	/* Zero budget indicate non-NAPI context called us, like netpoll */
	if (unlikely(!budget)) {
		dev_consume_skb_any(skb);
		return;
	}

	if (likely(atomic_read(&skb->users) == 1))
		smp_rmb();
	else if (likely(!atomic_dec_and_test(&skb->users)))
		return;
	/* if reaching here SKB is ready to free */
	trace_consume_skb(skb);

	/* if SKB is a clone, don't handle this case */
	//驱动收到中断时,走这个流程,执行__kfree_skb
	if (skb->fclone != SKB_FCLONE_UNAVAILABLE) {
		__kfree_skb(skb);
		return;
	}

	_kfree_skb_defer(skb);
}

static void skb_release_head_state(struct sk_buff *skb)
{
	skb_dst_drop(skb);
#ifdef CONFIG_XFRM
	secpath_put(skb->sp);
#endif
	if (skb->destructor) {
		WARN_ON(in_irq());
		skb->destructor(skb);
	}
#if IS_ENABLED(CONFIG_NF_CONNTRACK)
	nf_conntrack_put(skb_nfct(skb));
#endif
#if IS_ENABLED(CONFIG_BRIDGE_NETFILTER)
	nf_bridge_put(skb->nf_bridge);
#endif
}

void tcp_wfree(struct sk_buff *skb)
{
	struct sock *sk = skb->sk;
	struct tcp_sock *tp = tcp_sk(sk);
	unsigned long flags, nval, oval;
	int wmem;

	/* Keep one reference on sk_wmem_alloc.
	 * Will be released by sk_free() from here or tcp_tasklet_func()
	 */
	//回收sk_wmem_alloc
	wmem = atomic_sub_return(skb->truesize - 1, &sk->sk_wmem_alloc);

	/* If this softirq is serviced by ksoftirqd, we are likely under stress.
	 * Wait until our queues (qdisc + devices) are drained.
	 * This gives :
	 * - less callbacks to tcp_write_xmit(), reducing stress (batches)
	 * - chance for incoming ACK (processed by another cpu maybe)
	 *   to migrate this flow (skb->ooo_okay will be eventually set)
	 */
	if (wmem >= SKB_TRUESIZE(1) && this_cpu_ksoftirqd() == current)
		goto out;

	for (oval = READ_ONCE(sk->sk_tsq_flags);; oval = nval) {
		struct tsq_tasklet *tsq;
		bool empty;

		if (!(oval & TSQF_THROTTLED) || (oval & TSQF_QUEUED))
			goto out;

		nval = (oval & ~TSQF_THROTTLED) | TSQF_QUEUED | TCPF_TSQ_DEFERRED;
		nval = cmpxchg(&sk->sk_tsq_flags, oval, nval);
		if (nval != oval)
			continue;

		/* queue this socket to tasklet queue */
		local_irq_save(flags);
		tsq = this_cpu_ptr(&tsq_tasklet);
		empty = list_empty(&tsq->head);
		list_add(&tp->tsq_node, &tsq->head);
		if (empty)
			tasklet_schedule(&tsq->tasklet);
		local_irq_restore(flags);
		return;
	}
out:
	sk_free(sk);
}

skb数据销毁时机

在tcp_wfree函数里并没有真正的释放发送的skb数据,仅仅只是回收sk_wmem_alloc空间,因为tcp为了保证可靠性,skb的数据需要等到ack流程里才能释放,如果超时丢包等还需要再次用到skb重传。在tcp_ack流程里,会通过tcp_clean_rtx_queue将skb从发送队列里移除,并调用sk_wmem_free_skb真正释放skb的数据内容。

static int tcp_clean_rtx_queue(struct sock *sk, int prior_fackets,
			       u32 prior_snd_una, int *acked,
			       struct tcp_sacktag_state *sack)
{

	while ((skb = tcp_write_queue_head(sk)) && skb != tcp_send_head(sk)) {
		...
		tcp_unlink_write_queue(skb, sk);
		sk_wmem_free_skb(sk, skb);
	}
}

void __kfree_skb(struct sk_buff *skb)
{
	skb_release_all(skb);
	kfree_skbmem(skb);
}

sk_wmem_free_skb最终也会通过__kfree_skb来释放skb数据内容,之前在驱动收到tx中断处理流程里,回收sk_wmem_alloc空间的时候也是通过这个函数来完成的,两个流程调用同一个处理函数,怎么做到驱动只是回收sk_wmem_alloc,而ack流程才去真正释放skb数据内容的呢?要回答这个问题就得先了解清楚skb的分配过程了。

sk_stream_alloc_skb

    在tcp_sendmsg里,当发现发送队列的最后skb空间不足时,内核会调用sk_stream_alloc_skb分配一个新的skb,分配完成后通过skb_entail将skb插入write_queue队列里。在看sk_stream_alloc_skb实现细节前,先看下它的参数size,这个size表示要分配的skb的线性区域空间大小,它是通过select_size函数来计算出来,当不支持sg的时候分配线性区域空间大小为mss的大小,当支持sg并且可以gso的时候,会通过linear_payload_sz来计算,这里的first_skb表示是否是wirte_queue的第一个skb,如果是,则申请(2048-tcp消息头)长度,否则不申请线性区域,这里主要是为了提高sack的处理流程,在sack里需要通过tcp_shift_skb_data对skb做一些迁移操作,skb只有非线性数据可以提高处理效率。

static int select_size(const struct sock *sk, bool sg, bool first_skb)
{
	const struct tcp_sock *tp = tcp_sk(sk);
	int tmp = tp->mss_cache;

	if (sg) {
		if (sk_can_gso(sk)) {
			tmp = linear_payload_sz(first_skb);
		} else {
			int pgbreak = SKB_MAX_HEAD(MAX_TCP_HEADER);

			if (tmp >= pgbreak &&
			    tmp <= pgbreak + (MAX_SKB_FRAGS - 1) * PAGE_SIZE)
				tmp = pgbreak;
		}
	}

	return tmp;
}

static int linear_payload_sz(bool first_skb)
{
	if (first_skb)
		return SKB_WITH_OVERHEAD(2048 - MAX_TCP_HEADER);
	return 0;
}


struct sk_buff *sk_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp,
				    bool force_schedule)
{
	struct sk_buff *skb;

	/* The TCP header must be at least 32-bit aligned.  */
	size = ALIGN(size, 4);

	if (unlikely(tcp_under_memory_pressure(sk)))
		sk_mem_reclaim_partial(sk);

	skb = alloc_skb_fclone(size + sk->sk_prot->max_header, gfp);
	if (likely(skb)) {
		bool mem_scheduled;

		if (force_schedule) {
			mem_scheduled = true;
			sk_forced_mem_schedule(sk, skb->truesize);
		} else {
			mem_scheduled = sk_wmem_schedule(sk, skb->truesize);
		}
		if (likely(mem_scheduled)) {
			skb_reserve(skb, sk->sk_prot->max_header);
			/*
			 * Make sure that we have exactly size bytes
			 * available to the caller, no more, no less.
			 */
			skb->reserved_tailroom = skb->end - skb->tail - size;
			return skb;
		}
		__kfree_skb(skb);
	} else {
		sk->sk_prot->enter_memory_pressure(sk);
		sk_stream_moderate_sndbuf(sk);
	}
	return NULL;
}

    sk_stream_alloc_skb调用alloc_skb_fclone分配skb,最终调用欧冠__alloc_skb时会传递一个SKB_ALLOC_FCLONE参数,这个参数表示fast clone的意思,因为tcp在分配完skb并填充完用户数据,通过tcp_transmit_skb进一步封装tcp头然后发送给ip层,在tcp_transmit_skb里会先把skb clone一份,这样传递给ip层的skb就是clone出来的skb,而原来的skb则还是保存在write_queue里。

static inline struct sk_buff *alloc_skb_fclone(unsigned int size,
					       gfp_t priority)
{
	return __alloc_skb(size, priority, SKB_ALLOC_FCLONE, NUMA_NO_NODE);
}

    通过fclone分配的skb,会同时分配两个skb,每个skb都有一个fclone标志为,用来表示是orig的skb(存放在write_queue)还是clone的skb(真正给驱动发送使用的),它的空间布局如下所示,第一个skb的fclone=1(SKB_FCLONE_ORIG)表示skb为源skb,第二个skb的fclone设置为2(SKB_FCLONE_CLONE),表示是clone出来的skb。这个是在__alloc_skb里初始化设置的。

struct sk_buff *__alloc_skb(unsigned int size, gfp_t gfp_mask,
			    int flags, int node)
{
	if (flags & SKB_ALLOC_FCLONE) {
		struct sk_buff_fclones *fclones;

		fclones = container_of(skb, struct sk_buff_fclones, skb1);

		kmemcheck_annotate_bitfield(&fclones->skb2, flags1);
		//设置第一个skb为源skb
		skb->fclone = SKB_FCLONE_ORIG;
                //设置fclone_ref引用参数为1
		atomic_set(&fclones->fclone_ref, 1);
		//设置第二个skb为clone skb
		fclones->skb2.fclone = SKB_FCLONE_CLONE;
	}
out:
	return skb;
nodata:
	kmem_cache_free(cache, skb);
	skb = NULL;
	goto out;
}

 

tcp_transmit_skb

    在封装好用户数据后,调用tcp_transimit_skb准备发送skb时,这里的clone_it表置位为1,因此进入skb_clone,在skb_clone返回的skb即为__alloc_skb里分配的的二个skb,当然clone的过程会把第二个skb的内容指向第一个skb。

static int tcp_transmit_skb(struct sock *sk, struct sk_buff *skb, int clone_it,
			    gfp_t gfp_mask)
{
	if (clone_it) {
		skb_mstamp_get(&skb->skb_mstamp);
		TCP_SKB_CB(skb)->tx.in_flight = TCP_SKB_CB(skb)->end_seq
			- tp->snd_una;
		tcp_rate_skb_sent(sk, skb);

		if (unlikely(skb_cloned(skb)))
			skb = pskb_copy(skb, gfp_mask);
		else
			//返回sk_buff_fclones的第二个skb,同时更新fclones->fclone_ref引用参数为2
			skb = skb_clone(skb, gfp_mask);
		if (unlikely(!skb))
			return -ENOBUFS;
	}
}

__kfree_skb

    现在再来看下__kfree_skb的调用流程,会有两个调用时机:

    1)、驱动收到tx中断时,通过napi_consume_skb调用(这里使用的是第二个skb,即clone出来的skb);

    2)、tcp_ack流程通过tcp_clean_rtx_queue调用(这里使用的是write_queue队列的skb,即orig skb);

static void kfree_skbmem(struct sk_buff *skb)
{
	struct sk_buff_fclones *fclones;

	switch (skb->fclone) {
	case SKB_FCLONE_UNAVAILABLE:
		kmem_cache_free(skbuff_head_cache, skb);
		return;

	case SKB_FCLONE_ORIG:
		fclones = container_of(skb, struct sk_buff_fclones, skb1);

		/* We usually free the clone (TX completion) before original skb
		 * This test would have no chance to be true for the clone,
		 * while here, branch prediction will be good.
		 */
		//如果是第一个skb(tcp_ack流程调用),会进入fastpash,然后free skb数据内容
		if (atomic_read(&fclones->fclone_ref) == 1)
			goto fastpath;
		break;

	default: /* SKB_FCLONE_CLONE */
		fclones = container_of(skb, struct sk_buff_fclones, skb2);
		break;
	}
	//如果是第二个skb,检查fclone_ref是否为1,正常驱动在napi_consume_skb进入时
	//这个fclone_ref为2,因此减1之后还是非0,直接return,不释放skb数据内容 
	if (!atomic_dec_and_test(&fclones->fclone_ref))
		return;
fastpath:
	kmem_cache_free(skbuff_fclone_cache, fclones);
}

 


本文链接: http://www.dtmao.cc/news_show_250003.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?