File manager - Edit - /home/u478019808/domains/bestandroidphones.store/public_html/static/img/logo/with_dummyserver.tar
Back
test_poolmanager.py 0000644 00000060763 15025306245 0010477 0 ustar 00 from __future__ import annotations import gzip import typing from test import LONG_TIMEOUT from unittest import mock import pytest from dummyserver.socketserver import HAS_IPV6 from dummyserver.testcase import ( HypercornDummyServerTestCase, IPv6HypercornDummyServerTestCase, ) from urllib3 import HTTPHeaderDict, HTTPResponse, request from urllib3.connectionpool import port_by_scheme from urllib3.exceptions import MaxRetryError, URLSchemeUnknown from urllib3.poolmanager import PoolManager from urllib3.util.retry import Retry class TestPoolManager(HypercornDummyServerTestCase): @classmethod def setup_class(cls) -> None: super().setup_class() cls.base_url = f"http://{cls.host}:{cls.port}" cls.base_url_alt = f"http://{cls.host_alt}:{cls.port}" def test_redirect(self) -> None: with PoolManager() as http: r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url}/"}, redirect=False, ) assert r.status == 303 r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url}/"}, ) assert r.status == 200 assert r.data == b"Dummy server!" def test_redirect_twice(self) -> None: with PoolManager() as http: r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url}/redirect"}, redirect=False, ) assert r.status == 303 r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url}/redirect?target={self.base_url}/"}, ) assert r.status == 200 assert r.data == b"Dummy server!" def test_redirect_to_relative_url(self) -> None: with PoolManager() as http: r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": "/redirect"}, redirect=False, ) assert r.status == 303 r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": "/redirect"} ) assert r.status == 200 assert r.data == b"Dummy server!" def test_cross_host_redirect(self) -> None: with PoolManager() as http: cross_host_location = f"{self.base_url_alt}/echo?a=b" with pytest.raises(MaxRetryError): http.request( "GET", f"{self.base_url}/redirect", fields={"target": cross_host_location}, timeout=LONG_TIMEOUT, retries=0, ) r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url_alt}/echo?a=b"}, timeout=LONG_TIMEOUT, retries=1, ) assert isinstance(r, HTTPResponse) assert r._pool is not None assert r._pool.host == self.host_alt def test_too_many_redirects(self) -> None: with PoolManager() as http: with pytest.raises(MaxRetryError): http.request( "GET", f"{self.base_url}/redirect", fields={ "target": f"{self.base_url}/redirect?target={self.base_url}/" }, retries=1, preload_content=False, ) with pytest.raises(MaxRetryError): http.request( "GET", f"{self.base_url}/redirect", fields={ "target": f"{self.base_url}/redirect?target={self.base_url}/" }, retries=Retry(total=None, redirect=1), preload_content=False, ) # Even with preload_content=False and raise on redirects, we reused the same # connection assert len(http.pools) == 1 pool = http.connection_from_host(self.host, self.port) assert pool.num_connections == 1 def test_redirect_cross_host_remove_headers(self) -> None: with PoolManager() as http: r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url_alt}/headers"}, headers={ "Authorization": "foo", "Proxy-Authorization": "bar", "Cookie": "foo=bar", }, ) assert r.status == 200 data = r.json() assert "Authorization" not in data assert "Proxy-Authorization" not in data assert "Cookie" not in data r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url_alt}/headers"}, headers={ "authorization": "foo", "proxy-authorization": "baz", "cookie": "foo=bar", }, ) assert r.status == 200 data = r.json() assert "authorization" not in data assert "Authorization" not in data assert "proxy-authorization" not in data assert "Proxy-Authorization" not in data assert "cookie" not in data assert "Cookie" not in data def test_redirect_cross_host_no_remove_headers(self) -> None: with PoolManager() as http: r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url_alt}/headers"}, headers={ "Authorization": "foo", "Proxy-Authorization": "bar", "Cookie": "foo=bar", }, retries=Retry(remove_headers_on_redirect=[]), ) assert r.status == 200 data = r.json() assert data["Authorization"] == "foo" assert data["Proxy-Authorization"] == "bar" assert data["Cookie"] == "foo=bar" def test_redirect_cross_host_set_removed_headers(self) -> None: with PoolManager() as http: r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url_alt}/headers"}, headers={ "X-API-Secret": "foo", "Authorization": "bar", "Proxy-Authorization": "baz", "Cookie": "foo=bar", }, retries=Retry(remove_headers_on_redirect=["X-API-Secret"]), ) assert r.status == 200 data = r.json() assert "X-API-Secret" not in data assert data["Authorization"] == "bar" assert data["Proxy-Authorization"] == "baz" assert data["Cookie"] == "foo=bar" headers = { "x-api-secret": "foo", "authorization": "bar", "proxy-authorization": "baz", "cookie": "foo=bar", } r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url_alt}/headers"}, headers=headers, retries=Retry(remove_headers_on_redirect=["X-API-Secret"]), ) assert r.status == 200 data = r.json() assert "x-api-secret" not in data assert "X-API-Secret" not in data assert data["Authorization"] == "bar" assert data["Proxy-Authorization"] == "baz" assert data["Cookie"] == "foo=bar" # Ensure the header argument itself is not modified in-place. assert headers == { "x-api-secret": "foo", "authorization": "bar", "proxy-authorization": "baz", "cookie": "foo=bar", } def test_redirect_without_preload_releases_connection(self) -> None: with PoolManager(block=True, maxsize=2) as http: r = http.request("GET", f"{self.base_url}/redirect", preload_content=False) assert isinstance(r, HTTPResponse) assert r._pool is not None assert r._pool.num_requests == 2 assert r._pool.num_connections == 1 assert len(http.pools) == 1 def test_303_redirect_makes_request_lose_body(self) -> None: with PoolManager() as http: response = http.request( "POST", f"{self.base_url}/redirect", fields={ "target": f"{self.base_url}/headers_and_params", "status": "303 See Other", }, ) data = response.json() assert data["params"] == {} assert "Content-Type" not in HTTPHeaderDict(data["headers"]) def test_unknown_scheme(self) -> None: with PoolManager() as http: unknown_scheme = "unknown" unknown_scheme_url = f"{unknown_scheme}://host" with pytest.raises(URLSchemeUnknown) as e: r = http.request("GET", unknown_scheme_url) assert e.value.scheme == unknown_scheme r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": unknown_scheme_url}, redirect=False, ) assert r.status == 303 assert r.headers.get("Location") == unknown_scheme_url with pytest.raises(URLSchemeUnknown) as e: r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": unknown_scheme_url}, ) assert e.value.scheme == unknown_scheme def test_raise_on_redirect(self) -> None: with PoolManager() as http: r = http.request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url}/redirect?target={self.base_url}/"}, retries=Retry(total=None, redirect=1, raise_on_redirect=False), ) assert r.status == 303 def test_raise_on_status(self) -> None: with PoolManager() as http: with pytest.raises(MaxRetryError): # the default is to raise r = http.request( "GET", f"{self.base_url}/status", fields={"status": "500 Internal Server Error"}, retries=Retry(total=1, status_forcelist=range(500, 600)), ) with pytest.raises(MaxRetryError): # raise explicitly r = http.request( "GET", f"{self.base_url}/status", fields={"status": "500 Internal Server Error"}, retries=Retry( total=1, status_forcelist=range(500, 600), raise_on_status=True ), ) # don't raise r = http.request( "GET", f"{self.base_url}/status", fields={"status": "500 Internal Server Error"}, retries=Retry( total=1, status_forcelist=range(500, 600), raise_on_status=False ), ) assert r.status == 500 def test_missing_port(self) -> None: # Can a URL that lacks an explicit port like ':80' succeed, or # will all such URLs fail with an error? with PoolManager() as http: # By globally adjusting `port_by_scheme` we pretend for a moment # that HTTP's default port is not 80, but is the port at which # our test server happens to be listening. port_by_scheme["http"] = self.port try: r = http.request("GET", f"http://{self.host}/", retries=0) finally: port_by_scheme["http"] = 80 assert r.status == 200 assert r.data == b"Dummy server!" def test_headers(self) -> None: with PoolManager(headers={"Foo": "bar"}) as http: r = http.request("GET", f"{self.base_url}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" r = http.request("POST", f"{self.base_url}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" r = http.request_encode_url("GET", f"{self.base_url}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" r = http.request_encode_body("POST", f"{self.base_url}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" r = http.request_encode_url( "GET", f"{self.base_url}/headers", headers={"Baz": "quux"} ) returned_headers = r.json() assert returned_headers.get("Foo") is None assert returned_headers.get("Baz") == "quux" r = http.request_encode_body( "GET", f"{self.base_url}/headers", headers={"Baz": "quux"} ) returned_headers = r.json() assert returned_headers.get("Foo") is None assert returned_headers.get("Baz") == "quux" def test_headers_http_header_dict(self) -> None: # Test uses a list of headers to assert the order # that headers are sent in the request too. headers = HTTPHeaderDict() headers.add("Foo", "bar") headers.add("Multi", "1") headers.add("Baz", "quux") headers.add("Multi", "2") with PoolManager(headers=headers) as http: r = http.request("GET", f"{self.base_url}/multi_headers") returned_headers = r.json()["headers"] assert returned_headers[-4:] == [ ["Foo", "bar"], ["Multi", "1"], ["Multi", "2"], ["Baz", "quux"], ] r = http.request( "GET", f"{self.base_url}/multi_headers", headers={ **headers, "Extra": "extra", "Foo": "new", }, ) returned_headers = r.json()["headers"] assert returned_headers[-4:] == [ ["Foo", "new"], ["Multi", "1, 2"], ["Baz", "quux"], ["Extra", "extra"], ] def test_merge_headers_with_pool_manager_headers(self) -> None: headers = HTTPHeaderDict() headers.add("Cookie", "choc-chip") headers.add("Cookie", "oatmeal-raisin") orig = headers.copy() added_headers = {"Cookie": "tim-tam"} with PoolManager(headers=headers) as http: r = http.request( "GET", f"{self.base_url}/multi_headers", headers=typing.cast(HTTPHeaderDict, http.headers) | added_headers, ) returned_headers = r.json()["headers"] assert returned_headers[-3:] == [ ["Cookie", "choc-chip"], ["Cookie", "oatmeal-raisin"], ["Cookie", "tim-tam"], ] # make sure the pool headers weren't modified assert http.headers == orig def test_headers_http_multi_header_multipart(self) -> None: headers = HTTPHeaderDict() headers.add("Multi", "1") headers.add("Multi", "2") old_headers = headers.copy() with PoolManager(headers=headers) as http: r = http.request( "POST", f"{self.base_url}/multi_headers", fields={"k": "v"}, multipart_boundary="b", encode_multipart=True, ) returned_headers = r.json()["headers"] assert returned_headers[5:] == [ ["Multi", "1"], ["Multi", "2"], ["Content-Type", "multipart/form-data; boundary=b"], ] # Assert that the previous headers weren't modified. assert headers == old_headers # Set a default value for the Content-Type headers["Content-Type"] = "multipart/form-data; boundary=b; field=value" r = http.request( "POST", f"{self.base_url}/multi_headers", fields={"k": "v"}, multipart_boundary="b", encode_multipart=True, ) returned_headers = r.json()["headers"] assert returned_headers[5:] == [ ["Multi", "1"], ["Multi", "2"], # Uses the set value, not the one that would be generated. ["Content-Type", "multipart/form-data; boundary=b; field=value"], ] def test_body(self) -> None: with PoolManager() as http: r = http.request("POST", f"{self.base_url}/echo", body=b"test") assert r.data == b"test" def test_http_with_ssl_keywords(self) -> None: with PoolManager(ca_certs="REQUIRED") as http: r = http.request("GET", f"http://{self.host}:{self.port}/") assert r.status == 200 def test_http_with_server_hostname(self) -> None: with PoolManager(server_hostname="example.com") as http: r = http.request("GET", f"http://{self.host}:{self.port}/") assert r.status == 200 def test_http_with_ca_cert_dir(self) -> None: with PoolManager(ca_certs="REQUIRED", ca_cert_dir="/nosuchdir") as http: r = http.request("GET", f"http://{self.host}:{self.port}/") assert r.status == 200 @pytest.mark.parametrize( ["target", "expected_target"], [ # annoyingly quart.request.full_path adds a stray `?` ("/echo_uri", b"/echo_uri?"), ("/echo_uri?q=1#fragment", b"/echo_uri?q=1"), ("/echo_uri?#", b"/echo_uri?"), ("/echo_uri#!", b"/echo_uri?"), ("/echo_uri#!#", b"/echo_uri?"), ("/echo_uri??#", b"/echo_uri??"), ("/echo_uri?%3f#", b"/echo_uri?%3F"), ("/echo_uri?%3F#", b"/echo_uri?%3F"), ("/echo_uri?[]", b"/echo_uri?%5B%5D"), ], ) def test_encode_http_target(self, target: str, expected_target: bytes) -> None: with PoolManager() as http: url = f"http://{self.host}:{self.port}{target}" r = http.request("GET", url) assert r.data == expected_target def test_top_level_request(self) -> None: r = request("GET", f"{self.base_url}/") assert r.status == 200 assert r.data == b"Dummy server!" def test_top_level_request_without_keyword_args(self) -> None: body = "" with pytest.raises(TypeError): request("GET", f"{self.base_url}/", body) # type: ignore[misc] def test_top_level_request_with_body(self) -> None: r = request("POST", f"{self.base_url}/echo", body=b"test") assert r.status == 200 assert r.data == b"test" def test_top_level_request_with_preload_content(self) -> None: r = request("GET", f"{self.base_url}/echo", preload_content=False) assert r.status == 200 assert r.connection is not None r.data assert r.connection is None def test_top_level_request_with_decode_content(self) -> None: r = request( "GET", f"{self.base_url}/encodingrequest", headers={"accept-encoding": "gzip"}, decode_content=False, ) assert r.status == 200 assert gzip.decompress(r.data) == b"hello, world!" r = request( "GET", f"{self.base_url}/encodingrequest", headers={"accept-encoding": "gzip"}, decode_content=True, ) assert r.status == 200 assert r.data == b"hello, world!" def test_top_level_request_with_redirect(self) -> None: r = request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url}/"}, redirect=False, ) assert r.status == 303 r = request( "GET", f"{self.base_url}/redirect", fields={"target": f"{self.base_url}/"}, redirect=True, ) assert r.status == 200 assert r.data == b"Dummy server!" def test_top_level_request_with_retries(self) -> None: r = request("GET", f"{self.base_url}/redirect", retries=False) assert r.status == 303 r = request("GET", f"{self.base_url}/redirect", retries=3) assert r.status == 200 def test_top_level_request_with_timeout(self) -> None: with mock.patch("urllib3.poolmanager.RequestMethods.request") as mockRequest: mockRequest.return_value = HTTPResponse(status=200) r = request("GET", f"{self.base_url}/redirect", timeout=2.5) assert r.status == 200 mockRequest.assert_called_with( "GET", f"{self.base_url}/redirect", body=None, fields=None, headers=None, preload_content=True, decode_content=True, redirect=True, retries=None, timeout=2.5, json=None, ) @pytest.mark.parametrize( "headers", [ None, {"content-Type": "application/json"}, {"content-Type": "text/plain"}, {"attribute": "value", "CONTENT-TYPE": "application/json"}, HTTPHeaderDict(cookie="foo, bar"), ], ) def test_request_with_json(self, headers: HTTPHeaderDict) -> None: old_headers = None if headers is None else headers.copy() body = {"attribute": "value"} r = request( method="POST", url=f"{self.base_url}/echo_json", headers=headers, json=body ) assert r.status == 200 assert r.json() == body content_type = HTTPHeaderDict(old_headers).get( "Content-Type", "application/json" ) assert content_type in r.headers["Content-Type"].replace(" ", "").split(",") # Ensure the header argument itself is not modified in-place. assert headers == old_headers def test_top_level_request_with_json_with_httpheaderdict(self) -> None: body = {"attribute": "value"} header = HTTPHeaderDict(cookie="foo, bar") with PoolManager(headers=header) as http: r = http.request(method="POST", url=f"{self.base_url}/echo_json", json=body) assert r.status == 200 assert r.json() == body assert "application/json" in r.headers["Content-Type"].replace( " ", "" ).split(",") def test_top_level_request_with_body_and_json(self) -> None: match = "request got values for both 'body' and 'json' parameters which are mutually exclusive" with pytest.raises(TypeError, match=match): body = {"attribute": "value"} request(method="POST", url=f"{self.base_url}/echo", body="", json=body) def test_top_level_request_with_invalid_body(self) -> None: class BadBody: def __repr__(self) -> str: return "<BadBody>" with pytest.raises(TypeError) as e: request( method="POST", url=f"{self.base_url}/echo", body=BadBody(), # type: ignore[arg-type] ) assert str(e.value) == ( "'body' must be a bytes-like object, file-like " "object, or iterable. Instead was <BadBody>" ) @pytest.mark.skipif(not HAS_IPV6, reason="IPv6 is not supported on this system") class TestIPv6PoolManager(IPv6HypercornDummyServerTestCase): @classmethod def setup_class(cls) -> None: super().setup_class() cls.base_url = f"http://[{cls.host}]:{cls.port}" def test_ipv6(self) -> None: with PoolManager() as http: http.request("GET", self.base_url) test_connection.py 0000644 00000010641 15025306245 0010320 0 ustar 00 from __future__ import annotations import contextlib import sys import typing from http.client import ResponseNotReady from unittest import mock import pytest from dummyserver.testcase import HypercornDummyServerTestCase as server from urllib3 import HTTPConnectionPool from urllib3.response import HTTPResponse @pytest.fixture() def pool() -> typing.Generator[HTTPConnectionPool, None, None]: server.setup_class() with HTTPConnectionPool(server.host, server.port) as pool: yield pool server.teardown_class() def test_returns_urllib3_HTTPResponse(pool: HTTPConnectionPool) -> None: with contextlib.closing(pool._get_conn()) as conn: conn.request("GET", "/") response = conn.getresponse() assert isinstance(response, HTTPResponse) @pytest.mark.skipif(not hasattr(sys, "audit"), reason="requires python 3.8+") @mock.patch("urllib3.connection.sys.audit") def test_audit_event(audit_mock: mock.Mock, pool: HTTPConnectionPool) -> None: with contextlib.closing(pool._get_conn()) as conn: conn.request("GET", "/") audit_mock.assert_any_call("http.client.connect", conn, conn.host, conn.port) # Ensure the event is raised only once. connect_events = [ call for call in audit_mock.mock_calls if call.args[0] == "http.client.connect" ] assert len(connect_events) == 1 def test_does_not_release_conn(pool: HTTPConnectionPool) -> None: with contextlib.closing(pool._get_conn()) as conn: conn.request("GET", "/") response = conn.getresponse() response.release_conn() assert pool.pool.qsize() == 0 # type: ignore[union-attr] def test_releases_conn(pool: HTTPConnectionPool) -> None: with contextlib.closing(pool._get_conn()) as conn: conn.request("GET", "/") response = conn.getresponse() # If these variables are set by the pool # then the response can release the connection # back into the pool. response._pool = pool # type: ignore[attr-defined] response._connection = conn # type: ignore[attr-defined] response.release_conn() assert pool.pool.qsize() == 1 # type: ignore[union-attr] def test_double_getresponse(pool: HTTPConnectionPool) -> None: with contextlib.closing(pool._get_conn()) as conn: conn.request("GET", "/") _ = conn.getresponse() # Calling getrepsonse() twice should cause an error with pytest.raises(ResponseNotReady): conn.getresponse() def test_connection_state_properties(pool: HTTPConnectionPool) -> None: conn = pool._get_conn() assert conn.is_closed is True assert conn.is_connected is False assert conn.has_connected_to_proxy is False assert conn.is_verified is False assert conn.proxy_is_verified is None conn.connect() assert conn.is_closed is False assert conn.is_connected is True assert conn.has_connected_to_proxy is False assert conn.is_verified is False assert conn.proxy_is_verified is None conn.request("GET", "/") resp = conn.getresponse() assert resp.status == 200 conn.close() assert conn.is_closed is True assert conn.is_connected is False assert conn.has_connected_to_proxy is False assert conn.is_verified is False assert conn.proxy_is_verified is None def test_set_tunnel_is_reset(pool: HTTPConnectionPool) -> None: conn = pool._get_conn() assert conn.is_closed is True assert conn.is_connected is False assert conn.has_connected_to_proxy is False assert conn.is_verified is False assert conn.proxy_is_verified is None conn.set_tunnel(host="host", port=8080, scheme="http") assert conn._tunnel_host == "host" # type: ignore[attr-defined] assert conn._tunnel_port == 8080 # type: ignore[attr-defined] assert conn._tunnel_scheme == "http" # type: ignore[attr-defined] conn.close() assert conn._tunnel_host is None # type: ignore[attr-defined] assert conn._tunnel_port is None # type: ignore[attr-defined] assert conn._tunnel_scheme is None # type: ignore[attr-defined] def test_invalid_tunnel_scheme(pool: HTTPConnectionPool) -> None: conn = pool._get_conn() with pytest.raises(ValueError) as e: conn.set_tunnel(host="host", port=8080, scheme="socks") assert ( str(e.value) == "Invalid proxy scheme for tunneling: 'socks', must be either 'http' or 'https'" ) test_connectionpool.py 0000644 00000156167 15025306245 0011230 0 ustar 00 from __future__ import annotations import io import socket import time import typing import warnings from test import LONG_TIMEOUT, SHORT_TIMEOUT from threading import Event from unittest import mock from urllib.parse import urlencode import pytest from dummyserver.socketserver import NoIPv6Warning from dummyserver.testcase import HypercornDummyServerTestCase, SocketDummyServerTestCase from urllib3 import HTTPConnectionPool, encode_multipart_formdata from urllib3._collections import HTTPHeaderDict from urllib3.connection import _get_default_user_agent from urllib3.exceptions import ( ConnectTimeoutError, DecodeError, EmptyPoolError, MaxRetryError, NameResolutionError, NewConnectionError, ReadTimeoutError, UnrewindableBodyError, ) from urllib3.fields import _TYPE_FIELD_VALUE_TUPLE from urllib3.util import SKIP_HEADER, SKIPPABLE_HEADERS from urllib3.util.retry import RequestHistory, Retry from urllib3.util.timeout import _TYPE_TIMEOUT, Timeout from .. import INVALID_SOURCE_ADDRESSES, TARPIT_HOST, VALID_SOURCE_ADDRESSES from ..port_helpers import find_unused_port def wait_for_socket(ready_event: Event) -> None: ready_event.wait() ready_event.clear() class TestConnectionPoolTimeouts(SocketDummyServerTestCase): def test_timeout_float(self) -> None: block_event = Event() ready_event = self.start_basic_handler(block_send=block_event, num=2) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: wait_for_socket(ready_event) with pytest.raises(ReadTimeoutError): pool.request("GET", "/", timeout=SHORT_TIMEOUT) block_event.set() # Release block # Shouldn't raise this time wait_for_socket(ready_event) block_event.set() # Pre-release block pool.request("GET", "/", timeout=LONG_TIMEOUT) def test_conn_closed(self) -> None: block_event = Event() self.start_basic_handler(block_send=block_event, num=1) with HTTPConnectionPool( self.host, self.port, timeout=SHORT_TIMEOUT, retries=False ) as pool: conn = pool._get_conn() pool._put_conn(conn) try: with pytest.raises(ReadTimeoutError): pool.urlopen("GET", "/") if not conn.is_closed: with pytest.raises(socket.error): conn.sock.recv(1024) # type: ignore[attr-defined] finally: pool._put_conn(conn) block_event.set() def test_timeout(self) -> None: # Requests should time out when expected block_event = Event() ready_event = self.start_basic_handler(block_send=block_event, num=3) # Pool-global timeout short_timeout = Timeout(read=SHORT_TIMEOUT) with HTTPConnectionPool( self.host, self.port, timeout=short_timeout, retries=False ) as pool: wait_for_socket(ready_event) block_event.clear() with pytest.raises(ReadTimeoutError): pool.request("GET", "/") block_event.set() # Release request # Request-specific timeouts should raise errors with HTTPConnectionPool( self.host, self.port, timeout=short_timeout, retries=False ) as pool: wait_for_socket(ready_event) now = time.time() with pytest.raises(ReadTimeoutError): pool.request("GET", "/", timeout=LONG_TIMEOUT) delta = time.time() - now message = "timeout was pool-level SHORT_TIMEOUT rather than request-level LONG_TIMEOUT" assert delta >= (LONG_TIMEOUT - 1e-5), message block_event.set() # Release request # Timeout passed directly to request should raise a request timeout wait_for_socket(ready_event) with pytest.raises(ReadTimeoutError): pool.request("GET", "/", timeout=SHORT_TIMEOUT) block_event.set() # Release request def test_connect_timeout(self) -> None: url = "/" host, port = TARPIT_HOST, 80 timeout = Timeout(connect=SHORT_TIMEOUT) # Pool-global timeout with HTTPConnectionPool(host, port, timeout=timeout) as pool: conn = pool._get_conn() with pytest.raises(ConnectTimeoutError): pool._make_request(conn, "GET", url) # Retries retries = Retry(connect=0) with pytest.raises(MaxRetryError): pool.request("GET", url, retries=retries) # Request-specific connection timeouts big_timeout = Timeout(read=LONG_TIMEOUT, connect=LONG_TIMEOUT) with HTTPConnectionPool(host, port, timeout=big_timeout, retries=False) as pool: conn = pool._get_conn() with pytest.raises(ConnectTimeoutError): pool._make_request(conn, "GET", url, timeout=timeout) pool._put_conn(conn) with pytest.raises(ConnectTimeoutError): pool.request("GET", url, timeout=timeout) def test_total_applies_connect(self) -> None: host, port = TARPIT_HOST, 80 timeout = Timeout(total=None, connect=SHORT_TIMEOUT) with HTTPConnectionPool(host, port, timeout=timeout) as pool: conn = pool._get_conn() try: with pytest.raises(ConnectTimeoutError): pool._make_request(conn, "GET", "/") finally: conn.close() timeout = Timeout(connect=3, read=5, total=SHORT_TIMEOUT) with HTTPConnectionPool(host, port, timeout=timeout) as pool: conn = pool._get_conn() try: with pytest.raises(ConnectTimeoutError): pool._make_request(conn, "GET", "/") finally: conn.close() def test_total_timeout(self) -> None: block_event = Event() ready_event = self.start_basic_handler(block_send=block_event, num=2) wait_for_socket(ready_event) # This will get the socket to raise an EAGAIN on the read timeout = Timeout(connect=3, read=SHORT_TIMEOUT) with HTTPConnectionPool( self.host, self.port, timeout=timeout, retries=False ) as pool: with pytest.raises(ReadTimeoutError): pool.request("GET", "/") block_event.set() wait_for_socket(ready_event) block_event.clear() # The connect should succeed and this should hit the read timeout timeout = Timeout(connect=3, read=5, total=SHORT_TIMEOUT) with HTTPConnectionPool( self.host, self.port, timeout=timeout, retries=False ) as pool: with pytest.raises(ReadTimeoutError): pool.request("GET", "/") def test_create_connection_timeout(self) -> None: self.start_basic_handler(block_send=Event(), num=0) # needed for self.port timeout = Timeout(connect=SHORT_TIMEOUT, total=LONG_TIMEOUT) with HTTPConnectionPool( TARPIT_HOST, self.port, timeout=timeout, retries=False ) as pool: conn = pool._new_conn() with pytest.raises(ConnectTimeoutError): conn.connect() class TestConnectionPool(HypercornDummyServerTestCase): def test_get(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("GET", "/specific_method", fields={"method": "GET"}) assert r.status == 200, r.data def test_post_url(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("POST", "/specific_method", fields={"method": "POST"}) assert r.status == 200, r.data def test_urlopen_put(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: r = pool.urlopen("PUT", "/specific_method?method=PUT") assert r.status == 200, r.data def test_wrong_specific_method(self) -> None: # To make sure the dummy server is actually returning failed responses with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("GET", "/specific_method", fields={"method": "POST"}) assert r.status == 400, r.data with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("POST", "/specific_method", fields={"method": "GET"}) assert r.status == 400, r.data def test_upload(self) -> None: data = "I'm in ur multipart form-data, hazing a cheezburgr" fields: dict[str, _TYPE_FIELD_VALUE_TUPLE] = { "upload_param": "filefield", "upload_filename": "lolcat.txt", "filefield": ("lolcat.txt", data), } fields["upload_size"] = len(data) # type: ignore[assignment] with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("POST", "/upload", fields=fields) assert r.status == 200, r.data def test_one_name_multiple_values(self) -> None: fields = [("foo", "a"), ("foo", "b")] with HTTPConnectionPool(self.host, self.port) as pool: # urlencode r = pool.request("GET", "/echo", fields=fields) assert r.data == b"foo=a&foo=b" # multipart r = pool.request("POST", "/echo", fields=fields) assert r.data.count(b'name="foo"') == 2 def test_request_method_body(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: body = b"hi" r = pool.request("POST", "/echo", body=body) assert r.data == body fields = [("hi", "hello")] with pytest.raises(TypeError): pool.request("POST", "/echo", body=body, fields=fields) def test_unicode_upload(self) -> None: fieldname = "myfile" filename = "\xe2\x99\xa5.txt" data = "\xe2\x99\xa5".encode() size = len(data) fields: dict[str, _TYPE_FIELD_VALUE_TUPLE] = { "upload_param": fieldname, "upload_filename": filename, fieldname: (filename, data), } fields["upload_size"] = size # type: ignore[assignment] with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("POST", "/upload", fields=fields) assert r.status == 200, r.data def test_nagle(self) -> None: """Test that connections have TCP_NODELAY turned on""" # This test needs to be here in order to be run. socket.create_connection actually tries # to connect to the host provided so we need a dummyserver to be running. with HTTPConnectionPool(self.host, self.port) as pool: conn = pool._get_conn() try: pool._make_request(conn, "GET", "/") tcp_nodelay_setting = conn.sock.getsockopt( # type: ignore[attr-defined] socket.IPPROTO_TCP, socket.TCP_NODELAY ) assert tcp_nodelay_setting finally: conn.close() @pytest.mark.parametrize( "socket_options", [ [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)], ((socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),), ], ) def test_socket_options(self, socket_options: tuple[int, int, int]) -> None: """Test that connections accept socket options.""" # This test needs to be here in order to be run. socket.create_connection actually tries to # connect to the host provided so we need a dummyserver to be running. with HTTPConnectionPool( self.host, self.port, socket_options=socket_options, ) as pool: # Get the socket of a new connection. s = pool._new_conn()._new_conn() # type: ignore[attr-defined] try: using_keepalive = ( s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) > 0 ) assert using_keepalive finally: s.close() @pytest.mark.parametrize("socket_options", [None, []]) def test_disable_default_socket_options( self, socket_options: list[int] | None ) -> None: """Test that passing None or empty list disables all socket options.""" # This test needs to be here in order to be run. socket.create_connection actually tries # to connect to the host provided so we need a dummyserver to be running. with HTTPConnectionPool( self.host, self.port, socket_options=socket_options ) as pool: s = pool._new_conn()._new_conn() # type: ignore[attr-defined] try: using_nagle = s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) == 0 assert using_nagle finally: s.close() def test_defaults_are_applied(self) -> None: """Test that modifying the default socket options works.""" # This test needs to be here in order to be run. socket.create_connection actually tries # to connect to the host provided so we need a dummyserver to be running. with HTTPConnectionPool(self.host, self.port) as pool: # Get the HTTPConnection instance conn = pool._new_conn() try: # Update the default socket options assert conn.socket_options is not None conn.socket_options += [(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)] # type: ignore[operator] s = conn._new_conn() # type: ignore[attr-defined] nagle_disabled = ( s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) > 0 ) using_keepalive = ( s.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) > 0 ) assert nagle_disabled assert using_keepalive finally: conn.close() s.close() def test_connection_error_retries(self) -> None: """ECONNREFUSED error should raise a connection error, with retries""" port = find_unused_port() with HTTPConnectionPool(self.host, port) as pool: with pytest.raises(MaxRetryError) as e: pool.request("GET", "/", retries=Retry(connect=3)) assert type(e.value.reason) is NewConnectionError def test_timeout_success(self) -> None: timeout = Timeout(connect=3, read=5, total=None) with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool: pool.request("GET", "/") # This should not raise a "Timeout already started" error pool.request("GET", "/") with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool: # This should also not raise a "Timeout already started" error pool.request("GET", "/") timeout = Timeout(total=None) with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool: pool.request("GET", "/") socket_timeout_reuse_testdata = pytest.mark.parametrize( ["timeout", "expect_settimeout_calls"], [ (1, (1, 1)), (None, (None, None)), (Timeout(read=4), (None, 4)), (Timeout(read=4, connect=5), (5, 4)), (Timeout(connect=6), (6, None)), ], ) @socket_timeout_reuse_testdata def test_socket_timeout_updated_on_reuse_constructor( self, timeout: _TYPE_TIMEOUT, expect_settimeout_calls: typing.Sequence[float | None], ) -> None: with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool: # Make a request to create a new connection. pool.urlopen("GET", "/") # Grab the connection and mock the inner socket. assert pool.pool is not None conn = pool.pool.get_nowait() conn_sock = mock.Mock(wraps=conn.sock) conn.sock = conn_sock pool._put_conn(conn) # Assert that sock.settimeout() is called with the new connect timeout, then the read timeout. pool.urlopen("GET", "/", timeout=timeout) conn_sock.settimeout.assert_has_calls( [mock.call(x) for x in expect_settimeout_calls] ) @socket_timeout_reuse_testdata def test_socket_timeout_updated_on_reuse_parameter( self, timeout: _TYPE_TIMEOUT, expect_settimeout_calls: typing.Sequence[float | None], ) -> None: with HTTPConnectionPool(self.host, self.port) as pool: # Make a request to create a new connection. pool.urlopen("GET", "/", timeout=LONG_TIMEOUT) # Grab the connection and mock the inner socket. assert pool.pool is not None conn = pool.pool.get_nowait() conn_sock = mock.Mock(wraps=conn.sock) conn.sock = conn_sock pool._put_conn(conn) # Assert that sock.settimeout() is called with the new connect timeout, then the read timeout. pool.urlopen("GET", "/", timeout=timeout) conn_sock.settimeout.assert_has_calls( [mock.call(x) for x in expect_settimeout_calls] ) def test_tunnel(self) -> None: # note the actual httplib.py has no tests for this functionality timeout = Timeout(total=None) with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool: conn = pool._get_conn() try: conn.set_tunnel(self.host, self.port) with mock.patch.object( conn, "_tunnel", create=True, return_value=None ) as conn_tunnel: pool._make_request(conn, "GET", "/") conn_tunnel.assert_called_once_with() finally: conn.close() # test that it's not called when tunnel is not set timeout = Timeout(total=None) with HTTPConnectionPool(self.host, self.port, timeout=timeout) as pool: conn = pool._get_conn() try: with mock.patch.object( conn, "_tunnel", create=True, return_value=None ) as conn_tunnel: pool._make_request(conn, "GET", "/") assert not conn_tunnel.called finally: conn.close() def test_redirect_relative_url_no_deprecation(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: with warnings.catch_warnings(): warnings.simplefilter("error", DeprecationWarning) pool.request("GET", "/redirect", fields={"target": "/"}) def test_redirect(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("GET", "/redirect", fields={"target": "/"}, redirect=False) assert r.status == 303 r = pool.request("GET", "/redirect", fields={"target": "/"}) assert r.status == 200 assert r.data == b"Dummy server!" def test_303_redirect_makes_request_lose_body(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: response = pool.request( "POST", "/redirect", fields={"target": "/headers_and_params", "status": "303 See Other"}, ) data = response.json() assert data["params"] == {} assert "Content-Type" not in HTTPHeaderDict(data["headers"]) def test_bad_connect(self) -> None: with HTTPConnectionPool("badhost.invalid", self.port) as pool: with pytest.raises(MaxRetryError) as e: pool.request("GET", "/", retries=5) assert type(e.value.reason) is NameResolutionError def test_keepalive(self) -> None: with HTTPConnectionPool(self.host, self.port, block=True, maxsize=1) as pool: r = pool.request("GET", "/keepalive?close=0") r = pool.request("GET", "/keepalive?close=0") assert r.status == 200 assert pool.num_connections == 1 assert pool.num_requests == 2 def test_keepalive_close(self) -> None: with HTTPConnectionPool( self.host, self.port, block=True, maxsize=1, timeout=2 ) as pool: r = pool.request( "GET", "/keepalive?close=1", retries=0, headers={"Connection": "close"} ) assert pool.num_connections == 1 # The dummyserver will have responded with Connection:close, # and httplib will properly cleanup the socket. # We grab the HTTPConnection object straight from the Queue, # because _get_conn() is where the check & reset occurs assert pool.pool is not None conn = pool.pool.get() assert conn.sock is None pool._put_conn(conn) # Now with keep-alive r = pool.request( "GET", "/keepalive?close=0", retries=0, headers={"Connection": "keep-alive"}, ) # The dummyserver responded with Connection:keep-alive, the connection # persists. conn = pool.pool.get() assert conn.sock is not None pool._put_conn(conn) # Another request asking the server to close the connection. This one # should get cleaned up for the next request. r = pool.request( "GET", "/keepalive?close=1", retries=0, headers={"Connection": "close"} ) assert r.status == 200 conn = pool.pool.get() assert conn.sock is None pool._put_conn(conn) # Next request r = pool.request("GET", "/keepalive?close=0") def test_post_with_urlencode(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: data = {"banana": "hammock", "lol": "cat"} r = pool.request("POST", "/echo", fields=data, encode_multipart=False) assert r.data.decode("utf-8") == urlencode(data) def test_post_with_multipart(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: data = {"banana": "hammock", "lol": "cat"} r = pool.request("POST", "/echo", fields=data, encode_multipart=True) body = r.data.split(b"\r\n") encoded_data = encode_multipart_formdata(data)[0] expected_body = encoded_data.split(b"\r\n") # TODO: Get rid of extra parsing stuff when you can specify # a custom boundary to encode_multipart_formdata """ We need to loop the return lines because a timestamp is attached from within encode_multipart_formdata. When the server echos back the data, it has the timestamp from when the data was encoded, which is not equivalent to when we run encode_multipart_formdata on the data again. """ for i, line in enumerate(body): if line.startswith(b"--"): continue assert body[i] == expected_body[i] def test_post_with_multipart__iter__(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: data = {"hello": "world"} r = pool.request( "POST", "/echo", fields=data, preload_content=False, multipart_boundary="boundary", encode_multipart=True, ) chunks = [chunk for chunk in r] assert chunks == [ b"--boundary\r\n", b'Content-Disposition: form-data; name="hello"\r\n', b"\r\n", b"world\r\n", b"--boundary--\r\n", ] def test_check_gzip(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request( "GET", "/encodingrequest", headers={"accept-encoding": "gzip"} ) assert r.headers.get("content-encoding") == "gzip" assert r.data == b"hello, world!" def test_check_deflate(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request( "GET", "/encodingrequest", headers={"accept-encoding": "deflate"} ) assert r.headers.get("content-encoding") == "deflate" assert r.data == b"hello, world!" def test_bad_decode(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: with pytest.raises(DecodeError): pool.request( "GET", "/encodingrequest", headers={"accept-encoding": "garbage-deflate"}, ) with pytest.raises(DecodeError): pool.request( "GET", "/encodingrequest", headers={"accept-encoding": "garbage-gzip"}, ) def test_connection_count(self) -> None: with HTTPConnectionPool(self.host, self.port, maxsize=1) as pool: pool.request("GET", "/") pool.request("GET", "/") pool.request("GET", "/") assert pool.num_connections == 1 assert pool.num_requests == 3 def test_connection_count_bigpool(self) -> None: with HTTPConnectionPool(self.host, self.port, maxsize=16) as http_pool: http_pool.request("GET", "/") http_pool.request("GET", "/") http_pool.request("GET", "/") assert http_pool.num_connections == 1 assert http_pool.num_requests == 3 def test_partial_response(self) -> None: with HTTPConnectionPool(self.host, self.port, maxsize=1) as pool: req_data = {"lol": "cat"} resp_data = urlencode(req_data).encode("utf-8") r = pool.request("GET", "/echo", fields=req_data, preload_content=False) assert r.read(5) == resp_data[:5] assert r.read() == resp_data[5:] def test_lazy_load_twice(self) -> None: # This test is sad and confusing. Need to figure out what's # going on with partial reads and socket reuse. with HTTPConnectionPool( self.host, self.port, block=True, maxsize=1, timeout=2 ) as pool: payload_size = 1024 * 2 first_chunk = 512 boundary = "foo" req_data = {"count": "a" * payload_size} resp_data = encode_multipart_formdata(req_data, boundary=boundary)[0] req2_data = {"count": "b" * payload_size} resp2_data = encode_multipart_formdata(req2_data, boundary=boundary)[0] r1 = pool.request( "POST", "/echo", fields=req_data, multipart_boundary=boundary, preload_content=False, ) assert r1.read(first_chunk) == resp_data[:first_chunk] try: r2 = pool.request( "POST", "/echo", fields=req2_data, multipart_boundary=boundary, preload_content=False, pool_timeout=0.001, ) # This branch should generally bail here, but maybe someday it will # work? Perhaps by some sort of magic. Consider it a TODO. assert r2.read(first_chunk) == resp2_data[:first_chunk] assert r1.read() == resp_data[first_chunk:] assert r2.read() == resp2_data[first_chunk:] assert pool.num_requests == 2 except EmptyPoolError: assert r1.read() == resp_data[first_chunk:] assert pool.num_requests == 1 assert pool.num_connections == 1 def test_for_double_release(self) -> None: MAXSIZE = 5 # Check default state with HTTPConnectionPool(self.host, self.port, maxsize=MAXSIZE) as pool: assert pool.num_connections == 0 assert pool.pool is not None assert pool.pool.qsize() == MAXSIZE # Make an empty slot for testing pool.pool.get() assert pool.pool.qsize() == MAXSIZE - 1 # Check state after simple request pool.urlopen("GET", "/") assert pool.pool.qsize() == MAXSIZE - 1 # Check state without release pool.urlopen("GET", "/", preload_content=False) assert pool.pool.qsize() == MAXSIZE - 2 pool.urlopen("GET", "/") assert pool.pool.qsize() == MAXSIZE - 2 # Check state after read pool.urlopen("GET", "/").data assert pool.pool.qsize() == MAXSIZE - 2 pool.urlopen("GET", "/") assert pool.pool.qsize() == MAXSIZE - 2 def test_release_conn_parameter(self) -> None: MAXSIZE = 5 with HTTPConnectionPool(self.host, self.port, maxsize=MAXSIZE) as pool: assert pool.pool is not None assert pool.pool.qsize() == MAXSIZE # Make request without releasing connection pool.request("GET", "/", release_conn=False, preload_content=False) assert pool.pool.qsize() == MAXSIZE - 1 def test_dns_error(self) -> None: with HTTPConnectionPool( "thishostdoesnotexist.invalid", self.port, timeout=0.001 ) as pool: with pytest.raises(MaxRetryError): pool.request("GET", "/test", retries=2) @pytest.mark.parametrize("char", [" ", "\r", "\n", "\x00"]) def test_invalid_method_not_allowed(self, char: str) -> None: with pytest.raises(ValueError): with HTTPConnectionPool(self.host, self.port) as pool: pool.request("GET" + char, "/") def test_percent_encode_invalid_target_chars(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("GET", "/echo_params?q=\r&k=\n \n") assert r.data == b"[('k', '\\n \\n'), ('q', '\\r')]" def test_source_address(self) -> None: for addr, is_ipv6 in VALID_SOURCE_ADDRESSES: if is_ipv6: # TODO enable if HAS_IPV6_AND_DNS when this is fixed: # https://github.com/pgjones/hypercorn/issues/160 warnings.warn("No IPv6 support: skipping.", NoIPv6Warning) continue with HTTPConnectionPool( self.host, self.port, source_address=addr, retries=False ) as pool: r = pool.request("GET", "/source_address") assert r.data == addr[0].encode() @pytest.mark.parametrize( "invalid_source_address, is_ipv6", INVALID_SOURCE_ADDRESSES ) def test_source_address_error( self, invalid_source_address: tuple[str, int], is_ipv6: bool ) -> None: with HTTPConnectionPool( self.host, self.port, source_address=invalid_source_address, retries=False ) as pool: if is_ipv6: with pytest.raises(NameResolutionError): pool.request("GET", f"/source_address?{invalid_source_address}") else: with pytest.raises(NewConnectionError): pool.request("GET", f"/source_address?{invalid_source_address}") def test_stream_keepalive(self) -> None: x = 2 with HTTPConnectionPool(self.host, self.port) as pool: for _ in range(x): response = pool.request( "GET", "/chunked", headers={"Connection": "keep-alive"}, preload_content=False, retries=False, ) for chunk in response.stream(): assert chunk == b"123" assert pool.num_connections == 1 assert pool.num_requests == x def test_read_chunked_short_circuit(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: response = pool.request("GET", "/chunked", preload_content=False) response.read() with pytest.raises(StopIteration): next(response.read_chunked()) def test_read_chunked_on_closed_response(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: response = pool.request("GET", "/chunked", preload_content=False) response.close() with pytest.raises(StopIteration): next(response.read_chunked()) def test_chunked_gzip(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: response = pool.request( "GET", "/chunked_gzip", preload_content=False, decode_content=True ) assert b"123" * 4 == response.read() def test_cleanup_on_connection_error(self) -> None: """ Test that connections are recycled to the pool on connection errors where no http response is received. """ poolsize = 3 with HTTPConnectionPool( self.host, self.port, maxsize=poolsize, block=True ) as http: assert http.pool is not None assert http.pool.qsize() == poolsize # force a connection error by supplying a non-existent # url. We won't get a response for this and so the # conn won't be implicitly returned to the pool. with pytest.raises(MaxRetryError): http.request( "GET", "/redirect", fields={"target": "/"}, release_conn=False, retries=0, ) r = http.request( "GET", "/redirect", fields={"target": "/"}, release_conn=False, retries=1, ) r.release_conn() # the pool should still contain poolsize elements assert http.pool.qsize() == http.pool.maxsize def test_mixed_case_hostname(self) -> None: with HTTPConnectionPool("LoCaLhOsT", self.port) as pool: response = pool.request("GET", f"http://LoCaLhOsT:{self.port}/") assert response.status == 200 def test_preserves_path_dot_segments(self) -> None: """ConnectionPool preserves dot segments in the URI""" with HTTPConnectionPool(self.host, self.port) as pool: response = pool.request("GET", "/echo_uri/seg0/../seg2") assert response.data == b"/echo_uri/seg0/../seg2?" def test_default_user_agent_header(self) -> None: """ConnectionPool has a default user agent""" default_ua = _get_default_user_agent() custom_ua = "I'm not a web scraper, what are you talking about?" custom_ua2 = "Yet Another User Agent" with HTTPConnectionPool(self.host, self.port) as pool: # Use default user agent if no user agent was specified. r = pool.request("GET", "/headers") request_headers = r.json() assert request_headers.get("User-Agent") == _get_default_user_agent() # Prefer the request user agent over the default. headers = {"UsEr-AGENt": custom_ua} r = pool.request("GET", "/headers", headers=headers) request_headers = r.json() assert request_headers.get("User-Agent") == custom_ua # Do not modify pool headers when using the default user agent. pool_headers = {"foo": "bar"} pool.headers = pool_headers r = pool.request("GET", "/headers") request_headers = r.json() assert request_headers.get("User-Agent") == default_ua assert "User-Agent" not in pool_headers pool.headers.update({"User-Agent": custom_ua2}) r = pool.request("GET", "/headers") request_headers = r.json() assert request_headers.get("User-Agent") == custom_ua2 @pytest.mark.parametrize( "headers", [ None, {}, {"User-Agent": "key"}, {"user-agent": "key"}, {b"uSeR-AgEnT": b"key"}, {b"user-agent": "key"}, ], ) @pytest.mark.parametrize("chunked", [True, False]) def test_user_agent_header_not_sent_twice( self, headers: dict[str, str] | None, chunked: bool ) -> None: with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("GET", "/headers", headers=headers, chunked=chunked) request_headers = r.json() if not headers: assert request_headers["User-Agent"].startswith("python-urllib3/") assert "key" not in request_headers["User-Agent"] else: assert request_headers["User-Agent"] == "key" def test_no_user_agent_header(self) -> None: """ConnectionPool can suppress sending a user agent header""" custom_ua = "I'm not a web scraper, what are you talking about?" with HTTPConnectionPool(self.host, self.port) as pool: # Suppress user agent in the request headers. no_ua_headers = {"User-Agent": SKIP_HEADER} r = pool.request("GET", "/headers", headers=no_ua_headers) request_headers = r.json() assert "User-Agent" not in request_headers assert no_ua_headers["User-Agent"] == SKIP_HEADER # Suppress user agent in the pool headers. pool.headers = no_ua_headers r = pool.request("GET", "/headers") request_headers = r.json() assert "User-Agent" not in request_headers assert no_ua_headers["User-Agent"] == SKIP_HEADER # Request headers override pool headers. pool_headers = {"User-Agent": custom_ua} pool.headers = pool_headers r = pool.request("GET", "/headers", headers=no_ua_headers) request_headers = r.json() assert "User-Agent" not in request_headers assert no_ua_headers["User-Agent"] == SKIP_HEADER assert pool_headers.get("User-Agent") == custom_ua @pytest.mark.parametrize("header", ["Content-Length", "content-length"]) @pytest.mark.parametrize("chunked", [True, False]) def test_skip_header_non_supported(self, header: str, chunked: bool) -> None: with HTTPConnectionPool(self.host, self.port) as pool: with pytest.raises( ValueError, match="urllib3.util.SKIP_HEADER only supports 'Accept-Encoding', 'Host', 'User-Agent'", ) as e: pool.request( "GET", "/headers", headers={header: SKIP_HEADER}, chunked=chunked ) # Ensure that the error message stays up to date with 'SKIP_HEADER_SUPPORTED_HEADERS' assert all( ("'" + header.title() + "'") in str(e.value) for header in SKIPPABLE_HEADERS ) @pytest.mark.parametrize("chunked", [True, False]) @pytest.mark.parametrize("pool_request", [True, False]) @pytest.mark.parametrize("header_type", [dict, HTTPHeaderDict]) def test_headers_not_modified_by_request( self, chunked: bool, pool_request: bool, header_type: type[dict[str, str] | HTTPHeaderDict], ) -> None: # Test that the .request*() methods of ConnectionPool and HTTPConnection # don't modify the given 'headers' structure, instead they should # make their own internal copies at request time. headers = header_type() headers["key"] = "val" with HTTPConnectionPool(self.host, self.port) as pool: pool.headers = headers if pool_request: pool.request("GET", "/headers", chunked=chunked) else: conn = pool._get_conn() conn.request("GET", "/headers", chunked=chunked) conn.getresponse().close() conn.close() assert pool.headers == {"key": "val"} assert type(pool.headers) is header_type with HTTPConnectionPool(self.host, self.port) as pool: if pool_request: pool.request("GET", "/headers", headers=headers, chunked=chunked) else: conn = pool._get_conn() conn.request("GET", "/headers", headers=headers, chunked=chunked) conn.getresponse().close() conn.close() assert headers == {"key": "val"} def test_request_chunked_is_deprecated( self, ) -> None: with HTTPConnectionPool(self.host, self.port) as pool: conn = pool._get_conn() with pytest.warns(DeprecationWarning) as w: conn.request_chunked("GET", "/headers") # type: ignore[attr-defined] assert len(w) == 1 and str(w[0].message) == ( "HTTPConnection.request_chunked() is deprecated and will be removed in urllib3 v2.1.0. " "Instead use HTTPConnection.request(..., chunked=True)." ) resp = conn.getresponse() assert resp.status == 200 assert resp.json()["Transfer-Encoding"] == "chunked" conn.close() def test_bytes_header(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: headers = {"User-Agent": "test header"} r = pool.request("GET", "/headers", headers=headers) request_headers = r.json() assert "User-Agent" in request_headers assert request_headers["User-Agent"] == "test header" @pytest.mark.parametrize( "user_agent", ["Schönefeld/1.18.0", "Schönefeld/1.18.0".encode("iso-8859-1")] ) def test_user_agent_non_ascii_user_agent(self, user_agent: str) -> None: with HTTPConnectionPool(self.host, self.port, retries=False) as pool: r = pool.urlopen( "GET", "/headers", headers={"User-Agent": user_agent}, ) request_headers = r.json() assert "User-Agent" in request_headers assert request_headers["User-Agent"] == "Schönefeld/1.18.0" class TestRetry(HypercornDummyServerTestCase): def test_max_retry(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: with pytest.raises(MaxRetryError): pool.request("GET", "/redirect", fields={"target": "/"}, retries=0) def test_disabled_retry(self) -> None: """Disabled retries should disable redirect handling.""" with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("GET", "/redirect", fields={"target": "/"}, retries=False) assert r.status == 303 r = pool.request( "GET", "/redirect", fields={"target": "/"}, retries=Retry(redirect=False), ) assert r.status == 303 with HTTPConnectionPool( "thishostdoesnotexist.invalid", self.port, timeout=0.001 ) as pool: with pytest.raises(NameResolutionError): pool.request("GET", "/test", retries=False) def test_read_retries(self) -> None: """Should retry for status codes in the forcelist""" with HTTPConnectionPool(self.host, self.port) as pool: retry = Retry(read=1, status_forcelist=[418]) resp = pool.request( "GET", "/successful_retry", headers={"test-name": "test_read_retries"}, retries=retry, ) assert resp.status == 200 def test_read_total_retries(self) -> None: """HTTP response w/ status code in the forcelist should be retried""" with HTTPConnectionPool(self.host, self.port) as pool: headers = {"test-name": "test_read_total_retries"} retry = Retry(total=1, status_forcelist=[418]) resp = pool.request( "GET", "/successful_retry", headers=headers, retries=retry ) assert resp.status == 200 def test_retries_wrong_forcelist(self) -> None: """HTTP response w/ status code not in forcelist shouldn't be retried""" with HTTPConnectionPool(self.host, self.port) as pool: retry = Retry(total=1, status_forcelist=[202]) resp = pool.request( "GET", "/successful_retry", headers={"test-name": "test_wrong_forcelist"}, retries=retry, ) assert resp.status == 418 def test_default_method_forcelist_retried(self) -> None: """urllib3 should retry methods in the default method forcelist""" with HTTPConnectionPool(self.host, self.port) as pool: retry = Retry(total=1, status_forcelist=[418]) resp = pool.request( "OPTIONS", "/successful_retry", headers={"test-name": "test_default_forcelist"}, retries=retry, ) assert resp.status == 200 def test_retries_wrong_method_list(self) -> None: """Method not in our allowed list should not be retried, even if code matches""" with HTTPConnectionPool(self.host, self.port) as pool: headers = {"test-name": "test_wrong_allowed_method"} retry = Retry(total=1, status_forcelist=[418], allowed_methods=["POST"]) resp = pool.request( "GET", "/successful_retry", headers=headers, retries=retry ) assert resp.status == 418 def test_read_retries_unsuccessful(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: headers = {"test-name": "test_read_retries_unsuccessful"} resp = pool.request("GET", "/successful_retry", headers=headers, retries=1) assert resp.status == 418 def test_retry_reuse_safe(self) -> None: """It should be possible to reuse a Retry object across requests""" with HTTPConnectionPool(self.host, self.port) as pool: headers = {"test-name": "test_retry_safe"} retry = Retry(total=1, status_forcelist=[418]) resp = pool.request( "GET", "/successful_retry", headers=headers, retries=retry ) assert resp.status == 200 with HTTPConnectionPool(self.host, self.port) as pool: resp = pool.request( "GET", "/successful_retry", headers=headers, retries=retry ) assert resp.status == 200 def test_retry_return_in_response(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: headers = {"test-name": "test_retry_return_in_response"} retry = Retry(total=2, status_forcelist=[418]) resp = pool.request( "GET", "/successful_retry", headers=headers, retries=retry ) assert resp.status == 200 assert resp.retries is not None assert resp.retries.total == 1 assert resp.retries.history == ( RequestHistory("GET", "/successful_retry", None, 418, None), ) def test_retry_redirect_history(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: resp = pool.request("GET", "/redirect", fields={"target": "/"}) assert resp.status == 200 assert resp.retries is not None assert resp.retries.history == ( RequestHistory("GET", "/redirect?target=%2F", None, 303, "/"), ) def test_multi_redirect_history(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request( "GET", "/multi_redirect", fields={"redirect_codes": "303,302,200"}, redirect=False, ) assert r.status == 303 assert r.retries is not None assert r.retries.history == tuple() with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request( "GET", "/multi_redirect", retries=10, fields={"redirect_codes": "303,302,301,307,302,200"}, ) assert r.status == 200 assert r.data == b"Done redirecting" expected = [ (303, "/multi_redirect?redirect_codes=302,301,307,302,200"), (302, "/multi_redirect?redirect_codes=301,307,302,200"), (301, "/multi_redirect?redirect_codes=307,302,200"), (307, "/multi_redirect?redirect_codes=302,200"), (302, "/multi_redirect?redirect_codes=200"), ] assert r.retries is not None actual = [ (history.status, history.redirect_location) for history in r.retries.history ] assert actual == expected class TestRetryAfter(HypercornDummyServerTestCase): def test_retry_after(self) -> None: # Request twice in a second to get a 429 response. with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request( "GET", "/retry_after", fields={"status": "429 Too Many Requests"}, retries=False, ) r = pool.request( "GET", "/retry_after", fields={"status": "429 Too Many Requests"}, retries=False, ) assert r.status == 429 r = pool.request( "GET", "/retry_after", fields={"status": "429 Too Many Requests"}, retries=True, ) assert r.status == 200 # Request twice in a second to get a 503 response. r = pool.request( "GET", "/retry_after", fields={"status": "503 Service Unavailable"}, retries=False, ) r = pool.request( "GET", "/retry_after", fields={"status": "503 Service Unavailable"}, retries=False, ) assert r.status == 503 r = pool.request( "GET", "/retry_after", fields={"status": "503 Service Unavailable"}, retries=True, ) assert r.status == 200 # Ignore Retry-After header on status which is not defined in # Retry.RETRY_AFTER_STATUS_CODES. r = pool.request( "GET", "/retry_after", fields={"status": "418 I'm a teapot"}, retries=True, ) assert r.status == 418 def test_redirect_after(self) -> None: with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("GET", "/redirect_after", retries=False) assert r.status == 303 t = time.time() r = pool.request("GET", "/redirect_after") assert r.status == 200 delta = time.time() - t assert delta >= 1 t = time.time() timestamp = t + 2 r = pool.request("GET", "/redirect_after?date=" + str(timestamp)) assert r.status == 200 delta = time.time() - t assert delta >= 1 # Retry-After is past t = time.time() timestamp = t - 1 r = pool.request("GET", "/redirect_after?date=" + str(timestamp)) delta = time.time() - t assert r.status == 200 assert delta < 1 class TestFileBodiesOnRetryOrRedirect(HypercornDummyServerTestCase): def test_retries_put_filehandle(self) -> None: """HTTP PUT retry with a file-like object should not timeout""" with HTTPConnectionPool(self.host, self.port, timeout=LONG_TIMEOUT) as pool: retry = Retry(total=3, status_forcelist=[418]) # httplib reads in 8k chunks; use a larger content length content_length = 65535 data = b"A" * content_length uploaded_file = io.BytesIO(data) headers = { "test-name": "test_retries_put_filehandle", "Content-Length": str(content_length), } resp = pool.urlopen( "PUT", "/successful_retry", headers=headers, retries=retry, body=uploaded_file, assert_same_host=False, redirect=False, ) assert resp.status == 200 def test_redirect_put_file(self) -> None: """PUT with file object should work with a redirection response""" with HTTPConnectionPool(self.host, self.port, timeout=LONG_TIMEOUT) as pool: retry = Retry(total=3, status_forcelist=[418]) # httplib reads in 8k chunks; use a larger content length content_length = 65535 data = b"A" * content_length uploaded_file = io.BytesIO(data) headers = { "test-name": "test_redirect_put_file", "Content-Length": str(content_length), } url = "/redirect?target=/echo&status=307" resp = pool.urlopen( "PUT", url, headers=headers, retries=retry, body=uploaded_file, assert_same_host=False, redirect=True, ) assert resp.status == 200 assert resp.data == data def test_redirect_with_failed_tell(self) -> None: """Abort request if failed to get a position from tell()""" class BadTellObject(io.BytesIO): def tell(self) -> typing.NoReturn: raise OSError body = BadTellObject(b"the data") url = "/redirect?target=/successful_retry" # httplib uses fileno if Content-Length isn't supplied, # which is unsupported by BytesIO. headers = {"Content-Length": "8"} with HTTPConnectionPool(self.host, self.port, timeout=LONG_TIMEOUT) as pool: with pytest.raises( UnrewindableBodyError, match="Unable to record file position for" ): pool.urlopen("PUT", url, headers=headers, body=body) class TestRetryPoolSize(HypercornDummyServerTestCase): def test_pool_size_retry(self) -> None: retries = Retry(total=1, raise_on_status=False, status_forcelist=[404]) with HTTPConnectionPool( self.host, self.port, maxsize=10, retries=retries, block=True ) as pool: pool.urlopen("GET", "/not_found", preload_content=False) assert pool.num_connections == 1 class TestRedirectPoolSize(HypercornDummyServerTestCase): def test_pool_size_redirect(self) -> None: retries = Retry( total=1, raise_on_status=False, status_forcelist=[404], redirect=True ) with HTTPConnectionPool( self.host, self.port, maxsize=10, retries=retries, block=True ) as pool: pool.urlopen("GET", "/redirect", preload_content=False) assert pool.num_connections == 1 test_https.py 0000644 00000130026 15025306245 0007323 0 ustar 00 from __future__ import annotations import contextlib import datetime import os.path import shutil import ssl import tempfile import warnings from pathlib import Path from test import ( LONG_TIMEOUT, SHORT_TIMEOUT, TARPIT_HOST, requires_network, resolvesLocalhostFQDN, ) from test.conftest import ServerConfig from unittest import mock import pytest import trustme import urllib3.util as util import urllib3.util.ssl_ from dummyserver.socketserver import ( DEFAULT_CA, DEFAULT_CA_KEY, DEFAULT_CERTS, encrypt_key_pem, ) from dummyserver.testcase import HTTPSHypercornDummyServerTestCase from urllib3 import HTTPSConnectionPool from urllib3.connection import RECENT_DATE, HTTPSConnection, VerifiedHTTPSConnection from urllib3.exceptions import ( ConnectTimeoutError, InsecureRequestWarning, MaxRetryError, ProtocolError, SSLError, SystemTimeWarning, ) from urllib3.util.ssl_match_hostname import CertificateError from urllib3.util.timeout import Timeout from .. import has_alpn TLSv1_CERTS = DEFAULT_CERTS.copy() TLSv1_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLSv1", None) TLSv1_1_CERTS = DEFAULT_CERTS.copy() TLSv1_1_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLSv1_1", None) TLSv1_2_CERTS = DEFAULT_CERTS.copy() TLSv1_2_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLSv1_2", None) TLSv1_3_CERTS = DEFAULT_CERTS.copy() TLSv1_3_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLS", None) CLIENT_INTERMEDIATE_PEM = "client_intermediate.pem" CLIENT_NO_INTERMEDIATE_PEM = "client_no_intermediate.pem" CLIENT_INTERMEDIATE_KEY = "client_intermediate.key" PASSWORD_CLIENT_KEYFILE = "client_password.key" CLIENT_CERT = CLIENT_INTERMEDIATE_PEM class BaseTestHTTPS(HTTPSHypercornDummyServerTestCase): tls_protocol_name: str | None = None def tls_protocol_not_default(self) -> bool: return self.tls_protocol_name in {"TLSv1", "TLSv1.1"} def tls_version(self) -> ssl.TLSVersion: if self.tls_protocol_name is None: return pytest.skip("Skipping base test class") try: from ssl import TLSVersion except ImportError: return pytest.skip("ssl.TLSVersion isn't available") return TLSVersion[self.tls_protocol_name.replace(".", "_")] def ssl_version(self) -> int: if self.tls_protocol_name is None: return pytest.skip("Skipping base test class") if self.tls_protocol_name == "TLSv1.3" and ssl.HAS_TLSv1_3: return ssl.PROTOCOL_TLS_CLIENT if self.tls_protocol_name == "TLSv1.2" and ssl.HAS_TLSv1_2: return ssl.PROTOCOL_TLSv1_2 if self.tls_protocol_name == "TLSv1.1" and ssl.HAS_TLSv1_1: return ssl.PROTOCOL_TLSv1_1 if self.tls_protocol_name == "TLSv1" and ssl.HAS_TLSv1: return ssl.PROTOCOL_TLSv1 else: return pytest.skip(f"{self.tls_protocol_name} isn't available") @classmethod def setup_class(cls) -> None: super().setup_class() cls.certs_dir = tempfile.mkdtemp() # Start from existing root CA as we don't want to change the server certificate yet with open(DEFAULT_CA, "rb") as crt, open(DEFAULT_CA_KEY, "rb") as key: root_ca = trustme.CA.from_pem(crt.read(), key.read()) # Generate another CA to test verification failure bad_ca = trustme.CA() cls.bad_ca_path = os.path.join(cls.certs_dir, "ca_bad.pem") bad_ca.cert_pem.write_to_path(cls.bad_ca_path) # client cert chain intermediate_ca = root_ca.create_child_ca() cert = intermediate_ca.issue_cert("example.com") encrypted_key = encrypt_key_pem(cert.private_key_pem, b"letmein") cert.private_key_pem.write_to_path( os.path.join(cls.certs_dir, CLIENT_INTERMEDIATE_KEY) ) encrypted_key.write_to_path( os.path.join(cls.certs_dir, PASSWORD_CLIENT_KEYFILE) ) # Write the client cert and the intermediate CA client_cert = os.path.join(cls.certs_dir, CLIENT_INTERMEDIATE_PEM) cert.cert_chain_pems[0].write_to_path(client_cert) cert.cert_chain_pems[1].write_to_path(client_cert, append=True) # Write only the client cert cert.cert_chain_pems[0].write_to_path( os.path.join(cls.certs_dir, CLIENT_NO_INTERMEDIATE_PEM) ) @classmethod def teardown_class(cls) -> None: super().teardown_class() shutil.rmtree(cls.certs_dir) def test_simple(self, http_version: str) -> None: with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: r = https_pool.request("GET", "/") assert r.status == 200, r.data assert r.headers["server"] == f"hypercorn-{http_version}" assert r.data == b"Dummy server!" @resolvesLocalhostFQDN() def test_dotted_fqdn(self) -> None: with HTTPSConnectionPool( self.host + ".", self.port, ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as pool: r = pool.request("GET", "/") assert r.status == 200, r.data def test_client_intermediate(self) -> None: """Check that certificate chains work well with client certs We generate an intermediate CA from the root CA, and issue a client certificate from that intermediate CA. Since the server only knows about the root CA, we need to send it the certificate *and* the intermediate CA, so that it can check the whole chain. """ with HTTPSConnectionPool( self.host, self.port, key_file=os.path.join(self.certs_dir, CLIENT_INTERMEDIATE_KEY), cert_file=os.path.join(self.certs_dir, CLIENT_INTERMEDIATE_PEM), ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: r = https_pool.request("GET", "/certificate") subject = r.json() assert subject["organizationalUnitName"].startswith("Testing cert") def test_client_no_intermediate(self) -> None: """Check that missing links in certificate chains indeed break The only difference with test_client_intermediate is that we don't send the intermediate CA to the server, only the client cert. """ with HTTPSConnectionPool( self.host, self.port, cert_file=os.path.join(self.certs_dir, CLIENT_NO_INTERMEDIATE_PEM), key_file=os.path.join(self.certs_dir, CLIENT_INTERMEDIATE_KEY), ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: with pytest.raises((SSLError, ProtocolError)): https_pool.request("GET", "/certificate", retries=False) def test_client_key_password(self) -> None: with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, key_file=os.path.join(self.certs_dir, PASSWORD_CLIENT_KEYFILE), cert_file=os.path.join(self.certs_dir, CLIENT_CERT), key_password="letmein", ssl_minimum_version=self.tls_version(), ) as https_pool: r = https_pool.request("GET", "/certificate") subject = r.json() assert subject["organizationalUnitName"].startswith("Testing cert") def test_client_encrypted_key_requires_password(self) -> None: with HTTPSConnectionPool( self.host, self.port, key_file=os.path.join(self.certs_dir, PASSWORD_CLIENT_KEYFILE), cert_file=os.path.join(self.certs_dir, CLIENT_CERT), key_password=None, ssl_minimum_version=self.tls_version(), ) as https_pool: with pytest.raises(MaxRetryError, match="password is required") as e: https_pool.request("GET", "/certificate") assert type(e.value.reason) is SSLError def test_verified(self) -> None: with HTTPSConnectionPool( self.host, self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: with contextlib.closing(https_pool._new_conn()) as conn: assert conn.__class__ == VerifiedHTTPSConnection with warnings.catch_warnings(record=True) as w: r = https_pool.request("GET", "/") assert r.status == 200 assert [str(wm) for wm in w] == [] def test_verified_with_context(self) -> None: ctx = util.ssl_.create_urllib3_context( cert_reqs=ssl.CERT_REQUIRED, ssl_minimum_version=self.tls_version() ) ctx.load_verify_locations(cafile=DEFAULT_CA) with HTTPSConnectionPool(self.host, self.port, ssl_context=ctx) as https_pool: with contextlib.closing(https_pool._new_conn()) as conn: assert conn.__class__ == VerifiedHTTPSConnection with mock.patch("warnings.warn") as warn: r = https_pool.request("GET", "/") assert r.status == 200 assert not warn.called, warn.call_args_list def test_context_combines_with_ca_certs(self) -> None: ctx = util.ssl_.create_urllib3_context( cert_reqs=ssl.CERT_REQUIRED, ssl_minimum_version=self.tls_version() ) with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_context=ctx ) as https_pool: with contextlib.closing(https_pool._new_conn()) as conn: assert conn.__class__ == VerifiedHTTPSConnection with mock.patch("warnings.warn") as warn: r = https_pool.request("GET", "/") assert r.status == 200 assert not warn.called, warn.call_args_list def test_ca_dir_verified(self, tmp_path: Path) -> None: # OpenSSL looks up certificates by the hash for their name, see c_rehash # TODO infer the bytes using `cryptography.x509.Name.public_bytes`. # https://github.com/pyca/cryptography/pull/3236 shutil.copyfile(DEFAULT_CA, str(tmp_path / "81deb5f7.0")) with HTTPSConnectionPool( self.host, self.port, cert_reqs="CERT_REQUIRED", ca_cert_dir=str(tmp_path), ssl_minimum_version=self.tls_version(), ) as https_pool: with contextlib.closing(https_pool._new_conn()) as conn: assert conn.__class__ == VerifiedHTTPSConnection with warnings.catch_warnings(record=True) as w: r = https_pool.request("GET", "/") assert r.status == 200 assert [str(wm) for wm in w] == [] def test_invalid_common_name(self) -> None: with HTTPSConnectionPool( "127.0.0.1", self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: with pytest.raises(MaxRetryError) as e: https_pool.request("GET", "/", retries=0) assert type(e.value.reason) is SSLError assert "doesn't match" in str( e.value.reason ) or "certificate verify failed" in str(e.value.reason) def test_verified_with_bad_ca_certs(self) -> None: with HTTPSConnectionPool( self.host, self.port, cert_reqs="CERT_REQUIRED", ca_certs=self.bad_ca_path, ssl_minimum_version=self.tls_version(), ) as https_pool: with pytest.raises(MaxRetryError) as e: https_pool.request("GET", "/") assert type(e.value.reason) is SSLError assert ( "certificate verify failed" in str(e.value.reason) # PyPy is more specific or "self signed certificate in certificate chain" in str(e.value.reason) ), f"Expected 'certificate verify failed', instead got: {e.value.reason!r}" def test_wrap_socket_failure_resource_leak(self) -> None: with HTTPSConnectionPool( self.host, self.port, cert_reqs="CERT_REQUIRED", ca_certs=self.bad_ca_path, ssl_minimum_version=self.tls_version(), ) as https_pool: with contextlib.closing(https_pool._get_conn()) as conn: with pytest.raises(ssl.SSLError): conn.connect() assert conn.sock is not None # type: ignore[attr-defined] def test_verified_without_ca_certs(self) -> None: # default is cert_reqs=None which is ssl.CERT_NONE with HTTPSConnectionPool( self.host, self.port, cert_reqs="CERT_REQUIRED", ssl_minimum_version=self.tls_version(), ) as https_pool: with pytest.raises(MaxRetryError) as e: https_pool.request("GET", "/") assert type(e.value.reason) is SSLError # there is a different error message depending on whether or # not pyopenssl is injected assert ( "No root certificates specified" in str(e.value.reason) # PyPy is more specific or "self signed certificate in certificate chain" in str(e.value.reason) # PyPy sometimes uses all-caps here or "certificate verify failed" in str(e.value.reason).lower() or "invalid certificate chain" in str(e.value.reason) ), ( "Expected 'No root certificates specified', " "'certificate verify failed', or " "'invalid certificate chain', " "instead got: %r" % e.value.reason ) def test_no_ssl(self) -> None: with HTTPSConnectionPool(self.host, self.port) as pool: pool.ConnectionCls = None # type: ignore[assignment] with pytest.raises(ImportError): pool._new_conn() with pytest.raises(ImportError): pool.request("GET", "/", retries=0) def test_unverified_ssl(self) -> None: """Test that bare HTTPSConnection can connect, make requests""" with HTTPSConnectionPool( self.host, self.port, cert_reqs=ssl.CERT_NONE, ssl_minimum_version=self.tls_version(), ) as pool: with mock.patch("warnings.warn") as warn: r = pool.request("GET", "/") assert r.status == 200 assert warn.called # Modern versions of Python, or systems using PyOpenSSL, only emit # the unverified warning. Older systems may also emit other # warnings, which we want to ignore here. calls = warn.call_args_list assert InsecureRequestWarning in [x[0][1] for x in calls] def test_ssl_unverified_with_ca_certs(self) -> None: with HTTPSConnectionPool( self.host, self.port, cert_reqs="CERT_NONE", ca_certs=self.bad_ca_path, ssl_minimum_version=self.tls_version(), ) as pool: with mock.patch("warnings.warn") as warn: r = pool.request("GET", "/") assert r.status == 200 assert warn.called # Modern versions of Python, or systems using PyOpenSSL, only emit # the unverified warning. Older systems may also emit other # warnings, which we want to ignore here. calls = warn.call_args_list category = calls[0][0][1] assert category == InsecureRequestWarning def test_assert_hostname_false(self) -> None: with HTTPSConnectionPool( "localhost", self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: https_pool.assert_hostname = False https_pool.request("GET", "/") def test_assert_specific_hostname(self) -> None: with HTTPSConnectionPool( "localhost", self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: https_pool.assert_hostname = "localhost" https_pool.request("GET", "/") def test_server_hostname(self) -> None: with HTTPSConnectionPool( "127.0.0.1", self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, server_hostname="localhost", ssl_minimum_version=self.tls_version(), ) as https_pool: conn = https_pool._new_conn() conn.request("GET", "/") # Assert the wrapping socket is using the passed-through SNI name. # pyopenssl doesn't let you pull the server_hostname back off the # socket, so only add this assertion if the attribute is there (i.e. # the python ssl module). if hasattr(conn.sock, "server_hostname"): # type: ignore[attr-defined] assert conn.sock.server_hostname == "localhost" # type: ignore[attr-defined] conn.getresponse().close() conn.close() def test_assert_fingerprint_md5(self) -> None: with HTTPSConnectionPool( "localhost", self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, assert_fingerprint=("55:39:BF:70:05:12:43:FA:1F:D1:BF:4E:E8:1B:07:1D"), ssl_minimum_version=self.tls_version(), ) as https_pool: https_pool.request("GET", "/") def test_assert_fingerprint_sha1(self) -> None: with HTTPSConnectionPool( "localhost", self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, assert_fingerprint=( "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A" ), ssl_minimum_version=self.tls_version(), ) as https_pool: https_pool.request("GET", "/") def test_assert_fingerprint_sha256(self) -> None: with HTTPSConnectionPool( "localhost", self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, assert_fingerprint=( "E3:59:8E:69:FF:C5:9F:C7:88:87:44:58:22:7F:90:8D:D9:BC:12:C4:90:79:D5:" "DC:A8:5D:4F:60:40:1E:A6:D2" ), ssl_minimum_version=self.tls_version(), ) as https_pool: https_pool.request("GET", "/") def test_assert_invalid_fingerprint(self) -> None: def _test_request(pool: HTTPSConnectionPool) -> SSLError: with pytest.raises(MaxRetryError) as cm: pool.request("GET", "/", retries=0) assert type(cm.value.reason) is SSLError return cm.value.reason with HTTPSConnectionPool( self.host, self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: https_pool.assert_fingerprint = ( "AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA" ) e = _test_request(https_pool) expected = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" got = "728b554c9afc1e88a11cad1bb2e7cc3edbc8f98a" assert ( str(e) == f'Fingerprints did not match. Expected "{expected}", got "{got}"' ) # Uneven length https_pool.assert_fingerprint = "AA:A" e = _test_request(https_pool) assert "Fingerprint of invalid length:" in str(e) # Invalid length https_pool.assert_fingerprint = "AA" e = _test_request(https_pool) assert "Fingerprint of invalid length:" in str(e) def test_verify_none_and_bad_fingerprint(self) -> None: with HTTPSConnectionPool( "127.0.0.1", self.port, cert_reqs="CERT_NONE", assert_hostname=False, assert_fingerprint=( "AA:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A" ), ) as https_pool: with pytest.raises(MaxRetryError) as cm: https_pool.request("GET", "/", retries=0) assert type(cm.value.reason) is SSLError def test_verify_none_and_good_fingerprint(self) -> None: with HTTPSConnectionPool( "127.0.0.1", self.port, cert_reqs="CERT_NONE", assert_hostname=False, assert_fingerprint=( "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A" ), ) as https_pool: https_pool.request("GET", "/") def test_good_fingerprint_and_hostname_mismatch(self) -> None: with HTTPSConnectionPool( "127.0.0.1", self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, assert_fingerprint=( "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A" ), ssl_minimum_version=self.tls_version(), ) as https_pool: https_pool.request("GET", "/") @requires_network() def test_https_timeout(self) -> None: timeout = Timeout(total=None, connect=SHORT_TIMEOUT) with HTTPSConnectionPool( TARPIT_HOST, self.port, timeout=timeout, retries=False, cert_reqs="CERT_REQUIRED", ssl_minimum_version=self.tls_version(), ) as https_pool: with pytest.raises(ConnectTimeoutError): https_pool.request("GET", "/") timeout = Timeout(read=0.01) with HTTPSConnectionPool( self.host, self.port, timeout=timeout, retries=False, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, assert_fingerprint=( "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A" ), ssl_minimum_version=self.tls_version(), ) as https_pool: # TODO This was removed in https://github.com/urllib3/urllib3/pull/703/files # We need to put something back or remove this block. pass timeout = Timeout(total=None) with HTTPSConnectionPool( self.host, self.port, timeout=timeout, cert_reqs="CERT_NONE", ssl_minimum_version=self.tls_version(), ) as https_pool: with pytest.warns(InsecureRequestWarning): https_pool.request("GET", "/") def test_tunnel(self) -> None: """test the _tunnel behavior""" timeout = Timeout(total=None) with HTTPSConnectionPool( self.host, self.port, timeout=timeout, cert_reqs="CERT_NONE", ssl_minimum_version=self.tls_version(), ) as https_pool: with contextlib.closing(https_pool._new_conn()) as conn: conn.set_tunnel(self.host, self.port) with mock.patch.object( conn, "_tunnel", create=True, return_value=None ) as conn_tunnel: with pytest.warns(InsecureRequestWarning): https_pool._make_request(conn, "GET", "/") conn_tunnel.assert_called_once_with() @requires_network() def test_enhanced_timeout(self) -> None: with HTTPSConnectionPool( TARPIT_HOST, self.port, timeout=Timeout(connect=SHORT_TIMEOUT), retries=False, cert_reqs="CERT_REQUIRED", ) as https_pool: with contextlib.closing(https_pool._new_conn()) as conn: with pytest.raises(ConnectTimeoutError): https_pool.request("GET", "/") with pytest.raises(ConnectTimeoutError): https_pool._make_request(conn, "GET", "/") with HTTPSConnectionPool( TARPIT_HOST, self.port, timeout=Timeout(connect=LONG_TIMEOUT), retries=False, cert_reqs="CERT_REQUIRED", ) as https_pool: with pytest.raises(ConnectTimeoutError): https_pool.request("GET", "/", timeout=Timeout(connect=SHORT_TIMEOUT)) with HTTPSConnectionPool( TARPIT_HOST, self.port, timeout=Timeout(total=None), retries=False, cert_reqs="CERT_REQUIRED", ) as https_pool: with contextlib.closing(https_pool._new_conn()) as conn: with pytest.raises(ConnectTimeoutError): https_pool.request( "GET", "/", timeout=Timeout(total=None, connect=SHORT_TIMEOUT) ) def test_enhanced_ssl_connection(self) -> None: fingerprint = "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A" with HTTPSConnectionPool( self.host, self.port, cert_reqs="CERT_REQUIRED", ca_certs=DEFAULT_CA, assert_fingerprint=fingerprint, ssl_minimum_version=self.tls_version(), ) as https_pool: r = https_pool.request("GET", "/") assert r.status == 200 def test_ssl_correct_system_time(self) -> None: with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: https_pool.cert_reqs = "CERT_REQUIRED" https_pool.ca_certs = DEFAULT_CA w = self._request_without_resource_warnings("GET", "/") assert [] == w def test_ssl_wrong_system_time(self) -> None: with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: https_pool.cert_reqs = "CERT_REQUIRED" https_pool.ca_certs = DEFAULT_CA with mock.patch("urllib3.connection.datetime") as mock_date: mock_date.date.today.return_value = datetime.date(1970, 1, 1) w = self._request_without_resource_warnings("GET", "/") assert len(w) == 1 warning = w[0] assert SystemTimeWarning == warning.category assert isinstance(warning.message, Warning) assert str(RECENT_DATE) in warning.message.args[0] def _request_without_resource_warnings( self, method: str, url: str ) -> list[warnings.WarningMessage]: with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: https_pool.request(method, url) w = [x for x in w if not isinstance(x.message, ResourceWarning)] return w def test_set_ssl_version_to_tls_version(self) -> None: if self.tls_protocol_name is None: pytest.skip("Skipping base test class") with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA ) as https_pool: https_pool.ssl_version = ssl_version = self.certs["ssl_version"] if ssl_version is getattr(ssl, "PROTOCOL_TLS", object()): cmgr: contextlib.AbstractContextManager[ object ] = contextlib.nullcontext() else: cmgr = pytest.warns( DeprecationWarning, match=r"'ssl_version' option is deprecated and will be removed " r"in urllib3 v2\.1\.0\. Instead use 'ssl_minimum_version'", ) with cmgr: r = https_pool.request("GET", "/") assert r.status == 200, r.data def test_set_cert_default_cert_required(self) -> None: conn = VerifiedHTTPSConnection(self.host, self.port) with pytest.warns(DeprecationWarning) as w: conn.set_cert() assert conn.cert_reqs == ssl.CERT_REQUIRED assert len(w) == 1 and str(w[0].message) == ( "HTTPSConnection.set_cert() is deprecated and will be removed in urllib3 v2.1.0. " "Instead provide the parameters to the HTTPSConnection constructor." ) @pytest.mark.parametrize("verify_mode", [ssl.CERT_NONE, ssl.CERT_REQUIRED]) def test_set_cert_inherits_cert_reqs_from_ssl_context( self, verify_mode: int ) -> None: ssl_context = urllib3.util.ssl_.create_urllib3_context(cert_reqs=verify_mode) assert ssl_context.verify_mode == verify_mode conn = HTTPSConnection(self.host, self.port, ssl_context=ssl_context) with pytest.warns(DeprecationWarning) as w: conn.set_cert() assert conn.cert_reqs == verify_mode assert ( conn.ssl_context is not None and conn.ssl_context.verify_mode == verify_mode ) assert len(w) == 1 and str(w[0].message) == ( "HTTPSConnection.set_cert() is deprecated and will be removed in urllib3 v2.1.0. " "Instead provide the parameters to the HTTPSConnection constructor." ) def test_tls_protocol_name_of_socket(self) -> None: if self.tls_protocol_name is None: pytest.skip("Skipping base test class") with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ssl_maximum_version=self.tls_version(), ) as https_pool: with contextlib.closing(https_pool._get_conn()) as conn: conn.connect() if not hasattr(conn.sock, "version"): # type: ignore[attr-defined] pytest.skip("SSLSocket.version() not available") assert conn.sock.version() == self.tls_protocol_name # type: ignore[attr-defined] def test_ssl_version_is_deprecated(self) -> None: if self.tls_protocol_name is None: pytest.skip("Skipping base test class") if self.ssl_version() == ssl.PROTOCOL_TLS_CLIENT: pytest.skip( "Skipping because ssl_version=ssl.PROTOCOL_TLS_CLIENT is not deprecated" ) with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_version=self.ssl_version() ) as https_pool: with contextlib.closing(https_pool._get_conn()) as conn: with pytest.warns(DeprecationWarning) as w: conn.connect() assert len(w) >= 1 assert any(x.category == DeprecationWarning for x in w) assert any( str(x.message) == ( "'ssl_version' option is deprecated and will be removed in " "urllib3 v2.1.0. Instead use 'ssl_minimum_version'" ) for x in w ) @pytest.mark.parametrize( "ssl_version", [None, ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_CLIENT] ) def test_ssl_version_with_protocol_tls_or_client_not_deprecated( self, ssl_version: int | None ) -> None: if self.tls_protocol_name is None: pytest.skip("Skipping base test class") if self.tls_protocol_not_default(): pytest.skip( f"Skipping because '{self.tls_protocol_name}' isn't set by default" ) with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_version=ssl_version ) as https_pool: with contextlib.closing(https_pool._get_conn()) as conn: with warnings.catch_warnings(record=True) as w: conn.connect() assert [str(wm) for wm in w if wm.category != ResourceWarning] == [] def test_no_tls_version_deprecation_with_ssl_context(self) -> None: if self.tls_protocol_name is None: pytest.skip("Skipping base test class") ctx = util.ssl_.create_urllib3_context(ssl_minimum_version=self.tls_version()) with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_context=ctx, ) as https_pool: with contextlib.closing(https_pool._get_conn()) as conn: with warnings.catch_warnings(record=True) as w: conn.connect() assert [str(wm) for wm in w if wm.category != ResourceWarning] == [] def test_tls_version_maximum_and_minimum(self) -> None: if self.tls_protocol_name is None: pytest.skip("Skipping base test class") from ssl import TLSVersion min_max_versions = [ (self.tls_version(), self.tls_version()), (TLSVersion.MINIMUM_SUPPORTED, self.tls_version()), (TLSVersion.MINIMUM_SUPPORTED, TLSVersion.MAXIMUM_SUPPORTED), ] for minimum_version, maximum_version in min_max_versions: with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_minimum_version=minimum_version, ssl_maximum_version=maximum_version, ) as https_pool: conn = https_pool._get_conn() try: conn.connect() if maximum_version == TLSVersion.MAXIMUM_SUPPORTED: # A higher protocol than tls_protocol_name could be negotiated assert conn.sock.version() >= self.tls_protocol_name # type: ignore[attr-defined] else: assert conn.sock.version() == self.tls_protocol_name # type: ignore[attr-defined] finally: conn.close() def test_sslkeylogfile( self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: if not hasattr(util.SSLContext, "keylog_filename"): pytest.skip("requires OpenSSL 1.1.1+") keylog_file = tmp_path / "keylogfile.txt" monkeypatch.setenv("SSLKEYLOGFILE", str(keylog_file)) with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as https_pool: r = https_pool.request("GET", "/") assert r.status == 200, r.data assert keylog_file.is_file(), "keylogfile '%s' should exist" % str( keylog_file ) assert keylog_file.read_text().startswith( "# TLS secrets log file" ), "keylogfile '%s' should start with '# TLS secrets log file'" % str( keylog_file ) @pytest.mark.parametrize("sslkeylogfile", [None, ""]) def test_sslkeylogfile_empty( self, monkeypatch: pytest.MonkeyPatch, sslkeylogfile: str | None ) -> None: # Assert that an HTTPS connection doesn't error out when given # no SSLKEYLOGFILE or an empty value (ie 'SSLKEYLOGFILE=') if sslkeylogfile is not None: monkeypatch.setenv("SSLKEYLOGFILE", sslkeylogfile) else: monkeypatch.delenv("SSLKEYLOGFILE", raising=False) with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as pool: r = pool.request("GET", "/") assert r.status == 200, r.data def test_alpn_default(self) -> None: """Default ALPN protocols are sent by default.""" if not has_alpn() or not has_alpn(ssl.SSLContext): pytest.skip("ALPN-support not available") with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, ssl_minimum_version=self.tls_version(), ) as pool: r = pool.request("GET", "/alpn_protocol", retries=0) assert r.status == 200 assert r.data.decode("utf-8") == util.ALPN_PROTOCOLS[0] def test_default_ssl_context_ssl_min_max_versions(self) -> None: ctx = urllib3.util.ssl_.create_urllib3_context() assert ctx.minimum_version == ssl.TLSVersion.TLSv1_2 # urllib3 sets a default maximum version only when it is # injected with PyOpenSSL SSL-support. # Otherwise, the default maximum version is set by Python's # `ssl.SSLContext`. The value respects OpenSSL configuration and # can be different from `ssl.TLSVersion.MAXIMUM_SUPPORTED`. # https://github.com/urllib3/urllib3/issues/2477#issuecomment-1151452150 if util.IS_PYOPENSSL: expected_maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED else: expected_maximum_version = ssl.SSLContext( ssl.PROTOCOL_TLS_CLIENT ).maximum_version assert ctx.maximum_version == expected_maximum_version def test_ssl_context_ssl_version_uses_ssl_min_max_versions(self) -> None: if self.ssl_version() == ssl.PROTOCOL_TLS_CLIENT: pytest.skip( "Skipping because ssl_version=ssl.PROTOCOL_TLS_CLIENT is not deprecated" ) with pytest.warns( DeprecationWarning, match=r"'ssl_version' option is deprecated and will be removed in " r"urllib3 v2\.1\.0\. Instead use 'ssl_minimum_version'", ): ctx = urllib3.util.ssl_.create_urllib3_context( ssl_version=self.ssl_version() ) assert ctx.minimum_version == self.tls_version() assert ctx.maximum_version == self.tls_version() @pytest.mark.usefixtures("requires_tlsv1") class TestHTTPS_TLSv1(BaseTestHTTPS): tls_protocol_name = "TLSv1" certs = TLSv1_CERTS @pytest.mark.usefixtures("requires_tlsv1_1") class TestHTTPS_TLSv1_1(BaseTestHTTPS): tls_protocol_name = "TLSv1.1" certs = TLSv1_1_CERTS @pytest.mark.usefixtures("requires_tlsv1_2") class TestHTTPS_TLSv1_2(BaseTestHTTPS): tls_protocol_name = "TLSv1.2" certs = TLSv1_2_CERTS @pytest.mark.usefixtures("requires_tlsv1_3") class TestHTTPS_TLSv1_3(BaseTestHTTPS): tls_protocol_name = "TLSv1.3" certs = TLSv1_3_CERTS class TestHTTPS_Hostname: def test_can_validate_san(self, san_server: ServerConfig) -> None: """Ensure that urllib3 can validate SANs with IP addresses in them.""" with HTTPSConnectionPool( san_server.host, san_server.port, cert_reqs="CERT_REQUIRED", ca_certs=san_server.ca_certs, ) as https_pool: r = https_pool.request("GET", "/") assert r.status == 200 def test_common_name_without_san_fails(self, no_san_server: ServerConfig) -> None: with HTTPSConnectionPool( no_san_server.host, no_san_server.port, cert_reqs="CERT_REQUIRED", ca_certs=no_san_server.ca_certs, ) as https_pool: with pytest.raises( MaxRetryError, ) as e: https_pool.request("GET", "/") assert "mismatch, certificate is not valid" in str( e.value ) or "no appropriate subjectAltName" in str(e.value) def test_common_name_without_san_with_different_common_name( self, no_san_server_with_different_commmon_name: ServerConfig ) -> None: ctx = urllib3.util.ssl_.create_urllib3_context() try: ctx.hostname_checks_common_name = True except AttributeError: pytest.skip("Couldn't set 'SSLContext.hostname_checks_common_name'") with HTTPSConnectionPool( no_san_server_with_different_commmon_name.host, no_san_server_with_different_commmon_name.port, cert_reqs="CERT_REQUIRED", ca_certs=no_san_server_with_different_commmon_name.ca_certs, ssl_context=ctx, ) as https_pool: with pytest.raises(MaxRetryError) as e: https_pool.request("GET", "/") assert "mismatch, certificate is not valid for 'localhost'" in str( e.value ) or "hostname 'localhost' doesn't match 'example.com'" in str(e.value) @pytest.mark.parametrize("use_assert_hostname", [True, False]) def test_hostname_checks_common_name_respected( self, no_san_server: ServerConfig, use_assert_hostname: bool ) -> None: ctx = urllib3.util.ssl_.create_urllib3_context() if not hasattr(ctx, "hostname_checks_common_name"): pytest.skip("Test requires 'SSLContext.hostname_checks_common_name'") ctx.load_verify_locations(no_san_server.ca_certs) try: ctx.hostname_checks_common_name = True except AttributeError: pytest.skip("Couldn't set 'SSLContext.hostname_checks_common_name'") err: MaxRetryError | None try: with HTTPSConnectionPool( no_san_server.host, no_san_server.port, cert_reqs="CERT_REQUIRED", ssl_context=ctx, assert_hostname=no_san_server.host if use_assert_hostname else None, ) as https_pool: https_pool.request("GET", "/") except MaxRetryError as e: err = e else: err = None # commonName is only valid for DNS names, not IP addresses. if no_san_server.host == "localhost": assert err is None # IP addresses should fail for commonName. else: assert err is not None assert type(err.reason) is SSLError assert isinstance( err.reason.args[0], (ssl.SSLCertVerificationError, CertificateError) ) def test_assert_hostname_invalid_san( self, no_localhost_san_server: ServerConfig ) -> None: """Ensure SAN errors are not raised while assert_hostname is false""" with HTTPSConnectionPool( no_localhost_san_server.host, no_localhost_san_server.port, cert_reqs="CERT_REQUIRED", ca_certs=no_localhost_san_server.ca_certs, assert_hostname=False, ) as https_pool: https_pool.request("GET", "/") def test_assert_hostname_invalid_cn( self, no_san_server_with_different_commmon_name: ServerConfig ) -> None: """Ensure CN errors are not raised while assert_hostname is false""" with HTTPSConnectionPool( no_san_server_with_different_commmon_name.host, no_san_server_with_different_commmon_name.port, cert_reqs="CERT_REQUIRED", ca_certs=no_san_server_with_different_commmon_name.ca_certs, assert_hostname=False, ) as https_pool: https_pool.request("GET", "/") class TestHTTPS_IPV4SAN: def test_can_validate_ip_san(self, ipv4_san_server: ServerConfig) -> None: """Ensure that urllib3 can validate SANs with IP addresses in them.""" with HTTPSConnectionPool( ipv4_san_server.host, ipv4_san_server.port, cert_reqs="CERT_REQUIRED", ca_certs=ipv4_san_server.ca_certs, ) as https_pool: r = https_pool.request("GET", "/") assert r.status == 200 class TestHTTPS_IPV6SAN: @pytest.mark.parametrize("host", ["::1", "[::1]"]) def test_can_validate_ipv6_san( self, ipv6_san_server: ServerConfig, host: str, http_version: str ) -> None: """Ensure that urllib3 can validate SANs with IPv6 addresses in them.""" with HTTPSConnectionPool( host, ipv6_san_server.port, cert_reqs="CERT_REQUIRED", ca_certs=ipv6_san_server.ca_certs, ) as https_pool: r = https_pool.request("GET", "/") assert r.status == 200 assert r.headers["server"] == f"hypercorn-{http_version}" test_proxy_poolmanager.py 0000644 00000112760 15025306245 0011733 0 ustar 00 from __future__ import annotations import binascii import contextlib import hashlib import ipaddress import os.path import pathlib import shutil import socket import ssl import tempfile from test import LONG_TIMEOUT, SHORT_TIMEOUT, resolvesLocalhostFQDN, withPyOpenSSL from test.conftest import ServerConfig import pytest import trustme import urllib3.exceptions from dummyserver.socketserver import DEFAULT_CA, HAS_IPV6, get_unreachable_address from dummyserver.testcase import ( HypercornDummyProxyTestCase, IPv6HypercornDummyProxyTestCase, ) from urllib3 import HTTPResponse from urllib3._collections import HTTPHeaderDict from urllib3.connection import VerifiedHTTPSConnection from urllib3.connectionpool import connection_from_url from urllib3.exceptions import ( ConnectTimeoutError, InsecureRequestWarning, MaxRetryError, ProxyError, ProxySchemeUnknown, ProxySchemeUnsupported, ReadTimeoutError, SSLError, ) from urllib3.poolmanager import ProxyManager, proxy_from_url from urllib3.util.ssl_ import create_urllib3_context from urllib3.util.timeout import Timeout from .. import TARPIT_HOST, requires_network def assert_is_verified(pm: ProxyManager, *, proxy: bool, target: bool) -> None: pool = list(pm.pools._container.values())[-1] # retrieve last pool entry connection = ( pool.pool.queue[-1] if pool.pool is not None else None ) # retrieve last connection entry assert connection.proxy_is_verified is proxy assert connection.is_verified is target class TestHTTPProxyManager(HypercornDummyProxyTestCase): @classmethod def setup_class(cls) -> None: super().setup_class() cls.http_url = f"http://{cls.http_host}:{int(cls.http_port)}" cls.http_url_alt = f"http://{cls.http_host_alt}:{int(cls.http_port)}" cls.https_url = f"https://{cls.https_host}:{int(cls.https_port)}" cls.https_url_alt = f"https://{cls.https_host_alt}:{int(cls.https_port)}" cls.https_url_fqdn = f"https://{cls.https_host}.:{int(cls.https_port)}" cls.proxy_url = f"http://{cls.proxy_host}:{int(cls.proxy_port)}" cls.https_proxy_url = f"https://{cls.proxy_host}:{int(cls.https_proxy_port)}" # Generate another CA to test verification failure cls.certs_dir = tempfile.mkdtemp() bad_ca = trustme.CA() cls.bad_ca_path = os.path.join(cls.certs_dir, "ca_bad.pem") bad_ca.cert_pem.write_to_path(cls.bad_ca_path) @classmethod def teardown_class(cls) -> None: super().teardown_class() shutil.rmtree(cls.certs_dir) def test_basic_proxy(self) -> None: with proxy_from_url(self.proxy_url, ca_certs=DEFAULT_CA) as http: r = http.request("GET", f"{self.http_url}/") assert r.status == 200 r = http.request("GET", f"{self.https_url}/") assert r.status == 200 def test_https_proxy(self) -> None: with proxy_from_url(self.https_proxy_url, ca_certs=DEFAULT_CA) as https: r = https.request("GET", f"{self.https_url}/") assert r.status == 200 r = https.request("GET", f"{self.http_url}/") assert r.status == 200 def test_is_verified_http_proxy_to_http_target(self) -> None: with proxy_from_url(self.proxy_url, ca_certs=DEFAULT_CA) as http: r = http.request("GET", f"{self.http_url}/") assert r.status == 200 assert_is_verified(http, proxy=False, target=False) def test_is_verified_http_proxy_to_https_target(self) -> None: with proxy_from_url(self.proxy_url, ca_certs=DEFAULT_CA) as http: r = http.request("GET", f"{self.https_url}/") assert r.status == 200 assert_is_verified(http, proxy=False, target=True) def test_is_verified_https_proxy_to_http_target(self) -> None: with proxy_from_url(self.https_proxy_url, ca_certs=DEFAULT_CA) as https: r = https.request("GET", f"{self.http_url}/") assert r.status == 200 assert_is_verified(https, proxy=True, target=False) def test_is_verified_https_proxy_to_https_target(self) -> None: with proxy_from_url(self.https_proxy_url, ca_certs=DEFAULT_CA) as https: r = https.request("GET", f"{self.https_url}/") assert r.status == 200 assert_is_verified(https, proxy=True, target=True) def test_http_and_https_kwarg_ca_cert_data_proxy(self) -> None: with open(DEFAULT_CA) as pem_file: pem_file_data = pem_file.read() with proxy_from_url(self.https_proxy_url, ca_cert_data=pem_file_data) as https: r = https.request("GET", f"{self.https_url}/") assert r.status == 200 r = https.request("GET", f"{self.http_url}/") assert r.status == 200 def test_https_proxy_with_proxy_ssl_context(self) -> None: proxy_ssl_context = create_urllib3_context() proxy_ssl_context.load_verify_locations(DEFAULT_CA) with proxy_from_url( self.https_proxy_url, proxy_ssl_context=proxy_ssl_context, ca_certs=DEFAULT_CA, ) as https: r = https.request("GET", f"{self.https_url}/") assert r.status == 200 r = https.request("GET", f"{self.http_url}/") assert r.status == 200 @withPyOpenSSL def test_https_proxy_pyopenssl_not_supported(self) -> None: with proxy_from_url(self.https_proxy_url, ca_certs=DEFAULT_CA) as https: r = https.request("GET", f"{self.http_url}/") assert r.status == 200 with pytest.raises( ProxySchemeUnsupported, match="isn't available on non-native SSLContext" ): https.request("GET", f"{self.https_url}/") def test_https_proxy_forwarding_for_https(self) -> None: with proxy_from_url( self.https_proxy_url, ca_certs=DEFAULT_CA, use_forwarding_for_https=True, ) as https: r = https.request("GET", f"{self.http_url}/") assert r.status == 200 r = https.request("GET", f"{self.https_url}/") assert r.status == 200 def test_nagle_proxy(self) -> None: """Test that proxy connections do not have TCP_NODELAY turned on""" with ProxyManager(self.proxy_url) as http: hc2 = http.connection_from_host(self.http_host, self.http_port) conn = hc2._get_conn() try: hc2._make_request(conn, "GET", f"{self.http_url}/") tcp_nodelay_setting = conn.sock.getsockopt( # type: ignore[attr-defined] socket.IPPROTO_TCP, socket.TCP_NODELAY ) assert tcp_nodelay_setting == 0, ( "Expected TCP_NODELAY for proxies to be set " "to zero, instead was %s" % tcp_nodelay_setting ) finally: conn.close() @pytest.mark.parametrize("proxy_scheme", ["http", "https"]) @pytest.mark.parametrize("target_scheme", ["http", "https"]) def test_proxy_conn_fail_from_dns( self, proxy_scheme: str, target_scheme: str ) -> None: host, port = get_unreachable_address() with proxy_from_url( f"{proxy_scheme}://{host}:{port}/", retries=1, timeout=LONG_TIMEOUT ) as http: if target_scheme == "https": target_url = self.https_url else: target_url = self.http_url with pytest.raises(MaxRetryError) as e: http.request("GET", f"{target_url}/") assert isinstance(e.value.reason, ProxyError) assert isinstance( e.value.reason.original_error, urllib3.exceptions.NameResolutionError ) def test_oldapi(self) -> None: with ProxyManager( connection_from_url(self.proxy_url), ca_certs=DEFAULT_CA # type: ignore[arg-type] ) as http: r = http.request("GET", f"{self.http_url}/") assert r.status == 200 r = http.request("GET", f"{self.https_url}/") assert r.status == 200 @resolvesLocalhostFQDN() def test_proxy_https_fqdn(self) -> None: with proxy_from_url(self.proxy_url, ca_certs=DEFAULT_CA) as http: r = http.request("GET", f"{self.https_url_fqdn}/") assert r.status == 200 def test_proxy_verified(self) -> None: with proxy_from_url( self.proxy_url, cert_reqs="REQUIRED", ca_certs=self.bad_ca_path ) as http: with http._new_pool( "https", self.https_host, self.https_port ) as https_pool: with pytest.raises(MaxRetryError) as e: https_pool.request("GET", "/", retries=0) assert isinstance(e.value.reason, SSLError) assert ( "certificate verify failed" in str(e.value.reason) # PyPy is more specific or "self signed certificate in certificate chain" in str(e.value.reason) ), f"Expected 'certificate verify failed', instead got: {e.value.reason!r}" http = proxy_from_url( self.proxy_url, cert_reqs="REQUIRED", ca_certs=DEFAULT_CA ) with http._new_pool( "https", self.https_host, self.https_port ) as https_pool2: with contextlib.closing(https_pool._new_conn()) as conn: assert conn.__class__ == VerifiedHTTPSConnection https_pool2.request( "GET", "/" ) # Should succeed without exceptions. http = proxy_from_url( self.proxy_url, cert_reqs="REQUIRED", ca_certs=DEFAULT_CA ) with http._new_pool( "https", "127.0.0.1", self.https_port ) as https_fail_pool: with pytest.raises( MaxRetryError, match="doesn't match|IP address mismatch" ) as e: https_fail_pool.request("GET", "/", retries=0) assert isinstance(e.value.reason, SSLError) def test_redirect(self) -> None: with proxy_from_url(self.proxy_url) as http: r = http.request( "GET", f"{self.http_url}/redirect", fields={"target": f"{self.http_url}/"}, redirect=False, ) assert r.status == 303 r = http.request( "GET", f"{self.http_url}/redirect", fields={"target": f"{self.http_url}/"}, ) assert r.status == 200 assert r.data == b"Dummy server!" def test_cross_host_redirect(self) -> None: with proxy_from_url(self.proxy_url) as http: cross_host_location = f"{self.http_url_alt}/echo?a=b" with pytest.raises(MaxRetryError): http.request( "GET", f"{self.http_url}/redirect", fields={"target": cross_host_location}, retries=0, ) r = http.request( "GET", f"{self.http_url}/redirect", fields={"target": f"{self.http_url_alt}/echo?a=b"}, retries=1, ) assert isinstance(r, HTTPResponse) assert r._pool is not None assert r._pool.host != self.http_host_alt def test_cross_protocol_redirect(self) -> None: with proxy_from_url(self.proxy_url, ca_certs=DEFAULT_CA) as http: cross_protocol_location = f"{self.https_url}/echo?a=b" with pytest.raises(MaxRetryError): http.request( "GET", f"{self.http_url}/redirect", fields={"target": cross_protocol_location}, retries=0, ) r = http.request( "GET", f"{self.http_url}/redirect", fields={"target": f"{self.https_url}/echo?a=b"}, retries=1, ) assert isinstance(r, HTTPResponse) assert r._pool is not None assert r._pool.host == self.https_host def test_headers(self) -> None: with proxy_from_url( self.proxy_url, headers={"Foo": "bar"}, proxy_headers={"Hickory": "dickory"}, ca_certs=DEFAULT_CA, ) as http: r = http.request_encode_url("GET", f"{self.http_url}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" assert returned_headers.get("Hickory") == "dickory" assert returned_headers.get("Host") == f"{self.http_host}:{self.http_port}" r = http.request_encode_url("GET", f"{self.http_url_alt}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" assert returned_headers.get("Hickory") == "dickory" assert ( returned_headers.get("Host") == f"{self.http_host_alt}:{self.http_port}" ) r = http.request_encode_url("GET", f"{self.https_url}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" assert returned_headers.get("Hickory") is None assert ( returned_headers.get("Host") == f"{self.https_host}:{self.https_port}" ) r = http.request_encode_body("POST", f"{self.http_url}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" assert returned_headers.get("Hickory") == "dickory" assert returned_headers.get("Host") == f"{self.http_host}:{self.http_port}" r = http.request_encode_url( "GET", f"{self.http_url}/headers", headers={"Baz": "quux"} ) returned_headers = r.json() assert returned_headers.get("Foo") is None assert returned_headers.get("Baz") == "quux" assert returned_headers.get("Hickory") == "dickory" assert returned_headers.get("Host") == f"{self.http_host}:{self.http_port}" r = http.request_encode_url( "GET", f"{self.https_url}/headers", headers={"Baz": "quux"} ) returned_headers = r.json() assert returned_headers.get("Foo") is None assert returned_headers.get("Baz") == "quux" assert returned_headers.get("Hickory") is None assert ( returned_headers.get("Host") == f"{self.https_host}:{self.https_port}" ) r = http.request_encode_body( "GET", f"{self.http_url}/headers", headers={"Baz": "quux"} ) returned_headers = r.json() assert returned_headers.get("Foo") is None assert returned_headers.get("Baz") == "quux" assert returned_headers.get("Hickory") == "dickory" assert returned_headers.get("Host") == f"{self.http_host}:{self.http_port}" r = http.request_encode_body( "GET", f"{self.https_url}/headers", headers={"Baz": "quux"} ) returned_headers = r.json() assert returned_headers.get("Foo") is None assert returned_headers.get("Baz") == "quux" assert returned_headers.get("Hickory") is None assert ( returned_headers.get("Host") == f"{self.https_host}:{self.https_port}" ) def test_https_headers(self) -> None: with proxy_from_url( self.https_proxy_url, headers={"Foo": "bar"}, proxy_headers={"Hickory": "dickory"}, ca_certs=DEFAULT_CA, ) as http: r = http.request_encode_url("GET", f"{self.http_url}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" assert returned_headers.get("Hickory") == "dickory" assert returned_headers.get("Host") == f"{self.http_host}:{self.http_port}" r = http.request_encode_url("GET", f"{self.http_url_alt}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" assert returned_headers.get("Hickory") == "dickory" assert ( returned_headers.get("Host") == f"{self.http_host_alt}:{self.http_port}" ) r = http.request_encode_body( "GET", f"{self.https_url}/headers", headers={"Baz": "quux"} ) returned_headers = r.json() assert returned_headers.get("Foo") is None assert returned_headers.get("Baz") == "quux" assert returned_headers.get("Hickory") is None assert ( returned_headers.get("Host") == f"{self.https_host}:{self.https_port}" ) def test_https_headers_forwarding_for_https(self) -> None: with proxy_from_url( self.https_proxy_url, headers={"Foo": "bar"}, proxy_headers={"Hickory": "dickory"}, ca_certs=DEFAULT_CA, use_forwarding_for_https=True, ) as http: r = http.request_encode_url("GET", f"{self.https_url}/headers") returned_headers = r.json() assert returned_headers.get("Foo") == "bar" assert returned_headers.get("Hickory") == "dickory" assert ( returned_headers.get("Host") == f"{self.https_host}:{self.https_port}" ) def test_headerdict(self) -> None: default_headers = HTTPHeaderDict(a="b") proxy_headers = HTTPHeaderDict() proxy_headers.add("foo", "bar") with proxy_from_url( self.proxy_url, headers=default_headers, proxy_headers=proxy_headers ) as http: request_headers = HTTPHeaderDict(baz="quux") r = http.request("GET", f"{self.http_url}/headers", headers=request_headers) returned_headers = r.json() assert returned_headers.get("Foo") == "bar" assert returned_headers.get("Baz") == "quux" def test_proxy_pooling(self) -> None: with proxy_from_url(self.proxy_url, cert_reqs="NONE") as http: for x in range(2): http.urlopen("GET", self.http_url) assert len(http.pools) == 1 for x in range(2): http.urlopen("GET", self.http_url_alt) assert len(http.pools) == 1 for x in range(2): with pytest.warns(InsecureRequestWarning): http.urlopen("GET", self.https_url) assert len(http.pools) == 2 for x in range(2): with pytest.warns(InsecureRequestWarning): http.urlopen("GET", self.https_url_alt) assert len(http.pools) == 3 def test_proxy_pooling_ext(self) -> None: with proxy_from_url(self.proxy_url) as http: hc1 = http.connection_from_url(self.http_url) hc2 = http.connection_from_host(self.http_host, self.http_port) hc3 = http.connection_from_url(self.http_url_alt) hc4 = http.connection_from_host(self.http_host_alt, self.http_port) assert hc1 == hc2 assert hc2 == hc3 assert hc3 == hc4 sc1 = http.connection_from_url(self.https_url) sc2 = http.connection_from_host( self.https_host, self.https_port, scheme="https" ) sc3 = http.connection_from_url(self.https_url_alt) sc4 = http.connection_from_host( self.https_host_alt, self.https_port, scheme="https" ) assert sc1 == sc2 assert sc2 != sc3 assert sc3 == sc4 @requires_network() @pytest.mark.parametrize( ["proxy_scheme", "target_scheme", "use_forwarding_for_https"], [ ("http", "http", False), ("https", "http", False), # 'use_forwarding_for_https' is only valid for HTTPS+HTTPS. ("https", "https", True), ], ) def test_forwarding_proxy_request_timeout( self, proxy_scheme: str, target_scheme: str, use_forwarding_for_https: bool ) -> None: proxy_url = self.https_proxy_url if proxy_scheme == "https" else self.proxy_url target_url = f"{target_scheme}://{TARPIT_HOST}" with proxy_from_url( proxy_url, ca_certs=DEFAULT_CA, use_forwarding_for_https=use_forwarding_for_https, ) as proxy: with pytest.raises(MaxRetryError) as e: timeout = Timeout(connect=LONG_TIMEOUT, read=SHORT_TIMEOUT) proxy.request("GET", target_url, timeout=timeout) # We sent the request to the proxy but didn't get any response # so we're not sure if that's being caused by the proxy or the # target so we put the blame on the target. assert isinstance(e.value.reason, ReadTimeoutError) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") @requires_network() @pytest.mark.parametrize( ["proxy_scheme", "target_scheme"], [("http", "https"), ("https", "https")] ) def test_tunneling_proxy_request_timeout( self, proxy_scheme: str, target_scheme: str ) -> None: proxy_url = self.https_proxy_url if proxy_scheme == "https" else self.proxy_url target_url = f"{target_scheme}://{TARPIT_HOST}" with proxy_from_url( proxy_url, ca_certs=DEFAULT_CA, ) as proxy: with pytest.raises(MaxRetryError) as e: timeout = Timeout(connect=LONG_TIMEOUT, read=SHORT_TIMEOUT) proxy.request("GET", target_url, timeout=timeout) assert isinstance(e.value.reason, ReadTimeoutError) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") @requires_network() @pytest.mark.parametrize( ["proxy_scheme", "target_scheme", "use_forwarding_for_https"], [ ("http", "http", False), ("https", "http", False), # 'use_forwarding_for_https' is only valid for HTTPS+HTTPS. ("https", "https", True), ], ) def test_forwarding_proxy_connect_timeout( self, proxy_scheme: str, target_scheme: str, use_forwarding_for_https: bool ) -> None: proxy_url = f"{proxy_scheme}://{TARPIT_HOST}" target_url = self.https_url if target_scheme == "https" else self.http_url with proxy_from_url( proxy_url, ca_certs=DEFAULT_CA, timeout=SHORT_TIMEOUT, use_forwarding_for_https=use_forwarding_for_https, ) as proxy: with pytest.raises(MaxRetryError) as e: proxy.request("GET", target_url) assert isinstance(e.value.reason, ProxyError) assert isinstance(e.value.reason.original_error, ConnectTimeoutError) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") @requires_network() @pytest.mark.parametrize( ["proxy_scheme", "target_scheme"], [("http", "https"), ("https", "https")] ) def test_tunneling_proxy_connect_timeout( self, proxy_scheme: str, target_scheme: str ) -> None: proxy_url = f"{proxy_scheme}://{TARPIT_HOST}" target_url = self.https_url if target_scheme == "https" else self.http_url with proxy_from_url( proxy_url, ca_certs=DEFAULT_CA, timeout=SHORT_TIMEOUT ) as proxy: with pytest.raises(MaxRetryError) as e: proxy.request("GET", target_url) assert isinstance(e.value.reason, ProxyError) assert isinstance(e.value.reason.original_error, ConnectTimeoutError) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") @requires_network() @pytest.mark.parametrize( ["target_scheme", "use_forwarding_for_https"], [ ("http", False), ("https", False), ("https", True), ], ) def test_https_proxy_tls_error( self, target_scheme: str, use_forwarding_for_https: str ) -> None: target_url = self.https_url if target_scheme == "https" else self.http_url proxy_ctx = ssl.create_default_context() with proxy_from_url( self.https_proxy_url, proxy_ssl_context=proxy_ctx, use_forwarding_for_https=use_forwarding_for_https, ) as proxy: with pytest.raises(MaxRetryError) as e: proxy.request("GET", target_url) assert isinstance(e.value.reason, ProxyError) assert isinstance(e.value.reason.original_error, SSLError) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") @requires_network() @pytest.mark.parametrize( ["proxy_scheme", "use_forwarding_for_https"], [ ("http", False), ("https", False), ("https", True), ], ) def test_proxy_https_target_tls_error( self, proxy_scheme: str, use_forwarding_for_https: str ) -> None: if proxy_scheme == "https" and use_forwarding_for_https: pytest.skip("Test is expected to fail due to urllib3/urllib3#2577") proxy_url = self.https_proxy_url if proxy_scheme == "https" else self.proxy_url proxy_ctx = ssl.create_default_context() proxy_ctx.load_verify_locations(DEFAULT_CA) ctx = ssl.create_default_context() with proxy_from_url( proxy_url, proxy_ssl_context=proxy_ctx, ssl_context=ctx, use_forwarding_for_https=use_forwarding_for_https, ) as proxy: with pytest.raises(MaxRetryError) as e: proxy.request("GET", self.https_url) assert isinstance(e.value.reason, SSLError) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") def test_scheme_host_case_insensitive(self) -> None: """Assert that upper-case schemes and hosts are normalized.""" with proxy_from_url(self.proxy_url.upper(), ca_certs=DEFAULT_CA) as http: r = http.request("GET", f"{self.http_url.upper()}/") assert r.status == 200 r = http.request("GET", f"{self.https_url.upper()}/") assert r.status == 200 @pytest.mark.parametrize( "url, error_msg", [ ( "127.0.0.1", "Proxy URL had no scheme, should start with http:// or https://", ), ( "localhost:8080", "Proxy URL had no scheme, should start with http:// or https://", ), ( "ftp://google.com", "Proxy URL had unsupported scheme ftp, should use http:// or https://", ), ], ) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") def test_invalid_schema(self, url: str, error_msg: str) -> None: with pytest.raises(ProxySchemeUnknown, match=error_msg): proxy_from_url(url) @pytest.mark.skipif(not HAS_IPV6, reason="Only runs on IPv6 systems") class TestIPv6HTTPProxyManager(IPv6HypercornDummyProxyTestCase): @classmethod def setup_class(cls) -> None: super().setup_class() cls.http_url = f"http://{cls.http_host}:{int(cls.http_port)}" cls.http_url_alt = f"http://{cls.http_host_alt}:{int(cls.http_port)}" cls.https_url = f"https://{cls.https_host}:{int(cls.https_port)}" cls.https_url_alt = f"https://{cls.https_host_alt}:{int(cls.https_port)}" cls.proxy_url = f"http://[{cls.proxy_host}]:{int(cls.proxy_port)}" # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") def test_basic_ipv6_proxy(self) -> None: with proxy_from_url(self.proxy_url, ca_certs=DEFAULT_CA) as http: r = http.request("GET", f"{self.http_url}/") assert r.status == 200 r = http.request("GET", f"{self.https_url}/") assert r.status == 200 class TestHTTPSProxyVerification: @staticmethod def _get_proxy_fingerprint_md5(ca_path: str) -> str: proxy_pem_path = pathlib.Path(ca_path).parent / "proxy.pem" proxy_der = ssl.PEM_cert_to_DER_cert(proxy_pem_path.read_text()) proxy_hashed = hashlib.md5(proxy_der).digest() fingerprint = binascii.hexlify(proxy_hashed).decode("ascii") return fingerprint @staticmethod def _get_certificate_formatted_proxy_host(host: str) -> str: try: addr = ipaddress.ip_address(host) except ValueError: return host if addr.version != 6: return host # Transform ipv6 like '::1' to 0:0:0:0:0:0:0:1 via '0000:0000:0000:0000:0000:0000:0000:0001' return addr.exploded.replace("0000", "0").replace("000", "") # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") def test_https_proxy_assert_fingerprint_md5( self, no_san_proxy_with_server: tuple[ServerConfig, ServerConfig] ) -> None: proxy, server = no_san_proxy_with_server proxy_url = f"https://{proxy.host}:{proxy.port}" destination_url = f"https://{server.host}:{server.port}" proxy_fingerprint = self._get_proxy_fingerprint_md5(proxy.ca_certs) with proxy_from_url( proxy_url, ca_certs=proxy.ca_certs, proxy_assert_fingerprint=proxy_fingerprint, ) as https: https.request("GET", destination_url) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") def test_https_proxy_assert_fingerprint_md5_non_matching( self, no_san_proxy_with_server: tuple[ServerConfig, ServerConfig] ) -> None: proxy, server = no_san_proxy_with_server proxy_url = f"https://{proxy.host}:{proxy.port}" destination_url = f"https://{server.host}:{server.port}" proxy_fingerprint = self._get_proxy_fingerprint_md5(proxy.ca_certs) new_char = "b" if proxy_fingerprint[5] == "a" else "a" proxy_fingerprint = proxy_fingerprint[:5] + new_char + proxy_fingerprint[6:] with proxy_from_url( proxy_url, ca_certs=proxy.ca_certs, proxy_assert_fingerprint=proxy_fingerprint, ) as https: with pytest.raises(MaxRetryError) as e: https.request("GET", destination_url) assert "Fingerprints did not match" in str(e) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") def test_https_proxy_assert_hostname( self, san_proxy_with_server: tuple[ServerConfig, ServerConfig] ) -> None: proxy, server = san_proxy_with_server destination_url = f"https://{server.host}:{server.port}" with proxy_from_url( proxy.base_url, ca_certs=proxy.ca_certs, proxy_assert_hostname=proxy.host ) as https: https.request("GET", destination_url) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") def test_https_proxy_assert_hostname_non_matching( self, san_proxy_with_server: tuple[ServerConfig, ServerConfig] ) -> None: proxy, server = san_proxy_with_server destination_url = f"https://{server.host}:{server.port}" proxy_hostname = "example.com" with proxy_from_url( proxy.base_url, ca_certs=proxy.ca_certs, proxy_assert_hostname=proxy_hostname, ) as https: with pytest.raises(MaxRetryError) as e: https.request("GET", destination_url) proxy_host = self._get_certificate_formatted_proxy_host(proxy.host) msg = f"hostname \\'{proxy_hostname}\\' doesn\\'t match \\'{proxy_host}\\'" assert msg in str(e) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") def test_https_proxy_hostname_verification( self, no_localhost_san_server: ServerConfig ) -> None: bad_server = no_localhost_san_server bad_proxy_url = f"https://{bad_server.host}:{bad_server.port}" # An exception will be raised before we contact the destination domain. test_url = "testing.com" with proxy_from_url(bad_proxy_url, ca_certs=bad_server.ca_certs) as https: with pytest.raises(MaxRetryError) as e: https.request("GET", "http://%s/" % test_url) assert isinstance(e.value.reason, ProxyError) ssl_error = e.value.reason.original_error assert isinstance(ssl_error, SSLError) assert "hostname 'localhost' doesn't match" in str( ssl_error ) or "Hostname mismatch" in str(ssl_error) with pytest.raises(MaxRetryError) as e: https.request("GET", "https://%s/" % test_url) assert isinstance(e.value.reason, ProxyError) ssl_error = e.value.reason.original_error assert isinstance(ssl_error, SSLError) assert "hostname 'localhost' doesn't match" in str( ssl_error ) or "Hostname mismatch" in str(ssl_error) # stdlib http.client.HTTPConnection._tunnel() causes a ResourceWarning # see https://github.com/python/cpython/issues/103472 @pytest.mark.filterwarnings("default::ResourceWarning") def test_https_proxy_ipv4_san( self, ipv4_san_proxy_with_server: tuple[ServerConfig, ServerConfig] ) -> None: proxy, server = ipv4_san_proxy_with_server proxy_url = f"https://{proxy.host}:{proxy.port}" destination_url = f"https://{server.host}:{server.port}" with proxy_from_url(proxy_url, ca_certs=proxy.ca_certs) as https: r = https.request("GET", destination_url) assert r.status == 200 def test_https_proxy_ipv6_san( self, ipv6_san_proxy_with_server: tuple[ServerConfig, ServerConfig] ) -> None: proxy, server = ipv6_san_proxy_with_server proxy_url = f"https://[{proxy.host}]:{proxy.port}" destination_url = f"https://{server.host}:{server.port}" with proxy_from_url(proxy_url, ca_certs=proxy.ca_certs) as https: r = https.request("GET", destination_url) assert r.status == 200 @pytest.mark.parametrize("target_scheme", ["http", "https"]) def test_https_proxy_no_san( self, no_san_proxy_with_server: tuple[ServerConfig, ServerConfig], target_scheme: str, ) -> None: proxy, server = no_san_proxy_with_server proxy_url = f"https://{proxy.host}:{proxy.port}" destination_url = f"{target_scheme}://{server.host}:{server.port}" with proxy_from_url(proxy_url, ca_certs=proxy.ca_certs) as https: with pytest.raises(MaxRetryError) as e: https.request("GET", destination_url) assert isinstance(e.value.reason, ProxyError) ssl_error = e.value.reason.original_error assert isinstance(ssl_error, SSLError) assert "no appropriate subjectAltName fields were found" in str( ssl_error ) or "Hostname mismatch, certificate is not valid for 'localhost'" in str( ssl_error ) def test_https_proxy_no_san_hostname_checks_common_name( self, no_san_proxy_with_server: tuple[ServerConfig, ServerConfig] ) -> None: proxy, server = no_san_proxy_with_server proxy_url = f"https://{proxy.host}:{proxy.port}" destination_url = f"https://{server.host}:{server.port}" proxy_ctx = urllib3.util.ssl_.create_urllib3_context() try: proxy_ctx.hostname_checks_common_name = True # PyPy doesn't like us setting 'hostname_checks_common_name' # but also has it enabled by default so we need to handle that. except AttributeError: pass if getattr(proxy_ctx, "hostname_checks_common_name", False) is not True: pytest.skip("Test requires 'SSLContext.hostname_checks_common_name=True'") with proxy_from_url( proxy_url, ca_certs=proxy.ca_certs, proxy_ssl_context=proxy_ctx ) as https: https.request("GET", destination_url) test_chunked_transfer.py 0000644 00000024666 15025306245 0011522 0 ustar 00 from __future__ import annotations import socket import pytest from dummyserver.testcase import ( ConnectionMarker, SocketDummyServerTestCase, consume_socket, ) from urllib3 import HTTPConnectionPool from urllib3.util import SKIP_HEADER from urllib3.util.retry import Retry class TestChunkedTransfer(SocketDummyServerTestCase): def start_chunked_handler(self) -> None: self.buffer = b"" def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] while not self.buffer.endswith(b"\r\n0\r\n\r\n"): self.buffer += sock.recv(65536) sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-type: text/plain\r\n" b"Content-Length: 0\r\n" b"\r\n" ) sock.close() self._start_server(socket_handler) @pytest.mark.parametrize( "chunks", [ ["foo", "bar", "", "bazzzzzzzzzzzzzzzzzzzzzz"], [b"foo", b"bar", b"", b"bazzzzzzzzzzzzzzzzzzzzzz"], ], ) def test_chunks(self, chunks: list[bytes | str]) -> None: self.start_chunked_handler() with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.urlopen("GET", "/", body=chunks, headers=dict(DNT="1"), chunked=True) # type: ignore[arg-type] assert b"Transfer-Encoding" in self.buffer body = self.buffer.split(b"\r\n\r\n", 1)[1] lines = body.split(b"\r\n") # Empty chunks should have been skipped, as this could not be distinguished # from terminating the transmission for i, chunk in enumerate( [c.decode() if isinstance(c, bytes) else c for c in chunks if c] ): assert lines[i * 2] == hex(len(chunk))[2:].encode("utf-8") assert lines[i * 2 + 1] == chunk.encode("utf-8") def _test_body(self, data: bytes | str | None) -> None: self.start_chunked_handler() with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.urlopen("GET", "/", data, chunked=True) header, body = self.buffer.split(b"\r\n\r\n", 1) assert b"Transfer-Encoding: chunked" in header.split(b"\r\n") if data: bdata = data if isinstance(data, bytes) else data.encode("utf-8") assert b"\r\n" + bdata + b"\r\n" in body assert body.endswith(b"\r\n0\r\n\r\n") len_str = body.split(b"\r\n", 1)[0] stated_len = int(len_str, 16) assert stated_len == len(bdata) else: assert body == b"0\r\n\r\n" def test_bytestring_body(self) -> None: self._test_body(b"thisshouldbeonechunk\r\nasdf") def test_unicode_body(self) -> None: self._test_body("thisshouldbeonechunk\r\näöüß") def test_empty_body(self) -> None: self._test_body(None) def test_empty_string_body(self) -> None: self._test_body("") def test_empty_iterable_body(self) -> None: self._test_body(None) def _get_header_lines(self, prefix: bytes) -> list[bytes]: header_block = self.buffer.split(b"\r\n\r\n", 1)[0].lower() header_lines = header_block.split(b"\r\n")[1:] return [x for x in header_lines if x.startswith(prefix)] def test_removes_duplicate_host_header(self) -> None: self.start_chunked_handler() chunks = [b"foo", b"bar", b"", b"bazzzzzzzzzzzzzzzzzzzzzz"] with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.urlopen( "GET", "/", body=chunks, headers={"Host": "test.org"}, chunked=True ) host_headers = self._get_header_lines(b"host") assert len(host_headers) == 1 def test_provides_default_host_header(self) -> None: self.start_chunked_handler() chunks = [b"foo", b"bar", b"", b"bazzzzzzzzzzzzzzzzzzzzzz"] with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.urlopen("GET", "/", body=chunks, chunked=True) host_headers = self._get_header_lines(b"host") assert len(host_headers) == 1 def test_provides_default_user_agent_header(self) -> None: self.start_chunked_handler() chunks = [b"foo", b"bar", b"", b"bazzzzzzzzzzzzzzzzzzzzzz"] with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.urlopen("GET", "/", body=chunks, chunked=True) ua_headers = self._get_header_lines(b"user-agent") assert len(ua_headers) == 1 def test_preserve_user_agent_header(self) -> None: self.start_chunked_handler() chunks = [b"foo", b"bar", b"", b"bazzzzzzzzzzzzzzzzzzzzzz"] with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.urlopen( "GET", "/", body=chunks, headers={"user-Agent": "test-agent"}, chunked=True, ) ua_headers = self._get_header_lines(b"user-agent") # Validate that there is only one User-Agent header. assert len(ua_headers) == 1 # Validate that the existing User-Agent header is the one that was # provided. assert ua_headers[0] == b"user-agent: test-agent" def test_remove_user_agent_header(self) -> None: self.start_chunked_handler() chunks = [b"foo", b"bar", b"", b"bazzzzzzzzzzzzzzzzzzzzzz"] with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.urlopen( "GET", "/", body=chunks, headers={"User-Agent": SKIP_HEADER}, chunked=True, ) ua_headers = self._get_header_lines(b"user-agent") assert len(ua_headers) == 0 def test_provides_default_transfer_encoding_header(self) -> None: self.start_chunked_handler() chunks = [b"foo", b"bar", b"", b"bazzzzzzzzzzzzzzzzzzzzzz"] with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.urlopen("GET", "/", body=chunks, chunked=True) te_headers = self._get_header_lines(b"transfer-encoding") assert len(te_headers) == 1 def test_preserve_transfer_encoding_header(self) -> None: self.start_chunked_handler() chunks = [b"foo", b"bar", b"", b"bazzzzzzzzzzzzzzzzzzzzzz"] with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.urlopen( "GET", "/", body=chunks, headers={"transfer-Encoding": "test-transfer-encoding"}, chunked=True, ) te_headers = self._get_header_lines(b"transfer-encoding") # Validate that there is only one Transfer-Encoding header. assert len(te_headers) == 1 # Validate that the existing Transfer-Encoding header is the one that # was provided. assert te_headers[0] == b"transfer-encoding: test-transfer-encoding" def test_preserve_chunked_on_retry_after(self) -> None: self.chunked_requests = 0 self.socks: list[socket.socket] = [] def socket_handler(listener: socket.socket) -> None: for _ in range(2): sock = listener.accept()[0] self.socks.append(sock) request = consume_socket(sock) if b"Transfer-Encoding: chunked" in request.split(b"\r\n"): self.chunked_requests += 1 sock.send( b"HTTP/1.1 429 Too Many Requests\r\n" b"Content-Type: text/plain\r\n" b"Retry-After: 1\r\n" b"Content-Length: 0\r\n" b"Connection: close\r\n" b"\r\n" ) self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: retries = Retry(total=1) pool.urlopen("GET", "/", chunked=True, retries=retries) for sock in self.socks: sock.close() assert self.chunked_requests == 2 def test_preserve_chunked_on_redirect( self, monkeypatch: pytest.MonkeyPatch ) -> None: self.chunked_requests = 0 def socket_handler(listener: socket.socket) -> None: for i in range(2): sock = listener.accept()[0] request = ConnectionMarker.consume_request(sock) if b"Transfer-Encoding: chunked" in request.split(b"\r\n"): self.chunked_requests += 1 if i == 0: sock.sendall( b"HTTP/1.1 301 Moved Permanently\r\n" b"Location: /redirect\r\n\r\n" ) else: sock.sendall(b"HTTP/1.1 200 OK\r\n\r\n") sock.close() self._start_server(socket_handler) with ConnectionMarker.mark(monkeypatch): with HTTPConnectionPool(self.host, self.port) as pool: retries = Retry(redirect=1) pool.urlopen( "GET", "/", chunked=True, preload_content=False, retries=retries ) assert self.chunked_requests == 2 def test_preserve_chunked_on_broken_connection( self, monkeypatch: pytest.MonkeyPatch ) -> None: self.chunked_requests = 0 def socket_handler(listener: socket.socket) -> None: for i in range(2): sock = listener.accept()[0] request = ConnectionMarker.consume_request(sock) if b"Transfer-Encoding: chunked" in request.split(b"\r\n"): self.chunked_requests += 1 if i == 0: # Bad HTTP version will trigger a connection close sock.sendall(b"HTTP/0.5 200 OK\r\n\r\n") else: sock.sendall(b"HTTP/1.1 200 OK\r\n\r\n") sock.close() self._start_server(socket_handler) with ConnectionMarker.mark(monkeypatch): with HTTPConnectionPool(self.host, self.port) as pool: retries = Retry(read=1) pool.urlopen( "GET", "/", chunked=True, preload_content=False, retries=retries ) assert self.chunked_requests == 2 test_socketlevel.py 0000644 00000256155 15025306245 0010515 0 ustar 00 # TODO: Break this module up into pieces. Maybe group by functionality tested # rather than the socket level-ness of it. from __future__ import annotations import contextlib import errno import io import os import os.path import select import shutil import socket import ssl import tempfile import threading import typing import zlib from collections import OrderedDict from pathlib import Path from test import LONG_TIMEOUT, SHORT_TIMEOUT, notWindows, resolvesLocalhostFQDN from threading import Event from unittest import mock import pytest import trustme from dummyserver.socketserver import ( DEFAULT_CA, DEFAULT_CERTS, encrypt_key_pem, get_unreachable_address, ) from dummyserver.testcase import SocketDummyServerTestCase, consume_socket from urllib3 import HTTPConnectionPool, HTTPSConnectionPool, ProxyManager, util from urllib3._collections import HTTPHeaderDict from urllib3.connection import HTTPConnection, _get_default_user_agent from urllib3.connectionpool import _url_from_pool from urllib3.exceptions import ( InsecureRequestWarning, MaxRetryError, ProtocolError, ProxyError, ReadTimeoutError, SSLError, ) from urllib3.poolmanager import proxy_from_url from urllib3.util import ssl_, ssl_wrap_socket from urllib3.util.retry import Retry from urllib3.util.timeout import Timeout from .. import LogRecorder, has_alpn if typing.TYPE_CHECKING: from _typeshed import StrOrBytesPath else: StrOrBytesPath = object class TestCookies(SocketDummyServerTestCase): def test_multi_setcookie(self) -> None: def multicookie_response_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) sock.send( b"HTTP/1.1 200 OK\r\n" b"Set-Cookie: foo=1\r\n" b"Set-Cookie: bar=1\r\n" b"\r\n" ) sock.close() self._start_server(multicookie_response_handler) with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("GET", "/", retries=0) assert r.headers == {"set-cookie": "foo=1, bar=1"} assert r.headers.getlist("set-cookie") == ["foo=1", "bar=1"] class TestSNI(SocketDummyServerTestCase): def test_hostname_in_first_request_packet(self) -> None: done_receiving = Event() self.buf = b"" def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] self.buf = sock.recv(65536) # We only accept one packet done_receiving.set() # let the test know it can proceed sock.close() self._start_server(socket_handler) with HTTPSConnectionPool(self.host, self.port) as pool: try: pool.request("GET", "/", retries=0) except MaxRetryError: # We are violating the protocol pass successful = done_receiving.wait(LONG_TIMEOUT) assert successful, "Timed out waiting for connection accept" assert ( self.host.encode("ascii") in self.buf ), "missing hostname in SSL handshake" class TestALPN(SocketDummyServerTestCase): def test_alpn_protocol_in_first_request_packet(self) -> None: if not has_alpn(): pytest.skip("ALPN-support not available") done_receiving = Event() self.buf = b"" def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] self.buf = sock.recv(65536) # We only accept one packet done_receiving.set() # let the test know it can proceed sock.close() self._start_server(socket_handler) with HTTPSConnectionPool(self.host, self.port) as pool: try: pool.request("GET", "/", retries=0) except MaxRetryError: # We are violating the protocol pass successful = done_receiving.wait(LONG_TIMEOUT) assert successful, "Timed out waiting for connection accept" for protocol in util.ALPN_PROTOCOLS: assert ( protocol.encode("ascii") in self.buf ), "missing ALPN protocol in SSL handshake" def original_ssl_wrap_socket( sock: socket.socket, keyfile: StrOrBytesPath | None = None, certfile: StrOrBytesPath | None = None, server_side: bool = False, cert_reqs: ssl.VerifyMode = ssl.CERT_NONE, ssl_version: int = ssl.PROTOCOL_TLS, ca_certs: str | None = None, do_handshake_on_connect: bool = True, suppress_ragged_eofs: bool = True, ciphers: str | None = None, ) -> ssl.SSLSocket: if server_side and not certfile: raise ValueError("certfile must be specified for server-side operations") if keyfile and not certfile: raise ValueError("certfile must be specified") context = ssl.SSLContext(ssl_version) context.verify_mode = cert_reqs if ca_certs: context.load_verify_locations(ca_certs) if certfile: context.load_cert_chain(certfile, keyfile) if ciphers: context.set_ciphers(ciphers) return context.wrap_socket( sock=sock, server_side=server_side, do_handshake_on_connect=do_handshake_on_connect, suppress_ragged_eofs=suppress_ragged_eofs, ) class TestClientCerts(SocketDummyServerTestCase): """ Tests for client certificate support. """ @classmethod def setup_class(cls) -> None: cls.tmpdir = tempfile.mkdtemp() ca = trustme.CA() cert = ca.issue_cert("localhost") encrypted_key = encrypt_key_pem(cert.private_key_pem, b"letmein") cls.ca_path = os.path.join(cls.tmpdir, "ca.pem") cls.cert_combined_path = os.path.join(cls.tmpdir, "server.combined.pem") cls.cert_path = os.path.join(cls.tmpdir, "server.pem") cls.key_path = os.path.join(cls.tmpdir, "key.pem") cls.password_key_path = os.path.join(cls.tmpdir, "password_key.pem") ca.cert_pem.write_to_path(cls.ca_path) cert.private_key_and_cert_chain_pem.write_to_path(cls.cert_combined_path) cert.cert_chain_pems[0].write_to_path(cls.cert_path) cert.private_key_pem.write_to_path(cls.key_path) encrypted_key.write_to_path(cls.password_key_path) @classmethod def teardown_class(cls) -> None: shutil.rmtree(cls.tmpdir) def _wrap_in_ssl(self, sock: socket.socket) -> ssl.SSLSocket: """ Given a single socket, wraps it in TLS. """ return original_ssl_wrap_socket( sock, ssl_version=ssl.PROTOCOL_SSLv23, cert_reqs=ssl.CERT_REQUIRED, ca_certs=self.ca_path, certfile=self.cert_path, keyfile=self.key_path, server_side=True, ) def test_client_certs_two_files(self) -> None: """ Having a client cert in a separate file to its associated key works properly. """ done_receiving = Event() client_certs = [] def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] sock = self._wrap_in_ssl(sock) client_certs.append(sock.getpeercert()) data = b"" while not data.endswith(b"\r\n\r\n"): data += sock.recv(8192) sock.sendall( b"HTTP/1.1 200 OK\r\n" b"Server: testsocket\r\n" b"Connection: close\r\n" b"Content-Length: 6\r\n" b"\r\n" b"Valid!" ) done_receiving.wait(5) sock.close() self._start_server(socket_handler) with HTTPSConnectionPool( self.host, self.port, cert_file=self.cert_path, key_file=self.key_path, cert_reqs="REQUIRED", ca_certs=self.ca_path, ) as pool: pool.request("GET", "/", retries=0) done_receiving.set() assert len(client_certs) == 1 def test_client_certs_one_file(self) -> None: """ Having a client cert and its associated private key in just one file works properly. """ done_receiving = Event() client_certs = [] def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] sock = self._wrap_in_ssl(sock) client_certs.append(sock.getpeercert()) data = b"" while not data.endswith(b"\r\n\r\n"): data += sock.recv(8192) sock.sendall( b"HTTP/1.1 200 OK\r\n" b"Server: testsocket\r\n" b"Connection: close\r\n" b"Content-Length: 6\r\n" b"\r\n" b"Valid!" ) done_receiving.wait(5) sock.close() self._start_server(socket_handler) with HTTPSConnectionPool( self.host, self.port, cert_file=self.cert_combined_path, cert_reqs="REQUIRED", ca_certs=self.ca_path, ) as pool: pool.request("GET", "/", retries=0) done_receiving.set() assert len(client_certs) == 1 def test_missing_client_certs_raises_error(self) -> None: """ Having client certs not be present causes an error. """ done_receiving = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] try: self._wrap_in_ssl(sock) except ssl.SSLError: pass done_receiving.wait(5) sock.close() self._start_server(socket_handler) with HTTPSConnectionPool( self.host, self.port, cert_reqs="REQUIRED", ca_certs=self.ca_path ) as pool: with pytest.raises(MaxRetryError): pool.request("GET", "/", retries=0) done_receiving.set() done_receiving.set() def test_client_cert_with_string_password(self) -> None: self.run_client_cert_with_password_test("letmein") def test_client_cert_with_bytes_password(self) -> None: self.run_client_cert_with_password_test(b"letmein") def run_client_cert_with_password_test(self, password: bytes | str) -> None: """ Tests client certificate password functionality """ done_receiving = Event() client_certs = [] def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] sock = self._wrap_in_ssl(sock) client_certs.append(sock.getpeercert()) data = b"" while not data.endswith(b"\r\n\r\n"): data += sock.recv(8192) sock.sendall( b"HTTP/1.1 200 OK\r\n" b"Server: testsocket\r\n" b"Connection: close\r\n" b"Content-Length: 6\r\n" b"\r\n" b"Valid!" ) done_receiving.wait(5) sock.close() self._start_server(socket_handler) assert ssl_.SSLContext is not None ssl_context = ssl_.SSLContext(ssl_.PROTOCOL_SSLv23) ssl_context.load_cert_chain( certfile=self.cert_path, keyfile=self.password_key_path, password=password ) with HTTPSConnectionPool( self.host, self.port, ssl_context=ssl_context, cert_reqs="REQUIRED", ca_certs=self.ca_path, ) as pool: pool.request("GET", "/", retries=0) done_receiving.set() assert len(client_certs) == 1 def test_load_keyfile_with_invalid_password(self) -> None: assert ssl_.SSLContext is not None context = ssl_.SSLContext(ssl_.PROTOCOL_SSLv23) with pytest.raises(ssl.SSLError): context.load_cert_chain( certfile=self.cert_path, keyfile=self.password_key_path, password=b"letmei", ) def test_load_invalid_cert_file(self) -> None: assert ssl_.SSLContext is not None context = ssl_.SSLContext(ssl_.PROTOCOL_SSLv23) with pytest.raises(ssl.SSLError): context.load_cert_chain(certfile=self.password_key_path) class TestSocketClosing(SocketDummyServerTestCase): def test_recovery_when_server_closes_connection(self) -> None: # Does the pool work seamlessly if an open connection in the # connection pool gets hung up on by the server, then reaches # the front of the queue again? done_closing = Event() def socket_handler(listener: socket.socket) -> None: for i in 0, 1: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf = sock.recv(65536) body = f"Response {int(i)}" sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" "%s" % (len(body), body) ).encode("utf-8") ) sock.close() # simulate a server timing out, closing socket done_closing.set() # let the test know it can proceed self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: response = pool.request("GET", "/", retries=0) assert response.status == 200 assert response.data == b"Response 0" done_closing.wait() # wait until the socket in our pool gets closed response = pool.request("GET", "/", retries=0) assert response.status == 200 assert response.data == b"Response 1" def test_connection_refused(self) -> None: # Does the pool retry if there is no listener on the port? host, port = get_unreachable_address() with HTTPConnectionPool(host, port, maxsize=3, block=True) as http: with pytest.raises(MaxRetryError): http.request("GET", "/", retries=0, release_conn=False) assert http.pool is not None assert http.pool.qsize() == http.pool.maxsize def test_connection_read_timeout(self) -> None: timed_out = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] while not sock.recv(65536).endswith(b"\r\n\r\n"): pass timed_out.wait() sock.close() self._start_server(socket_handler) with HTTPConnectionPool( self.host, self.port, timeout=SHORT_TIMEOUT, retries=False, maxsize=3, block=True, ) as http: try: with pytest.raises(ReadTimeoutError): http.request("GET", "/", release_conn=False) finally: timed_out.set() assert http.pool is not None assert http.pool.qsize() == http.pool.maxsize def test_read_timeout_dont_retry_method_not_in_allowlist(self) -> None: timed_out = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] sock.recv(65536) timed_out.wait() sock.close() self._start_server(socket_handler) with HTTPConnectionPool( self.host, self.port, timeout=LONG_TIMEOUT, retries=True ) as pool: try: with pytest.raises(ReadTimeoutError): pool.request("POST", "/") finally: timed_out.set() def test_https_connection_read_timeout(self) -> None: """Handshake timeouts should fail with a Timeout""" timed_out = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] while not sock.recv(65536): pass timed_out.wait() sock.close() # first ReadTimeoutError due to SocketTimeout self._start_server(socket_handler) with HTTPSConnectionPool( self.host, self.port, timeout=LONG_TIMEOUT, retries=False ) as pool: try: with pytest.raises(ReadTimeoutError): pool.request("GET", "/") finally: timed_out.set() # second ReadTimeoutError due to errno with HTTPSConnectionPool(host=self.host): err = OSError() err.errno = errno.EAGAIN with pytest.raises(ReadTimeoutError): pool._raise_timeout(err, "", 0) def test_timeout_errors_cause_retries(self) -> None: def socket_handler(listener: socket.socket) -> None: sock_timeout = listener.accept()[0] # Wait for a second request before closing the first socket. sock = listener.accept()[0] sock_timeout.close() # Second request. buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) # Now respond immediately. body = "Response 2" sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" "%s" % (len(body), body) ).encode("utf-8") ) sock.close() # In situations where the main thread throws an exception, the server # thread can hang on an accept() call. This ensures everything times # out within 1 second. This should be long enough for any socket # operations in the test suite to complete default_timeout = socket.getdefaulttimeout() socket.setdefaulttimeout(1) try: self._start_server(socket_handler) t = Timeout(connect=LONG_TIMEOUT, read=LONG_TIMEOUT) with HTTPConnectionPool(self.host, self.port, timeout=t) as pool: response = pool.request("GET", "/", retries=1) assert response.status == 200 assert response.data == b"Response 2" finally: socket.setdefaulttimeout(default_timeout) def test_delayed_body_read_timeout(self) -> None: timed_out = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" body = "Hi" while not buf.endswith(b"\r\n\r\n"): buf = sock.recv(65536) sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" % len(body) ).encode("utf-8") ) timed_out.wait() sock.send(body.encode("utf-8")) sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: response = pool.urlopen( "GET", "/", retries=0, preload_content=False, timeout=Timeout(connect=1, read=LONG_TIMEOUT), ) try: with pytest.raises(ReadTimeoutError): response.read() finally: timed_out.set() def test_delayed_body_read_timeout_with_preload(self) -> None: timed_out = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" body = "Hi" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" % len(body) ).encode("utf-8") ) timed_out.wait(5) sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: try: with pytest.raises(ReadTimeoutError): timeout = Timeout(connect=LONG_TIMEOUT, read=SHORT_TIMEOUT) pool.urlopen("GET", "/", retries=False, timeout=timeout) finally: timed_out.set() def test_incomplete_response(self) -> None: body = "Response" partial_body = body[:2] def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] # Consume request buf = b"" while not buf.endswith(b"\r\n\r\n"): buf = sock.recv(65536) # Send partial response and close socket. sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" "%s" % (len(body), partial_body) ).encode("utf-8") ) sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: response = pool.request("GET", "/", retries=0, preload_content=False) with pytest.raises(ProtocolError): response.read() def test_retry_weird_http_version(self) -> None: """Retry class should handle httplib.BadStatusLine errors properly""" def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] # First request. # Pause before responding so the first request times out. buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) # send unknown http protocol body = "bad http 0.5 response" sock.send( ( "HTTP/0.5 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" "%s" % (len(body), body) ).encode("utf-8") ) sock.close() # Second request. sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) # Now respond immediately. sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" "foo" % (len("foo")) ).encode("utf-8") ) sock.close() # Close the socket. self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: retry = Retry(read=1) response = pool.request("GET", "/", retries=retry) assert response.status == 200 assert response.data == b"foo" def test_connection_cleanup_on_read_timeout(self) -> None: timed_out = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" body = "Hi" while not buf.endswith(b"\r\n\r\n"): buf = sock.recv(65536) sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" % len(body) ).encode("utf-8") ) timed_out.wait() sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: assert pool.pool is not None poolsize = pool.pool.qsize() response = pool.urlopen( "GET", "/", retries=0, preload_content=False, timeout=LONG_TIMEOUT ) try: with pytest.raises(ReadTimeoutError): response.read() assert poolsize == pool.pool.qsize() finally: timed_out.set() def test_connection_cleanup_on_protocol_error_during_read(self) -> None: body = "Response" partial_body = body[:2] def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] # Consume request buf = b"" while not buf.endswith(b"\r\n\r\n"): buf = sock.recv(65536) # Send partial response and close socket. sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" "%s" % (len(body), partial_body) ).encode("utf-8") ) sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: assert pool.pool is not None poolsize = pool.pool.qsize() response = pool.request("GET", "/", retries=0, preload_content=False) with pytest.raises(ProtocolError): response.read() assert poolsize == pool.pool.qsize() def test_connection_closed_on_read_timeout_preload_false(self) -> None: timed_out = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] # Consume request buf = b"" while not buf.endswith(b"\r\n\r\n"): buf = sock.recv(65535) # Send partial chunked response and then hang. sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Transfer-Encoding: chunked\r\n" b"\r\n" b"8\r\n" b"12345678\r\n" ) timed_out.wait(5) # Expect a new request, but keep hold of the old socket to avoid # leaking it. Because we don't want to hang this thread, we # actually use select.select to confirm that a new request is # coming in: this lets us time the thread out. rlist, _, _ = select.select([listener], [], [], 1) assert rlist new_sock = listener.accept()[0] # Consume request buf = b"" while not buf.endswith(b"\r\n\r\n"): buf = new_sock.recv(65535) # Send complete chunked response. new_sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Transfer-Encoding: chunked\r\n" b"\r\n" b"8\r\n" b"12345678\r\n" b"0\r\n\r\n" ) new_sock.close() sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: # First request should fail. response = pool.urlopen( "GET", "/", retries=0, preload_content=False, timeout=LONG_TIMEOUT ) try: with pytest.raises(ReadTimeoutError): response.read() finally: timed_out.set() # Second should succeed. response = pool.urlopen( "GET", "/", retries=0, preload_content=False, timeout=LONG_TIMEOUT ) assert len(response.read()) == 8 def test_closing_response_actually_closes_connection(self) -> None: done_closing = Event() complete = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf = sock.recv(65536) sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: 0\r\n" b"\r\n" ) # Wait for the socket to close. done_closing.wait(timeout=LONG_TIMEOUT) # Look for the empty string to show that the connection got closed. # Don't get stuck in a timeout. sock.settimeout(LONG_TIMEOUT) new_data = sock.recv(65536) assert not new_data sock.close() complete.set() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: response = pool.request("GET", "/", retries=0, preload_content=False) assert response.status == 200 response.close() done_closing.set() # wait until the socket in our pool gets closed successful = complete.wait(timeout=LONG_TIMEOUT) assert successful, "Timed out waiting for connection close" def test_release_conn_param_is_respected_after_timeout_retry(self) -> None: """For successful ```urlopen(release_conn=False)```, the connection isn't released, even after a retry. This test allows a retry: one request fails, the next request succeeds. This is a regression test for issue #651 [1], where the connection would be released if the initial request failed, even if a retry succeeded. [1] <https://github.com/urllib3/urllib3/issues/651> """ def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] consume_socket(sock) # Close the connection, without sending any response (not even the # HTTP status line). This will trigger a `Timeout` on the client, # inside `urlopen()`. sock.close() # Expect a new request. Because we don't want to hang this thread, # we actually use select.select to confirm that a new request is # coming in: this lets us time the thread out. rlist, _, _ = select.select([listener], [], [], 5) assert rlist sock = listener.accept()[0] consume_socket(sock) # Send complete chunked response. sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Transfer-Encoding: chunked\r\n" b"\r\n" b"8\r\n" b"12345678\r\n" b"0\r\n\r\n" ) sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port, maxsize=1) as pool: # First request should fail, but the timeout and `retries=1` should # save it. response = pool.urlopen( "GET", "/", retries=1, release_conn=False, preload_content=False, timeout=LONG_TIMEOUT, ) # The connection should still be on the response object, and none # should be in the pool. We opened two though. assert pool.num_connections == 2 assert pool.pool is not None assert pool.pool.qsize() == 0 assert response.connection is not None # Consume the data. This should put the connection back. response.read() assert pool.pool.qsize() == 1 assert response.connection is None def test_socket_close_socket_then_file(self) -> None: quit_event = threading.Event() def consume_ssl_socket( listener: socket.socket, ) -> None: try: with listener.accept()[0] as sock, original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ca_certs=DEFAULT_CA, ) as ssl_sock: consume_socket(ssl_sock, quit_event=quit_event) except (ConnectionResetError, ConnectionAbortedError, OSError): pass self._start_server(consume_ssl_socket, quit_event=quit_event) with socket.create_connection( (self.host, self.port) ) as sock, contextlib.closing( ssl_wrap_socket(sock, server_hostname=self.host, ca_certs=DEFAULT_CA) ) as ssl_sock, ssl_sock.makefile( "rb" ) as f: ssl_sock.close() f.close() with pytest.raises(OSError): ssl_sock.sendall(b"hello") assert ssl_sock.fileno() == -1 def test_socket_close_stays_open_with_makefile_open(self) -> None: quit_event = threading.Event() def consume_ssl_socket(listener: socket.socket) -> None: try: with listener.accept()[0] as sock, original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ca_certs=DEFAULT_CA, ) as ssl_sock: consume_socket(ssl_sock, quit_event=quit_event) except (ConnectionResetError, ConnectionAbortedError, OSError): pass self._start_server(consume_ssl_socket, quit_event=quit_event) with socket.create_connection( (self.host, self.port) ) as sock, contextlib.closing( ssl_wrap_socket(sock, server_hostname=self.host, ca_certs=DEFAULT_CA) ) as ssl_sock, ssl_sock.makefile( "rb" ): ssl_sock.close() ssl_sock.close() ssl_sock.sendall(b"hello") assert ssl_sock.fileno() > 0 class TestProxyManager(SocketDummyServerTestCase): def test_simple(self) -> None: def echo_socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" "%s" % (len(buf), buf.decode("utf-8")) ).encode("utf-8") ) sock.close() self._start_server(echo_socket_handler) base_url = f"http://{self.host}:{self.port}" with proxy_from_url(base_url) as proxy: r = proxy.request("GET", "http://google.com/") assert r.status == 200 # FIXME: The order of the headers is not predictable right now. We # should fix that someday (maybe when we migrate to # OrderedDict/MultiDict). assert sorted(r.data.split(b"\r\n")) == sorted( [ b"GET http://google.com/ HTTP/1.1", b"Host: google.com", b"Accept-Encoding: identity", b"Accept: */*", b"User-Agent: " + _get_default_user_agent().encode("utf-8"), b"", b"", ] ) def test_headers(self) -> None: def echo_socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" "%s" % (len(buf), buf.decode("utf-8")) ).encode("utf-8") ) sock.close() self._start_server(echo_socket_handler) base_url = f"http://{self.host}:{self.port}" # Define some proxy headers. proxy_headers = HTTPHeaderDict({"For The Proxy": "YEAH!"}) with proxy_from_url(base_url, proxy_headers=proxy_headers) as proxy: conn = proxy.connection_from_url("http://www.google.com/") r = conn.urlopen("GET", "http://www.google.com/", assert_same_host=False) assert r.status == 200 # FIXME: The order of the headers is not predictable right now. We # should fix that someday (maybe when we migrate to # OrderedDict/MultiDict). assert b"For The Proxy: YEAH!\r\n" in r.data def test_retries(self) -> None: close_event = Event() def echo_socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] # First request, which should fail sock.close() # Second request sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) sock.send( ( "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Content-Length: %d\r\n" "\r\n" "%s" % (len(buf), buf.decode("utf-8")) ).encode("utf-8") ) sock.close() close_event.set() self._start_server(echo_socket_handler) base_url = f"http://{self.host}:{self.port}" with proxy_from_url(base_url) as proxy: conn = proxy.connection_from_url("http://www.google.com") r = conn.urlopen( "GET", "http://www.google.com", assert_same_host=False, retries=1 ) assert r.status == 200 close_event.wait(timeout=LONG_TIMEOUT) with pytest.raises(ProxyError): conn.urlopen( "GET", "http://www.google.com", assert_same_host=False, retries=False, ) def test_connect_reconn(self) -> None: def proxy_ssl_one(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) s = buf.decode("utf-8") if not s.startswith("CONNECT "): sock.send(b"HTTP/1.1 405 Method not allowed\r\nAllow: CONNECT\r\n\r\n") sock.close() return if not s.startswith(f"CONNECT {self.host}:443"): sock.send(b"HTTP/1.1 403 Forbidden\r\n\r\n") sock.close() return sock.send(b"HTTP/1.1 200 Connection Established\r\n\r\n") ssl_sock = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ca_certs=DEFAULT_CA, ) buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += ssl_sock.recv(65536) ssl_sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: 2\r\n" b"Connection: close\r\n" b"\r\n" b"Hi" ) ssl_sock.close() def echo_socket_handler(listener: socket.socket) -> None: proxy_ssl_one(listener) proxy_ssl_one(listener) self._start_server(echo_socket_handler) base_url = f"http://{self.host}:{self.port}" with proxy_from_url(base_url, ca_certs=DEFAULT_CA) as proxy: url = f"https://{self.host}" conn = proxy.connection_from_url(url) r = conn.urlopen("GET", url, retries=0) assert r.status == 200 r = conn.urlopen("GET", url, retries=0) assert r.status == 200 def test_connect_ipv6_addr(self) -> None: ipv6_addr = "2001:4998:c:a06::2:4008" def echo_socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) s = buf.decode("utf-8") if s.startswith(f"CONNECT [{ipv6_addr}]:443"): sock.send(b"HTTP/1.1 200 Connection Established\r\n\r\n") ssl_sock = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ) buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += ssl_sock.recv(65536) ssl_sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: 2\r\n" b"Connection: close\r\n" b"\r\n" b"Hi" ) ssl_sock.close() else: sock.close() self._start_server(echo_socket_handler) base_url = f"http://{self.host}:{self.port}" with proxy_from_url(base_url, cert_reqs="NONE") as proxy: url = f"https://[{ipv6_addr}]" conn = proxy.connection_from_url(url) try: with pytest.warns(InsecureRequestWarning): r = conn.urlopen("GET", url, retries=0) assert r.status == 200 except MaxRetryError: pytest.fail("Invalid IPv6 format in HTTP CONNECT request") @pytest.mark.parametrize("target_scheme", ["http", "https"]) def test_https_proxymanager_connected_to_http_proxy( self, target_scheme: str ) -> None: errored = Event() def http_socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] sock.send(b"HTTP/1.0 501 Not Implemented\r\nConnection: close\r\n\r\n") errored.wait() sock.close() self._start_server(http_socket_handler) base_url = f"https://{self.host}:{self.port}" with ProxyManager(base_url, cert_reqs="NONE") as proxy: with pytest.raises(MaxRetryError) as e: proxy.request("GET", f"{target_scheme}://example.com", retries=0) errored.set() # Avoid a ConnectionAbortedError on Windows. assert type(e.value.reason) is ProxyError assert "Your proxy appears to only use HTTP and not HTTPS" in str( e.value.reason ) class TestSSL(SocketDummyServerTestCase): def test_ssl_failure_midway_through_conn(self) -> None: def socket_handler(listener: socket.socket) -> None: with listener.accept()[0] as sock, sock.dup() as sock2: ssl_sock = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ca_certs=DEFAULT_CA, ) buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += ssl_sock.recv(65536) # Deliberately send from the non-SSL socket. sock2.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: 2\r\n" b"\r\n" b"Hi" ) ssl_sock.close() self._start_server(socket_handler) with HTTPSConnectionPool(self.host, self.port, ca_certs=DEFAULT_CA) as pool: with pytest.raises( SSLError, match=r"(wrong version number|record overflow|record layer failure)", ): pool.request("GET", "/", retries=False) def test_ssl_read_timeout(self) -> None: timed_out = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] # disable Nagle's algorithm so there's no delay in sending a partial body sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, True) ssl_sock = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ) buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += ssl_sock.recv(65536) # Send incomplete message (note Content-Length) ssl_sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: 10\r\n" b"\r\n" b"Hi-" ) timed_out.wait() sock.close() ssl_sock.close() self._start_server(socket_handler) with HTTPSConnectionPool(self.host, self.port, ca_certs=DEFAULT_CA) as pool: response = pool.urlopen( "GET", "/", retries=0, preload_content=False, timeout=LONG_TIMEOUT ) try: with pytest.raises(ReadTimeoutError): response.read() finally: timed_out.set() def test_ssl_failed_fingerprint_verification(self) -> None: def socket_handler(listener: socket.socket) -> None: for i in range(2): sock = listener.accept()[0] try: ssl_sock = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ca_certs=DEFAULT_CA, ) except (ssl.SSLError, ConnectionResetError, ConnectionAbortedError): pass else: with ssl_sock: try: ssl_sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: 5\r\n\r\n" b"Hello" ) except (ssl.SSLEOFError, ConnectionResetError, BrokenPipeError): pass sock.close() self._start_server(socket_handler) # GitHub's fingerprint. Valid, but not matching. fingerprint = "A0:C4:A7:46:00:ED:A7:2D:C0:BE:CB:9A:8C:B6:07:CA:58:EE:74:5E" def request() -> None: pool = HTTPSConnectionPool( self.host, self.port, assert_fingerprint=fingerprint, cert_reqs="CERT_NONE", ) try: timeout = Timeout(connect=LONG_TIMEOUT, read=SHORT_TIMEOUT) response = pool.urlopen( "GET", "/", preload_content=False, retries=0, timeout=timeout ) response.read() finally: pool.close() with pytest.raises(MaxRetryError) as cm: request() assert type(cm.value.reason) is SSLError assert str(cm.value.reason) == ( "Fingerprints did not match. Expected " '"a0c4a74600eda72dc0becb9a8cb607ca58ee745e", got ' '"728b554c9afc1e88a11cad1bb2e7cc3edbc8f98a"' ) # Should not hang, see https://github.com/urllib3/urllib3/issues/529 with pytest.raises(MaxRetryError) as cm2: request() assert type(cm2.value.reason) is SSLError assert str(cm2.value.reason) == ( "Fingerprints did not match. Expected " '"a0c4a74600eda72dc0becb9a8cb607ca58ee745e", got ' '"728b554c9afc1e88a11cad1bb2e7cc3edbc8f98a"' ) def test_retry_ssl_error(self) -> None: def socket_handler(listener: socket.socket) -> None: # first request, trigger an SSLError sock = listener.accept()[0] sock2 = sock.dup() ssl_sock = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ) buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += ssl_sock.recv(65536) # Deliberately send from the non-SSL socket to trigger an SSLError sock2.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: 4\r\n" b"\r\n" b"Fail" ) sock2.close() ssl_sock.close() # retried request sock = listener.accept()[0] ssl_sock = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ) buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += ssl_sock.recv(65536) ssl_sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: 7\r\n\r\n" b"Success" ) ssl_sock.close() self._start_server(socket_handler) with HTTPSConnectionPool(self.host, self.port, ca_certs=DEFAULT_CA) as pool: response = pool.urlopen("GET", "/", retries=1) assert response.data == b"Success" def test_ssl_load_default_certs_when_empty(self) -> None: def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] try: ssl_sock = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ca_certs=DEFAULT_CA, ) except (ssl.SSLError, OSError): return buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += ssl_sock.recv(65536) ssl_sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: 5\r\n\r\n" b"Hello" ) ssl_sock.close() sock.close() context = mock.create_autospec(ssl_.SSLContext) context.load_default_certs = mock.Mock() context.options = 0 class MockSSLSocket: def __init__( self, sock: socket.socket, *args: object, **kwargs: object ) -> None: self._sock = sock def close(self) -> None: self._sock.close() context.wrap_socket = MockSSLSocket with mock.patch("urllib3.util.ssl_.SSLContext", lambda *_, **__: context): self._start_server(socket_handler) with HTTPSConnectionPool(self.host, self.port) as pool: # Without a proper `SSLContext`, this request will fail in some # arbitrary way, but we only want to know if load_default_certs() was # called, which is why we accept any `Exception` here. with pytest.raises(Exception): pool.request("GET", "/", timeout=SHORT_TIMEOUT) context.load_default_certs.assert_called_with() def test_ssl_dont_load_default_certs_when_given(self) -> None: def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] try: ssl_sock = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ca_certs=DEFAULT_CA, ) except (ssl.SSLError, OSError): return buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += ssl_sock.recv(65536) ssl_sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: 5\r\n\r\n" b"Hello" ) ssl_sock.close() sock.close() context = mock.create_autospec(ssl_.SSLContext) context.load_default_certs = mock.Mock() context.options = 0 class MockSSLSocket: def __init__( self, sock: socket.socket, *args: object, **kwargs: object ) -> None: self._sock = sock def close(self) -> None: self._sock.close() context.wrap_socket = MockSSLSocket with mock.patch("urllib3.util.ssl_.SSLContext", lambda *_, **__: context): for kwargs in [ {"ca_certs": "/a"}, {"ca_cert_dir": "/a"}, {"ca_certs": "a", "ca_cert_dir": "a"}, {"ssl_context": context}, ]: self._start_server(socket_handler) with HTTPSConnectionPool(self.host, self.port, **kwargs) as pool: with pytest.raises(Exception): pool.request("GET", "/", timeout=SHORT_TIMEOUT) context.load_default_certs.assert_not_called() def test_load_verify_locations_exception(self) -> None: """ Ensure that load_verify_locations raises SSLError for all backends """ with pytest.raises(SSLError): ssl_wrap_socket(None, ca_certs="/tmp/fake-file") # type: ignore[call-overload] def test_ssl_custom_validation_failure_terminates(self, tmpdir: Path) -> None: """ Ensure that the underlying socket is terminated if custom validation fails. """ server_closed = Event() def is_closed_socket(sock: socket.socket) -> bool: try: sock.settimeout(SHORT_TIMEOUT) except OSError: return True return False def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] try: _ = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ca_certs=DEFAULT_CA, ) except ConnectionResetError: return except ssl.SSLError as e: assert "alert unknown ca" in str(e) if is_closed_socket(sock): server_closed.set() self._start_server(socket_handler) # client uses a different ca other_ca = trustme.CA() other_ca_path = str(tmpdir / "ca.pem") other_ca.cert_pem.write_to_path(other_ca_path) with HTTPSConnectionPool( self.host, self.port, cert_reqs="REQUIRED", ca_certs=other_ca_path ) as pool: with pytest.raises(SSLError): pool.request("GET", "/", retries=False, timeout=LONG_TIMEOUT) assert server_closed.wait(LONG_TIMEOUT), "The socket was not terminated" def _run_preload(self, pool: HTTPSConnectionPool, content_length: int) -> None: response = pool.request("GET", "/") assert len(response.data) == content_length def _run_read_None(self, pool: HTTPSConnectionPool, content_length: int) -> None: response = pool.request("GET", "/", preload_content=False) assert len(response.read(None)) == content_length assert response.read(None) == b"" def _run_read_amt(self, pool: HTTPSConnectionPool, content_length: int) -> None: response = pool.request("GET", "/", preload_content=False) assert len(response.read(content_length)) == content_length assert response.read(5) == b"" def _run_read1_None(self, pool: HTTPSConnectionPool, content_length: int) -> None: response = pool.request("GET", "/", preload_content=False) remaining = content_length while True: chunk = response.read1(None) if not chunk: break remaining -= len(chunk) assert remaining == 0 def _run_read1_amt(self, pool: HTTPSConnectionPool, content_length: int) -> None: response = pool.request("GET", "/", preload_content=False) remaining = content_length while True: chunk = response.read1(content_length) if not chunk: break remaining -= len(chunk) assert remaining == 0 @pytest.mark.integration @pytest.mark.parametrize( "method", [_run_preload, _run_read_None, _run_read_amt, _run_read1_None, _run_read1_amt], ) def test_requesting_large_resources_via_ssl( self, method: typing.Callable[[typing.Any, HTTPSConnectionPool, int], None] ) -> None: """ Ensure that it is possible to read 2 GiB or more via an SSL socket. https://github.com/urllib3/urllib3/issues/2513 """ content_length = 2**31 # (`int` max value in C) + 1. ssl_ready = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] ssl_sock = original_ssl_wrap_socket( sock, server_side=True, keyfile=DEFAULT_CERTS["keyfile"], certfile=DEFAULT_CERTS["certfile"], ca_certs=DEFAULT_CA, ) ssl_ready.set() while not ssl_sock.recv(65536).endswith(b"\r\n\r\n"): continue ssl_sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Type: text/plain\r\n" b"Content-Length: %d\r\n\r\n" % content_length ) chunks = 2 for i in range(chunks): ssl_sock.sendall(bytes(content_length // chunks)) ssl_sock.close() sock.close() self._start_server(socket_handler) ssl_ready.wait(5) with HTTPSConnectionPool( self.host, self.port, ca_certs=DEFAULT_CA, retries=False ) as pool: method(self, pool, content_length) class TestErrorWrapping(SocketDummyServerTestCase): def test_bad_statusline(self) -> None: self.start_response_handler( b"HTTP/1.1 Omg What Is This?\r\n" b"Content-Length: 0\r\n" b"\r\n" ) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: with pytest.raises(ProtocolError): pool.request("GET", "/") def test_unknown_protocol(self) -> None: self.start_response_handler( b"HTTP/1000 200 OK\r\n" b"Content-Length: 0\r\n" b"\r\n" ) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: with pytest.raises(ProtocolError): pool.request("GET", "/") class TestHeaders(SocketDummyServerTestCase): def test_httplib_headers_case_insensitive(self) -> None: self.start_response_handler( b"HTTP/1.1 200 OK\r\n" b"Content-Length: 0\r\n" b"Content-type: text/plain\r\n" b"\r\n" ) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: HEADERS = {"Content-Length": "0", "Content-type": "text/plain"} r = pool.request("GET", "/") assert HEADERS == dict(r.headers.items()) # to preserve case sensitivity def start_parsing_handler(self) -> None: self.parsed_headers: typing.OrderedDict[str, str] = OrderedDict() self.received_headers: list[bytes] = [] def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) self.received_headers = [ header for header in buf.split(b"\r\n")[1:] if header ] for header in self.received_headers: (key, value) = header.split(b": ") self.parsed_headers[key.decode("ascii")] = value.decode("ascii") sock.send(b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\n\r\n") sock.close() self._start_server(socket_handler) def test_headers_are_sent_with_the_original_case(self) -> None: headers = {"foo": "bar", "bAz": "quux"} self.start_parsing_handler() expected_headers = { "Accept-Encoding": "identity", "Host": f"{self.host}:{self.port}", "User-Agent": _get_default_user_agent(), } expected_headers.update(headers) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.request("GET", "/", headers=HTTPHeaderDict(headers)) assert expected_headers == self.parsed_headers def test_ua_header_can_be_overridden(self) -> None: headers = {"uSeR-AgENt": "Definitely not urllib3!"} self.start_parsing_handler() expected_headers = { "Accept-Encoding": "identity", "Host": f"{self.host}:{self.port}", } expected_headers.update(headers) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.request("GET", "/", headers=HTTPHeaderDict(headers)) assert expected_headers == self.parsed_headers def test_request_headers_are_sent_in_the_original_order(self) -> None: # NOTE: Probability this test gives a false negative is 1/(K!) K = 16 # NOTE: Provide headers in non-sorted order (i.e. reversed) # so that if the internal implementation tries to sort them, # a change will be detected. expected_request_headers = [ (f"X-Header-{int(i)}", str(i)) for i in reversed(range(K)) ] def filter_non_x_headers( d: typing.OrderedDict[str, str] ) -> list[tuple[str, str]]: return [(k, v) for (k, v) in d.items() if k.startswith("X-Header-")] self.start_parsing_handler() with HTTPConnectionPool(self.host, self.port, retries=False) as pool: pool.request("GET", "/", headers=OrderedDict(expected_request_headers)) assert expected_request_headers == filter_non_x_headers(self.parsed_headers) @resolvesLocalhostFQDN() def test_request_host_header_ignores_fqdn_dot(self) -> None: self.start_parsing_handler() with HTTPConnectionPool(self.host + ".", self.port, retries=False) as pool: pool.request("GET", "/") self.assert_header_received( self.received_headers, "Host", f"{self.host}:{self.port}" ) def test_response_headers_are_returned_in_the_original_order(self) -> None: # NOTE: Probability this test gives a false negative is 1/(K!) K = 16 # NOTE: Provide headers in non-sorted order (i.e. reversed) # so that if the internal implementation tries to sort them, # a change will be detected. expected_response_headers = [ (f"X-Header-{int(i)}", str(i)) for i in reversed(range(K)) ] def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) sock.send( b"HTTP/1.1 200 OK\r\n" + b"\r\n".join( [ (k.encode("utf8") + b": " + v.encode("utf8")) for (k, v) in expected_response_headers ] ) + b"\r\n" ) sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("GET", "/", retries=0) actual_response_headers = [ (k, v) for (k, v) in r.headers.items() if k.startswith("X-Header-") ] assert expected_response_headers == actual_response_headers @pytest.mark.parametrize( "method_type, body_type", [ ("GET", None), ("POST", None), ("POST", "bytes"), ("POST", "bytes-io"), ], ) def test_headers_sent_with_add( self, method_type: str, body_type: str | None ) -> None: """ Confirm that when adding headers with combine=True that we simply append to the most recent value, rather than create a new header line. """ body: None | bytes | io.BytesIO if body_type is None: body = None expected = b"\r\n\r\n" elif body_type == "bytes": body = b"my-body" expected = b"\r\n\r\nmy-body" elif body_type == "bytes-io": body = io.BytesIO(b"bytes-io-body") body.seek(0, 0) expected = b"bytes-io-body\r\n0\r\n\r\n" else: raise ValueError("Unknown body type") buffer: bytes = b"" def socket_handler(listener: socket.socket) -> None: nonlocal buffer sock = listener.accept()[0] sock.settimeout(0) while expected not in buffer: with contextlib.suppress(BlockingIOError): buffer += sock.recv(65536) sock.sendall( b"HTTP/1.1 200 OK\r\n" b"Server: example.com\r\n" b"Content-Length: 0\r\n\r\n" ) sock.close() self._start_server(socket_handler) headers = HTTPHeaderDict() headers.add("A", "1") headers.add("C", "3") headers.add("B", "2") headers.add("B", "3") headers.add("A", "4", combine=False) headers.add("C", "5", combine=True) headers.add("C", "6") with HTTPConnectionPool(self.host, self.port, retries=False) as pool: r = pool.request( method_type, "/", body=body, headers=headers, ) assert r.status == 200 assert b"A: 1\r\nA: 4\r\nC: 3, 5\r\nC: 6\r\nB: 2\r\nB: 3" in buffer class TestBrokenHeaders(SocketDummyServerTestCase): def _test_broken_header_parsing( self, headers: list[bytes], unparsed_data_check: str | None = None ) -> None: self.start_response_handler( ( b"HTTP/1.1 200 OK\r\n" b"Content-Length: 0\r\n" b"Content-type: text/plain\r\n" ) + b"\r\n".join(headers) + b"\r\n\r\n" ) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: with LogRecorder() as logs: pool.request("GET", "/") for record in logs: if ( "Failed to parse headers" in record.msg and type(record.args) is tuple and _url_from_pool(pool, "/") == record.args[0] ): if ( unparsed_data_check is None or unparsed_data_check in record.getMessage() ): return pytest.fail("Missing log about unparsed headers") def test_header_without_name(self) -> None: self._test_broken_header_parsing([b": Value", b"Another: Header"]) def test_header_without_name_or_value(self) -> None: self._test_broken_header_parsing([b":", b"Another: Header"]) def test_header_without_colon_or_value(self) -> None: self._test_broken_header_parsing( [b"Broken Header", b"Another: Header"], "Broken Header" ) class TestHeaderParsingContentType(SocketDummyServerTestCase): def _test_okay_header_parsing(self, header: bytes) -> None: self.start_response_handler( (b"HTTP/1.1 200 OK\r\n" b"Content-Length: 0\r\n") + header + b"\r\n\r\n" ) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: with LogRecorder() as logs: pool.request("GET", "/") for record in logs: assert "Failed to parse headers" not in record.msg def test_header_text_plain(self) -> None: self._test_okay_header_parsing(b"Content-type: text/plain") def test_header_message_rfc822(self) -> None: self._test_okay_header_parsing(b"Content-type: message/rfc822") class TestHEAD(SocketDummyServerTestCase): def test_chunked_head_response_does_not_hang(self) -> None: self.start_response_handler( b"HTTP/1.1 200 OK\r\n" b"Transfer-Encoding: chunked\r\n" b"Content-type: text/plain\r\n" b"\r\n" ) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: r = pool.request("HEAD", "/", timeout=LONG_TIMEOUT, preload_content=False) # stream will use the read_chunked method here. assert [] == list(r.stream()) def test_empty_head_response_does_not_hang(self) -> None: self.start_response_handler( b"HTTP/1.1 200 OK\r\n" b"Content-Length: 256\r\n" b"Content-type: text/plain\r\n" b"\r\n" ) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: r = pool.request("HEAD", "/", timeout=LONG_TIMEOUT, preload_content=False) # stream will use the read method here. assert [] == list(r.stream()) class TestStream(SocketDummyServerTestCase): def test_stream_none_unchunked_response_does_not_hang(self) -> None: done_event = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Length: 12\r\n" b"Content-type: text/plain\r\n" b"\r\n" b"hello, world" ) done_event.wait(5) sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: r = pool.request("GET", "/", timeout=LONG_TIMEOUT, preload_content=False) # Stream should read to the end. assert [b"hello, world"] == list(r.stream(None)) done_event.set() def test_large_compressed_stream(self) -> None: done_event = Event() expected_total_length = 296085 def socket_handler(listener: socket.socket) -> None: compress = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS) data = compress.compress(b"x" * expected_total_length) data += compress.flush() sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) sock.sendall( b"HTTP/1.1 200 OK\r\n" b"Content-Length: %d\r\n" b"Content-Encoding: gzip\r\n" b"\r\n" % (len(data),) + data ) done_event.wait(5) sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port, retries=False) as pool: r = pool.request("GET", "/", timeout=LONG_TIMEOUT, preload_content=False) # Chunks must all be equal or less than 10240 # and only the last chunk is allowed to be smaller # than 10240. total_length = 0 chunks_smaller_than_10240 = 0 for chunk in r.stream(10240, decode_content=True): assert 0 < len(chunk) <= 10240 if len(chunk) < 10240: chunks_smaller_than_10240 += 1 else: assert chunks_smaller_than_10240 == 0 total_length += len(chunk) assert chunks_smaller_than_10240 == 1 assert expected_total_length == total_length done_event.set() class TestBadContentLength(SocketDummyServerTestCase): def test_enforce_content_length_get(self) -> None: done_event = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Length: 22\r\n" b"Content-type: text/plain\r\n" b"\r\n" b"hello, world" ) done_event.wait(LONG_TIMEOUT) sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port, maxsize=1) as conn: # Test stream read when content length less than headers claim get_response = conn.request( "GET", url="/", preload_content=False, enforce_content_length=True ) data = get_response.stream(100) with pytest.raises(ProtocolError, match="12 bytes read, 10 more expected"): next(data) done_event.set() def test_enforce_content_length_no_body(self) -> None: done_event = Event() def socket_handler(listener: socket.socket) -> None: sock = listener.accept()[0] buf = b"" while not buf.endswith(b"\r\n\r\n"): buf += sock.recv(65536) sock.send( b"HTTP/1.1 200 OK\r\n" b"Content-Length: 22\r\n" b"Content-type: text/plain\r\n" b"\r\n" ) done_event.wait(1) sock.close() self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port, maxsize=1) as conn: # Test stream on 0 length body head_response = conn.request( "HEAD", url="/", preload_content=False, enforce_content_length=True ) data = [chunk for chunk in head_response.stream(1)] assert len(data) == 0 done_event.set() class TestRetryPoolSizeDrainFail(SocketDummyServerTestCase): def test_pool_size_retry_drain_fail(self) -> None: def socket_handler(listener: socket.socket) -> None: for _ in range(2): sock = listener.accept()[0] while not sock.recv(65536).endswith(b"\r\n\r\n"): pass # send a response with an invalid content length -- this causes # a ProtocolError to raise when trying to drain the connection sock.send( b"HTTP/1.1 404 NOT FOUND\r\n" b"Content-Length: 1000\r\n" b"Content-Type: text/plain\r\n" b"\r\n" ) sock.close() self._start_server(socket_handler) retries = Retry(total=1, raise_on_status=False, status_forcelist=[404]) with HTTPConnectionPool( self.host, self.port, maxsize=10, retries=retries, block=True ) as pool: pool.urlopen("GET", "/not_found", preload_content=False) assert pool.num_connections == 1 class TestBrokenPipe(SocketDummyServerTestCase): @notWindows() def test_ignore_broken_pipe_errors(self, monkeypatch: pytest.MonkeyPatch) -> None: # On Windows an aborted connection raises an error on # attempts to read data out of a socket that's been closed. sock_shut = Event() orig_connect = HTTPConnection.connect # a buffer that will cause two sendall calls buf = "a" * 1024 * 1024 * 4 def connect_and_wait(*args: typing.Any, **kw: typing.Any) -> None: ret = orig_connect(*args, **kw) assert sock_shut.wait(5) return ret def socket_handler(listener: socket.socket) -> None: for i in range(2): sock = listener.accept()[0] sock.send( b"HTTP/1.1 404 Not Found\r\n" b"Connection: close\r\n" b"Content-Length: 10\r\n" b"\r\n" b"xxxxxxxxxx" ) sock.shutdown(socket.SHUT_RDWR) sock_shut.set() sock.close() monkeypatch.setattr(HTTPConnection, "connect", connect_and_wait) self._start_server(socket_handler) with HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("POST", "/", body=buf) assert r.status == 404 assert r.headers["content-length"] == "10" assert r.data == b"xxxxxxxxxx" r = pool.request("POST", "/admin", chunked=True, body=buf) assert r.status == 404 assert r.headers["content-length"] == "10" assert r.data == b"xxxxxxxxxx" class TestMultipartResponse(SocketDummyServerTestCase): def test_multipart_assert_header_parsing_no_defects(self) -> None: quit_event = threading.Event() def socket_handler(listener: socket.socket) -> None: for _ in range(2): listener.settimeout(LONG_TIMEOUT) while True: if quit_event and quit_event.is_set(): return try: sock = listener.accept()[0] break except (TimeoutError, socket.timeout): continue sock.settimeout(LONG_TIMEOUT) while True: if quit_event and quit_event.is_set(): sock.close() return if sock.recv(65536).endswith(b"\r\n\r\n"): break sock.sendall( b"HTTP/1.1 404 Not Found\r\n" b"Server: example.com\r\n" b"Content-Type: multipart/mixed; boundary=36eeb8c4e26d842a\r\n" b"Content-Length: 73\r\n" b"\r\n" b"--36eeb8c4e26d842a\r\n" b"Content-Type: text/plain\r\n" b"\r\n" b"1\r\n" b"--36eeb8c4e26d842a--\r\n", ) sock.close() self._start_server(socket_handler, quit_event=quit_event) from urllib3.connectionpool import log with mock.patch.object(log, "warning") as log_warning: with HTTPConnectionPool(self.host, self.port, timeout=LONG_TIMEOUT) as pool: resp = pool.urlopen("GET", "/") assert resp.status == 404 assert ( resp.headers["content-type"] == "multipart/mixed; boundary=36eeb8c4e26d842a" ) assert len(resp.data) == 73 log_warning.assert_not_called() class TestContentFraming(SocketDummyServerTestCase): @pytest.mark.parametrize("content_length", [None, 0]) @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH"]) def test_content_length_0_by_default( self, method: str, content_length: int | None ) -> None: buffer = bytearray() def socket_handler(listener: socket.socket) -> None: nonlocal buffer sock = listener.accept()[0] while not buffer.endswith(b"\r\n\r\n"): buffer += sock.recv(65536) sock.sendall( b"HTTP/1.1 200 OK\r\n" b"Server: example.com\r\n" b"Content-Length: 0\r\n\r\n" ) sock.close() self._start_server(socket_handler) headers = {} if content_length is not None: headers["Content-Length"] = str(content_length) with HTTPConnectionPool(self.host, self.port, timeout=3) as pool: resp = pool.request(method, "/") assert resp.status == 200 sent_bytes = bytes(buffer) assert b"Accept-Encoding: identity\r\n" in sent_bytes assert b"Content-Length: 0\r\n" in sent_bytes assert b"transfer-encoding" not in sent_bytes.lower() @pytest.mark.parametrize("chunked", [True, False]) @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH"]) @pytest.mark.parametrize("body_type", ["file", "generator", "bytes"]) def test_chunked_specified( self, method: str, chunked: bool, body_type: str ) -> None: quit_event = threading.Event() buffer = bytearray() expected_bytes = b"\r\n\r\na\r\nxxxxxxxxxx\r\n0\r\n\r\n" def socket_handler(listener: socket.socket) -> None: nonlocal buffer listener.settimeout(LONG_TIMEOUT) while True: if quit_event.is_set(): return try: sock = listener.accept()[0] break except (TimeoutError, socket.timeout): continue sock.settimeout(LONG_TIMEOUT) while expected_bytes not in buffer: if quit_event.is_set(): return with contextlib.suppress(BlockingIOError): buffer += sock.recv(65536) sock.sendall( b"HTTP/1.1 200 OK\r\n" b"Server: example.com\r\n" b"Content-Length: 0\r\n\r\n" ) sock.close() self._start_server(socket_handler, quit_event=quit_event) body: typing.Any if body_type == "generator": def body_generator() -> typing.Generator[bytes, None, None]: yield b"x" * 10 body = body_generator() elif body_type == "file": body = io.BytesIO(b"x" * 10) body.seek(0, 0) else: if chunked is False: pytest.skip("urllib3 uses Content-Length in this case") body = b"x" * 10 with HTTPConnectionPool( self.host, self.port, timeout=LONG_TIMEOUT, retries=False ) as pool: resp = pool.request(method, "/", chunked=chunked, body=body) assert resp.status == 200 sent_bytes = bytes(buffer) assert sent_bytes.count(b":") == 5 assert b"Host: localhost:" in sent_bytes assert b"Accept-Encoding: identity\r\n" in sent_bytes assert b"Transfer-Encoding: chunked\r\n" in sent_bytes assert b"User-Agent: python-urllib3/" in sent_bytes assert b"content-length" not in sent_bytes.lower() assert expected_bytes in sent_bytes @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH"]) @pytest.mark.parametrize( "body_type", ["file", "generator", "bytes", "bytearray", "file_text"] ) def test_chunked_not_specified(self, method: str, body_type: str) -> None: buffer = bytearray() expected_bytes: bytes body: typing.Any if body_type == "generator": def body_generator() -> typing.Generator[bytes, None, None]: yield b"x" * 10 body = body_generator() should_be_chunked = True elif body_type == "file": body = io.BytesIO(b"x" * 10) body.seek(0, 0) should_be_chunked = True elif body_type == "file_text": body = io.StringIO("x" * 10) body.seek(0, 0) should_be_chunked = True elif body_type == "bytearray": body = bytearray(b"x" * 10) should_be_chunked = False else: body = b"x" * 10 should_be_chunked = False if should_be_chunked: expected_bytes = b"\r\n\r\na\r\nxxxxxxxxxx\r\n0\r\n\r\n" else: expected_bytes = b"\r\n\r\nxxxxxxxxxx" def socket_handler(listener: socket.socket) -> None: nonlocal buffer sock = listener.accept()[0] sock.settimeout(0) while expected_bytes not in buffer: with contextlib.suppress(BlockingIOError): buffer += sock.recv(65536) sock.sendall( b"HTTP/1.1 200 OK\r\n" b"Server: example.com\r\n" b"Content-Length: 0\r\n\r\n" ) sock.close() self._start_server(socket_handler) with HTTPConnectionPool( self.host, self.port, timeout=LONG_TIMEOUT, retries=False ) as pool: resp = pool.request(method, "/", body=body) assert resp.status == 200 sent_bytes = bytes(buffer) assert sent_bytes.count(b":") == 5 assert b"Host: localhost:" in sent_bytes assert b"Accept-Encoding: identity\r\n" in sent_bytes assert b"User-Agent: python-urllib3/" in sent_bytes if should_be_chunked: assert b"content-length" not in sent_bytes.lower() assert b"Transfer-Encoding: chunked\r\n" in sent_bytes assert expected_bytes in sent_bytes else: assert b"Content-Length: 10\r\n" in sent_bytes assert b"transfer-encoding" not in sent_bytes.lower() assert sent_bytes.endswith(expected_bytes) @pytest.mark.parametrize( "header_transform", [str.lower, str.title, str.upper], ) @pytest.mark.parametrize( ["header", "header_value", "expected"], [ ("content-length", "10", b": 10\r\n\r\nxxxxxxxx"), ( "transfer-encoding", "chunked", b": chunked\r\n\r\n8\r\nxxxxxxxx\r\n0\r\n\r\n", ), ], ) def test_framing_set_via_headers( self, header_transform: typing.Callable[[str], str], header: str, header_value: str, expected: bytes, ) -> None: buffer = bytearray() def socket_handler(listener: socket.socket) -> None: nonlocal buffer sock = listener.accept()[0] sock.settimeout(0) while expected not in buffer: with contextlib.suppress(BlockingIOError): buffer += sock.recv(65536) sock.sendall( b"HTTP/1.1 200 OK\r\n" b"Server: example.com\r\n" b"Content-Length: 0\r\n\r\n" ) sock.close() self._start_server(socket_handler) with HTTPConnectionPool( self.host, self.port, timeout=LONG_TIMEOUT, retries=False ) as pool: resp = pool.request( "POST", "/", body=b"xxxxxxxx", headers={header_transform(header): header_value}, ) assert resp.status == 200 sent_bytes = bytes(buffer) assert sent_bytes.endswith(expected) test_no_ssl.py 0000644 00000002171 15025306245 0007455 0 ustar 00 """ Test connections without the builtin ssl module Note: Import urllib3 inside the test functions to get the importblocker to work """ from __future__ import annotations import pytest import urllib3 from dummyserver.testcase import ( HTTPSHypercornDummyServerTestCase, HypercornDummyServerTestCase, ) from urllib3.exceptions import InsecureRequestWarning from ..test_no_ssl import TestWithoutSSL class TestHTTPWithoutSSL(HypercornDummyServerTestCase, TestWithoutSSL): def test_simple(self) -> None: with urllib3.HTTPConnectionPool(self.host, self.port) as pool: r = pool.request("GET", "/") assert r.status == 200, r.data class TestHTTPSWithoutSSL(HTTPSHypercornDummyServerTestCase, TestWithoutSSL): def test_simple(self) -> None: with urllib3.HTTPSConnectionPool( self.host, self.port, cert_reqs="NONE" ) as pool: with pytest.warns(InsecureRequestWarning): try: pool.request("GET", "/") except urllib3.exceptions.SSLError as e: assert "SSL module is not available" in str(e) __init__.py 0000644 00000000000 15025306245 0006645 0 ustar 00
| ver. 1.4 |
Github
|
.
| PHP 8.2.28 | Generation time: 0.02 |
proxy
|
phpinfo
|
Settings