import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.util.Iterator; public class ReactorTest { private static final Logger LOGGER = LoggerFactory.getLogger(ReactorTest.class); /** * 随便测试下 */ @Test public void concurrentTest(){ //这里没有什么用,纯粹是Schedulers.elastic()可以复用这里的线程池,不想写多的代码了 Flux.range(1,100) .map(a -> a*1) .subscribeOn(Schedulers.elastic()) .subscribe(System.out::println); //开始测试了 long start = System.currentTimeMillis(); //第一个参数20 20个并发 //后面表示N个请求,最长的一个请求可能要2000ms list(20, 1000l,2000l,100l,200l,300l,400l,500l,600l,700l,800l,900l) .forEachRemaining( show -> LOGGER.info(show) ); LOGGER.info("总时间 : {} ms", System.currentTimeMillis() - start ); } /** * 并行执行 * @param concurrent 并行数量 * @param sleeps 模拟停顿时间 * @return 随便返回了 */ public Iterator list(int concurrent, Long... sleeps){ return Flux.fromArray(sleeps) .log() .flatMap(sleep -> Mono.fromCallable( () -> mockHttp(sleep)).subscribeOn(Schedulers.elastic()), concurrent) .toIterable().iterator(); } /** * 实际上是一个http请求 * @param sleep 请求耗时 * @return */ public String mockHttp(long sleep){ try { Thread.sleep(sleep); LOGGER.info("停顿{}ms真的执行了", sleep); } catch (InterruptedException e) { e.printStackTrace(); } return String.format("停顿了%sms", sleep); } }