欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > IT业 > spring boot 集成Rsocket,服务端、客户端

spring boot 集成Rsocket,服务端、客户端

2025/4/15 16:08:06 来源:https://blog.csdn.net/weixin_41463944/article/details/140198244  浏览:    关键词:spring boot 集成Rsocket,服务端、客户端

一、简介

RSocket 是一种二进制协议,可用于 TCP、WebSockets 和 Aeron 等字节流传输的应用协议,具有以下交互模型:

1、Request-Response: 发送一条信息,接收一条信息。

2、Request-Stream: 发送一条消息并接收返回的消息流。

3、Channel: 双向发送消息流。

4、Fire-and-Forget: 发送单向消息。
二、服务端代码

1、安装依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-rsocket</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency>

2、配置文件添加如下:

spring:rsocket:server:port: 9898transport: tcp

3、服务端测试代码

package com.example.rsocketservice.controller;import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.time.Duration;
import java.util.Random;@RestController
public class SendController {//Request-Response模式@MessageMapping("message")public Mono<String> handleMessage(Mono<String> message) {return message.doOnNext(msg -> {System.out.printf("接收到消息:%s%n", msg) ;}).map(msg -> "服务器成功收到了你的消息!!!") ;}//Request-Stream模式// 必须返回Flux@MessageMapping("stream")public Flux<String> handleStream() {return Flux.interval(Duration.ofSeconds(2))// 随机生成.map(i -> String.valueOf(new Random().nextInt(10000000)))// 只在此通道中获取10个值.take(10).doOnComplete(() -> {System.out.println("completed...") ;}) ;}//Channel模式@MessageMapping("channel")public Flux<String> handleChannel(Flux<String> datas) {return datas.doOnNext(ret -> {System.out.printf("【server】%s - 接收到数据: %s%n", Thread.currentThread().getName(), ret) ;}).map(ret -> {return ret + " - " + new Random().nextInt(1000) ;}) ;}//Fire-and-Forget模式@MessageMapping("faf")public Mono<Void> handleFireAndForget(Mono<String> data) {return data.doOnNext(ret -> {System.out.printf("【server】%s - 接收到数据: %s%n", Thread.currentThread().getName(), ret) ;}).then() ;}
}

三、客户端测试代码

1、安装依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-rsocket</artifactId></dependency>

2、新建配置类ClientConfiguration

package com.example.rsocketclient.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.json.Jackson2JsonDecoder;
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;@Configuration
public class ClientConfiguration {@BeanRSocketRequester rSocketRequester(/*RSocketStrategies rSocketStrategies*/) {RSocketStrategies strategies = RSocketStrategies.builder()
//                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
//                .decoders(decoders -> decoders.add(new Jackson2CborDecoder())).encoders(encoders -> encoders.add(new Jackson2JsonEncoder())).decoders(decoders -> decoders.add(new Jackson2JsonDecoder())).build();RSocketRequester requester = RSocketRequester.builder().rsocketStrategies(strategies).tcp("localhost", 9898);return requester;}
}

3、测试代码

package com.example.rsocketclient.controller;import jakarta.annotation.Resource;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import java.time.Duration;
import java.util.Random;@RestController
public class TestController {@Resourceprivate RSocketRequester rsocketRequester;//Request-Response模式@GetMapping("/message/{body}")// Request-Response 发送一条信息,接收一条信息。public void sendMessage(@PathVariable("body") String body) {this.rsocketRequester.route("message").data(body).retrieveMono(String.class).subscribe(System.out::println) ;}//Request-Stream模式@GetMapping("stream")public void sendStream() {this.rsocketRequester.route("stream").retrieveFlux(String.class).subscribe(ret -> {System.out.printf("%s - 接受到数据: %s%n", Thread.currentThread().getName(), ret) ;}) ;}@GetMapping("channel")// Channel 双向发送消息流。public void sendChannel() {this.rsocketRequester.route("channel").data(Flux.just("1", "2", "3", "4", "5", "6").delayElements(Duration.ofSeconds(1))).retrieveFlux(String.class).subscribe(ret -> {System.out.printf("【client】%s - 接受到数据: %s%n", Thread.currentThread().getName(), ret) ;}) ;}@GetMapping("sendFireAndForget")// Fire-and-Forget 发送单向消息。public void sendFireAndForget() {this.rsocketRequester.route("faf").data(Mono.just(String.valueOf(new Random().nextInt(1000)))).send().subscribe() ;}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词