Realtime synchronization from MySQL
StarRocks supports real-time data synchronization from MySQL within seconds, delivering ultra-low-latency analytics at scale and letting users query data as changes happen.
This tutorial helps you learn how you can bring real-time analytics to your business and users. It demonstrates how to synchronize data from MySQL to StarRocks in real time by using the following tools: StarRocks Migration Tools (SMT), Flink, Flink CDC Connector, and flink-starrocks-connector.
How it works
The following figure illustrates the entire synchronization process.
Real-time synchronization from MySQL is implemented in two stages: synchronizing database & table schema and synchronizing data. First, the SMT converts MySQL database & table schema into table creation statements for StarRocks. Then, the Flink cluster runs Flink jobs to synchronize full and incremental MySQL data to StarRocks.
Note
The synchronization process guarantees exactly-once semantics.
Synchronization process:
- Synchronize database & table schema. The SMT reads the schema of the MySQL database & table to be synchronized and generates SQL files for creating a destination database & table in StarRocks. This operation is based on the MySQL and StarRocks information in SMT's configuration file.
- Synchronize data.
  a. The Flink SQL client executes the data loading statement INSERT INTO SELECT to submit one or more Flink jobs to the Flink cluster.
  b. The Flink cluster runs the Flink jobs to obtain data. The Flink CDC connector first reads full historical data from the source database, then seamlessly switches to incremental reading, and sends the data to flink-starrocks-connector.
  c. flink-starrocks-connector accumulates data in mini-batches, and synchronizes each batch of data to StarRocks.
Note
Only data manipulation language (DML) operations in MySQL can be synchronized to StarRocks. Data definition language (DDL) operations cannot be synchronized.
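The mini-batch behavior described in step c can be pictured with a small shell sketch. This is only an illustration of the accumulate-then-flush idea, not the connector's implementation: the function name flush_in_batches and the row-count threshold are assumptions made for this example, and the real connector's flush conditions are not described in this topic.

```shell
# Hypothetical illustration: read rows from standard input, buffer them,
# and "flush" the buffer each time it holds batch_size rows.
# The function body runs in a subshell so its variables stay local.
flush_in_batches() (
  batch_size="$1"
  count=0
  batch=""
  while IFS= read -r row; do
    # Append the row to the buffer, separated by a space.
    batch="${batch:+$batch }$row"
    count=$((count + 1))
    if [ "$count" -ge "$batch_size" ]; then
      echo "flush ${count} rows: ${batch}"
      count=0
      batch=""
    fi
  done
  # Flush whatever is left when the input ends.
  if [ "$count" -gt 0 ]; then
    echo "flush ${count} rows: ${batch}"
  fi
)
```

For example, piping five rows into `flush_in_batches 2` produces two full batches followed by a final batch that holds the remaining row.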
Scenarios
Real-time synchronization from MySQL suits a broad range of use cases in which data changes constantly. Take the real-world use case "real-time ranking of commodity sales" as an example.
Flink calculates the real-time ranking of commodity sales based on the original order table in MySQL and synchronizes the ranking to StarRocks' Primary Key table in real time. Users can connect a visualization tool to StarRocks to view the ranking in real time to gain on-demand operational insights.
Preparations
Download and install synchronization tools
To synchronize data from MySQL, you need to install the following tools: SMT, Flink, Flink CDC connector, and flink-starrocks-connector.
-
Download and install Flink, and start the Flink cluster. You can also perform this step by following the instructions in the official Flink documentation.
a. Install Java 8 or Java 11 in your operating system before you run Flink. You can run the following command to check the installed Java version.
# View the Java version.
java -version
# Java 8 is installed if the following output is returned.
java version "1.8.0_301"
Java(TM) SE Runtime Environment (build 1.8.0_301-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.301-b09, mixed mode)
b. Download the Flink installation package and decompress it. We recommend that you use Flink 1.14 or later. The minimum allowed version is Flink 1.11. This topic uses Flink 1.14.5.
# Download Flink.
wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
# Decompress Flink.
tar -xzf flink-1.14.5-bin-scala_2.11.tgz
# Go to the Flink directory.
cd flink-1.14.5
c. Start the Flink cluster.
# Start the Flink cluster.
./bin/start-cluster.sh
# The Flink cluster is started if the following output is returned.
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
-
Download Flink CDC connector. This topic uses MySQL as the data source, so flink-sql-connector-mysql-cdc-x.x.x.jar is downloaded. The connector version must match the Flink version. For detailed version mapping, see Supported Flink Versions. This topic uses Flink 1.14.5, so you can download flink-sql-connector-mysql-cdc-2.2.0.jar.
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar
-
Download flink-connector-starrocks. The version must match the Flink version.
The flink-connector-starrocks package x.x.x_flink-y.yy_z.zz.jar contains three version numbers:
  - x.x.x is the version number of flink-connector-starrocks.
  - y.yy is the supported Flink version.
  - z.zz is the Scala version supported by Flink. If the Flink version is 1.14.x or earlier, you must download a package that has the Scala version.
This topic uses Flink 1.14.5 and Scala 2.11. Therefore, you can download the following package: 1.2.3_flink-1.14_2.11.jar.
-
Move the JAR packages of Flink CDC connector (flink-sql-connector-mysql-cdc-2.2.0.jar) and flink-connector-starrocks (1.2.3_flink-1.14_2.11.jar) to the lib directory of Flink.
Note
If a Flink cluster is already running in your system, you must stop the Flink cluster and restart it to load and validate the JAR packages.
$ ./bin/stop-cluster.sh
$ ./bin/start-cluster.sh
-
Download and decompress the SMT package and place it in the flink-1.14.5 directory. StarRocks provides SMT packages for Linux x86 and macOS ARM64. You can choose one based on your operating system and CPU.
# for Linux x86
wget https://releases.starrocks.io/resources/smt.tar.gz
# for macOS ARM64
wget https://releases.starrocks.io/resources/smt_darwin_arm64.tar.gz
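Because several of the steps above depend on versions lining up (Java 8 or 11, a connector package whose y.yy part matches the Flink version, and a download URL whose directory matches the JAR file name), a few preflight helpers can catch mismatches early. The sketch below is an illustration only: the function names are hypothetical and are not part of Flink, SMT, or StarRocks. It assumes the package naming scheme x.x.x_flink-y.yy_z.zz.jar described above and the standard Maven repository layout, where the version appears in both the directory and the file name.

```shell
# Hypothetical preflight helpers. Each function body runs in a subshell
# so that its variables do not leak into the calling shell.

# Reduce a Java version string to its major version:
# "1.8.0_301" gives 8, "11.0.15" gives 11.
java_major() (
  v="$1"
  case "$v" in
    1.*) v="${v#1.}" ;;   # legacy scheme: 1.8.x means Java 8
  esac
  echo "${v%%[._-]*}"     # keep everything before the first '.', '_' or '-'
)

# Split a flink-connector-starrocks package name into its version parts.
# The Scala part is left empty when the name does not carry one.
parse_starrocks_jar() (
  name="${1%.jar}"
  connector="${name%%_flink-*}"
  rest="${name#*_flink-}"
  case "$rest" in
    *_*) flink="${rest%%_*}"; scala="${rest#*_}" ;;
    *)   flink="$rest"; scala="" ;;
  esac
  echo "connector=${connector} flink=${flink} scala=${scala}"
)

# Succeed when the Flink version in the package name matches the
# major.minor part of the installed Flink version (1.14 for 1.14.5).
jar_matches_flink() (
  major_minor="$(echo "$2" | cut -d. -f1,2)"
  case "$(parse_starrocks_jar "$1")" in
    *" flink=${major_minor} "*) return 0 ;;
    *) return 1 ;;
  esac
)

# Build the Maven Central URL of the MySQL CDC connector for one version,
# so the directory and the file name cannot drift apart.
cdc_jar_url() (
  echo "https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/${1}/flink-sql-connector-mysql-cdc-${1}.jar"
)
```

As a usage example, `jar_matches_flink 1.2.3_flink-1.14_2.11.jar 1.14.5` succeeds, while a package name with `flink-14` in place of `flink-1.14` fails the check. Note that `java -version` writes its output to standard error, so redirect it with `2>&1` before extracting the version string to pass to `java_major`.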
Enable MySQL binary log
To synchronize data from MySQL in real time, the system needs to read data from the MySQL binary log (binlog), parse the data, and then synchronize the data to StarRocks. Make sure that the MySQL binary log is enabled.
-
Edit the MySQL configuration file my.cnf (default path: /etc/my.cnf) to enable MySQL binary log.
# Enable the Binlog and set the base name (including the save path) of the Binlog files.
# An identifier is appended to the base name to identify each Binlog file.
log_bin = /var/lib/mysql/mysql-bin
# Configure server_id.
# If server_id is not configured for MySQL 5.7.3 or later, the MySQL service cannot be used.
server_id = 1
# Set the Binlog format to ROW.
binlog_format = ROW
# log_bin_basename is a read-only system variable derived from log_bin, so it is not set here.
# The index file of Binlog files, which manages the directory of all Binlog files.
log_bin_index = /var/lib/mysql/mysql-bin.index
-
Run one of the following commands to restart MySQL for the modified configuration file to take effect.
# Use service to restart MySQL.
service mysqld restart
# Use mysqld script to restart MySQL.
/etc/init.d/mysqld restart
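Before restarting, it can help to confirm that the configuration file actually contains the settings listed above. The following is a minimal sketch under stated assumptions: cnf_value and check_binlog_config are hypothetical helper names, the parser ignores section headers such as [mysqld] and simply takes the last assignment of each option, and it treats dashes and underscores in option names as equivalent, as MySQL does.

```shell
# Hypothetical helper: print the last value assigned to an option
# in a my.cnf-style file. Comment lines (# or ;) are skipped.
cnf_value() {
  awk -v key="$2" '
    /^[ \t]*[#;]/ { next }
    index($0, "=") > 0 {
      name = substr($0, 1, index($0, "=") - 1)
      gsub(/[ \t]/, "", name)
      gsub(/-/, "_", name)
      if (name == key) {
        val = substr($0, index($0, "=") + 1)
        gsub(/^[ \t]+|[ \t]+$/, "", val)
        found = val
      }
    }
    END { if (found != "") print found }
  ' "$1"
}

# Hypothetical helper: return 0 only when the file sets log_bin and
# server_id and sets binlog_format to ROW (compared case-insensitively).
check_binlog_config() (
  status=0
  [ -n "$(cnf_value "$1" log_bin)" ] || { echo "log_bin is not set"; status=1; }
  [ -n "$(cnf_value "$1" server_id)" ] || { echo "server_id is not set"; status=1; }
  fmt="$(cnf_value "$1" binlog_format | tr '[:lower:]' '[:upper:]')"
  [ "$fmt" = "ROW" ] || { echo "binlog_format is '${fmt}', expected ROW"; status=1; }
  return "$status"
)
```

Running `check_binlog_config /etc/my.cnf` prints one line per missing or unexpected setting and returns a non-zero status if any check fails. This only inspects the file; on a running server, the statement `SHOW VARIABLES LIKE 'log_bin';` reports whether binary logging is in effect.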