前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Mono的使用

Mono的使用

原创
作者头像
半月无霜
发布2024-08-01 22:59:42
1670
发布2024-08-01 22:59:42
举报
文章被收录于专栏:半月无霜

Mono的使用

一、介绍

最近在看gateway,发现里面是响应式编程,一看里面的代码

发现了Mono的使用,以前怎么没有注意,一下子看到还真的不认识

那么简单看看这是一个什么类,有什么用

Java中,Mono 类是Spring Reactor框架中的一个核心组件,它是Reactive Streams规范的一个实现,主要用于处理包含零个或一个元素的异步序列。Mono可以代表未来某个时刻可能出现的单一值,或者表示没有值(即空值)。这种类型的反应式类型非常适合那些你期望返回单个结果(比如查询数据库得到的单个实体)的情况。

简单的来说,类似与Optional的一个包装类,对一个对象进行包装,然后进行处理

那直接来看看,如何进行使用

二、使用

1)初解使用

代码语言:java
复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void beginner() {
        // 生成一个Mono包装对象
        Mono<String> helloWorld = Mono.just("hello world");
        Mono<String> empty = Mono.empty();
        // 订阅使用
        helloWorld.subscribe(System.out::println);
        empty.subscribe(System.out::println);
    }
​
}

2)异步源

代码语言:java
复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Mono;
​
import java.util.concurrent.CompletableFuture;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void async() {
        // 还可以使用这种异步源创建Mono包装对象
        Mono<String> fromCallable = Mono.fromCallable(() -> "hello world");
        Mono<String> fromCompletionStage = Mono.fromCompletionStage(CompletableFuture.supplyAsync(() -> "hello world"));
        Mono<String> fromFuture = Mono.fromFuture(CompletableFuture.completedFuture("hello world"));
        Mono<String> create = Mono.create(monoSink -> monoSink.success("hello world"));
        // 订阅使用
        fromCallable.subscribe(System.out::println);
        fromCompletionStage.subscribe(System.out::println);
        fromFuture.subscribe(System.out::println);
        create.subscribe(System.out::println);
    }
​
}

3)其他订阅处理

代码语言:java
复制
package com.banmoon.mono;
​
import com.banmoon.business.exception.BanmoonException;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void otherSubscribe() {
        // 生成一个Mono包装对象
        Mono<String> helloWorld = Mono.just("hello world");
        Mono<?> banmoonException = Mono.error(new BanmoonException("自定义异常"));
        Mono<String> subscribeMono = Mono.just("subscribe");
        // 订阅使用
        helloWorld.subscribe(System.out::println, System.err::println, () -> System.out.println("complete"));
        banmoonException.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("complete"));
        // 使用Subscriber入参,高度自定义订阅;通常情况下,我们使用上面的重载方法即可
        subscribeMono.subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription subscription) {
                // 当订阅开始时,请求最大数量的数据
                subscription.request(Integer.MAX_VALUE);
            }
​
            @Override
            public void onNext(String s) {
                System.out.println(s);
            }
​
            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }
​
            @Override
            public void onComplete() {
                System.out.println("complete");
            }
        });
    }
​
}

4)map映射转换

代码语言:java
复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void mapConvert() {
        // 一个Mono包装对象
        Mono<String> helloWorld = Mono.just("hello world");
        // map转换
        helloWorld = helloWorld.map(String::toUpperCase);
        // 订阅输出
        helloWorld.subscribe(System.out::println);
​
        // 一个Mono包装对象
        Mono<String> helloWorld1 = Mono.just("hello world");
        // flatMap转换
        helloWorld1 = helloWorld1.flatMap(s -> Mono.just(s.toUpperCase()));
        // 订阅输出
        helloWorld1.subscribe(System.out::println);
    }
​
}

5)filter过滤

代码语言:java
复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void filter() {
        // 一个Mono包装对象
        Mono<String> helloWorld = Mono.just("hello world");
        // filter
        Mono<String> banmoon = helloWorld.filter(s -> s.contains("banmoon"));
        // 订阅输出
        banmoon.subscribe(System.out::println);
    }
​
}

6)异常的处理

代码语言:java
复制
package com.banmoon.mono;
​
import com.banmoon.business.exception.BanmoonException;
import org.junit.Test;
import reactor.core.publisher.Mono;
​
import java.security.InvalidParameterException;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void error() {
        // 模拟在处理过程中,可能会出现异常
        Mono<String> fromCallable = Mono.fromCallable(() -> {
            String str = "hello world";
            throw new BanmoonException("自定义异常");
        });
        // 可以在订阅前面,提前处理这个异常,异常处理,提供一个Mono包装对象
        Mono<String> fromCallable1 = fromCallable.onErrorResume(throwable -> Mono.just(throwable.getMessage()));
        fromCallable1.subscribe(System.out::println);
​
        // 异常处理,提供一个值
        Mono<String> fromCallable2 = fromCallable.onErrorReturn(throwable -> throwable instanceof BanmoonException, "banmoon自定义异常");
        fromCallable2.subscribe(System.out::println);
​
        // 异常处理,将error转换成另一个
        Mono<String> fromCallable3 = fromCallable.onErrorMap(throwable -> new InvalidParameterException("自定义异常"));
        fromCallable3.subscribe(System.err::println);
​
        // 异常处理,对异常进行处理,没有返回值,还是原本的fromCallable
        Mono<String> fromCallable4 = fromCallable.doOnError(throwable -> System.err.println(throwable.getMessage()));
        fromCallable4.subscribe();
    }
​
}

7)延迟

代码语言:java
复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Mono;
​
import java.time.Duration;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void delay() {
        // 创建一个包装对象
        Mono<String> helloWorld = Mono.just("hello world");
        // 延迟3秒
        Mono<String> delayElement = helloWorld.delayElement(Duration.ofSeconds(3));
        // 订阅输出
        delayElement.subscribe(System.out::println);
        // 由于 Mono 是非阻塞的,为了确保主线程等待 Mono 完成,
        // 我们需要在这里阻塞主线程,否则程序会立即退出
        // 注意:在实际应用中,你通常不需要这样做,因为 Mono 通常是在事件循环或异步上下文中使用的
        delayElement.block();
​
        // 另一种方式
        Mono<String> helloWorld1 = Mono.just("hello world").delaySubscription(Duration.ofSeconds(3));
        helloWorld1.subscribe(System.out::println);
        // 避免退出
        helloWorld1.block();
    }
​
}

8)多对象包装Flux

代码语言:java
复制
package com.banmoon.mono;
​
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void toFluxOrMono() {
        // 创建一个包装对象
        Mono<String> helloWorld = Mono.just("Hello World");
        // 转换为Flux包装对象
        Flux<String> flux = helloWorld.flux();
        // 订阅输出
        flux.subscribe(System.out::println);
​
        // 创建一个Flux包装对象
        Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4, 5);
        // 转换为Mono对象
        Mono<Integer> integerMono = integerFlux.next();
        Mono<Integer> integerMono1 = integerFlux.last();
        // 订阅输出
        integerMono.subscribe(System.out::println);
        integerMono1.subscribe(System.out::println);
    }
​
}

9)链式调用

代码语言:java
复制
package com.banmoon.mono;
​
import com.banmoon.business.exception.BanmoonException;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
​
/**
 * @author banmoon
 * @date 2024/08/01 10:42:46
 */
public class MonoTest {
​
    @Test
    public void chain() {
        // 链式调用
        Flux.just(1, 2, 3, 4, 5)
                .filter(i -> i > 3)
                .mapNotNull(i -> i * 2)
                .next()
                .subscribe(System.out::println);
        // 链式调用,正常情况
        Mono.just(1)
                .subscribe(System.out::println, throwable -> System.out.println(throwable.getMessage()), () -> System.out.println("完成"));
        // 链式调用,异常情况
        Mono.fromCallable(() -> {
            throw new BanmoonException("异常");
        }).subscribe(System.out::println, throwable -> System.out.println(throwable.getMessage()));
    }
​
}

三、最后

MonoFlux这都是响应式中必会的,不然你都看不懂写的啥,多看看就行

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Mono的使用
    • 一、介绍
      • 二、使用
        • 1)初解使用
        • 2)异步源
        • 3)其他订阅处理
        • 4)map映射转换
        • 5)filter过滤
        • 6)异常的处理
        • 7)延迟
        • 8)多对象包装Flux
        • 9)链式调用
      • 三、最后
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档