Go语言实现协程下载器

2023-06-28,

一般常用的下载方式是通过浏览器访问URL,然后基于HTTP进行下载。这种单线程下载方式通常比较慢,这里尝试使用Go语言实现一个多协程的下载器

大致思路

按照传统的单线程的思路,实现下载要基于HTTP请求,因此要知道对应的URL,请求该URL会得到服务器的响应Responce。之后取出Response的头部的Content-Length,得到要下载文件的大小。从Responce的Body中读取文件内容,保存到指定的路径。多协程的下载同样基于HTTP,但会将文件分片进行并发下载。

发起请求

先向一个下载链接发送一个请求,看看会回显什么。

这里的目标URL是ubuntu系统的下载链接

package main

import (
"fmt"
"log"
"net/http"
) var (
url = "https://releases.ubuntu.com/22.04/ubuntu-22.04-desktop-amd64.iso"
) func main() {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
log.Fatalln(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
for k, v := range resp.Header {
fmt.Println(k, v)
}
}

如上代码所示,使用http.NewRequest创建一个对象,向该函数传入请求方式和URL。之后向http.DefaultClient.Do传入上面创建的对象,即可发送该请求。该函数会返回响应报文,其中的响应头(或者叫它首部行head line)是一个map,使用for-range来遍历打印这个map

Date [Thu, 19 May 2022 14:55:02 GMT]
Server [Apache/2.4.29 (Ubuntu)]
Last-Modified [Tue, 19 Apr 2022 10:25:02 GMT]
Etag ["d9da3800-5dcff4a2cb483"]
Accept-Ranges [bytes]
Content-Length [3654957056]
Content-Type [application/x-iso9660-image]

这个head line很好理解,Date是时间,Server表示这个报文是一台Apache WEB服务器产生的。Last-Modified表示对象创建或者最后修改的日期和时间。Etag 是URL的Entity Tag,用于标示URL对象是否改变,这在断点下载时比较有用。

Accept-Ranges的值用于定义范围请求的单位,这里就表示范围请求的单位是 字节。Content-Length是响应中Body(实际上应该叫实体体entity body)的长度,即要下载的文件的大小,这里单位是字节,换算后约为3.4GB。Content-Type是内容类型。

文件分片

http的响应中有个关键信息就是长度,将其取出作为文件大小。

length, _ := strconv.Atoi(resp.Header.Get("Content-Length"))

已知文件大小后将文件进行分片,分给不同的协程下载,片段数就等于协程数(暂时是这么想的,但或许还存在缺陷)

size = int(math.Ceil(float64(length) / float64(amount)))

每片的大小是总文件大小除以协程的个数,math.Ceil的作用是向上取整。

那么如何实现只下载文件的一部分?首先必须目标服务器支持这样的功能,可以通过响应头中的Accept-Ranges来判断,如果有Accept-Ranges:Bytes,那么则证明目标服务器支持并发的下载。

并发下载

负责下载的功能单独写到一个函数中,作为goroutine函数,一个文件的片段由一个协程去负责下载,有多少个片段就调起多少个协程。

在发起HTTP请求并获得响应之前,先设置一下请求头Request Header

通过配置Range参数 可以指定请求数据的第一个字节的位置和最后一个字节的位置,格式为:Range:(unit=first byte pos)-[last byte pos]

例如Range: bytes=500-999就代表第500-999字节的内容

那么继续按照上面的逻辑,如果协程读到了0,那么就意味着这个协程要下载第1个片段,这个range是0~size。

如果是1,则这个range是size+1~2*size,那么n(n >= 1)就对应了n*size+1~size*(n+1),计算出这个开始字节和结束字节,拼接Range参数,如下是对Header的设置

req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))

下载下来的片段被保存为一个临时的文件,将序号写在文件名中,待全部下载好后再组合为一个完整的文件。

之后就可以使用Do函数发起请求,并且得到响应,文件内容就在响应的Body中,为字节类型的数据,使用io.Copy函数进行读取

大概的工作流程:

    向队列读取一个整数

    计算Range,发送请求

    读取Body并保存

合并文件

下载好所有的文件片段后就要将它们组合,按照文件名中序号的顺序拼接为一个完整的文件。传入路径名,使用os.Open打开文件,返回*os.File类型的变量

func Copy(dst Writer, src Reader) (written int64, err error)

这里使用io.Copy函数,直接传入*os.File类型的参数即可,因为它们都实现了io.Reader和io.Writer接口

代码初步实现

现在按照上面的逻辑,初步实现一个雏形。

首先定义一系列全局变量

var (
amount int // 协程数目
url string // 下载链接
path string // 保存路径
length int // 文件长度
size int // 片段大小
finish chan int // 完成队列
begin time.Time // 起始时间
over time.Time // 完成时间
concurrent bool // 是否并发
)

从命令行获取参数同时生成一些帮助信息,这里使用flag包

// 解析命令行参数
func init() {
flag.IntVar(&amount, "n", 5, "The number of coroutines")
flag.StringVar(&url, "u", "", "Target URL")
flag.StringVar(&path, "p", "", "The path where the file is saved")
flag.Parse()
if path == "" {
path, _ = filepath.Split(url)
}
}

init函数会最先被执行,用于初始化

如下函数用于做下载之前的准备,首先向服务器发送一个head请求,再根据响应报文判断是否支持断点续传。如果支持并且用户指定了多个协程,就可以使用并发下载,其余情况使用单线程下载。同时该程序也要获取文件的大小并计算出文件片段大小(并发下载才要知道片段大小)。

// PrepareDownload 准备下载
func PrepareDownload() {
// 创建head请求
req, err := http.NewRequest(http.MethodHead, url, nil)
if err != nil {
log.Fatalln(err)
}
// 发送Head请求
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatalln(err)
}
if resp.Header.Get("Accept-Ranges") == "bytes" && amount > 1 {
fmt.Println("Use multi process Download")
concurrent = true
} else {
fmt.Println("Using single process Download")
concurrent = false
amount = 1
}
// 获取文件长度
length, _ = strconv.Atoi(resp.Header.Get("Content-Length"))
size = int(math.Ceil(float64(length) / float64(amount))) // 文件片段长度
fmt.Printf("length of file: %d bytes\n", length)
fmt.Printf("%d goroutines start download\nfragment size: %d bytes\n", amount, size)
}

RunDownloader,该函数首先创建一个缓冲通道,相当于一个队列,用于接收已经完成下载的片段的序号,之后判断是否使用并发。如果使用并发,则开启一个循环,不断amount个协程,每个协程负责下载一个文件片段。

// RunDownloader 开启下载
func RunDownloader() {
finish = make(chan int, amount)
begin = time.Now()
if concurrent {
var start = 0
for i := 0; i < amount; i++ {
var end int
if i == amount-1 {
end = length
} else {
end = start + size
}
go MutiDownload(i, start, end) // 开启协程
start += size + 1
}
} else {
SingleDownload() // 单线程下载
}
}

多协程下载,该函数首先创建一个GET请求,同时在请求头中设置Range参数,使得能够获取文件片段,之后从响应报文的body中取出数据并保存为文件即可。当下载完成后会向完成通道里发送一个整数。

// MutiDownload 多协程下载
func MutiDownload(cor, start, end int) {
StartTime := time.Now()
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatal(err)
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
log.Printf("coroutine[%d] start download(range bytes: %d-%d)", cor, start, end)
// 发起HTTP GET请求
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatalln(err)
}
// 临时文件名
temp := fmt.Sprintf("%s.%d", path, cor)
file, err := os.Create(temp)
if err != nil {
file.Close()
resp.Body.Close()
log.Fatalf("coroutine[%d] cannot create %s %v\n", cor, temp, err)
}
// 写入文件
n, err := io.Copy(file, resp.Body)
if err != nil {
file.Close()
resp.Body.Close()
log.Fatalf("coroutine[%d] cannot write %s %v\n", cor, temp, err)
} EndTime := time.Now() // 结束时间
duration := EndTime.Sub(StartTime).Seconds() // 用时
speed := float64(n) / duration / 1024 / 1024 // 计算速度
log.Printf("coroutine[%d] download successfully: %f MB/s\n", cor, speed)
file.Close()
resp.Body.Close()
finish <- cor
}

单线程下载,在请求头中不设Range参数即可

// SingleDownload 单点下载
func SingleDownload() {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatal(err)
}
// 发起HTTP GET请求
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatalln(err)
}
// 临时文件名
temp := fmt.Sprintf("%s.0", path)
file, err := os.Create(temp)
if err != nil {
file.Close()
resp.Body.Close()
log.Fatalf("Cannot create %s %v\n", temp, err)
}
// 写入文件
_, err = io.Copy(file, resp.Body)
if err != nil {
file.Close()
resp.Body.Close()
log.Fatalf("Cannot write %s %v\n", temp, err)
}
finish <- 0
}

等待下载,就是等finish通道不被阻塞,当不再被阻塞时就代表下载已经都结束了,这时计算一些数值。

// WaitDownloader 等待下载结束
func WaitDownloader() {
for i := 0; i < amount; i++ {
<-finish
}
over = time.Now()
duration := over.Sub(begin).Seconds() // 用时
speed := float64(length) / duration / 1024 / 1024 // 计算速度
fmt.Printf("The download is complete: %f MB/s\n", speed)
}

合并文件片段,使用io.Copy进行数据复制

// MergeFragments 合并文件
func MergeFragments() {
// 创建文件
dest, err := os.Create(path)
if err != nil {
dest.Close()
log.Fatalf("Create file %s %v\n", path, err)
} defer dest.Close()
// 合并文件
fmt.Println("Merging files...")
for i := 0; i < amount; i++ {
temp := fmt.Sprintf("%s.%d", path, i)
fmt.Printf("reading file %s\n", temp)
source, err := os.Open(temp)
if err != nil {
log.Fatalln("reading file error:", err)
}
_, err = io.Copy(dest, source)
if err != nil {
log.Fatalln(err)
}
source.Close()
os.Remove(temp)
}
fmt.Println("Merge file complete")
}

完整代码

package main

import (
"flag"
"fmt"
"io"
"log"
"math"
"net/http"
"os"
"path/filepath"
"strconv"
"time"
) var (
amount int // 协程数目
url string // 下载链接
path string // 保存路径
length int // 文件长度
size int // 片段大小
finish chan int // 完成队列
begin time.Time // 起始时间
over time.Time // 完成时间
concurrent bool // 是否并发
) // 解析命令行参数
func init() {
flag.IntVar(&amount, "n", 5, "The number of coroutines")
flag.StringVar(&url, "u", "", "Target URL")
flag.StringVar(&path, "p", "", "The path where the file is saved")
flag.Parse()
if path == "" {
path, _ = filepath.Split(url)
}
} func main() {
// 获取文件大小
PrepareDownload()
// 存放已经完成的任务
RunDownloader()
// 下载完成之前 堵塞主线程
WaitDownloader()
// 合并文件
MergeFragments()
} // MutiDownload 多协程下载
func MutiDownload(cor, start, end int) {
StartTime := time.Now()
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatal(err)
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
log.Printf("coroutine[%d] start download(range bytes: %d-%d)", cor, start, end)
// 发起HTTP GET请求
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatalln(err)
}
// 临时文件名
temp := fmt.Sprintf("%s.%d", path, cor)
file, err := os.Create(temp)
if err != nil {
file.Close()
resp.Body.Close()
log.Fatalf("coroutine[%d] cannot create %s %v\n", cor, temp, err)
}
// 写入文件
n, err := io.Copy(file, resp.Body)
if err != nil {
file.Close()
resp.Body.Close()
log.Fatalf("coroutine[%d] cannot write %s %v\n", cor, temp, err)
} EndTime := time.Now() // 结束时间
duration := EndTime.Sub(StartTime).Seconds() // 用时
speed := float64(n) / duration / 1024 / 1024 // 计算速度
log.Printf("coroutine[%d] download successfully: %f MB/s\n", cor, speed)
file.Close()
resp.Body.Close()
finish <- cor
} // SingleDownload 单点下载
func SingleDownload() {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatal(err)
}
// 发起HTTP GET请求
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatalln(err)
}
// 临时文件名
temp := fmt.Sprintf("%s.0", path)
file, err := os.Create(temp)
if err != nil {
file.Close()
resp.Body.Close()
log.Fatalf("Cannot create %s %v\n", temp, err)
}
// 写入文件
_, err = io.Copy(file, resp.Body)
if err != nil {
file.Close()
resp.Body.Close()
log.Fatalf("Cannot write %s %v\n", temp, err)
}
finish <- 0
} // PrepareDownload 准备下载
func PrepareDownload() {
// 创建head请求
req, err := http.NewRequest(http.MethodHead, url, nil)
if err != nil {
log.Fatalln(err)
}
// 发送Head请求
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatalln(err)
}
if resp.Header.Get("Accept-Ranges") == "bytes" && amount > 1 {
fmt.Println("Use multi process Download")
concurrent = true
} else {
fmt.Println("Using single process Download")
concurrent = false
amount = 1
}
// 获取文件长度
length, _ = strconv.Atoi(resp.Header.Get("Content-Length"))
size = int(math.Ceil(float64(length) / float64(amount))) // 文件片段长度
fmt.Printf("length of file: %d bytes\n", length)
fmt.Printf("%d goroutines start download\nfragment size: %d bytes\n", amount, size)
} // RunDownloader 开启下载
func RunDownloader() {
finish = make(chan int, amount)
begin = time.Now() // 总起始时间
if concurrent {
var start = 0
for i := 0; i < amount; i++ {
var end int
if i == amount-1 {
end = length
} else {
end = start + size
}
go MutiDownload(i, start, end) // 开启协程
start += size + 1
}
} else {
SingleDownload()
}
} // WaitDownloader 等待下载结束
func WaitDownloader() {
for i := 0; i < amount; i++ {
<-finish
}
over = time.Now()
duration := over.Sub(begin).Seconds() // 用时
speed := float64(length) / duration / 1024 / 1024 // 计算速度
fmt.Printf("The download is complete: %f MB/s\n", speed)
} // MergeFragments 合并文件
func MergeFragments() {
// 创建文件
dest, err := os.Create(path)
if err != nil {
dest.Close()
log.Fatalf("Create file %s %v\n", path, err)
} defer dest.Close()
// 合并文件
fmt.Println("Merging files...")
for i := 0; i < amount; i++ {
temp := fmt.Sprintf("%s.%d", path, i)
fmt.Printf("reading file %s\n", temp)
source, err := os.Open(temp)
if err != nil {
log.Fatalln("reading file error:", err)
}
_, err = io.Copy(dest, source)
if err != nil {
log.Fatalln(err)
}
source.Close()
os.Remove(temp)
}
fmt.Println("Merge file complete")
}

第一次运行测试

打印help信息

$ ./main -help
Usage of ./main:
-n int
The number of coroutines (default 5)
-p string
The path where the file is saved
-u string
Target URL

下载ubuntu2022的镜像文件

$ ./main -u https://releases.ubuntu.com/22.04/ubuntu-22.04-desktop-amd64.iso -n 5 -p ubuntu.iso
Use multi process Download
length of file: 3654957056 bytes
5 goroutines start download
fragment size: 730991412 bytes
2022/05/21 18:50:38 coroutine[4] start download(range bytes: 2923965652-3654957056)
2022/05/21 18:50:38 coroutine[1] start download(range bytes: 730991413-1461982825)
2022/05/21 18:50:38 coroutine[3] start download(range bytes: 2192974239-2923965651)
2022/05/21 18:50:38 coroutine[0] start download(range bytes: 0-730991412)
2022/05/21 18:50:38 coroutine[2] start download(range bytes: 1461982826-2192974238)

刚开始下载,协程会输出各自的下载范围

接下来当有协程完成下载时,就会输出对应信息。全部下载好后,就会开始合并文件

2022/05/21 19:02:23 coroutine[2] download successfully: 0.988429 MB/s
2022/05/21 19:03:42 coroutine[3] download successfully: 0.889333 MB/s
2022/05/21 19:04:05 coroutine[0] download successfully: 0.863870 MB/s
2022/05/21 19:04:44 coroutine[1] download successfully: 0.824247 MB/s
2022/05/21 19:08:05 coroutine[4] download successfully: 0.665922 MB/s
The download is complete: 3.329608 MB/s
Merging files...
reading file ubuntu.iso.0
reading file ubuntu.iso.1
reading file ubuntu.iso.2
reading file ubuntu.iso.3
reading file ubuntu.iso.4
Merge file complete

下载完成后,使用cmp命令比较两个文件的差异,借助cmp和原版文件比一下

$ cmp ubuntu-22.04-desktop-amd64.iso ubuntu.iso

没有回显,说明这两个文件完全一样,这证明了下载文件的完整性。事实上,完整性是最重要的。在当前机器上,上述下载过程的平均速度大约在3.3MB/S左右。相比于浏览器,速度似乎是有提升的...

但提升并不明显,并且初步实现的代码还比较粗糙,至少有如下几个问题:

    没有面向对象思想

    没有对协程进行管理

    从暂停的下载中如何续传

    无法跟踪下载进度和速度

    下载中断时要如何处理

(正在改良,未完待续)

下载恢复

设想一下,当有一个协程下载一个片段失败时,要如何恢复对这个片段的下载。

目前一种想法时,当一个协程下载失败时,则舍弃原先该协程下载好的数据,重新下载这个片段。这非常容易实现,只要重新发起请求即可。

于是修改了下载函数

// MutiDownload 多协程下载
func MutiDownload(cor, start, end int) {
for {
StartTime := time.Now()
req, err := http.NewRequest("GET", url, nil)
if err != nil {
log.Fatal(err)
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
log.Printf("coroutine[%d] start download(range bytes: %d-%d)", cor, start, end)
// 发起HTTP GET请求
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Printf("coroutine[%d] request error %v", cor, err)
log.Printf("coroutine[%d] try downloading again\n", cor)
continue
}
// 临时文件名
temp := fmt.Sprintf("%s.%d", path, cor)
file, err := os.Create(temp)
if err != nil {
file.Close()
resp.Body.Close()
log.Printf("coroutine[%d] cannot create %s %v\n", cor, temp, err)
log.Printf("coroutine[%d] try downloading again\n", cor)
continue
}
// 写入文件
n, err := io.Copy(file, resp.Body)
if err != nil {
file.Close()
resp.Body.Close()
log.Printf("coroutine[%d] cannot write %s %v\n", cor, temp, err)
log.Printf("coroutine[%d] try downloading again\n", cor)
continue
} EndTime := time.Now() // 结束时间
duration := EndTime.Sub(StartTime).Seconds() // 用时
speed := float64(n) / duration / 1024 / 1024 // 计算速度
log.Printf("coroutine[%d] download successfully: %f MB/s\n", cor, speed)
file.Close()
resp.Body.Close()
finish <- cor
break
}
}

写在一个循环中,如果出错就continue到下一次。

但这并没有体现续传的优点,理想情况下应该是不要删除原来下载好的部分,而是接在后面继续下载。只要根据已经下载好的文件片段的字节数计算start即可。

Go语言实现协程下载器的相关教程结束。

《Go语言实现协程下载器.doc》

下载本文的Word格式文档,以方便收藏与打印。