《Go语言高级编程》阅读笔记
第一章 语言基础
1.1 Go语言创世纪
Go语言是对C语言最彻底的一次扬弃,不仅仅是语法和C语言有着很多差异,最重要的是舍弃了C语言中灵活但是危险的指针运算。而且,Go语言还重新设计了C语言中部分不太合理运算符的优先级,并在很多细微的地方都做了必要的打磨和改变。
1.5 面向并发的内存模型
在早期,CPU都是以单核的形式顺序执行机器指令。Go语言的祖先C语言正是这种顺序编程语言的代表。顺序编程语言中的顺序是指:所有的指令都是以串行的方式执行,在相同的时刻有且仅有一个CPU在顺序执行程序的指令。
Go语言正是在多核和网络化的时代背景下诞生的原生支持并发的编程语言。
常见的并行编程有多种模型,主要有多线程、消息传递等。从理论上来看,多线程和基于消息的并发编程是等价的。由于多线程并发模型可以自然对应到多核的处理器,主流的操作系统因此也都提供了系统级的多线程支持,同时从概念上讲多线程似乎也更直观,因此多线程编程模型逐步被吸纳到主流的编程语言特性或语言扩展库中。而主流编程语言对基于消息的并发编程模型支持则相比较少,Go语言是基于消息并发模型的集大成者,它将基于CSP模型的并发编程内置到了语言中,通过一个go关键字就可以轻易地启动一个Goroutine,与Erlang不同的是Go语言的Goroutine之间是共享内存的。
1.5.1 Goroutine和系统线程
Goroutine是Go语言特有的并发体,是一种轻量级的线程,由go关键字启动。在真实的Go语言的实现中,goroutine和系统线程也不是等价的。尽管两者的区别实际上只是一个量的区别,但正是这个量变引发了Go语言并发编程质的飞跃。
首先,每个系统级线程都会有一个固定大小的栈(一般默认可能是2MB),这个栈主要用来保存函数递归调用时参数和局部变量。固定了栈的大小导致了两个问题:一是对于很多只需要很小的栈空间的线程来说是一个巨大的浪费,二是对于少数需要巨大栈空间的线程来说又面临栈溢出的风险。针对这两个问题的解决方案是:要么降低固定的栈大小,提升空间的利用率;要么增大栈的大小以允许更深的函数递归调用,但这两者是没法同时兼得的。相反,一个Goroutine会以一个很小的栈启动(可能是2KB或4KB),当遇到深度递归导致当前栈空间不足时,Goroutine会根据需要动态地伸缩栈的大小(主流实现中栈的最大值可达到1GB)。因为启动的代价很小,所以我们可以轻易地启动成千上万个Goroutine。
Go的运行时还包含了其自己的调度器,这个调度器使用了一些技术手段,可以在n个操作系统线程上多工调度m个Goroutine。Go调度器的工作和内核的调度是相似的,但是这个调度器只关注单独的Go程序中的Goroutine。Goroutine采用的是半抢占式的协作调度,只有在当前Goroutine发生阻塞时才会导致调度;同时发生在用户态,调度器会根据具体函数只保存必要的寄存器,切换的代价要比系统线程低得多。运行时有一个runtime.GOMAXPROCS
变量,用于控制当前运行正常非阻塞Goroutine的系统线程数目。
1.5.2 原子操作
所谓的原子操作就是并发编程中“最小的且不可并行化”的操作。通常,如果多个并发体对同一个共享资源进行的操作是原子的话,那么同一时刻最多只能有一个并发体对该资源进行操作。从线程角度看,在当前线程修改共享资源期间,其它的线程是不能访问该资源的。原子操作对于多线程并发编程模型来说,不会发生有别于单线程的意外情况,共享资源的完整性可以得到保证。
一般情况下,原子操作都是通过“互斥”访问来保证的,通常由特殊的CPU指令提供保护。当然,如果仅仅是想模拟下粗粒度的原子操作,我们可以借助于sync.Mutex
来实现
用互斥锁来保护一个数值型的共享资源,麻烦且效率低下。标准库的sync/atomic
包对原子操作提供了丰富的支持
import (
"sync"
"sync/atomic"
)
var total uint64
func worker(wg *sync.WaitGroup) {
defer wg.Done()
var i uint64
for i = 0; i <= 100; i++ {
atomic.AddUint64(&total, i)
// 但是atomic包下没有sub的操作,支持的操作有限
}
}
func main() {
var wg sync.WaitGroup
wg.Add(2)
go worker(&wg)
go worker(&wg)
wg.Wait()
}
原子操作配合互斥锁可以实现非常高效的单件模式。互斥锁的代价比普通整数的原子读写高很多,在性能敏感的地方可以增加一个数字型的标志位,通过原子检测标志位状态降低互斥锁的使用次数来提高性能。
type singleton struct {}
var (
instance *singleton
initialized uint32
mu sync.Mutex
)
func Instance() *singleton {
if atomic.LoadUint32(&initialized) == 1 {
return instance
}
mu.Lock()
defer mu.Unlock()
if instance == nil {
defer atomic.StoreUint32(&initialized, 1)
instance = &singleton{}
}
return instance
}
我们可以将通用的代码提取出来,就成了标准库中sync.Once
的实现:
type Once struct {
m Mutex
done uint32
}
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
基于sync.Once
重新实现单件模式:
var (
instance *singleton
once sync.Once
)
func Instance() *singleton {
once.Do(func() {
instance = &singleton{}
})
return instance
}
sync/atomic
包对基本的数值类型及复杂对象的读写都提供了原子操作的支持。atomic.Value
原子对象提供了Load
和Store
两个原子方法,分别用于加载和保存数据,返回值和参数都是interface{}
类型,因此可以用于任意的自定义复杂类型。
var config atomic.Value // 保存当前配置信息
// 初始化配置信息
config.Store(loadConfig())
// 启动一个后台线程, 加载更新后的配置信息
go func() {
for {
time.Sleep(time.Second)
config.Store(loadConfig())
}
}()
// 用于处理请求的工作者线程始终采用最新的配置信息
for i := 0; i < 10; i++ {
go func() {
for r := range requests() {
c := config.Load()
// ...
}
}()
}
这是一个简化的生产者消费者模型:后台线程生成最新的配置信息;前台多个工作者线程获取最新的配置信息。所有线程共享配置信息资源。
1.5.3 顺序一致性内存模型
如果只是想简单地在线程之间进行数据同步的话,原子操作已经为编程人员提供了一些同步保障。不过这种保障有一个前提:顺序一致性的内存模型。
var a string
var done bool
func setup() {
a = "hello, world"
done = true
}
func main() {
go setup()
for !done {}
print(a)
}
我们创建了setup
线程,用于对字符串a
的初始化工作,初始化完成之后设置done
标志为true
。main
函数所在的主线程中,通过for !done {}
检测done
变为true
时,认为字符串初始化工作完成,然后进行字符串的打印工作。
但是Go语言并不保证在main
函数中观测到的对done
的写入操作发生在对字符串a
的写入的操作之后,因此程序很可能打印一个空字符串。更糟糕的是,因为两个线程之间没有同步事件,setup
线程对done
的写入操作甚至无法被main
线程看到,main
函数有可能陷入死循环中。
在Go语言中,同一个Goroutine线程内部,顺序一致性内存模型是得到保证的。但是不同的Goroutine之间,并不满足顺序一致性内存模型,需要通过明确定义的同步事件来作为同步的参考。如果两个事件不可排序,那么就说这两个事件是并发的。为了最大化并行,Go语言的编译器和处理器在不影响上述规定的前提下可能会对执行语句重新排序(CPU也会对一些指令进行乱序执行)。
因此,如果在一个Goroutine中顺序执行a = 1; b = 2;
两个语句,虽然在当前的Goroutine中可以认为a = 1;
语句先于b = 2;
语句执行,但是在另一个Goroutine中b = 2;
语句可能会先于a = 1;
语句执行,甚至在另一个Goroutine中无法看到它们的变化(可能始终在寄存器中)。也就是说在另一个Goroutine看来, a = 1; b = 2;
两个语句的执行顺序是不确定的。如果一个并发程序无法确定事件的顺序关系,那么程序的运行结果往往会有不确定的结果。比如下面这个程序:
func main() {
go println("你好, 世界")
}
根据Go语言规范,main
函数退出时程序结束,不会等待任何后台线程。因为Goroutine的执行和main
函数的返回事件是并发的,谁都有可能先发生,所以什么时候打印,能否打印都是未知的。
用前面的原子操作并不能解决问题,因为我们无法确定两个原子操作之间的顺序。解决问题的办法就是通过同步原语来给两个事件明确排序:
func main() {
done := make(chan int)
go func(){
println("你好, 世界")
done <- 1
}()
<-done
}
1.5.4 初始化顺序
(init
不是普通函数,可以定义有多个,所以不能被其它函数调用)。
要注意的是,在main.main
函数执行之前所有代码都运行在同一个Goroutine中,也是运行在程序的主系统线程中。如果某个init
函数内部用go关键字启动了新的Goroutine的话,新的Goroutine和main.main
函数是并发执行的。
因为所有的init
函数和main
函数都是在主线程完成,它们也是满足顺序一致性模型的。
1.5.6 基于Channel的通信
Channel通信是在Goroutine之间进行同步的主要方法。在无缓存的Channel上的每一次发送操作都有与其对应的接收操作相配对,发送和接收操作通常发生在不同的Goroutine上(在同一个Goroutine上执行2个操作很容易导致死锁。无缓存的Channel上的发送操作总在对应的接收操作完成前发生.
若在关闭Channel后继续从中接收数据,接收者就会收到该Channel返回的零值。因此在这个例子中,用close(c)
关闭管道代替done <- false
依然能保证该程序产生相同的行为。
var done = make(chan bool)
var msg string
func aGoroutine() {
msg = "你好, 世界"
close(done)
}
func main() {
go aGoroutine()
<-done
println(msg)
}
对于从无缓冲Channel进行的接收,发生在对该Channel进行的发送完成之前。
基于上面这个规则可知,交换两个Goroutine中的接收和发送操作也是可以的(但是很危险):
var done = make(chan bool)
var msg string
func aGoroutine() {
msg = "hello, world"
<-done
}
func main() {
go aGoroutine()
done <- true
println(msg)
}
也可保证打印出“hello, world”。因为main
线程中done <- true
发送完成前,后台线程<-done
接收已经开始,这保证msg = "hello, world"
被执行了,所以之后println(msg)
的msg已经被赋值过了。简而言之,后台线程首先对msg
进行写入,然后从done
中接收信号,随后main
线程向done
发送对应的信号,最后执行println
函数完成。但是,若该Channel为带缓冲的(例如,done = make(chan bool, 1)
),main
线程的done <- true
接收操作将不会被后台线程的<-done
接收操作阻塞,该程序将无法保证打印出“hello, world”。
对于带缓冲的Channel,对于Channel的第K
个接收完成操作发生在第K+C
个发送操作完成之前,其中C
是Channel的缓存大小。 如果将C
设置为0自然就对应无缓存的Channel,也即使第K个接收完成在第K个发送完成之前。因为无缓存的Channel只能同步发1个,也就简化为前面无缓存Channel的规则:对于从无缓冲Channel进行的接收,发生在对该Channel进行的发送完成之前。
我们可以根据控制Channel的缓存大小来控制并发执行的Goroutine的最大数目, 例如:
var limit = make(chan int, 3)
func main() {
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
select{}
}
最后一句select{}
是一个空的管道选择语句,该语句会导致main
线程阻塞,从而避免程序过早退出。还有for{}
、<-make(chan int)
等诸多方法可以达到类似的效果。因为main
线程被阻塞了,如果需要程序正常退出的话可以通过调用os.Exit(0)
实现
1.5.7 不靠谱的同步
严谨的并发程序的正确性不应该是依赖于CPU的执行速度和休眠时间等不靠谱的因素的。严谨的并发也应该是可以静态推导出结果的:根据线程内顺序一致性,结合Channel或sync
同步事件的可排序性来推导,最终完成各个线程各段代码的偏序关系排序。如果两个事件无法根据此规则来排序,那么它们就是并发的,也就是执行先后顺序不可靠的。
解决同步问题的思路是相同的:使用显式的同步。
1.6 常见的并发模式
Go语言并发体系的理论是C.A.R Hoare在1978年提出的CSP(Communicating Sequential Process,通讯顺序进程)
作为Go并发编程核心的CSP理论的核心概念只有一个:同步通信。
首先要明确一个概念:并发不是并行。并发更关注的是程序的设计层面,并发的程序完全是可以顺序执行的,只有在真正的多核CPU上才可能真正地同时运行。并行更关注的是程序的运行层面,并行一般是简单的大量重复,例如GPU中对图像处理都会有大量的并行运算。为更好的编写并发程序,从设计之初Go语言就注重如何在编程语言层级上设计一个简洁安全高效的抽象模型,让程序员专注于分解问题和组合方案,而且不用被线程管理和信号互斥这些繁琐的操作分散精力。
在并发编程中,对共享资源的正确访问需要精确的控制,在目前的绝大多数语言中,都是通过加锁等线程同步方案来解决这一困难问题,而Go语言却另辟蹊径,它将共享的值通过Channel传递(实际上多个独立执行的线程很少主动共享资源)。在任意给定的时刻,最好只有一个Goroutine能够拥有该资源。数据竞争从设计层面上就被杜绝了。为了提倡这种思考方式,Go语言将其并发编程哲学化为一句口号:
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而应通过通信来共享内存。
这是更高层次的并发编程哲学(通过管道来传值是Go语言推荐的做法)。虽然像引用计数这类简单的并发问题通过原子操作或互斥锁就能很好地实现,但是通过Channel来控制访问能够让你写出更简洁正确的程序。
1.6.1 并发版本的Hello world
我们先以在一个新的Goroutine中输出“Hello world”,main
等待后台线程输出工作完成之后退出,这样一个简单的并发程序作为热身。
并发编程的核心概念是同步通信,但是同步的方式却有多种。我们先以大家熟悉的互斥量sync.Mutex
来实现同步通信。根据文档,我们不能直接对一个未加锁状态的sync.Mutex
进行解锁,这会导致运行时异常。下面这种方式并不能保证正常工作:
func main() {
var mu sync.Mutex
go func(){
fmt.Println("你好, 世界")
mu.Lock()
}()
mu.Unlock()
}
因为mu.Lock()
和mu.Unlock()
并不在同一个Goroutine中,所以也就不满足顺序一致性内存模型。同时它们也没有其它的同步事件可以参考,这两个事件不可排序也就是可以并发的。因为可能是并发的事件,所以main
函数中的mu.Unlock()
很有可能先发生,而这个时刻mu
互斥对象还处于未加锁的状态,从而会导致运行时异常。
下面是修复后的代码:
func main() {
var mu sync.Mutex
mu.Lock()
go func(){
fmt.Println("你好, 世界")
mu.Unlock()
}()
mu.Lock()
}
修复的方式是在main
函数所在线程中执行两次mu.Lock()
,当第二次加锁时会因为锁已经被占用(不是递归锁)而阻塞,main
函数的阻塞状态驱动后台线程继续向前执行。
使用sync.Mutex
互斥锁同步是比较低级的做法。我们现在改用无缓存的管道来实现同步:
func main() {
done := make(chan int)
go func(){
fmt.Println("你好, 世界")
<-done
}()
done <- 1
}
上面的代码虽然可以正确同步,但是对管道的缓存大小太敏感:如果管道有缓存的话,就无法保证main退出之前后台线程能正常打印了。更好的做法是将管道的发送和接收方向调换一下,这样可以避免同步事件受管道缓存大小的影响:
func main() {
done := make(chan int, 1) // 带缓存的管道
go func(){
fmt.Println("你好, 世界")
done <- 1
}()
<-done
}
对于带缓冲的Channel,对于Channel的第K个接收完成操作发生在第K+C个发送操作完成之前,其中C是Channel的缓存大小。虽然管道是带缓存的,main
线程接收完成是在后台线程发送开始但还未完成的时刻,此时打印工作也是已经完成的。
基于带缓存的管道,我们可以很容易将打印线程扩展到N个。下面的例子是开启10个后台线程分别打印:
func main() {
done := make(chan int, 10) // 带 10 个缓存
// 开N个后台打印线程
for i := 0; i < cap(done); i++ {
go func(){
fmt.Println("你好, 世界")
done <- 1
}()
}
// 等待N个后台线程完成
for i := 0; i < cap(done); i++ {
<-done
}
}
对于这种要等待N个线程完成后再进行下一步的同步操作有一个简单的做法,就是使用sync.WaitGroup
来等待一组事件:
func main() {
var wg sync.WaitGroup
// 开N个后台打印线程
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
fmt.Println("你好, 世界")
wg.Done()
}()
}
// 等待N个后台线程完成
wg.Wait()
}
其中wg.Add(1)
用于增加等待事件的个数,必须确保在后台线程启动之前执行(如果放到后台线程之中执行则不能保证被正常执行到)。当后台线程完成打印工作之后,调用wg.Done()
表示完成一个事件。main
函数的wg.Wait()
是等待全部的事件完成。
1.6.4 控制并发数
在Go语言自带的godoc程序实现中有一个vfs
的包对应虚拟的文件系统,在vfs
包下面有一个gatefs
的子包,gatefs
子包的目的就是为了控制访问该虚拟文件系统的最大并发数。gatefs
包的应用很简单:
import (
"golang.org/x/tools/godoc/vfs"
"golang.org/x/tools/godoc/vfs/gatefs"
)
func main() {
fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8))
// ...
}
其中vfs.OS("/path")
基于本地文件系统构造一个虚拟的文件系统,然后gatefs.New
基于现有的虚拟文件系统构造一个并发受控的虚拟文件系统。并发数控制的原理在前面一节已经讲过,就是通过带缓存管道的发送和接收规则来实现最大并发阻塞:
var limit = make(chan int, 3)
func main() {
for _, w := range work {
go func() {
limit <- 1
w()
<-limit
}()
}
select{}
}
不过gatefs
对此做一个抽象类型gate
,增加了enter
和leave
方法分别对应并发代码的进入和离开。当超出并发数目限制的时候,enter
方法会阻塞直到并发数降下来为止。
type gate chan bool
func (g gate) enter() { g <- true }
func (g gate) leave() { <-g }
gatefs
包装的新的虚拟文件系统就是将需要控制并发的方法增加了enter
和leave
调用而已:
type gatefs struct {
fs vfs.FileSystem
gate
}
func (fs gatefs) Lstat(p string) (os.FileInfo, error) {
fs.enter()
defer fs.leave()
return fs.fs.Lstat(p)
}
我们不仅可以控制最大的并发数目,而且可以通过带缓存Channel的使用量和最大容量比例来判断程序运行的并发率。当管道为空的时候可以认为是空闲状态,当管道满了时任务是繁忙状态,这对于后台一些低级任务的运行是有参考价值的。
1.6.5 赢者为王
假设我们想快速地搜索“golang”相关的主题,我们可能会同时打开Bing、Google或百度等多个检索引擎。当某个搜索最先返回结果后,就可以关闭其它搜索页面了。因为受网络环境和搜索引擎算法的影响,某些搜索引擎可能很快返回搜索结果,某些搜索引擎也可能等到他们公司倒闭也没有完成搜索。我们可以采用类似的策略来编写这个程序:
func main() {
ch := make(chan string, 32)
go func() {
ch <- searchByBing("golang")
}()
go func() {
ch <- searchByGoogle("golang")
}()
go func() {
ch <- searchByBaidu("golang")
}()
fmt.Println(<-ch)
}
首先,我们创建了一个带缓存的管道,管道的缓存数目要足够大,保证不会因为缓存的容量引起不必要的阻塞。然后我们开启了多个后台线程,分别向不同的搜索引擎提交搜索请求。当任意一个搜索引擎最先有结果之后,都会马上将结果发到管道中(因为管道带了足够的缓存,这个过程不会阻塞)。但是最终我们只从管道取第一个结果,也就是最先返回的结果。
通过适当开启一些冗余的线程,尝试用不同途径去解决同样的问题,最终以赢者为王的方式提升了程序的相应性能。
1.6.6 素数筛
并发版本的素数筛是一个经典的并发例子,通过它我们可以更深刻地理解Go语言的并发特性。“素数筛”的原理如图:
图 1-13 素数筛
我们需要先生成最初的2, 3, 4, ...
自然数序列(不包含开头的0、1):
// 返回生成自然数序列的管道: 2, 3, 4, ...
func GenerateNatural() chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
ch <- i
}
}()
return ch
}
GenerateNatural
函数内部启动一个Goroutine生产序列,返回对应的管道。
然后是为每个素数构造一个筛子:将输入序列中是素数倍数的数提出,并返回新的序列,是一个新的管道。
// 管道过滤器: 删除能被素数整除的数
func PrimeFilter(in <-chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
out <- i
}
}
}()
return out
}
PrimeFilter
函数也是内部启动一个Goroutine生产序列,返回过滤后序列对应的管道。
现在我们可以在main
函数中驱动这个并发的素数筛了:
func main() {
ch := GenerateNatural() // 自然数序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出现的素数
fmt.Printf("%v: %v\n", i+1, prime)
ch = PrimeFilter(ch, prime) // 基于新素数构造的过滤器
}
}
我们先是调用GenerateNatural()
生成最原始的从2开始的自然数序列。然后开始一个100次迭代的循环,希望生成100个素数。在每次循环迭代开始的时候,管道中的第一个数必定是素数,我们先读取并打印这个素数。然后基于管道中剩余的数列,并以当前取出的素数为筛子过滤后面的素数。不同的素数筛子对应的管道是串联在一起的。
素数筛展示了一种优雅的并发程序结构。但是因为每个并发体处理的任务粒度太细微,程序整体的性能并不理想。对于细粒度的并发程序,CSP模型中固有的消息传递的代价太高了(多线程并发模型同样要面临线程启动的代价)。
1.6.7 并发的安全退出
有时候我们需要通知goroutine停止它正在干的事情,特别是当它工作在错误的方向上的时候。Go语言并没有提供在一个直接终止Goroutine的方法,由于这样会导致goroutine之间的共享变量处在未定义的状态上。但是如果我们想要退出两个或者任意多个Goroutine怎么办呢?
Go语言中不同Goroutine之间主要依靠管道进行通信和同步。要同时处理多个管道的发送或接收操作,我们需要使用select
关键字(这个关键字和网络编程中的select
函数的行为类似)。当select
有多个分支时,会随机选择一个可用的管道分支,如果没有可用的管道分支则选择default
分支,否则会一直保存阻塞状态。
基于select
实现的管道的超时判断:
select {
case v := <-in:
fmt.Println(v)
case <-time.After(time.Second):
return // 超时
}
通过select
的default
分支实现非阻塞的管道发送或接收操作:
select {
case v := <-in:
fmt.Println(v)
default:
// 没有数据
}
通过select
来阻止main
函数退出:
func main() {
// do some thins
select{}
}
当有多个管道均可操作时,select
会随机选择一个管道。基于该特性我们可以用select
实现一个生成随机数序列的程序:
func main() {
ch := make(chan int)
go func() {
for {
select {
case ch <- 0:
case ch <- 1:
}
}
}()
for v := range ch {
fmt.Println(v)
}
}
我们通过select
和default
分支可以很容易实现一个Goroutine的退出控制:
func worker(cannel chan bool) {
for {
select {
default:
fmt.Println("hello")
// 正常工作
case <-cannel:
// 退出
}
}
}
func main() {
cannel := make(chan bool)
go worker(cannel)
time.Sleep(time.Second)
cannel <- true
}
但是管道的发送操作和接收操作是一一对应的,如果要停止多个Goroutine那么可能需要创建同样数量的管道,这个代价太大了。其实我们可以通过close
关闭一个管道来实现广播的效果,所有从关闭管道接收的操作均会收到一个零值和一个可选的失败标志。
func worker(cannel chan bool) {
for {
select {
default:
fmt.Println("hello")
// 正常工作
case <-cannel:
// 退出
}
}
}
func main() {
cancel := make(chan bool)
for i := 0; i < 10; i++ {
go worker(cancel)
}
time.Sleep(time.Second)
close(cancel)
}
我们通过close
来关闭cancel
管道向多个Goroutine广播退出的指令。不过这个程序依然不够稳健:当每个Goroutine收到退出指令退出时一般会进行一定的清理工作,但是退出的清理工作并不能保证被完成,因为main
线程并没有等待各个工作Goroutine退出工作完成的机制。我们可以结合sync.WaitGroup
来改进:
func worker(wg *sync.WaitGroup, cannel chan bool) {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-cannel:
return
}
}
}
func main() {
cancel := make(chan bool)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(&wg, cancel)
}
time.Sleep(time.Second)
close(cancel)
wg.Wait()
}
现在每个工作者并发体的创建、运行、暂停和退出都是在main
函数的安全控制之下了
1.6.8 context包
在Go1.7发布时,标准库增加了一个context
包,用来简化对于处理单个请求的多个Goroutine之间与请求域的数据、超时和退出等操作,官方有博文对此做了专门介绍。我们可以用context
包来重新实现前面的线程安全退出或超时的控制:
func worker(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()
for {
select {
default:
fmt.Println("hello")
case <-ctx.Done():
return ctx.Err()
}
}
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(ctx, &wg)
}
time.Sleep(time.Second)
cancel()
wg.Wait()
}
当并发体超时或main
主动停止工作者Goroutine时,每个工作者都可以安全退出。
Go语言是带内存自动回收特性的,因此内存一般不会泄漏。
在前面素数筛的例子中,GenerateNatural
和PrimeFilter
函数内部都启动了新的Goroutine,当main
函数不再使用管道时后台Goroutine有泄漏的风险。我们可以通过context
包来避免这个问题,下面是改进的素数筛实现:
// 返回生成自然数序列的管道: 2, 3, 4, ...
func GenerateNatural(ctx context.Context) chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
select {
case <- ctx.Done():
return
case ch <- i:
}
}
}()
return ch
}
// 管道过滤器: 删除能被素数整除的数
func PrimeFilter(ctx context.Context, in <-chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
select {
case <- ctx.Done():
return
case out <- i:
}
}
}
}()
return out
}
func main() {
// 通过 Context 控制后台Goroutine状态
ctx, cancel := context.WithCancel(context.Background())
ch := GenerateNatural(ctx) // 自然数序列: 2, 3, 4, ...
for i := 0; i < 100; i++ {
prime := <-ch // 新出现的素数
fmt.Printf("%v: %v\n", i+1, prime)
ch = PrimeFilter(ctx, ch, prime) // 基于新素数构造的过滤器
}
cancel()
}
1.7 错误和异常
在Go语言中,错误被认为是一种可以预期的结果;而异常则是一种非预期的结果,发生异常可能表示程序中存在BUG或发生了其它不可控的问题。Go语言推荐使用recover
函数将内部异常转为错误处理,这使得用户可以真正的关心业务相关的错误处理。
捕获异常不是最终的目的。如果异常不可预测,直接输出异常信息是最好的处理方式
1.7.1 错误处理策略
Go语言中的导出函数一般不抛出异常,一个未受控的异常可以看作是程序的BUG。但是对于那些提供类似Web服务的框架而言;它们经常需要接入第三方的中间件。因为第三方的中间件是否存在BUG是否会抛出异常,Web框架本身是不能确定的。为了提高系统的稳定性,Web框架一般会通过recover
来防御性地捕获所有处理流程中可能产生的异常,然后将异常转为普通的错误返回。
让我们以JSON解析器为例,说明recover的使用场景。考虑到JSON解析器的复杂性,即使某个语言解析器目前工作正常,也无法肯定它没有漏洞。因此,当某个异常出现时,我们不会选择让解析器崩溃,而是会将panic异常当作普通的解析错误,并附加额外信息提醒用户报告此错误。
func ParseJSON(input string) (s *Syntax, err error) {
defer func() {
if p := recover(); p != nil {
err = fmt.Errorf("JSON: internal error: %v", p)
}
}()
// ...parser...
}
标准库中的json
包,在内部递归解析JSON数据的时候如果遇到错误,会通过抛出异常的方式来快速跳出深度嵌套的函数调用,然后由最外一级的接口通过recover
捕获panic
,然后返回相应的错误信息。
Go语言库的实现习惯: 即使在包内部使用了panic
,但是在导出函数时会被转化为明确的错误值。
1.7.2 获取错误的上下文
上层用户在遇到错误时,可以很容易从业务层面理解错误发生的原因。但是鱼和熊掌总是很难兼得,在上层用户获得新的错误的同时,我们也丢失了底层最原始的错误类型(只剩下错误描述信息了)。
为了记录这种错误类型在包装的变迁过程中的信息,我们一般会定义一个辅助的WrapError
函数,用于包装原始的错误,同时保留完整的原始错误类型。为了问题定位的方便,同时也为了能记录错误发生时的函数调用状态,我们很多时候希望在出现致命错误的时候保存完整的函数调用信息。同时,为了支持RPC等跨网络的传输,我们可能要需要将错误序列化为类似JSON格式的数据,然后再从这些数据中将错误解码恢出来。
在Go语言中,错误处理也有一套独特的编码风格。检查某个子函数是否失败后,我们通常将处理失败的逻辑代码放在处理成功的代码之前。如果某个错误会导致函数返回,那么成功时的逻辑代码不应放在else
语句块中,而应直接放在函数体中。
f, err := os.Open("filename.ext")
if err != nil {
// 失败的情形, 马上返回错误
}
// 正常的处理流程
Go语言中大部分函数的代码结构几乎相同,首先是一系列的初始检查,用于防止错误发生,之后是函数的实际逻辑。
1.7.3 错误的错误返回
Go语言作为一个强类型语言,不同类型之间必须要显式的转换(而且必须有相同的基础类型)。但是,Go语言中interface
是一个例外:非接口类型到接口类型,或者是接口类型之间的转换都是隐式的。这是为了支持鸭子类型,当然会牺牲一定的安全性。
鸭子类型(英語:duck typing)在程序设计中是动态类型的一种风格。 在这种风格中,一个对象有效的语义,不是由继承自特定的类或实现特定的接口,而是由”当前方法和属性的集合”决定。
后面没记了,现用现查吧!
第2章 CGO编程
C语言作为一个通用语言,很多库会选择提供一个C兼容的API,然后用其他不同的编程语言实现。Go语言通过自带的一个叫CGO的工具来支持C语言函数调用,同时我们可以用Go语言导出C动态库接口给其它语言使用。
2.1 快速入门
通过import "C"
语句启用CGO特性。即使没有调用CGO的相关函数,但是go build
命令会在编译和链接阶段启动gcc编译器,这已经是一个完整的CGO程序了。
2.1.2 基于C标准库函数输出字符串
// hello.go
package main
//#include <stdio.h>
import "C"
func main() {
C.puts(C.CString("Hello, World\n"))
}
我们不仅仅通过import "C"
语句启用CGO特性,同时包含C语言的<stdio.h>
头文件。然后通过CGO包的C.CString
函数将Go语言字符串转为C语言字符串,最后调用CGO包的C.puts
函数向标准输出窗口打印转换后的C字符串。
没有释放使用C.CString
创建的C语言字符串会导致内存泄漏。但是对于这个小程序来说,这样是没有问题的,因为程序退出后操作系统会自动回收程序的所有资源。
2.1.3 使用自己的C函数
现在我们先自定义一个叫SayHello
的C函数来实现打印,然后从Go语言环境中调用这个SayHello
函数:
// hello.go
package main
/*
#include <stdio.h>
static void SayHello(const char* s) {
puts(s);
}
*/
import "C"
func main() {
C.SayHello(C.CString("Hello, World\n"))
}
除了SayHello
函数是我们自己实现的之外,其它的部分和前面的例子基本相似
我们也可以将SayHello
函数放到当前目录下的一个C语言源文件中(后缀名必须是.c
)。因为是编写在独立的C文件中,为了允许外部引用,所以需要去掉函数的static
修饰符。
// hello.c
#include <stdio.h>
void SayHello(const char* s) {
puts(s);
}
然后在CGO部分先声明SayHello
函数,其它部分不变:
// hello.go
package main
//void SayHello(const char* s);
import "C"
func main() {
C.SayHello(C.CString("Hello, World\n"))
}
注意,如果之前运行的命令是go run hello.go
或go build hello.go
的话,此处须使用go run "your/package"
或go build "your/package"
才可以。若本就在包路径下的话,也可以直接运行go run .
或go build
。
既然SayHello
函数已经放到独立的C文件中了,我们自然可以将对应的C文件编译打包为静态库或动态库文件供使用。如果是以静态库或动态库方式引用SayHello
函数的话,需要将对应的C源文件移出当前目录(CGO构建程序会自动构建当前目录下的C源文件,从而导致C函数名冲突)。
2.1.4 C代码的模块化
模块化编程的核心是面向程序接口编程(这里的接口并不是Go语言的interface,而是API的概念)。
在前面的例子中,我们可以抽象一个名为hello的模块,模块的全部接口函数都在hello.h头文件定义:
// hello.h
void SayHello(const char* s);
其中只有一个SayHello函数的声明。但是作为hello模块的用户来说,就可以放心地使用SayHello函数,而无需关心函数的具体实现。而作为SayHello函数的实现者来说,函数的实现只要满足头文件中函数的声明的规范即可。
在采用面向C语言API接口编程之后,我们彻底解放了模块实现者的语言枷锁:实现者可以用任何编程语言实现模块,只要最终满足公开的API约定即可。我们可以用C语言实现SayHello函数,也可以使用更复杂的C++语言来实现SayHello函数,当然我们也可以用汇编语言甚至Go语言来重新实现SayHello函数。
2.1.5用Go重新实现C函数
其实CGO不仅仅用于Go语言中调用C语言函数,还可以用于导出Go语言函数给C语言函数调用。在前面的例子中,我们已经抽象一个名为hello的模块,模块的全部接口函数都在hello.h头文件定义:
// hello.h
void SayHello(/*const*/ char* s);
现在我们创建一个hello.go文件,用Go语言重新实现C语言接口的SayHello函数:
// hello.go
package main
import "C"
import "fmt"
//export SayHello
func SayHello(s *C.char) {
fmt.Print(C.GoString(s))
}
我们通过CGO的//export SayHello
指令将Go语言实现的函数SayHello
导出为C语言函数。为了适配CGO导出的C语言函数,我们禁止了在函数的声明语句中的const修饰符。需要注意的是,这里其实有两个版本的SayHello
函数:一个Go语言环境的;另一个是C语言环境的。cgo生成的C语言版本SayHello函数最终会通过桥接代码调用Go语言版本的SayHello函数。
通过面向C语言接口的编程技术,我们不仅仅解放了函数的实现者,同时也简化的函数的使用者。现在我们可以将SayHello当作一个标准库的函数使用(和puts函数的使用方式类似):
package main
//#include <hello.h>
import "C"
func main() {
C.SayHello(C.CString("Hello, World\n"))
}
2.1.6 面向C接口的Go编程
正所谓合久必分、分久必合,我们现在尝试将例子中的几个文件重新合并到一个Go文件。下面是合并后的成果:
package main
//void SayHello(char* s);
import "C"
import (
"fmt"
)
func main() {
C.SayHello(C.CString("Hello, World\n"))
}
//export SayHello
func SayHello(s *C.char) {
fmt.Print(C.GoString(s))
}
通过分析可以发现SayHello
函数的参数如果可以直接使用Go字符串是最直接的。在Go1.10中CGO新增加了一个_GoString_
预定义的C语言类型,用来表示Go语言字符串。下面是改进后的代码:
package main
//void SayHello(_GoString_ s);
import "C"
import (
"fmt"
)
func main() {
C.SayHello("Hello, World\n")
}
//export SayHello
func SayHello(s string) {
fmt.Print(s)
}
虽然看起来全部是Go语言代码,但是执行的时候是先从Go语言的main
函数,到CGO自动生成的C语言版本SayHello
桥接函数,最后又回到了Go语言环境的SayHello
函数。这个代码包含了CGO编程的精华,读者需要深入理解。
2.2 CGO基础
要使用CGO特性,需要安装C/C++构建工具链,在macOS和Linux下是要安装GCC,在windows下是需要安装MinGW工具。同时需要保证环境变量CGO_ENABLED
被设置为1,这表示CGO是被启用的状态。在本地构建时CGO_ENABLED
默认是启用的,当交叉构建时CGO默认是禁止的。比如要交叉构建ARM环境运行的Go程序,需要手工设置好C/C++交叉构建的工具链,同时开启CGO_ENABLED
环境变量。然后通过import "C"
语句启用CGO特性。
2.2.1 import "C"
语句
如果在Go代码中出现了import "C"
语句则表示使用了CGO特性,紧跟在这行语句前面的注释是一种特殊语法,里面包含的是正常的C语言代码。当确保CGO启用的情况下,还可以在当前目录中包含C/C++对应的源文件。
package main
/*
#include <stdio.h>
void printint(int v) {
printf("printint: %d\n", v);
}
*/
import "C"
func main() {
v := 42
C.printint(C.int(v))
}
这个例子展示了cgo的基本使用方法。开头的注释中写了要调用的C函数和相关的头文件,头文件被include之后里面的所有的C语言元素都会被加入到”C”这个虚拟的包中。需要注意的是,import “C”导入语句需要单独一行,不能与其他包一同import。向C函数传递参数也很简单,就直接转化成对应C语言类型传递就可以。如上例中C.int(v)
用于将一个Go中的int类型值强制类型转换转化为C语言中的int类型值,然后调用C语言定义的printint函数进行打印。
需要注意的是,Go是强类型语言,所以cgo中传递的参数类型必须与声明的类型完全一致,而且传递前必须用”C”中的转化函数转换成对应的C类型,不能直接传入Go中类型的变量。同时通过虚拟的C包导入的C语言符号并不需要是大写字母开头,它们不受Go语言的导出规则约束。
cgo将当前包引用的C语言符号都放到了虚拟的C包中,同时当前包依赖的其它Go语言包内部可能也通过cgo引入了相似的虚拟C包,但是不同的Go语言包引入的虚拟的C包之间的类型是不能通用的。这个约束对于要自己构造一些cgo辅助函数时有可能会造成一点的影响。
比如我们希望在Go中定义一个C语言字符指针对应的CChar类型,然后增加一个GoString方法返回Go语言字符串:
package cgo_helper
//#include <stdio.h>
import "C"
type CChar C.char
func (p *CChar) GoString() string {
return C.GoString((*C.char)(p))
}
func PrintCString(cs *C.char) {
C.puts(cs)
}
现在我们可能会想在其它的Go语言包中也使用这个辅助函数:
package main
//static const char* cs = "hello";
import "C"
import "./cgo_helper"
func main() {
cgo_helper.PrintCString(C.cs)
}
这段代码是不能正常工作的,因为当前main包引入的C.cs
变量的类型是当前main包的cgo构造的虚拟的C包下的char类型(具体点是`C.char,更具体点是
main.C.char),它和cgo_helper包引入的
C.char类型(具体点是
cgo_helper.C.char)是不同的。在Go语言中方法是依附于类型存在的,不同Go包中引入的虚拟的C包的类型却是不同的(
main.C不等
cgo_helper.C),这导致从它们延伸出来的Go类型也是不同的类型(
main.C.char不等
*cgo_helper.C.char`),这最终导致了前面代码不能正常工作。
有Go语言使用经验的用户可能会建议参数转型后再传入。但是这个方法似乎也是不可行的,因为cgo_helper.PrintCString
的参数是它自身包引入的*C.char
类型,在外部是无法直接获取这个类型的。换言之,一个包如果在公开的接口中直接使用了*C.char
等类似的虚拟C包的类型,其它的Go包是无法直接使用这些类型的,除非这个Go包同时也提供了*C.char
类型的构造函数。因为这些诸多因素,如果想在go test环境直接测试这些cgo导出的类型也会有相同的限制。
2.2.2 #cgo
语句
在import "C"
语句前的注释中可以通过#cgo
语句设置编译阶段和链接阶段的相关参数。编译阶段的参数主要用于定义相关宏和指定头文件检索路径。链接阶段的参数主要是指定库文件检索路径和要链接的库文件。
// #cgo CFLAGS: -DPNG_DEBUG=1 -I./include
// #cgo LDFLAGS: -L/usr/local/lib -lpng
// #include <png.h>
import "C"
上面的代码中,CFLAGS部分,-D
部分定义了宏PNG_DEBUG,值为1;-I
定义了头文件包含的检索目录。LDFLAGS部分,-L
指定了链接时库文件检索目录,-l
指定了链接时需要链接png库。
因为C/C++遗留的问题,C头文件检索目录可以是相对目录,但是库文件检索目录则需要绝对路径。在库文件的检索目录中可以通过${SRCDIR}
变量表示当前包目录的绝对路径:
// #cgo LDFLAGS: -L${SRCDIR}/libs -lfoo
上面的代码在链接时将被展开为:
// #cgo LDFLAGS: -L/go/src/foo/libs -lfoo
#cgo
语句主要影响CFLAGS、CPPFLAGS、CXXFLAGS、FFLAGS和LDFLAGS几个编译器环境变量。LDFLAGS用于设置链接时的参数,除此之外的几个变量用于改变编译阶段的构建参数(CFLAGS用于针对C语言代码设置编译参数)。
对于在cgo环境混合使用C和C++的用户来说,可能有三种不同的编译选项:其中CFLAGS对应C语言特有的编译选项、CXXFLAGS对应是C++特有的编译选项、CPPFLAGS则对应C和C++共有的编译选项。但是在链接阶段,C和C++的链接选项是通用的,因此这个时候已经不再有C和C++语言的区别,它们的目标文件的类型是相同的。
2.2.3 build tag 条件编译
2.3 类型转换
2.3.1 数值类型
C语言类型 | CGO类型 | Go语言类型 |
---|---|---|
char | C.char | byte |
singed char | C.schar | int8 |
unsigned char | C.uchar | uint8 |
short | C.short | int16 |
unsigned short | C.ushort | uint16 |
int | C.int | int32 |
unsigned int | C.uint | uint32 |
long | C.long | int32 |
unsigned long | C.ulong | uint32 |
long long int | C.longlong | int64 |
unsigned long long int | C.ulonglong | uint64 |
float | C.float | float32 |
double | C.double | float64 |
size_t | C.size_t | uint |
虽然在C语言中int
、short
等类型没有明确定义内存大小,但是在CGO中它们的内存大小是确定的。在CGO中,C语言的int
和long
类型都是对应4个字节的内存大小,size_t
类型可以当作Go语言uint
无符号整数类型对待。
2.11 补充说明
为何要话费巨大的精力学习CGO是一个问题。任何技术和语言都有它自身的优点和不足,Go语言不是银弹,它无法解决全部问题。而通过CGO可以继承C/C++将近半个世纪的软件遗产,通过CGO可以用Go给其它系统写C接口的共享库,通过CGO技术可以让Go语言编写的代码可以很好地融入现有的软件生态——而现在的软件正式建立在C/C++语言之上的。因此说CGO是一个保底的后备技术,它是Go的一个重量级的替补技术,值得任何一个严肃的Go语言开发人员学习。
感觉有点偏了,CGO暂时先不学了
第3章 Go汇编语言
无论高级语言如何发展,作为最接近CPU的汇编语言的地位依然是无法彻底被替代的。只有通过汇编语言才能彻底挖掘CPU芯片的全部功能,因此操作系统的引导过程必须要依赖汇编语言的帮助。只有通过汇编语言才能彻底榨干CPU芯片的性能,因此很多底层的加密解密等对性能敏感的算法会考虑通过汇编语言进行性能优化。
感觉有点偏了,这部分暂时先不学了
第4章 RPC和Protobuf
RPC是远程过程调用的缩写(Remote Procedure Call),通俗地说就是调用远处的一个函数。因为RPC涉及的函数可能非常之远,远到它们之间说着完全不同的语言,语言就成了两边的沟通障碍。而Protobuf因为支持多种不同的语言(甚至不支持的语言也可以扩展支持),其本身特性也非常方便描述服务的接口(也就是方法列表),因此非常适合作为RPC世界的接口交流语言。本章将讨论RPC的基本用法,如何针对不同场景设计自己的RPC服务,以及围绕Protobuf构造的更为庞大的RPC生态。
4.1 RPC入门
我们先构造一个HelloService类型,其中的Hello方法用于实现打印功能:
type HelloService struct {}
func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request
return nil
}
其中Hello方法必须满足Go语言的RPC规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个error类型,同时必须是公开的方法。
然后就可以将HelloService类型的对象注册为一个RPC服务:
func main() {
rpc.RegisterName("HelloService", new(HelloService))
listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}
conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}
rpc.ServeConn(conn)
}
其中rpc.Register函数调用会将对象类型中所有满足RPC规则的对象方法注册为RPC函数,所有注册的方法会放在“HelloService”服务空间之下。然后我们建立一个唯一的TCP链接,并且通过rpc.ServeConn函数在该TCP链接上为对方提供RPC服务。
下面是客户端请求HelloService服务的代码:
func main() {
client, err := rpc.Dial("tcp", "localhost:1234")
if err != nil {
log.Fatal("dialing:", err)
}
var reply string
err = client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
首选是通过rpc.Dial拨号RPC服务,然后通过client.Call调用具体的RPC方法。在调用client.Call时,第一个参数是用点号链接的RPC服务名字和方法名字,第二和第三个参数分别我们定义RPC方法的两个参数。
4.1.2 更安全的RPC接口
在涉及RPC的应用中,作为开发人员一般至少有三种角色:首选是服务端实现RPC方法的开发人员,其次是客户端调用RPC方法的人员,最后也是最重要的是制定服务端和客户端RPC接口规范的设计人员。
在前面的例子中我们为了简化将以上几种角色的工作全部放到了一起,虽然看似实现简单,但是不利于后期的维护和工作的切割。
4.2 Protobuf
4.2.1 Protobuf入门
对于没有用过Protobuf的读者,建议先从官网了解下基本用法。这里我们尝试将Protobuf和RPC结合在一起使用,通过Protobuf来最终保证RPC的接口规范和安全。Protobuf中最基本的数据单元是message,是类似Go语言中结构体的存在。在message中可以嵌套message或其它的基础数据类型的成员。
首先创建hello.proto文件,其中包装HelloService服务中用到的字符串类型:
syntax = "proto3";
package main;
message String {
string value = 1;
}
开头的syntax语句表示采用proto3的语法。第三版的Protobuf对语言进行了提炼简化,所有成员均采用类似Go语言中的零值初始化(不再支持自定义默认值),因此消息成员也不再需要支持required特性。然后package指令指明当前是main包(这样可以和Go的包名保持一致,简化例子代码),当然用户也可以针对不同的语言定制对应的包路径和名称。最后message关键字定义一个新的String类型,在最终生成的Go语言代码中对应一个String结构体。String类型中只有一个字符串类型的value成员,该成员编码时用1编号代替名字。
在XML或JSON等数据描述语言中,一般通过成员的名字来绑定对应的数据。但是Protobuf编码却是通过成员的唯一编号来绑定对应的数据,因此Protobuf编码后数据的体积会比较小,但是也非常不便于人类查阅。我们目前并不关注Protobuf的编码技术,最终生成的Go结构体可以自由采用JSON或gob等编码格式,因此大家可以暂时忽略Protobuf的成员编码部分。
Protobuf核心的工具集是C++语言开发的,在官方的protoc编译器中并不支持Go语言。要想基于上面的hello.proto文件生成相应的Go代码,需要安装相应的插件。首先是安装官方的protoc工具,可以从 https://github.com/google/protobuf/releases 下载。然后是安装针对Go语言的代码生成插件,可以通过go get github.com/golang/protobuf/protoc-gen-go
命令安装。
感觉这一章也不是很实用,暂且搁置了
第五章 go web
5.1 web 开发简介
因为Go的 net/http
包提供了基础的路由函数组合与丰富的功能函数。所以在社区里流行一种用Go编写api不需要框架的观点;在我们看来,如果你的项目的路由在个位数、URI 固定且不通过 URI 来传递参数,那么确实使用官方库也就足够。但在复杂场景下,官方的 http 库还是有些力有不逮。例如下面这样的路由:
GET /card/:id
POST /card/:id
DELTE /card/:id
GET /card/:id/name
...
GET /card/:id/relations
可见是否使用框架还是要具体问题具体分析的。
Go的Web框架大致可以分为这么两类:
- Router框架
- MVC类框架
在框架的选择上,大多数情况下都是依照个人的喜好和公司的技术栈。例如公司有很多技术人员是PHP出身,那么他们一定会非常喜欢像beego 这样的框架,但如果公司有很多 C 程序员,那么他们的想法可能是越简单越好。比如很多大厂的 C 程序员甚至可能都会去用 C 去写很小的 CGI 程序,他们可能本身并没有什么意愿去学习MVC或者更复杂的 web 框架,他们需要的只是一个非常简单的路由(甚至连路由都不需要,只需要一个基础的HTTP协议处理库来帮他省掉没什么意思的体力劳动)。
根据我们的经验,简单地来说,只要你的路由带有参数,并且这个项目的 api 数目超过了 10,就尽量不要使用 net/http 中默认的路由。在Go开源界应用最广泛的 router 是 httpRouter,很多开源的 router 框架都是基于 httpRouter 进行一定程度的改造的成果。
再来回顾一下文章开头说的,开源界有这么几种框架,第一种是对 httpRouter 进行简单的封装,然后提供定制的 middleware 和一些简单的小工具集成比如 gin,主打轻量,易学,高性能。第二种是借鉴其它语言的编程风格的一些 MVC 类框架,例如 beego,方便从其它语言迁移过来的程序员快速上手,快速开发。还有一些框架功能更为强大,除了 db 设计,大部分代码直接生成,例如 goa。不管哪种框架,适合开发者背景的就是最好的。
5.2 router 请求路由
在常见的 web 框架中,router 是必备的组件。golang 圈子里 router 也时常被称为 http 的 multiplexer。在上一节中我们通过对 Burrow 代码的简单学习,已经知道如何用 http 标准库中内置的 mux 来完成简单的路由功能了。如果开发 web 系统对路径中带参数没什么兴趣的话,用 http 标准库中的 mux 就可以。
restful 是几年前刮起的 API 设计风潮,在 restful 中除了 GET 和 POST 之外,还使用了 http 协议定义的几种其它的标准化语义。具体包括:
const (
MethodGet = "GET"
MethodHead = "HEAD"
MethodPost = "POST"
MethodPut = "PUT"
MethodPatch = "PATCH" // RFC 5789
MethodDelete = "DELETE"
MethodConnect = "CONNECT"
MethodOptions = "OPTIONS"
MethodTrace = "TRACE"
)
restful 中常见的请求路径:
GET /repos/:owner/:repo/comments/:id/reactions
POST /projects/:project_id/columns
PUT /user/starred/:owner/:repo
DELETE /user/starred/:owner/:repo
相信聪明的你已经猜出来了,这是 github 官方文档中挑出来的几个 api 设计。restful 风格的 API 重度依赖请求路径。会将很多参数放在请求 URI 中。除此之外还会使用很多并不那么常见的 HTTP 状态码,不过本节只讨论路由,所以先略过不谈。
如果我们的系统也想要这样的 URI 设计,使用标准库的 mux 显然就力不从心了。
5.2.1 httprouter
较流行的开源 golang web 框架大多使用 httprouter,或是基于 httprouter 的变种对路由进行支持。前面提到的 github 的参数式路由在 httprouter 中都是可以支持的。
因为 httprouter 中使用的是显式匹配,所以在设计路由的时候需要规避一些会导致路由冲突的情况,例如:
conflict:
GET /user/info/:name
GET /user/:id
no conflict:
GET /user/info/:name
POST /user/:id
简单来讲的话,如果两个路由拥有一致的 http method (指 GET/POST/PUT/DELETE) 和请求路径前缀,且在某个位置出现了 A 路由是 wildcard (指 :id 这种形式) 参数,B 路由则是普通字符串,那么就会发生路由冲突。路由冲突会在初始化阶段直接 panic:
还有一点需要注意,因为 httprouter 考虑到字典树的深度,在初始化时会对参数的数量进行限制
除支持路径中的 wildcard 参数之外,httprouter 还可以支持 *
号来进行通配,不过 *
号开头的参数只能放在路由的结尾,例如下面这样:
Pattern: /src/*filepath
/src/ filepath = ""
/src/somefile.go filepath = "somefile.go"
/src/subdir/somefile.go filepath = "subdir/somefile.go"
这种设计在 restful 中可能不太常见,主要是为了能够使用 httprouter 来做简单的 http 静态文件服务器。
除了正常情况下的路由支持,httprouter 也支持对一些特殊情况下的回调函数进行定制,例如 404 的时候:
r := httprouter.New()
r.NotFound = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("oh no, not found"))
})
或者内部 panic 的时候:
r.PanicHandler = func(w http.ResponseWriter, r *http.Request, c interface{}) {
log.Printf("Recovering from panic, Reason: %#v", c.(error))
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(c.(error).Error()))
}
目前开源界最为流行(star 数最多)的 web 框架 gin 使用的就是 httprouter 的变种。
5.2.2 原理
httprouter 和众多衍生 router 使用的数据结构被称为 radix tree,压缩字典树。
如果不用字典树来完成上述功能,要对历史字符串进行排序,再利用二分查找之类的算法去搜索,时间复杂度只高不低。可认为字典树是一种空间换时间的典型做法。
5.2.3 压缩字典树创建过程
我们来跟踪一下 httprouter 中,一个典型的压缩字典树的创建过程,路由设定如下:
PUT /user/installations/:installation_id/repositories/:repository_id
GET /marketplace_listing/plans/
GET /marketplace_listing/plans/:id/accounts
GET /search
GET /status
GET /support
补充路由:
GET /marketplace_listing/plans/ohyes
最后一条补充路由是我们臆想的,除此之外所有 API 路由均来自于 api.github.com。
5.2.3.1 root 节点创建
httprouter 的 Router struct 中存储压缩字典树使用的是下述数据结构
/ 略去了其它部分的 Router struct
type Router struct {
// ...
trees map[string]*node
// ...
}
trees 中的 key 即为 http 1.1 的 RFC 中定义的各种 method,具体有:
GET
HEAD
OPTIONS
POST
PUT
PATCH
DELETE
每一种 method 对应的都是一棵独立的压缩字典树,这些树彼此之间不共享数据。具体到我们上面用到的路由,PUT 和 GET 是两棵树而非一棵。
简单来讲,某个 method 第一次插入的路由就会导致对应字典树的根节点被创建,我们按顺序,先是一个 PUT:
r := httprouter.New()
r.PUT("/user/installations/:installation_id/repositories/:reposit", Hello)
这样 PUT 对应的根节点就会被创建出来。把这棵 PUT 的树画出来:
radix 的节点类型为 *httprouter.node
,为了说明方便,我们留下了目前关心的几个字段:
path: 当前节点对应的路径中的字符串
wildChild: 子节点是否为参数节点,即 wildcard node,或者说 :id 这种类型的节点
nType: 当前节点类型,有四个枚举值: 分别为 static/root/param/catchAll。
static // 非根节点的普通字符串节点
root // 根节点
param // 参数节点,例如 :id
catchAll // 通配符节点,例如 *anyway
indices: 子节点索引,当子节点为非参数类型,即本节点的 wildChild 为 false 时,会将每个子节点的首字母放在该索引数组。说是数组,实际上是个 string。
当然,PUT 路由只有唯一的一条路径。接下来,我们以后续的多条 GET 路径为例,讲解子节点的插入过程。
5.2.3.2 子节点插入
当插入 GET /marketplace_listing/plans
时,类似前面 PUT 的过程,GET 树的结构如图所示:
因为第一个路由没有参数,path 都被存储到根节点上了。所以只有一个节点。
然后插入 GET /marketplace_listing/plans/:id/accounts
,新的路径与之前的路径有共同的前缀,且可以直接在之前叶子节点后进行插入,那么结果也很简单,插入后树变成了这样:
由于 :id
这个节点只有一个字符串的普通子节点,所以 indices 还依然不需要处理。
上面这种情况比较简单,新的路由可以直接作为原路由的子节点进行插入。实际情况不会这么美好。
5.2.3.3 边分裂
我们插入 GET /search
,这时会导致树的边分裂。
原有路径和新的路径在初始的 /
位置发生分裂,这样需要把原有的 root 节点内容下移,再将新路由 search
同样作为子节点挂在 root 节点之下。这时候因为子节点出现多个,root 节点的 indices 提供子节点索引,这时候该字段就需要派上用场了。”ms” 代表子节点的首字母分别为 m(marketplace) 和 s(search)。
我们一口作气,把 GET /status
和 GET /support
也插入到树中。这时候会导致在 search
节点上再次发生分裂,来看看最终的结果:
5.2.3.4 子节点冲突处理
在路由本身只有字符串的情况下,不会发生任何冲突。只有当路由中含有 wildcard(类似 :id) 或者 catchAll 的情况下才可能冲突。这一点在前面已经提到了。
子节点的冲突处理很简单,分几种情况:
- 在插入 wildcard 节点时,父节点的 children 数组非空且 wildChild 被设置为 false。例如:
GET /user/getAll
和GET /user/:id/getAddr
,或者GET /user/*aaa
和GET /user/:id
。 - 在插入 wildcard 节点时,父节点的 children 数组非空且 wildChild 被设置为 true,但该父节点的 wildcard 子节点要插入的 wildcard 名字不一样。例如:
GET /user/:id/info
和GET /user/:name/info
。 - 在插入 catchAll 节点时,父节点的 children 非空。例如:
GET /src/abc
和GET /src/*filename
,或者GET /src/:id
和GET /src/*filename
。 - 在插入 static 节点时,父节点的 wildChild 字段被设置为 true。
- 在插入 static 节点时,父节点的 children 非空,且子节点 nType 为 catchAll。
只要发生冲突,都会在初始化的时候 panic
5.3 middleware 中间件
5.3.1 代码泥潭
5.3.2 使用 middleware 剥离非业务逻辑
实际上,我们犯的最大的错误是把业务代码和非业务代码揉在了一起。对于大多数的场景来讲,非业务的需求都是在 http 请求处理前做一些事情,或者/并且在响应完成之后做一些事情。
我们需要给我们的 helloHandler 增加超时时间统计,我们可以使用一种叫 function adapter
的方法来对 helloHandler 进行包装:
func hello(wr http.ResponseWriter, r *http.Request) {
wr.Write([]byte("hello"))
}
func timeMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(wr http.ResponseWriter, r *http.Request) {
timeStart := time.Now()
// next handler
next.ServeHTTP(wr, r)
timeElapsed := time.Since(timeStart)
logger.Println(timeElapsed)
})
}
func main() {
http.Handle("/", timeMiddleware(http.HandlerFunc(hello)))
err := http.ListenAndServe(":8080", nil)
...
}
这样就非常轻松地实现了业务与非业务之间的剥离,魔法就在于这个 timeMiddleware。可以从代码中看到,我们的 timeMiddleware 也是一个函数,其参数为 http.Handler,http.Handler 的定义在 net/http 包中:
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
任何方法实现了 ServeHTTP,即是一个合法的 http.Handler,
梳理一下 http 库的 Handler,HandlerFunc 和 ServeHTTP 的关系:
type Handler interface {
ServeHTTP(ResponseWriter, *Request)
}
type HandlerFunc func(ResponseWriter, *Request)
func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request)
f(w, r)
}
实际上只要你的 handler 函数签名是:
func (ResponseWriter, *Request)
那么这个 handler 和 http.HandlerFunc 就有了一致的函数签名,可以将该 handler 函数进行类型转换,转为 http.HandlerFunc。
而 http.HandlerFunc 实现了 http.Handler 这个接口。在 http 库需要调用你的 handler 函数来处理 http 请求时,会调用 HandlerFunc 的 ServeHTTP 函数,可见一个请求的基本调用链是这样的:
h = getHandler() => h.ServeHTTP(w, r) => h(w, r)
上面提到的把自定义 handler 转换为 http.HandlerFunc 这个过程是必须的,因为我们的 handler 没有直接实现 ServeHTTP 这个接口。上面的代码中我们看到的 HandleFunc(注意 HandlerFunc 和 HandleFunc 的区别)里也可以看到这个强制转换过程:
func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
DefaultServeMux.HandleFunc(pattern, handler)
}
// 调用
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
mux.Handle(pattern, HandlerFunc(handler))
}
知道 handler 是怎么一回事,我们的中间件通过包装 handler,再返回一个新的 handler 就好理解了。
总结一下,我们的中间件要做的事情就是通过一个或多个函数对 handler 进行包装,返回一个包括了各个中间件逻辑的函数链。我们把上面的包装再做得复杂一些:
customizedHandler = logger(timeout(ratelimit(helloHandler)))
这个函数链在执行过程中的上下文可以用下面这张图来表示。
再直白一些,这个流程在进行请求处理的时候实际上就是不断地进行函数压栈再出栈,有一些类似于递归的执行流:
[exec of logger logic] 函数栈: []
[exec of timeout logic] 函数栈: [logger]
[exec of ratelimit logic] 函数栈: [timeout/logger]
[exec of helloHandler logic] 函数栈: [ratelimit/timeout/logger]
[exec of ratelimit logic part2] 函数栈: [timeout/logger]
[exec of timeout logic part2] 函数栈: [logger]
[exec of logger logic part2] 函数栈: []
功能实现了,但在上面的使用过程中我们也看到了,这种函数套函数的用法不是很美观,同时也不具备什么可读性。
5.3.3 更优雅的 middleware 写法
如果需要修改这些函数的顺序,或者增删 middleware 还是有点费劲,
r = NewRouter()
r.Use(logger)
r.Use(timeout)
r.Use(ratelimit)
r.Add("/", helloHandler)
如果我们要增加或者删除 middleware,只要简单地增加删除对应的 Use 调用就可以了。非常方便。
从框架的角度来讲,怎么实现这样的功能呢?也不复杂:
type middleware func(http.Handler) http.Handler
type Router struct {
middlewareChain [] func(http.Handler) http.Handler
mux map[string] http.Handler
}
func NewRouter() *Router{
return &Router{}
}
func (r *Router) Use(m middleware) {
r.middlewareChain = append(r.middlewareChain, m)
}
func (r *Router) Add(route string, h http.Handler) {
var mergedHandler = h
for i := len(r.middlewareChain) - 1; i >= 0; i-- {
mergedHandler = r.middlewareChain[i](mergedHandler)
}
r.mux[route] = mergedHandler
}
注意代码中的 middleware 数组遍历顺序,和用户希望的调用顺序应该是”相反”的。应该不难理解。
5.3.4 哪些事情适合在 middleware 中做
比如开源界很火的 gin 这个框架,就专门为用户贡献的 middleware 开了一个仓库:
如果读者去阅读 gin 的源码的话,可能会发现 gin 的 middleware 中处理的并不是 http.Handler,而是一个叫 gin.HandlerFunc 的函数类型,和本节中讲解的 http.Handler 签名并不一样。不过实际上 gin 的 handler 也只是针对其框架的一种封装,middleware 的原理与本节中的说明是一致的。
5.4 validator 请求校验
5.4.1 重构请求校验函数
假设我们的数据已经通过某个 binding 库绑定到了具体的 struct 上。
type RegisterReq struct {
Username string `json:"username"`
PasswordNew string `json:"password_new"`
PasswordRepeat string `json:"password_repeat"`
Email string `json:"email"`
}
func register(req RegisterReq) error{
if len(req.Username) > 0 {
if len(req.PasswordNew) > 0 && len(req.PasswordRepeat) > 0 {
if req.PasswordNew == req.PasswordRepeat {
if emailFormatValid(req.Email) {
createUser()
return nil
} else {
return errors.New("invalid email")
}
} else {
return errors.New("password and reinput must be the same")
}
} else {
return errors.New("password and password reinput must be longer than 0")
}
} else {
return errors.New("length of username cannot be 0")
}
}
我们在 golang 里成功写出了 hadoken 开路的箭头型代码。。这种代码一般怎么进行优化呢?
if len(req.Username) == 0 {
return errors.New("length of username cannot be 0")
}
if len(req.PasswordNew) == 0 || len(req.PasswordRepeat) == 0 {
return errors.New("password and password reinput must be longer than 0")
}
if req.PasswordNew != req.PasswordRepeat {
return errors.New("password and reinput must be the same")
}
if emailFormatValid(req.Email) {
return errors.New("invalid email")
}
createUser()
return nil
}
虽然使用了重构方法使我们的 validate 过程看起来优雅了,但我们还是得为每一个 http 请求都去写这么一套差不多的 validate 函数,有没有更好的办法来帮助我们解除这项体力劳动?答案就是 validator。
5.4.2 用 validator 解放体力劳动
从设计的角度讲,我们一定会为每个请求都声明一个 struct。前文中提到的校验场景我们都可以通过 validator 完成工作。还以前文中的 struct 为例。为了美观起见,我们先把 json tag 省略掉。
这里我们引入一个新的 validator 库:
import "gopkg.in/go-playground/validator.v9"
type RegisterReq struct {
// 字符串的 gt=0 表示长度必须 > 0,gt = greater than
Username string `validate:"gt=0"`
// 同上
PasswordNew string `validate:"gt=0"`
// eqfield 跨字段相等校验
PasswordRepeat string `validate:"eqfield=PasswordNew"`
// 合法 email 格式校验
Email string `validate:"email"`
}
func validate(req RegisterReq) error {
err := validate.Struct(mystruct)
if err != nil {
doSomething()
}
...
}
这样就不需要在每个请求进入业务逻辑之前都写重复的 validate 函数了。本例中只列出了这个 validator 非常简单的几个功能。
我们试着跑一下这个程序,输入参数设置为:
//...
var req = RegisterReq {
Username : "Xargin",
PasswordNew : "ohno",
PasswordRepeat : "ohn",
Email : "alex@abc.com",
}
err := validate.Struct(mystruct)
fmt.Println(err) // Key: 'RegisterReq.PasswordRepeat' Error:Field validation for 'PasswordRepeat' failed on the 'eqfield' tag
如果觉得这个 validator 提供的错误信息不够人性化,例如要把错误信息返回给用户,那就不应该直接显示英文了。可以针对每种 tag 进行错误信息定制,读者可以自行探索。
5.4.3 原理
从结构上来看,每一个 struct 都可以看成是一棵树。假如我们有如下定义的 struct:
type Nested struct {
Email string `validate:"email"`
}
type T struct {
Age int `validate:"eq=10"`
Nested Nested
}
从字段校验的需求来讲,无论我们采用深度优先搜索还是广度优先搜索来对这棵 struct 树来进行遍历,都是可以的。
有心的读者这时候可能会产生一个问题,我们对 struct 进行 validate 时大量使用了 reflect,而 go 的 reflect 在性能上不太出众,有时甚至会影响到我们程序的性能。这样的考虑确实有一些道理,但需要对 struct 进行大量校验的场景往往出现在 web 服务,这里并不一定是程序的性能瓶颈所在,实际的效果还是要从 pprof 中做更精确的判断。
如果基于反射的 validator 真的成为了你服务的性能瓶颈怎么办?现在也有一种思路可以避免反射:使用 golang 内置的 parser 对源代码进行扫描,然后根据 struct 的定义生成校验代码。我们可以将所有需要校验的结构体放在单独的 package 内。
5.5 Database 和数据库打交道
本节将对 db/sql 官方标准库作一些简单分析,并介绍一些应用比较广泛的开源 ORM 和 sql builder。并从企业级应用开发和公司架构的角度来分析哪种技术栈对于现代的企业级应用更为合适。
5.5.1 从 database/sql 讲起
Go官方提供了 database/sql
包来给用户进行和数据库打交道的工作,实际上 database/sql
库就只是提供了一套操作数据库的接口和规范,例如抽象好的 sql 预处理(prepare),连接池管理,数据绑定,事务,错误处理等等。官方并没有提供具体某种数据库实现的协议支持。
和具体的数据库,例如 MySQL 打交道,还需要再引入 MySQL 的驱动,像下面这样:
import "database/sql"
import _ "github.com/go-sql-driver/mysql"
db, err := sql.Open("mysql", "user:password@/dbname")
import _ "github.com/go-sql-driver/mysql"
这一句 import,实际上是调用了 mysql 包的 init 函数,做的事情也很简单:
func init() {
sql.Register("mysql", &MySQLDriver{})
}
在 sql 包的 全局 map 里把 mysql 这个名字的 driver 注册上。实际上 Driver 在 sql 包中是一个 interface:
type Driver interface {
Open(name string) (Conn, error)
}
调用 sql.Open() 返回的 db 对象实际上就是这里的 Conn。
type Conn interface {
Prepare(query string) (Stmt, error)
Close() error
Begin() (Tx, error)
}
也是一个接口。实际上如果你仔细地查看 database/sql/driver/driver.go 的代码会发现,这个文件里所有的成员全都是 interface,对这些类型进行操作,实际上还是会调用具体的 driver 里的方法。
从用户的角度来讲,在使用 database/sql 包的过程中,你能够使用的也就是这些 interface 里提供的函数
package main
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
func main() {
// db 是一个 sql.DB 类型的对象
// 该对象线程安全,且内部已包含了一个连接池
// 连接池的选项可以在 sql.Open 中设置,这里为了简单省略了
db, err := sql.Open("mysql",
"user:password@tcp(127.0.0.1:3306)/hello")
if err != nil {
log.Fatal(err)
}
defer db.Close()
var (
id int
name string
)
rows, err := db.Query("select id, name from users where id = ?", 1)
if err != nil {
log.Fatal(err)
}
defer rows.Close()
// 必须要把 rows 里的内容读完,否则连接永远不会释放
for rows.Next() {
err := rows.Scan(&id, &name)
if err != nil {
log.Fatal(err)
}
log.Println(id, name)
}
err = rows.Err()
if err != nil {
log.Fatal(err)
}
}
官方的 db 库提供的功能这么简单,我们每次去数据库里读取内容岂不是都要去写这么一套差不多的代码?或者如果我们的对象是 struct,把 sql.Rows 绑定到对象的工作就会变得更加得重复而无聊。
是的,所以社区才会有各种各样的 sql builder 和 orm 百花齐放。
5.5.2 提高生产效率的 ORM 和 SQL Builder
在 web 开发领域常常提到的 ORM 是什么?我们先看看万能的维基百科:
对象关系映射(英语:Object Relational Mapping,简称ORM,或O/RM,或O/R mapping),是一种程序设计技术,用于实现面向对象编程语言里不同类型系统的数据之间的转换。
从效果上说,它其实是创建了一个可在编程语言里使用的“虚拟对象数据库”。
最为常见的 ORM 实际上做的是从 db -> 程序的 class / struct 这样的映射。所以你手边的程序可能是从 mysql 的表 -> 你的程序内 class。
ORM 的目的就是屏蔽掉DB层,实际上很多语言的 ORM 只要把你的 class/struct 定义好,再用特定的语法将结构体之间的一对一或者一对多关系表达出来。
举个例子,我们有个需求:向用户展示最新的商品列表,我们再假设,商品和商家是1:1的关联关系,我们就很容易写出像下面这样的代码:
# 伪代码
shopList := []
for product in productList {
shopList = append(shopList, product.GetShop)
}
当然了,我们不能批判这样写代码的程序员是偷懒的程序员。因为ORM 一类的工具在出发点上就是屏蔽 sql,让我们对数据库的操作更接近于人类的思维方式。这样很多只接触过 orm 而且又是刚入行的程序员就很容易写出上面这样的代码。
这样的代码将对数据库的读请求放大了 N 倍。也就是说,如果你的商品列表有 15 个 SKU,那么每次用户打开这个页面,至少需要执行 1(查询商品列表) + 15(查询相关的商铺信息) 次查询。这里 N 是 16。如果你的列表页很大,比如说有 600 个条目,那么你就至少要执行 1 + 600 次查询。如果说你的数据库能够承受的最大的简单查询是12w QPS,而上述这样的查询正好是你最常用的查询的话,实际上你能对外提供的服务能力是多少呢?是 200 qps!互联网系统的忌讳之一,就是这种无端的读放大。
当然,你也可以说这不是 ORM 的问题,如果你手写 sql 你还是可能会写出差不多的程序,那么再来看两个 demo:
o := orm.NewOrm()
num, err := o.QueryTable("cardgroup").Filter("Cards__Card__Name", cardName).All(&cardgroups)
很多 orm 都提供了这种 Filter 类型的查询方式,beego 也不例外。不过实际上在这段 orm 背后隐藏了非常难以察觉的细节,那就是生成的 sql 语句会自动 limit 1000。
一个程序库背地里做的事情还是越少越好,如果一定要做,那也一定要在显眼的地方做。比如上面的例子,去掉这种默认的自作聪明的行为,或者要求用户强制传入 limit 参数都是更好的选择
orm 想从设计上隐去太多的细节。而方便的代价是其背后的运行完全失控。这样的项目在经过几任维护人员之后,将变得面目全非,难以维护。上了规模的公司的人们渐渐达成了一个共识,由于隐藏重要的细节,ORM 可能是失败的设计。其所隐藏的重要细节对于上了规模的系统开发来说至关重要。
相比 ORM 来说,sql builder 在 sql 和项目可维护性之间取得了比较好的平衡。首先 sql builer 不像 ORM 那样屏蔽了过多的细节,其次从开发的角度来讲,sql builder 简单进行封装后也可以非常高效地完成开发,举个例子:
where := map[string]interface{} {
"order_id > ?" : 0,
"customer_id != ?" : 0,
}
limit := []int{0,100}
orderBy := []string{"id asc", "create_time desc"}
orders := orderModel.GetList(where, limit, orderBy)
说白了 sql builder 是 sql 在代码里的一种特殊方言,如果你们没有DBA(A database administrator )但研发有自己分析和优化 sql 的能力,或者你们公司的 dba 对于学习这样一些 sql 的方言没有异议。那么使用 sql builder 是一个比较好的选择,不会导致什么问题。
在一些本来也不需要DBA介入的场景内,使用 sql builder 也是可以的,例如你要做一套运维系统,且将 mysql 当作了系统中的一个组件,系统的 QPS 不高,查询不复杂等等。
一旦你做的是高并发的 OLTP 在线系统,且想在人员充足分工明确的前提下最大程度控制系统的风险,使用 sql builder 就不合适了。
5.5.3 脆弱的 db
无论是 ORM 还是 sql builder 都有一个致命的缺点,就是没有办法进行系统上线的事前 sql 审核。虽然很多 orm 和 sql builder 也提供了运行期打印 sql 的功能,但只在查询的时候才能进行输出。而 sql builder 和 ORM本身提供的功能太过灵活。使得你不可能通过测试枚举出所有可能在线上执行的 sql。
对于现在 7 * 24 服务的互联网公司来说,服务不可用是非常重大的问题。存储层的技术栈虽经历了多年的发展,在整个系统中依然是最为脆弱的一环。系统宕机对于 24 小时对外提供服务的公司来说,意味着直接的经济损失。个中风险不可忽视。
所以现如今,大型的互联网公司核心线上业务都会在代码中把 sql 放在显眼的位置提供给 DBA review,以此来控制系统在数据层的风险。结合Go 举一个例子:
const (
getAllByProductIDAndCustomerID = `select * from p_orders where product_id in (:product_id) and customer_id=:customer_id`
)
// GetAllByProductIDAndCustomerID
// @param driver_id
// @param rate_date
// @return []Order, error
func GetAllByProductIDAndCustomerID(ctx context.Context, productIDs []uint64, customerID uint64) ([]Order, error) {
var orderList []Order
params := map[string]interface{}{
"product_id" : productIDs,
"customer_id": customerID,
}
// getAllByProductIDAndCustomerID 是 const 类型的 sql 字符串
sql, args, err := sqlutil.Named(getAllByProductIDAndCustomerID, params)
if err != nil {
return nil, err
}
err = dao.QueryList(ctx, sqldbInstance, sql, args, &orderList)
if err != nil {
return nil, err
}
return orderList, err
}
像这样的代码,在上线之前把DAO层的变更集的 const 部分直接拿给 dba 来进行审核,就比较方便了。代码中的 sqlutil.Named 是类似于 sqlx 中的 Named 函数,同时支持 where 表达式中的比较操作符和 in。
5.6 Ratelimit 服务流量限制
计算机程序可依据其瓶颈分为 Disk IO-bound,CPU-bound,Network-bound,分布式场景下有时候也会外部系统而导致自身瓶颈
web 系统打交道最多的是网络,无论是接收,解析用户请求,访问存储,还是把响应数据返回给用户,都是要走网络的。在没有 epoll/kqueue 之类的系统提供的 IO 多路复用接口之前,多个核心的现代计算机最头痛的是 C10k 问题,C10k 问题会导致计算机没有办法充分利用 CPU 来处理更多的用户连接,进而没有办法通过优化程序提升 CPU 利用率来处理更多的请求。
自从 linux 实现了 epoll,freebsd 实现了 kqueue,这个问题基本解决了,我们可以借助内核提供的 API 轻松解决当年的 C10k 问题,也就是说如今如果你的程序主要是和网络打交道,那么瓶颈一定在用户程序而不在操作系统内核。
随着时代的发展,编程语言对这些系统调用又进一步进行了封装,如今做应用层开发,几乎不会在程序中看到 epoll 之类的字眼,大多数时候我们就只要聚焦在业务逻辑上就好。Go 的 net 库针对不同平台封装了不同的 syscall API,http 库又是构建在 net 库之上,所以在 Go 我们可以借助标准库,很轻松地写出高性能的 http 服务,下面是一个简单的 hello world
服务的代码:
package main
import (
"io"
"log"
"net/http"
)
func sayhello(wr http.ResponseWriter, r *http.Request) {
wr.WriteHeader(200)
io.WriteString(wr, "hello world")
}
func main() {
http.HandleFunc("/", sayhello)
err := http.ListenAndServe(":9090", nil)
if err != nil {
log.Fatal("ListenAndServe:", err)
}
}
我们需要衡量一下这个 web 服务的吞吐量,再具体一些,实际上就是接口的 QPS(Queries Per Second) 是每秒查询率。借助 wrk,在家用电脑 Macbook Pro 上对这个 hello world
服务进行基准测试,
这里的 hello world
服务没有任何业务逻辑。真实环境的程序要复杂得多,有些程序偏 Network-bound,例如一些 cdn 服务、proxy 服务;有些程序偏 CPU/GPU bound,例如登陆校验服务、图像处理服务;有些程序偏 Disk IO-bound,例如专门的存储系统,数据库。不同的程序瓶颈会体现在不同的地方,这里提到的这些功能单一的服务相对来说还算容易分析。如果碰到业务逻辑复杂代码量巨大的模块,其瓶颈并不是三下五除二可以推测出来的,还是需要从压力测试中得到更为精确的结论。
对于 IO/Network bound 类的程序,其表现是网卡/磁盘 IO 会先于 CPU 打满,这种情况即使优化 CPU 的使用也不能提高整个系统的吞吐量,只能提高磁盘的读写速度,增加内存大小,提升网卡的带宽来提升整体性能。而 CPU bound 类的程序,则是在存储和网卡未打满之前 CPU 占用率提前到达 100%,CPU 忙于各种计算任务,IO 设备相对则较闲。
无论哪种类型的服务,在资源使用到极限的时候都会导致请求堆积,超时,系统 hang 死,最终伤害到终端用户。对于分布式的 web 服务来说,瓶颈还不一定总在系统内部,也有可能在外部。非计算密集型的系统往往会在关系型数据库环节失守,而这时候 web 模块本身还远远未达到瓶颈。
不管我们的服务瓶颈在哪里,最终要做的事情都是一样的,那就是流量限制
5.6.1 常见的流量限制手段
流量限制的手段有很多,最常见的:漏桶、令牌桶两种:
- 漏桶是指我们有一个一直装满了水的桶,每过固定的一段时间即向外漏一滴水。如果你接到了这滴水,那么你就可以继续服务请求,如果没有接到,那么就需要等待下一滴水。
- 令牌桶则是指匀速向桶中添加令牌,服务请求时需要从桶中获取令牌,令牌的数目可以按照需要消耗的资源进行相应的调整。如果没有令牌,可以选择等待,或者放弃。
这两种方法看起来很像,不过还是有区别的。漏桶流出的速率固定,而令牌桶只要在桶中有令牌,那就可以拿。也就是说令牌桶是允许一定程度的并发的,令牌桶在桶中没有令牌的情况下也会退化为漏桶模型。
实际应用中令牌桶应用较为广泛,开源界流行的限流器大多数都是基于令牌桶思想的。并且在此基础上进行了一定程度的扩充,比如 github.com/juju/ratelimit
提供了几种不同特色的令牌桶填充方式:
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
默认的令牌桶,fillInterval 指每过多长时间向桶里放一个令牌,capacity 是桶的容量,超过桶容量的部分会被直接丢弃。桶初始是满的。
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket
和普通的 NewBucket 的区别是,每次向桶中放令牌时,是放 quantum 个令牌,而不是一个令牌。
func NewBucketWithRate(rate float64, capacity int64) *Bucket
这个就有点特殊了,会按照提供的比例,每秒钟填充令牌数。例如 capacity 是 100,而 rate 是 0.1,那么每秒会填充 10 个令牌。
相比于开源界更为有名的 google 的 Java 工具库 Guava 中提供的 ratelimiter,这个库不支持令牌桶预热,且无法修改初始的令牌容量,所以可能个别极端情况下的需求无法满足。但在明白令牌桶的基本原理之后,如果没办法满足需求,相信你也可以很快对其进行修改并支持自己的业务场景。
5.6.2 原理
从功能上来看,令牌桶模型实际上就是对全局计数的加减法操作过程,但使用计数需要我们自己加读写锁,有小小的思想负担。如果我们对 Go 语言已经比较熟悉的话,很容易想到可以用 buffered channel 来完成简单的加令牌取令牌操作:
var tokenBucket = make(chan struct{}, capacity)
每过一段时间向 tokenBucket 中添加 token,如果 bucket 已经满了,那么直接放弃:
illToken := func() {
ticker := time.NewTicker(fillInterval)
for {
select {
case <-ticker.C:
select {
case tokenBucket <- struct{}{}:
default:
}
fmt.Println("current token cnt:", len(tokenBucket), time.Now())
}
}
}
上面的令牌桶的取令牌操作实现起来也比较简单,简化问题,我们这里只取一个令牌:
func TakeAvailable(block bool) bool{
var takenResult bool
if block {
select {
case <-tokenBucket:
takenResult = true
}
} else {
select {
case <-tokenBucket:
takenResult = true
default:
takenResult = false
}
}
return takenResult
}
一些公司自己造的限流的轮子就是用上面这种方式来实现的,不过如果开源版 ratelimit 也如此的话,那我们也没什么可说的了。现实并不是这样的。
我们来思考一下,令牌桶每隔一段固定的时间向桶中放令牌,如果我们记下上一次放令牌的时间为 t1,和当时的令牌数 k1,放令牌的时间间隔为 ti,每次向令牌桶中放 x 个令牌,令牌桶容量为 cap。现在如果有人来调用 TakeAvailable
来取 n 个令牌,我们将这个时刻记为 t2。在 t2 时刻,令牌桶中理论上应该有多少令牌呢?伪代码如下:
cur = k1 + ((t2 - t1)/ti) * x
cur = cur > cap ? cap : cur
我们用两个时间点的时间差,再结合其它的参数,理论上在取令牌之前就完全可以知道桶里有多少令牌了。那劳心费力地像本小节前面向 channel 里填充 token 的操作,理论上是没有必要的。只要在每次 Take
的时候,再对令牌桶中的 token 数进行简单计算,就可以得到正确的令牌数。是不是很像 惰性求值
的感觉?
在得到正确的令牌数之后,再进行实际的 Take 操作就好,这个 Take 操作只需要对令牌数进行简单的减法即可,记得加锁以保证并发安全。github.com/juju/ratelimit
这个库就是这样做的。
5.6.3 服务瓶颈和 QoS
前面我们说了很多 CPU-bound、IO-bound 之类的概念,这种性能瓶颈从大多数公司都有的监控系统中可以比较快速地定位出来,如果一个系统遇到了性能问题,那监控图的反应一般都是最快的。
虽然性能指标很重要,但对用户提供服务时还应考虑服务整体的 QoS。QoS 全称是 Quality of Service,顾名思义是服务质量。QoS 包含有可用性、吞吐量、时延、时延变化和丢失等指标。一般来讲我们可以通过优化系统,来提高 web 服务的 CPU 利用率,从而提高整个系统的吞吐量。但吞吐量提高的同时,用户体验是有可能变差的。用户角度比较敏感的除了可用性之外,还有时延。虽然你的系统吞吐量高,但半天刷不开页面,想必会造成大量的用户流失。
所以在大公司的 web 服务性能指标中,除了平均响应时延之外,还会把响应时间的 95 分位,99 分位也拿出来作为性能标准。平均响应在提高 CPU 利用率没受到太大影响时,可能 95 分位、 99 分位的响应时间大幅度攀升了,那么这时候就要考虑提高这些 CPU 利用率所付出的代价是否值得了。
在线系统的机器一般都会保持 CPU 有一定的余裕。
5.7 layout 常见大型 web 项目分层
流行的 web 框架大多数是 MVC 框架,
- 控制器(Controller)- 负责转发请求,对请求进行处理。
- 视图(View) - 界面设计人员进行图形界面设计。
- 模型(Model) - 程序员编写程序应有的功能(实现算法等等)、数据库专家进行数据管理和数据库设计(可以实现具体的功能)。
随着时代的发展,前端也变成了越来越复杂的工程,为了更好地工程化,现在更为流行的一般是前后分离的架构。可以认为前后分离是把 V 层从 MVC 中抽离单独成为项目。这样一个后端项目一般就只剩下 M 和 C 层了。前后端之间通过 ajax 来交互,有时候要解决跨域的问题,但也已经有了较为成熟的方案。下面是一个前后分离的系统的简易交互图。
事实上,即使是简单的项目,业界也并没有完全遵守 MVC 框架提出者对于 M 和 C 所定义的分工。有很多公司的项目会在 controller 层塞入大量的逻辑,在 model 层就只管理数据的存储。这往往来源于对于 model 层字面含义的某种擅自引申理解。认为字面意思,这一层就是处理某种建模,而模型是什么?就是数据呗!
这种理解显然是有问题的,业务流程也算是一种“模型”,是对真实世界用户行为或者既有流程的一种建模,并非只有按格式组织的数据才能叫模型。不过按照 MVC 的创始人的想法,我们如果把和数据打交道的代码还有业务流程全部塞进 MVC 里的 M 层的话,这个 M 层又会显得有些过于臃肿。对于复杂的项目,一个 C 和一个 M 层显然是不够用的,现在比较流行的纯后端 api 模块一般采用下述划分方法:
- Controller,与上述类似,服务入口,负责处理路由,参数校验,请求转发。
- Logic/Service,逻辑(服务)层,一般是业务逻辑的入口,可以认为从这里开始,所有的请求参数一定是合法的。业务逻辑和业务流程也都在这一层中。常见的设计中会将该层称为 Business Rules。
- DAO/Repository,这一层主要负责和数据、存储打交道。将下层存储以更简单的函数、接口形式暴露给 Logic 层来使用。负责数据的持久化工作。
每一层都会做好自己的工作,然后用请求当前的上下文构造下一层工作所需要的结构体或其它类型参数,然后调用下一次的函数。在工作完成之后,再把处理结果一层层地传出到入口。
划分为 CLD 三层之后,在 C 层之前我们可能还需要同时支持多种协议。本章前面讲到的 thrift、gRPC 和 http 并不是一定只选择其中一种,有时我们需要支持其中的两种,比如同一个接口,我们既需要效率较高的 thrift,也需要方便 debug 的 http 入口。即除了 CLD 之外,还需要一个单独的 protocol 层,负责处理各种交互协议的细节。这样请求的流程会变成下面这样:
这样我们 controller 中的入口函数就变成了下面这样:
func CreateOrder(ctx context.Context, req *CreateOrderStruct) (*CreateOrderRespStruct, error) {
}
func CreateOrder(ctx context.Context, req *CreateOrderStruct) (*CreateOrderRespStruct, error) {
}
CreateOrder 有两个参数,ctx 用来传入 trace_id 一类的需要串联请求的全局参数,req 里存储了我们创建订单所需要的所有输入信息。返回结果是一个响应结构体和错误。可以认为,我们的代码运行到 controller 层之后,就没有任何与“协议”相关的代码了。在这里你找不到 http.Request,也找不到 http.ResponseWriter,也找不到任何与 thrift 或者 gRPC 相关的字眼。
在 protocol 层,处理 http 协议的大概代码如下:
// defined in protocol layer
type CreateOrderRequest struct {
OrderID int64 `json:"order_id"`
// ...
}
// defined in controller
type CreateOrderParams struct {
OrderID int64
}
func HTTPCreateOrderHandler(wr http.ResponseWriter, r *http.Request) {
var req CreateOrderRequest
var params CreateOrderParams
ctx := context.TODO()
// bind data to req
bind(r, &req)
// map protocol binded to protocol-independent
map(req, params)
logicResp,err := controller.CreateOrder(ctx, ¶ms)
if err != nil {}
// ...
}
理论上我们可以用同一个 request struct 组合上不同的 tag,来达到一个 struct 来给不同的协议复用的目的。不过遗憾的是在 thrift 中,request struct 也是通过 IDL 生成的,其内容在自动生成的 ttypes.go 文件中,我们还是需要在 thrift 的入口将这个自动生成的 struct 映射到我们 logic 入口所需要的 struct 上。gRPC 也是类似。这部分代码还是需要的。
聪明的读者可能已经可以看出来了,协议细节处理这一层实际上有大量重复劳动,每一个接口在协议这一层的处理,无非是把数据从协议特定的 struct(例如 http.Request,thrift 的被包装过了) 读出来,再绑定到我们协议无关的 struct 上,再把这个 struct 映射到 controller 入口的 struct 上,这些代码实际上长得都差不多。差不多的代码都遵循着某种模式,那么我们可以对这些模式进行简单的抽象,用 codegen 来把繁复的协议处理代码从工作内容中抽离出去。
先来看看 http 对应的 struct、thrift 对应的 struct 和我们协议无关的 struct 分别长什么样子:
// http request struct
type CreateOrder struct {
OrderID int64 `json:"order_id" validate:"required"`
UserID int64 `json:"user_id" validate:"required"`
ProductID int `json:"prod_id" validate:"required"`
Addr string `json:"addr" validate:"required"`
}
// thrift request struct
type FeatureSetParams struct {
DriverID int64 `thrift:"driverID,1,required"`
OrderID int64 `thrift:"OrderID,2,required"`
UserID int64 `thrift:"UserID,3,required"`
ProductID int `thrift:"ProductID,4,required"`
Addr string `thrift:"Addr,5,required"`
}
// controller input struct
type CreateOrderParams struct {
OrderID int64
UserID int64
ProductID int
Addr string
}
我们需要通过一个源 struct 来生成我们需要的 http 和 thrift 入口代码。再观察一下上面定义的三种 struct,实际上我们只要能用一个 struct 生成 thrift 的 IDL,以及 http 服务的 “IDL(实际上就是带 json/form 相关 tag 的 struct 定义)” 就可以了。这个初始的 struct 我们可以把 struct 上的 http 的 tag 和 thrift 的 tag 揉在一起:
type FeatureSetParams struct {
DriverID int64 `thrift:"driverID,1,required" json:"driver_id"`
OrderID int64 `thrift:"OrderID,2,required" json:"order_id"`
UserID int64 `thrift:"UserID,3,required" json:"user_id"`
ProductID int `thrift:"ProductID,4,required" json:"prod_id"`
Addr string `thrift:"Addr,5,required" json:"addr"`
}
然后通过代码生成把 thrift 的 IDL 和 http 的 request struct 都生成出来:
至于用什么手段来生成,你可以通过 go 语言内置的 parser 读取文本文件中的 Go 源代码,然后根据 ast 来生成目标代码,也可以简单地把这个源 struct 和 generator 的代码放在一起编译,让 struct 作为 generator 的输入参数(这样会更简单一些),都是可以的。
当然这种思路并不是唯一选择,我们还可以通过解析 thrift 的 IDL,生成一套 http 接口的 struct。如果你选择这么做,那整个流程就变成了这样:
看起来比之前的图顺畅一点,不过如果你选择了这么做,你需要自行对 thrift 的 IDL 进行解析,也就是相当于可能要手写一个 thrift 的 IDL 的 parser,虽然现在有 antlr 或者 peg 能帮你简化这些 parser 的书写工作,但在“解析”的这一步我们不希望引入太多的工作量,所以量力而行即可。
既然工作流已经成型,我们可以琢磨一下怎么让整个流程对用户更加友好。
比如在前面的生成环境引入 GUI 或者 web 页面,只要让用户点点鼠标就能生成 SDK,这些就靠读者自己去探索了。
本节中所叙述的分层没有将 middleware 作为项目的分层考虑进去。如果我们考虑 middleware 的话,请求的流程是什么样的?
之前我们学习的 middleware 是和 http 协议强相关的,遗憾的是在 thrift 中看起来没有和 http 中对等的解决这些非功能性逻辑代码重复问题的 middleware。所以我们在图上写 thrift stuff
。这些 stuff
可能需要你手写去实现,然后每次增加一个新的 thrift 接口,就需要去写一遍这些非功能性代码。
这也是很多企业项目所面临的真实问题,遗憾的是开源界并没有这样方便的多协议 middleware 解决方案。当然了,前面我们也说过,很多时候我们给自己保留的 http 接口只是用来做 debug,并不会暴露给外人用。这种情况下,这些非功能性的代码只要在 thrift 的代码中完成即可。
5.8 interface 和 table-driven 开发
在 web 项目中经常会遇到外部依赖环境的变化,比如:
- 公司的老存储系统年久失修,现在已经没有人维护了,新的系统上线也没有考虑平滑迁移,但最后通牒已下,要求 N 天之内迁移完毕。
- 平台部门的老用户系统年久失修(怎么都是年久失修,摔!),现在已经没有人维护了,真是悲伤的故事。新系统上线没有考虑兼容老接口,但最后通牒已下,要求 N 个月之内迁移完毕。
- 公司的老消息队列人走茶凉,年久失修(汗),新来的技术精英们没有考虑向前兼容,但最后通牒已下,要求半年之内迁移完毕。
所以你看到了,我们的外部依赖总是为了自己爽而不断地做升级,且不想做向前兼容,然后来给我们下最后通牒。如果我们的部门工作饱和,领导强势,那么有时候也可以倒逼依赖方来做兼容。但世事不一定如人愿,即使我们的领导强势,读者朋友的领导也还是可能认怂的。
5.8.1 业务系统的发展过程
互联网公司只要可以活过三年,工程方面面临的首要问题就是代码膨胀。系统的代码膨胀之后,可以将系统中与业务本身流程无关的部分做拆解和异步化。什么算是业务无关呢,比如一些统计、反作弊、营销发券、价格计算、用户状态更新等等需求。这些需求往往依赖于主流程的数据,但又只是挂在主流程上的旁支,自成体系。
这时候我们就可以把这些旁支拆解出去,作为独立的系统来部署、开发以及维护。这些旁支流程的时延如若非常敏感,比如用户在界面上点了按钮,需要立刻返回(价格计算、支付),那么需要与主流程系统进行 RPC 通信,并且在通信失败时,要将结果直接返回给用户。如果时延不敏感,比如抽奖系统,结果稍后公布的这种,或者非实时的统计类系统,那么就没有必要在主流程里为每一套系统做一套 RPC 流程。我们只要将下游需要的数据打包成一条消息,传入消息队列,之后的事情与主流程一概无关(当然,与用户的后续交互流程还是要做的)。
通过拆解和异步化虽然解决了一部分问题,但并不能解决所有问题。随着业务发展,单一职责的模块也会变得越来越复杂,这是必然的趋势。一件事情本身变的复杂的话,这时候拆解和异步化就不灵了。我们还是要对事情本身进行一定程度的封装抽象。
5.8.2 使用函数封装业务流程
最基本的封装过程,我们把相似的行为放在一起,然后打包成一个一个的函数,让自己杂乱无章的代码变成下面这个样子:
func BusinessProcess(ctx context.Context, params Params) (resp, error){
ValidateLogin()
ValidateParams()
AntispamCheck()
GetPrice()
CreateOrder()
UpdateUserStatus()
NotifyDownstreamSystems()
}
不管是多么复杂的业务,系统内的逻辑都是可以分解为 step1 -> step2 -> step3 … 这样的流程的。
每一个步骤内部也
会有复杂的流程,比如:
func CreateOrder() {
ValidateDistrict() // 判断是否是地区限定商品
ValidateVIPProduct() // 检查是否是只提供给 vip 的商品
GetUserInfo() // 从用户系统获取更详细的用户信息
GetProductDesc() // 从商品系统中获取商品在该时间点的详细信息
DecrementStorage() // 扣减库存
CreateOrderSnapshot() // 创建订单快照
return CreateSuccess
}
在阅读业务流程代码时,我们只要阅读其函数名就能知晓在该流程中完成了哪些操作,如果需要修改细节,那么就继续深入到每一个业务步骤去看具体的流程。写得稀烂的业务流程代码则会将所有过程都堆积在少数的几个函数中,从而导致几百甚至上千行的函数。这种意大利面条式的代码阅读和维护都会非常痛苦。在开发的过程中,一旦有条件应该立即进行类似上面这种方式的简单封装。
5.8.3 使用 interface 来做抽象
业务发展的早期,是不适宜引入 interface 的,很多时候业务流程变化很大,过早引入 interface 会使业务系统本身增加很多不必要的分层,从而导致每次修改几乎都要全盘否定之前的工作。
当业务发展到一定阶段,主流程稳定之后,就可以适当地使用 interface 来进行抽象了。这里的稳定,是指主流程的大部分业务步骤已经确定,即使再进行修改,也不会进行大规模的变动,而只是小修小补,或者只是增加或删除少量业务步骤。
如果我们在开发过程中,已经对业务步骤进行了良好的封装,这时候进行 interface 抽象化就会变的非常容易,伪代码:
// OrderCreator 创建订单流程
type OrderCreator interface {
ValidateDistrict() // 判断是否是地区限定商品
ValidateVIPProduct() // 检查是否是只提供给 vip 的商品
GetUserInfo() // 从用户系统获取更详细的用户信息
GetProductDesc() // 从商品系统中获取商品在该时间点的详细信息
DecrementStorage() // 扣减库存
CreateOrderSnapshot() // 创建订单快照
}
我们只要把之前写过的步骤函数签名都提到一个 interface 中,就可以完成抽象了。
在进行抽象之前,我们应该想明白的一点是,引入 interface 对我们的系统本身是否有意义,这是要按照场景去进行分析的。假如我们的系统只服务一条产品线,并且内部的代码只是针对很具体的场景进行定制化开发,那么实际上引入 interface 是不会带来任何收益的。至于说是否方便测试,这一点我们会在之后的章节来讲。
如果我们正在做的是平台系统,需要由平台来定义统一的业务流程和业务规范,那么基于 interface 的抽象就是有意义的。举个例子:
平台需要服务多条业务线,但数据定义需要统一,所以希望都能走平台定义的流程。作为平台方,我们可以定义一套类似上文的 interface,然后要求接入方的业务必须将这些 interface 都实现。如果 interface 中有其不需要的步骤,那么只要返回 nil,或者忽略就好。
在业务进行迭代时,平台的代码是不用修改的,这样我们便把这些接入业务当成了平台代码的插件(plugin)引入进来了。如果没有 interface 的话,我们会怎么做?
import (
"sample.com/travelorder"
"sample.com/marketorder"
)
func CreateOrder() {
switch businessType {
case TravelBusiness:
travelorder.CreateOrder()
case MarketBusiness:
marketorder.CreateOrderForMarket()
default:
return errors.New("not supported business")
}
}
func ValidateUser() {
switch businessType {
case TravelBusiness:
travelorder.ValidateUserVIP()
case MarketBusiness:
marketorder.ValidateUserRegistered()
default:
return errors.New("not supported business")
}
}
// ...
switch ...
switch ...
switch ...
没错,就是无穷无尽的 switch,和没完没了的垃圾代码。引入了 interface 之后,我们的 switch 只需要在业务入口做一次。
type BusinessInstance interface {
ValidateLogin()
ValidateParams()
AntispamCheck()
GetPrice()
CreateOrder()
UpdateUserStatus()
NotifyDownstreamSystems()
}
func entry() {
var bi BusinessInstance
switch businessType {
case TravelBusiness:
bi = travelorder.New()
case MarketBusiness:
bi = marketorder.New()
default:
return errors.New("not supported business")
}
}
func BusinessProcess(bi BusinessInstance) {
bi.ValidateLogin()
bi.ValidateParams()
bi.AntispamCheck()
bi.GetPrice()
bi.CreateOrder()
bi.UpdateUserStatus()
bi.NotifyDownstreamSystems()
}
直接面向 interface 编程,而不用关心具体的实现了。如果对应的业务在迭代中发生了修改,所有的逻辑对平台方来说也是完全透明的。
5.8.4 interface 的优缺点
Go 被人称道的最多的地方是其 interface 设计的正交性,模块之间不需要知晓相互的存在,A 模块定义 interface,B 模块实现这个 interface 就可以。如果 interface 中没有 A 模块中定义的数据类型,那 B 模块中甚至都不用 import A。比如标准库中的 io.Writer
:
type Writer interface {
Write(p []byte) (n int, err error)
}
我们可以在自己的模块中实现 io.Writer
接口:
type MyType struct {}
func (m MyType) Write(p []byte) (n int, err error) {
return 0, nil
}
那么我们就可以把我们自己的 MyType 传给任何使用 io.Writer
作为参数的函数来使用了,比如:
package log
func SetOutput(w io.Writer) {
output = w
}
然后:
package my-business
import "xy.com/log"
func init() {
log.SetOutput(MyType)
}
在 MyType 定义的地方,不需要 import "io"
就可以直接实现 io.Writer
interface,我们还可以随意地组合很多函数,以实现各种类型的接口,同时接口实现方和接口定义方都不用建立 import 产生的依赖关系。因此很多人认为 Go 的这种正交是一种很优秀的设计。
但这种“正交”性也会给我们带来一些麻烦。当我们接手了一个几十万行的系统时,如果看到定义了很多 interface,例如订单流程的 interface,我们希望能直接找到这些 interface 都被哪些对象实现了。但直到现在,这个简单的需求也就只有 goland 实现了,并且体验尚可。Visual Studio Code 则需要对项目进行全局扫描,来看到底有哪些 struct 实现了该 interface 的全部函数。那些显式实现 interface 的语言,对于 IDE 的 interface 查找来说就友好多了。另一方面,我们看到一个 struct,也希望能够立刻知道这个 struct 实现了哪些 interface,但也有着和前面提到的相同的问题。
虽有不便,interface 带给我们的好处也是不言而喻的:一是依赖反转,这是 interface 在大多数语言中对软件项目所能产生的影响,在 Go 的正交 interface 的设计场景下甚至可以去除依赖;二是由编译器来帮助我们在编译期就能检查到类似“未完全实现接口”这样的错误,如果业务未实现某个流程,但又将其实例作为 interface 强行来使用的话:
package main
type OrderCreator interface {
ValidateUser()
CreateOrder()
}
type BookOrderCreator struct{}
func (boc BookOrderCreator) ValidateUser() {}
func createOrder(oc OrderCreator) {
oc.ValidateUser()
oc.CreateOrder()
}
func main() {
createOrder(BookOrderCreator{})
}
会报出下述错误。
# command-line-arguments
./a.go:18:30: cannot use BookOrderCreator literal (type BookOrderCreator) as type OrderCreator in argument to createOrder:
BookOrderCreator does not implement OrderCreator (missing CreateOrder method)
所以 interface 也可以认为是一种编译期进行检查的保证类型安全的手段
5.8.5 table-driven 开发
熟悉开源 lint 工具的同学应该见到过圈复杂度的说法,在函数中如果有 if 和 switch 的话,会使函数的圈复杂度上升,所以有强迫症的同学即使在入口一个函数中有 switch,还是想要干掉这个 switch,有没有什么办法呢?当然有,用表驱动的方式来存储我们需要实例:
func entry() {
var bi BusinessInstance
switch businessType {
case TravelBusiness:
bi = travelorder.New()
case MarketBusiness:
bi = marketorder.New()
default:
return errors.New("not supported business")
}
}
可以修改为:
var businessInstanceMap = map[int]BusinessInstance {
TravelBusiness : travelorder.New(),
MarketBusiness : marketorder.New(),
}
func entry() {
bi := businessInstanceMap[businessType]
}
table driven 的设计方式,很多设计模式相关的书籍并没有把它作为一种设计模式来讲,但我认为这依然是一种非常重要的帮助我们来简化代码的手段。在日常的开发工作中可以多多思考,哪些不必要的 switch case 可以用一个字典和一行代码就可以轻松搞定。
5.9 灰度发布和 A/B test
中型的互联网公司往往有着以百万计的用户,而大型互联网公司的系统则可能要服务千万级甚至亿级的用户需求。大型系统的请求流入往往是源源不断的,任何风吹草动,都一定会有最终用户感受得到。例如你的系统在上线途中会拒绝一些上游过来的请求,而这时候依赖你的系统没有做任何容错,那么这个错误就会一直向上抛出,直到触达最终用户。
不管怎么说,在大型系统中容错是重要的,能够让系统按百分比,分批次到达最终用户,也是很重要的。虽然当今的互联网公司系统,名义上会说自己上线前都经过了充分慎重严格的测试,但就算它们真得做到了,代码的 bug 总是在所难免的。即使代码没有 bug,分布式服务之间的协作也是可能出现“逻辑”上的非技术问题的。
互联网系统的灰度发布(灰度发布也称为金丝雀发布)一般通过两种方式实现:
- 通过分批次部署实现灰度发布
- 通过业务规则进行灰度发布
在对系统的旧功能进行升级迭代时,第一种方式用的比较多。新功能上线时,第二种方式用的比较多。当然,对比较重要的老功能进行较大幅度的修改时,一般也会选择按业务规则来进行发布,因为直接全量开放给所有用户风险实在太大。
5.9.1 通过分批次部署实现灰度发布
假如服务部署在 15 个实例(可能是物理机,也可能是容器)上,我们把这 7 个实例分为三组,按照先后顺序,分别有 1-2-4-8 台机器,保证每次扩展时大概都是二倍的关系。为什么要用 2 倍?这样能够保证我们不管有多少台机器,都不会把组划分得太多。
在上线时,最有效的观察手法是查看程序的错误日志,如果较明显的逻辑错误,一般错误日志的滚动速度都会有肉眼可见的增加。这些错误也可以通过 metrics 一类的系统上报给公司内的监控系统,所以在上线过程中,也可以通过观察监控曲线,来判断是否有异常发生。
如果有异常情况,首先要做的自然就是回滚了。
5.9.2 通过业务规则进行灰度发布
常见的灰度策略有多种,较为简单的需求,例如我们的策略是要按照千分比来发布,那么我们可以用用户 id、手机号、用户设备信息,等等,来生成一个简单的哈希值,然后再求模,用伪代码表示一下:
// pass 3/1000
func passed() bool {
key := hashFunctions(userID) % 1000
if key <= 2 {
return true
}
return false
}
5.9.2.1 可选规则
常见的灰度发布系统会有下列规则提供选择:
- 按城市发布
- 按概率发布
- 按百分比发布
- 按白名单发布
- 按业务线发布
- 按 UA 发布(app、web、pc)
- 按分发渠道发布
按白名单发布比较简单,功能上线时,可能我们希望只有公司内部的员工和测试人员可以访问到新功能,会直接把账号、邮箱写入到白名单,拒绝其它任何账号的访问。
按概率发布则是指实现一个简单的函数:
func isTrue() bool {
return true/false according to the rate provided by user
}
按百分比发布,是指实现下面这样的函数:
func isTrue(phone string) bool {
if hash of phone matches {
return true
}
return false
}
和上面的单纯按照概率的区别是这里我们需要调用方提供给我们一个输入参数,我们以该输入参数作为源来计算哈希,并以哈希后的结果来求模,并返回结果。这样可以保证同一个用户的返回结果多次调用是一致的,
5.9.3 如何实现一套灰度发布系统
提供给用户的接口大概可以分为和业务绑定的简单灰度判断逻辑。以及输入稍微复杂一些的哈希灰度。我们来分别看看怎么实现这样的灰度系统(函数)。
5.9.3.1 业务相关的简单灰度
5.9.3.2 哈希算法
求哈希可用的算法非常多,比如 md5,crc32,sha1 等等,但我们这里的目的只是为了给这些数据做个映射,并不想要因为计算哈希消耗过多的 cpu,所以现在业界使用较多的算法是 murmurhash, murmurhash 相比其它的算法有三倍以上的性能提升。
5.9.3.3 分布是否均匀
对于哈希算法来说,性能是一方面的问题,另一方面还要考虑哈希后的值是否分布均匀。
5.10 补充说明
现代的软件工程是离不开 web 的,广义地来讲,web 甚至可以不用非得基于 http 协议。只要是 CS 或者 BS 架构,都可以认为是 web 系统。
即使是在看起来非常封闭的游戏系统里,因为玩家们与日俱增的联机需求,也同样会涉及到远程通信,这里面也会涉及到很多 web 方面的技术。
所以这个时代,web 编程是一个程序员所必须接触的知识领域。无论你的目标是成为架构师,是去创业,或是去当技术顾问。web 方面的知识都会成为你的硬通货。
第6章 分布式系统
Go语言号称是互联网时代的C语言。现在的互联网系统已经不是以前的一个主机搞定一切的时代,互联网时代的后台服务由大量的分布式系统构成,任何单一后台服务器节点的故障并不会导致整个系统的停机。同时以青云、阿里云、腾讯云为代表的云厂商崛起标志着云时代的到来,在云时代分布式编程将成为一个基本技能。而基于Go语言构建的Docker、K8s等系统推动了云时代的提前到来。
6.1 分布式 id 生成器
有时我们需要能够生成类似 MySQL 自增 ID 这样不断增大,同时又不会重复的 id。以支持业务中的高并发场景。比较典型的,电商促销时,短时间内会有大量的订单涌入到系统,比如每秒 10w+。
在插入数据库之前,我们需要给这些消息/订单先打上一个 ID,然后再插入到我们的数据库。对这个 id 的要求是希望其中能带有一些时间信息,这样即使我们后端的系统对消息进行了分库分表,也能够以时间顺序对这些消息进行排序
Twitter 的 snowflake 算法是这种场景下的一个典型解法
datacenter_id sequence_id
unused
│ │
│ │ │
│ │ │
│ │ │ │ │
│ │ │ │ │
▼ │◀────────────────── 41 bits ────────────────────▶│ ▼ ▼
┌─────┼──────────────────────────────────────────────────────┼────────┬────────┬────────────────┐
│ 0 │ 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0 │ 00000 │ 00000 │ 0000 0000 0000 │
└─────┴──────────────────────────────────────────────────────┴────────┴────────┴────────────────┘
▲ ▲
│ │
│ │
│ │
│ │
│ │
│ │
time in milliseconds worker_id
首先确定我们的数值是 64 位,int64 类型,被划分为四部分,不含开头的第一个 bit,因为这个 bit 是符号位。用 41 位来表示收到请求时的时间戳,单位为毫秒,然后五位来表示数据中心的 id,然后再五位来表示机器的实例 id,最后是 12 位的循环自增 id(到达 1111 1111 1111 后会归 0)。
这样的机制可以支持我们在同一台机器上,同一毫秒内产生 2 ^ 12 = 4096 条消息。一秒共 409.6w 条消息。从值域上来讲完全够用了。
数据中心 + 实例 id 共有 10 位,可以支持我们每数据中心部署 32 台机器,所有数据中心共 1024 台实例。
表示 timestamp 的 41 位,可以支持我们使用 69 年。
当然,我们的时间毫秒计数不会真的从 1970 年开始记,那样我们的系统跑到 2039/9/7 23:47:35
就不能用了,所以这里的 timestamp 实际上只是相对于某个时间的增量,比如我们的系统上线是 2018-08-01,那么我们可以把这个 timestamp 当作是从 2018-08-01 00:00:00.000
的偏移量。
6.1.1 worker id 分配
timestamp,datacenter_id,worker_id 和 sequence_id 这四个字段中,timestamp 和 sequence_id 是由程序在运行期生成的。但 datacenter_id 和 worker_id 需要我们在部署阶段就能够获取得到,并且一旦程序启动之后,就是不可更改的了(想想,如果可以随意更改,可能被不慎修改,造成最终生成的 id 有冲突)。
一般不同数据中心的机器,会提供对应的获取数据中心 id 的 api,所以 datacenter_id 我们可以在部署阶段轻松地获取到。而 worker_id 是我们逻辑上给机器分配的一个 id,这个要怎么办呢?比较简单的想法是由能够提供这种自增 id 功能的工具来支持,比如 MySQL:
从 MySQL 中获取到 worker_id 之后,就把这个 worker_id 直接持久化到本地,以避免每次上线时都需要获取新的 worker_id。让单实例的 worker_id 可以始终保持不变。
当然,使用 MySQL 相当于给我们简单的 id 生成服务增加了一个外部依赖。依赖越多,我们的服务的可运维性就越差。
考虑到集群中即使有单个 id 生成服务的实例挂了,也就是损失一段时间的一部分 id,所以我们也可以更简单暴力一些,把 worker_id 直接写在 worker 的配置中,上线时,由部署脚本完成 worker_id 字段替换。
6.1.2 开源实例
6.1.2.1 标准 snowflake 实现
github.com/bwmarrin/snowflake
是一个相当轻量化的 snowflake 的 Go 实现。其文档指出:
+--------------------------------------------------------------------------+
| 1 Bit Unused | 41 Bit Timestamp | 10 Bit NodeID | 12 Bit Sequence ID |
+--------------------------------------------------------------------------+
和标准的 snowflake 完全一致。
package main
import (
"fmt"
"os"
"github.com/bwmarrin/snowflake"
)
func main() {
n, err := snowflake.NewNode(1)
if err != nil {
println(err)
os.Exit(1)
}
for i := 0; i < 3; i++ {
id := n.Generate()
fmt.Println("id", id)
fmt.Println("node: ", id.Node(), "step: ", id.Step(), "time: ", id.Time(), "\n")
}
}
当然,这个库也给我们留好了定制的后路:
// Epoch is set to the twitter snowflake epoch of Nov 04 2010 01:42:54 UTC
// You may customize this to set a different epoch for your application.
Epoch int64 = 1288834974657
// Number of bits to use for Node
// Remember, you have a total 22 bits to share between Node/Step
NodeBits uint8 = 10
// Number of bits to use for Step
// Remember, you have a total 22 bits to share between Node/Step
StepBits uint8 = 12
Epoch 就是本节开头讲的起始时间,NodeBits 指的是机器编号的位长,StepBits 指的是自增序列的位长。
6.1.2.2 sonyflake
sonyflake 是 Sony 公司的一个开源项目,基本思路和 snowflake 差不多,不过位分配上稍有不同:
+-----------------------------------------------------------------------------+
| 1 Bit Unused | 39 Bit Timestamp | 8 Bit Sequence ID | 16 Bit Machine ID |
+-----------------------------------------------------------------------------+
这里的时间只用了 39 个 bit,但时间的单位变成了 10ms,所以理论上比 41 位表示的时间还要久(174 years)。
Sequence ID 和之前的定义一致,Machine ID 其实就是节点 id。sonyflake 与众不同的地方在于其在启动阶段的 setting 配置:
func NewSonyflake(st Settings) *Sonyflake
Settings 数据结构如下:
type Settings struct {
StartTime time.Time
MachineID func() (uint16, error)
CheckMachineID func(uint16) bool
}
StartTime 选项和我们之前的 Epoch 差不多,如果不设置的话,默认是从 2014-09-01 00:00:00 +0000 UTC
开始。
MachineID 可以由用户自定义的函数,如果用户不定义的话,会默认将本机 ip 的低 16 位作为 machine id。
CheckMachineID 是由用户提供的检查 MachineID 是否冲突的函数。这里的设计还是比较巧妙的,如果有另外的中心化存储并支持检查重复的存储,那我们就可以按照自己的想法随意定制这个检查 MachineID 是否冲突的逻辑。如果公司有现成的 Redis 集群,那么我们可以很轻松地用 Redis 的 set 来检查冲突。
6.2 分布式锁
6.2.1 进程内加锁
package main
import "sync"
var counter int
func main() {
var wg sync.WaitGroup
var l sync.Mutex
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
l.Lock()
counter++
l.Unlock()
}()
}
wg.Wait()
println(counter)
}
想要得到正确的结果的话,要把对 counter 的操作代码部分加上锁
6.2.2 trylock
package main
import (
"sync"
)
// Lock try lock
type Lock struct {
c chan struct{}
}
// NewLock generate a try lock
func NewLock() Lock {
var l Lock
l.c = make(chan struct{}, 1)
l.c <- struct{}{}
return l
}
// Lock try lock, return lock result
func (l Lock) Lock() bool {
lockResult := false
select {
case <-l.c:
lockResult = true
default:
}
return lockResult
}
// Unlock , Unlock the try lock
func (l Lock) Unlock() {
l.c <- struct{}{}
}
var counter int
func main() {
var l = NewLock()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
if !l.Lock() {
// log error
println("lock failed")
return
}
counter++
println("current counter", counter)
l.Unlock()
}()
}
wg.Wait()
}
因为我们的逻辑限定每个 goroutine 只有成功执行了 Lock 才会继续执行后续逻辑,因此在 Unlock 时可以保证 Lock struct 中的 channel 一定是空,从而不会阻塞,也不会失败。
在单机系统中,trylock 并不是一个好选择。因为大量的 goroutine 抢锁可能会导致 cpu 无意义的资源浪费。有一个专有名词用来描述这种抢锁的场景:活锁。
活锁指的是程序看起来在正常执行,但实际上 cpu 周期被浪费在抢锁,而非执行任务上,从而程序整体的执行效率低下。活锁的问题定位起来要麻烦很多。所以在单机场景下,不建议使用这种锁。
6.2.3 基于 redis 的 setnx
setnx 很适合在高并发场景下,用来争抢一些“唯一”的资源。比如交易撮合系统中卖家发起订单,而多个买家会对其进行并发争抢。这种场景我们没有办法依赖具体的时间来判断先后,因为不管是用户设备的时间,还是分布式场景下的各台机器的时间,都是没有办法在合并后保证正确的时序的。哪怕是我们同一个机房的集群,不同的机器的系统时间可能也会有细微的差别。
所以,我们需要依赖于这些请求到达 redis 节点的顺序来做正确的抢锁操作。如果用户的网络环境比较差,那也只能自求多福了。
6.2.4 基于 zk
基于 zk 的锁与基于 redis 的锁的不同之处在于 Lock 成功之前会一直阻塞,这与我们单机场景中的 mutex.Lock 很相似。
其原理也是基于临时 sequence 节点和 watch api,例如我们这里使用的是 /lock
节点。Lock 会在该节点下的节点列表中插入自己的值,只要节点下的子节点发生变化,就会通知所有 watch 该节点的程序。这时候程序会检查当前节点下最小的子节点的 id 是否与自己的一致。如果一致,说明加锁成功了。
这种分布式的阻塞锁比较适合分布式任务调度场景,但不适合高频次持锁时间短的抢锁场景。按照 Google 的 chubby 论文里的阐述,基于强一致协议的锁适用于 粗粒度
的加锁操作。这里的粗粒度指锁占用时间较长。我们在使用时也应思考在自己的业务场景中使用是否合适。
6.2.5 基于 etcd
package main
import (
"log"
"github.com/zieckey/etcdsync"
)
func main() {
m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"})
if m == nil || err != nil {
log.Printf("etcdsync.New failed")
return
}
err = m.Lock()
if err != nil {
log.Printf("etcdsync.Lock failed")
return
}
log.Printf("etcdsync.Lock OK")
log.Printf("Get the lock. Do something here.")
err = m.Unlock()
if err != nil {
log.Printf("etcdsync.Unlock failed")
} else {
log.Printf("etcdsync.Unlock OK")
}
}
etcd 中没有像 zookeeper 那样的 sequence 节点。所以其锁实现和基于 zookeeper 实现的有所不同。在上述示例代码中使用的 etcdsync 的 Lock 流程是:
- 先检查
/lock
路径下是否有值,如果有值,说明锁已经被别人抢了 - 如果没有值,那么写入自己的值。写入成功返回,说明加锁成功。写入时如果节点被其它节点写入过了,那么会导致加锁失败,这时候到 3
- watch
/lock
下的事件,此时陷入阻塞 - 当
/lock
路径下发生事件时,当前进程被唤醒。检查发生的事件是否是删除事件(说明锁被持有者主动 unlock),或者过期事件(说明锁过期失效)。如果是的话,那么回到 1,走抢锁流程。
6.2.7 如何选择
业务还在单机就可以搞定的量级时,那么按照需求使用任意的单机锁方案就可以。
如果发展到了分布式服务阶段,但业务规模不大,比如 qps < 1000,使用哪种锁方案都差不多。如果公司内已有可以使用的 zk/etcd/redis 集群,那么就尽量在不引入新的技术栈的情况下满足业务需求。
业务发展到一定量级的话,就需要从多方面来考虑了。首先是你的锁是否在任何恶劣的条件下都不允许数据丢失,如果不允许,那么就不要使用 redis 的 setnx 的简单锁。
如果要使用 redlock,那么要考虑你们公司 redis 的集群方案,是否可以直接把对应的 redis 的实例的 ip+port 暴露给开发人员。如果不可以,那也没法用。
对锁数据的可靠性要求极高的话,那只能使用 etcd 或者 zk 这种通过一致性协议保证数据可靠性的锁方案。但可靠的背面往往都是较低的吞吐量和较高的延迟。需要根据业务的量级对其进行压力测试,以确保分布式锁所使用的 etcd/zk 集群可以承受得住实际的业务请求压力。需要注意的是,etcd 和 zk 集群是没有办法通过增加节点来提高其性能的。要对其进行横向扩展,只能增加搭建多个集群来支持更多的请求。这会进一步提高对运维和监控的要求。多个集群可能需要引入 proxy,没有 proxy 那就需要业务去根据某个业务 id 来做 sharding。如果业务已经上线的情况下做扩展,还要考虑数据的动态迁移。这些都不是容易的事情。
在选择具体的方案时,还是需要多加思考,对风险早做预估。
6.3 延时任务系统
如果业务规模比较小,有时我们也可以通过 db + 轮询来对这种任务进行简单处理,但上了规模的公司,自然会寻找更为普适的解决方案来解决这一类问题
一般有两种思路来解决这个问题:
- 实现一套类似 crontab 的分布式定时任务管理系统。
- 实现一个支持定时发送消息的消息队列。
两种思路进而衍生出了一些不同的系统,但其本质是差不多的。都是需要实现一个定时器。在单机的场景下其实并不少见,例如我们在和网络库打交道的时候经常会写 SetReadDeadline
,这实际上就是在本地创建了一个定时器,在到达指定的时间后,我们会收到定时器的通知,告诉我们时间已到。这时候如果读取还没有完成的话,就可以认为发生了网络问题,从而中断读取。
timer 的实现在工业界已经是有解的问题了。常见的就是时间堆和时间轮
6.3.1 timer 实现
6.3.1.1 时间堆
最常见的时间堆一般用小顶堆实现,小顶堆其实就是一种特殊的二叉树:
┌─────┐
│ │
│ 5 │
└─────┘
│
│
┌──────────┴──────────┐
│ │
▼ ▼
┌─────┐ ┌─────┐
│ │ │ │
│ 6 │ │ 10 │
└─────┘ └─────┘
│ │
┌────┴─────┐ ┌────┴─────┐
│ │ │ │
▼ ▼ ▼ ▼
┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐
│ │ │ │ │ │ │ │
│ 7 │ │ 6 │ │ 11 │ │ 20 │
└─────┘ └─────┘ └─────┘ └─────┘
│ │
│ │
┌───────┴────┐ └───────┐
│ │ │
▼ ▼ ▼
┌─────┐ ┌─────┐ ┌─────┐
│ │ │ │ ............... │ │
│ 15 │ │ 8 │ │ 30 │
└─────┘ └─────┘ └─────┘
小顶堆的好处是什么呢?实际上对于定时器来说,如果堆顶元素比当前的时间还要大,那么说明堆内所有元素都比当前时间大。进而说明这个时刻我们还没有必要对时间堆进行任何处理。所以对于定时 check 来说,时间复杂度是 O(1) 的。
当我们发现堆顶的元素 < 当前时间时,那么说明可能已经有一批事件已经开始过期了,这时进行正常的弹出和堆调整操作就好。每一次堆调整的时间复杂度都是 O(LgN)。
Go 自身的 timer 就是用时间堆来实现的,不过并没有使用二叉堆,而是使用了扁平一些的四叉堆。在最近的版本中,还加了一些优化,我们先不说优化,先来看看四叉的小顶堆长什么样:
+-----+
| |
| 0 |
+-----+
|
|
|
v
+-----+-----+-----+-----+
| | | | |
| 3 | 2 | 2 | 10 |
+-----+-----+-----+-----+
| | | |
| | | |
+----------+ | | | |
+----------------+ 4*i+1 +-----------------------+ | | +-----------------------------+
| +----------+ +-------------------+ +---+ |
| | | |
| | | |
v | | v
+-----+-----+-----+-----+ | | +-----+-----+-----+-----+
| | | | | v v | | | | |
| 20 | 4 | 5 | 13 | +-----+-----+-----+-----+ +-----+-----+-----+-----+ | 99 | 13 | 11 | 12 |
+-----+-----+-----+-----+ | | | | | | | | | | +-----+-----+-----+-----+
| 12 | 14 | 15 | 16 | | 3 | 10 | 3 | 3 |
+-----+-----+-----+-----+ +-----+-----+-----+-----+
小顶堆的性质,父节点比其 4 个子节点都小,子节点之间没有特别的大小关系要求。
6.3.1.2 时间轮
用时间轮来实现 timer 时,我们需要定义每一个格子的“刻度”,可以将时间轮想像成一个时钟,中心有秒针顺时针转动。每次转动到一个刻度时,我们就需要去查看该刻度挂载的 tasklist 是否有已经到期的任务。
从结构上来讲,时间轮和哈希表很相似,如果我们把哈希算法定义为:触发时间%时间轮元素大小。那么这就是一个简单的哈希表。在哈希冲突时,采用链表挂载哈希冲突的定时器。
除了这种单层时间轮,业界也有一些时间轮采用多层实现,
6.3.2 任务分发
有了基本的 timer 实现方案,如果我们开发的是单机系统,那么就可以撸起袖子开干了,不过本章我们讨论的是分布式,距离“分布式”还稍微有一些距离
我们还需要把这些“定时”或是“延时”(本质也是定时)任务分发出去。下面是一种思路:
每一个实例每隔一小时,会去数据库里把下一个小时需要处理的定时任务捞出来,捞取的时候只要取那些 task_id % shard_count = shard_id 的那些 task 即可。
当这些定时任务被触发之后需要通知用户侧,有两种思路:
- 将任务被触发的信息封装为一条 event 消息,发往消息队列,由用户侧对消息队列进行监听。
- 对用户预先配置的回调函数进行调用。
两种方案各有优缺点,如果采用 1,那么如果消息队列出故障会导致整个系统不可用,当然,现在的消息队列一般也会有自身的高可用方案,大多数时候我们不用担心这个问题。其次一般业务流程中间走消息队列的话会导致延时增加,定时任务若必须在触发后的几十毫秒到几百毫秒内完成,那么采用消息队列就会有一定的风险。如果采用 2,会加重定时任务系统的负担。我们知道,单机的 timer 执行时最害怕的就是回调函数执行时间过长,这样会阻塞后续的任务执行。在分布式场景下,这种忧虑依然是适用的。一个不负责任的业务回调可能就会直接拖垮整个定时任务系统。所以我们还要考虑在回调的基础上增加经过测试的超时时间设置,并且对由用户填入的超时时间做慎重的审核。
6.3.3 rebalance 和幂等考量
当我们的任务执行集群有机器故障时,需要对任务进行重新分配。按照之前的求模策略,对这台机器还没有处理的任务进行重新分配就比较麻烦了。如果是实际运行的线上系统,还要在故障时的任务平衡方面花更多的心思。
下面给出一种思路:
我们可以参考 elasticsearch 的设计,每份任务数据都有多个副本,这里假设两副本:
一份数据虽然有两个持有者,但持有者持有的副本会进行区分,比如持有的是主副本还是非主副本
一个任务只会在持有主副本的节点上被执行。
当有机器故障时,任务数据需要进行 rebalance 工作,比如 node 1 挂了:
┌──────────┐
│ node 1 │
├──────────┴────────────────────────┐
│ │
│ X X X │
│ │
└───────────────────────────────────┘
┌──────────┐
│ node 2 │
├──────────┴────────────────────────┐
│ ┌───┐ ┌───┐ ┏━━━┓ ┏━━━┓ ┏━━━┓ │
│ │ 0 │ │ 1 │ ┃ 2 ┃ ┃ 3 ┃ ┃ 4 ┃ │
│ └───┘ └───┘ ┗━━━┛ ┗━━━┛ ┗━━━┛ │
└───────────────────────────────────┘
┌──────────┐
│ node 3 │
├──────────┴────────────────────────┐
│ ┏━━━┓ ┏━━━┓ ┌───┐ ┌───┐ ┌───┐ │
│ ┃ 0 ┃ ┃ 1 ┃ │ 2 │ │ 3 │ │ 4 │ │
│ ┗━━━┛ ┗━━━┛ └───┘ └───┘ └───┘ │
└───────────────────────────────────┘
node 1 的数据会被迁移到 node 2 和 node 3 上。
当然,也可以用稍微复杂一些的思路,比如对集群中的节点进行角色划分,由协调节点来做这种故障时的任务重新分配工作,考虑到高可用,协调节点可能也需要有 1 ~ 2 个备用节点以防不测。
之前提到我们会用 MQ 触发对用户的通知,在使用 MQ 时,很多 MQ 是不支持 exactly once 的语义的,这种情况下我们需要让用户自己来负责消息的去重或者消费的幂等处理。
6.4 分布式搜索引擎
数据库系统本身要保证实时和强一致性,所以其功能设计上都是为了满足这种一致性需求。比如 write ahead log 的设计,基于 B+ 树实现的索引和数据组织,以及基于 MVCC 实现的事务等等。
系型数据库一般被用于实现 OLTP 系统,所谓 OLTP,援引 wikipedia:
在线交易处理(OLTP, Online transaction processing)是指透过信息系统、电脑网络及数据库,以线上交易的方式处理一般即时性的作业数据,和更早期传统数据库系统大量批量的作业方式并不相同。OLTP通常被运用于自动化的数据处理工作,如订单输入、金融业务…等反复性的日常性交易活动。和其相对的是属于决策分析层次的联机分析处理(OLAP)。
在互联网的业务场景中,也有一些实时性要求不高(可以接受多 s 的延迟),但是查询复杂性却很高的场景。举个例子,在电商的 wms 系统中,或者在大多数业务场景丰富的 crm 或者客服系统中,可能需要提供几十个字段的随意组合查询功能。这种系统的数据维度天生众多,比如一个电商的 wms 中对一件货物的描述,可能有下面这些字段:
仓库 id,入库时间,库位分区 id,储存货架 id,入库操作员 id,出库操作员 id,库存数量,过期时间,sku 类型,产品品牌,产品分类,内件数量
除了上述信息,如果商品在仓库内有流转。可能还有有关联的流程 id,当前的流转状态等等。
想像一下,如果我们所经营的是一个大型电商,每天有千万级别的订单,那么在这个数据库中查询和建立合适的索引都是一件非常难的事情。
在 CRM 或客服类系统中,常常有根据关键字进行搜索的需求,大型互联网公司每天会接收数以万计的用户投诉。而考虑到事件溯源,用户的投诉至少要存 2~3 年。又是千万级甚至上亿的数据。根据关键字进行一次 like 查询,可能整个 MySQL 就直接挂掉了。
这时候我们就需要搜索引擎来救场了。
搜索引擎
elasticsearch 是开源分布式搜索引擎的霸主,其依赖于 Lucene 实现,在部署和运维方面做了很多优化。当今搭建一个分布式搜索引擎比起 Sphinx 的时代已经是容易很多很多了。只要简单配置客户端 ip 和端口就可以了。
倒排列表
虽然 es 是针对搜索场景来定制的,但如前文所言,实际应用中常常用 es 来作为 database 来使用,就是因为倒排列表的特性。可以用比较朴素的观点来理解倒排索引:
┌─────────────────┐ ┌─────────────┬─────────────┬─────────────┬─────────────┐
│ order_id: 103 │──────▶│ doc_id:4231 │ doc_id:4333 │ doc_id:5123 │ doc_id:9999 │
└─────────────────┘ └─────────────┴─────────────┴─────────────┴─────────────┘
┌─────────────────┐ ┌─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐
│ sku_id: 30221 │──────▶│ doc_id:4231 │ doc_id:5123 │ doc_id:5644 │ doc_id:7801 │ doc_id:9999 │
└─────────────────┘ └─────────────┴─────────────┴─────────────┴─────────────┴─────────────┘
┌─────────────────┐ ┌─────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐
│ city_id: 3 │──────▶│ doc_id:5123 │ doc_id:9999 │doc_id:10232 │doc_id:54321 │doc_id:63142 │doc_id:71230 │doc_id:90123 │
└─────────────────┘ └─────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┘
对 es 中的数据进行查询时,本质就是求多个排好序的序列求交集。非数值类型字段涉及到分词问题,大多数内部使用场景下,我们可以直接使用默认的 bi-gram 分词。什么是 bi-gram 分词呢:
即将所有 Ti 和 T(i+1) 组成一个词(在 es 中叫 term),然后再编排其倒排列表,这样我们的倒排列表大概就是这样的:
当用户搜索 ‘天气很好’ 时,其实就是求:天气、气很、很好三组倒排列表的交集,但这里的相等判断逻辑有些特殊,用伪代码表示一下:
func equal() {
if postEntry.docID of '天气' == postEntry.docID of '气很' && postEntry.offset + 1 of '天气' == postEntry.offset of '气很' {
return true
}
if postEntry.docID of '气很' == postEntry.docID of '很好' && postEntry.offset + 1 of '气很' == postEntry.offset of '很好' {
return true
}
if postEntry.docID of '天气' == postEntry.docID of '很好' && postEntry.offset + 2 of '天气' == postEntry.offset of '很好' {
return true
}
return false
}
多个有序列表求交集的时间复杂度是:O(N * M), N 为给定列表当中元素数最小的集合, M 为给定列表的个数。
在整个算法中起决定作用的一是最短的倒排列表的长度,其次是词数总和,一般词数不会很大(想像一下,你会在搜索引擎里输入几百字来搜索么?),所以起决定性作用的,一般是所有倒排列表中,最短的那一个的长度。
因此,文档总数很多的情况下,搜索词的倒排列表最短的那一个不长时,搜索速度也是很快的。如果用关系型数据库,那就需要按照索引(如果有的话)来慢慢扫描了。
查询 DSL
es 的 Bool Query 方案,实际上就是用 json 来表达了这种程序语言中的 Boolean Expression,为什么可以这么做呢?因为 json 本身是可以表达树形结构的,我们的程序代码在被编译器 parse 之后,也会变成 AST,而 AST 抽象语法树,顾名思义,就是树形结构。理论上 json 能够完备地表达一段程序代码被 parse 之后的结果。这里的 Boolean Expression 被编译器 Parse 之后也会生成差不多的树形结构,而且只是整个编译器实现的一个很小的子集。
基于 client sdk 做开发
将 sql 转换为 DSL
比如我们有一段 bool 表达式,user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1),写成 SQL 是如下形式:
select * from xxx where user_id = 1 and (product_id = 1 and (star_num = 4 or star_num = 5) and banned = 1)
写成 es 的 DSL 是如下形式:
{
"query": {
"bool": {
"must": [
{
"match": {
"user_id": {
"query": "1",
"type": "phrase"
}
}
},
{
"match": {
"product_id": {
"query": "1",
"type": "phrase"
}
}
},
{
"bool": {
"should": [
{
"match": {
"star_num": {
"query": "4",
"type": "phrase"
}
}
},
{
"match": {
"star_num": {
"query": "5",
"type": "phrase"
}
}
}
]
}
},
{
"match": {
"banned": {
"query": "1",
"type": "phrase"
}
}
}
]
}
},
"from": 0,
"size": 1
}
es 的 DSL 虽然很好理解,但是手写起来非常费劲。前面提供了基于 SDK 的方式来写,但也不足够灵活。
SQL 的 where 部分就是 boolean expression。我们之前提到过,这种 bool 表达式在被 parse 之后,和 es 的 DSL 的结构长得差不多,我们能不能直接通过这种“差不多”的猜测来直接帮我们把 SQL 转换成 DSL 呢?
当然可以,我们把 SQL 的 where 被 Parse 之后的结构和 es 的 DSL 的结构做个对比:
既然结构上完全一致,逻辑上我们就可以相互转换。我们以广度优先对 AST 树进行遍历,然后将二元表达式转换成 json 字符串,再拼装起来就可以了,限于篇幅,本文中就不给出示例了,
异构数据同步
在实际应用中,我们很少直接向搜索引擎中写入数据。更为常见的方式是,将 MySQL 或其它关系型数据中的数据同步到搜索引擎中。而搜索引擎的使用方只能对数据进行查询,无法进行修改和删除。
常见的同步方案有两种:
通过时间戳进行增量数据同步
┌────────────────────────┐ ┌────────────────────────┐
│ move 10 min data to es │ │ move 10 min data to es │
└────────────────────────┘ └────────────────────────┘
│ │ ┌───────────────┐
───────────────┼────────────────┬──────────────┴─────────────┬──────────────▶ │ time passes │
│ ┌───────┐ │ │ └───────────────┘
│◀──┤ 10min ├───▶│ ┌────────────────────────┐
│ └───────┘ │ │ move 10 min data to es │
│ └────────────────────────┘
│
│
│
│
┌────────────────────────┐
│ move 10 min data to es │
└────────────────────────┘
这种同步方式与业务强绑定,例如 wms 系统中的出库单,我们并不需要非常实时,稍微有延迟也可以接受,那么我们可以每分钟从 MySQL 的出库单表中,把最近十分钟创建的所有出库单取出,批量存入 es 中,具体的逻辑实际上就是一条 SQL:
from wms_orders where update_time >= date_sub(now(), interval 10 minute);
当然,考虑到边界情况,我们可以让这个时间段的数据与前一次的有一些重叠:
select * from wms_orders where update_time >= date_sub(now(), interval 11 minute);
取最近 11 分钟有变动的数据覆盖更新到 es 中。这种方案的缺点显而易见,我们必须要求业务数据严格遵守一定的规范。比如这里的,必须要有 update_time 字段,并且每次创建和更新都要保证该字段有正确的时间值。否则我们的同步逻辑就会丢失数据。
通过 binlog 进行数据同步
┌────────────────────────┐
│ MySQL master │
└────────────────────────┘
│
│
│
│
│
│
▼
┌───────────────────┐
│ row format binlog │
└───────────────────┘
│
│
│
┌───────────────┴──────────────┐
│ │
│ │
▼ ▼
┌────────────────────────┐ ┌─────────────────┐
│ MySQL slave │ │ canal │
└────────────────────────┘ └─────────────────┘
│
┌─────────┴──────────┐
│ parsed binlog │
└─────────┬──────────┘
│
▼
┌────────────────┐
│ kafka │─────┐
└────────────────┘ │
│
│
│
│
┌───────────┴──────┐
│ kafka consumer │
└───────────┬──────┘
│
│
│
│ ┌────────────────┐
└─────▶│ elasticsearch │
└────────────────┘
业界使用较多的是阿里开源的 canal,来进行 binlog 解析与同步。canal 会伪装成 MySQL 的从库,然后解析好行格式的 binlog,再以更容易解析的格式(例如 json) 发送到消息队列。
由下游的 kafka 消费者负责把上游数据表的自增主键作为 es 的 document 的 id 进行写入,这样可以保证每次接收到 binlog 时,对应 id 的数据都被覆盖更新为最新。MySQL 的 row 格式的 binlog 会将每条记录的所有字段都提供给下游,所以实际上在向异构数据目标同步数据时,不需要考虑数据是插入还是更新,只要一律按 id 进行覆盖即可。
这种模式同样需要业务遵守一条数据表规范,即表中必须有唯一主键 id 来保证我们进入 es 的数据不会发生重复。一旦不遵守该规范,那么就会在同步时导致数据重复。当然,你也可以为每一张需要的表去定制消费者的逻辑,这就不是通用系统讨论的范畴了。
6.5 Load-Balance 负载均衡
6.5.1 常见的负载均衡思路
如果我们不考虑均衡的话,现在有 n 个 endpoint,我们完成业务流程实际上只需要从这 n 个中挑出其中的一个。有几种思路:
- 按顺序挑: 例如上次选了第一台,那么这次就选第二台,下次第三台,如果已经到了最后一台,那么下一次从第一台开始。这种情况下我们可以把 endpoint 都存储在数组中,每次请求完成下游之后,将一个索引后移即可。在移到尽头时再移回数组开头处。
- 随机挑一个: 每次都随机挑,真随机伪随机均可。假设选择第 x 台机器,那么 x 可描述为
rand.Intn() % n
。 - 根据某种权重,对下游 endpoints 进行排序,选择权重最大/小的那一个。
当然了,实际场景我们不可能无脑轮询或者无脑随机,如果对下游请求失败了,我们还需要某种机制来进行重试,如果纯粹的随机算法,存在一定的可能性使你在下一次仍然随机到这次的问题节点。
6.5.2 基于洗牌算法的负载均衡
考虑到我们需要随机选取每次发送请求的 endpoint,同时在遇到下游返回错误时换其它节点重试。所以我们设计一个大小和 endpoints 数组大小一致的索引数组,每次来新的请求,我们对索引数组做洗牌,然后取第一个元素作为选中的服务节点,如果请求失败,那么选择下一个节点重试,以此类推:
var endpoints = []string {
"100.69.62.1:3232",
"100.69.62.32:3232",
"100.69.62.42:3232",
"100.69.62.81:3232",
"100.69.62.11:3232",
"100.69.62.113:3232",
"100.69.62.101:3232",
}
// 重点在这个 shuffle
func shuffle(slice []int) {
for i := 0; i < len(slice); i++ {
a := rand.Intn(len(slice))
b := rand.Intn(len(slice))
slice[a], slice[b] = slice[b], slice[a]
}
}
func request(params map[string]interface{}) error {
var indexes = []int {0,1,2,3,4,5,6}
var err error
shuffle(indexes)
maxRetryTimes := 3
idx := 0
for i := 0; i < maxRetryTimes; i++ {
err = apiRequest(params, indexes[idx])
if err == nil {
break
}
idx++
}
if err != nil {
// logging
return err
}
return nil
}
6.5.2.1 错误的洗牌导致的负载不均衡
这段简短的程序里有两个隐藏的隐患:
- 没有随机种子。在没有随机种子的情况下,rand.Intn 返回的伪随机数序列是固定的。
- 洗牌不均匀,会导致整个数组第一个节点有大概率被选中,并且多个节点的负载分布不均衡。
6.5.2.2 修正洗牌算法
从数学上得到过证明的还是经典的 fisher-yates 算法,主要思路为每次随机挑选一个值,放在数组末尾。然后在 n-1 个元素的数组中再随机挑选一个值,放在数组末尾,以此类推。
func shuffle(indexes []int) {
for i:=len(indexes); i>0; i-- {
lastIdx := i - 1
idx := rand.Int(i)
indexes[lastIdx], indexes[idx] = indexes[idx], indexes[lastIdx]
}
}
在 Go 的标准库中实际上已经为我们内置了该算法:
func shuffle(n int) []int {
b := rand.Perm(n)
return b
}
在当前的场景下,我们只要用 rand.Perm 就可以得到我们想要的索引数组了
6.5.3 zk 集群的随机节点挑选问题
本节中的场景是从 N 个节点中选择一个节点发送请求,初始请求结束之后,后续的请求会重新对数组洗牌,所以每两个请求之间没有什么关联关系。因此我们上面的洗牌算法,理论上不初始化随机库的种子也是不会出什么问题的。
但在一些特殊的场景下,例如使用 zk 时,客户端初始化从多个服务节点中挑选一个节点后,是会向该节点建立长连接的。并且之后如果有请求,也都会发送到该节点去。直到该节点不可用,才会在 endpoints 列表中挑选下一个节点。在这种场景下,我们的初始连接节点选择就要求必须是“真”随机了。否则,所有客户端起动时,都会去连接同一个 zk 的实例,根本无法起到负载均衡的目的。如果在日常开发中,你的业务也是类似的场景,也务必考虑一下是否会发生类似的情况。为 rand 库设置种子的方法:
rand.Seed(time.Now().UnixNano())
之所以会有上面这些结论,是因为某个使用较广泛的开源 zk 库的早期版本就犯了上述错误,直到 2016 年早些时候,这个问题才被修正。
6.6 分布式配置管理
在分布式系统中,常困扰我们的还有上线问题。虽然目前有一些优雅重启方案,但实际应用中可能受限于我们系统内部的运行情况而没有办法做到真正的“优雅”。比如我们为了对去下游的流量进行限制,在内存中堆积一些数据,并对堆积设定时间/总量的阈值。在任意阈值达到之后将数据统一发送给下游,以避免频繁的请求超出下游的承载能力而将下游打垮。这种情况下重启要做到优雅就比较难了。
所以我们的目标还是尽量避免采用或者绕过上线的方式,对线上程序做一些修改。比较典型的修改内容就是程序的配置项。
6.6.1 场景举例
6.6.1.1 报表系统
在一些偏 OLAP 或者离线的数据平台中,经过长期的叠代开发,整个系统的功能模块已经渐渐稳定。可变动的项只出现在数据层,而数据层的变动大多可以认为是 SQL 的变动,架构师们自然而然地会想着把这些变动项抽离到系统外部。比如本节所述的配置管理系统。
当业务提出了新的需求时,我们的需求是将新的 SQL 录入到系统内部,或者简单修改一下老的 SQL。不对系统进行上线,就可以直接完成这些修改。
6.6.1.2 业务配置
大公司的平台部门服务众多业务线,在平台内为各业务线分配唯一 id。平台本身也由多个模块构成,这些模块需要共享相同的业务线定义(要不然就乱套了)。当公司新开产品线时,需要能够在短时间内打通所有平台系统的流程。这时候每个系统都走上线流程肯定是来不及的。另外需要对这种公共配置进行统一管理,同时对其增减逻辑也做统一管理。这些信息变更时,需要自动通知到业务方的系统,而不需要人力介入(或者只需要很简单的介入,比如点击审核通过)。
除业务线管理之外,很多互联网公司会按照城市来铺展自己的业务。在某个城市未开城之前,理论上所有模块都应该认为带有该城市 id 的数据是脏数据并自动过滤掉。而如果业务开城,在系统中就应该自己把这个新的城市 id 自动加入到白名单中。这样业务流程便可以自动运转。
再举个例子,互联网公司的运营系统中会有各种类型的运营活动,有些运营活动推出后可能出现了超出预期的事件(比如公关危机),需要紧急将系统下线。这时候会用到一些开关来快速关闭相应的功能。或者快速将想要剔除的活动 id 从白名单中剔除。在 web 章节中的 ab test 一节中,我们也提到,有时需要有这样的系统来告诉我们当前需要放多少流量到相应的功能代码上。我们可以像那一节中,使用远程 rpc 来获知这些信息,但同时,也可以结合分布式配置系统,主动地拉取到这些信息。
6.6.2 使用 etcd 实现配置更新
6.6.2.1 配置定义
简单的配置,可以将内容完全存储在 etcd 中。比如:
etcdctl get /configs/remote_config.json
{
"addr" : "127.0.0.1:1080",
"aes_key" : "01B345B7A9ABC00F0123456789ABCDAF",
"https" : false,
"secret" : "",
"private_key_path" : "",
"cert_file_path" : ""
}
6.6.2.2 新建 etcd client
cfg := client.Config{
Endpoints: []string{"http://127.0.0.1:2379"},
Transport: client.DefaultTransport,
HeaderTimeoutPerRequest: time.Second,
}
直接用 etcd client 包中的结构体初始化,没什么可说的。
6.6.2.3 配置获取
resp, err = kapi.Get(context.Background(), "/path/to/your/config", nil)
if err != nil {
log.Fatal(err)
} else {
log.Printf("Get is done. Metadata is %q\n", resp)
log.Printf("%q key has %q value\n", resp.Node.Key, resp.Node.Value)
}
获取配置使用 etcd KeysAPI 的 Get 方法,比较简单。
6.6.2.4 配置更新订阅
kapi := client.NewKeysAPI(c)
w := kapi.Watcher("/path/to/your/config", nil)
go func() {
for {
resp, err := w.Next(context.Background())
log.Println(resp, err)
log.Println("new values is ", resp.Node.Value)
}
}()
通过订阅 config 路径的变动事件,在该路径下内容发生变化时,客户端侧可以收到变动通知,并收到变动后的字符串值。
这里只需要注意一点,我们在更新配置时,进行了一系列操作:watch 响应,json 解析,这些操作都不具备原子性。当单个业务请求流程中多次获取 config 时,有可能因为中途 config 发生变化而导致单个请求前后逻辑不一致。因此,在使用类似这样的方式来更新配置时,需要在单个请求的生命周期内使用同样的配置。具体实现方式可以是只在请求开始的时候获取一次配置,然后依次向下透传等等,具体情况具体分析。
6.6.3 配置膨胀
随着业务的发展,配置系统本身所承载的压力可能也会越来越大,配置文件可能成千上万。客户端同样上万,将配置内容存储在 etcd 内部便不再合适了
随着配置文件数量的膨胀,除了存储系统本身的吞吐量问题,还有配置信息的管理问题。我们需要对相应的配置进行权限管理,需要根据业务量进行配置存储的集群划分。如果客户端太多,导致了配置存储系统无法承受瞬时大量的 QPS,那可能还需要在客户端侧进行缓存优化,等等。
这也就是为什么大公司都会针对自己的业务额外开发一套复杂配置系统的原因。
6.6.4 配置版本管理
在配置管理过程中,难免出现用户误操作的情况,例如在更新配置时,输入了无法解析的配置。这种情况下我们可以通过配置校验来解决。
有时错误的配置可能不是格式上有问题,而是在逻辑上有问题。比如我们写 SQL 时少 select 了一个字段,更新配置时,不小心把丢掉了 json 字符串中的一个 field 而导致程序无法理解新的配置而进入诡异的逻辑。为了快速止损,最快且最有效的办法就是进行版本管理,并支持按版本回滚。
在配置进行更新时,我们要为每份配置的新内容赋予一个版本号,并将修改前的内容和版本号记录下来,当发现新配置出问题时,能够及时地回滚回来。
常见的做法是,使用 MySQL 来存储配置文件/字符串的不同版本内容,在需要回滚时,只要进行简单的查询即可。
6.6.5 客户端容错
在业务系统的配置被剥离到配置中心之后,并不意味着我们的系统可以高枕无忧了。当配置中心本身宕机时,我们也需要一定的容错能力,至少保证在其宕机期间,业务依然可以运转。这要求我们的系统能够在配置中心宕机时,也能拿到需要的配置信息。哪怕这些信息不够新。
具体来讲,在给业务提供配置读取的 sdk 时,最好能够将拿到的配置在业务机器的磁盘上也缓存一份。这样远程配置中心不可用时,可以直接用硬盘上的内容来做兜底。当重新连接上配置中心时,再把相应的内容进行更新。
加入缓存之后务必需要考虑的是数据一致性问题,当个别业务机器因为网络错误而与其它机器配置不一致时,我们也应该能够从监控系统中知晓。
我们使用一种手段解决了我们配置更新痛点,但同时可能因为使用的手段而带给我们新的问题。
6.7 分布式爬虫
nats 简介
nats 是 Go 实现的一个高性能分布式消息队列,适用于高并发高吞吐量的消息分发场景。早期的 nats 以速度为重,没有支持持久化。从 16 年开始,nats 通过 nats-streaming 支持基于日志的持久化,以及可靠的消息传输。为了演示方便,我们本节中只使用 nats。
nats 的服务端项目是 gnatsd,客户端与 gnatsd 的通信方式为基于 tcp 的文本协议,非常简单:
向 subject 为 task 发消息:
以 workers 的 queue 从 tasks subject 订阅消息:
其中的 queue 参数是可选的,如果希望在分布式的消费端进行任务的负载均衡,而不是所有人都收到同样的消息,那么就要给消费端指定相同的 queue 名字。
基本消息生产
生产消息只要指定 subject 即可:
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
// log error
return
}
// 指定 subject 为 tasks,消息内容随意
err = nc.Publish("tasks", []byte("your task content"))
nc.Flush()
6.8 补充说明
分布式是很大的领域,本章中的介绍只能算是对领域的管中窥豹。因为大型系统流量大,并发高,所以往往很多朴素的方案会变得难以满足需求。人们为了解决大型系统场景中的各种问题,而开发出了各式各样的分布式系统。有些系统非常简单,比如本章中介绍的分布式 id 生成器,而有一些系统则可能非常复杂,比如本章中的分布式搜索引擎(当然,本章中提到的 es 不是 Go 实现)。
附录
附录A:Go语言常见坑
可变参数是空接口类型
当参数的可变参数是空接口类型时,传人空接口的切片时需要注意参数展开的问题。
func main() {
var a = []interface{}{1, 2, 3}
fmt.Println(a)
fmt.Println(a...)
}
不管是否展开,编译器都无法发现错误,但是输出是不同的:
[1 2 3]
1 2 3
数组是值传递
map遍历是顺序不固定
返回值被屏蔽
在局部作用域中,命名的返回值内同名的局部变量屏蔽:
func Foo() (err error) {
if err := Bar(); err != nil {
return
}
return
}
recover必须在defer函数中运行
recover捕获的是祖父级调用时的异常,直接调用时无效:
func main() {
recover()
panic(1)
}
直接defer调用也是无效:
func main() {
defer recover()
panic(1)
}
defer调用时多层嵌套依然无效:
func main() {
defer func() {
func() { recover() }()
}()
panic(1)
}
必须在defer函数中直接调用才有效:
func main() {
defer func() {
recover()
}()
panic(1)
}
main函数提前退出
后台Goroutine无法保证完成任务。
func main() {
go println("hello")
}
通过Sleep来回避并发中的问题
休眠并不能保证输出完整的字符串:
func main() {
go println("hello")
time.Sleep(time.Second)
}
类似的还有通过插入调度语句:
func main() {
go println("hello")
runtime.Gosched()
}
独占CPU导致其它Goroutine饿死
Goroutine是协作式抢占调度,Goroutine本身不会主动放弃CPU:
func main() {
runtime.GOMAXPROCS(1)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(i)
}
}()
for {} // 占用CPU
}
解决的方法是在for循环加入runtime.Gosched()调度函数:
func main() {
runtime.GOMAXPROCS(1)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(i)
}
}()
for {
runtime.Gosched()
}
}
或者是通过阻塞的方式避免CPU占用:
func main() {
runtime.GOMAXPROCS(1)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(i)
}
os.Exit(0)
}()
select{}
}
不同Goroutine之间不满足顺序一致性内存模型
因为在不同的Goroutine,main函数中无法保证能打印出hello, world
:
var msg string
var done bool
func setup() {
msg = "hello, world"
done = true
}
func main() {
go setup()
for !done {
}
println(msg)
}
解决的办法是用显式同步:
var msg string
var done = make(chan bool)
func setup() {
msg = "hello, world"
done <- true
}
func main() {
go setup()
<-done
println(msg)
}
msg的写入是在channel发送之前,所以能保证打印hello, world
闭包错误引用同一个变量
func main() {
for i := 0; i < 5; i++ {
defer func() {
println(i)
}()
}
}
改进的方法是在每轮迭代中生成一个局部变量:
func main() {
for i := 0; i < 5; i++ {
i := i
defer func() {
println(i)
}()
}
}
或者是通过函数参数传入:
func main() {
for i := 0; i < 5; i++ {
defer func(i int) {
println(i)
}(i)
}
}
在循环内部执行defer语句
defer在函数退出时才能执行,在for执行defer会导致资源延迟释放:
func main() {
for i := 0; i < 5; i++ {
f, err := os.Open("/path/to/file")
if err != nil {
log.Fatal(err)
}
defer f.Close()
}
}
解决的方法可以在for中构造一个局部函数,在局部函数内部执行defer:
func main() {
for i := 0; i < 5; i++ {
func() {
f, err := os.Open("/path/to/file")
if err != nil {
log.Fatal(err)
}
defer f.Close()
}()
}
}
切片会导致整个底层数组被锁定
切片会导致整个底层数组被锁定,底层数组无法释放内存。如果底层数组较大会对内存产生很大的压力。
func main() {
headerMap := make(map[string][]byte)
for i := 0; i < 5; i++ {
name := "/path/to/file"
data, err := ioutil.ReadFile(name)
if err != nil {
log.Fatal(err)
}
headerMap[name] = data[:1]
}
// do some thing
}
解决的方法是将结果克隆一份,这样可以释放底层的数组:
func main() {
headerMap := make(map[string][]byte)
for i := 0; i < 5; i++ {
name := "/path/to/file"
data, err := ioutil.ReadFile(name)
if err != nil {
log.Fatal(err)
}
headerMap[name] = append([]byte{}, data[:1]...)
}
// do some thing
}
内存地址会变化
Go语言中对象的地址可能发生变化,因此指针不能从其它非指针类型的值生成:
func main() {
var x int = 42
var p uintptr = uintptr(unsafe.Pointer(&x))
runtime.GC()
var px *int = (*int)(unsafe.Pointer(p))
println(*px)
}
当内存发送变化的时候,相关的指针会同步更新,但是非指针类型的uintptr不会做同步更新。
同理CGO中也不能保存Go对象地址。
Goroutine泄露
Go语言是带内存自动回收的特性,因此内存一般不会泄漏。但是Goroutine确存在泄漏的情况,同时泄漏的Goroutine引用的内存同样无法被回收。
func main() {
ch := func() <-chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
ch <- i
}
} ()
return ch
}()
for v := range ch {
fmt.Println(v)
if v == 5 {
break
}
}
}
上面的程序中后台Goroutine向管道输入自然数序列,main函数中输出序列。但是当break跳出for循环的时候,后台Goroutine就处于无法被回收的状态了。
我们可以通过context包来避免这个问题:
func main() {
ctx, cancel := context.WithCancel(context.Background())
ch := func(ctx context.Context) <-chan int {
ch := make(chan int)
go func() {
for i := 0; ; i++ {
select {
case <- ctx.Done():
return
case ch <- i:
}
}
} ()
return ch
}(ctx)
for v := range ch {
fmt.Println(v)
if v == 5 {
cancel()
break
}
}
}
当main函数在break跳出循环时,通过调用cancel()
来通知后台Goroutine退出,这样就避免了Goroutine的泄漏。