Lejdi Prifti

TCP client-server communication with Spring Integration

29. June 2024

Introduction

In this article, I’ll walk you through using Spring Boot 3 to set up a reliable TCP server and client while utilizing the powerful messaging features of Spring Integration. By the end, you’ll have the knowledge and tools to confidently implement and customize TCP communication in your Spring applications.

Problem definition

Imagine that we have two Spring Boot 3 services that must communicate with each other over TCP using Spring Integration. The TCP client keeps trying to connect to the TCP server. Once a connection is established, the client sends a message for processing. The TCP server processes the message and replies with SUCCESS if processing succeeds, or with ERROR otherwise.

Does it sound exciting? 

Dependency configuration

The main dependencies we are going to use to implement both the TCP client and the TCP server are spring-boot-starter-integration and spring-integration-ip.

The latter offers channel adapters for sending and receiving messages over internet protocols. Adapters are available for TCP (Transmission Control Protocol) and UDP (User Datagram Protocol). Each adapter provides one-way communication over the underlying protocol. In addition, Spring Integration offers simple inbound and outbound TCP gateways, which are used when two-way communication is required.

Additionally, I have added Lombok to reduce boilerplate, for example the @Slf4j logger used to log useful information.

				
<dependencies>
	<!-- INIT SPRING INTEGRATION -->
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-integration</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.integration</groupId>
		<artifactId>spring-integration-ip</artifactId>
		<version>6.3.1</version>
	</dependency>
	<!-- END SPRING INTEGRATION -->

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<optional>true</optional>
	</dependency>
</dependencies>
				
			

TCP Server configuration

In this section, we will apply the required configurations for setting up the TCP server. We need to create the TcpServerConfig class, as well as the TcpServerInboundEndpoint class.

TcpServerConfig

The @EnableIntegration annotation enables Spring Integration capabilities in the application context. It typically imports necessary configuration and sets up infrastructure required for Spring Integration to work.

As the Spring Integration documentation also states, for TCP the configuration of the underlying connection is provided by a connection factory. Two types of connection factory are provided: a client connection factory and a server connection factory. Client connection factories establish outgoing connections. Server connection factories listen for incoming connections.

The bean named serverFactory, of type AbstractServerConnectionFactory, represents the TCP connection factory for the server. It is configured with the server port, the serializer and the deserializer. The ByteArrayStxEtxSerializer converts a byte array to a stream of bytes preceded by an STX (0x02) and followed by an ETX (0x03).

Finally, we define a bean of type TcpInboundGateway, which handles inbound messages from TCP clients. An inbound channel adapter or gateway uses a server connection factory, in this case serverFactory. We set TcpServerConstants.MESSAGE_CHANNEL as the request channel name, which is where incoming messages are sent for processing. I have also enabled logging for some additional information.

				
@RequiredArgsConstructor
@Configuration
@EnableIntegration
public class TcpServerConfig {

	@Value("${tcp.server.port}")
	private int serverPort;

	@Bean
	public AbstractServerConnectionFactory serverFactory() {
		AbstractServerConnectionFactory factory = new TcpNetServerConnectionFactory(serverPort);
		factory.setSerializer(new ByteArrayStxEtxSerializer());
		factory.setDeserializer(new ByteArrayStxEtxSerializer());

		return factory;
	}

	@Bean
	public TcpInboundGateway inboundGateway(AbstractServerConnectionFactory serverFactory) {
		TcpInboundGateway inbound = new TcpInboundGateway();
		inbound.setConnectionFactory(serverFactory);
		inbound.setRequestChannelName(TcpServerConstants.MESSAGE_CHANNEL);
		inbound.setLoggingEnabled(true);

		return inbound;
	}

}
				
			

Attention!

Pay attention when choosing the serializer/deserializer: the right choice depends on your payload type. The ByteArrayLengthHeaderSerializer is an efficient deserializer because it does not have to parse every byte to look for a termination character sequence, and it can also be used for payloads that contain binary data. The other deserializers, such as ByteArrayCrlfSerializer, ByteArraySingleTerminatorSerializer, ByteArrayLfSerializer and ByteArrayStxEtxSerializer, support only text in the payload, because a payload byte that matches the terminator would end the message early.
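
To make the difference between the two framing styles concrete, here is a minimal plain-Java sketch. It does not use the Spring Integration classes: FramingSketch and its methods are hypothetical names chosen for illustration, and the 4-byte big-endian length header is an assumption about a typical length-header layout, not a statement about how the library is implemented.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper, for illustration only (not part of Spring Integration).
final class FramingSketch {
    static final byte STX = 0x02;
    static final byte ETX = 0x03;

    // Wraps the payload as STX + payload + ETX.
    static byte[] frameStxEtx(byte[] payload) {
        byte[] framed = new byte[payload.length + 2];
        framed[0] = STX;
        System.arraycopy(payload, 0, framed, 1, payload.length);
        framed[framed.length - 1] = ETX;
        return framed;
    }

    // Reads from after STX up to the FIRST ETX, like a terminator-based reader would.
    static byte[] unframeStxEtx(byte[] framed) {
        if (framed.length == 0 || framed[0] != STX) {
            throw new IllegalArgumentException("missing STX");
        }
        for (int i = 1; i < framed.length; i++) {
            if (framed[i] == ETX) {
                return Arrays.copyOfRange(framed, 1, i);
            }
        }
        throw new IllegalArgumentException("missing ETX");
    }

    // Assumed layout: 4-byte big-endian length, then the payload.
    static byte[] frameLengthHeader(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Reads the length first, then exactly that many bytes; no byte has a special meaning.
    static byte[] unframeLengthHeader(byte[] framed) {
        ByteBuffer buffer = ByteBuffer.wrap(framed);
        byte[] payload = new byte[buffer.getInt()];
        buffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] text = "42".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.toString(frameStxEtx(text))); // [2, 52, 50, 3]

        // A binary payload that happens to contain 0x03 is cut short by the terminator scan...
        byte[] binary = {0x01, 0x03, 0x05};
        System.out.println(Arrays.toString(unframeStxEtx(frameStxEtx(binary)))); // [1]
        // ...but survives a length-header round trip.
        System.out.println(Arrays.toString(unframeLengthHeader(frameLengthHeader(binary)))); // [1, 3, 5]
    }
}
```

In this tutorial the payloads are short decimal strings, so STX/ETX framing is a reasonable fit.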

TcpServerInboundEndpoint

The @MessageEndpoint annotation marks the class TcpServerInboundEndpoint as a message-driven endpoint in the Spring Integration framework. It indicates that this class can receive and process messages from a message channel (TcpServerConstants.MESSAGE_CHANNEL in this case).

Next, I have annotated the onMessage method with @ServiceActivator, a Spring Integration annotation that marks a method as the handler for messages received from a specified input channel (TcpServerConstants.MESSAGE_CHANNEL).

The option requiresReply = "true" indicates that this method expects a reply message to be returned after processing the incoming message. If the method returns null, an exception will be thrown.

Finally, the onMessage method handles the message sent by the client. It parses the payload as an integer: if the number is even, the response is ERROR; if it is odd, the response is SUCCESS. Both responses are returned as byte arrays, because the configured serializer/deserializer works with byte[] payloads.

				
@MessageEndpoint
@Slf4j
@RequiredArgsConstructor
public class TcpServerInboundEndpoint {

	@ServiceActivator(inputChannel = TcpServerConstants.MESSAGE_CHANNEL, requiresReply = "true")
	public byte[] onMessage(Message<byte[]> message) {
		log.info("received message with connection id {}",
				message.getHeaders().get(IpHeaders.CONNECTION_ID));

		// The deserializer delivers the payload as raw bytes; decode it as UTF-8 text.
		String payloadString = new String(message.getPayload(), StandardCharsets.UTF_8);

		// Even numbers are answered with ERROR, odd numbers with SUCCESS.
		if (Integer.parseInt(payloadString) % 2 == 0) {
			return ResultEnum.ERROR.name().getBytes(StandardCharsets.UTF_8);
		}

		return ResultEnum.SUCCESS.name().getBytes(StandardCharsets.UTF_8);
	}
}
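
One thing the handler above does not cover is a payload that is not a number: parsing it throws a NumberFormatException instead of producing a reply. A possible way to handle that, sketched here in plain Java outside of Spring, is to treat unparsable input as ERROR. ParityReply and its Result enum are hypothetical stand-ins for the endpoint and ResultEnum, and mapping bad input to ERROR is my assumption, not something the tutorial's server does.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the reply logic of the server endpoint.
final class ParityReply {

    enum Result { SUCCESS, ERROR }

    // Even numbers and unparsable payloads map to ERROR, odd numbers to SUCCESS.
    static Result classify(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8).trim();
        try {
            return Integer.parseInt(text) % 2 == 0 ? Result.ERROR : Result.SUCCESS;
        } catch (NumberFormatException e) {
            // Assumption: malformed input is reported as ERROR rather than raised.
            return Result.ERROR;
        }
    }

    // Encodes the result name with an explicit charset, as the reply bytes.
    static byte[] reply(byte[] payload) {
        return classify(payload).name().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        for (String input : new String[] {"7", "4", "abc"}) {
            byte[] replyBytes = reply(input.getBytes(StandardCharsets.UTF_8));
            System.out.println(input + " -> " + new String(replyBytes, StandardCharsets.UTF_8));
        }
    }
}
```

Keeping the decision in a small pure method like this also makes it easy to unit test without starting a TCP server.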
				
			

TCP Client configuration

In the previous section, we configured the TCP server with Spring Integration. Now it’s time to configure the TCP client that will be the one sending messages to our TCP server.

TcpNetClientRetryConnectionFactory

TcpNetClientRetryConnectionFactory extends TcpNetClientConnectionFactory, which creates TCP client connections. The key difference is that when a connection cannot be established, because the server is down or unhealthy, the client keeps trying to connect at a fixed interval. The interval comes from the connection retry interval in TcpProperties and is passed to Thread.sleep, so it is interpreted in milliseconds. This is a custom implementation that suits the problem definition stated above.

If you want to implement a much simpler scenario, you can use the TcpNetClientConnectionFactory directly. 

				
@Slf4j
public class TcpNetClientRetryConnectionFactory extends TcpNetClientConnectionFactory {

	private final TcpProperties tcpProperties;

	public TcpNetClientRetryConnectionFactory(TcpProperties tcpProperties) {
		super(tcpProperties.getServerHost(), tcpProperties.getServerPort());
		this.tcpProperties = tcpProperties;
	}

	@Override
	protected @NonNull Socket createSocket(@NonNull String host, int port) throws IOException {
		// Loop instead of recursing, so a long outage cannot overflow the stack.
		while (true) {
			try {
				Socket socket = super.createSocket(host, port);
				log.info("connection established");
				return socket;
			} catch (IOException e) {
				log.warn("connection failed, retrying in {} ms", tcpProperties.getConnectionRetryInterval());
			}

			try {
				Thread.sleep(tcpProperties.getConnectionRetryInterval());
			} catch (InterruptedException ex) {
				// Restore the interrupt flag and stop retrying.
				Thread.currentThread().interrupt();
				throw new IOException("interrupted while waiting to reconnect", ex);
			}
		}
	}

}
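
Retrying forever matches the problem definition, but in other projects you may prefer to give up after a number of attempts. The following plain-Java sketch shows that variation under my own assumptions: RetryingConnector, SocketSupplier and the maxAttempts parameter are hypothetical names that do not exist in the tutorial's code or in Spring Integration.

```java
import java.io.IOException;
import java.net.Socket;

// Hypothetical helper showing a bounded, fixed-interval retry around socket creation.
final class RetryingConnector {

    @FunctionalInterface
    interface SocketSupplier {
        Socket open() throws IOException;
    }

    // Tries up to maxAttempts times, sleeping retryIntervalMillis between failed attempts.
    static Socket connectWithRetry(SocketSupplier supplier, int maxAttempts, long retryIntervalMillis)
            throws IOException {
        IOException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return supplier.open();
            } catch (IOException e) {
                lastFailure = e;
                if (attempt == maxAttempts) {
                    break; // no point in sleeping after the final attempt
                }
                try {
                    Thread.sleep(retryIntervalMillis);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt status for the caller and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IOException("interrupted while waiting to retry", interrupted);
                }
            }
        }
        throw new IOException("gave up after " + maxAttempts + " attempts", lastFailure);
    }

    public static void main(String[] args) {
        // Host and port are placeholders; adjust them to a server you actually run.
        try (Socket socket = connectWithRetry(() -> new Socket("localhost", 9001), 3, 500L)) {
            System.out.println("connected: " + socket.isConnected());
        } catch (IOException e) {
            System.out.println("could not connect: " + e.getMessage());
        }
    }
}
```

Inside a connection factory subclass, the supplier would be the call to super.createSocket(host, port).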
				
			

TcpClientConfig

As stated in the TcpServerConfig, @EnableIntegration enables Spring Integration capabilities within the application context, allowing the use of Spring Integration components such as gateways, channels, and transformers. 

Next, we define the clientFactory, which is a bean of type AbstractClientConnectionFactory. It is an abstract client connection factory used for establishing TCP connections. This factory retries connection attempts upon failure, using ByteArrayStxEtxSerializer for serialization and deserialization, and setting singleUse to true to ensure a new connection for each message.

The outboundGateway bean defines a bean of type TcpOutboundGateway, which serves as an outbound gateway for sending messages over TCP. It sets the clientFactory as the connection factory for the gateway, enables logging for debugging purposes and configures the gateway to require a reply (setRequiresReply(true)), indicating that it expects a response from the server for each message sent.

Finally, the interface TcpClientGateway defines a messaging gateway, which provides a higher-level, typed API for interacting with the messaging system. Its send method is annotated with @Gateway(requestChannel = MESSAGE_CHANNEL), so every message sent through this method is placed on the client's MESSAGE_CHANNEL. That is the input channel of the outboundGateway service activator, which forwards the message over TCP to the server and returns the server's reply. The server's TcpServerConstants.MESSAGE_CHANNEL lives in a separate application context, so the two channels are independent of each other.

				
@Configuration
@RequiredArgsConstructor
@EnableIntegration
public class TcpClientConfig {

	private static final String MESSAGE_CHANNEL = "message-channel";

	private final TcpProperties tcpProperties;

	@Bean
	public AbstractClientConnectionFactory clientFactory() {
		AbstractClientConnectionFactory factory = new TcpNetClientRetryConnectionFactory(tcpProperties);
		factory.setSerializer(new ByteArrayStxEtxSerializer());
		factory.setDeserializer(new ByteArrayStxEtxSerializer());
		factory.setSingleUse(true);

		return factory;
	}

	@Bean
	@ServiceActivator(inputChannel = MESSAGE_CHANNEL)
	public TcpOutboundGateway outboundGateway(AbstractClientConnectionFactory clientFactory) {
		TcpOutboundGateway outboundGateway = new TcpOutboundGateway();
		outboundGateway.setConnectionFactory(clientFactory);
		outboundGateway.setLoggingEnabled(true);
		outboundGateway.setRequiresReply(true);

		return outboundGateway;
	}

	@MessagingGateway
	public interface TcpClientGateway {

		@Gateway(requestChannel = MESSAGE_CHANNEL)
		byte[] send(byte[] payload);
	}
}
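
If you are curious what a single-use request/reply exchange looks like without the framework, here is a rough plain-socket sketch: each call opens a new connection, writes one STX/ETX-framed request, reads one framed reply and closes. SingleUseRoundTrip and all of its methods are hypothetical, and the tiny server inside it is only a stand-in for the Spring server so that the example can run on its own.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of one-connection-per-request messaging with STX/ETX framing.
final class SingleUseRoundTrip {
    static final int STX = 0x02;
    static final int ETX = 0x03;

    static void writeFrame(OutputStream out, byte[] payload) throws IOException {
        out.write(STX);
        out.write(payload);
        out.write(ETX);
        out.flush();
    }

    static byte[] readFrame(InputStream in) throws IOException {
        if (in.read() != STX) {
            throw new IOException("expected STX");
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != ETX) {
            if (b < 0) {
                throw new IOException("stream closed before ETX");
            }
            buffer.write(b);
        }
        return buffer.toByteArray();
    }

    // Opens a fresh connection, sends one request, reads one reply, then closes.
    static String sendOnce(String host, int port, String request) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            writeFrame(socket.getOutputStream(), request.getBytes(StandardCharsets.UTF_8));
            return new String(readFrame(socket.getInputStream()), StandardCharsets.UTF_8);
        }
    }

    // Stand-in server: handles the given number of connections, one request each.
    static Thread startServer(ServerSocket server, int connections) {
        Thread thread = new Thread(() -> {
            for (int i = 0; i < connections; i++) {
                try (Socket client = server.accept()) {
                    String request = new String(readFrame(client.getInputStream()), StandardCharsets.UTF_8);
                    String reply = Integer.parseInt(request) % 2 == 0 ? "ERROR" : "SUCCESS";
                    writeFrame(client.getOutputStream(), reply.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    return; // server socket closed or client failed; stop the sketch server
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress())) {
            startServer(server, 3);
            String host = server.getInetAddress().getHostAddress();
            for (int i = 0; i < 3; i++) {
                System.out.println(i + " -> " + sendOnce(host, server.getLocalPort(), String.valueOf(i)));
            }
        }
    }
}
```

The gateway and connection factory take care of all of this for you, plus reply correlation, timeouts and error handling that the sketch leaves out.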
				
			

Properties

One of the best practices is to gather the properties in a single class, such as TcpProperties defined in the TCP client.

				
@Configuration
@Getter
public class TcpProperties {

	@Value("${tcp.server.host}")
	private String serverHost;

	@Value("${tcp.server.port}")
	private int serverPort;

	@Value("${connection.retry.interval}")
	private int connectionRetryInterval;

}

				
			

However, we should not forget to add the following properties in the TCP client application.properties file. 

				
spring.application.name=tcpclient

tcp.server.host=localhost
tcp.server.port=9001

connection.retry.interval=10
				
			

On the other hand, in the TCP server application we only specify the tcp.server.port property.

				
spring.application.name=tcpserver

tcp.server.port=9001
				
			

Application startup

On application startup, when we receive the ApplicationReadyEvent, we send 10 messages, each containing the bytes of one number from 0 to 9. We expect an error message when the number is even and a success message when it is odd.

				
@Component
@RequiredArgsConstructor
@Slf4j
public class TcpClientReadyListener {

	private static final String SUCCESS = "SUCCESS";
	private final TcpClientGateway tcpClientGateway;

	@EventListener(ApplicationReadyEvent.class)
	public void start() {
		for (int i = 0; i < 10; i++) {
			byte[] response = tcpClientGateway.send(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
			if (SUCCESS.equals(new String(response, StandardCharsets.UTF_8))) {
				log.info("message processed successfully");
			} else {
				log.error("message processing failed");
			}
		}
	}

}
				
			

And that’s what it does. As soon as the connection is established, the application starts sending messages and getting replies.

				
 INFO 3002665 --- [tcpclient] [           main] l.t.c.TcpNetClientRetryConnectionFactory : connection established
ERROR 3002665 --- [tcpclient] [           main] c.l.t.ready.TcpClientReadyListener       : message processing failed
 INFO 3002665 --- [tcpclient] [           main] l.t.c.TcpNetClientRetryConnectionFactory : connection established
 INFO 3002665 --- [tcpclient] [           main] c.l.t.ready.TcpClientReadyListener       : message processed successfully
 INFO 3002665 --- [tcpclient] [           main] l.t.c.TcpNetClientRetryConnectionFactory : connection established
ERROR 3002665 --- [tcpclient] [           main] c.l.t.ready.TcpClientReadyListener       : message processing failed
 INFO 3002665 --- [tcpclient] [           main] l.t.c.TcpNetClientRetryConnectionFactory : connection established
 INFO 3002665 --- [tcpclient] [           main] c.l.t.ready.TcpClientReadyListener       : message processed successfully
 INFO 3002665 --- [tcpclient] [           main] l.t.c.TcpNetClientRetryConnectionFactory : connection established
ERROR 3002665 --- [tcpclient] [           main] c.l.t.ready.TcpClientReadyListener       : message processing failed
 INFO 3002665 --- [tcpclient] [           main] l.t.c.TcpNetClientRetryConnectionFactory : connection established
 INFO 3002665 --- [tcpclient] [           main] c.l.t.ready.TcpClientReadyListener       : message processed successfully
 INFO 3002665 --- [tcpclient] [           main] l.t.c.TcpNetClientRetryConnectionFactory : connection established
ERROR 3002665 --- [tcpclient] [           main] c.l.t.ready.TcpClientReadyListener       : message processing failed
 INFO 3002665 --- [tcpclient] [           main] l.t.c.TcpNetClientRetryConnectionFactory : connection established
 INFO 3002665 --- [tcpclient] [           main] c.l.t.ready.TcpClientReadyListener       : message processed successfully
 INFO 3002665 --- [tcpclient] [           main] l.t.c.TcpNetClientRetryConnectionFactory : connection established
ERROR 3002665 --- [tcpclient] [           main] c.l.t.ready.TcpClientReadyListener       : message processing failed
 INFO 3002665 --- [tcpclient] [           main] l.t.c.TcpNetClientRetryConnectionFactory : connection established
 INFO 3002665 --- [tcpclient] [           main] c.l.t.ready.TcpClientReadyListener       : message processed successfully

				
			

And that is how you implement TCP client-server communication with Spring Integration. The full code for this tutorial is available in the GitHub repository.

If you want to learn about WebSocket, check out this blog post.

Comment below if this article helped you or if you are facing an issue you might need help with. 

Thank you for reading!
