メインコンテンツまでスキップ

Load data using Kafka connector

Kafkaコネクタを使用してデータをロードする

StarRocksは、Apache Kafka®コネクタ(StarRocks Connector for Apache Kafka®)という独自開発のコネクタを提供し、Kafkaからメッセージを継続的に消費してStarRocksにロードします。Kafkaコネクタは、少なくとも一度のセマンティクスを保証します。

KafkaコネクタはKafka Connectとシームレスに統合できるため、Kafkaエコシステムとの統合が容易です。リアルタイムデータをStarRocksにロードしたい場合、Kafkaコネクタを使用することをおすすめします。以下のシナリオでは、ルーチンロードの代わりにKafkaコネクタを使用することをおすすめします。

  • ソースデータのフォーマットが、JSONやCSV、Avroではなく、Protobufの場合。
  • Debezium形式のCDCデータなど、データ変換をカスタマイズする場合。
  • 複数のKafkaトピックからデータをロードする場合。
  • Confluent cloudからデータをロードする場合。
  • ロード速度とリソース利用のバランスを実現するために、ロードバッチサイズや並列度などのパラメータを細かく制御する必要がある場合。

準備

Kafka環境を設定する

セルフマネージドのApache KafkaクラスタとConfluentクラウドの両方がサポートされています。

  • セルフマネージドのApache Kafkaクラスタの場合、Apache KafkaクラスタとKafka Connectクラスタをデプロイし、トピックを作成してください。
  • Confluentクラウドの場合、Confluentアカウントを作成し、クラスタとトピックを作成してください。

Kafkaコネクタをインストールする

Kafka ConnectにKafkaコネクタを提出してください。

  • セルフマネージドのKafkaクラスタの場合:
    • starrocks-kafka-connector-1.0.0.tar.gzをダウンロードし、解凍してください。
    • 解凍したディレクトリをKafkaのライブラリディレクトリにコピーしてください。最新のJARファイルを読み込むために、Kafka Connectを再起動してください。
  • Confluentクラウドの場合:

    注意

    現在、KafkaコネクタはConfluent Hubにアップロードされていません。圧縮ファイルをConfluentクラウドにアップロードする必要があります。

StarRocksテーブルを作成する

Kafkaのトピックとデータに基づいて、StarRocksにテーブルまたはテーブルを作成してください。

次の手順では、セルフマネージドのKafkaクラスタを使用して、Kafkaコネクタを構成し、Kafka Connectを起動してStarRocksにデータをロードする方法を示します。

  1. connect-StarRocks-sink.propertiesというKafkaコネクタの構成ファイルを作成し、パラメータを設定してください。パラメータの詳細については、パラメータを参照してください。

    name=starrocks-kafka-connector
    connector.class=com.starrocks.connector.kafka.StarRocksSinkConnector
    topics=dbserver1.inventory.customers
    starrocks.http.url=192.168.xxx.xxx:8030,192.168.xxx.xxx:8030
    starrocks.username=root
    starrocks.password=123456
    starrocks.database.name=inventory
    key.converter=io.confluent.connect.json.JsonSchemaConverter
    value.converter=io.confluent.connect.json.JsonSchemaConverter

    注意

    ソースデータがDebezium形式のCDCデータであり、StarRocksテーブルがプライマリキーテーブルの場合、__opフィールドを追加してソースデータの変更をプライマリキーテーブルに同期するために、transformを構成する必要があります。

  2. Kafkaコネクタを実行してください。以下のコマンドで、以下のコマンドで使用されるパラメータと説明については、Kafkaドキュメントを参照してください。

    • スタンドアロンモード

      bin/connect-standalone worker.properties connect-StarRocks-sink.properties [connector2.properties connector3.properties ...]
    • ディストリビュートモード

      注意

      本番環境ではディストリビュートモードを使用することをおすすめします。

    • ワーカーを起動してください。

      bin/connect-distributed worker.properties
    • ディストリビュートモードでは、コネクタの構成はコマンドラインでは指定されません。代わりに、以下で説明するREST APIを使用してKafkaコネクタの構成と実行を行います。

      curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
      "name":"starrocks-kafka-connector",
      "config":{
      "connector.class":"com.starrocks.connector.kafka.SinkConnector",
      "topics":"dbserver1.inventory.customers",
      "starrocks.http.url":"192.168.xxx.xxx:8030,192.168.xxx.xxx:8030",
      "starrocks.user":"root",
      "starrocks.password":"123456",
      "starrocks.database.name":"inventory",
      "key.converter":"io.confluent.connect.json.JsonSchemaConverter",
      "value.converter":"io.confluent.connect.json.JsonSchemaConverter"
      }
      }
  3. StarRocksテーブル内のデータをクエリしてください。

パラメータ

パラメータ

必須

デフォルト値

説明

name

YES

このKafkaコネクタの名前です。Kafka Connectクラスタ内のすべてのKafkaコネクタに対してグローバルにユニークである必要があります。例:starrocks-kafka-connector。

connector.class

YES

com.starrocks.connector.kafka.SinkConnector

このKafkaコネクタのシンクに使用されるクラスです。

topics

YES

登録する1つ以上のトピックで、各トピックはStarRocksテーブルに対応します。デフォルトでは、StarRocksはトピック名がStarRocksテーブル名と一致すると見なします。そのため、StarRocksはトピック名を使用して対象のStarRocksテーブルを判断します。topicsまたはtopics.regex(以下)のいずれかを選択してください。ただし、StarRocksテーブル名がトピック名と異なる場合は、optionalなstarrocks.topic2table.mapパラメータ(以下)を使用して、トピック名とテーブル名のマッピングを指定してください。

topics.regex

一致させるための正規表現の1つ以上のトピック。詳細については、トピックを参照してください。topics.regexまたはtopics(上記)のいずれかを選択してください。

starrocks.topic2table.map

NO

トピック名がStarRocksテーブル名と異なる場合の、StarRocksテーブル名とトピック名のマッピングです。形式は <topic-1>:<table-1>,<topic-2>:<table-2>,...です。

starrocks.http.url

YES

StarRocksクラスタのFEのHTTP URLです。フォーマットは <fe_host1>:<fe_http_port1>,<fe_host2>:<fe_http_port2>,...です。複数のアドレスはカンマ(,)で区切られています。例: 192.168.xxx.xxx:8030,192.168.xxx.xxx:8030

starrocks.database.name

YES

StarRocksデータベースの名前です。

starrocks.username

YES

StarRocksクラスタアカウントのユーザ名です。ユーザはStarRocksテーブル上の INSERT権限 が必要です。

starrocks.password

YES

StarRocksクラスタアカウントのパスワードです。

key.converter

YES

このシナリオでは、StarRocksが提供するKafkaコネクタはシンクコネクタです。このパラメータは、シンクコネクタのキーの逆シリアライズに使用されるキーコンバータを指定します。ソースコネクタで使用されるキーコンバータに基づいて、このパラメータの値を決定する必要があります。

value.converter

YES

このパラメータは、シンクコネクタの値の逆シリアライズに使用される値コンバータを指定します。ソースコネクタで使用される値コンバータに基づいて、このパラメータの値を決定する必要があります。

key.converter.schema.registry.url

NO

キーコンバータのスキーマレジストリのURLです。

value.converter.schema.registry.url

NO

値コンバータのスキーマレジストリのURLです。

tasks.max

NO

1

Kafkaコネクタが作成できるタスクスレッドの最大数の上限です。通常、Kafka ConnectクラスタのワーカーノードのCPUコア数と同じです。このパラメータを調整して、ロードパフォーマンスを制御できます。

bufferflush.maxbytes

NO

94371840(90M)

一度にStarRocksに送信される前にメモリに蓄積されるデータの最大サイズです。最大値は64MBから10GBまでです。ここで言及されているしきい値は、Stream Load SDKのバッファがデータをバッファするために複数のStream Loadジョブを作成するかもしれないことに注意してください。したがって、ここで言及されているしきい値は、合計データサイズを指します。

bufferflush.intervalms

NO

300000

バッチのデータを送信する間隔です。ロードの待機時間を制御します。範囲は[1000, 3600000]です。

connect.timeoutms

NO

1000

HTTP URLへの接続のタイムアウトです。範囲は[100, 60000]です。

sink.properties.*

ロード動作を制御するためのStream Loadパラメータです。たとえば、sink.properties.formatパラメータは、CSVやJSONなどのStream Loadに使用される形式を指定します。サポートされているパラメータのリストとその説明については、[STREAM LOAD](../sql-reference/sql-statements/data-manipulation/STREAM LOAD.md)を参照してください。

sink.properties.format

NO

json

Stream Loadに使用される形式です。Kafkaコネクタは、データの各バッチをStarRocksに送信する前に、形式に変換します。有効な値はcsvjsonです。詳細については、 CSV parameters および JSON parameters を参照してください。

制限

  • 1つのKafkaトピックから単一のメッセージを複数のデータ行に展開してStarRocksにロードすることはサポートされていません。
  • Kafkaコネクタのシンクは、少なくとも一度のセマンティクスを保証します。

ベストプラクティス

Debezium形式のCDCデータのロード

KafkaデータがDebezium CDC形式であり、StarRocksテーブルがプライマリキーテーブルの場合、transformsパラメータやその他の関連するパラメータを設定する必要があります。

transforms=addfield,unwrap
transforms.addfield.type=com.starrocks.connector.kafka.transforms.AddOpFieldForDebeziumRecord
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode

上記の構成では、transforms=addfield,unwrapを指定しています。

  • addfield変換は、各Debezium CDC形式のレコードに__opフィールドを追加して、StarRocksプライマリキーモデルテーブルをサポートします。StarRocksテーブルがプライマリキーテーブルでない場合は、addfield変換を指定する必要はありません。addfield変換のクラスはcom.Starrocks.Kafka.Transforms.AddOpFieldForDebeziumRecordです。KafkaコネクタのJARファイルに含まれているため、手動でインストールする必要はありません。
  • unwrap変換はDebeziumによって提供され、操作タイプに基づいてDebeziumの複雑なデータ構造を展開するために使用されます。詳細については、New Record State Extractionを参照してください。