使用双异步后,如何保证数据一致性?

一、使用双异前情提要
在上一篇文章中,步后我们使用双异步后,何保从 191s 优化到 2s,证数有个小伙伴在评论区问我,使用双异如何保证插入后数据的步后一致性呢?
很简单,通过对比Excel文件行数和入库数量是何保否相等即可。
那么,证数如何获取异步线程的使用双异返回值呢?

二、通过Future获取异步返回值
我们可以通过给异步方法添加Future返回值的步后方式获取结果。
FutureTask 除了实现 Future 接口外,何保还实现了 Runnable 接口。证数因此,使用双异FutureTask 可以交给 Executor 执行,步后也可以由调用线程直接执行FutureTask.run()。何保
1、FutureTask 是基于 AbstractQueuedSynchronizer实现的
AbstractQueuedSynchronizer简称AQS,它是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及 维护被阻塞线程的队列。基于 AQS 实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch 和 FutureTask。服务器托管
基于 AQS实现的同步器包含两种操作:
acquire,阻塞调用线程,直到AQS的状态允许这个线程继续执行,在FutureTask中,get()就是这个方法;release,改变AQS的状态,使state变为非阻塞状态,在FutureTask中,可以通过run()和cancel()实现。2、FutureTask执行流程

执行@Async异步方法。
建立新线程async-executor-X,执行Runnable的run()方法,(FutureTask实现RunnableFuture,RunnableFuture实现Runnable)。
判断状态state。
如果未新建或者不处于AQS,直接返回。否则进入COMPLETING状态,执行异步线程代码。如果执行cancel()方法改变AQS的状态时,会唤醒AQS等待队列中的第一个线程线程async-executor-1。
线程async-executor-1被唤醒后
将自己从AQS队列中移除。然后唤醒next线程async-executor-2。改变线程async-executor-1的state。IT技术网等待get()线程取值。next等待线程被唤醒后,循环线程async-executor-1的步骤。
被唤醒。从AQS队列中移除。唤醒next线程。改变异步线程状态。新建线程async-executor-N,监听异步方法的state。
如果处于EXCEPTIONAL以上状态,抛出异常。如果处于COMPLETING状态,加入AQS队列等待。如果处于NORMAL状态,返回结果。3、get()方法执行流程
get()方法通过判断状态state观测异步线程是否已结束,如果结束直接将结果返回,否则会将等待节点扔进等待队列自旋,阻塞住线程。
自旋直至异步线程执行完毕,获取另一边的线程计算出结果或取消后,将等待队列里的所有节点依次唤醒并移除队列。

如果state小于等于COMPLETING,表示任务还在执行中。
计算超时时间。如果超时,则从等待队列中移除等待节点WaitNode,b2b信息网返回当前状态state。阻塞队列nanos毫秒。如果已有等待节点WaitNode,将线程置空。返回当前状态。如果线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常。如果state大于COMPLETING。如果任务正在执行,让出时间片。如果还未构造等待节点,则new一个新的等待节点。如果未入队列,CAS尝试入队。如果有超时时间参数。否则阻塞队列。如果state大于COMPLETING。
如果执行完毕,返回结果。如果大于等于取消状态,则抛出异常。很多小朋友对读源码,嗤之以鼻,工作3年、5年,还是没认真读过任何源码,觉得读了也没啥用,或者读了也看不懂~
其实,只要把源码的执行流程通过画图的形式呈现出来,你就会幡然醒悟,原来是这样的~
简而言之:
如果异步线程还没执行完,则进入CAS自旋。其它线程获取结果或取消后,重新唤醒CAS队列中等待的线程。再通过get()判断状态state;。直至返回结果或(取消、超时、异常)为止。三、FutureTask源码具体分析
1、FutureTask源码
通过定义整形状态值,判断state大小,这个思想很有意思,值得学习。
复制public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }1.2.3.4.5.6.7. 复制public class FutureTask<V> implements RunnableFuture<V> { // 最初始的状态是new 新建状态 private volatile int state; private static final int NEW = 0; // 新建状态 private static final int COMPLETING = 1; // 完成中 private static final int NORMAL = 2; // 正常执行完 private static final int EXCEPTIONAL = 3; // 异常 private static final int CANCELLED = 4; // 取消 private static final int INTERRUPTING = 5; // 正在中断 private static final int INTERRUPTED = 6; // 已中断 public V get() throws InterruptedException, ExecutionException { int s = state; // 任务还在执行中 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 线程被中断,从等待队列中移除等待节点WaitNode,抛出中断异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 任务已执行完毕或取消 if (s > COMPLETING) { // 如果已有等待节点WaitNode,将线程置空 if (q != null) q.thread = null; return s; } // 任务正在执行,让出时间片 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 还未构造等待节点,则new一个新的等待节点 else if (q == null) q = new WaitNode(); // 未入队列,CAS尝试入队 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果有超时时间参数 else if (timed) { // 计算超时时间 nanos = deadline - System.nanoTime(); // 如果超时,则从等待队列中移除等待节点WaitNode,返回当前状态state if (nanos <= 0L) { removeWaiter(q); return state; } // 阻塞队列nanos毫秒 LockSupport.parkNanos(this, nanos); } else // 阻塞队列 LockSupport.park(this); } } private V report(int s) throws ExecutionException { // 获取outcome中记录的返回结果 Object x = outcome; // 如果执行完毕,返回结果 if (s == NORMAL) return (V)x; // 如果大于等于取消状态,则抛出异常 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80. 2、将异步方法的返回值改为Future<Integer>,将返回值放到new AsyncResult<>();中 复制@Async("async-executor") public void readXls(String filePath, String filename) { try { // 此代码为简化关键性代码 List<Future<Integer>> futureList = new ArrayList<>(); for (int time = 0; time < times; time++) { Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync(); futureList.add(sumFuture); } }catch (Exception e){ logger.error("readXlsCacheAsync---插入数据异常:",e); } }1.2.3.4.5.6.7.8.9.10.11.12.13. 复制@Async("async-executor") public Future<Integer> readXlsCacheAsync() { try { // 此代码为简化关键性代码 return new AsyncResult<>(sum); }catch (Exception e){ return new AsyncResult<>(0); } }1.2.3.4.5.6.7.8.9.3、通过Future<Integer>.get()获取返回值:
复制public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow){ int[] futureSumArr = new int[futureList.size()]; for (int i = 0;i<futureList.size();i++) { try { Future<Integer> future = futureList.get(i); while (true) { if (future.isDone() && !future.isCancelled()) { Integer futureSum = future.get(); logger.info("获取Future返回值成功"+"----Future:" + future + ",Result:" + futureSum); futureSumArr[i] += futureSum; break; } else { logger.info("Future正在执行---获取Future返回值中---等待3秒"); Thread.sleep(3000); } } } catch (Exception e) { logger.error("获取Future返回值异常: ", e); } } boolean insertFlag = getInsertSum(futureSumArr, excelRow); logger.info("获取所有异步线程Future的返回值成功,Excel插入结果="+insertFlag); return insertFlag; }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.4、这里也可以通过新线程+Future获取Future返回值
不过感觉多此一举了,就当练习Future异步取返回值了~
复制public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) { ExecutorService service = Executors.newSingleThreadExecutor(); final boolean[] insertFlag = {false}; service.execute(new Runnable() { public void run() { try { insertFlag[0] = getFutureResult(futureList, excelRow); } catch (Exception e) { logger.error("新线程+Future获取Future返回值异常: ", e); insertFlag[0] = false; } } }); service.shutdown(); return new AsyncResult<>(insertFlag[0]); }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.获取异步线程结果后,我们可以通过添加事务的方式,实现Excel入库操作的数据一致性。
但Future会造成主线程的阻塞,这个就很不友好了,有没有更优解呢?
在BUG中磨砺,在优化中成长,我们下期见~
本文地址:http://www.bzve.cn/html/431f2499544.html
版权声明
本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。