Callback 与 Future

Callback 和 Future 是两种实现异步化的通常手段,本篇将介绍两种方式的通常运用。

Callback

背景

不管是在以前学习 Javascript、NodeJS 时,还是最近学习的 Netty 框架中,经常会遇到“回调”模式的应用,比如“钩子”、“监听器”等。比如在 Javascript 的一些 API 中,方法常常要求传入回调函数,且 ES6 也引入了 Promise 语法。在 Netty 的 API 中,所有动作都是异步的,由于网络应用一般瓶颈在 IO,使用异步 IO 充分利用 CPU,提高程序性能,而这个异步化的实现方式就是回调。

应用

Demo

用 Java 语言描述一个 Callback 模式,可能长这样:

UML-0

用代码描述:

public class CallbackTest {

    @Test
    public void testCallback() {
        new MyWorker().work(new MyCallback());
    }
}
interface Worker {
    /**
     * do something,承诺在成功和失败时调用 callback 相应方法
     *
     * @param callback 回调对象
     */
    void work(Callback callback);
}
class MyWorker implements Worker {

    @Override
    public void work(Callback callback) {
        try {
            Thread.sleep(1000);
            // 成功回调
            callback.onSuccess();
        } catch (Exception e) {
            // 失败回调
            callback.onError(e);
        }
    }
}
interface Callback {
    void onSuccess();

    void onError(Throwable e);
}
class MyCallback implements Callback {

    @Override
    public void onSuccess() {
        System.out.println("success");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("error occurred:" + e.getCause().getLocalizedMessage());
    }
}

结果:
结果1

Netty

在使用 Netty 处理 Websocket 协议连接时,通常会实现一个 Handler,这个 Handler 会被添加到 SocketChannel 的处理链上,在发生不同事件时,对应的方法会被回调。

public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * 每次接收到一个 WebSocket 帧都会回调此方法,打印收到的消息并返回服务器当前时间。
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        System.out.println("服务器收到消息:" + msg.text());
        ctx.writeAndFlush(new TextWebSocketFrame("当前服务器时间:" + LocalDateTime.now()));
    }
}

NodeJS

NodeJS 也是个异步化的框架,设计思想很类似,比如 AsyncHook 钩子的应用中,我需要创建一个在 object 构造过程被回调的钩子。

const async_hooks = require('async_hooks');

function init(asyncId, type, triggerAsyncId, resource) {...}

const asyncHook = async_hooks.createHook({init});

Future

背景

Future 思想是将结果封装,值在某个时间点会变得 available。

应用

JDK

在 JDK 1.5 实现的 ExecutorService 中,允许提交 Callable、Runnable 类型的任务对象给线程池,返回一个 Future 对象,方法可以调用 Future 相应的 API 以不同方式(同步、异步)获取结果。

class FutureTest {

    @Test
    public void futureTest() {
        MyTask t1 = new MyTask(new Integer[]{1, 5, -10});
        MyTask t2 = new MyTask(new Integer[]{43, 2, 73, 8, 2});
        MyTask t3 = new MyTask(new Integer[]{-2, 11, 23, -9});
        MyTask t4 = new MyTask(new Integer[]{100, 3});
        System.out.println("任务已创建");

        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<Integer>> futures = null;
        try {
            futures = pool.invokeAll(Arrays.asList(t1, t2, t3, t4));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("任务已提交");

        /*
           假设计算时间很长,这里可以进行其它操作,从而实现异步优化性能,
           这期间可以通过 API 检查任务是否跑完,比如:boolean done = futures.get(1).isDone();
         */

        // 输出所有结果
        Assert.assertNotNull(futures);
        for (Future<Integer> future : futures) {
            try {
                System.out.println(future.get());
            } catch (Exception ignored) {
            }
        }

        pool.shutdown();
    }
}
/**
 * 简单的累加计算
 */
class MyTask implements Callable<Integer> {
    private Integer[] counts;

    public MyTask(Integer[] counts) {
        this.counts = counts;
    }

    @Override
    public Integer call() throws Exception {
        if (Arrays.stream(counts).count() == 0L) {
            throw new Exception("no counts to be count.");
        }
        Integer sum = 0;
        for (int i = 0; i < counts.length; i++) {
            sum += counts[i];
        }
        return sum;
    }
}

结果:
结果2

Netty

实际上,Netty 也有相应实现。

...
// bootstrap 为 ServerBootstrap 类型,启动后绑定端口并返回一个 io.netty.util.concurrent.Future 对象
ChannelFuture future = bootstrap.bind(8080);

// 等待 future 完成
future.sync();

实际上,Netty 所建议的,是多 listen 少 wait 的好莱坞法则,即完全的异步化,所以,也可以用下面这种方式:

ChannelFuture future = bootstrap.bind(8080);
future.addListener(future -> {
    System.out.println("端口绑定" + (future.isSuccess() ? "成功" : "失败"));
});

从这里也可以看出,Listener(或者说回调)的方式更优雅吧,毕竟 Future 还得编码去检查结果。