Motan 学习笔记(三) - 优雅停机

本篇介绍 Motan 框架是如何做到停机感知和优雅停机的。

背景知识

kill 命令

查看 Linux 的 kill 命令用法:

kill 手册

可以清楚的看到,kill 的语义是 给进程发送一个信号,而这个信号默认是 TERM,我们罗列出所有可用的信号:

kill 可用信号列表

发现 kill -15 即默认行为,而 kill -9 是发送 SIGKILL,立即终止进程,而 kill -2 对应前台运行时的 ctrl + c 快捷键。

JDK API

JDK 提供以下方式实现 JVM 进程的优雅关闭:

  1. 调用 java.lang.Runtime.getRuntime().addShutdownHook(Thread hook) 添加 JVM 关闭回调线程,须为 NEW 状态,在 JVM 的 main 函数退出 ,hook 的 run() 逻辑会被回调。

  2. 实现 sun.misc.SignalHandler 接口,程序启动时实例化该实现类,调用其 registerSignal 方法注册指定信号,在进程收到相关信号时,在 handle(Signal signal) 方法中进行相应回调逻辑(比如停止 tomcat 的 main 线程、释放资源等):

class KillHandler implements sun.misc.SignalHandler {...}

KillHandler killHandler = new KillHandler();
killHandler.registerSignal("TERM");

ShutDownHookListener

com.weibo.api.motan.closable 包包含优雅关闭相关功能的类,ShutDownHookListener 是标准的 ServletContextListener 类型的 JavaEE 监听器,当项目部署在 Tomcat 等标准 Servlet 容器中时,可以通过在 web.xml 声明 Listener 或者声明 @WebListener 标注的子类来实现 Servlet 容器关闭回调。

回调代码也很简单:

public void contextDestroyed(ServletContextEvent sce) {
    ShutDownHook.runHook(true);
}

ShutDownHook

ShutDownHook 是 Thread 的子类,runHook() 方法支持“同步、异步”两种调用方式:

public static void runHook(boolean sync) {
    if (instance != null) {
        if (sync) {
            instance.run();
        } else {
            instance.start();
        }
    }
}

ShutDownHookListener 默认使用同步关闭的方式,并没有启动一个额外的线程去关闭资源。比如使用 JVM ShutdownHook 机制的 Tomcat,当调用 /bin/shutdown.sh 或者监听的 Socket 去触发服务器关闭时,org.apache.catalina.core.StandardContext#stopInternal 会被调用,其中所有已注册的 ServletContextListener 的 contextDestroyed() 方法会被调用,Tomcat 的 start 方法会阻塞到 Motan 的 ShutDownHook.run() 执行完。

ClosableObject

// ClosableObject 实现了 Comparable 接口,封装 Closable,以 priority 为比较基准
private static class closableObject implements Comparable<closableObject> {
    Closable closable;
    int priority;

    public closableObject(Closable closable, int priority) {
        this.closable = closable;
        this.priority = priority;
    }

    @Override
    public int compareTo(closableObject o) {
        if (this.priority > o.priority) {
            return -1;
        } else if (this.priority == o.priority) {
            return 0;
        } else {
            return 1;
        }
    }
}

吐槽一嘴,closableObject 居然是小写字母开头。。

run

当容器关闭时,run() 逻辑被执行:

@Override
public void run() {
    closeAll();
}

private synchronized void closeAll() {
    // 对 ClosableObject 排序
    Collections.sort(resourceList);
    
    for (closableObject resource : resourceList) {
        // 依次调用 Closable 的 close 方法
        resource.closable.close();
    }
    
    // 清空回调
    resourceList.clear();
}

registerShutdownHook

需要关心容器关闭事件的对象,可以通过调用 ShutDownHook 的 registerShutdownHook() 静态方法传入 Closable 对象来实现。该方法将 Closable 包装成了 ClosableObject,后放入一个 ArrayList<closableObject> 中持有:

public static void registerShutdownHook(Closable closable) {
    // DEFAULT_PRIORITY 是优先级 priority 的默认值 20,越大优先级越高
    registerShutdownHook(closable, DEFAULT_PRIORITY);
}

public static synchronized void registerShutdownHook(Closable closable, int priority) {
    // ShutDownHook 为懒汉式单例
    if (instance == null) {
        init();
    }
    instance.resourceList.add(new closableObject(closable, priority));
}

使用了这个方法的地方(motan 框架内部使用的回调钩子均为默认优先级):

registerShutdownHook

Demo

对于在集群模式下,如何优雅关闭 Motan 服务,官方文档中有所说明:

MotanSwitcherUtil.setSwitcherValue(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, false)
public static void setSwitcherValue(String switcherName, boolean value) {
    // LocalSwitcherService
    switcherService.setValue(switcherName, value);
}

LocalSwitcherService

feature.configserver.heartbeat 对应的 SwitcherListener 添加到 MotanSwitcherUtil 持有的 ,当这个 key 对应的开关状态值(boolean)更改时,会回调 listenerMap 持有的该 key 对应监听器集合中的所有 SwitcherListener 的 onValueChanged() 方法:

@Override
public void setValue(String switcherName, boolean value) {
    /*
       将 feature.configserver.heartbeat -> false 放入 switchers(ConcurrentMap 类型)
       由于 key 是 String 类型,所以原有 key 的 value(若有)会被覆盖
     */  
    putSwitcher(new Switcher(switcherName, value));
    
    // 获取 switcher 对应的监听器列表,回调其 onValueChanged 方法
    List<SwitcherListener> listeners = listenerMap.get(switcherName);
    if(listeners != null) {
        for (SwitcherListener listener : listeners) {
            listener.onValueChanged(switcherName, value);
        }
    }
}

监听器被添加到 LocalSwitcherService 的 listenerMap(switcherName -> List<SwitcherListener>)属性的 相应 value 集合中的时机是 Spring IOC 容器事件 ContextRefreshedEvent 发生时(容器已准备好),回调 ApplicationListener 的 onApplicationEvent() 方法。

AbstractRegistry

ServiceConfigBean 正是这种类型的监听器,其为 Motan 暴露服务的配置类,服务(容器)启动时创建 Registry 服务注册发现组件(如 ZookeeperRegistry),构造器中调用继承的 AbstractRegistry 构造器,保存 Registry 地址(registryUrl),并添加心跳开关(默认状态值为 false 未开启)以感知服务可用状态,同时添加匿名 SwitcherListener:

public AbstractRegistry(URL url) {
    this.registryUrl = url.createCopy();
    
    MotanSwitcherUtil.initSwitcher(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, false);
    MotanSwitcherUtil.registerSwitcherListener(MotanConstants.REGISTRY_HEARTBEAT_SWITCHER, new SwitcherListener() {

        @Override
        public void onValueChanged(String key, Boolean value) {
            if (key != null && value != null) {
                if (value) {
                    available(null);
                } else {
                    unavailable(null);
                }
            }
        }
    });
}

每个 Registry 都有一个 SwitcherListener 对应。

SwitcherListener

上线

当服务启动时,调用 available(),最终调用 doAvailable()

protected void doAvailable(URL url) {
    try {
        // 防止并发修改导致本机调用顺序不一致,分布式环境下的顺序无所谓,因为本身就是不同服务地址,即不同的节点名称
        serverLock.lock();
        if (url == null) {
            // 添加所有本机提供的 Motan 服务(启动时注册到 registeredServiceUrls 中的)到可用服务列表
            availableServices.addAll(getRegisteredServiceUrls());
            // 将本机可用服务全部在当前 SwitcherListener 对应的 Registry 中上线
            for (URL u : getRegisteredServiceUrls()) {
                // 防止旧节点未正常注销
                removeNode(u, ZkNodeType.AVAILABLE_SERVER);
                removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
                // 创建 server 子节点
                createNode(u, ZkNodeType.AVAILABLE_SERVER);
            }
        } else {
            availableServices.add(url);
            removeNode(url, ZkNodeType.AVAILABLE_SERVER);
            removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
            createNode(url, ZkNodeType.AVAILABLE_SERVER);
        }
    } finally {
        serverLock.unlock();
    }
}

上线

getRegisteredServiceUrls() 是已保存在本机的所有本应用提供的 Motan 服务,此时服务已经 export 暴露给了 ZK,但暂处于 /unavailableServer 节点中,createNode 后才放入 /server

@Override
protected void doRegister(URL url) {
    try {
        serverLock.lock();
        // 防止旧节点未正常注销
        removeNode(url, ZkNodeType.AVAILABLE_SERVER);
        removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
        // 尚未开启心跳检测,zk 节点上的该服务处于不可用状态
        createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
    } finally {
        serverLock.unlock();
    }
}

doRegister()available() 之前,由于本篇不是分析服务暴露过程,所以没有按顺序写。

下线

调用 AbstractRegistry 的 unavailable(null)

protected void doUnavailable(URL url) {
    try {
        serverLock.lock();
        if (url == null) {
            availableServices.removeAll(getRegisteredServiceUrls());
            for (URL u : getRegisteredServiceUrls()) {
                removeNode(u, ZkNodeType.AVAILABLE_SERVER);
                removeNode(u, ZkNodeType.UNAVAILABLE_SERVER);
                createNode(u, ZkNodeType.UNAVAILABLE_SERVER);
            }
        } else {
            availableServices.remove(url);
            removeNode(url, ZkNodeType.AVAILABLE_SERVER);
            removeNode(url, ZkNodeType.UNAVAILABLE_SERVER);
            createNode(url, ZkNodeType.UNAVAILABLE_SERVER);
        }
    } finally {
        serverLock.unlock();
    }
}

正确摘负载

可以看到,调用 unavailable() 后服务仍然没停,只是让调用者感知到服务已下线(通过 ZK 的一致性),从而停止后续针对此服务器的调用,而已经进入线程池的请求还仍然在处理中,代码可能正在某行还未执行完,只要所有 ShutDownHook 都执行完毕,那么 JVM 就退出了,如果部分请求还在处理中,就可能会导致一些问题。

一个做法是像 Dubbo 那样,在 Server 组件生命周期的 close 方法中检测线程池中的线程是否正在运行,如果有,等待所有线程执行完成,若超时则强制关闭,关键代码如下:

// JDK ExecutorService 自带,调用后拒绝接收新任务
try {
    executor.shutdown();
} catch (SecurityException | NullPointerException e) {
    return;
}

// 超时等待当前线程池的任务执行完
try {
    if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
        executor.shutdownNow();
    }
} catch (InterruptedException e) {
    executor.shutdownNow();
    Thread.currentThread().interrupt();
}

// 发生未捕获异常,且线程池还未关闭,
if (!isTerminated(executor)) {
    // 跑一个新线程去关,或者别的实现逻辑
    newThreadToCloseExecutor(executor);
}