メインコンテンツまでスキップ

Colocate Join

Colocate Join(コロケーションジョイン)

シャッフルジョインとブロードキャストジョインでは、ジョイン条件が満たされた場合、ジョインする2つの結合テーブルのデータ行は1つのノードにマージされてジョインが完了します。これら2つのジョイン方法は、ノード間のデータネットワークの転送によって引き起こされるレイテンシやオーバーヘッドを避けることはできません。

Colocate Joinのコアアイデアは、同じコロケーショングループ内のテーブルのバケットキー、コピー数、およびコピー配置を一貫させることです。ジョインカラムがバケットキーである場合、計算ノードは他のノードからデータを取得せずにローカルジョインのみを行う必要があります。Colocate Joinは、等価結合(equi join)をサポートしています。

このドキュメントでは、Colocate Joinの原理、実装、使用方法、および考慮事項について紹介します。

用語

  • コロケーショングループ(CG):コロケーショングループには1つ以上のテーブルが含まれます。CG内のテーブルは、同じバケット化とレプリカ配置を持ち、Colocation Group Schemaを使用して説明されます。
  • Colocation Group Schema(CGS):CGSには、CGのバケットキー、バケット数、およびレプリカ数が含まれます。

原理

Colocate Joinは、同じCGSを持つ一連のテーブルでCGを形成し、これらのテーブルの対応するバケットコピーが同じ一連のBEノードに配置されることを保証します。CG内のテーブルがバケット化された列でJoin操作を実行する場合、ローカルデータを直接ジョインすることができ、ノード間でデータを転送する時間を節約することができます。

バケットシーケンスは hash(key) mod buckets によって取得されます。あるテーブルに8つのバケットがあるとすると、[0, 1, 2, 3, 4, 5, 6, 7] の8つのバケットが存在し、各バケットには1つ以上のサブテーブルがあります。サブテーブルの数はパーティションの数に依存します。複数のパーティションを持つテーブルの場合、複数のタブレットが存在します。

同じCG内のテーブルは、以下の条件を満たす必要があります。

  1. 同じCG内のテーブルは、同一のバケットキー(型、数、順序)と同じバケット数を持たなければなりません。これにより、複数のテーブルのデータスライスを1つずつ分割して制御することができます。バケットキーはテーブル作成文の DISTRIBUTED BY HASH(col1, col2, ...) で指定される列です。バケットキーは、データのどの列が異なるバケットシーケンスにハッシュされるかを決定します。バケットキーの名前は、同じCG内のテーブルによって異なる場合があります。バケットキーの列はテーブル作成文で異なる場合がありますが、DISTRIBUTED BY HASH(col1, col2, ...) の対応するデータ型の順序はまったく同じでなければなりません。
  2. 同じCG内のテーブルは、同じ数のパーティションコピーを持たなければなりません。そうでない場合、同じBEのパーティションに対応するコピーが存在しないことがあります。
  3. 同じCG内のテーブルは、異なるパーティション数と異なるパーティションキーを持つ場合があります。

テーブルを作成する際は、テーブルPROPERTIESの属性 "colocate_with" = "group_name" でCGを指定します。CGが存在しない場合、テーブルはCGの最初のテーブルであることを意味し、親テーブルと呼ばれます。親テーブルのデータ分布(分割バケットキーの型、数、順序、コピー数、分割バケット数)がCGSを決定します。CGが存在する場合は、テーブルのデータ分布がCGSと一致しているかどうかを確認します。

同じCG内のテーブルのコピー配置は、次の条件を満たします。

  1. すべてのテーブルのバケットシーケンスとBEノードとのマッピングは、親テーブルのバケットシーケンスとBEノードとのマッピングと同じであること。
  2. 親テーブルのすべてのパーティションのバケットシーケンスとBEノードとのマッピングは、最初のパーティションと同じであること。
  3. 親テーブルの最初のパーティションのバケットシーケンスとBEノードとのマッピングは、ネイティブのRound Robinアルゴリズムを使用して決定されます。

一貫したデータ分布とマッピングにより、バケットキーの値が同じデータ行は同じBEに配置されます。そのため、バケットキーを使用して列をジョインする場合は、ローカルジョインのみが必要です。

使用方法

テーブルの作成

テーブルを作成する際、PROPERTIESで属性 "colocate_with" = "group_name" を指定して、テーブルがColocate Joinテーブルであり、指定されたコロケーショングループに属することを示します。

注釈

バージョン2.5.4以降、異なるデータベースのテーブルでもColocate Joinを実行できるようになりました。テーブルを作成する際に同じ colocate_with プロパティを指定するだけです。

例:

CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
"colocate_with" = "group1"
);

指定したグループが存在しない場合、StarRocksは自動的に現在のテーブルのみを含むグループを作成します。グループが存在する場合、StarRocksは現在のテーブルがColocation Group Schemaを満たすかどうかを確認し、満たす場合はテーブルを作成してグループに追加します。同時に、テーブルは既存のグループのデータ分布ルールに基づいてパーティションとタブレットを作成します。

Colocation Groupはデータベースに所属します。Colocation Groupの名前はデータベース内で一意です。内部ストレージでは、Colocation Groupの完全な名前は dbId_groupName ですが、 groupName のみが知られています。

注釈

Colocate Joinのための同じコロケーショングループを異なるデータベースのテーブルに関連付ける場合、コロケーショングループは各データベースに存在します。異なるデータベースのコロケーショングループを確認するには、 show proc "/colocation_group" を実行します。

削除

完全な削除とは、ゴミ箱からの削除のことです。通常、DROP TABLE コマンドでテーブルを削除すると、デフォルトでは削除されるまで1日間ゴミ箱に保持されます)。グループ内の最後のテーブルが完全に削除されると、グループも自動的に削除されます。

グループ情報の表示

以下のコマンドを使用して、クラスタ内に既に存在するグループの情報を表示できます。

SHOW PROC '/colocation_group';

+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId | GroupName | TableIds | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10 | 3 | int(11) | true |
+-------------+--------------+--------------+------------+----------------+----------+----------+
  • GroupId: グループのクラスタ全体で一意の識別子であり、前半がdb id、後半がグループidです。
  • GroupName: グループの完全な名前です。
  • TabletIds: グループ内のテーブルのIDのリストです。
  • BucketsNum: バケットの数です。
  • ReplicationNum: レプリカの数です。
  • DistCols: ディストリビューションカラム、つまりバケット化カラムの型です。
  • IsStable: グループが安定しているかどうか(安定性の定義については、コロケーションレプリカのバランスと修復のセクションを参照)。

以下のコマンドでグループのデータ分布も確認できます。

SHOW PROC '/colocation_group/10005.10008';

+-------------+---------------------+
| BucketIndex | BackendIds |
+-------------+---------------------+
| 0 | 10004, 10002, 10001 |
| 1 | 10003, 10002, 10004 |
| 2 | 10002, 10004, 10001 |
| 3 | 10003, 10002, 10004 |
| 4 | 10002, 10004, 10003 |
| 5 | 10003, 10002, 10001 |
| 6 | 10003, 10004, 10001 |
| 7 | 10003, 10004, 10002 |
+-------------+---------------------+
  • BucketIndex: バケットシーケンスの添字です。
  • BackendIds: バケット化データスライスが配置されているBEノードのIDです。

注意:上記のコマンドはAMDIN権限が必要です。一般ユーザーはアクセスできません。

テーブルグループプロパティの変更

テーブルのColocation Groupプロパティを変更できます。例:

ALTER TABLE tbl SET ("colocate_with" = "group2");

テーブルが以前に別のグループに割り当てられていた場合、コマンドはテーブルを元のグループから削除し、新しいグループに追加します(グループが存在しない場合はまずグループを作成します)。

また、次のコマンドを使用してテーブルのColocateプロパティを削除することもできます。

ALTER TABLE tbl SET ("colocate_with" = "");

その他の関連操作

Colocation属性を持つテーブルに対して ADD PARTITION を使用してパーティションを追加したり、コピー数を変更したりする際、StarRocksはColocation Group Schemaが違反されないかどうかを確認し、違反する場合は拒否します。

コロケーションレプリカのバランスおよび修復

Colocationテーブルのレプリカ分布は、Groupスキーマで指定された分配ルールに従う必要があるため、通常のシャーディングとは異なるレプリカの修復とバランスに関して異なります。

Group自体には stable プロパティがあります。 stabletrue の場合、現在のGroup内のテーブルスライスに変更はなく、Colocation機能が正常に動作していることを意味します。 stablefalse の場合、現在のGroup内の一部のテーブルスライスが修復または移行されていることを意味し、影響を受けるテーブルのColocate Joinは通常のJoinに低下します。

レプリカの修復

レプリカは指定されたBEノードにのみ格納できます。StarRocksは、利用可能なBE(たとえば、ダウン、除去など)のうち負荷が最も少ないBEを探し、そのBEで利用できるようになれば古いBE上のバケット化データスライスを修復します。マイグレーション中、グループは不安定(Unstable) とマークされます。

レプリカのバランス

StarRocksは、ColocationテーブルスライスをすべてのBEノードに均等に分散させるように試みます。通常のテーブルのバランスは、レプリカ単位で行われます。つまり、各レプリカは個別により負荷の低いBEノードを見つけます。Colocationテーブルの場合、バランスはバケットレベルで行われます。つまり、同じバケット内のすべてのレプリカが一緒に移動します。私たちは実際のレプリカのサイズではなく、レプリカの数のみを考慮して、BucketsSequenceをすべてのBEノードに均等に分散させるシンプルなバランスアルゴリズムを使用しています。正確なアルゴリズムの詳細は、 ColocateTableBalancer.java のコードコメントにあります。

注意1:現在のColocationレプリカのバランスおよび修復アルゴリズムは、異質なデプロイメントを持つStarRocksクラスタではうまく機能しない場合があります。異種デプロイメントとは、BEノードのディスク容量、ディスク数、およびディスクタイプ(SSDとHDD)が一致していない場合を指します。異種デプロイメントの場合、小容量のBEノードが大容量のBEノードと同じ数のレプリカを格納する可能性があります。

注意2:グループが不安定(Unstable)な状態の場合、そのテーブルのJoinは通常のJoinに低下し、クラスタのクエリパフォーマンスが著しく低下する可能性があります。システムが自動的にバランスを取らないようにする場合は、FE設定の disable_colocate_balance を設定して自動的なバランスを無効化し、適切なタイミングで再度有効にします。 (詳細については、セクション「高度な操作」を参照してください)

クエリ

Colocationテーブルは通常のテーブルと同様にクエリされます。Colocationテーブルが所属するグループが不安定な状態にある場合、自動的に通常のJoinに低下します。以下の例を参照してください。

テーブル1:

CREATE TABLE `tbl1` (
`k1` date NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` int(11) SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
PARTITION BY RANGE(`k1`)
(
PARTITION p1 VALUES LESS THAN ('2019-05-31'),
PARTITION p2 VALUES LESS THAN ('2019-06-30')
)
DISTRIBUTED BY HASH(`k2`)
PROPERTIES (
"colocate_with" = "group1"
);

テーブル2:

CREATE TABLE `tbl2` (
`k1` datetime NOT NULL COMMENT "",
`k2` int(11) NOT NULL COMMENT "",
`v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`)
PROPERTIES (
"colocate_with" = "group1"
);

クエリプランの表示:

DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);

+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN |
| | hash predicates: |
| | colocate: true |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----1:OlapScanNode |
| | TABLE: tbl2 |
| | PREAGGREGATION: OFF. Reason: null |
| | partitions=0/1 |
| | rollup: null |
| | buckets=0/0 |
| | cardinality=-1 |
| | avgRowSize=0.0 |
| | numNodes=0 |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
+----------------------------------------------------+

Colocate Joinが有効になる場合、HASH JOINノードには colocate: true という表示がされます。

有効にならない場合、クエリプランは次のようになります:

+----------------------------------------------------+
| Explain String |
+----------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:`tbl1`.`k1` | |
| PARTITION: RANDOM |
| |
| RESULT SINK |
| |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | colocate: false, reason: group is not stable |
| | `tbl1`.`k2` = `tbl2`.`k2` |
| | tuple ids: 0 1 |
| | |
| |----3:EXCHANGE |
| | tuple ids: 1 |
| | |
| 0:OlapScanNode |
| TABLE: tbl1 |
| PREAGGREGATION: OFF. Reason: No AggregateInfo |
| partitions=0/2 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 0 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 03 |
| UNPARTITIONED |
| |
| 1:OlapScanNode |
| TABLE: tbl2 |
| PREAGGREGATION: OFF. Reason: null |
| partitions=0/1 |
| rollup: null |
| buckets=0/0 |
| cardinality=-1 |
| avgRowSize=0.0 |
| numNodes=0 |
| tuple ids: 1 |
+----------------------------------------------------+

HASH JOINノードには colocate: false, reason: group is not stable という対応する理由が表示されます。同時にEXCHANGEノードが生成されます。

高度な操作

FE設定項目

  • disable_colocate_relocate

StarRocksのColocationレプリカの自動修復を無効にするかどうか。デフォルトではfalseで、オンになっています。このパラメータは、Colocationテーブルのレプリカの修復にのみ影響を与え、通常のテーブルには影響しません。

  • disable_colocate_balance

StarRocksのColocationレプリカの自動バランスを無効にするかどうか。デフォルトではfalseで、オンになっています。このパラメータは、Colocationテーブルのレプリカのバランスにのみ影響を与え、通常のテーブルには影響しません。

  • disable_colocate_join

この変数を変更することで、セッション単位でColocate Joinを無効にすることができます。

HTTP Restful API

StarRocksは、Colocate Joinに関連するいくつかのHTTP Restful APIを提供しており、Colocationグループの表示や変更が可能です。

このAPIはFE上で実装されており、ADMIN権限を持っている場合に fe_host:fe_http_port を使用してアクセスできます。

  1. クラスタのすべてのColocation情報を表示する

    curl --location-trusted -u<username>:<password> 'http://<fe_host>:<fe_http_port>/api/colocate'
    // 内部のColocation情報をJson形式で返します。
    {
    "colocate_meta": {
    "groupName2Id": {
    "g1": {
    "dbId": 10005,
    "grpId": 10008
    }
    },
    "group2Tables": {},
    "table2Group": {
    "10007": {
    "dbId": 10005,
    "grpId": 10008
    },
    "10040": {
    "dbId": 10005,
    "grpId": 10008
    }
    },
    "group2Schema": {
    "10005.10008": {
    "groupId": {
    "dbId": 10005,
    "grpId": 10008
    },
    "distributionColTypes": [{
    "type": "INT",
    "len": -1,
    "isAssignedStrLenInColDefinition": false,
    "precision": 0,
    "scale": 0
    }],
    "bucketsNum": 10,
    "replicationNum": 2
    }
    },
    "group2BackendsPerBucketSeq": {
    "10005.10008": [
    [10004, 10002],
    [10003, 10002],
    [10002, 10004],
    [10003, 10002],
    [10002, 10004],
    [10003, 10002],
    [10003, 10004],
    [10003, 10004],
    [10003, 10004],
    [10002, 10004]
    ]
    },
    "unstableGroups": []
    },
    "status": "OK"
    }
  2. グループを安定または不安定にマークする

    # 安定にマークする
    curl -XPOST --location-trusted -u<username>:<password>'http://<fe_host>:<fe_http_port>/api/colocate/group_stable?db_id=<dbId>&group_id=<grpId>​'
    # 不安定にマークする
    curl -XPOST --location-trusted -u<username>:<password>'http://<fe_host>:<fe_http_port>/api/colocate/group_unstable?db_id=<dbId>&group_id=<grpId>​'

    返された結果が 200 の場合、グループは正常に安定または不安定にマークされました。

  3. グループのデータ分布を設定するこのインターフェースを使用すると、グループの数値分布を強制的に設定できます。POST /api/colocate/bucketseq?db_id=10005&group_id= 10008Body:[[10004,10002],[10003,10002],[10002,10004],[10003,10002],[10002,10004],[10003,10002],[10003,10004],[10003,10004],[10003,10004],[10002,10004]]returns: 200ここで、Body は階層化された配列であり、バケット化スライスが存在するBEのIDです。

    このコマンドを使用するには、FEの設定である disable_colocate_relocate および disable_colocate_balance をtrueに設定する必要があるかもしれません。つまり、システムが自動的にColocationレプリカの修復とバランスを行わないようにするためです。それによって変更後にシステムが自動的にリセットされる可能性があります。