Blog Go
Các mẫu đồng thời trong Go: Pipeline và hủy bỏ
Giới thiệu
Các nguyên tố đồng thời của Go giúp dễ dàng xây dựng các pipeline dữ liệu phát trực tuyến sử dụng I/O và nhiều CPU một cách hiệu quả. Bài viết này trình bày các ví dụ về các pipeline như vậy, làm nổi bật những điểm tinh tế phát sinh khi các thao tác thất bại, và giới thiệu các kỹ thuật để xử lý các thất bại một cách gọn gàng.
Pipeline là gì?
Không có định nghĩa chính thức về pipeline trong Go; đó chỉ là một trong nhiều loại chương trình đồng thời. Một cách không chính thức, pipeline là một loạt các giai đoạn được kết nối bởi các channel, trong đó mỗi giai đoạn là một nhóm goroutine chạy cùng một hàm. Trong mỗi giai đoạn, các goroutine
- nhận các giá trị từ upstream qua các channel inbound
- thực hiện một hàm nào đó trên dữ liệu đó, thường tạo ra các giá trị mới
- gửi các giá trị downstream qua các channel outbound
Mỗi giai đoạn có bất kỳ số lượng channel inbound và outbound nào, ngoại trừ giai đoạn đầu tiên và cuối cùng, chỉ có channel outbound hoặc inbound, tương ứng. Giai đoạn đầu tiên đôi khi được gọi là source hoặc producer; giai đoạn cuối cùng là sink hoặc consumer.
Chúng ta sẽ bắt đầu với một ví dụ pipeline đơn giản để giải thích các ý tưởng và kỹ thuật. Sau đó, chúng ta sẽ trình bày một ví dụ thực tế hơn.
Bình phương các số
Xem xét một pipeline với ba giai đoạn.
Giai đoạn đầu tiên, gen, là một hàm chuyển đổi một danh sách các số nguyên thành
channel phát ra các số nguyên trong danh sách đó. Hàm gen khởi động một
goroutine gửi các số nguyên trên channel và đóng channel khi tất cả
các giá trị đã được gửi:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
Giai đoạn thứ hai, sq, nhận các số nguyên từ một channel và trả về
channel phát ra bình phương của mỗi số nguyên nhận được. Sau khi
channel inbound được đóng và giai đoạn này đã gửi tất cả các giá trị
downstream, nó đóng channel outbound:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
Hàm main thiết lập pipeline và chạy giai đoạn cuối cùng: nó nhận
các giá trị từ giai đoạn thứ hai và in từng giá trị, cho đến khi channel bị đóng:
func main() {
// Set up the pipeline.
c := gen(2, 3)
out := sq(c)
// Consume the output.
fmt.Println(<-out) // 4
fmt.Println(<-out) // 9
}
Vì sq có cùng kiểu cho channel inbound và outbound của nó, chúng ta
có thể kết hợp nó bất kỳ số lần nào. Chúng ta cũng có thể viết lại main dưới dạng
vòng lặp range, giống như các giai đoạn khác:
func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
Fan-out, fan-in
Nhiều hàm có thể đọc từ cùng một channel cho đến khi channel đó bị đóng; điều này được gọi là fan-out. Điều này cung cấp một cách để phân phối công việc trong một nhóm worker để song song hóa việc sử dụng CPU và I/O.
Một hàm có thể đọc từ nhiều đầu vào và tiến hành cho đến khi tất cả đều đóng bằng cách ghép kênh các channel đầu vào vào một channel duy nhất được đóng khi tất cả các đầu vào bị đóng. Điều này được gọi là fan-in.
Chúng ta có thể thay đổi pipeline để chạy hai phiên bản sq, mỗi phiên bản đọc từ cùng
một channel đầu vào. Chúng ta giới thiệu một hàm mới, merge, để fan-in các
kết quả:
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the merged output from c1 and c2.
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
Hàm merge chuyển đổi một danh sách các channel thành một channel duy nhất bằng cách khởi động
một goroutine cho mỗi channel inbound sao chép các giá trị vào channel outbound duy nhất.
Khi tất cả các goroutine output đã được khởi động, merge khởi động thêm một
goroutine nữa để đóng channel outbound sau khi tất cả các lần gửi trên channel đó được
hoàn thành.
Gửi trên một channel đã đóng sẽ panic, vì vậy điều quan trọng là đảm bảo tất cả các lần gửi
đã xong trước khi gọi close. Kiểu
sync.WaitGroup
cung cấp một cách đơn giản để sắp xếp đồng bộ hóa này:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
Dừng sớm
Có một mẫu trong các hàm pipeline của chúng ta:
- các giai đoạn đóng channel outbound khi tất cả các thao tác gửi đã hoàn thành.
- các giai đoạn tiếp tục nhận giá trị từ các channel inbound cho đến khi các channel đó bị đóng.
Mẫu này cho phép mỗi giai đoạn nhận được viết dưới dạng vòng lặp range và
đảm bảo tất cả các goroutine thoát khi tất cả các giá trị đã được gửi thành công
downstream.
Nhưng trong các pipeline thực tế, các giai đoạn không phải lúc nào cũng nhận tất cả các giá trị inbound. Đôi khi điều này là theo thiết kế: receiver chỉ có thể cần một tập hợp con các giá trị để tiếp tục. Thường xuyên hơn, một giai đoạn thoát sớm vì một giá trị inbound đại diện cho một lỗi ở giai đoạn trước đó. Trong cả hai trường hợp, receiver không nên phải chờ các giá trị còn lại đến, và chúng ta muốn các giai đoạn trước đó dừng tạo ra các giá trị mà các giai đoạn sau không cần.
Trong pipeline ví dụ của chúng ta, nếu một giai đoạn không tiêu thụ hết tất cả các giá trị inbound, các goroutine cố gắng gửi các giá trị đó sẽ bị block vô thời hạn:
// Consume the first value from the output. out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // Since we didn't receive the second value from out, // one of the output goroutines is hung attempting to send it. }
Đây là rò rỉ tài nguyên: các goroutine tiêu thụ bộ nhớ và tài nguyên runtime, và các tham chiếu heap trong các stack goroutine ngăn dữ liệu bị thu gom rác. Các goroutine không được thu gom rác; chúng phải tự thoát.
Chúng ta cần sắp xếp để các giai đoạn upstream của pipeline thoát ngay cả khi các giai đoạn downstream không nhận tất cả các giá trị inbound. Một cách để làm điều này là thay đổi các channel outbound để có buffer. Buffer có thể giữ một số lượng cố định các giá trị; các thao tác gửi hoàn thành ngay lập tức nếu có chỗ trong buffer:
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
Khi số lượng giá trị sẽ được gửi được biết tại thời điểm tạo channel, buffer
có thể đơn giản hóa code. Ví dụ, chúng ta có thể viết lại gen để sao chép danh sách
các số nguyên vào một channel có buffer và tránh tạo một goroutine mới:
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
Quay lại các goroutine bị block trong pipeline của chúng ta, chúng ta có thể xem xét thêm
buffer vào channel outbound được trả về bởi merge:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // enough space for the unread inputs
// ... the rest is unchanged ...
Mặc dù điều này sửa goroutine bị block trong chương trình này, đây là code xấu. Việc
chọn kích thước buffer là 1 ở đây phụ thuộc vào việc biết số lượng giá trị mà merge
sẽ nhận và số lượng giá trị mà các giai đoạn downstream sẽ tiêu thụ. Điều này
mong manh: nếu chúng ta truyền một giá trị bổ sung cho gen, hoặc nếu giai đoạn downstream
đọc ít giá trị hơn, chúng ta sẽ lại có các goroutine bị block.
Thay vào đó, chúng ta cần cung cấp một cách để các giai đoạn downstream chỉ ra cho các sender rằng họ sẽ ngừng chấp nhận đầu vào.
Hủy bỏ tường minh
Khi main quyết định thoát mà không nhận tất cả các giá trị từ
out, nó phải nói với các goroutine ở các giai đoạn upstream để từ bỏ
các giá trị chúng đang cố gửi. Nó làm điều này bằng cách gửi các giá trị trên một
channel gọi là done. Nó gửi hai giá trị vì có
hai sender tiềm năng bị block:
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output.
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// Tell the remaining senders we're leaving.
done <- struct{}{}
done <- struct{}{}
}
Các goroutine gửi thay thế thao tác gửi của chúng bằng câu lệnh select
tiến hành khi lần gửi trên out xảy ra hoặc khi chúng nhận được một giá trị
từ done. Kiểu giá trị của done là struct rỗng vì giá trị
không quan trọng: sự kiện nhận là thứ chỉ ra rằng lần gửi trên out nên
được từ bỏ. Các goroutine output tiếp tục lặp trên channel inbound
của chúng, c, vì vậy các giai đoạn upstream không bị block. (Chúng ta sẽ thảo luận trong giây lát
về cách cho phép vòng lặp này trả về sớm.)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... the rest is unchanged ...
Cách tiếp cận này có một vấn đề: mỗi receiver downstream cần biết số lượng sender upstream tiềm năng bị block và sắp xếp để báo hiệu cho những sender đó khi trả về sớm. Việc theo dõi các số đếm này rất tẻ nhạt và dễ xảy ra lỗi.
Chúng ta cần một cách để nói với số lượng goroutine không biết và không giới hạn để ngừng gửi các giá trị của chúng downstream. Trong Go, chúng ta có thể làm điều này bằng cách đóng một channel, vì thao tác nhận trên một channel đã đóng luôn có thể tiến hành ngay lập tức, trả về giá trị zero của kiểu phần tử.
Điều này có nghĩa là main có thể bỏ block tất cả các sender chỉ bằng cách đóng
channel done. Việc đóng này thực sự là một tín hiệu phát sóng cho
các sender. Chúng ta mở rộng mỗi hàm pipeline của mình để chấp nhận
done như một tham số và sắp xếp cho việc đóng xảy ra qua câu lệnh
defer, để tất cả các đường trả về từ main sẽ báo hiệu
các giai đoạn pipeline thoát.
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)
// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done will be closed by the deferred call.
}
Mỗi giai đoạn pipeline của chúng ta bây giờ có thể trả về ngay khi done bị đóng.
Routine output trong merge có thể trả về mà không cần thoát hết channel inbound của nó,
vì nó biết sender upstream, sq, sẽ ngừng cố gắng gửi khi
done bị đóng. output đảm bảo wg.Done được gọi trên tất cả các đường trả về qua
câu lệnh defer:
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ... the rest is unchanged ...
Tương tự, sq có thể trả về ngay khi done bị đóng. sq đảm bảo channel out
của nó bị đóng trên tất cả các đường trả về qua câu lệnh defer:
func sq(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}
Đây là các hướng dẫn cho việc xây dựng pipeline:
- các giai đoạn đóng channel outbound khi tất cả các thao tác gửi đã hoàn thành.
- các giai đoạn tiếp tục nhận giá trị từ các channel inbound cho đến khi các channel đó bị đóng hoặc các sender được bỏ block.
Các pipeline bỏ block các sender bằng cách đảm bảo có đủ buffer cho tất cả các giá trị được gửi hoặc bằng cách báo hiệu tường minh cho các sender khi receiver có thể từ bỏ channel.
Tính toán digest của một cây
Hãy xem xét một pipeline thực tế hơn.
MD5 là thuật toán message-digest hữu ích như một file checksum. Tiện ích dòng lệnh
md5sum in các giá trị digest cho một danh sách các tệp.
% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
Chương trình ví dụ của chúng ta giống như md5sum nhưng thay vào đó nhận một thư mục duy nhất làm
đối số và in các giá trị digest cho mỗi tệp thông thường trong thư mục đó,
được sắp xếp theo tên đường dẫn.
% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
Hàm main của chương trình gọi một hàm helper MD5All, trả về một
map từ tên đường dẫn đến giá trị digest, sau đó sắp xếp và in kết quả:
func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
Hàm MD5All là tâm điểm của cuộc thảo luận. Trong
serial.go, triển khai không sử dụng tính đồng thời và
chỉ đơn giản là đọc và tính tổng mỗi tệp khi nó duyệt cây.
// MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(root string) (map[string][md5.Size]byte, error) { m := make(map[string][md5.Size]byte) err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } data, err := ioutil.ReadFile(path) if err != nil { return err } m[path] = md5.Sum(data) return nil }) if err != nil { return nil, err } return m, nil }
Tính toán song song
Trong parallel.go, chúng ta chia MD5All thành pipeline hai giai đoạn.
Giai đoạn đầu tiên, sumFiles, duyệt cây, tính digest của mỗi tệp trong
một goroutine mới, và gửi kết quả trên một channel với kiểu giá trị result:
type result struct {
path string
sum [md5.Size]byte
err error
}
sumFiles trả về hai channel: một cho results và một cho lỗi
được trả về bởi filepath.Walk. Hàm walk khởi động một goroutine mới để
xử lý mỗi tệp thông thường, sau đó kiểm tra done. Nếu done bị đóng, walk
dừng ngay lập tức:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// For each regular file, start a goroutine that sums the file and sends
// the result on c. Send the result of the walk on errc.
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk has returned, so all calls to wg.Add are done. Start a
// goroutine to close c once all the sends are done.
go func() {
wg.Wait()
close(c)
}()
// No select needed here, since errc is buffered.
errc <- err
}()
return c, errc
}
MD5All nhận các giá trị digest từ c. MD5All trả về sớm khi có lỗi,
đóng done qua defer:
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
Song song có giới hạn
Triển khai MD5All trong parallel.go
khởi động một goroutine mới cho mỗi tệp. Trong một thư mục với nhiều tệp lớn,
điều này có thể cấp phát nhiều bộ nhớ hơn mức có sẵn trên máy.
Chúng ta có thể giới hạn các cấp phát này bằng cách giới hạn số lượng tệp được đọc song song. Trong bounded.go, chúng ta làm điều này bằng cách tạo một số lượng cố định các goroutine để đọc các tệp. Pipeline của chúng ta bây giờ có ba giai đoạn: duyệt cây, đọc và tính digest các tệp, và thu thập các digest.
Giai đoạn đầu tiên, walkFiles, phát ra các đường dẫn của các tệp thông thường trong cây:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// Close the paths channel after Walk returns.
defer close(paths)
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}
Giai đoạn giữa khởi động một số lượng cố định các goroutine digester nhận
tên tệp từ paths và gửi results trên channel c:
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
Không giống như các ví dụ trước của chúng ta, digester không đóng channel đầu ra của nó, vì
nhiều goroutine đang gửi trên một channel chia sẻ. Thay vào đó, code trong MD5All
sắp xếp để channel bị đóng khi tất cả các digester đã xong:
// Start a fixed number of goroutines to read and digest files. c := make(chan result) var wg sync.WaitGroup const numDigesters = 20 wg.Add(numDigesters) for i := 0; i < numDigesters; i++ { go func() { digester(done, paths, c) wg.Done() }() } go func() { wg.Wait() close(c) }()
Thay vào đó chúng ta có thể có mỗi digester tạo và trả về channel đầu ra riêng của nó, nhưng sau đó chúng ta sẽ cần các goroutine bổ sung để fan-in các kết quả.
Giai đoạn cuối cùng nhận tất cả results từ c rồi kiểm tra lỗi
từ errc. Kiểm tra này không thể xảy ra sớm hơn, vì trước
điểm này, walkFiles có thể bị block khi gửi các giá trị downstream:
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
Kết luận
Bài viết này đã trình bày các kỹ thuật để xây dựng các pipeline dữ liệu phát trực tuyến trong Go. Xử lý các thất bại trong các pipeline như vậy là khó khăn, vì mỗi giai đoạn trong pipeline có thể bị block khi cố gắng gửi các giá trị downstream, và các giai đoạn downstream có thể không còn quan tâm đến dữ liệu đến. Chúng ta đã chỉ ra cách đóng một channel có thể phát sóng tín hiệu “done” đến tất cả các goroutine được khởi động bởi một pipeline và xác định các hướng dẫn để xây dựng pipeline đúng cách.
Đọc thêm:
- Go Concurrency Patterns (video) trình bày các khái niệm cơ bản về các nguyên tố đồng thời của Go và một số cách áp dụng chúng.
- Advanced Go Concurrency Patterns
(video) đề cập đến các cách sử dụng phức tạp hơn
của các nguyên tố Go,
đặc biệt là
select. - Bài báo của Douglas McIlroy Squinting at Power Series chỉ ra cách tính đồng thời giống Go cung cấp hỗ trợ thanh lịch cho các tính toán phức tạp.
Bài tiếp theo: Chú gopher của Go
Bài trước: Các bài nói về Go tại FOSDEM 2014
Mục lục blog