Go进阶29:高效io.Reader/io.Writer S3数据传输
这里有一个需求就是把云储存(AWS S3)的很多小文件压缩成一个大的压缩文件. 我们脑海第一个想到的办法就是下载全部的小文件,使用压缩软件创建压缩包,在上传到S3中, 当然这个方案时可行的,但是我们这里有一个更好的方法使用Go语言的标准库io.reader/io.writer.
1. “直觉”的方案
首先定义 S3的文件管理上传下载结构
var (
downloader *s3manager.Downloader
uploader *s3manager.Uploader
)
从S3下载小文件保存到小文件,创建zip压缩包,写入小文件的内容到压缩包,上传zip压缩文件的内容到S3.
func zipS3Files(in []*s3.GetObjectInput, result *s3manager.UploadInput) error {
filesToZip := make([]string, 0, len(in))
// 下载小文件到本地磁盘
for _, file := range in {
pathToFile := os.TempDir() + "/" + path.Base(*file.Key)
f, _:= os.Create(pathToFile)
downloader.Download(f, file)
f.Close()
filesToZip = append(filesToZip, pathToFile)
}
// 创建零时zip压缩文件
zipFile := os.TempDir() + "/" + path.Base(*result.Key)
f, _:= os.Create(zipFile)
defer f.Close()
zipWriter := zip.NewWriter(f)
for _, file := range filesToZip {
// 在zip 压缩包创建文件
w, _:= zipWriter.Create(file)
// 打开下载好的本地小文件
inFile, _:= os.Open(file)
// 拷贝小文件的内容到压缩包
io.Copy(w, inFile)
inFile.Close()
}
zipWriter.Close()
// seek 移动读写游标
f.Seek(0, 0)
// 上传zip文件内容
result.Body = f
_, err = uploader.Upload(result)
return err
}
这个方法看起来时非常简单明了,但是我们可以改进它,我们没有必要把小文件和zip压缩包文件写到磁盘上. 如果我们想在AWS Lambda 中运行代码,但是它的存储/temp空间限制512MB. 我们不适用硬盘.我们将使用内存,AWS Lambda中最大内存使用数量高大3GB.
2. Stream流解决方案
我们可以创建一个管道Pipe,它将数据从S3 Bucket 经过 zip.Writer 压缩在送回S3 bucket里面. 这个过程不涉及硬盘.使用简单易用的接口io.Reader和io.Writer就可以实现以上的功能.流程图如下:
首先我们创建一个管道Pipe来传送小文件到 zip.Writer
在到S3 bucket uploader
, pr 代表pipe reader, pw 代表 pipe writer.
pr, pw := io.Pipe()
然后使用pipe writer作为参数pr
创建 zip.Writer
.
zipWriter := zip.NewWriter(pw)
任何内容写入都通过pip writer pw
传输.
现在我们遍历每个文件并在其中创建一个 writer zip.Writer
for _, file := range in {
w, _ := zipWriter.Create(path.Base(*file.Key))
现在我们来看一下S3下载签名文档:
func (d Downloader) Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error)
Download方法需要io.WriterAt,os.File满足此接口,但zipWriter.Create返回io.Writer. AWS SDK 使用io.WriterAt用于并发下载.我们可以通过以下设置禁用此并发下载功能.
downloader.Concurrency = 1
我们创建自己的struct,该struct将提供该方法 WriteAt
,使其满足接口要求io.WriterAt.它将忽略offset,
因此其工作方式类似于io.Writer.
io.WriterAt由于并发下载,AWS SDK之所以使用,是因为它可以在偏移位置写入(例如,在文件中间).
通过禁用文件并发下载,我们可以安全地忽略offset参数,因为它将按顺序下载的.参考stackoverflow Buffer implementing io.WriterAt in go
type FakeWriterAt struct {
w io.Writer
}
func (fw FakeWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
// ignore 'offset' because we forced sequential downloads
return fw.w.Write(p)
}
现在我们可以通过将writer包装到我们的FakeWriterAt结构体中来下载文件.
downloader.Download(FakeWriterAt{w}, file)
下载每个文件之后,我们需要调用closer.
zipWriter.Close()
pw.Close()
这样,我们将文件下载到zipWriter中的writer,先对其进行处理,然后再将其写入pipe writer中.
现在我们需要将ZIP上传回S3 Bucket中.我们正在给管道写入数据,不从中读取数据.我们将UploadInput
的body设置为pipe reader.
result.Body = pr
uploader.Upload(result)
最后一步是并行运行下载和上传,当下载完成处理了一些数据块之后,可以立即将其上传到S3 bucket. 我们使用并行执行这两个步骤,go func()..并将其与wait group同步.
这是最终代码:
type FakeWriterAt struct {
w io.Writer
}
func (fw FakeWriterAt) WriteAt(p []byte, offset int64) (n int, err error) {
// ignore 'offset' because we forced sequential downloads
return fw.w.Write(p)
}
func zipS3Files(in []*s3.GetObjectInput, result *s3manager.UploadInput) error {
//设置单线程下载
downloader.Concurrency = 1
// 通过io.Pipe创建管道
pr, pw := io.Pipe()
// 使用pipe writer 参数创建 zip.writer
zipWriter := zip.NewWriter(pw)
wg := sync.WaitGroup{}
// 等待s3 的下载器 和上传器执行完成
wg.Add(2)
// Run 'downloader'
go func() {
// 必须要使用closer 关闭zip.Writer 和 pipe wirite
// zip.Writer closer不会自动的关闭
defer func() {
wg.Done()
zipWriter.Close()
pw.Close()
}()
for _, file := range in {
// 文件内容下载到 zip.Writer的Writer
w, err := zipWriter.Create(path.Base(*file.Key))
if err != nil {
fmt.Println(err)
}
_, err = downloader.Download(FakeWriterAt{w}, file)
if err != nil {
fmt.Println(err)
}
}
}()
go func() {
defer wg.Done()
// 上传压缩文件
// result.Body 从pipe reader 取回内容
result.Body = pr
_, err := uploader.Upload(result)
if err != nil {
fmt.Println(err)
}
}()
wg.Wait()
return nil
}
正如您所见,这里没有做异常处理,error handle 不在本文讨论范围之内.
还有如果downloader.Download失败,我也希望上传失败. 当您要使用时,这是一个很好的用例context.我们可以创建上下文,例如time out.
ctx, cancel := context.WithTimeout(context.Background(), time.Minute * 4)
如果下载失败,我们执行 ctx.cancel(),然后取消上传.
3. 结论
关于时间性能,我尝试处理20MB小文件压缩到5MB的zip文件. 而通过第一个简单的解决方案花费了7s.
使用这个方法花费了5s, 但是不涉及磁盘读写,因此您可以将其用于AWS Lambda,但仍然AWS Lambda只有5分钟执行时间. 最后,我想说的时Go语言标准库时非常强大的.监视简单的interface设计.