in 论文 分布式数据库 ~ read.

WindowFunctions Processing

Efficient Processing of Window Functions in Analytical SQL Queries Notes

Efficient Processing of Window Functions in Analytical SQL Queries

Viktor Leis, Kan Kundhikanjana, A. Kemper, Thomas Neumann Published 2015

论文目的是提出一个有效的通用的窗口算法,算法针对数据库进行了优化,在现代多核CPU上具有出色的性能。

Introduction

TPC-DS基准测试的99各查询中有9个使用窗口函数

TPC-DS,是用于评测决策支持系统(或数据仓库)的标准SQL测试集。这个测试集包含对大数据集的统计、报表生成、联机查询、数据挖掘等复杂应用,测试用的数据和值是有倾斜的,与真实数据一致。可以说TPC-DS是与真实场景非常接近的一个测试集,也是难度较大的一个测试集。

    -- 窗口函数
    select location, time, value, abs(value-
    (avg(value) over w))/(stddev(value) over w)
    from measurement
    window w as (
    partition by location
    order by time
    range between 5 preceding and 5 following)

    -- 低效率写法,关联子查询
    select location, time, value, abs(value-
        (select avg(value)
            from measurement m2
            where m2.time between m.time-5 and m.time+5
                and m.location = m2.location))
        / (select stddev(value)
            from measurement m3
            where m3.time between m.time-5 and m.time+5
                and m.location = m3.location)
    from measurement m

相关论文

Y . Cao, C.-Y . Chan, J. Li, and K.-L. Tan. Optimization of analytic window functions. PVLDB, 5(11):1244–1255, 2012.

通过避免不必要的排序或分区步骤来优化一个查询中出现的多个窗口函数。

简述

通过线段树,结合多核CPU特性,进行并行计算,从而实现优化。

展示了如何并行化算法的所有阶段,而不仅仅是分区间并行(最有效的方法,但不适用于所有查询)。

所以,算法实现速度很快,不仅对任意分区,甚至对于没有分区的,高倾斜度数据的也可以适用。

WINDOW FUNCTIONS IN SQL

SQLseq

  • 窗口函数在大多数其它字子句(group by, having, ...)之后进行计算,但在最终的 order by 和 distinct之前进行计算;
  • 窗口函数只会对输入数据额外计算附加属性,不会更改和过滤(只附加,不修改)。

 partititonOrderFrame

rowsRange

窗口操作符的核心和并行化策略

处理流程遵循语法结构,包括以下阶段:

  1. Partitioning: 使用 partition by 划分 partition

  2. Sorting: 使用order by 对每个 partition 进行排序

  3. Window function computation

    a. Compute window frame: 确定窗口的Frame

    b. Evaluate window function: 基于Frame计算窗口函数

Partitioning and Sorting

窗口函数经典的执行方式分为排序函数求值这 2 步。

排序两种传统方法:

  1. hash-based: 使用partition by的属性对所有输入的数据进行Hash,产生不同的分区, 之后在按照order by进行排序
  2. sort-based: 根据partition byorder by的属性进行排序,分区边界在窗口函数计算期间"on-the-fly"(实时确定)

n 行输入, $O(n)$个分区, 则hash-based的复杂度为 $O(n)$, sort-based的复杂度为 $O(nlogn)$

看起来 hash-based更可取,但sort-based实现更容易,因为无论如何都是需要进行排序的。

单线程执行时,需要用hash table进行完全分区;
在并行执行时,需要并发hash table;
但,dyncmic hash(冲突后Bucket成倍增长,数据重新放置) 相比于 unsynchronized hash 开销大得多;
不进行分区直接`sort-based`, 代价也很大;
为了实现高可伸缩性,和较低的开销,两者进行结合

动态哈希

Pre-Partitioning into Hash Groups

通过HashGroups预先划分Partition

不管输入数据有多少个分区,都将其划分进一个常量(eg:1024)HashGroups;

HashGroups的数量应该是$2^n$, 要大于线程的数量,又要足够小,以便充分利用CPUs。

Figure3

两个thread, 每个thread的HashGroups大小为4;

每个线程对自己的输入数据进行Hash分区;

当所有线程分区完成之后,将所有线程相对应的HashGroups复制到一个组合的数组中,即合并;

以上合并操作,可以在不同步的情况下进行,因为HashGroups的size和offset都是已知的(个人理解:另起线程,实时合并,因只需要保证数据进入相同的Bucket,无需按thread中数据顺序)

因为每个Bucket中不止一个partition,按照partition byorder by进行排序;

    思考:
    thread1/thread2 分别排序,而后合并时,进行归并
    先合并,后进行排序

结果,具有相同partition key的Tuple在一个HashGroup中相邻,尽管一个HashGroup中可能有多个partition key;

必要时,像sorted-base一样在计算窗口函数时,通过二分查找来确定partition的边界值;

Inter- and Intra-Partition Parallelism

分区间和分区内并行

数据库系统常用的策略:分区间并行计算;

这种方式对于分区的数量远小于线程的数量,或者分区数据严重倾斜时没有任何意义。需要支持intra-partition

只对大的HashGroups使用分区内并行, 如果有HashGroups足够满足线程数量,且HashGroup不是很大,此时分区间并行足够。

因为前一阶段已经划分完Partition,所以已经知道HashGroups的大小,依据HashGroup的Size/总数据量/线程数量(目的让计算均匀),可以动态将HashGroups分配给Inter- and Intra-Partition处理类。

In Figure 3, 只有HashGroup 11使用分区内并行(可采用归并排序), 其它Group使用分区间并行

分区内并行计算窗口函数,需要支持并行排序,更需要实际计算本身支持并行。

WINDOW FUNCTION EVALUATION

运算符的最后阶段(窗口函数表达式的求值)

Basci Algorithmic Structure

将不同的线程分配给HashGroup的不同的子集,并行的执行窗口函数的计算;

单线程执行,或分区间并行HashGroup会被分配给一个线程。

计算步骤(优化点):

  1. 确定 partition 边界(binary search)
  2. 确定 frame
  3. 计算

Determining the Window Frame Bounds

确定 frame

  • Rows
    只需按照当前行的索引进行+/-得到Frame,需要注意确保仍在当前partition中
  • Range
    对于常量的边界,先基于正常的增量方法进行进行+/-,而后逐行推进,避免每次都重新查找,复杂度$O(n)$;
    对于非常量的边界,Frame可变,可以采用二分查找,而后逐行推进,复杂度$O(nlogn)$;

N行Tuple,确定Frame复杂度:

mode constant non-constant
rows $O(N)$ $O(N)$
range $O(N)$ $O(NlogN)$

Aggregation Algorithms

Navigation functions(导航函数) : first_expr, last_expr, nth_expr , 仅选择窗口中的一行计算,很简单,$O(1)$。

Aggregate functions(聚合函数) : 针对所有行计算,开销大。

Naive Aggregation

简单的遍历frame中所有tuple,进行计算的agg

缺点: 反复进行冗余的计算

eg:sum(b) over (order by a rows between unbounded preceding and current row)

Cumulative Aggregation

累积聚合,对上述算法进行改进,每次不从头进行计算,记录前一个frame和frame的结果,和新增行的进行累加

eg: range between unbounded preceding and current row

缺点:仅适用于frame增长的场景,对于也会收缩的场景不适用

eg: sum(b) over (order by a rows between 5 preceding and 5 following)

Removable Cumulative Aggregation

可移除累积聚合,对上述算法的改进,不但允许frame累积增长,也可以从先前的聚合结果中remove行

适用于sum/avg/count/max/min的查询

缺点:对于恒定的frame还好,对于可变的frame,frame的变化幅度非常大,以至于缓存前一个frame的结果没有意义,计算复杂度变成$O(n^2)$

eg: sum(b) over (order by a rows between x preceding and y following)

Segment Tree Aggregation

线段树聚合,允许在对数时间内计算任意范围的集合

physical

gsum

为计算给定范围的聚合,从 frame 的两个边界开始,自下而上进行遍历,直到遍历到同一节点,对于小的frame,这个过程会提前结束,以最小的节点数进行聚合计算。

遍历算法参考附录C。

除了可以提高最坏情况(可变frame)的效率外,另一个好处是允许并行化计算聚合 sum(b) over (order by a rows between unbounded preceding and current row),这对没有分区的场景很重要,可以通过分区内并行来提高效率。

线段树的构建可以并行,以自底向上的方式:所有线程扫描线段树同一level的相邻范围,将计算存储到上层level。

对于min/max/count/sum直接线段树计算存储,对于avg/stddev可以同时存储(sum/count)。

对于dense_rank在每个分段上,记录不同子值的数量。

Algorithm Choice

algorithmComplexity

通常选择线段树是最好的,但对于简单的场景,更适合采取最简单的算法,避免线段树构建时的开销。

* naive agg:
    对于每一条tuple都耿直地找frame执行一遍agg。会有平方级的计算。
* cumulative agg:
    适用于range between unbound preceding and current row的一直增长的frame计算(想像下累计sum计算)
* removable cumulative agg:
    适用于于非一直增长的的between x preceding and y following,(想像下sum计算, 减去x前面一个对应的值再加上y对应的值)
* segment tree agg: (有利于并行)
    先构建线段树,加速后续Sum的计算。适用于动态的frame计算。
select sum(a) over
    (order by b
        rows between ? preceding and current row)
from r

framePer

frameVarPer

Window Function without Framing

不受Frame影响,计算比较简单:

  1. 计算第一个tuple的结果
  2. 使用前面的结果依次计算剩下的结果

优化汇总

优化方式

  • 相同的窗口合并为一个Window计算

  • 并行划分partition

    HashGroups多线程划分(Hash-Based)

  • 并行排序

    对partition和order key进行排序(Sort-Based)

    GNU libstdc++ library("Parallel Mode") 比如:__gnu_parallel::sort()

  • 分区间并行

    parition数目较多,且数据分布比较均匀

    节点之间的并行

    单节点内部的并行

  • 分区内并行

    数据倾斜严重,partition数目较少

  • LLVM compiler

    计算Frame

  • Agg计算

    SegmentTree(利于并行)

  • NUMA系统(可能的优化)

建议值(测试)

HashGroups的size默认为1024,是个测试值,有利于CPU cache和TLB。

threads的size,与 CPU核心数*并发数 一致。

分区并行,可以采用自动分类,同时支持强制某种并行方式。

SegmentTree的 fanout 通常设置为16。

fanout

存在问题

  1. 线程数控制
  2. 内存预估
  3. 内存使用上限
  4. 切外存
  5. 现有算法,前三种场景$O(n)$,第四种$O(nlogN)$,性能要比线段树好
  6. 优化的适用线段树算法