Spark大数据平台调度任务的优化
在大数据平台生产环境上,遇到一个头疼的问题,每天都要定时运行一个任务。刚开始数据量小和简单,用cron来定时调用可以满足需求。
后来数据量大,出现昨天的任务没跑完,今天的任务又要开始了,在大数据平台上运行的任务越来越多,大数据平台资源被占满了。
考虑进行下面的优化:
1、某个任务如果运行超过30小时时,进行中断。
2、采用DelayQueue来实现延时队列,等前面的任务执行完或被中断,又到它的开始时间时才进行执行。
延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理。

入门例子
DelayQueue 非常适合指定时间之后,才能让消费者获取到的场景。
private static class DelayElem implements Delayed {
/**
* 延迟时间
*/
private final long delay;
/**
* 到期时间
*/
private final long expire;
/**
* 数据
*/
private final String msg;
private DelayElem(long delay, String msg) {
this.delay = delay;
this.msg = msg;
//到期时间 = 当前时间+延迟时间
this.expire = System.currentTimeMillis() + this.delay;
}
/**
* 需要实现的接口,获得延迟时间
*
* 用过期时间-当前时间
* @param unit 时间单位
* @return 延迟时间
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序
* <p>
* 当前时间的延迟时间 - 比较对象的延迟时间
*
* @param o 比较对象
* @return 结果
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public String toString() {
return "DelayElem{" +
"delay=" + delay +
", expire=" + expire +
", msg='" + msg + '\'' +
'}';
}
}
private static class WriteThread extends Thread {
private final DelayQueue<DelayElem> delayQueue;
private WriteThread(DelayQueue<DelayElem> delayQueue) {
this.delayQueue = delayQueue;
}
@Override
public void run() {
for(int i = 0; i < 3; i++) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
DelayElem element = new DelayElem(1000,i+"test");
delayQueue.offer(element);
System.out.println(System.currentTimeMillis() + " 放入元素 " + i);
}
}
}
private static class ReadThread extends Thread {
private final DelayQueue<DelayElem> delayQueue;
private ReadThread(DelayQueue<DelayElem> delayQueue) {
this.delayQueue = delayQueue;
}
@Override
public void run() {
while (true){
try {
DelayElem element = delayQueue.take();
System.out.println(System.currentTimeMillis() +" 获取元素:" + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayElem> delayQueue = new DelayQueue<>();
new WriteThread(delayQueue).start();
new ReadThread(delayQueue).start();
}