File manager - Edit - /home/u478019808/domains/bestandroidphones.store/public_html/static/img/logo/tests_async.zip
Back
PK �,�ZC�=� � test_credentials_async.pynu �[��� # Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime import pytest # type: ignore from google.auth import _credentials_async as credentials from google.auth import _helpers class CredentialsImpl(credentials.Credentials): def refresh(self, request): self.token = request def with_quota_project(self, quota_project_id): raise NotImplementedError() def test_credentials_constructor(): credentials = CredentialsImpl() assert not credentials.token assert not credentials.expiry assert not credentials.expired assert not credentials.valid def test_expired_and_valid(): credentials = CredentialsImpl() credentials.token = "token" assert credentials.valid assert not credentials.expired # Set the expiration to one second more than now plus the clock skew # accomodation. These credentials should be valid. credentials.expiry = ( _helpers.utcnow() + _helpers.REFRESH_THRESHOLD + datetime.timedelta(seconds=1) ) assert credentials.valid assert not credentials.expired # Set the credentials expiration to now. Because of the clock skew # accomodation, these credentials should report as expired. credentials.expiry = _helpers.utcnow() assert not credentials.valid assert credentials.expired @pytest.mark.asyncio async def test_before_request(): credentials = CredentialsImpl() request = "token" headers = {} # First call should call refresh, setting the token. await credentials.before_request(request, "http://example.com", "GET", headers) assert credentials.valid assert credentials.token == "token" assert headers["authorization"] == "Bearer token" request = "token2" headers = {} # Second call shouldn't call refresh. credentials.before_request(request, "http://example.com", "GET", headers) assert credentials.valid assert credentials.token == "token" def test_anonymous_credentials_ctor(): anon = credentials.AnonymousCredentials() assert anon.token is None assert anon.expiry is None assert not anon.expired assert anon.valid def test_anonymous_credentials_refresh(): anon = credentials.AnonymousCredentials() request = object() with pytest.raises(ValueError): anon.refresh(request) def test_anonymous_credentials_apply_default(): anon = credentials.AnonymousCredentials() headers = {} anon.apply(headers) assert headers == {} with pytest.raises(ValueError): anon.apply(headers, token="TOKEN") def test_anonymous_credentials_before_request(): anon = credentials.AnonymousCredentials() request = object() method = "GET" url = "https://example.com/api/endpoint" headers = {} anon.before_request(request, method, url, headers) assert headers == {} class ReadOnlyScopedCredentialsImpl(credentials.ReadOnlyScoped, CredentialsImpl): @property def requires_scopes(self): return super(ReadOnlyScopedCredentialsImpl, self).requires_scopes def test_readonly_scoped_credentials_constructor(): credentials = ReadOnlyScopedCredentialsImpl() assert credentials._scopes is None def test_readonly_scoped_credentials_scopes(): credentials = ReadOnlyScopedCredentialsImpl() credentials._scopes = ["one", "two"] assert credentials.scopes == ["one", "two"] assert credentials.has_scopes(["one"]) assert credentials.has_scopes(["two"]) assert credentials.has_scopes(["one", "two"]) assert not credentials.has_scopes(["three"]) def test_readonly_scoped_credentials_requires_scopes(): credentials = ReadOnlyScopedCredentialsImpl() assert not credentials.requires_scopes class RequiresScopedCredentialsImpl(credentials.Scoped, CredentialsImpl): def __init__(self, scopes=None): super(RequiresScopedCredentialsImpl, self).__init__() self._scopes = scopes @property def requires_scopes(self): return not self.scopes def with_scopes(self, scopes): return RequiresScopedCredentialsImpl(scopes=scopes) def test_create_scoped_if_required_scoped(): unscoped_credentials = RequiresScopedCredentialsImpl() scoped_credentials = credentials.with_scopes_if_required( unscoped_credentials, ["one", "two"] ) assert scoped_credentials is not unscoped_credentials assert not scoped_credentials.requires_scopes assert scoped_credentials.has_scopes(["one", "two"]) def test_create_scoped_if_required_not_scopes(): unscoped_credentials = CredentialsImpl() scoped_credentials = credentials.with_scopes_if_required( unscoped_credentials, ["one", "two"] ) assert scoped_credentials is unscoped_credentials PK �,�Z��� transport/async_compliance.pynu �[��� # Copyright 2020 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import http.client as http_client import time import flask # type: ignore import pytest # type: ignore from pytest_localserver.http import WSGIServer # type: ignore from google.auth import exceptions from tests.transport import compliance class RequestResponseTests(object): @pytest.fixture(scope="module") def server(self): """Provides a test HTTP server. The test server is automatically created before a test and destroyed at the end. The server is serving a test application that can be used to verify requests. """ app = flask.Flask(__name__) app.debug = True # pylint: disable=unused-variable # (pylint thinks the flask routes are unusued.) @app.route("/basic") def index(): header_value = flask.request.headers.get("x-test-header", "value") headers = {"X-Test-Header": header_value} return "Basic Content", http_client.OK, headers @app.route("/server_error") def server_error(): return "Error", http_client.INTERNAL_SERVER_ERROR @app.route("/wait") def wait(): time.sleep(3) return "Waited" # pylint: enable=unused-variable server = WSGIServer(application=app.wsgi_app) server.start() yield server server.stop() @pytest.mark.asyncio async def test_request_basic(self, server): request = self.make_request() response = await request(url=server.url + "/basic", method="GET") assert response.status == http_client.OK assert response.headers["x-test-header"] == "value" # Use 13 as this is the length of the data written into the stream. data = await response.data.read(13) assert data == b"Basic Content" @pytest.mark.asyncio async def test_request_basic_with_http(self, server): request = self.make_with_parameter_request() response = await request(url=server.url + "/basic", method="GET") assert response.status == http_client.OK assert response.headers["x-test-header"] == "value" # Use 13 as this is the length of the data written into the stream. data = await response.data.read(13) assert data == b"Basic Content" @pytest.mark.asyncio async def test_request_with_timeout_success(self, server): request = self.make_request() response = await request(url=server.url + "/basic", method="GET", timeout=2) assert response.status == http_client.OK assert response.headers["x-test-header"] == "value" data = await response.data.read(13) assert data == b"Basic Content" @pytest.mark.asyncio async def test_request_with_timeout_failure(self, server): request = self.make_request() with pytest.raises(exceptions.TransportError): await request(url=server.url + "/wait", method="GET", timeout=1) @pytest.mark.asyncio async def test_request_headers(self, server): request = self.make_request() response = await request( url=server.url + "/basic", method="GET", headers={"x-test-header": "hello world"}, ) assert response.status == http_client.OK assert response.headers["x-test-header"] == "hello world" data = await response.data.read(13) assert data == b"Basic Content" @pytest.mark.asyncio async def test_request_error(self, server): request = self.make_request() response = await request(url=server.url + "/server_error", method="GET") assert response.status == http_client.INTERNAL_SERVER_ERROR data = await response.data.read(5) assert data == b"Error" @pytest.mark.asyncio async def test_connection_error(self): request = self.make_request() with pytest.raises(exceptions.TransportError): await request(url="http://{}".format(compliance.NXDOMAIN), method="GET") PK �,�Z���Y% Y% "