メインコンテンツまでスキップ

Java UDFs

Java UDFs

v2.2.0以降、Javaプログラミング言語を使用して特定のビジネスニーズに合わせたユーザ定義関数(UDF)をコンパイルできます。

v3.0以降、StarRocksはグローバルUDFをサポートしており、関連するSQLステートメント(CREATE/SHOW/DROP)に GLOBAL キーワードを含めるだけで使用できます。

このトピックでは、さまざまなUDFの開発と使用方法について説明します。

現在、StarRocksではスカラUDF、ユーザ定義集約関数(UDAF)、ユーザ定義ウィンドウ関数(UDWF)、およびユーザ定義テーブル関数(UDTF)をサポートしています。

前提条件

  • MavenをインストールしてJavaプロジェクトを作成およびコンパイルできるようにしてください。
  • サーバにJDK 1.8をインストールしてください。
  • Java UDF機能が有効になっていることを確認してください。FE構成ファイル fe/conf/fe.confenable_udf オプションを true に設定し、FEノードを再起動して設定を有効にします。詳細については、「パラメータ設定」を参照してください。

UDFの開発と使用

Javaプログラミング言語を使用して必要なUDFを作成し、Mavenプロジェクトを作成してコンパイルする必要があります。

ステップ1: Mavenプロジェクトの作成

以下のような基本的なディレクトリ構造を持つMavenプロジェクトを作成してください。

project
|--pom.xml
|--src
| |--main
| | |--java
| | |--resources
| |--test
|--target

ステップ2: 依存関係の追加

以下の依存関係を pom.xml ファイルに追加してください。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>udf</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.10</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>

ステップ3: UDFのコンパイル

Javaプログラミング言語を使用してUDFをコンパイルします。

スカラUDFのコンパイル

スカラUDFは、1つのデータ行に対して処理され、1つの値を返します。クエリでスカラUDFを使用する場合、各行は結果セット内の単一の値に対応します。典型的なスカラ関数には UPPERLOWERROUNDABS などがあります。

例えば、JSONデータのフィールドの値がJSONオブジェクトではなくJSON文字列である場合、JSON文字列を抽出するために GET_JSON_STRING を2回実行する必要があります。例: GET_JSON_STRING(GET_JSON_STRING('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key"), "$.k0")

SQLステートメントを簡素化するために、直接JSON文字列を抽出するスカラUDFをコンパイルすることができます。例: MY_UDF_JSON_GET('{"key":"{\\"k0\\":\\"v0\\"}"}', "$.key.k0")

package com.starrocks.udf.sample;

import com.alibaba.fastjson.JSONPath;

public class UDFJsonGet {
public final String evaluate(String obj, String key) {
if (obj == null || key == null) return null;
try {
return JSONPath.read(obj, key).toString();
} catch (Exception e) {
return null;
}
}
}

このユーザ定義クラスは、以下の表に説明されているメソッドを実装する必要があります。

注意

メソッドのリクエストパラメータと戻り値のデータ型は、ステップ6で実行する CREATE FUNCTION ステートメントで宣言されたものと同じである必要があり、このトピックの「SQLデータ型とJavaデータ型のマッピング」セクションで提供されるマッピングに準拠している必要があります。

メソッド

説明

TYPE1 evaluate(TYPE2, ...)

UDFを実行します。evaluate() メソッドには public メンバーアクセスレベルが必要です。

UDAFのコンパイル

UDAFは、複数のデータ行に対して操作を行い、1つの値を返します。典型的な集約関数には、SUMCOUNTMAXMIN などがあります。これらの関数は、GROUP BY節で指定された複数のデータ行を集約し、1つの値を返します。

例えば、MY_SUM_INT という名前のUDAFをコンパイルしたいとします。組み込みの集約関数 SUM とは異なり、MY_SUM_INT 関数は BIGINT 型の値を返す代わりに、INT の要求パラメータと戻り値をサポートします。

package com.starrocks.udf.sample;

public class SumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
}

public State create() {
return new State();
}

public void destroy(State state) {
}

public final void update(State state, Integer val) {
if (val != null) {
state.counter+= val;
}
}

public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}

public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}

public Integer finalize(State state) {
return state.counter;
}
}

このユーザ定義クラスは、以下の表に説明されているメソッドを実装する必要があります。

注意

メソッドのリクエストパラメータと戻り値のデータ型は、ステップ6で実行する CREATE FUNCTION ステートメントで宣言されたものと同じである必要があり、このトピックの「SQLデータ型とJavaデータ型のマッピング」セクションで提供されるマッピングに準拠している必要があります。

メソッド

説明

State create()

状態を作成します。

void destroy(State)

状態を破棄します。

void update(State, ...)

状態を更新します。 State 以外にも、UDF宣言で1つ以上のリクエストパラメータを指定できます。

void serialize(State, ByteBuffer)

状態をバイトバッファにシリアライズします。

void merge(State, ByteBuffer)

バイトバッファから状態をデシリアライズしてマージします。

TYPE finalize(State)

状態からUDFの最終結果を取得します。

コンパイル時には、以下の表に説明されているバッファクラス java.nio.ByteBufferserializeLength ローカル変数を使用する必要があります。

クラスとローカル変数

説明

java.nio.ByteBuffer()

バッファクラスで、中間結果を格納します。中間結果はノード間で実行される際にシリアライズまたはデシリアライズされる場合があります。したがって、中間結果のデシリアライズの長さを許容するために、serializeLength 変数を使用する必要もあります。

serializeLength()

中間結果のデシリアライズの長さを指定します。単位: バイト。このローカル変数には INT 型の値を設定します。例えば、State { int counter = 0; public int serializeLength() { return 4; }} は、中間結果が INT データ型であり、デシリアライズの長さが 4 バイトであることを指定します。これらの設定は、ビジネス要件に基づいて調整することができます。例えば、中間結果のデータ型を LONG として、デシリアライズの長さを 8 バイトとする場合は、State { long counter = 0; public int serializeLength() { return 8; }} を指定します。

java.nio.ByteBuffer クラスに格納された中間結果のデシリアライズに関しては、以下の点に注意してください:

  • ByteBufferクラスに依存する remaining() メソッドは、状態をデシリアライズする際に呼び出すことはできません。
  • ByteBufferクラスには clear() メソッドを呼び出すことはできません。
  • serializeLength の値は、書き込まれたデータの長さと同じである必要があります。そうでない場合、シリアライズとデシリアライズの過程で間違った結果が生成されます。

UDWFのコンパイル

通常の集約関数とは異なり、UDWFは複数のデータ行(ウィンドウと呼ばれる)に対して操作を行い、各行に対して値を返します。ウィンドウ関数は、複数のセットにデータ行を区切る OVER 句を含みます。これにより、各セットのデータ行に対して計算を行い、各行に対して値を返します。

例えば、MY_WINDOW_SUM_INT という名前のUDWFをコンパイルしたいとします。組み込みの集約関数 SUM とは異なり、MY_WINDOW_SUM_INT 関数は BIGINT 型の値を返す代わりに、INT の要求パラメータと戻り値をサポートします。

package com.starrocks.udf.sample;

public class WindowSumInt {
public static class State {
int counter = 0;
public int serializeLength() { return 4; }
@Override
public String toString() {
return "State{" +
"counter=" + counter +
'}';
}
}

public State create() {
return new State();
}

public void destroy(State state) {

}

public void update(State state, Integer val) {
if (val != null) {
state.counter+=val;
}
}

public void serialize(State state, java.nio.ByteBuffer buff) {
buff.putInt(state.counter);
}

public void merge(State state, java.nio.ByteBuffer buffer) {
int val = buffer.getInt();
state.counter += val;
}

public Integer finalize(State state) {
return state.counter;
}

public void reset(State state) {
state.counter = 0;
}

public void windowUpdate(State state,
int peer_group_start, int peer_group_end,
int frame_start, int frame_end,
Integer[] inputs) {
for (int i = (int)frame_start; i < (int)frame_end; ++i) {
state.counter += inputs[i];
}
}
}

このユーザ定義クラスは、UDAFに必要なメソッド(UDWFは特別な集約関数です)と、以下の表に説明されている windowUpdate() メソッドを実装する必要があります。

注意

メソッドのリクエストパラメータと戻り値のデータ型は、ステップ6で実行する CREATE FUNCTION ステートメントで宣言されたものと同じである必要があり、このトピックの「SQLデータ型とJavaデータ型のマッピング」セクションで提供されるマッピングに準拠している必要があります。

メソッド

説明

void windowUpdate(State, int, int, int , int, ...)

ウィンドウのデータを更新します。UDWFについての詳細は、「 ウィンドウ関数 」を参照してください。このメソッドは、入力として行を受け入れるたびにウィンドウ情報を取得し、それに応じて中間結果を更新します。

  • peer_group_start: 現在のパーティションの開始位置。OVER句で PARTITION BY が使用され、パーティション列で同じ値を持つ行は同じパーティションと見なされます。
  • peer_group_end: 現在のパーティションの終了位置。
  • frame_start: 現在のウィンドウフレームの開始位置。ウィンドウフレーム句で指定された計算範囲で、現在の行とその前後の指定された距離内の行をカバーします。例: ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING は、現在の行、現在の行の前の行、および現在の行の後の行をカバーする計算範囲を指定します。
  • frame_end: 現在のウィンドウフレームの終了位置。
  • inputs: ウィンドウへの入力データ。データは特定のデータ型の配列パッケージをサポートしています。この例では、INT 値が入力として与えられ、配列パッケージは Integer[] です。

UDTFのコンパイル

UDTFは1行のデータを読み込み、複数の値を返し、テーブルと見なすことができます。通常、テーブル値関数は、行を列に変換するために使用されます。

注意

StarRocksは、複数行および 1 列で構成されるテーブルを返すUDTFを許可しています。

例えば、MY_UDF_SPLIT という名前のUDTFをコンパイルしたいとします。MY_UDF_SPLIT 関数はスペースをデリミタとして使用し、STRING データ型の要求パラメータと戻り値をサポートします。

package com.starrocks.udf.sample;

public class UDFSplit{
public String[] process(String in) {
if (in == null) return null;
return in.split(" ");
}
}

ユーザ定義クラスで定義されているメソッドは、以下の要件を満たす必要があります。

注意

メソッドのリクエストパラメータと戻り値のデータ型は、ステップ6で実行する CREATE FUNCTION ステートメントで宣言されたものと同じである必要があり、このトピックの「SQLデータ型とJavaデータ型のマッピング」セクションで提供されるマッピングに準拠している必要があります。

メソッド

説明

TYPE[] process()

UDTFを実行し、配列を返します。

ステップ4: Javaプロジェクトのパッケージング

次のコマンドを実行してJavaプロジェクトをパッケージ化してください。

mvn package

target フォルダに以下のJARファイルが生成されます: udf-1.0-SNAPSHOT.jar および udf-1.0-SNAPSHOT-jar-with-dependencies.jar

ステップ5: Javaプロジェクトのアップロード

HTTPサーバにJARファイル udf-1.0-SNAPSHOT-jar-with-dependencies.jar をアップロードし、StarRocksクラスタのすべてのFEおよびBEからアクセスできるようにしてください。次に、次のコマンドを実行してファイルをデプロイします。

mvn deploy

Pythonを使用して簡単なHTTPサーバを設定し、それにJARファイルをアップロードすることもできます。

注意

ステップ6で、FEはUDFのコードを含むJARファイルをチェックしチェックサムを計算し、BEはJARファイルをダウンロードして実行します。

ステップ6: StarRocksでUDFを作成

StarRocksでは、データベース名前空間およびグローバル名前空間の2つの名前空間にUDFを作成できます。

  • UDFに対して表示性能または分離性要件がない場合、グローバルUDFとして作成できます。その後、関数名にカタログおよびデータベース名の接頭辞を含めることなく関数名を使用してグローバルUDFを参照できます。
  • UDFに対して表示性能または分離性要件がある場合、または同じUDFを異なるデータベースで作成する必要がある場合、各個別のデータベースにUDFを作成できます。そのため、セッションが対象のデータベースに接続されている場合は、関数名を使用してUDFを参照できます。セッションが対象のデータベース以外のカタログまたはデータベースに接続されている場合は、関数名にカタログおよびデータベース名の接頭辞を含める必要があります。例: catalog.database.function

注記

グローバルUDFを作成および使用する場合は、システム管理者に必要な権限設定を依頼する必要があります。詳細については、「GRANT」を参照してください。

JARパッケージをアップロードした後、StarRocksでUDFを作成できます。グローバルUDFの場合、作成ステートメントに GLOBAL キーワードを含める必要があります。

構文

CREATE [GLOBAL][AGGREGATE | TABLE] FUNCTION function_name
(arg_type [, ...])
RETURNS return_type
PROPERTIES ("key" = "value" [, ...])

パラメータ

パラメータ

必須

説明

GLOBAL

なし

グローバルUDFを作成するかどうか。v3.0以降でサポートされます。

AGGREGATE

なし

UDAFまたはUDWFを作成するかどうか。

TABLE

なし

UDTFを作成するかどうか。AGGREGATE および TABLE が指定されない場合、スカラ関数が作成されます。

function_name

必須

作成する関数の名前。このパラメータにはデータベース名も含めることができます。例:db1.my_func を指定した場合、UDFはそのデータベースに作成されます。それ以外の場合は、UDFは現在のデータベースに作成されます。新しい関数とそのパラメータの名前は、宛先データベース内の既存の名前と同じではない必要があります。それ以外の場合、関数は作成できません。関数名が同じでも、パラメータが異なる場合は作成が成功します。

arg_type

必須

関数の引数の型。追加の引数は , ... として表されることができます。サポートされているデータ型については、「 SQLデータ型とJavaデータ型のマッピング 」を参照してください。

return_type

必須

関数の戻り値の型。サポートされているデータ型については、「Java UDF」を参照してください。

PROPERTIES

必須

関数のプロパティ。作成するUDFのタイプによって異なります。

スカラUDFの作成

以下のコマンドを実行して、前述の例でコンパイルしたスカラUDFを作成します。

CREATE [GLOBAL] FUNCTION MY_UDF_JSON_GET(string, string) 
RETURNS string
properties (
"symbol" = "com.starrocks.udf.sample.UDFJsonGet",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

パラメータ

説明

symbol

UDFが所属するMavenプロジェクトのクラスの名前。このパラメータの値は <package_name>.<class_name> の形式で指定します。

type

UDFのタイプ。 StarrocksJar という値を設定します。これにより、UDFがJavaベースの関数であることが指定されます。

file

UDFのコードを含むJARファイルをダウンロードできるHTTPのURL。このパラメータの値は http://<http_server_ip>:<http_server_port>/<jar_package_name> の形式で指定します。

UDAFの作成

以下のコマンドを実行して、前述の例でコンパイルしたUDAFを作成します。

CREATE [GLOBAL] AGGREGATE FUNCTION MY_SUM_INT(INT) 
RETURNS INT
PROPERTIES
(
"symbol" = "com.starrocks.udf.sample.SumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

PROPERTIES内のパラメータの説明は、「スカラUDFの作成」と同じです。

UDWFの作成

以下のコマンドを実行して、前述の例でコンパイルしたUDWFを作成します。

CREATE [GLOBAL] AGGREGATE FUNCTION MY_WINDOW_SUM_INT(Int)
RETURNS Int
properties
(
"analytic" = "true",
"symbol" = "com.starrocks.udf.sample.WindowSumInt",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

analytic:UDFがウィンドウ関数であるかどうか。値を true に設定します。その他のプロパティの説明は、「スカラUDFの作成」と同じです。

UDTFの作成

以下のコマンドを実行して、前述の例でコンパイルしたUDTFを作成します。

CREATE [GLOBAL] TABLE FUNCTION MY_UDF_SPLIT(string)
RETURNS string
properties
(
"symbol" = "com.starrocks.udf.sample.UDFSplit",
"type" = "StarrocksJar",
"file" = "http://http_host:http_port/udf-1.0-SNAPSHOT-jar-with-dependencies.jar"
);

PROPERTIES内のパラメータの説明は、「スカラUDFの作成」と同じです。

ステップ7: UDFの使用

UDFを作成した後、ビジネスニーズに基づいてテストおよび使用することができます。

スカラUDFの使用

以下のコマンドを実行して、前述の例で作成したスカラUDFを使用します。

SELECT MY_UDF_JSON_GET('{"key":"{\\"in\\":2}"}', '$.key.in');

UDAFの使用

以下のコマンドを実行して、前述の例で作成したUDAFを使用します。

SELECT MY_SUM_INT(col1);

UDWFの使用

以下のコマンドを実行して、前述の例で作成したUDWFを使用します。

SELECT MY_WINDOW_SUM_INT(intcol) 
OVER (PARTITION BY intcol2
ORDER BY intcol3
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
FROM test_basic;

UDTFの使用

以下のコマンドを実行して、前述の例で作成したUDTFを使用します。

-- t1という名前のテーブルがあり、a、b、c1の列の情報は次のようになっているものとします:
SELECT t1.a,t1.b,t1.c1 FROM t1;
> 出力:
1,2.1,"hello world"
2,2.2,"hello UDTF."

-- MY_UDF_SPLIT() 関数を実行します。
SELECT t1.a,t1.b, MY_UDF_SPLIT FROM t1, MY_UDF_SPLIT(t1.c1);
> 出力:
1,2.1,"hello"
1,2.1,"world"
2,2.2,"hello"
2,2.2,"UDTF."

注意

  • 上記のコードスニペットの最初の MY_UDF_SPLIT は、2番目の関数から返されるカラムの別名です。2番目の関数は関数です。
  • 返されるテーブルとその列の別名を指定するためには、AS t2(f1) のような構文を使用できません。

UDFの表示

以下のコマンドを実行してUDFをクエリできます。

SHOW [GLOBAL] FUNCTIONS;

詳細については、「SHOW FUNCTIONS」を参照してください。

UDFの削除

以下のコマンドを実行してUDFを削除できます。

DROP [GLOBAL] FUNCTION <function_name>(arg_type [, ...]);

詳細については、「DROP FUNCTION」を参照してください。

SQLデータ型とJavaデータ型のマッピング

SQLデータ型

Javaデータ型

BOOLEAN

java.lang.Boolean

TINYINT

java.lang.Byte

SMALLINT

java.lang.Short

INT

java.lang.Integer

BIGINT

java.lang.Long

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

STRING/VARCHAR

java.lang.String

パラメータ設定

メモリ使用量を制御するために、StarRocksクラスタ内の各Java仮想マシン(JVM)の be/conf/hadoop_env.sh ファイルに以下の環境変数を設定できます。また、他のパラメータもファイルで設定することができます。

export LIBHDFS_OPTS="-Xloggc:$STARROCKS_HOME/log/be.gc.log -server"

FAQ

UDFの作成時に静的変数を使用できますか?さまざまなUDFの静的変数は相互に影響しますか?

はい、UDFのコンパイル時に静的変数を使用できます。異なるUDFの静的変数は互いに影響しません。クラス名が同じであっても、UDFが異なるクラスに属している場合でも、静的変数は互いに影響しません。