Skip to content

Commit

Permalink
perf: replace concatMap to flatMapSequential to improve parallelism a…
Browse files Browse the repository at this point in the history
…nd efficiency (#6706)

#### What type of PR is this?
/kind improvement
/area core
/milestone 2.20.x

#### What this PR does / why we need it:
将 concatMap 替换为 flatMapSequential 以提高并行度和执行效率

可以看一下这个场景示例来模拟像文章列表 API 的数据组装
假如每个步骤的执行时间是 1s 有 4 个步骤 同时 Flux 发出 4 条数据:

```java
@test  
void test() {  
    var startMs = System.currentTimeMillis();  
  
    var monoA = Mono.fromSupplier(  
            () -> {  
                sleep();  
                return "A";  
            })        .subscribeOn(Schedulers.boundedElastic());  
  
    var monoB = Mono.fromSupplier(  
            () -> {  
                sleep();  
                return "B";  
            })        .subscribeOn(Schedulers.boundedElastic());  
  
    var monoC = Mono.fromSupplier(  
            () -> {  
                sleep();  
                return "C";  
            })        .subscribeOn(Schedulers.boundedElastic());  
  
    var monoD = Mono.fromSupplier(  
            () -> {  
                sleep();  
                return "D";  
            })        .subscribeOn(Schedulers.boundedElastic());  
  
    var convert = Mono.when(monoA, monoB, monoC, monoD);  

    Flux.just("1", "2", "3", "4")
        // concatMap(convert::thenReturn)
        .flatMapSequential(convert::thenReturn)  
        .collectList()  
        .block(); 

    System.out.println("Time: " + (System.currentTimeMillis() - startMs));  
}

private static void sleep() {  
    try {  
        Thread.sleep(1000);  
    } catch (InterruptedException e) {  
        throw new RuntimeException(e);  
    }
}
```
**结果:**
1. 如果每个步骤没有加  subscribeOn 且使用 concatMap 耗时: 16362 ms
2. 每个步骤使用 subscribeOn 且使用 concatMap 耗时: 4174 ms
3. 每个步骤使用 subscribeOn 且使用 flatMapSequential 耗时: 1185 ms

#### Does this PR introduce a user-facing change?
```release-note
提升页面访问速度
```
  • Loading branch information
guqing authored Oct 10, 2024
1 parent 02c5484 commit 25c54d7
Show file tree
Hide file tree
Showing 12 changed files with 17 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Mono<ListResult<ListedComment>> listComment(CommentQuery commentQuery) {
commentQuery.toPageRequest())
.flatMap(comments -> Flux.fromStream(comments.get()
.map(this::toListedComment))
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(list -> new ListResult<>(comments.getPage(), comments.getSize(),
comments.getTotal(), list)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public Mono<ListResult<ListedReply>> list(ReplyQuery query) {
return client.listBy(Reply.class, query.toListOptions(), query.toPageRequest())
.flatMap(list -> Flux.fromStream(list.get()
.map(this::toListedReply))
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(listedReplies -> new ListResult<>(list.getPage(), list.getSize(),
list.getTotal(), listedReplies))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Mono<ListResult<ListedPost>> listPost(PostQuery query) {
)
.flatMap(listResult -> Flux.fromStream(listResult.get())
.map(this::getListedPost)
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(listedPosts -> new ListResult<>(listResult.getPage(), listResult.getSize(),
listResult.getTotal(), listedPosts)
Expand Down Expand Up @@ -175,7 +175,7 @@ private Flux<Contributor> listContributors(List<String> usernames) {
return Flux.empty();
}
return Flux.fromIterable(usernames)
.concatMap(userService::getUserOrGhost)
.flatMapSequential(userService::getUserOrGhost)
.map(user -> {
Contributor contributor = new Contributor();
contributor.setName(user.getMetadata().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public Flux<ListedSnapshotDto> listSnapshots(String pageName) {
public Mono<ListResult<ListedSinglePage>> list(SinglePageQuery query) {
return client.listBy(SinglePage.class, query.toListOptions(), query.toPageRequest())
.flatMap(listResult -> Flux.fromStream(listResult.get().map(this::getListedSinglePage))
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(listedSinglePages -> new ListResult<>(
listResult.getPage(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public Flux<DataBuffer> uglifyJsBundle() {
});
var body = Flux.fromIterable(startedPlugins)
.sort(Comparator.comparing(PluginWrapper::getPluginId))
.concatMap(pluginWrapper -> {
.flatMapSequential(pluginWrapper -> {
var pluginId = pluginWrapper.getPluginId();
return Mono.<Resource>fromSupplier(
() -> BundleResourceUtils.getJsBundleResource(
Expand All @@ -294,7 +294,7 @@ public Flux<DataBuffer> uglifyJsBundle() {
public Flux<DataBuffer> uglifyCssBundle() {
return Flux.fromIterable(pluginManager.getStartedPlugins())
.sort(Comparator.comparing(PluginWrapper::getPluginId))
.concatMap(pluginWrapper -> {
.flatMapSequential(pluginWrapper -> {
var pluginId = pluginWrapper.getPluginId();
var dataBufferFactory = DefaultDataBufferFactory.sharedInstance;
return Mono.<Resource>fromSupplier(() -> BundleResourceUtils.getJsBundleResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ private <T extends ExtensionPoint> Flux<T> getEnabledExtensions(String epdName,
}
var extensions = getExtensions(extensionPoint).cache();
return Flux.fromIterable(extensionDefNames)
.concatMap(extensionDefName ->
.flatMapSequential(extensionDefName ->
client.fetch(ExtensionDefinition.class, extensionDefName)
)
.concatMap(extensionDef -> {
.flatMapSequential(extensionDef -> {
var className = extensionDef.getSpec().getClassName();
return extensions.filter(
extension -> Objects.equals(extension.getClass().getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public Mono<ListResult<CommentVo>> list(Ref ref, PageRequest pageParam) {
return client.listBy(Comment.class, listOptions, pageRequest)
.flatMap(listResult -> Flux.fromStream(listResult.get())
.map(this::toCommentVo)
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(commentVos -> new ListResult<>(listResult.getPage(),
listResult.getSize(),
Expand All @@ -102,7 +102,7 @@ public Mono<ListResult<CommentVo>> list(Ref ref, PageRequest pageParam) {
public Mono<ListResult<CommentWithReplyVo>> convertToWithReplyVo(ListResult<CommentVo> comments,
int replySize) {
return Flux.fromIterable(comments.getItems())
.concatMap(commentVo -> {
.flatMapSequential(commentVo -> {
var commentName = commentVo.getMetadata().getName();
return listReply(commentName, 1, replySize)
.map(replyList -> CommentWithReplyVo.from(commentVo)
Expand Down Expand Up @@ -135,7 +135,7 @@ public Mono<ListResult<ReplyVo>> listReply(String commentName, PageRequest pageP
.orElse(PageRequestImpl.ofSize(0));
return client.listBy(Reply.class, listOptions, pageRequest)
.flatMap(list -> Flux.fromStream(list.get().map(this::toReplyVo))
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(replyVos -> new ListResult<>(list.getPage(), list.getSize(),
list.getTotal(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ public Flux<ContributorVo> getContributors(List<String> names) {
return Flux.empty();
}
return Flux.fromIterable(names)
.concatMap(this::getContributor);
.flatMapSequential(this::getContributor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public Mono<ListResult<PostArchiveVo>> archives(Integer page, Integer size, Stri
public Flux<ListedPostVo> listAll() {
return postPredicateResolver.getListOptions()
.flatMapMany(listOptions -> client.listAll(Post.class, listOptions, defaultSort()))
.concatMap(postPublicQueryService::convertToListedVo);
.flatMapSequential(postPublicQueryService::convertToListedVo);
}

static int pageNullSafe(Integer page) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public Mono<ListResult<ListedPostVo>> list(ListOptions queryOptions, PageRequest
})
.flatMap(listOptions -> client.listBy(Post.class, listOptions, page))
.flatMap(list -> Flux.fromStream(list.get())
.concatMap(post -> convertToListedVo(post)
.flatMapSequential(post -> convertToListedVo(post)
.flatMap(postVo -> populateStats(postVo)
.doOnNext(postVo::setStats).thenReturn(postVo)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public Mono<ListResult<ListedSinglePageVo>> listBy(ListOptions listOptions,

return client.listBy(SinglePage.class, rewroteListOptions, rewrotePageRequest)
.flatMap(list -> Flux.fromStream(list.get())
.concatMap(this::convertToListedVo)
.flatMapSequential(this::convertToListedVo)
.collectList()
.map(pageVos ->
new ListResult<>(list.getPage(), list.getSize(), list.getTotal(), pageVos)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public Mono<TagVo> getByName(String name) {
@Override
public Flux<TagVo> getByNames(List<String> names) {
return Flux.fromIterable(names)
.concatMap(this::getByName);
.flatMapSequential(this::getByName);
}

@Override
Expand Down

0 comments on commit 25c54d7

Please sign in to comment.