基礎(chǔ)準(zhǔn)備
ResponseMsg
TaskService
阻塞調(diào)用
Callable異步調(diào)用
DeferredResult異步調(diào)用
后記
大家都知道,Callable和DeferredResult可以用來進(jìn)行異步請求處理。利用它們,我們可以異步生成返回值,在具體處理的過程中,我們直接在controller中返回相應(yīng)的Callable或者DeferredResult,在這之后,servlet線程將被釋放,可用于其他連接;DeferredResult另外會(huì)有線程來進(jìn)行結(jié)果處理,并setResult。
基礎(chǔ)準(zhǔn)備
在正式開始之前,我們先做一點(diǎn)準(zhǔn)備工作,在項(xiàng)目中新建了一個(gè)base模塊。其中包含一些提供基礎(chǔ)支持的java類,在其他模塊中可能會(huì)用到。
ResponseMsg
我們定義了一個(gè)ResponseMsg的實(shí)體類來作為我們的返回值類型:
@Data @NoArgsConstructor @AllArgsConstructor publicclassResponseMsg{ privateintcode; privateStringmsg; privateTdata; }
非常簡單,里面包含了code、msg和data三個(gè)字段,其中data為泛型類型。另外類的注解Data、NoArgsConstructor和AllArgsConstructor都是lombok提供的簡化我們開發(fā)的,主要功能分別是,為我們的類生成set和get方法,生成無參構(gòu)造器和生成全參構(gòu)造器。
使用idea進(jìn)行開發(fā)的童鞋可以裝一下lombok的支持插件。另外,lombok的依賴參見:
org.projectlombok lombok-maven 1.16.16.0 pom
TaskService
我們建立了一個(gè)TaskService,用來為阻塞調(diào)用和Callable調(diào)用提供實(shí)際結(jié)果處理的。代碼如下:
@Service
publicclassTaskService{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskService.class);
publicResponseMsggetResult(){
log.info("任務(wù)開始執(zhí)行,持續(xù)等待中...");
try{
Thread.sleep(30000L);
}catch(InterruptedExceptione){
e.printStackTrace();
}
log.info("任務(wù)處理完成");
returnnewResponseMsg(0,"操作成功","success");
}
}
可以看到,里面實(shí)際提供服務(wù)的是getResult方法,這邊直接返回一個(gè)new ResponseMsg(0,“操作成功”,“success”)。但是其中又特意讓它sleep了30秒,模擬一個(gè)耗時(shí)較長的請求。
阻塞調(diào)用
平時(shí)我們用的最普遍的還是阻塞調(diào)用,通常請求的處理時(shí)間較短,在并發(fā)量較小的情況下,使用阻塞調(diào)用問題也不是很大。 阻塞調(diào)用實(shí)現(xiàn)非常簡單,我們首先新建一個(gè)模塊blockingtype,里面只包含一個(gè)controller類,用來接收請求并利用TaskService來獲取結(jié)果。
@RestController
publicclassBlockController{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(BlockController.class);
@Autowired
privateTaskServicetaskService;
@RequestMapping(value="/get",method=RequestMethod.GET)
publicResponseMsggetResult(){
log.info("接收請求,開始處理...");
ResponseMsgresult=taskService.getResult();
log.info("接收任務(wù)線程完成并退出");
returnresult;
}
}
我們請求的是getResult方法,其中調(diào)用了taskService,這個(gè)taskService我們是注入得到的。關(guān)于怎么跨模塊注入的,其實(shí)也非常簡單,在本模塊,加入對其他模塊的依賴就可以了。比如這里我們在blockingtype的pom.xml文件中加入對base模塊的依賴:
com.sunny base 1.0-SNAPSHOT
然后我們看一下實(shí)際調(diào)用效果,這里我們設(shè)置端口號為8080,啟動(dòng)日志如下:
2018-06-2419:02:48.514INFO11207---[main]com.sunny.BlockApplication:StartingBlockApplicationonxdeMacBook-Pro.localwithPID11207(/Users/zsunny/IdeaProjects/asynchronoustask/blockingtype/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask) 2018-06-2419:02:48.519INFO11207---[main]com.sunny.BlockApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default 2018-06-2419:02:48.762INFO11207---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun2419:02:48CST2018];rootofcontexthierarchy 2018-06-2419:02:50.756INFO11207---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8080(http) 2018-06-241950.778INFO11207---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat] 2018-06-241950.780INFO11207---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23 2018-06-241950.922INFO11207---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext 2018-06-241950.922INFO11207---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin2200ms 2018-06-241951.156INFO11207---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/] 2018-06-241951.162INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*] 2018-06-241951.163INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*] 2018-06-241951.163INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*] 2018-06-241951.163INFO11207---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*] 2018-06-241951.620INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun241948CST2018];rootofcontexthierarchy 2018-06-241951.724INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopubliccom.sunny.entity.ResponseMsgcom.sunny.controller.BlockController.getResult() 2018-06-241951.730INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity >org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest) 2018-06-241951.731INFO11207---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse) 2018-06-241951.780INFO11207---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-241951.780INFO11207---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-241951.838INFO11207---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler] 2018-06-241952.126INFO11207---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup 2018-06-241952.205INFO11207---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8080(http) 2018-06-241952.211INFO11207---[main]com.sunny.BlockApplication:StartedBlockApplicationin5.049seconds(JVMrunningfor6.118)
可以看到順利啟動(dòng)了,那么我們就來訪問一下:
http://localhost:8080/get
等待了大概30秒左右,得到j(luò)son數(shù)據(jù):
{"code":0,"msg":"操作成功","data":"success"}

然后我們來看看控制臺(tái)的日志:
2018-06-2419:04:07.315INFO11207---[nio-8080-exec-1]com.sunny.controller.BlockController:接收請求,開始處理... 2018-06-2419:04:07.316INFO11207---[nio-8080-exec-1]com.sunny.service.TaskService:任務(wù)開始執(zhí)行,持續(xù)等待中... 2018-06-2419:04:37.322INFO11207---[nio-8080-exec-1]com.sunny.service.TaskService:任務(wù)處理完成 2018-06-2419:04:37.322INFO11207---[nio-8080-exec-1]com.sunny.controller.BlockController:接收任務(wù)線程完成并退出
可以看到在“ResponseMsg result = taskService.getResult();”的時(shí)候是阻塞了大約30秒鐘,隨后才執(zhí)行它后面的打印語句“l(fā)og.info(“接收任務(wù)線程完成并退出”);”。
Callable異步調(diào)用
涉及到較長時(shí)間的請求處理的話,比較好的方式是用異步調(diào)用,比如利用Callable返回結(jié)果。異步主要表現(xiàn)在,接收請求的servlet可以不用持續(xù)等待結(jié)果產(chǎn)生,而可以被釋放去處理其他事情。當(dāng)然,在調(diào)用者來看的話,其實(shí)還是表現(xiàn)在持續(xù)等待30秒。這有利于服務(wù)端提供更大的并發(fā)處理量。
這里我們新建一個(gè)callabledemo模塊,在這個(gè)模塊中,我們一樣只包含一個(gè)TaskController,另外也是需要加入base模塊的依賴。只不過這里我們的返回值不是ResponseMsg類型了,而是一個(gè)Callable類型。
@RestController
publicclassTaskController{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskController.class);
@Autowired
privateTaskServicetaskService;
@RequestMapping(value="/get",method=RequestMethod.GET)
publicCallable>getResult(){
log.info("接收請求,開始處理...");
Callable>result=(()->{
returntaskService.getResult();
});
log.info("接收任務(wù)線程完成并退出");
returnresult;
}
}
在里面,我們創(chuàng)建了一個(gè)Callable類型的變量result,并實(shí)現(xiàn)了它的call方法,在call方法中,我們也是調(diào)用taskService的getResult方法得到返回值并返回。
下一步我們就運(yùn)行一下這個(gè)模塊,這里我們在模塊的application.yml中設(shè)置端口號為8081:
server: port:8081
啟動(dòng),可以看到控制臺(tái)的消息:
2018-06-2419:38:14.658INFO11226---[main]com.sunny.CallableApplication:StartingCallableApplicationonxdeMacBook-Pro.localwithPID11226(/Users/zsunny/IdeaProjects/asynchronoustask/callabledemo/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask)
2018-06-2419:38:14.672INFO11226---[main]com.sunny.CallableApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default
2018-06-2419:38:14.798INFO11226---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun2419:38:14CST2018];rootofcontexthierarchy
2018-06-2419:38:16.741INFO11226---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8081(http)
2018-06-241916.762INFO11226---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat]
2018-06-241916.764INFO11226---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23
2018-06-241916.918INFO11226---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext
2018-06-241916.919INFO11226---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin2126ms
2018-06-241917.144INFO11226---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/]
2018-06-241917.149INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*]
2018-06-241917.150INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*]
2018-06-241917.150INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*]
2018-06-241917.150INFO11226---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*]
2018-06-241917.632INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@4445629:startupdate[SunJun241914CST2018];rootofcontexthierarchy
2018-06-241917.726INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopublicjava.util.concurrent.Callable>com.sunny.controller.TaskController.getResult()
2018-06-241917.731INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity>org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
2018-06-241917.733INFO11226---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-06-241917.777INFO11226---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-241917.777INFO11226---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-241917.825INFO11226---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-241918.084INFO11226---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup
2018-06-241918.176INFO11226---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8081(http)
2018-06-241918.183INFO11226---[main]com.sunny.CallableApplication:StartedCallableApplicationin4.538seconds(JVMrunningfor5.327)
完美啟動(dòng)了,然后我們還是一樣,訪問一下:
http://localhost:8081/get
在大約等待了30秒左右,我們在瀏覽器上得到j(luò)son數(shù)據(jù):
{"code":0,"msg":"操作成功","data":"success"}

和阻塞調(diào)用的結(jié)果一樣——當(dāng)然一樣啦,都是同taskService中得到的結(jié)果。
然后我們看看控制臺(tái)的消息:
2018-06-2419:39:07.738INFO11226---[nio-8081-exec-1]com.sunny.controller.TaskController:接收請求,開始處理... 2018-06-2419:39:07.740INFO11226---[nio-8081-exec-1]com.sunny.controller.TaskController:接收任務(wù)線程完成并退出 2018-06-2419:39:07.753INFO11226---[MvcAsync1]com.sunny.service.TaskService:任務(wù)開始執(zhí)行,持續(xù)等待中... 2018-06-2419:39:37.756INFO11226---[MvcAsync1]com.sunny.service.TaskService:任務(wù)處理完成
很顯然,這里的消息出現(xiàn)的順序和阻塞模式有所不同了,這里在“接收請求,開始處理…”之后直接打印了“接收任務(wù)線程完成并退出”。而不是先出現(xiàn)“任務(wù)處理完成”后再出現(xiàn)“接收任務(wù)線程完成并退出”。
這就說明,這里沒有阻塞在從taskService中獲得數(shù)據(jù)的地方,controller中直接執(zhí)行后面的部分(這里可以做其他很多事,不僅僅是打印日志)。
DeferredResult異步調(diào)用
前面鋪墊了那么多,還是主要來說DeferredResult的;和Callable一樣,DeferredResult也是為了支持異步調(diào)用。兩者的主要差異,Sunny覺得主要在DeferredResult需要自己用線程來處理結(jié)果setResult,而Callable的話不需要我們來維護(hù)一個(gè)結(jié)果處理線程。
總體來說,Callable的話更為簡單,同樣的也是因?yàn)楹唵?,靈活性不夠;相對地,DeferredResult更為復(fù)雜一些,但是又極大的靈活性。在可以用Callable的時(shí)候,直接用Callable;而遇到Callable沒法解決的場景的時(shí)候,可以嘗試使用DeferredResult。
這里Sunny將會(huì)設(shè)計(jì)兩個(gè)DeferredResult使用場景。
場景一:
創(chuàng)建一個(gè)持續(xù)在隨機(jī)間隔時(shí)間后從任務(wù)隊(duì)列中獲取任務(wù)的線程
訪問controller中的方法,創(chuàng)建一個(gè)DeferredResult,設(shè)定超時(shí)時(shí)間和超時(shí)返回對象
設(shè)定DeferredResult的超時(shí)回調(diào)方法和完成回調(diào)方法
將DeferredResult放入任務(wù)中,并將任務(wù)放入任務(wù)隊(duì)列
步驟1中的線程獲取到任務(wù)隊(duì)列中的任務(wù),并產(chǎn)生一個(gè)隨機(jī)結(jié)果返回
場景其實(shí)非常簡單,接下來我們來看看具體的實(shí)現(xiàn)。首先,我們還是來看任務(wù)實(shí)體類是怎么樣的。
/**
*任務(wù)實(shí)體類
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
publicclassTask{
privateinttaskId;
privateDeferredResult>taskResult;
@Override
publicStringtoString(){
return"Task{"+
"taskId="+taskId+
",taskResult"+"{responseMsg="+taskResult.getResult()+"}"+
'}';
}
}
看起來非常簡單,成員變量又taskId和taskResult,前者是int類型,后者為我們的DeferredResult類型,它的泛型類型為ResponseMsg,注意這里用到ResponseMsg,所以也需要導(dǎo)入base模塊的依賴。
另外注解之前已經(jīng)說明了,不過這里再提一句,@Data注解也包含了toString的重寫,但是這里為了知道具體的ResponseMsg的內(nèi)容,Sunny特意手動(dòng)重寫。
看完Task類型,我們再來看看任務(wù)隊(duì)列。
@Component
publicclassTaskQueue{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskQueue.class);
privatestaticfinalintQUEUE_LENGTH=10;
privateBlockingQueuequeue=newLinkedBlockingDeque<>(QUEUE_LENGTH);
privateinttaskId=0;
/**
*加入任務(wù)
*@paramdeferredResult
*/
publicvoidput(DeferredResult>deferredResult){
taskId++;
log.info("任務(wù)加入隊(duì)列,id為:{}",taskId);
queue.offer(newTask(taskId,deferredResult));
}
/**
*獲取任務(wù)
*@return
*@throwsInterruptedException
*/
publicTasktake()throwsInterruptedException{
Tasktask=queue.poll();
log.info("獲得任務(wù):{}",task);
returntask;
}
}
這里我們將它作為一個(gè)bean,之后會(huì)在其他bean中注入,這里實(shí)際的隊(duì)列為成員變量queue,它是LinkedBlockingDeque類型的。還有一個(gè)成員變量為taskId,是用于自動(dòng)生成任務(wù)id的,并且在加入任務(wù)的方法中實(shí)現(xiàn)自增,以確保每個(gè)任務(wù)的id唯一性。方法的話又put和take方法,分別用于向隊(duì)列中添加任務(wù)和取出任務(wù);其中,對queue的操作,分別用了offer和poll,這樣是實(shí)現(xiàn)一個(gè)非阻塞的操作,并且在隊(duì)列為空和隊(duì)列已滿的情況下不會(huì)拋出異常。
另外,大家實(shí)現(xiàn)的時(shí)候,可以考慮使用ConcurrentLinkedQueue來高效處理并發(fā),因?yàn)樗鼘儆跓o界非阻塞隊(duì)列,使用過程中需要考慮可能造成的OOM問題。Sunny這里選擇阻塞隊(duì)列LinkedBlockingDeque,它底層使用加鎖進(jìn)行了同步;但是這里使用了TaskQueue進(jìn)行封裝,處理過程中有一些額外操作,調(diào)用時(shí)需要加鎖以防發(fā)生某些意料之外的問題。
然后我們來看步驟1中的,啟動(dòng)一個(gè)持續(xù)從任務(wù)隊(duì)列中獲取任務(wù)的線程的具體實(shí)現(xiàn)。
@Component
publicclassTaskExecute{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskExecute.class);
privatestaticfinalRandomrandom=newRandom();
//默認(rèn)隨機(jī)結(jié)果的長度
privatestaticfinalintDEFAULT_STR_LEN=10;
//用于生成隨機(jī)結(jié)果
privatestaticfinalStringstr="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
@Autowired
privateTaskQueuetaskQueue;
/**
*初始化啟動(dòng)
*/
@PostConstruct
publicvoidinit(){
log.info("開始持續(xù)處理任務(wù)");
newThread(this::execute).start();
}
/**
*持續(xù)處理
*返回執(zhí)行結(jié)果
*/
privatevoidexecute(){
while(true){
try{
//取出任務(wù)
Tasktask;
synchronized(taskQueue){
task=taskQueue.take();
}
if(task!=null){
//設(shè)置返回結(jié)果
StringrandomStr=getRandomStr(DEFAULT_STR_LEN);
ResponseMsgresponseMsg=newResponseMsg(0,"success",randomStr);
log.info("返回結(jié)果:{}",responseMsg);
task.getTaskResult().setResult(responseMsg);
}
inttime=random.nextInt(10);
log.info("處理間隔:{}秒",time);
Thread.sleep(time*1000L);
}catch(InterruptedExceptione){
e.printStackTrace();
}
}
}
/**
*獲取長度為len的隨機(jī)串
*@paramlen
*@return
*/
privateStringgetRandomStr(intlen){
intmaxInd=str.length();
StringBuildersb=newStringBuilder();
intind;
for(inti=0;i
這里,我們注入了TaskQueue,成員變量比較簡單并且有注釋,不再說明,主要來看方法。先看一下最后一個(gè)方法getRandomStr,很顯然,這是一個(gè)獲得長度為len的隨機(jī)串的方法,訪問限定為private,為類中其他方法服務(wù)的。然后我們看init方法,它執(zhí)行的其實(shí)就是開啟了一個(gè)線程并且執(zhí)行execute方法,注意一下它上面的@PostContruct注解,這個(gè)注解就是在這個(gè)bean初始化的時(shí)候就執(zhí)行這個(gè)方法。
所以我們需要關(guān)注的實(shí)際邏輯在execute方法中??梢钥吹剑趀xecute方法中,用了一個(gè)while(true)來保證線程持續(xù)運(yùn)行。因?yàn)槭遣l(fā)環(huán)境下,考慮對taskQueue加鎖,從中取出任務(wù);如果任務(wù)不為空,獲取用getRandomStr生成一個(gè)隨機(jī)結(jié)果并用setResult方法進(jìn)行返回。
最后可以看到,利用random生成來一個(gè)[0,10)的隨機(jī)數(shù),并讓線程sleep相應(yīng)的秒數(shù)。這里注意一下,需要設(shè)定一個(gè)時(shí)間間隔,否則,先線程持續(xù)跑會(huì)出現(xiàn)CPU負(fù)載過高的情況。
接下來我們就看看controller是如何處理的。
@RestController
publicclassTaskController{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskController.class);
//超時(shí)結(jié)果
privatestaticfinalResponseMsgOUT_OF_TIME_RESULT=newResponseMsg<>(-1,"超時(shí)","outoftime");
//超時(shí)時(shí)間
privatestaticfinallongOUT_OF_TIME=3000L;
@Autowired
privateTaskQueuetaskQueue;
@RequestMapping(value="/get",method=RequestMethod.GET)
publicDeferredResult>getResult(){
log.info("接收請求,開始處理...");
//建立DeferredResult對象,設(shè)置超時(shí)時(shí)間,以及超時(shí)返回超時(shí)結(jié)果
DeferredResult>result=newDeferredResult<>(OUT_OF_TIME,OUT_OF_TIME_RESULT);
result.onTimeout(()->{
log.info("調(diào)用超時(shí)");
});
result.onCompletion(()->{
log.info("調(diào)用完成");
});
//并發(fā),加鎖
synchronized(taskQueue){
taskQueue.put(result);
}
log.info("接收任務(wù)線程完成并退出");
returnresult;
}
}
這里我們同樣注入了taskQueue。請求方法就只有一個(gè)getResult,返回值為DeferredResult。這里我們首先創(chuàng)建了DeferredResult對象result并且設(shè)定超時(shí)時(shí)間和超時(shí)返回結(jié)果;隨后設(shè)定result的onTimeout和onCompletion方法,其實(shí)就是傳入兩個(gè)Runnable對象來實(shí)現(xiàn)回調(diào)的效果;之后就是加鎖并且將result加入任務(wù)隊(duì)列中。
總體來說,場景不算非常復(fù)雜,看到這里大家應(yīng)該都能基本了解了。然后我們來跑一下測試一下。
我們在application.yml中設(shè)定端口為8082:
server:
port:8082
啟動(dòng)模塊,控制臺(tái)信息如下:
2018-06-2421:49:28.815INFO11322---[main]com.sunny.DeferredResultApplication:StartingDeferredResultApplicationonxdeMacBook-Pro.localwithPID11322(/Users/zsunny/IdeaProjects/asynchronoustask/deferredresultdemo/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask)
2018-06-2421:49:28.821INFO11322---[main]com.sunny.DeferredResultApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default
2018-06-2421:49:29.010INFO11322---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5ccddd20:startupdate[SunJun2421:49:28CST2018];rootofcontexthierarchy
2018-06-2421:49:30.971INFO11322---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8082(http)
2018-06-242130.980INFO11322---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat]
2018-06-242130.981INFO11322---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23
2018-06-242131.062INFO11322---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext
2018-06-242131.063INFO11322---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin2066ms
2018-06-242131.207INFO11322---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/]
2018-06-242131.212INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*]
2018-06-242131.213INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*]
2018-06-242131.213INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*]
2018-06-242131.213INFO11322---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*]
2018-06-242131.247INFO11322---[main]com.sunny.bean.TaskExecute:開始持續(xù)處理任務(wù)
2018-06-242131.249INFO11322---[Thread-8]com.sunny.bean.TaskQueue:獲得任務(wù):null
2018-06-242131.250INFO11322---[Thread-8]com.sunny.bean.TaskExecute:處理間隔:6秒
2018-06-242131.498INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@5ccddd20:startupdate[SunJun242128CST2018];rootofcontexthierarchy
2018-06-242131.572INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopublicorg.springframework.web.context.request.async.DeferredResult>com.sunny.controller.TaskController.getResult()
2018-06-242131.576INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity>org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
2018-06-242131.577INFO11322---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-06-242131.602INFO11322---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-242131.602INFO11322---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-242131.628INFO11322---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-242131.811INFO11322---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup
2018-06-242131.892INFO11322---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8082(http)
2018-06-242131.897INFO11322---[main]com.sunny.DeferredResultApplication:StartedDeferredResultApplicationin3.683seconds(JVMrunningfor4.873)
2018-06-242137.254INFO11322---[Thread-8]com.sunny.bean.TaskQueue:獲得任務(wù):null
2018-06-242137.254INFO11322---[Thread-8]com.sunny.bean.TaskExecute:處理間隔:6秒
首先程序完美啟動(dòng),這沒有問題,然后我們注意這幾條信息:
2018-06-2421:49:31.247INFO11322---[main]com.sunny.bean.TaskExecute:開始持續(xù)處理任務(wù)
2018-06-2421:49:31.249INFO11322---[Thread-8]com.sunny.bean.TaskQueue:獲得任務(wù):null
2018-06-2421:49:31.250INFO11322---[Thread-8]com.sunny.bean.TaskExecute:處理間隔:6秒
這說明我們TaskExecute中已經(jīng)成功啟動(dòng)了持續(xù)獲取任務(wù)的線程。
接著,我們還是訪問一下:
http://localhost:8082/get
這一回等待了若干秒就出現(xiàn)了結(jié)果:
{"code":0,"msg":"success","data":"CEUO2lmMJr"}

可以看到我們的隨機(jī)串是CEUO2lmMJr。再一次請求又會(huì)出現(xiàn)不同的隨機(jī)串。再看一下我們控制臺(tái)的相關(guān)信息:
2018-06-2421:51:04.303INFO11322---[nio-8082-exec-1]com.sunny.controller.TaskController:接收請求,開始處理...
2018-06-2421:51:04.304INFO11322---[nio-8082-exec-1]com.sunny.bean.TaskQueue:任務(wù)加入隊(duì)列,id為:1
2018-06-2421:51:04.304INFO11322---[nio-8082-exec-1]com.sunny.controller.TaskController:接收任務(wù)線程完成并退出
2018-06-2421:51:04.323INFO11322---[Thread-8]com.sunny.bean.TaskQueue:獲得任務(wù):Task{taskId=1,taskResult{responseMsg=null}}
2018-06-2421:51:04.323INFO11322---[Thread-8]com.sunny.bean.TaskExecute:返回結(jié)果:ResponseMsg(code=0,msg=success,data=CEUO2lmMJr)
也是符合我們的預(yù)期,請求進(jìn)來進(jìn)入隊(duì)列中,由TaskExecute獲取請求并進(jìn)行處理結(jié)果返回。
場景二
用戶發(fā)送請求到TaskController的getResult方法,該方法接收到請求,創(chuàng)建一個(gè)DeferredResult,設(shè)定超時(shí)時(shí)間和超時(shí)返回對象
設(shè)定DeferredResult的超時(shí)回調(diào)方法和完成回調(diào)方法,超時(shí)和完成都會(huì)將本次請求產(chǎn)生的DeferredResult從集合中remove
將DeferredResult放入集合中
另有一個(gè)TaskExecuteController,訪問其中一個(gè)方法,可取出集合中的等待返回的DeferredResult對象,并將傳入的參數(shù)設(shè)定為結(jié)果
首先我們來看看DeferredResult的集合類:
@Component
@Data
publicclassTaskSet{
privateSet>>set=newHashSet<>();
}
非常簡單,只包含了一個(gè)HashSet的成員變量。這里可以考慮用ConcurrentHashMap來實(shí)現(xiàn)高效并發(fā),Sunny這里簡單實(shí)用HashSet,配合加鎖實(shí)現(xiàn)并發(fā)處理。
然后我們看看發(fā)起調(diào)用的Controller代碼:
@RestController
publicclassTaskController{
privateLoggerlog=LoggerFactory.getLogger(TaskController.class);
//超時(shí)結(jié)果
privatestaticfinalResponseMsgOUT_OF_TIME_RESULT=newResponseMsg<>(-1,"超時(shí)","outoftime");
//超時(shí)時(shí)間
privatestaticfinallongOUT_OF_TIME=60000L;
@Autowired
privateTaskSettaskSet;
@RequestMapping(value="/get",method=RequestMethod.GET)
publicDeferredResult>getResult(){
log.info("接收請求,開始處理...");
//建立DeferredResult對象,設(shè)置超時(shí)時(shí)間,以及超時(shí)返回超時(shí)結(jié)果
DeferredResult>result=newDeferredResult<>(OUT_OF_TIME,OUT_OF_TIME_RESULT);
result.onTimeout(()->{
log.info("調(diào)用超時(shí),移除任務(wù),此時(shí)隊(duì)列長度為{}",taskSet.getSet().size());
synchronized(taskSet.getSet()){
taskSet.getSet().remove(result);
}
});
result.onCompletion(()->{
log.info("調(diào)用完成,移除任務(wù),此時(shí)隊(duì)列長度為{}",taskSet.getSet().size());
synchronized(taskSet.getSet()){
taskSet.getSet().remove(result);
}
});
//并發(fā),加鎖
synchronized(taskSet.getSet()){
taskSet.getSet().add(result);
}
log.info("加入任務(wù)集合,集合大小為:{}",taskSet.getSet().size());
log.info("接收任務(wù)線程完成并退出");
returnresult;
}
}
和場景一中有些類似,但是注意這里在onTimeout和onCompletion中都多了一個(gè)移除元素的操作,這也就是每次調(diào)用結(jié)束,需要將集合中的DeferredResult對象移除,即集合中保存的都是等待請求結(jié)果的DeferredResult對象。
然后我們看處理請求結(jié)果的Controller:
@RestController
publicclassTaskExecuteController{
privatestaticfinalLoggerlog=LoggerFactory.getLogger(TaskExecuteController.class);
@Autowired
privateTaskSettaskSet;
@RequestMapping(value="/set/{result}",method=RequestMethod.GET)
publicStringsetResult(@PathVariable("result")Stringresult){
ResponseMsgres=newResponseMsg<>(0,"success",result);
log.info("結(jié)果處理開始,得到輸入結(jié)果為:{}",res);
Set>>set=taskSet.getSet();
synchronized(set){
set.forEach((deferredResult)->{deferredResult.setResult(res);});
}
return"Successfullysetresult:"+result;
}
}
看起來非常簡單,只是做了兩個(gè)操作,接收得到的參數(shù)并利用參數(shù)生成一個(gè)ResponseMsg對象,隨后將集合中的所有DeferredResult都設(shè)定結(jié)果為根據(jù)參數(shù)生成的ResponseMsg對象。最后返回一個(gè)提示:成功設(shè)置結(jié)果…
好了,話不多說,我們來啟動(dòng)測試驗(yàn)證一下。我們說一下驗(yàn)證的過程,我們同時(shí)打開兩個(gè)請求,然后再設(shè)定一個(gè)結(jié)果,最后兩個(gè)請求都會(huì)得到這個(gè)結(jié)果。當(dāng)然同時(shí)多個(gè)或者一個(gè)請求也是一樣。這里有一個(gè)地方需要注意一下:
瀏覽器可能會(huì)對相同的url請求有緩存策略,也就是同時(shí)兩個(gè)標(biāo)簽向同一個(gè)url發(fā)送請求,瀏覽器只會(huì)先發(fā)送一個(gè)請求,等一個(gè)請求結(jié)束才會(huì)再發(fā)送另外一個(gè)請求。
這樣,我們考慮從兩個(gè)瀏覽器中發(fā)送請求:
localhost:8083/get
然后隨便找其中一個(gè),發(fā)送請求來設(shè)置結(jié)果:
http://localhost:8083/set/aaa
首先我們先啟動(dòng)模塊,可以從控制臺(tái)中看到完美啟動(dòng)管理了:
2018-06-2500:18:44.379INFO12688---[main]com.sunny.DeferredResultApplication:StartingDeferredResultApplicationonxdeMacBook-Pro.localwithPID12688(/Users/zsunny/IdeaProjects/asynchronoustask/deferredresultdemo2/target/classesstartedbyzsunnyin/Users/zsunny/IdeaProjects/asynchronoustask)
2018-06-2500:18:44.382INFO12688---[main]com.sunny.DeferredResultApplication:Noactiveprofileset,fallingbacktodefaultprofiles:default
2018-06-2500:18:44.489INFO12688---[main]ationConfigEmbeddedWebApplicationContext:Refreshingorg.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@96def03:startupdate[MonJun2500:18:44CST2018];rootofcontexthierarchy
2018-06-2500:18:45.650INFO12688---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatinitializedwithport(s):8083(http)
2018-06-250045.658INFO12688---[main]o.apache.catalina.core.StandardService:Startingservice[Tomcat]
2018-06-250045.659INFO12688---[main]org.apache.catalina.core.StandardEngine:StartingServletEngine:ApacheTomcat/8.5.23
2018-06-250045.722INFO12688---[ost-startStop-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringembeddedWebApplicationContext
2018-06-250045.723INFO12688---[ost-startStop-1]o.s.web.context.ContextLoader:RootWebApplicationContext:initializationcompletedin1241ms
2018-06-250045.817INFO12688---[ost-startStop-1]o.s.b.w.servlet.ServletRegistrationBean:Mappingservlet:'dispatcherServlet'to[/]
2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'characterEncodingFilter'to:[/*]
2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'hiddenHttpMethodFilter'to:[/*]
2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'httpPutFormContentFilter'to:[/*]
2018-06-250045.821INFO12688---[ost-startStop-1]o.s.b.w.servlet.FilterRegistrationBean:Mappingfilter:'requestContextFilter'to:[/*]
2018-06-250046.150INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerAdapter:Lookingfor@ControllerAdvice:org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@96def03:startupdate[MonJun250044CST2018];rootofcontexthierarchy
2018-06-250046.197INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/get],methods=[GET]}"ontopublicorg.springframework.web.context.request.async.DeferredResult>com.sunny.controller.TaskController.getResult()
2018-06-250046.199INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/set/{result}],methods=[GET]}"ontopublicjava.lang.Stringcom.sunny.controller.TaskExecuteController.setResult(java.lang.String)
2018-06-250046.202INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error]}"ontopublicorg.springframework.http.ResponseEntity>org.springframework.boot.autoconfigure.web.BasicErrorController.error(javax.servlet.http.HttpServletRequest)
2018-06-250046.202INFO12688---[main]s.w.s.m.m.a.RequestMappingHandlerMapping:Mapped"{[/error],produces=[text/html]}"ontopublicorg.springframework.web.servlet.ModelAndVieworg.springframework.boot.autoconfigure.web.BasicErrorController.errorHtml(javax.servlet.http.HttpServletRequest,javax.servlet.http.HttpServletResponse)
2018-06-250046.237INFO12688---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/webjars/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-250046.238INFO12688---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-250046.262INFO12688---[main]o.s.w.s.handler.SimpleUrlHandlerMapping:MappedURLpath[/**/favicon.ico]ontohandleroftype[classorg.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2018-06-250046.362INFO12688---[main]o.s.j.e.a.AnnotationMBeanExporter:RegisteringbeansforJMXexposureonstartup
2018-06-250046.467INFO12688---[main]s.b.c.e.t.TomcatEmbeddedServletContainer:Tomcatstartedonport(s):8083(http)
2018-06-250046.472INFO12688---[main]com.sunny.DeferredResultApplication:StartedDeferredResultApplicationin2.675seconds(JVMrunningfor3.623)
完美啟動(dòng),接下來Sunny在火狐中發(fā)起一個(gè)請求

可以看到正在等待請求結(jié)果。隨后我們在谷歌瀏覽器中發(fā)起請求

兩個(gè)請求同時(shí)處于等待狀態(tài),這時(shí)候我們看一下控制臺(tái)信息:
2018-06-2500:22:34.642INFO12688---[nio-8083-exec-6]com.sunny.controller.TaskController:接收請求,開始處理...
2018-06-2500:22:34.642INFO12688---[nio-8083-exec-6]com.sunny.controller.TaskController:加入任務(wù)集合,集合大小為:1
2018-06-2500:22:34.642INFO12688---[nio-8083-exec-6]com.sunny.controller.TaskController:接收任務(wù)線程完成并退出
2018-06-2500:22:37.332INFO12688---[nio-8083-exec-7]com.sunny.controller.TaskController:接收請求,開始處理...
2018-06-2500:22:37.332INFO12688---[nio-8083-exec-7]com.sunny.controller.TaskController:加入任務(wù)集合,集合大小為:2
2018-06-2500:22:37.332INFO12688---[nio-8083-exec-7]com.sunny.controller.TaskController:接收任務(wù)線程完成并退出
可以看到兩個(gè)請求都已經(jīng)接收到了,并且加入了隊(duì)列。這時(shí)候,我們再發(fā)送一個(gè)設(shè)置結(jié)果的請求。

隨后我們查看兩個(gè)調(diào)用請求的頁面,發(fā)現(xiàn)頁面已經(jīng)不在等待狀態(tài)中了,都已經(jīng)得到了結(jié)果。


另外,再給大家展示一下超時(shí)的結(jié)果,即我們發(fā)起調(diào)用請求,但是不發(fā)起設(shè)置結(jié)果的請求,等待時(shí)間結(jié)束。

查看控制臺(tái)信息:
2018-06-2500:26:15.898INFO12688---[nio-8083-exec-4]com.sunny.controller.TaskController:接收請求,開始處理...
2018-06-2500:26:15.898INFO12688---[nio-8083-exec-4]com.sunny.controller.TaskController:加入任務(wù)集合,集合大小為:1
2018-06-2500:26:15.898INFO12688---[nio-8083-exec-4]com.sunny.controller.TaskController:接收任務(wù)線程完成并退出
2018-06-2500:27:16.014INFO12688---[nio-8083-exec-5]com.sunny.controller.TaskController:調(diào)用超時(shí),移除任務(wù),此時(shí)隊(duì)列長度為1
2018-06-2500:27:16.018INFO12688---[nio-8083-exec-5]com.sunny.controller.TaskController:調(diào)用完成,移除任務(wù),此時(shí)隊(duì)列長度為0
后記
想要完整代碼的童鞋,點(diǎn)這里:
https://gitee.com/sunnymore/asynchronous_task
-
吞吐量
+關(guān)注
關(guān)注
0文章
49瀏覽量
12659 -
代碼
+關(guān)注
關(guān)注
30文章
4956瀏覽量
73511 -
異步請求
+關(guān)注
關(guān)注
0文章
2瀏覽量
1198
原文標(biāo)題:提高系統(tǒng)吞吐量的一把利器:DeferredResult 到底有多強(qiáng)?
文章出處:【微信號:芋道源碼,微信公眾號:芋道源碼】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。
發(fā)布評論請先 登錄
如何提高CYBT-243053-02吞吐量?
如何利用NI LabVIEW技術(shù)提高測試系統(tǒng)的吞吐量?
如何提高VLD的吞吐量和執(zhí)行效率?
FF H1基于RDA的吞吐量優(yōu)化算法
防火墻術(shù)語-吞吐量
如何提高無線傳感器網(wǎng)絡(luò)的吞吐量
如何提高系統(tǒng)設(shè)計(jì)容量和吞吐量
debug 吞吐量的辦法
debug 吞吐量的辦法
如何讓接口吞吐量提升10多倍
影響ATE電源系統(tǒng)吞吐量的關(guān)鍵因素
DeferredResult異步請求處理 提高系統(tǒng)吞吐量的一把利器
評論