Badger源码学习之-SkipList基础篇

June 29, 2019

Badger源码学习之-SkipList基础篇

SkipList 全称跳跃表,具体定义可以查看维基百科的定义,或者论文SkipList是一个有序的高效的链表结构,数组数据结构的search的时间复杂度为O(lgn),通过二分法搜索,但是插入和删除的时间复杂度为O(n);链表的search的时间复杂度为O(n),插入和删除的时间复杂度为O(lg)SkipList这个数据结构很好的结合了这两种数据结构的特性,searchinsertdelete的时间复杂度都是O(lgn),但是空间复杂度比其他的高,利用了空间换取时间的做法。

SkipList是根据链表而来的,其数据结构类似与链表。

链表

singl-list.jpg

这就是一个简单的顺序链表,每次searchinsertdelete的时间复杂度都是O(n),都需要遍历整个链表。

SkipList

skip list SkipList和单链表的不同点就是相邻两层的节点比为1/2。查找的时间从最上层开始来查找,这就相对于建了一个索引。

层级之间节点比为1/2实现起来有会边复杂,相当于需要固定维护相邻层级之间的关系。所以作者推荐使用一个随机高度,每个节点的高度随机产生,再去修改节点的前后关系。通过设置一个最大高度和相邻节点比来生成随机高度。如下设置的最大高度是20,相邻节点是1/3;算法作者推荐这个比最好为1/2^n次方,但是我不明白为啥badger选择1/3.

// 这事badger中的随机高度的实现方式,
const (
	maxHeight      = 20
	heightIncrease = math.MaxUint32 / 3
)
func randomHeight(heightIncrease uint32) int32 {
	h := 1
	for h < maxHeight && rand.Uint32() <= heightIncrease {
		h++
	}
	return int32(h)
}

高度看似是随机产生的,但是是按比例划分的,官方是按照概率去计算的,我通过生成10000000个数字来查看数字的分布,基本是和上面的概率比一致的。

func Test_randomHeight1(t *testing.T)  {
	rand.Seed(time.Now().UnixNano())
	num := 10000000
	data2 := make([]int32,num)
	for i := 0 ; i < num ; i++ {
		data2[i] = randomHeight(math.MaxUint32 / 2)
	}
	scatterData2 := randomPoints1(data2)

	data3 := make([]int32,num)
	for i := 0 ; i < num ; i++ {
		data3[i] = randomHeight(math.MaxUint32 / 3)
	}
	scatterData3 := randomPoints1(data3)

	data4 := make([]int32,num)
	for i := 0 ; i < num ; i++ {
		data4[i] = randomHeight(math.MaxUint32 / 4)
	}
	scatterData4  := randomPoints1(data4)

	data5 := make([]int32,num)
	for i := 0 ; i < num ; i++ {
		data5[i] = randomHeight(math.MaxUint32 / 5)
	}
	scatterData5 := randomPoints1(data5)
	// Create a new plot, set its title and
	// axis labels.
	p, err := plot.New()
	if err != nil {
		panic(err)
	}
	p.Title.Text = "Points Example"
	p.X.Label.Text = "X"
	p.Y.Label.Text = "Y"
	// Draw a grid behind the data
	//p.Add(plotter.NewGrid())
	//
	//// Make a scatter plotter and set its style.
	//s, err := plotter.NewScatter(scatterData)
	//if err != nil {
	//	panic(err)
	//}
	//s.GlyphStyle.Color = color.RGBA{R: 255, B: 128, A: 255}
	//p.Add(s)
	l2, s2, _ := plotter.NewLinePoints(scatterData2)
	l2.Color = plotutil.Color(0)
	l2.Dashes = plotutil.Dashes(0)
	s2.Color = plotutil.Color(0)
	s2.Shape = plotutil.Shape(0)

	l3, s3, _ := plotter.NewLinePoints(scatterData3)
	l3.Color = plotutil.Color(1)
	l3.Dashes = plotutil.Dashes(1)
	s3.Color = plotutil.Color(1)
	s3.Shape = plotutil.Shape(1)

	l4, s4, _ := plotter.NewLinePoints(scatterData4)
	l4.Color = plotutil.Color(2)
	l4.Dashes = plotutil.Dashes(2)
	s4.Color = plotutil.Color(2)
	s4.Shape = plotutil.Shape(2)

	l5, s5, _ := plotter.NewLinePoints(scatterData5)
	l5.Color = plotutil.Color(3)
	l5.Dashes = plotutil.Dashes(3)
	s5.Color = plotutil.Color(3)
	s5.Shape = plotutil.Shape(3)

	//plotutil.AddLinePoints(p, scatterData2,scatterData3,scatterData4,scatterData5)
	p.Add(l2,s2,l3,s3,l4,s4,l5,s5)
	p.Legend.Add("line points 2", l2, s2)
	p.Legend.Add("line points 3", l3, s3)
	p.Legend.Add("line points 4", l4, s4)
	p.Legend.Add("line points 5", l5, s5)
	// Save the plot to a PNG file.
	if err := p.Save(8*vg.Inch, 8*vg.Inch, "4-points0.5.png"); err != nil {
		panic(err)
	}
	t.Log("Test Pass")
}


// randomPoints returns some random x, y points.
func randomPoints1(n []int32) plotter.XYs {
	m := make(map[int32]int)

	for _,v := range n {
		if _,ok := m[v]; ok  {
			m[v] ++
		}else {
			m[v] = 1
		}
	}
	pts := make(plotter.XYs, len(m))
	for i:= 0 ; i < len(m) ; {
		k := i+1
		v := m[int32(k)]
		fmt.Println(k,v)
		pts[i].X = float64(k)
		pts[i].Y = float64(v)
		i ++
	}
	fmt.Printf("map:%+v\n",m)
	return pts
}

我写了如上的一个测试程序,并画出数据的分布情况。 分布

横轴是最大高度,纵轴是分布在高度上的情况,从图上可以看出,p越小,衰变的速度越快,数据分布越集中,当p1/2是数据基本是按层级间1/2的关系递减的。

实现

官方论文中有实现的伪代码,按照伪代码即可实现。

Search

如下是查询时的一个伪代码,基本逻辑就是从头节点的最高层级开始变量查询,如果查询节点不为nil并且key大于当前节点的key,就往后查询,否则层级下降。

Search(list, searchKey)
    x := list→header
    -- loop invariant: x→key < searchKey
    for i := list→level downto 1 do
        while x→forward[i] != nil &&& x→forward[i]→key < searchKey do
            x := x→forward[i]
    -- x→key < searchKey ≤ x→forward[1]→key
    x := x→forward[1]
    if x→key = searchKey then 
        return x→value
    else return failure

Insert

插入的不同点就是要记录下查找的路径,后面需要更新插入前后的元素;只记录阶梯变化的最后一个;下面的update数组就是记录变化的,然后查找到要插入的地方,如果key已经存在就更新值,不存在就插入。

Insert(list, searchKey, newValue)
    local update[1..MaxLevel]
    x := list→header
    for i := list→level downto 1 do
        while x→forward[i]→key < searchKey do
            x := x→forward[i]
        update[i] := x
    x := x→forward[1]
    if x→key = searchKey then x→value := newValue
    else
        newLevel := randomLevel()
        if newLevel > list→level then
            for i := list→level + 1 to newLevel do
                update[i] := list→header
            list→level := newLevel
        x := makeNode(newLevel , searchKey, value)
        for i := 1 to newLevel do
            x→forward[i] := update[i]→forward[i]
            update[i]→forward[i] := x

Delete

删除是同样需要记录查找路径,后面需要更新前后的路径。

Delete(list, searchKey)
    local update[1..MaxLevel]
    x := list→header
    for i := list→level downto 1 do
        while x→forward[i]→key < searchKey do
            x := x→forward[i]
        update[i] := x
    x := x→forward[1]
    if x→key = searchKey then
        for i := 1 to list→level do
            if update[i]→forward[i] ≠ x then break
            update[i]→forward[i] := x→forward[i]
        free(x)
        while list→level > 1 and
            list→header→forward[list→level] = NIL do
            list→level := list→level – 1

参考


LRF 记录学习、生活的点滴