Skip to main content

ALTER ROUTINE LOAD

ALTER ROUTINE LOAD

Description

This command is used to modify a Routine Load job that has been created. The job to be modified must be in the PAUSED state. You can run the PASUME(PAUSE ROUTINE LOAD) command to pause a loading job and then perform the ALTER ROUTINE LOAD operation on the job.

Syntax

Note: You do not need to specify the content enclosed in square brackets [].

ALTER ROUTINE LOAD FOR [db.]<job_name>

[load_properties]

[job_properties]

FROM data_source

[data_source_properties]

Parameters

  • [db.]<job_name>

The name of the job you want to modify.

  • load_properties

The properties of the data to be \imported.

Syntax:

[COLUMNS TERMINATED BY '<terminator>'],

[COLUMNS ([<column_name> [, ...] ] [, column_assignment [, ...] ] )],

[WHERE <expr>],

[PARTITION ([ <partition_name> [, ...] ])]



column_assignment:

<column_name> = column_expression
  1. Specify column separators.You can specify column separators for the CSV data you want to \import. For example, you can use commas (,) as column separators.

    COLUMNS TERMINATED BY ","

    The default separator is \t.

  2. Specify column mapping.Specify the mapping of columns in the source and destination tables, and define how derived columns are generated.Specify which columns in the source table correspond to which columns in the destination table in sequence. If you want to skip a column, you can specify a column name that does not exist. For example, the destination table has three columns k1, k2, and v1. The source table has four columns, of which the first, second, and fourth columns correspond to k2, k1, and v1. You can write the code as follows.

    COLUMNS (k2, k1, xxx, v1)

    xxx is the column that does not exist. It is used to skip the third column in the source table.Columns expressed in col_name = expr are derived columns. These columns are generated by using expr. Derived columns are usually placed after mapped columns. Although this is not a mandatory rule, StarRocks always parses mapped columns prior to derived columns. Assume that the destination table has a fourth column v2, which is generated by adding up k1 and k2. You can write the code as follows.

    COLUMNS (k2, k1, xxx, v1, v2 = k1 + k2);

    For CSV data, the number of mapped columns in COLUMNS must match the number of columns in the CSV file.

    • Mapped columns
    • Derived columns
  3. Specify filter conditions.You can specify filter conditions to filter out unwanted columns. The filter columns can be mapped columns or derived columns. For example, if you need to \import data from columns whose k1 value is greater than 100 and k2 value equals 1000, you can write the code as follows.

    WHERE k1 > 100 and k2 = 1000
  4. Specify the partitions into which you want to \import data.If no partitions are specified, data will be automatically \imported into StarRocks partitions based on the partition key values in the CSV data. Example:

    PARTITION(p1, p2, p3)
  • job_properties

The job parameters you want to modify. Currently, you can modify the following parameters:

  1. desired_concurrent_number
  2. max_error_number
  3. max_batch_interval
  4. max_batch_rows
  5. max_batch_size
  6. jsonpaths
  7. json_root
  8. strip_outer_array
  9. strict_mode
  10. timezone
  • data_source

The type of the data source. Currently, only Kafka data source is supported.

  • data_source_properties

The properties of the data source. The following properties are supported:

  1. kafka_partitionsYou can only modify Kafka partitions that have been consumed.
  2. kafka_offsetsYou can only modify partition offsets that have not been consumed.
  3. Custom properties such as property.group.id and property.group.id

You can only specify Kafka partitions that have been consumed in kafka_partitions. You can only specify partition offsets that have not been consumed in kafka_offsets.

Examples

Example 1: Change the value of desired_concurrent_number to 1. This parameter specifies the parallelism of jobs used to consume Kafka data.

ALTER ROUTINE LOAD FOR db1.label1

PROPERTIES

(

"desired_concurrent_number" = "1"

);

Example 2: Change the value of desired_concurrent_number to 10 and modify the partition offset and group ID.

ALTER ROUTINE LOAD FOR db1.label1

PROPERTIES

(

"desired_concurrent_number" = "10"

)

FROM kafka

(

"kafka_partitions" = "0, 1, 2",

"kafka_offsets" = "100, 200, 100",

"property.group.id" = "new_group"

);

Example 2: Change the filter condition to a > 0 and set the destination partition to p1.

ALTER ROUTINE LOAD FOR db1.label1

WHERE a > 0

PARTITION (p1)