Read data from StarRocks using Flink connector
StarRocksのFlinkコネクタを使用してデータを読み取る
StarRocksは、Apache Flink®(以下、Flinkコネクタ)を使用してStarRocksクラスタから大量のデータをバルクで読み取るための自社開発のコネクタを提供しています。
Flinkコネクタは、Flink SQLとFlink DataStreamの2つの読み取り方法をサポートしていますが、Flink SQLが推奨されます。
注意
Flinkコネクタは、FlinkがStarRocksクラスタからのデータを別のStarRocksクラスタやストレージシステムに書き込むこともサポートしています。詳細については、「Apache Flink®から連続的にデータを読み込む」を参照してください。
背景情報
Flinkの提供するJDBCコネクタとは異なり、StarRocksのFlinkコネクタは、FlinkがStarRocksクラスタの複数のBE(バックエンド)から並列にデータを読み取ることをサポートしており、読み取りタスクの高速化に大きく貢献しています。以下の比較は、2つのコネクタの実装の違いを示しています。
-
StarRocksのFlinkコネクタStarRocksのFlinkコネクタでは、Flinkはまず対応するFE(フロントエンド)からクエリプランを取得し、そのクエリプランをすべての関連するBEにパラメータとして送信し、最終的にBEから返されるデータを取得します。
-
FlinkのJDBCコネクタFlinkのJDBCコネクタでは、Flinkは個々のFEから順番にデータを読み取ることしかできません。データの読み取りは遅くなります。
バージョン要件
コネクタ | Flink | StarRocks | Java | Scala |
---|---|---|---|---|
1.2.8 | 1.13,1.14,1.15,1.16,1.17 | 2.1以降 | 8 | 2.11,2.12 |
1.2.7 | 1.11,1.12,1.13,1.14,1.15 | 2.1以降 | 8 | 2.11,2.12 |
前提条件
Flinkがデプロイされていることが前提です。デプロイされていない場合は、以下の手順に従ってデプロイしてください。
-
操作システムにJava 8またはJava 11をインストールして、Flinkが正常に実行できることを確認します。次のコマンドを使用してJavaのバージョンを確認できます。
java -version
たとえば、次の情報が返された場合、Java 8がインストールされていることがわかります。
openjdk version "1.8.0_322"
OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode) -
Flinkパッケージをダウンロードして解凍します。
注意
Flink v1.14以降を使用することをお勧めします。サポートされる最小のFlinkバージョンはv1.11です。
# Flinkパッケージをダウンロードします。
wget https://dlcdn.apache.org/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# Flinkパッケージを解凍します。
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# Flinkディレクトリに移動します。
cd flink-1.14.5 -
Flinkクラスタを起動します。
# Flinkクラスタを起動します。
./bin/start-cluster.sh
# 以下の情報が表示されたら、Flinkクラスタが正常に起動されています。
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
またはFlinkのドキュメントに従ってFlinkをデプロイすることもできます。
開始する前に
Flinkコネクタをデプロイするために、以下の手順を実行してください。
-
使用しているFlinkバージョンに合致するflink-connector-starrocks JARパッケージを選択してダウンロードします。
注意
使用しているFlinkバージョンの先頭2桁と一致する、バージョンが1.2.xまたはそれ以降のFlinkコネクタパッケージをダウンロードすることをお勧めします。たとえば、Flink v1.14.xを使用している場合は、
flink-connector-starrocks-1.2.4_flink-1.14_x.yy.jar
をダウンロードできます。 -
コードのデバッグが必要な場合は、Flinkコネクタパッケージをビジネス要件に合わせてコンパイルします。
-
ダウンロードまたはコンパイルしたFlinkコネクタパッケージをFlinkの
lib
ディレクトリに配置します。 -
Flinkクラスタを再起動します。
パラメータ
共通パラメータ
以下のパラメータは、Flink SQLおよびFlink DataStreamの両方の読み取り方法に適用されます。
パラメータ | 必須 | データ型 | 説明 |
---|---|---|---|
connector | Yes | STRING | データの読み取りに使用するコネクタのタイプを指定します。値を |
scan-url | Yes | STRING | WebサーバからFEに接続するためのアドレスです。フォーマット: |
jdbc-url | Yes | STRING | FEのMySQLクライアントに接続するためのアドレスです。フォーマット: |
username | Yes | STRING | StarRocksクラスタのアカウントのユーザ名です。このアカウントにはStarRocksテーブルへの読み取り権限が必要です。 ユーザ権限 を参照してください。 |
password | Yes | STRING | StarRocksクラスタのアカウントのパスワードです。 |
database-name | Yes | STRING | 読み取り対象のStarRocksテーブルが所属するStarRocksデータベースの名前です。 |
table-name | Yes | STRING | 読み取り対象のStarRocksテーブルの名前です。 |
scan.connect.timeout-ms | No | STRING | FlinkコネクタからStarRocksクラスタへの接続がタイムアウトするまでの最大時間。単位:ミリ秒。デフォルト値: |
scan.params.keep-alive-min | No | STRING | 読み取りタスクがアクティブなまま維持されることのできる最大時間。維持時間は定期的なポーリングメカニズムによって確認されます。単位:分。デフォルト値: |
scan.params.query-timeout-s | No | STRING | 読み取りタスクがタイムアウトするまでの最大時間。タイムアウトの期間はタスク実行中に確認されます。単位:秒。デフォルト値: |
scan.params.mem-limit-byte | No | STRING | 各BEごとのクエリごとに許可される最大メモリ量。単位:バイト。デフォルト値: |
scan.max-retries | No | STRING | 読み取りタスクが失敗した場合に再試行できる最大回数。デフォルト値: |
Flink DataStream用パラメータ
以下のパラメータは、Flink DataStreamの読み取り方法にのみ適用されます。
パラメータ | 必須 | データ型 | 説明 |
---|---|---|---|
scan.columns | No | STRING | 読み取りたいカラムを指定します。複数のカラムを指定することができますが、カンマ(,)で区切る必要があります。 |
scan.filter | No | STRING | データをフィルタリングするためのフィルタ条件です。 |
たとえば、Flinkでc1
、c2
、c3
の3つのカラムからなるテーブルを作成し、そのテーブルのc1
カラムの値が100
と等しい行を読み取りたい場合は、フィルタ条件として "scan.columns"、"c1"
および "scan.filter"、"c1 = 100"
を指定します。
StarRocksとFlinkのデータ型のマッピング
以下のデータ型マッピングは、FlinkがStarRocksからデータを読み取る場合にのみ適用されます。FlinkからStarRocksにデータを書き込む場合のデータ型マッピングについては、「Apache Flink®から連続的にデータを読み込む」を参照してください。
StarRocksのデータ型 | Flinkのデータ型 |
---|---|
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
LARGEINT | STRING |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR | CHAR |
VARCHAR | STRING |
例
以下の例では、StarRocksクラスタにtest
という名前のデータベースが作成されており、root
ユーザーには権限が与えられていることを前提としています。
注意
読み取りタスクが失敗した場合、再作成する必要があります。
データの例
-
test
データベースにscore_board
という名前のテーブルを作成します。MySQL [test]> CREATE TABLE `score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`)
PROPERTIES
(
"replication_num" = "3"
); -
score_board
テーブルにデータを挿入します。MySQL [test]> INSERT INTO score_board
VALUES
(1, 'Bob', 21),
(2, 'Stan', 21),
(3, 'Sam', 22),
(4, 'Tony', 22),
(5, 'Alice', 22),
(6, 'Lucy', 23),
(7, 'Polly', 23),
(8, 'Tom', 23),
(9, 'Rose', 24),
(10, 'Jerry', 24),
(11, 'Jason', 24),
(12, 'Lily', 25),
(13, 'Stephen', 25),
(14, 'David', 25),
(15, 'Eddie', 26),
(16, 'Kate', 27),
(17, 'Cathy', 27),
(18, 'Judy', 27),
(19, 'Julia', 28),
(20, 'Robert', 28),
(21, 'Jack', 29); -
score_board
テーブルをクエリします。MySQL [test]> SELECT * FROM score_board;
+------+---------+-------+
| id | name | score |
+------+---------+-------+
| 1 | Bob | 21 |
| 2 | Stan | 21 |
| 3 | Sam | 22 |
| 4 | Tony | 22 |
| 5 | Alice | 22 |
| 6 | Lucy | 23 |
| 7 | Polly | 23 |
| 8 | Tom | 23 |
| 9 | Rose | 24 |
| 10 | Jerry | 24 |
| 11 | Jason | 24 |
| 12 | Lily | 25 |
| 13 | Stephen | 25 |
| 14 | David | 25 |
| 15 | Eddie | 26 |
| 16 | Kate | 27 |
| 17 | Cathy | 27 |
| 18 | Judy | 27 |
| 19 | Julia | 28 |
| 20 | Robert | 28 |
| 21 | Jack | 29 |
+------+---------+-------+
21 rows in set (0.00 sec)
Flink SQLを使用してデータを読み取る
-
Flinkクラスタで、StarRocksテーブル(この例では
score_board
)のスキーマに基づいてflink_test
という名前のテーブルを作成します。テーブル作成コマンドでは、Flinkコネクタ、ソースのStarRocksデータベース、ソースのStarRocksテーブルに関する情報など、読み取りタスクのプロパティを構成する必要があります。CREATE TABLE flink_test
(
`id` INT,
`name` STRING,
`score` INT
)
WITH
(
'connector'='starrocks',
'scan-url'='192.168.xxx.xxx:8030',
'jdbc-url'='jdbc:mysql://192.168.xxx.xxx:9030',
'username'='xxxxxx',
'password'='xxxxxx',
'database-name'='test',
'table-name'='score_board'
); -
SELECT文を使用してStarRocksからデータを読み取ります。
SELECT id, name FROM flink_test WHERE score > 20;
Flink SQLを使用してデータを読み取る際は、以下の点に留意してください。
- StarRocksからデータを読み取るには、
SELECT ... FROM <テーブル名> WHERE ...
のようなSQL文のみを使用できます。集約関数の中では、count
関数のみがサポートされています。 - プリディケートプッシュダウンがサポートされています。たとえば、クエリに
char_1 <> 'A' and int_1 = -126
というフィルタ条件が含まれている場合、フィルタ条件はFlinkコネクタにプッシュダウンされ、クエリが実行される前にStarRocksで実行可能なステートメントに変換されます。追加の設定は必要ありません。 - LIMIT文はサポートされていません。
- StarRocksはチェックポイントメカニズムをサポートしていません。そのため、読み取りタスクが失敗した場合、データの整合性は保証されません。
Flink DataStreamを使用してデータを読み取る
pom.xml
ファイルに以下の依存関係を追加 します。上記のコード例では、<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<!-- for Apache Flink® 1.15 -->
<version>x.x.x_flink-1.15</version>
<!-- for Apache Flink® 1.14 -->
<version>x.x.x_flink-1.14_2.11</version>
<version>x.x.x_flink-1.14_2.12</version>
<!-- for Apache Flink® 1.13 -->
<version>x.x.x_flink-1.13_2.11</version>
<version>x.x.x_flink-1.13_2.12</version>
<!-- for Apache Flink® 1.12 -->
<version>x.x.x_flink-1.12_2.11</version>
<version>x.x.x_flink-1.12_2.12</version>
<!-- for Apache Flink® 1.11 -->
<version>x.x.x_flink-1.11_2.11</version>
<version>x.x.x_flink-1.11_2.12</version>
</dependency>x.x.x
の部分を使用している最新のFlinkコネクタのバージョンに置き換える必要があります。バージョン情報を参照して ください。- Flinkコネクタを呼び出してStarRocksからデータを読み取ります。
import com.starrocks.connector.flink.StarRocksSource;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
public class StarRocksSourceApp {
public static void main(String[] args) throws Exception {
StarRocksSourceOptions options = StarRocksSourceOptions.builder()
.withProperty("scan-url", "192.168.xxx.xxx:8030")
.withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
.withProperty("username", "root")
.withProperty("password", "")
.withProperty("table-name", "score_board")
.withProperty("database-name", "test")
.build();
TableSchema tableSchema = TableSchema.builder()
.field("id", DataTypes.INT())
.field("name", DataTypes.STRING())
.field("score", DataTypes.INT())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
env.execute("StarRocks flink source");
}
}
次の手順
FlinkがStarRocksからデータを正常に読み取った後、Flink WebUIを使用して読み取りタスクを監視できます。たとえば、WebUIのMetricsページでtotalScannedRows
メトリックを表示して、正常に読み取られた行数を取得することができます。また、読み取ったデータを使用してJOINなどの計算を実行するために、Flink SQLを使用することもできます。