ClickHouse 最早如果要实现 UDF 需要修改源码通过 C++ 代码实现,不过这种方式难度比较大而且需要重新编译,对于实现逻辑简单或者性能要求不太高的任务可以借助于内置的 SQL UDF 或者外部可执行文件来实现,这种方式实现起来比较快,很多情况下其实都可以满足需要。
主要方法有如下两种:
- 通过
CREATE FUNCTION
SQL 语句来定义 UDF,参考文档:https://clickhouse.com/docs/en/sql-reference/statements/create/function - 通过外部可执行文件来定义 UDF,参考文档:https://clickhouse.com/docs/sql-reference/functions/udf
不过目前第二种方法只能在 ClickHouse Cloud 中才可用,属于付费的功能,但是在 22 或 23 版本的 ClickHouse 中这部分功能是直接可用的。
下面分别看一下这两种 UDF 的简单用法。
1. 使用 SQL 自定义 UDF
这种方式是最快捷的实现 UDF 的方式,好处是可以借助于 ClickHouse 现有的内置函数,通过组合实现需求,对于一些逻辑比较简单的函数还是比较方便的,比如下面的函数:
CREATE FUNCTION linear_equation AS (x, k, b) -> k*x + b;
其中函数名为 linear_equation
,参数为 x,k,b
这 3 个,后面是要返回的表达式。
定义之后使用如下:
SELECT number, linear_equation(number, 2, 1) FROM numbers(3);
对于更复杂一些的,还可以在函数中添加判断逻辑,比如:
CREATE FUNCTION resolve_block AS v -> if(length(v) >= 11, substring(v, 2, 10), '')
这样就可以实现根据条件判断返回不同的结果了,比如我们这里输入是字符串,如果满足长度我们就返回其中的一部分,否则就返回空,测试用法如下:
SELECT resolve_block('1lookingfor.')
返回结果如下:
┌─if(greaterOrEquals(length('1lookingfor.'), 11), substring('1lookingfor.', 2, 10), '')─┐
│ lookingfor │
└───────────────────────────────────────────────────────────────────────────────────────┘
函数被创建后默认会被保存到本地数据目录下的 user_defined
目录下的 SQL 文件中,因此函数是持久的,重启后仍然可以使用。
另外创建后的函数只能在当前机器才可以使用,因为其他机器并没有定义当前的函数,所以如果想在所有节点都可以使用需要每个节点都创建一次。虽然函数只能在当前创建的节点使用,但是查询分布式表没有影响,因为我们看上面的返回其实就是拿我们的表达式去替换了 SQL 中的函数,返回的列仍然是一个复杂的表达式,所以如果是查询的分布式表,那么 SQL 首先会解析为具体的表达式,然后再发送到其他节点执行,那么就算其他节点没有定义这个函数,执行也是不受任何影响的。
2. 使用外部可执行文件定义 UDF
对于稍微复杂点的函数可以通过外部可执行文件的方式来实现,ClickHouse 会通过配置文件定义好的入参格式将参数通过标准输入管道传入可执行文件,然后可执行文件必须将结果从标准输出中返回,这样 ClickHouse 就可以拿到返回并应用到 SQL 执行过程中,这样就完成了调用外部可执行文件实现 UDF 的过程。
官方文档已经给出了 Python 实现 UDF 的过程,这里我们再以 Go 为例实现一个自定义函数的示例,实现的主要代码如下:
package main
import (
"bufio"
"fmt"
"io"
"os"
"strconv"
"strings"
"github.com/tenfyzhong/cityhash"
)
func generateRowId(userId uint32, requestId string) uint64 {
hash32 := cityhash.CityHash32([]byte(requestId))
return uint64(userId)*10000000000 + uint64(hash32)
}
func main() {
reader := bufio.NewReader(os.Stdin)
writer := bufio.NewWriter(os.Stdout)
for {
line, err := reader.ReadString('\n')
if err != nil {
if err == io.EOF {
break
}
fmt.Println("err:", err)
break
}
inputArgs := strings.Split(strings.TrimRight(line, "\n"), "\t")
if len(inputArgs) != 2 {
fmt.Println("input err:", inputArgs)
break
}
userId, err := strconv.ParseUint(inputArgs[0], 10, 64)
if err != nil {
fmt.Println("userId err:", err)
break
}
requestId := inputArgs[1]
if _, err := writer.WriteString(strconv.FormatUint(generateRowId(uint32(userId), requestId), 10) + "\n"); err != nil {
fmt.Println("Write err:", err)
break
}
if err := writer.Flush(); err != nil {
fmt.Println("Flush err:", err)
break
}
}
}
这里的函数其实很简单,就是实现通过用户 ID 和请求 ID 计算生成记录 ID,入参是两个,返回值是 UInt64 类型。
对于输入来说如果是多条记录我们按照 \n
区分换行,这是标准输入通用的规范,ClickHouse 也是按照这种方式给我们写入的。那么对于同一条记录而言,这里我们选择通过 Tab 方式分割参数,具体的格式在 ClickHouse 这里是可配置的,两边要对应起来。
最后我们返回的结果也要按照 \n
为分割一一对应返回,并且每条结果要立即刷新缓冲,防止 ClickHouse 的等待。
在程序运行中出现任何错误,都会输出错误并退出,这样 ClickHouse 解析结果也会失败,可以直接将错误提示出来,SQL 执行也会中断。
写好代码之后我们编译成可执行文件并且放到 ClickHouse 数据目录下的 user_scripts
下,这样后面 ClickHouse 会自动去这个目录下查找并且调用,然后我们在配置文件目录下创建新的文件 generate_row_id_function.xml
,文件内容如下:
<functions>
<function>
<type>executable</type>
<name>generate_row_id</name>
<return_type>UInt64</return_type>
<argument>
<type>UInt32</type>
<name>user_id</name>
</argument>
<argument>
<type>String</type>
<name>request_id</name>
</argument>
<format>TabSeparated</format>
<command>go-clickhouse-udf</command>
</function>
</functions>
其中 name
定义函数名称,return_type
定义返回的类型,argument
定义参数类型和名称,多个参数依次往下写即可。format
标签定义标准输入的参数拼接格式,这里采用 Tab 分割的方式。最后 command
就是我们调用可执行文件的命令了。
需要注意函数配置文件的定义要以 _function.xml
结尾,因为在主配置文件中定义的引用外部函数文件的配置像下面这样:
<user_defined_executable_functions_config>*_function.xml</user_defined_executable_functions_config>
上面的配置保存后即可生效,然后就可以用于 SQL 查询了。
如果要查询分布式表的话必须在每个节点都防止可执行文件并且创建上面的配置,否则执行时会报错函数找不到,因为这个和 SQL UDF 不同,函数的逻辑是封装在外部的,ClickHouse 对细节不了解,所以为了防止出现问题正常配置时我们所有节点都要配置一遍。