dummyserver.tar

testcase.py

from __future__ import annotations

import contextlib
import socket
import ssl
import threading
import typing

from test import LONG_TIMEOUT

import hypercorn
import pytest

from dummyserver.app import hypercorn_app
from dummyserver.asgi_proxy import ProxyApp
from dummyserver.hypercornserver import run_hypercorn_in_thread
from dummyserver.socketserver import DEFAULT_CERTS, HAS_IPV6, SocketServerThread
from urllib3.connection import HTTPConnection
from urllib3.util.ssltransport import SSLTransport
from urllib3.util.url import parse_url


def consume_socket(
    sock: SSLTransport | socket.socket,
    chunks: int = 65536,
    quit_event: threading.Event | None = None,
) -> bytearray:
    consumed = bytearray()
    sock.settimeout(LONG_TIMEOUT)
    while True:
        if quit_event and quit_event.is_set():
            break
        try:
            b = sock.recv(chunks)
        except (TimeoutError, socket.timeout):
            continue
        assert isinstance(b, bytes)
        consumed += b
        if b.endswith(b"\r\n\r\n"):
            break
    return consumed


class SocketDummyServerTestCase:
    """
    A simple socket-based server is created for this class that is good for
    exactly one request.
    """

    scheme = "http"
    host = "localhost"

    server_thread: typing.ClassVar[SocketServerThread]
    port: typing.ClassVar[int]

    tmpdir: typing.ClassVar[str]
    ca_path: typing.ClassVar[str]
    cert_combined_path: typing.ClassVar[str]
    cert_path: typing.ClassVar[str]
    key_path: typing.ClassVar[str]
    password_key_path: typing.ClassVar[str]

    server_context: typing.ClassVar[ssl.SSLContext]
    client_context: typing.ClassVar[ssl.SSLContext]

    proxy_server: typing.ClassVar[SocketDummyServerTestCase]

    @classmethod
    def _start_server(
        cls,
        socket_handler: typing.Callable[[socket.socket], None],
        quit_event: threading.Event | None = None,
    ) -> None:
        ready_event = threading.Event()
        cls.server_thread = SocketServerThread(
            socket_handler=socket_handler,
            ready_event=ready_event,
            host=cls.host,
            quit_event=quit_event,
        )
        cls.server_thread.start()
        ready_event.wait(5)
        if not ready_event.is_set():
            raise Exception("most likely failed to start server")
        cls.port = cls.server_thread.port

    @classmethod
    def start_response_handler(
        cls,
        response: bytes,
        num: int = 1,
        block_send: threading.Event | None = None,
    ) -> threading.Event:
        ready_event = threading.Event()
        quit_event = threading.Event()

        def socket_handler(listener: socket.socket) -> None:
            for _ in range(num):
                ready_event.set()

                listener.settimeout(LONG_TIMEOUT)
                while True:
                    if quit_event.is_set():
                        return
                    try:
                        sock = listener.accept()[0]
                        break
                    except (TimeoutError, socket.timeout):
                        continue

                consume_socket(sock, quit_event=quit_event)
                if quit_event.is_set():
                    sock.close()
                    return
                if block_send:
                    while not block_send.wait(LONG_TIMEOUT):
                        if quit_event.is_set():
                            sock.close()
                            return
                    block_send.clear()
                sock.send(response)
                sock.close()

        cls._start_server(socket_handler, quit_event=quit_event)
        return ready_event

    @classmethod
    def start_basic_handler(
        cls, num: int = 1, block_send: threading.Event | None = None
    ) -> threading.Event:
        return cls.start_response_handler(
            b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n",
            num,
            block_send,
        )

    @staticmethod
    def quit_server_thread(server_thread: SocketServerThread) -> None:
        if server_thread.quit_event:
            server_thread.quit_event.set()
        # in principle the maximum time that the thread can take to notice
        # the quit_event is LONG_TIMEOUT and the thread should terminate
        # shortly after that, we give 5 seconds leeway just in case
        server_thread.join(LONG_TIMEOUT * 2 + 5.0)
        if server_thread.is_alive():
            raise Exception("server_thread did not exit")

    @classmethod
    def teardown_class(cls) -> None:
        if hasattr(cls, "server_thread"):
            cls.quit_server_thread(cls.server_thread)

    def teardown_method(self) -> None:
        if hasattr(self, "server_thread"):
            self.quit_server_thread(self.server_thread)

    def assert_header_received(
        self,
        received_headers: typing.Iterable[bytes],
        header_name: str,
        expected_value: str | None = None,
    ) -> None:
        header_name_bytes = header_name.encode("ascii")
        if expected_value is None:
            expected_value_bytes = None
        else:
            expected_value_bytes = expected_value.encode("ascii")

        header_titles = []
        for header in received_headers:
            key, value = header.split(b": ")
            header_titles.append(key)
            if key == header_name_bytes and expected_value_bytes is not None:
                assert value == expected_value_bytes
        assert header_name_bytes in header_titles


class IPV4SocketDummyServerTestCase(SocketDummyServerTestCase):
    @classmethod
    def _start_server(
        cls,
        socket_handler: typing.Callable[[socket.socket], None],
        quit_event: threading.Event | None = None,
    ) -> None:
        ready_event = threading.Event()
        cls.server_thread = SocketServerThread(
            socket_handler=socket_handler,
            ready_event=ready_event,
            host=cls.host,
            quit_event=quit_event,
        )
        cls.server_thread.USE_IPV6 = False
        cls.server_thread.start()
        ready_event.wait(5)
        if not ready_event.is_set():
            raise Exception("most likely failed to start server")
        cls.port = cls.server_thread.port


class HypercornDummyServerTestCase:
    host = "localhost"
    host_alt = "127.0.0.1"
    port: typing.ClassVar[int]
    base_url: typing.ClassVar[str]
    base_url_alt: typing.ClassVar[str]
    certs: typing.ClassVar[dict[str, typing.Any]] = {}

    _stack: typing.ClassVar[contextlib.ExitStack]

    @classmethod
    def setup_class(cls) -> None:
        with contextlib.ExitStack() as stack:
            config = hypercorn.Config()
            if cls.certs:
                config.certfile = cls.certs["certfile"]
                config.keyfile = cls.certs["keyfile"]
                config.verify_mode = cls.certs["cert_reqs"]
                config.ca_certs = cls.certs["ca_certs"]
                config.alpn_protocols = cls.certs["alpn_protocols"]
            config.bind = [f"{cls.host}:0"]
            stack.enter_context(run_hypercorn_in_thread(config, hypercorn_app))
            cls._stack = stack.pop_all()
            cls.port = typing.cast(int, parse_url(config.bind[0]).port)

    @classmethod
    def teardown_class(cls) -> None:
        cls._stack.close()


class HTTPSHypercornDummyServerTestCase(HypercornDummyServerTestCase):
    scheme = "https"
    host = "localhost"
    certs = DEFAULT_CERTS
    certs_dir = ""
    bad_ca_path = ""


class HypercornDummyProxyTestCase:
    http_host: typing.ClassVar[str] = "localhost"
    http_host_alt: typing.ClassVar[str] = "127.0.0.1"
    http_port: typing.ClassVar[int]
    http_url: typing.ClassVar[str]
    http_url_alt: typing.ClassVar[str]

    https_host: typing.ClassVar[str] = "localhost"
    https_host_alt: typing.ClassVar[str] = "127.0.0.1"
    https_certs: typing.ClassVar[dict[str, typing.Any]] = DEFAULT_CERTS
    https_port: typing.ClassVar[int]
    https_url: typing.ClassVar[str]
    https_url_alt: typing.ClassVar[str]
    https_url_fqdn: typing.ClassVar[str]

    proxy_host: typing.ClassVar[str] = "localhost"
    proxy_host_alt: typing.ClassVar[str] = "127.0.0.1"
    proxy_port: typing.ClassVar[int]
    proxy_url: typing.ClassVar[str]
    https_proxy_port: typing.ClassVar[int]
    https_proxy_url: typing.ClassVar[str]

    certs_dir: typing.ClassVar[str] = ""
    bad_ca_path: typing.ClassVar[str] = ""

    server_thread: typing.ClassVar[threading.Thread]
    _stack: typing.ClassVar[contextlib.ExitStack]

    @classmethod
    def setup_class(cls) -> None:
        with contextlib.ExitStack() as stack:
            http_server_config = hypercorn.Config()
            http_server_config.bind = [f"{cls.http_host}:0"]
            stack.enter_context(
                run_hypercorn_in_thread(http_server_config, hypercorn_app)
            )
            cls.http_port = typing.cast(int, parse_url(http_server_config.bind[0]).port)

            https_server_config = hypercorn.Config()
            https_server_config.certfile = cls.https_certs["certfile"]
            https_server_config.keyfile = cls.https_certs["keyfile"]
            https_server_config.verify_mode = cls.https_certs["cert_reqs"]
            https_server_config.ca_certs = cls.https_certs["ca_certs"]
            https_server_config.alpn_protocols = cls.https_certs["alpn_protocols"]
            https_server_config.bind = [f"{cls.https_host}:0"]
            stack.enter_context(
                run_hypercorn_in_thread(https_server_config, hypercorn_app)
            )
            cls.https_port = typing.cast(
                int, parse_url(https_server_config.bind[0]).port
            )

            http_proxy_config = hypercorn.Config()
            http_proxy_config.bind = [f"{cls.proxy_host}:0"]
            stack.enter_context(run_hypercorn_in_thread(http_proxy_config, ProxyApp()))
            cls.proxy_port = typing.cast(int, parse_url(http_proxy_config.bind[0]).port)

            https_proxy_config = hypercorn.Config()
            https_proxy_config.certfile = cls.https_certs["certfile"]
            https_proxy_config.keyfile = cls.https_certs["keyfile"]
            https_proxy_config.verify_mode = cls.https_certs["cert_reqs"]
            https_proxy_config.ca_certs = cls.https_certs["ca_certs"]
            https_proxy_config.alpn_protocols = cls.https_certs["alpn_protocols"]
            https_proxy_config.bind = [f"{cls.proxy_host}:0"]
            upstream_ca_certs = cls.https_certs.get("ca_certs")
            stack.enter_context(
                run_hypercorn_in_thread(https_proxy_config, ProxyApp(upstream_ca_certs))
            )
            cls.https_proxy_port = typing.cast(
                int, parse_url(https_proxy_config.bind[0]).port
            )

            cls._stack = stack.pop_all()

    @classmethod
    def teardown_class(cls) -> None:
        cls._stack.close()


@pytest.mark.skipif(not HAS_IPV6, reason="IPv6 not available")
class IPv6HypercornDummyServerTestCase(HypercornDummyServerTestCase):
    host = "::1"


@pytest.mark.skipif(not HAS_IPV6, reason="IPv6 not available")
class IPv6HypercornDummyProxyTestCase(HypercornDummyProxyTestCase):
    http_host = "localhost"
    http_host_alt = "127.0.0.1"

    https_host = "localhost"
    https_host_alt = "127.0.0.1"
    https_certs = DEFAULT_CERTS

    proxy_host = "::1"
    proxy_host_alt = "127.0.0.1"


class ConnectionMarker:
    """
    Marks an HTTP(S)Connection's socket after a request was made.

    Helps a test server understand when a client finished a request,
    without implementing a complete HTTP server.
    """

    MARK_FORMAT = b"$#MARK%04x*!"

    @classmethod
    @contextlib.contextmanager
    def mark(
        cls, monkeypatch: pytest.MonkeyPatch
    ) -> typing.Generator[None, None, None]:
        """
        Mark connections made within this context.
        """

        orig_request = HTTPConnection.request

        def call_and_mark(
            target: typing.Callable[..., None]
        ) -> typing.Callable[..., None]:
            def part(
                self: HTTPConnection, *args: typing.Any, **kwargs: typing.Any
            ) -> None:
                target(self, *args, **kwargs)
                self.sock.sendall(cls._get_socket_mark(self.sock, False))

            return part

        with monkeypatch.context() as m:
            m.setattr(HTTPConnection, "request", call_and_mark(orig_request))
            yield

    @classmethod
    def consume_request(cls, sock: socket.socket, chunks: int = 65536) -> bytearray:
        """
        Consume a socket until after the HTTP request is sent.
        """
        consumed = bytearray()
        mark = cls._get_socket_mark(sock, True)
        while True:
            b = sock.recv(chunks)
            if not b:
                break
            consumed += b
            if consumed.endswith(mark):
                break
        return consumed

    @classmethod
    def _get_socket_mark(cls, sock: socket.socket, server: bool) -> bytes:
        if server:
            port = sock.getpeername()[1]
        else:
            port = sock.getsockname()[1]
        return cls.MARK_FORMAT % (port,)


asgi_proxy.py

from __future__ import annotations

import typing

import httpx
import trio
from hypercorn.typing import (
    ASGIReceiveCallable,
    ASGISendCallable,
    HTTPResponseBodyEvent,
    HTTPResponseStartEvent,
    HTTPScope,
    Scope,
)


async def _read_body(receive: ASGIReceiveCallable) -> bytes:
    body = bytearray()
    body_consumed = False
    while not body_consumed:
        event = await receive()
        if event["type"] == "http.request":
            body.extend(event["body"])
            body_consumed = not event["more_body"]
        else:
            raise ValueError(event["type"])
    return bytes(body)


class ProxyApp:
    def __init__(self, upstream_ca_certs: str | None = None):
        self.upstream_ca_certs = upstream_ca_certs

    async def __call__(
        self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable
    ) -> None:
        assert scope["type"] == "http"
        if scope["method"] in ["GET", "POST"]:
            await self.absolute_uri(scope, receive, send)
        elif scope["method"] == "CONNECT":
            await self.connect(scope, send)
        else:
            raise ValueError(scope["method"])

    async def absolute_uri(
        self,
        scope: HTTPScope,
        receive: ASGIReceiveCallable,
        send: ASGISendCallable,
    ) -> None:
        async with httpx.AsyncClient(verify=self.upstream_ca_certs or True) as client:
            client_response = await client.request(
                method=scope["method"],
                url=scope["path"],
                headers=list(scope["headers"]),
                content=await _read_body(receive),
            )

        headers = []
        for header in (
            "Date",
            "Cache-Control",
            "Server",
            "Content-Type",
            "Location",
        ):
            v = client_response.headers.get(header)
            if v:
                headers.append((header.encode(), v.encode()))
        headers.append((b"Content-Length", str(len(client_response.content)).encode()))

        await send(
            HTTPResponseStartEvent(
                type="http.response.start",
                status=client_response.status_code,
                headers=headers,
            )
        )
        await send(
            HTTPResponseBodyEvent(
                type="http.response.body",
                body=client_response.content,
                more_body=False,
            )
        )

    async def connect(self, scope: HTTPScope, send: ASGISendCallable) -> None:
        async def start_forward(
            reader: trio.SocketStream, writer: trio.SocketStream
        ) -> None:
            while True:
                try:
                    data = await reader.receive_some(4096)
                except trio.ClosedResourceError:
                    break
                if not data:
                    break
                await writer.send_all(data)
            await writer.aclose()

        host, port = scope["path"].split(":")
        async with await trio.open_tcp_stream(host, int(port)) as upstream:
            await send({"type": "http.response.start", "status": 200, "headers": []})
            await send({"type": "http.response.body", "body": b"", "more_body": True})

            client = typing.cast(trio.SocketStream, scope["extensions"]["_transport"])
            async with trio.open_nursery(strict_exception_groups=True) as nursery:
                nursery.start_soon(start_forward, client, upstream)
                nursery.start_soon(start_forward, upstream, client)

hypercornserver.py

from __future__ import annotations

import concurrent.futures
import contextlib
import functools
import sys
import threading
from typing import Generator

import hypercorn
import hypercorn.trio
import hypercorn.typing
import trio
from quart_trio import QuartTrio


# https://github.com/pgjones/hypercorn/blob/19dfb96411575a6a647cdea63fa581b48ebb9180/src/hypercorn/utils.py#L172-L178
async def graceful_shutdown(shutdown_event: threading.Event) -> None:
    while True:
        if shutdown_event.is_set():
            return
        await trio.sleep(0.1)


async def _start_server(
    config: hypercorn.Config,
    app: QuartTrio,
    ready_event: threading.Event,
    shutdown_event: threading.Event,
) -> None:
    async with trio.open_nursery() as nursery:
        config.bind = await nursery.start(
            functools.partial(
                hypercorn.trio.serve,
                app,
                config,
                shutdown_trigger=functools.partial(graceful_shutdown, shutdown_event),
            )
        )
        ready_event.set()


@contextlib.contextmanager
def run_hypercorn_in_thread(
    config: hypercorn.Config, app: hypercorn.typing.ASGIFramework
) -> Generator[None, None, None]:
    ready_event = threading.Event()
    shutdown_event = threading.Event()

    with concurrent.futures.ThreadPoolExecutor(
        1, thread_name_prefix="hypercorn dummyserver"
    ) as executor:
        future = executor.submit(
            trio.run,
            _start_server,
            config,
            app,
            ready_event,
            shutdown_event,
        )
        ready_event.wait(5)
        if not ready_event.is_set():
            raise Exception("most likely failed to start server")

        try:
            yield
        finally:
            shutdown_event.set()
            future.result()


def main() -> int:
    # For debugging dummyserver itself - PYTHONPATH=src python -m dummyserver.hypercornserver
    from .app import hypercorn_app

    config = hypercorn.Config()
    config.bind = ["localhost:0"]
    ready_event = threading.Event()
    shutdown_event = threading.Event()
    trio.run(_start_server, config, hypercorn_app, ready_event, shutdown_event)
    return 0


if __name__ == "__main__":
    sys.exit(main())
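
run_hypercorn_in_thread is the helper the Hypercorn-based test cases in testcase.py lean on. A minimal sketch of using it on its own; the urllib3 call and the asserted body are illustrative and not part of this module:

# Hypothetical example: serve hypercorn_app from a background thread on an
# ephemeral port, then fetch "/" (which app.py answers with "Dummy server!").
import hypercorn
import urllib3

from dummyserver.app import hypercorn_app
from dummyserver.hypercornserver import run_hypercorn_in_thread
from urllib3.util.url import parse_url

config = hypercorn.Config()
config.bind = ["localhost:0"]  # _start_server rewrites bind with the real port
with run_hypercorn_in_thread(config, hypercorn_app):
    port = parse_url(config.bind[0]).port
    with urllib3.HTTPConnectionPool("localhost", port) as pool:
        response = pool.request("GET", "/")
        assert response.data == b"Dummy server!"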

socketserver.py

#!/usr/bin/env python

"""
Dummy server used for unit testing.
"""

from __future__ import annotations

import logging
import os
import socket
import ssl
import sys
import threading
import typing
import warnings

import trustme
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

from urllib3.exceptions import HTTPWarning
from urllib3.util import resolve_cert_reqs, resolve_ssl_version

if typing.TYPE_CHECKING:
    from typing_extensions import ParamSpec

    P = ParamSpec("P")

log = logging.getLogger(__name__)

CERTS_PATH = os.path.join(os.path.dirname(__file__), "certs")

DEFAULT_CERTS: dict[str, typing.Any] = {
    "certfile": os.path.join(CERTS_PATH, "server.crt"),
    "keyfile": os.path.join(CERTS_PATH, "server.key"),
    "cert_reqs": ssl.CERT_OPTIONAL,
    "ca_certs": os.path.join(CERTS_PATH, "cacert.pem"),
    "alpn_protocols": ["h2", "http/1.1"],
}
DEFAULT_CA = os.path.join(CERTS_PATH, "cacert.pem")
DEFAULT_CA_KEY = os.path.join(CERTS_PATH, "cacert.key")


def _resolves_to_ipv6(host: str) -> bool:
    """Returns True if the system resolves host to an IPv6 address by default."""
    resolves_to_ipv6 = False
    try:
        for res in socket.getaddrinfo(host, None, socket.AF_UNSPEC):
            af, _, _, _, _ = res
            if af == socket.AF_INET6:
                resolves_to_ipv6 = True
    except socket.gaierror:
        pass

    return resolves_to_ipv6


def _has_ipv6(host: str) -> bool:
    """Returns True if the system can bind an IPv6 address."""
    sock = None
    has_ipv6 = False

    if socket.has_ipv6:
        # has_ipv6 returns true if cPython was compiled with IPv6 support.
        # It does not tell us if the system has IPv6 support enabled. To
        # determine that we must bind to an IPv6 address.
        # https://github.com/urllib3/urllib3/pull/611
        # https://bugs.python.org/issue658327
        try:
            sock = socket.socket(socket.AF_INET6)
            sock.bind((host, 0))
            has_ipv6 = _resolves_to_ipv6("localhost")
        except Exception:
            pass

    if sock:
        sock.close()
    return has_ipv6


# Some systems may have IPv6 support but DNS may not be configured
# properly. We cannot count on localhost resolving to ::1 on all
# systems. See https://github.com/urllib3/urllib3/pull/611 and
# https://bugs.python.org/issue18792
HAS_IPV6_AND_DNS = _has_ipv6("localhost")

HAS_IPV6 = _has_ipv6("::1")


# Different types of servers we have:


class NoIPv6Warning(HTTPWarning):
    "IPv6 is not available"


class SocketServerThread(threading.Thread):
    """
    :param socket_handler: Callable which receives a socket argument for one request.
    :param ready_event: Event which gets set when the socket handler is ready
        to receive requests.
    """

    USE_IPV6 = HAS_IPV6_AND_DNS

    def __init__(
        self,
        socket_handler: typing.Callable[[socket.socket], None],
        host: str = "localhost",
        ready_event: threading.Event | None = None,
        quit_event: threading.Event | None = None,
    ) -> None:
        super().__init__()
        self.daemon = True

        self.socket_handler = socket_handler
        self.host = host
        self.ready_event = ready_event
        self.quit_event = quit_event

    def _start_server(self) -> None:
        if self.USE_IPV6:
            sock = socket.socket(socket.AF_INET6)
        else:
            warnings.warn("No IPv6 support. Falling back to IPv4.", NoIPv6Warning)
            sock = socket.socket(socket.AF_INET)
        if sys.platform != "win32":
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        with sock:
            sock.bind((self.host, 0))
            self.port = sock.getsockname()[1]

            # Once listen() returns, the server socket is ready
            sock.listen(1)

            if self.ready_event:
                self.ready_event.set()

            self.socket_handler(sock)

    def run(self) -> None:
        self._start_server()


def ssl_options_to_context(  # type: ignore[no-untyped-def]
    keyfile=None,
    certfile=None,
    server_side=None,
    cert_reqs=None,
    ssl_version: str | int | None = None,
    ca_certs=None,
    do_handshake_on_connect=None,
    suppress_ragged_eofs=None,
    ciphers=None,
    alpn_protocols=None,
) -> ssl.SSLContext:
    """Return an equivalent SSLContext based on ssl.wrap_socket args."""
    ssl_version = resolve_ssl_version(ssl_version)
    cert_none = resolve_cert_reqs("CERT_NONE")
    if cert_reqs is None:
        cert_reqs = cert_none
    else:
        cert_reqs = resolve_cert_reqs(cert_reqs)

    ctx = ssl.SSLContext(ssl_version)
    ctx.load_cert_chain(certfile, keyfile)
    ctx.verify_mode = cert_reqs
    if ctx.verify_mode != cert_none:
        ctx.load_verify_locations(cafile=ca_certs)
    if alpn_protocols and hasattr(ctx, "set_alpn_protocols"):
        try:
            ctx.set_alpn_protocols(alpn_protocols)
        except NotImplementedError:
            pass
    return ctx


def get_unreachable_address() -> tuple[str, int]:
    # reserved as per rfc2606
    return ("something.invalid", 54321)


def encrypt_key_pem(private_key_pem: trustme.Blob, password: bytes) -> trustme.Blob:
    private_key = serialization.load_pem_private_key(
        private_key_pem.bytes(), password=None, backend=default_backend()
    )
    encrypted_key = private_key.private_bytes(
        serialization.Encoding.PEM,
        serialization.PrivateFormat.TraditionalOpenSSL,
        serialization.BestAvailableEncryption(password),
    )
    return trustme.Blob(encrypted_key)
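
A minimal sketch of driving SocketServerThread directly, outside the test-case classes in testcase.py; the one-shot handler and the raw-socket client below are illustrative:

# Hypothetical example: serve a single canned response from a raw socket
# handler running on a SocketServerThread.
import socket
import threading

from dummyserver.socketserver import SocketServerThread


def socket_handler(listener: socket.socket) -> None:
    # The thread hands the handler a listening socket bound to an ephemeral port.
    sock = listener.accept()[0]
    sock.recv(65536)  # read (and ignore) the request
    sock.sendall(b"HTTP/1.1 204 No Content\r\nContent-Length: 0\r\n\r\n")
    sock.close()


ready = threading.Event()
server = SocketServerThread(socket_handler=socket_handler, ready_event=ready)
server.start()
ready.wait(5)

client = socket.create_connection(("localhost", server.port))
client.sendall(b"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n")
print(client.recv(65536))
client.close()
server.join(5)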

certs/server.crt

-----BEGIN CERTIFICATE-----
MIIDeTCCAmGgAwIBAgIUQeadxkH6YMoSecB2rNbFEr1u1kUwDQYJKoZIhvcNAQEL
BQAwQDEXMBUGA1UECgwOdHJ1c3RtZSB2MC41LjMxJTAjBgNVBAsMHFRlc3Rpbmcg
Q0EgIzdEUWJWOTBzV2xZSEstY0wwHhcNMDAwMTAxMDAwMDAwWhcNMzgwMTAxMDAw
MDAwWjBCMRcwFQYDVQQKDA50cnVzdG1lIHYwLjUuMzEnMCUGA1UECwweVGVzdGlu
ZyBjZXJ0ICMtSGRsMnMyTEYyeVp0NDFOMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEArRLZX+5dyCh4N7q90sH2Q4Ea6QLK8OfoUQPWtpzAtINDUAdfSXCC
/qYTtGeSCGjB4W0LfvRTI8afHoD/M+YpaCRnx7T1sy1taA2rnGrEVXEHalVP+RI4
t4ZWtX56aez2M0Fs6o4MtzAuP6fKgSdWzIvOmtCxqn0Zf2KbfEHnQylsy2LgPa/x
Lg50fbZ195+h4EAB3d2/jqaeFGGhN+7zrrv4+L1eeW3bzOkvPEkTNepq3Gy/8r5e
0i2icEnM+eBfl8NYgQ1toJYvDIy5Qi2TRzaFxBVmqUOc8EFtHpL7E9YLbTTW15xd
oLVLdXI5igGxkwPYoeiiAJWxIsC/hL1RRQIDAQABo2kwZzAdBgNVHQ4EFgQUMU6+
uwNmL8TxLwjrj7jzzlwDPiowDAYDVR0TAQH/BAIwADAfBgNVHSMEGDAWgBR1n9/R
563QoSo7DgaARp9VPtFYMTAXBgNVHREBAf8EDTALgglsb2NhbGhvc3QwDQYJKoZI
hvcNAQELBQADggEBAJ6w5neQKw+/efA/I3IHzt8GaSHQ/YehMHx8GxCViJUmLg6P
Vf74k856Knvh7IsVaqF1uRi6qQaFPik6CwtBCj7/ZftdseOCDljd+8EWyQ+ZWie7
+tzMIdQWZxYSdR9Ov42VD++a6oWJtfJhWV5eyDit99FFK31/M1ZXoceiDS5AsIG6
wfsxrFj1qV9pLNSIlfrnycYhYx7avVJTf+2mfZgTO9Tx+VPapkZrfCnP/2jpN39u
zblFFjP9Ir0QqBw7MXjVX+Y1HkQ2TQnEeSsp1HuFRIZYx72Cttnckv1Lxcx/HiQB
oebTDYiRfxOAEeIMgIhX88Jca8vNIRcXDeGK9mU=
-----END CERTIFICATE-----


certs/README.rst

Generating new certificates
---------------------------

Here's how you can regenerate the certificates::

    import trustme

    ca = trustme.CA()
    server_cert = ca.issue_cert("localhost")
    ca.cert_pem.write_to_path("cacert.pem")
    ca.private_key_pem.write_to_path("cacert.key")
    server_cert.cert_chain_pems[0].write_to_path("server.crt")
    server_cert.private_key_pem.write_to_path("server.key")

This will break a number of tests: you will need to update the
relevant fingerprints and hashes.


certs/cacert.pem

-----BEGIN CERTIFICATE-----
MIIDfzCCAmegAwIBAgIUVGEi+7bkaGRIPoQrp9zFFAT5JYQwDQYJKoZIhvcNAQEL
BQAwQDEXMBUGA1UECgwOdHJ1c3RtZSB2MC41LjMxJTAjBgNVBAsMHFRlc3Rpbmcg
Q0EgIzdEUWJWOTBzV2xZSEstY0wwHhcNMDAwMTAxMDAwMDAwWhcNMzgwMTAxMDAw
MDAwWjBAMRcwFQYDVQQKDA50cnVzdG1lIHYwLjUuMzElMCMGA1UECwwcVGVzdGlu
ZyBDQSAjN0RRYlY5MHNXbFlISy1jTDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC
AQoCggEBAJ999DRCEmO8PhnmUdo4fDn9cdgraXp1kSrrg9ruBTmjdBtUjaZujsLp
usc4b/d45o/rWa6fZGLwvR3Lxq4Mx/cj+euqSy32NkhBACWDTXlXanoGOfVtcGur
xAzeVGK7lRG8poDx6CLYmIX53YBA22oDzeXYa3Tb2kTSNnOczJgQ5c84QhuwGW17
rYU6ejsE1cnu9X9pZzYSouq81ra3v44vPH6vEJuNOpdvTtUU85jbbTeQWZz1lPb+
BpwmCl3WOgTdVG+s/AvYZ2v6EYAamtGW7+5BLgLqobBUHrhQnwYUJuzXT3f+yoei
0vlSIgrtwDnHGRzrhTo7R2/ysp3ZoAsCAwEAAaNxMG8wHQYDVR0OBBYEFHWf39Hn
rdChKjsOBoBGn1U+0VgxMBIGA1UdEwEB/wQIMAYBAf8CAQkwDgYDVR0PAQH/BAQD
AgEGMCoGA1UdJQEB/wQgMB4GCCsGAQUFBwMCBggrBgEFBQcDAQYIKwYBBQUHAwMw
DQYJKoZIhvcNAQELBQADggEBAIAylnWX2WTB+mrVVpE2W8i0HollTMJIPJA9Jq3Q
/t2uPjXDEAVAcBmQju8qy2tHpamvzpQseVm3EF3UFNlGxwOGKsTzU4J45qOJITZk
eLRAcWNEt6cgqj8ml8PuMHU7oDnp7pP6VPe5KQH1a0FYQnDNEwg7MyX+GjnXeRwd
re6y9nMC+XKCYUAd1/nQcrZdnSsws1M5lzXir2vuyyN9EUkf2xMMKA2E1s0f+5he
3eNghAXtZw616ITBoMb7ckG6a0+YobbiQ0tKgB8D3MG2544Gx6xhCXf7pX4q4g//
1nTPeYFsBDyqEOEhcW1o9/MSSbjpUJC+QUmCb2Y1wYeum+w=
-----END CERTIFICATE-----


certs/cacert.key

-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAn330NEISY7w+GeZR2jh8Of1x2CtpenWRKuuD2u4FOaN0G1SN
pm6Owum6xzhv93jmj+tZrp9kYvC9HcvGrgzH9yP566pLLfY2SEEAJYNNeVdqegY5
9W1wa6vEDN5UYruVEbymgPHoItiYhfndgEDbagPN5dhrdNvaRNI2c5zMmBDlzzhC
G7AZbXuthTp6OwTVye71f2lnNhKi6rzWtre/ji88fq8Qm406l29O1RTzmNttN5BZ
nPWU9v4GnCYKXdY6BN1Ub6z8C9hna/oRgBqa0Zbv7kEuAuqhsFQeuFCfBhQm7NdP
d/7Kh6LS+VIiCu3AOccZHOuFOjtHb/KyndmgCwIDAQABAoIBAGQg9wc308O5kmNA
LXMKszLU4nwMBRRUaua/JPB1LeKZs3LVCnjKP+YuRox76g87X8RKxOrUNnnHGXNz
UzBB5ehKNcS2DKy2Pi3uYOEsJZ9gOgCRmCF0q3dtRo+tpNy3V0bjYMTjGhGGWXsC
+wRhs15DNShvTkb3H3jFYFoEvo1YUKsvImBWJGwDbdDMfMZv4aeBWXlOrF+2fwt2
TM3G1o8xzEEWBB4FLZBW+tq2zfUSa1KwqqyQ4ZIqXepjQcN6nNfuHADA+nxuruVV
LPUhz4ZmsBEnJ7CL9zWJkLUw/al9/6Q14tleRmiZTTztqAlFgZUpNhaKSzVdsIc/
Xz3+OgECgYEAzgNu7eFJAOzq+ZVFRrrbA7tu+f318FTCaADJ1kFgAJyj6b9yOGan
LNL4TfXVjzgqtfQ4AIVqEHwXO7yS+YgTprvgzqRxqNRZ6ikuo2IPkIwXIAXZAlwd
JsWLPBXOlOFW6LHvhYxjY2xF+A9y4AbuZ3UDRUQ+tp226VfEaeY80+ECgYEAxjDV
cJqeBO06YRVGmcXfAYwRLJGT4hvIZeiSbxX/kJ0rx+cYLT/XZbAguJYQ5ZK2lkeA
YneXYDlSTxmBxHxiWwWe3mcmctdE4Jbw8oIZ8a49a4KE/F2ojC4gmisIt3/OqGOw
C4e/pDCE/QV64LWdazgUWHPGoVEmZx9/oMm/MWsCgYEAsLtlSJFB7ZdRpTcXLSxT
gwoilDf36mrsNAipHjMLRrsaKwbf197If72k4kyJHspSabHO8TOC4A10aPzHIWZJ
ZXo7y0prbyhs0mLt7Z/MNnbXx9L8bffT0lUZszwJ8tK1mf47utfK05opFDs8k0+e
6gYJ/jwjiMoYBmoSx76KZEECgYBagJxHAmQcbdQV1yhZOhFe3H5PMt8sBnHZj32m
+o2slQkUDQRuTVPoHKikgeqPWxLDxzzqOiBHEYXzlvs6JW6okAV/G+1jzcenI2Y9
54k/YsirWnut3nsEIGBE5lfhq5xMKtGOQlwR9xITlLgK+wQ6nO41ghD3Q15dAvY+
D0KepwKBgQChHvbyzw0t76J2gLxUSyuG7VsId651bpqTYUsbSDFlRo4g8UbBAkHd
fdv5BOon3ALJFreSK+a78es0kpiLwrS2SqG/y3mb9aUoLpCVB1haDmmP4Rn4AYXz
OCfUkusuSoXOR8CMjqkXYl5QjeJAUAt9GTsZnXIbOQKbaZwkeV0HEg==
-----END RSA PRIVATE KEY-----


certs/server.key

-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEArRLZX+5dyCh4N7q90sH2Q4Ea6QLK8OfoUQPWtpzAtINDUAdf
SXCC/qYTtGeSCGjB4W0LfvRTI8afHoD/M+YpaCRnx7T1sy1taA2rnGrEVXEHalVP
+RI4t4ZWtX56aez2M0Fs6o4MtzAuP6fKgSdWzIvOmtCxqn0Zf2KbfEHnQylsy2Lg
Pa/xLg50fbZ195+h4EAB3d2/jqaeFGGhN+7zrrv4+L1eeW3bzOkvPEkTNepq3Gy/
8r5e0i2icEnM+eBfl8NYgQ1toJYvDIy5Qi2TRzaFxBVmqUOc8EFtHpL7E9YLbTTW
15xdoLVLdXI5igGxkwPYoeiiAJWxIsC/hL1RRQIDAQABAoIBAQCZ/62f6G9WDHx7
yhPhlmjTw+r37l45YYCbpbjFoFDvzeR1LzogFJbak1fxLD8KcHwjY23ZNvlLWg53
i/yIZ4Hsgog9cM0283LoJVHPykiMZhhdCzAvxYDl/AjnUXUHD6w6CzsoseCql5pv
VZOgvCpFsxjRNGUB+HJZoJoNRG7MmHaY638pGHGMiVbskT9Ww3emtMLdTKy1rQcj
9XO/4mlaBGD79wYxy5Hlysbh2UYuQRv8XN5V46Uugk6sC/i2G7VC8KkqPQ2pM+rA
LaeWSuN9dfBwiKcHtJssMP95ilsXsjqh3NoVuFODDXHv3u+nBAxtg2TnLZFkDZXH
FvxPJu8BAoGBANwWWzvl/dnTtVDFbhF/61v3AVZV4vVpokXRJKny05CZPZWROTc4
LXMVw9kxeecNdo0yR4jn1yyNUmKnQpTmpsR9Yo9NYH+Z1CLxswpc7ILfVRZBK6bK
cCG43lM5xZprG6FXhqkHN2u9z5Y8/PuaMzC8iVs402/gakgPKmn8OjdhAoGBAMlQ
mmrx24n9YY/dOn55XC5V/iN3Z6mIsHThnDIU515bwLwZVG7chOLSiWHAh4JzUH+v
bV3NnlE1jhf5ln0WAadCtIeVprJG6msNTQlbTMTTV5kVNdbfYX6sFQEI+hC1LCiV
yJtuNIa4P5W/PtoC3FWUlcAH8C91S/M4CeZZ0HhlAoGBAIxflgE2SBrO9S53PiTb
OfqGKMwwK3nrzhxJsODUiCwKEUV8Qsn9gr+MekXlUKMV6y9Tily/wnYgDRPvKoBe
PK/GaT6NU6cPLka7cj6B1jgCyfpPxs+y/qIDj4n1pxs+hXj6omDcwXRutCBW9eRk
DZJgLhuIuxL4R9F+GsdOoLMBAoGAKQn1cLe9OXQd32YJ9p5m3EtLc49z4murDSiw
3sTEJcgukinXvIHX1SV2PCczeLRpRJ5OfUDddVCllt2agAVscNx4UOuA//bU8t3T
RoUGMVmkEeDxCMyg42HRJlTeJWnJhryCGK1up8gHrk8+UNMkd43CuVLk88fFo99Y
pUzJ4sECgYEAvBDTo3k3sD18qV6p6tQwy+MVjvQb9V81GHP18tYcVKta3LkkqUFa
3qSyVxi7gl3JtynG7NJ7+GDx6zxW2xUR72NTcJwWvesLI+1orM288pyNDVw9MJ/j
AyVFnW5SEYEqdizTnQxL+rQB4CyeHfwZx2/1/Qr0ezLGUJv51lnk4mQ=
-----END RSA PRIVATE KEY-----


__init__.py


app.py

from __future__ import annotations

import collections
import contextlib
import datetime
import email.utils
import gzip
import mimetypes
import zlib
from io import BytesIO
from pathlib import Path
from typing import Iterator

import trio
from quart import Response, make_response, request
from quart.typing import ResponseReturnValue
from quart_trio import QuartTrio

hypercorn_app = QuartTrio(__name__)

# Globals are not safe in Flask/Quart but work for our Hypercorn use case
RETRY_TEST_NAMES: collections.Counter[str] = collections.Counter()
LAST_RETRY_AFTER_REQ: datetime.datetime = datetime.datetime.min

pyodide_testing_app = QuartTrio(__name__)
DEFAULT_HEADERS = [
    # Allow cross-origin requests for emscripten
    ("Access-Control-Allow-Origin", "*"),
    ("Cross-Origin-Opener-Policy", "same-origin"),
    ("Cross-Origin-Embedder-Policy", "require-corp"),
    ("Feature-Policy", "sync-xhr *;"),
    ("Access-Control-Allow-Headers", "*"),
]


@hypercorn_app.route("/")
@pyodide_testing_app.route("/")
@pyodide_testing_app.route("/index")
async def index() -> ResponseReturnValue:
    return await make_response("Dummy server!")


@hypercorn_app.route("/alpn_protocol")
async def alpn_protocol() -> ResponseReturnValue:
    """Return the negotiated ALPN protocol."""
    alpn_protocol = request.scope["extensions"]["tls"]["alpn_protocol"]
    return await make_response(alpn_protocol)


@hypercorn_app.route("/certificate")
async def certificate() -> ResponseReturnValue:
    """Return the requester's certificate."""
    print("scope", request.scope)
    subject = request.scope["extensions"]["tls"]["client_cert_name"]
    subject_as_dict = dict(part.split("=") for part in subject.split(", "))
    return await make_response(subject_as_dict)


@hypercorn_app.route("/specific_method", methods=["GET", "POST", "PUT"])
@pyodide_testing_app.route("/specific_method", methods=["GET", "POST", "PUT"])
async def specific_method() -> ResponseReturnValue:
    "Confirm that the request matches the desired method type"
    method_param = (await request.values).get("method", "")
    if request.method.upper() == method_param.upper():
        return await make_response("", 200)
    else:
        return await make_response(
            f"Wrong method: {method_param} != {request.method}", 400
        )


@hypercorn_app.route("/upload", methods=["POST"])
async def upload() -> ResponseReturnValue:
    "Confirm that the uploaded file conforms to specification"
    params = await request.form
    param = params.get("upload_param")
    filename_param = params.get("upload_filename")
    size = int(params.get("upload_size", "0"))
    files_ = (await request.files).getlist(param)
    assert files_ is not None

    if len(files_) != 1:
        return await make_response(
            f"Expected 1 file for '{param}', not {len(files_)}", 400
        )

    file_ = files_[0]
    # data is short enough to read synchronously without blocking the event loop
    with contextlib.closing(file_.stream) as stream:
        data = stream.read()

    if int(size) != len(data):
        return await make_response(f"Wrong size: {int(size)} != {len(data)}", 400)

    if filename_param != file_.filename:
        return await make_response(
            f"Wrong filename: {filename_param} != {file_.filename}", 400
        )

    return await make_response()


@hypercorn_app.route("/chunked")
async def chunked() -> ResponseReturnValue:
    def generate() -> Iterator[str]:
        for _ in range(4):
            yield "123"

    return await make_response(generate())


@hypercorn_app.route("/chunked_gzip")
async def chunked_gzip() -> ResponseReturnValue:
    def generate() -> Iterator[bytes]:
        compressor = zlib.compressobj(6, zlib.DEFLATED, 16 + zlib.MAX_WBITS)

        for uncompressed in [b"123"] * 4:
            yield compressor.compress(uncompressed)
        yield compressor.flush()

    return await make_response(generate(), 200, [("Content-Encoding", "gzip")])


@hypercorn_app.route("/keepalive")
async def keepalive() -> ResponseReturnValue:
    if request.args.get("close", b"0") == b"1":
        headers = [("Connection", "close")]
        return await make_response("Closing", 200, headers)

    headers = [("Connection", "keep-alive")]
    return await make_response("Keeping alive", 200, headers)


@hypercorn_app.route("/echo", methods=["GET", "POST", "PUT"])
async def echo() -> ResponseReturnValue:
    "Echo back the params"
    if request.method == "GET":
        return await make_response(request.query_string)

    return await make_response(await request.get_data())


@hypercorn_app.route("/echo_json", methods=["POST"])
@pyodide_testing_app.route("/echo_json", methods=["POST", "OPTIONS"])
async def echo_json() -> ResponseReturnValue:
    "Echo back the JSON"
    if request.method == "OPTIONS":
        return await make_response("", 200)

    data = await request.get_data()
    return await make_response(data, 200, request.headers)


@hypercorn_app.route("/echo_uri/<path:rest>")
@hypercorn_app.route("/echo_uri", defaults={"rest": ""})
async def echo_uri(rest: str) -> ResponseReturnValue:
    "Echo back the requested URI"
    assert request.full_path is not None
    return await make_response(request.full_path)


@hypercorn_app.route("/echo_params")
async def echo_params() -> ResponseReturnValue:
    "Echo back the query parameters"
    await request.get_data()
    echod = sorted((k, v) for k, v in request.args.items())
    return await make_response(repr(echod))


@hypercorn_app.route("/headers", methods=["GET", "POST"])
async def headers() -> ResponseReturnValue:
    return await make_response(dict(request.headers.items()))


@hypercorn_app.route("/headers_and_params")
async def headers_and_params() -> ResponseReturnValue:
    return await make_response(
        {
            "headers": dict(request.headers),
            "params": request.args,
        }
    )


@hypercorn_app.route("/multi_headers", methods=["GET", "POST"])
async def multi_headers() -> ResponseReturnValue:
    return await make_response({"headers": list(request.headers)})


@hypercorn_app.route("/multi_redirect")
async def multi_redirect() -> ResponseReturnValue:
    "Performs a redirect chain based on ``redirect_codes``"
    params = request.args
    codes = params.get("redirect_codes", "200")
    head, tail = codes.split(",", 1) if "," in codes else (codes, None)
    assert head is not None
    status = head
    if not tail:
        return await make_response("Done redirecting", status)

    headers = [("Location", f"/multi_redirect?redirect_codes={tail}")]
    return await make_response("", status, headers)


@hypercorn_app.route("/encodingrequest")
async def encodingrequest() -> ResponseReturnValue:
    "Check for UA accepting gzip/deflate encoding"
    data = b"hello, world!"
    encoding = request.headers.get("Accept-Encoding", "")
    headers = []
    if encoding == "gzip":
        headers = [("Content-Encoding", "gzip")]
        file_ = BytesIO()
        with contextlib.closing(gzip.GzipFile("", mode="w", fileobj=file_)) as zipfile:
            zipfile.write(data)
        data = file_.getvalue()
    elif encoding == "deflate":
        headers = [("Content-Encoding", "deflate")]
        data = zlib.compress(data)
    elif encoding == "garbage-gzip":
        headers = [("Content-Encoding", "gzip")]
        data = b"garbage"
    elif encoding == "garbage-deflate":
        headers = [("Content-Encoding", "deflate")]
        data = b"garbage"
    return await make_response(data, 200, headers)


@hypercorn_app.route("/redirect", methods=["GET", "POST", "PUT"])
async def redirect() -> ResponseReturnValue:
    "Perform a redirect to ``target``"
    values = await request.values
    target = values.get("target", "/")
    status = values.get("status", "303 See Other")
    status_code = status.split(" ")[0]
    headers = [("Location", target)]
    return await make_response("", status_code, headers)


@hypercorn_app.route("/redirect_after")
async def redirect_after() -> ResponseReturnValue:
    "Perform a redirect to ``target``"
    params = request.args
    date = params.get("date")
    if date:
        dt = datetime.datetime.fromtimestamp(float(date), tz=datetime.timezone.utc)
        http_dt = email.utils.format_datetime(dt, usegmt=True)
        retry_after = str(http_dt)
    else:
        retry_after = "1"
    target = params.get("target", "/")
    headers = [("Location", target), ("Retry-After", retry_after)]
    return await make_response("", 303, headers)


@hypercorn_app.route("/retry_after")
async def retry_after() -> ResponseReturnValue:
    global LAST_RETRY_AFTER_REQ
    params = request.args
    if datetime.datetime.now() - LAST_RETRY_AFTER_REQ < datetime.timedelta(seconds=1):
        status = params.get("status", "429 Too Many Requests")
        status_code = status.split(" ")[0]
        return await make_response("", status_code, [("Retry-After", "1")])

    LAST_RETRY_AFTER_REQ = datetime.datetime.now()
    return await make_response("", 200)


@hypercorn_app.route("/status")
@pyodide_testing_app.route("/status")
async def status() -> ResponseReturnValue:
    values = await request.values
    status = values.get("status", "200 OK")
    status_code = status.split(" ")[0]
    return await make_response("", status_code)


@hypercorn_app.route("/source_address")
async def source_address() -> ResponseReturnValue:
    """Return the requester's IP address."""
    return await make_response(request.remote_addr)


@hypercorn_app.route("/successful_retry", methods=["GET", "PUT"])
async def successful_retry() -> ResponseReturnValue:
    """First return an error and then success

    It's not currently very flexible as the number of retries is hard-coded.
    """
    test_name = request.headers.get("test-name", None)
    if not test_name:
        return await make_response("test-name header not set", 400)

    RETRY_TEST_NAMES[test_name] += 1

    if RETRY_TEST_NAMES[test_name] >= 2:
        return await make_response("Retry successful!", 200)
    else:
        return await make_response("need to keep retrying!", 418)


@pyodide_testing_app.after_request
def apply_caching(response: Response) -> ResponseReturnValue:
    for header, value in DEFAULT_HEADERS:
        response.headers[header] = value
    return response


@pyodide_testing_app.route("/slow")
async def slow() -> ResponseReturnValue:
    await trio.sleep(10)
    return await make_response("TEN SECONDS LATER", 200)


@pyodide_testing_app.route("/bigfile")
async def bigfile() -> ResponseReturnValue:
    # great big text file, should force streaming
    # if supported
    bigdata = 1048576 * b"WOOO YAY BOOYAKAH"
    return await make_response(bigdata, 200)


@pyodide_testing_app.route("/mediumfile")
async def mediumfile() -> ResponseReturnValue:
    # quite big file
    bigdata = 1024 * b"WOOO YAY BOOYAKAH"
    return await make_response(bigdata, 200)


@pyodide_testing_app.route("/upload", methods=["POST", "OPTIONS"])
async def pyodide_upload() -> ResponseReturnValue:
    if request.method == "OPTIONS":
        return await make_response("", 200)
    spare_data = await request.get_data(parse_form_data=True)
    if len(spare_data) != 0:
        return await make_response("Bad upload data", 404)
    files = await request.files
    form = await request.form
    if form["upload_param"] != "filefield" or form["upload_filename"] != "lolcat.txt":
        return await make_response("Bad upload form values", 404)
    if len(files) != 1 or files.get("filefield") is None:
        return await make_response("Missing file in form", 404)
    file = files["filefield"]
    if file.filename != "lolcat.txt":
        return await make_response(f"File name incorrect {file.name}", 404)
    with contextlib.closing(file):
        data = file.read().decode("utf-8")
    if data != "I'm in ur multipart form-data, hazing a cheezburgr":
        return await make_response(f"File data incorrect {data}", 200)
    return await make_response("Uploaded file correct", 200)


@pyodide_testing_app.route("/pyodide/<py_file>")
async def pyodide(py_file: str) -> ResponseReturnValue:
    file_path = Path(pyodide_testing_app.config["pyodide_dist_dir"], py_file)
    if file_path.exists():
        mime_type, encoding = mimetypes.guess_type(file_path)
        if not mime_type:
            mime_type = "text/plain"
        return await make_response(
            file_path.read_bytes(), 200, [("Content-Type", mime_type)]
        )
    else:
        return await make_response("", 404)


@pyodide_testing_app.route("/wheel/dist.whl")
async def wheel() -> ResponseReturnValue:
    # serve our wheel
    wheel_folder = Path(__file__).parent.parent / "dist"
    wheels = list(wheel_folder.glob("*.whl"))
    if len(wheels) > 0:
        wheel = wheels[0]
        headers = [("Content-Disposition", f"inline; filename='{wheel.name}'")]
        resp = await make_response(wheel.read_bytes(), 200, headers)
        return resp
    else:
        return await make_response(f"NO WHEEL IN {wheel_folder}", 404)
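
Putting the pieces together, a sketch that runs hypercorn_app as an origin server and ProxyApp as a forward proxy, then sends one request through the proxy with urllib3, mirroring HypercornDummyProxyTestCase; the ProxyManager call and the assertion are illustrative, not part of the archive:

# Hypothetical example: origin + forward proxy on ephemeral ports, one
# absolute-URI GET routed through ProxyApp.absolute_uri.
import hypercorn
import urllib3

from dummyserver.app import hypercorn_app
from dummyserver.asgi_proxy import ProxyApp
from dummyserver.hypercornserver import run_hypercorn_in_thread
from urllib3.util.url import parse_url

origin_config = hypercorn.Config()
origin_config.bind = ["localhost:0"]  # Hypercorn rewrites bind with the real port
proxy_config = hypercorn.Config()
proxy_config.bind = ["localhost:0"]

with run_hypercorn_in_thread(origin_config, hypercorn_app):
    with run_hypercorn_in_thread(proxy_config, ProxyApp()):
        origin_port = parse_url(origin_config.bind[0]).port
        proxy_port = parse_url(proxy_config.bind[0]).port
        with urllib3.ProxyManager(f"http://localhost:{proxy_port}") as proxy:
            # GET/POST go through ProxyApp.absolute_uri; CONNECT (for https
            # targets) goes through ProxyApp.connect.
            response = proxy.request("GET", f"http://localhost:{origin_port}/")
            assert response.data == b"Dummy server!"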