




已閱讀5頁,還剩19頁未讀, 繼續(xù)免費(fèi)閱讀
版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請(qǐng)進(jìn)行舉報(bào)或認(rèn)領(lǐng)
文檔簡介
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();只有availThreadCount大于0時(shí)才會(huì)進(jìn)行真正的調(diào)度,負(fù)責(zé)將輪詢等待線程的釋放。所以我們來看看可用線程數(shù)充足的情況下的執(zhí)行過程。獲取觸發(fā)器獲取觸發(fā)器的代碼見代碼清單1。其中調(diào)用了JobStore的acquireNextTriggers方法來獲取觸發(fā)器。代碼清單1 List triggers = null; long now = System.currentTimeMillis(); clearSignaledSchedulingChange(); try triggers = qsRsrcs.getJobStore().acquireNextTriggers( now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize(), qsRsrcs.getBatchTimeWindow(); lastAcquireFailed = false; if (log.isDebugEnabled() log.debug(batch acquisition of + (triggers = null ? 0 : triggers.size() + triggers); catch (JobPersistenceException jpe) /省略異常信息 catch (RuntimeException e) /省略異常信息 以JobStore的實(shí)現(xiàn)類LocalDataSourceJobStore來具體看看acquireNextTriggers方法的執(zhí)行內(nèi)容。LocalDataSourceJobStore繼承了父類JobStoreSupport的acquireNextTriggers方法(見代碼清單2),此方法用于從數(shù)據(jù)源獲取觸發(fā)器。代碼清單2 public List acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow) throws JobPersistenceException String lockName; if(isAcquireTriggersWithinLock() | maxCount 1) lockName = LOCK_TRIGGER_ACCESS; else lockName = null; return executeInNonManagedTXLock(lockName, new TransactionCallbackList() public List execute(Connection conn) throws JobPersistenceException return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow); , new TransactionValidatorList() public Boolean validate(Connection conn, List result) throws JobPersistenceException try List acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId(); Set fireInstanceIds = new HashSet(); for (FiredTriggerRecord ft : acquired) fireInstanceIds.add(ft.getFireInstanceId(); for (OperableTrigger tr : result) if (fireInstanceIds.contains(tr.getFireInstanceId() return true; return false; catch (SQLException e) throw new JobPersistenceException(error validating trigger acquisition, e); ); JobStoreSupport的acquireNextTriggers方法,主要調(diào)用了executeInNonManagedTXLock方法(見代碼清單3),其執(zhí)行邏輯如下:1. 獲取數(shù)據(jù)庫連接;2. 回調(diào)txCallback(即代碼清單2中的TransactionCallback的匿名類)的execute方法,因此調(diào)用了acquireNextTrigger方法獲取觸發(fā)器;3. 調(diào)用commitConnection方法提交第2步中的所有sql;4. 返回獲取的觸發(fā)器集合;代碼清單3 protected T executeInNonManagedTXLock( String lockName, TransactionCallback txCallback, final TransactionValidator txValidator) throws JobPersistenceException boolean transOwner = false; Connection conn = null; try if (lockName != null) / If we arent using db locks, then delay getting DB connection / until after acquiring the lock since it isnt needed. if (getLockHandler().requiresConnection() conn = getNonManagedTXConnection(); transOwner = getLockHandler().obtainLock(conn, lockName); if (conn = null) conn = getNonManagedTXConnection(); final T result = txCallback.execute(conn); try commitConnection(conn); catch (JobPersistenceException e) rollbackConnection(conn); if (txValidator = null | !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback() Override public Boolean execute(Connection conn) throws JobPersistenceException return txValidator.validate(conn, result); ) throw e; Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion(); if(sigTime != null & sigTime = 0) signalSchedulingChangeImmediately(sigTime); return result; catch (JobPersistenceException e) rollbackConnection(conn); throw e; catch (RuntimeException e) rollbackConnection(conn); throw new JobPersistenceException(Unexpected runtime exception: + e.getMessage(), e); finally try releaseLock(lockName, transOwner); finally cleanupConnection(conn); acquireNextTrigger方法用于獲取觸發(fā)器,它的執(zhí)行步驟如下:首先,查詢狀態(tài)為WAITING的觸發(fā)器(見代碼清單4),以StdJDBCDelegate為例,其selectTriggerToAcquire方法(使用JDBC的API,留給讀者自己去看)實(shí)際就是執(zhí)行sql查詢觸發(fā)器,執(zhí)行的sql為:SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM 0TRIGGERS WHERE SCHED_NAME = 1 AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME = ?) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC。TRIGGER_STATE的條件是WAITING,NEXT_FIRE_TIME小于最遲的觸發(fā)時(shí)間,并且要大于最早的觸發(fā)時(shí)間。注意:本文所有sql中的0為QRTZ_,1為schedulerFactoryBean。代碼清單4 List keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);其次,遍歷集合keys中的TriggerKey,在循環(huán)中執(zhí)行以下步驟:1. 從表QRTZ_TRIGGERS中查詢觸發(fā)器,代碼為:OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);retrieveTrigger方法內(nèi)部實(shí)際執(zhí)行了Delegate的selectTrigger方法。以StdJDBCDelegate為例,其selectTrigger方法根據(jù)TriggerKey查詢觸發(fā)器,執(zhí)行的sql為:SELECT * FROM 0TRIGGERS WHERE SCHED_NAME = 1 AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?2. 根據(jù)OperableTrigger持有的JobKey查詢表QRTZ_JOB_DETAILS中對(duì)應(yīng)的作業(yè)信息,并將作業(yè)添加到集合acquiredJobKeysForNoConcurrentExec中,代碼如下:3. JobKey jobKey = nextTrigger.getJobKey();4. JobDetail job = getDelegate().selectJobDetail(conn, jobKey, getClassLoadHelper();5. if (job.isConcurrentExectionDisallowed() 6. if (acquiredJobKeysForNoConcurrentExec.contains(jobKey) 7. continue; / next trigger8. else 9. acquiredJobKeysForNoConcurrentExec.add(jobKey);10. 以StdJDBCDelegate為例,其selectJobDetail方法執(zhí)行的sql為:SELECT * FROM 0JOB_DETAILS WHERE SCHED_NAME = 1 AND JOB_NAME = ? AND JOB_GROUP = ?11. 根據(jù)已獲得的TriggerKey,將此觸發(fā)器在表QRTZ_TRIGGERS中的狀態(tài)從WAITING更新為ACQUIRED,代碼如下: int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);以StdJDBCDelegate為例,其updateTriggerStateFromOtherState方法執(zhí)行的sql為:UPDATE 0TRIGGERS SET TRIGGER_STATE = ? WHERE SCHED_NAME = 1 AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ? AND TRIGGER_STATE = ?12. 給OperableTrigger設(shè)置實(shí)例ID,然后將已觸發(fā)的觸發(fā)器插入表QRTZ_FIRED_TRIGGERS,代碼如下:13. nextTrigger.setFireInstanceId(getFiredTriggerRecordId(); getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);以StdJDBCDelegate為例,其insertFiredTrigger方法執(zhí)行的sql為:INSERT INTO 0FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES(1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)14. 將OperableTrigger添加到acquiredTriggers,代碼如下: acquiredTriggers.add(nextTrigger);最后,返回獲得的所有觸發(fā)器集合acquiredTriggers;觸發(fā)觸發(fā)器在獲取觸發(fā)器后,下下來就是要觸發(fā)這些觸發(fā)器(見代碼清單5),可以看到調(diào)用了JobStore的triggersFired方法。代碼清單5 / set triggers to executing List bndles = new ArrayList(); boolean goAhead = true; synchronized(sigLock) goAhead = !halted.get(); if(goAhead) try List res = qsRsrcs.getJobStore().triggersFired(triggers); if(res != null) bndles = res; catch (SchedulerException se) /省略異常信息 以JobStore的實(shí)現(xiàn)類LocalDataSourceJobStore來具體看看triggersFired方法的執(zhí)行內(nèi)容。LocalDataSourceJobStore繼承了父類JobStoreSupport的triggersFired方法(見代碼清單6),此方法用于觸發(fā)觸發(fā)器。代碼清單6 public List triggersFired(final List triggers) throws JobPersistenceException return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS, new TransactionCallbackList() public List execute(Connection conn) throws JobPersistenceException List results = new ArrayList(); TriggerFiredResult result; for (OperableTrigger trigger : triggers) try TriggerFiredBundle bundle = triggerFired(conn, trigger); result = new TriggerFiredResult(bundle); catch (JobPersistenceException jpe) result = new TriggerFiredResult(jpe); catch(RuntimeException re) result = new TriggerFiredResult(re); results.add(result); return results; , new TransactionValidatorList() Override public Boolean validate(Connection conn, List result) throws JobPersistenceException try List acquired = getDelegate().selectInstancesFiredTriggerRecords(conn, getInstanceId(); Set executingTriggers = new HashSet(); for (FiredTriggerRecord ft : acquired) if (STATE_EXECUTING.equals(ft.getFireInstanceState() executingTriggers.add(ft.getFireInstanceId(); for (TriggerFiredResult tr : result) if (tr.getTriggerFiredBundle() != null & executingTriggers.contains(tr.getTriggerFiredBundle().getTrigger().getFireInstanceId() return true; return false; catch (SQLException e) throw new JobPersistenceException(error validating trigger acquisition, e); ); 可以看到triggersFired方法也調(diào)用了executeInNonManagedTXLock方法,我們根據(jù)前面的分析,知道最終實(shí)際會(huì)回調(diào)新的TransactionCallback匿名類的execute方法,可以看到其主要執(zhí)行邏輯無非循環(huán)triggers列表,并且調(diào)用triggerFired方法獲取TriggerFiredBundle。triggerFired的執(zhí)行步驟如下:1. 獲取表QRTZ_TRIGGERS中的觸發(fā)器狀態(tài),代碼如下:2. try / if trigger was deleted, state will be STATE_DELETED3. String state = getDelegate().selectTriggerState(conn,4. trigger.getKey();5. if (!state.equals(STATE_ACQUIRED) 6. return null;7. 8. catch (SQLException e) 9. throw new JobPersistenceException(Couldnt select trigger state: 10. + e.getMessage(), e); 以StdJDBCDelegate為例,其selectTriggerState方法的執(zhí)行sql為:SELECT TRIGGER_STATE FROM 0TRIGGERS WHERE SCHED_NAME = 1 AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?11. 查詢觸發(fā)器對(duì)應(yīng)的作業(yè),代碼如下:12. try 13. job = retrieveJob(conn, trigger.getJobKey();14. if (job = null) return null; 15. catch (JobPersistenceException jpe) 16. try 17. getLog().error(Error retrieving job, setting trigger state to ERROR., jpe);18. getDelegate().updateTriggerState(conn, trigger.getKey(),19. STATE_ERROR);20. catch (SQLException sqle) 21. getLog().error(Unable to set trigger state to ERROR., sqle);22. 23. throw jpe; 這里的retrieveJob方法實(shí)際也調(diào)用了Delegate的selectJobDetail方法,不再贅述。24. 更新表QRTZ_FIRED_TRIGGERS中此觸發(fā)器被觸發(fā)的狀態(tài)為STATE_EXECUTING,代碼如下:25. try 26. getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);27. catch (SQLException e) 28. throw new JobPersistenceException(Couldnt insert fired trigger: 29. + e.getMessage(), e); 這里實(shí)際執(zhí)行的sql為UPDATE 0FIRED_TRIGGERS SET INSTANCE_NAME = ?, FIRED_TIME = ?, SCHED_TIME = ?, STATE = ?, JOB_NAME = ?, JOB_GROUP = ?, IS_NONCONCURRENT = ?, REQUESTS_RECOVERY = ? WHERE SCHED_NAME = 1 AND ENTRY_ID = ?30. 更新觸發(fā)器被觸發(fā)的狀態(tài),代碼如下: trigger.triggered(cal);31. 如果捕獲到DisallowConcurrentExecution,則將處于STATE_WAITING、STATE_ACQUIRED、STATE_PAUSED狀態(tài)的觸發(fā)器的狀態(tài)修改為STATE_BLOCKED,代碼如下:32. if (job.isConcurrentExectionDisallowed() 33. state = STATE_BLOCKED;34. force = false;35. try 36. getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),37. STATE_BLOCKED, STATE_WAITING);38. getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),39. STATE_BLOCKED, STATE_ACQUIRED);40. getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),41. STATE_PAUSED_BLOCKED, STATE_PAUSED);42. catch (SQLException e) 43. throw new JobPersistenceException(44. Couldnt update states of blocked triggers: 45. + e.getMessage(), e);46. 47. 插入新的觸發(fā)器,代碼如下: storeTrigger(conn, trigger, job, true, state, force, false);storeTrigger方法首先調(diào)用triggerExists用于判斷當(dāng)前觸發(fā)器是否存在: boolean existingTrigger = triggerExists(conn, newTrigger.getKey();以StdJDBCDelegate為例,其triggerExists方法中執(zhí)行的sql為:SELECT TRIGGER_NAME FROM 0TRIGGERS WHERE SCHED_NAME = 1 AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?。然后根據(jù)existingTrigger的值插入或者更新表QRTZ_TRIGGERS中觸發(fā)器的下次觸發(fā)時(shí)間,代碼如下: if (existingTrigger) getDelegate().updateTrigger(conn, newTrigger, state, job); else getDelegate().insertTrigger(conn, newTrigger, state, job); 以StdJDBCDelegate為例,其執(zhí)行的sql為:INSERT INTO 0TRIGGERS (SCHED_NAME, TRIGGER_NAME, TRIGGER_GROUP, JOB_NAME, JOB_GROUP, DESCRIPTION, NEXT_FIRE_TIME, PREV_FIRE_TIME, TRIGGER_STATE, TRIGGER_TYPE, START_TIME, END_TIME, CALENDAR_NAME, MISFIRE_INSTR, JOB_DATA, PRIORITY) VALUES(1, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)48. 返回TriggerFiredBundle對(duì)象,代碼如下:49. return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()50. .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime();51.創(chuàng)建作業(yè)運(yùn)行的shell腳本之后QuartzSchedulerThread會(huì)遍歷每個(gè)TriggerFiredBundle,然后創(chuàng)建作業(yè)運(yùn)行的shell腳本,見代碼清單7.代碼清單7 for (int i = 0; i bndles.size(); i+) TriggerFiredResult result = bndles.get(i); TriggerFiredBundle bndle = result.getTriggerFiredBundle(); Exception exception = result.getException(); if (exception instanceof RuntimeException) getLog().error(RuntimeException while firing trigger + triggers.get(i), exception); qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i); continue; / its possible to get null if the triggers was paused, / blocked, or other similar occurrences that prevent it being / fired at this time. or if the scheduler was shutdown (halted) if (bndle = null) qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i); continue;
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請(qǐng)下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請(qǐng)聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會(huì)有圖紙預(yù)覽,若沒有圖紙預(yù)覽就沒有圖紙。
- 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
- 5. 人人文庫網(wǎng)僅提供信息存儲(chǔ)空間,僅對(duì)用戶上傳內(nèi)容的表現(xiàn)方式做保護(hù)處理,對(duì)用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對(duì)任何下載內(nèi)容負(fù)責(zé)。
- 6. 下載文件中如有侵權(quán)或不適當(dāng)內(nèi)容,請(qǐng)與我們聯(lián)系,我們立即糾正。
- 7. 本站不保證下載資源的準(zhǔn)確性、安全性和完整性, 同時(shí)也不承擔(dān)用戶因使用這些下載資源對(duì)自己和他人造成任何形式的傷害或損失。
最新文檔
- 2025年3D打印建筑行業(yè)產(chǎn)業(yè)鏈上下游分析報(bào)告
- 食品加工企業(yè)節(jié)能減排技術(shù)改造項(xiàng)目實(shí)施效果評(píng)估與改進(jìn)建議
- 人工智能在金融領(lǐng)域的倫理問題與監(jiān)管政策創(chuàng)新報(bào)告
- rec硅烷氣生產(chǎn)工藝
- ISO15189質(zhì)量監(jiān)督記錄表
- 2025-2030中國金屬浸漬設(shè)備行業(yè)運(yùn)營態(tài)勢與前景動(dòng)態(tài)預(yù)測報(bào)告
- 2025-2030中國花膠行業(yè)消費(fèi)狀況與營銷趨勢預(yù)測報(bào)告
- 2025-2030中國自動(dòng)窗簾行業(yè)競爭態(tài)勢與營銷趨勢預(yù)測報(bào)告
- 2025-2030中國聚丙烯硬質(zhì)塑料包裝行業(yè)需求動(dòng)態(tài)與應(yīng)用趨勢預(yù)測報(bào)告
- 甲狀腺腫瘤的早期診斷與治療進(jìn)展
- 最新國家開放大學(xué)電大《調(diào)劑學(xué)》形考任務(wù)4試題及答案
- 五制配套的基層管理模式
- 有機(jī)磷農(nóng)藥中毒(新)課件
- 室性早搏的定位診斷與鑒別共26張課件
- 人防卷材防水層工程檢驗(yàn)批質(zhì)量驗(yàn)收記錄表
- DB11T 716-2019 穿越既有道路設(shè)施工程技術(shù)要求
- 濕式氧化技術(shù)
- T∕CACM 1316-2019 中醫(yī)內(nèi)科臨床診療指南 中風(fēng)后吞咽困難
- 于新華中考專題2018
- 江蘇自考精密加工與特種加工復(fù)習(xí)大全
- 公司發(fā)生火災(zāi)應(yīng)急流程圖
評(píng)論
0/150
提交評(píng)論