最近在看gateway
,发现里面是响应式编程,一看里面的代码
发现了Mono
的使用,以前怎么没有注意,一下子看到还真的不认识
那么简单看看这是一个什么类,有什么用
在Java
中,Mono
类是Spring Reactor
框架中的一个核心组件,它是Reactive Streams
规范的一个实现,主要用于处理包含零个或一个元素的异步序列。Mono
可以代表未来某个时刻可能出现的单一值,或者表示没有值(即空值)。这种类型的反应式类型非常适合那些你期望返回单个结果(比如查询数据库得到的单个实体)的情况。
简单的来说,类似与Optional
的一个包装类,对一个对象进行包装,然后进行处理
那直接来看看,如何进行使用
package com.banmoon.mono;
import org.junit.Test;
import reactor.core.publisher.Mono;
/**
* @author banmoon
* @date 2024/08/01 10:42:46
*/
public class MonoTest {
@Test
public void beginner() {
// 生成一个Mono包装对象
Mono<String> helloWorld = Mono.just("hello world");
Mono<String> empty = Mono.empty();
// 订阅使用
helloWorld.subscribe(System.out::println);
empty.subscribe(System.out::println);
}
}
package com.banmoon.mono;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.util.concurrent.CompletableFuture;
/**
* @author banmoon
* @date 2024/08/01 10:42:46
*/
public class MonoTest {
@Test
public void async() {
// 还可以使用这种异步源创建Mono包装对象
Mono<String> fromCallable = Mono.fromCallable(() -> "hello world");
Mono<String> fromCompletionStage = Mono.fromCompletionStage(CompletableFuture.supplyAsync(() -> "hello world"));
Mono<String> fromFuture = Mono.fromFuture(CompletableFuture.completedFuture("hello world"));
Mono<String> create = Mono.create(monoSink -> monoSink.success("hello world"));
// 订阅使用
fromCallable.subscribe(System.out::println);
fromCompletionStage.subscribe(System.out::println);
fromFuture.subscribe(System.out::println);
create.subscribe(System.out::println);
}
}
package com.banmoon.mono;
import com.banmoon.business.exception.BanmoonException;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Mono;
/**
* @author banmoon
* @date 2024/08/01 10:42:46
*/
public class MonoTest {
@Test
public void otherSubscribe() {
// 生成一个Mono包装对象
Mono<String> helloWorld = Mono.just("hello world");
Mono<?> banmoonException = Mono.error(new BanmoonException("自定义异常"));
Mono<String> subscribeMono = Mono.just("subscribe");
// 订阅使用
helloWorld.subscribe(System.out::println, System.err::println, () -> System.out.println("complete"));
banmoonException.subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("complete"));
// 使用Subscriber入参,高度自定义订阅;通常情况下,我们使用上面的重载方法即可
subscribeMono.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription subscription) {
// 当订阅开始时,请求最大数量的数据
subscription.request(Integer.MAX_VALUE);
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
}
}
package com.banmoon.mono;
import org.junit.Test;
import reactor.core.publisher.Mono;
/**
* @author banmoon
* @date 2024/08/01 10:42:46
*/
public class MonoTest {
@Test
public void mapConvert() {
// 一个Mono包装对象
Mono<String> helloWorld = Mono.just("hello world");
// map转换
helloWorld = helloWorld.map(String::toUpperCase);
// 订阅输出
helloWorld.subscribe(System.out::println);
// 一个Mono包装对象
Mono<String> helloWorld1 = Mono.just("hello world");
// flatMap转换
helloWorld1 = helloWorld1.flatMap(s -> Mono.just(s.toUpperCase()));
// 订阅输出
helloWorld1.subscribe(System.out::println);
}
}
package com.banmoon.mono;
import org.junit.Test;
import reactor.core.publisher.Mono;
/**
* @author banmoon
* @date 2024/08/01 10:42:46
*/
public class MonoTest {
@Test
public void filter() {
// 一个Mono包装对象
Mono<String> helloWorld = Mono.just("hello world");
// filter
Mono<String> banmoon = helloWorld.filter(s -> s.contains("banmoon"));
// 订阅输出
banmoon.subscribe(System.out::println);
}
}
package com.banmoon.mono;
import com.banmoon.business.exception.BanmoonException;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.security.InvalidParameterException;
/**
* @author banmoon
* @date 2024/08/01 10:42:46
*/
public class MonoTest {
@Test
public void error() {
// 模拟在处理过程中,可能会出现异常
Mono<String> fromCallable = Mono.fromCallable(() -> {
String str = "hello world";
throw new BanmoonException("自定义异常");
});
// 可以在订阅前面,提前处理这个异常,异常处理,提供一个Mono包装对象
Mono<String> fromCallable1 = fromCallable.onErrorResume(throwable -> Mono.just(throwable.getMessage()));
fromCallable1.subscribe(System.out::println);
// 异常处理,提供一个值
Mono<String> fromCallable2 = fromCallable.onErrorReturn(throwable -> throwable instanceof BanmoonException, "banmoon自定义异常");
fromCallable2.subscribe(System.out::println);
// 异常处理,将error转换成另一个
Mono<String> fromCallable3 = fromCallable.onErrorMap(throwable -> new InvalidParameterException("自定义异常"));
fromCallable3.subscribe(System.err::println);
// 异常处理,对异常进行处理,没有返回值,还是原本的fromCallable
Mono<String> fromCallable4 = fromCallable.doOnError(throwable -> System.err.println(throwable.getMessage()));
fromCallable4.subscribe();
}
}
package com.banmoon.mono;
import org.junit.Test;
import reactor.core.publisher.Mono;
import java.time.Duration;
/**
* @author banmoon
* @date 2024/08/01 10:42:46
*/
public class MonoTest {
@Test
public void delay() {
// 创建一个包装对象
Mono<String> helloWorld = Mono.just("hello world");
// 延迟3秒
Mono<String> delayElement = helloWorld.delayElement(Duration.ofSeconds(3));
// 订阅输出
delayElement.subscribe(System.out::println);
// 由于 Mono 是非阻塞的,为了确保主线程等待 Mono 完成,
// 我们需要在这里阻塞主线程,否则程序会立即退出
// 注意:在实际应用中,你通常不需要这样做,因为 Mono 通常是在事件循环或异步上下文中使用的
delayElement.block();
// 另一种方式
Mono<String> helloWorld1 = Mono.just("hello world").delaySubscription(Duration.ofSeconds(3));
helloWorld1.subscribe(System.out::println);
// 避免退出
helloWorld1.block();
}
}
package com.banmoon.mono;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author banmoon
* @date 2024/08/01 10:42:46
*/
public class MonoTest {
@Test
public void toFluxOrMono() {
// 创建一个包装对象
Mono<String> helloWorld = Mono.just("Hello World");
// 转换为Flux包装对象
Flux<String> flux = helloWorld.flux();
// 订阅输出
flux.subscribe(System.out::println);
// 创建一个Flux包装对象
Flux<Integer> integerFlux = Flux.just(1, 2, 3, 4, 5);
// 转换为Mono对象
Mono<Integer> integerMono = integerFlux.next();
Mono<Integer> integerMono1 = integerFlux.last();
// 订阅输出
integerMono.subscribe(System.out::println);
integerMono1.subscribe(System.out::println);
}
}
package com.banmoon.mono;
import com.banmoon.business.exception.BanmoonException;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author banmoon
* @date 2024/08/01 10:42:46
*/
public class MonoTest {
@Test
public void chain() {
// 链式调用
Flux.just(1, 2, 3, 4, 5)
.filter(i -> i > 3)
.mapNotNull(i -> i * 2)
.next()
.subscribe(System.out::println);
// 链式调用,正常情况
Mono.just(1)
.subscribe(System.out::println, throwable -> System.out.println(throwable.getMessage()), () -> System.out.println("完成"));
// 链式调用,异常情况
Mono.fromCallable(() -> {
throw new BanmoonException("异常");
}).subscribe(System.out::println, throwable -> System.out.println(throwable.getMessage()));
}
}
像Mono
、Flux
这都是响应式中必会的,不然你都看不懂写的啥,多看看就行
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。