メインコンテンツまでスキップ

Overview of data loading

#データロードの概要

\import InsertPrivNote from '../assets/commonMarkdown/insertPrivNote.md'

データロードは、ビジネス要件に基づいてさまざまなデータソースからの生データをクレンジングおよび変換し、結果のデータをStarRocksにロードして高速なデータ分析を実現するプロセスです。

ロードジョブを実行することで、StarRocksにデータをロードすることができます。各ロードジョブには、ユーザーが指定するか、StarRocksが自動的に生成するユニークなラベルがあります。各ラベルは1つのロードジョブのみに使用できます。ロードジョブが完了した後は、そのラベルを他のロードジョブに再利用することはできません。ラベルの再利用は、失敗したロードジョブの場合にのみ行うことができます。このメカニズムにより、特定のラベルに関連付けられたデータは、1度しかロードできないAt-Most-Onceセマンティクスが実装されます。

StarRocksが提供するすべてのローディングメソッドは、アトミック性を保証できます。アトミック性とは、ロードジョブ内の有効なデータがすべて正常にロードされるか、有効なデータのいずれかがロードされないことを意味します。有効なデータの一部がロードされる一方、他のデータがロードされないことはありません。ただし、有効なデータにはデータ型変換エラーなどの品質上の問題によりフィルタリングされたデータは含まれません。

StarRocksは、ロードジョブを提出するために使用できる2つの通信プロトコル、MySQLおよびHTTPをサポートしています。各ローディングメソッドがサポートするプロトコルについての詳細は、このトピックの"ローディングメソッド"セクションを参照してください。

サポートされるデータ型

StarRocksは、すべてのデータ型のデータのロードをサポートしています。ただし、いくつかの特定のデータ型のロードには制限があることに注意してください。詳細については、データ型を参照してください。

ロードモード

StarRocksは、同期ロードモードと非同期ロードモードの2つのロードモードをサポートしています。

注意

外部プログラムを使用してデータをロードする場合は、最適なロードモードを選択する必要があります。

同期ローディング

同期ロードモードでは、ロードジョブを送信すると、StarRocksはジョブを同期的に実行してデータをロードし、ジョブが完了した後にジョブの結果を返します。ジョブの結果に基づいて、ジョブが成功したかどうかを確認することができます。

StarRocksは、同期ローディングをサポートする2つのローディングメソッド、Stream LoadおよびINSERTを提供しています。

同期ロードのプロセスは以下の通りです。

  1. ロードジョブを作成します。
  2. StarRocksが返すジョブの結果を表示します。
  3. ジョブの結果に基づいて、ジョブが成功したかどうかを確認します。ジョブの結果がロードの失敗を示す場合、ジョブを再試行することができます。

非同期ロード

非同期ロードモードでは、ロードジョブを送信すると、StarRocksはジョブの作成結果をすぐに返します。

  • 結果がジョブの作成成功を示す場合、StarRocksは非同期でジョブを実行します。ただし、これはデータが正常にロードされたことを意味するわけではありません。ジョブのステータスを確認するためにステートメントやコマンドを使用し、ジョブのステータスに基づいてデータが正常にロードされたかどうかを判断する必要があります。
  • 結果がジョブの作成失敗を示す場合、エラー情報に基づいてジョブを再試行するかどうかを判断することができます。

StarRocksは、非同期ローディングをサポートする3つのローディングメソッド、ブローカーロードルーチンロード、およびSparkロードを提供しています。

非同期ロードのプロセスは以下の通りです。

  1. ロードジョブを作成します。
  2. StarRocksが返すジョブの作成結果を表示し、ジョブが正常に作成されたかどうかを判断します。 a. ジョブの作成に成功した場合、ステップ3に進みます。 b. ジョブの作成に失敗した場合、ステップ1に戻ります。
  3. ジョブのステータスが「FINISHED」または「CANCELLED」を表示するまで、ステートメントやコマンドを使用してジョブのステータスを確認します。

ブローカーロードまたはSparkロードジョブのワークフローは、次の図に示すように、5つのステージで構成されています。

ブローカーロードまたはSparkロードオーバーフロー

ワークフローは以下のように説明されます。

  1. **PENDING(保留中)**ジョブは、FEによるスケジュールを待ってキューで保留状態にあります。

  2. ETLFEは、クレンジング、パーティショニング、ソート、集約など、データのプリプロセスを行います。

    注意

    ジョブがブローカーロードジョブの場合、このステージは直接完了します。

  3. LOADING(ローディング中)FEはデータをクレンジングおよび変換し、データをBEに送信します。すべてのデータがロードされた後、データはキューで有効になるのを待ちます。この時間帯、ジョブのステータスはLOADINGのままです。

  4. FINISHED(完了)全てのデータが有効化された後、ジョブのステータスがFINISHEDになります。この時点でデータをクエリできます。FINISHEDは最終的なジョブのステートです。

  5. CANCELLED(取り消し済み)ジョブのステータスがFINISHEDになる前にいつでもジョブをキャンセルすることができます。また、ロードエラーの場合はStarRocksが自動的にジョブをキャンセルすることもできます。ジョブがキャンセルされると、ジョブのステータスはCANCELLEDになります。CANCELLEDも最終的なジョブのステートです。

ルーチンジョブのワークフローは以下のように説明されます。

  1. MySQLクライアントからFEにジョブが送信されます。
  2. FEはジョブを複数のタスクに分割します。各タスクは、複数のパーティションからデータをロードするためのものです。
  3. FEはタスクを指定されたBEに配布します。
  4. BEはタスクを実行し、タスクが完了した後にFEに報告します。
  5. FEは、BEからのレポートに基づいて、次のタスクを生成し、失敗したタスクを再試行し、またはタスクスケジューリングを一時停止します。

ロードメソッド

StarRocksは、さまざまなビジネスシナリオでデータをロードするのに役立つ5つのローディングメソッド、Stream LoadBroker LoadRoutine LoadSpark Load、およびINSERT INTO SELECTを提供しています。

ローディングメソッド

データソース

ビジネスシナリオ

1回のロードジョブのデータ量

データファイル形式

ローディングモード

プロトコル

Stream Load

ローカルファイル、データストリーム

ロカ↑ルファイルシステムからデータファイルをロードするか、プログラムを使用してデータストリームをロードします。

10 GB以下

CSV、JSON

同期

HTTP

Broker Load

HDFS、Amazon S3、Google GCS、Microsoft Azure Storage、Alibaba Cloud OSS、Tencent Cloud COS、Huawei Cloud OBSなどの他のS3互換ストレージシステム

HDFSやクラウドストレージからデータをロードします。

数十GBから数百GB

CSV、Parquet、ORC

非同期

MySQL

Routine Load

Apache Kafka®

Kafkaからリアルタイムでデータをロードします。

MBからGB程度のデータをミニバッチとして

CSV、JSON、Avro

非同期

MySQL

Spark Load

HDFS、Hive

HDFSやHiveから大量のデータをApache Spark™クラスタを使用して移行します。グローバルデータディクショナリを使用してデータを重複排除します。

数十GBから数TB

CSV、ORC、Parquet

非同期

MySQL

INSERT INTO SELECT

StarRocksテーブル、外部テーブル、AWS S3

外部テーブルからデータをロードします。StarRocksテーブル間でデータをロードします。

固定されていません(メモリサイズによってデータ量が異なります。)

StarRocksテーブル

同期

MySQL

INSERT INTO VALUES

プログラム、ETLツール

小量のデータを個別のレコードとして挿入します。JDBCなどのAPIを使用してデータをロードします。

少量のデータ

SQL

同期

MySQL

ビジネスシナリオ、データ量、データソース、データファイル形式、およびローディング頻度に基づいて、ロードメソッドを選択することができます。また、次の点に注意しながらロードメソッドを選択してください。

  • Kafkaからデータをロードする場合、ルーチンロードを使用することをおすすめします。ただし、データがマルチテーブルの結合や抽出、変換、ロード(ETL)操作を必要とする場合は、Apache Flink®を使用してKafkaからデータを読み取り、前処理し、flink-connector-starrocksを使用してデータをStarRocksにロードすることができます。

  • Hive、Iceberg、Hudi、またはDelta Lakeからデータをロードする場合は、HiveカタログIcebergカタログHudiカタログ、またはDelta Lakeカタログを作成し、それからINSERTを使用してデータをロードすることをおすすめします。

  • 他のStarRocksクラスタまたはElasticsearchクラスタからデータをロードする場合は、StarRocks外部テーブルまたはElasticsearch外部テーブルを作成し、INSERTを使用してデータをロードします。

    注意

    StarRocks外部テーブルはデータの書き込みのみをサポートしています。データの読み取りはサポートされていません。

  • MySQLデータベースからデータをロードする場合は、MySQL外部テーブルを作成し、それからINSERTを使用してデータをロードすることをおすすめします。リアルタイムでデータをロードしたい場合は、MySQLからのリアルタイム同期の手順に従ってデータをロードすることをおすすめします。

  • Oracle、PostgreSQL、SQL Serverなどの他のデータソースからデータをロードする場合は、JDBC外部テーブルを作成し、それからINSERTを使用してデータをロードすることをおすすめします。

次の図は、StarRocksでサポートされているさまざまなデータソースと、これらのデータソースからデータをロードするために使用できるロードメソッドを概観しています。

データロードソース

メモリ制限

StarRocksは、各ロードジョブのメモリ使用量を制限するためのパラメータを提供しており、高い並行性のシナリオではメモリ消費量を削減することができます。ただし、メモリ使用量制限を極端に低く設定しないでください。メモリ使用量制限が極端に低い場合、ロードジョブのメモリ使用量が指定された制限に達するたびにデータが頻繁にメモリからディスクにフラッシュされる可能性があります。ビジネスシナリオに基づいて適切なメモリ使用量制限を指定することをお勧めします。

各ローディングメソッドには、メモリ使用量制限に使用されるパラメータがあります。詳細については、Stream LoadBroker LoadRoutine LoadSpark Load、およびINSERTを参照してください。ただし、通常、1つのロードジョブは複数のBEで実行されます。したがって、これらのパラメータは、すべての関係するBE上の各ロードジョブのメモリ使用量を制限するものであり、関係するBE上のロードジョブの合計メモリ使用量ではありません。

StarRocksはまた、各個別のBEで実行されるすべてのロードジョブの合計メモリ使用量を制限するためのパラメータも提供しています。詳細については、このトピックの「システム構成」セクションを参照してください。

使用上の注意

ロード中に宛先の列を自動的に埋める

データをロードする場合、データファイルの特定のフィールドからデータをロードしないように選択できます。

  • スターロックスのテーブル作成時に、ソースフィールドにマッピングされる宛先スターロックスのテーブル列にDEFAULTキーワードを指定した場合、スターロックスは指定されたデフォルト値を宛先列に自動的に入力します。Stream LoadBroker LoadRoutine LoadINSERTは、DEFAULT current_timestampDEFAULT <default_value>、およびDEFAULT (<expression>)をサポートしています。Spark Loadは、DEFAULT current_timestampDEFAULT <default_value>のみをサポートしています。

    注意

    DEFAULT (<expression>)は、uuid()およびuuid_numeric()関数のみをサポートしています。

  • スターロックスのテーブル作成時に、ソースフィールドにマッピングされる宛先スターロックスのテーブル列でDEFAULTキーワードを指定しなかった場合、スターロックスは宛先列にNULLを自動的に入力します。

    注意

    宛先列がNOT NULLとして定義されている場合、ロードは失敗します。

    Stream LoadBroker LoadRoutine LoadSpark Loadでは、宛先列に入力する値をカラムマッピングを指定するためのパラメータを使用することもできます。

NOT NULLDEFAULTの使用方法については、CREATE TABLEを参照してください。

データローディングのための書き込みクォーラムの設定

StarRocksクラスタに複数のデータレプリカがある場合、ロード成功の返答を得るために必要なレプリカ数をテーブルごとに異なる書き込みクォーラムを設定することができます。write_quorumプロパティをCREATE TABLEで指定するか、既存のテーブルをALTER TABLEで設定することで書き込みクォーラムを指定できます。このプロパティはv2.5以降対応しています。

システム構成

このセクションでは、StarRocksが提供するすべてのローディングメソッドに適用されるいくつかのパラメータ構成について説明します。

FEの構成

各FEの構成ファイルfe.confで次のパラメータを設定できます。

  • max_load_timeout_secondおよびmin_load_timeout_secondこれらのパラメータは、各ロードジョブの最大タイムアウト期間と最小タイムアウト期間を指定します。タイムアウト期間は秒単位で測定されます。デフォルトの最大タイムアウト期間は3日で、デフォルトの最小タイムアウト期間は1秒です。指定する最大タイムアウト期間と最小タイムアウト期間は、1秒から3日の範囲内にある必要があります。これらのパラメータは、同期ロードジョブおよび非同期ロードジョブの両方に有効です。

  • desired_max_waiting_jobsこのパラメータは、キューで保留中のロードジョブの最大数を指定します。デフォルト値は1024です(v2.4以前のバージョンでは100、v2.5以降のバージョンでは1024)。FE上のPENDINGステートにあるロードジョブの数が指定した最大数に達した場合、FEは新規ロードリクエストを拒否します。このパラメータは非同期ロードジョブのみに有効です。

  • max_running_txn_num_per_dbこのパラメータは、StarRocksクラスタの各データベースで許可される実行中のロードトランザクションの最大数を指定します。ロードジョブには1つ以上のトランザクションが含まれることがあります。デフォルト値は100です。データベースで実行されているロードトランザクションの数が指定した最大数に達すると、後続のロードジョブがスケジュールされなくなります。この状況では、同期ロードジョブを提出するとジョブが拒否されます。非同期ロードジョブを提出すると、ジョブはキューで保留されます。

    注意

    StarRocksは、同期ロードジョブと非同期ロードジョブを区別せずに、すべてのロードジョブをカウントします。

  • label_keep_max_secondこのパラメータは、完了したロードジョブの終了またはキャンセルされたステータスである履歴レコードの保持期間を指定します。デフォルトの保持期間は3日です。このパラメータは、同期ロードジョブおよび非同期ロードジョブの両方に対して有効です。

BEの構成

各BEの構成ファイルbe.confで次のパラメータを設定できます。

  • write_buffer_sizeこのパラメータは、最大メモリブロックサイズを指定します。デフォルトのサイズは100 MBです。ロードされたデータは、まずBEのメモリブロックに書き込まれます。ロードされたデータの量が指定した最大メモリブロックサイズに達すると、データはディスクにフラッシュされます。ビジネスシナリオに基づいて適切な最大メモリブロックサイズを指定する必要があります。
    • 最大メモリブロックサイズが極端に小さい場合、BE上に多数の小さなファイルが生成される可能性があります。この場合、クエリのパフォーマンスが低下します。生成されるファイルの数を減らすために最大メモリブロックサイズを増やすことができます。
    • 最大メモリブロックサイズが極端に大きい場合、リモートプロシージャコール(RPC)がタイムアウトする場合があります。この場合は、ビジネスニーズに応じてこのパラメータの値を調整することができます。
  • streaming_load_rpc_max_alive_time_sec各ライタープロセスの待機タイムアウト期間を指定します。デフォルトの値は600秒です。データローディングプロセスでは、StarRocksは各テーブルからデータを受信し、各テーブルにデータを書き込むためにライタープロセスを起動します。指定した待機タイムアウト期間内にライタープロセスがデータを受信しない場合、StarRocksはライタープロセスを停止します。StarRocksクラスタが低速でデータを処理する場合、ライタープロセスは長い時間データを受信せず、「TabletWriter add batch with unknown id」というエラーが発生する場合があります。この場合、このパラメータの値を増やすことができます。
  • load_process_max_memory_limit_bytesおよびload_process_max_memory_limit_percentこれらのパラメータは、各個別のBE上のすべてのロードジョブのメモリ使用量の上限を指定します。StarRocksは、2つのパラメータの値のうち小さい方を最終の許可されるメモリ使用量として識別します。
    • load_process_max_memory_limit_bytes:最大メモリサイズを指定します。デフォルトの最大メモリサイズは100 GBです。
    • load_process_max_memory_limit_percent:最大メモリ使用量を指定します。デフォルト値は30%です。このパラメータはmem_limitパラメータとは異なります。mem_limitパラメータは、StarRocksクラスタの総最大メモリ使用量を指定し、デフォルト値は90% x 90%です。BEが配置されているマシンのメモリ容量をMとすると、ロードジョブのメモリ使用量の最大量は次のように計算されます。M x 90% x 90% x 30%

システム変数の設定

次のシステム変数を設定できます。

  • query_timeoutクエリのタイムアウト期間。単位:秒。値の範囲:1から259200。デフォルト値:300。この変数は、現在の接続のすべてのクエリステートメント、およびINSERTステートメントに対して適用されます。

トラブルシューティング

詳細については、データロードのFAQを参照してください。