这是绝对正确的设计,并且值得节省资源并且仅为每个客户端使用所有可能的操作的连接。
但是,不要实现轮子并使用协议,它为您提供所有这些类型的通信。
其中一个选项是使用RSocket协议的RSocket-Java实现。 RSocket-Java建立在Project Reactor之上,因此它自然适合Spring WebFlux生态系统。
不幸的是,没有与Spring生态系统的特色集成。幸运的是,我花了几个小时来提供一个简单的 RSocket Spring Boot Starter 将Spring WebFlux与RSocket集成,并将WebSocket RSocket服务器与WebFlux Http服务器一起公开。
基本上,RSocket隐藏了自己实现相同方法的复杂性。使用RSocket,我们不必关心交互模型定义作为自定义协议和Java中的实现。 RSocket为我们提供了将数据传送到特定逻辑信道的功能。它提供了一个内置客户端,可以将消息发送到同一个WS连接,因此我们不必为此创建自定义实现。
由于RSocket只是一个协议,它不提供任何消息格式,因此这个挑战是针对业务逻辑的。但是,有一个RSocket-RPC项目,它提供一个协议缓冲区作为消息格式,并重用与GRPC相同的代码生成技术。因此,使用RSocket-RPC,我们可以轻松地为客户端和服务器构建API,并且根本不需要传输和协议抽象。
相同的RSocket Spring Boot集成提供了一个 例 RSocket-RPC的用法也是如此。
所以,为此目的,你必须自己实现这个地狱。我之前已经做过一次,但我不能指出那个项目,因为它是一个企业项目。 不过,我可以分享几个代码示例,可以帮助您构建适当的客户端和服务器。
必须考虑的第一点是,一个物理连接中的所有逻辑流都应存储在某处:
class MyWebSocketRouter implements WebSocketHandler { final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping; @Override public Mono<Void> handle(WebSocketSession session) { final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>(); ... } }
上面的示例中有两个地图。第一个是您的路由映射,它允许您根据传入的消息参数识别路由,或者左右。第二个是为请求流用例创建的(在我的例子中是活动订阅的映射),因此您可以发送创建订阅的消息帧,或者订阅特定操作并保留该订阅以便取消订阅执行操作如果存在订阅,您将被取消订阅。
为了从所有逻辑流发回消息,您必须将消息多路复用到一个流。例如,使用Reactor,您可以使用 UnicastProcessor :
UnicastProcessor
@Override public Mono<Void> handle(WebSocketSession session) { final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get()); ... return Mono .subscriberContext() .flatMap(context -> Flux.merge( session .receive() ... .cast(ActionMessage.class) .publishOn(Schedulers.parallel()) .doOnNext(am -> { switch (am.type) { case CREATE: case UPDATE: case CANCEL: { ... } case SUBSCRIBE: { Flux<ResponseMessage<?>> flux = Flux .from( channelsMapping.get(am.getChannelId()) .get(ActionMessage.Type.SUBSCRIBE) .handle(am) // returns Publisher<> ); if (flux != null) { channelsIdsToDisposableMap.compute( am.getChannelId() + am.getSymbol(), // you can generate a uniq uuid on the client side if needed (cid, disposable) -> { ... return flux .subscriberContext(context) .subscribe( funIn::onNext, // send message to a Processor manually e -> { funIn.onNext( new ResponseMessage<>( // send errors as a messages to Processor here 0, e.getMessage(), ... ResponseMessage.Type.ERROR ) ); } ); } ); } return; } case UNSABSCRIBE: { Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol()); if (disposable != null) { disposable.dispose(); } } } }) .then(Mono.empty()), funIn ... .map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p)) .as(session::send) ).then() ); }
正如我们从上面的示例中看到的,那里有很多东西:
Flux
Mono
客户端也不那么简单:
为了处理连接,我们必须分配两个处理器,以便我们可以使用它们来复用和解复用消息:
UnicastProcessor<> outgoing = ... UnicastPorcessor<> incoming = ... (session) -> { return Flux.merge( session.receive() .subscribeWith(incoming) .then(Mono.empty()), session.send(outgoing) ).then(); }
所有创建的流都是 Mono 要么 Flux 应存储在某处,以便我们能够区分哪个流消息涉及:
Map<String, MonoSink> monoSinksMap = ...; Map<String, FluxSink> fluxSinksMap = ...;
自MonoSink以来我们必须保留两张地图,而FluxSink没有相同的父接口。
在上面的示例中,我们只考虑了客户端的初始部分。现在我们必须构建一个消息路由机制:
... .subscribeWith(incoming) .doOnNext(message -> { if (monoSinkMap.containsKey(message.getStreamId())) { MonoSink sink = monoSinkMap.get(message.getStreamId()); monoSinkMap.remove(message.getStreamId()); if (message.getType() == SUCCESS) { sink.success(message.getData()); } else { sink.error(message.getCause()); } } else if (fluxSinkMap.containsKey(message.getStreamId())) { FluxSink sink = fluxSinkMap.get(message.getStreamId()); if (message.getType() == NEXT) { sink.next(message.getData()); } else if (message.getType() == COMPLETE) { fluxSinkMap.remove(message.getStreamId()); sink.next(message.getData()); sink.complete(); } else { fluxSinkMap.remove(message.getStreamId()); sink.error(message.getCause()); } } })
上面的代码示例显示了我们如何路由传入的消息。
最后一部分是消息多路复用。为此,我们将涵盖可能的发送者类impl:
class Sender { UnicastProcessor<> outgoing = ... UnicastPorcessor<> incoming = ... Map<String, MonoSink> monoSinksMap = ...; Map<String, FluxSink> fluxSinksMap = ...; public Sender () {
//在这里创建websocket连接并放置前面提到的代码 }
Mono<R> sendForMono(T data) { //generate message with unique return Mono.<R>create(sink -> { monoSinksMap.put(streamId, sink); outgoing.onNext(message); // send message to server only when subscribed to Mono }); } Flux<R> sendForFlux(T data) { return Flux.<R>create(sink -> { fluxSinksMap.put(streamId, sink); outgoing.onNext(message); // send message to server only when subscribed to Flux }); } }
不确定这种情况是否是你的问题? 我发现你正在发送一个静态通量响应(这是一个紧密的流) 您需要一个opend流来向该会话发送消息,例如您可以创建一个处理器
public class SocketMessageComponent { private DirectProcessor<String> emitterProcessor; private Flux<String> subscriber; public SocketMessageComponent() { emitterProcessor = DirectProcessor.create(); subscriber = emitterProcessor.share(); } public Flux<String> getSubscriber() { return subscriber; } public void sendMessage(String mesage) { emitterProcessor.onNext(mesage); }
}
然后你可以发送
public Mono<Void> handle(WebSocketSession webSocketSession) { this.webSocketSession = webSocketSession; return webSocketSession.send(socketMessageComponent.getSubscriber() .map(webSocketSession::textMessage)) .and(webSocketSession.receive() .map(WebSocketMessage::getPayloadAsText).log()); }