Load data using Stream Load transaction interface
Stream Loadトランザクションインターフェースを使用してデータをロードする
\import InsertPrivNote from '../assets/commonMarkdown/insertPrivNote.md'
StarRocksは、Apache Flink®やApache Kafka®などの外部システムからデータをロードするために、v2.4以降でStream Loadトランザクションインターフェースを提供しています。Stream Loadトランザクションインターフェースは、高並行のストリームロードのパフォーマンスを向上させるのに役立ちます。
このトピックでは、Stream Loadトランザクションインターフェースとこのインターフェースを使用してStarRocksにデータをロードする方法について説明します。
説明
Stream Loadトランザクションインターフェースは、API操作を呼び出すためにHTTPプロトコル互換のツールや言語を使用することができます。このトピックでは、curlを例として使用してこのインターフェースの使用方法を説明します。このインターフェースには、トランザクション管理、データ書き込み、トランザクションプリコミット、トランザクションの重複削除、トランザクションのタイムアウト管理など、さまざまな機能が備わっています。
トランザクション管理
Stream Loadトランザクションインターフェースは、次のAPI操作を提供しています。
/api/transaction/begin
:新しいトランザクションを開始します。/api/transaction/commit
:現在のトランザクションをコミットしてデータ変更を永続化します。/api/transaction/rollback
:現在のトランザクションをロールバックしてデータ変更を中止します。
トランザクションプリコミット
Stream Loadトランザクションインターフェースは、/api/transaction/prepare
操作を提供しています。この操作は、現在のトランザクションをプリコミットし、データ変更を一時的に永続化します。トランザクションをプリコミットした後は、StarRocksクラスターがダウンしている場合でも、StarRocksクラスターが正常に復旧した後にトランザクションをコミットすることができます。
注釈
トランザクションがプリコミットされた後、トランザクションを使用してデータを引き続き書き込まないでください。トランザクションを使用してデータを書き込み続けると、書き込みリクエストがエラーを返します。
データ書き込み
Stream Loadトランザクションインターフェースは、/api/transaction/load
操作を提供しています。この操作は、データを書き込むために使用されます。この操作は、1つのトランザクション内で複数回呼び出すことができます。
トランザクションの重複削除
Stream Loadトランザクションインターフェースは、StarRocksのラベリングメカニズムを引き継いでいます。各トランザクションに固有のラベルをバインドすることで、トランザクションのアトモストワンスガラントを実現することができます。
トランザクションのタイムアウト管理
各FEの設定ファイルにあるstream_load_default_timeout_second
パラメータを使用して、そのFEのデフォルトのトランザクションタイムアウト期間を指定することができます。
トランザクションを作成する際に、HTTPリクエストヘッダーのtimeout
フィールドを使用して、トランザクションのタイムアウト期間を指定することができます。
トランザクションを作成する際に、HTTPリクエストヘッダーのidle_transaction_timeout
フィ ールドを使用して、トランザクションがアイドル状態で維持できるタイムアウト期間を指定することができます。タイムアウト期間内にデータが書き込まれない場合、トランザクションは自動的にロールバックされます。
利点
Stream Loadトランザクションインターフェースには、以下の利点があります。
- エグザクトリーワンスセマンティクストランザクションは、プリコミットとコミットの2つのフェーズに分割されており、システム間でのデータロードを簡単に行うことができます。このインターフェースを使用することで、Flinkからのデータロードに対してエグザクトリーワンスセマンティクスが保証されます。
- パフォーマンスの向上プログラムを使用してロードジョブを実行する場合、Stream Loadトランザクションインターフェースを使用することで、複数のミニバッチのデータを必要に応じてマージし、
/api/transaction/commit
操作を呼び出して一度に送信することができます。そのため、ロードする必要のあるデータバージョンが少なくなり、ロードのパフォーマンスが向上します。
制約事項
Stream Loadトランザクションインターフェースには、以下の制約事項があります。
- 単一データベース単一テーブルのトランザクションのみがサポートされています。複数データベース複数テーブルのトランザクシ ョンは開発中です。
- 1つのクライアントからの同時データ書き込みのみがサポートされています。複数のクライアントからの同時データ書き込みのサポートは開発中です。
/api/transaction/load
操作をトランザクション内で複数回呼び出すことができます。この場合、呼び出されたすべての/api/transaction/load
操作のパラメータ設定は同じでなければなりません。- Stream Loadトランザクションインターフェースを使用してCSV形式のデータをロードする場合、データファイルの各データレコードが行区切り記号で終了していることを確認してください。
注意事項
/api/transaction/begin
、/api/transaction/load
、または/api/transaction/prepare
操作がエラーを返した場合、トランザクションは失敗し、自動的にロールバックされます。- 新しいトランザクションを開始するために
/api/transaction/begin
操作を呼び出す際に、ラベルを指定するオプションがあります。ラベルを指定しない場合、StarRocksはトランザクションのためのラベルを生成します。ただし、後続の/api/transaction/load
、/api/transaction/prepare
、および/api/transaction/commit
操作では、/api/transaction/begin
操作と同じラベルを使用する必要があります。 - 以前のトランザクションのラベルを使用して新しいトランザクションを開始するために
/api/transaction/begin
操作を呼び出すと、前のトランザクションは失敗し、ロールバックされます。 - StarRocksがCSV形式のデータをサポートするデフォルトの列区切り記号は
\t
、行区切り記号は\n
です。データファイルがデフォルトの列区切り記号や行区切り記号を使用していない場合は、"column_separator: <column_separator>"
または"row_delimiter: <row_delimiter>"
を使用して、実際にデータファイルで使用されている列区切り記号や行区切り記号を指定する必要があります。
基本操作
サンプルデータの準備
このトピックでは、CSV形式のデータを使用して説明します。
-
ローカルファイルシステムの
/home/disk1/
パスに、example1.csv
という名前のCSVファイルを作成します。ファイルは3つの列からなり、順にユーザーID、ユーザー名、ユーザースコアを表します。1,Lily,23
2,Rose,23
3,Alice,24
4,Julia,25 -
StarRocksデータベース
test_db
内に、Primary Keyテーブルtable1
を作成します。テーブルは3つの列、id
、name
、score
から構成されており、id
はプライマリキーです。CREATE TABLE `table1`
(
`id` int(11) NOT NULL COMMENT "ユーザーID",
`name` varchar(65533) NULL COMMENT "ユーザー名",
`score` int(11) NOT NULL COMMENT "ユーザースコア"
)
ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10;
トランザクションの開始
構文
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" -H "table:<table_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin
例
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin
注釈
この例では、トランザクションのラベルとして
streamload_txn_example1_table1
が指定されています。
戻り値
-
トランザクションが正常に開始された場合、次の結果が返されます:
{
"Status": "OK",
"Message": "",
"Label": "streamload_txn_example1_table1",
"TxnId": 9032,
"BeginTxnTimeMs": 0
} -
トランザクションが重複ラベルにバインドされている場合、次の結果が返されます:
{
"Status": "LABEL_ALREADY_EXISTS",
"ExistingJobStatus": "RUNNING",
"Message": "Label [streamload_txn_example1_table1] has already been used."
} -
重複ラベル以外のエラーが発生した場合、次の結果が返されます:
{
"Status": "FAILED",
"Message": ""
}
データの書き込み
構文
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" -H "table:<table_name>" \
-T <file_path> \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load
注釈
/api/transaction/load
操作を呼び出す際には、<file_path>
を使用してロードするデータファイルの保存パスを指定する必要があります。
例
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" -H "table:table1" \
-T /home/disk1/example1.csv \
-H "column_separator: ," \
-XPUT http://<fe_host>:<fe_http_port>/api/transaction/load
注釈
この例では、データファイル
example1.csv
で使用される列区切り記号がカンマ(,
)、StarRocksのデフォルトの列区切り記号(\t
)ではないことに注意してください。そのため、/api/transaction/load
操作を呼び出す際には、"column_separator: <column_separator>"
を使用してカンマ(,
)を列区切り記号として指定する必要があります。
戻り値
-
データの書き込みが成功した場合、次の結果が返されます:
{
"TxnId": 1,
"Seq": 0,
"Label": "streamload_txn_example1_table1",
"Status": "OK",
"Message": "",
"NumberTotalRows": 5265644,
"NumberLoadedRows": 5265644,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 10737418067,
"LoadTimeMs": 418778,
"StreamLoadPutTimeMs": 68,
"ReceivedDataTimeMs": 38964,
} -
トランザクションが不明であると見なされた場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "TXN_NOT_EXISTS"
} -
トランザクションの状態が無効と見なされた場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "Transcation State Invalid"
} -
上記以外のエラーが発生した場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": ""
}
トランザクションのプリコミット
構文
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare
例
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare
戻り値
-
プリコミットが成功した場 合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "OK",
"Message": "",
"NumberTotalRows": 5265644,
"NumberLoadedRows": 5265644,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 10737418067,
"LoadTimeMs": 418778,
"StreamLoadPutTimeMs": 68,
"ReceivedDataTimeMs": 38964,
"WriteDataTimeMs": 417851
"CommitAndPublishTimeMs": 1393
} -
トランザクションが存在しないと見なされた場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "Transcation Not Exist"
} -
プリコミットがタイムアウトした場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "commit timeout",
} -
上記以外のエラーが発生した場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "publish timeout"
}
トランザクションのコミット
構文
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit
例
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit
戻り値
-
コミットが成功した場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "OK",
"Message": "",
"NumberTotalRows": 5265644,
"NumberLoadedRows": 5265644,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 10737418067,
"LoadTimeMs": 418778,
"StreamLoadPutTimeMs": 68,
"ReceivedDataTimeMs": 38964,
"WriteDataTimeMs": 417851
"CommitAndPublishTimeMs": 1393
} -
トランザクションが既にコミットされている場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "OK",
"Message": "Transaction already commited",
} -
トランザクションが存在しないと見なされた場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "Transcation Not Exist"
} -
コミットがタイムアウトした場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "commit timeout",
} -
データの公開がタイムアウトした場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "publish timeout",
"CommitAndPublishTimeMs": 1393
} -
上記以外のエラーが発生した場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": ""
}
トランザクションのロールバック
構文
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
-H "Expect:100-continue" \
-H "db:<database_name>" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback
例
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
-H "Expect:100-continue" \
-H "db:test_db" \
-XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback
戻り値
-
ロールバックが成功した場 合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "OK",
"Message": ""
} -
トランザクションが存在しないと見なされた場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": "Transcation Not Exist"
} -
上記以外のエラーが発生した場合、次の結果が返されます:
{
"TxnId": 1,
"Label": "streamload_txn_example1_table1",
"Status": "FAILED",
"Message": ""
}
参考資料
Stream Loadの適用シナリオやサポートされるデータファイル形式、Stream Loadの動作についての詳細については、次を参照してください:ローカルファイルシステムまたはストリーミングデータソースからのデータをHTTP PUTを使用してロードする。
Stream Loadジョブの作成に使用する構文やパラメータの詳細については、次を参照してください:STREAM LOAD。