记录一次险些造成故障的问题
周日凌晨四点多钟,被手机的震动声吵醒,数据中心管理员报告说我们的磁盘空间满了。 在非工作时间打这么多日志,马上联想到是定时任务。
看到管理员发过来的截图里有PriceTax
字样,于是看了一下Git
提交记录,虽然定时任务不是新增的,但处理逻辑有改动过。 简单的说就是读取下发的文件,然后写入数据库。
写成伪代码
public class LineHandler {
private List<LineType> lineList
// file read line by line
public void handleLine(line) {
lineList.add(toType(line))
if (size(lineList)) {
saveLineToDb(lineList);
// there suppose to be a clear
}
if (someConditions) {
// Very Wrong
lineList.clear()
}
}
}
这里为了保证执行的效率,并不是读入一行再插入一行,而是读取后放入集合中,当集合大小足够时,再集中插入数据库,减少IO次数。 可以看到这里犯了一个致命的错误,清空lineList的位置错了,应该在保存进数据库后马上执行清空方法。 也就是说,当读取到第i行时,将执行插入i+1行数据。 悲剧的是,当天的数据量高达2万5千多条,如果这个定时执行完,要插入接近3亿的数据。
为了让数据库资源不被占用这么多,并且防止真的下发三亿条数据到下游系统, 周六晚上发现问题后,德胜把插入的表给删掉了,他期望能够让这个定时任务抛出异常终止执行。
实际上,这个任务还在继续执行,好消息是数据库资源保住了,坏消息是每次插入的异常报错,都会打印出接近5kb的日志信息。
经过几个小时的积累,周六那天已经保存了23Gb的日志文件。
幸亏周日那天,管理员没有报事件,就直接应我们的要求重启了服务。
周一的时候,德胜说,想要做一个能够发送信号让Java的定时任务直接结束的功能。
按照常规思路,我们实现这样一个需求会写出类似下边的代码。
public class DemoQuartzJob extends Job {
public ? execute(JobContext context) {
if (isInterrupt()) {
throw new RuntimeException();
}
}
}
但是现在已经有了接近几十个定时任务在运行,不可能每个任务都增加一个判断。
于是,要先考虑如何让正在执行的定时任务接受到要停止的信号。
好在Quartz有一个InterruptableJob
的接口。 通过实现interrupted
方法,让Quartz的Scheduler可以调用改变定时任务类内部的变量,实现我们想要的效果。
接下来最难解决的问题来了,Java的线程怎么才能在被中断呢?
public class DemoQuartzJob extends Job {
private volatile boolean isContinue;
public ? execute(JobContext context) {
if (isInterrupt()) {
throw new RuntimeException();
}
}
public void interrupt() {
this.isContinue = false;
}
}
我们怎么才能通过isContinue
这个变量让execute方法中断呢,思来想去,这个路是走不通的。
那么我们只能在interrupt
方法里,直接让这个线程终止了。
Java提供了interrupt方法,我们希望调用这个方法终止目标线程,以下代码为例
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (true && !Thread.currentThread().isInterrupted()) {
System.out.println("I am a thread");
}
System.out.println("I am Done");
});
thread.start();
thread.interrupt();
Thread.sleep(5000);
System.out.println(thread.isAlive());
}
}
这里与期望不同的是,我们对这个线程的执行方法做了修改。
这样虽然确实可以中断执行,但是并不能达到我们非侵入性中断执行的目的。
但是如果直接调用Thread.interrupt()并不保证一定会让线程中断执行,如果我们去掉对这个flag的判断
public class Main {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
while (true) {
System.out.println("I am a thread");
}
System.out.println("I am Done");
});
thread.start();
thread.interrupt();
Thread.sleep(5000);
System.out.println(thread.isAlive());
}
}
会看到一直在输出"I am a thread"
那么选择使用线程池,在希望中断执行的时候,把这个线程池直接关闭,用以下代码来测试
public class Main {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
Thread thread = new Thread(() -> {
while (true) {
System.out.println("I am a thread");
}
});
executor.execute(thread);
// 执行两秒的输出
Thread.sleep(2000);
executor.shutdownNow();
Thread.sleep(2000);
}
}
标准输出疯狂打印sout
的内容,线程池虽然关闭了,但是线程池里执行的线程并不会终止。
public class Main {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
Future<?> future = executor.submit(() -> {
while (true) {
System.out.println("I am a thread");
}
});
try {
future.get(1, TimeUnit.SECONDS);
System.out.println("I am finished");
} catch (ExecutionException | TimeoutException e) {
System.out.println(e.getMessage());
future.cancel(false);
}
executor.shutdown();
}
}
所以,根据Java线程的设计理念,如果在线程内部不理会中断信号,那么无法在线程外部中断这个线程。