# Copyright (c) 2012 Amazon.com, Inc. or its affiliates. All Rights Reserved
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#
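"""Concurrent (multithreaded) upload and download support for Glacier.

The classes in this module place part offsets on a work queue, start a
pool of worker threads to crank through them, and collect the per-part
results from a result queue.
"""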

import os
import math
import threading
import hashlib
import time
import logging
import binascii

from boto.compat import Queue
from boto.glacier.utils import DEFAULT_PART_SIZE, minimum_part_size, \
    chunk_hashes, tree_hash, bytes_to_hex
from boto.glacier.exceptions import UploadArchiveError, \
    DownloadArchiveError, \
    TreeHashDoesNotMatchError

# Empty is raised by Queue.get(timeout=...) in TransferThread.run(); import
# it from the standard library so the except clause below can catch it.
try:
    from Queue import Empty  # Python 2
except ImportError:
    from queue import Empty  # Python 3


_END_SENTINEL = object()
log = logging.getLogger('boto.glacier.concurrent')


class ConcurrentTransferer(object):
    def __init__(self, part_size=DEFAULT_PART_SIZE, num_threads=10):
        self._part_size = part_size
        self._num_threads = num_threads
        self._threads = []

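    # Glacier limits a multipart upload to 10,000 parts, so the requested
    # part size may have to be raised for large archives.  For example, a
    # 100 GiB archive at the default 4 MiB part size would need ~25,600
    # parts; minimum_part_size() would bump the part size to 16 MiB (the
    # next valid power-of-two multiple of a megabyte), giving 6,400 parts.
    # (These sizes assume boto's 4 MiB DEFAULT_PART_SIZE.)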
    def _calculate_required_part_size(self, total_size):
        min_part_size_required = minimum_part_size(total_size)
        if self._part_size >= min_part_size_required:
            part_size = self._part_size
        else:
            part_size = min_part_size_required
            log.debug("The part size specified (%s) is smaller than "
                      "the minimum required part size. Using a part "
                      "size of: %s", self._part_size, part_size)
        total_parts = int(math.ceil(total_size / float(part_size)))
        return total_parts, part_size

    def _shutdown_threads(self):
        log.debug("Shutting down threads.")
        for thread in self._threads:
            thread.should_continue = False
        for thread in self._threads:
            thread.join()
        log.debug("Threads have exited.")

    def _add_work_items_to_queue(self, total_parts, worker_queue, part_size):
        log.debug("Adding work items to queue.")
        for i in range(total_parts):
            worker_queue.put((i, part_size))
        for i in range(self._num_threads):
            worker_queue.put(_END_SENTINEL)


class ConcurrentUploader(ConcurrentTransferer):
    """Concurrently upload an archive to glacier.

    This class uses a thread pool to concurrently upload an archive
    to glacier using the multipart upload API.

    The threadpool is completely managed by this class and is
    transparent to the users of this class.
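
    Example (illustrative sketch; the Layer1 setup and the vault/file
    names are assumptions about the caller's environment, not part of
    this class)::

        import boto.glacier.layer1
        from boto.glacier.concurrent import ConcurrentUploader

        api = boto.glacier.layer1.Layer1()  # credentials from the environment
        uploader = ConcurrentUploader(api, 'my-vault', num_threads=4)
        archive_id = uploader.upload('backup.tar.gz', 'nightly backup')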
- """
- def __init__(self, api, vault_name, part_size=DEFAULT_PART_SIZE,
- num_threads=10):
- """
- :type api: :class:`boto.glacier.layer1.Layer1`
- :param api: A layer1 glacier object.
- :type vault_name: str
- :param vault_name: The name of the vault.
- :type part_size: int
- :param part_size: The size, in bytes, of the chunks to use when uploading
- the archive parts. The part size must be a megabyte multiplied by
- a power of two.
- :type num_threads: int
- :param num_threads: The number of threads to spawn for the thread pool.
- The number of threads will control how much parts are being
- concurrently uploaded.
- """
- super(ConcurrentUploader, self).__init__(part_size, num_threads)
- self._api = api
- self._vault_name = vault_name

    def upload(self, filename, description=None):
        """Concurrently create an archive.

        The part_size value specified when the class was constructed
        will be used *unless* it is smaller than the minimum required
        part size needed for the size of the given file.  In that case,
        the part size used will be the minimum part size required
        to properly upload the given file.

        :type filename: str
        :param filename: The filename to upload.

        :type description: str
        :param description: The description of the archive.

        :rtype: str
        :return: The archive id of the newly created archive.

        """
        total_size = os.stat(filename).st_size
        total_parts, part_size = self._calculate_required_part_size(total_size)
        hash_chunks = [None] * total_parts
        worker_queue = Queue()
        result_queue = Queue()
        response = self._api.initiate_multipart_upload(self._vault_name,
                                                       part_size,
                                                       description)
        upload_id = response['UploadId']
        # The basic idea is to add the chunks (the offsets, not the actual
        # contents) to a work queue, start up a thread pool, let the threads
        # crank through the items in the work queue, and then place their
        # results in a result queue which we use to complete the multipart
        # upload.
        self._add_work_items_to_queue(total_parts, worker_queue, part_size)
        self._start_upload_threads(result_queue, upload_id,
                                   worker_queue, filename)
        try:
            self._wait_for_upload_threads(hash_chunks, result_queue,
                                          total_parts)
        except UploadArchiveError as e:
            log.debug("An error occurred while uploading an archive, "
                      "aborting multipart upload.")
            self._api.abort_multipart_upload(self._vault_name, upload_id)
            raise e
        log.debug("Completing upload.")
        response = self._api.complete_multipart_upload(
            self._vault_name, upload_id, bytes_to_hex(tree_hash(hash_chunks)),
            total_size)
        log.debug("Upload finished.")
        return response['ArchiveId']

    def _wait_for_upload_threads(self, hash_chunks, result_queue, total_parts):
        for _ in range(total_parts):
            result = result_queue.get()
            if isinstance(result, Exception):
                log.debug("An error was found in the result queue, "
                          "terminating threads: %s", result)
                self._shutdown_threads()
                raise UploadArchiveError("An error occurred while uploading "
                                         "an archive: %s" % result)
            # Each unit of work returns the tree hash for the given part
            # number, which we use at the end to compute the tree hash of
            # the entire archive.
            part_number, tree_sha256 = result
            hash_chunks[part_number] = tree_sha256
        self._shutdown_threads()

    def _start_upload_threads(self, result_queue, upload_id, worker_queue,
                              filename):
        log.debug("Starting threads.")
        for _ in range(self._num_threads):
            thread = UploadWorkerThread(self._api, self._vault_name, filename,
                                        upload_id, worker_queue, result_queue)
            time.sleep(0.2)
            thread.start()
            self._threads.append(thread)


class TransferThread(threading.Thread):
    def __init__(self, worker_queue, result_queue):
        super(TransferThread, self).__init__()
        self._worker_queue = worker_queue
        self._result_queue = result_queue
        # This value can be set externally by other objects
        # to indicate that the thread should be shut down.
        self.should_continue = True

    def run(self):
        while self.should_continue:
            try:
                work = self._worker_queue.get(timeout=1)
            except Empty:
                continue
            if work is _END_SENTINEL:
                self._cleanup()
                return
            result = self._process_chunk(work)
            self._result_queue.put(result)
        self._cleanup()

    def _process_chunk(self, work):
        pass

    def _cleanup(self):
        pass


class UploadWorkerThread(TransferThread):
    def __init__(self, api, vault_name, filename, upload_id,
                 worker_queue, result_queue, num_retries=5,
                 time_between_retries=5,
                 retry_exceptions=Exception):
        super(UploadWorkerThread, self).__init__(worker_queue, result_queue)
        self._api = api
        self._vault_name = vault_name
        self._filename = filename
        self._fileobj = open(filename, 'rb')
        self._upload_id = upload_id
        self._num_retries = num_retries
        self._time_between_retries = time_between_retries
        self._retry_exceptions = retry_exceptions

    def _process_chunk(self, work):
        result = None
        for i in range(self._num_retries + 1):
            try:
                result = self._upload_chunk(work)
                break
            except self._retry_exceptions as e:
                log.error("Exception caught uploading part number %s for "
                          "vault %s, attempt: (%s / %s), filename: %s, "
                          "exception: %s, msg: %s",
                          work[0], self._vault_name, i + 1,
                          self._num_retries + 1, self._filename,
                          e.__class__, e)
                time.sleep(self._time_between_retries)
                result = e
        # If every attempt failed, the last exception is returned and ends
        # up on the result queue, where the main thread treats it as a
        # fatal error for the transfer.
        return result

    def _upload_chunk(self, work):
        part_number, part_size = work
        start_byte = part_number * part_size
        self._fileobj.seek(start_byte)
        contents = self._fileobj.read(part_size)
        linear_hash = hashlib.sha256(contents).hexdigest()
        tree_hash_bytes = tree_hash(chunk_hashes(contents))
        byte_range = (start_byte, start_byte + len(contents) - 1)
        log.debug("Uploading chunk %s of size %s", part_number, part_size)
        response = self._api.upload_part(self._vault_name, self._upload_id,
                                         linear_hash,
                                         bytes_to_hex(tree_hash_bytes),
                                         byte_range, contents)
        # Reading the response allows the connection to be reused.
        response.read()
        return (part_number, tree_hash_bytes)

    def _cleanup(self):
        self._fileobj.close()


class ConcurrentDownloader(ConcurrentTransferer):
    """Concurrently download an archive from glacier.

    This class uses a thread pool to concurrently download an archive
    from glacier.

    The threadpool is completely managed by this class and is
    transparent to the users of this class.
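
    Example (illustrative sketch; the Layer2 setup, vault name, and the
    completed retrieval job id are assumptions about the caller's
    environment, not part of this class)::

        import boto.glacier.layer2
        from boto.glacier.concurrent import ConcurrentDownloader

        layer2 = boto.glacier.layer2.Layer2()
        vault = layer2.get_vault('my-vault')
        job = vault.get_job('my-retrieval-job-id')  # must already be completed
        downloader = ConcurrentDownloader(job, num_threads=4)
        downloader.download('restored-archive.tar.gz')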
- """
- def __init__(self, job, part_size=DEFAULT_PART_SIZE,
- num_threads=10):
- """
- :param job: A layer2 job object for archive retrieval object.
- :param part_size: The size, in bytes, of the chunks to use when uploading
- the archive parts. The part size must be a megabyte multiplied by
- a power of two.
- """
- super(ConcurrentDownloader, self).__init__(part_size, num_threads)
- self._job = job

    def download(self, filename):
        """Concurrently download an archive.

        :param filename: The filename to download the archive to
        :type filename: str

        """
        total_size = self._job.archive_size
        total_parts, part_size = self._calculate_required_part_size(total_size)
        worker_queue = Queue()
        result_queue = Queue()
        self._add_work_items_to_queue(total_parts, worker_queue, part_size)
        self._start_download_threads(result_queue, worker_queue)
        try:
            self._wait_for_download_threads(filename, result_queue, total_parts)
        except DownloadArchiveError as e:
            log.debug("An error occurred while downloading an archive: %s", e)
            raise e
        log.debug("Download completed.")

    def _wait_for_download_threads(self, filename, result_queue, total_parts):
        """Wait until the result queue contains all the downloaded parts.

        Each part is written to ``filename`` at its offset as it arrives,
        and the final tree hash of the archive is verified once every part
        has been received.

        :param filename: The file the downloaded parts are written to.
        :param result_queue: The queue the worker threads put results on.
        :param total_parts: The total number of parts expected.
        """
        hash_chunks = [None] * total_parts
        with open(filename, "wb") as f:
            for _ in range(total_parts):
                result = result_queue.get()
                if isinstance(result, Exception):
                    log.debug("An error was found in the result queue, "
                              "terminating threads: %s", result)
                    self._shutdown_threads()
                    raise DownloadArchiveError(
                        "An error occurred while downloading "
                        "an archive: %s" % result)
                part_number, part_size, actual_hash, data = result
                hash_chunks[part_number] = actual_hash
                start_byte = part_number * part_size
                f.seek(start_byte)
                f.write(data)
                f.flush()
        final_hash = bytes_to_hex(tree_hash(hash_chunks))
        log.debug("Verifying final tree hash of archive, expecting: %s, "
                  "actual: %s", self._job.sha256_treehash, final_hash)
        if self._job.sha256_treehash != final_hash:
            self._shutdown_threads()
            raise TreeHashDoesNotMatchError(
                "Tree hash for entire archive does not match, "
                "expected: %s, got: %s" % (self._job.sha256_treehash,
                                           final_hash))
        self._shutdown_threads()

    def _start_download_threads(self, result_queue, worker_queue):
        log.debug("Starting threads.")
        for _ in range(self._num_threads):
            thread = DownloadWorkerThread(self._job, worker_queue, result_queue)
            time.sleep(0.2)
            thread.start()
            self._threads.append(thread)


class DownloadWorkerThread(TransferThread):
    def __init__(self, job,
                 worker_queue, result_queue,
                 num_retries=5,
                 time_between_retries=5,
                 retry_exceptions=Exception):
        """Individual thread that downloads parts of the archive from Glacier.

        The parts to download are read from the worker queue, and the
        downloaded data is handed back on the result queue.

        :param job: Glacier job object.
        :param worker_queue: A queue of (part_number, part_size) tuples
            describing the parts to download.
        :param result_queue: The queue on which (part_number, part_size,
            tree_hash, data) tuples are placed for the downloaded parts.
        """
        super(DownloadWorkerThread, self).__init__(worker_queue, result_queue)
        self._job = job
        self._num_retries = num_retries
        self._time_between_retries = time_between_retries
        self._retry_exceptions = retry_exceptions

    def _process_chunk(self, work):
        """Attempt to download a part of the archive from Glacier.

        The result (or the last exception, if every retry fails) is
        returned so that the base class can place it on the result queue.

        :param work: A (part_number, part_size) tuple.
        """
        result = None
        for _ in range(self._num_retries):
            try:
                result = self._download_chunk(work)
                break
            except self._retry_exceptions as e:
                log.error("Exception caught downloading part number %s for "
                          "job %s: %s", work[0], self._job, e)
                time.sleep(self._time_between_retries)
                result = e
        return result

    def _download_chunk(self, work):
        """Download a chunk of the archive from Glacier.

        The part's tree hash is verified before the data is returned.

        :param work: A (part_number, part_size) tuple.

        :return: A (part_number, part_size, tree_hash, data) tuple, where
            tree_hash is the binary (non-hex) tree hash of the part.
        """
        part_number, part_size = work
        start_byte = part_number * part_size
        # Byte ranges are inclusive on both ends.
        byte_range = (start_byte, start_byte + part_size - 1)
        log.debug("Downloading chunk %s of size %s", part_number, part_size)
        response = self._job.get_output(byte_range)
        data = response.read()
        actual_hash = bytes_to_hex(tree_hash(chunk_hashes(data)))
        if response['TreeHash'] != actual_hash:
            raise TreeHashDoesNotMatchError(
                "Tree hash for part number %s does not match, "
                "expected: %s, got: %s" % (part_number, response['TreeHash'],
                                           actual_hash))
        return (part_number, part_size, binascii.unhexlify(actual_hash), data)