【資料圖】
public class TestScheduled { /** * 1、使用Spring自帶的TaskScheduler注冊(cè)任務(wù) * 2、注冊(cè)后返回:ScheduledFuture,用于取消定時(shí)任務(wù) */ @Resource private TaskScheduler taskScheduler; public void registrarTask() { //具體的任務(wù)Runnable(一般使用類實(shí)現(xiàn)Runnable接口) Runnable taskRunnable = new Runnable() { @Override public void run() { } }; //cron表達(dá)式觸發(fā)器 CronTrigger trigger = new CronTrigger("0/5 * * * * ?"); //開啟定時(shí)任務(wù)的真正方法 ScheduledFuture> future = this.taskScheduler.schedule(taskRunnable, trigger); //取消定時(shí)任務(wù) future.cancel(true); }}二、具體實(shí)現(xiàn)1、配置任務(wù)調(diào)度器作用:設(shè)置:核心線程數(shù):可同時(shí)執(zhí)行任務(wù)數(shù);設(shè)置線程名稱前綴可以不配置。不配置就默認(rèn)使用spring自帶的package com.cc.ssd.config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.TaskScheduler;import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;/** TaskScheduler任務(wù)調(diào)度器配置類 * @since 2023/4/21 0021 * @author CC **/@Configurationpublic class CronTaskConfig { /** * 任務(wù)調(diào)度器自定義配置 */ @Bean(name = "taskScheduler") public TaskScheduler taskScheduler() { // 任務(wù)調(diào)度線程池 ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); // 定時(shí)任務(wù)執(zhí)行線程池核心線程數(shù):可同時(shí)執(zhí)行4個(gè)任務(wù) taskScheduler.setPoolSize(4); taskScheduler.setRemoveOnCancelPolicy(true); // 線程名稱前綴 taskScheduler.setThreadNamePrefix("Cs-ThreadPool-"); return taskScheduler; }}2、定時(shí)任務(wù)注冊(cè)類作用:緩存、注冊(cè)定時(shí)任務(wù);還可以查詢、刪除定時(shí)任務(wù)package com.cc.ssd.registrar;import com.cc.ssd.task.CronTaskFuture;import com.cc.ssd.task.CronTaskRunnable;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.BeanUtils;import org.springframework.beans.factory.DisposableBean;import org.springframework.scheduling.TaskScheduler;import org.springframework.scheduling.config.CronTask;import org.springframework.scheduling.support.CronExpression;import org.springframework.stereotype.Component;import org.springframework.util.Assert;import javax.annotation.Resource;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.*;import java.util.concurrent.ConcurrentHashMap;import java.util.stream.Collectors;/** 注冊(cè)定時(shí)任務(wù):緩存定時(shí)任務(wù)、注冊(cè)定時(shí)任務(wù)到調(diào)度中心 * @author CC **/@Componentpublic class CronTaskRegistrar implements DisposableBean { private static final Logger log = LoggerFactory.getLogger(CronTaskRegistrar.class); /** * 緩存任務(wù) * key:具體的任務(wù) * value:注冊(cè)定時(shí)任務(wù)后返回的ScheduledFuture */ private final Map scheduledTasks = new ConcurrentHashMap<>(16); /** * 使用自定義的任務(wù)調(diào)度配置 */ @Resource(name = "taskScheduler") private TaskScheduler taskScheduler; /** 獲取任務(wù)調(diào)度配置 * @return 任務(wù)調(diào)度配置 */ public TaskScheduler getTaskScheduler() { return this.taskScheduler; } /** 新增定時(shí)任務(wù)1 * 存在任務(wù):刪除此任務(wù),重新新增這個(gè)任務(wù) * @param taskRunnable 執(zhí)行的具體任務(wù)定義:taskRunnable 實(shí)現(xiàn)Runnable * @param cronExpression cron表達(dá)式 */ public void addCronTask(Runnable taskRunnable, String cronExpression) { //驗(yàn)證cron表達(dá)式是否正確 boolean validExpression = CronExpression.isValidExpression(cronExpression); if (!validExpression) { throw new RuntimeException("cron表達(dá)式驗(yàn)證失??!"); } //獲取下次執(zhí)行時(shí)間 CronExpression parse = CronExpression.parse(cronExpression); LocalDateTime next = parse.next(LocalDateTime.now()); if (Objects.nonNull(next)) { //定時(shí)任務(wù)下次執(zhí)行的時(shí)間 String format = next.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); log.info("定時(shí)任務(wù)下次執(zhí)行的時(shí)間:{}", format); } //封裝成 CronTask(cron任務(wù)) CronTask cronTask = new CronTask(taskRunnable, cronExpression); this.addCronTask(cronTask); } /** 新增定時(shí)任務(wù)2 * @param cronTask :CronTask用于在指定時(shí)間間隔內(nèi)執(zhí)行定時(shí)任務(wù)。
* 它是通過CronTrigger來實(shí)現(xiàn)的,CronTrigger是一個(gè)基于cron表達(dá)式的觸發(fā)器,
* 可以在指定的時(shí)間間隔內(nèi)觸發(fā)任務(wù)執(zhí)行。
* @since 2023/4/21 0021 * @author CC **/ private void addCronTask(CronTask cronTask) { if (Objects.nonNull(cronTask)) { //1有這個(gè)任務(wù),先刪除這個(gè)任務(wù)。再新增 Runnable task = cronTask.getRunnable(); String taskId = null; if (task instanceof CronTaskRunnable) { taskId = ((CronTaskRunnable) task).getTaskId(); } //通過任務(wù)id獲取緩存的任務(wù),如果包含則刪除,然后新增任務(wù) Runnable taskCache = this.getTaskByTaskId(taskId); if (Objects.nonNull(taskCache) && this.scheduledTasks.containsKey(taskCache)) { this.removeCronTaskByTaskId(taskId); } //2注冊(cè)定時(shí)任務(wù)到調(diào)度中心 CronTaskFuture scheduledFutureTask = this.scheduleCronTask(cronTask); //3緩存定時(shí)任務(wù) this.scheduledTasks.put(task, scheduledFutureTask); //todo cc 4可以將任務(wù)保存到數(shù)據(jù)庫(kù)中……重新啟動(dòng)程序然后加載數(shù)據(jù)庫(kù)中的任務(wù)到緩存中…… } } /** 注冊(cè) ScheduledTask 定時(shí)任務(wù) * @param cronTask cronTask * @return 注冊(cè)定時(shí)任務(wù)后返回的 ScheduledFutureTask */ private CronTaskFuture scheduleCronTask(CronTask cronTask) { //注冊(cè)定時(shí)任務(wù)后記錄的Future CronTaskFuture scheduledTask = new CronTaskFuture(); //開啟定時(shí)任務(wù)的真正方法 scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());// scheduledTask.setThreadLocal(this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger())); return scheduledTask; } /** 獲取任務(wù)列表 * @return */ public List getScheduledTasks() { List tasks = new ArrayList<>(); Set keySet = scheduledTasks.keySet(); keySet.forEach(key -> { CronTaskRunnable task = new CronTaskRunnable(); if (key instanceof CronTaskRunnable) { CronTaskRunnable taskParent = (CronTaskRunnable) key; BeanUtils.copyProperties(taskParent, task); } tasks.add(task); }); return tasks.stream() .sorted(Comparator.comparing(CronTaskRunnable::getTaskId)) .collect(Collectors.toList()); } /** 根據(jù)任務(wù)id刪除單個(gè)定時(shí)任務(wù) * @param taskId 任務(wù)id */ public void removeCronTaskByTaskId(String taskId) { //通過任務(wù)id獲取任務(wù) Runnable task = this.getTaskByTaskId(taskId); //需要通過任務(wù)id獲取任務(wù),然后再移除 CronTaskFuture cronTaskFuture = this.scheduledTasks.remove(task); if (Objects.nonNull(cronTaskFuture)) { cronTaskFuture.cancel(); } } /** 通過任務(wù)id獲取任務(wù)。未查詢到返回null * @param taskId 任務(wù)id * @return java.lang.Runnable * @since 2023/4/21 0021 * @author CC **/ private Runnable getTaskByTaskId(String taskId) { Assert.notNull(taskId, "任務(wù)id不能為空!"); Set> entries = scheduledTasks.entrySet(); //根據(jù)任務(wù)id獲取該任務(wù)緩存 Map.Entry rcf = entries.stream().filter(rf -> { Runnable key = rf.getKey(); String taskId1 = null; if (key instanceof CronTaskRunnable) { taskId1 = ((CronTaskRunnable) key).getTaskId(); } return taskId.equals(taskId1); }).findAny().orElse(null); if (Objects.nonNull(rcf)) { return rcf.getKey(); } return null; } /** 刪除所有的定時(shí)任務(wù) * DisposableBean是Spring框架中的一個(gè)接口,它定義了一個(gè)destroy()方法, * 用于在Bean銷毀時(shí)執(zhí)行清理工作。 * 當(dāng)一個(gè)Bean實(shí)現(xiàn)了DisposableBean接口時(shí), * Spring容器會(huì)在該Bean銷毀時(shí)自動(dòng)調(diào)用destroy()方法, * 以便進(jìn)行一些清理工作,例如釋放資源等。 * 如果您的Bean需要在銷毀時(shí)執(zhí)行一些清理工作, * 那么實(shí)現(xiàn)DisposableBean接口是一個(gè)很好的選擇。 */ @Override public void destroy() { //關(guān)閉所有定時(shí)任務(wù) for (CronTaskFuture task : this.scheduledTasks.values()) { task.cancel(); } //清空緩存 this.scheduledTasks.clear(); log.info("取消所有定時(shí)任務(wù)!"); //todo cc 修改或刪除數(shù)據(jù)庫(kù)的任務(wù) }} 3、定時(shí)任務(wù)的執(zhí)行結(jié)果ScheduledFuture作用:CronTaskFuture類中使用的是ScheduledFuture對(duì)象來表示定時(shí)任務(wù)的執(zhí)行結(jié)果。package com.cc.ssd.task;import java.util.Objects;import java.util.concurrent.ScheduledFuture;/** CronTaskFuture類中使用的是ScheduledFuture對(duì)象來表示定時(shí)任務(wù)的執(zhí)行結(jié)果。 * ——最后ps:也可以不要這個(gè)記錄類,直接緩存ScheduledFuture對(duì)象。 * 用來記錄單獨(dú)的Future、定時(shí)任務(wù)注冊(cè)任務(wù)后產(chǎn)生的 * @author CC **/public final class CronTaskFuture { /** 每個(gè)線程一個(gè)副本 * 經(jīng)過測(cè)試這里不能使用ThreadLocal */// private static final ThreadLocal> THREAD_LOCAL = new ThreadLocal<>(); /** 最后ps:由于ScheduledFuture是線程安全的。這里不用 volatile 或者 ThreadLocal * 注冊(cè)任務(wù)后返回的:ScheduledFuture 用于記錄并取消任務(wù) * 這兩個(gè)都可以不使用。直接給future賦值 * volatile:線程之間可見:volatile用于實(shí)現(xiàn)多線程之間的可見性和一致性,保證數(shù)據(jù)的正確性。 * ThreadLocal:用于實(shí)現(xiàn)線程封閉,保證線程安全 * 使用建議: * CronTaskFuture類中使用的是ScheduledFuture對(duì)象來表示定時(shí)任務(wù)的執(zhí)行結(jié)果。 * ScheduledFuture對(duì)象是線程安全的,因此不需要使用volatile關(guān)鍵字來保證多線程同步。 * 如果需要在多線程中使用線程本地變量,可以使用ThreadLocal。 * 因此,建議在CronTaskFuture類中使用ScheduledFuture對(duì)象,而不是使用volatile或ThreadLocal。 * 另外,如果需要在Spring容器銷毀時(shí)執(zhí)行一些清理操作,可以實(shí)現(xiàn)DisposableBean接口,并在destroy()方法中進(jìn)行清理操作。 */ public ScheduledFuture> future;// public volatile ScheduledFuture> future;// public void setThreadLocal(ScheduledFuture> future){// THREAD_LOCAL.set(future);// } /** * 取消當(dāng)前定時(shí)任務(wù) */ public void cancel() { try {// ScheduledFuture> future = THREAD_LOCAL.get(); ScheduledFuture> future = this.future; if (Objects.nonNull(future)) { future.cancel(true); } } catch (Exception e) { throw new RuntimeException("銷毀定時(shí)任務(wù)失??!"); } finally {// THREAD_LOCAL.remove(); } }} 4、具體的任務(wù)。實(shí)現(xiàn)Runable接口任務(wù)處理的方式按照自己的需求去實(shí)現(xiàn)即可package com.cc.ssd.task;import lombok.Data;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;/** 具體任務(wù)實(shí)現(xiàn) * @author CC * @since 2023/4/21 0021 */@Datapublic class CronTaskRunnable implements Runnable { private static final Logger log = LoggerFactory.getLogger(CronTaskRunnable.class); /** * 任務(wù)id(必須唯一) */ private String taskId; /** * 任務(wù)類型:自定義 */ private Integer taskType; /** * 任務(wù)名字 */ private String taskName; /** * 任務(wù)參數(shù) */ private Object[] params; public CronTaskRunnable() { } public CronTaskRunnable(String taskId, Integer taskType, String taskName, Object... params) { this.taskId = taskId; this.taskType = taskType; this.taskName = taskName; this.params = params; } /** 執(zhí)行任務(wù) * @since 2023/4/21 0021 * @author CC **/ @Override public void run() { long start = System.currentTimeMillis(); //具體的任務(wù)。 log.info("\n\t {}號(hào).定時(shí)任務(wù)開始執(zhí)行 - taskId:{},taskName:{},taskType:{},params:{}", taskType, taskId, taskName, taskType, params); //任務(wù)處理的方式: //todo cc 1就在這里執(zhí)行:模擬任務(wù) //todo cc 2開啟策略模式,根據(jù)任務(wù)類型 調(diào)度不同的任務(wù) //todo cc 3使用反射:傳來bean名字,方法名字,調(diào)用不同的任務(wù) //todo cc 4開啟隊(duì)列,把要執(zhí)行的任務(wù)放到隊(duì)列中,然后執(zhí)行 —— 使用場(chǎng)景:每個(gè)任務(wù)執(zhí)行很耗時(shí)的情況下使用 try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } log.info("\n\t {}號(hào).任務(wù)執(zhí)行完成 - 耗時(shí):{},taskId:{},taskType:{}", taskType, System.currentTimeMillis() - start, taskId, taskType); }}5、測(cè)試Controllerpackage com.cc.ssd.web.controller;import com.cc.ssd.registrar.CronTaskRegistrar;import com.cc.ssd.task.CronTaskRunnable;import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;import java.util.List;import java.util.Map;/** * @author CC * @since 2023/4/21 0021 */@RestController@RequestMapping("/scheduled")public class TestScheduledController { @Resource private CronTaskRegistrar cronTaskRegistrar; /** 獲取任務(wù)列表 * @return java.util.List * @since 2023/4/21 0021 * @author CC **/ @GetMapping public List getScheduledTasks() { return cronTaskRegistrar.getScheduledTasks(); } /** 添加任務(wù) * @param param param * @return java.lang.String * @since 2023/4/21 0021 * @author CC **/ @PostMapping public String addCronTask(@RequestBody Map param) { //自己拿任務(wù)參數(shù)的邏輯:可以把每個(gè)任務(wù)保存到數(shù)據(jù)庫(kù),重新啟動(dòng)任務(wù)的同時(shí),加載這些任務(wù)到任務(wù)調(diào)度中心 String taskId = (String) param.get("taskId"); Integer taskType = (Integer) param.get("taskType"); String taskName = (String) param.get("taskName"); Object params = param.get("params"); //添加任務(wù)參數(shù) CronTaskRunnable task = new CronTaskRunnable(taskId, taskType, taskName, params); //注冊(cè)任務(wù):cron表達(dá)式,可以從傳入不一樣的 cronTaskRegistrar.addCronTask(task, "0/5 * * * * ?"); return "ok"; } /** 根據(jù)任務(wù)id刪除定時(shí)任務(wù) * @param taskId 任務(wù)id * @return java.lang.String * @since 2023/4/21 0021 * @author CC **/ @DeleteMapping public String removeCronTaskByTaskId(@RequestParam String taskId) { cronTaskRegistrar.removeCronTaskByTaskId(taskId); return "ok"; } /** 刪除全部任務(wù) * @return java.lang.String * @since 2023/4/21 0021 * @author CC **/ @DeleteMapping("/removeAll") public String removeCronTask() { cronTaskRegistrar.destroy(); return "ok"; }} 6、最后效果自己用controller去測(cè)試一波吧 關(guān)鍵詞:




















