123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944 |
- # Copyright 2010 Google Inc.
- # Copyright (c) 2011, Nexenta Systems Inc.
- #
- # Permission is hereby granted, free of charge, to any person obtaining a
- # copy of this software and associated documentation files (the
- # "Software"), to deal in the Software without restriction, including
- # without limitation the rights to use, copy, modify, merge, publish, dis-
- # tribute, sublicense, and/or sell copies of the Software, and to permit
- # persons to whom the Software is furnished to do so, subject to the fol-
- # lowing conditions:
- #
- # The above copyright notice and this permission notice shall be included
- # in all copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
- # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
- # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
- # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- # IN THE SOFTWARE.
- import boto
- import os
- import sys
- import textwrap
- from boto.s3.deletemarker import DeleteMarker
- from boto.exception import BotoClientError
- from boto.exception import InvalidUriError
- class StorageUri(object):
- """
- Base class for representing storage provider-independent bucket and
- object name with a shorthand URI-like syntax.
- This is an abstract class: the constructor cannot be called (throws an
- exception if you try).
- """
- connection = None
- # Optional args that can be set from one of the concrete subclass
- # constructors, to change connection behavior (e.g., to override
- # https_connection_factory).
- connection_args = None
- # Map of provider scheme ('s3' or 'gs') to AWSAuthConnection object. We
- # maintain a pool here in addition to the connection pool implemented
- # in AWSAuthConnection because the latter re-creates its connection pool
- # every time that class is instantiated (so the current pool is used to
- # avoid re-instantiating AWSAuthConnection).
- provider_pool = {}
- def __init__(self):
- """Uncallable constructor on abstract base StorageUri class.
- """
- raise BotoClientError('Attempt to instantiate abstract StorageUri '
- 'class')
- def __repr__(self):
- """Returns string representation of URI."""
- return self.uri
- def equals(self, uri):
- """Returns true if two URIs are equal."""
- return self.uri == uri.uri
- def check_response(self, resp, level, uri):
- if resp is None:
- raise InvalidUriError('\n'.join(textwrap.wrap(
- 'Attempt to get %s for "%s" failed. This can happen if '
- 'the URI refers to a non-existent object or if you meant to '
- 'operate on a directory (e.g., leaving off -R option on gsutil '
- 'cp, mv, or ls of a bucket)' % (level, uri), 80)))
- def _check_bucket_uri(self, function_name):
- if issubclass(type(self), BucketStorageUri) and not self.bucket_name:
- raise InvalidUriError(
- '%s on bucket-less URI (%s)' % (function_name, self.uri))
- def _check_object_uri(self, function_name):
- if issubclass(type(self), BucketStorageUri) and not self.object_name:
- raise InvalidUriError('%s on object-less URI (%s)' %
- (function_name, self.uri))
- def _warn_about_args(self, function_name, **args):
- for arg in args:
- if args[arg]:
- sys.stderr.write(
- 'Warning: %s ignores argument: %s=%s\n' %
- (function_name, arg, str(args[arg])))
- def connect(self, access_key_id=None, secret_access_key=None, **kwargs):
- """
- Opens a connection to appropriate provider, depending on provider
- portion of URI. Requires Credentials defined in boto config file (see
- boto/pyami/config.py).
- @type storage_uri: StorageUri
- @param storage_uri: StorageUri specifying a bucket or a bucket+object
- @rtype: L{AWSAuthConnection<boto.gs.connection.AWSAuthConnection>}
- @return: A connection to storage service provider of the given URI.
- """
- connection_args = dict(self.connection_args or ())
- if (hasattr(self, 'suppress_consec_slashes') and
- 'suppress_consec_slashes' not in connection_args):
- connection_args['suppress_consec_slashes'] = (
- self.suppress_consec_slashes)
- connection_args.update(kwargs)
- if not self.connection:
- if self.scheme in self.provider_pool:
- self.connection = self.provider_pool[self.scheme]
- elif self.scheme == 's3':
- from boto.s3.connection import S3Connection
- self.connection = S3Connection(access_key_id,
- secret_access_key,
- **connection_args)
- self.provider_pool[self.scheme] = self.connection
- elif self.scheme == 'gs':
- from boto.gs.connection import GSConnection
- # Use OrdinaryCallingFormat instead of boto-default
- # SubdomainCallingFormat because the latter changes the hostname
- # that's checked during cert validation for HTTPS connections,
- # which will fail cert validation (when cert validation is
- # enabled).
- #
- # The same is not true for S3's HTTPS certificates. In fact,
- # we don't want to do this for S3 because S3 requires the
- # subdomain to match the location of the bucket. If the proper
- # subdomain is not used, the server will return a 301 redirect
- # with no Location header.
- #
- # Note: the following import can't be moved up to the
- # start of this file else it causes a config import failure when
- # run from the resumable upload/download tests.
- from boto.s3.connection import OrdinaryCallingFormat
- connection_args['calling_format'] = OrdinaryCallingFormat()
- self.connection = GSConnection(access_key_id,
- secret_access_key,
- **connection_args)
- self.provider_pool[self.scheme] = self.connection
- elif self.scheme == 'file':
- from boto.file.connection import FileConnection
- self.connection = FileConnection(self)
- else:
- raise InvalidUriError('Unrecognized scheme "%s"' %
- self.scheme)
- self.connection.debug = self.debug
- return self.connection
- def has_version(self):
- return (issubclass(type(self), BucketStorageUri)
- and ((self.version_id is not None)
- or (self.generation is not None)))
- def delete_key(self, validate=False, headers=None, version_id=None,
- mfa_token=None):
- self._check_object_uri('delete_key')
- bucket = self.get_bucket(validate, headers)
- return bucket.delete_key(self.object_name, headers, version_id,
- mfa_token)
- def list_bucket(self, prefix='', delimiter='', headers=None,
- all_versions=False):
- self._check_bucket_uri('list_bucket')
- bucket = self.get_bucket(headers=headers)
- if all_versions:
- return (v for v in bucket.list_versions(
- prefix=prefix, delimiter=delimiter, headers=headers)
- if not isinstance(v, DeleteMarker))
- else:
- return bucket.list(prefix=prefix, delimiter=delimiter,
- headers=headers)
- def get_all_keys(self, validate=False, headers=None, prefix=None):
- bucket = self.get_bucket(validate, headers)
- return bucket.get_all_keys(headers)
- def get_bucket(self, validate=False, headers=None):
- self._check_bucket_uri('get_bucket')
- conn = self.connect()
- bucket = conn.get_bucket(self.bucket_name, validate, headers)
- self.check_response(bucket, 'bucket', self.uri)
- return bucket
- def get_key(self, validate=False, headers=None, version_id=None):
- self._check_object_uri('get_key')
- bucket = self.get_bucket(validate, headers)
- key = bucket.get_key(self.object_name, headers, version_id)
- self.check_response(key, 'key', self.uri)
- return key
- def new_key(self, validate=False, headers=None):
- self._check_object_uri('new_key')
- bucket = self.get_bucket(validate, headers)
- return bucket.new_key(self.object_name)
- def get_contents_to_stream(self, fp, headers=None, version_id=None):
- self._check_object_uri('get_key')
- self._warn_about_args('get_key', validate=False)
- key = self.get_key(None, headers)
- self.check_response(key, 'key', self.uri)
- return key.get_contents_to_file(fp, headers, version_id=version_id)
- def get_contents_to_file(self, fp, headers=None, cb=None, num_cb=10,
- torrent=False, version_id=None,
- res_download_handler=None, response_headers=None,
- hash_algs=None):
- self._check_object_uri('get_contents_to_file')
- key = self.get_key(None, headers)
- self.check_response(key, 'key', self.uri)
- if hash_algs:
- key.get_contents_to_file(fp, headers, cb, num_cb, torrent,
- version_id, res_download_handler,
- response_headers,
- hash_algs=hash_algs)
- else:
- key.get_contents_to_file(fp, headers, cb, num_cb, torrent,
- version_id, res_download_handler,
- response_headers)
- def get_contents_as_string(self, validate=False, headers=None, cb=None,
- num_cb=10, torrent=False, version_id=None):
- self._check_object_uri('get_contents_as_string')
- key = self.get_key(validate, headers)
- self.check_response(key, 'key', self.uri)
- return key.get_contents_as_string(headers, cb, num_cb, torrent,
- version_id)
- def acl_class(self):
- conn = self.connect()
- acl_class = conn.provider.acl_class
- self.check_response(acl_class, 'acl_class', self.uri)
- return acl_class
- def canned_acls(self):
- conn = self.connect()
- canned_acls = conn.provider.canned_acls
- self.check_response(canned_acls, 'canned_acls', self.uri)
- return canned_acls
- class BucketStorageUri(StorageUri):
- """
- StorageUri subclass that handles bucket storage providers.
- Callers should instantiate this class by calling boto.storage_uri().
- """
- delim = '/'
- capabilities = set([]) # A set of additional capabilities.
- def __init__(self, scheme, bucket_name=None, object_name=None,
- debug=0, connection_args=None, suppress_consec_slashes=True,
- version_id=None, generation=None, is_latest=False):
- """Instantiate a BucketStorageUri from scheme,bucket,object tuple.
- @type scheme: string
- @param scheme: URI scheme naming the storage provider (gs, s3, etc.)
- @type bucket_name: string
- @param bucket_name: bucket name
- @type object_name: string
- @param object_name: object name, excluding generation/version.
- @type debug: int
- @param debug: debug level to pass in to connection (range 0..2)
- @type connection_args: map
- @param connection_args: optional map containing args to be
- passed to {S3,GS}Connection constructor (e.g., to override
- https_connection_factory).
- @param suppress_consec_slashes: If provided, controls whether
- consecutive slashes will be suppressed in key paths.
- @param version_id: Object version id (S3-specific).
- @param generation: Object generation number (GCS-specific).
- @param is_latest: boolean indicating that a versioned object is the
- current version
- After instantiation the components are available in the following
- fields: scheme, bucket_name, object_name, version_id, generation,
- is_latest, versionless_uri, version_specific_uri, uri.
- Note: If instantiated without version info, the string representation
- for a URI stays versionless; similarly, if instantiated with version
- info, the string representation for a URI stays version-specific. If you
- call one of the uri.set_contents_from_xyz() methods, a specific object
- version will be created, and its version-specific URI string can be
- retrieved from version_specific_uri even if the URI was instantiated
- without version info.
- """
- self.scheme = scheme
- self.bucket_name = bucket_name
- self.object_name = object_name
- self.debug = debug
- if connection_args:
- self.connection_args = connection_args
- self.suppress_consec_slashes = suppress_consec_slashes
- self.version_id = version_id
- self.generation = generation and int(generation)
- self.is_latest = is_latest
- self.is_version_specific = bool(self.generation) or bool(version_id)
- self._build_uri_strings()
- def _build_uri_strings(self):
- if self.bucket_name and self.object_name:
- self.versionless_uri = '%s://%s/%s' % (self.scheme, self.bucket_name,
- self.object_name)
- if self.generation:
- self.version_specific_uri = '%s#%s' % (self.versionless_uri,
- self.generation)
- elif self.version_id:
- self.version_specific_uri = '%s#%s' % (
- self.versionless_uri, self.version_id)
- if self.is_version_specific:
- self.uri = self.version_specific_uri
- else:
- self.uri = self.versionless_uri
- elif self.bucket_name:
- self.uri = ('%s://%s/' % (self.scheme, self.bucket_name))
- else:
- self.uri = ('%s://' % self.scheme)
- def _update_from_key(self, key):
- self._update_from_values(
- getattr(key, 'version_id', None),
- getattr(key, 'generation', None),
- getattr(key, 'is_latest', None),
- getattr(key, 'md5', None))
- def _update_from_values(self, version_id, generation, is_latest, md5):
- self.version_id = version_id
- self.generation = generation
- self.is_latest = is_latest
- self._build_uri_strings()
- self.md5 = md5
- def get_key(self, validate=False, headers=None, version_id=None):
- self._check_object_uri('get_key')
- bucket = self.get_bucket(validate, headers)
- if self.get_provider().name == 'aws':
- key = bucket.get_key(self.object_name, headers,
- version_id=(version_id or self.version_id))
- elif self.get_provider().name == 'google':
- key = bucket.get_key(self.object_name, headers,
- generation=self.generation)
- self.check_response(key, 'key', self.uri)
- return key
- def delete_key(self, validate=False, headers=None, version_id=None,
- mfa_token=None):
- self._check_object_uri('delete_key')
- bucket = self.get_bucket(validate, headers)
- if self.get_provider().name == 'aws':
- version_id = version_id or self.version_id
- return bucket.delete_key(self.object_name, headers, version_id,
- mfa_token)
- elif self.get_provider().name == 'google':
- return bucket.delete_key(self.object_name, headers,
- generation=self.generation)
- def clone_replace_name(self, new_name):
- """Instantiate a BucketStorageUri from the current BucketStorageUri,
- but replacing the object_name.
- @type new_name: string
- @param new_name: new object name
- """
- self._check_bucket_uri('clone_replace_name')
- return BucketStorageUri(
- self.scheme, bucket_name=self.bucket_name, object_name=new_name,
- debug=self.debug,
- suppress_consec_slashes=self.suppress_consec_slashes)
- def clone_replace_key(self, key):
- """Instantiate a BucketStorageUri from the current BucketStorageUri, by
- replacing the object name with the object name and other metadata found
- in the given Key object (including generation).
- @type key: Key
- @param key: key for the new StorageUri to represent
- """
- self._check_bucket_uri('clone_replace_key')
- version_id = None
- generation = None
- is_latest = False
- if hasattr(key, 'version_id'):
- version_id = key.version_id
- if hasattr(key, 'generation'):
- generation = key.generation
- if hasattr(key, 'is_latest'):
- is_latest = key.is_latest
- return BucketStorageUri(
- key.provider.get_provider_name(),
- bucket_name=key.bucket.name,
- object_name=key.name,
- debug=self.debug,
- suppress_consec_slashes=self.suppress_consec_slashes,
- version_id=version_id,
- generation=generation,
- is_latest=is_latest)
- def get_acl(self, validate=False, headers=None, version_id=None):
- """returns a bucket's acl"""
- self._check_bucket_uri('get_acl')
- bucket = self.get_bucket(validate, headers)
- # This works for both bucket- and object- level ACLs (former passes
- # key_name=None):
- key_name = self.object_name or ''
- if self.get_provider().name == 'aws':
- version_id = version_id or self.version_id
- acl = bucket.get_acl(key_name, headers, version_id)
- else:
- acl = bucket.get_acl(key_name, headers, generation=self.generation)
- self.check_response(acl, 'acl', self.uri)
- return acl
- def get_def_acl(self, validate=False, headers=None):
- """returns a bucket's default object acl"""
- self._check_bucket_uri('get_def_acl')
- bucket = self.get_bucket(validate, headers)
- acl = bucket.get_def_acl(headers)
- self.check_response(acl, 'acl', self.uri)
- return acl
- def get_cors(self, validate=False, headers=None):
- """returns a bucket's CORS XML"""
- self._check_bucket_uri('get_cors')
- bucket = self.get_bucket(validate, headers)
- cors = bucket.get_cors(headers)
- self.check_response(cors, 'cors', self.uri)
- return cors
- def set_cors(self, cors, validate=False, headers=None):
- """sets or updates a bucket's CORS XML"""
- self._check_bucket_uri('set_cors ')
- bucket = self.get_bucket(validate, headers)
- if self.scheme == 's3':
- bucket.set_cors(cors, headers)
- else:
- bucket.set_cors(cors.to_xml(), headers)
- def get_location(self, validate=False, headers=None):
- self._check_bucket_uri('get_location')
- bucket = self.get_bucket(validate, headers)
- return bucket.get_location(headers)
- def get_storage_class(self, validate=False, headers=None):
- self._check_bucket_uri('get_storage_class')
- # StorageClass is defined as a bucket and object param for GCS, but
- # only as a key param for S3.
- if self.scheme != 'gs':
- raise ValueError('get_storage_class() not supported for %s '
- 'URIs.' % self.scheme)
- bucket = self.get_bucket(validate, headers)
- return bucket.get_storage_class(headers)
- def set_storage_class(self, storage_class, validate=False, headers=None):
- """Updates a bucket's storage class."""
- self._check_bucket_uri('set_storage_class')
- # StorageClass is defined as a bucket and object param for GCS, but
- # only as a key param for S3.
- if self.scheme != 'gs':
- raise ValueError('set_storage_class() not supported for %s '
- 'URIs.' % self.scheme)
- bucket = self.get_bucket(validate, headers)
- bucket.set_storage_class(storage_class, headers)
- def get_subresource(self, subresource, validate=False, headers=None,
- version_id=None):
- self._check_bucket_uri('get_subresource')
- bucket = self.get_bucket(validate, headers)
- return bucket.get_subresource(subresource, self.object_name, headers,
- version_id)
- def add_group_email_grant(self, permission, email_address, recursive=False,
- validate=False, headers=None):
- self._check_bucket_uri('add_group_email_grant')
- if self.scheme != 'gs':
- raise ValueError('add_group_email_grant() not supported for %s '
- 'URIs.' % self.scheme)
- if self.object_name:
- if recursive:
- raise ValueError('add_group_email_grant() on key-ful URI cannot '
- 'specify recursive=True')
- key = self.get_key(validate, headers)
- self.check_response(key, 'key', self.uri)
- key.add_group_email_grant(permission, email_address, headers)
- elif self.bucket_name:
- bucket = self.get_bucket(validate, headers)
- bucket.add_group_email_grant(permission, email_address, recursive,
- headers)
- else:
- raise InvalidUriError('add_group_email_grant() on bucket-less URI '
- '%s' % self.uri)
- def add_email_grant(self, permission, email_address, recursive=False,
- validate=False, headers=None):
- self._check_bucket_uri('add_email_grant')
- if not self.object_name:
- bucket = self.get_bucket(validate, headers)
- bucket.add_email_grant(permission, email_address, recursive,
- headers)
- else:
- key = self.get_key(validate, headers)
- self.check_response(key, 'key', self.uri)
- key.add_email_grant(permission, email_address)
- def add_user_grant(self, permission, user_id, recursive=False,
- validate=False, headers=None):
- self._check_bucket_uri('add_user_grant')
- if not self.object_name:
- bucket = self.get_bucket(validate, headers)
- bucket.add_user_grant(permission, user_id, recursive, headers)
- else:
- key = self.get_key(validate, headers)
- self.check_response(key, 'key', self.uri)
- key.add_user_grant(permission, user_id)
- def list_grants(self, headers=None):
- self._check_bucket_uri('list_grants ')
- bucket = self.get_bucket(headers)
- return bucket.list_grants(headers)
- def is_file_uri(self):
- """Returns True if this URI names a file or directory."""
- return False
- def is_cloud_uri(self):
- """Returns True if this URI names a bucket or object."""
- return True
- def names_container(self):
- """
- Returns True if this URI names a directory or bucket. Will return
- False for bucket subdirs; providing bucket subdir semantics needs to
- be done by the caller (like gsutil does).
- """
- return bool(not self.object_name)
- def names_singleton(self):
- """Returns True if this URI names a file or object."""
- return bool(self.object_name)
- def names_directory(self):
- """Returns True if this URI names a directory."""
- return False
- def names_provider(self):
- """Returns True if this URI names a provider."""
- return bool(not self.bucket_name)
- def names_bucket(self):
- """Returns True if this URI names a bucket."""
- return bool(self.bucket_name) and bool(not self.object_name)
- def names_file(self):
- """Returns True if this URI names a file."""
- return False
- def names_object(self):
- """Returns True if this URI names an object."""
- return self.names_singleton()
- def is_stream(self):
- """Returns True if this URI represents input/output stream."""
- return False
- def create_bucket(self, headers=None, location='', policy=None,
- storage_class=None):
- self._check_bucket_uri('create_bucket ')
- conn = self.connect()
- # Pass storage_class param only if this is a GCS bucket. (In S3 the
- # storage class is specified on the key object.)
- if self.scheme == 'gs':
- return conn.create_bucket(self.bucket_name, headers, location, policy,
- storage_class)
- else:
- return conn.create_bucket(self.bucket_name, headers, location, policy)
- def delete_bucket(self, headers=None):
- self._check_bucket_uri('delete_bucket')
- conn = self.connect()
- return conn.delete_bucket(self.bucket_name, headers)
- def get_all_buckets(self, headers=None):
- conn = self.connect()
- return conn.get_all_buckets(headers)
- def get_provider(self):
- conn = self.connect()
- provider = conn.provider
- self.check_response(provider, 'provider', self.uri)
- return provider
- def set_acl(self, acl_or_str, key_name='', validate=False, headers=None,
- version_id=None, if_generation=None, if_metageneration=None):
- """Sets or updates a bucket's ACL."""
- self._check_bucket_uri('set_acl')
- key_name = key_name or self.object_name or ''
- bucket = self.get_bucket(validate, headers)
- if self.generation:
- bucket.set_acl(
- acl_or_str, key_name, headers, generation=self.generation,
- if_generation=if_generation, if_metageneration=if_metageneration)
- else:
- version_id = version_id or self.version_id
- bucket.set_acl(acl_or_str, key_name, headers, version_id)
- def set_xml_acl(self, xmlstring, key_name='', validate=False, headers=None,
- version_id=None, if_generation=None, if_metageneration=None):
- """Sets or updates a bucket's ACL with an XML string."""
- self._check_bucket_uri('set_xml_acl')
- key_name = key_name or self.object_name or ''
- bucket = self.get_bucket(validate, headers)
- if self.generation:
- bucket.set_xml_acl(
- xmlstring, key_name, headers, generation=self.generation,
- if_generation=if_generation, if_metageneration=if_metageneration)
- else:
- version_id = version_id or self.version_id
- bucket.set_xml_acl(xmlstring, key_name, headers,
- version_id=version_id)
- def set_def_xml_acl(self, xmlstring, validate=False, headers=None):
- """Sets or updates a bucket's default object ACL with an XML string."""
- self._check_bucket_uri('set_def_xml_acl')
- self.get_bucket(validate, headers).set_def_xml_acl(xmlstring, headers)
- def set_def_acl(self, acl_or_str, validate=False, headers=None,
- version_id=None):
- """Sets or updates a bucket's default object ACL."""
- self._check_bucket_uri('set_def_acl')
- self.get_bucket(validate, headers).set_def_acl(acl_or_str, headers)
- def set_canned_acl(self, acl_str, validate=False, headers=None,
- version_id=None):
- """Sets or updates a bucket's acl to a predefined (canned) value."""
- self._check_object_uri('set_canned_acl')
- self._warn_about_args('set_canned_acl', version_id=version_id)
- key = self.get_key(validate, headers)
- self.check_response(key, 'key', self.uri)
- key.set_canned_acl(acl_str, headers)
- def set_def_canned_acl(self, acl_str, validate=False, headers=None,
- version_id=None):
- """Sets or updates a bucket's default object acl to a predefined
- (canned) value."""
- self._check_bucket_uri('set_def_canned_acl ')
- key = self.get_key(validate, headers)
- self.check_response(key, 'key', self.uri)
- key.set_def_canned_acl(acl_str, headers, version_id)
- def set_subresource(self, subresource, value, validate=False, headers=None,
- version_id=None):
- self._check_bucket_uri('set_subresource')
- bucket = self.get_bucket(validate, headers)
- bucket.set_subresource(subresource, value, self.object_name, headers,
- version_id)
- def set_contents_from_string(self, s, headers=None, replace=True,
- cb=None, num_cb=10, policy=None, md5=None,
- reduced_redundancy=False):
- self._check_object_uri('set_contents_from_string')
- key = self.new_key(headers=headers)
- if self.scheme == 'gs':
- if reduced_redundancy:
- sys.stderr.write('Warning: GCS does not support '
- 'reduced_redundancy; argument ignored by '
- 'set_contents_from_string')
- result = key.set_contents_from_string(
- s, headers, replace, cb, num_cb, policy, md5)
- else:
- result = key.set_contents_from_string(
- s, headers, replace, cb, num_cb, policy, md5,
- reduced_redundancy)
- self._update_from_key(key)
- return result
- def set_contents_from_file(self, fp, headers=None, replace=True, cb=None,
- num_cb=10, policy=None, md5=None, size=None,
- rewind=False, res_upload_handler=None):
- self._check_object_uri('set_contents_from_file')
- key = self.new_key(headers=headers)
- if self.scheme == 'gs':
- result = key.set_contents_from_file(
- fp, headers, replace, cb, num_cb, policy, md5, size=size,
- rewind=rewind, res_upload_handler=res_upload_handler)
- if res_upload_handler:
- self._update_from_values(None, res_upload_handler.generation,
- None, md5)
- else:
- self._warn_about_args('set_contents_from_file',
- res_upload_handler=res_upload_handler)
- result = key.set_contents_from_file(
- fp, headers, replace, cb, num_cb, policy, md5, size=size,
- rewind=rewind)
- self._update_from_key(key)
- return result
- def set_contents_from_stream(self, fp, headers=None, replace=True, cb=None,
- policy=None, reduced_redundancy=False):
- self._check_object_uri('set_contents_from_stream')
- dst_key = self.new_key(False, headers)
- result = dst_key.set_contents_from_stream(
- fp, headers, replace, cb, policy=policy,
- reduced_redundancy=reduced_redundancy)
- self._update_from_key(dst_key)
- return result
- def copy_key(self, src_bucket_name, src_key_name, metadata=None,
- src_version_id=None, storage_class='STANDARD',
- preserve_acl=False, encrypt_key=False, headers=None,
- query_args=None, src_generation=None):
- """Returns newly created key."""
- self._check_object_uri('copy_key')
- dst_bucket = self.get_bucket(validate=False, headers=headers)
- if src_generation:
- return dst_bucket.copy_key(
- new_key_name=self.object_name,
- src_bucket_name=src_bucket_name,
- src_key_name=src_key_name, metadata=metadata,
- storage_class=storage_class, preserve_acl=preserve_acl,
- encrypt_key=encrypt_key, headers=headers, query_args=query_args,
- src_generation=src_generation)
- else:
- return dst_bucket.copy_key(
- new_key_name=self.object_name,
- src_bucket_name=src_bucket_name, src_key_name=src_key_name,
- metadata=metadata, src_version_id=src_version_id,
- storage_class=storage_class, preserve_acl=preserve_acl,
- encrypt_key=encrypt_key, headers=headers, query_args=query_args)
- def enable_logging(self, target_bucket, target_prefix=None, validate=False,
- headers=None, version_id=None):
- self._check_bucket_uri('enable_logging')
- bucket = self.get_bucket(validate, headers)
- bucket.enable_logging(target_bucket, target_prefix, headers=headers)
- def disable_logging(self, validate=False, headers=None, version_id=None):
- self._check_bucket_uri('disable_logging')
- bucket = self.get_bucket(validate, headers)
- bucket.disable_logging(headers=headers)
- def get_logging_config(self, validate=False, headers=None, version_id=None):
- self._check_bucket_uri('get_logging_config')
- bucket = self.get_bucket(validate, headers)
- return bucket.get_logging_config(headers=headers)
- def set_website_config(self, main_page_suffix=None, error_key=None,
- validate=False, headers=None):
- self._check_bucket_uri('set_website_config')
- bucket = self.get_bucket(validate, headers)
- if not (main_page_suffix or error_key):
- bucket.delete_website_configuration(headers)
- else:
- bucket.configure_website(main_page_suffix, error_key, headers)
- def get_website_config(self, validate=False, headers=None):
- self._check_bucket_uri('get_website_config')
- bucket = self.get_bucket(validate, headers)
- return bucket.get_website_configuration(headers)
- def get_versioning_config(self, headers=None):
- self._check_bucket_uri('get_versioning_config')
- bucket = self.get_bucket(False, headers)
- return bucket.get_versioning_status(headers)
- def configure_versioning(self, enabled, headers=None):
- self._check_bucket_uri('configure_versioning')
- bucket = self.get_bucket(False, headers)
- return bucket.configure_versioning(enabled, headers)
- def set_metadata(self, metadata_plus, metadata_minus, preserve_acl,
- headers=None):
- return self.get_key(False).set_remote_metadata(metadata_plus,
- metadata_minus,
- preserve_acl,
- headers=headers)
- def compose(self, components, content_type=None, headers=None):
- self._check_object_uri('compose')
- component_keys = []
- for suri in components:
- component_keys.append(suri.new_key())
- component_keys[-1].generation = suri.generation
- self.generation = self.new_key().compose(
- component_keys, content_type=content_type, headers=headers)
- self._build_uri_strings()
- return self
- def get_lifecycle_config(self, validate=False, headers=None):
- """Returns a bucket's lifecycle configuration."""
- self._check_bucket_uri('get_lifecycle_config')
- bucket = self.get_bucket(validate, headers)
- lifecycle_config = bucket.get_lifecycle_config(headers)
- self.check_response(lifecycle_config, 'lifecycle', self.uri)
- return lifecycle_config
- def configure_lifecycle(self, lifecycle_config, validate=False,
- headers=None):
- """Sets or updates a bucket's lifecycle configuration."""
- self._check_bucket_uri('configure_lifecycle')
- bucket = self.get_bucket(validate, headers)
- bucket.configure_lifecycle(lifecycle_config, headers)
- def get_billing_config(self, headers=None):
- self._check_bucket_uri('get_billing_config')
- # billing is defined as a bucket param for GCS, but not for S3.
- if self.scheme != 'gs':
- raise ValueError('get_billing_config() not supported for %s '
- 'URIs.' % self.scheme)
- bucket = self.get_bucket(False, headers)
- return bucket.get_billing_config(headers)
- def configure_billing(self, requester_pays=False, validate=False,
- headers=None):
- """Sets or updates a bucket's billing configuration."""
- self._check_bucket_uri('configure_billing')
- # billing is defined as a bucket param for GCS, but not for S3.
- if self.scheme != 'gs':
- raise ValueError('configure_billing() not supported for %s '
- 'URIs.' % self.scheme)
- bucket = self.get_bucket(validate, headers)
- bucket.configure_billing(requester_pays=requester_pays, headers=headers)
- def get_encryption_config(self, validate=False, headers=None):
- """Returns a GCS bucket's encryption configuration."""
- self._check_bucket_uri('get_encryption_config')
- # EncryptionConfiguration is defined as a bucket param for GCS, but not
- # for S3.
- if self.scheme != 'gs':
- raise ValueError('get_encryption_config() not supported for %s '
- 'URIs.' % self.scheme)
- bucket = self.get_bucket(validate, headers)
- return bucket.get_encryption_config(headers=headers)
- def set_encryption_config(self, default_kms_key_name=None, validate=False,
- headers=None):
- """Sets a GCS bucket's encryption configuration."""
- self._check_bucket_uri('set_encryption_config')
- bucket = self.get_bucket(validate, headers)
- bucket.set_encryption_config(default_kms_key_name=default_kms_key_name,
- headers=headers)
- def exists(self, headers=None):
- """Returns True if the object exists or False if it doesn't"""
- if not self.object_name:
- raise InvalidUriError('exists on object-less URI (%s)' % self.uri)
- bucket = self.get_bucket(headers)
- key = bucket.get_key(self.object_name, headers=headers)
- return bool(key)
- class FileStorageUri(StorageUri):
- """
- StorageUri subclass that handles files in the local file system.
- Callers should instantiate this class by calling boto.storage_uri().
- See file/README about how we map StorageUri operations onto a file system.
- """
- delim = os.sep
- def __init__(self, object_name, debug, is_stream=False):
- """Instantiate a FileStorageUri from a path name.
- @type object_name: string
- @param object_name: object name
- @type debug: boolean
- @param debug: whether to enable debugging on this StorageUri
- After instantiation the components are available in the following
- fields: uri, scheme, bucket_name (always blank for this "anonymous"
- bucket), object_name.
- """
- self.scheme = 'file'
- self.bucket_name = ''
- self.object_name = object_name
- self.uri = 'file://' + object_name
- self.debug = debug
- self.stream = is_stream
- def clone_replace_name(self, new_name):
- """Instantiate a FileStorageUri from the current FileStorageUri,
- but replacing the object_name.
- @type new_name: string
- @param new_name: new object name
- """
- return FileStorageUri(new_name, self.debug, self.stream)
- def is_file_uri(self):
- """Returns True if this URI names a file or directory."""
- return True
- def is_cloud_uri(self):
- """Returns True if this URI names a bucket or object."""
- return False
- def names_container(self):
- """Returns True if this URI names a directory or bucket."""
- return self.names_directory()
- def names_singleton(self):
- """Returns True if this URI names a file (or stream) or object."""
- return not self.names_container()
- def names_directory(self):
- """Returns True if this URI names a directory."""
- if self.stream:
- return False
- return os.path.isdir(self.object_name)
- def names_provider(self):
- """Returns True if this URI names a provider."""
- return False
- def names_bucket(self):
- """Returns True if this URI names a bucket."""
- return False
- def names_file(self):
- """Returns True if this URI names a file."""
- return self.names_singleton()
- def names_object(self):
- """Returns True if this URI names an object."""
- return False
- def is_stream(self):
- """Returns True if this URI represents input/output stream.
- """
- return bool(self.stream)
- def close(self):
- """Closes the underlying file.
- """
- self.get_key().close()
- def exists(self, _headers_not_used=None):
- """Returns True if the file exists or False if it doesn't"""
- # The _headers_not_used parameter is ignored. It is only there to ensure
- # that this method's signature is identical to the exists method on the
- # BucketStorageUri class.
- return os.path.exists(self.object_name)
|