異步任務(wù)處理在復(fù)雜Web應(yīng)用中的研究設(shè)計(jì)
在Web應(yīng)用中,某些功能的實(shí)現(xiàn)邏輯很復(fù)雜、執(zhí)行比較耗時(shí)[1],例如涉及外部系統(tǒng)調(diào)用、多數(shù)據(jù)源等;此時(shí),希望可以讓這些復(fù)雜的業(yè)務(wù)邏輯放在后臺(tái)執(zhí)行,而前臺(tái)與用戶的交互可以不用等待,從而提高用戶體驗(yàn);或者需要以一定時(shí)間間隔重復(fù)運(yùn)行任務(wù)、或在每天的指定時(shí)間運(yùn)行任務(wù)的情況。為此,需要控制大型任務(wù)對服務(wù)器資源的消耗,降低Web服務(wù)器的并發(fā)連接數(shù)目,這就需要將大型任務(wù)的提交和執(zhí)行分開,使服務(wù)器接受任務(wù)后立即斷開與客戶端的連接,減少服務(wù)器的并發(fā)連接數(shù),而任務(wù)則推遲到服務(wù)器資源許可時(shí)執(zhí)行,以抑制服務(wù)器資源的峰值消耗。
為盡量減少耗時(shí)操作對執(zhí)行的影響,本文提出了異步任務(wù)的處理,使用多線程來管理耗時(shí)任務(wù),作為后臺(tái)進(jìn)程執(zhí)行;同時(shí)把任務(wù)信息都持久化在數(shù)據(jù)庫中,保證了異步任務(wù)處理的靈活性、可靠性。
1 多線程
1.1線程池
一個(gè)線程是程序中的一條執(zhí)行流,是操作系統(tǒng)分配處理器的基本單位。并發(fā)是程序中多條執(zhí)行流的同時(shí)推進(jìn),多任務(wù)并發(fā)對應(yīng)多線程并發(fā)[2]。
但是為每個(gè)任務(wù)創(chuàng)建一個(gè)線程,當(dāng)任務(wù)完成時(shí)撤消對應(yīng)的線程存在明顯的缺陷。線程的創(chuàng)建需要一定的時(shí)間,給任務(wù)請求的響應(yīng)帶來延遲,線程的創(chuàng)建和撤消也給操作系統(tǒng)帶來額外的管理負(fù)擔(dān),若頻繁“創(chuàng)建和撤消”,則將明顯增加系統(tǒng)的額外開銷。為有效降低線程重復(fù)創(chuàng)建和撤銷方面的開支可以采用線程池技術(shù)。
線程池技術(shù)提供了一種較好的解決方案[3]:系統(tǒng)維護(hù)由若干個(gè)線程組成的線程池。當(dāng)有任務(wù)請求到達(dá)時(shí),由池中的一個(gè)線程為之運(yùn)行,在任務(wù)完成后不是將該線程撤消而是將其歸還線程池,使之能夠?yàn)楹罄m(xù)到達(dá)的任務(wù)服務(wù);若線程池中沒有空閑的線程,則任務(wù)進(jìn)入等待狀態(tài)直到有空閑的線程。
1.2 Java中的線程池實(shí)現(xiàn)機(jī)制
Java在語言級(jí)實(shí)現(xiàn)了功能豐富的多線程編程機(jī)制[4],對線程池的建立和維護(hù)提供了強(qiáng)大的支持。特別在JDK1.5及以后的版本中,任務(wù)執(zhí)行抽象的首選不再是Thread,而是Executor。Executor雖是一個(gè)簡單的接口,但它提供了異步任務(wù)執(zhí)行框架并支持多種不同類型的任務(wù)執(zhí)行策略,ExecutorService接口和ScheduledExecutorService接口對Executor進(jìn)行了擴(kuò)展,添加了管理線程執(zhí)行和調(diào)度線程池的若干方法。通過Executors工具類提供的靜態(tài)工廠方法可以創(chuàng)建符合特定需求的基于線程池執(zhí)行框架。
newChachedThreadPool()方法用于創(chuàng)建可緩存線程池的執(zhí)行框架。當(dāng)新的請求任務(wù)到達(dá)時(shí),執(zhí)行框架將盡可能地重用池中的空閑線程,若此時(shí)池中沒有空閑線程,則添加新線程,這個(gè)方法對池的大小沒有限制。另一方面,該執(zhí)行框架能夠自動(dòng)回收空閑時(shí)間超過60 s的線程,以合理使用系統(tǒng)資源。對于執(zhí)行大量短異步任務(wù)的程序而言,這種方式的線程池通??商岣咝阅堋?br />
newFixedThreadPool(int nThreads)方法建立的執(zhí)行框架中的線程池具有固定數(shù)量的線程。每提交一個(gè)任務(wù)它就創(chuàng)建一個(gè)線程,直到達(dá)到池的限定值nThreads,線程池的長度不再變化,新到達(dá)的任務(wù)在一個(gè)遵循先來先服務(wù)(FIFS)規(guī)則的無界隊(duì)列中等待執(zhí)行。
newScheduledThreadPool(int nThreads)方法建立的執(zhí)行框架中的線程池也是定長的,它支持定時(shí)的以及周期性的任務(wù)的執(zhí)行。
這些工廠方法返回的Executor 都是ThreadPoolExecutor()類的常用實(shí)例,能滿足大部分線程池的應(yīng)用需求。
2 設(shè)計(jì)思路
為保證異步任務(wù)處理的靈活性和可靠性,本文設(shè)計(jì)的思路為:任務(wù)持久化+Java線程池+任務(wù)調(diào)度。
2.1 任務(wù)持久化
將待處理的任務(wù)信息保存在可信任的數(shù)據(jù)庫中,同時(shí)要確保當(dāng)任務(wù)處理服務(wù)器出問題后這些未執(zhí)行成功、或未開始執(zhí)行的任務(wù)不會(huì)被丟失。
2.2 任務(wù)調(diào)度
當(dāng)任務(wù)信息都持久化在數(shù)據(jù)庫中之后,需要將這些信息讀取出來執(zhí)行具體的業(yè)務(wù)邏輯操作,本文通過ScheduledExecutorService來實(shí)現(xiàn)對任務(wù)的循環(huán)調(diào)度,例如可采取每隔2 min掃描一次待處理任務(wù)列表,若有記錄則提取出來執(zhí)行。
3 具體實(shí)現(xiàn)
異步任務(wù)處理中各組成部分在運(yùn)行過程中的調(diào)用關(guān)系如圖1。
當(dāng)客戶端訪問服務(wù)器時(shí),有耗時(shí)操作的任務(wù),則把該任務(wù)放入數(shù)據(jù)庫中。服務(wù)器每隔一段時(shí)間輪詢存放待處理任務(wù)的表,若表中有任務(wù),則任務(wù)調(diào)度線程池采用多線程機(jī)制來執(zhí)行該任務(wù)。任務(wù)執(zhí)行成功后,刪除待處理任務(wù)表中的該任務(wù)信息,否則把該任務(wù)信息更新到任務(wù)失敗表,進(jìn)行人工干預(yù)。
3.1 任務(wù)數(shù)據(jù)表
建兩張表,一張task表,用來存放待處理的任務(wù);一張task_fail表用來存放失敗的任務(wù)。兩張表的結(jié)構(gòu)一樣,結(jié)構(gòu)如表1所示。
task表主要用來保存所有待處理的任務(wù),每條任務(wù)信息屬于一種任務(wù)類型,由task_handle字段標(biāo)識(shí),任務(wù)類型值為該類型任務(wù)的具體實(shí)現(xiàn)類名。task_params 字段提供了執(zhí)行該任務(wù)需要的所有參數(shù),為字符串,需要在具體任務(wù)實(shí)現(xiàn)類中解析。handle_time字段提供了任務(wù)待執(zhí)行的日期。
每條任務(wù)被執(zhí)行之后根據(jù)執(zhí)行情況進(jìn)行刪除或者更新操作,任務(wù)成功執(zhí)行,就從task表中刪除該記錄。Task_fail表主要用來保存執(zhí)行失敗、需要人工干預(yù)的任務(wù)記錄,記錄來源于task表。
3.2 任務(wù)處理過程
任務(wù)處理的過程如圖2所示。
(1)當(dāng)服務(wù)器啟動(dòng)后,根據(jù)調(diào)度策略每隔一段時(shí)間調(diào)度一次,而不管上次調(diào)度是否已經(jīng)執(zhí)行完畢;任務(wù)輪詢主線程查詢task表,從中取出一定條的數(shù)據(jù)。
(2)對從task表中查詢出來的每條記錄,將該條記錄的ID放進(jìn)本地cache中,根據(jù)記錄中task_handle和task_params字段的值獲得處理該任務(wù)對應(yīng)的類及參數(shù)值,在異步線程池中利用反射機(jī)制來執(zhí)行任務(wù)。
(3)具體處理類對該任務(wù)處理完成之后返回結(jié)果,系統(tǒng)對tasks表中該條記錄進(jìn)行刪除,同時(shí)將cache中的記錄ID清除、避免cache無限膨脹。若任務(wù)處理失敗,系統(tǒng)就把該條記錄插入到task_fail表中,以備人工干預(yù)。
(4)當(dāng)?shù)竭_(dá)下次執(zhí)行時(shí)間時(shí),再次掃描tasks表,循環(huán)上面的邏輯。不過這次在任務(wù)處理之前,要先在本地cache中查詢是否該條記錄正在被處理,若cache中已經(jīng)存在該條記錄就無需處理了,以避免一些任務(wù)被重復(fù)并發(fā)執(zhí)行。
3.3 任務(wù)輪詢主線程的實(shí)現(xiàn)
Executor的靜態(tài)方法生成一個(gè)固定的線程池。線程池的線程是不會(huì)釋放的,即使它空閑,這就會(huì)產(chǎn)生性能問題,如果線程池的大小為200,當(dāng)全部使用完畢后,所有的線程會(huì)繼續(xù)留在池中,相應(yīng)的內(nèi)存和線程切換都會(huì)增加。如果要避免這個(gè)問題,就必須直接使用ThreadPoolExecutor()來構(gòu)造,設(shè)置“最大線程數(shù)”、“最小線程數(shù)”和“空閑線程存活的時(shí)間”。
為了線程池能按時(shí)間計(jì)劃來執(zhí)行任務(wù),允許用戶設(shè)定計(jì)劃執(zhí)行任務(wù)的時(shí)間,就要使用newScheduledThreadPool(int nThreads)方法返回ThreadPoolExecutor類的實(shí)例。參數(shù)nThreads是設(shè)定線程池中線程的最小數(shù)目,當(dāng)任務(wù)較多時(shí),線程池會(huì)自動(dòng)創(chuàng)建更多的工作線程來執(zhí)行任務(wù)。其關(guān)鍵代碼如下:
int nThreads=4 ; //指定線程池尺寸
//創(chuàng)建一個(gè)支持定時(shí)及周期性的任務(wù)執(zhí)行的線程池實(shí)例exec,池中含nThreads個(gè)線程
Executor exec=Executors. newScheduledThreadPool(nThreads);
Runnable task = new Runnable() {
public void run() {
//查詢數(shù)據(jù)庫task表,有數(shù)據(jù)且執(zhí)行時(shí)間到了
//則另一個(gè)線程來執(zhí)行查詢到的任務(wù)
}
};
//1 min后運(yùn)行,并每隔2 min運(yùn)行一次
exec.scheduleAtFixedRate(task,60,60×2,TimeUnit.SECONDS);
3.4 執(zhí)行任務(wù)線程池的實(shí)現(xiàn)
執(zhí)行任務(wù)的線程池采用newFixedThreadPool(int nThreads)方法建立,線程池具有固定數(shù)量。關(guān)鍵代碼如下:
ExecutorService exec = Executors.newFixedThreadPool(2);
Runnable run = new Runnable() {
public void run() {
//執(zhí)行任務(wù)
}
};
exec.execute(run);
對于任務(wù)的生產(chǎn)者,只需要向Task表中insert記錄即可,操作簡單。待執(zhí)行任務(wù)信息在可靠數(shù)據(jù)庫中保存,即使任務(wù)處理出了問題也不會(huì)讓未處理的任務(wù)信息丟失。
本文利用Executor接口提供的異步任務(wù)執(zhí)行框架和任務(wù)執(zhí)行策略,實(shí)現(xiàn)多任務(wù)的執(zhí)行。在為具體應(yīng)用線程池時(shí)往往需要根據(jù)應(yīng)用的需求和處理任務(wù)的特點(diǎn)來優(yōu)化線程池的使用,設(shè)置合適的“最大線程數(shù)”、“最小線程數(shù)”和“空閑線程存活的時(shí)間”,采用不同的策略調(diào)整線程池的工作線程數(shù),才能達(dá)到最好的效果。
評(píng)論