本篇文章給大家分享的是有關java中怎么實現異步處理,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

創新互聯主要從事網站制作、成都網站制作、網頁設計、企業做網站、公司建網站等業務。立足成都服務清水,十載網站建設經驗,價格優惠、服務專業,歡迎來電咨詢建站服務:028-86922220
1.DeferredResult 加線程池 (DeferredResult 提供了超時、錯誤處理,功能非常完善,再加上多線程處理請求效果很不錯)
2.新開個定時任務線程池 定時輪詢當前任務列表 超時就停止(需要自己維護任務列表)Hystrix就是這種方案
3.JDK9 可以采用CompletableFuture orTimeout、completeOnTimeout 方法處理 前者拋出異常后者返回默認值
總結,其實線程池統一設置超時這個需求本身就是偽需求,線程執行任務時間本身就是參差不齊的,而且這個控制權應該交給Runable或Callable內部業務處理,不同的業務處理超時、異常、報警等各不相同。CompletableFuture、ListenableFuture 、DeferredResult 的功能相當豐富,建議在多線程處理的場景多使用這些api。
具體實現:
DeferredResult 先建個工具類。調用方使用execute方法,傳入new的DeferredResultDTO(DeferredResultDTO只有msgId,也可以自定義一些成員變量方便后期業務擴展使用)
然后在其他線程業務處理完設置結果,調用setResult方法,傳入msgId相同的DeferredResultDTO和result對象
/**
* DeferredResult 工具類
*
* @author tiancong
* @date 2020/10/14 19:23
*/
@UtilityClass
@Slf4j
public class DeferredResultUtil {
private Map<DeferredResultDTO, DeferredResult<ResultVO<Object>>> taskMap = new ConcurrentHashMap<>(16);
public DeferredResult<ResultVO<Object>> execute(DeferredResultDTO dto) {
return execute(dto, 5000L);
}
public DeferredResult<ResultVO<Object>> execute(DeferredResultDTO dto, Long time) {
if (taskMap.containsKey(dto)) {
throw new BusinessException(String.format("msgId=%s 已經存在,請勿重發消息", dto.getMsgId()));
}
DeferredResult<ResultVO<Object>> deferredResult = new DeferredResult<>(time);
deferredResult.onError((e) -> {
taskMap.remove(dto);
log.info("處理失敗 ", e);
deferredResult.setResult(ResultVoUtil.fail("處理失敗"));
});
deferredResult.onTimeout(() -> {
taskMap.remove(dto);
if (dto.getType().equals(DeferredResultTypeEnum.CLOTHES_DETECTION)) {
ExamController.getCURRENT_STUDENT().remove(dto.getMsgId());
}
deferredResult.setResult(ResultVoUtil.fail("請求超時,請聯系工作人員!"));
});
taskMap.putIfAbsent(dto, deferredResult);
return deferredResult;
}
public void setResult(DeferredResultDTO dto, ResultVO<Object> resultVO) {
if (taskMap.containsKey(dto)) {
DeferredResult<ResultVO<Object>> deferredResult = taskMap.get(dto);
deferredResult.setResult(resultVO);
taskMap.remove(dto);
} else {
log.error("ERROR 未找到該消息msgId:{}", dto.getMsgId());
}
}
}2. 新開個定時任務線程池 定時輪詢當前任務列表
/**
* @author tiancong
* @date 2021/4/10 11:06
*/
@Slf4j
public class T {
private static final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(
2,
r -> {
Thread thread = new Thread(r);
thread.setName("failAfter-%d");
thread.setDaemon(true);
return thread;
});
private static int timeCount;
public static void main(String[] args) throws InterruptedException {
ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
executorService.setCorePoolSize(4);
executorService.setQueueCapacity(10);
executorService.setMaxPoolSize(100);
executorService.initialize();
// executorService.setAwaitTerminationSeconds(5);
// executorService.getThreadPoolExecutor().awaitTermination(3, TimeUnit.SECONDS);
executorService.setWaitForTasksToCompleteOnShutdown(true);
Random random = new Random();
long start = System.currentTimeMillis();
List<ListenableFuture<Boolean>> asyncResultList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
ListenableFuture<Boolean> asyncResult = executorService.submitListenable(() -> {
int r = random.nextInt(10);
log.info("{} 開始睡{}s", Thread.currentThread().getName(), r);
TimeUnit.SECONDS.sleep(r);
log.info("{} 干完了 {}s", Thread.currentThread().getName(), r);
//throw new RuntimeException("出現異常");
return true;
});
asyncResult.addCallback(data -> {
try {
// 休息3毫秒模擬獲取到執行結果后的操作
TimeUnit.MILLISECONDS.sleep(3);
log.info("{} 收到結果:{}", Thread.currentThread().getName(), data);
} catch (Exception e) {
e.printStackTrace();
}
}, ex -> log.info("**異常信息**", ex));
asyncResultList.add(asyncResult);
}
System.out.println(String.format("總結耗時:%s ms", System.currentTimeMillis() - start));
// 守護進程 定時輪詢 終止超時的任務
scheduler.scheduleAtFixedRate(() -> {
// 模擬守護進程 終止超過6s的任務
timeCount++;
if (timeCount > 6) {
for (ListenableFuture<Boolean> future : asyncResultList) {
if (!future.isDone()) {
log.error("future 因超時終止任務,{}", future);
future.cancel(true);
}
}
}
}, 0, 1000, TimeUnit.MILLISECONDS);
}
}額外補充:
CompletableFuture實現了CompletionStage接口,里面很多豐富的異步編程接口。
applyToEither方法是哪個先完成,就apply哪一個結果(但是兩個任務都會最終走完)
/**
* @author tiancong
* @date 2021/4/10 11:06
*/
@Slf4j
public class T {
public static void main(String[] args) throws InterruptedException {
// CompletableFuture<String> responseFuture = within(
// createTaskSupplier("5"), 3000, TimeUnit.MILLISECONDS);
// responseFuture
// .thenAccept(T::send)
// .exceptionally(throwable -> {
// log.error("Unrecoverable error", throwable);
// return null;
// });
//
// 注意 exceptionally是new 的CompletableFuture
CompletableFuture<Object> timeoutCompletableFuture = timeoutAfter(1000, TimeUnit.MILLISECONDS).exceptionally(xxx -> "超時");
// 異步任務超時、異常處理
List<Object> collect = Stream.of("1", "2", "3", "4", "5", "6", "7")
// .map(x -> within(
// createTaskSupplier(x), 3000, TimeUnit.MILLISECONDS)
// .thenAccept(T::send)
// .exceptionally(throwable -> {
// log.error("Unrecoverable error", throwable);
// return null;
// }))
.map(x -> CompletableFuture.anyOf(createTaskSupplier(x)
, timeoutCompletableFuture))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// .map(x -> CompletableFuture.anyOf(createTaskSupplier(x)
// , oneSecondTimeout).join())
// .collect(Collectors.toList());
System.out.println("-------結束------");
System.out.println(collect.toString());
}
private static final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(
2,
r -> {
Thread thread = new Thread(r);
thread.setName("failAfter-%d");
thread.setDaemon(true);
return thread;
});
private static String send(String s) {
log.info("最終結果是{}", s);
return s;
}
private static CompletableFuture<String> createTaskSupplier(String x) {
return CompletableFuture.supplyAsync(getStringSupplier(x))
.exceptionally(Throwable::getMessage);
}
private static Supplier<String> getStringSupplier(String text) {
return () -> {
System.out.println("開始 " + text);
if ("1".equals(text)) {
throw new RuntimeException("運行時錯誤");
}
try {
if ("5".equals(text)) {
TimeUnit.SECONDS.sleep(5);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("結束 " + text);
return text + "號";
};
}
private static <T> CompletableFuture<T> within(CompletableFuture<T> future, long timeout, TimeUnit unit) {
final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
// 哪個先完成 就apply哪一個結果 這是一個關鍵的API
return future.applyToEither(timeoutFuture, Function.identity());
}
private static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<T>();
// timeout 時間后 拋出TimeoutException 類似于sentinel / watcher
scheduler.schedule(() -> result.completeExceptionally(new TimeoutException("超時:" + timeout)), timeout, unit);
// return CompletableFuture.supplyAsync(()-> (T)"另一個分支任務");
return result;
}
}以上就是java中怎么實現異步處理,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注創新互聯行業資訊頻道。
分享文章:java中怎么實現異步處理
分享網址:http://www.yijiale78.com/article46/pchdeg.html
成都網站建設公司_創新互聯,為您提供服務器托管、定制開發、網站維護、營銷型網站建設、靜態網站、ChatGPT
聲明:本網站發布的內容(圖片、視頻和文字)以用戶投稿、用戶轉載內容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網站立場,如需處理請聯系客服。電話:028-86922220;郵箱:631063699@qq.com。內容未經允許不得轉載,或轉載時需注明來源: 創新互聯