Netty学习——源码篇9 Netty的Handler其他处理 备份

 1 Handler>ChannelHandlerContext

        每个Handler>ChannelHandler被添加到ChannelPipeline后,都会创建一个Handler>ChannelHandlerContext,并与Handler>ChannelHandler关联绑定。Handler>ChannelHandlerContext允许Handler>ChannelHandler与其他的Handler>ChannelHandler进行交互。Handler>ChannelHandlerContext不会改变添加到其中的Handler>ChannelHandler,因此它是安全的。Handler>ChannelHandlerContext、Handler>ChannelHandler、ChannelPipeline的关系如下图:

2 Channel的声明周期

        Netty有一个简单但强大的状态模型,能完美映射到ChannelInboundHandler的各个方法。如下表所示是Channel生命周期四个不同的状态。

         一个Channel正常的生命周期如下图所示。随着状态发生变化产生相应的事件。这些事件被转发到ChannelPipeline中的Handler>ChannelHandler来触发相应的操作。

3 Handler>ChannelHandler常用的API

        先看一个Netty中整个Handler体系的类关系图。

        Netty定义了良好的类型层次结构来表示不同的处理程序类型,所有类型的父类是Handler>ChannelHandler, Handler>ChannelHandler提供了在其生命周期内添加或从ChannelPipeline中删除的方法,如下表

        Netty还提供了一个实现了Handler>ChannelHandler的抽象类Handler>ChannelHandlerAdapter。 Handler>ChannelHandlerAdapter实现了父类的所有方法,主要功能就是将请求从一个Handler>ChannelHandler往下传递到下一个Handler>ChannelHandler,直到全部Handler>ChannelHandler传递完毕。也可以直接继承于Handler>ChannelHandlerAdapter,然后重写里面的方法。

4 ChannelInboundHandler

        ChannelInboundHandler还提供了一些在接收数据或Channel状态改变时被调用的方法。下面是ChannelInboundHandler的一些方法。

5 异步处理Future

        java.util.concurrent.Future是Java原生API中提供的接口,用来记录异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成立即返回执行结果,否则阻塞线程,知道任务完成再返回。

        Netty扩展了Java的Future,在Future的基础上扩展了监听器(Listener)接口,通过监听器可以让异步执行更加有效率,不需要通过调用get方法来等待异步执行结束,而是通过监听器回调来精确控制异步执行结束时间。

public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();

    boolean isCancellable();


    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);


    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> sync() throws InterruptedException;

    
    Future<V> syncUninterruptibly();

    
    Future<V> await() throws InterruptedException;

 
    Future<V> awaitUninterruptibly();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

  
    boolean await(long timeoutMillis) throws InterruptedException;

   
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);

   
    boolean awaitUninterruptibly(long timeoutMillis);


    V getNow();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

        ChannelFuture接口有扩展了Netty的Future接口,表示一种没有返回值的异步调用,同时和一个Channel进行绑定。

public interface ChannelFuture extends Future<Void> {

    /**
     * Returns a channel where the I/O operation associated with this
     * future takes place.
     */
    Channel channel();

    @Override
    ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelFuture sync() throws InterruptedException;

    @Override
    ChannelFuture syncUninterruptibly();

    @Override
    ChannelFuture await() throws InterruptedException;

    @Override
    ChannelFuture awaitUninterruptibly();

  
    boolean isVoid();
}

6 异步执行Promise

        Promise接口也是Future的扩展接口,它表示一种可写的Future,可以自定义设置异步执行的结果。


/**
 * Special {@link Future} which is writable.
 */
public interface Promise<V> extends Future<V> {

    
    Promise<V> setSuccess(V result);

 
    boolean trySuccess(V result);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * Marks this future as a failure and notifies all
     * listeners.
     *
     * @return {@code true} if and only if successfully marked this future as
     *         a failure. Otherwise {@code false} because this future is
     *         already marked as either a success or a failure.
     */
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable or it is already done
     *         without being cancelled.  {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable();

    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    @Override
    Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();
}

        ChannelPromise接口扩展了Promise和ChannelFuture,绑定了Channel,既可以写异步执行结果,又具备了监听者的功能,是Netty实际编程中使用的表示异步执行的接口。

public interface ChannelPromise extends ChannelFuture, Promise<Void> {

    @Override
    Channel channel();

    @Override
    ChannelPromise setSuccess(Void result);

    ChannelPromise setSuccess();

    boolean trySuccess();

    @Override
    ChannelPromise setFailure(Throwable cause);

    @Override
    ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);

    @Override
    ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);

    @Override
    ChannelPromise sync() throws InterruptedException;

    @Override
    ChannelPromise syncUninterruptibly();

    @Override
    ChannelPromise await() throws InterruptedException;

    @Override
    ChannelPromise awaitUninterruptibly();

    /**
     * Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
     */
    ChannelPromise unvoid();
}

        DefaultChannelPromise是ChannelPromise的实现类,它是实际运行时的Promise实例。Netty使用addListener方法来回调异步执行的结果。DefaultPromise的addListener()方法的代码如下

    public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
        checkNotNull(listener, "listener");

        synchronized (this) {
            addListener0(listener);
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }
    private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
        if (listeners == null) {
            listeners = listener;
        } else if (listeners instanceof DefaultFutureListeners) {
            ((DefaultFutureListeners) listeners).add(listener);
        } else {
            listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
        }
    }
    private void notifyListeners() {
        EventExecutor executor = executor();
        if (executor.inEventLoop()) {
            final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
            final int stackDepth = threadLocals.futureListenerStackDepth();
            if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
                threadLocals.setFutureListenerStackDepth(stackDepth + 1);
                try {
                    notifyListenersNow();
                } finally {
                    threadLocals.setFutureListenerStackDepth(stackDepth);
                }
                return;
            }
        }

        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                notifyListenersNow();
            }
        });
    }

        从上述代码中可以看到,DefaultChannelPromise会判断异步任务执行的状态,如果执行完毕就立即通知监听者,否则加入监听者队列。通知监听者就是找一个线程来执行调用监听者的回调函数。

        再来看监听者的接口,其实就是一个方法,即等待异步任务执行完毕后,获得Future结果,执行回调的逻辑,代码如下。

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}


http://www.niftyadmin.cn/n/5463252.html

相关文章

C#全新一代医院手术麻醉系统围术期全流程源码

目录 一、麻醉学科的起源 二、麻醉前访视与评估记录单 患者基本信息 临床诊断 患者重要器官功能及疾病情况 病人体格情况分级 手术麻醉风险评估 拟施麻醉方法及辅助措施 其他需要说明的情况 访视麻醉医师签名 访视时间 与麻醉相关的检查结果 三、手术麻醉信息系统…

车载电子电器架构 —— 通信信号数据库开发

车载电子电器架构 —— 信号数据库开发 我是穿拖鞋的汉子,魔都中坚持长期主义的汽车电子工程师。 老规矩,分享一段喜欢的文字,避免自己成为高知识低文化的工程师: 屏蔽力是信息过载时代一个人的特殊竞争力,任何消耗你的人和事,多看一眼都是你的不对。非必要不费力证明自…

JavaScript 入门指南(三)BOM 对象和 DOM 对象

BOM 对象 BOM 简介 BOM&#xff08;browser Object Model&#xff09;即浏览器对象模型BOM 由一系列对象组成&#xff0c;是访问、控制、修改浏览器的属性的方法BOM 没有统一的标准&#xff08;每种客户端都可以自定标准&#xff09;。BOM 的顶层是 window 对象 window 对象 …

预防 MySQL 死锁的策略

1、按顺序访问数据&#xff1a;按照一定的顺序访问数据可以减少死锁的发生。例如&#xff0c;如果多个线程或事务需要更新多个表&#xff0c;可以按照相同的顺序来执行更新操作。这样可以避免循环等待和资源竞争。 2、避免长时间持有锁&#xff1a;尽量缩短事务的执行时间&…

数学建模智能算法

模拟退火算法 %生成初始解&#xff0c;求目标函数f(x)x1^2x2^28在x1^2-x2>0;-x1-x2^220约束下的最小值问题 sol_new21;%&#xff08;1&#xff09;解空间&#xff08;初始解&#xff09; sol_new12-sol_new2^2; sol_current1 sol_new1; sol_best1 sol_new1; so…

TypeScript-自动编译

1.生成文件 tsc --init 2.修改配置文件 说明&#xff1a;通过CTRLF搜索到以下单词&#xff0c;进行修改。 "strict": true, //是否开启严格模式 "outDir": "./outFile", //表示ts文件最终编译为js文件&#xff0c;js文件存放的位置 3.新…

【Linux】调试器-gdb使用(指令调试常用大全,实用性高!!)

目录 关于windows系统的调试 1.前言关于程序bug 2. 调试是什么&#xff1f;有多重要&#xff1f; 2.1调试是什么 2.2 调试的基本步骤 2.3 Debug和Release的介绍 3.linux调试器--gdb的下载 3.1 如何使gcc生成release版本的代码 4.调试指令 4.1 调试开始 4.2指令 l 显示调试代码…

GridLayoutManager 中的一些坑

前言 如果GridLayoutManager使用item的布局都是wrap_cotent 那么会在布局更改时会出现一些出人意料的情况。&#xff08;本文完全不具备可读性和说教性&#xff0c;仅为博主方便查找问题&#xff09; 布局item: <!--layout_item.xml--> <?xml version"1.0&qu…