zm
2020-05-18 a18bfacbf56b401f6e0fdae8710fbca4df8cff77
commit | author | age
a18bfa 1 import org.junit.Test;
Z 2 import org.slf4j.Logger;
3 import org.slf4j.LoggerFactory;
4 import reactor.core.publisher.Flux;
5 import reactor.core.publisher.Mono;
6 import reactor.core.scheduler.Schedulers;
7
8 import java.util.Iterator;
9
10 public class ReactorTest {
11
12     private static final Logger LOGGER = LoggerFactory.getLogger(ReactorTest.class);
13
14
15     /**
16      * 随便测试下
17      */
18     @Test
19     public void concurrentTest(){
20
21         //这里没有什么用,纯粹是Schedulers.elastic()可以复用这里的线程池,不想写多的代码了
22         Flux.range(1,100)
23                 .map(a -> a*1)
24                 .subscribeOn(Schedulers.elastic())
25                 .subscribe(System.out::println);
26
27         //开始测试了
28         long start = System.currentTimeMillis();
29
30
31         //第一个参数20 20个并发
32         //后面表示N个请求,最长的一个请求可能要2000ms
33         list(20, 1000l,2000l,100l,200l,300l,400l,500l,600l,700l,800l,900l)
34                 .forEachRemaining( show ->  LOGGER.info(show) );
35
36         LOGGER.info("总时间 : {} ms", System.currentTimeMillis() - start );
37
38     }
39
40     /**
41      * 并行执行
42      * @param concurrent 并行数量
43      * @param sleeps 模拟停顿时间
44      * @return 随便返回了
45      */
46     public Iterator<String> list(int concurrent, Long... sleeps){
47         return Flux.fromArray(sleeps)
48                 .log()
49                 .flatMap(sleep -> Mono.fromCallable( () -> mockHttp(sleep)).subscribeOn(Schedulers.elastic()), concurrent)
50                 .toIterable().iterator();
51     }
52
53     /**
54      * 实际上是一个http请求
55      * @param sleep 请求耗时
56      * @return
57      */
58     public String mockHttp(long sleep){
59         try {
60             Thread.sleep(sleep);
61             LOGGER.info("停顿{}ms真的执行了", sleep);
62         } catch (InterruptedException e) {
63             e.printStackTrace();
64         }
65         return String.format("停顿了%sms", sleep);
66     }
67
68 }