Using the Kafka Messaging API
InterSystems provides an API you can use to produce and consume Kafka messages. Your code acts as a producer or consumer by creating a client, then calling the client’s methods to perform actions like sending and receiving messages. InterSystems IRIS also provides methods to create and delete Kafka topics.
The Kafka API is based on the common messaging classes that are shared by other messaging platforms. This page describes platform-specific variations in the workflow these common classes establish.
In addition to the API described here, InterSystems provides specialized classes that you can use in interoperability productions. These classes enable your productions to send messages to Kafka and retrieve messages from Kafka.
Connecting to Kafka
To create a connection to Kafka:
- Create a settings object. To do this, create an instance of %External.Messaging.KafkaSettings and set its properties as needed. Most of the properties listed below apply to both producers and consumers; the exception is the groupId setting, which is used only to assign a consumer to a consumer group.
  - username and password define the client's Kafka credentials.
  - servers defines a comma-separated list of IP address:port entries that identify servers in your Kafka broker cluster.
  - clientId optionally defines the client ID of the Kafka producer or consumer.
  - groupId defines the consumer group ID of a consumer.
  - securityprotocol specifies the security protocol that secures connections to your Kafka broker cluster. Currently, this property supports two values:
    - SASL_PLAINTEXT, which performs SASL authentication of the client over an unencrypted channel.
    - SASL_SSL, which uses the truststore and keystore information you provide to establish an SSL/TLS connection over which SASL authentication takes place.
  - saslmechanism specifies the SASL authentication mechanism used to authenticate the client. Currently, only PLAIN is supported.
  - truststorelocation (optional) specifies the file system path to the truststore that contains the certificate authority certificates necessary to validate a certificate from your Kafka broker cluster and establish an SSL/TLS connection.
  - truststorepassword (optional) defines the password that provides access to the truststore at the location specified by truststorelocation.
  - keystorelocation (optional) specifies the file system path to the keystore that contains the keys necessary to establish an SSL/TLS connection with your Kafka broker cluster.
  - keystorepassword (optional) defines the password that provides access to the keystore at the location specified by keystorelocation.
  - keypassword (optional) defines the password that provides access to a private key within the keystore at the location specified by keystorelocation.
For example:
Set settings = ##class(%External.Messaging.KafkaSettings).%New()
Set settings.username = "amandasmith"
Set settings.password = "234sdsge"
Set settings.servers = "100.0.70.179:9092, 100.0.70.089:7070"
Set settings.clientId = "BazcoApp"
// If Consumer, specify a consumer group
Set settings.groupId = "G1"
Set settings.securityprotocol="SASL_SSL"
Set settings.saslmechanism="PLAIN"
Set settings.truststorelocation="/etc/kafkatls/trustcerts.jks"
Set settings.truststorepassword="F00B&r!"
- Create the messaging client object. To do this, call the CreateClient() method of %External.Messaging.Client, passing the settings object as the first argument. For example:
Set client = ##class(%External.Messaging.Client).CreateClient(settings, .tSC)
// If tSC is an error, handle the error scenario
The method returns a status code by reference as the second argument. Your code should check the status before proceeding.
Because the settings object is an instance of %External.Messaging.KafkaSettings, the method returns an instance of %External.Messaging.KafkaClient for client.
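The status check mentioned above can be sketched as follows. This is a minimal illustration rather than a prescribed pattern: it assumes settings was populated as in the earlier example, and it uses the general-purpose $SYSTEM.Status utility methods (not part of the messaging API) to test the status and get a readable error message.

```objectscript
// Assumes settings was populated as in the example above
Set client = ##class(%External.Messaging.Client).CreateClient(settings, .tSC)
If $SYSTEM.Status.IsError(tSC) {
    // Print the error text; real code might log it or return tSC to the caller
    Write "Could not create Kafka client: ", $SYSTEM.Status.GetErrorText(tSC), !
} Else {
    Write "Kafka client created", !
}
```

The same check applies to every method on this page that returns a status, such as SendMessage() and ReceiveMessage().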
Kafka Producers
InterSystems IRIS can act as a Kafka producer by calling API methods to create messages and send them. If the application needs to create the topic where messages will be sent, see Working with Topics. The following flow uses the client to interact with Kafka as a producer:
After creating the Kafka client but before sending messages, the application can customize the Producer using standard Apache ProducerConfig configuration options. The client defaults to a standard Producer configuration, but you can modify it by passing the Apache options as a JSON string to the UpdateProducerConfig() method. For a list of supported options, refer to the Apache Kafka documentation. For example, the following code configures the Kafka client with an Apache configuration option:
Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)
Set producerSettings = "{""key.serializer"":""org.apache.kafka.common.serialization.StringSerializer""}"
Set tSC = client.UpdateProducerConfig(producerSettings)
A message sent to Kafka must contain a topic and value, and can optionally contain a key, which acts as a tag for the value. To prepare a message to be sent to a topic, create a new Kafka message object and then define these properties. For example:
Set topic = "quick-start-events"
Set value = "MyMessage", key = "OptionalTag"
Set msg = ##class(%External.Messaging.KafkaMessage).%New()
Set msg.topic = topic
Set msg.value = value
Set msg.key = key
To support Kafka configurations that allow messages longer than the maximum length of an InterSystems IRIS %String, store your message content as an appropriately encoded binary stream using the property binaryValue instead of value. The binaryValue property can be arbitrarily long.
After creating a message, you can send it to the topic by executing the SendMessage() method. For example:
Set tSC = client.SendMessage(msg)
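The message-building and sending steps above can be wrapped in a small helper so that callers get a status back for each send. The sketch below is illustrative only: the class name Demo.KafkaProducer and the method SendOne() are hypothetical, while the KafkaMessage properties and SendMessage() are the ones shown above.

```objectscript
/// Hypothetical helper class, not part of the InterSystems API
Class Demo.KafkaProducer Extends %RegisteredObject
{

/// Build a Kafka message from its parts and send it with an existing client.
/// Returns the status reported by SendMessage().
ClassMethod SendOne(client As %External.Messaging.KafkaClient, topic As %String, value As %String, key As %String = "") As %Status
{
    Set msg = ##class(%External.Messaging.KafkaMessage).%New()
    Set msg.topic = topic
    Set msg.value = value
    // The key is optional, so set it only when the caller supplied one
    If key '= "" {
        Set msg.key = key
    }
    Quit client.SendMessage(msg)
}

}
```

A caller that already holds a client could then write Set tSC = ##class(Demo.KafkaProducer).SendOne(client, "quick-start-events", "MyMessage", "OptionalTag") and check tSC before sending the next message.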
Kafka Consumers
InterSystems IRIS can act as a Kafka consumer by calling APIs to retrieve messages for a topic. Be sure to define the groupId property when defining the settings of the client in order to identify the consumer group of the Consumer. The following flow uses the client to interact with Kafka as a Consumer:
After creating the Kafka client but before retrieving messages, the application can customize the Consumer using standard Apache ConsumerConfig configuration options. The client defaults to a standard Consumer configuration, but you can modify it by passing the Apache options as a JSON string to the UpdateConsumerConfig() method. For a list of supported options, refer to the Apache Kafka documentation. For example, the following code configures the Kafka client with an Apache configuration option:
Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)
Set consumerSettings = "{""key.deserializer"":""org.apache.kafka.common.serialization.StringDeserializer""}"
Set tSC = client.UpdateConsumerConfig(consumerSettings)
The Kafka client can use the ReceiveMessage() method to act as a Kafka Consumer. This method lets you set the poll timeout for the message retrieval operation (in milliseconds) by providing a JSON-formatted string as an optional argument. To do so, create a new instance of the %External.Messaging.KafkaReceiveSettings class and set the pollTimeout property:
Set rset = ##class(%External.Messaging.KafkaReceiveSettings).%New()
Set rset.pollTimeout = 500
If not set, the default poll timeout duration is 100 milliseconds.
To retrieve messages, invoke the ReceiveMessage() method of the Kafka client. This method takes the name of the topic as an argument and returns messages as a %ListOfObjects by reference. If you have specified message retrieval settings using a KafkaReceiveSettings object (as described above), provide these settings as a third argument using the ToJSON() method of the settings object. Kafka returns only new messages, that is, messages that have not been previously retrieved by the Consumer. For example, to retrieve and display messages for a topic:
#dim messages As %ListOfObjects
Set tSC = client.ReceiveMessage(topic, .messages, rset.ToJSON())
For i=1:1:messages.Size {
    Set msg = messages.GetAt(i)
    Write "Message: ", msg.ToJSON(), !
}
For messages which exceed the length of an InterSystems IRIS %String, the Kafka message class also includes a binaryValue property which can store messages of arbitrary length as appropriately encoded binary streams.
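Because each call to ReceiveMessage() returns only messages that the Consumer has not already retrieved, a consumer typically polls more than once. The following sketch shows one way to do that. It assumes that client, topic, and rset already exist as in the examples above. The limit of five polls is arbitrary, and the $SYSTEM.Status calls are general-purpose status utilities rather than part of the messaging API.

```objectscript
// Assumes client, topic, and rset were created as in the examples above
#dim messages As %ListOfObjects
For attempt=1:1:5 {
    Set tSC = client.ReceiveMessage(topic, .messages, rset.ToJSON())
    If $SYSTEM.Status.IsError(tSC) {
        Write "Receive failed: ", $SYSTEM.Status.GetErrorText(tSC), !
        Quit  // leaves the For loop
    }
    // Print the key and value of each message returned by this poll
    For i=1:1:messages.Size {
        Set msg = messages.GetAt(i)
        Write "key=", msg.key, " value=", msg.value, !
    }
}
```

Reading msg.key and msg.value directly, rather than printing msg.ToJSON(), is convenient when the application needs only the payload.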
Defining AdminClient Configs
An application can customize the Kafka client using standard Apache AdminClient Configs. The client defaults to a standard configuration, but you can modify it by passing the Apache options as a JSON string to the UpdateAdminConfig() method. For a list of supported options, refer to the Apache Kafka documentation. For example:
Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)
Set adminSettings = "{""ssl.protocol"":""TLSv1.3""}"
Set tSC = client.UpdateAdminConfig(adminSettings)
Working with Topics
InterSystems IRIS provides one method that creates a new Kafka topic and another that deletes a topic. When you create a topic, you pass the number of partitions and the replication factor as arguments. For an introduction to partitions and replication factors, see the Apache Kafka documentation. To create a topic:
Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)
Set topic = "quick-start-events"
Set numberOfPartitions = 1
Set replicationFactor = 3
Set tSC = client.CreateTopic(topic,numberOfPartitions,replicationFactor)
As an alternative, you can create the topic with a method that is common to all messaging platforms. When using this alternative, define the topic settings in an instance of the %External.Messaging.KafkaTopicSettings class, then pass them to the method as a JSON string produced by the ToJSON() method. See %External.Messaging.Client.CreateQueueOrTopic() for details.
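A rough sketch of this alternative follows. The property names numberOfPartitions and replicationFactor and the argument order of CreateQueueOrTopic() (topic name first, then the JSON settings) are assumptions made for illustration. Confirm them against the class reference before relying on this code.

```objectscript
// Assumes client was created as in the examples above
Set topicSettings = ##class(%External.Messaging.KafkaTopicSettings).%New()
Set topicSettings.numberOfPartitions = 1   // assumed property name
Set topicSettings.replicationFactor = 3    // assumed property name
// Assumed signature: topic name, then the settings serialized as JSON
Set tSC = client.CreateQueueOrTopic("quick-start-events", topicSettings.ToJSON())
```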
Delete a Topic
An application can delete a Kafka topic using the DeleteTopic() method.
Set tSC = client.DeleteTopic(topic)
As an alternative, you can delete the topic with a method that is common to all messaging platforms. See %External.Messaging.Client.DeleteQueueOrTopic() for details.
Close Client
An InterSystems IRIS application that is done communicating with Kafka should close the client with the Close() method. For example:
Do:client'="" client.Close()