File manager - Edit - /opt/gsutil/gslib/thread_message.py
Back
# -*- coding: utf-8 -*- # Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Thread message classes. Messages are added to the status queue. """ from __future__ import absolute_import from __future__ import print_function from __future__ import division from __future__ import unicode_literals import os import threading from apitools.base.py.exceptions import Error as apitools_service_error from six.moves.http_client import error as six_service_error class StatusMessage(object): """General StatusMessage class. All Message classes inherit this StatusMessage class. """ def __init__(self, message_time, process_id=None, thread_id=None): """Creates a Message. Args: message_time: Time that this message was created (since Epoch). process_id: Process ID that produced this message (overridable for testing). thread_id: Thread ID that produced this message (overridable for testing). """ self.time = message_time self.process_id = process_id or os.getpid() self.thread_id = thread_id or threading.current_thread().ident def __str__(self): """Returns a string with a valid constructor for this message.""" return ( '%s(%s, process_id=%s, thread_id=%s)' % (self.__class__.__name__, self.time, self.process_id, self.thread_id)) class RetryableErrorMessage(StatusMessage): """Message class for retryable errors encountered by the JSON API. This class contains information about the retryable error encountered to report to analytics collection and to display in the UI. """ def __init__(self, exception, message_time, num_retries=0, total_wait_sec=0, process_id=None, thread_id=None): """Creates a RetryableErrorMessage. Args: exception: The retryable error that was thrown. message_time: Float representing when message was created (seconds since Epoch). num_retries: The number of retries consumed so far. total_wait_sec: The total amount of time waited so far in retrying. process_id: Process ID that produced this message (overridable for testing). thread_id: Thread ID that produced this message (overridable for testing). """ super(RetryableErrorMessage, self).__init__(message_time, process_id=process_id, thread_id=thread_id) self.error_type = exception.__class__.__name__ # The socket module error class names aren't descriptive enough, so we # make the error_type more specific. Standard Python uses the module name # 'socket' while PyPy uses '_socket' instead. if exception.__class__.__module__ in ('socket', '_socket'): self.error_type = 'Socket' + exception.__class__.__name__.capitalize() if (isinstance(exception, apitools_service_error) or isinstance(exception, six_service_error)): self.is_service_error = True else: self.is_service_error = False # The number of retries consumed to display to the user. self.num_retries = num_retries # The total amount of time waited on the request to display to the user. self.total_wait_sec = total_wait_sec def __str__(self): """Returns a string with a valid constructor for this message.""" return ('%s(%s(), num_retries=%s, total_wait_sec=%s, ' 'time=%s, process_id=%s, thread_id=%s)' % (self.__class__.__name__, self.error_type, self.num_retries, self.total_wait_sec, self.time, self.process_id, self.thread_id)) class FinalMessage(StatusMessage): """Creates a FinalMessage. A FinalMessage simply indicates that we have finished our operation. """ def __init__(self, message_time): """Creates a FinalMessage. Args: message_time: Float representing when message was created (seconds since Epoch). """ super(FinalMessage, self).__init__(message_time) class MetadataMessage(StatusMessage): """Creates a MetadataMessage. A MetadataMessage simply indicates that a metadata operation on a given object has been successfully done. The only passed argument is the time when such operation has finished. """ def __init__(self, message_time): """Creates a MetadataMessage. Args: message_time: Float representing when message was created (seconds since Epoch). """ super(MetadataMessage, self).__init__(message_time) class FileMessage(StatusMessage): """Marks start or end of an operation for a file, cloud object or component. This class contains general information about each file/component. With that, information such as total size and estimated time remaining may be calculated beforehand. """ # Enum message types FILE_DOWNLOAD = 1 FILE_UPLOAD = 2 FILE_CLOUD_COPY = 3 FILE_LOCAL_COPY = 4 FILE_DAISY_COPY = 5 FILE_REWRITE = 6 FILE_HASH = 7 COMPONENT_TO_UPLOAD = 8 # EXISTING_COMPONENT describes a component that already exists. The field # finished does not apply quite well for it, but the convention used by the UI # is to process the component alongside FileMessages from other components # when finished==False, and to ignore a FileMessage made for # EXISTING_COMPONENT when finished==True. EXISTING_COMPONENT = 9 COMPONENT_TO_DOWNLOAD = 10 EXISTING_OBJECT_TO_DELETE = 11 def __init__(self, src_url, dst_url, message_time, size=None, finished=False, component_num=None, message_type=None, bytes_already_downloaded=None, process_id=None, thread_id=None): """Creates a FileMessage. Args: src_url: FileUrl/CloudUrl representing the source file. dst_url: FileUrl/CloudUrl representing the destination file. message_time: Float representing when message was created (seconds since Epoch). size: Total size of this file/component, in bytes. finished: Boolean to indicate whether this is starting or finishing a file/component transfer. component_num: Component number, if dealing with a component. message_type: Type of the file/component. bytes_already_downloaded: Specific field for resuming downloads. When starting a component download, it tells how many bytes were already downloaded. process_id: Process ID that produced this message (overridable for testing). thread_id: Thread ID that produced this message (overridable for testing). """ super(FileMessage, self).__init__(message_time, process_id=process_id, thread_id=thread_id) self.src_url = src_url self.dst_url = dst_url self.size = size self.component_num = component_num self.finished = finished self.message_type = message_type self.bytes_already_downloaded = bytes_already_downloaded def __str__(self): """Returns a string with a valid constructor for this message.""" return ('%s(\'%s\', \'%s\', %s, size=%s, finished=%s, component_num=%s, ' 'message_type=%s, bytes_already_downloaded=%s, process_id=%s, ' 'thread_id=%s)' % (self.__class__.__name__, self.src_url, self.dst_url, self.time, self.size, self.finished, self.component_num, self.message_type, self.bytes_already_downloaded, self.process_id, self.thread_id)) class ProgressMessage(StatusMessage): """Message class for a file/object/component progress. This class contains specific information about the current progress of a file, cloud object or single component. """ def __init__(self, size, processed_bytes, src_url, message_time, dst_url=None, component_num=None, operation_name=None, process_id=None, thread_id=None): """Creates a ProgressMessage. Args: size: Integer for total size of this file/component, in bytes. processed_bytes: Integer for number of bytes already processed from that specific component, which means processed_bytes <= size. src_url: FileUrl/CloudUrl representing the source file. message_time: Float representing when message was created (seconds since Epoch). dst_url: FileUrl/CloudUrl representing the destination file, or None for unary operations like hashing. component_num: Indicates the component number, if any. operation_name: Name of the operation that is being made over that component. process_id: Process ID that produced this message (overridable for testing). thread_id: Thread ID that produced this message (overridable for testing). """ super(ProgressMessage, self).__init__(message_time) self.size = size self.processed_bytes = processed_bytes self.component_num = component_num self.src_url = src_url self.dst_url = dst_url self.finished = (size == processed_bytes) self.operation_name = operation_name def __str__(self): """Returns a string with a valid constructor for this message.""" # For a valid constructor, None should not be quoted. dst_url_string = ('\'%s\'' % self.dst_url) if self.dst_url else None operation_name_string = ( '\'%s\'' % self.operation_name) if self.operation_name else None return ('%s(%s, %s, \'%s\', %s, dst_url=%s, component_num=%s, ' 'operation_name=%s, process_id=%s, thread_id=%s)' % (self.__class__.__name__, self.size, self.processed_bytes, self.src_url, self.time, dst_url_string, self.component_num, operation_name_string, self.process_id, self.thread_id)) class SeekAheadMessage(StatusMessage): """Message class for results obtained by SeekAheadThread(). It estimates the number of objects and total size in case the task_queue cannot hold all tasks at once (only used in large operations). This class contains information about all the objects yet to be processed. """ def __init__(self, num_objects, size, message_time): """Creates a SeekAheadMessage. Args: num_objects: Number of total objects that the SeekAheadThread estimates. size: Total size corresponding to the sum of the size of objects iterated by SeekAheadThread. message_time: Float representing when message was created (seconds since Epoch). """ super(SeekAheadMessage, self).__init__(message_time) self.num_objects = num_objects self.size = size def __str__(self): """Returns a string with a valid constructor for this message.""" return ('%s(%s, %s, %s, process_id=%s, thread_id=%s)' % (self.__class__.__name__, self.num_objects, self.size, self.time, self.process_id, self.thread_id)) class ProducerThreadMessage(StatusMessage): """Message class for results obtained by calculations made on ProducerThread. It estimates the number of objects and total size currently dealty by task_queue. If the task_queue cannot support all objects at once, the SeekAheadThread will be responsible for sending an accurate message. """ def __init__(self, num_objects, size, message_time, finished=False): """Creates a SeekAheadMessage. Args: num_objects: Number of total objects that the task_queue has. size: Total size corresponding to the sum of the size of objects iterated by the task_queue message_time: Float representing when message was created (seconds since Epoch). finished: Boolean to indicate whether this is the final message from the ProducerThread. The difference is that this message displays the correct total size and number of objects, whereas the previous ones were periodic (on the number of files) updates. """ super(ProducerThreadMessage, self).__init__(message_time) self.num_objects = num_objects self.size = size self.finished = finished def __str__(self): """Returns a string with a valid constructor for this message.""" return ('%s(%s, %s, %s, finished=%s)' % (self.__class__.__name__, self.num_objects, self.size, self.time, self.finished)) class PerformanceSummaryMessage(StatusMessage): """Message class to log PerformanceSummary parameters. This class acts as a relay between a multiprocess/multithread situation and the global status queue, from which the PerformanceSummary info gets consumed. """ def __init__(self, message_time, uses_slice): """Creates a PerformanceSummaryMessage. Args: message_time: Float representing when message was created (seconds since Epoch). uses_slice: True if the command uses slice parallelism. """ super(PerformanceSummaryMessage, self).__init__(message_time, process_id=None, thread_id=None) self.uses_slice = uses_slice def __str__(self): """Returns a string with a valid constructor for this message.""" return ('%s(%s, %s)' % (self.__class__.__name__, self.time, self.uses_slice))
| ver. 1.4 |
Github
|
.
| PHP 8.2.28 | Generation time: 0.02 |
proxy
|
phpinfo
|
Settings