Deep Dive Into Spring Cloud Bus

“Spring Cloud Bus links nodes of a distributed system with a lightweight message broker. This can then be used to broadcast state changes (e.g. configuration changes) or other management instructions.”

Introduction

The statement above is the definition from the Spring Cloud website, which follows it with only a simple example.

I suspect many of you, like me, would like to know more about how this feature works.

For example, here are the questions I had about Spring Cloud Bus:

  • How is the message sent?
  • How is the message received?
  • How is the destination matched?
  • How do you trigger actions after receiving messages?

In this article, I will share what I understand of Spring Cloud Bus by walking through a concrete code example.

The Code Example

I assume we are already familiar with Spring Cloud Stream, since it has been around for a long time and Spring Cloud Bus is built on top of it.

Let’s start with the SpringCloudBusClient interface definition.

public interface SpringCloudBusClient {

  String INPUT = "springCloudBusInput";

  String OUTPUT = "springCloudBusOutput";

  @Output(SpringCloudBusClient.OUTPUT)
  MessageChannel springCloudBusOutput();

  @Input(SpringCloudBusClient.INPUT)
  SubscribableChannel springCloudBusInput();
}

The interface declares two bindings: springCloudBusInput and springCloudBusOutput. The @Input and @Output annotations mark the channels that Spring Cloud Stream binds when the interface is passed to the @EnableBinding annotation.

The properties of springCloudBusInput and springCloudBusOutput (for example, the destination topic) can be changed in a configuration file.

spring.cloud.stream.bindings:
  springCloudBusInput:
    destination: my-bus-topic
  springCloudBusOutput:
    destination: my-bus-topic

Next is the body of the code example:

// BusAutoConfiguration

@EventListener(classes = RemoteApplicationEvent.class) // ❶
public void acceptLocal(RemoteApplicationEvent event) {
  if (this.serviceMatcher.isFromSelf(event)
      && !(event instanceof AckRemoteApplicationEvent)) { // ❷
    this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); // ❸
  }
}

@StreamListener(SpringCloudBusClient.INPUT) // ❹
public void acceptRemote(RemoteApplicationEvent event) {
  if (event instanceof AckRemoteApplicationEvent) {
    if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
        && this.applicationEventPublisher != null) { // ❺
      this.applicationEventPublisher.publishEvent(event);
    }
    // If it's an ACK we are finished processing at this point
    return;
  }
  if (this.serviceMatcher.isForSelf(event)
      && this.applicationEventPublisher != null) { // ❻
    if (!this.serviceMatcher.isFromSelf(event)) { // ❼
      this.applicationEventPublisher.publishEvent(event);
    }
    if (this.bus.getAck().isEnabled()) { // ❽
      AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
          this.serviceMatcher.getServiceId(),
          this.bus.getAck().getDestinationService(),
          event.getDestinationService(), event.getId(), event.getClass());
      this.cloudBusOutboundChannel
          .send(MessageBuilder.withPayload(ack).build());
      this.applicationEventPublisher.publishEvent(ack);
    }
  }
  if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // ❾
    // We are set to register sent events so publish it for local consumption,
    // irrespective of the origin
    this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
        event.getOriginService(), event.getDestinationService(),
        event.getId(), event.getClass()));
  }
}

Notes on the code above:

  1. A Spring event listener captures every RemoteApplicationEvent published locally. For example, bus-env publishes EnvironmentChangeRemoteApplicationEvent and bus-refresh publishes RefreshRemoteApplicationEvent. All these events are captured.
  2. Check that the event was sent by this service itself AND that it is not an AckRemoteApplicationEvent. If so, continue to step 3.
  3. Compose a message using the event as payload, then send the message to the broker via springCloudBusOutput.
  4. @StreamListener subscribes to springCloudBusInput to receive remote events.
  5. If the remote event is an AckRemoteApplicationEvent, trace is turned on, and the ack was not sent by this service itself, republish the ack as a local event. In every case, processing of an ack ends here.
  6. For any other event, check whether it is addressed to this service. If so, go to steps 7 and 8. Step 9 then runs whether or not the event was addressed to this service.
  7. If the event was not sent by this service itself, publish it as a local event. An event that originated here reached acceptLocal through local publication, so local listeners have already seen it.
  8. If ack is enabled, compose an AckRemoteApplicationEvent, send it to the broker, and publish it locally.
  9. If trace is turned on, compose a SentApplicationEvent and publish it locally, regardless of the event's origin.
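
The branching in these notes may be easier to follow as a plain-Java model. The sketch below is not the Spring Cloud Bus source: BusEvent, BusNodeModel, BusFlowDemo, and the log strings are hypothetical names, service matching is reduced to exact string equality, and the null checks on the publisher are left out. It only mirrors the order of the decisions in acceptLocal and acceptRemote.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RemoteApplicationEvent (not the Spring class).
class BusEvent {
    final String origin;      // service that created the event
    final String destination; // target service, or null for "everyone"
    final boolean ack;        // true if this models an AckRemoteApplicationEvent

    BusEvent(String origin, String destination, boolean ack) {
        this.origin = origin;
        this.destination = destination;
        this.ack = ack;
    }
}

// Models the decisions one service instance makes; actions are recorded in a log.
class BusNodeModel {
    final String serviceId;
    final boolean ackEnabled;
    final boolean traceEnabled;
    final List<String> log = new ArrayList<>();

    BusNodeModel(String serviceId, boolean ackEnabled, boolean traceEnabled) {
        this.serviceId = serviceId;
        this.ackEnabled = ackEnabled;
        this.traceEnabled = traceEnabled;
    }

    // Assumption: exact equality instead of the real pattern matching.
    boolean isFromSelf(BusEvent e) { return serviceId.equals(e.origin); }
    boolean isForSelf(BusEvent e) {
        return e.destination == null || serviceId.equals(e.destination);
    }

    void acceptLocal(BusEvent e) {
        if (isFromSelf(e) && !e.ack) {           // step 2
            log.add("send-to-broker");           // step 3
        }
    }

    void acceptRemote(BusEvent e) {
        if (e.ack) {
            if (traceEnabled && !isFromSelf(e)) { // step 5
                log.add("publish-ack-locally");
            }
            return;                               // acks stop here
        }
        if (isForSelf(e)) {                       // step 6
            if (!isFromSelf(e)) {                 // step 7
                log.add("publish-locally");
            }
            if (ackEnabled) {                     // step 8
                log.add("send-ack-to-broker");
                log.add("publish-ack-locally");
            }
        }
        if (traceEnabled) {                       // step 9
            log.add("publish-sent-trace");
        }
    }
}

class BusFlowDemo {
    public static void main(String[] args) {
        BusNodeModel orders = new BusNodeModel("orders", true, false);
        orders.acceptRemote(new BusEvent("config-server", null, false));
        // prints [publish-locally, send-ack-to-broker, publish-ack-locally]
        System.out.println(orders.log);
    }
}
```

In this model, a broadcast event from another service is published locally and then acknowledged, while an event that comes back to its own sender skips the local publication in step 7.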

This example answers the questions above:

  • How is the message sent?

BusAutoConfiguration#acceptLocal sends the message to the springCloudBus topic.

  • How is the message received?

BusAutoConfiguration#acceptRemote receives the message from the springCloudBus topic.

  • How is the destination matched?

BusAutoConfiguration#acceptRemote matches the destination by calling serviceMatcher.isForSelf(event).
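
As a rough illustration of what such a destination check could look like, here is a minimal sketch. It assumes that service IDs and destinations are colon-separated strings such as customers:9000, and that a destination may use * for one segment and ** for any number of segments, as in destinations like customers:**. SimpleServiceMatcher and MatcherDemo are hypothetical names, and the matching logic is a simplified stand-in, not the matcher the library actually uses.

```java
// Hypothetical, simplified stand-in for the bus's service matcher.
class SimpleServiceMatcher {
    private final String serviceId; // e.g. "customers:9000"

    SimpleServiceMatcher(String serviceId) {
        this.serviceId = serviceId;
    }

    // Assumption: a missing or blank destination means "every service".
    boolean isForSelf(String destination) {
        if (destination == null || destination.trim().isEmpty()) {
            return true;
        }
        return matches(destination.split(":"), 0, serviceId.split(":"), 0);
    }

    // Compares pattern segments against ID segments, left to right.
    private static boolean matches(String[] pattern, int p, String[] id, int i) {
        if (p == pattern.length) {
            return i == id.length; // both exhausted means a full match
        }
        if (pattern[p].equals("**")) {
            // "**" may absorb zero or more remaining segments
            for (int k = i; k <= id.length; k++) {
                if (matches(pattern, p + 1, id, k)) {
                    return true;
                }
            }
            return false;
        }
        if (i == id.length) {
            return false; // pattern still has segments, ID does not
        }
        if (pattern[p].equals("*") || pattern[p].equals(id[i])) {
            return matches(pattern, p + 1, id, i + 1);
        }
        return false;
    }
}

class MatcherDemo {
    public static void main(String[] args) {
        SimpleServiceMatcher matcher = new SimpleServiceMatcher("customers:9000");
        System.out.println(matcher.isForSelf("customers:**"));   // prints true
        System.out.println(matcher.isForSelf("customers:9000")); // prints true
        System.out.println(matcher.isForSelf("orders:**"));      // prints false
    }
}
```

With a scheme like this, one destination string can address a single instance, every instance of one service, or every service on the bus.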

  • How do you trigger actions after receiving messages?

In the example, EnvironmentChangeListener receives EnvironmentChangeRemoteApplicationEvent, and its action logic can then act accordingly.
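
To make this last answer concrete, here is a minimal plain-Java sketch of type-based event dispatch. It is a stand-in for Spring's event publishing, not its API: SimplePublisher, RemoteEvent, EnvChangeEvent, and DispatchDemo are hypothetical names, and the listener simply copies the changed values into a map, which is an assumption about what an environment-change listener might do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical event types standing in for the bus events.
class RemoteEvent { }

class EnvChangeEvent extends RemoteEvent {
    final Map<String, String> values; // changed key/value pairs

    EnvChangeEvent(Map<String, String> values) {
        this.values = values;
    }
}

// Delivers an event to every listener registered for its type or a supertype.
class SimplePublisher {
    private final Map<Class<?>, List<Consumer<Object>>> listeners = new LinkedHashMap<>();

    <T> void on(Class<T> type, Consumer<T> listener) {
        listeners.computeIfAbsent(type, k -> new ArrayList<>())
                 .add(e -> listener.accept(type.cast(e)));
    }

    void publish(Object event) {
        for (Map.Entry<Class<?>, List<Consumer<Object>>> entry : listeners.entrySet()) {
            if (entry.getKey().isInstance(event)) {
                for (Consumer<Object> listener : entry.getValue()) {
                    listener.accept(event);
                }
            }
        }
    }
}

class DispatchDemo {
    public static void main(String[] args) {
        Map<String, String> environment = new HashMap<>();
        SimplePublisher publisher = new SimplePublisher();

        // The "action logic": apply the changed values to the local environment.
        publisher.on(EnvChangeEvent.class, e -> environment.putAll(e.values));

        Map<String, String> change = new HashMap<>();
        change.put("logging.level", "DEBUG");
        publisher.publish(new EnvChangeEvent(change));

        System.out.println(environment); // prints {logging.level=DEBUG}
    }
}
```

The point of the sketch is the decoupling: the code that receives a message from the broker only publishes a local event, and whichever listener is registered for that event type decides what to do with it.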

Summary

Spring Cloud Bus is still pretty new. However, it’s built on top of Spring events and Spring Cloud Stream, so it’s not hard to understand the logic behind it.

Right now there are only a few remote events available on the Bus, and most of them are configuration updates. Users can extend RemoteApplicationEvent and register their subclasses with the @RemoteApplicationEventScan annotation to build their own microservice messaging system.

from DZone Cloud Zone
