793 lines
22 KiB
Go
793 lines
22 KiB
Go
|
// Copyright 2018 The Go Cloud Development Kit Authors
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// https://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
// Package s3blob provides a blob implementation that uses S3. Use OpenBucket
|
||
|
// to construct a *blob.Bucket.
|
||
|
//
|
||
|
// URLs
|
||
|
//
|
||
|
// For blob.OpenBucket, s3blob registers for the scheme "s3".
|
||
|
// The default URL opener will use an AWS session with the default credentials
|
||
|
// and configuration; see https://docs.aws.amazon.com/sdk-for-go/api/aws/session/
|
||
|
// for more details.
|
||
|
// To customize the URL opener, or for more details on the URL format,
|
||
|
// see URLOpener.
|
||
|
// See https://gocloud.dev/concepts/urls/ for background information.
|
||
|
//
|
||
|
// Escaping
|
||
|
//
|
||
|
// Go CDK supports all UTF-8 strings; to make this work with services lacking
|
||
|
// full UTF-8 support, strings must be escaped (during writes) and unescaped
|
||
|
// (during reads). The following escapes are performed for s3blob:
|
||
|
// - Blob keys: ASCII characters 0-31 are escaped to "__0x<hex>__".
|
||
|
// Additionally, the "/" in "../" and the trailing "/" in "//" are escaped in
|
||
|
// the same way.
|
||
|
// - Metadata keys: Escaped using URL encoding, then additionally "@:=" are
|
||
|
// escaped using "__0x<hex>__". These characters were determined by
|
||
|
// experimentation.
|
||
|
// - Metadata values: Escaped using URL encoding.
|
||
|
//
|
||
|
// As
|
||
|
//
|
||
|
// s3blob exposes the following types for As:
|
||
|
// - Bucket: *s3.S3
|
||
|
// - Error: awserr.Error
|
||
|
// - ListObject: s3.Object for objects, s3.CommonPrefix for "directories"
|
||
|
// - ListOptions.BeforeList: *s3.ListObjectsV2Input, or *s3.ListObjectsInput
|
||
|
// when Options.UseLegacyList == true.
|
||
|
// - Reader: s3.GetObjectOutput
|
||
|
// - ReaderOptions.BeforeRead: *s3.GetObjectInput
|
||
|
// - Attributes: s3.HeadObjectOutput
|
||
|
// - CopyOptions.BeforeCopy: *s3.CopyObjectInput
|
||
|
// - WriterOptions.BeforeWrite: *s3manager.UploadInput, *s3manager.Uploader
|
||
|
// - SignedURLOptions.BeforeSign:
|
||
|
// *s3.GetObjectInput when Options.Method == http.MethodGet, or
|
||
|
// *s3.PutObjectInput when Options.Method == http.MethodPut, or
|
||
|
// *s3.DeleteObjectInput when Options.Method == http.MethodDelete
|
||
|
package s3blob // import "gocloud.dev/blob/s3blob"
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/base64"
|
||
|
"encoding/hex"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"net/http"
|
||
|
"net/url"
|
||
|
"sort"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
|
||
|
"github.com/aws/aws-sdk-go/aws"
|
||
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||
|
"github.com/aws/aws-sdk-go/aws/client"
|
||
|
"github.com/aws/aws-sdk-go/aws/request"
|
||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||
|
"github.com/google/wire"
|
||
|
gcaws "gocloud.dev/aws"
|
||
|
"gocloud.dev/blob"
|
||
|
"gocloud.dev/blob/driver"
|
||
|
"gocloud.dev/gcerrors"
|
||
|
"gocloud.dev/internal/escape"
|
||
|
)
|
||
|
|
||
|
const defaultPageSize = 1000
|
||
|
|
||
|
func init() {
|
||
|
blob.DefaultURLMux().RegisterBucket(Scheme, new(urlSessionOpener))
|
||
|
}
|
||
|
|
||
|
// Set holds Wire providers for this package.
|
||
|
var Set = wire.NewSet(
|
||
|
wire.Struct(new(URLOpener), "ConfigProvider"),
|
||
|
)
|
||
|
|
||
|
type urlSessionOpener struct {
|
||
|
opener *URLOpener
|
||
|
}
|
||
|
|
||
|
func (o *urlSessionOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) {
|
||
|
sess, rest, err := gcaws.NewSessionFromURLParams(u.Query())
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("open bucket %v: %v", u, err)
|
||
|
}
|
||
|
|
||
|
o.opener = &URLOpener{
|
||
|
ConfigProvider: sess,
|
||
|
}
|
||
|
|
||
|
u.RawQuery = rest.Encode()
|
||
|
return o.opener.OpenBucketURL(ctx, u)
|
||
|
}
|
||
|
|
||
|
// Scheme is the URL scheme s3blob registers its URLOpener under on
|
||
|
// blob.DefaultMux.
|
||
|
const Scheme = "s3"
|
||
|
|
||
|
// URLOpener opens S3 URLs like "s3://mybucket".
|
||
|
//
|
||
|
// The URL host is used as the bucket name.
|
||
|
//
|
||
|
// See gocloud.dev/aws/ConfigFromURLParams for supported query parameters
|
||
|
// that affect the default AWS session.
|
||
|
type URLOpener struct {
|
||
|
// ConfigProvider must be set to a non-nil value.
|
||
|
ConfigProvider client.ConfigProvider
|
||
|
|
||
|
// Options specifies the options to pass to OpenBucket.
|
||
|
Options Options
|
||
|
}
|
||
|
|
||
|
// OpenBucketURL opens a blob.Bucket based on u.
|
||
|
func (o *URLOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) {
|
||
|
configProvider := &gcaws.ConfigOverrider{
|
||
|
Base: o.ConfigProvider,
|
||
|
}
|
||
|
overrideCfg, err := gcaws.ConfigFromURLParams(u.Query())
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("open bucket %v: %v", u, err)
|
||
|
}
|
||
|
configProvider.Configs = append(configProvider.Configs, overrideCfg)
|
||
|
return OpenBucket(ctx, configProvider, u.Host, &o.Options)
|
||
|
}
|
||
|
|
||
|
// Options sets options for constructing a *blob.Bucket backed by fileblob.
|
||
|
type Options struct {
|
||
|
// UseLegacyList forces the use of ListObjects instead of ListObjectsV2.
|
||
|
// Some S3-compatible services (like CEPH) do not currently support
|
||
|
// ListObjectsV2.
|
||
|
UseLegacyList bool
|
||
|
}
|
||
|
|
||
|
// openBucket returns an S3 Bucket.
|
||
|
func openBucket(ctx context.Context, sess client.ConfigProvider, bucketName string, opts *Options) (*bucket, error) {
|
||
|
if sess == nil {
|
||
|
return nil, errors.New("s3blob.OpenBucket: sess is required")
|
||
|
}
|
||
|
if bucketName == "" {
|
||
|
return nil, errors.New("s3blob.OpenBucket: bucketName is required")
|
||
|
}
|
||
|
if opts == nil {
|
||
|
opts = &Options{}
|
||
|
}
|
||
|
return &bucket{
|
||
|
name: bucketName,
|
||
|
client: s3.New(sess),
|
||
|
useLegacyList: opts.UseLegacyList,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// OpenBucket returns a *blob.Bucket backed by S3.
|
||
|
// AWS buckets are bound to a region; sess must have been created using an
|
||
|
// aws.Config with Region set to the right region for bucketName.
|
||
|
// See the package documentation for an example.
|
||
|
func OpenBucket(ctx context.Context, sess client.ConfigProvider, bucketName string, opts *Options) (*blob.Bucket, error) {
|
||
|
drv, err := openBucket(ctx, sess, bucketName, opts)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return blob.NewBucket(drv), nil
|
||
|
}
|
||
|
|
||
|
// reader reads an S3 object. It implements io.ReadCloser.
|
||
|
type reader struct {
|
||
|
body io.ReadCloser
|
||
|
attrs driver.ReaderAttributes
|
||
|
raw *s3.GetObjectOutput
|
||
|
}
|
||
|
|
||
|
func (r *reader) Read(p []byte) (int, error) {
|
||
|
return r.body.Read(p)
|
||
|
}
|
||
|
|
||
|
// Close closes the reader itself. It must be called when done reading.
|
||
|
func (r *reader) Close() error {
|
||
|
return r.body.Close()
|
||
|
}
|
||
|
|
||
|
func (r *reader) As(i interface{}) bool {
|
||
|
p, ok := i.(*s3.GetObjectOutput)
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
*p = *r.raw
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (r *reader) Attributes() *driver.ReaderAttributes {
|
||
|
return &r.attrs
|
||
|
}
|
||
|
|
||
|
// writer writes an S3 object, it implements io.WriteCloser.
|
||
|
type writer struct {
|
||
|
w *io.PipeWriter // created when the first byte is written
|
||
|
|
||
|
ctx context.Context
|
||
|
uploader *s3manager.Uploader
|
||
|
req *s3manager.UploadInput
|
||
|
donec chan struct{} // closed when done writing
|
||
|
// The following fields will be written before donec closes:
|
||
|
err error
|
||
|
}
|
||
|
|
||
|
// Write appends p to w. User must call Close to close the w after done writing.
|
||
|
func (w *writer) Write(p []byte) (int, error) {
|
||
|
// Avoid opening the pipe for a zero-length write;
|
||
|
// the concrete can do these for empty blobs.
|
||
|
if len(p) == 0 {
|
||
|
return 0, nil
|
||
|
}
|
||
|
if w.w == nil {
|
||
|
// We'll write into pw and use pr as an io.Reader for the
|
||
|
// Upload call to S3.
|
||
|
pr, pw := io.Pipe()
|
||
|
w.w = pw
|
||
|
if err := w.open(pr); err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
}
|
||
|
select {
|
||
|
case <-w.donec:
|
||
|
return 0, w.err
|
||
|
default:
|
||
|
}
|
||
|
return w.w.Write(p)
|
||
|
}
|
||
|
|
||
|
// pr may be nil if we're Closing and no data was written.
|
||
|
func (w *writer) open(pr *io.PipeReader) error {
|
||
|
|
||
|
go func() {
|
||
|
defer close(w.donec)
|
||
|
|
||
|
if pr == nil {
|
||
|
// AWS doesn't like a nil Body.
|
||
|
w.req.Body = http.NoBody
|
||
|
} else {
|
||
|
w.req.Body = pr
|
||
|
}
|
||
|
_, err := w.uploader.UploadWithContext(w.ctx, w.req)
|
||
|
if err != nil {
|
||
|
w.err = err
|
||
|
if pr != nil {
|
||
|
pr.CloseWithError(err)
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
}()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Close completes the writer and closes it. Any error occurring during write
|
||
|
// will be returned. If a writer is closed before any Write is called, Close
|
||
|
// will create an empty file at the given key.
|
||
|
func (w *writer) Close() error {
|
||
|
if w.w == nil {
|
||
|
// We never got any bytes written. We'll write an http.NoBody.
|
||
|
w.open(nil)
|
||
|
} else if err := w.w.Close(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
<-w.donec
|
||
|
return w.err
|
||
|
}
|
||
|
|
||
|
// bucket represents an S3 bucket and handles read, write and delete operations.
|
||
|
type bucket struct {
|
||
|
name string
|
||
|
client *s3.S3
|
||
|
useLegacyList bool
|
||
|
}
|
||
|
|
||
|
func (b *bucket) Close() error {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (b *bucket) ErrorCode(err error) gcerrors.ErrorCode {
|
||
|
e, ok := err.(awserr.Error)
|
||
|
if !ok {
|
||
|
return gcerrors.Unknown
|
||
|
}
|
||
|
switch {
|
||
|
case e.Code() == "NoSuchBucket" || e.Code() == "NoSuchKey" || e.Code() == "NotFound" || e.Code() == s3.ErrCodeObjectNotInActiveTierError:
|
||
|
return gcerrors.NotFound
|
||
|
default:
|
||
|
return gcerrors.Unknown
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ListPaged implements driver.ListPaged.
|
||
|
func (b *bucket) ListPaged(ctx context.Context, opts *driver.ListOptions) (*driver.ListPage, error) {
|
||
|
pageSize := opts.PageSize
|
||
|
if pageSize == 0 {
|
||
|
pageSize = defaultPageSize
|
||
|
}
|
||
|
in := &s3.ListObjectsV2Input{
|
||
|
Bucket: aws.String(b.name),
|
||
|
MaxKeys: aws.Int64(int64(pageSize)),
|
||
|
}
|
||
|
if len(opts.PageToken) > 0 {
|
||
|
in.ContinuationToken = aws.String(string(opts.PageToken))
|
||
|
}
|
||
|
if opts.Prefix != "" {
|
||
|
in.Prefix = aws.String(escapeKey(opts.Prefix))
|
||
|
}
|
||
|
if opts.Delimiter != "" {
|
||
|
in.Delimiter = aws.String(escapeKey(opts.Delimiter))
|
||
|
}
|
||
|
resp, err := b.listObjects(ctx, in, opts)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
page := driver.ListPage{}
|
||
|
if resp.NextContinuationToken != nil {
|
||
|
page.NextPageToken = []byte(*resp.NextContinuationToken)
|
||
|
}
|
||
|
if n := len(resp.Contents) + len(resp.CommonPrefixes); n > 0 {
|
||
|
page.Objects = make([]*driver.ListObject, n)
|
||
|
for i, obj := range resp.Contents {
|
||
|
obj := obj
|
||
|
page.Objects[i] = &driver.ListObject{
|
||
|
Key: unescapeKey(aws.StringValue(obj.Key)),
|
||
|
ModTime: *obj.LastModified,
|
||
|
Size: *obj.Size,
|
||
|
MD5: eTagToMD5(obj.ETag),
|
||
|
AsFunc: func(i interface{}) bool {
|
||
|
p, ok := i.(*s3.Object)
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
*p = *obj
|
||
|
return true
|
||
|
},
|
||
|
}
|
||
|
}
|
||
|
for i, prefix := range resp.CommonPrefixes {
|
||
|
prefix := prefix
|
||
|
page.Objects[i+len(resp.Contents)] = &driver.ListObject{
|
||
|
Key: unescapeKey(aws.StringValue(prefix.Prefix)),
|
||
|
IsDir: true,
|
||
|
AsFunc: func(i interface{}) bool {
|
||
|
p, ok := i.(*s3.CommonPrefix)
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
*p = *prefix
|
||
|
return true
|
||
|
},
|
||
|
}
|
||
|
}
|
||
|
if len(resp.Contents) > 0 && len(resp.CommonPrefixes) > 0 {
|
||
|
// S3 gives us blobs and "directories" in separate lists; sort them.
|
||
|
sort.Slice(page.Objects, func(i, j int) bool {
|
||
|
return page.Objects[i].Key < page.Objects[j].Key
|
||
|
})
|
||
|
}
|
||
|
}
|
||
|
return &page, nil
|
||
|
}
|
||
|
|
||
|
func (b *bucket) listObjects(ctx context.Context, in *s3.ListObjectsV2Input, opts *driver.ListOptions) (*s3.ListObjectsV2Output, error) {
|
||
|
if !b.useLegacyList {
|
||
|
if opts.BeforeList != nil {
|
||
|
asFunc := func(i interface{}) bool {
|
||
|
p, ok := i.(**s3.ListObjectsV2Input)
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
*p = in
|
||
|
return true
|
||
|
}
|
||
|
if err := opts.BeforeList(asFunc); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
return b.client.ListObjectsV2WithContext(ctx, in)
|
||
|
}
|
||
|
|
||
|
// Use the legacy ListObjects request.
|
||
|
legacyIn := &s3.ListObjectsInput{
|
||
|
Bucket: in.Bucket,
|
||
|
Delimiter: in.Delimiter,
|
||
|
EncodingType: in.EncodingType,
|
||
|
Marker: in.ContinuationToken,
|
||
|
MaxKeys: in.MaxKeys,
|
||
|
Prefix: in.Prefix,
|
||
|
RequestPayer: in.RequestPayer,
|
||
|
}
|
||
|
if opts.BeforeList != nil {
|
||
|
asFunc := func(i interface{}) bool {
|
||
|
p, ok := i.(**s3.ListObjectsInput)
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
*p = legacyIn
|
||
|
return true
|
||
|
}
|
||
|
if err := opts.BeforeList(asFunc); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
legacyResp, err := b.client.ListObjectsWithContext(ctx, legacyIn)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
var nextContinuationToken *string
|
||
|
if legacyResp.NextMarker != nil {
|
||
|
nextContinuationToken = legacyResp.NextMarker
|
||
|
} else if aws.BoolValue(legacyResp.IsTruncated) {
|
||
|
nextContinuationToken = aws.String(aws.StringValue(legacyResp.Contents[len(legacyResp.Contents)-1].Key))
|
||
|
}
|
||
|
return &s3.ListObjectsV2Output{
|
||
|
CommonPrefixes: legacyResp.CommonPrefixes,
|
||
|
Contents: legacyResp.Contents,
|
||
|
NextContinuationToken: nextContinuationToken,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// As implements driver.As.
|
||
|
func (b *bucket) As(i interface{}) bool {
|
||
|
p, ok := i.(**s3.S3)
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
*p = b.client
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
// As implements driver.ErrorAs.
|
||
|
func (b *bucket) ErrorAs(err error, i interface{}) bool {
|
||
|
switch v := err.(type) {
|
||
|
case awserr.Error:
|
||
|
if p, ok := i.(*awserr.Error); ok {
|
||
|
*p = v
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
// Attributes implements driver.Attributes.
|
||
|
func (b *bucket) Attributes(ctx context.Context, key string) (*driver.Attributes, error) {
|
||
|
key = escapeKey(key)
|
||
|
in := &s3.HeadObjectInput{
|
||
|
Bucket: aws.String(b.name),
|
||
|
Key: aws.String(key),
|
||
|
}
|
||
|
resp, err := b.client.HeadObjectWithContext(ctx, in)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
md := make(map[string]string, len(resp.Metadata))
|
||
|
for k, v := range resp.Metadata {
|
||
|
// See the package comments for more details on escaping of metadata
|
||
|
// keys & values.
|
||
|
md[escape.HexUnescape(escape.URLUnescape(k))] = escape.URLUnescape(aws.StringValue(v))
|
||
|
}
|
||
|
return &driver.Attributes{
|
||
|
CacheControl: aws.StringValue(resp.CacheControl),
|
||
|
ContentDisposition: aws.StringValue(resp.ContentDisposition),
|
||
|
ContentEncoding: aws.StringValue(resp.ContentEncoding),
|
||
|
ContentLanguage: aws.StringValue(resp.ContentLanguage),
|
||
|
ContentType: aws.StringValue(resp.ContentType),
|
||
|
Metadata: md,
|
||
|
// CreateTime not supported; left as the zero time.
|
||
|
ModTime: aws.TimeValue(resp.LastModified),
|
||
|
Size: aws.Int64Value(resp.ContentLength),
|
||
|
MD5: eTagToMD5(resp.ETag),
|
||
|
ETag: aws.StringValue(resp.ETag),
|
||
|
AsFunc: func(i interface{}) bool {
|
||
|
p, ok := i.(*s3.HeadObjectOutput)
|
||
|
if !ok {
|
||
|
return false
|
||
|
}
|
||
|
*p = *resp
|
||
|
return true
|
||
|
},
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// NewRangeReader implements driver.NewRangeReader.
|
||
|
func (b *bucket) NewRangeReader(ctx context.Context, key string, offset, length int64, opts *driver.ReaderOptions) (driver.Reader, error) {
|
||
|
key = escapeKey(key)
|
||
|
in := &s3.GetObjectInput{
|
||
|
Bucket: aws.String(b.name),
|
||
|
Key: aws.String(key),
|
||
|
}
|
||
|
if offset > 0 && length < 0 {
|
||
|
in.Range = aws.String(fmt.Sprintf("bytes=%d-", offset))
|
||
|
} else if length == 0 {
|
||
|
// AWS doesn't support a zero-length read; we'll read 1 byte and then
|
||
|
// ignore it in favor of http.NoBody below.
|
||
|
in.Range = aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset))
|
||
|
} else if length >= 0 {
|
||
|
in.Range = aws.String(fmt.Sprintf("bytes=%d-%d", offset, offset+length-1))
|
||
|
}
|
||
|
if opts.BeforeRead != nil {
|
||
|
asFunc := func(i interface{}) bool {
|
||
|
if p, ok := i.(**s3.GetObjectInput); ok {
|
||
|
*p = in
|
||
|
return true
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
if err := opts.BeforeRead(asFunc); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
resp, err := b.client.GetObjectWithContext(ctx, in)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
body := resp.Body
|
||
|
if length == 0 {
|
||
|
body = http.NoBody
|
||
|
}
|
||
|
return &reader{
|
||
|
body: body,
|
||
|
attrs: driver.ReaderAttributes{
|
||
|
ContentType: aws.StringValue(resp.ContentType),
|
||
|
ModTime: aws.TimeValue(resp.LastModified),
|
||
|
Size: getSize(resp),
|
||
|
},
|
||
|
raw: resp,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// etagToMD5 processes an ETag header and returns an MD5 hash if possible.
|
||
|
// S3's ETag header is sometimes a quoted hexstring of the MD5. Other times,
|
||
|
// notably when the object was uploaded in multiple parts, it is not.
|
||
|
// We do the best we can.
|
||
|
// Some links about ETag:
|
||
|
// https://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html
|
||
|
// https://github.com/aws/aws-sdk-net/issues/815
|
||
|
// https://teppen.io/2018/06/23/aws_s3_etags/
|
||
|
func eTagToMD5(etag *string) []byte {
|
||
|
if etag == nil {
|
||
|
// No header at all.
|
||
|
return nil
|
||
|
}
|
||
|
// Strip the expected leading and trailing quotes.
|
||
|
quoted := *etag
|
||
|
if len(quoted) < 2 || quoted[0] != '"' || quoted[len(quoted)-1] != '"' {
|
||
|
return nil
|
||
|
}
|
||
|
unquoted := quoted[1 : len(quoted)-1]
|
||
|
// Un-hex; we return nil on error. In particular, we'll get an error here
|
||
|
// for multi-part uploaded blobs, whose ETag will contain a "-" and so will
|
||
|
// never be a legal hex encoding.
|
||
|
md5, err := hex.DecodeString(unquoted)
|
||
|
if err != nil {
|
||
|
return nil
|
||
|
}
|
||
|
return md5
|
||
|
}
|
||
|
|
||
|
func getSize(resp *s3.GetObjectOutput) int64 {
|
||
|
// Default size to ContentLength, but that's incorrect for partial-length reads,
|
||
|
// where ContentLength refers to the size of the returned Body, not the entire
|
||
|
// size of the blob. ContentRange has the full size.
|
||
|
size := aws.Int64Value(resp.ContentLength)
|
||
|
if cr := aws.StringValue(resp.ContentRange); cr != "" {
|
||
|
// Sample: bytes 10-14/27 (where 27 is the full size).
|
||
|
parts := strings.Split(cr, "/")
|
||
|
if len(parts) == 2 {
|
||
|
if i, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
|
||
|
size = i
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return size
|
||
|
}
|
||
|
|
||
|
// escapeKey does all required escaping for UTF-8 strings to work with S3.
|
||
|
func escapeKey(key string) string {
|
||
|
return escape.HexEscape(key, func(r []rune, i int) bool {
|
||
|
c := r[i]
|
||
|
switch {
|
||
|
// S3 doesn't handle these characters (determined via experimentation).
|
||
|
case c < 32:
|
||
|
return true
|
||
|
// For "../", escape the trailing slash.
|
||
|
case i > 1 && c == '/' && r[i-1] == '.' && r[i-2] == '.':
|
||
|
return true
|
||
|
// For "//", escape the trailing slash. Otherwise, S3 drops it.
|
||
|
case i > 0 && c == '/' && r[i-1] == '/':
|
||
|
return true
|
||
|
}
|
||
|
return false
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// unescapeKey reverses escapeKey.
|
||
|
func unescapeKey(key string) string {
|
||
|
return escape.HexUnescape(key)
|
||
|
}
|
||
|
|
||
|
// NewTypedWriter implements driver.NewTypedWriter.
|
||
|
func (b *bucket) NewTypedWriter(ctx context.Context, key string, contentType string, opts *driver.WriterOptions) (driver.Writer, error) {
|
||
|
key = escapeKey(key)
|
||
|
uploader := s3manager.NewUploaderWithClient(b.client, func(u *s3manager.Uploader) {
|
||
|
if opts.BufferSize != 0 {
|
||
|
u.PartSize = int64(opts.BufferSize)
|
||
|
}
|
||
|
})
|
||
|
md := make(map[string]*string, len(opts.Metadata))
|
||
|
for k, v := range opts.Metadata {
|
||
|
// See the package comments for more details on escaping of metadata
|
||
|
// keys & values.
|
||
|
k = escape.HexEscape(url.PathEscape(k), func(runes []rune, i int) bool {
|
||
|
c := runes[i]
|
||
|
return c == '@' || c == ':' || c == '='
|
||
|
})
|
||
|
md[k] = aws.String(url.PathEscape(v))
|
||
|
}
|
||
|
req := &s3manager.UploadInput{
|
||
|
Bucket: aws.String(b.name),
|
||
|
ContentType: aws.String(contentType),
|
||
|
Key: aws.String(key),
|
||
|
Metadata: md,
|
||
|
}
|
||
|
if opts.CacheControl != "" {
|
||
|
req.CacheControl = aws.String(opts.CacheControl)
|
||
|
}
|
||
|
if opts.ContentDisposition != "" {
|
||
|
req.ContentDisposition = aws.String(opts.ContentDisposition)
|
||
|
}
|
||
|
if opts.ContentEncoding != "" {
|
||
|
req.ContentEncoding = aws.String(opts.ContentEncoding)
|
||
|
}
|
||
|
if opts.ContentLanguage != "" {
|
||
|
req.ContentLanguage = aws.String(opts.ContentLanguage)
|
||
|
}
|
||
|
if len(opts.ContentMD5) > 0 {
|
||
|
req.ContentMD5 = aws.String(base64.StdEncoding.EncodeToString(opts.ContentMD5))
|
||
|
}
|
||
|
if opts.BeforeWrite != nil {
|
||
|
asFunc := func(i interface{}) bool {
|
||
|
pu, ok := i.(**s3manager.Uploader)
|
||
|
if ok {
|
||
|
*pu = uploader
|
||
|
return true
|
||
|
}
|
||
|
pui, ok := i.(**s3manager.UploadInput)
|
||
|
if ok {
|
||
|
*pui = req
|
||
|
return true
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
if err := opts.BeforeWrite(asFunc); err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
return &writer{
|
||
|
ctx: ctx,
|
||
|
uploader: uploader,
|
||
|
req: req,
|
||
|
donec: make(chan struct{}),
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// Copy implements driver.Copy.
|
||
|
func (b *bucket) Copy(ctx context.Context, dstKey, srcKey string, opts *driver.CopyOptions) error {
|
||
|
dstKey = escapeKey(dstKey)
|
||
|
srcKey = escapeKey(srcKey)
|
||
|
input := &s3.CopyObjectInput{
|
||
|
Bucket: aws.String(b.name),
|
||
|
CopySource: aws.String(b.name + "/" + srcKey),
|
||
|
Key: aws.String(dstKey),
|
||
|
}
|
||
|
if opts.BeforeCopy != nil {
|
||
|
asFunc := func(i interface{}) bool {
|
||
|
switch v := i.(type) {
|
||
|
case **s3.CopyObjectInput:
|
||
|
*v = input
|
||
|
return true
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
if err := opts.BeforeCopy(asFunc); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
_, err := b.client.CopyObjectWithContext(ctx, input)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Delete implements driver.Delete.
|
||
|
func (b *bucket) Delete(ctx context.Context, key string) error {
|
||
|
if _, err := b.Attributes(ctx, key); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
key = escapeKey(key)
|
||
|
input := &s3.DeleteObjectInput{
|
||
|
Bucket: aws.String(b.name),
|
||
|
Key: aws.String(key),
|
||
|
}
|
||
|
_, err := b.client.DeleteObjectWithContext(ctx, input)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (b *bucket) SignedURL(_ context.Context, key string, opts *driver.SignedURLOptions) (string, error) {
|
||
|
key = escapeKey(key)
|
||
|
var req *request.Request
|
||
|
switch opts.Method {
|
||
|
case http.MethodGet:
|
||
|
in := &s3.GetObjectInput{
|
||
|
Bucket: aws.String(b.name),
|
||
|
Key: aws.String(key),
|
||
|
}
|
||
|
if opts.BeforeSign != nil {
|
||
|
asFunc := func(i interface{}) bool {
|
||
|
v, ok := i.(**s3.GetObjectInput)
|
||
|
if ok {
|
||
|
*v = in
|
||
|
}
|
||
|
return ok
|
||
|
}
|
||
|
if err := opts.BeforeSign(asFunc); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
}
|
||
|
req, _ = b.client.GetObjectRequest(in)
|
||
|
case http.MethodPut:
|
||
|
in := &s3.PutObjectInput{
|
||
|
Bucket: aws.String(b.name),
|
||
|
Key: aws.String(key),
|
||
|
}
|
||
|
if opts.EnforceAbsentContentType || opts.ContentType != "" {
|
||
|
in.ContentType = aws.String(opts.ContentType)
|
||
|
}
|
||
|
if opts.BeforeSign != nil {
|
||
|
asFunc := func(i interface{}) bool {
|
||
|
v, ok := i.(**s3.PutObjectInput)
|
||
|
if ok {
|
||
|
*v = in
|
||
|
}
|
||
|
return ok
|
||
|
}
|
||
|
if err := opts.BeforeSign(asFunc); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
}
|
||
|
req, _ = b.client.PutObjectRequest(in)
|
||
|
case http.MethodDelete:
|
||
|
in := &s3.DeleteObjectInput{
|
||
|
Bucket: aws.String(b.name),
|
||
|
Key: aws.String(key),
|
||
|
}
|
||
|
if opts.BeforeSign != nil {
|
||
|
asFunc := func(i interface{}) bool {
|
||
|
v, ok := i.(**s3.DeleteObjectInput)
|
||
|
if ok {
|
||
|
*v = in
|
||
|
}
|
||
|
return ok
|
||
|
}
|
||
|
if err := opts.BeforeSign(asFunc); err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
}
|
||
|
req, _ = b.client.DeleteObjectRequest(in)
|
||
|
default:
|
||
|
return "", fmt.Errorf("unsupported Method %q", opts.Method)
|
||
|
}
|
||
|
return req.Presign(opts.Expiry)
|
||
|
}
|