最近有一个需要实现下载功能:
- 从服务器上读取文件,返回一个ReadCloser
- 在用户磁盘上创建文件,通过io.Copy实现文件下载(io.Copy是流式的操作,不会出现因文件过大而内存暴涨的问题)
- 通过context实现暂停
这里拷贝文件我们选择的是io.Copy而非是通过ioutil.ReadAll()将body中返回的数据一次性读取到内存
通过io.Copy可以保证内存占用一直处于一个比较稳定的水平
通过封装io.Copy实现
- 将io.Copy封装为一个方法,方法里传入context,外部通过context.WithCancel()控制流式拷贝的暂停
这里演示我通过读取S3的一个对象下载到本地
/*
通过io.Copy实现可中断的流复制
*/
var (
ak = "99999999999999999999"
sk = "9999999999999999999999999999999999999999"
endpoint = "http://xx.xx.xx.xx:8060"
bucket = "test-bucket"
key = "d_xp/2G/2G.txt"
)
func main() {
s3Client := osg.Client.GetS3Client(ak, sk, endpoint)
ctx, cancelFunc := context.WithCancel(context.Background())
object, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
})
go func() {
time.Sleep(time.Second * 10)
cancelFunc()
log.Infof("canceled...")
}()
if err != nil {
log.Errorf("%v", err)
return
}
body := object.Body
defer body.Close()
file, err := os.Create("/Users/ziyi/GolandProjects/MyTest/demo_home/io_demo/target.txt")
if err != nil {
log.Errorf("%v", err)
return
}
defer file.Close()
_, err = FileService.Copy(ctx, file, body)
if err != nil {
log.Errorf("%v", err)
return
}
}
type fileService struct {
sem *semaphore.Weighted
}
var FileService = &fileService{
sem: semaphore.NewWeighted(1),
}
type IoCopyCancelledErr struct {
errMsg string
}
func (e *IoCopyCancelledErr) Error() string {
return fmt.Sprintf("io copy error, %s", e.errMsg)
}
func NewIoCopyCancelledErr(msg string) *IoCopyCancelledErr {
return &IoCopyCancelledErr{
errMsg: msg,
}
}
type readerFunc func(p []byte) (n int, err error)
func (rf readerFunc) Read(p []byte) (n int, err error) { return rf(p) }
//通过ctx实现可中断的流拷贝
// Copy closable copy
func (s *fileService) Copy(ctx context.Context, dst io.Writer, src io.Reader) (int64, error) {
// Copy will call the Reader and Writer interface multiple time, in order
// to copy by chunk (avoiding loading the whole file in memory).
// I insert the ability to cancel before read time as it is the earliest
// possible in the call process.
size, err := io.Copy(dst, readerFunc(func(p []byte) (int, error) {
select {
// if context has been canceled
case <-ctx.Done():
// stop process and propagate "context canceled" error
return 0, NewIoCopyCancelledErr(ctx.Err().Error())
default:
// otherwise just run default io.Reader implementation
return src.Read(p)
}
}))
return size, err
}