<acronym id="s8ci2"><small id="s8ci2"></small></acronym>
<rt id="s8ci2"></rt><rt id="s8ci2"><optgroup id="s8ci2"></optgroup></rt>
<acronym id="s8ci2"></acronym>
<acronym id="s8ci2"><center id="s8ci2"></center></acronym>
0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

如何用Rust過程宏魔法簡化SQL函數呢?

jf_wN0SrCdH ? 來源:RisingWave 社區 ? 2024-01-23 09:43 ? 次閱讀

背景介紹

#[function("length(varchar)->int4")]
pubfnchar_length(s:&str)->i32{
s.chars().count()asi32
}

這是 RisingWave 中一個 SQL 函數的實現。只需短短幾行代碼,通過在 Rust 函數上加一行過程宏,我們就把它包裝成了一個 SQL 函數。

dev=>selectlength('RisingWave');
length
--------
11
(1row)

類似的,除了標量函數(Scalar Function),表函數(Table Function)和聚合函數(Aggregate Function)也可以用這樣的方法定義。我們甚至可以利用泛型來同時定義多種類型的重載函數:

#[function("generate_series(int4,int4)->setofint4")]
#[function("generate_series(int8,int8)->setofint8")]
fngenerate_series(start:T,stop:T)->implIterator{
start..=stop
}

#[aggregate("max(int2)->int2",state="ref")]
#[aggregate("max(int4)->int4",state="ref")]
#[aggregate("max(int8)->int8",state="ref")]
fnmax(state:T,input:T)->T{
state.max(input)
}
dev=>selectgenerate_series(1,3);
generate_series
-----------------
1
2
3
(3rows)

dev=>selectmax(x)fromgenerate_series(1,3)t(x);
max
-----
3
(1row)

利用 Rust 過程宏,我們將函數實現背后的瑣碎細節隱藏起來,向開發者暴露一個干凈簡潔的接口。這樣我們便能夠專注于函數本身邏輯的實現,從而大幅提高開發和維護的效率。

而當一個接口足夠簡單,簡單到連 ChatGPT 都可以理解時,讓 AI 幫我們寫代碼就不再是天方夜譚了。(警告:AI 會自信地寫出 Bug,使用前需要人工 review)

ab4cffee-b938-11ee-8b88-92fbcf53809c.png

ab53644c-b938-11ee-8b88-92fbcf53809c.png

向 GPT 展示一個 SQL 函數實現的例子,然后給出一個新函數的文檔,讓他生成完整的 Rust 實現代碼。

在本文中,我們將深度解析 RisingWave 中 #[function] 過程宏的設計目標和工作原理。通過回答以下幾個問題揭開過程宏的魔法面紗:

函數執行的過程是怎樣的?

為什么選擇使用過程宏實現?

這個宏是如何展開的?生成了怎樣的代碼?

利用過程宏還能實現哪些高級需求?

1向量化計算模型

RisingWave 是一個支持 SQL 語言的流處理引擎。在內部處理數據時,它使用基于列式內存存儲的向量化計算模型。在這種模型下,一個表(Table)的數據按列分割,每一列的數據連續存儲在一個數組(Array)中。為了便于理解,本文中我們采用列式內存的行業標準 Apache Arrow 格式作為示例。下圖是其中一批數據(RecordBatch)的內存結構,RisingWave 的列存結構與之大同小異。

ab5d2b76-b938-11ee-8b88-92fbcf53809c.png

列式內存存儲的數據結構

在函數求值時,我們首先把每個輸入參數對應的數據列合并成一個 RecordBatch,然后依次讀取每一行的數據,作為參數調用函數,最后將函數返回值壓縮成一個數組,作為最終返回結果。這種一次處理一批數據的方式就是向量化計算。

ab68ae60-b938-11ee-8b88-92fbcf53809c.png

函數的向量化求值 之所以要這么折騰一圈做列式存儲、向量化求值,本質上還是因為批處理能夠均攤掉控制邏輯的開銷,并充分利用現代 CPU 中的緩存局部性和 SIMD 指令等特性,實現更高的訪存和計算性能。

我們將上述函數求值過程抽象成一個 Rust trait,大概長這樣:

pubtraitScalarFunction{
///Callthefunctiononeachrowandreturnresultsasanarray.
fneval(&self,input:&RecordBatch)->Result;
}

在實際查詢中,多個函數嵌套組合成一個表達式。例如表達式 a + b - c等價于 sub(add(a, b), c)。對表達式求值就相當于遞歸地對多個函數進行求值。這個表達式本身也可以看作一個函數,同樣適用上面的 trait。因此本文中我們不區分表達式和標量函數。

2表達式執行的黑白魔法:類型體操 vs 代碼生成

接下來我們討論在 Rust 語言中如何具體實現表達式向量化求值。

2.1 我們要實現什么

回顧上一節中提到的求值過程,寫成代碼的整體結構是這樣的:

//首先定義好對每行數據的求值函數
fnadd(a:i32,b:i32)->i32{
a+b
}

//對于每一種函數,我們需要定義一個struct
structAdd;

//并為之實現ScalarFunctiontrait
implScalarFunctionforAdd{
//在此方法中實現向量化批處理
fneval(&self,input:&RecordBatch)->Result{
//我們拿到一個RecordBatch,里面包含了若干列,每一列對應一個輸入參數
//此時我們拿到的列是Arc,也就是一個**類型擦除**的數組
leta0:Arc=input.columns(0);
leta1:Arc=input.columns(1);

//我們可以獲取每一列的數據類型,并驗證它符合函數的要求
ensure!(a0.data_type()==DataType::Int32);
ensure!(a1.data_type()==DataType::Int32);

//然后將它們downcast到具體的數組類型
leta0:&Int32Array=a0.as_any().downcast_ref().context("typemismatch")?;
leta1:&Int32Array=a1.as_any().downcast_ref().context("typemismatch")?;

//在求值前,我們還需要準備好一個arraybuilder存儲返回值
letmutbuilder=Int32Builder::with_capacity(input.num_rows());

//此時我們就可以通過.iter()來遍歷具體的元素了
for(v0,v1)ina0.iter().zip(a1.iter()){
//這里我們拿到的v0和v1是Option類型
//對于add函數來說
letres=match(v0,v1){
//只有當所有輸入都非空時,函數才會被計算
(Some(v0),Some(v1))=>Some(add(v0,v1)),
//而任何一個輸入為空會導致輸出也為空
_=>None,
};
//最后將結果存入arraybuilder
builder.append_option(res);
}
//返回結果array
Ok(Arc::new(builder.finish()))
}
}

我們發現,這個函數本體的邏輯只需要短短一個 fn 就可以描述:

fnadd(a:i32,b:i32)->i32{
a+b
}

然而,為了支持在列存上進行向量化計算,還需要實現后面這一大段樣板代碼來處理瑣碎邏輯。有什么辦法能自動生成這坨代碼呢?

2.2 類型體操

著名數據庫專家遲先生曾在博文「數據庫表達式執行的黑魔法:用 Rust 做類型體操[1]」中討論了各種可能的解決方法,包括:

基于 trait 的泛型

聲明宏

過程宏

外部代碼生成器

并且系統性地闡述了它們的關系和工程實現中的利弊:

ab70417a-b938-11ee-8b88-92fbcf53809c.png

從方法論的角度來講,一旦開發者在某個需要使用泛型的地方使用了宏展開,調用它的代碼就不可能再通過 trait-based generics 使用這段代碼。從這個角度來說,越是“大道至簡”的生成代碼,越難維護。但反過來說,如果要完全實現 trait-based generics,往往要和編譯器斗智斗勇,就算是通過編譯也需要花掉大量的時間。

我們首先來看基于 trait 泛型的解決方案。在 arrow-rs 中有一個名為 binary[2] 的 kernel 就是做這個的:給定一個二元標量函數,將其應用于兩個 array 進行向量化計算,并生成一個新的 array。它的函數簽名如下:

pubfnbinary(
a:&PrimitiveArray,
b:&PrimitiveArray,
op:F
)->Result,ArrowError>
where
A:ArrowPrimitiveType,
B:ArrowPrimitiveType,
O:ArrowPrimitiveType,
F:Fn(::Native,::Native)->::Native,

相信你已經開始感受到「類型體操」的味道了。盡管如此,它依然有以下這些局限:

支持的類型僅限于 PrimitiveArray ,也就是 int, float, decimal 等基礎類型。對于復雜類型,如 bytes, string, list, struct,因為沒有統一到一個 trait 下,所以每種都需要一個新的函數。

僅適用于兩個參數的函數。對于一個或更多參數,每一種都需要這樣一個函數。arrow-rs 中也只內置了 unary 和 binary 兩種 kernel。

僅適用于一種標量函數簽名,即不出錯的、不接受空值的函數??紤]其它各種可能的情況下,需要有不同的 F 定義:

fnadd(i32,i32)->i32;
fnchecked_add(i32,i32)->Result;
fnoptional_add(i32,Option)->Option;

如果考慮以上三種因素的結合,那么可能的組合無窮盡也,不可能覆蓋所有的函數類型。

2.3 類型體操 + 聲明宏

在文章《類型體操》及 RisingWave 的初版實現中,作者使用 泛型 + 聲明宏 的方法部分解決了以上問題:

1. 首先設計一套精妙的類型系統,將全部類型統一到一個 trait 下,解決了第一個問題。 ab88452c-b938-11ee-8b88-92fbcf53809c.png

2. 然后,使用聲明宏來生成多種類型的 kernel 函數。覆蓋常見的 1、2、3 個參數,以及 T 和 Option 的輸入輸出組合。生成了常用的 unary binary ternary unary_nullable unary_bytes 等 kernel,部分解決了第二三個問題。(具體實現參見 RisingWave 早期代碼[3])當然,這里理論上也可以繼續使用類型體操。例如,引入 trait 統一 (A,) (A, B) (A, B, C) ,用 Into, AsRef trait 統一 T, Option, Result等。只不過,大概率迎接我們的是 rustc 帶來的一點小小的類型震撼:)

3. 最后,這些 kernel 沒有解決類型動態 downcast 的問題。為此,作者又利用聲明宏設計了一套精妙的宏套宏機制來實現動態派發。

macro_rules!for_all_cmp_combinations{
($macro:tt$(,$x:tt)*)=>{
$macro!{
[$($x),*],
//comparisonacrossintegertypes
{int16,int32,int32},
{int32,int16,int32},
{int16,int64,int64},
//...

盡管解決了一些問題,但這套方案依然有它的痛點:

基于 trait 做類型體操使我們不可避免地陷入到與 Rust 編譯器斗智斗勇之中。

依然沒有全面覆蓋所有可能情況。有相當一部分函數仍然需要開發者手寫向量化實現。

性能。當我們需要引入 SIMD 對部分函數進行優化時,需要重新實現一套 kernel 函數。

沒有對開發者隱藏全部細節。函數開發者依然需要先熟悉類型體操和聲明宏的工作原理,才能比較流暢地添加函數。

究其原因,我認為是函數的變體形式過于復雜,而 Rust 的 trait 和聲明宏系統的靈活性不足導致的。本質上是一種編程能力不夠強大的表現。

2.4 元編程?

讓我們來看看其他語言和框架是怎么解決這個問題的。

首先是 Python,一種靈活的動態類型語言。這是 Flink 中的 Python UDF 接口,其它大數據系統的接口也大同小異:

@udf(result_type='BIGINT')
defadd(i,j):
returni+j

我們發現它是用 @udf 這個裝飾器標記了函數的簽名信息,然后在運行時對不同類型進行相應的處理。當然,由于它本身是動態類型,因此 Rust 中的很多問題在 Python 中根本不存在,代價則是性能損失。

接下來是 Java,它是一種靜態類型語言,但通過虛擬機 JIT 運行。這是 Flink 中的 Java UDF 接口:

publicstaticclassSubstringFunctionextendsScalarFunction{
publicStringeval(Strings,Integerbegin,Integerend){
returns.substring(begin,end);
}
}

可以看到同樣也很短。這次甚至不需要額外標記類型了,因為靜態類型系統本身就包含了類型信息。我們可以通過運行時反射拿到類型信息,并通過 JIT 機制在運行時生成高效的強類型代碼,兼具靈活與性能。

最后是 Zig,一種新時代的 C 語言。它最大的特色是任何代碼都可以加上 comptime 關鍵字在編譯時運行,因此具備非常強的元編程能力。tygg 在博文「Zig lang 初體驗 -- 『大道至簡』的 comptime[4]」中演示了用 Zig 實現遲先生類型體操的方法:通過 編譯期反射 和 過程式的代碼生成 來代替開發者完成類型體操。

用一張表總結一下:

語言 類型反射 代碼生成 靈活性 性能
Python 運行時
Java 運行時 運行時
Zig 編譯時 編譯時
Rust (trait + macro_rules) 編譯時

可以發現,Zig 語言強大的元編程能力提供了相對最好的解決方案。

2.5 過程宏

那么 Rust 里面有沒有類似 Zig 的特性呢。其實是有的,那就是過程宏(Procedural Macros)。它可以在編譯期動態執行任何 Rust 代碼來修改 Rust 程序本身。只不過,它的編譯時和運行時代碼是物理分開的,相比 Zig 的體驗沒有那么統一,但是效果幾乎一樣。

參考 Python UDF 的接口設計,我們便得到了 ”大道至簡“ 的 Rust 函數接口:

#[function("add(int,int)->int")]
fnadd(a:i32,b:i32)->i32{
a+b
}

從用戶的角度看,他只需要在自己熟悉的 Rust 函數上面標一個函數簽名。其它的類型體操和代碼生成操作都被隱藏在過程宏之后,完全無需關心。

此時我們已經拿到了一個函數所必須的全部信息,接下來我們將看到過程宏如何生成向量化執行所需的樣板代碼。

3展開 #[function]

3.1 解析函數簽名

首先我們要實現類型反射,也就是分別解析 SQL 函數和 Rust 函數的簽名,以此決定后面如何生成代碼。在過程宏入口處我們會拿到兩個 TokenStream,分別包含了標注信息和函數本體:

#[proc_macro_attribute]
pubfnfunction(attr:TokenStream,item:TokenStream)->TokenStream{
//attr:"add(int,int)->int"
//item:fnadd(a:i32,b:i32)->i32{a+b}
...
}

我們使用 syn 庫將 TokenStream 轉為 AST,然后:

解析 SQL 函數簽名字符串,獲取函數名、輸入輸出類型等信息。

解析 Rust 函數簽名,獲取函數名、每個參數和返回值的類型模式、是否 async 等信息。

具體地:

對于參數類型,我們確定它是 T 或者 Option。

對于返回值類型,我們將其識別為:T,Option,Result ,Result> 四種類型之一。

這將決定我們后面如何調用函數以及處理錯誤。

3.2 定義類型表

作為 trait 類型體操的代替方案,我們在過程宏中定義了這樣一張類型表,來描述類型系統之間的對應關系,并且提供了相應的查詢函數。

//nameprimitivearrayprefixdatatype
constTYPE_MATRIX:&str="
void_NullNull
boolean_BooleanBoolean
smallintyInt16Int16
intyInt32Int32
bigintyInt64Int64
realyFloat32Float32
floatyFloat64Float64
...
varchar_StringUtf8
bytea_BinaryBinary
array_ListList
struct_StructStruct
";

比如當我們拿到用戶的函數簽名后,

#[function("length(varchar)->int")]

查表即可得知:

第一個參數 varchar 對應的 array 類型為 StringArray

返回值 int 對應的數據類型為 DataType::Int32,對應的 Builder 類型為 Int32Builder

并非所有輸入輸出均為 primitive 類型,因此無法進行 SIMD 優化

在下面的代碼生成中,這些類型將被填入到對應的位置。

3.3 生成求值代碼

在代碼生成階段,我們主要使用 quote 庫來生成并組合代碼片段。最終生成的代碼整體結構如下:

quote!{
struct#struct_name;
implScalarFunctionfor#struct_name{
fneval(&self,input:&RecordBatch)->Result{
#downcast_arrays
letmutbuilder=#builder;
#eval
Ok(Arc::new(builder.finish()))
}
}
}

下面我們來逐個填寫代碼片段,首先是 downcast 輸入 array:

letchildren_indices=(0..self.args.len());
letarrays=children_indices.map(|i|format_ident!("a{i}"));
letarg_arrays=children_indices.map(|i|format_ident!("{}",types::array_type(&self.args[*i])));

letdowncast_arrays=quote!{
#(
let#arrays:&#arg_arrays=input.column(#children_indices).as_any().downcast_ref()
.ok_or_else(||ArrowError::CastError(...))?;
)*
};

builder:

letbuilder_type=format_ident!("{}",types::array_builder_type(ty));
letbuilder=quote!{#builder_type::with_capacity(input.num_rows())};

接下來是最關鍵的執行部分,我們先寫出函數調用的那一行:

letinputs=children_indices.map(|i|format_ident!("i{i}"));
letoutput=quote!{#user_fn_name(#(#inputs,)*)};
//example:add(i0,i1)

然后考慮:這個表達式返回了什么類型呢?這需要根據 Rust 函數簽名決定,它可能包含 Option,也可能包含 Result。我們進行錯誤處理,然后將其歸一化到 Option 類型:

letoutput=matchuser_fn.return_type_kind{
T=>quote!{Some(#output)},
Option=>quote!{#output},
Result=>quote!{Some(#output?)},
ResultOption=>quote!{#output?},
};
//example:Some(add(i0,i1))

下面考慮:這個函數接收什么樣的類型作為輸入?這同樣需要根據 Rust 函數簽名決定,每個參數可能是或不是 Option。如果函數不接受 Option 輸入,但實際輸入的卻是 null,那么我們默認它的返回值就是 null,此時無需調用函數。因此,我們使用 match 語句來對輸入參數做預處理:

letsome_inputs=inputs.iter()
.zip(user_fn.arg_is_option.iter())
.map(|(input,opt)|{
if*opt{
quote!{#input}
}else{
quote!{Some(#input)}
}
});
letoutput=quote!{
//這里的inputs是從array中拿出來的Option
match(#(#inputs,)*){
//我們將部分參數unwrap后再喂給函數
(#(#some_inputs,)*)=>#output,
//如有unwrap失敗則直接返回null
_=>None,
}
};
//example:
//match(i0,i1){
//(Some(i0),Some(i1))=>Some(add(i0,i1)),
//_=>None,
//}

此時我們已經拿到了一行的返回值,可以將它 append 到 builder 中:

letappend_output=quote!{builder.append_option(#output);};

最后在外面套一層循環,對輸入逐行操作:

leteval=quote!{
for(i,(#(#inputs,)*))inmultizip((#(#arrays.iter(),)*)).enumerate(){
#append_output
}
};

如果一切順利的話,過程宏展開生成的代碼將如 2.1 節中所示的那樣。

3.4 函數注冊

到此為止我們已經完成了最核心、最困難的部分,即生成向量化求值代碼。但是,用戶該怎么使用生成的代碼呢?

注意到一開始我們生成了一個 struct。因此,我們可以允許用戶指定這個 struct 的名稱,或者定義一套規范自動生成唯一的名稱。這樣用戶就能在這個 struct 上調用函數了。

//指定生成名為Add的struct
#[function("add(int,int)->int",output="Add")]
fnadd(a:i32,b:i32)->i32{
a+b
}

//調用生成的向量化求值函數
letinput:RecordBatch=...;
letoutput:RecordBatch=Add.eval(&input).unwrap();

不過在實際場景中,很少有這種使用特定函數的需求。更多是在項目中定義很多函數,然后在解析 SQL 查詢時,動態地查找匹配的函數。為此我們需要一種全局的函數注冊和查找機制。

問題來了:Rust 本身沒有反射機制,如何在運行時獲取所有由 #[function] 靜態定義的函數呢?

答案是:利用程序的鏈接時(link time)特性,將函數指針等元信息放入特定的 section 中。程序鏈接時,鏈接器(linker)會自動收集分布在各處的符號(symbol)集中在一起。程序運行時即可掃描這個 section 獲取全部函數了。

Rust 社區的 dtolnay 大佬為此需求做了兩個開箱即用的庫:linkme[5] 和 inventory[6]。其中前者是直接利用上述機制,后者是利用 C 標準的 constructor 初始化函數,但背后的原理沒有本質區別。下面我們以 linkme 為例來演示如何實現注冊機制。

首先我們需要在公共庫(而不是 proc-macro)中定義函數簽名的結構:

pubstructFunctionSignature{
pubname:String,
pubarg_types:Vec,
pubreturn_type:DataType,
pubfunction:Box,
}

然后定義一個全局變量 REGISTRY 作為注冊中心。它會在第一次被訪問時利用 linkme 將所有 #[function] 定義的函數收集到一個 HashMap 中:

///Acollectionofdistributed`#[function]`signatures.
#[linkme::distributed_slice]
pubstaticSIGNATURES:[fn()->FunctionSignature];

lazy_static::lazy_static!{
///Globalfunctionregistry.
pubstaticrefREGISTRY:FunctionRegistry={
letmutsignatures=HashMap::>::new();
forsiginSIGNATURES{
letsig=sig();
signatures.entry(sig.name.clone()).or_default().push(sig);
}
FunctionRegistry{signatures}
};
}

最后在 #[function] 過程宏中,我們為每個函數生成如下代碼:

#[linkme::distributed_slice(SIGNATURES)]
fn#sig_name()->FunctionSignature{
FunctionSignature{
name:#name.into(),
arg_types:vec![#(#args),*],
return_type:#ret,
//這里#struct_name就是我們之前生成的函數結構體
function:Box::new(#struct_name),
}
}

如此一來,用戶就可以通過 FunctionRegistry 提供的方法動態查找函數并進行求值了:

letgcd=REGISTRY.get("gcd",&[Int32,Int32],&Int32);
letoutput:RecordBatch=gcd.function.eval(&input).unwrap();

3.5 小結

以上我們完整闡述了 #[function] 過程宏的工作原理和實現過程:

使用 syn 庫解析函數簽名

使用 quote 庫生成定制化的向量化求值代碼

使用 linkme 庫實現函數的全局注冊和動態查找

其中:

SQL 簽名決定了如何從 input array 中讀取數據,如何生成 output array

Rust 簽名決定了如何調用用戶的 Rust 函數,如何處理空值和錯誤

類型查找表決定了 SQL 類型和 Rust 類型的映射關系

相比 trait + 聲明宏的解決方案,過程宏中的 “過程式” 風格為我們提供了極大的靈活性,一攬子解決了之前提到的全部問題。在下一章中,我們將會在這個框架的基礎上繼續擴展,解決更多實際場景下的復雜需求。

4高級功能

抽象的問題是簡單的,但現實的需求是復雜的。上面的原型看似解決了所有問題,但在 RisingWave 的實際工程開發中,我們遇到了各種稀奇古怪的需求,都無法用最原始的 #[function] 宏實現。下面我們來逐一介紹這些問題,并利用過程宏的靈活性見招拆招。

4.1 支持多類型重載

有些函數支持大量不同類型的重載,例如 + 運算對幾乎支持所有數字類型。此時我們一般會復用同一個泛型函數,然后用不同的類型去實例化它。

#[function("add(*int,*int)->auto")]
#[function("add(*float,*float)->auto")]
#[function("add(decimal,decimal)->decimal")]
#[function("add(interval,interval)->interval")]
fnadd(l:T1,r:T2)->Result
where
T1:Into+Debug,
T2:Into+Debug,
T3:CheckedAdd,
{
a.into().checked_add(b.into()).ok_or(ExprError::NumericOutOfRange)
}

因此我們支持在同一個函數上同時標記多個#[function] 宏。此外,我們還支持使用類型通配符將一個#[function] 自動展開成多個,并使用 auto 自動推斷返回類型。例如 *int 通配符表示全部整數類型 int2, int4, int8,那么 add(*int, *int) 將展開為 3 x 3 = 9 種整數的組合,返回值自動推斷為兩種類型中最大的一個:

#[function("add(int2,int2)->int2")]
#[function("add(int2,int4)->int4")]
#[function("add(int2,int8)->int8")]
#[function("add(int4,int4)->int4")]
...

而如果泛型不能滿足一些特殊類型的要求,你也完全可以定義新函數進行特化(specialization):

#[function("add(interval,timestamp)->timestamp")]
fninterval_timestamp_add(l:Interval,r:Timestamp)->Result{
r.checked_add(l).ok_or(ExprError::NumericOutOfRange)
}

這一特性幫助我們快速實現函數重載,同時避免了冗余代碼。

4.2 自動 SIMD 優化

作為零開銷抽象語言,Rust 從不向性能妥協,#[function] 宏也是如此。對于很多簡單函數,理論上可以利用 CPU 內置的 SIMD 指令實現上百倍的性能提升。然而,編譯器往往只能對簡單的循環結構實現自動 SIMD 向量化。一旦循環中出現分支跳轉等復雜結構,自動向量化就會失效。

//簡單循環支持自動向量化
assert_eq!(a.len(),n);
assert_eq!(b.len(),n);
assert_eq!(c.len(),n);
foriin0..n{
c[i]=a[i]+b[i];
}

//一旦出現分支結構,如錯誤處理、越界檢查等,自動向量化就會失效
foriin0..n{
c.push(a[i].checked_add(b[i])?);
}

不幸的是,我們前文中生成的代碼結構并不利于編譯器進行自動向量化,因為循環中的 builder.append_option() 操作本身就自帶條件分支。

為了支持自動向量化,我們需要對代碼生成邏輯進一步特化:

首先根據函數簽名判斷這個函數能否實現 SIMD 優化。這需要滿足以下兩個主要條件:

比如:

#[function("equal(int,int)->boolean")]
fnequal(a:i32,b:i32)->bool{
a==b
}

所有輸入輸出類型均為基礎類型,即 boolean, int, float, decimal

Rust 函數的輸入類型均不含 Option,輸出不含 Option 和 Result

一旦上述條件滿足,我們會對 #eval 代碼段進行特化,將其替換為這樣的代碼,調用 arrow-rs 內置的 unary 和 binary kernel 實現自動向量化:

//SIMDoptimizationforprimitivetypes
matchself.args.len(){
0=>quote!{
letc=#ret_array_type::from_iter_values(
std::repeat_with(||#user_fn_name()).take(input.num_rows())
);
letarray=Arc::new(c);
},
1=>quote!{
letc:#ret_array_type=arrow_arith::unary(a0,#user_fn_name);
letarray=Arc::new(c);
},
2=>quote!{
letc:#ret_array_type=arrow_arith::binary(a0,a1,#user_fn_name)?;
letarray=Arc::new(c);
},
n=>todo!("SIMDoptimizationfor{n}arguments"),
}

需要注意,如果用戶函數本身包含分支結構,那么自動向量化也是無效的。我們只是盡力為編譯器創造了實現優化的條件。另一方面,這一優化也不是完全安全的,它會使得原本為 null 的輸入強制執行。例如整數除法 a / b,如果 b 為 null,原本不會執行,現在卻會執行 a / 0,導致除零異常而崩潰。這種情況下我們只能修改函數簽名,避免生成特化代碼。

整體而言,實現這一功能后,用戶編寫代碼不需要有任何變化,但是部分函數的性能得到了大幅提高。這對于高性能數據處理系統而言是必須的。

4.3 返回字符串直接寫入 buffer

很多函數會返回字符串。但是樸素地返回 String 會導致大量動態內存分配,降低性能。

#[function("concat(varchar,varchar)->varchar")]
fnconcat(left:&str,right:&str)->String{
format!("{left}{right}")
}

注意到列式內存存儲中,StringArray 實際上是把多個字符串存放在一段連續的內存上,構建這個數組的 StringBuilder 實際上也只是將字符串追加寫入同一個 buffer 里。因此函數返回 String 是沒有必要的,它可以直接將字符串寫入 StringBuilder 的 buffer 中。

于是我們支持對返回字符串的函數添加一個 &mut Write 類型的 writer 參數。內部可以直接用 write! 方法向 writer 寫入返回值。

#[function("concat(varchar,varchar)->varchar")]
fnconcat(left:&str,right:&str,writer:&mutimplstd::Write){
writer.write_str(left).unwrap();
writer.write_str(right).unwrap();
}

在過程宏的實現中,我們主要修改了函數調用部分:

letwriter=user_fn.write.then(||quote!{&mutbuilder,});
letoutput=quote!{#user_fn_name(#(#inputs,)*#writer)};

以及特化 append_output 的邏輯:

letappend_output=ifuser_fn.write{
quote!{{
if#output.is_some(){//返回值直接在這行寫入builder
builder.append_value("");
}else{
builder.append_null();
}
}}
}else{
quote!{builder.append_option(#output);}
};

經過測試,這一功能也可以大幅提升字符串處理函數的性能。

4.4 常量預處理優化

有些函數的某個參數往往是一個常量,并且這個常量需要經過一個開銷較大的預處理過程。這類函數的典型代表是正則表達式匹配:

//regexp_like(source,pattern)
#[function("regexp_like(varchar,varchar)->boolean")]
fnregexp_like(text:&str,pattern:&str)->Result{
letregex=regex::new(pattern)?;//預處理:編譯正則表達式
Ok(regex.is_match(text))
}

對于一次向量化求值來說,如果輸入的 pattern 是常數(very likely),那么其實只需要編譯一次,然后用編譯后的數據結構對每一行文本進行匹配即可。但如果不是常數(unlikely,但是合法行為),則需要對每一行 pattern 編譯一次再執行。

為了支持這一需求,我們修改用戶接口,將特定參數的預處理過程提取到過程宏中,然后把預處理后的類型作為參數:

#[function(
"regexp_like(varchar,varchar)->boolean",
prebuild="Regex::new($1)?"http://$1表示第一個參數(下標從0開始)
)]
fnregexp_like(text:&str,regex:&Regex)->bool{
regex.is_match(text)
}

這樣,過程宏可以對這個函數生成兩個版本的代碼:

如果指定參數為常量,那么在構造函數中執行 prebuild 代碼,并將生成的 Regex 中間值存放在 struct 當中,在求值階段直接傳入函數。

如果不是常量,那么在求值階段將 prebuild 代碼嵌入到函數參數的位置上。

至于具體的代碼生成邏輯,由于細節相當復雜,這里就不再展開介紹了。

總之,這一優化保證了此類函數各種輸入下都具有最優性能,并且極大簡化了手工實現的復雜性。

4.5 表函數

最后,我們來看表函數(Table Function,Postgres 中也稱 Set-returning Funcion,返回集合的函數)。這類函數的返回值不再是一行,而是多行。如果同時返回多列,那么就相當于返回一個表。

select*fromgenerate_series(1,3);
generate_series
-----------------
1
2
3

對應到常見的編程語言中,實際是一個生成器函數(Generator)。以 Python 為例,可以寫成這樣:

defgenerate_series(start,end):
foriinrange(start,end+1):
yieldi

Rust 語言目前在 nightly 版本支持生成器,但這一特性尚未 stable。不過如果不用 yield 語法的話,我們可以利用 RPIT 特性實現返回迭代器的函數,以達到同樣的效果:

#[function("generate_series(int,int)->setofint")]
fngenerate_series(start:i32,stop:i32)->implIterator{
start..=stop
}

我們支持在 #[function] 簽名中使用 -> setof 以聲明一個表函數。它修飾的 Rust 函數必須返回一個 impl Iterator,其中的 Item 需要匹配返回類型。當然,Iterator 的內外都可以包含 Option 或 Result。

在對表函數進行向量化求值時,我們會對每一行輸入調用生成器函數,然后將每一行返回的多行結果串聯起來,最后按照固定的 chunk size 進行切割,依次返回多個 RecordBatch。因此表函數的向量化接口長這個樣子:

pubtraitTableFunction{
fneval(&self,input:&RecordBatch,chunk_size:usize)
->Result>>>;
}

我們給出一組 generate_series 的輸入輸出樣例(假設 chunk size = 2):

inputoutput
+-------+------++-----+-----------------+
|start|stop||row|generate_series|
+-------+------++-----+-----------------+
|0|0|---->|0|0|
|||+->|2|0|
|0|2|--++-----+-----------------+
+-------+------+|2|1|
|2|2|
+-----+-----------------+

由于表函數的輸入輸出不再具有一對一的關系,我們在 output 中會額外生成一列row來表示每一行輸出對應 input 中的哪一行輸入。這一關系信息會在某些 SQL 查詢中被使用到。

回到#[function]宏的實現,它為表函數生成的代碼實際上也是一個生成器。我們在內部使用了futures_async_stream[7]提供的#[try_stream]宏實現 async generator(它依賴 nightly 的 generator 特性),在 stable 版本中則使用genawaiter[8]代替。之所以要使用生成器,則是因為一個表函數可能會生成非常長的結果(例如generate_series(0, 1000000000)),中途必須把控制權交還調用者,才能保證系統不被卡死。感興趣的讀者可以思考一下:如果沒有 generator 機制,高效的向量化表函數求值能否實現?如何實現?

說到這里,多扯兩句。genawaiter 也是個很有意思的庫,它使用 async-await 機制來在 stable Rust 中實現 generator。我們知道 async-await 本質上也是一種 generator,它們都依賴編譯器的 CPS 變換實現狀態機。不過出于對異步編程的強烈需求,async-await 很早就被穩定化,而 generator 卻遲遲沒有穩定。由于背后的原理相通,它們可以互相實現。 此外,目前 Rust 社區正在積極推動 async generator 的進展,原生的async gen[9]和for await[10]語法剛剛在上個月進入 nightly。不過由于沒有和 futures 生態對接,整體依然處于不可用狀態。RisingWave 的流處理引擎就深度依賴 async generator 機制實現流算子,以簡化異步 IO 下的流狀態管理。不過這又是一個龐大的話題,之后有機會再來介紹這方面的應用吧。

5總結

由于篇幅所限,我們只能展開這么多了。如你所見,一個簡單的函數求值背后,隱藏著非常多的設計和實現細節:

為了高性能,我們選擇列式內存存儲和向量化求值。

存儲數據的容器通常是類型擦除的結構。但 Rust 是一門靜態類型語言,用戶定義的函數是強類型的簽名。這意味著我們需要在編譯期確定每一個容器的具體類型,做類型體操來處理不同類型之間的轉換,準確地把數據從容器中取出來喂給函數,最后高效地將函數吐出來的結果打包回數據容器中。

為了將上述過程隱藏起來,我們設計了#[function]過程宏在編譯期做類型反射和代碼生成,最終暴露給用戶一個盡可能簡單直觀的接口。

但是實際工程中存在各種復雜需求以及對性能的要求,我們必須持續在接口上打洞,并對代碼生成邏輯進行特化。幸好,過程宏具有非常強的靈活性,使得我們可以敏捷地應對變化的需求。

#[function]宏最初是為 RisingWave 內部函數實現的一套框架。最近,我們將它從 RisingWave 項目中獨立出來,基于 Apache Arrow 標準化成一套通用的用戶定義函數接口arrow-udf[11]。如果你的項目也在使用 arrow-rs 進行數據處理,現在可以直接使用這套#[function]宏定義自己的函數。如果你在使用 RisingWave,那么從這個月底發布的 1.7 版本起,你可以使用這個庫來定義 Rust UDF。它可以編譯成 WebAssembly 模塊插入到 RisingWave 中運行。感興趣的讀者也可以閱讀這個項目的源碼了解更多實現細節。

事實上,RisingWave 基于 Apache Arrow 構建了一整套用戶定義函數接口。此前,我們已經實現了服務器模式的 Python 和 Java UDF。最近,我們又基于 WebAssembly 實現了 Rust UDF,基于 QuickJS 實現了 JavaScript UDF。它們都可以嵌入到 RisingWave 中運行,以實現更好的性能和用戶體驗。





審核編輯:劉清

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • SQL
    SQL
    +關注

    關注

    1

    文章

    740

    瀏覽量

    43500
  • 生成器
    +關注

    關注

    7

    文章

    304

    瀏覽量

    20319
  • Rust
    +關注

    關注

    1

    文章

    224

    瀏覽量

    6402
  • ChatGPT
    +關注

    關注

    27

    文章

    1453

    瀏覽量

    5141

原文標題:用 Rust 過程宏魔法簡化 SQL 函數實現

文章出處:【微信號:Rust語言中文社區,微信公眾號:Rust語言中文社區】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    SQL儲存過程等的解密

    SQLSERVER2000存儲過程,函數,視圖,觸發器begin trandeclare @objectname1 varchar(100),@orgvarbin varbinary(8000)declare @sql
    發表于 12-31 16:54

    請教如何用SQL語句來壓縮ACCESS數據庫

    通過對ACCESS數據庫的“修復與壓縮”會使程序的運行更加穩定和提高運行速度?!埥倘?b class='flag-5'>何用SQL語句來壓縮ACCESS數據庫,只用SQL語句喲!謝謝!
    發表于 11-29 21:54

    C編程定義函數該如何修改?

    那么該如何修改?不希望寫成函數形式的,寫成定義形式的比較好。
    發表于 12-26 08:51

    怎樣用printf 函數和getchar 函數簡化STM32串口數據的傳輸

    printf 函數和getchar 函數有何功能?怎樣用printf 函數和getchar 函數簡化STM32串口數據的傳輸
    發表于 10-22 07:49

    何用Matlab去實現FFT函數和IFFT函數

    Matlab的FFT函數和IFFT函數有什么用法嗎?如何用Matlab去實現FFT函數和IFFT函數
    發表于 11-18 07:05

    何用 rust 語言開發 stm32

    本文介紹如何用 rust 語言開發 stm32。開發平臺為 linux(gentoo)。硬件準備本文使用的芯片為 STM32F103C8T6。該芯片性價比較高,價格低廉,適合入門學習。需要
    發表于 11-26 06:20

    何用__write函數替換掉原先的fputc函數

    何用__write函數替換掉原先的fputc函數?
    發表于 12-01 06:55

    怎樣去使用Rust進行嵌入式編程

    使用Rust進行嵌入式編程Use Rust for embedded development篇首語:Rust的高性能、可靠性和生產力使其適合于嵌入式系統。在過去的幾年里,Rust在程序
    發表于 12-22 07:20

    如何對gcc編譯過程中生成的進行調試

    如何對gcc編譯過程中生成的進行調試?有哪幾種形式?如何對一個函數進行gprof方式的剖析?
    發表于 12-24 07:53

    Rust代碼中加載靜態庫時,出現錯誤 ` rust-lld: error: undefined symbol: malloc `怎么解決?

    我正在 MCUXpresso IDE 中創建一個靜態庫。我正在使用 redlib 在我的代碼中導入 ` [i]stdlib.h`。它成功地構建了一個靜態庫。但是,靜態庫中未定義一些標準庫函數,例如
    發表于 06-09 08:44

    SQL參考手冊

    SQL 合計函數使用 SQL 合計函數 你可以確定數據組的各種統計。你可以把這些函數用于查詢和合計表達式,條件是在具備
    發表于 12-26 14:09 ?39次下載

    何用兩種不同的方法列寫雙容水槽傳遞函數

    何用兩種不同的方法列寫雙容水槽傳遞函數
    的頭像 發表于 03-10 16:20 ?2427次閱讀
    如<b class='flag-5'>何用</b>兩種不同的方法列寫雙容水槽傳遞<b class='flag-5'>函數</b>

    何用proc sql生成宏變量?

    上節我們講了PROC SQL的基本結構,以及一些sql命令的使用,這節我們主要講一下case...when...、order by 、group by 、update、delete語句以及如何用proc
    的頭像 發表于 05-19 16:13 ?1585次閱讀
    如<b class='flag-5'>何用</b>proc <b class='flag-5'>sql</b>生成宏變量?

    何用Rust通過JNI和Java進行交互

    近期工作中有Rust和Java互相調用需求,這篇文章主要介紹如何用Rust通過JNI和Java進行交互,還有記錄一下開發過程中遇到的一些坑。
    的頭像 發表于 10-17 11:41 ?426次閱讀

    sql中日期函數的用法

    日期函數SQL中是非常重要的功能之一,它們能幫助我們在數據庫中存儲和處理日期和時間數據。在本文中,我將詳細介紹一些常用的SQL日期函數,包括如何創建日期和時間數據、如何格式化和轉換日
    的頭像 發表于 11-17 16:24 ?476次閱讀
    亚洲欧美日韩精品久久_久久精品AⅤ无码中文_日本中文字幕有码在线播放_亚洲视频高清不卡在线观看
    <acronym id="s8ci2"><small id="s8ci2"></small></acronym>
    <rt id="s8ci2"></rt><rt id="s8ci2"><optgroup id="s8ci2"></optgroup></rt>
    <acronym id="s8ci2"></acronym>
    <acronym id="s8ci2"><center id="s8ci2"></center></acronym>