Hits

并行处理管道,感受Go语言魅力

并行处理管道,感受Go语言魅力

Go语言三大特色

  • 面向接口编程
  • 函数式编程
  • 并发编程

Go 语言并发编程

  • 采用CSP模型(Communication Sequential Process)
  • 不需要锁,不需要callback
  • 并发编程 VS 并行计算

外部排序

常用的排序算法插入排序、选择排序和快速排序等都是将数据取到内存中进行排序。当数据量非常大时,大到内存无法一次将所有数据读到内存中,这时就需要使用外部排序。

基本思想:

  • 将数据分为左右两半,分别归并排序,再把两个有序数据继续归并

  • 如何归并

    • [1,3,6,7],[1,2,3,5] => 1
    • [3,6,7],[1,2,3,5] => 1
    • [3,6,7],[2,3,5] => 2
    • [3,6,7],[3,5] => 3

guibing

Go利用Pipeline外部排序

步骤一:

  • 首先切好片的数据通过goroutine读入到某个节点,通过go chan传输。
  • 再启动一个goroutine,把go chan里的数据读入内存中,然后使用快速排序对内存中的数据进行快速排序,最后将排序好的数据再次通过goroutine传输给一个go chan。
  • 最后将go chan管道中排好序的数据输出到文件,或者打印到控制台。

步骤二:

  • 上述步骤一完成了一个节点的外部排序,接着讲所有的切片按照上述步骤一都可以完成内部排序并存入文件。
  • 然后通过MergeN将多个节点分别进行两两二路归并,最后将所有节点排好序并通过go chan管道返回出去并存于文件。
// 将文件读取的数据输送到一个节点
// 该节点通过goroutine将数据输送到chan
func ReaderSource(a ...int) <-chan int {
    out := make(chan int)

    go func() {
        for _, v := range a {
            out <- v
        }
        // 一定要close,close后,外面会用if或range取判断取失败
        // 数据量大的话,不关闭会很占内存
        close(out)
    }()
    
    return out
}
// 将上面ReaderSource返回的chan传进来读入内存
// 使用内部排序对读入内存的数据排序
// 然后通过goroutine输出到chan返回出去
// 参数in 只进不出,返回参数只出不进
func InMemSort(in <-chan int) <-chan int {
    out := make(chan int, 1024)

    go func() {
        // Read into memory
        a := []int{}
        for v := range in {
            a = append(a, v)
        }

        // Sort
        sort.Ints(a)

        // Output
        for _, v := range a {
            out <- v
        }

        // close
        close(out)
    }()

    return out
}
// 将排好序的内存数据打印输出,或者存文件
func main() {
    p := InMemSort(ArraySource(3, 2, 6, 7, 4))
    for v := range p {
        fmt.Println(v)
    }
}
// 将排好序的多个节点通过2路归并排序
func MergeN(inputs ... <-chan int) <-chan int {
    if len(inputs) == 1 {
        return inputs[0]
    }

    m := len(inputs) / 2

    // merge inputs[0...m) and inputs [m...end)
    return Merge(MergeN(inputs[:m]...), MergeN(inputs[m:]...))
}
// 将排好序的2个节点归并归并
func Merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int, 1024)

    go func() {
        v1, ok1 := <-in1 // 没有元素ok1返回false
        v2, ok2 := <-in2
        for ok1 || ok2 {
            // v2没有元素就出v1; v1,v2都有数据,且v1 <= v2也出v1
            if !ok2 || (ok1 && v1 <= v2) {
                out <- v1
                v1, ok1 = <-in1
            } else {
                out <- v2
                v2, ok2 = <-in2
            }
        }

        // 关闭
        close(out)
        fmt.Println("Merge done: ", time.Now().Sub(startTime))
    }()

    return out
}

性能分析

  • ArraySource节点,支持分块读取
  • mergeN,搭建归并节点组,或者使用多路归并排序
  • pipeline的搭建及运行 - CPU及线程数量的观测

Github源码

本文链接:参与评论 »

--EOF--

提醒:本文最后更新于 118 天前,文中所描述的信息可能已发生改变,请谨慎使用。

Comments