From b0877d1f86625b3118fed0ec1d9938a2c0fce079 Mon Sep 17 00:00:00 2001 From: jiangyong27 Date: Thu, 25 Jan 2024 14:59:48 +0800 Subject: [PATCH] storeage --- go.mod | 5 ++ storage/minio.go | 116 ++++++++++++++++++++++++ storage/qiniu.go | 213 +++++++++++++++++++++++++++++++++++++++++++++ storage/storage.go | 20 +++++ storage/util.go | 56 ++++++++++++ 5 files changed, 410 insertions(+) create mode 100644 storage/minio.go create mode 100644 storage/qiniu.go create mode 100644 storage/storage.go create mode 100644 storage/util.go diff --git a/go.mod b/go.mod index be12bb4..175d4d5 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,8 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/gomodule/redigo v1.8.9 github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c + github.com/minio/minio-go v6.0.14+incompatible + github.com/qiniu/go-sdk/v7 v7.19.0 github.com/sirupsen/logrus v1.9.3 github.com/smbrave/goutil v0.0.0-20240105134047-64fe0dfafba2 github.com/spf13/cast v1.5.0 @@ -20,6 +22,7 @@ require ( github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-ini/ini v1.67.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.14.0 // indirect @@ -30,6 +33,7 @@ require ( github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/leodido/go-urn v1.2.4 // indirect github.com/mattn/go-isatty v0.0.19 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/pelletier/go-toml/v2 v2.0.8 // indirect @@ -37,6 +41,7 @@ require ( github.com/ugorji/go/codec v1.2.11 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/net v0.10.0 // indirect + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect golang.org/x/sys v0.8.0 // indirect golang.org/x/text v0.9.0 // indirect google.golang.org/protobuf v1.30.0 // indirect diff --git a/storage/minio.go b/storage/minio.go new file mode 100644 index 0000000..7c947e4 --- /dev/null +++ b/storage/minio.go @@ -0,0 +1,116 @@ +package storage + +import ( + "context" + "fmt" + "github.com/minio/minio-go" + "path" + "strings" + "time" + + log "github.com/sirupsen/logrus" +) + +type MinioConfig struct { + Bucket string + BaseUrl string + Endpoint string + AccessKey string + SecretKey string + Timeout time.Duration +} + +type Minio struct { + inited bool + config *MinioConfig + client *minio.Client +} + +func NewMinio(cfg *MinioConfig) Storage { + return &Minio{ + config: cfg, + inited: false, + } +} + +func (s *Minio) Init() error { + if s.inited { + return nil + } + + c, err := minio.New(s.config.Endpoint, s.config.AccessKey, s.config.SecretKey, false) + if err != nil { + return err + } + s.client = c + s.inited = true + return nil +} + +func (s *Minio) Put(fileName, objectName string, onProcess func(fsize, uploaded int64)) error { + if err := s.Init(); err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), s.config.Timeout) + defer cancel() + + objectName = strings.TrimLeft(objectName, "/ ") + ext := strings.TrimLeft(path.Ext(fileName), ".") + + _, err := s.client.FPutObjectWithContext(ctx, s.config.Bucket, objectName, fileName, + minio.PutObjectOptions{ContentType: ext2ContentType(ext)}) + if err != nil { + log.Errorf("upload file to minio error:%s", err.Error()) + return err + } + + return nil +} + +func (s *Minio) Del(objectName string) error { + if err := s.Init(); err != nil { + return err + } + + err := s.client.RemoveObject(s.config.Bucket, objectName) + if err != nil { + log.Errorf("delete file to minio error:%s", err.Error()) + return err + } + return nil +} + +func (s *Minio) Get(objectName, fileName string) error { + if err := s.Init(); err != nil { + return err + } + + return s.client.FGetObject(s.config.Bucket, objectName, fileName, minio.GetObjectOptions{}) +} + +func (s *Minio) List(objectPrefix string) ([]string, error) { + if err := s.Init(); err != nil { + return nil, err + } + + done := make(chan struct{}) + defer close(done) + infos := s.client.ListObjectsV2(s.config.Bucket, objectPrefix, true, done) + result := make([]string, 0) + for info := range infos { + result = append(result, info.Key) + } + return result, nil +} + +func (s *Minio) Url(objectName string, expire time.Duration) string { + return fmt.Sprintf("%s/%s/%s", s.config.BaseUrl, s.config.Bucket, objectName) +} + +func (s *Minio) Stat(objectName string) (*ObjectInfo, error) { + return nil, nil +} + +func (s *Minio) Fetch(url, objectName string) error { + return nil +} diff --git a/storage/qiniu.go b/storage/qiniu.go new file mode 100644 index 0000000..da6a763 --- /dev/null +++ b/storage/qiniu.go @@ -0,0 +1,213 @@ +package storage + +import ( + "context" + "errors" + "fmt" + "github.com/qiniu/go-sdk/v7/auth/qbox" + "github.com/qiniu/go-sdk/v7/storage" + "os" + "time" +) + +type QiniuConfig struct { + Bucket string + AK string + SK string + Timeout time.Duration +} + +type Qiniu struct { + config *QiniuConfig +} + +func NewQiniu(cfg *QiniuConfig) Storage { + + return &Qiniu{ + config: cfg, + } +} + +func (s *Qiniu) Put(fileName, objectName string, onProcess func(fsize, uploaded int64)) error { + mac := qbox.NewMac(s.config.AK, s.config.SK) + putPolicy := storage.PutPolicy{ + Scope: s.config.Bucket, + Expires: uint64(time.Now().Add(12 * time.Hour).Unix()), + } + upToken := putPolicy.UploadToken(mac) + cfg := storage.Config{} + cfg.UseHTTPS = false + cfg.UseCdnDomains = false + + // 构建表单上传的对象 + formUploader := storage.NewFormUploader(&cfg) + ret := storage.PutRet{} + + // 可选配置 + putExtra := storage.PutExtra{ + Params: map[string]string{}, + OnProgress: onProcess, + } + err := formUploader.PutFile(context.Background(), &ret, upToken, objectName, fileName, &putExtra) + if err != nil { + return err + } + return nil +} + +func (s *Qiniu) PutResume(fileName, objectName string, onProcess func(fsize, uploaded int64)) error { + mac := qbox.NewMac(s.config.AK, s.config.SK) + putPolicy := storage.PutPolicy{ + Scope: s.config.Bucket, + } + upToken := putPolicy.UploadToken(mac) + cfg := storage.Config{} + cfg.UseHTTPS = false + cfg.UseCdnDomains = false + + file, err := os.Open(fileName) + if err != nil { + return err + } + + fileInfo, _ := file.Stat() + fileSize := fileInfo.Size() + + resumeUploader := storage.NewResumeUploaderV2(&cfg) + ret := storage.PutRet{} + + recorder, err := storage.NewFileRecorder(os.TempDir()) + if err != nil { + return err + } + + partSize := int64(1024 * 1024 * 1) + partUploaded := int64(0) + putExtra := storage.RputV2Extra{ + Recorder: recorder, + Notify: func(partNumber int64, ret *storage.UploadPartsRet) { + partUploaded += 1 + onProcess(fileSize, partUploaded*partSize) + }, + PartSize: partSize, + } + err = resumeUploader.PutFile(context.Background(), &ret, upToken, objectName, fileName, &putExtra) + if err != nil { + return err + } + return nil +} + +func (s *Qiniu) Del(objectName string) error { + mac := qbox.NewMac(s.config.AK, s.config.SK) + cfg := storage.Config{ + UseHTTPS: false, + } + bucketManager := storage.NewBucketManager(mac, &cfg) + return bucketManager.Delete(s.config.Bucket, objectName) +} + +func (s *Qiniu) List(objectPrefix string) ([]string, error) { + mac := qbox.NewMac(s.config.AK, s.config.SK) + cfg := storage.Config{ + UseHTTPS: false, + } + result := make([]string, 0) + marker := "" + bucketManager := storage.NewBucketManager(mac, &cfg) + for { + entries, _, nextMarker, hasNext, err := bucketManager.ListFiles(s.config.Bucket, objectPrefix, "", marker, 1000) + if err != nil { + return nil, err + } + for _, entry := range entries { + result = append(result, entry.Key) + } + if hasNext { + marker = nextMarker + } else { + break + } + } + + return result, nil +} + +func (s *Qiniu) Get(objectName, fileName string) error { + mac := qbox.NewMac(s.config.AK, s.config.SK) + cfg := storage.Config{ + UseHTTPS: false, + } + + bucketManager := storage.NewBucketManager(mac, &cfg) + + domains, err := bucketManager.ListBucketDomains(s.config.Bucket) + if err != nil { + return err + } + if len(domains) <= 0 { + return errors.New("bucket no domain") + } + url := fmt.Sprintf("http://%s/%s", domains[0].Domain, objectName) + return Download(url, fileName) +} + +func (s *Qiniu) Url(objectName string, expire time.Duration) string { + mac := qbox.NewMac(s.config.AK, s.config.SK) + cfg := storage.Config{ + UseHTTPS: false, + } + + bucketManager := storage.NewBucketManager(mac, &cfg) + domains, err := bucketManager.ListBucketDomains(s.config.Bucket) + if err != nil { + return "" + } + if len(domains) <= 0 { + return "" + } + + domain := "http://" + domains[0].Domain + if expire == 0 { + return storage.MakePublicURLv2(domain, objectName) + } else { + deadline := time.Now().Add(expire).Unix() + return storage.MakePrivateURLv2(mac, domain, objectName, deadline) + } +} + +func (s *Qiniu) Stat(objectName string) (*ObjectInfo, error) { + mac := qbox.NewMac(s.config.AK, s.config.SK) + cfg := storage.Config{ + UseHTTPS: false, + } + + bucketManager := storage.NewBucketManager(mac, &cfg) + fileInfo, err := bucketManager.Stat(s.config.Bucket, objectName) + if err != nil { + return nil, err + } + + info := new(ObjectInfo) + info.Size = fileInfo.Fsize + info.Hash = fileInfo.Hash + info.MimeType = fileInfo.MimeType + info.PutTime = fileInfo.PutTime + return info, nil +} + +func (s *Qiniu) Fetch(url, objectName string) error { + mac := qbox.NewMac(s.config.AK, s.config.SK) + cfg := storage.Config{ + UseHTTPS: false, + } + + bucketManager := storage.NewBucketManager(mac, &cfg) + + _, err := bucketManager.Fetch(url, s.config.Bucket, objectName) + if err != nil { + return err + } + + return nil +} diff --git a/storage/storage.go b/storage/storage.go new file mode 100644 index 0000000..cf5e329 --- /dev/null +++ b/storage/storage.go @@ -0,0 +1,20 @@ +package storage + +import "time" + +type ObjectInfo struct { + Size int64 + MimeType string + Hash string + PutTime int64 +} + +type Storage interface { + Put(fileName, objectName string, onProcess func(fsize, uploaded int64)) error + Get(objectName, fileName string) error + Del(objectName string) error + Url(objectName string, expire time.Duration) string + Stat(objectName string) (*ObjectInfo, error) + List(objectPrefix string) ([]string, error) + Fetch(url, objectName string) error +} diff --git a/storage/util.go b/storage/util.go new file mode 100644 index 0000000..de9ad2c --- /dev/null +++ b/storage/util.go @@ -0,0 +1,56 @@ +package storage + +import ( + "io" + "net/http" + "os" + "strconv" + "strings" +) + +func contentType2Ext(contentType string) string { + if strings.Contains(contentType, "image/jpeg") { + return "jpg" + } else if strings.Contains(contentType, "audio/mpeg") { + return "mp3" + } else if strings.Contains(contentType, "video/mp4") { + return "mp4" + } + return "" +} + +func ext2ContentType(ext string) string { + ext = strings.ToLower(ext) + if ext == "jpg" || ext == "jpeg" { + return "image/jpeg" + } else if ext == "png" { + return "image/png" + } else if ext == "mp3" { + return "audio/mpeg" + } else if ext == "mp4" { + return "video/mp4" + } + return "" +} + +func Download(url, path string) error { + f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + stat, err := f.Stat() + if err != nil { + return err + } + defer f.Close() + + req, _ := http.NewRequest("GET", url, nil) + req.Header.Set("Range", "bytes="+strconv.FormatInt(stat.Size(), 10)+"-") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + _, err = io.Copy(f, resp.Body) + if err != nil { + return err + } + + return nil +}