Spring Kafka application with Message Hub on Bluemix Kubernetes

In this post, I’ll describe how to create two Spring Kafka applications that will communicate through a Message Hub service on Bluemix.  One application will act as a Kafka message producer and the other will be a Kafka message consumer.  We will deploy these applications to a Bluemix Kubernetes cluster.

Prerequisites

Before you begin, you’ll need to have the following installed and available on your PATH:

  • A Java 8 JDK and Maven (to build the applications)
  • Docker (to build and push the images)
  • The Kubernetes CLI (kubectl)

I will also assume that you are logged in to the Bluemix CLI and the Bluemix Container Registry CLI.  You should also have a Kubernetes cluster created in Bluemix, with the Kubernetes CLI configured to target it.

Setting Up Message Hub

Create a Message Hub service

Navigate to the Bluemix catalog and select Message Hub.  I will create mine using the default settings; press Create when you are ready.

Now click the + button to create a topic.  You should see the screen shown below.

[Screenshot: the Create Topic dialog in Message Hub]

I will call my topic “spring” and leave the other settings as their defaults.

Create JAAS configuration file

Create a kafkaclient.conf file with the following contents:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="*****"
    password="*****";
};

You will find the values for username and password under the Service credentials tab.  This file will allow us to securely connect to the Message Hub service from our Spring applications.  Later, we will put it into a Kubernetes secret and have it mounted into our application containers.
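
Aside: if you want to run the applications locally (outside the containers we build later), the JVM must be pointed at this file before any Kafka client is created.  A minimal sketch of doing that programmatically, equivalent to passing the -Djava.security.auth.login.config flag; the path shown is just a placeholder:

// Sketch only: point the JVM at the JAAS file from code instead of a -D flag.
// The path is a placeholder for wherever you saved kafkaclient.conf locally.
public final class LocalJaasBootstrap {
    public static void main(String[] args) {
        System.setProperty("java.security.auth.login.config",
                "/path/to/kafkaclient.conf"); // must be set before a Kafka client starts
        // ...then launch the Spring application as usual, for example:
        // SpringApplication.run(Application.class, args);
    }
}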

Building the Producer Application

Create Project

I will build my application inside a spring-kafka/ parent directory (create it now if it doesn’t already exist).  Create the following directory structure for the Kafka producer Maven project:

spring-kafka/spring-kafka-producer/src/main/java/application/

Now create a pom.xml file for the producer project with the following contents:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.application</groupId>
    <artifactId>spring-kafka-producer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.3.RELEASE</version>
        </dependency>
    </dependencies>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Configure the Kafka producer

Next we will create Spring beans that will send messages to Message Hub.  Spring Kafka does most of the hard work, but there is some configuration that we need to provide.

First, create a KafkaProducerConfig.java file in the application/ directory with the following contents:

package application;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap.servers}") String bootstrapServers;
    @Value("${spring.kafka.producer.client.id}") String clientId;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<String, Object>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        configProps.put(
                ProducerConfig.CLIENT_ID_CONFIG,
                clientId);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        configProps.put(
                "security.protocol",
                "SASL_SSL");
        configProps.put(
                "sasl.mechanism",
                "PLAIN");
        configProps.put(
                "ssl.protocol",
                "TLSv1.2");
        configProps.put(
                "ssl.enabled.protocols",
                "TLSv1.2");
        configProps.put(
                "ssl.endpoint.identification.algorithm",
                "HTTPS");

        return new DefaultKafkaProducerFactory<String, String>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

We will use the KafkaTemplate to send messages to Message Hub.  The ProducerFactory is responsible for generating Kafka producers to actually send these messages, and is required for creation of a KafkaTemplate.
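
If you want confirmation that a message actually reached Message Hub, note that KafkaTemplate.send() is asynchronous and returns a ListenableFuture.  The fragment below is only an illustration of that API (it assumes an injected KafkaTemplate<String, String> named template and the org.springframework.util.concurrent imports); it is not part of the application we build next:

// Illustrative fragment: attach a callback to the send instead of blocking on it.
ListenableFuture<SendResult<String, String>> future = template.send("spring", "Hello!");
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        // RecordMetadata tells us where the broker stored the message.
        System.out.println("Sent to partition " + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        System.err.println("Send failed: " + ex.getMessage());
    }
});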

Most of the configuration in this file should be the same for any Message Hub service instance.  The values that vary between instances are set in a properties file and injected using Spring’s @Value annotation.

Now create an application.properties file under src/main/resources/ with the following contents:

spring.kafka.producer.bootstrap.servers=kafka01-prod01.messagehub.services.ng.bluemix.net:9093
spring.kafka.producer.client.id=kafka-producer

You can find the value for bootstrap.servers in the Service credentials tab under kafka_brokers_sasl.  I have provided just one broker, but you can supply a comma-separated list of brokers (for example, broker1:9093,broker2:9093) if you prefer.  The value of client.id can be left as-is or changed to any identifier you like.

Finally, we will create an Application.java file with the following contents:

package application;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
@SpringBootApplication
public class Application implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    @Override
    public void run(String... args) throws Exception {
        template.send("spring", "Message from Kafka producer!");

        logger.info("All sent");
    }

    @RequestMapping(value = "/send/{message}", method = RequestMethod.GET)
    public @ResponseBody
    ResponseEntity sendMessages(@PathVariable String message){
        template.send("spring", message);

        return new ResponseEntity("Sent: " + message, HttpStatus.OK);
    }
}

There is a lot happening in this file:

  • We make our application executable with a standard main function.
  • We inject an instance of the KafkaTemplate configured above.
  • We send an initial message using a CommandLineRunner.
  • We create a Spring REST endpoint, allowing further messages to be sent.

Build Docker image

If you’ve followed me to this point, you should have a complete Spring Kafka application, ready to deploy to Bluemix Kubernetes.  But first, we need to build a Docker image and push it to our Bluemix container registry.

Create a Dockerfile with the following contents:

FROM java:8
COPY /target/spring-kafka-producer-1.0-SNAPSHOT.jar /spring-kafka-producer-1.0-SNAPSHOT.jar
EXPOSE 8080
ENTRYPOINT java -jar -Djava.security.auth.login.config=/etc/secret/kafkaclient.conf /spring-kafka-producer-1.0-SNAPSHOT.jar

Then execute the following commands in the same directory as your Dockerfile:

$ mvn package
...
$ docker build . -t registry.ng.bluemix.net/NAMESPACE/spring-kafka-producer
...
$ docker push registry.ng.bluemix.net/NAMESPACE/spring-kafka-producer
...

Note: Be sure to replace “NAMESPACE” with your own private Bluemix namespace.  For help on creating a Bluemix namespace, see the Bluemix instructions.

You now have a Spring Kafka producer application ready to be deployed to Kubernetes!  But before we do that, we need to build a consumer to read the messages produced.

Building the Consumer Application

Create Project

I will also build the consumer application inside the spring-kafka/ directory we have already created.  Create the following directory structure for the Kafka consumer Maven project:

spring-kafka/spring-kafka-consumer/src/main/java/application/

Now create a pom.xml file for the project with the following contents:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.application</groupId>
    <artifactId>spring-kafka-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.3.RELEASE</version>
        </dependency>
    </dependencies>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

Configure the Kafka consumer

Next we will create Spring beans that will read messages from Message Hub.  Spring Kafka does most of the hard work again, but there is some configuration that we need to provide.

First, create a KafkaConsumerConfig.java file in the application/ directory with the following contents:

package application;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.consumer.bootstrap.servers}") String bootstrapServers;
    @Value("${spring.kafka.consumer.client.id}") String clientId;
    @Value("${spring.kafka.consumer.group.id}") String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> configProps = new HashMap<String, Object>();
        configProps.put(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        configProps.put(
                ConsumerConfig.CLIENT_ID_CONFIG,
                clientId);
        configProps.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                groupId);
        configProps.put(
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        configProps.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        configProps.put(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                "latest");
        configProps.put(
                "security.protocol",
                "SASL_SSL");
        configProps.put(
                "sasl.mechanism",
                "PLAIN");
        configProps.put(
                "ssl.protocol",
                "TLSv1.2");
        configProps.put(
                "ssl.enabled.protocols",
                "TLSv1.2");
        configProps.put(
                "ssl.endpoint.identification.algorithm",
                "HTTPS");

        return new DefaultKafkaConsumerFactory<String, String>(configProps);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }
}

We will use the ConcurrentKafkaListenerContainerFactory to create consumers using the ConsumerFactory bean.
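
The factory also exposes a few optional tuning knobs.  As a sketch, a variant of the bean above could raise the number of listener threads and the poll timeout; the values here are purely illustrative, not something this tutorial requires:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);                              // listener threads (illustrative)
        factory.getContainerProperties().setPollTimeout(3000);  // ms to wait per poll (illustrative)

        return factory;
    }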

Most of the configuration in this file should be the same for any Message Hub service instance.  The values that vary between instances are set in a properties file and injected using Spring’s @Value annotation.

Now create an application.properties file under src/main/resources/ with the following contents:

spring.kafka.consumer.bootstrap.servers=kafka01-prod01.messagehub.services.ng.bluemix.net:9093
spring.kafka.consumer.client.id=kafka-consumer
spring.kafka.consumer.group.id=group

You can find the value for bootstrap.servers in the Service credentials tab under kafka_brokers_sasl.  I have provided just one broker, but you can supply a comma-separated list of brokers if you prefer.  The value of client.id can be left as-is or changed to any identifier you like, and group.id names the consumer group this consumer will join.

Finally, we will create an Application.java file with the following contents:

package application;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;

@SpringBootApplication
public class Application {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @KafkaListener(topics = "spring")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
    }
}

In this file, we tell the application to listen for messages and log them, using the listen method annotated with @KafkaListener.  The annotation also specifies which topics to listen to (“spring” in this case).
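
If you want more context than the record’s raw toString() output, the ConsumerRecord also exposes the topic, partition, and offset.  A variant of the listener above, for illustration only:

    @KafkaListener(topics = "spring")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        // Log the record's value together with where it came from.
        logger.info("Received '" + cr.value() + "' from topic " + cr.topic()
                + ", partition " + cr.partition() + ", offset " + cr.offset());
    }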

Build Docker image

We now have a complete Spring Kafka consumer application. Next we’ll build a Docker image for this application, like we did for the producer.

Create a Dockerfile with the following contents:

FROM java:8
COPY /target/spring-kafka-consumer-1.0-SNAPSHOT.jar /spring-kafka-consumer-1.0-SNAPSHOT.jar
EXPOSE 8080
ENTRYPOINT java -jar -Djava.security.auth.login.config=/etc/secret/kafkaclient.conf /spring-kafka-consumer-1.0-SNAPSHOT.jar

Then execute the following commands in the same directory as your Dockerfile:

$ mvn package
...
$ docker build . -t registry.ng.bluemix.net/NAMESPACE/spring-kafka-consumer
...
$ docker push registry.ng.bluemix.net/NAMESPACE/spring-kafka-consumer
...

Note: Be sure to replace “NAMESPACE” with your own private Bluemix namespace again.

You now have a Spring Kafka consumer application as well!  With both images built and pushed, we are ready to deploy to Kubernetes on Bluemix.

Deploying to Bluemix Kubernetes

Create deployment files

Now let’s create a couple of YAML files to deploy to our Kubernetes cluster.  We will need one file to create our secret (which will hold the kafkaclient.conf file) and another to define our application pods and a service.

First, create a secret.yaml file with the following contents:

apiVersion: v1
kind: Secret
metadata:
  name: kafkaclient-secret
type: Opaque
data:
  kafkaclient.conf: BASE64_ENCODED_VALUE

You will need to produce the base64-encoded string of your kafkaclient.conf file and replace the BASE64_ENCODED_VALUE placeholder shown above. You can use the following command to produce the base64-encoded string:

$ echo "$(cat kafkaclient.conf)" | base64
BASE64_ENCODED_VALUE

The resulting string is the value you should put in the secret.yaml file created above.

Next create a pod-service.yaml file with the following contents:

apiVersion: v1
kind: Pod
metadata:
  name: spring-kafka-producer
  labels:
    name: spring-kafka-producer
spec:
  containers:
    - name: spring-kafka-producer
      image: registry.ng.bluemix.net/NAMESPACE/spring-kafka-producer
      ports:
        - containerPort: 8080
      volumeMounts:
        - name : kafkaclient-secret-vol
          mountPath : /etc/secret
          readOnly : true
  volumes:
    - name : kafkaclient-secret-vol
      secret :
        secretName: kafkaclient-secret

---

apiVersion: v1
kind: Pod
metadata:
  name: spring-kafka-consumer
  labels:
    name: spring-kafka-consumer
spec:
  containers:
    - name: spring-kafka-consumer
      image: registry.ng.bluemix.net/NAMESPACE/spring-kafka-consumer
      ports:
        - containerPort: 8080
      volumeMounts:
        - name : kafkaclient-secret-vol
          mountPath : /etc/secret
          readOnly : true
  volumes:
    - name : kafkaclient-secret-vol
      secret :
        secretName: kafkaclient-secret

---

apiVersion: v1
kind: Service
metadata:
  name: producer-service
  namespace: default
spec:
  type: NodePort
  ports:
    - port: 8080
  selector:
    name: spring-kafka-producer

In this file, we define a pod for the producer and a pod for the consumer. Both pods mount the secret created above into the /etc/secret/ directory, which is where the Dockerfiles expect to find kafkaclient.conf. We also define a NodePort service exposing port 8080 of the producer pod, so that we can send custom messages through the REST endpoint.

Deploy to cluster

Now we are ready to run our Spring application on our Bluemix Kubernetes cluster!  Create a proxy connection to your cluster with the following command:

$ kubectl proxy

Then navigate to the Kubernetes dashboard at http://127.0.0.1:8001/ui.

From the dashboard, click CREATE in the top-right corner, then select the Upload a YAML or JSON file option, and choose your secret.yaml file.  Upload this file with the UPLOAD button.  Then follow the same procedure with your pod-service.yaml file.

After your pods have finished starting up, monitor the log of your consumer pod with the following command:

$ kubectl logs -f spring-kafka-consumer

The most recent item in the log should be a ConsumerRecord with a value of “Message from Kafka producer!” Once this message has appeared, we will know the producer is ready to send some custom messages.

To send messages through the producer service, we first need to find where it is exposed. To find the public node IP, click Nodes in the left sidebar and use the name of the node that your pod is running on (there should be only one node if you are using a Lite cluster plan). To find the service’s NodePort, click Services in the left sidebar and find the port number (in the range 30000-32767) associated with the “producer-service” service.

In a new command window, use the following command to send your own Kafka message (using your own node’s IP and port):

$ curl http://NODE_IP:NODE_PORT/send/hello

Your message should appear in the window monitoring the consumer pod logs.

Conclusion

We created two Spring Kafka applications: a producer and a consumer.  These applications used the Message Hub service on Bluemix to send and consume messages.  Then we deployed both applications to Kubernetes on Bluemix and sent custom messages through the producer to the consumer.
