Skip to main content

Using the Kafka Messaging API

InterSystems provides an API you can use to produce and consume Kafka messages. Your code acts as a Producer or Consumer by creating a client, then calling the client’s methods to perform actions like sending and receiving messages. InterSystems IRIS also provides methods to create and delete Kafka topics.

The Kafka API is based on the common messaging classes that are shared by other messaging platforms.

In addition to the API described here, InterSystems provides specialized classes that you can use in interoperability productions. These classes enable your productions to send messages to Kafka and retrieve messages from Kafka.

Connecting to Kafka

To create a connection to Kafka:

  1. Create a settings object. To do this create an instance of %External.Messaging.KafkaSettings and set its properties as needed. Most of the properties, listed below, apply to both Producers and Consumers; the exception is the groupId setting, which is used only to assign a Consumer to a consumer group.

    • username and password define the client's Kafka credentials.

    • servers defines a comma-separated list of IP address:port entries that identify servers in the Kafka cluster.

    • clientId optionally defines the client ID of the Kafka Producer or Consumer.

    • groupId defines the consumer group ID of a Consumer.

    For example:

     Set settings = ##class(%External.Messaging.KafkaSettings).%New()
     Set settings.username = "amandasmith"
     Set settings.password = "234sdsge"
     Set settings.servers = "100.0.70.179:9092, 100.0.70.089:7070"
     Set settings.clientId = "BazcoApp"
     // If Consumer, specify a consumer group
     Set settings.groupId = "G1"
    
  2. Create the messaging client object. To do this, call the CreateClient() method of %External.Messaging.Client, passing the settings object as the first argument. For example:

     Set client = ##class(%External.Messaging.Client).CreateClient(settings, .tSC)
     If $$$ISERR(tSC) { //handle error scenario }

    The method returns a status code by reference as the second argument. Your code should check the status before proceeding.

    Because the settings object is an instance of %External.Messaging.KafkaSettings, the returned object (client) is an instance of %External.Messaging.KafkaClient.

Kafka Producers

InterSystems IRIS can act as a Kafka producer by calling APIs to create messages and send them. If the application needs to create the topic where messages will be sent, see Working with Topics. The following flow uses the client to interact with Kafka as a Producer:

Configure Client as Producer (Optional)

After creating the Kafka client but before sending messages, the application can customize the Producer using standard Apache ProducerConfig configuration options. The client defaults to a standard Producer configuration, but modifications can be made by passing the Apache options as a JSON string to the UpdateProducerConfig() method. For a list of supported options, refer to the Apache Kafka documentation. For example, the following code configures the Kafka client with an Apache configuration option:

 Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)

 Set producerSettings = "{""key.serializer"":""org.apache.kafka.common.serialization.StringSerializer""}"
 Set tSC = client.UpdateProducerConfig(producerSettings)
Create Message

A message sent to Kafka must contain a topic and value, and can optionally contain a key, which acts as a tag for the value. To prepare a message to be sent to a topic, the InterSystems IRIS application simply creates a new Kafka message object, then defines these properties. For example:

 Set topic = "quickstart-events"
 Set value = "MyMessage", key = "OptionalTag"

 Set msg = ##class(%External.Messaging.KafkaMessage).%New()
 Set msg.topic = topic
 Set msg.value = value
 Set msg.key = key
Send Message

After creating a message, you can send it to the topic by executing the SendMessage() method. For example:

 Set tSC = client.SendMessage(msg)

Kafka Consumers

InterSystems IRIS can act as a Kafka Consumer by calling APIs to retrieve messages for a topic. Be sure to define the groupId property when defining the settings of the client in order to identify the consumer group of the Consumer. The following flow uses the client to interact with Kafka as a Consumer:

Configure Client as Consumer (Optional)

After creating the Kafka client but before retrieving messages, the application can customize the Consumer using standard Apache ConsumerConfig configuration options. The client defaults to a standard Consumer configuration, but modifications can be made by passing the Apache options as a JSON string to the UpdateConsumerConfig() method. For a list of supported options, refer to the Apache Kafka documentation. For example, the following code configures the Kafka client with an Apache configuration option:

 Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)

 Set consumerSettings = "{""key.deserializer"":""org.apache.kafka.common.serialization.StringDeserializer""}"
 Set tSC = client.UpdateConsumerConfig(consumerSettings)
Retrieve Messages

The Kafka client can use the ReceiveMessage() method to act as a Kafka Consumer. It takes the name of the topic as an argument and returns messages as a %ListOfObjects. Kafka returns only new messages, that is, messages that have not been previously retrieved by the Consumer. For example, to retrieve and display messages for a topic:

 #Dim messages As %ListOfObjects
 Set tSC = client.ReceiveMessage(topic, .messages)

 For i=1:1:messages.Size {
	    Set msg = messages.GetAt(i)
	    Write "Message: ", msg.ToJSON(), !
 }

Defining AdminClient Configs

An application can customize the Kafka client using standard Apache AdminClient Configs. The client defaults to a standard configuration, but modifications can be made by passing the Apache options as a JSON string to the UpdateAdminConfig() method. For a list of supported options, refer to the Apache Kafka documentation. For example:

Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)

Set adminSettings = "{""ssl.protocol"":""TLSv1.3""}"
Set tSC = client.UpdateAdminConfig(adminSettings)

Working with Topics

InterSystems IRIS provides an API that can be used to create a new Kafka topic, and another that deletes a topic. When creating a topic, the number of partitions and the replication factor are passed in as arguments. For an introduction to partitions and replication factors, see the Apache Kafka documentation. To create a topic:

 Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)

 Set topic = "quickstart-events"
 Set numberOfPartitions = 1
 Set replicationFactor = 3
 Set tSC = client.CreateTopic(topic,numberOfPartitions,replicationFactor)

As an alterative, you can create the topic with a method that is common to all messaging platforms. When using this alternative, the topic settings are defined using a Kafka-specific object. See %External.Messaging.Client.CreateQueueOrTopic() for details.

Delete a Topic

An application can delete a Kafka topic using the DeleteTopic() method.

 Set tSC = client.DeleteTopic(topic)

As an alterative, you can delete the topic with a method that is common to all messaging platforms. See %External.Messaging.Client.DeleteQueueOrTopic() for details.

Close Client

An InterSystems IRIS application that is done communicating with Kafka should close the client with the Close() method. For example:

 Do:client'="" client.Close()
Feedback