mirror of
https://github.com/superseriousbusiness/gotosocial
synced 2025-01-01 23:48:45 +00:00
9d0df426da
* feat: vendor minio client * feat: introduce storage package with s3 support * feat: serve s3 files directly this saves a lot of bandwidth as the files are fetched from the object store directly * fix: use explicit local storage in tests * feat: integrate s3 storage with the main server * fix: add s3 config to cli tests * docs: explicitly set values in example config also adds license header to the storage package * fix: use better http status code on s3 redirect HTTP 302 Found is the best fit, as it signifies that the resource requested was found but not under its presumed URL 307/TemporaryRedirect would mean that this resource is usually located here, not in this case 303/SeeOther indicates that the redirection does not link to the requested resource but to another page * refactor: use context in storage driver interface
491 lines
16 KiB
Go
491 lines
16 KiB
Go
/*
|
|
* MinIO Go Library for Amazon S3 Compatible Cloud Storage
|
|
* Copyright 2017 MinIO, Inc.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
package minio
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/minio/minio-go/v7/pkg/s3utils"
|
|
)
|
|
|
|
// putObjectMultipartStream - upload a large object using
|
|
// multipart upload and streaming signature for signing payload.
|
|
// Comprehensive put object operation involving multipart uploads.
|
|
//
|
|
// Following code handles these types of readers.
|
|
//
|
|
// - *minio.Object
|
|
// - Any reader which has a method 'ReadAt()'
|
|
//
|
|
func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
|
|
reader io.Reader, size int64, opts PutObjectOptions,
|
|
) (info UploadInfo, err error) {
|
|
if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
|
|
// Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader.
|
|
info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
|
|
} else {
|
|
info, err = c.putObjectMultipartStreamOptionalChecksum(ctx, bucketName, objectName, reader, size, opts)
|
|
}
|
|
if err != nil {
|
|
errResp := ToErrorResponse(err)
|
|
// Verify if multipart functionality is not available, if not
|
|
// fall back to single PutObject operation.
|
|
if errResp.Code == "AccessDenied" && strings.Contains(errResp.Message, "Access Denied") {
|
|
// Verify if size of reader is greater than '5GiB'.
|
|
if size > maxSinglePutObjectSize {
|
|
return UploadInfo{}, errEntityTooLarge(size, maxSinglePutObjectSize, bucketName, objectName)
|
|
}
|
|
// Fall back to uploading as single PutObject operation.
|
|
return c.putObject(ctx, bucketName, objectName, reader, size, opts)
|
|
}
|
|
}
|
|
return info, err
|
|
}
|
|
|
|
// uploadedPartRes - the response received from a part upload.
|
|
type uploadedPartRes struct {
|
|
Error error // Any error encountered while uploading the part.
|
|
PartNum int // Number of the part uploaded.
|
|
Size int64 // Size of the part uploaded.
|
|
Part ObjectPart
|
|
}
|
|
|
|
type uploadPartReq struct {
|
|
PartNum int // Number of the part uploaded.
|
|
Part ObjectPart // Size of the part uploaded.
|
|
}
|
|
|
|
// putObjectMultipartFromReadAt - Uploads files bigger than 128MiB.
|
|
// Supports all readers which implements io.ReaderAt interface
|
|
// (ReadAt method).
|
|
//
|
|
// NOTE: This function is meant to be used for all readers which
|
|
// implement io.ReaderAt which allows us for resuming multipart
|
|
// uploads but reading at an offset, which would avoid re-read the
|
|
// data which was already uploaded. Internally this function uses
|
|
// temporary files for staging all the data, these temporary files are
|
|
// cleaned automatically when the caller i.e http client closes the
|
|
// stream after uploading all the contents successfully.
|
|
func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketName, objectName string,
|
|
reader io.ReaderAt, size int64, opts PutObjectOptions,
|
|
) (info UploadInfo, err error) {
|
|
// Input validation.
|
|
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
if err = s3utils.CheckValidObjectName(objectName); err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
|
|
// Calculate the optimal parts info for a given size.
|
|
totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
|
|
if err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
|
|
// Initiate a new multipart upload.
|
|
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
|
|
if err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
|
|
// Aborts the multipart upload in progress, if the
|
|
// function returns any error, since we do not resume
|
|
// we should purge the parts which have been uploaded
|
|
// to relinquish storage space.
|
|
defer func() {
|
|
if err != nil {
|
|
c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
|
|
}
|
|
}()
|
|
|
|
// Total data read and written to server. should be equal to 'size' at the end of the call.
|
|
var totalUploadedSize int64
|
|
|
|
// Complete multipart upload.
|
|
var complMultipartUpload completeMultipartUpload
|
|
|
|
// Declare a channel that sends the next part number to be uploaded.
|
|
// Buffered to 10000 because thats the maximum number of parts allowed
|
|
// by S3.
|
|
uploadPartsCh := make(chan uploadPartReq, 10000)
|
|
|
|
// Declare a channel that sends back the response of a part upload.
|
|
// Buffered to 10000 because thats the maximum number of parts allowed
|
|
// by S3.
|
|
uploadedPartsCh := make(chan uploadedPartRes, 10000)
|
|
|
|
// Used for readability, lastPartNumber is always totalPartsCount.
|
|
lastPartNumber := totalPartsCount
|
|
|
|
// Send each part number to the channel to be processed.
|
|
for p := 1; p <= totalPartsCount; p++ {
|
|
uploadPartsCh <- uploadPartReq{PartNum: p}
|
|
}
|
|
close(uploadPartsCh)
|
|
|
|
partsBuf := make([][]byte, opts.getNumThreads())
|
|
for i := range partsBuf {
|
|
partsBuf[i] = make([]byte, 0, partSize)
|
|
}
|
|
|
|
// Receive each part number from the channel allowing three parallel uploads.
|
|
for w := 1; w <= opts.getNumThreads(); w++ {
|
|
go func(w int, partSize int64) {
|
|
// Each worker will draw from the part channel and upload in parallel.
|
|
for uploadReq := range uploadPartsCh {
|
|
|
|
// If partNumber was not uploaded we calculate the missing
|
|
// part offset and size. For all other part numbers we
|
|
// calculate offset based on multiples of partSize.
|
|
readOffset := int64(uploadReq.PartNum-1) * partSize
|
|
|
|
// As a special case if partNumber is lastPartNumber, we
|
|
// calculate the offset based on the last part size.
|
|
if uploadReq.PartNum == lastPartNumber {
|
|
readOffset = (size - lastPartSize)
|
|
partSize = lastPartSize
|
|
}
|
|
|
|
n, rerr := readFull(io.NewSectionReader(reader, readOffset, partSize), partsBuf[w-1][:partSize])
|
|
if rerr != nil && rerr != io.ErrUnexpectedEOF && rerr != io.EOF {
|
|
uploadedPartsCh <- uploadedPartRes{
|
|
Error: rerr,
|
|
}
|
|
// Exit the goroutine.
|
|
return
|
|
}
|
|
|
|
// Get a section reader on a particular offset.
|
|
hookReader := newHook(bytes.NewReader(partsBuf[w-1][:n]), opts.Progress)
|
|
|
|
// Proceed to upload the part.
|
|
objPart, err := c.uploadPart(ctx, bucketName, objectName,
|
|
uploadID, hookReader, uploadReq.PartNum,
|
|
"", "", partSize, opts.ServerSideEncryption)
|
|
if err != nil {
|
|
uploadedPartsCh <- uploadedPartRes{
|
|
Error: err,
|
|
}
|
|
// Exit the goroutine.
|
|
return
|
|
}
|
|
|
|
// Save successfully uploaded part metadata.
|
|
uploadReq.Part = objPart
|
|
|
|
// Send successful part info through the channel.
|
|
uploadedPartsCh <- uploadedPartRes{
|
|
Size: objPart.Size,
|
|
PartNum: uploadReq.PartNum,
|
|
Part: uploadReq.Part,
|
|
}
|
|
}
|
|
}(w, partSize)
|
|
}
|
|
|
|
// Gather the responses as they occur and update any
|
|
// progress bar.
|
|
for u := 1; u <= totalPartsCount; u++ {
|
|
uploadRes := <-uploadedPartsCh
|
|
if uploadRes.Error != nil {
|
|
return UploadInfo{}, uploadRes.Error
|
|
}
|
|
// Update the totalUploadedSize.
|
|
totalUploadedSize += uploadRes.Size
|
|
// Store the parts to be completed in order.
|
|
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
|
|
ETag: uploadRes.Part.ETag,
|
|
PartNumber: uploadRes.Part.PartNumber,
|
|
})
|
|
}
|
|
|
|
// Verify if we uploaded all the data.
|
|
if totalUploadedSize != size {
|
|
return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
|
|
}
|
|
|
|
// Sort all completed parts.
|
|
sort.Sort(completedParts(complMultipartUpload.Parts))
|
|
|
|
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
|
|
if err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
|
|
uploadInfo.Size = totalUploadedSize
|
|
return uploadInfo, nil
|
|
}
|
|
|
|
func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, bucketName, objectName string,
|
|
reader io.Reader, size int64, opts PutObjectOptions,
|
|
) (info UploadInfo, err error) {
|
|
// Input validation.
|
|
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
if err = s3utils.CheckValidObjectName(objectName); err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
|
|
// Calculate the optimal parts info for a given size.
|
|
totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
|
|
if err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
// Initiates a new multipart request
|
|
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
|
|
if err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
|
|
// Aborts the multipart upload if the function returns
|
|
// any error, since we do not resume we should purge
|
|
// the parts which have been uploaded to relinquish
|
|
// storage space.
|
|
defer func() {
|
|
if err != nil {
|
|
c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
|
|
}
|
|
}()
|
|
|
|
// Total data read and written to server. should be equal to 'size' at the end of the call.
|
|
var totalUploadedSize int64
|
|
|
|
// Initialize parts uploaded map.
|
|
partsInfo := make(map[int]ObjectPart)
|
|
|
|
// Create a buffer.
|
|
buf := make([]byte, partSize)
|
|
|
|
// Avoid declaring variables in the for loop
|
|
var md5Base64 string
|
|
var hookReader io.Reader
|
|
|
|
// Part number always starts with '1'.
|
|
var partNumber int
|
|
for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
|
|
|
|
// Proceed to upload the part.
|
|
if partNumber == totalPartsCount {
|
|
partSize = lastPartSize
|
|
}
|
|
|
|
if opts.SendContentMd5 {
|
|
length, rerr := readFull(reader, buf)
|
|
if rerr == io.EOF && partNumber > 1 {
|
|
break
|
|
}
|
|
|
|
if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
|
|
return UploadInfo{}, rerr
|
|
}
|
|
|
|
// Calculate md5sum.
|
|
hash := c.md5Hasher()
|
|
hash.Write(buf[:length])
|
|
md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
|
|
hash.Close()
|
|
|
|
// Update progress reader appropriately to the latest offset
|
|
// as we read from the source.
|
|
hookReader = newHook(bytes.NewReader(buf[:length]), opts.Progress)
|
|
} else {
|
|
// Update progress reader appropriately to the latest offset
|
|
// as we read from the source.
|
|
hookReader = newHook(reader, opts.Progress)
|
|
}
|
|
|
|
objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID,
|
|
io.LimitReader(hookReader, partSize),
|
|
partNumber, md5Base64, "", partSize, opts.ServerSideEncryption)
|
|
if uerr != nil {
|
|
return UploadInfo{}, uerr
|
|
}
|
|
|
|
// Save successfully uploaded part metadata.
|
|
partsInfo[partNumber] = objPart
|
|
|
|
// Save successfully uploaded size.
|
|
totalUploadedSize += partSize
|
|
}
|
|
|
|
// Verify if we uploaded all the data.
|
|
if size > 0 {
|
|
if totalUploadedSize != size {
|
|
return UploadInfo{}, errUnexpectedEOF(totalUploadedSize, size, bucketName, objectName)
|
|
}
|
|
}
|
|
|
|
// Complete multipart upload.
|
|
var complMultipartUpload completeMultipartUpload
|
|
|
|
// Loop over total uploaded parts to save them in
|
|
// Parts array before completing the multipart request.
|
|
for i := 1; i < partNumber; i++ {
|
|
part, ok := partsInfo[i]
|
|
if !ok {
|
|
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
|
|
}
|
|
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
|
|
ETag: part.ETag,
|
|
PartNumber: part.PartNumber,
|
|
})
|
|
}
|
|
|
|
// Sort all completed parts.
|
|
sort.Sort(completedParts(complMultipartUpload.Parts))
|
|
|
|
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
|
|
if err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
|
|
uploadInfo.Size = totalUploadedSize
|
|
return uploadInfo, nil
|
|
}
|
|
|
|
// putObject special function used Google Cloud Storage. This special function
|
|
// is used for Google Cloud Storage since Google's multipart API is not S3 compatible.
|
|
func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
|
|
// Input validation.
|
|
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
if err := s3utils.CheckValidObjectName(objectName); err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
|
|
// Size -1 is only supported on Google Cloud Storage, we error
|
|
// out in all other situations.
|
|
if size < 0 && !s3utils.IsGoogleEndpoint(*c.endpointURL) {
|
|
return UploadInfo{}, errEntityTooSmall(size, bucketName, objectName)
|
|
}
|
|
|
|
if opts.SendContentMd5 && s3utils.IsGoogleEndpoint(*c.endpointURL) && size < 0 {
|
|
return UploadInfo{}, errInvalidArgument("MD5Sum cannot be calculated with size '-1'")
|
|
}
|
|
|
|
if size > 0 {
|
|
if isReadAt(reader) && !isObject(reader) {
|
|
seeker, ok := reader.(io.Seeker)
|
|
if ok {
|
|
offset, err := seeker.Seek(0, io.SeekCurrent)
|
|
if err != nil {
|
|
return UploadInfo{}, errInvalidArgument(err.Error())
|
|
}
|
|
reader = io.NewSectionReader(reader.(io.ReaderAt), offset, size)
|
|
}
|
|
}
|
|
}
|
|
|
|
var md5Base64 string
|
|
if opts.SendContentMd5 {
|
|
// Create a buffer.
|
|
buf := make([]byte, size)
|
|
|
|
length, rErr := readFull(reader, buf)
|
|
if rErr != nil && rErr != io.ErrUnexpectedEOF && rErr != io.EOF {
|
|
return UploadInfo{}, rErr
|
|
}
|
|
|
|
// Calculate md5sum.
|
|
hash := c.md5Hasher()
|
|
hash.Write(buf[:length])
|
|
md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
|
|
reader = bytes.NewReader(buf[:length])
|
|
hash.Close()
|
|
}
|
|
|
|
// Update progress reader appropriately to the latest offset as we
|
|
// read from the source.
|
|
readSeeker := newHook(reader, opts.Progress)
|
|
|
|
// This function does not calculate sha256 and md5sum for payload.
|
|
// Execute put object.
|
|
return c.putObjectDo(ctx, bucketName, objectName, readSeeker, md5Base64, "", size, opts)
|
|
}
|
|
|
|
// putObjectDo - executes the put object http operation.
|
|
// NOTE: You must have WRITE permissions on a bucket to add an object to it.
|
|
func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string, reader io.Reader, md5Base64, sha256Hex string, size int64, opts PutObjectOptions) (UploadInfo, error) {
|
|
// Input validation.
|
|
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
if err := s3utils.CheckValidObjectName(objectName); err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
// Set headers.
|
|
customHeader := opts.Header()
|
|
|
|
// Populate request metadata.
|
|
reqMetadata := requestMetadata{
|
|
bucketName: bucketName,
|
|
objectName: objectName,
|
|
customHeader: customHeader,
|
|
contentBody: reader,
|
|
contentLength: size,
|
|
contentMD5Base64: md5Base64,
|
|
contentSHA256Hex: sha256Hex,
|
|
}
|
|
if opts.Internal.SourceVersionID != "" {
|
|
if opts.Internal.SourceVersionID != nullVersionID {
|
|
if _, err := uuid.Parse(opts.Internal.SourceVersionID); err != nil {
|
|
return UploadInfo{}, errInvalidArgument(err.Error())
|
|
}
|
|
}
|
|
urlValues := make(url.Values)
|
|
urlValues.Set("versionId", opts.Internal.SourceVersionID)
|
|
reqMetadata.queryValues = urlValues
|
|
}
|
|
|
|
// Execute PUT an objectName.
|
|
resp, err := c.executeMethod(ctx, http.MethodPut, reqMetadata)
|
|
defer closeResponse(resp)
|
|
if err != nil {
|
|
return UploadInfo{}, err
|
|
}
|
|
if resp != nil {
|
|
if resp.StatusCode != http.StatusOK {
|
|
return UploadInfo{}, httpRespToErrorResponse(resp, bucketName, objectName)
|
|
}
|
|
}
|
|
|
|
// extract lifecycle expiry date and rule ID
|
|
expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))
|
|
|
|
return UploadInfo{
|
|
Bucket: bucketName,
|
|
Key: objectName,
|
|
ETag: trimEtag(resp.Header.Get("ETag")),
|
|
VersionID: resp.Header.Get(amzVersionID),
|
|
Size: size,
|
|
Expiration: expTime,
|
|
ExpirationRuleID: ruleID,
|
|
}, nil
|
|
}
|