File manager - Edit - /opt/gsutil/third_party/apitools/apitools/gen/util.py
Back
#!/usr/bin/env python # # Copyright 2015 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Assorted utilities shared between parts of apitools.""" from __future__ import print_function from __future__ import unicode_literals import collections import contextlib import gzip import json import keyword import logging import os import re import tempfile import six from six.moves import urllib_parse import six.moves.urllib.error as urllib_error import six.moves.urllib.request as urllib_request class Error(Exception): """Base error for apitools generation.""" class CommunicationError(Error): """Error in network communication.""" def _SortLengthFirstKey(a): return -len(a), a class Names(object): """Utility class for cleaning and normalizing names in a fixed style.""" DEFAULT_NAME_CONVENTION = 'LOWER_CAMEL' NAME_CONVENTIONS = ['LOWER_CAMEL', 'LOWER_WITH_UNDER', 'NONE'] def __init__(self, strip_prefixes, name_convention=None, capitalize_enums=False): self.__strip_prefixes = sorted(strip_prefixes, key=_SortLengthFirstKey) self.__name_convention = ( name_convention or self.DEFAULT_NAME_CONVENTION) self.__capitalize_enums = capitalize_enums @staticmethod def __FromCamel(name, separator='_'): name = re.sub(r'([a-z0-9])([A-Z])', r'\1%s\2' % separator, name) return name.lower() @staticmethod def __ToCamel(name, separator='_'): # TODO(craigcitro): Consider what to do about leading or trailing # underscores (such as `_refValue` in discovery). return ''.join(s[0:1].upper() + s[1:] for s in name.split(separator)) @staticmethod def __ToLowerCamel(name, separator='_'): name = Names.__ToCamel(name, separator=separator) return name[0].lower() + name[1:] def __StripName(self, name): """Strip strip_prefix entries from name.""" if not name: return name for prefix in self.__strip_prefixes: if name.startswith(prefix): return name[len(prefix):] return name @staticmethod def CleanName(name): """Perform generic name cleaning.""" name = re.sub('[^_A-Za-z0-9]', '_', name) if name[0].isdigit(): name = '_%s' % name while keyword.iskeyword(name) or name == 'exec': name = '%s_' % name # If we end up with __ as a prefix, we'll run afoul of python # field renaming, so we manually correct for it. if name.startswith('__'): name = 'f%s' % name return name @staticmethod def NormalizeRelativePath(path): """Normalize camelCase entries in path.""" path_components = path.split('/') normalized_components = [] for component in path_components: if re.match(r'{[A-Za-z0-9_]+}$', component): normalized_components.append( '{%s}' % Names.CleanName(component[1:-1])) else: normalized_components.append(component) return '/'.join(normalized_components) def NormalizeEnumName(self, enum_name): if self.__capitalize_enums: enum_name = enum_name.upper() return self.CleanName(enum_name) def ClassName(self, name, separator='_'): """Generate a valid class name from name.""" # TODO(craigcitro): Get rid of this case here and in MethodName. if name is None: return name # TODO(craigcitro): This is a hack to handle the case of specific # protorpc class names; clean this up. if name.startswith(('protorpc.', 'message_types.', 'apitools.base.protorpclite.', 'apitools.base.protorpclite.message_types.')): return name name = self.__StripName(name) name = self.__ToCamel(name, separator=separator) return self.CleanName(name) def MethodName(self, name, separator='_'): """Generate a valid method name from name.""" if name is None: return None name = Names.__ToCamel(name, separator=separator) return Names.CleanName(name) def FieldName(self, name): """Generate a valid field name from name.""" # TODO(craigcitro): We shouldn't need to strip this name, but some # of the service names here are excessive. Fix the API and then # remove this. name = self.__StripName(name) if self.__name_convention == 'LOWER_CAMEL': name = Names.__ToLowerCamel(name) elif self.__name_convention == 'LOWER_WITH_UNDER': name = Names.__FromCamel(name) return Names.CleanName(name) @contextlib.contextmanager def Chdir(dirname, create=True): if not os.path.exists(dirname): if not create: raise OSError('Cannot find directory %s' % dirname) else: os.mkdir(dirname) previous_directory = os.getcwd() try: os.chdir(dirname) yield finally: os.chdir(previous_directory) def NormalizeVersion(version): # Currently, '.' is the only character that might cause us trouble. return version.replace('.', '_') def _ComputePaths(package, version, root_url, service_path): """Compute the base url and base path. Attributes: package: name field of the discovery, i.e. 'storage' for storage service. version: version of the service, i.e. 'v1'. root_url: root url of the service, i.e. 'https://www.googleapis.com/'. service_path: path of the service under the rool url, i.e. 'storage/v1/'. Returns: base url: string, base url of the service, 'https://www.googleapis.com/storage/v1/' for the storage service. base path: string, common prefix of service endpoints after the base url. """ full_path = urllib_parse.urljoin(root_url, service_path) api_path_component = '/'.join((package, version, '')) if api_path_component not in full_path: return full_path, '' prefix, _, suffix = full_path.rpartition(api_path_component) return prefix + api_path_component, suffix class ClientInfo(collections.namedtuple('ClientInfo', ( 'package', 'scopes', 'version', 'client_id', 'client_secret', 'user_agent', 'client_class_name', 'url_version', 'api_key', 'base_url', 'base_path', 'mtls_base_url'))): """Container for client-related info and names.""" @classmethod def Create(cls, discovery_doc, scope_ls, client_id, client_secret, user_agent, names, api_key, version_identifier): """Create a new ClientInfo object from a discovery document.""" scopes = set( discovery_doc.get('auth', {}).get('oauth2', {}).get('scopes', {})) scopes.update(scope_ls) package = discovery_doc['name'] version = ( version_identifier or NormalizeVersion(discovery_doc['version'])) url_version = discovery_doc['version'] base_url, base_path = _ComputePaths(package, url_version, discovery_doc['rootUrl'], discovery_doc['servicePath']) mtls_root_url = discovery_doc.get('mtlsRootUrl', '') mtls_base_url = '' if mtls_root_url: mtls_base_url, _ = _ComputePaths(package, url_version, mtls_root_url, discovery_doc['servicePath']) client_info = { 'package': package, 'version': version, 'url_version': url_version, 'scopes': sorted(list(scopes)), 'client_id': client_id, 'client_secret': client_secret, 'user_agent': user_agent, 'api_key': api_key, 'base_url': base_url, 'base_path': base_path, 'mtls_base_url': mtls_base_url, } client_class_name = '%s%s' % ( names.ClassName(client_info['package']), names.ClassName(client_info['version'])) client_info['client_class_name'] = client_class_name return cls(**client_info) @property def default_directory(self): return self.package @property def client_rule_name(self): return '%s_%s_client' % (self.package, self.version) @property def client_file_name(self): return '%s.py' % self.client_rule_name @property def messages_rule_name(self): return '%s_%s_messages' % (self.package, self.version) @property def services_rule_name(self): return '%s_%s_services' % (self.package, self.version) @property def messages_file_name(self): return '%s.py' % self.messages_rule_name @property def messages_proto_file_name(self): return '%s.proto' % self.messages_rule_name @property def services_proto_file_name(self): return '%s.proto' % self.services_rule_name def ReplaceHomoglyphs(s): """Returns s with unicode homoglyphs replaced by ascii equivalents.""" homoglyphs = { '\xa0': ' ', # ? '\u00e3': '', # TODO(gsfowler) drop after .proto spurious char elided '\u00a0': ' ', # ? '\u00a9': '(C)', # COPYRIGHT SIGN (would you believe "asciiglyph"?) '\u00ae': '(R)', # REGISTERED SIGN (would you believe "asciiglyph"?) '\u2014': '-', # EM DASH '\u2018': "'", # LEFT SINGLE QUOTATION MARK '\u2019': "'", # RIGHT SINGLE QUOTATION MARK '\u201c': '"', # LEFT DOUBLE QUOTATION MARK '\u201d': '"', # RIGHT DOUBLE QUOTATION MARK '\u2026': '...', # HORIZONTAL ELLIPSIS '\u2e3a': '-', # TWO-EM DASH } def _ReplaceOne(c): """Returns the homoglyph or escaped replacement for c.""" equiv = homoglyphs.get(c) if equiv is not None: return equiv try: c.encode('ascii') return c except UnicodeError: pass try: return c.encode('unicode-escape').decode('ascii') except UnicodeError: return '?' return ''.join([_ReplaceOne(c) for c in s]) def CleanDescription(description): """Return a version of description safe for printing in a docstring.""" if not isinstance(description, six.string_types): return description if six.PY3: # https://docs.python.org/3/reference/lexical_analysis.html#index-18 description = description.replace('\\N', '\\\\N') description = description.replace('\\u', '\\\\u') description = description.replace('\\U', '\\\\U') description = ReplaceHomoglyphs(description) return description.replace('"""', '" " "') class SimplePrettyPrinter(object): """Simple pretty-printer that supports an indent contextmanager.""" def __init__(self, out): self.__out = out self.__indent = '' self.__skip = False self.__comment_context = False @property def indent(self): return self.__indent def CalculateWidth(self, max_width=78): return max_width - len(self.indent) @contextlib.contextmanager def Indent(self, indent=' '): previous_indent = self.__indent self.__indent = '%s%s' % (previous_indent, indent) yield self.__indent = previous_indent @contextlib.contextmanager def CommentContext(self): """Print without any argument formatting.""" old_context = self.__comment_context self.__comment_context = True yield self.__comment_context = old_context def __call__(self, *args): if self.__comment_context and args[1:]: raise Error('Cannot do string interpolation in comment context') if args and args[0]: if not self.__comment_context: line = (args[0] % args[1:]).rstrip() else: line = args[0].rstrip() line = ReplaceHomoglyphs(line) try: print('%s%s' % (self.__indent, line), file=self.__out) except UnicodeEncodeError: line = line.encode('ascii', 'backslashreplace').decode('ascii') print('%s%s' % (self.__indent, line), file=self.__out) else: print('', file=self.__out) def _NormalizeDiscoveryUrls(discovery_url): """Expands a few abbreviations into full discovery urls.""" if discovery_url.startswith('http'): return [discovery_url] elif '.' not in discovery_url: raise ValueError('Unrecognized value "%s" for discovery url') api_name, _, api_version = discovery_url.partition('.') return [ 'https://www.googleapis.com/discovery/v1/apis/%s/%s/rest' % ( api_name, api_version), 'https://%s.googleapis.com/$discovery/rest?version=%s' % ( api_name, api_version), ] def _Gunzip(gzipped_content): """Returns gunzipped content from gzipped contents.""" f = tempfile.NamedTemporaryFile(suffix='gz', mode='w+b', delete=False) try: f.write(gzipped_content) f.close() # force file synchronization with gzip.open(f.name, 'rb') as h: decompressed_content = h.read() return decompressed_content finally: os.unlink(f.name) def _GetURLContent(url): """Download and return the content of URL.""" response = urllib_request.urlopen(url) encoding = response.info().get('Content-Encoding') if encoding == 'gzip': content = _Gunzip(response.read()) else: content = response.read() return content def FetchDiscoveryDoc(discovery_url, retries=5): """Fetch the discovery document at the given url.""" discovery_urls = _NormalizeDiscoveryUrls(discovery_url) discovery_doc = None last_exception = None for url in discovery_urls: for _ in range(retries): try: content = _GetURLContent(url) if isinstance(content, bytes): content = content.decode('utf8') discovery_doc = json.loads(content) if discovery_doc: return discovery_doc except (urllib_error.HTTPError, urllib_error.URLError) as e: logging.info( 'Attempting to fetch discovery doc again after "%s"', e) last_exception = e if discovery_doc is None: raise CommunicationError( 'Could not find discovery doc at any of %s: %s' % ( discovery_urls, last_exception))
| ver. 1.4 |
Github
|
.
| PHP 8.2.28 | Generation time: 0.03 |
proxy
|
phpinfo
|
Settings