WebSocket Wizardry: Spring Webflux Edition

Meiyappan Kannappa
Oct 17, 2023

Developers often run into situations where an app has to provide live updates and events to improve the consumer experience.

API polling is one approach to pulling the latest events and status from the server. However, it has several drawbacks:

  • Increased network traffic: Polling involves sending frequent requests to the API server, even if there is no new data available. This can lead to unnecessary network congestion and increased bandwidth usage.
  • Latency and delay: Polling introduces a delay between data updates and their availability to the client. The client has to wait for the next poll to receive the updated information, which can result in delayed or outdated data.
  • Inefficient resource utilization: Polling requires continuous server resources to handle frequent requests, even if there is no new data. This can lead to inefficient resource utilization and increased server costs.
  • Scalability challenges: As the number of clients increases, API polling can become a scalability challenge. The server needs to handle a large number of concurrent requests, which can impact performance and responsiveness.
  • Real-time limitations: API polling is not suitable for real-time applications that require instant data updates. The delay introduced by polling can make real-time interactions less responsive and less accurate.
  • Battery drain on mobile devices: Continuous polling can drain the battery life of mobile devices due to frequent network requests and data processing.

Other options overcome these drawbacks and provide a great consumer experience, such as WebSockets, gRPC and MQTT. The differences between these three, and when to choose one over another, are covered in my previous post.

In this post, we will see how to implement WebSockets using Spring WebFlux and make them resilient to data and connection loss.

Unlike a Spring application on the Servlet stack (Spring MVC), where WebSockets can be implemented with the STOMP sub-protocol, Spring WebFlux has no built-in STOMP support.

Let's walk step by step through implementing and optimizing WebSockets with Spring WebFlux. First, a quick recap.

What is Spring Webflux?

Spring Webflux is a reactive web framework that enables developers to build scalable and efficient applications. It supports non-blocking I/O and reactive programming, allowing for concurrent handling of requests without blocking threads. With built-in WebSocket support and seamless integration with other Spring projects, it provides a flexible and powerful solution for building reactive web applications.

A Basic WebSocket Handler

Let's create a WebSocket handler on the server side in Spring WebFlux. Below is a simple handler that serves the /ws endpoint and echoes each message back to the client.
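A minimal sketch of such an echo handler, following the description below and assuming Spring Framework 5.3 or later (where @EnableWebFlux also registers the WebSocketHandlerAdapter the mapping needs) and Java 17. The class name and the greeting format of the reply are illustrative, not taken from the original.

```java
import java.util.Map;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Configuration
@EnableWebFlux
public class EchoWebSocketHandler implements WebSocketHandler {

    @Bean
    public HandlerMapping simpleUrlHandlerMapping() {
        // Route "/ws" to this handler; order -1 puts it ahead of annotated controllers.
        return new SimpleUrlHandlerMapping(Map.of("/ws", this), -1);
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // Read the "email" query parameter from the handshake URI, e.g. /ws?email=jane@example.com
        String email = UriComponentsBuilder.fromUri(session.getHandshakeInfo().getUri())
                .build()
                .getQueryParams()
                .getFirst("email");

        // Echo every text frame back to the same session (no client filtering).
        Flux<WebSocketMessage> replies = session.receive()
                .filter(msg -> msg.getType() == WebSocketMessage.Type.TEXT)
                .map(WebSocketMessage::getPayloadAsText)
                .map(text -> session.textMessage("Hello " + email + ", you said: " + text));

        return session.send(replies);
    }
}
```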

The code above is a WebSocket handler implementation in a Spring WebFlux application. It is annotated with @Configuration and @EnableWebFlux, marking it as a configuration class for Spring WebFlux. The class implements the WebSocketHandler interface, so it can handle WebSocket connections. The simpleUrlHandlerMapping() method defines a bean that maps the "/ws" URL path to this handler, whose handle() method is invoked for every new session. Within handle(), the session's handshake information is used to extract query parameters, specifically the "email" parameter. Incoming messages are transformed and sent back to the client using session.send(). Overall, this code sets up a WebSocket handler that listens for incoming messages, processes them, and sends a response back to the client. Note that it doesn't filter clients here: each session simply receives replies to its own messages.

This is fine for a simple “Hello World” message, but pushing a message when an event happens elsewhere in the application needs another mechanism. I prefer Reactor's Sinks: the application emits values to a sink, and the sink takes care of sending them to the WebSocket session.

What are Sinks in Reactor?

In Reactor, Sinks is a utility class that provides various types of sinks for emitting values programmatically in a reactive context. A sink is the starting point of a reactive stream: you push values into it, and they are consumed by its subscribers. The main types provided by the Sinks class are:

Sinks.One: Represents a sink that can emit at most one value (or complete empty, or fail with an error). It is exposed as a Mono and is useful when you want to emit a single value to subscribers.

Sinks.Many: Represents a sink that can emit multiple values. Depending on how it is built (unicast, multicast or replay), it accepts one or many subscribers and applies a chosen backpressure strategy.

Sinks.Empty: Represents a sink that never emits values; it can only signal completion or an error. It is exposed as a Mono and is useful for signalling that something has finished.

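Sinks.UnicastSpec: Not a sink itself, but the builder-like spec returned by Sinks.many().unicast() for creating a unicast Sinks.Many. It lets you choose the backpressure strategy, for example buffering (onBackpressureBuffer()) or failing fast (onBackpressureError()).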

These sinks provide a way to create and manage streams of values in a reactive application. You emit values with methods such as tryEmitNext(), tryEmitComplete() and tryEmitError() (tryEmitValue() on a Sinks.One); each returns an EmitResult indicating whether the emission succeeded. Subscribers consume the values by subscribing to the sink's Flux or Mono view, obtained with asFlux() or asMono().
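The following stand-alone sketch shows the three sink types in action using only reactor-core; the class and method names are illustrative. Note that the multicast subscriber is attached before emitting, because a multicast sink does not replay values to late subscribers, whereas Sinks.One and Sinks.Empty do replay their single signal.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class SinksDemo {

    /** Sinks.Many (multicast): every current subscriber receives each emitted value. */
    static List<String> manyDemo() {
        Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
        List<String> received = new ArrayList<>();
        sink.asFlux().subscribe(received::add); // subscribe first, then emit
        sink.tryEmitNext("event-1");
        sink.tryEmitNext("event-2");
        sink.tryEmitComplete();
        return received;
    }

    /** Sinks.One: at most one value, viewed as a Mono and replayed to late subscribers. */
    static String oneDemo() {
        Sinks.One<String> sink = Sinks.one();
        sink.tryEmitValue("done");
        return sink.asMono().block();
    }

    /** Sinks.Empty: completion (or error) only, no values. */
    static boolean emptyDemo() {
        Sinks.Empty<Void> sink = Sinks.empty();
        sink.tryEmitEmpty();
        return sink.asMono().then(Mono.just(true)).block();
    }

    public static void main(String[] args) {
        System.out.println(manyDemo());  // [event-1, event-2]
        System.out.println(oneDemo());   // done
        System.out.println(emptyDemo()); // true
    }
}
```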

Create a Sink and send data to websocket via Sinks

The code below is a WebSocket handler implementation in a Spring WebFlux application. The handle() method is invoked when a WebSocket session is established. It extracts the query parameters from the session's handshake information, specifically the "email" parameter.

sink.asFlux().subscribe() subscribes a WebSocketSessionDataPublisher to the sink, which handles publishing data to the WebSocket session. Incoming messages from the session are filtered so that only text messages are processed, then mapped to a custom WSMessage object and emitted to the sink; a log statement records the result of each emission. WebSocketSessionDataPublisher is a consumer that publishes data to the WebSocket session based on a unique ID: when a message arrives, it checks whether the message's unique ID matches and, if so, sends the message to the WebSocket session. WSMessage is a simple data class that holds the unique ID and the message payload.
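A minimal sketch of this sink-based handler, assuming Spring Framework 5.3+ and Java 17, and using the "email" parameter as the unique ID. WSMessage is written as a record, and publish() is a hypothetical helper that the rest of the application would call. One deliberate swap: instead of subscribing a separate WebSocketSessionDataPublisher consumer that calls session.send() per message, this version feeds the filtered sink flux into a single session.send(), so each session has one outbound stream whose subscription ends with the session. The shared sink uses directBestEffort() because a multicast onBackpressureBuffer() sink, by default, shuts down once its last subscriber cancels.

```java
import java.util.Map;
import java.util.logging.Logger;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.util.UriComponentsBuilder;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

@Configuration
@EnableWebFlux
public class TargetedWebSocketHandler implements WebSocketHandler {

    private static final Logger LOG = Logger.getLogger(TargetedWebSocketHandler.class.getName());

    /** Simple data class: the recipient's unique ID plus the message payload. */
    public record WSMessage(String uniqueId, String payload) {}

    // One multicast sink shared by all sessions. directBestEffort() survives sessions
    // coming and going, and drops a value for a subscriber that has no pending demand.
    private final Sinks.Many<WSMessage> sink = Sinks.many().multicast().directBestEffort();

    @Bean
    public HandlerMapping webSocketMapping() {
        return new SimpleUrlHandlerMapping(Map.of("/ws", this), -1);
    }

    /** Hypothetical entry point for application events; synchronized because
     *  concurrent tryEmitNext() calls on a Sinks.Many fail with FAIL_NON_SERIALIZED. */
    public synchronized Sinks.EmitResult publish(WSMessage message) {
        Sinks.EmitResult result = sink.tryEmitNext(message);
        LOG.info(() -> "Emitted " + message + " -> " + result);
        return result;
    }

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        String email = UriComponentsBuilder.fromUri(session.getHandshakeInfo().getUri())
                .build().getQueryParams().getFirst("email");

        // Outbound: only messages whose unique ID matches this session's email.
        Mono<Void> output = session.send(sink.asFlux()
                .filter(m -> email != null && email.equals(m.uniqueId()))
                .map(m -> session.textMessage(m.payload())));

        // Inbound: text frames become WSMessages and are emitted to the sink.
        Mono<Void> input = session.receive()
                .filter(msg -> msg.getType() == WebSocketMessage.Type.TEXT)
                .map(msg -> new WSMessage(email, msg.getPayloadAsText()))
                .doOnNext(this::publish)
                .then();

        // Completes as soon as either side completes (e.g. the client closes the socket).
        return Mono.zip(input, output).then();
    }
}
```

With this design, any Spring bean that has the handler injected can call publish(new WSMessage("jane@example.com", "order shipped")), and only sessions opened with that email receive the message.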

Good, we now have a WebSocket handler that filters the messages sent to each client.

The next step is to keep the connection between the client and server from dropping.

Ping Pong in Websockets

In WebSockets, “ping-pong” refers to a mechanism used to keep the connection alive and check its health. The server periodically sends “ping” control frames to the client, and the client answers each one with a “pong” frame (browsers do this automatically). It is needed for the following reasons:

  • Connection Monitoring: Ping-pong messages allow the server to actively monitor the connection’s health. If the server doesn’t receive a pong response within a certain timeframe, it can assume that the connection is no longer active or stable.
  • Preventing Connection Timeout: Websocket connections can be subject to timeouts, especially when traversing through proxies or load balancers. By sending periodic ping messages, the server can prevent the connection from being prematurely closed due to inactivity.
  • Load Balancer and Proxy Support: Ping-pong messages help ensure that intermediate proxies and load balancers are aware of the active connection. Some proxies or load balancers may close idle connections, but by sending pings, the connection remains active and prevents premature closure.
  • Network Stability: Ping-pong messages can help identify network issues or latency problems. If the server receives pong responses with significant delays, it may indicate network congestion or other connectivity issues.

Without ping-pong we can face considerable data loss, especially when there are multiple hops, such as load balancers and API gateways, each with its own idle-timeout and TCP-timeout settings.

In the handler method, ping-pong is implemented by sending a ping every 2 seconds.

The sendPing variable represents a Mono<Void> that sends periodic "ping" messages to the WebSocket session. This is achieved by using the session.send() method and passing a Flux that emits ping messages at a regular interval using Flux.interval(). The map() operator is used to create a ping message using the dataBufferFactory.

The pong variable represents a Flux<WebSocketMessage> that filters and processes incoming messages of type "PONG" from the WebSocket session. The filter() operator is used to only select messages of type "PONG". The doOnNext() operator is then used to perform an action on each selected message, which includes logging the received "PONG" message and emitting a corresponding WSMessage to the sink.
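A sketch of the ping-pong part of a handler, assuming Spring Framework 5.3+ on Reactor Netty. It differs from the description in one respect: with Reactor Netty, session.receive() can typically be subscribed only once, so PONG and TEXT frames are handled in a single receive pipeline instead of a separate pong flux, and pings are merged with application data into a single session.send(). The 10-second PONG_TIMEOUT and the empty data placeholder are hypothetical; the timeout implements the "no pong within a certain timeframe" check from the list above.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PingPongWebSocketHandler implements WebSocketHandler {

    private static final Logger LOG = Logger.getLogger(PingPongWebSocketHandler.class.getName());
    static final Duration PING_INTERVAL = Duration.ofSeconds(2);
    // Hypothetical threshold: no inbound frame (pong or data) for this long => connection is dead.
    static final Duration PONG_TIMEOUT = Duration.ofSeconds(10);

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        // A PING control frame every 2 seconds; the payload is built via the session's DataBufferFactory.
        Flux<WebSocketMessage> pings = Flux.interval(PING_INTERVAL)
                .map(tick -> session.pingMessage(
                        factory -> factory.wrap(("ping-" + tick).getBytes(StandardCharsets.UTF_8))));

        // Hypothetical placeholder for application data, e.g. the filtered sink flux.
        Flux<WebSocketMessage> data = Flux.empty();

        // One send() for all outbound frames: data and pings are merged.
        Mono<Void> output = session.send(Flux.merge(data, pings));

        // One receive() for all inbound frames, branching on the frame type.
        Mono<Void> input = session.receive()
                .timeout(PONG_TIMEOUT)
                .doOnNext(msg -> {
                    if (msg.getType() == WebSocketMessage.Type.PONG) {
                        LOG.fine(() -> "PONG received on session " + session.getId());
                    } else if (msg.getType() == WebSocketMessage.Type.TEXT) {
                        String text = msg.getPayloadAsText();
                        LOG.info(() -> "TEXT received: " + text);
                    }
                })
                .then()
                // Silent for longer than PONG_TIMEOUT: close the session so the client reconnects.
                .onErrorResume(TimeoutException.class, e -> session.close());

        return Mono.zip(input, output).then();
    }
}
```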

Enable Persistence on the Client and Server Side

Ping-pong prevents disconnections caused by socket or TCP idle timeouts along the path between client and server. In real-world scenarios, however, WebSocket connections can still be interrupted by poor network conditions, loss of network, a device switching between networks (e.g., 3G to 4G, GPRS to Wi-Fi), or a device moving into an area with no coverage, such as a tunnel or dead zone. In such cases, the client should re-establish the WebSocket connection from its error and connection-close handlers.
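For a Java client (for example another service built on WebFlux), reconnection can be sketched with Spring's ReactorNettyWebSocketClient and Reactor's repeat and retry operators. This is an illustrative sketch, not the author's client; the URI, email and delays are hypothetical, and a browser client would do the equivalent in its onclose and onerror handlers.

```java
import java.net.URI;
import java.time.Duration;

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

public class ReconnectingClient {

    static final Duration MIN_BACKOFF = Duration.ofSeconds(1);
    static final Duration MAX_BACKOFF = Duration.ofSeconds(30);

    /** Keeps a WebSocket session alive: reconnects after a clean close or after an error. */
    static Flux<Void> connectForever(WebSocketClient client, URI uri, WebSocketHandler handler) {
        return client.execute(uri, handler)
                // Session closed normally (e.g. the server closed it): wait, then reconnect.
                .repeatWhen(closed -> closed.delayElements(MIN_BACKOFF))
                // Connect failure or dropped network: exponential backoff with jitter, capped at 30 s.
                .retryWhen(Retry.backoff(Long.MAX_VALUE, MIN_BACKOFF).maxBackoff(MAX_BACKOFF));
    }

    public static void main(String[] args) {
        WebSocketClient client = new ReactorNettyWebSocketClient();
        URI uri = URI.create("ws://localhost:8080/ws?email=jane@example.com"); // hypothetical endpoint
        WebSocketHandler handler = session -> session.receive()
                .map(msg -> msg.getPayloadAsText())
                .doOnNext(System.out::println)
                .then();
        connectForever(client, uri, handler).blockLast(); // runs until the process is stopped
    }
}
```

Separating repeatWhen (clean closes) from retryWhen (errors) keeps routine reconnects fast while backing off when the server or network is actually unavailable. Because the session flux never emits values, Retry.backoff keeps counting failures across outages, so after several of them the delay stays near the cap.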

To handle data loss in such scenarios, enable persistence on the sending side, whether that is the client or the server. Persist each message temporarily in a cache such as Redis, or in any database, before sending it over the WebSocket connection. Then implement an acknowledgement mechanism: when the receiver acknowledges a message, remove it from the cache.

When the connection is re-established after a disconnect, the sender retries by streaming the unacknowledged messages from the persistent store.
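The persist, acknowledge and replay cycle can be sketched with a hypothetical in-memory store standing in for Redis or a database; PendingMessageStore and its methods are illustrative names, and a production version would back them with the chosen cache instead of a ConcurrentHashMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory stand-in for Redis/DB: messages stay here until the receiver acks them. */
public class PendingMessageStore {

    public record Pending(long id, String recipient, String payload) {}

    private final AtomicLong sequence = new AtomicLong();
    // recipient -> (message id -> message), ordered by id so replays keep the original send order
    private final Map<String, ConcurrentSkipListMap<Long, Pending>> pending = new ConcurrentHashMap<>();

    /** Persist first; the caller then sends the returned message (including its id) over the socket. */
    public Pending save(String recipient, String payload) {
        Pending p = new Pending(sequence.incrementAndGet(), recipient, payload);
        pending.computeIfAbsent(recipient, r -> new ConcurrentSkipListMap<>()).put(p.id(), p);
        return p;
    }

    /** The receiver acknowledged message {@code id}: remove it. Returns false if it was unknown. */
    public boolean acknowledge(String recipient, long id) {
        ConcurrentSkipListMap<Long, Pending> queue = pending.get(recipient);
        return queue != null && queue.remove(id) != null;
    }

    /** On reconnect: everything still stored was never acknowledged and should be re-sent. */
    public List<Pending> unacknowledged(String recipient) {
        ConcurrentSkipListMap<Long, Pending> queue = pending.get(recipient);
        return queue == null ? List.of() : new ArrayList<>(queue.values());
    }
}
```

In a WebFlux handler, the replay could be a Flux.fromIterable(store.unacknowledged(email)) sent ahead of the live messages, with an inbound acknowledgement frame carrying the message id mapped to acknowledge(); that acknowledgement frame format is an assumption left to the application.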

Finishing Thoughts

Other options, such as gRPC and MQTT, can also stream live data events to subscribers, but for various reasons developers are sometimes forced to use WebSockets. Although WebSockets suffer from timeouts, network errors and unavailability, we can reduce data loss and improve connectivity with the additional handlers described above: ping-pong, client-side reconnection, and sender-side persistence with acknowledgements.


Meiyappan Kannappa

Technical enthusiast, daydreamer. Design of involute software architecture for digital transformation, sustainability and cost optimization.