// Copyright 2018 The Go Cloud Development Kit Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package azureblob provides a blob implementation that uses Azure Storage's
// BlockBlob. Use OpenBucket to construct a *blob.Bucket.
//
// NOTE: SignedURLs for PUT created with this package are not fully portable;
// they will not work unless the PUT request includes a "x-ms-blob-type" header
// set to "BlockBlob".
// See https://stackoverflow.com/questions/37824136/put-on-sas-blob-url-without-specifying-x-ms-blob-type-header.
//
// URLs
//
// For blob.OpenBucket, azureblob registers for the scheme "azblob".
// The default URL opener will use credentials from the environment variables
// AZURE_STORAGE_ACCOUNT, AZURE_STORAGE_KEY, and AZURE_STORAGE_SAS_TOKEN.
// AZURE_STORAGE_ACCOUNT is required, along with one of the other two.
// AZURE_STORAGE_DOMAIN can optionally be used to provide an Azure Environment
// blob storage domain to use. If no AZURE_STORAGE_DOMAIN is provided, the
// default Azure public domain "blob.core.windows.net" will be used. Check
// the Azure Developer Guide for your particular cloud environment to see
// the proper blob storage domain name to provide.
// To customize the URL opener, or for more details on the URL format,
// see URLOpener.
// See https://gocloud.dev/concepts/urls/ for background information.
//
// Escaping
//
// Go CDK supports all UTF-8 strings; to make this work with services lacking
// full UTF-8 support, strings must be escaped (during writes) and unescaped
// (during reads). The following escapes are performed for azureblob:
//  - Blob keys: ASCII characters 0-31, 92 ("\"), and 127 are escaped to
//    "__0x<hex>__". Additionally, the "/" in "../" and a trailing "/" in a
//    key (e.g., "foo/") are escaped in the same way.
//  - Metadata keys: Per https://docs.microsoft.com/en-us/azure/storage/blobs/storage-properties-metadata,
//    Azure only allows C# identifiers as metadata keys. Therefore, characters
//    other than "[a-z][A-Z][0-9]_" are escaped using "__0x<hex>__". In addition,
//    characters "[0-9]" are escaped when they start the string.
//    URL encoding would not work since "%" is not valid.
//  - Metadata values: Escaped using URL encoding.
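//
// For example (a hedged sketch; the container name "my-container" is a
// placeholder assumption), a bucket opened through the URL scheme above
// behaves like any other *blob.Bucket:
//
//	ctx := context.Background()
//	b, err := blob.OpenBucket(ctx, "azblob://my-container")
//	if err != nil {
//		return err
//	}
//	defer b.Close()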
//
// As
//
// azureblob exposes the following types for As:
//  - Bucket: *azblob.ContainerURL
//  - Error: azblob.StorageError
//  - ListObject: azblob.BlobItemInternal for objects, azblob.BlobPrefix for "directories"
//  - ListOptions.BeforeList: *azblob.ListBlobsSegmentOptions
//  - Reader: azblob.DownloadResponse
//  - Reader.BeforeRead: *azblob.BlockBlobURL, *azblob.BlobAccessConditions
//  - Attributes: azblob.BlobGetPropertiesResponse
//  - CopyOptions.BeforeCopy: azblob.Metadata, *azblob.ModifiedAccessConditions, *azblob.BlobAccessConditions
//  - WriterOptions.BeforeWrite: *azblob.UploadStreamToBlockBlobOptions
//  - SignedURLOptions.BeforeSign: *azblob.BlobSASSignatureValues
package azureblob

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/Azure/azure-pipeline-go/pipeline"
	"github.com/Azure/azure-storage-blob-go/azblob"
	"github.com/Azure/go-autorest/autorest/adal"
	"github.com/Azure/go-autorest/autorest/azure"
	"github.com/google/uuid"
	"github.com/google/wire"
	"gocloud.dev/blob"
	"gocloud.dev/blob/driver"
	"gocloud.dev/gcerrors"
	"gocloud.dev/internal/escape"
	"gocloud.dev/internal/gcerr"
	"gocloud.dev/internal/useragent"
)

const (
	// tokenRefreshTolerance is the number of seconds of slack to leave before
	// a token's expiration when scheduling a refresh.
	tokenRefreshTolerance = 300
)

// Options sets options for constructing a *blob.Bucket backed by Azure Block Blob.
type Options struct {
	// Credential represents the authorizer for SignedURL.
	// Required to use SignedURL. If you're using MSI for authentication, this
	// will attempt to be loaded lazily the first time you call SignedURL.
	Credential azblob.StorageAccountCredential

	// SASToken can be provided along with anonymous credentials to use
	// delegated privileges.
	// See https://docs.microsoft.com/en-us/azure/storage/common/storage-dotnet-shared-access-signature-part-1#shared-access-signature-parameters.
	SASToken SASToken

	// StorageDomain can be provided to specify an Azure Cloud Environment
	// domain to target for the blob storage account (i.e. public, government, china).
	// The default value is "blob.core.windows.net". Possible values will look similar
	// to this but are different for each cloud (i.e. "blob.core.govcloudapi.net" for USGovernment).
	// Check the Azure developer guide for the cloud environment where your bucket resides.
	StorageDomain StorageDomain

	// Protocol can be provided to specify the protocol used to access Azure
	// Blob Storage: "http" for a local emulator, or "https" in general.
	// If blank, "https" will be used.
	Protocol Protocol
}

const (
	defaultMaxDownloadRetryRequests = 3               // download retry policy (Azure default is zero)
	defaultPageSize                 = 1000            // default page size for ListPaged (Azure default is 5000)
	defaultUploadBuffers            = 5               // number of rotating buffers used when uploading (degree of parallelism)
	defaultUploadBlockSize          = 8 * 1024 * 1024 // upload buffer size
)

func init() {
	blob.DefaultURLMux().RegisterBucket(Scheme, new(lazyCredsOpener))
}

// Set holds Wire providers for this package.
var Set = wire.NewSet(
	NewPipeline,
	wire.Struct(new(Options), "Credential", "SASToken"),
	wire.Struct(new(URLOpener), "AccountName", "Pipeline", "Options"),
)

// lazyCredsOpener obtains credentials from the environment on the first call
// to OpenBucketURL.
type lazyCredsOpener struct {
	init   sync.Once
	opener *URLOpener
	err    error
}
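// openBucketWithSASSketch is a hedged, illustrative helper (not part of the
// original package): it shows how the Options and NewPipeline pieces above fit
// together when authenticating with a SAS token rather than an account key.
// The account and container names are placeholder assumptions.
func openBucketWithSASSketch(ctx context.Context, sas SASToken) (*blob.Bucket, error) {
	// Anonymous credentials are paired with a SAS token; the token itself
	// carries the delegated permissions (see Options.SASToken above).
	pipe := NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{})
	return OpenBucket(ctx, pipe, "myaccount", "my-container", &Options{SASToken: sas})
}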
func (o *lazyCredsOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) {
	o.init.Do(func() {
		// Use default credential info from the environment.
		// Ignore errors, as we'll get errors from OpenBucket later.
		accountName, _ := DefaultAccountName()
		accountKey, _ := DefaultAccountKey()
		sasToken, _ := DefaultSASToken()
		storageDomain, _ := DefaultStorageDomain()
		protocol, _ := DefaultProtocol()
		isMSIEnvironment := adal.MSIAvailable(ctx, adal.CreateSender())

		if accountKey != "" || sasToken != "" {
			o.opener, o.err = openerFromEnv(accountName, accountKey, sasToken, storageDomain, protocol)
		} else if isMSIEnvironment {
			o.opener, o.err = openerFromMSI(accountName, storageDomain, protocol)
		} else {
			o.opener, o.err = openerFromAnon(accountName, storageDomain, protocol)
		}
	})
	if o.err != nil {
		return nil, fmt.Errorf("open bucket %v: %v", u, o.err)
	}
	return o.opener.OpenBucketURL(ctx, u)
}

// Scheme is the URL scheme azureblob registers its URLOpener under on
// blob.DefaultMux.
const Scheme = "azblob"

// URLOpener opens Azure URLs like "azblob://mybucket".
//
// The URL host is used as the bucket name.
//
// The following query options are supported:
//  - domain: The domain name used to access the Azure Blob storage (e.g. blob.core.windows.net)
type URLOpener struct {
	// AccountName must be specified.
	AccountName AccountName

	// Pipeline must be set to a non-nil value.
	Pipeline pipeline.Pipeline

	// Options specifies the options to pass to OpenBucket.
	Options Options
}

func openerFromEnv(accountName AccountName, accountKey AccountKey, sasToken SASToken, storageDomain StorageDomain, protocol Protocol) (*URLOpener, error) {
	// azblob.Credential is an interface; we will use either a SharedKeyCredential
	// or anonymous credentials. If the former, we will also fill in
	// Options.Credential so that SignedURL will work.
	var credential azblob.Credential
	var storageAccountCredential azblob.StorageAccountCredential
	if accountKey != "" {
		sharedKeyCred, err := NewCredential(accountName, accountKey)
		if err != nil {
			return nil, fmt.Errorf("invalid credentials %s/%s: %v", accountName, accountKey, err)
		}
		credential = sharedKeyCred
		storageAccountCredential = sharedKeyCred
	} else {
		credential = azblob.NewAnonymousCredential()
	}
	return &URLOpener{
		AccountName: accountName,
		Pipeline:    NewPipeline(credential, azblob.PipelineOptions{}),
		Options: Options{
			Credential:    storageAccountCredential,
			SASToken:      sasToken,
			StorageDomain: storageDomain,
			Protocol:      protocol,
		},
	}, nil
}

// openerFromAnon creates an anonymous-credential-backed URLOpener.
func openerFromAnon(accountName AccountName, storageDomain StorageDomain, protocol Protocol) (*URLOpener, error) {
	return &URLOpener{
		AccountName: accountName,
		Pipeline:    NewPipeline(azblob.NewAnonymousCredential(), azblob.PipelineOptions{}),
		Options: Options{
			StorageDomain: storageDomain,
			Protocol:      protocol,
		},
	}, nil
}

var defaultTokenRefreshFunction = func(spToken *adal.ServicePrincipalToken) func(credential azblob.TokenCredential) time.Duration {
	return func(credential azblob.TokenCredential) time.Duration {
		err := spToken.Refresh()
		if err != nil {
			return 0
		}
		expiresIn, err := strconv.ParseInt(string(spToken.Token().ExpiresIn), 10, 64)
		if err != nil {
			return 0
		}
		credential.SetToken(spToken.Token().AccessToken)
		return time.Duration(expiresIn-tokenRefreshTolerance) * time.Second
	}
}
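// exampleOpenWithDomainSketch is a hedged sketch (not part of the original
// package) of the "domain" query parameter documented on URLOpener above,
// used here to target a sovereign-cloud endpoint. The container name and the
// USGovernment domain are placeholder values taken from the Options docs.
func exampleOpenWithDomainSketch(ctx context.Context) (*blob.Bucket, error) {
	return blob.OpenBucket(ctx, "azblob://my-container?domain=blob.core.govcloudapi.net")
}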
// openerFromMSI acquires an MSI token and returns a TokenCredential-backed URLOpener.
func openerFromMSI(accountName AccountName, storageDomain StorageDomain, protocol Protocol) (*URLOpener, error) {
	spToken, err := getMSIServicePrincipalToken(azure.PublicCloud.ResourceIdentifiers.Storage)
	if err != nil {
		return nil, fmt.Errorf("failure acquiring token from MSI endpoint: %w", err)
	}
	err = spToken.Refresh()
	if err != nil {
		return nil, fmt.Errorf("failure refreshing token from MSI endpoint: %w", err)
	}
	credential := azblob.NewTokenCredential(spToken.Token().AccessToken, defaultTokenRefreshFunction(spToken))
	return &URLOpener{
		AccountName: accountName,
		Pipeline:    NewPipeline(credential, azblob.PipelineOptions{}),
		Options: Options{
			StorageDomain: storageDomain,
			Protocol:      protocol,
		},
	}, nil
}

// getMSIServicePrincipalToken retrieves an Azure API Service Principal token.
func getMSIServicePrincipalToken(resource string) (*adal.ServicePrincipalToken, error) {
	msiEndpoint, err := adal.GetMSIEndpoint()
	if err != nil {
		return nil, fmt.Errorf("failed to get the managed service identity endpoint: %v", err)
	}
	token, err := adal.NewServicePrincipalTokenFromMSI(msiEndpoint, resource)
	if err != nil {
		return nil, fmt.Errorf("failed to create the managed service identity token: %v", err)
	}
	return token, nil
}

// OpenBucketURL opens a blob.Bucket based on u.
func (o *URLOpener) OpenBucketURL(ctx context.Context, u *url.URL) (*blob.Bucket, error) {
	opts := new(Options)
	*opts = o.Options
	err := setOptionsFromURLParams(u.Query(), opts)
	if err != nil {
		return nil, err
	}
	return OpenBucket(ctx, o.Pipeline, o.AccountName, u.Host, opts)
}

func setOptionsFromURLParams(q url.Values, o *Options) error {
	for param, values := range q {
		if len(values) > 1 {
			return fmt.Errorf("multiple values of %v not allowed", param)
		}
		value := values[0]
		switch param {
		case "domain":
			o.StorageDomain = StorageDomain(value)
		default:
			return fmt.Errorf("unknown query parameter %q", param)
		}
	}
	return nil
}

// DefaultIdentity is a Wire provider set that provides an Azure storage
// account name, key, and SharedKeyCredential from environment variables.
var DefaultIdentity = wire.NewSet(
	DefaultAccountName,
	DefaultAccountKey,
	NewCredential,
	wire.Bind(new(azblob.Credential), new(*azblob.SharedKeyCredential)),
	wire.Value(azblob.PipelineOptions{}),
)

// SASTokenIdentity is a Wire provider set that provides an Azure storage
// account name, SASToken, and anonymous credential from environment variables.
var SASTokenIdentity = wire.NewSet(
	DefaultAccountName,
	DefaultSASToken,
	azblob.NewAnonymousCredential,
	wire.Value(azblob.PipelineOptions{}),
)

// AccountName is an Azure storage account name.
type AccountName string

// AccountKey is an Azure storage account key (primary or secondary).
type AccountKey string

// SASToken is an Azure shared access signature.
// https://docs.microsoft.com/en-us/azure/storage/common/storage-dotnet-shared-access-signature-part-1
type SASToken string

// StorageDomain is an Azure Cloud Environment domain name to target
// (i.e. blob.core.windows.net, blob.core.govcloudapi.net, blob.core.chinacloudapi.cn).
// It is read from the AZURE_STORAGE_DOMAIN environment variable.
type StorageDomain string

// Protocol is a protocol used to access Azure Blob Storage.
// It must be "http" or "https".
// It is read from the AZURE_STORAGE_PROTOCOL environment variable.
type Protocol string

// DefaultAccountName loads the Azure storage account name from the
// AZURE_STORAGE_ACCOUNT environment variable.
func DefaultAccountName() (AccountName, error) {
	s := os.Getenv("AZURE_STORAGE_ACCOUNT")
	if s == "" {
		return "", errors.New("azureblob: environment variable AZURE_STORAGE_ACCOUNT not set")
	}
	return AccountName(s), nil
}
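// defaultOptionsSketch is a hedged illustration (not part of the original
// package) of how the Default* helpers can be combined by hand to build
// Options from the environment, mirroring what lazyCredsOpener does above.
func defaultOptionsSketch() (AccountName, Options, error) {
	name, err := DefaultAccountName()
	if err != nil {
		return "", Options{}, err
	}
	// SAS token, domain, and protocol are all optional; ignore their errors,
	// just as the URL opener does.
	sas, _ := DefaultSASToken()
	domain, _ := DefaultStorageDomain()
	proto, _ := DefaultProtocol()
	return name, Options{SASToken: sas, StorageDomain: domain, Protocol: proto}, nil
}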
// DefaultAccountKey loads the Azure storage account key (primary or secondary)
// from the AZURE_STORAGE_KEY environment variable.
func DefaultAccountKey() (AccountKey, error) {
	s := os.Getenv("AZURE_STORAGE_KEY")
	if s == "" {
		return "", errors.New("azureblob: environment variable AZURE_STORAGE_KEY not set")
	}
	return AccountKey(s), nil
}

// DefaultSASToken loads an Azure SAS token from the AZURE_STORAGE_SAS_TOKEN
// environment variable.
func DefaultSASToken() (SASToken, error) {
	s := os.Getenv("AZURE_STORAGE_SAS_TOKEN")
	if s == "" {
		return "", errors.New("azureblob: environment variable AZURE_STORAGE_SAS_TOKEN not set")
	}
	return SASToken(s), nil
}

// DefaultStorageDomain loads the desired Azure Cloud to target from
// the AZURE_STORAGE_DOMAIN environment variable.
func DefaultStorageDomain() (StorageDomain, error) {
	s := os.Getenv("AZURE_STORAGE_DOMAIN")
	return StorageDomain(s), nil
}

// DefaultProtocol loads the protocol to access Azure Blob Storage from the
// AZURE_STORAGE_PROTOCOL environment variable.
func DefaultProtocol() (Protocol, error) {
	s := os.Getenv("AZURE_STORAGE_PROTOCOL")
	return Protocol(s), nil
}

// NewCredential creates a SharedKeyCredential.
func NewCredential(accountName AccountName, accountKey AccountKey) (*azblob.SharedKeyCredential, error) {
	return azblob.NewSharedKeyCredential(string(accountName), string(accountKey))
}

// NewPipeline creates a Pipeline for making HTTP requests to Azure.
func NewPipeline(credential azblob.Credential, opts azblob.PipelineOptions) pipeline.Pipeline {
	opts.Telemetry.Value = useragent.AzureUserAgentPrefix("blob") + opts.Telemetry.Value
	return azblob.NewPipeline(credential, opts)
}

// bucket represents an Azure Storage Account Container, which handles read,
// write and delete operations on objects within it.
// See https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blobs-introduction.
type bucket struct {
	name         string
	pageMarkers  map[string]azblob.Marker
	serviceURL   *azblob.ServiceURL
	containerURL azblob.ContainerURL
	opts         *Options

	mu                    sync.Mutex // protects the fields below
	credentialExpiration  time.Time
	delegationCredentials azblob.StorageAccountCredential
}

// OpenBucket returns a *blob.Bucket backed by an Azure Storage Account. See the
// package documentation for an example and
// https://godoc.org/github.com/Azure/azure-storage-blob-go/azblob
// for more details.
func OpenBucket(ctx context.Context, pipeline pipeline.Pipeline, accountName AccountName, containerName string, opts *Options) (*blob.Bucket, error) {
	b, err := openBucket(ctx, pipeline, accountName, containerName, opts)
	if err != nil {
		return nil, err
	}
	return blob.NewBucket(b), nil
}
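// openBucketWithSharedKeySketch is a hedged sketch (not part of the original
// package): it wires NewCredential and NewPipeline into OpenBucket using a
// shared account key. The container name is a placeholder assumption.
func openBucketWithSharedKeySketch(ctx context.Context, name AccountName, key AccountKey) (*blob.Bucket, error) {
	cred, err := NewCredential(name, key)
	if err != nil {
		return nil, err
	}
	pipe := NewPipeline(cred, azblob.PipelineOptions{})
	// Passing the credential in Options enables SignedURL (see Options.Credential).
	return OpenBucket(ctx, pipe, name, "my-container", &Options{Credential: cred})
}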
func openBucket(ctx context.Context, pipeline pipeline.Pipeline, accountName AccountName, containerName string, opts *Options) (*bucket, error) {
	if pipeline == nil {
		return nil, errors.New("azureblob.OpenBucket: pipeline is required")
	}
	if accountName == "" {
		return nil, errors.New("azureblob.OpenBucket: accountName is required")
	}
	if containerName == "" {
		return nil, errors.New("azureblob.OpenBucket: containerName is required")
	}
	if opts == nil {
		opts = &Options{}
	}
	if opts.StorageDomain == "" {
		// If opts.StorageDomain is missing, use the default domain.
		opts.StorageDomain = "blob.core.windows.net"
	}
	switch opts.Protocol {
	case "":
		// If opts.Protocol is missing, use "https".
		opts.Protocol = "https"
	case "https", "http":
	default:
		return nil, errors.New("azureblob.OpenBucket: protocol must be http or https")
	}
	d := string(opts.StorageDomain)
	var u string
	// The URL structure of the local emulator is a bit different from the real one.
	if strings.HasPrefix(d, "127.0.0.1") || strings.HasPrefix(d, "localhost") {
		u = fmt.Sprintf("%s://%s/%s", opts.Protocol, opts.StorageDomain, accountName) // http://127.0.0.1:10000/devstoreaccount1
	} else {
		u = fmt.Sprintf("%s://%s.%s", opts.Protocol, accountName, opts.StorageDomain) // https://myaccount.blob.core.windows.net
	}
	blobURL, err := url.Parse(u)
	if err != nil {
		return nil, err
	}
	if opts.SASToken != "" {
		// The Azure portal includes a leading "?" for the SASToken, which we
		// don't want here.
		blobURL.RawQuery = strings.TrimPrefix(string(opts.SASToken), "?")
	}
	serviceURL := azblob.NewServiceURL(*blobURL, pipeline)
	return &bucket{
		name:         containerName,
		pageMarkers:  map[string]azblob.Marker{},
		serviceURL:   &serviceURL,
		containerURL: serviceURL.NewContainerURL(containerName),
		opts:         opts,
	}, nil
}

// Close implements driver.Close.
func (b *bucket) Close() error {
	return nil
}

// Copy implements driver.Copy.
func (b *bucket) Copy(ctx context.Context, dstKey, srcKey string, opts *driver.CopyOptions) error {
	dstKey = escapeKey(dstKey, false)
	dstBlobURL := b.containerURL.NewBlobURL(dstKey)
	srcKey = escapeKey(srcKey, false)
	srcURL := b.containerURL.NewBlobURL(srcKey).URL()
	md := azblob.Metadata{}
	mac := azblob.ModifiedAccessConditions{}
	bac := azblob.BlobAccessConditions{}
	at := azblob.AccessTierNone
	btm := azblob.BlobTagsMap{}
	if opts.BeforeCopy != nil {
		asFunc := func(i interface{}) bool {
			switch v := i.(type) {
			case *azblob.Metadata:
				*v = md
				return true
			case **azblob.ModifiedAccessConditions:
				*v = &mac
				return true
			case **azblob.BlobAccessConditions:
				*v = &bac
				return true
			}
			return false
		}
		if err := opts.BeforeCopy(asFunc); err != nil {
			return err
		}
	}
	resp, err := dstBlobURL.StartCopyFromURL(ctx, srcURL, md, mac, bac, at, btm)
	if err != nil {
		return err
	}
	copyStatus := resp.CopyStatus()
	nErrors := 0
	for copyStatus == azblob.CopyStatusPending {
		// Poll until the copy is complete.
		time.Sleep(500 * time.Millisecond)
		propertiesResp, err := dstBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
		if err != nil {
			// A GetProperties failure may be transient, so allow a couple
			// of them before giving up.
			nErrors++
			if ctx.Err() != nil || nErrors == 3 {
				return err
			}
		}
		copyStatus = propertiesResp.CopyStatus()
	}
	if copyStatus != azblob.CopyStatusSuccess {
		return fmt.Errorf("copy failed with status: %s", copyStatus)
	}
	return nil
}

// Delete implements driver.Delete.
func (b *bucket) Delete(ctx context.Context, key string) error {
	key = escapeKey(key, false)
	blockBlobURL := b.containerURL.NewBlockBlobURL(key)
	_, err := blockBlobURL.Delete(ctx, azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{})
	return err
}

// reader reads an azblob. It implements io.ReadCloser.
type reader struct {
	body  io.ReadCloser
	attrs driver.ReaderAttributes
	raw   *azblob.DownloadResponse
}

func (r *reader) Read(p []byte) (int, error) {
	return r.body.Read(p)
}

func (r *reader) Close() error {
	return r.body.Close()
}

func (r *reader) Attributes() *driver.ReaderAttributes {
	return &r.attrs
}

func (r *reader) As(i interface{}) bool {
	p, ok := i.(*azblob.DownloadResponse)
	if !ok {
		return false
	}
	*p = *r.raw
	return true
}
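// beforeCopySketch is a hedged illustration (not part of the original
// package) of the BeforeCopy/As hook handled in Copy above: it stamps extra
// Azure metadata onto the destination blob during a copy. The keys and
// metadata value are placeholder assumptions.
func beforeCopySketch(ctx context.Context, b *blob.Bucket) error {
	return b.Copy(ctx, "dst-key", "src-key", &blob.CopyOptions{
		BeforeCopy: func(asFunc func(interface{}) bool) error {
			var md azblob.Metadata
			if asFunc(&md) {
				// md shares the underlying map passed to StartCopyFromURL,
				// so this mutation is visible to the copy request.
				md["source"] = "copy"
			}
			return nil
		},
	})
}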
// NewRangeReader implements driver.NewRangeReader.
func (b *bucket) NewRangeReader(ctx context.Context, key string, offset, length int64, opts *driver.ReaderOptions) (driver.Reader, error) {
	key = escapeKey(key, false)
	blockBlobURL := b.containerURL.NewBlockBlobURL(key)
	blockBlobURLp := &blockBlobURL
	accessConditions := &azblob.BlobAccessConditions{}

	end := length
	if end < 0 {
		end = azblob.CountToEnd
	}
	if opts.BeforeRead != nil {
		asFunc := func(i interface{}) bool {
			if p, ok := i.(**azblob.BlockBlobURL); ok {
				*p = blockBlobURLp
				return true
			}
			if p, ok := i.(**azblob.BlobAccessConditions); ok {
				*p = accessConditions
				return true
			}
			return false
		}
		if err := opts.BeforeRead(asFunc); err != nil {
			return nil, err
		}
	}

	blobDownloadResponse, err := blockBlobURLp.Download(ctx, offset, end, *accessConditions, false, azblob.ClientProvidedKeyOptions{})
	if err != nil {
		return nil, err
	}
	attrs := driver.ReaderAttributes{
		ContentType: blobDownloadResponse.ContentType(),
		Size:        getSize(blobDownloadResponse.ContentLength(), blobDownloadResponse.ContentRange()),
		ModTime:     blobDownloadResponse.LastModified(),
	}
	var body io.ReadCloser
	if length == 0 {
		body = http.NoBody
	} else {
		body = blobDownloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: defaultMaxDownloadRetryRequests})
	}
	return &reader{
		body:  body,
		attrs: attrs,
		raw:   blobDownloadResponse,
	}, nil
}

func getSize(contentLength int64, contentRange string) int64 {
	// Default size to ContentLength, but that's incorrect for partial-length
	// reads, where ContentLength refers to the size of the returned Body, not
	// the entire size of the blob. ContentRange has the full size.
	size := contentLength
	if contentRange != "" {
		// Sample: bytes 10-14/27 (where 27 is the full size).
		parts := strings.Split(contentRange, "/")
		if len(parts) == 2 {
			if i, err := strconv.ParseInt(parts[1], 10, 64); err == nil {
				size = i
			}
		}
	}
	return size
}

// As implements driver.As.
func (b *bucket) As(i interface{}) bool {
	p, ok := i.(**azblob.ContainerURL)
	if !ok {
		return false
	}
	*p = &b.containerURL
	return true
}

// ErrorAs implements driver.ErrorAs.
func (b *bucket) ErrorAs(err error, i interface{}) bool {
	switch v := err.(type) {
	case azblob.StorageError:
		if p, ok := i.(*azblob.StorageError); ok {
			*p = v
			return true
		}
	}
	return false
}

func (b *bucket) ErrorCode(err error) gcerrors.ErrorCode {
	serr, ok := err.(azblob.StorageError)
	switch {
	case !ok:
		// This happens with an invalid storage account name; the host
		// is something like invalidstorageaccount.blob.core.windows.net.
		if strings.Contains(err.Error(), "no such host") {
			return gcerrors.NotFound
		}
		return gcerrors.Unknown
	case serr.ServiceCode() == azblob.ServiceCodeBlobNotFound || serr.Response().StatusCode == 404:
		// Check both the SDK ServiceCode and the HTTP response code for NotFound.
		return gcerrors.NotFound
	case serr.ServiceCode() == azblob.ServiceCodeAuthenticationFailed:
		return gcerrors.PermissionDenied
	default:
		return gcerrors.Unknown
	}
}
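// errorAsSketch is a hedged illustration (not part of the original package)
// of the ErrorAs support above: after a failed read, a caller can recover the
// underlying azblob.StorageError to inspect the Azure service code. The key
// is a placeholder assumption.
func errorAsSketch(ctx context.Context, b *blob.Bucket) {
	_, err := b.ReadAll(ctx, "missing-key")
	var serr azblob.StorageError
	if err != nil && b.ErrorAs(err, &serr) {
		fmt.Println(serr.ServiceCode())
	}
}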
// Attributes implements driver.Attributes.
func (b *bucket) Attributes(ctx context.Context, key string) (*driver.Attributes, error) {
	key = escapeKey(key, false)
	blockBlobURL := b.containerURL.NewBlockBlobURL(key)
	blobPropertiesResponse, err := blockBlobURL.GetProperties(ctx, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
	if err != nil {
		return nil, err
	}

	azureMD := blobPropertiesResponse.NewMetadata()
	md := make(map[string]string, len(azureMD))
	for k, v := range azureMD {
		// See the package comments for more details on escaping of metadata
		// keys & values.
		md[escape.HexUnescape(k)] = escape.URLUnescape(v)
	}
	return &driver.Attributes{
		CacheControl:       blobPropertiesResponse.CacheControl(),
		ContentDisposition: blobPropertiesResponse.ContentDisposition(),
		ContentEncoding:    blobPropertiesResponse.ContentEncoding(),
		ContentLanguage:    blobPropertiesResponse.ContentLanguage(),
		ContentType:        blobPropertiesResponse.ContentType(),
		Size:               blobPropertiesResponse.ContentLength(),
		CreateTime:         blobPropertiesResponse.CreationTime(),
		ModTime:            blobPropertiesResponse.LastModified(),
		MD5:                blobPropertiesResponse.ContentMD5(),
		ETag:               fmt.Sprintf("%v", blobPropertiesResponse.ETag()),
		Metadata:           md,
		AsFunc: func(i interface{}) bool {
			p, ok := i.(*azblob.BlobGetPropertiesResponse)
			if !ok {
				return false
			}
			*p = *blobPropertiesResponse
			return true
		},
	}, nil
}

// ListPaged implements driver.ListPaged.
func (b *bucket) ListPaged(ctx context.Context, opts *driver.ListOptions) (*driver.ListPage, error) {
	pageSize := opts.PageSize
	if pageSize == 0 {
		pageSize = defaultPageSize
	}

	marker := azblob.Marker{}
	if len(opts.PageToken) > 0 {
		if m, ok := b.pageMarkers[string(opts.PageToken)]; ok {
			marker = m
		}
	}

	azOpts := azblob.ListBlobsSegmentOptions{
		MaxResults: int32(pageSize),
		Prefix:     escapeKey(opts.Prefix, true),
	}
	if opts.BeforeList != nil {
		asFunc := func(i interface{}) bool {
			p, ok := i.(**azblob.ListBlobsSegmentOptions)
			if !ok {
				return false
			}
			*p = &azOpts
			return true
		}
		if err := opts.BeforeList(asFunc); err != nil {
			return nil, err
		}
	}
	listBlob, err := b.containerURL.ListBlobsHierarchySegment(ctx, marker, escapeKey(opts.Delimiter, true), azOpts)
	if err != nil {
		return nil, err
	}
	page := &driver.ListPage{}
	page.Objects = []*driver.ListObject{}
	for _, blobPrefix := range listBlob.Segment.BlobPrefixes {
		page.Objects = append(page.Objects, &driver.ListObject{
			Key:   unescapeKey(blobPrefix.Name),
			Size:  0,
			IsDir: true,
			AsFunc: func(i interface{}) bool {
				p, ok := i.(*azblob.BlobPrefix)
				if !ok {
					return false
				}
				*p = blobPrefix
				return true
			}})
	}
	for _, blobInfo := range listBlob.Segment.BlobItems {
		page.Objects = append(page.Objects, &driver.ListObject{
			Key:     unescapeKey(blobInfo.Name),
			ModTime: blobInfo.Properties.LastModified,
			Size:    *blobInfo.Properties.ContentLength,
			MD5:     blobInfo.Properties.ContentMD5,
			IsDir:   false,
			AsFunc: func(i interface{}) bool {
				p, ok := i.(*azblob.BlobItemInternal)
				if !ok {
					return false
				}
				*p = blobInfo
				return true
			},
		})
	}
	if listBlob.NextMarker.NotDone() {
		token := uuid.New().String()
		b.pageMarkers[token] = listBlob.NextMarker
		page.NextPageToken = []byte(token)
	}
	if len(listBlob.Segment.BlobPrefixes) > 0 && len(listBlob.Segment.BlobItems) > 0 {
		sort.Slice(page.Objects, func(i, j int) bool {
			return page.Objects[i].Key < page.Objects[j].Key
		})
	}
	return page, nil
}

func (b *bucket) refreshDelegationCredentials(ctx context.Context) (azblob.StorageAccountCredential, error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if time.Now().UTC().After(b.credentialExpiration) {
		validPeriod := 48 * time.Hour
		currentTime := time.Now().UTC()
		expires := currentTime.Add(validPeriod)
		keyInfo := azblob.NewKeyInfo(currentTime, expires)
		creds, err := b.serviceURL.GetUserDelegationCredential(ctx, keyInfo, nil /* default timeout */, nil /* no request id */)
		if err != nil {
			return nil, err
		}
		b.credentialExpiration = expires
		b.delegationCredentials = creds
	}
	return b.delegationCredentials, nil
}
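// beforeListSketch is a hedged illustration (not part of the original
// package) of the BeforeList/As hook handled in ListPaged above: it lowers
// Azure's per-page MaxResults for a listing. The prefix and limit are
// placeholder choices.
func beforeListSketch(ctx context.Context, b *blob.Bucket) *blob.ListIterator {
	return b.List(&blob.ListOptions{
		Prefix: "logs/",
		BeforeList: func(asFunc func(interface{}) bool) error {
			var o *azblob.ListBlobsSegmentOptions
			if asFunc(&o) {
				o.MaxResults = 100
			}
			return nil
		},
	})
}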
// SignedURL implements driver.SignedURL.
func (b *bucket) SignedURL(ctx context.Context, key string, opts *driver.SignedURLOptions) (string, error) {
	var credential azblob.StorageAccountCredential
	if b.opts.Credential != nil {
		credential = b.opts.Credential
	} else if isMSIEnvironment := adal.MSIAvailable(ctx, adal.CreateSender()); isMSIEnvironment {
		var err error
		credential, err = b.refreshDelegationCredentials(ctx)
		if err != nil {
			return "", gcerr.New(gcerr.Internal, err, 1, "azureblob: unable to generate User Delegation Credential")
		}
	} else {
		return "", gcerr.New(gcerr.Unimplemented, nil, 1, "azureblob: to use SignedURL, you must call OpenBucket with a non-nil Options.Credential")
	}

	if opts.ContentType != "" || opts.EnforceAbsentContentType {
		return "", gcerr.New(gcerr.Unimplemented, nil, 1, "azureblob: does not enforce Content-Type on PUT")
	}
	key = escapeKey(key, false)
	blockBlobURL := b.containerURL.NewBlockBlobURL(key)
	srcBlobParts := azblob.NewBlobURLParts(blockBlobURL.URL())

	perms := azblob.BlobSASPermissions{}
	switch opts.Method {
	case http.MethodGet:
		perms.Read = true
	case http.MethodPut:
		perms.Create = true
		perms.Write = true
	case http.MethodDelete:
		perms.Delete = true
	default:
		return "", fmt.Errorf("unsupported Method %s", opts.Method)
	}
	signVals := &azblob.BlobSASSignatureValues{
		Protocol:      azblob.SASProtocolHTTPS,
		ExpiryTime:    time.Now().UTC().Add(opts.Expiry),
		ContainerName: b.name,
		BlobName:      srcBlobParts.BlobName,
		Permissions:   perms.String(),
	}
	if opts.BeforeSign != nil {
		asFunc := func(i interface{}) bool {
			v, ok := i.(**azblob.BlobSASSignatureValues)
			if ok {
				*v = signVals
			}
			return ok
		}
		if err := opts.BeforeSign(asFunc); err != nil {
			return "", err
		}
	}
	var err error
	if srcBlobParts.SAS, err = signVals.NewSASQueryParameters(credential); err != nil {
		return "", err
	}
	srcBlobURLWithSAS := srcBlobParts.URL()
	return srcBlobURLWithSAS.String(), nil
}

type writer struct {
	ctx          context.Context
	blockBlobURL *azblob.BlockBlobURL
	uploadOpts   *azblob.UploadStreamToBlockBlobOptions

	w     *io.PipeWriter
	donec chan struct{}
	err   error
}

// escapeKey does all required escaping for UTF-8 strings to work with Azure.
// isPrefix indicates whether the key is a full key, or a prefix/delimiter.
func escapeKey(key string, isPrefix bool) string {
	return escape.HexEscape(key, func(r []rune, i int) bool {
		c := r[i]
		switch {
		// Azure does not work well with backslashes in blob names.
		case c == '\\':
			return true
		// Azure doesn't handle these characters (determined via experimentation).
		case c < 32 || c == 127:
			return true
		// Escape trailing "/" for full keys, otherwise Azure can't address them
		// consistently.
		case !isPrefix && i == len(key)-1 && c == '/':
			return true
		// For "../", escape the trailing slash.
		case i > 1 && r[i] == '/' && r[i-1] == '.' && r[i-2] == '.':
			return true
		}
		return false
	})
}

// unescapeKey reverses escapeKey.
func unescapeKey(key string) string {
	return escape.HexUnescape(key)
}
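// signedURLSketch is a hedged illustration (not part of the original
// package): it requests a time-limited GET URL via the SignedURL path above.
// Recall from the package comment that PUT URLs additionally require the
// "x-ms-blob-type: BlockBlob" header. The key and expiry are placeholders.
func signedURLSketch(ctx context.Context, b *blob.Bucket) (string, error) {
	return b.SignedURL(ctx, "report.csv", &blob.SignedURLOptions{
		Method: http.MethodGet,
		Expiry: 15 * time.Minute,
	})
}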
// NewTypedWriter implements driver.NewTypedWriter.
func (b *bucket) NewTypedWriter(ctx context.Context, key string, contentType string, opts *driver.WriterOptions) (driver.Writer, error) {
	key = escapeKey(key, false)
	blockBlobURL := b.containerURL.NewBlockBlobURL(key)
	if opts.BufferSize == 0 {
		opts.BufferSize = defaultUploadBlockSize
	}

	md := make(map[string]string, len(opts.Metadata))
	for k, v := range opts.Metadata {
		// See the package comments for more details on escaping of metadata
		// keys & values.
		e := escape.HexEscape(k, func(runes []rune, i int) bool {
			c := runes[i]
			switch {
			case i == 0 && c >= '0' && c <= '9':
				return true
			case escape.IsASCIIAlphanumeric(c):
				return false
			case c == '_':
				return false
			}
			return true
		})
		if _, ok := md[e]; ok {
			return nil, fmt.Errorf("duplicate keys after escaping: %q => %q", k, e)
		}
		md[e] = escape.URLEscape(v)
	}
	uploadOpts := &azblob.UploadStreamToBlockBlobOptions{
		BufferSize: opts.BufferSize,
		MaxBuffers: defaultUploadBuffers,
		Metadata:   md,
		BlobHTTPHeaders: azblob.BlobHTTPHeaders{
			CacheControl:       opts.CacheControl,
			ContentDisposition: opts.ContentDisposition,
			ContentEncoding:    opts.ContentEncoding,
			ContentLanguage:    opts.ContentLanguage,
			ContentMD5:         opts.ContentMD5,
			ContentType:        contentType,
		},
	}
	if opts.BeforeWrite != nil {
		asFunc := func(i interface{}) bool {
			p, ok := i.(**azblob.UploadStreamToBlockBlobOptions)
			if !ok {
				return false
			}
			*p = uploadOpts
			return true
		}
		if err := opts.BeforeWrite(asFunc); err != nil {
			return nil, err
		}
	}
	return &writer{
		ctx:          ctx,
		blockBlobURL: &blockBlobURL,
		uploadOpts:   uploadOpts,
		donec:        make(chan struct{}),
	}, nil
}

// Write appends p to w. The user must call Close once done writing.
func (w *writer) Write(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	if w.w == nil {
		pr, pw := io.Pipe()
		w.w = pw
		if err := w.open(pr); err != nil {
			return 0, err
		}
	}
	return w.w.Write(p)
}

func (w *writer) open(pr *io.PipeReader) error {
	go func() {
		defer close(w.donec)

		var body io.Reader
		if pr == nil {
			body = http.NoBody
		} else {
			body = pr
		}
		_, w.err = azblob.UploadStreamToBlockBlob(w.ctx, body, *w.blockBlobURL, *w.uploadOpts)
		if w.err != nil {
			if pr != nil {
				pr.CloseWithError(w.err)
			}
			return
		}
	}()
	return nil
}

// Close completes the writer and closes it. Any error occurring during write
// will be returned. If a writer is closed before any Write is called, Close
// will create an empty file at the given key.
func (w *writer) Close() error {
	if w.w == nil {
		w.open(nil)
	} else if err := w.w.Close(); err != nil {
		return err
	}
	<-w.donec
	return w.err
}
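// writerSketch is a hedged illustration (not part of the original package)
// of the streaming writer above: bytes are piped to
// azblob.UploadStreamToBlockBlob and the upload completes on Close. The key
// and content type are placeholder choices.
func writerSketch(ctx context.Context, b *blob.Bucket) error {
	w, err := b.NewWriter(ctx, "greeting.txt", &blob.WriterOptions{ContentType: "text/plain"})
	if err != nil {
		return err
	}
	if _, err := w.Write([]byte("hello")); err != nil {
		w.Close() // best effort; the Write error takes precedence
		return err
	}
	return w.Close()
}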