メインコンテンツまでスキップ

Read data from StarRocks using Flink connector

StarRocksのFlinkコネクタを使用してデータを読み取る

StarRocksは、Apache Flink®(以下、Flinkコネクタ)を使用してStarRocksクラスタから大量のデータをバルクで読み取るための自社開発のコネクタを提供しています。

Flinkコネクタは、Flink SQLとFlink DataStreamの2つの読み取り方法をサポートしていますが、Flink SQLが推奨されます。

注意

Flinkコネクタは、FlinkがStarRocksクラスタからのデータを別のStarRocksクラスタやストレージシステムに書き込むこともサポートしています。詳細については、「Apache Flink®から連続的にデータを読み込む」を参照してください。

背景情報

Flinkの提供するJDBCコネクタとは異なり、StarRocksのFlinkコネクタは、FlinkがStarRocksクラスタの複数のBE(バックエンド)から並列にデータを読み取ることをサポートしており、読み取りタスクの高速化に大きく貢献しています。以下の比較は、2つのコネクタの実装の違いを示しています。

  • StarRocksのFlinkコネクタStarRocksのFlinkコネクタでは、Flinkはまず対応するFE(フロントエンド)からクエリプランを取得し、そのクエリプランをすべての関連するBEにパラメータとして送信し、最終的にBEから返されるデータを取得します。

    - StarRocksのFlinkコネクタ

  • FlinkのJDBCコネクタFlinkのJDBCコネクタでは、Flinkは個々のFEから順番にデータを読み取ることしかできません。データの読み取りは遅くなります。

    FlinkのJDBCコネクタ

バージョン要件

コネクタ

Flink

StarRocks

Java

Scala

1.2.8

1.13,1.14,1.15,1.16,1.17

2.1以降

8

2.11,2.12

1.2.7

1.11,1.12,1.13,1.14,1.15

2.1以降

8

2.11,2.12

前提条件

Flinkがデプロイされていることが前提です。デプロイされていない場合は、以下の手順に従ってデプロイしてください。

  1. 操作システムにJava 8またはJava 11をインストールして、Flinkが正常に実行できることを確認します。次のコマンドを使用してJavaのバージョンを確認できます。

    java -version

    たとえば、次の情報が返された場合、Java 8がインストールされていることがわかります。

    openjdk version "1.8.0_322"
    OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
    OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode)
  2. Flinkパッケージをダウンロードして解凍します。

    注意

    Flink v1.14以降を使用することをお勧めします。サポートされる最小のFlinkバージョンはv1.11です。

    # Flinkパッケージをダウンロードします。
    wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
    # Flinkパッケージを解凍します。
    tar -xzf flink-1.14.5-bin-scala_2.11.tgz
    # Flinkディレクトリに移動します。
    cd flink-1.14.5
  3. Flinkクラスタを起動します。

    # Flinkクラスタを起動します。
    ./bin/start-cluster.sh

    # 以下の情報が表示されたら、Flinkクラスタが正常に起動されています。
    Starting cluster.
    Starting standalonesession daemon on host.
    Starting taskexecutor daemon on host.

またはFlinkのドキュメントに従ってFlinkをデプロイすることもできます。

開始する前に

Flinkコネクタをデプロイするために、以下の手順を実行してください。

  1. 使用しているFlinkバージョンに合致するflink-connector-starrocks JARパッケージを選択してダウンロードします。

    注意

    使用しているFlinkバージョンの先頭2桁と一致する、バージョンが1.2.xまたはそれ以降のFlinkコネクタパッケージをダウンロードすることをお勧めします。たとえば、Flink v1.14.xを使用している場合は、flink-connector-starrocks-1.2.4_flink-1.14_x.yy.jarをダウンロードできます。

  2. コードのデバッグが必要な場合は、Flinkコネクタパッケージをビジネス要件に合わせてコンパイルします。

  3. ダウンロードまたはコンパイルしたFlinkコネクタパッケージをFlinkのlibディレクトリに配置します。

  4. Flinkクラスタを再起動します。

パラメータ

共通パラメータ

以下のパラメータは、Flink SQLおよびFlink DataStreamの両方の読み取り方法に適用されます。

パラメータ

必須

データ型

説明

connector

Yes

STRING

データの読み取りに使用するコネクタのタイプを指定します。値をstarrocksに設定します。

scan-url

Yes

STRING

WebサーバからFEに接続するためのアドレスです。フォーマット:<fe_host>:<fe_http_port>。デフォルトのポートは8030です。複数のアドレスを指定することもできますが、カンマ(,)で区切る必要があります。例:192.168.xxx.xxx:8030,192.168.xxx.xxx:8030

jdbc-url

Yes

STRING

FEのMySQLクライアントに接続するためのアドレスです。フォーマット:jdbc:mysql://<fe_host>:<fe_query_port>。デフォルトのポート番号は9030です。

username

Yes

STRING

StarRocksクラスタのアカウントのユーザ名です。このアカウントにはStarRocksテーブルへの読み取り権限が必要です。 ユーザ権限 を参照してください。

password

Yes

STRING

StarRocksクラスタのアカウントのパスワードです。

database-name

Yes

STRING

読み取り対象のStarRocksテーブルが所属するStarRocksデータベースの名前です。

table-name

Yes

STRING

読み取り対象のStarRocksテーブルの名前です。

scan.connect.timeout-ms

No

STRING

FlinkコネクタからStarRocksクラスタへの接続がタイムアウトするまでの最大時間。単位:ミリ秒。デフォルト値:1000。接続の確立にかかる時間がこの制限を超えると、読み取りタスクは失敗します。

scan.params.keep-alive-min

No

STRING

読み取りタスクがアクティブなまま維持されることのできる最大時間。維持時間は定期的なポーリングメカニズムによって確認されます。単位:分。デフォルト値:10。5以上の値を設定することをお勧めします。

scan.params.query-timeout-s

No

STRING

読み取りタスクがタイムアウトするまでの最大時間。タイムアウトの期間はタスク実行中に確認されます。単位:秒。デフォルト値:600。指定された時間間隔が経過しても読み取り結果が返されない場合、読み取りタスクは停止します。

scan.params.mem-limit-byte

No

STRING

各BEごとのクエリごとに許可される最大メモリ量。単位:バイト。デフォルト値:1073741824(1GBと等しい)。

scan.max-retries

No

STRING

読み取りタスクが失敗した場合に再試行できる最大回数。デフォルト値:1。読み取りタスクが再試行される回数がこの上限を超えると、読み取りタスクはエラーとなります。

以下のパラメータは、Flink DataStreamの読み取り方法にのみ適用されます。

パラメータ

必須

データ型

説明

scan.columns

No

STRING

読み取りたいカラムを指定します。複数のカラムを指定することができますが、カンマ(,)で区切る必要があります。

scan.filter

No

STRING

データをフィルタリングするためのフィルタ条件です。

たとえば、Flinkでc1c2c3の3つのカラムからなるテーブルを作成し、そのテーブルのc1カラムの値が100と等しい行を読み取りたい場合は、フィルタ条件として "scan.columns"、"c1" および "scan.filter"、"c1 = 100" を指定します。

StarRocksとFlinkのデータ型のマッピング

以下のデータ型マッピングは、FlinkがStarRocksからデータを読み取る場合にのみ適用されます。FlinkからStarRocksにデータを書き込む場合のデータ型マッピングについては、「Apache Flink®から連続的にデータを読み込む」を参照してください。

StarRocksのデータ型

Flinkのデータ型

NULL

NULL

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

LARGEINT

STRING

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

DATE

DATETIME

TIMESTAMP

DECIMAL

DECIMAL

DECIMALV2

DECIMAL

DECIMAL32

DECIMAL

DECIMAL64

DECIMAL

DECIMAL128

DECIMAL

CHAR

CHAR

VARCHAR

STRING

以下の例では、StarRocksクラスタにtestという名前のデータベースが作成されており、rootユーザーには権限が与えられていることを前提としています。

注意

読み取りタスクが失敗した場合、再作成する必要があります。

データの例

  1. testデータベースにscore_boardという名前のテーブルを作成します。

    MySQL [test]> CREATE TABLE `score_board`
    (
    `id` int(11) NOT NULL COMMENT "",
    `name` varchar(65533) NULL DEFAULT "" COMMENT "",
    `score` int(11) NOT NULL DEFAULT "0" COMMENT ""
    )
    ENGINE=OLAP
    PRIMARY KEY(`id`)
    COMMENT "OLAP"
    DISTRIBUTED BY HASH(`id`)
    PROPERTIES
    (
    "replication_num" = "3"
    );
  2. score_boardテーブルにデータを挿入します。

    MySQL [test]> INSERT INTO score_board
    VALUES
    (1, 'Bob', 21),
    (2, 'Stan', 21),
    (3, 'Sam', 22),
    (4, 'Tony', 22),
    (5, 'Alice', 22),
    (6, 'Lucy', 23),
    (7, 'Polly', 23),
    (8, 'Tom', 23),
    (9, 'Rose', 24),
    (10, 'Jerry', 24),
    (11, 'Jason', 24),
    (12, 'Lily', 25),
    (13, 'Stephen', 25),
    (14, 'David', 25),
    (15, 'Eddie', 26),
    (16, 'Kate', 27),
    (17, 'Cathy', 27),
    (18, 'Judy', 27),
    (19, 'Julia', 28),
    (20, 'Robert', 28),
    (21, 'Jack', 29);
  3. score_boardテーブルをクエリします。

    MySQL [test]> SELECT * FROM score_board;
    +------+---------+-------+
    | id | name | score |
    +------+---------+-------+
    | 1 | Bob | 21 |
    | 2 | Stan | 21 |
    | 3 | Sam | 22 |
    | 4 | Tony | 22 |
    | 5 | Alice | 22 |
    | 6 | Lucy | 23 |
    | 7 | Polly | 23 |
    | 8 | Tom | 23 |
    | 9 | Rose | 24 |
    | 10 | Jerry | 24 |
    | 11 | Jason | 24 |
    | 12 | Lily | 25 |
    | 13 | Stephen | 25 |
    | 14 | David | 25 |
    | 15 | Eddie | 26 |
    | 16 | Kate | 27 |
    | 17 | Cathy | 27 |
    | 18 | Judy | 27 |
    | 19 | Julia | 28 |
    | 20 | Robert | 28 |
    | 21 | Jack | 29 |
    +------+---------+-------+
    21 rows in set (0.00 sec)
  1. Flinkクラスタで、StarRocksテーブル(この例ではscore_board)のスキーマに基づいてflink_testという名前のテーブルを作成します。テーブル作成コマンドでは、Flinkコネクタ、ソースのStarRocksデータベース、ソースのStarRocksテーブルに関する情報など、読み取りタスクのプロパティを構成する必要があります。

    CREATE TABLE flink_test
    (
    `id` INT,
    `name` STRING,
    `score` INT
    )
    WITH
    (
    'connector'='starrocks',
    'scan-url'='192.168.xxx.xxx:8030',
    'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
    'username'='xxxxxx',
    'password'='xxxxxx',
    'database-name'='test',
    'table-name'='score_board'
    );
  2. SELECT文を使用してStarRocksからデータを読み取ります。

    SELECT id, name FROM flink_test WHERE score > 20;

Flink SQLを使用してデータを読み取る際は、以下の点に留意してください。

  • StarRocksからデータを読み取るには、SELECT ... FROM <テーブル名> WHERE ...のようなSQL文のみを使用できます。集約関数の中では、count関数のみがサポートされています。
  • プリディケートプッシュダウンがサポートされています。たとえば、クエリにchar_1 <> 'A' and int_1 = -126というフィルタ条件が含まれている場合、フィルタ条件はFlinkコネクタにプッシュダウンされ、クエリが実行される前にStarRocksで実行可能なステートメントに変換されます。追加の設定は必要ありません。
  • LIMIT文はサポートされていません。
  • StarRocksはチェックポイントメカニズムをサポートしていません。そのため、読み取りタスクが失敗した場合、データの整合性は保証されません。
  1. pom.xmlファイルに以下の依存関係を追加します。
    <dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for Apache Flink® 1.15 -->
    <version>x.x.x_flink-1.15</version>
    <!-- for Apache Flink® 1.14 -->
    <version>x.x.x_flink-1.14_2.11</version>
    <version>x.x.x_flink-1.14_2.12</version>
    <!-- for Apache Flink® 1.13 -->
    <version>x.x.x_flink-1.13_2.11</version>
    <version>x.x.x_flink-1.13_2.12</version>
    <!-- for Apache Flink® 1.12 -->
    <version>x.x.x_flink-1.12_2.11</version>
    <version>x.x.x_flink-1.12_2.12</version>
    <!-- for Apache Flink® 1.11 -->
    <version>x.x.x_flink-1.11_2.11</version>
    <version>x.x.x_flink-1.11_2.12</version>
    </dependency>
    上記のコード例では、x.x.xの部分を使用している最新のFlinkコネクタのバージョンに置き換える必要があります。バージョン情報を参照してください。
  2. Flinkコネクタを呼び出してStarRocksからデータを読み取ります。
    import com.starrocks.connector.flink.StarRocksSource;
    import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;

    public class StarRocksSourceApp {
    public static void main(String[] args) throws Exception {
    StarRocksSourceOptions options = StarRocksSourceOptions.builder()
    .withProperty("scan-url", "192.168.xxx.xxx:8030")
    .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
    .withProperty("username", "root")
    .withProperty("password", "")
    .withProperty("table-name", "score_board")
    .withProperty("database-name", "test")
    .build();
    TableSchema tableSchema = TableSchema.builder()
    .field("id", DataTypes.INT())
    .field("name", DataTypes.STRING())
    .field("score", DataTypes.INT())
    .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
    env.execute("StarRocks flink source");
    }

    }

次の手順

FlinkがStarRocksからデータを正常に読み取った後、Flink WebUIを使用して読み取りタスクを監視できます。たとえば、WebUIのMetricsページでtotalScannedRowsメトリックを表示して、正常に読み取られた行数を取得することができます。また、読み取ったデータを使用してJOINなどの計算を実行するために、Flink SQLを使用することもできます。