快捷搜索:

同异步开垦情势,CompletableFuture异步化职责管理

作者: w88官方网站手机版  发布:2019-10-06

使用JDK1.8 CompletableFuture异步化任务处理,

同异步开发模式

标签(空格分隔): 同步异步


在soa服务调用中,一般一个请求会分配一个线程对请求进行处理,在IO操作比较多并且采用同步模式的情况下,假如有大量请求,那么大部分的线程都会处于IO等待状态,会导致系统的吞吐率下降。在很多情况下,使用异步的调用方式,可以立即返回初步的结果,延迟返回最终结果数据,在这个过程可以释放占用的线程等资源,避免阻塞,提高响应速率和系统的吞吐率。


高并发的大杀器:异步化

0.概述

服务端编程的一个经典场景是在接收和处理客户端请求时,为了避免对每一个请求都分配线程而带来的资源开销,服务一般会预先分配一个固定大小的线程池(比如Tomcat connector maxThreads),当客户端请求到来时,从线程池里寻找空闲状态的线程来处理请求,请求处理完毕后会回到线程池,继续服务下一个请求。当线程池内的线程都处于繁忙状态时,新来的请求需要排队直到线程池内有可用的线程,或者当超出队列容量后(Tomcat connector acceptCount属性)请求被拒绝(connection refused error)。

为了提高服务的吞吐量,我们应当确保主线程尽快处理尽快返回,尽量使服务端的任务处理线程池始终有可分配的线程来处理新的客户端请求。

当主线程执行一个任务时,如果该任务较耗时, 通常的做法是利用Future/Promise来异步化处理任务。从JDK1.5开始,J.U.C中提供了Future来代表一个异步操作。JDK1.8中则新增了lambda表达式和CompletableFuture, 可以帮助我们更好的用函数式编程风格来实现任务的异步处理。

1、同步异步、阻塞非阻塞

之前一直以为同步/异步阻塞/非阻塞是同一个概念,后面了解到是不一样的概念之后又很想弄清两者之间的区别,一直在找能将两者划清的一道线。但其实这两者的联系是十分紧密的,同步的往往伴随的是阻塞,异步的往往是非阻塞的。但是阻塞和非阻塞一般来说是只针对IO操作而言的。

同步:发出一个请求之后,线程需要等到结果回来才去处理其他事情

异步:发出一个请求后,等待的过程可以释放这个线程,等到结果回来了,继续处理剩下的事情

阻塞:进程处理一个任务时,需要等待IO操作处理完成后才能继续执行

非阻塞:进程处理一个任务时,不需要等待IO操作处理完成,就能继续后续的操作,隔断时间再来询问之前的操作是否完成。这样的过程其实也叫轮询。

阻塞和非阻塞应该是一种状态,针对单个线程或者进程而言,是否需要等待IO操作处理完了才能做其他操作。而同步异步更多的是一种通信模式,一般来说,异步都是通过回调来完成,调用方发出请求后,中间没有阻塞的状态,直接去处理其他任务,被调用方处理完请求后主动通知调用方并返回结果。

这篇文章的介绍写的很好 http://blog.csdn.net/historyasamirror/article/details/5778378


同步和异步,阻塞和非阻塞

1. Future

代码例子: 

ExecutorService executor = Executors.newFixedThreadPool(1);
Future<String> future = executor.submit(() -> {
     // long running task
     return "task finish.";
});

 

Future实在是太鸡肋了,仅暴露了get/cancel/isDone/isCancelled方法。我们无法通过Future去手动结束一个任务,也无法非阻塞的去获取Future的任务结果,因为future.get()方法是阻塞的。假设有下面这个场景,当前有两个任务,后一个任务依赖了前一个任务的处理结果,这种场景也无法通过Future来实现异步流程任务处理。

2、Java的异步编程方式

同步和异步,阻塞和非阻塞,这几个词已经是老生常谈,但是还是有很多同学分不清楚,以为同步肯定就是阻塞,异步肯定就是非阻塞,其实他们并不是一回事。

2. CompletableFuture

CompletableFuture实现了Future和CompletionStage两个接口,CompletionStage可以看做是一个异步任务执行过程的抽象。我们可以基于CompletableFuture方便的创建任务和链式处理多个任务。下面我们通过实例来介绍它的用法。

2.1 创建任务

可以使用runAsync方法新建一个线程来运行Runnable对象(无返回值)

CompletableFuture<Void> futureAsync = CompletableFuture.runAsync(() -> {
    // long running task without return value
    System.out.println("task finish.");
});

 

也可以使用supplyAysnc方法新建线程来运行Supplier<T>对象(有返回值)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    return "task result";
});

 

这里执行任务的线程来自于ForkJoinPool.commonPool() , 也可以自定义线程池

ExecutorService exector = Executors.newFixedThreadPool(5);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    return "task result";
}, executor);

 

2.2  任务的异步处理

不论Future.get()方法还是CompletableFuture.get()方法都是阻塞的,为了获取任务的结果同时不阻塞当前线程的执行,我们可以使用CompletionStage提供的方法结合callback来实现任务的异步处理。

2.2.1  使用callback基于特定任务的结果进行异步处理

程序中经常需要主线程创建新的线程来处理某一任务,然后基于任务的完成结果(返回值或者exception)来执行特定逻辑, 对于这种场景我们可以很方面的使用whenComplete或者whenCompleteAsync来注册callback方法

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    return "task result";
});

future.whenComplete((result, exception) -> {
    if (null == exception) {
        System.out.println("result from previous task: "   result);
    } 
});

对于任务执行中抛错的情况:

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // long running task
    throw new RuntimeException("error!");
});

future.whenComplete((result, exception) -> {
    if (null == exception) {
        System.out.println("result from previous task: "   result);
    } else {
        System.err.println("Exception thrown from previous task: "   exception.getMessage());
    }
});

 

也可以用exceptionally来显示的处理错误:

CompletableFuture.supplyAsync(() -> {
    throw new IllegalArgumentException("error");
}).exceptionally(ex -> {
    System.out.println("Exception caught: "   ex.getMessage());
    return ex.getMessage();
}).thenAccept(result -> {
    System.out.println("result: "   result);
});

 

如果不需关心任务执行中是否有exception,则可以使用thenAccept方法, 需要注意的是如果执行中抛了exception, 则thenAccept里面的回调方法不会被执行

CompletableFuture.supplyAsync(() -> {
    // long running task
    return "task result";
}).thenAccept((result) -> {
    System.out.println("result from previous task: "   result);
});

 

2.2.2 任务的链式处理

在应用中经常会遇到任务的pipeline处理,任务A执行完后触发任务B,任务B执行完后触发任务C,上一个任务的结果是下一个任务的输入,对于这种场景,我们可以使用thenApply方法。

CompletableFuture.supplyAsync(() -> {
    // long running task
    return "task1";
}).thenApply(previousResult -> {
    return previousResult   " task2";
}).thenApply(previousResult -> {
    return previousResult   " task3";
}).thenAccept(previousResult -> {
    System.out.println(previousResult);
});
output: task1 task2 task3

 

让我们再看下面这个例子,某一应用需要先根据accountId从数据库找到对应的账号信息,然后对该账号执行特定的处理逻辑:

CompletableFuture<Account> getAccount(String accountId) {
    return CompletableFuture.supplyAsync(() -> {
        return accountService.findAccount(accountId);
    });
}

CompletableFuture<String> processAccount(Account account) {
    return CompletableFuture.supplyAsync(() -> {
        return accountService.updateAccount(account);
    });
}

如果使用thenApply方法,其返回的结果是一个嵌套的CompletableFuture对象:

CompletableFuture<CompletableFuture<String>> res = getAccount("123").thenApply(account -> {
    return processAccount(account);
});

 

如果不希望结果是嵌套的CompletableFuture,我们可以使用thenCompose方法来替代thenApply

CompletableFuture<String> res = getAccount("123").thenCompose(account -> {
    return processAccount(account);
});

 

2.2.3 多任务的并行处理

另一种常见的场景是将一个大的任务切分为数个子任务,并行处理所有子任务,当所有子任务都成功结束时再继续处理后面的逻辑。以前的做法是利用CountDownLatch, 主线程构造countDownLatch对象,latch的大小为子任务的总数,每一个任务持有countDownLatch的引用,任务完成时对latch减1,主线程阻塞在countDownLatch.await方法上,当所有子任务都成功执行完后,latch=0, 主线程继续执行。

int size = 5;
CountDownLatch latch = new CountDownLatch(size);
for (int i = 0; i < size; i  ) {
    Executors.newFixedThreadPool(size).submit(() -> {
        try {
            // long running task
            System.out.println(Thread.currentThread().getName()   " "   latch.getCount());
        } finally {
            latch.countDown();
        }
    });
}
try {
    latch.await();
} catch (InterruptedException e) {
    e.printStackTrace();
}

// continue...
System.out.println(Thread.currentThread().getName());

 

这样的代码繁琐且很容易出错,我们可以用CompletableFuture.allOf来方便的处理上述场景。直接贴例子, 根据一组账户ID并行查找对应账户:

CompletableFuture<String> findAccount(String accountId) {
    return CompletableFuture.supplyAsync(() -> {
        // mock finding account from database
        return "account"   accountId;
    });
}

public void batchProcess(List<String> accountIdList) {
    // 并行根据accountId查找对应account
    List<CompletableFuture<String>> accountFindingFutureList =
        accountIdList.stream().map(accountId -> findAccount(accountId)).collect(Collectors.toList());

    // 使用allOf方法来表示所有的并行任务
    CompletableFuture<Void> allFutures =
        CompletableFuture
            .allOf(accountFindingFutureList.toArray(new CompletableFuture[accountFindingFutureList.size()]));

    // 下面的方法可以帮助我们获得所有子任务的处理结果
    CompletableFuture<List<String>> finalResults = allFutures.thenApply(v -> {
        return accountFindingFutureList.stream().map(accountFindingFuture -> accountFindingFuture.join())
            .collect(Collectors.toList());
    });
}

 

如果后续逻辑没有必要等待所有子任务全部结束,而是只要任一一个任务成功结束就可以继续执行,我们可以使用CompletableFuture.anyOf方法:

CompletableFuture<Object> anyOfFutures = CompletableFuture.anyOf(taskFutureA, taskFutureB, taskFutureC);

假设三个任务中taskFutureA最先执行完毕并成功返回,则anyOfFutures里得到的是taskFutureA的执行结果.

2.1、Future

JDK 5引入了Future模式,可以用来进行异步调用。Future接口有五个方法

public interface Future<V> { 

    boolean cancel(boolean mayInterruptIfRunning); //取消任务的执行
    boolean isCancelled();  //任务是否已经取消
    boolean isDone();      //任务是否已经完成
    V get() throws InterruptedException, ExecutionException;   //等待任务结束 ,获取结果
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; //在timeout时间内等待任务结束 ,获取结果,如果超时抛出异常
}

Future虽然可以实现异步执行,但是在调用future.get()时线程会阻塞,直到拿到结果,这时又变成同步操作。
不阻塞的情况则需要循环地调用future.isDone()判断future是否完成,再去get结果,这样会浪费CPU的资源。

同步和异步关注的是结果消息的通信机制:

3.展望

基于JDK1.8的lambda表达式和CompletableFuture, 我们可以写出更具有函数式编程风格的代码,可以更方便的实现任务的异步处理,只用很少的代码便可以实现任务的异步pipeline和并行调用。在异步开发模型(nodeJs/Vert.x)越来越火的今天,我们就从今天开始使用lambda CompletableFuture来改造我们的Java应用吧。

CompletableFuture异步化任务处理, 0.概述 服务端编程的一个经典场景是在接收和处理客户端请求时,为了避免对每一个请求都分配线...

2.2、CompletableFuture

CompletableFuture实现了非阻塞的异步调用。我们可以在CompletableFuture上注册一个completion事件,CompletableFuture执行完成后会触发这个事件的调用,这样就让执行免收阻塞之苦。提供这一功能的方法有

  • thenApply()(针对返回值为其他类型的函数)
  • thenAccept()(针对返回值为void的函数)
  • whenComplete()(针对接受一个值和一个Throwable,并返回void的函数)等。

同步:调用方需要主动等待结果的返回。

3、SOA服务客户端与服务端通信模式

客户端的同异步模式和服务端的同异步模式其实是无关联的。

图片 1

cliSer.png

客户端如果使用同步模式,发出请求后,线程会一直等待,直到服务端返回结果。
客户端如果使用异步模式,发出请求后,先拿到返回的CompletableFuture,等服务端返回结果后,会执行thenApply中的回调方法。
这两种方式,都是要等到服务端有结果了客户端才继续执行或者通过回调的方式处理请求结果,因此客户端的调用模式和服务端应该是无关联的。

服务端使用异步

if (serviceDef.isAsync) {//异步

    SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
    CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args);

    future.whenComplete((realResult, ex) -> {//任务执行完成后
        TransactionContext.Factory.setCurrentInstance(context);
        processResult(channelHandlerContext, soaFunction, context, realResult, application, ctx);
        onExit(ctx, getPrevChain(ctx));
    });
} else {//同步

    SoaFunctionDefinition.Sync syncFunction = (SoaFunctionDefinition.Sync) soaFunction;
    RESP result = (RESP) syncFunction.apply(iface, args);
    processResult(channelHandlerContext, soaFunction, context, result, application, ctx);
    onExit(ctx, getPrevChain(ctx));
}

上面的代码是服务端请求的处理,分了同异步,我一开始的时候想不通,既然客户端使用了异步,服务端使用了异步还有意义吗?后来我就去问了我的老师江湖人称老王,其实是因为我只站在了客户端的角度思考,服务端使用异步,服务本身的性能也会提高,那对于客户端肯定是有益处的。另外,服务端使用异步也是要看业务场景的,举个例子,客户端发送请求过来服务端后,服务端需要进行处理的逻辑是:使用爬虫从其他网页爬取数据,再对这些数据进行解析。由于爬虫需要进行网络连接,可能出现网络不好一直在等待的情况,但是解析数据其实是很快的,这个时候不使用异步就会出现大部分时间一直在等待而线程资源又没有释放的情况,就会影响系统的吞吐率。因此,在服务端使用异步模式在某些场景下其实意义是很大的。

异步:不需要主动等待结果的返回,而是通过其他手段,比如状态通知,回调函数等。

3.1、客户端的通信模式分析(以服务化SOA框架dapeng为例)

调用dapeng服务的客户端通过服务的api去调用。dapeng是通过thrift的IDL去定义服务接口的,并且根据IDL自动生成相应的api,api中包含了同步和异步的客户端代码,因此客户端可以通过同步模式来调用也可以通过异步模式来调用。

假定是在电商系统下订单这种场景下,当用户下了订单之后,我们可能需要去做很多的操作,比如扣减用户的优惠券,减库存,扣减用户的钱包等等其他服务,假设这三个操作,每个操作分别需要100ms,200ms,300ms,如果使用同步,那么执行这些所有的操作是串行的,执行总时间可能需要100ms 200ms 300ms=600ms,假如通过异步,那么每次执行完都会立即返回一个CompletableFuture,所有操作的执行其实可以当作是并行的,那么全部执行完只需要300ms,也就是说取决于耗时最长的那个服务。

具体我们结合dapeng的代码分析

dapeng的通信是基于netty的,我们启动客户端时会将所有ChannelHandler注册到ChannelPipeline中,服务端返回的字节流最终会去到回调处理的方法中。

handler:

bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new IdleStateHandler(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds), //超时设置
                new SoaDecoder(),        //粘包拆包处理
                new SoaIdleHandler(),    //心跳处理
                new SoaClientHandler(callBack) //回调处理
        );
    }
});

同步调用:可以看到下面的代码,我们会将future插入RequestQueue队列中,以seqid为key,请求发送到服务端之后,我们调用future的get()方法,这时线程会阻塞在这里,直到服务端返回结果,callback方法中把结果设回future。这种情况下调用每个服务是按顺序走的,一次调用结束后才能继续调用下一个服务,因此耗时是调用每一次服务的时间之和。

public ByteBuf send(Channel channel, int seqid, ByteBuf request, long timeout) throws SoaException {

    //means that this channel is not idle and would not managered by IdleConnectionManager
    IdleConnectionManager.remove(channel);

    CompletableFuture<ByteBuf> future = new CompletableFuture<>();

    RequestQueue.put(seqid, future);

    try {
        channel.writeAndFlush(request); //将请求发送至服务端
        ByteBuf respByteBuf = future.get(timeout, TimeUnit.MILLISECONDS);
        return respByteBuf;
    } catch (TimeoutException e) {
        LOGGER.error("请求超时,seqid:" seqid);
        throw new SoaException(SoaCode.TimeOut.getCode(), SoaCode.TimeOut.getMsg());
    } catch (Throwable e) {
        throw new SoaException(SoaCode.UnKnown, e.getMessage() == null ? SoaCode.UnKnown.getMsg() : e.getMessage());
    } finally {
        RequestQueue.remove(seqid);
    }

}

callback:服务端返回的结果最终会来到这个方法,我们拿到seqid后从RequestQueue获取对应的future,然后将结果写进future。

private SoaClientHandler.CallBack callBack = msg -> {
    // length(4) stx(1) version(...) protocol(1) seqid(4) header(...) body(...) etx(1)
    int readerIndex = msg.readerIndex();
    msg.skipBytes(7); // length4   stx1   version1   protocol1
    int seqid = msg.readInt();   //拿到seqid
    msg.readerIndex(readerIndex);

    CompletableFuture<ByteBuf> future = RequestQueue.remove(seqid);
    if (future != null) {
        future.complete(msg); //将结果写进future
    } else {
        LOGGER.error("返回结果超时,siqid为:"   seqid);
        msg.release();
    }
};

异步调用:这里同样是将future写进RequestQueue队列中,然后发送请求到服务端,这个时候我们直接返回future,因此主线程不会阻塞。最后当结果返回的时候,回调方法里同样会将结果写进future,最后调用方可以在CompletableFuture中的thenApply等方法对结果进行处理。因此异步调用下会立即返回future,马上可以执行其他操作,结合上面的例子,调用十个服务,每个调用耗时100ms,用异步执行总耗时也是大约100ms。

public CompletableFuture<ByteBuf> sendAsync(Channel channel, int seqid, ByteBuf request, long timeout) throws Exception {

    IdleConnectionManager.remove(channel);

    CompletableFuture<ByteBuf> future = new CompletableFuture<>();

    RequestQueue.putAsync(seqid, future, timeout);

    channel.writeAndFlush(request);

    return future;
}

阻塞和非阻塞主要关注的是等待结果返回调用方的状态:

3.2、服务端的通信模式分析(以服务化SOA框架dapeng为例)

dapeng 根据IDL生成api的时候,会生成两个接口,一个同步一个异步,写服务实现时需要根据应用场景决定使用同步还是异步,使用同步则继承同步的接口,使用异步则继承异步的接口。服务端会根据继承的接口判断使用同步还是异步。

我们以在浏览器登陆微信的场景为例,微信登陆的时候需要扫描二维码,假设用户扫码之后我们需要调用一个服务,去加载某些信息,当用户扫描成功后返回这些信息,有个定义这个处理的接口接口,那么我们定义好IDL之后会生成一个同步和一个异步的接口,异步的会继承AsyncService,这样框架处理的时候可以以这个为根据判断是同步还是异步。

public interface LoadInfoService {  
    void loadInfo() throws com.github.dapeng.core.SoaException;
}

public interface LoadInfoServiceAsync extends AsyncService {      

    Future<String> loadInfo() throws SoaException;
}

那么我们在写服务端的时候需要去实现这个接口,我们采用异步的模式,就需要implements LoadInfoServiceAsync。

public class LoadInfoServiceImpl implements LoadInfoServiceAsync {

    @Override
    public Future<String> loadInfo() throws SoaException {

        CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
            try {
                String info = processLoading();  //业务处理
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return info;
        });
        return result;
    }

}

这样我们就实现了异步服务,那么dapeng框架在处理请求的时候就会判断这个服务是同步还是异步实现,然后采取不同的处理方法,主要代码如下:

if (serviceDef.isAsync) {
    SoaFunctionDefinition.Async asyncFunc = (SoaFunctionDefinition.Async) soaFunction;
    CompletableFuture<RESP> future = (CompletableFuture<RESP>) asyncFunc.apply(iface, args); //调用服务实现的方法
    future.whenComplete((realResult, ex) -> {
        TransactionContext.Factory.setCurrentInstance(context);
        processResult(channelHandlerContext, soaFunction, context, realResult, application, ctx);
        onExit(ctx, getPrevChain(ctx));
    });
} else {
    SoaFunctionDefinition.Sync syncFunction = (SoaFunctionDefinition.Sync) soaFunction;
    RESP result = (RESP) syncFunction.apply(iface, args); //调用服务实现的方法
    processResult(channelHandlerContext, soaFunction, context, result, application, ctx);
    onExit(ctx, getPrevChain(ctx));
}

回到上面说到的场景,假设我们采用同步实现,由于我们不知道用户什么时候扫描,因此扫描完后我们才能去调用服务,并且等待服务返回,这个时候假如服务需要等到的时间很长,那么用户体验就会很差。另外,业务线程池的数量是有限的,如果每次服务调用时间很长,那么使用同步每次调用都占用一个业务线程,对服务性能就会造成很大的影响。

阻塞:是指结果返回之前,当前线程被挂起,不做任何事。

4、同步开发模式存在的问题

对于存在大量计算或者IO操作等需要等待应答的情况,一个连接的并发请求数非常有限,只有等到有结果之后才能进行下一个请求,而一个连接池的连接数是有限的。尤其是在多个服务系统间的调用时,如果流程阻塞在某个系统上,那么整体系统的性能都会下降。

非阻塞:是指结果在返回之前,线程可以做一些其他事,不会被挂起。

5、异步开发模式适用场景

但并不是说异步模式一定优越于同步模式,以下场景适合使用异步的开发模式

  • 不涉及共享资源,或对共享资源只读,即非互斥操作

  • 没有时序上的严格关系

  • 常用于IO操作等耗时操作

  • 不影响主线程逻辑

参考https://juejin.im/post/593e455861ff4b006c9bf6e0

可以看见同步和异步,阻塞和非阻塞主要关注的点不同,有人会问同步还能非阻塞,异步还能阻塞?

当然是可以的,下面为了更好的说明它们的组合之间的意思,用几个简单的例子说明:

同步阻塞:同步阻塞基本也是编程中最常见的模型,打个比方你去商店买衣服,你去了之后发现衣服卖完了,那你就在店里面一直等,期间不做任何事,等着商家进货,直到有货为止,这个效率很低。

同步非阻塞:同步非阻塞在编程中可以抽象为一个轮询模式,你去了商店之后,发现衣服卖完了。

这个时候不需要傻傻的等着,你可以去其他地方比如奶茶店,买杯水,但是你还是需要时不时的去商店问老板新衣服到了吗。

异步阻塞:异步阻塞这个编程里面用的较少,有点类似你写了个线程池,submit 然后马上 future.get(),这样线程其实还是挂起的。

有点像你去商店买衣服,这个时候发现衣服没有了,这个时候你就给老板留个电话,说衣服到了就给我打电话,然后你就守着这个电话,一直等着它响什么事也不做。这样感觉的确有点傻,所以这个模式用得比较少。

异步非阻塞:这也是现在高并发编程的一个核心,也是今天主要讲的一个核心。

好比你去商店买衣服,衣服没了,你只需要给老板说这是我的电话,衣服到了就打。然后你就随心所欲的去玩,也不用操心衣服什么时候到,衣服一到,电话一响就可以去买衣服了。

同步阻塞 PK 异步非阻塞

上面已经看到了同步阻塞的效率是多么的低,如果使用同步阻塞的方式去买衣服,你有可能一天只能买一件衣服,其他什么事都不能干;如果用异步非阻塞的方式去买,买衣服只是你一天中进行的一个小事。

我们把这个映射到我们代码中,当我们的线程发生一次 RPC 调用或者 HTTP 调用,又或者其他的一些耗时的 IO 调用。

发起之后,如果是同步阻塞,我们的这个线程就会被阻塞挂起,直到结果返回,试想一下,如果 IO 调用很频繁那我们的 CPU 使用率会很低很低。

正所谓是物尽其用,既然 CPU 的使用率被 IO 调用搞得很低,那我们就可以使用异步非阻塞。

当发生 IO 调用时我并不马上关心结果,我只需要把回调函数写入这次 IO 调用,这个时候线程可以继续处理新的请求,当 IO 调用结束时,会调用回调函数。

而我们的线程始终处于忙碌之中,这样就能做更多的有意义的事了。这里首先要说明的是,异步化不是万能,异步化并不能缩短你整个链路调用时间长的问题,但是它能极大的提升你的最大 QPS。

一般我们的业务中有两处比较耗时:

CPU:CPU 耗时指的是我们的一般的业务处理逻辑,比如一些数据的运算,对象的序列化。这些异步化是不能解决的,得需要靠一些算法的优化,或者一些高性能框架。

IO Wait:IO 耗时就像我们上面说的,一般发生在网络调用,文件传输中等等,这个时候线程一般会挂起阻塞。而我们的异步化通常用于解决这部分的问题。

哪些可以异步化

上面说了异步化是用于解决 IO 阻塞的问题,而我们一般项目中可以使用异步化的情况如下:

Servlet 异步化

Spring MVC 异步化

RPC 调用如(Dubbo,Thrift),HTTP 调用异步化

数据库调用,缓存调用异步化

下面我会从上面几个方面进行异步化的介绍。

Servlet 异步化

对于 Java 开发程序员来说 Servlet 并不陌生,在项目中不论你使用 Struts2,还是使用的 Spring MVC,本质上都是封装的 Servlet。

但是我们一般的开发都是使用的同步阻塞,模式如下:

图片 2

上面的模式优点在于编码简单,适合在项目启动初期,访问量较少,或者是 CPU 运算较多的项目。

缺点在于,业务逻辑线程和 Servlet 容器线程是同一个,一般的业务逻辑总得发生点 IO,比如查询数据库,比如产生 RPC 调用,这个时候就会发生阻塞。

而我们的 Servlet 容器线程肯定是有限的,当 Servlet 容器线程都被阻塞的时候我们的服务这个时候就会发生拒绝访问,线程不够我当然可以通过增加机器的一系列手段来解决这个问题。

但是俗话说得好靠人不如靠自己,靠别人替我分担请求,还不如我自己搞定。

所以在 Servlet 3.0 之后支持了异步化,我们采用异步化之后,模式变成如下:

图片 3

在这里我们采用新的线程处理业务逻辑,IO 调用的阻塞就不会影响我们的 Serlvet 了,实现异步 Serlvet 的代码也比较简单,如下:

@WebServlet(name = "WorkServlet",urlPatterns = "/work",asyncSupported =true)

public class WorkServlet extends HttpServlet{

private static final long serialVersionUID = 1L;

@Override

protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

this.doPost(req, resp);

}

@Override

protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

//设置ContentType,关闭缓存

resp.setContentType("text/plain;charset=UTF-8");

resp.setHeader("Cache-Control","private");

resp.setHeader("Pragma","no-cache");

final PrintWriter writer= resp.getWriter();

writer.println("老师检查作业了");

writer.flush();

List<String> zuoyes=new ArrayList<String>();

for (int i = 0; i < 10; i ) {

zuoyes.add("zuoye" i);;

}

//开启异步请求

final AsyncContext ac=req.startAsync();

doZuoye(ac, zuoyes);

writer.println;

writer.flush();

}

private void doZuoye(final AsyncContext ac, final List<String> zuoyes) {

ac.setTimeout(1*60*60*1000L);

ac.start(new Runnable() {

@Override

public void run() {

//通过response获得字符输出流

try {

PrintWriter writer=ac.getResponse().getWriter();

for (String zuoye:zuoyes) {

writer.println(""" zuoye ""请求处理中");

Thread.sleep;

writer.flush();

}

ac.complete();

} catch (Exception e) {

e.printStackTrace();

}

}

});

}

}

实现 Serlvet 的关键在于 HTTP 采取了长连接,也就是当请求打过来的时候就算有返回也不会关闭,因为可能还会有数据,直到返回关闭指令。

AsyncContext ac=req.startAsync();用于获取异步上下文,后续我们通过这个异步上下文进行回调返回数据,有点像我们买衣服的时候,留给老板一个电话。

而这个上下文也是一个电话,当有衣服到的时候,也就是当有数据准备好的时候就可以打电话发送数据了。ac.complete();用来进行长链接的关闭。

Spring MVC 异步化

现在其实很少人来进行 Serlvet 编程,都是直接采用现成的一些框架,比如 Struts2,Spring MVC。下面介绍下使用 Spring MVC 如何进行异步化:

首先确认你的项目中的 Servlet 是 3.0 以上,其次 Spring MVC 4.0 :

<dependency>

<groupId>javax.servlet</groupId>

<artifactId>javax.servlet-api</artifactId>

<version>3.1.0</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-webmvc</artifactId>

<version>4.2.3.RELEASE</version>

</dependency>

web.xml 头部声明,必须要 3.0,Filter 和 Serverlet 设置为异步:

<?xml version="1.0" encoding="UTF-8"?>

<web-app version="3.0" xmlns=""

xmlns:xsi=""

xsi:schemaLocation="

;

<filter>

<filter-name>testFilter</filter-name>

<filter-class>com.TestFilter</filter-class>

<async-supported>true</async-supported>

</filter>

<servlet>

<servlet-name>mvc-dispatcher</servlet-name>

<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>

.........

<async-supported>true</async-supported>

</servlet>

使用 Spring MVC 封装了 Servlet 的 AsyncContext,使用起来比较简单。以前我们同步的模式的 Controller 是返回 ModelAndView。

而异步模式直接生成一个 DeferredResult即可保存上下文,下面给出如何和我们 HttpClient 搭配的简单 demo:

@RequestMapping(value="/asynctask", method = RequestMethod.GET)

public DeferredResult<String> asyncTask() throws IOReactorException {

IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount.build();

ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);

PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor);

conManager.setMaxTotal;

conManager.setDefaultMaxPerRoute;

CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build();

// Start the client

httpclient.start();

//设置超时时间200ms

final DeferredResult<String> deferredResult = new DeferredResult<String>;

deferredResult.onTimeout(new Runnable() {

@Override

public void run() {

System.out.println("异步调用执行超时!thread id is : " Thread.currentThread);

deferredResult.setResult;

}

});

System.out.println("/asynctask 调用!thread id is : " Thread.currentThread);

final HttpGet request2 = new HttpGet("");

httpclient.execute(request2, new FutureCallback<HttpResponse>() {

public void completed(final HttpResponse response2) {

System.out.println(request2.getRequestLine() "->" response2.getStatusLine;

deferredResult.setResult(request2.getRequestLine() "->" response2.getStatusLine;

}

public void failed(final Exception ex) {

System.out.println(request2.getRequestLine() "->" ex);

}

public void cancelled() {

System.out.println(request2.getRequestLine() " cancelled");

}

});

return deferredResult;

}

注意:在 Serlvet 异步化中有个问题是 Filter 的后置结果处理,没法使用,对于我们一些打点,结果统计直接使用 Serlvet 异步是没法用的。

在 Spring MVC 中就很好的解决了这个问题,Spring MVC 采用了一个比较取巧的方式通过请求转发,能让请求再次通过过滤器。

但是又引入了新的一个问题那就是过滤器会处理两次,这里可以通过 Spring MVC 源码中自身判断的方法。

我们可以在 Filter 中使用下面这句话来进行判断是不是属于 Spring MVC 转发过来的请求,从而不处理 Filter 的前置事件,只处理后置事件:

Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);

return asyncManagerAttr instanceof WebAsyncManager ;

全链路异步化

上面我们介绍了 Serlvet 的异步化,相信细心的同学都看出来似乎并没有解决根本的问题,我的 IO 阻塞依然存在,只是换了个位置而已。

当 IO 调用频繁同样会让业务线程池快速变满,虽然 Serlvet 容器线程不被阻塞,但是这个业务依然会变得不可用。

图片 4

那么怎么才能解决上面的问题呢?答案就是全链路异步化,全链路异步追求的是没有阻塞,打满你的 CPU,把机器的性能压榨到极致。模型图如下:

图片 5

具体的 NIO Client 到底做了什么事呢,具体如下面模型:

图片 6

上面就是我们全链路异步的图了(部分线程池可以优化)。全链路的核心在于只要我们遇到 IO 调用的时候,我们就可以使用 NIO,从而避免阻塞,也就解决了之前说的业务线程池被打满的尴尬场景。

远程调用异步化

我们一般远程调用使用 RPC 或者 HTTP:

对于 RPC 来说,一般 Thrift,HTTP,Motan 等支持都异步调用,其内部原理也都是采用事件驱动的 NIO 模型。

对于 HTTP 来说,一般的 Apache HTTP Client 和 Okhttp 也都提供了异步调用。

下面简单介绍下 HTTP 异步化调用是怎么做的。首先来看一个例子:

public class HTTPAsyncClientDemo {

public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException {

//具体参数含义下文会讲

//apache提供了ioReactor的参数配置,这里我们配置IO 线程为1

IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount.build();

//根据这个配置创建一个ioReactor

ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);

//asyncHttpClient使用PoolingNHttpClientConnectionManager管理我们客户端连接

PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor);

//设置总共的连接的最大数量

conManager.setMaxTotal;

//设置每个路由的连接的最大数量

conManager.setDefaultMaxPerRoute;

//创建一个Client

CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build();

// Start the client

httpclient.start();

// Execute request

final HttpGet request1 = new HttpGet("");

Future<HttpResponse> future = httpclient.execute(request1, null);

// and wait until a response is received

HttpResponse response1 = future.get();

System.out.println(request1.getRequestLine() "->" response1.getStatusLine;

// One most likely would want to use a callback for operation result

final HttpGet request2 = new HttpGet("");

httpclient.execute(request2, new FutureCallback<HttpResponse>() {

//Complete成功后会回调这个方法

public void completed(final HttpResponse response2) {

System.out.println(request2.getRequestLine() "->" response2.getStatusLine;

}

public void failed(final Exception ex) {

System.out.println(request2.getRequestLine() "->" ex);

}

public void cancelled() {

System.out.println(request2.getRequestLine() " cancelled");

}

});

}

}

下面给出 httpAsync 的整个类图:

图片 7

对于我们的 HTTPAysncClient 最后使用的是 InternalHttpAsyncClient,在 InternalHttpAsyncClient 中有个 ConnectionManager,这个就是我们管理连接的管理器。

而在 httpAsync 中只有一个实现那就是 PoolingNHttpClientConnectionManager。

这个连接管理器中有两个我们比较关心的,一个是 Reactor,一个是 Cpool:

Reactor:所有的 Reactor 这里都是实现了 IOReactor 接口。在 PoolingNHttpClientConnectionManager 中会有拥有一个 Reactor,那就是 DefaultConnectingIOReactor,这个 DefaultConnectingIOReactor,负责处理 Acceptor。

在 DefaultConnectingIOReactor 有个 excutor 方法,生成 IOReactor 也就是我们图中的 BaseIOReactor,进行 IO 的操作。这个模型就是我们上面的 1.2.2 的模型。

CPool:在 PoolingNHttpClientConnectionManager 中有个 CPool,主要是负责控制我们连接,我们上面所说的 maxTotal 和 defaultMaxPerRoute,都是由其进行控制。

如果每个路由有满了,它会断开最老的一个链接;如果总共的 total 满了,它会放入 leased 队列,释放空间的时候就会将其重新连接。

数据库调用异步化

对于数据库调用一般的框架并没有提供异步化的方法,这里推荐自己封装或者使用网上开源的。

异步化并不是高并发的银弹,但是有了异步化的确能提高你机器的 QPS,吞吐量等等。

上述讲的一些模型如果能合理的做一些优化,然后进行应用,相信能对你的服务有很大的帮助。

高并发大杀器:并行化

想必热爱游戏的同学小时候都幻想过要是自己会分身之术,就能一边打游戏一边上课了。

可惜现实中并没有这个技术,你要么只有老老实实的上课,要么就只有逃课去打游戏了。

虽然在现实中我们无法实现分身这样的技术,但是我们可以在计算机世界中实现这样的愿望。

计算机中的分身术

计算机中的分身术不是天生就有了。在 1971 年,英特尔推出的全球第一颗通用型微处理器 4004,由 2300 个晶体管构成。

当时,公司的联合创始人之一戈登摩尔就提出大名鼎鼎的“摩尔定律”——每过 18 个月,芯片上可以集成的晶体管数目将增加一倍。

最初的主频 740KHz(每秒运行 74 万次),现在过了快 50 年了,大家去买电脑的时候会发现现在的主频都能达到 4.0GHZ了。

但是主频越高带来的收益却是越来越小:

据测算,主频每增加 1G,功耗将上升 25 瓦,而在芯片功耗超过 150 瓦后,现有的风冷散热系统将无法满足散热的需要。有部分 CPU 都可以用来煎鸡蛋了。

流水线过长,使得单位频率效能低下,越大的主频其实整体性能反而不如小的主频。

戈登摩尔认为摩尔定律未来 10-20 年会失效。

在单核主频遇到瓶颈的情况下,多核 CPU 应运而生,不仅提升了性能,并且降低了功耗。

所以多核 CPU 逐渐成为现在市场的主流,这样让我们的多线程编程也更加的容易。

说到了多核 CPU 就一定要说 GPU,大家可能对这个比较陌生,但是一说到显卡就肯定不陌生,笔者搞过一段时间的 CUDA 编程,我才意识到这个才是真正的并行计算。

大家都知道图片像素点吧,比如 1920*1080 的图片有 210 万个像素点,如果想要把一张图片的每个像素点都进行转换一下,那在我们 Java 里面可能就要循环遍历 210 万次。

就算我们用多线程 8 核 CPU,那也得循环几十万次。但是如果使用 Cuda,最多可以 365535*512 = 100661760个线程并行执行,就这种级别的图片那也是马上处理完成。

但是 Cuda 一般适合于图片这种,有大量的像素点需要同时处理,但是指令集很少所以逻辑不能太复杂。

应用中的并行

一说起让你的服务高性能的手段,那么异步化,并行化这些肯定会第一时间在你脑海中显现出来,并行化可以用来配合异步化,也可以用来单独做优化。

我们可以想想有这么一个需求,在你下外卖订单的时候,这笔订单可能还需要查用户信息,折扣信息,商家信息,菜品信息等。

用同步的方式调用,如下图所示:

图片 8

设想一下这 5 个查询服务,平均每次消耗 50ms,那么本次调用至少是 250ms,我们细想一下,这五个服务其实并没有任何的依赖,谁先获取谁后获取都可以。

那么我们可以想想,是否可以用多重影分身之术,同时获取这五个服务的信息呢?

优化如下:

图片 9

将这五个查询服务并行查询,在理想情况下可以优化至 50ms。当然说起来简单,我们真正如何落地呢?

CountDownLatch/Phaser

CountDownLatch 和 Phaser 是 JDK 提供的同步工具类。Phaser 是 1.7 版本之后提供的工具类。而 CountDownLatch 是 1.5 版本之后提供的工具类。

这里简单介绍一下 CountDownLatch,可以将其看成是一个计数器,await()方法可以阻塞至超时或者计数器减至 0,其他线程当完成自己目标的时候可以减少 1,利用这个机制我们可以用来做并发。

可以用如下的代码实现我们上面的下订单的需求:

public class CountDownTask {

private static final int CORE_POOL_SIZE = 4;

private static final int MAX_POOL_SIZE = 12;

private static final long KEEP_ALIVE_TIME = 5L;

private final static int QUEUE_SIZE = 1600;

protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,

KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));

public static void main(String[] args) throws InterruptedException {

// 新建一个为5的计数器

CountDownLatch countDownLatch = new CountDownLatch;

OrderInfo orderInfo = new OrderInfo();

THREAD_POOL.execute -> {

System.out.println("当前任务Customer,线程名字为:" Thread.currentThread().getName;

orderInfo.setCustomerInfo(new CustomerInfo;

countDownLatch.countDown();

});

THREAD_POOL.execute -> {

System.out.println("当前任务Discount,线程名字为:" Thread.currentThread().getName;

orderInfo.setDiscountInfo(new DiscountInfo;

countDownLatch.countDown();

});

THREAD_POOL.execute -> {

System.out.println("当前任务Food,线程名字为:" Thread.currentThread().getName;

orderInfo.setFoodListInfo(new FoodListInfo;

countDownLatch.countDown();

});

THREAD_POOL.execute -> {

System.out.println("当前任务Tenant,线程名字为:" Thread.currentThread().getName;

orderInfo.setTenantInfo(new TenantInfo;

countDownLatch.countDown();

});

THREAD_POOL.execute -> {

System.out.println("当前任务OtherInfo,线程名字为:" Thread.currentThread().getName;

orderInfo.setOtherInfo(new OtherInfo;

countDownLatch.countDown();

});

countDownLatch.await(1, TimeUnit.SECONDS);

System.out.println("主线程:" Thread.currentThread().getName;

}

}

建立一个线程池(具体配置根据具体业务,具体机器配置),进行并发的执行我们的任务(生成用户信息,菜品信息等),最后利用 await 方法阻塞等待结果成功返回。

CompletableFuture

相信各位同学已经发现,CountDownLatch 虽然能实现我们需要满足的功能但是其仍然有个问题是,我们的业务代码需要耦合 CountDownLatch 的代码。

比如在我们获取用户信息之后,我们会执行 countDownLatch.countDown(),很明显我们的业务代码显然不应该关心这一部分逻辑,并且在开发的过程中万一写漏了,那我们的 await 方法将只会被各种异常唤醒。

所以在 JDK 1.8 中提供了一个类 CompletableFuture,它是一个多功能的非阻塞的 Future。(什么是 Future:用来代表异步结果,并且提供了检查计算完成,等待完成,检索结果完成等方法。)

我们将每个任务的计算完成的结果都用 CompletableFuture 来表示,利用 CompletableFuture.allOf 汇聚成一个大的 CompletableFuture,那么利用 get()方法就可以阻塞。

public class CompletableFutureParallel {

private static final int CORE_POOL_SIZE = 4;

private static final int MAX_POOL_SIZE = 12;

private static final long KEEP_ALIVE_TIME = 5L;

private final static int QUEUE_SIZE = 1600;

protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE,

KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE));

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {

OrderInfo orderInfo = new OrderInfo();

//CompletableFuture 的List

List<CompletableFuture> futures = new ArrayList<>();

futures.add(CompletableFuture.runAsync -> {

System.out.println("当前任务Customer,线程名字为:" Thread.currentThread().getName;

orderInfo.setCustomerInfo(new CustomerInfo;

}, THREAD_POOL));

futures.add(CompletableFuture.runAsync -> {

System.out.println("当前任务Discount,线程名字为:" Thread.currentThread().getName;

orderInfo.setDiscountInfo(new DiscountInfo;

}, THREAD_POOL));

futures.add( CompletableFuture.runAsync -> {

System.out.println("当前任务Food,线程名字为:" Thread.currentThread().getName;

orderInfo.setFoodListInfo(new FoodListInfo;

}, THREAD_POOL));

futures.add(CompletableFuture.runAsync -> {

System.out.println("当前任务Other,线程名字为:" Thread.currentThread().getName;

orderInfo.setOtherInfo(new OtherInfo;

}, THREAD_POOL));

CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size;

allDoneFuture.get(10, TimeUnit.SECONDS);

System.out.println(orderInfo);

}

}

可以看见我们使用 CompletableFuture 能很快的完成需求,当然这还不够。

Fork/Join

我们上面用 CompletableFuture 完成了对多组任务并行执行,但是它依然是依赖我们的线程池。

在我们的线程池中使用的是阻塞队列,也就是当我们某个线程执行完任务的时候需要通过这个阻塞队列进行,那么肯定会发生竞争,所以在 JDK 1.7 中提供了 ForkJoinTask 和 ForkJoinPool。

图片 10

ForkJoinPool 中每个线程都有自己的工作队列,并且采用 Work-Steal 算法防止线程饥饿。

Worker 线程用 LIFO 的方法取出任务,但是会用 FIFO 的方法去偷取别人队列的任务,这样就减少了锁的冲突。

图片 11

网上这个框架的例子很多,我们看看如何使用代码完成我们上面的下订单需求:

public class OrderTask extends RecursiveTask<OrderInfo> {

@Override

protected OrderInfo compute() {

System.out.println("执行" this.getClass().getSimpleName() "线程名字为:" Thread.currentThread().getName;

// 定义其他五种并行TasK

CustomerTask customerTask = new CustomerTask();

TenantTask tenantTask = new TenantTask();

DiscountTask discountTask = new DiscountTask();

FoodTask foodTask = new FoodTask();

OtherTask otherTask = new OtherTask();

invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask);

OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join;

return orderInfo;

}

public static void main(String[] args) {

ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors;

System.out.println(forkJoinPool.invoke(new OrderTask;

}

}

class CustomerTask extends RecursiveTask<CustomerInfo>{

@Override

protected CustomerInfo compute() {

System.out.println("执行" this.getClass().getSimpleName() "线程名字为:" Thread.currentThread().getName;

return new CustomerInfo();

}

}

class TenantTask extends RecursiveTask<TenantInfo>{

@Override

protected TenantInfo compute() {

System.out.println("执行" this.getClass().getSimpleName() "线程名字为:" Thread.currentThread().getName;

return new TenantInfo();

}

}

class DiscountTask extends RecursiveTask<DiscountInfo>{

@Override

protected DiscountInfo compute() {

System.out.println("执行" this.getClass().getSimpleName() "线程名字为:" Thread.currentThread().getName;

return new DiscountInfo();

}

}

class FoodTask extends RecursiveTask<FoodListInfo>{

@Override

protected FoodListInfo compute() {

System.out.println("执行" this.getClass().getSimpleName() "线程名字为:" Thread.currentThread().getName;

return new FoodListInfo();

}

}

class OtherTask extends RecursiveTask<OtherInfo>{

@Override

protected OtherInfo compute() {

System.out.println("执行" this.getClass().getSimpleName() "线程名字为:" Thread.currentThread().getName;

return new OtherInfo();

}

}

我们定义一个 Order Task 并且定义五个获取信息的任务,在 Compute 中分别 Fork 执行这五个任务,最后在将这五个任务的结果通过 Join 获得,最后完成我们的并行化的需求。

parallelStream

在 JDK 1.8 中提供了并行流的 API,当我们使用集合的时候能很好的进行并行处理。

下面举了一个简单的例子从 1 加到 100:

public class ParallelStream {

public static void main(String[] args) {

ArrayList<Integer> list = new ArrayList<Integer>();

for (int i = 1; i <= 100; i ) {

list.add;

}

LongAdder sum = new LongAdder();

list.parallelStream().forEach(integer -> {

// System.out.println("当前线程" Thread.currentThread().getName;

sum.add;

});

System.out.println;

}

}

parallelStream 中底层使用的那一套也是 Fork/Join 的那一套,默认的并发程度是可用 CPU 数 -1。

分片

可以想象有这么一个需求,每天定时对 ID 在某个范围之间的用户发券,比如这个范围之间的用户有几百万,如果给一台机器发的话,可能全部发完需要很久的时间。

所以分布式调度框架比如:elastic-job 都提供了分片的功能,比如你用 50 台机器,那么 idP = 0 的在第 0 台机器上;=1 的在第 1 台机器上发券,那么我们的执行时间其实就分摊到了不同的机器上了。

并行化注意事项

线程安全:在 parallelStream 中我们列举的代码中使用的是 LongAdder,并没有直接使用我们的 Integer 和 Long,这个是因为在多线程环境下 Integer 和 Long 线程不安全。所以线程安全我们需要特别注意。

合理参数配置:可以看见我们需要配置的参数比较多,比如我们的线程池的大小,等待队列大小,并行度大小以及我们的等待超时时间等等。

我们都需要根据自己的业务不断的调优防止出现队列不够用或者超时时间不合理等等。

上面介绍了什么是并行化,并行化的各种历史,在 Java 中如何实现并行化,以及并行化的注意事项。希望大家对并行化有个比较全面的认识。

欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 855835163

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

本文由www.w88985.com发布于w88官方网站手机版,转载请注明出处:同异步开垦情势,CompletableFuture异步化职责管理

关键词: www.w88985.c