Git 摩尔投票法 私有变量 北斗系统 string vue安装 vue学习 大数据驾驶舱 bootstrap时间轴 当前线程等待5秒 idea生成main方法 matlab复数求模 idea批量替换快捷键 Navicat python数据格式 python实例 python怎么调用函数 python怎么使用 java的数据结构 java获取文件大小 java格式化字符串 java声明变量 变量的类型 java删除 h5模板 离散数学及其应用 方正流行体 运行时错误1004 高效能人士的七个习惯pdf Ext2Fsd 图片批量处理工具 免费脚本 模拟人生2夜生活 lol语音包 幽灵行动多少钱 游戏linux正则表达式 文件压缩工具 ps给图片加边框 js压缩图片 jquery添加样式
当前位置: 首页 > 学习教程  > 编程语言

Netty核心组件分析(一)

2020/11/4 14:53:35 文章标签:

Netty 从源码中了解 一 了解何为Netty1.Netty详解1.1 Netty的启动1.1.1 Bootsrap1.1.2 ServerBootstrap1.1.3 EventLoopGroup1.1.4 ChannelHandler了解何为Netty 官网网站 : https://netty.io/Netty是一个NIO客户端服务器框架,可快速轻松地开发网络应用程序&#x…

Netty 从源码中了解 一

  • 了解何为Netty
  • 1.Netty详解
    • 1.1 Netty的启动
      • 1.1.1 Bootsrap
      • 1.1.2 ServerBootstrap
      • 1.1.3 EventLoopGroup
      • 1.1.4 ChannelHandler

了解何为Netty

  1. 官网网站 : https://netty.io/
  2. Netty是一个NIO客户端服务器框架,可快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如TCP和UDP套接字服务器。

1.Netty详解

了解Netty的第一步先从Netty的启动类开始接触

1.1 Netty的启动

Bootsrap,ServerBootstrap

1.1.1 Bootsrap

1.1.2 ServerBootstrap

ServerBootstrap的类图描述

ServerBootstrap的类图

@Override
    public ServerBootstrap group(EventLoopGroup group) {
        return group(group, group);
    }
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
		// 调用的是父类的AbstactBootstrap的group
		// 主要还是把 parentGroup的EventLoopGroup进行设置 
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

1.1.3 EventLoopGroup

类图

Netty 的代码结构略微有点儿复杂,我们先通过类图对 EventLoop 有个基本认识。

  • Netty 的任务调度框架还是遵循了 java.util.concurrent 中 Executor 的接口 Netty
    任务调度框架的实现在 io.netty.util.concurrent 包中,其接口是 EventExecutor 和
  • EventExecutorGroup。从命名可以看出任务调度是基于事件的。 io.netty.channel 包中
  • EventLoopGroup 和 EventLoop 基于 io.netty.util.concurrent 封装了 channel
    执行的业务逻辑

EventLoopGroup 一对多 EventLoop
EventLoop 一对多 Channel

EventLoopGroup
EventLoop1
EventLoop2
Channel1
Channel2
    ChannelFuture register(Channel var1);
    ChannelFuture register(ChannelPromise var1);

EventLoopGroup的初始化核心逻辑

// 
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
        this.terminatedChildren = new AtomicInteger();
        this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        } else {
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
            }
            this.children = new EventExecutor[nThreads];
            int j;
            for(int i = 0; i < nThreads; ++i) {
                boolean success = false;
                boolean var18 = false;
                try {
                    var18 = true;
                    this.children[i] = this.newChild((Executor)executor, args);
                    success = true;
                    var18 = false;
                } catch (Exception var19) {
                    throw new IllegalStateException("failed to create a child event loop", var19);
                } finally {
                    if (var18) {
                        if (!success) {
                            int j;
                            for(j = 0; j < i; ++j) {
                                this.children[j].shutdownGracefully();
                            }
                            for(j = 0; j < i; ++j) {
                                EventExecutor e = this.children[j];
                                try {
                                    while(!e.isTerminated()) {
                                        e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                                    }
                                } catch (InterruptedException var20) {
                                    Thread.currentThread().interrupt();
                                    break;
                                }
                            }
                        }
                    }
                }
                if (!success) {
                    for(j = 0; j < i; ++j) {
                        this.children[j].shutdownGracefully();
                    }
                    for(j = 0; j < i; ++j) {
                        EventExecutor e = this.children[j];
                        try {
                            while(!e.isTerminated()) {
                                e.awaitTermination(2147483647L, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException var22) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
            this.chooser = chooserFactory.newChooser(this.children);
            FutureListener<Object> terminationListener = new FutureListener<Object>() {
                public void operationComplete(Future<Object> future) throws Exception {
                    if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) {
                        MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null);
                    }
                }
            };
            EventExecutor[] var24 = this.children;
            j = var24.length;
            for(int var26 = 0; var26 < j; ++var26) {
                EventExecutor e = var24[var26];
                e.terminationFuture().addListener(terminationListener);
            }
            Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length);
            Collections.addAll(childrenSet, this.children);
            this.readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    }	

核心: this.children[i] = this.newChild((Executor)executor, args);
对于NioEventLoopGroup的具体实现

protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2]);
    }

this: NioEventLoopGroup的引入
executor: 默认情况下是 ThreadPerTaskExecutor
(SelectorProvider)args[0]:
从上文中看默认是ThreadPerTaskExecutor, 具体ThreadPerTaskExecutor起到何作用,后面会提及

if (executor == null) {
		executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
}

((SelectStrategyFactory)args[1]).newSelectStrategy(): 故名思义,就是Select的 选择策略,下文会讲
**(RejectedExecutionHandler)args[2]:**控制任务的拒绝策略,默认为抛出 RejectedExecutionException

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

从此代码中可以看出,NioEventLoopGroup和EventLoopGroup的联系,strategy,rejectedExecutionHandler的具体用处 不细数,查看源码即可
final SelectorTuple selectorTuple = openSelector();

this.chooser = chooserFactory.newChooser(this.children);
提供了选择方案

	// DefaultEventExecutorChooserFactory.class
	@Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

channel 往 EventLoop 中进行注册
的核心逻辑

public class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup{
	// 调用父类的 next()方法
	public EventLoop next() {
        return (EventLoop)super.next();
    }
	public ChannelFuture register(Channel channel) {
	        return this.next().register(channel);
	}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup{
	private final EventExecutorChooser chooser;
	public EventExecutor next() {
	        return this.chooser.next();
	}
}

1.1.4 ChannelHandler

在一篇中单独研究


本文链接: http://www.dtmao.cc/news_show_350260.shtml

附件下载

相关教程

    暂无相关的数据...

共有条评论 网友评论

验证码: 看不清楚?