Kafka メッセージング API の使用法
インターシステムズでは、Kafka メッセージの作成と利用に使用できる API を用意しています。コードは、クライアントを作成した後、メッセージの送信または受信のアクションを実行する、そのクライアントのメソッドを呼び出すことにより、メッセージのプロデューサまたはコンシューマとして機能します。InterSystems IRIS も、Kafka のトピックを作成するメソッドと削除するメソッドを提供しています。
Kafka API は、他のメッセージング・プラットフォームと共有する共通のメッセージング・クラスを基本としています。このページでは、このような共通のクラスで確立されているフローに対し、プラットフォーム固有で発生するバリエーションについて説明します。
インターシステムズでは、ここで説明する API のほか、相互運用プロダクションに使用できる専用のクラスを用意しています。このようなクラスでは、プロダクションで Kafka へメッセージを送信でき、また Kafka からメッセージを取得できます。
Kafka との接続
Kafka との接続を作成するには以下の手順に従います。
-
設定オブジェクトを作成します。そのためには、%External.Messaging.KafkaSettingsOpens in a new tab のインスタンスを作成し、必要に応じてそのプロパティを設定します。以下に挙げるほとんどのプロパティは、プロデューサとコンシューマの両方に適用できますが、groupId 設定は例外で、コンシューマ・グループへのコンシューマの割り当てにのみ使用します。
-
username と password でクライアントの Kafka 認証情報を定義します。
-
servers は、Kafka ブローカ・クラスタ内のサーバを指定する、コンマ区切りの IP address:port エントリのリストを定義します。
-
必要に応じて、Kafka プロデューサまたは Kafka コンシューマのクライアント ID を clientId で定義します。
-
groupId で、コンシューマのコンシューマ・グループ ID を定義します。
-
securityprotocol は、Kafka ブローカ・クラスタへの接続を保護するセキュリティ・プロトコルを指定します。現在、このプロパティは以下の 2 つの値をサポートしています。
-
SASL_PLAINTEXT。これは、暗号化されていないチャンネルでクライアントの SASL 認証を実行します。
-
SASL_SSL。指定したトラストストアとキーストアの情報を使用して、SASL 認証が行われる SSL/TLS 接続を確立します。
-
-
saslmechanism は、クライアントの認証に使用される SASL 認証メカニズムを指定します。現在、PLAIN のみがサポートされています。
-
truststorelocation (オプション) は、Kafka ブローカ・クラスタからの証明書を検証し、SSL/TLS 接続を確立するために必要な認証局の証明書を含むトラストストアへのファイル・システム・パスを指定します。
-
truststorepassword (オプション) は、truststorelocation で指定された場所にあるトラストストアへのアクセスを提供するパスワードを定義します。
-
keystorelocation (オプション) は、Kafka ブローカ・クラスタとの SSL/TLS 接続を確立するために必要なキーを含むキーストアへのファイル・システム・パスを指定します。
-
keystorepassword (オプション) は、keystorelocation で指定された場所にあるキーストアへのアクセスを提供するパスワードを定義します。
-
keypassword (オプション) は、keystorelocation で指定された場所にあるキーストア内の秘密鍵へのアクセスを提供するパスワードを定義します。
例えば以下のようにします。
Set settings = ##class(%External.Messaging.KafkaSettings).%New() Set settings.username = "amandasmith" Set settings.password = "234sdsge" Set settings.servers = "100.0.70.179:9092, 100.0.70.089:7070" Set settings.clientId = "BazcoApp" // If Consumer, specify a consumer group Set settings.groupId = "G1" Set settings.securityprotocol="SASL_SSL" Set settings.saslmechanism="PLAIN" Set settings.truststorelocation="/etc/kafkatls/trustcerts.jks" Set settings.truststorepassword="F00B&r!"
-
-
メッセージング・クライアント・オブジェクトを作成します。そのためには、%External.Messaging.ClientOpens in a new tab の CreateClient() メソッドを呼び出し、設定オブジェクトを 1 番目の引数として渡します。例えば以下のようにします。
Set client = ##class(%External.Messaging.Client).CreateClient(settings, .tSC) // if tSC is an error, handle error scenario
このメソッドからは、参照渡しでステータス・コードが 2 番目の引数として返されます。コードでは、このステータスを確認したうえで処理を継続する必要があります。
settings オブジェクトは %External.Messaging.KafkaSettingsOpens in a new tab のインスタンスであるため、メソッドは client に対する %External.Messaging.KafkaClientOpens in a new tab のインスタンスを返します。
Kafka プロデューサ
InterSystems IRIS は、メッセージを作成する API メソッドを呼び出し、作成したメッセージを送信することにより、Kafka プロデューサとして機能できます。メッセージの送信先とするトピックをアプリケーションで作成する必要がある場合は、"トピックの操作" を参照してください。以下のフローでは、クライアントを使用し、プロデューサとして Kafka を操作しています。
Kafka クライアントの作成後、メッセージを送信する前に、標準の Apache ProducerConfig 構成オプションをアプリケーションで使用して、プロデューサをカスタマイズできます。既定では、クライアントは標準のプロデューサとして構成されますが、Apache オプションを JSON 文字列として UpdateProducerConfig() メソッドに渡すことで、この既定の構成を変更できます。サポートされているオプションの一覧は、Apache Kafka のドキュメントOpens in a new tabを参照してください。例えば、次のコードは、Apache 構成オプションを使用して、Kafka クライアントを構成します。
Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)
Set producerSettings = "{""key.serializer"":""org.apache.kafka.common.serialization.StringSerializer""}"
Set tSC = client.UpdateProducerConfig(producerSettings)
Kafka に送信するメッセージには、topic と value を記述する必要があります。また、値のタグとして機能する key を必要に応じて記述できます。トピックに送信するメッセージを作成するには、新しい Kafka メッセージ・オブジェクトを作成し、そこで上記のプロパティを定義します。例えば以下のようにします。
Set topic = "quick-start-events"
Set value = "MyMessage", key = "OptionalTag"
Set msg = ##class(%External.Messaging.KafkaMessage).%New()
Set msg.topic = topic
Set msg.value = value
Set msg.key = key
InterSystems IRIS の %String で指定されている値を超える長さのメッセージを交換できる Kafka 構成をサポートするには、value プロパティではなく、binaryValue プロパティを使用し、適切にエンコードしたバイナリ・ストリームとしてメッセージ・コンテンツを保存します。binaryValue プロパティの値は任意の長さにすることができます。
作成したメッセージは、SendMessage() メソッドを実行することによってトピックに送信できます。例えば以下のようにします。
Set tSC = client.SendMessage(msg)
Kafka コンシューマ
InterSystems IRIS は、トピックのメッセージを取得する API を呼び出すことで、Kafka コンシューマとして機能できます。コンシューマのコンシューマ・グループを指定するために、クライアントの設定を定義するときに groupId プロパティを必ず定義します。以下のフローでは、クライアントを使用し、コンシューマとして Kafka を操作しています。
Kafka クライアントの作成後、メッセージを取得する前に、標準の Apache ConsumerConfig 構成オプションをアプリケーションで使用して、コンシューマをカスタマイズできます。既定では、クライアントは標準のコンシューマとして構成されますが、Apache オプションを JSON 文字列として UpdateConsumerConfig() メソッドに渡すことで、この既定の構成を変更できます。サポートされているオプションの一覧は、Apache Kafka のドキュメントOpens in a new tabを参照してください。例えば、次のコードは、Apache 構成オプションを使用して、Kafka クライアントを構成します。
Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)
Set consumerSettings = "{""key.deserializer"":""org.apache.kafka.common.serialization.StringDeserializer""}"
Set tSC = client.UpdateConsumerConfig(consumerSettings)
Kafka クライアントは、ReceiveMessage() メソッドを使用して Kafka コンシューマとして機能できます。このメソッドでは、JSON 形式の文字列をオプションの引数として指定することで、メッセージの取得をポーリングする際のタイムアウト (ミリ秒) を設定できます。そのためには、次のように %External.Messaging.KafkaReceiveSettingsOpens in a new tab クラスの新しいインスタンスを作成し、pollTimeout プロパティを設定します。
Set rset = ##class(%External.Messaging.KafkaReceiveSettings).%New()
Set rset.pollTimeout = 500
このように設定しない場合、ポーリングのタイムアウト時間は既定値の 100 ミリ秒になります。
メッセージを取得するには、Kafka クライアント ReceiveMessage() メソッドを呼び出します。このメソッドは、トピックの名前を引数として取り、参照渡しで %ListOfObjectsOpens in a new tab としてメッセージを返します。前のセクションで説明したように KafkaReceiveSettings オブジェクトを使用してメッセージ取得設定を指定している場合は、設定オブジェクトの ToJSON() メソッドを使用して、その設定を 3 番目の引数として指定します。Kafka からは、コンシューマがまだ取得していない新しいメッセージのみが返されます。例えば、トピックのメッセージを取得し、表示する場合、次のようになります。
#dim messages As %ListOfObjects
Set tSC = client.ReceiveMessage(topic, .messages, rset.ToJSON())
For i=1:1:messages.Size {
Set msg = messages.GetAt(i)
Write "Message: ", msg.ToJSON(), !
}
InterSystems IRIS の %String に指定された値を超える長さのメッセージを処理できるように、Kafka のメッセージ・クラスには、適切にエンコードしたバイナリ・ストリームとして任意の長さのメッセージを保存できる binaryValue プロパティも用意されています。
AdminClient Configs の定義
アプリケーションでは、標準の Apache AdminClient Configs を使用して Kafka クライアントをカスタマイズできます。クライアントは既定で標準の構成になりますが、Apache オプションを JSON 文字列として UpdateAdminConfig()Opens in a new tab メソッドに渡すことで、この既定の構成を変更できます。サポートされているオプションの一覧は、Apache Kafka のドキュメントOpens in a new tabを参照してください。例えば以下のようにします。
Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)
Set adminSettings = "{""ssl.protocol"":""TLSv1.3""}"
Set tSC = client.UpdateAdminConfig(adminSettings)
トピックの操作
InterSystems IRIS は、新しい Kafka トピック、およびトピックを削除する別のトピックを作成するために使用できる API を提供します。トピックを作成するときは、パーティション数とレプリケーション・ファクタが引数として渡されます。パーティションとレプリケーション・ファクタの概要は、Apache Kafka のドキュメントOpens in a new tabを参照してください。トピックを作成するには以下のよう指定します。
Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)
Set topic = "quick-start-events"
Set numberOfPartitions = 1
Set replicationFactor = 3
Set tSC = client.CreateTopic(topic,numberOfPartitions,replicationFactor)
代わりに、すべてのメッセージング・プラットフォームに共通するメソッドを使用して、トピックを作成することもできます。この代替手法を使用する場合は、%External.Messaging.KafkaTopicSettingsOpens in a new tab クラスのインスタンスにトピックの設定を定義し、ToJSON() メソッドを使用して、その設定を JSON オブジェクトとして共通メソッドに渡します。詳細は、"%External.Messaging.Client.CreateQueueOrTopic()Opens in a new tab" を参照してください。
トピックの削除
アプリケーションで DeleteTopic() メソッドを使用して Kafka トピックを削除できます。
Set tSC = client.DeleteTopic(topic)
代わりに、すべてのメッセージング・プラットフォームに共通するメソッドを使用して、トピックを削除することもできます。詳細は、"%External.Messaging.Client.DeleteQueueOrTopic()Opens in a new tab" を参照してください。
クライアントの終了
Kafka との通信を完了した InterSystems IRIS アプリケーションは、Close() メソッドを使用してクライアントを閉じる必要があります。例えば以下のようにします。
Do:client'="" client.Close()