メインコンテンツまでスキップ

Change data through loading

データの変更による変更

StarRocksが提供するPrimary Key tablesを使用すると、Stream LoadBroker Load、またはRoutine Load ジョブを実行してStarRocksのテーブルにデータの変更(挿入、更新、削除)を行うことができます。しかし、Primary Key tablesではSpark LoadまたはINSERTを使用してデータを変更することはサポートされていません。

StarRocksは部分的な更新と条件付きの更新もサポートしています。

このトピックでは、CSVデータを使用してStarRocksのテーブルへのデータの変更方法を説明します。選択するロード方法によってサポートされるデータファイルの形式は異なります。

注意

CSVデータの場合、50バイトを超えないUTF-8の文字列(カンマ(,)、タブ、またはパイプ(|)など)をテキスト区切り記号として使用することができます。

実装

StarRocksが提供するPrimary Key tablesはUPSERTとDELETE操作をサポートしており、INSERT操作とUPDATE操作の区別はありません。

ロードジョブを作成する際、StarRocksではジョブ作成文またはコマンドに __op というフィールドを追加することができます。__op フィールドは実行したい操作のタイプを指定するために使用されます。

注意

テーブルを作成する場合、__op というカラムを追加する必要はありません。

__op フィールドの定義方法は、選択するロード方法によって異なります:

  • Stream Loadを選択する場合は、columns パラメータを使用して __op フィールドを定義します。
  • Broker Loadを選択する場合は、SET句を使用して __op フィールドを定義します。
  • Routine Loadを選択する場合は、COLUMNS カラムを使用して __op フィールドを定義します。

データの変更内容に応じて、__op フィールドの追加の有無を決定することができます。__op フィールドを追加しない場合、オペレーションタイプはデフォルトでUPSERTになります。主なデータ変更シナリオは次のとおりです:

  • ロードするデータファイルがUPSERT操作のみを含む場合、__op フィールドを追加する必要はありません。
  • ロードするデータファイルが削除操作のみを含む場合、__op フィールドを追加し、オペレーションタイプをDELETEに指定する必要があります。
  • ロードするデータファイルにUPSERTおよびDELETE操作の両方が含まれる場合、__op フィールドを追加し、データファイルに値が 0 または 1 の列が含まれていることを確認する必要があります。0 の値はUPSERT操作を、1 の値はDELETE操作を示します。

注意事項

  • データファイルの各行の列の数が同じであることを確認してください。
  • データ変更に関与するカラムには、主キーカラムを含める必要があります。

事前条件

Broker Load

HDFSからデータをロードする または クラウドストレージからデータをロードする の「背景情報」のセクションを参照してください。

Routine load

Routine Loadを選択する場合は、Apache Kafka®クラスタでトピックが作成されていることを確認してください。以下のトピックを作成したと仮定します: topic1topic2topic3topic4

基本操作

このセクションでは、StarRocksテーブルへのデータ変更の例を提供します。詳細な構文とパラメータの説明については、STREAM LOADBROKER LOAD、およびCREATE ROUTINE LOADを参照してください。

UPSERT

ロードするデータファイルがUPSERT操作のみを含む場合、__op フィールドを追加する必要はありません。

注意

__op フィールドを追加した場合:

  • UPSERTとしてオペレーションタイプを指定することができます。
  • __op フィールドを空のままにしておくことができます。なぜなら、オペレーションタイプはデフォルトでUPSERTになるからです。

データの例

  1. データファイルを準備します。a. ローカルのファイルシステムに example1.csv という名前のCSVファイルを作成します。このファイルは、ユーザーID、ユーザー名、ユーザースコアを順に表す3つの列で構成されています。

    101,Lily,100
    102,Rose,100

    b. example1.csv のデータをKafkaクラスタの topic1 に公開します。

  2. StarRocksテーブルを準備します。a. StarRocksのデータベース test_dbtable1 というPrimary Keyテーブルを作成します。このテーブルは、idnamescore の3つの列で構成されており、id がプライマリキーです。

    CREATE TABLE `table1`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NOT NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    注意

    v2.5.7以降、StarRocksはテーブルの作成またはパーティションの追加時にバケット数(BUCKETS)を自動で設定することができます。バケット数を手動で設定する必要はありません。詳細については、バケット数の決定を参照してください。

    b. table1 にレコードを追加します。

    INSERT INTO table1 VALUES
    (101, 'Lily',80);

データのロード

example1.csv の中の id101 のレコードを table1 に更新し、example1.csv の中の id102 のレコードを table1 に挿入するため、ロードジョブを実行します。

  • Stream Loadジョブを実行します。
    • __op フィールドを含めたくない場合、次のコマンドを実行します:

      curl --location-trusted -u <username>:<password> \
      -H "Expect:100-continue" \
      -H "label:label1" \
      -H "column_separator:," \
      -T example1.csv -XPUT \
      http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
    • __op フィールドを含めたい場合、次のコマンドを実行します:

      curl --location-trusted -u <username>:<password> \
      -H "Expect:100-continue" \
      -H "label:label2" \
      -H "column_separator:," \
      -H "columns:__op ='upsert'" \
      -T example1.csv -XPUT \
      http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
  • Broker Loadジョブを実行します。
    • __op フィールドを含めたくない場合、次のコマンドを実行します:

      LOAD LABEL test_db.label1
      (
      data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
      into table table1
      columns terminated by ","
      format as "csv"
      )
      with broker "broker1";
    • __op フィールドを含めたい場合、次のコマンドを実行します:

      LOAD LABEL test_db.label2
      (
      data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
      into table table1
      columns terminated by ","
      format as "csv"
      set (__op = 'upsert')
      )
      with broker "broker1";
  • Routine Loadジョブを実行します。
    • __op フィールドを含めたくない場合、次のコマンドを実行します:

      CREATE ROUTINE LOAD test_db.table1 ON table1
      COLUMNS TERMINATED BY ",",
      COLUMNS (id, name, score)
      PROPERTIES
      (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows"= "250000",
      "max_error_number" = "1000"
      )
      FROM KAFKA
      (
      "kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
      "kafka_topic" = "test1",
      "property.kafka_default_offsets" ="OFFSET_BEGINNING"
      );
    • __op フィールドを含めたい場合、次のコマンドを実行します:

      CREATE ROUTINE LOAD test_db.table1 ON table1
      COLUMNS TERMINATED BY ",",
      COLUMNS (id, name, score, __op ='upsert')
      PROPERTIES
      (
      "desired_concurrent_number" = "3",
      "max_batch_interval" = "20",
      "max_batch_rows"= "250000",
      "max_error_number" = "1000"
      )
      FROM KAFKA
      (
      "kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
      "kafka_topic" = "test1",
      "property.kafka_default_offsets" ="OFFSET_BEGINNING"
      );

データのクエリ

ロードが完了した後、データが table1 に正常に挿入されたことを確認するために、table1 のデータをクエリします。

SELECT * FROM table1;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 101 | Lily | 100 |
| 102 | Rose | 100 |
+------+------+-------+
2 rows in set (0.02 sec)

上記のクエリ結果のように、example1.csv の中の id101 のレコードが table1 に更新され、id102 のレコードが table1 に挿入されました。

DELETE

ロードするデータファイルが削除操作のみを含む場合、__op フィールドを追加し、オペレーションタイプとしてDELETEを指定する必要があります。

データの例

  1. データファイルを準備します。a. ローカルのファイルシステムに example2.csv という名前のCSVファイルを作成します。このファイルは、ユーザーID、ユーザー名、ユーザースコアを順に表す3つの列で構成されています。

    101,Jack,100

    b. example2.csv のデータをKafkaクラスタの topic2 に公開します。

  2. StarRocksテーブルを準備します。a. StarRocksのデータベース test_dbtable2 というPrimary Keyテーブルを作成します。このテーブルは、idnamescore の3つの列で構成されており、id がプライマリキーです。

    CREATE TABLE `table2`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NOT NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    注意

    v2.5.7以降、StarRocksはテーブルの作成またはパーティションの追加時にバケット数(BUCKETS)を自動で設定することができます。バケット数を手動で設定する必要はありません。詳細については、バケット数の決定を参照してください。

    b. table2 に2つのレコードを追加します。

    INSERT INTO table2 VALUES
    (101, 'Jack', 100),
    (102, 'Bob', 90);

データのロード

example2.csv の中の id101 のレコードを table2 から削除するため、ロードジョブを実行します。

  • Stream Loadジョブを実行します。

    curl --location-trusted -u <username>:<password> \
    -H "Expect:100-continue" \
    -H "label:label3" \
    -H "column_separator:," \
    -H "columns:__op='delete'" \
    -T example2.csv -XPUT \
    http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load
  • Broker Loadジョブを実行します。

    LOAD LABEL test_db.label3
    (
    data infile("hdfs://<hdfs_host>:<hdfs_port>/example2.csv")
    into table table2
    columns terminated by ","
    format as "csv"
    set (__op = 'delete')
    )
    with broker "broker1";
  • Routine Loadジョブを実行します。

    CREATE ROUTINE LOAD test_db.table2 ON table2
    COLUMNS(id, name, score, __op = 'delete')
    PROPERTIES
    (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows"= "250000",
    "max_error_number" = "1000"
    )
    FROM KAFKA
    (
    "kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
    "kafka_topic" = "test2",
    "property.kafka_default_offsets" ="OFFSET_BEGINNING"
    );

データのクエリ

ロードが完了した後、table2 のデータをクエリして、ロードが成功したかどうかを確認します:

SELECT * FROM table2;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Bob | 90 |
+------+------+-------+
1 row in set (0.00 sec)

上記のクエリ結果のように、example2.csv の中の id101 のレコードが table2 から削除されました。

UPSERTとDELETE

ロードするデータファイルがUPSERTとDELETEの両方の操作を含む場合、__op フィールドを追加し、データファイルに値が 0 または 1 の列が含まれていることを確認する必要があります。0 の値はUPSERT操作を、1 の値はDELETE操作を示します。

データの例

  1. データファイルを準備します。a. ローカルのファイルシステムに example3.csv という名前のCSVファイルを作成します。このファイルは、ユーザーID、ユーザー名、ユーザースコア、操作タイプを順に表す4つの列で構成されています。

    101,Tom,100,1
    102,Sam,70,0
    103,Stan,80,0

    b. example3.csv のデータをKafkaクラスタの topic3 に公開します。

  2. StarRocksテーブルを準備します。a. StarRocksのデータベース test_dbtable3 というPrimary Keyテーブルを作成します。このテーブルは、idnamescore の3つの列で構成されており、id がプライマリキーです。

    CREATE TABLE `table3`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NOT NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    注意

    v2.5.7以降、StarRocksはテーブルの作成またはパーティションの追加時にバケット数(BUCKETS)を自動で設定することができます。バケット数を手動で設定する必要はありません。詳細については、バケット数の決定を参照してください。

    b. table3 に2つのレコードを追加します。

    INSERT INTO table3 VALUES
    (101, 'Tom', 100),
    (102, 'Sam', 90);

データのロード

example3.csv の中の id101 のレコードを table3 から削除し、example3.csv の中の id102 のレコードを table3 に更新し、example3.csv の中の id103 のレコードを table3 に挿入するため、ロードジョブを実行します。

  • Stream Loadジョブを実行します:

    curl --location-trusted -u <username>:<password> \
    -H "Expect:100-continue" \
    -H "label:label4" \
    -H "column_separator:," \
    -H "columns: id, name, score, temp = __op" \
    -T example3.csv -XPUT \
    http://<fe_host>:<fe_http_port>/api/test_db/table3/_stream_load

    注意

    上記の例では、example3.csv の中の操作タイプを一時的に temp という名前で指定し、columns パラメータを使用して __op フィールドを temp 列にマッピングします。これにより、example3.csv の中の各レコードの4番目の列の値が 0 または 1 であるかどうかで、StarRocksがUPSERT操作またはDELETE操作を実行するかどうかを判断することができます。

  • Broker Loadジョブを実行します:

    LOAD LABEL test_db.label4
    (
    data infile("hdfs://<hdfs_host>:<hdfs_port>/example1.csv")
    into table table1
    columns terminated by ","
    format as "csv"
    (id, name, score, temp)
    set (__op = temp)
    )
    with broker "broker1";
  • Routine Loadジョブを実行します:

    CREATE ROUTINE LOAD test_db.table3 ON table3
    COLUMNS(id, name, score, temp, __op = temp)
    PROPERTIES
    (
    "desired_concurrent_number" = "3",
    "max_batch_interval" = "20",
    "max_batch_rows"= "250000",
    "max_error_number" = "1000"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
    "kafka_topic" = "test3",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );

データのクエリ

ロードが完了した後、データのクエリを実行して、ロードが成功したかどうかを確認します。

SELECT * FROM table3;
+------+------+-------+
| id | name | score |
+------+------+-------+
| 102 | Sam | 70 |
| 103 | Stan | 80 |
+------+------+-------+
2 rows in set (0.01 sec)

上記のクエリ結果のように、example3.csv の中の id101 のレコードが table3 から削除され、id102 のレコードが table3 に更新され、id103 のレコードが table3 に挿入されました。

部分的な更新

v2.2以降、StarRocksはPrimary Keyテーブルの指定された列のみを更新することをサポートしています。このセクションでは、部分的な更新を実行する方法をCSVを例に説明します。

データの例

  1. データファイルを準備します。a. ローカルのファイルシステムに example4.csv という名前のCSVファイルを作成します。このファイルは、ユーザーIDとユーザー名の2つの列で構成されています。

    101,Lily
    102,Rose
    103,Alice

    b. example4.csv のデータをKafkaクラスタの topic4 に公開します。

  2. StarRocksテーブルを準備します。a. StarRocksのデータベース test_dbtable4 というPrimary Keyテーブルを作成します。このテーブルは、idnamescore の3つの列で構成されており、id がプライマリキーです。

    CREATE TABLE `table4`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `name` varchar(65533) NOT NULL COMMENT "user name",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    DISTRIBUTED BY HASH(`id`);

    注意

    v2.5.7以降、StarRocksはテーブルの作成またはパーティションの追加時にバケット数(BUCKETS)を自動で設定することができます。バケット数を手動で設定する必要はありません。詳細については、バケット数の決定を参照してください。

    b. レコードを table4 に挿入します。

    INSERT INTO table4 VALUES
    (101, 'Tom',80);

データのロード

example4.csv の中の2つの列のデータを table4id 列と name 列に更新するため、ロードジョブを実行します。

  • Stream Loadジョブを実行します。

    curl --location-trusted -u <username>:<password> \
    -H "Expect:100-continue" \
    -H "label:label7" -H "column_separator:," \
    -H "partial_update:true" \
    -H "columns:id,name" \
    -T example4.csv -XPUT \
    http://<fe_host>:<fe_http_port>/api/test_db/table4/_stream_load

    注意

    Stream Loadを選択した場合、部分的な更新機能を有効にするために partial_update パラメータを true に設定する必要があります。さらに、更新する列を columns パラメータで指定する必要があります。

  • Broker Loadジョブを実行します。

    LOAD LABEL test_db.table4
    (
    data infile("hdfs://<hdfs_host>:<hdfs_port>/example4.csv")
    into table table4
    format as "csv"
    (id, name)
    )
    with broker "broker1"
    properties
    (
    "partial_update" = "true"
    );

    注意

    Broker Loadを選択した場合、部分的な更新機能を有効にするために partial_update パラメータを true に設定する必要があります。さらに、更新する列を column_list パラメータで指定する必要があります。

  • Routine Loadジョブを実行します。

    CREATE ROUTINE LOAD test_db.table4 on table4
    COLUMNS (id, name),
    COLUMNS TERMINATED BY ','
    PROPERTIES
    (
    "partial_update" = "true"
    )
    FROM KAFKA
    (
    "kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
    "kafka_topic" = "test4",
    "property.kafka_default_offsets" ="OFFSET_BEGINNING"
    );

    注意

    Broker Loadを選択した場合、部分的な更新機能を有効にするために partial_update パラメータを true に設定する必要があります。さらに、更新する列を COLUMNS パラメータで指定する必要があります。

データのクエリ

ロードが完了した後、table4 のデータをクエリして、ロードが成功したかどうかを確認します。

SELECT * FROM table4;
+-----+-------+-------+
| id | name | score |
+-----+-------+-------+
| 101 | Lily | 80 |
| 102 | Rose | 0 |
| 103 | Alice | 0 |
+-----+-------+-------+
3 rows in set (0.01 sec)

上記のクエリ結果のように、example4.csv の中の id101 のレコードが table4 に更新され、id102103 のレコードが table4 に挿入されました。

条件付きの更新

StarRocks v2.5以降、Primary Keyテーブルは条件付きの更新をサポートしています。指定した非プライマリキーカラムを条件として指定し、ソースレコードからデスティネーションレコードへの更新を実行した場合、指定したカラムでソースデータレコードの値がデスティネーションデータレコードの値以上である場合のみ、更新が実行されます。

条件付きの更新は、データの乱れを解決するために設計されています。ソースデータが乱れている場合、この機能を使用して新しいデータが古いデータに上書きされないようにすることができます。

注意

  • 同一のデータバッチの異なるカラムを更新条件として指定することはできません。
  • DELETE操作は条件付きの更新をサポートしていません。
  • 部分的な更新と条件付きの更新は同時に使用することはできません。
  • 条件付きの更新はStream LoadとRoutine Loadのみをサポートしています。

データの例

  1. データファイルを準備します。a. ローカルのファイルシステムに example5.csv という名前のCSVファイルを作成します。このファイルは、ユーザーID、バージョン、ユーザースコアを順に表す3つの列で構成されています。

    101,1,100
    102,3,100

    b. example5.csv のデータをKafkaクラスタの topic5 に公開します。

  2. StarRocksテーブルを準備します。a. StarRocksのデータベース test_dbtable5 というPrimary Keyテーブルを作成します。このテーブルは、idversionscore の3つの列で構成されており、id がプライマリキーです。

    CREATE TABLE `table5`
    (
    `id` int(11) NOT NULL COMMENT "user ID",
    `version` int NOT NULL COMMENT "version",
    `score` int(11) NOT NULL COMMENT "user score"
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`) DISTRIBUTED BY HASH(`id`);

    注意

    v2.5.7以降、StarRocksはテーブルの作成またはパーティションの追加時にバケット数(BUCKETS)を自動で設定することができます。バケット数を手動で設定する必要はありません。詳細については、バケット数の決定を参照してください。

    b. レコードを table5 に挿入します。

    INSERT INTO table5 VALUES
    (101, 2, 80),
    (102, 2, 90);

データのロード

example5.csv の中の id101102 のレコードを table5 に更新します。また、更新を実行するために、それぞれのレコード内の version カラムの値が現在の version の値以上であることを指定します。

  • Stream Loadジョブを実行します。

    curl --location-trusted -u <username>:<password> \
    -H "Expect:100-continue" \
    -H "label:label10" \
    -H "column_separator:," \
    -H "merge_condition:version" \
    -T example5.csv -XPUT \
    http://<fe_host>:<fe_http_port>/api/test_db/table5/_stream_load
  • Routine Loadジョブを実行します。

    CREATE ROUTINE LOAD test_db.table5 on table5
    COLUMNS (id, version, score),
    COLUMNS TERMINATED BY ','
    PROPERTIES
    (
    "merge_condition" = "version"
    )
    FROM KAFKA
    (
    "kafka_broker_list" ="<kafka_broker_host>:<kafka_broker_port>",
    "kafka_topic" = "topic5",
    "property.kafka_default_offsets" ="OFFSET_BEGINNING"
    );

データのクエリ

ロードが完了した後、table5 のデータをクエリして、ロードが成功したかどうかを確認します。

SELECT * FROM table5;
+------+------+-------+
| id | version | score |
+------+------+-------+
| 101 | 2 | 80 |
| 102 | 3 | 100 |
+------+------+-------+
2 rows in set (0.02 sec)

上記のクエリ結果のように、example5.csv の中の id101 のレコードは table5 に更新されず、id102 のレコードは table5 に挿入されました。