メインコンテンツまでスキップ

Load data from HDFS

HDFSからデータをロードする

\import InsertPrivNote from '../assets/commonMarkdown/insertPrivNote.md'

StarRocksは、Broker Loadを使用してHDFSから大量のデータをStarRocksにロードすることができます。

Broker Loadは非同期ローディングモードで実行されます。ロードジョブを提出すると、StarRocksは非同期でジョブを実行します。SELECT * FROM information_schema.loadsを使用してジョブの結果をクエリすることができます。この機能はv3.1以降でサポートされています。詳細については、このトピックの"ロードジョブの表示"セクションを参照してください。

Broker Loadは、複数のデータファイルをロードする場合でも、各ロードジョブのトランザクション的な完全性(すなわち、全てのデータファイルのロードが成功するか、全てが失敗するか)を保証します。

さらに、Broker Loadはデータローディング時のデータ変換や、UPSERTやDELETE操作によるデータの変更をサポートしています。詳細については、データのローディング時のデータ変換およびローディングによるデータの変更を参照してください。

背景情報

v2.4以前では、StarRocksはBroker Loadジョブを実行する際にStarRocksクラスタと外部ストレージシステムとの間の接続を確立するためにブローカーを使用していました。そのため、ロードステートメントで使用するブローカーを指定するためにWITH BROKER "<ブローカー名>"を入力する必要がありました。これは "broker-based loading"と呼ばれます。ブローカーは、ファイルシステムインターフェースに統合された非依存で状態レスのサービスです。ブローカーを使用することで、StarRocksは外部ストレージシステムに保存されているデータファイルにアクセスし、データファイルのデータを自身の計算リソースを使用して前処理し、データをロードすることができます。

v2.5以降では、StarRocksはBroker Loadジョブを実行する際にStarRocksクラスタと外部ストレージシステムとの間の接続を確立するためにブローカーに依存しなくなりました。したがって、ロードステートメントでブローカーを指定する必要はありませんが、WITH BROKERキーワードは引き続き指定する必要があります。これは "broker-free loading"と呼ばれます。

データがHDFSに保存されている場合、ブローカーフリーローディングが機能しない場合があります。これは、データが複数のHDFSクラスタに分散して保存されているか、複数のKerberosユーザーが設定されている場合に発生する可能性があります。このような場合には、ブローカーベースのローディングを使用することができます。これを成功させるためには、少なくとも1つの独立したブローカーグループが展開されていることを確認してください。これらの状況での認証設定とHA設定の指定方法については、HDFSを参照してください。

注意

SHOW BROKERステートメントを使用して、StarRocksクラスタに展開されているブローカーを確認することができます。ブローカーが展開されていない場合、ブローカーの展開の手順に従ってブローカーを展開することができます。

サポートされているデータファイルの形式

Broker Loadは次のデータファイル形式をサポートしています。

  • CSV
  • Parquet
  • ORC

注意

CSVデータの場合、次の点に注意してください:

  • コンマ(,)、タブ、またはパイプ(|)など、長さが50バイトを超えないUTF-8文字列をテキストデリミタとして使用できます。
  • ヌル値は\Nを使用して表します。たとえば、データファイルは3つの列から構成され、そのデータファイルのレコードは最初と3番目の列にデータを持ち、2番目の列にデータを持たない場合、2番目の列には\Nを使用してヌル値を表す必要があります。つまり、レコードはa,\N,bのようにコンパイルする必要があります。a,,bは、レコードの2番目の列に空の文字列があることを意味します。

動作原理

FEにロードジョブを提出すると、FEはクエリプランを生成し、利用可能なBEの数とロードするデータファイルのサイズに基づいてクエリプランを分割し、それぞれのクエリプランの一部を利用可能なBEに割り当てます。ロード中、関与する各BEは外部ストレージシステムのデータファイルのデータを取得し、データを前処理してStarRocksクラスタにデータをロードします。すべてのBEがクエリプランの一部を完了した後、FEはロードジョブが成功したかどうかを判断します。

以下の図は、Broker Loadジョブのワークフローを示しています。

Broker Loadのワークフロー

データの準備例

  1. HDFSクラスタにログインし、指定したパス(例:/user/starrocks/)に2つのCSV形式のデータファイル「file1.csv」と「file2.csv」を作成します。両ファイルはいずれもユーザーID、ユーザー名、ユーザースコアの3つの列で構成されています。
    • file1.csv

      1,Lily,21
      2,Rose,22
      3,Alice,23
      4,Julia,24
    • file2.csv

      5,Tony,25
      6,Adam,26
      7,Allen,27
      8,Jacky,28
  2. StarRocksデータベース(例:test_db)にログインし、「table1」と「table2」という2つのPrimary Keyテーブルを作成します。両テーブルはいずれも「id」、「name」、「score」という3つの列から構成されており、「id」がプライマリキーです。
    CREATE TABLE `table1`
    (
    `id` int(11) NOT NULL COMMENT "ユーザーID",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "ユーザー名",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT "ユーザースコア"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    CREATE TABLE `table2`
    (
    `id` int(11) NOT NULL COMMENT "ユーザーID",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "ユーザー名",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT "ユーザースコア"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

ロードジョブの作成

以下の例では、CSV形式と簡単な認証方法を使用しています。他の形式でデータをロードする方法、HA設定を指定する方法、およびKerberos認証メソッドを使用する場合に必要な認証パラメータについては、BROKER LOADを参照してください。

単一のデータファイルを単一のテーブルにロードする

次のステートメントを実行して、file1.csvのデータをtable1にロードします:

LOAD LABEL test_db.label_brokerload_singlefile_singletable
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
)
PROPERTIES
(
"timeout" = "3600"
);

データのクエリ

ロードジョブを送信した後、SELECT * FROM information_schema.loadsを使用してロードジョブの結果を表示できます。この機能はv3.1以降でサポートされています。詳細については、このトピックの"ロードジョブの表示"セクションを参照してください。

ロードジョブが成功したことを確認した後、SELECTを使用してtable1のデータをクエリできます:

SELECT * FROM table1;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Lily | 21 |
| 2 | Rose | 22 |
| 3 | Alice | 23 |
| 4 | Julia | 24 |
+------+-------+-------+
4 rows in set (0.01 sec)

複数のデータファイルを単一のテーブルにロードする

次のステートメントを実行して、HDFSクラスタの/user/starrocks/パスに格納されているすべてのデータファイル(file1.csvおよびfile2.csv)のデータをtable1にロードします:

LOAD LABEL test_db.label_brokerload_allfile_singletable
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/*")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
)
PROPERTIES
(
"timeout" = "3600"
);

データのクエリ

ロードジョブを送信した後、SELECT * FROM information_schema.loadsを使用してロードジョブの結果を表示できます。この機能はv3.1以降でサポートされています。詳細については、このトピックの"ロードジョブの表示"セクションを参照してください。

ロードジョブが成功したことを確認した後、SELECTを使用してtable1のデータをクエリできます:

SELECT * FROM table1;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Lily | 21 |
| 2 | Rose | 22 |
| 3 | Alice | 23 |
| 4 | Julia | 24 |
| 5 | Tony | 25 |
| 6 | Adam | 26 |
| 7 | Allen | 27 |
| 8 | Jacky | 28 |
+------+-------+-------+
4 rows in set (0.01 sec)

複数のデータファイルを複数のテーブルにロードする

次のステートメントを実行して、file1.csvおよびfile2.csvのデータをそれぞれtable1およびtable2にロードします:

LOAD LABEL test_db.label_brokerload_multiplefile_multipletable
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/file1.csv")
INTO TABLE table1
COLUMNS TERMINATED BY ","
(id, name, score)
,
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/file2.csv")
INTO TABLE table2
COLUMNS TERMINATED BY ","
(id, name, score)
)
WITH BROKER
(
"username" = "<hdfs_username>",
"password" = "<hdfs_password>"
)
PROPERTIES
(
"timeout" = "3600"
);

データのクエリ

ロードジョブを送信した後、SELECT * FROM information_schema.loadsを使用してロードジョブの結果を表示できます。この機能はv3.1以降でサポートされています。詳細については、このトピックの"ロードジョブの表示"セクションを参照してください。

ロードジョブが成功したことを確認した後、SELECTを使用してtable1およびtable2のデータをクエリできます:

  1. table1をクエリする:

    SELECT * FROM table1;
    +------+-------+-------+
    | id | name | score |
    +------+-------+-------+
    | 1 | Lily | 21 |
    | 2 | Rose | 22 |
    | 3 | Alice | 23 |
    | 4 | Julia | 24 |
    +------+-------+-------+
    4 rows in set (0.01 sec)
  2. table2をクエリする:

    SELECT * FROM table2;
    +------+-------+-------+
    | id | name | score |
    +------+-------+-------+
    | 5 | Tony | 25 |
    | 6 | Adam | 26 |
    | 7 | Allen | 27 |
    | 8 | Jacky | 28 |
    +------+-------+-------+
    4 rows in set (0.01 sec)

ロードジョブの表示

SELECTステートメントを使用して、information_schemaデータベースのloadsテーブルから1つ以上のロードジョブの結果をクエリすることができます。この機能はv3.1以降でサポートされています。

例1:test_dbデータベースで実行されたロードジョブの結果をクエリする場合、クエリ文で最大2つの結果が返され、返された結果は作成時刻(CREATE_TIME)の降順でソートされるよう指定します。

SELECT * FROM information_schema.loads
WHERE database_name = 'test_db'
ORDER BY create_time DESC
LIMIT 2\G

次の結果が返されます:

*************************** 1. row ***************************
JOB_ID: 20686
LABEL: label_brokerload_unqualifiedtest_83
DATABASE_NAME: test_db
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: BROKER
PRIORITY: NORMAL
SCAN_ROWS: 8
FILTERED_ROWS: 0
UNSELECTED_ROWS: 0
SINK_ROWS: 8
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):14400; max_filter_ratio:1.0
CREATE_TIME: 2023-08-02 15:25:22
ETL_START_TIME: 2023-08-02 15:25:24
ETL_FINISH_TIME: 2023-08-02 15:25:24
LOAD_START_TIME: 2023-08-02 15:25:24
LOAD_FINISH_TIME: 2023-08-02 15:25:27
JOB_DETAILS: {"All backends":{"77fe760e-ec53-47f7-917d-be5528288c08":[10006],"0154f64e-e090-47b7-a4b2-92c2ece95f97":[10005]},"FileNumber":2,"FileSize":84,"InternalTableLoadBytes":252,"InternalTableLoadRows":8,"ScanBytes":84,"ScanRows":8,"TaskNumber":2,"Unfinished backends":{"77fe760e-ec53-47f7-917d-be5528288c08":[],"0154f64e-e090-47b7-a4b2-92c2ece95f97":[]}}
ERROR_MSG: NULL
TRACKING_URL: NULL
TRACKING_SQL: NULL
REJECTED_RECORD_PATH: NULL
*************************** 2. row ***************************
JOB_ID: 20624
LABEL: label_brokerload_unqualifiedtest_82
DATABASE_NAME: test_db
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: BROKER
PRIORITY: NORMAL
SCAN_ROWS: 12
FILTERED_ROWS: 4
UNSELECTED_ROWS: 0
SINK_ROWS: 8
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):14400; max_filter_ratio:1.0
CREATE_TIME: 2023-08-02 15:23:29
ETL_START_TIME: 2023-08-02 15:23:34
ETL_FINISH_TIME: 2023-08-02 15:23:34
LOAD_START_TIME: 2023-08-02 15:23:34
LOAD_FINISH_TIME: 2023-08-02 15:23:34
JOB_DETAILS: {"All backends":{"78f78fc3-8509-451f-a0a2-c6b5db27dcb6":[10010],"a24aa357-f7de-4e49-9e09-e98463b5b53c":[10006]},"FileNumber":2,"FileSize":158,"InternalTableLoadBytes":333,"InternalTableLoadRows":8,"ScanBytes":158,"ScanRows":12,"TaskNumber":2,"Unfinished backends":{"78f78fc3-8509-451f-a0a2-c6b5db27dcb6":[],"a24aa357-f7de-4e49-9e09-e98463b5b53c":[]}}
ERROR_MSG: NULL
TRACKING_URL: http://172.26.195.69:8540/api/_load_error_log?file=error_log_78f78fc38509451f_a0a2c6b5db27dcb7
TRACKING_SQL: select tracking_log from information_schema.load_tracking_logs where job_id=20624
REJECTED_RECORD_PATH: 172.26.95.92:/home/disk1/sr/be/storage/rejected_record/test_db/label_brokerload_unqualifiedtest_0728/6/404a20b1e4db4d27_8aa9af1e8d6d8bdc

例2:test_dbデータベースで実行されたロードジョブ(ラベルがlabel_brokerload_unqualifiedtest_82)の結果をクエリする場合:

SELECT * FROM information_schema.loads
WHERE database_name = 'test_db' and label = 'label_brokerload_unqualifiedtest_82'\G

次の結果が返されます:

*************************** 1. row ***************************
JOB_ID: 20624
LABEL: label_brokerload_unqualifiedtest_82
DATABASE_NAME: test_db
STATE: FINISHED
PROGRESS: ETL:100%; LOAD:100%
TYPE: BROKER
PRIORITY: NORMAL
SCAN_ROWS: 12
FILTERED_ROWS: 4
UNSELECTED_ROWS: 0
SINK_ROWS: 8
ETL_INFO:
TASK_INFO: resource:N/A; timeout(s):14400; max_filter_ratio:1.0
CREATE_TIME: 2023-08-02 15:23:29
ETL_START_TIME: 2023-08-02 15:23:34
ETL_FINISH_TIME: 2023-08-02 15:23:34
LOAD_START_TIME: 2023-08-02 15:23:34
LOAD_FINISH_TIME: 2023-08-02 15:23:34
JOB_DETAILS: {"All backends":{"78f78fc3-8509-451f-a0a2-c6b5db27dcb6":[10010],"a24aa357-f7de-4e49-9e09-e98463b5b53c":[10006]},"FileNumber":2,"FileSize":158,"InternalTableLoadBytes":333,"InternalTableLoadRows":8,"ScanBytes":158,"ScanRows":12,"TaskNumber":2,"Unfinished backends":{"78f78fc3-8509-451f-a0a2-c6b5db27dcb6":[],"a24aa357-f7de-4e49-9e09-e98463b5b53c":[]}}
ERROR_MSG: NULL
TRACKING_URL: http://172.26.195.69:8540/api/_load_error_log?file=error_log_78f78fc38509451f_a0a2c6b5db27dcb7
TRACKING_SQL: select tracking_log from information_schema.load_tracking_logs where job_id=20624
REJECTED_RECORD_PATH: 172.26.95.92:/home/disk1/sr/be/storage/rejected_record/test_db/label_brokerload_unqualifiedtest_0728/6/404a20b1e4db4d27_8aa9af1e8d6d8bdc

返される結果のフィールドについての詳細は、Information Schema > loadsを参照してください。

ロードジョブのキャンセル

ロードジョブが CANCELLEDFINISHED の状態でない場合、CANCEL LOADステートメントを使用してジョブをキャンセルすることができます。

たとえば、test_dbデータベースで label1 というラベルのロードジョブをキャンセルするには、次のステートメントを実行します:

CANCEL LOAD
FROM test_db
WHERE LABEL = "label1";

ジョブの分割と同時実行

Broker Loadジョブは、1つ以上のタスクに分割されて同時に実行されることがあります。同じロードジョブ内のタスクは、単一のトランザクション内で実行されます。すべてのタスクが成功するか、すべてが失敗する必要があります。StarRocksは、LOADステートメントでdata_descを宣言する方法に基づいて各ロードジョブを分割します:

  • 複数の異なるテーブルを指定するdata_descパラメータを複数宣言する場合、各テーブルのデータをロードするためのタスクが生成されます。
  • 同じテーブルの異なるパーティションを指定するdata_descパラメータを複数宣言する場合、各パーティションのデータをロードするためのタスクが生成されます。

また、各タスクはさらに1つ以上のインスタンスに分割され、これらのインスタンスはStarRocksクラスタの各BEに均等に分散して同時に実行されます。StarRocksは、各タスクを次のFE設定に基づいて分割します。

  • min_bytes_per_broker_scanner:各インスタンスで処理されるデータの最小量。デフォルトは64 MBです。
  • load_parallel_instance_num:個々のBE上の各ロードジョブで許可される同時インスタンスの数。デフォルトの数は1です。個々のタスクのインスタンス数は、次の数式を使用して計算できます。個々のタスクのインスタンス数 = 個々のタスクがロードするデータ量/min_bytes_per_broker_scanner、****load_parallel_instance_num x BEの数

ほとんどの場合、各ロードジョブには1つのdata_descが宣言され、1つのタスクにのみ分割され、タスクはBEの数と同じ数のインスタンスに分割されます。

トラブルシューティング

Broker Load FAQを参照してください。