Skip to main content

This documentation is for an older version of this product. See the latest version of this content.Opens in a new tab

Kafka メッセージング API の使用法

インターシステムズでは、Kafka メッセージの作成と利用に使用できる API を用意しています。コードは、クライアントを作成した後、メッセージの送信または受信のアクションを実行する、そのクライアントのメソッドを呼び出すことにより、メッセージのプロデューサまたはコンシューマとして機能します。InterSystems IRIS も、Kafka のトピックを作成するメソッドと削除するメソッドを提供しています。

Kafka API は、他のメッセージング・プラットフォームと共有する共通のメッセージング・クラスを基本としています。このページでは、このような共通のクラスで確立されているフローに対し、プラットフォーム固有で発生するバリエーションについて説明します。

インターシステムズでは、ここで説明する API のほか、相互運用プロダクションに使用できる専用のクラスを用意しています。このようなクラスでは、プロダクションで Kafka へメッセージを送信でき、また Kafka からメッセージを取得できます。

Kafka との接続

Kafka との接続を作成するには以下の手順に従います。

  1. 設定オブジェクトを作成します。そのためには、%External.Messaging.KafkaSettingsOpens in a new tab のインスタンスを作成し、必要に応じてそのプロパティを設定します。以下に挙げるほとんどのプロパティは、プロデューサとコンシューマの両方に適用できますが、groupId 設定は例外で、コンシューマ・グループへのコンシューマの割り当てにのみ使用します。

    • usernamepassword でクライアントの Kafka 認証情報を定義します。

    • servers は、Kafka クラスタにあるサーバを特定する IP アドレス ポートのエントリをコンマ区切りの一覧で定義します。

    • 必要に応じて、Kafka プロデューサまたは Kafka コンシューマのクライアント ID を clientId で定義します。

    • groupId で、コンシューマのコンシューマ・グループ ID を定義します。

    例えば以下のようにします。

     Set settings = ##class(%External.Messaging.KafkaSettings).%New()
     Set settings.username = "amandasmith"
     Set settings.password = "234sdsge"
     Set settings.servers = "100.0.70.179:9092, 100.0.70.089:7070"
     Set settings.clientId = "BazcoApp"
     // If Consumer, specify a consumer group
     Set settings.groupId = "G1"
    
  2. メッセージング・クライアント・オブジェクトを作成します。そのためには、%External.Messaging.ClientOpens in a new tabCreateClient() メソッドを呼び出し、設定オブジェクトを 1 番目の引数として渡します。例えば以下のようにします。

     Set client = ##class(%External.Messaging.Client).CreateClient(settings, .tSC)
     If $$$ISERR(tSC) { //handle error scenario }
    

    このメソッドからは、参照渡しでステータス・コードが 2 番目の引数として返されます。コードでは、このステータスを確認したうえで処理を継続する必要があります。

    settings オブジェクトは %External.Messaging.KafkaSettingsOpens in a new tab のインスタンスなので、返されるオブジェクト (client) は %External.Messaging.KafkaClientOpens in a new tab のインスタンスです。

Kafka プロデューサ

InterSystems IRIS は、メッセージを作成する API メソッドを呼び出し、作成したメッセージを送信することにより、Kafka プロデューサとして機能できます。メッセージの送信先とするトピックをアプリケーションで作成する必要がある場合は、"トピックの操作" を参照してください。以下のフローでは、クライアントを使用し、プロデューサとして Kafka を操作しています。

プロデューサとしてのクライアントの構成 (オプション)

Kafka クライアントの作成後、メッセージを送信する前に、標準の Apache ProducerConfig 構成オプションをアプリケーションで使用して、プロデューサをカスタマイズできます。既定では、クライアントは標準のプロデューサとして構成されますが、Apache オプションを JSON 文字列として UpdateProducerConfig() メソッドに渡すことで、この既定の構成を変更できます。サポートされているオプションの一覧は、Apache Kafka のドキュメントOpens in a new tabを参照してください。例えば、次のコードは、Apache 構成オプションを使用して、Kafka クライアントを構成します。

 Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)

 Set producerSettings = "{""key.serializer"":""org.apache.kafka.common.serialization.StringSerializer""}"
 Set tSC = client.UpdateProducerConfig(producerSettings)
メッセージの作成

Kafka に送信するメッセージには、topicvalue を記述する必要があります。また、値のタグとして機能する key を必要に応じて記述できます。トピックに送信するメッセージを作成するには、新しい Kafka メッセージ・オブジェクトを作成し、そこで上記のプロパティを定義します。例えば以下のようにします。

 Set topic = "quick-start-events"
 Set value = "MyMessage", key = "OptionalTag"

 Set msg = ##class(%External.Messaging.KafkaMessage).%New()
 Set msg.topic = topic
 Set msg.value = value
 Set msg.key = key

InterSystems IRIS の %String で指定されている値を超える長さのメッセージを交換できる Kafka 構成をサポートするには、value プロパティではなく、binaryValue プロパティを使用し、適切にエンコードしたバイナリ・ストリームとしてメッセージ・コンテンツを保存します。binaryValue プロパティの値は任意の長さにすることができます。

メッセージの送信

作成したメッセージは、SendMessage() メソッドを実行することによってトピックに送信できます。例えば以下のようにします。

 Set tSC = client.SendMessage(msg)

Kafka コンシューマ

InterSystems IRIS は、トピックのメッセージを取得する API を呼び出すことで、Kafka コンシューマとして機能できます。コンシューマのコンシューマ・グループを指定するために、クライアントの設定を定義するときに groupId プロパティを必ず定義します。以下のフローでは、クライアントを使用し、コンシューマとして Kafka を操作しています。

コンシューマとしてのクライアントの構成 (オプション)

Kafka クライアントの作成後、メッセージを取得する前に、標準の Apache ConsumerConfig 構成オプションをアプリケーションで使用して、コンシューマをカスタマイズできます。既定では、クライアントは標準のコンシューマとして構成されますが、Apache オプションを JSON 文字列として UpdateConsumerConfig() メソッドに渡すことで、この既定の構成を変更できます。サポートされているオプションの一覧は、Apache Kafka のドキュメントOpens in a new tabを参照してください。例えば、次のコードは、Apache 構成オプションを使用して、Kafka クライアントを構成します。

 Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)

 Set consumerSettings = "{""key.deserializer"":""org.apache.kafka.common.serialization.StringDeserializer""}"
 Set tSC = client.UpdateConsumerConfig(consumerSettings)
メッセージ取得のための設定の構成 (オプション)

Kafka クライアントは、ReceiveMessage() メソッドを使用して Kafka コンシューマとして機能できます。このメソッドでは、JSON 形式の文字列をオプションの引数として指定することで、メッセージの取得をポーリングする際のタイムアウト (ミリ秒) を設定できます。そのためには、次のように %External.Messaging.KafkaReceiveSettingsOpens in a new tab クラスの新しいインスタンスを作成し、pollTimeout プロパティを設定します。

Set rset = ##class(%External.Messaging.KafkaReceiveSettings).%New()
Set rset.pollTimeout = 500

このように設定しない場合、ポーリングのタイムアウト時間は既定値の 100 ミリ秒になります。

メッセージの取得

メッセージを取得するには、Kafka クライアント ReceiveMessage() メソッドを呼び出します。このメソッドは、トピックの名前を引数として取り、参照渡しで %ListOfObjectsOpens in a new tab としてメッセージを返します。前のセクションで説明したように KafkaReceiveSettings オブジェクトを使用してメッセージ取得設定を指定している場合は、設定オブジェクトの ToJSON() メソッドを使用して、その設定を 3 番目の引数として指定します。Kafka からは、コンシューマがまだ取得していない新しいメッセージのみが返されます。例えば、トピックのメッセージを取得し、表示する場合、次のようになります。

 #dim messages As %ListOfObjects
 Set tSC = client.ReceiveMessage(topic, .messages, rset.ToJSON())

 For i=1:1:messages.Size {
        Set msg = messages.GetAt(i)
        Write "Message: ", msg.ToJSON(), !
 }

InterSystems IRIS の %String に指定された値を超える長さのメッセージを処理できるように、Kafka のメッセージ・クラスには、適切にエンコードしたバイナリ・ストリームとして任意の長さのメッセージを保存できる binaryValue プロパティも用意されています。

AdminClient Configs の定義

アプリケーションでは、標準の Apache AdminClient Configs を使用して Kafka クライアントをカスタマイズできます。クライアントは既定で標準の構成になりますが、Apache オプションを JSON 文字列として UpdateAdminConfig()Opens in a new tab メソッドに渡すことで、この既定の構成を変更できます。サポートされているオプションの一覧は、Apache Kafka のドキュメントOpens in a new tabを参照してください。例えば以下のようにします。

Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)

Set adminSettings = "{""ssl.protocol"":""TLSv1.3""}"
Set tSC = client.UpdateAdminConfig(adminSettings)

トピックの操作

InterSystems IRIS は、新しい Kafka トピック、およびトピックを削除する別のトピックを作成するために使用できる API を提供します。トピックを作成するときは、パーティション数とレプリケーション・ファクタが引数として渡されます。パーティションとレプリケーション・ファクタの概要は、Apache Kafka のドキュメントOpens in a new tabを参照してください。トピックを作成するには以下のよう指定します。

 Set client = ##class(%External.Messaging.Client).CreateClient(kafkaSettings, .tSC)

 Set topic = "quick-start-events"
 Set numberOfPartitions = 1
 Set replicationFactor = 3
 Set tSC = client.CreateTopic(topic,numberOfPartitions,replicationFactor)

代わりに、すべてのメッセージング・プラットフォームに共通するメソッドを使用して、トピックを作成することもできます。この代替手法を使用する場合は、%External.Messaging.KafkaTopicSettingsOpens in a new tab クラスのインスタンスにトピックの設定を定義し、ToJSON() メソッドを使用して、その設定を JSON オブジェクトとして共通メソッドに渡します。詳細は、"%External.Messaging.Client.CreateQueueOrTopic()Opens in a new tab" を参照してください。

トピックの削除

アプリケーションで DeleteTopic() メソッドを使用して Kafka トピックを削除できます。

 Set tSC = client.DeleteTopic(topic)

代わりに、すべてのメッセージング・プラットフォームに共通するメソッドを使用して、トピックを削除することもできます。詳細は、"%External.Messaging.Client.DeleteQueueOrTopic()Opens in a new tab" を参照してください。

クライアントの終了

Kafka との通信を完了した InterSystems IRIS アプリケーションは、Close() メソッドを使用してクライアントを閉じる必要があります。例えば以下のようにします。

 Do:client'="" client.Close()
FeedbackOpens in a new tab