Transform data at loading
StarRocks supports data transformation at loading.
This feature supports Stream Load, Broker Load, and Routine Load but does not support Spark Load.
This topic uses CSV data as an example to describe how to extract and transform data at loading. The data file formats that are supported vary depending on the loading method of your choice.
NOTE
For CSV data, you can use a UTF-8 string, such as a comma (,), tab, or pipe (|), whose length does not exceed 50 bytes as a text delimiter.
Scenarios
When you load a data file into a StarRocks table, the data of the data file may not map exactly onto the data of the StarRocks table. In this situation, you do not need to extract or transform the data before you load it. StarRocks can extract and transform the data during loading:
- Skip columns that do not need to be loaded. You can skip the columns that do not need to be loaded. Additionally, if the columns of the data file are in a different order than the columns of the StarRocks table, you can create a column mapping between the data file and the StarRocks table.
- Filter out rows you do not want to load. You can specify filter conditions based on which StarRocks filters out the rows that you do not want to load.
- Generate new columns from original columns. Generated columns are special columns that are computed from the original columns of the data file. You can map the generated columns onto the columns of the StarRocks table.
- Extract partition field values from a file path. If the data file is generated from Apache Hive™, you can extract partition field values from the file path.
Prerequisites
Broker Load
See the "Background information" section in Load data from HDFS or Load data from cloud storage.
Routine Load
If you choose Routine Load, make sure that topics are created in your Apache Kafka® cluster. Assume that you have created two topics: topic1 and topic2.
Data examples
- Create data files in your local file system.
  a. Create a data file named file1.csv. The file consists of four columns, which represent user ID, user gender, event date, and event type in sequence.
     354,female,2020-05-20,1
     465,male,2020-05-21,2
     576,female,2020-05-22,1
     687,male,2020-05-23,2
  b. Create a data file named file2.csv. The file consists of only one column, which represents date.
     2020-05-20
     2020-05-21
     2020-05-22
     2020-05-23
- Create tables in your StarRocks database test_db.
  NOTE
  Since v2.5.7, StarRocks can automatically set the number of buckets (BUCKETS) when you create a table or add a partition. You no longer need to manually set the number of buckets. For detailed information, see determine the number of buckets.
  a. Create a table named table1, which consists of three columns: event_date, event_type, and user_id.
     MySQL [test_db]> CREATE TABLE table1
     (
     `event_date` DATE COMMENT "event date",
     `event_type` TINYINT COMMENT "event type",
     `user_id` BIGINT COMMENT "user ID"
     )
     DISTRIBUTED BY HASH(user_id);
  b. Create a table named table2, which consists of four columns: date, year, month, and day.
     MySQL [test_db]> CREATE TABLE table2
     (
     `date` DATE COMMENT "date",
     `year` INT COMMENT "year",
     `month` TINYINT COMMENT "month",
     `day` TINYINT COMMENT "day"
     )
     DISTRIBUTED BY HASH(date);
- Upload file1.csv and file2.csv to the /user/starrocks/data/input/ path of your HDFS cluster, publish the data of file1.csv to topic1 of your Kafka cluster, and publish the data of file2.csv to topic2 of your Kafka cluster.
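As a minimal sketch, the two sample data files described above can be written with shell here-documents. The sketch assumes a POSIX shell and writes into the current directory. Uploading the files to HDFS and publishing them to Kafka depend on your own tooling and are not shown.

```shell
# Write file1.csv: user ID, user gender, event date, event type.
cat > file1.csv <<'EOF'
354,female,2020-05-20,1
465,male,2020-05-21,2
576,female,2020-05-22,1
687,male,2020-05-23,2
EOF

# Write file2.csv: a single date column.
cat > file2.csv <<'EOF'
2020-05-20
2020-05-21
2020-05-22
2020-05-23
EOF

# Sanity check: each file should contain four rows.
wc -l file1.csv file2.csv
```

Quoting the here-document delimiter ('EOF') keeps the shell from expanding anything inside the sample rows.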
Skip columns that do not need to be loaded
The data file that you want to load into a StarRocks table may contain some columns that cannot be mapped to any columns of the StarRocks table. In this situation, StarRocks supports loading only the columns that can be mapped from the data file onto the columns of the StarRocks table.
This feature supports loading data from the following data sources:
- Local file system
- HDFS and cloud storage
  NOTE
  This section uses HDFS as an example.
- Kafka
In most cases, the columns of a CSV file are not named. For some CSV files, the first row is composed of column names, but StarRocks processes the content of the first row as ordinary data rather than column names. Therefore, when you load a CSV file, you must temporarily name the columns of the CSV file in sequence in the job creation statement or command. These temporarily named columns are mapped by name onto the columns of the StarRocks table. Take note of the following points about the columns of the data file:
- Columns that are temporarily named after columns of the StarRocks table are mapped onto those columns, and their data is loaded directly.
- Columns that cannot be mapped onto any column of the StarRocks table are ignored, and their data is not loaded.
- If some columns can be mapped onto the columns of the StarRocks table but are not temporarily named in the job creation statement or command, the load job reports errors.
This section uses file1.csv and table1 as an example. The four columns of file1.csv are temporarily named as user_id, user_gender, event_date, and event_type in sequence. Among the temporarily named columns of file1.csv, user_id, event_date, and event_type can be mapped onto specific columns of table1, whereas user_gender cannot be mapped onto any column of table1. Therefore, user_id, event_date, and event_type are loaded into table1, but user_gender is not.
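To preview which values this mapping keeps, the following sketch mimics it locally with awk. It is only an illustration and not how StarRocks performs the load. The helper names file1_rows and preview_table1_rows are hypothetical, and the sample rows are copied from file1.csv. The second field (user_gender) is dropped, and the remaining fields are printed in the column order of table1.

```shell
# Sample rows of file1.csv: user_id,user_gender,event_date,event_type
file1_rows() {
  cat <<'EOF'
354,female,2020-05-20,1
465,male,2020-05-21,2
576,female,2020-05-22,1
687,male,2020-05-23,2
EOF
}

# Keep only the fields that have a matching column in table1 and
# print them as event_date,event_type,user_id. Field 2 is skipped.
preview_table1_rows() {
  awk -F, 'BEGIN { OFS = "," } { print $3, $4, $1 }'
}

file1_rows | preview_table1_rows
```

The preview should contain the same values as the query result for table1 shown later in this section, although the row order returned by StarRocks may differ.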
Load data
Load data from a local file system
If file1.csv is stored in your local file system, run the following command to create a Stream Load job:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns: user_id, user_gender, event_date, event_type" \
-T file1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
NOTE
If you choose Stream Load, you must use the columns parameter to temporarily name the columns of the data file to create a column mapping between the data file and the StarRocks table.
For detailed syntax and parameter descriptions, see STREAM LOAD.
Load data from an HDFS cluster
If file1.csv is stored in your HDFS cluster, execute the following statement to create a Broker Load job:
LOAD LABEL test_db.label1
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/file1.csv")
INTO TABLE `table1`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(user_id, user_gender, event_date, event_type)
)
WITH BROKER "broker1";
NOTE
If you choose Broker Load, you must use the column_list parameter to temporarily name the columns of the data file to create a column mapping between the data file and the StarRocks table.
For detailed syntax and parameter descriptions, see BROKER LOAD.
Load data from a Kafka cluster
If the data of file1.csv is published to topic1 of your Kafka cluster, execute the following statement to create a Routine Load job:
CREATE ROUTINE LOAD test_db.table101 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS(user_id, user_gender, event_date, event_type)
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic1",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
NOTE
If you choose Routine Load, you must use the COLUMNS parameter to temporarily name the columns of the data file to create a column mapping between the data file and the StarRocks table.
For detailed syntax and parameter descriptions, see CREATE ROUTINE LOAD.
Query data
After the load of data from your local file system, HDFS cluster, or Kafka cluster is complete, query the data of table1 to verify that the load is successful:
MySQL [test_db]> SELECT * FROM table1;
+------------+------------+---------+
| event_date | event_type | user_id |
+------------+------------+---------+
| 2020-05-22 |          1 |     576 |
| 2020-05-20 |          1 |     354 |
| 2020-05-21 |          2 |     465 |
| 2020-05-23 |          2 |     687 |
+------------+------------+---------+
4 rows in set (0.01 sec)
Filter out rows that you do not want to load
When you load a data file into a StarRocks table, you may not want to load specific rows of the data file. In this situation, you can use the WHERE clause to specify the rows that you want to load. StarRocks filters out all rows that do not meet the filter conditions specified in the WHERE clause.
This feature supports loading data from the following data sources:
- Local file system
- HDFS and cloud storage
  NOTE
  This section uses HDFS as an example.
- Kafka
This section uses file1.csv and table1 as an example. If you want to load only the rows whose event type is 1 from file1.csv into table1, you can use the WHERE clause to specify the filter condition event_type = 1.
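The effect of this filter can be previewed locally before you create a load job. The following awk sketch is an approximation for illustration only, not the way StarRocks evaluates the WHERE clause. The helper names file1_rows and preview_filtered_rows are hypothetical, and the sample rows are copied from file1.csv.

```shell
# Sample rows of file1.csv: user_id,user_gender,event_date,event_type
file1_rows() {
  cat <<'EOF'
354,female,2020-05-20,1
465,male,2020-05-21,2
576,female,2020-05-22,1
687,male,2020-05-23,2
EOF
}

# Keep only rows whose fourth field (event_type) equals 1, then print
# the surviving rows as event_date,event_type,user_id.
preview_filtered_rows() {
  awk -F, 'BEGIN { OFS = "," } $4 == 1 { print $3, $4, $1 }'
}

file1_rows | preview_filtered_rows
```

Only the rows for user IDs 354 and 576 pass the condition, which matches the two rows expected in table1 after the load.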
Load data
Load data from a local file system
If file1.csv is stored in your local file system, run the following command to create a Stream Load job:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns: user_id, user_gender, event_date, event_type" \
-H "where: event_type=1" \
-T file1.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table1/_stream_load
For detailed syntax and parameter descriptions, see STREAM LOAD.
Load data from an HDFS cluster
If file1.csv is stored in your HDFS cluster, execute the following statement to create a Broker Load job:
LOAD LABEL test_db.label2
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/file1.csv")
INTO TABLE `table1`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(user_id, user_gender, event_date, event_type)
WHERE event_type = 1
)
WITH BROKER "broker1";
For detailed syntax and parameter descriptions, see BROKER LOAD.
Load data from a Kafka cluster
If the data of file1.csv is published to topic1 of your Kafka cluster, execute the following statement to create a Routine Load job:
CREATE ROUTINE LOAD test_db.table102 ON table1
COLUMNS TERMINATED BY ",",
COLUMNS (user_id, user_gender, event_date, event_type)
WHERE event_type = 1
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic1",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
For detailed syntax and parameter descriptions, see CREATE ROUTINE LOAD.
Query data
After the load of data from your local file system, HDFS cluster, or Kafka cluster is complete, query the data of table1 to verify that the load is successful:
MySQL [test_db]> SELECT * FROM table1;
+------------+------------+---------+
| event_date | event_type | user_id |
+------------+------------+---------+
| 2020-05-20 |          1 |     354 |
| 2020-05-22 |          1 |     576 |
+------------+------------+---------+
2 rows in set (0.01 sec)
Generate new columns from original columns
When you load a data file into a StarRocks table, some data of the data file may require conversions before the data can be loaded into the StarRocks table. In this situation, you can use functions or expressions in the job creation command or statement to implement data conversions.
This feature supports loading data from the following data sources:
- Local file system
- HDFS and cloud storage
  NOTE
  This section uses HDFS as an example.
- Kafka
This section uses file2.csv and table2 as an example. file2.csv consists of only one column that represents date. You can use the year, month, and day functions to extract the year, month, and day in each date from file2.csv and load the extracted data into the year, month, and day columns of table2.
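The derived values can be previewed locally with a short awk sketch. It only imitates what the year, month, and day functions are expected to return for these dates and is not how StarRocks computes them. The helper names file2_rows and preview_table2_rows are hypothetical, and the sample rows are copied from file2.csv.

```shell
# Sample rows of file2.csv: a single date column.
file2_rows() {
  cat <<'EOF'
2020-05-20
2020-05-21
2020-05-22
2020-05-23
EOF
}

# Split each date on "-" and print date,year,month,day.
# Adding 0 converts each part to a number, which drops leading zeros.
preview_table2_rows() {
  awk -F, 'BEGIN { OFS = "," }
  {
    split($1, d, "-")
    print $1, d[1] + 0, d[2] + 0, d[3] + 0
  }'
}

file2_rows | preview_table2_rows
```

Each output row lists the original date followed by the three generated values, in the column order of table2.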
Load data
Load data from a local file system
If file2.csv is stored in your local file system, run the following command to create a Stream Load job:
curl --location-trusted -u <username>:<password> \
-H "Expect:100-continue" \
-H "column_separator:," \
-H "columns:date,year=year(date),month=month(date),day=day(date)" \
-T file2.csv -XPUT \
http://<fe_host>:<fe_http_port>/api/test_db/table2/_stream_load
NOTE
- In the columns parameter, you must first temporarily name all columns of the data file, and then temporarily name the new columns that you want to generate from the original columns of the data file. As shown in the preceding example, the only column of file2.csv is temporarily named as date, and then the year=year(date), month=month(date), and day=day(date) functions are invoked to generate three new columns, which are temporarily named as year, month, and day.
- Stream Load supports generated columns written in the form column_name = function(column_name), as in the preceding example.
For detailed syntax and parameter descriptions, see STREAM LOAD.
Load data from an HDFS cluster
If file2.csv is stored in your HDFS cluster, execute the following statement to create a Broker Load job:
LOAD LABEL test_db.label3
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/file2.csv")
INTO TABLE `table2`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(date)
SET(year=year(date), month=month(date), day=day(date))
)
WITH BROKER "broker1";
NOTE
You must first use the column_list parameter to temporarily name all columns of the data file, and then use the SET clause to temporarily name the new columns that you want to generate from the original columns of the data file. As shown in the preceding example, the only column of file2.csv is temporarily named as date in the column_list parameter, and then the year=year(date), month=month(date), and day=day(date) functions are invoked in the SET clause to generate three new columns, which are temporarily named as year, month, and day.
For detailed syntax and parameter descriptions, see BROKER LOAD.
Load data from a Kafka cluster
If the data of file2.csv is published to topic2 of your Kafka cluster, execute the following statement to create a Routine Load job:
CREATE ROUTINE LOAD test_db.table201 ON table2
COLUMNS TERMINATED BY ",",
COLUMNS(date,year=year(date),month=month(date),day=day(date))
FROM KAFKA
(
"kafka_broker_list" = "<kafka_broker_host>:<kafka_broker_port>",
"kafka_topic" = "topic2",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
NOTE
In the COLUMNS parameter, you must first temporarily name all columns of the data file, and then temporarily name the new columns that you want to generate from the original columns of the data file. As shown in the preceding example, the only column of file2.csv is temporarily named as date, and then the year=year(date), month=month(date), and day=day(date) functions are invoked to generate three new columns, which are temporarily named as year, month, and day.
For detailed syntax and parameter descriptions, see CREATE ROUTINE LOAD.
Query data
After the load of data from your local file system, HDFS cluster, or Kafka cluster is complete, query the data of table2 to verify that the load is successful:
MySQL [test_db]> SELECT * FROM table2;
+------------+------+-------+------+
| date       | year | month | day  |
+------------+------+-------+------+
| 2020-05-20 | 2020 |     5 |   20 |
| 2020-05-21 | 2020 |     5 |   21 |
| 2020-05-22 | 2020 |     5 |   22 |
| 2020-05-23 | 2020 |     5 |   23 |
+------------+------+-------+------+
4 rows in set (0.01 sec)
Extract partition field values from a file path
If the file path that you specify contains partition fields, you can use the COLUMNS FROM PATH AS parameter to specify the partition fields you want to extract from the file paths. The partition fields in file paths are equivalent to the columns in data files. The COLUMNS FROM PATH AS parameter is supported only when you load data from an HDFS cluster.
For example, suppose that you want to load the following four data files generated from Hive. Each file path is followed by the content of the file:
/user/starrocks/data/input/date=2020-05-20/data
1,354
/user/starrocks/data/input/date=2020-05-21/data
2,465
/user/starrocks/data/input/date=2020-05-22/data
1,576
/user/starrocks/data/input/date=2020-05-23/data
2,687
The four data files are stored in the /user/starrocks/data/input/ path of your HDFS cluster. Each of these data files is partitioned by partition field date and consists of two columns, which represent event type and user ID in sequence.
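Note that the date value comes from the directory name in the file path, not from the file content. The following POSIX shell sketch shows that kind of extraction, assuming Hive-style key=value directory names as in the paths above. partition_value is a hypothetical helper for illustration and not a StarRocks function.

```shell
# Print the value of partition field $2 found in file path $1.
# Returns a non-zero status if the path has no "/<field>=" component.
partition_value() {
  path=$1
  key=$2
  # Drop everything up to and including "/<key>=".
  rest=${path#*/"$key"=}
  # If nothing was removed, the field is not present in the path.
  [ "$rest" != "$path" ] || return 1
  # Keep only the text before the next "/".
  printf '%s\n' "${rest%%/*}"
}

partition_value "/user/starrocks/data/input/date=2020-05-20/data" date
```

Applied to the four paths above, the helper yields the dates 2020-05-20 through 2020-05-23, one per data file.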
Load data from an HDFS cluster
Execute the following statement to create a Broker Load job. The job extracts the values of the date partition field from the /user/starrocks/data/input/ file path and uses wildcards (*) to load all data files in that file path into table1:
LOAD LABEL test_db.label4
(
DATA INFILE("hdfs://<hdfs_host>:<hdfs_port>/user/starrocks/data/input/date=*/*")
INTO TABLE `table1`
FORMAT AS "csv"
COLUMNS TERMINATED BY ","
(event_type, user_id)
COLUMNS FROM PATH AS (date)
SET(event_date = date)
)
WITH BROKER "broker1";
NOTE
In the preceding example, the date partition field in the specified file path is equivalent to the event_date column of table1. Therefore, you need to use the SET clause to map the date partition field onto the event_date column. If the partition field in the specified file path has the same name as a column of the StarRocks table, you do not need to use the SET clause to create a mapping.
For detailed syntax and parameter descriptions, see BROKER LOAD.
Query data
After the load of data from your HDFS cluster is complete, query the data of table1 to verify that the load is successful:
MySQL [test_db]> SELECT * FROM table1;
+------------+------------+---------+
| event_date | event_type | user_id |
+------------+------------+---------+
| 2020-05-22 |          1 |     576 |
| 2020-05-20 |          1 |     354 |
| 2020-05-21 |          2 |     465 |
| 2020-05-23 |          2 |     687 |
+------------+------------+---------+
4 rows in set (0.01 sec)