メインコンテンツまでスキップ

Synchronize data from MySQL in real time

MySQLとのリアルタイムデータ同期

Flinkジョブがエラーを報告した場合、どのように対処すればよいですか?

Flinkジョブがエラーを報告し、Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing.というエラーが表示される場合、次の可能性が考えられます。

SMT構成ファイルconfig_prod.conf[table-rule.1][table-rule.2]などの複数のルールセットに必要な構成情報が不足している可能性があります。

各ルールセット([table-rule.1][table-rule.2]など)が必要なデータベース、テーブル、Flinkコネクタの情報で構成されているかどうかを確認してください。

Flinkが障害時に自動的にタスクを再起動する方法はありますか?

Flinkは、チェックポイントメカニズムリスタート戦略を介して、障害時に自動的にタスクを再起動します。

たとえば、チェックポイントメカニズムを有効にし、固定遅延リスタート戦略を使用する場合、次の情報をflink-conf.yamlという構成ファイルに設定できます。

execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory

パラメータの説明:

注意

Flinkドキュメントの詳細なパラメータの説明については、チェックポイントを参照してください。

  • execution.checkpointing.interval: チェックポイントの基準となる時間間隔。単位: ミリ秒。チェックポイントメカニズムを有効にするには、このパラメータを0より大きい値に設定する必要があります。
  • state.backend: ステートが内部的にどのように表現され、チェックポイント時にどのようにおよびどこに永続化されるかを決定するためのステートバックエンドを指定します。filesystemrocksdbなどの一般的な値があります。チェックポイントメカニズムが有効になると、ステートはチェックポイント時に永続化され、データの損失を防ぎ、リカバリ後のデータの一貫性を確保します。ステートの詳細については、ステートバックエンドを参照してください。
  • state.checkpoints.dir: チェックポイントが書き込まれるディレクトリ。

Flinkジョブを手動で停止し、後で停止前の状態に復元するにはどうすればよいですか?

Flinkジョブを停止する際に手動でセーブポイントをトリガーし、後で指定したセーブポイントからFlinkジョブを復元することができます。セーブポイントは、ストリーミングFlinkジョブの実行状態の一貫したイメージであり、チェックポイントメカニズムに基づいて作成されます。

  1. セーブポイント付きでFlinkジョブを停止します。次のコマンドは、FlinkジョブjobIdに対してセーブポイントを自動的にトリガーし、Flinkジョブを停止します。さらに、セーブポイントを保存するための対象ファイルシステムディレクトリを指定することもできます。

    bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId

    パラメータの説明:

    state.savepoints.dir: [file:// or hdfs://]/home/user/savepoints_dir
    • jobId: Flink WebUIでFlinkジョブのIDを表示するか、コマンドラインでflink list -runningを実行してFlinkジョブのIDを確認できます。
    • targetDirectory: Flink構成ファイルflink-conf.ymlstate.savepoints.dirをデフォルトのセーブポイント保存ディレクトリとして指定できます。セーブポイントがトリガーされると、セーブポイントはこのデフォルトディレクトリに保存され、ディレクトリを指定する必要はありません。
  2. 指定したセーブポイントでFlinkジョブを再提出します。

    ./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-starrocks-xxxx.jar -f flink-create.all.sql