定时任务详解


详解Java Quartz Job Scheduling

​ Quartz Job Scheduling是基于Java实现的成熟的企业级作业调度组件。笔者最近所做的项目正好用到了Quartz来实现定时任务的调度,在使用过程中对Quartz不甚了解,于是趁此闲暇机会,学习了Quartz官方教程和《Quartz Job Scheduling Framework》。并形成此篇详解Java Quartz Job Scheduling。

一、 Hello Quartz

本节通过一个Hello Quartz的示例,来介绍Quartz中的核心概念。这个示例先打印”Hello Quartz!”,再以10秒频率打印当前系统时间:

package com.quartz.learning;

import org.quartz.*;

/**
 * HelloJob是一个简单的job,用于打印指定内容
 *
 * Created by zhuyiquan90 on 2018/8/18.
 */
public class HelloJob implements Job{

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        // JobDetail
        JobDetail jobDetail = jobExecutionContext.getJobDetail();
        // JobDataMap
        JobDataMap dataMap = jobDetail.getJobDataMap();
        String content = dataMap.getString("CONTENT");
        System.out.println(content);
    }
}
package com.quartz.learning;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

/**
 * HelloQuartz是一个简单的Quartz调度器
 * <p>
 * Created by zhuyiquan90 on 2018/8/18.
 */
public class HelloQuartz {

    private static Log logger = LogFactory.getLog(HelloQuartz.class);

    public static void main(String[] args) {

        try {
            // 从Scheduler工厂获取一个Scheduler的实例
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

            scheduler.start();
            /**
             * 重用HelloJob,实现不同实例
             */
            // 注册jobDetail1,打印"Hello Quartz!",第5秒钟执行一次
            JobDetail jobDetail1 = newJob(HelloJob.class).withIdentity("job1", "group").build();
            jobDetail1.getJobDataMap().put("CONTENT", "Hello Quartz!");
            Trigger trigger1 = newTrigger().withIdentity("trigger1", "group").startNow()
                    .withSchedule(simpleSchedule().withIntervalInSeconds(5).withRepeatCount(0)).build();
            scheduler.scheduleJob(jobDetail1, trigger1);

            // 注册jobDetail2,打印当前系统时间,每10秒钟执行一次
            JobDetail jobDetail2 = newJob(HelloJob.class).withIdentity("job2", "group").build();
            jobDetail2.getJobDataMap().put("CONTENT", String.valueOf(System.currentTimeMillis()));
            Trigger trigger2 = newTrigger().withIdentity("trigger2", "group").startNow()
                    .withSchedule(simpleSchedule().withIntervalInSeconds(10).repeatForever()).build();
            scheduler.scheduleJob(jobDetail2, trigger2);

        } catch (SchedulerException e) {
            logger.error(e);
        }
    }
}

输出结果如下:

19:11:08.473 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job group.job1
19:11:08.473 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
Hello Quartz!
19:11:08.473 [DefaultQuartzScheduler_Worker-2] DEBUG org.quartz.core.JobRunShell - Calling execute on job group.job2
1534590668464
19:11:18.468 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'group.job2', class=com.quartz.learning.HelloJob
19:11:18.469 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers
19:11:18.469 [DefaultQuartzScheduler_Worker-3] DEBUG org.quartz.core.JobRunShell - Calling execute on job group.job2
1534590668464
19:11:28.468 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'group.job2', class=com.quartz.learning.HelloJob
19:11:28.469 [DefaultQuartzScheduler_Worker-4] DEBUG org.quartz.core.JobRunShell - Calling execute on job group.job2
1534590668464

Scheduler(调度器)是Quartz框架的心脏。Scheduler的生命周期始于通过SchedulerFactory工厂类创建实例,终于调用shutdown() 方法。Scheduler不仅可以用于新增、移除、列举Jobs和Triggers,还可以执行调度相关操作,比如暂停Trigger、恢复Trigger等。需要注意的是,直到调用start()方法时,Scheduler才正式开始执行job和trigger。
Job(作业)是指执行一些作业的特定的Java类。Job必须实现 org.quartz.Job接口,这个接口要求在Job中实现execute()方法。当 Quartz 调用 execute() 方法,会传递一个 JobExecutionContext 上下文变量,里面封装有 Quartz 的运行时环境和当前正执行的 Job。JobExecutionContext可以被用来访问 JobDetail 类,JobDetail 类持有Job的详细信息,包括为Job实例指定的名称,Job 所属组,Job 是否被持久化(易失性)。JobDetail又持有一个指向JobDataMap的引用。JobDataMap中包含Job配置的自定义属性。
Trigger(触发器)用于触发Job的执行。最常用的类型包括 SimpleTrigger和CronTrigger。

下面针对Quartz的核心框架展开详述。

二、Quartz框架核心

2.1 Scheduler

​ 客户端与Scheduler交互是通过org.quartz.Scheduler接口的。这个 Scheduler 的实现,在这种情况下,是一个代理,对其中方法调用会传递到QuartzScheduler实例上。QuartzScheduler对于客户端是不可见的,并且也不存在与此实例的直接交互。QuartzScheduler处在框架根的位置,它是一个引擎驱动着整个框架。
​ Scheduler主要包括StdScheduler(Quartz默认的Scheduler)和RemoteScheduler(带有RMI功能的Scheduler)。
​ Quartz提供了org.quartz.SchedulerFactory接口来创建Scheduler实例。SchedulerFactory包括两种类型org.quartz.impl.DirectoSchedulerFactoryorg.quartz.impl.StdSchedulerFactory

2.1.1 使用DirectSchedulerFactory创建实例

​ DirectSchedulerFactory通过编程式的方式来创建Scheduler实例。一般包括三个基本步骤。首先,你必须用静态方法 getInstance() 获取到工厂的实例。当你持有了工厂的实例之后,你必须调用其中的一个 createXXX 方法去初始化它。第三步也就是最后一步是通过工厂的 getScheduler() 方法拿到 Scheduler 的实例。

​ 代码实例如下:

DirectSchedulerFactory factory = DirectSchedulerFactory.getInstance();
try {
    // Initialize the Scheduler Factory with 10 threads
    factory.createVolatileScheduler(10);

    // Get a scheduler from the factory
    Scheduler scheduler = factory.getScheduler();

    // Start the scheduler running
    logger.info("Scheduler starting up...");
    scheduler.start();

    // Do something
    ...
 } catch (SchedulerException e) {
    logger.error(e);
}

采用编程式来创建实例你需要硬编码所有的scheduler配置,这无疑是很有挑战性的。所以请慎用DirectoSchedulerFactory

2.1.2 使用StdSchedulerFactory创建实例

StdSchedulerFactory通过声明式的方式来创建Scheduler实例。它依赖于一系列的属性配置。比如

Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

就是以默认的配置文件quartz.properties来实例化scheduler。一个简单的quartz.properties配置如下所示。Quartz配置参考详见第七节。

#===============================================================     
#Configure Main Scheduler Properties     
#===============================================================      
org.quartz.scheduler.instanceName = QuartzScheduler      
org.quartz.scheduler.instanceId = AUTO     

#===============================================================     
#Configure ThreadPool     
#===============================================================      
org.quartz.threadPool.threadCount =  5      
org.quartz.threadPool.threadPriority = 5      
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool     

#===============================================================     
#Configure JobStore     
#===============================================================      
org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore     

#===============================================================     
#Configure Plugins     
#===============================================================      
org.quartz.plugin.jobInitializer.class =      
org.quartz.plugins.xml.JobInitializationPlugin      

org.quartz.plugin.jobInitializer.overWriteExistingJobs = true     
org.quartz.plugin.jobInitializer.failOnFileNotFound = true     
org.quartz.plugin.jobInitializer.validating=false  

2.1.3 管理 Scheduler实例

启动Scheduler

启动Scheduler通过start()

//Create an instance of the Scheduler  
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();  

//Start the scheduler  
scheduler.start();  
临时暂停Scheduler

临时暂停调度的方式分为standBy()pauseAll()两种。两者的区别如下。

standby():

Temporarily halts the Scheduler’s firing of Triggers.
void standby() throws SchedulerException Temporarily halts the Scheduler’s firing of Triggers.

When start() is called (to bring the scheduler out of stand-by mode),
trigger misfire instructions will NOT be applied during the execution
of the start() method - any misfires will be detected immediately
afterward (by the JobStore’s normal process).

The scheduler is not destroyed, and can be re-started at any time.

pauseAll() :

void pauseAll() throws SchedulerException Pause all triggers - similar to calling
pauseTriggerGroup(group) on every group, however, after using this
method resumeAll() must be called to clear the scheduler’s state of
‘remembering’ that all new triggers will be paused as they are added.

When resumeAll() is called (to un-pause), trigger misfire instructions WILL be applied.

说明standby()需要再次调用start()恢复调度,trigger misfire(触发未执行调度)策略在start()执行过程中将不会直接执行;pauseAll()需要调用resumeAll()恢复调度,所有trigger misfire策略将被立即执行。

终止Scheduler

终止调度的方式是shutdown()

public void shutdown(boolean waitForJobsToComplete)  throws SchedulerException;  

public void shutdown() throws SchedulerException;  

​ 上面那两个方法唯一不同之处是其中一个方法可接受一个 boolean 型参数,表示是否让当前正在进行的 Job 正常执行完成才停止 Scheduler。无参的 shutdown() 方法相当于调用 shutdown(false)。
​ 以上,start(),standBy(),pauseAll(),shutdown()等原子操作都是 QuartzScheduler完成的,后续会有专门文章对QuartzScheduler(Quartz框架的根本)进行源码剖析。

2.2 Job

​ 如第一节所述,HelloJob引用了 org.quartz.Job 接口,并实现了execute() 方法。Scheduler决定运行HelloJob的时机,JobExecutionContext用于记录Job上下文,execute()执行异常抛出JobExecutionException。
Job如何注册到Scheduler中?如何定义Job实例的属性和配置?执行过程中如何追踪Job的状态?下面还有一些我们必须了解的关于Job的特征。

2.2.1 JobDetail

​ 从第一节的例子,我们可以看到不是直接把Job对象注册到Scheduler,实际注册的是一个JobDetail实例。这样做的好处是,针对某一类的Job,仅需要构造一个Job class,比如文件操作类Job,通过创建多个JobDetail实例来完成不同的调度任务。
JobDetail实例通过org.quartz.JobBuilder构造。我们将JobDetail注册到Scheduler,通过newJob(),Scheduler知道所要执行的具体Job。每一次Scheduler执行,在调用execute()方法前,会创建一个新的实例。当执行完毕,相关的job实例会被丢弃,对应的堆内存会被回收。换句话说,Job是无状态的(在最新的版本中,StatefulJob已经废弃)。因此我们需要使用JobDataMap来传递数据。

2.2.2 JobDataMap

我们能使用 org.quartz.JobDataMap 来定义 Job 的状态。JobDataMap 是JobDetail的一部分。可以向 JobDataMap 中存入键/值对,那些数据对可在你的 Job 类中传递和进行访问。
Trigger中也可以使用JobDataMap,这在一个Job应用于多个Triggers的场景下非常使用于参数传递。最终JobExecutionContext上下文中传递的JobDataMap是JobDetail和Trigger的并集,通过getMergedJobDataMap()获取。
注意,对同一key,如果在JobDetail和Trigger中都有使用,后来者会覆盖先来者。如下面的例子:

public class HelloJob implements Job{

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        JobDataMap dataMap = jobExecutionContext.getMergedJobDataMap();;
        String content = dataMap.getString("CONTENT");
        System.out.println(content);
    }
}
public class HelloQuartz {

    private static Log logger = LogFactory.getLog(HelloQuartz.class);

    public static void main(String[] args) {
        DirectSchedulerFactory factory = DirectSchedulerFactory.getInstance();
        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
            scheduler.start();

            JobDetail jobDetail = newJob(HelloJob.class).withIdentity("job1", "group").build();
            jobDetail.getJobDataMap().put("CONTENT", "Hello JobDetal!");
            Trigger trigger = newTrigger().withIdentity("trigger1", "group").startNow()
                    .withSchedule(simpleSchedule().withIntervalInSeconds(5).withRepeatCount(0)).build();
            trigger.getJobDataMap().put("CONTENT", "Hello Trigger!");
            scheduler.scheduleJob(jobDetail, trigger);

        } catch (SchedulerException e) {
            logger.error(e);
        }
    }
}

输出:

16:07:04.646 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'group.job1', class=com.quartz.learning.HelloJob
16:07:04.652 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers
16:07:04.653 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job group.job1
Hello Trigger!

最后,Job的持久化是通过JobStore实现的,我们将在第三节详细介绍。

2.3 Trigger

​ 主要使用的Quartz Trigger是org.quartz.SimpleTrigger和org.quartz.CronTrigger。

2.3.1 使用SimpleTrigger部署Job

​ SimpleTrigger 对于设置和使用是最为简单的一种 Quartz Trigger。它是为那种需要在特定的日期/时间启动,且以一个可能的间隔时间重复执行 n 次的 Job 所设计的。

2.3.1.1 代码示例

构建一个指定时刻执行的触发器:

SimpleTrigger trigger = (SimpleTrigger) newTrigger()
    .withIdentity("trigger1", "group1")
    .startAt(myStartTime) // some Date
    .forJob("job1", "group1") // identify job with name, group strings
    .build();

构建一个指定时刻以10秒为频率执行10次的触发器:

trigger = newTrigger()
    .withIdentity("trigger3", "group1")
    .startAt(myTimeToStartFiring)  // if a start time is not given (if this line were omitted), "now" is implied
    .withSchedule(simpleSchedule()
        .withIntervalInSeconds(10)
        .withRepeatCount(10)) // note that 10 repeats will give a total of 11 firings
    .forJob(myJob) // identify job with handle to its JobDetail itself                   
    .build();

构建一个在5分钟后执行一次的触发器:

trigger = (SimpleTrigger) newTrigger()
    .withIdentity("trigger5", "group1")
    .startAt(futureDate(5, IntervalUnit.MINUTE)) // use DateBuilder to create a date in the future
    .forJob(myJobKey) // identify job with its JobKey
    .build();

构建一个现在执行,每5分钟执行一次,22点结束的触发器:

trigger = newTrigger()
   .withIdentity("trigger7", "group1")
   .withSchedule(simpleSchedule()
       .withIntervalInMinutes(5)
       .repeatForever())
   .endAt(dateOf(22, 0, 0))
   .build();

构建一个下一个整点执行,每2小时执行一次的永久触发器:

trigger = newTrigger()
   .withIdentity("trigger8") // because group is not specified, "trigger8" will be in the default group
   .startAt(evenHourDate(null)) // get the next even-hour (minutes and seconds zero ("00:00"))
   .withSchedule(simpleSchedule()
       .withIntervalInHours(2)
       .repeatForever())
   // note that in this example, 'forJob(..)' is not called
   //  - which is valid if the trigger is passed to the scheduler along with the job  
   .build();

   scheduler.scheduleJob(trigger, job);
2.3.1.2 trigger misfire策略

misfire是指触发器错过触发时间(firing time),scheduler被关闭或者Quartz线程池中没有可以完成工作的工作线程都会导致trigger misfire。不同类型的trigger针对misfire情况的处理策略也不同,SimpleTrigger的misfire策略包括:

// 立即执行
public static final int MISFIRE_INSTRUCTION_FIRE_NOW = 1;   
// 立即执行,并累计到已经执行次数,如果结束时间已经过了,则不会再执行。
public static final int MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT = 2;
// 立即执行,并累计到未执行次数,如果结束时间已经过了,则不会再执行。
public static final int MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_REMAINING_REPEAT_COUNT = 3;
// 告诉Quartz在下一次执行时间再次开始执行,并累计到未执行次数
public static final int MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT = 4;
// 告诉Quartz在下一次执行时间再次开始执行,并累计到已经执行次数
public static final int MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_EXISTING_COUNT = 5;

SimpleTrigger的misfire默认策略是Trigger.MISFIRE_INSTRUCTION_SMART_POLICY。这种策略会根据不同情况选择不同处理,源码如下所示:

if (instr == Trigger.MISFIRE_INSTRUCTION_SMART_POLICY) {
            if (getRepeatCount() == 0) {
                instr = MISFIRE_INSTRUCTION_FIRE_NOW;
            } else if (getRepeatCount() == REPEAT_INDEFINITELY) {
                instr = MISFIRE_INSTRUCTION_RESCHEDULE_NEXT_WITH_REMAINING_COUNT;
            } else {
                // if (getRepeatCount() > 0)
                instr = MISFIRE_INSTRUCTION_RESCHEDULE_NOW_WITH_EXISTING_REPEAT_COUNT;
            }
        }

2.3.2 使用CronTrigger部署Job

CronTrigger 是基于 Unix 类似于 cron 的表达式,允许设定非常复杂的触发时间表。Cron表达式由七个子表达式组成的字符串,它描述了不同的调度细节。这些子表达式是用空格分隔的,并表示:秒、分钟、小时、天、月、星期几、年(可选项)。

2.3.2.1 Cron表达式

Quartz Cron 表达式支持七个域:

名称 是否必须 允许值 允许特殊字符
0-59 , - * /
0-59 , - * /
0-23 , - * /
1月31日 , - * ? / L W C
1-12 或 JAN-DEC , - * /
1-7 或 SUN-SAT , - * ? / L C #
空 或 1970-2099 , - * /

其中

  • “,” :表示指定多个值
  • “-”:表示指定一个范围的值
  • “*”:表示整个时间段
  • “/”:表示指定一个值的增加幅度。n/m表示从n开始,每次增加m
  • “?”:表示不确定的值
  • “L”:用在日表示一个月中的最后一天,用在周表示该月最后一个星期X
  • “W”:指定离给定日期最近的工作日(周一到周五)。LW:这两个字符可以连用,表示在某个月最后一个工作日,即最后一个星期五。
  • “#”:用于指定月份中的第几周的哪一天。例如,如果你指定周域的值为 6#3,它意思是某月的第三个周五 (6=星期五,#3意味着月份中的第三周)。

每个17点开始,每隔5分钟过10秒执行。比如17:00:10 pm,17:05:10 pm

10 0/5 17 * * ?

每个周三和周五,10:30,11:30,12:30,13:30执行

0 30 10-13 ? * WED,FRI

每个月最后一个星期一,12:00:00执行

0 0 12 ? * 2L

2.3.2.2 代码示例

代码示例如下所示:

Trigger trigger = newTrigger().withIdentity("trigger1","group").startNow().withSchedul(CronScheduleBuilder.cronSchedule("0 0/2 8-17 * * ?")).build();

scheduler.scheduleJob(jobDetail, trigger); 
2.3.2.3 trigger misfire策略

下面是CronTrigger处理misfire的策略。

// 以错过的第一个频率时间立刻开始执行,重做错过的所有频率周期后
// 当下一次触发频率发生时间大于当前时间后,再按照正常的Cron频率依次执行
public static final int MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY = -1;
// 不触发立即执行,等待下次Cron触发频率到达时刻开始按照Cron频率依次执行
public static final int MISFIRE_INSTRUCTION_DO_NOTHING = 2; 
// 以当前时间为触发频率立刻触发一次执行,然后按照Cron频率依次执行    
public static final int MISFIRE_INSTRUCTION_FIRE_ONCE_NOW = 1;                  

同样,CronTrigger默认也采用Trigger.MISFIRE_INSTRUCTION_SMART_POLICY策略:

if (instr == MISFIRE_INSTRUCTION_SMART_POLICY) {//instr  == 0
            instr = MISFIRE_INSTRUCTION_FIRE_ONCE_NOW;//instr = 1
}
// 指定CronTrigger使用立即执行的misfire策略
Trigger trigger = newTrigger().withIdentity("trigger1", "group").startNow()
                    .withSchedule(CronScheduleBuilder.cronSchedule("0 0/2 8-17 * * ?").withMisfireHandlingInstructionFireAndProceed()).build();

2.3.3 使用Quartz Calendar排除指定日期

Quartz 的 Calender 专门用于屏闭一个时间区间,使 Trigger 在这个区间中不被触发。Quartz包括了多种类型的Calender:

Calender 名称 用法
BaseCalender org.quartz.impl.calendar.BaseCalender 为高级的 Calender 实现了基本的功能,实现了 org.quartz.Calender 接口
DailyCalendar org.quartz.impl.calendar.DailyCalendar 您可以使用此日历来排除营业时间(上午8点 - 5点)每天。 每个DailyCalendar仅允许指定单个时间范围,并且该时间范围可能不会跨越每日边界(即,您不能指定从上午8点至凌晨5点的时间范围)。 如果属性invertTimeRange为false(默认),则时间范围定义触发器不允许触发的时间范围。 如果invertTimeRange为true,则时间范围被反转 - 也就是排除在定义的时间范围之外的所有时间
WeeklyCalendar org.quartz.impl.calendar.WeeklyCalendar 排除星期中的一天或多天,例如,可用于排除周末
MonthlyCalendar org.quartz.impl.calendar.MonthlyCalendar 排除月份中的数天,例如,可用于排除每月的最后一天
AnnualCalendar org.quartz.impl.calendar.AnnualCalendar 排除年中一天或多天
HolidayCalendar org.quartz.impl.calendar.HolidayCalendar 特别的用于从 Trigger 中排除节假日
CronCalendar org.quartz.impl.calendar.CronCalendar 日历的这种实现排除了由给定的CronExpression表达的时间集合。 例如,您可以使用此日历使用表达式“* * 0-7,18-23?* *”每天排除所有营业时间(上午8点至下午5点)。 如果CronTrigger具有给定的cron表达式并且与具有相同表达式的CronCalendar相关联,则日历将排除触发器包含的所有时间,并且它们将彼此抵消

下面看一个简单实例:

// Add the holiday calendar to the schedule
AnnualCalendar holidays = new AnnualCalendar();

// fourth of July (July 4) Independence Day Of USA
Calendar fourthOfJuly = new GregorianCalendar(2018, 6, 4);
holidays.setDayExcluded(fourthOfJuly, true);
// halloween (Oct 31)
Calendar halloween = new GregorianCalendar(2018, 9, 31);
holidays.setDayExcluded(halloween, true);
// christmas (Dec 25)
Calendar christmas = new GregorianCalendar(2018, 11, 25);
holidays.setDayExcluded(christmas, true);

// tell the schedule about our holiday calendar
sched.addCalendar("holidays", holidays, false, false);

// schedule a job to run hourly, starting on halloween
// at 10 am
Date runDate = dateOf(0, 0, 10, 31, 10);
SimpleTrigger trigger = newTrigger()
                .withIdentity("trigger1", "group1")
                .startAt(runDate)
                .withSchedule(
                        simpleSchedule().withIntervalInHours(1).repeatForever())
                .modifiedByCalendar("holidays").build();

三、存储和持久化

​ Quartz通过JobStore提供一种数据存储机制。JobStore有多种类型:基于内存的RAMJobStore(非持久化),、基于数据库的JDBCJobStore(持久化)、基于分布式缓存的TerracottaJobStore(持久化)。我们在定义SchedulerFactory时,在配置文件中要声明使用的JobStore类型来生成shceduler实例。千万不要直接在代码中定义JobStore实例,正确的做法是在幕后(通过配置的方式)来使用JobStore。

3.1 RAMJobStore

​ org.quartz.simple.RAMJobStore是默认的Quartz的JobStore方式。SchedulerFactory初始化时没有指定JobStore,则默认使用RAMJobStore。同时,RAMJobStore基于内存,也是数据访问性能最佳的JobStore。当然,缺点也是显而易见的,因为不可持久化,所以RAMJobStore中的数据时易失的。

org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore

​ RAMJobStore通过两种方式来加载数据。一种是直接硬编码这些数据到你的代码中,比如上文示例代码中的Job信息、Tigger信息等,一旦有任何变化,代码都需要重新编译。另一种是基于插件 JobInitializationPlugin,这种方式的优点是,当有改变时只需要对这个 XML 文件作改动,不用改代码,不用重编译。

3.2 JDBCJobStore

JDBCJobStore通过关系型数据库存储相关数据。其优点是数据持久化,同时为分布式Quartz提供了可能性。JDBCJobStore支持大部分关系型数据库包括: Oracle, PostgreSQL, MySQL, MS SQLServer, HSQLDB,DB2等。首先,要创建一系列用于存储数据的表:

表名 描述
QRTZ_CALENDARS 以 Blob 类型存储 Quartz 的 Calendar 信息
QRTZ_CRON_TRIGGERS 存储 Cron Trigger,包括 Cron 表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的 Trigger 相关的状态信息,以及相联 Job 的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的 Trigger 组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关 Scheduler 的状态信息,和别的 Scheduler 实例(假如是用于一个集群中)
QRTZ_LOCKS 存储程序的非观锁的信息(假如使用了悲观锁)
QRTZ_JOB_DETAILS 存储每一个已配置的 Job 的详细信息
QRTZ_JOB_LISTENERS 存储有关已配置的 JobListener 的信息
QRTZ_SIMPLE_TRIGGERS 存储简单的 Trigger,包括重复次数,间隔,以及已触的次数
QRTZ_BLOG_TRIGGERS Trigger 作为 Blob 类型存储(用于 Quartz 用户用 JDBC 创建他们自己定制的 Trigger 类型,JobStore 并不知道如何存储实例的时候)
QRTZ_TRIGGER_LISTENERS 存储已配置的 TriggerListener 的信息
QRTZ_TRIGGERS 存储已配置的 Trigger 的信息

​ 有关QRTZ表的具体用法将在第六章分布式集群实现中详述。

3.2.1 配置JobStoreTX

​ 表创建完成后,需要决定应用需要的事务类型。Quartz来管理事务的话,使用JobStoreTX;如果希望应用容器来管理事务(比如JBoss或者Tomcat),可以使用JobStoreCMT。这里选择使用JobStoreTX,继续展开。

org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

3.2.2 配置驱动代理

​ Quartz 指定 DriverDelegate 来与给定数据库进行通信。顾名思义,从 Scheduler 通过 JobStore 对数据库的调用是委托给一个预配置的 DriverDelegate 实例。这个代理承担起所有与 JDBC driver 的通信。所有的 DriverDelegate 类都继承自 org.quartz.impl.jdbcjobstore.StdDriverDelegate 类。Mysql可以直接使用StdDriverDelegate。

org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdDriverDelegate

3.2.3 配置数据源

数据源配置可以指定为应用服务器配置的默认数据源,也可以单独配置,比如:

# 指定数据源
org.quartz.jobStore.dataSource = myDS
# JDBC 驱动类的全限名
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver  
# 连接到你的数据库的 URL(主机,端口等)
org.quartz.dataSource.myDS.URL = jdbc:jtds:sqlserver://localhost:1433/quartz  
# 用于连接你的数据库的用户名
org.quartz.dataSource.myDS.user = admin  
# 用于连接你的数据库的密码
org.quartz.dataSource.myDS.password = myPassword  
# DataSource 在连接接中创建的最大连接数
org.quartz.dataSource.myDS.maxConnections = 10 
# 一个可选的 SQL 查询字串,DataSource 用它来侦测并替换失败/断开的连接
org.quartz.dataSource.NAME.validationQuary= SELECT 1 

3.2.4 配置数据库表的前缀

​ 表前缀的目的是在某些情况下,你也许需要创建多套的 Quartz 数据库表。在这时候,你就需要改变每一套表的前缀。比如JOB_QRTZ_,则表名是JOB_QRTZ_CALENDARS。

org.quartz.jobStore.tablePrefix = JOB_QRTZ_

3.2.5 其他配置

可用于设置 JobStoreTX 的其他配置属性:

属性 默认值 描述
org.quartz.jobStore.userProperties FALSE 设置为true时表示JobDataMap中的value存放的类型必须是String类型,这样保证不用让更复杂的对象以序列化的形式存入 BLOB 列中。一方面提高性能,另一方面避免序列化问题
org.quartz.jobStore.misfireThreshold 60000 Scheduler对Trigger-misfire的忍耐时间,超过阈值则认为是misfire
org.quartz.jobStore.isClustered FALSE 设置为 true 打开集群特性。分布式Quartz场景下,这个属性就必须设置为 true
org.quartz.jobStore.clusterCheckinInterval 15000 定义了Scheduler 实例检入到数据库中的频率(单位:毫秒).Scheduler 检查是否其他的实例到了它们应当检入的时候未检入;这能指出一个失败的 Scheduler 实例,且当前 Scheduler 会以此来接管任何执行失败并可恢复的 Job
org.quartz.jobStore.maxMisfiresToHandleAtATime 20 JobStore 能处理的错过触发的 Trigger 的最大数量。处理太多很快会导致数据库表被锁定过长的时间,这样就妨碍了触发别的(还未错过触发) trigger 执行的性能
org.quartz.jobStore.dontSetAutoCommitFalse FALSE false表示调用setAutoCommit(false),true表示不调用setAutoCommit(false)
org.quartz.jobStore.selectWithLockSQL SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE 排他锁操作,必须是一个排他锁语句。{0} 会在运行期间被前面你配置的 TABLE_PREFIX 所替换
org.quartz.jobStore.selectWithLockSQL SELECT * FROM {0}LOCKS WHERE LOCK_NAME = ? FOR UPDATE 行级排他锁,必须是一个排他锁语句。{0} 会在运行期间被前面你配置的 TABLE_PREFIX 所替换
org.quartz.jobStore.txIsolationLevelSerializable FALSE 值为 true 时告知 Quartz调用JDBC连接的 setTransactionIsolation(Connection.TRANSACTION_SERIALIZABLE) 方法

3.3 TerracottaJobStore

​ TerracottaJobStore支持在分布式内存缓存服务器Terracotta中存储数据。其优势在于性能介于RAMJobStore和JDBCJobStore之间,同时提供了诸如负载均衡和故障切换的集群功能,保证了分布式Quartz的高可用。

​ 启用TerracottaJobStore的简单配置如下:

# 指定TerracottaJobStore
org.quartz.jobStore.class = org.terracotta.quartz.TerracottaJobStore
# 主机和端口标识要连接的Terracotta服务器的位置
org.quartz.jobStore.tcConfigUrl = localhost:9510

​ 具体使用可以参考http://www.terracotta.org/quartz 这里不做展开。

3.4 自建JobStore

​ 可以自己根据需要创建新的JobStore,必须实现org.quartz.spi.JobStore接口。JobStore 接口有 40 个方法,它要求任何 JobStore 实现都必须实现这些方法,你的也一样。你如何实现那些方法完全依赖于你正构建的 JobStore 的类型。那不是说你的 JobStore 将只能有 40 个方法;这仅仅是接口需要的最小数量。这 40 个方法体现 JobStore 和 Scheduler 之间的公共契约。
让我们拣出其中一个方法来简短的讨论它。我们就选 JobStore 接口方法:

public void schedulerStarted() throws SchedulerException;

​ Scheduler调用JobStore的SchedulerStarted() 方法去通知 JobStore Scheduler 已经启动了。如果你看了 RAMJobStore 的实现,你能发现它在这个方法实现中什么也没做:

public void schedulerStarted() throws SchedulerException{
    // nothing to do
}

然而,假如你去看那两个 JobStore 的实现,你会看到在 Scheduler 在首次启动时进行了一些工作:

public void schedulerStarted() throws SchedulerException {  

    if (isClustered()) {  
        clusterManagementThread = new ClusterManager(this);  
        clusterManagementThread.initialize();  
    } else {  
        try {  
            recoverJobs();  
        } catch (SchedulerException se) {  
            throw new SchedulerConfigException("Failure occurred during job recovery.", se);  
        }  
    }  
    misfireHandler = new MisfireHandler(this);  
    misfireHandler.initialize();  
}  

提供方式来创建新的JobStore是非常有意义的,这样我们可以考虑通过其他数据库、文件系统、甚至是内存来存储Quartz中的数据。

四、实现Quartz监听器

​ Quartz的监听器用于当任务调度中你所关注事件发生时,能够及时获取这一事件的通知。Quartz监听器主要有JobListener、TriggerListener、SchedulerListener三种,顾名思义,分别表示任务、触发器、调度器对应的监听器。监听器可以分为全局和非全局:全局监听器能够接收到所有的Job/Trigger的事件通知,而非全局监听器只能接收到在其上注册的Job或Trigger的事件,不在其上注册的Job或Trigger则不会进行监听。

4.1 JobListener

监听Job 在其生命周期中产生的某些关键事件时,需要实现org.quartz.JobListener接口

public interface JobListener {

    /**
     * getName() 方法返回一个字符串用以说明 JobListener 的名称
     * 对于注册为全局的监听器,getName() 主要用于记录日志,
     * 对于由特定 Job 引用的 JobListener,注册在 JobDetail 上的监听器名称必须匹配从监听器上 getName() 方法的返回值
     */
    String getName();

    /**
     * Scheduler 在 JobDetail 将要被执行时调用这个方法
     */
    void jobToBeExecuted(JobExecutionContext context);

    /**
     * Scheduler 在 JobDetail 即将被执行,但又被 TriggerListener 否决了时调用这个方法
     */
    void jobExecutionVetoed(JobExecutionContext context);

    /**
     * Scheduler 在 JobDetail 被执行之后调用这个方法
     */
    void jobWasExecuted(JobExecutionContext context,
            JobExecutionException jobException);

}

结合前面给出一个简单示例:

public class SimpleJobListener implements JobListener{

    private static Logger logger = LoggerFactory.getLogger(SimpleJobListener.class);

    @Override
    public String getName() {
        String name = getClass().getSimpleName();
        logger.info(" listener name is:"+name);
        return name;
    }

    @Override
    public void jobToBeExecuted(JobExecutionContext context) {
        String jobName = context.getJobDetail().getKey().getName();
        logger.info(jobName + " is going to be executed");
    }

    @Override
    public void jobExecutionVetoed(JobExecutionContext context) {
        String jobName = context.getJobDetail().getKey().getName();
        logger.info(jobName + " was vetoed and not executed");
    }

    @Override
    public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) {
        String jobName = context.getJobDetail().getKey().getName();
        logger.info(jobName + " was executed");

    }
}
public class HelloQuartz {

    private static Log logger = LogFactory.getLog(HelloQuartz.class);

    public static void main(String[] args) {

        DirectSchedulerFactory factory = DirectSchedulerFactory.getInstance();

        try {
            // 从Scheduler工厂获取一个Scheduler的实例
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

            // 创建并注册一个指定任务job1的Job Listener
            scheduler.getListenerManager().addJobListener(new SimpleJobListener(), KeyMatcher.keyEquals(JobKey.jobKey("job1", "group")));


            JobDetail jobDetail1 = newJob(HelloJob.class).withIdentity("job1", "group").build();
            jobDetail1.getJobDataMap().put("CONTENT", "Hello Quartz!");
            Trigger trigger1 = newTrigger().withIdentity("trigger1", "group").startNow()
                    .withSchedule(simpleSchedule().withIntervalInSeconds(5).withRepeatCount(0)).build();
            scheduler.scheduleJob(jobDetail1, trigger1);

        } catch (SchedulerException e) {
            logger.error(e);
        }
    }
}

输出结果:

19:37:45.690 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener -  listener name is:SimpleJobListener
19:37:45.690 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener - job1 is going to be executed
19:37:45.690 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job group.job1
Hello Quartz!
19:37:45.690 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener -  listener name is:SimpleJobListener
19:37:45.690 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener - job1 was executed

​ 我们可以通过addJobListener() 注册一个非全局监听器,针对指定任务,也可以调用任务组;通过Scheduler 调用 addGlobalJobListener() 方法注册为一个全局的 JobListener。如果 Scheduler 不能根据名称找到监听器,它会抛出一个 SchedulerException 异常。
​ 加入一个非全局 JobListener 的步骤必须是依序完成。JobListener 必须首先加入到 Scheduler 中。接着,JobListener 才能够设置给 JobDetail 对象。之后,你就能使用 scheduleJob() 方法安全的把 JobDetail 加入到 Scheduler 中。

4.2 TriggerListener

实现org.quartz.TriggerListener接口可以监听Trigger 实例生命周期。

public interface TriggerListener {  
    /**
     * getName() 方法返回一个字符串用以说明 TriggerListener 的名称
     * 对于非全局的 TriggerListener,在 addTriggerListener() 方法中给定的名称必须与监听器的 getName() 方法返回值相匹配
     */
    public String getName();  

    /**
     * 当与监听器相关联的 Trigger 被触发,Job 上的 execute() 方法将要被执行时,Scheduler 就调用这个方法
     * 在全局 TriggerListener 情况下,这个方法为所有 Trigger 被调用
     */  
    public void triggerFired(Trigger trigger,  
         JobExecutionContext context);  

    /**
     * 在 Trigger 触发后,Job 将要被执行时由 Scheduler 调用这个方法
     * TriggerListener 给了一个选择去否决 Job 的执行。假如这个方法返回 true,这个 Job 将不会为此次 Trigger 触发而得到执行
     */
    public boolean vetoJobExecution(Trigger trigger,  
          JobExecutidonContext context);  

    /**
     * Scheduler 调用这个方法是在 Trigger 错过触发时
     * 如这个方法的 JavaDoc 所指出的,你应该关注此方法中持续时间长的逻辑:在出现许多错过触发的 Trigger 时,长逻辑会导致骨牌效应。你应当保持这上方法尽量的小
     */
    public void triggerMisfired(Trigger trigger);  

    /**
     * Trigger 被触发并且完成了 Job 的执行时,Scheduler 调用这个方法
     */
    public void triggerComplete(Trigger trigger,  
          JobExecutionContext context,  
          int triggerInstructionCode);  
}  

结合给出一个简单示例

public class SimpleTriggerListener implements TriggerListener{

    private static Logger logger = LoggerFactory.getLogger(SimpleTriggerListener.class);

    private String name;

    public SimpleTriggerListener(String name) {
        this.name = name;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public void triggerFired(Trigger trigger, JobExecutionContext context) {
        String triggerName = trigger.getKey().getName();
        logger.info(triggerName + " was fired");
    }

    @Override
    public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
        String triggerName = trigger.getKey().getName();
        logger.info(triggerName + " was not vetoed");
        return false;
    }

    @Override
    public void triggerMisfired(Trigger trigger) {
        String triggerName = trigger.getKey().getName();
        logger.info(triggerName + " misfired");
    }

    @Override
    public void triggerComplete(Trigger trigger, JobExecutionContext context,
            CompletedExecutionInstruction triggerInstructionCode) {
        String triggerName = trigger.getKey().getName();
        logger.info(triggerName + " is complete");
    }
}

输出结果

19:59:20.147 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleTriggerListener - trigger1 was fired
19:59:20.147 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleTriggerListener - trigger1 was not vetoed
19:59:20.148 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener -  listener name is:SimpleJobListener
19:59:20.148 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener - job1 is going to be executed
19:59:20.148 [DefaultQuartzScheduler_Worker-1] DEBUG org.quartz.core.JobRunShell - Calling execute on job group.job1
Hello Quartz!
19:59:20.148 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener -  listener name is:SimpleJobListener
19:59:20.148 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener - job1 was executed
19:59:20.148 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleTriggerListener - trigger1 is complete

​ 同样我们可以指定一个任务组的triggerscheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener(“SimpleTrigger”), GroupMatcher.groupEquals(“group”));也可以调用 addGloabelTriggerListener()注册一个全局的 TriggerListener。
针对于前面的非全局 JobListener 提到的相同的警告可以应用到这里来;你必须在把它设置给 Trigger 实例并存储了 Trigger 之前把 TriggerListener 加入到 Scheduler 中。

4.3 SchedulerListener

org.quartz.SchedulerListener 接口包含了一系列的回调方法,它们会在 Scheduler 的生命周期中有关键事件发生时被调用。

public interface SchedulerListener {  
    /**
     * Scheduler 在有新的 JobDetail 部署时调用
     */
    public void jobScheduled(Trigger trigger);  
    /**
     * Scheduler 在有 JobDetail 卸载时调用
     */
    public void jobUnscheduled(String triggerName, String triggerGroup); 
    /**
     * 当一个 Trigger 来到了再也不会触发的状态时调用这个方法
     * 除非这个 Job 已设置成了持久性,否则它就会从 Scheduler 中移除
     */ 
    public void triggerFinalized(Trigger trigger);  
    /**
     * Scheduler 调用这个方法是发生在一个 Trigger 或 Trigger 组被暂停时
     * 假如是 Trigger 组的话,triggerName 参数将为 null
     */
    public void triggersPaused(String triggerName, String triggerGroup);  
    /**
     * Scheduler 调用这个方法是发生成一个 Trigger 或 Trigger 组从暂停中恢复时
     * 假如是 Trigger 组的话,triggerName 参数将为 null
     */
    public void triggersResumed(String triggerName,String triggerGroup);  
    /**
     * 当一个或一组 JobDetail 暂停时调用这个方法
     */
    public void jobsPaused(String jobName, String jobGroup); 
    /**
     * 当一个或一组 Job 从暂停上恢复时调用这个方法
     * 假如是一个 Job 组,jobName 参数将为 null
     */ 
    public void jobsResumed(String jobName, String jobGroup); 
    /**
     * 在 Scheduler 的正常运行期间产生一个严重错误时调用这个方法
     * 你可以使用 SchedulerException 的 getErrorCode() 或者 getUnderlyingException() 方法或获取到特定错误的更详尽的信息
     */ 
    public void schedulerError(String msg, SchedulerException cause); 
    /**
     * Scheduler 调用这个方法用来通知 SchedulerListener Scheduler 将要被关闭
     */ 
    public void schedulerShutdown();  
} 
public class HelloQuartz {

    private static Log logger = LogFactory.getLog(HelloQuartz.class);

    public static void main(String[] args) {

        DirectSchedulerFactory factory = DirectSchedulerFactory.getInstance();

        try {
            Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();

            scheduler.getListenerManager().addSchedulerListener(new SimpleSchedulerListener());
            // Scheduler 创建后是在 Job 注册之前被启动的。这就使得在 Job 部署时 jobScheduled() 方法能得到调用
            scheduler.start();

            JobDetail jobDetail1 = newJob(HelloJob.class).withIdentity("job1", "group").build();
            jobDetail1.getJobDataMap().put("CONTENT", "Hello Quartz!");
            Trigger trigger1 = newTrigger().withIdentity("trigger1", "group").startNow()
                    .withSchedule(simpleSchedule().withIntervalInSeconds(5).withRepeatCount(0)).build();
            scheduler.scheduleJob(jobDetail1, trigger1);

            // 创建并注册一个指定任务的Job Listener
            scheduler.getListenerManager().addJobListener(new SimpleJobListener(), KeyMatcher.keyEquals(JobKey.jobKey("job1", "group")));

            // 创建并注册一个局部的Trigger Listener
            scheduler.getListenerManager().addTriggerListener(new SimpleTriggerListener("SimpleTrigger"), KeyMatcher.keyEquals(TriggerKey.triggerKey("trigger1", "group")));

            Thread.sleep(10000);

            scheduler.shutdown();

            logger.info("shut down scheduler");
        } catch (SchedulerException e) {
            logger.error(e);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

输出结果:

20:36:10.401 [main] INFO com.quartz.learning.SimpleSchedulerListener - scheduler has been started
20:36:10.411 [main] INFO com.quartz.learning.SimpleSchedulerListener - group.job1 is added
20:36:10.411 [main] INFO com.quartz.learning.SimpleSchedulerListener - job1 has been scheduled
20:36:10.419 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleTriggerListener - trigger1 was fired
20:36:10.419 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleTriggerListener - trigger1 was not vetoed
20:36:10.419 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener -  listener name is:SimpleJobListener
20:36:10.419 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener - job1 is going to be executed
Hello Quartz!
20:36:10.420 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener -  listener name is:SimpleJobListener
20:36:10.420 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleJobListener - job1 was executed
20:36:10.420 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleTriggerListener - trigger1 is complete
20:36:10.420 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleSchedulerListener - Trigger is finished for job1
20:36:10.420 [DefaultQuartzScheduler_Worker-1] INFO com.quartz.learning.SimpleSchedulerListener - group.job1 is deleted
20:36:20.413 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutting down.
20:36:20.413 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED paused.
20:36:20.414 [main] INFO com.quartz.learning.SimpleSchedulerListener - scheduler is in standby mode
20:36:20.414 [main] INFO com.quartz.learning.SimpleSchedulerListener - scheduler is being shutdown
20:36:20.414 [main] INFO com.quartz.learning.SimpleSchedulerListener - scheduler has been shutdown
20:36:20.414 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutdown complete.
20:36:20.414 [main] INFO com.quartz.learning.HelloQuartz - shut down scheduler

同样,注意addSchedulerListener在示例中放置的位置,否则因为执行顺序问题,有些事件将无法捕获。

五、Quartz远程调用

Quartz RMI的目的是为不同客户端提供更好的定时任务调度服务。

5.1 创建Quartz RMI服务端

首先创建Quartz RMI的服务端。

5.1.1 配置服务端启动参数

RMI 服务端必要的属性包括:

属性 默认值 说明
org.quartz.scheduler.rmi.export FALSE 假如你要使 Quartz 调度作为一个可用的 RMI 对象,这个标记必须设置为 true
org.quartz.scheduler.rmi.registryHost localhost 这是运行 RMI 注册表所在的主机
org.quartz.scheduler.rmi.registryPort 1099 这是 RMI 注册服务监听所用的端口号(通常是1099)
org.quartz.scheduler.rmi.createRegistry never 这项决定了 Quartz 是否会创建 RMI 注册服务。如果你不希望 Quartz 创建注册服务就设置为 false 或 never。如果是希望 Quartz 首先尝试去使用已存在的注册服务,如果失败的话自行创建一个就设置为 true 或 as_needed。假如注册服务创建好了,它会使用给定的 registryPort 绑定到所给的 registryHost 上
org.quartz.scheduler.rmi.serverPort -1 这是 Quartz 调度器服务所绑定的端口号,在其中监听到来的连接。默认,RMI 服务会随机选择一个端口号作为调度器绑定到 RMI 注册服务的端口

用于 Quartz RMI 服务端的 quartz.properties 文件样例如下:

#==============================================================     
# Configure Main Scheduler Properties     
#==============================================================      
org.quartz.scheduler.instanceName = RMIScheduler     


#==============================================================     
# Configure RMI Properties     
#==============================================================      
org.quartz.scheduler.rmi.export = true     
org.quartz.scheduler.rmi.registryHost = localhost      
org.quartz.scheduler.rmi.registryPort = 1099      
org.quartz.scheduler.rmi.serverPort = 0      
org.quartz.scheduler.rmi.createRegistry = true    

#==============================================================     
# Configure ThreadPool     
#==============================================================      
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool      
org.quartz.threadPool.threadCount = 10      
org.quartz.threadPool.threadPriority = 5     

#==============================================================     
# Configure JobStore     
#==============================================================      
org.quartz.jobStore.misfireThreshold = 60000      
org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore   

5.1.2 创建服务端启动类

​ 为启动 Quartz RMI 服务端,你必须创建一个启动类,该类从工厂中获取到调度器实例,然后运行这个调度器。
​ 首先,为清晰起见,我们把 quartz.properties 文件更名为 server.properties,这时候要告诉 Quartz RMI 服务端去加载新命名的文件而不是默认的 quartz.properties 文件。更改文件名会让我们调试问题变得容易些。这样,我们可以确保 Quartz 加载的是正确的设置文件。
​ 第二个改变是:我们加载了一个新的安全管理器(SecurityManager),以便能够赋予 RMI 服务端必须的权限。

代码示例如下:

public class QuartzRMIServer {     

     public void run() throws Exception {     
          Log log = LogFactory.getLog(QuartzRMIServer.class);     

          // Use this properties file instead of quartz.properties     
          System.setProperty("org.quartz.properties",     
               "server.properties");     

          // RMI with Quartz requires a special security manager     
          if (System.getSecurityManager() == null) {     
               System.setSecurityManager(new    
                    java.rmi.RMISecurityManager());     
          }     
          // Get a reference to the Scheduler     
          Scheduler scheduler =     
               StdSchedulerFactory.getDefaultScheduler();     

          /*    
           * Due to the server.properties file, our Scheduler will   
           * be exported to RMI Registry automatically.   
           */    
          scheduler.start();     

          log.info("Quartz RMI Server started at " + new Date());     
          log.info("RMI Clients may now access it. ");     

          System.out.println("\n");     
          System.out.println(     
               "The scheduler will run until you type \"exit\"");     

          BufferedReader rdr = new BufferedReader(     
                    new InputStreamReader(System.in));     

          while (true) {     
               System.out.print("Type 'exit' to shutdown server: ");     
               if ("exit".equals(rdr.readLine())) {     
                    break;     
               }     
          }     

          log.info("Scheduler is shutting down...");     
          scheduler.shutdown(true);     
          log.info("Scheduler has been stopped.");     
     }     
     public static void main(String[] args) throws Exception {     

          QuartzRMIServer example = new QuartzRMIServer();     
          example.run();     
     }     
}   

​ 在代码示例中,安装了 RMISecurityManager 之后,通过工厂方法获得调度器实例,并调用它的 start() 方法。服务端是设计成在控制台运行的,因此一旦调度器启动之后,直至用户在控制台上键入 exit 。接着调度器被关闭也不再为远程的客户端提供服务了。
除了要使用 RMISecurityManager,我们注意到用不着在代码中做任何特别的事情,就能让 Quartz 调度器作为一个远程调度器来用。那些全是托server.properties 文件的福所致。当调度器被创建后,假如属性文件告诉它这么做,调度器就会把自己导出并注册到 RMI 注册服务器上,并使之可被远程调用。

5.1.3 使用RMI注册服务

​ 当属性 org.quartz.scheduler.rmi.createRegistry设置为 true 或者 always时,Quartz 会自动启动注册服务。
​ 你也可以选择在命令行下使用 Java 的 rmiregistry 命令来运行注册服务。假如你要通过命令行启动注册服务,要确保你启动时所用的端口号要与属性文件所指定的一致。要从命令行启动,你应先进入到 /bin 目录下,然后键入如下命令:rmiregistry <port>

5.2 创建Quartz RMI客户端

5.2.1 配置客户端启动参数

Quartz RMI 客户端所必须的属性:

属性 默认值 说明
org.quartz.scheduler.rmi.registryHost localhost 这是运行 RMI 注册服务所在的主机
org.quartz.scheduler.rmi.registryPort 1099 这是运行 RMI 注册服务所监听的端口(通常是 1099)
org.quartz.scheduler.rmi.proxy FALSE 假如你希望连接到远程服务端的调度器,设置 org.quartz.scheduler.rmi.proxy 标志为 true。你同时必须指定 RMI 注册服务进程的主机和端口号

​ 为了能让客户端定位到服务对象,它需要知道 RMI 注册服务运行在哪里,以便能查找到远程对象。org.quartz.scheduler.rmi.registryHost 和 org.quartz.scheduler.rmi.registryPort 属性必须是运行着 RMI 注册服务的主机和端口。假如你配置了 Quartz RMI 服务端自动启动注册服务,那么 RMI 注册服务器与 RMI 服务端就是同在一个机器上的。
​ 因为你想要客户端能联系到远程调度器去部署 Job,你必须设置属性 org.quartz.scheduler.rmi.proxy 为 true。
​ 一个用于 Quartz RMI 客户端的 quartz.properties 文件例子:

#=============================================================     
# Configure Main Scheduler Properties     
#=============================================================      
org.quartz.scheduler.instanceName = RMIScheduler     
#org.quartz.scheduler.instanceId = AUTO     

#==============================================================     
#Configure RMI Properties     
#==============================================================      
org.quartz.scheduler.rmi.registryHost=localhost      
org.quartz.scheduler.rmi.registryPort=1099      
org.quartz.scheduler.rmi.proxy= true  

注意,属性 org.quartz.scheduler.instanceName 在 RMI 客户端和服务端必须一致。不然,客户将无法在注册服务中查找到服务对象,会收一个客户端无法获取到远程调度器句柄的异常。

5.2.2 创建RMI客户端类

通过远程调度器部署一个 Job 的 Quartz RMI 客户端的例子。我们把 quartz.properties 文件更名为 client.properties 并且告诉客户端从更名后的文件中加载属性。

public class RMITestClient {      

     public void run() throws Exception {      

          Log log = LogFactory.getLog(RMITestClient.class);      

          // Use this properties file instead of quartz.properties      
          System.setProperty("org.quartz.properties",      
                "client.properties");      

          // Get a reference to the remote scheduler      
          Scheduler scheduler =      
               StdSchedulerFactory.getDefaultScheduler();      

          // Define the job to add      
          JobDetail job = new JobDetail("remotelyAddedJob", "default",      
                    SimpleJob.class);      
          JobDataMap map = new JobDataMap();      
          map.put("msg", "Your remotely added job has executed!");      
          job.setJobDataMap(map);      
          CronTrigger trigger =      
               new CronTrigger("remotelyAddedTrigger",      
                  "default", "remotelyAddedJob", "default", new     
                        Date(), null, "/5 * * ? * *");      


        // schedule the remote job      
         scheduler.scheduleJob(job, trigger);      

         log.info("Remote job scheduled.");      
    }      

    public static void main(String[] args) throws Exception {      
         RMITestClient example = new RMITestClient();      
         example.run();      
    }      
}    

工厂类是依据我们告诉它所加载的 client.properties 文件知道这么做的。明确的讲就是,设置了 RMI 属性导引着工厂类创建了一个远程调度器:org.quartz.scheduler.rmi.proxy = true

六、Quartz集群

集群化使Quartz满足了高可用,可扩展性强的企业级部署需要。本节主要讲Quartz的数据库集群化方式。开启Quartz集群包括3个步骤:

  1. 配置每个节点的 quartz.properties 文件,支持集群化
  2. 将Scheduler信息装在数据库
  3. 启动每个Quartz节点

6.1 集群实例的配置

集群实例的 quartz.properties 文件示例如下所示

#==============================================================  
#Configure Main Scheduler Properties  
#==============================================================  
org.quartz.scheduler.instanceName = TestScheduler1  
org.quartz.scheduler.instanceId = AUTO 
#==============================================================  
#Configure ThreadPool  
#==============================================================  
org.quartz.threadPool.class = org.quartz.simpl.Simple ThreadPool  
org.quartz.threadPool.threadCount = 5  
org.quartz.threadPool.threadPriority = 5  
#==============================================================  
#Configure JobStore  
#==============================================================  
org.quartz.jobStore.misfireThreshold = 60000  
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX  
org.quartz.jobStore.driverDelegateClass =  
org.quartz.impl.jdbcjobstore.MSSQLDelegate  
org.quartz.jobStore.tablePrefix = QRTZ_  
org.quartz.jobStore.dataSource = myDS  

org.quartz.jobStore.isClustered = true  
org.quartz.jobStore.clusterCheckinInterval = 20000  
#==============================================================  
#Non-Managed Configure Datasource  
#==============================================================  
org.quartz.dataSource.myDS.driver = net.sourceforge.jtds.jdbc.Driver  
org.quartz.dataSource.myDS.URL = jdbc:jtds:sqlserver://localhost:1433/quartz  
org.quartz.dataSource.myDS.user = admin  
org.quartz.dataSource.myDS.password = admin  
org.quartz.dataSource.myDS.maxConnections = 10  

​ 每一个实例有两个属性应该配置org.quartz.scheduler.instanceName和org.quartz.scheduler.instanceId,用在 JDBC JobStore 中和数据库来唯一标识实例。集群下实例 ID 建议使用 AUTO,防止重复后导致Quartz 集群将不能正常工作。
通过设置 org.quartz.jobStore.isClustered 属性为 true,你就告诉了 Scheduler 实例要它参与到一个集群当中。这一属性会贯穿于调度框架的始终,用于修改集群环境中操作的默认行为。
org.quartz.jobStore.clusterCheckinInterval 属性定义了Scheduler 实例检入到数据库中的频率(毫秒为单位)。Scheduler 检查是否其他的实例到了它们应当检入的时候未检入;这能指出一个失败的 Scheduler 实例,且当前 Scheduler 会以此来接管任何执行失败并可恢复的 Job。通过检入操作,Scheduler 也会更新自身的状态记录。
​ 数据库集群化方式的JobStore选择可以是JobStoreTX 或 JobStoreCMT。同时如第三节说的配置数据源及其他合理属性。

6.2 集群调度详解

​ 参考 quartz定时任务框架调度机制解析

​ Quartz集群调度是以数据库作为枢纽,各个节点并不感知其他节点的存在,只是通过数据库来进行间接的沟通。

​ quartz运行时由QuartzSchedulerThread类作为主体,循环执行调度流程。JobStore作为中间层,按照quartz的并发策略执行数据库操作,完成主要的调度逻辑。JobRunShellFactory负责实例化JobDetail对象,将其放入线程池运行。LockHandler负责获取LOCKS表中的数据库悲观锁。

Quartz集群调度流程

​ 整个quartz对任务调度的时序大致如下

Quartz集群调度时序图

梳理一下其中的流程,可以表示为:

0.调度器线程run()

1.获取待触发trigger

1.1数据库LOCKS表TRIGGER_ACCESS行加锁
1.2读取JobDetail信息
1.3读取trigger表中触发器信息并标记为”已获取”
1.4commit事务,释放锁

2.触发trigger

2.1数据库LOCKS表STATE_ACCESS行加锁
2.2确认trigger的状态
2.3读取trigger的JobDetail信息
2.4读取trigger的Calendar信息
2.5更新trigger信息
2.6commit事务,释放锁

3实例化并执行Job

3.1从线程池获取线程执行JobRunShell的run方法

​ 可以看到,这个过程中有两个相似的过程:同样是对数据表的更新操作,同样是在执行操作前获取锁 操作完成后释放锁.这一规则可以看做是quartz解决集群问题的核心思想.
​ 进一步解释这条规则就是:一个调度器实例在执行涉及到分布式问题的数据库操作前,首先要获取QUARTZ2_LOCKS表中对应当前调度器的行级锁,获取锁后即可执行其他表中的数据库操作,随着操作事务的提交,行级锁被释放,供其他调度器实例获取.
集群中的每一个调度器实例都遵循这样一种严格的操作规程,那么对于同一类调度器来说,每个实例对数据库的操作只能是串行的.而不同名的调度器之间却可以并行执行.

6.3 Quartz 集群 Cookbook

本节记录Quartz集群的细节问题。

指派 Job 给集群中特定的实例

目前不支持指派一个Job到集群中特定的节点。假如你需要这种行为,你可以创建一个非集群的 Quartz 应用与集群中的节点并行运行。注意,不要让非集群的实例指向到集群所用的同一套数据库表。不然你会得到不可预知的结果。

在集群中的每一个节点上运行 Job

正如前面所回答的,当前还没有一种方式能让某一个 Job 实例在集群中的每一个节点上都运行。最好的办法是使用一个非集群的实例与集群的每一个节点并行运行,并且要使用独立的一套数据库表或单独的 JobInitializationPlugin 和 RAMJobStore 用到的 XML 文件。

在不同的机器上运行节点

Quartz 实际并不关心你是在相同的还是不同的机器上运行节点。当集群是放置在不同的机器上时,通常称之为水平集群。节点是跑在同一台机器是,称之为垂直集群。对于垂直集群,存在着单点故障的问题。这对高可用性的应用来说是个坏消息,因为一旦机器崩溃了,所有的节点也就被有效的终止了。

使用时间同步服务

当你在是在不同的机器上运行 Quartz 集群时,时钟应当要同步,以免出现离奇且不可预知的行为。我们已经提及过,假如时钟没能够同步,Scheduler 实例将对其他节点的状态产生混乱。有几种简单的方法来保证时钟何持同步,而且也没有理由不这么做。
最简单的同步计算机时钟的方式是使用某一个 Internet 时间服务器(Internet Time Server ITS)。关于如何基于其中一个国际可接受标准来设置你的时钟的信息请看 http://tf.nist.gov/service/its.html。

从集群获取正在执行的 Job 列表

当前,如果不直接进到数据库的话,还没有一个简单的方式来得到集群中所有正在执行的 Job 列表。如果你请求一个 Scheduler 实例,你将只能得到在那个实例上正运行 Job 的列表。你可以写一些访问数据库 JDBC 代码来从适当的表中获取信息。当然,这是用的 Quartz 之外的方法,但确是能解决问题的。另一个方法是使用 Quartz 的 RMI 特性来依次连接到每一个节点,并从中查询到当前正在执行的 Job。

让集群和非集群实例一起运行

非集群环境不要使用与集群应用相同的一套数据库表;否则将得到不可预知的结果,集群和非集群的 Job 都会遇到问题。

在集群环境中使用全局监听器

在集群环境中,你仍然可以使用 Job 和 Trigger 监听器。唯一的问题是哪一个 Scheduler 实例将收到方法回调。
要记住这个最简单的方法是:Job 或 Trigger 是在哪个 Scheduler 实例上执行的,通知的就是这个 Scheduler 实例上的监听器。因为 Job 和 Trigger 只会在单个节点上执行,也就只会通知那个节点上的监听器。

到此Java Quartz Job Scheduling基础的内容已经完成,但对Quartz的学习还远远没有结束,比如类似Terracotta Quartz这种分布式缓存实现Quartz集群,比如Quartz与Spring Boot框架结合的使用。未完待续…

参考文档

《Quartz Job Scheduling Framework v1.0.0》
http://www.quartz-scheduler.org/documentation/quartz-2.2.x/tutorials/ Quartz Job Scheduler Tutorials
https://www.cnblogs.com/mengrennwpu/p/7191229.html Quartz使用(4) - Quartz监听器Listerner
https://www.cnblogs.com/zhenyuyaodidiao/p/4755649.html Quartz集群原理及配置应用
https://www.cnblogs.com/yangyudexiaobai/p/4422665.html quartz定时任务框架调度机制解析


文章作者: zohar
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 zohar !
  目录