diff --git a/.gitignore b/.gitignore index de61154e3e..209b48f61a 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ env/ .eggs/ .tox/ .pytest_cache/ +.hypothesis/ .vscode/ .eggs/ diff --git a/requirements-dev.txt b/requirements-dev.txt index 77fedb9716..04330b5602 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -5,3 +5,4 @@ pytest-httpbin==2.1.0 httpbin~=0.10.0 trustme wheel +hypothesis diff --git a/tests/test_hypothesis_auth.py b/tests/test_hypothesis_auth.py new file mode 100644 index 0000000000..99b7cb5a0d --- /dev/null +++ b/tests/test_hypothesis_auth.py @@ -0,0 +1,502 @@ +""" +Hypothesis-based property tests for requests.auth module. + +These tests use property-based testing to verify the invariants and properties +of authentication classes and functions. +""" + +import base64 +import re + +import pytest +from hypothesis import assume, given, settings +from hypothesis import strategies as st + +from requests.auth import HTTPBasicAuth, HTTPDigestAuth, HTTPProxyAuth, _basic_auth_str +from requests.models import PreparedRequest + + +# Strategies for usernames and passwords +safe_text = st.text( + alphabet=st.characters( + min_codepoint=ord(" "), max_codepoint=ord("~"), blacklist_characters=":" + ), + min_size=1, + max_size=50, +) + + +class TestBasicAuthStrProperties: + """Property-based tests for _basic_auth_str function.""" + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_basic_auth_str_format(self, username: str, password: str) -> None: + """_basic_auth_str should return properly formatted Basic auth string.""" + result = _basic_auth_str(username, password) + assert isinstance(result, str) + assert result.startswith("Basic ") + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_basic_auth_str_base64_decodable(self, username: str, password: str) -> None: + """_basic_auth_str should produce valid base64 encoding.""" + result = _basic_auth_str(username, password) + # Extract the base64 part + b64_part = result.replace("Basic ", "") + try: + decoded = base64.b64decode(b64_part) + assert isinstance(decoded, bytes) + # Should contain username and password separated by colon + assert b":" in decoded + except Exception: + # If decoding fails, test fails + pytest.fail("Failed to decode base64") + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_basic_auth_str_contains_credentials(self, username: str, password: str) -> None: + """_basic_auth_str should encode username and password.""" + result = _basic_auth_str(username, password) + b64_part = result.replace("Basic ", "") + decoded = base64.b64decode(b64_part).decode("latin1") + assert username in decoded + assert password in decoded + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1, max_size=50), st.text(min_size=1, max_size=50)) + def test_basic_auth_str_deterministic(self, username: str, password: str) -> None: + """_basic_auth_str should be deterministic.""" + try: + result1 = _basic_auth_str(username, password) + result2 = _basic_auth_str(username, password) + assert result1 == result2 + except Exception: + # Some characters may cause encoding issues + pass + + +class TestHTTPBasicAuthProperties: + """Property-based tests for HTTPBasicAuth class.""" + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_basic_auth_creation(self, username: str, password: str) -> None: + """HTTPBasicAuth should be creatable with username and password.""" + auth = HTTPBasicAuth(username, password) + assert isinstance(auth, HTTPBasicAuth) + assert auth.username == username + assert auth.password == password + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_basic_auth_adds_header(self, username: str, password: str) -> None: + """HTTPBasicAuth should add Authorization header to request.""" + auth = HTTPBasicAuth(username, password) + req = PreparedRequest() + req.prepare_method("GET") + req.prepare_url("http://example.com", None) + req.prepare_headers({}) + + result = auth(req) + assert "Authorization" in result.headers + assert result.headers["Authorization"].startswith("Basic ") + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_basic_auth_equality(self, username: str, password: str) -> None: + """HTTPBasicAuth instances with same credentials should be equal.""" + auth1 = HTTPBasicAuth(username, password) + auth2 = HTTPBasicAuth(username, password) + assert auth1 == auth2 + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text, safe_text) + def test_http_basic_auth_inequality( + self, username1: str, username2: str, password: str + ) -> None: + """HTTPBasicAuth instances with different credentials should not be equal.""" + assume(username1 != username2) + auth1 = HTTPBasicAuth(username1, password) + auth2 = HTTPBasicAuth(username2, password) + assert auth1 != auth2 + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_basic_auth_returns_request(self, username: str, password: str) -> None: + """HTTPBasicAuth should return the request object.""" + auth = HTTPBasicAuth(username, password) + req = PreparedRequest() + req.prepare_method("GET") + req.prepare_url("http://example.com", None) + req.prepare_headers({}) + + result = auth(req) + assert result is req + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_basic_auth_ne_operator(self, username: str, password: str) -> None: + """HTTPBasicAuth __ne__ should work correctly.""" + auth1 = HTTPBasicAuth(username, password) + auth2 = HTTPBasicAuth(username, password) + assert not (auth1 != auth2) + + +class TestHTTPProxyAuthProperties: + """Property-based tests for HTTPProxyAuth class.""" + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_proxy_auth_creation(self, username: str, password: str) -> None: + """HTTPProxyAuth should be creatable with username and password.""" + auth = HTTPProxyAuth(username, password) + assert isinstance(auth, HTTPProxyAuth) + assert auth.username == username + assert auth.password == password + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_proxy_auth_adds_header(self, username: str, password: str) -> None: + """HTTPProxyAuth should add Proxy-Authorization header.""" + auth = HTTPProxyAuth(username, password) + req = PreparedRequest() + req.prepare_method("GET") + req.prepare_url("http://example.com", None) + req.prepare_headers({}) + + result = auth(req) + assert "Proxy-Authorization" in result.headers + assert result.headers["Proxy-Authorization"].startswith("Basic ") + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_proxy_auth_is_basic_auth_subclass( + self, username: str, password: str + ) -> None: + """HTTPProxyAuth should be a subclass of HTTPBasicAuth.""" + auth = HTTPProxyAuth(username, password) + assert isinstance(auth, HTTPBasicAuth) + + +class TestHTTPDigestAuthProperties: + """Property-based tests for HTTPDigestAuth class.""" + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_digest_auth_creation(self, username: str, password: str) -> None: + """HTTPDigestAuth should be creatable with username and password.""" + auth = HTTPDigestAuth(username, password) + assert isinstance(auth, HTTPDigestAuth) + assert auth.username == username + assert auth.password == password + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_digest_auth_has_thread_local(self, username: str, password: str) -> None: + """HTTPDigestAuth should have thread-local storage.""" + auth = HTTPDigestAuth(username, password) + assert hasattr(auth, "_thread_local") + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_digest_auth_init_per_thread_state( + self, username: str, password: str + ) -> None: + """HTTPDigestAuth should initialize per-thread state.""" + auth = HTTPDigestAuth(username, password) + auth.init_per_thread_state() + assert hasattr(auth._thread_local, "init") + assert hasattr(auth._thread_local, "last_nonce") + assert hasattr(auth._thread_local, "nonce_count") + assert hasattr(auth._thread_local, "chal") + assert hasattr(auth._thread_local, "pos") + assert hasattr(auth._thread_local, "num_401_calls") + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_http_digest_auth_equality(self, username: str, password: str) -> None: + """HTTPDigestAuth instances with same credentials should be equal.""" + auth1 = HTTPDigestAuth(username, password) + auth2 = HTTPDigestAuth(username, password) + assert auth1 == auth2 + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text, safe_text) + def test_http_digest_auth_inequality( + self, username1: str, username2: str, password: str + ) -> None: + """HTTPDigestAuth instances with different credentials should not be equal.""" + assume(username1 != username2) + auth1 = HTTPDigestAuth(username1, password) + auth2 = HTTPDigestAuth(username2, password) + assert auth1 != auth2 + + +class TestAuthInvariants: + """Test invariants that should hold for authentication classes.""" + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_basic_auth_idempotent(self, username: str, password: str) -> None: + """Applying HTTPBasicAuth multiple times should be idempotent.""" + auth = HTTPBasicAuth(username, password) + req = PreparedRequest() + req.prepare_method("GET") + req.prepare_url("http://example.com", None) + req.prepare_headers({}) + + result1 = auth(req) + auth_header1 = result1.headers["Authorization"] + + # Apply again + result2 = auth(result1) + auth_header2 = result2.headers["Authorization"] + + # Should produce same header + assert auth_header1 == auth_header2 + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_proxy_auth_idempotent(self, username: str, password: str) -> None: + """Applying HTTPProxyAuth multiple times should be idempotent.""" + auth = HTTPProxyAuth(username, password) + req = PreparedRequest() + req.prepare_method("GET") + req.prepare_url("http://example.com", None) + req.prepare_headers({}) + + result1 = auth(req) + auth_header1 = result1.headers["Proxy-Authorization"] + + # Apply again + result2 = auth(result1) + auth_header2 = result2.headers["Proxy-Authorization"] + + # Should produce same header + assert auth_header1 == auth_header2 + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_basic_auth_header_format(self, username: str, password: str) -> None: + """HTTPBasicAuth should produce correctly formatted header.""" + auth = HTTPBasicAuth(username, password) + req = PreparedRequest() + req.prepare_method("GET") + req.prepare_url("http://example.com", None) + req.prepare_headers({}) + + result = auth(req) + auth_header = result.headers["Authorization"] + + # Should match Basic auth format + assert re.match(r"^Basic [A-Za-z0-9+/]+=*$", auth_header) + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_proxy_auth_header_format(self, username: str, password: str) -> None: + """HTTPProxyAuth should produce correctly formatted header.""" + auth = HTTPProxyAuth(username, password) + req = PreparedRequest() + req.prepare_method("GET") + req.prepare_url("http://example.com", None) + req.prepare_headers({}) + + result = auth(req) + auth_header = result.headers["Proxy-Authorization"] + + # Should match Basic auth format + assert re.match(r"^Basic [A-Za-z0-9+/]+=*$", auth_header) + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text, safe_text, safe_text) + def test_different_credentials_different_headers( + self, user1: str, pass1: str, user2: str, pass2: str + ) -> None: + """Different credentials should produce different headers.""" + assume((user1, pass1) != (user2, pass2)) + + auth1 = HTTPBasicAuth(user1, pass1) + auth2 = HTTPBasicAuth(user2, pass2) + + req1 = PreparedRequest() + req1.prepare_method("GET") + req1.prepare_url("http://example.com", None) + req1.prepare_headers({}) + + req2 = PreparedRequest() + req2.prepare_method("GET") + req2.prepare_url("http://example.com", None) + req2.prepare_headers({}) + + result1 = auth1(req1) + result2 = auth2(req2) + + # Different credentials should produce different headers + assert result1.headers["Authorization"] != result2.headers["Authorization"] + + +class TestAuthHeaderEncoding: + """Test encoding properties of auth headers.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + ) + def test_ascii_credentials_always_work(self, username: str, password: str) -> None: + """ASCII-only credentials should always work.""" + auth = HTTPBasicAuth(username, password) + req = PreparedRequest() + req.prepare_method("GET") + req.prepare_url("http://example.com", None) + req.prepare_headers({}) + + result = auth(req) + assert "Authorization" in result.headers + # Verify we can decode the header + b64_part = result.headers["Authorization"].replace("Basic ", "") + decoded = base64.b64decode(b64_part) + assert username.encode("latin1") in decoded + assert password.encode("latin1") in decoded + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_basic_auth_str_roundtrip(self, username: str, password: str) -> None: + """Basic auth string should be decodable to recover credentials.""" + auth_str = _basic_auth_str(username, password) + b64_part = auth_str.replace("Basic ", "") + decoded = base64.b64decode(b64_part).decode("latin1") + + # Should be in format "username:password" + parts = decoded.split(":", 1) + assert len(parts) == 2 + assert parts[0] == username + assert parts[1] == password + + +class TestAuthEquality: + """Test equality and inequality operations for auth classes.""" + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_basic_auth_equal_to_itself(self, username: str, password: str) -> None: + """HTTPBasicAuth should be equal to itself.""" + auth = HTTPBasicAuth(username, password) + assert auth == auth + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_proxy_auth_equal_to_itself(self, username: str, password: str) -> None: + """HTTPProxyAuth should be equal to itself.""" + auth = HTTPProxyAuth(username, password) + assert auth == auth + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_digest_auth_equal_to_itself(self, username: str, password: str) -> None: + """HTTPDigestAuth should be equal to itself.""" + auth = HTTPDigestAuth(username, password) + assert auth == auth + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_basic_auth_not_equal_to_none(self, username: str, password: str) -> None: + """HTTPBasicAuth should not be equal to None.""" + auth = HTTPBasicAuth(username, password) + assert auth != None # noqa: E711 + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_basic_auth_not_equal_to_other_type( + self, username: str, password: str + ) -> None: + """HTTPBasicAuth should not be equal to other types.""" + auth = HTTPBasicAuth(username, password) + assert auth != "not an auth object" + assert auth != 123 + assert auth != {} + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_basic_auth_copy_is_equal(self, username: str, password: str) -> None: + """A copy of HTTPBasicAuth should be equal to original.""" + auth1 = HTTPBasicAuth(username, password) + auth2 = HTTPBasicAuth(auth1.username, auth1.password) + assert auth1 == auth2 + + +class TestDigestAuthSpecificProperties: + """Test properties specific to HTTPDigestAuth.""" + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_digest_auth_nonce_count_starts_at_zero( + self, username: str, password: str + ) -> None: + """HTTPDigestAuth nonce_count should start at 0.""" + auth = HTTPDigestAuth(username, password) + auth.init_per_thread_state() + assert auth._thread_local.nonce_count == 0 + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_digest_auth_last_nonce_starts_empty( + self, username: str, password: str + ) -> None: + """HTTPDigestAuth last_nonce should start empty.""" + auth = HTTPDigestAuth(username, password) + auth.init_per_thread_state() + assert auth._thread_local.last_nonce == "" + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_digest_auth_chal_starts_empty(self, username: str, password: str) -> None: + """HTTPDigestAuth chal should start as empty dict.""" + auth = HTTPDigestAuth(username, password) + auth.init_per_thread_state() + assert auth._thread_local.chal == {} + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_digest_auth_pos_starts_none(self, username: str, password: str) -> None: + """HTTPDigestAuth pos should start as None.""" + auth = HTTPDigestAuth(username, password) + auth.init_per_thread_state() + assert auth._thread_local.pos is None + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_digest_auth_num_401_calls_starts_none( + self, username: str, password: str + ) -> None: + """HTTPDigestAuth num_401_calls should start as None.""" + auth = HTTPDigestAuth(username, password) + auth.init_per_thread_state() + assert auth._thread_local.num_401_calls is None + + @settings(max_examples=1000, deadline=None) + @given(safe_text, safe_text) + def test_digest_auth_multiple_init_idempotent( + self, username: str, password: str + ) -> None: + """Calling init_per_thread_state multiple times should be safe.""" + auth = HTTPDigestAuth(username, password) + auth.init_per_thread_state() + auth.init_per_thread_state() + # Should still have all attributes + assert hasattr(auth._thread_local, "init") + assert hasattr(auth._thread_local, "nonce_count") + diff --git a/tests/test_hypothesis_cookies.py b/tests/test_hypothesis_cookies.py new file mode 100644 index 0000000000..acfc6a3c0d --- /dev/null +++ b/tests/test_hypothesis_cookies.py @@ -0,0 +1,524 @@ +""" +Hypothesis-based property tests for requests.cookies module. + +These tests use property-based testing to verify the invariants and properties +of cookie handling classes and functions. +""" + +import http.cookiejar as cookielib +from http.cookies import Morsel + +import pytest +from hypothesis import assume, given, settings +from hypothesis import strategies as st + +from requests.cookies import ( + RequestsCookieJar, + cookiejar_from_dict, + create_cookie, + merge_cookies, +) + + +# Strategies for cookie names and values +cookie_names = st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=30, +) +# Cookie values: empty strings have special handling, so use non-empty values +cookie_values = st.text( + alphabet=st.characters(blacklist_characters='"'), + min_size=1, + max_size=100, +) + + +class TestRequestsCookieJarProperties: + """Property-based tests for RequestsCookieJar class.""" + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_from_dict(self, cookies: dict) -> None: + """cookiejar_from_dict should create RequestsCookieJar from dict.""" + jar = cookiejar_from_dict(cookies) + assert isinstance(jar, RequestsCookieJar) + assert len(jar) == len(cookies) + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_preserves_values(self, cookies: dict) -> None: + """RequestsCookieJar should preserve cookie values.""" + jar = cookiejar_from_dict(cookies) + for name, value in cookies.items(): + assert jar.get(name) == value + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values) + def test_cookiejar_set_get(self, name: str, value: str) -> None: + """Setting and getting cookies should work.""" + jar = RequestsCookieJar() + jar.set(name, value) + assert jar.get(name) == value + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values) + def test_cookiejar_setitem_getitem(self, name: str, value: str) -> None: + """Dict-style access should work.""" + jar = RequestsCookieJar() + jar[name] = value + assert jar[name] == value + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values) + def test_cookiejar_contains(self, name: str, value: str) -> None: + """'in' operator should work for cookies.""" + jar = RequestsCookieJar() + jar[name] = value + assert name in jar + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, min_size=1, max_size=20)) + def test_cookiejar_keys(self, cookies: dict) -> None: + """keys() should return all cookie names.""" + jar = cookiejar_from_dict(cookies) + keys = jar.keys() + assert len(keys) == len(cookies) + assert all(k in cookies for k in keys) + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, min_size=1, max_size=20)) + def test_cookiejar_values(self, cookies: dict) -> None: + """values() should return all cookie values.""" + jar = cookiejar_from_dict(cookies) + values = jar.values() + assert len(values) == len(cookies) + assert all(v in cookies.values() for v in values) + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, min_size=1, max_size=20)) + def test_cookiejar_items(self, cookies: dict) -> None: + """items() should return name-value pairs.""" + jar = cookiejar_from_dict(cookies) + items = jar.items() + assert len(items) == len(cookies) + assert all(isinstance(item, tuple) and len(item) == 2 for item in items) + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values) + def test_cookiejar_delitem(self, name: str, value: str) -> None: + """Deleting cookies should work.""" + jar = RequestsCookieJar() + jar[name] = value + assert name in jar + del jar[name] + assert name not in jar + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_len(self, cookies: dict) -> None: + """len() should return number of cookies.""" + jar = cookiejar_from_dict(cookies) + assert len(jar) == len(cookies) + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_iteration(self, cookies: dict) -> None: + """Iterating over jar should yield cookies.""" + jar = cookiejar_from_dict(cookies) + count = 0 + for cookie in jar: + count += 1 + assert hasattr(cookie, "name") + assert hasattr(cookie, "value") + assert count == len(cookies) + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_copy(self, cookies: dict) -> None: + """copy() should create independent copy.""" + jar = cookiejar_from_dict(cookies) + jar_copy = jar.copy() + assert jar_copy is not jar + assert len(jar_copy) == len(jar) + # Verify values are the same + for name in cookies: + assert jar.get(name) == jar_copy.get(name) + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_copy_is_independent(self, cookies: dict) -> None: + """Modifying copy should not affect original.""" + jar = cookiejar_from_dict(cookies) + jar_copy = jar.copy() + jar_copy.set("new_cookie", "new_value") + assert "new_cookie" not in jar + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries(cookie_names, cookie_values, max_size=10), + st.dictionaries(cookie_names, cookie_values, max_size=10), + ) + def test_cookiejar_update(self, cookies1: dict, cookies2: dict) -> None: + """update() should merge cookies.""" + jar = cookiejar_from_dict(cookies1) + jar.update(cookiejar_from_dict(cookies2)) + # All cookies from both dicts should be present + for name in cookies2: + assert name in jar + + +class TestCreateCookieProperties: + """Property-based tests for create_cookie function.""" + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values) + def test_create_cookie_basic(self, name: str, value: str) -> None: + """create_cookie should create valid cookie.""" + cookie = create_cookie(name, value) + assert isinstance(cookie, cookielib.Cookie) + assert cookie.name == name + assert cookie.value == value + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values) + def test_create_cookie_has_required_attributes(self, name: str, value: str) -> None: + """Created cookie should have all required attributes.""" + cookie = create_cookie(name, value) + assert hasattr(cookie, "name") + assert hasattr(cookie, "value") + assert hasattr(cookie, "domain") + assert hasattr(cookie, "path") + assert hasattr(cookie, "secure") + assert hasattr(cookie, "expires") + + @settings(max_examples=1000, deadline=None) + @given( + cookie_names, + cookie_values, + st.text( + alphabet=st.characters(min_codepoint=ord("a"), max_codepoint=ord("z")), + min_size=1, + max_size=30, + ), + ) + def test_create_cookie_with_domain(self, name: str, value: str, domain: str) -> None: + """create_cookie should accept domain parameter.""" + cookie = create_cookie(name, value, domain=domain) + assert cookie.domain == domain + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values, st.text(min_size=1, max_size=50)) + def test_create_cookie_with_path(self, name: str, value: str, path: str) -> None: + """create_cookie should accept path parameter.""" + cookie = create_cookie(name, value, path=path) + assert cookie.path == path + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values, st.booleans()) + def test_create_cookie_with_secure(self, name: str, value: str, secure: bool) -> None: + """create_cookie should accept secure parameter.""" + cookie = create_cookie(name, value, secure=secure) + assert cookie.secure == secure + + +class TestCookieJarFromDictProperties: + """Property-based tests for cookiejar_from_dict function.""" + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_from_dict_creates_jar(self, cookies: dict) -> None: + """cookiejar_from_dict should create RequestsCookieJar.""" + jar = cookiejar_from_dict(cookies) + assert isinstance(jar, RequestsCookieJar) + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_from_dict_preserves_all_cookies(self, cookies: dict) -> None: + """All cookies from dict should be in jar.""" + jar = cookiejar_from_dict(cookies) + for name, value in cookies.items(): + assert jar.get(name) == value + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries(cookie_names, cookie_values, max_size=10), + st.dictionaries(cookie_names, cookie_values, max_size=10), + ) + def test_cookiejar_from_dict_with_existing_jar( + self, cookies1: dict, cookies2: dict + ) -> None: + """cookiejar_from_dict should add to existing jar.""" + jar = cookiejar_from_dict(cookies1) + result = cookiejar_from_dict(cookies2, cookiejar=jar) + # Should be the same jar + assert result is jar + # Should have cookies from both dicts + for name in cookies2: + assert name in result + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries(cookie_names, cookie_values, min_size=1, max_size=10), + st.dictionaries(cookie_names, cookie_values, max_size=10), + ) + def test_cookiejar_from_dict_overwrite( + self, cookies1: dict, cookies2: dict + ) -> None: + """cookiejar_from_dict with overwrite=True should replace cookies.""" + jar = cookiejar_from_dict(cookies1) + # Add cookies from dict2 with same names + result = cookiejar_from_dict(cookies2, cookiejar=jar, overwrite=True) + # Cookies from dict2 should be present + for name, value in cookies2.items(): + assert result.get(name) == value + + +class TestMergeCookiesProperties: + """Property-based tests for merge_cookies function.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries(cookie_names, cookie_values, max_size=10), + st.dictionaries(cookie_names, cookie_values, max_size=10), + ) + def test_merge_cookies_from_dicts(self, cookies1: dict, cookies2: dict) -> None: + """merge_cookies should merge two dicts into jar.""" + jar = cookiejar_from_dict(cookies1) + result = merge_cookies(jar, cookies2) + # Should return a jar + assert isinstance(result, cookielib.CookieJar) + # Should contain cookies from both + for name in cookies2: + # Cookie should be in the jar + found = any(c.name == name for c in result) + assert found + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries(cookie_names, cookie_values, max_size=10), + st.dictionaries(cookie_names, cookie_values, max_size=10), + ) + def test_merge_cookies_from_jars(self, cookies1: dict, cookies2: dict) -> None: + """merge_cookies should merge two jars.""" + jar1 = cookiejar_from_dict(cookies1) + jar2 = cookiejar_from_dict(cookies2) + result = merge_cookies(jar1, jar2) + assert isinstance(result, cookielib.CookieJar) + + def test_merge_cookies_raises_on_non_jar(self) -> None: + """merge_cookies should raise ValueError if first arg is not a jar.""" + with pytest.raises(ValueError): + merge_cookies({}, {}) + + +class TestRequestsCookieJarDictInterface: + """Test dict-like interface of RequestsCookieJar.""" + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, min_size=1, max_size=20)) + def test_cookiejar_dict_conversion(self, cookies: dict) -> None: + """RequestsCookieJar should be convertible to dict.""" + jar = cookiejar_from_dict(cookies) + result = dict(jar.items()) + assert isinstance(result, dict) + # All original cookies should be in result + for name, value in cookies.items(): + assert result.get(name) == value + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_get_dict(self, cookies: dict) -> None: + """get_dict() should return plain dict.""" + jar = cookiejar_from_dict(cookies) + result = jar.get_dict() + assert isinstance(result, dict) + assert len(result) == len(cookies) + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values, cookie_values) + def test_cookiejar_get_with_default( + self, name: str, value: str, default: str + ) -> None: + """get() should return default for missing cookies.""" + jar = RequestsCookieJar() + assert jar.get(name, default) == default + jar.set(name, value) + assert jar.get(name, default) == value + + @settings(max_examples=1000, deadline=None) + @given( + cookie_names, + cookie_values, + st.text( + alphabet=st.characters(min_codepoint=ord("a"), max_codepoint=ord("z")), + min_size=1, + max_size=20, + ), + ) + def test_cookiejar_get_with_domain( + self, name: str, value: str, domain: str + ) -> None: + """get() should support domain parameter.""" + jar = RequestsCookieJar() + jar.set(name, value, domain=domain) + result = jar.get(name, domain=domain) + assert result == value + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values, st.text(min_size=1, max_size=20)) + def test_cookiejar_get_with_path(self, name: str, value: str, path: str) -> None: + """get() should support path parameter.""" + jar = RequestsCookieJar() + jar.set(name, value, path=path) + result = jar.get(name, path=path) + assert result == value + + +class TestRequestsCookieJarInvariants: + """Test invariants that should always hold for RequestsCookieJar.""" + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_is_cookiejar(self, cookies: dict) -> None: + """RequestsCookieJar should be a CookieJar.""" + jar = cookiejar_from_dict(cookies) + assert isinstance(jar, cookielib.CookieJar) + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_len_equals_item_count(self, cookies: dict) -> None: + """len() should equal number of items.""" + jar = cookiejar_from_dict(cookies) + assert len(jar) == len(list(jar)) + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_keys_values_same_length(self, cookies: dict) -> None: + """keys() and values() should have same length.""" + jar = cookiejar_from_dict(cookies) + assert len(jar.keys()) == len(jar.values()) + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_items_length_equals_len(self, cookies: dict) -> None: + """items() length should equal len().""" + jar = cookiejar_from_dict(cookies) + assert len(jar.items()) == len(jar) + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values, cookie_values) + def test_cookiejar_set_get_roundtrip( + self, name: str, value1: str, value2: str + ) -> None: + """Setting a value and getting it should return the same value.""" + jar = RequestsCookieJar() + jar.set(name, value1) + assert jar.get(name) == value1 + # Update value + jar.set(name, value2) + assert jar.get(name) == value2 + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, min_size=1, max_size=20)) + def test_cookiejar_contains_all_set_cookies(self, cookies: dict) -> None: + """All set cookies should be in the jar.""" + jar = RequestsCookieJar() + for name, value in cookies.items(): + jar.set(name, value) + for name in cookies: + assert name in jar + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(cookie_names, cookie_values, max_size=20)) + def test_cookiejar_pickleable_roundtrip(self, cookies: dict) -> None: + """RequestsCookieJar should be pickleable.""" + import pickle + + jar = cookiejar_from_dict(cookies) + state = jar.__getstate__() + new_jar = RequestsCookieJar() + new_jar.__setstate__(state) + # Should have same cookies + assert len(new_jar) == len(jar) + + +class TestCookieSetNoneValue: + """Test setting None as cookie value.""" + + @settings(max_examples=1000, deadline=None) + @given(cookie_names) + def test_set_none_removes_cookie(self, name: str) -> None: + """Setting cookie to None should remove it.""" + jar = RequestsCookieJar() + jar.set(name, "some_value") + assert name in jar + jar.set(name, None) + assert name not in jar + + @settings(max_examples=1000, deadline=None) + @given(cookie_names, cookie_values) + def test_set_none_on_nonexistent_cookie(self, name: str, value: str) -> None: + """Setting None on nonexistent cookie should not raise.""" + jar = RequestsCookieJar() + jar.set(name, None) # Should not raise + assert name not in jar + + +class TestCookieJarListMethods: + """Test list_* methods of RequestsCookieJar.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.lists( + st.tuples( + cookie_names, + cookie_values, + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + ), + min_size=1, + max_size=10, + ) + ) + def test_list_domains(self, cookies_with_domains: list) -> None: + """list_domains() should return all unique domains.""" + jar = RequestsCookieJar() + domains = set() + for name, value, domain in cookies_with_domains: + jar.set(name, value, domain=domain) + domains.add(domain) + result_domains = jar.list_domains() + assert len(result_domains) >= 1 + # All domains should be represented + for domain in domains: + assert domain in result_domains + + @settings(max_examples=1000, deadline=None) + @given( + st.lists( + st.tuples(cookie_names, cookie_values, st.text(min_size=1, max_size=20)), + min_size=1, + max_size=10, + ) + ) + def test_list_paths(self, cookies_with_paths: list) -> None: + """list_paths() should return all unique paths.""" + jar = RequestsCookieJar() + paths = set() + for name, value, path in cookies_with_paths: + jar.set(name, value, path=path) + paths.add(path) + result_paths = jar.list_paths() + assert len(result_paths) >= 1 + # All paths should be represented + for path in paths: + assert path in result_paths + diff --git a/tests/test_hypothesis_models.py b/tests/test_hypothesis_models.py new file mode 100644 index 0000000000..841100e42b --- /dev/null +++ b/tests/test_hypothesis_models.py @@ -0,0 +1,537 @@ +""" +Hypothesis-based property tests for requests.models module. + +These tests use property-based testing to verify the invariants and properties +of Request, PreparedRequest, and Response classes. +""" + +import json +from io import BytesIO + +import pytest +from hypothesis import assume, given, settings +from hypothesis import strategies as st + +from requests.exceptions import InvalidJSONError, InvalidURL, MissingSchema +from requests.models import PreparedRequest, Request, Response +from requests.structures import CaseInsensitiveDict + + +# Custom strategies for HTTP methods and URLs +http_methods = st.sampled_from(["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"]) +valid_schemes = st.sampled_from(["http", "https"]) +valid_domains = st.text( + alphabet=st.characters(min_codepoint=ord("a"), max_codepoint=ord("z")), + min_size=1, + max_size=30, +) +valid_paths = st.text( + alphabet=st.characters(min_codepoint=ord("a"), max_codepoint=ord("z")) | st.just("/"), + min_size=0, + max_size=50, +) + + +@st.composite +def valid_urls(draw): + """Strategy for generating valid URLs.""" + scheme = draw(valid_schemes) + domain = draw(valid_domains) + path = draw(valid_paths) + return f"{scheme}://{domain}.com/{path}" + + +class TestRequestProperties: + """Property-based tests for Request class.""" + + @settings(max_examples=1000, deadline=None) + @given(http_methods, valid_urls()) + def test_request_creation(self, method: str, url: str) -> None: + """Request should be creatable with method and URL.""" + req = Request(method=method, url=url) + assert isinstance(req, Request) + assert req.method == method + assert req.url == url + + @settings(max_examples=1000, deadline=None) + @given( + http_methods, + valid_urls(), + st.dictionaries( + st.text(min_size=1, max_size=20), st.text(min_size=0, max_size=100), max_size=10 + ), + ) + def test_request_with_headers(self, method: str, url: str, headers: dict) -> None: + """Request should accept headers.""" + req = Request(method=method, url=url, headers=headers) + assert req.headers == headers + + @settings(max_examples=1000, deadline=None) + @given( + http_methods, + valid_urls(), + st.dictionaries(st.text(min_size=1, max_size=20), st.text(min_size=0, max_size=100)), + ) + def test_request_with_params(self, method: str, url: str, params: dict) -> None: + """Request should accept params.""" + req = Request(method=method, url=url, params=params) + assert req.params == params + + @settings(max_examples=1000, deadline=None) + @given(http_methods, valid_urls()) + def test_request_prepare_returns_prepared_request(self, method: str, url: str) -> None: + """Request.prepare() should return PreparedRequest.""" + req = Request(method=method, url=url) + prepared = req.prepare() + assert isinstance(prepared, PreparedRequest) + + @settings(max_examples=1000, deadline=None) + @given(http_methods, valid_urls()) + def test_request_repr(self, method: str, url: str) -> None: + """Request repr should include method.""" + req = Request(method=method, url=url) + repr_str = repr(req) + assert isinstance(repr_str, str) + assert method in repr_str + assert "Request" in repr_str + + @settings(max_examples=1000, deadline=None) + @given( + http_methods, + valid_urls(), + st.text(min_size=0, max_size=100), + ) + def test_request_with_data(self, method: str, url: str, data: str) -> None: + """Request should accept data.""" + req = Request(method=method, url=url, data=data) + assert req.data == data + + +class TestPreparedRequestProperties: + """Property-based tests for PreparedRequest class.""" + + @settings(max_examples=1000, deadline=None) + @given(http_methods) + def test_prepared_request_method_normalization(self, method: str) -> None: + """PreparedRequest should normalize method to uppercase.""" + preq = PreparedRequest() + preq.prepare_method(method) + assert preq.method == method.upper() + + @settings(max_examples=1000, deadline=None) + @given(valid_urls()) + def test_prepared_request_url(self, url: str) -> None: + """PreparedRequest should accept and store URL.""" + preq = PreparedRequest() + try: + preq.prepare_url(url, None) + assert preq.url is not None + assert isinstance(preq.url, str) + except (InvalidURL, MissingSchema): + # Some generated URLs may be invalid + pass + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters(min_codepoint=ord("a"), max_codepoint=ord("z")), + min_size=1, + max_size=20, + ), + st.text(min_size=0, max_size=100), + max_size=10, + ) + ) + def test_prepared_request_headers(self, headers: dict) -> None: + """PreparedRequest should store headers as CaseInsensitiveDict.""" + preq = PreparedRequest() + try: + preq.prepare_headers(headers) + assert isinstance(preq.headers, CaseInsensitiveDict) + for key, value in headers.items(): + assert key in preq.headers or key.lower() in preq.headers + except Exception: + # Some header values might be invalid + pass + + @settings(max_examples=1000, deadline=None) + @given(http_methods, valid_urls()) + def test_prepared_request_copy(self, method: str, url: str) -> None: + """PreparedRequest.copy() should create independent copy.""" + preq = PreparedRequest() + preq.prepare_method(method) + try: + preq.prepare_url(url, None) + copy = preq.copy() + assert copy is not preq + assert copy.method == preq.method + assert copy.url == preq.url + except (InvalidURL, MissingSchema): + pass + + @settings(max_examples=1000, deadline=None) + @given(http_methods) + def test_prepared_request_repr(self, method: str) -> None: + """PreparedRequest repr should include method.""" + preq = PreparedRequest() + preq.prepare_method(method) + repr_str = repr(preq) + assert isinstance(repr_str, str) + assert method.upper() in repr_str + assert "PreparedRequest" in repr_str + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text(min_size=1, max_size=20), st.text(min_size=0, max_size=50), max_size=5 + ) + ) + def test_prepared_request_json_body(self, data: dict) -> None: + """PreparedRequest should handle JSON data.""" + preq = PreparedRequest() + try: + preq.prepare_body(data=None, files=None, json=data) + assert preq.body is not None + # Body should be valid JSON + parsed = json.loads(preq.body) + assert parsed == data + except (InvalidJSONError, TypeError): + # Some data might not be JSON serializable + pass + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=0, max_size=100)) + def test_prepared_request_string_body(self, data: str) -> None: + """PreparedRequest should handle string data.""" + preq = PreparedRequest() + preq.prepare_headers({}) # Headers must be initialized first + preq.prepare_body(data=data, files=None, json=None) + # String data should be encoded + assert preq.body is not None or data == "" + + @settings(max_examples=1000, deadline=None) + @given( + http_methods, + valid_urls(), + st.dictionaries(st.text(min_size=1, max_size=20), st.text(min_size=0, max_size=50)), + ) + def test_prepared_request_params_encoding( + self, method: str, url: str, params: dict + ) -> None: + """PreparedRequest should encode params into URL.""" + preq = PreparedRequest() + preq.prepare_method(method) + try: + preq.prepare_url(url, params) + if params: + # URL should contain encoded params + assert "?" in preq.url or not params + except (InvalidURL, MissingSchema): + pass + + +class TestResponseProperties: + """Property-based tests for Response class.""" + + @settings(max_examples=1000, deadline=None) + @given(st.integers(min_value=100, max_value=599)) + def test_response_status_code(self, status_code: int) -> None: + """Response should accept valid HTTP status codes.""" + resp = Response() + resp.status_code = status_code + assert resp.status_code == status_code + + @settings(max_examples=1000, deadline=None) + @given(st.integers(min_value=200, max_value=399)) + def test_response_ok_for_2xx_3xx(self, status_code: int) -> None: + """Response with 2xx or 3xx status should be ok.""" + resp = Response() + resp.status_code = status_code + resp.url = "http://example.com" + assert resp.ok is True + + @settings(max_examples=1000, deadline=None) + @given(st.integers(min_value=400, max_value=599)) + def test_response_not_ok_for_4xx_5xx(self, status_code: int) -> None: + """Response with 4xx or 5xx status should not be ok.""" + resp = Response() + resp.status_code = status_code + resp.url = "http://example.com" + assert resp.ok is False + + @settings(max_examples=1000, deadline=None) + @given(st.binary(min_size=0, max_size=1000)) + def test_response_content(self, content: bytes) -> None: + """Response should store and return content.""" + resp = Response() + resp._content = content + resp._content_consumed = True + assert resp.content == content + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=0, max_size=100)) + def test_response_text(self, text: str) -> None: + """Response should convert content to text.""" + resp = Response() + resp._content = text.encode("utf-8") + resp._content_consumed = True + resp.encoding = "utf-8" + assert isinstance(resp.text, str) + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters(min_codepoint=ord("a"), max_codepoint=ord("z")), + min_size=1, + max_size=20, + ), + st.one_of(st.text(min_size=0, max_size=50), st.integers(), st.floats(allow_nan=False)), + max_size=5, + ) + ) + def test_response_json(self, data: dict) -> None: + """Response.json() should parse JSON content.""" + resp = Response() + try: + json_str = json.dumps(data) + resp._content = json_str.encode("utf-8") + resp._content_consumed = True + resp.encoding = "utf-8" + parsed = resp.json() + assert parsed == data + except (ValueError, TypeError): + # Some data might not be JSON serializable + pass + + @settings(max_examples=1000, deadline=None) + @given(st.integers(min_value=100, max_value=599)) + def test_response_repr(self, status_code: int) -> None: + """Response repr should include status code.""" + resp = Response() + resp.status_code = status_code + repr_str = repr(resp) + assert isinstance(repr_str, str) + assert str(status_code) in repr_str + assert "Response" in repr_str + + @settings(max_examples=1000, deadline=None) + @given(st.integers(min_value=100, max_value=599)) + def test_response_bool(self, status_code: int) -> None: + """Response bool conversion should match ok property.""" + resp = Response() + resp.status_code = status_code + resp.url = "http://example.com" + assert bool(resp) == resp.ok + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters(min_codepoint=ord("a"), max_codepoint=ord("z")), + min_size=1, + max_size=20, + ), + st.text(min_size=0, max_size=100), + max_size=10, + ) + ) + def test_response_headers(self, headers: dict) -> None: + """Response headers should be CaseInsensitiveDict.""" + resp = Response() + resp.headers = CaseInsensitiveDict(headers) + assert isinstance(resp.headers, CaseInsensitiveDict) + for key, value in headers.items(): + assert resp.headers.get(key.lower()) == value or resp.headers.get(key) == value + + @settings(max_examples=1000, deadline=None) + @given(st.sampled_from([301, 302, 303, 307, 308])) + def test_response_is_redirect(self, status_code: int) -> None: + """Response with redirect status and location should be redirect.""" + resp = Response() + resp.status_code = status_code + resp.headers = CaseInsensitiveDict({"location": "http://example.com/new"}) + assert resp.is_redirect is True + + @settings(max_examples=1000, deadline=None) + @given(st.sampled_from([301, 308])) + def test_response_is_permanent_redirect(self, status_code: int) -> None: + """Response with 301 or 308 and location should be permanent redirect.""" + resp = Response() + resp.status_code = status_code + resp.headers = CaseInsensitiveDict({"location": "http://example.com/new"}) + assert resp.is_permanent_redirect is True + + @settings(max_examples=1000, deadline=None) + @given(st.sampled_from([200, 404, 500])) + def test_response_is_not_redirect(self, status_code: int) -> None: + """Response without redirect status should not be redirect.""" + resp = Response() + resp.status_code = status_code + resp.headers = CaseInsensitiveDict({}) + assert resp.is_redirect is False + + +class TestRequestResponseInvariants: + """Test invariants that should hold across Request/Response interactions.""" + + @settings(max_examples=1000, deadline=None) + @given(http_methods, valid_urls()) + def test_request_prepare_preserves_method(self, method: str, url: str) -> None: + """Preparing a request should preserve method (as uppercase).""" + req = Request(method=method, url=url) + prepared = req.prepare() + assert prepared.method == method.upper() + + @settings(max_examples=1000, deadline=None) + @given(http_methods, valid_urls()) + def test_request_prepare_preserves_url(self, method: str, url: str) -> None: + """Preparing a request should preserve URL.""" + req = Request(method=method, url=url) + try: + prepared = req.prepare() + # URL should be present (may be modified/normalized) + assert prepared.url is not None + assert isinstance(prepared.url, str) + except (InvalidURL, MissingSchema): + pass + + @settings(max_examples=1000, deadline=None) + @given( + http_methods, + valid_urls(), + st.dictionaries( + st.text( + alphabet=st.characters(min_codepoint=ord("a"), max_codepoint=ord("z")), + min_size=1, + max_size=20, + ), + st.text(min_size=0, max_size=100), + max_size=5, + ), + ) + def test_request_prepare_preserves_headers( + self, method: str, url: str, headers: dict + ) -> None: + """Preparing a request should preserve headers.""" + req = Request(method=method, url=url, headers=headers) + try: + prepared = req.prepare() + assert isinstance(prepared.headers, CaseInsensitiveDict) + for key in headers: + # Header should be present (case-insensitively) + assert ( + key in prepared.headers + or key.lower() in prepared.headers + or key.upper() in prepared.headers + ) + except Exception: + pass + + @settings(max_examples=1000, deadline=None) + @given(st.integers(min_value=100, max_value=599)) + def test_response_bool_consistency(self, status_code: int) -> None: + """Response bool and ok property should be consistent.""" + resp = Response() + resp.status_code = status_code + resp.url = "http://example.com" + assert bool(resp) == resp.ok + + @settings(max_examples=1000, deadline=None) + @given(st.binary(min_size=0, max_size=500)) + def test_response_content_idempotent(self, content: bytes) -> None: + """Accessing response.content multiple times should return same value.""" + resp = Response() + resp._content = content + resp._content_consumed = True + first = resp.content + second = resp.content + assert first == second == content + + @settings(max_examples=1000, deadline=None) + @given( + st.integers(min_value=200, max_value=299), + st.text(min_size=0, max_size=100), + ) + def test_response_text_is_unicode(self, status_code: int, text: str) -> None: + """Response.text should always return str (unicode).""" + resp = Response() + resp.status_code = status_code + resp._content = text.encode("utf-8") + resp._content_consumed = True + resp.encoding = "utf-8" + result = resp.text + assert isinstance(result, str) + + +class TestRequestEncodingInvariants: + """Test encoding-related invariants for requests.""" + + @settings(max_examples=1000, deadline=None) + @given( + http_methods, + valid_urls(), + st.dictionaries(st.text(min_size=1, max_size=20), st.text(min_size=0, max_size=50)), + ) + def test_params_in_prepared_url(self, method: str, url: str, params: dict) -> None: + """Params should be encoded in prepared URL.""" + assume(len(params) > 0) + req = Request(method=method, url=url, params=params) + try: + prepared = req.prepare() + # If params were provided, URL should be modified + if params: + assert "?" in prepared.url or prepared.url != url + except (InvalidURL, MissingSchema): + pass + + @settings(max_examples=1000, deadline=None) + @given(http_methods, valid_urls(), st.text(min_size=1, max_size=100)) + def test_string_body_is_encoded(self, method: str, url: str, body: str) -> None: + """String body should be encoded in prepared request.""" + req = Request(method=method, url=url, data=body) + try: + prepared = req.prepare() + if body: + assert prepared.body is not None + except (InvalidURL, MissingSchema): + pass + + +class TestPreparedRequestPathURL: + """Test path_url property of PreparedRequest.""" + + @settings(max_examples=1000, deadline=None) + @given(valid_urls()) + def test_path_url_excludes_scheme_and_host(self, url: str) -> None: + """path_url should exclude scheme and host.""" + preq = PreparedRequest() + try: + preq.prepare_url(url, None) + path_url = preq.path_url + assert isinstance(path_url, str) + # Should start with / + assert path_url.startswith("/") + # Should not contain :// + assert "://" not in path_url + except (InvalidURL, MissingSchema): + pass + + @settings(max_examples=1000, deadline=None) + @given( + valid_urls(), + st.dictionaries(st.text(min_size=1, max_size=10), st.text(min_size=1, max_size=10)), + ) + def test_path_url_includes_query(self, url: str, params: dict) -> None: + """path_url should include query parameters.""" + assume(len(params) > 0) + preq = PreparedRequest() + try: + preq.prepare_url(url, params) + path_url = preq.path_url + if params: + assert "?" in path_url + except (InvalidURL, MissingSchema): + pass + diff --git a/tests/test_hypothesis_structures.py b/tests/test_hypothesis_structures.py new file mode 100644 index 0000000000..0e0f6598d2 --- /dev/null +++ b/tests/test_hypothesis_structures.py @@ -0,0 +1,613 @@ +""" +Hypothesis-based property tests for requests.structures module. + +These tests use property-based testing to verify the invariants and properties +of data structures like CaseInsensitiveDict and LookupDict. +""" + +from collections.abc import Mapping + +import pytest +from hypothesis import assume, given, settings +from hypothesis import strategies as st + +from requests.structures import CaseInsensitiveDict, LookupDict + + +class TestCaseInsensitiveDictProperties: + """Property-based tests for CaseInsensitiveDict.""" + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(st.text(min_size=1), st.text())) + def test_caseinsensitivedict_creation(self, data: dict) -> None: + """CaseInsensitiveDict should be creatable from dict.""" + cid = CaseInsensitiveDict(data) + assert isinstance(cid, CaseInsensitiveDict) + # Length should match unique case-insensitive keys + unique_keys = {k.lower() for k in data.keys()} + assert len(cid) == len(unique_keys) + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + ) + def test_caseinsensitivedict_case_insensitive_get( + self, data: dict + ) -> None: + """CaseInsensitiveDict should be case-insensitive for lookups.""" + cid = CaseInsensitiveDict(data) + for key, value in data.items(): + # Test various case combinations + assert cid[key.lower()] == value + assert cid[key.upper()] == value + assert cid[key] == value + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + ) + def test_caseinsensitivedict_preserves_case(self, data: dict) -> None: + """CaseInsensitiveDict should preserve original key case.""" + cid = CaseInsensitiveDict(data) + keys = list(cid.keys()) + # Keys should maintain their original case + assert all(isinstance(k, str) for k in keys) + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text(min_size=1, max_size=50), st.text(min_size=0, max_size=100) + ) + ) + def test_caseinsensitivedict_len(self, data: dict) -> None: + """CaseInsensitiveDict length should match number of unique case-insensitive keys.""" + cid = CaseInsensitiveDict(data) + # Calculate expected length based on unique case-insensitive keys + unique_keys = {k.lower() for k in data.keys()} + assert len(cid) == len(unique_keys) + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + def test_caseinsensitivedict_setitem_getitem( + self, key: str, value: str + ) -> None: + """Setting and getting items should work case-insensitively.""" + cid = CaseInsensitiveDict() + cid[key] = value + assert cid[key] == value + assert cid[key.lower()] == value + assert cid[key.upper()] == value + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + def test_caseinsensitivedict_delitem( + self, key: str, value: str + ) -> None: + """Deleting items should work case-insensitively.""" + cid = CaseInsensitiveDict() + cid[key] = value + # Delete with different case + del cid[key.upper()] + assert key.lower() not in cid + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + ) + def test_caseinsensitivedict_iteration(self, data: dict) -> None: + """Iterating over CaseInsensitiveDict should yield keys.""" + cid = CaseInsensitiveDict(data) + keys = list(cid) + assert len(keys) == len(data) + assert all(isinstance(k, str) for k in keys) + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + ) + def test_caseinsensitivedict_items(self, data: dict) -> None: + """items() should return key-value pairs.""" + cid = CaseInsensitiveDict(data) + items = list(cid.items()) + assert len(items) == len(data) + assert all(isinstance(item, tuple) and len(item) == 2 for item in items) + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + ) + def test_caseinsensitivedict_lower_items(self, data: dict) -> None: + """lower_items() should return lowercase keys.""" + cid = CaseInsensitiveDict(data) + lower_items = list(cid.lower_items()) + assert all(key.islower() for key, _ in lower_items) + assert len(lower_items) == len(data) + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + ) + def test_caseinsensitivedict_copy(self, data: dict) -> None: + """copy() should create an independent copy.""" + cid = CaseInsensitiveDict(data) + cid_copy = cid.copy() + assert cid == cid_copy + assert cid is not cid_copy + assert isinstance(cid_copy, CaseInsensitiveDict) + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + ) + def test_caseinsensitivedict_equality(self, data: dict) -> None: + """Two CaseInsensitiveDicts with same data should be equal.""" + cid1 = CaseInsensitiveDict(data) + cid2 = CaseInsensitiveDict(data) + assert cid1 == cid2 + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + ) + def test_caseinsensitivedict_equality_with_dict( + self, data: dict + ) -> None: + """CaseInsensitiveDict should equal dict with same data.""" + cid = CaseInsensitiveDict(data) + # Create a regular dict with lowercase keys + lowered_data = {k.lower(): v for k, v in data.items()} + regular_dict = dict(lowered_data) + # They should be equal when comparing case-insensitively + assert cid == CaseInsensitiveDict(regular_dict) + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ), + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ), + ) + def test_caseinsensitivedict_update( + self, data1: dict, data2: dict + ) -> None: + """update() should merge dictionaries.""" + cid = CaseInsensitiveDict(data1) + original_len = len(cid) + cid.update(data2) + # Length should be at least the maximum of the two + assert len(cid) >= max(len(data1), len(data2)) + # All keys from data2 should be present + for key in data2: + assert key in cid or key.lower() in cid or key.upper() in cid + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + ) + def test_caseinsensitivedict_contains( + self, key: str, value: str + ) -> None: + """'in' operator should work case-insensitively.""" + cid = CaseInsensitiveDict({key: value}) + assert key in cid + assert key.lower() in cid + assert key.upper() in cid + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + min_size=1, + ) + ) + def test_caseinsensitivedict_repr(self, data: dict) -> None: + """repr() should return a valid string representation.""" + cid = CaseInsensitiveDict(data) + repr_str = repr(cid) + assert isinstance(repr_str, str) + assert len(repr_str) > 0 + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + st.text(min_size=0, max_size=100), + st.text(min_size=0, max_size=100), + ) + def test_caseinsensitivedict_overwrites_on_same_key( + self, key: str, value1: str, value2: str + ) -> None: + """Setting same key (different case) should overwrite.""" + cid = CaseInsensitiveDict() + cid[key.lower()] = value1 + cid[key.upper()] = value2 + # Should have only one entry + assert len(cid) == 1 + # Should have the last value + assert cid[key] == value2 + + +class TestLookupDictProperties: + """Property-based tests for LookupDict.""" + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1, max_size=50)) + def test_lookupdict_creation(self, name: str) -> None: + """LookupDict should be creatable with a name.""" + ld = LookupDict(name=name) + assert isinstance(ld, LookupDict) + assert ld.name == name + + @settings(max_examples=1000, deadline=None) + @given( + st.text(min_size=1, max_size=50), + st.text(min_size=1, max_size=50), + st.text(min_size=0, max_size=100), + ) + def test_lookupdict_setattr_getitem( + self, name: str, key: str, value: str + ) -> None: + """LookupDict should allow attribute-style access.""" + # Filter out dunder attributes and 'name' to avoid restricted/reserved attributes + assume(not key.startswith("__") and key != "name") + ld = LookupDict(name=name) + # Set via attribute + setattr(ld, key, value) + # Get via item access + result = ld[key] + assert result == value + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1, max_size=50), st.text(min_size=1, max_size=50)) + def test_lookupdict_getitem_missing_returns_none( + self, name: str, key: str + ) -> None: + """LookupDict should return None for missing keys.""" + # Filter out 'name' since it's an instance attribute that will be returned + assume(key != "name") + ld = LookupDict(name=name) + result = ld[key] + assert result is None + + @settings(max_examples=1000, deadline=None) + @given( + st.text(min_size=1, max_size=50), + st.text(min_size=1, max_size=50), + st.text(min_size=0, max_size=100), + st.text(min_size=0, max_size=100), + ) + def test_lookupdict_get_method( + self, name: str, key: str, value: str, default: str + ) -> None: + """LookupDict.get() should work like dict.get().""" + # Filter out 'name' since it's an instance attribute + assume(key != "name") + ld = LookupDict(name=name) + # Missing key should return default + assert ld.get(key, default) == default + # Set value + setattr(ld, key, value) + # Now should return value + assert ld.get(key, default) == value + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1, max_size=50)) + def test_lookupdict_repr(self, name: str) -> None: + """LookupDict repr should include name.""" + ld = LookupDict(name=name) + repr_str = repr(ld) + assert isinstance(repr_str, str) + assert name in repr_str + assert "lookup" in repr_str.lower() + + @settings(max_examples=1000, deadline=None) + @given( + st.text(min_size=1, max_size=50), + st.dictionaries( + st.text(min_size=1, max_size=20).filter(lambda x: not x.startswith("__") and x not in ['name', 'get']), + st.text(min_size=0, max_size=100), + min_size=1, + max_size=10, + ), + ) + def test_lookupdict_multiple_attributes( + self, name: str, attrs: dict + ) -> None: + """LookupDict should handle multiple attributes.""" + ld = LookupDict(name=name) + # Set multiple attributes (filter out dunder/special attributes and method names) + for key, value in attrs.items(): + setattr(ld, key, value) + # Verify all are accessible + for key, value in attrs.items(): + assert ld[key] == value + assert ld.get(key) == value + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1, max_size=50)) + def test_lookupdict_is_dict_subclass(self, name: str) -> None: + """LookupDict should be a dict subclass.""" + ld = LookupDict(name=name) + assert isinstance(ld, dict) + + @settings(max_examples=1000, deadline=None) + @given( + st.text(min_size=1, max_size=50), + st.text(min_size=1, max_size=50), + st.text(min_size=0, max_size=100), + ) + def test_lookupdict_none_default_behavior( + self, name: str, key: str, value: str + ) -> None: + """LookupDict should return None by default for missing keys.""" + # Filter out dunder attributes and 'name' to avoid restricted/reserved attributes + assume(not key.startswith("__") and key != "name") + ld = LookupDict(name=name) + # Missing key + assert ld.get(key) is None + # With explicit None default + assert ld.get(key, None) is None + # Set value + setattr(ld, key, value) + # Should not be None anymore + assert ld.get(key) is not None + + +class TestCaseInsensitiveDictInvariants: + """Test invariants that should always hold for CaseInsensitiveDict.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + st.text(min_size=0, max_size=50), + ) + ) + def test_caseinsensitivedict_is_mapping(self, data: dict) -> None: + """CaseInsensitiveDict should be a Mapping.""" + cid = CaseInsensitiveDict(data) + assert isinstance(cid, Mapping) + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + st.text(min_size=0, max_size=50), + ) + ) + def test_caseinsensitivedict_keys_values_same_length( + self, data: dict + ) -> None: + """keys() and values() should have same length.""" + cid = CaseInsensitiveDict(data) + assert len(list(cid.keys())) == len(list(cid.values())) + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + st.text(min_size=0, max_size=50), + ) + ) + def test_caseinsensitivedict_consistency_across_operations( + self, data: dict + ) -> None: + """All access methods should be consistent.""" + cid = CaseInsensitiveDict(data) + for key, value in data.items(): + # Different ways to access should give same result + assert cid[key] == value + assert cid.get(key) == value + assert key in cid or key.lower() in cid or key.upper() in cid + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + st.text(min_size=0, max_size=50), + st.text(min_size=0, max_size=50), + ) + def test_caseinsensitivedict_set_get_roundtrip( + self, key: str, value1: str, value2: str + ) -> None: + """Setting a value and getting it should return the same value.""" + cid = CaseInsensitiveDict() + cid[key] = value1 + assert cid[key] == value1 + # Update with different case + cid[key.upper()] = value2 + assert cid[key.lower()] == value2 + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + st.text(min_size=0, max_size=50), + min_size=1, + ) + ) + def test_caseinsensitivedict_copy_is_equal(self, data: dict) -> None: + """A copy should be equal to the original.""" + cid = CaseInsensitiveDict(data) + cid_copy = cid.copy() + assert cid == cid_copy + + @settings(max_examples=1000, deadline=None) + @given( + st.dictionaries( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + st.text(min_size=0, max_size=50), + min_size=1, + ) + ) + def test_caseinsensitivedict_copy_is_independent( + self, data: dict + ) -> None: + """Modifying a copy should not affect the original.""" + cid = CaseInsensitiveDict(data) + cid_copy = cid.copy() + # Modify copy + cid_copy["new_key"] = "new_value" + # Original should not have new key + assert "new_key" not in cid + diff --git a/tests/test_hypothesis_utils.py b/tests/test_hypothesis_utils.py new file mode 100644 index 0000000000..55f57d0e4a --- /dev/null +++ b/tests/test_hypothesis_utils.py @@ -0,0 +1,690 @@ +""" +Hypothesis-based property tests for requests.utils module. + +These tests use property-based testing to automatically generate test cases +and find edge cases that might not be caught by traditional example-based tests. +""" + +import codecs +import os +import socket +import struct +from collections import OrderedDict +from io import BytesIO, StringIO + +import pytest +from hypothesis import assume, given, settings +from hypothesis import strategies as st + +from requests.exceptions import InvalidURL +from requests.structures import CaseInsensitiveDict +from requests.utils import ( + _parse_content_type_header, + address_in_network, + dotted_netmask, + from_key_val_list, + get_encoding_from_headers, + guess_json_utf, + is_ipv4_address, + is_valid_cidr, + iter_slices, + parse_dict_header, + parse_header_links, + parse_list_header, + prepend_scheme_if_needed, + requote_uri, + super_len, + to_key_val_list, + unquote_header_value, + unquote_unreserved, + urldefragauth, +) + + +class TestSuperLenProperties: + """Property-based tests for super_len function.""" + + @settings(max_examples=1000, deadline=None) + @given(st.text(alphabet=st.characters(max_codepoint=127))) + def test_super_len_string_equals_len(self, s: str) -> None: + """super_len of an ASCII string should equal its byte length when encoded.""" + # Note: In urllib3 2.x+, strings are treated as UTF-8 for length calculation + expected = len(s.encode("utf-8")) + assert super_len(s) == expected + + @settings(max_examples=1000, deadline=None) + @given(st.binary()) + def test_super_len_bytes_equals_len(self, b: bytes) -> None: + """super_len of bytes should equal its length.""" + assert super_len(b) == len(b) + + @settings(max_examples=1000, deadline=None) + @given(st.lists(st.integers())) + def test_super_len_list_equals_len(self, lst: list) -> None: + """super_len of a list should equal its length.""" + assert super_len(lst) == len(lst) + + @settings(max_examples=1000, deadline=None) + @given(st.binary()) + def test_super_len_bytesio_equals_len(self, data: bytes) -> None: + """super_len of BytesIO should equal data length.""" + bio = BytesIO(data) + assert super_len(bio) == len(data) + + @settings(max_examples=1000, deadline=None) + @given(st.binary()) + def test_super_len_bytesio_partially_read(self, data: bytes) -> None: + """super_len should account for partially read BytesIO.""" + assume(len(data) > 1) + bio = BytesIO(data) + # Read some bytes + read_amount = len(data) // 2 + bio.read(read_amount) + remaining = len(data) - read_amount + assert super_len(bio) == remaining + + @settings(max_examples=1000, deadline=None) + @given(st.text()) + def test_super_len_stringio(self, s: str) -> None: + """super_len of StringIO should equal string length.""" + sio = StringIO(s) + assert super_len(sio) == len(s) + + +class TestKeyValListProperties: + """Property-based tests for key-value list conversion functions.""" + + @settings(max_examples=1000, deadline=None) + @given(st.lists(st.tuples(st.text(min_size=1), st.text()))) + def test_to_key_val_list_from_list( + self, items: list[tuple[str, str]] + ) -> None: + """to_key_val_list should preserve list of tuples.""" + result = to_key_val_list(items) + assert result == items + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(st.text(min_size=1), st.text())) + def test_to_key_val_list_from_dict(self, d: dict) -> None: + """to_key_val_list should convert dict to list of tuples.""" + result = to_key_val_list(d) + assert isinstance(result, list) + assert set(result) == set(d.items()) + + @settings(max_examples=1000, deadline=None) + @given(st.none()) + def test_to_key_val_list_none(self, value: None) -> None: + """to_key_val_list should return None for None input.""" + assert to_key_val_list(value) is None + + @settings(max_examples=1000, deadline=None) + @given(st.one_of(st.text(), st.integers(), st.booleans())) + def test_to_key_val_list_invalid_types(self, value) -> None: + """to_key_val_list should raise ValueError for invalid types.""" + with pytest.raises(ValueError): + to_key_val_list(value) + + @settings(max_examples=1000, deadline=None) + @given(st.lists(st.tuples(st.text(min_size=1), st.text()), unique_by=lambda x: x[0])) + def test_from_key_val_list_returns_ordered_dict( + self, items: list[tuple[str, str]] + ) -> None: + """from_key_val_list should return OrderedDict.""" + result = from_key_val_list(items) + assert isinstance(result, OrderedDict) + # OrderedDict collapses duplicate keys, so check length and values + assert len(result) == len(items) + for key, value in items: + assert result[key] == value + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(st.text(min_size=1), st.text())) + def test_from_key_val_list_from_dict(self, d: dict) -> None: + """from_key_val_list should work with dict input.""" + result = from_key_val_list(d) + assert isinstance(result, OrderedDict) + assert dict(result) == d + + @settings(max_examples=1000, deadline=None) + @given(st.none()) + def test_from_key_val_list_none(self, value: None) -> None: + """from_key_val_list should return None for None input.""" + assert from_key_val_list(value) is None + + @settings(max_examples=1000, deadline=None) + @given(st.one_of(st.text(), st.integers(), st.booleans())) + def test_from_key_val_list_invalid_types(self, value) -> None: + """from_key_val_list should raise ValueError for invalid types.""" + with pytest.raises(ValueError): + from_key_val_list(value) + + @settings(max_examples=1000, deadline=None) + @given(st.lists(st.tuples(st.text(min_size=1), st.text()), unique_by=lambda x: x[0])) + def test_roundtrip_to_from_key_val_list( + self, items: list[tuple[str, str]] + ) -> None: + """Converting to dict and back should preserve data (with unique keys).""" + result = to_key_val_list(from_key_val_list(items)) + # Result should be equal to items (since we have unique keys) + assert result == items + + +class TestUnquoteHeaderValueProperties: + """Property-based tests for unquote_header_value function.""" + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1)) + def test_unquote_non_quoted_unchanged(self, value: str) -> None: + """Unquoted values without surrounding quotes should be unchanged.""" + assume(not (value.startswith('"') and value.endswith('"'))) + assert unquote_header_value(value) == value + + @settings(max_examples=1000, deadline=None) + @given(st.text().filter(lambda x: x != '"' and '\\"' not in x)) + def test_unquote_quoted_removes_quotes(self, value: str) -> None: + """Quoted values should have quotes removed.""" + quoted = f'"{value}"' + result = unquote_header_value(quoted) + # The function also processes escape sequences + # Filter out escaped quotes since they can produce quotes in the result + assert not (result.startswith('"') and result.endswith('"')) + + @settings(max_examples=1000, deadline=None) + @given(st.none()) + def test_unquote_none(self, value: None) -> None: + """None input should return None.""" + assert unquote_header_value(value) is None + + +class TestIPv4Properties: + """Property-based tests for IPv4 address validation.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.integers(min_value=0, max_value=255), + st.integers(min_value=0, max_value=255), + st.integers(min_value=0, max_value=255), + st.integers(min_value=0, max_value=255), + ) + def test_valid_ipv4_address( + self, a: int, b: int, c: int, d: int + ) -> None: + """Valid IPv4 addresses should be recognized.""" + ip = f"{a}.{b}.{c}.{d}" + assert is_ipv4_address(ip) + + @settings(max_examples=1000, deadline=None) + @given(st.text().filter(lambda x: "." not in x and not x.isdigit() and "\x00" not in x and not any(c.isspace() for c in x))) + def test_invalid_ipv4_no_dots(self, value: str) -> None: + """Strings without dots (except single numbers) should not be valid IPv4.""" + # Note: Single numbers like "0" are valid shorthand IPs + # Also filter out null characters which cause ValueError + # Also filter out strings containing any whitespace (socket.inet_aton strips whitespace) + assert not is_ipv4_address(value) + + @settings(max_examples=1000, deadline=None) + @given(st.integers(min_value=1, max_value=32)) + def test_dotted_netmask_valid_range(self, mask: int) -> None: + """dotted_netmask should work for valid mask values.""" + result = dotted_netmask(mask) + parts = result.split(".") + assert len(parts) == 4 + assert all(0 <= int(p) <= 255 for p in parts) + + @settings(max_examples=1000, deadline=None) + @given( + st.integers(min_value=0, max_value=255), + st.integers(min_value=0, max_value=255), + st.integers(min_value=0, max_value=255), + st.integers(min_value=0, max_value=255), + st.integers(min_value=1, max_value=32), + ) + def test_valid_cidr( + self, a: int, b: int, c: int, d: int, mask: int + ) -> None: + """Valid CIDR notation should be recognized.""" + cidr = f"{a}.{b}.{c}.{d}/{mask}" + assert is_valid_cidr(cidr) + + @settings(max_examples=1000, deadline=None) + @given(st.text()) + def test_invalid_cidr_no_slash(self, value: str) -> None: + """CIDR without slash should be invalid.""" + assume("/" not in value) + assert not is_valid_cidr(value) + + @settings(max_examples=1000, deadline=None) + @given( + st.integers(min_value=0, max_value=255), + st.integers(min_value=0, max_value=255), + st.integers(min_value=0, max_value=255), + st.integers(min_value=0, max_value=255), + st.integers(min_value=33, max_value=100), + ) + def test_invalid_cidr_mask_too_large( + self, a: int, b: int, c: int, d: int, mask: int + ) -> None: + """CIDR with mask > 32 should be invalid.""" + cidr = f"{a}.{b}.{c}.{d}/{mask}" + assert not is_valid_cidr(cidr) + + +class TestIterSlicesProperties: + """Property-based tests for iter_slices function.""" + + @settings(max_examples=1000, deadline=None) + @given(st.text(), st.integers(min_value=1, max_value=100)) + def test_iter_slices_covers_all_content( + self, text: str, slice_length: int + ) -> None: + """iter_slices should return all content when joined.""" + result = "".join(iter_slices(text, slice_length)) + assert result == text + + @settings(max_examples=1000, deadline=None) + @given(st.binary(), st.integers(min_value=1, max_value=100)) + def test_iter_slices_bytes_covers_all( + self, data: bytes, slice_length: int + ) -> None: + """iter_slices should return all bytes content when joined.""" + result = b"".join(iter_slices(data, slice_length)) + assert result == data + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1), st.integers(min_value=1, max_value=10)) + def test_iter_slices_max_slice_size( + self, text: str, slice_length: int + ) -> None: + """No slice should exceed the specified length.""" + slices = list(iter_slices(text, slice_length)) + for s in slices[:-1]: # All but last + assert len(s) == slice_length + # Last slice can be shorter + if slices: + assert len(slices[-1]) <= slice_length + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1)) + def test_iter_slices_none_length(self, text: str) -> None: + """None or invalid slice_length should return full string (non-empty).""" + result = list(iter_slices(text, None)) + assert len(result) == 1 + assert result[0] == text + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1)) + def test_iter_slices_zero_length(self, text: str) -> None: + """Zero slice_length should return full string (non-empty).""" + result = list(iter_slices(text, 0)) + assert len(result) == 1 + assert result[0] == text + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1)) + def test_iter_slices_negative_length(self, text: str) -> None: + """Negative slice_length should return full string (non-empty).""" + result = list(iter_slices(text, -1)) + assert len(result) == 1 + assert result[0] == text + + +class TestGuessJSONUTFProperties: + """Property-based tests for guess_json_utf function.""" + + @settings(max_examples=1000, deadline=None) + @given(st.sampled_from(["utf-8", "utf-16", "utf-32"])) + def test_guess_json_utf_recognizes_encoding( + self, encoding: str + ) -> None: + """guess_json_utf should recognize common JSON encodings.""" + data = "{}".encode(encoding) + result = guess_json_utf(data) + # Result should be related to the encoding + assert result is not None + assert encoding.split("-")[0] in result + + @settings(max_examples=1000, deadline=None) + @given(st.binary(min_size=4, max_size=4)) + def test_guess_json_utf_returns_string_or_none( + self, data: bytes + ) -> None: + """guess_json_utf should return str or None.""" + result = guess_json_utf(data) + assert result is None or isinstance(result, str) + + +class TestURLDefragAuthProperties: + """Property-based tests for urldefragauth function.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + ) + def test_urldefragauth_removes_fragment( + self, path: str, fragment: str + ) -> None: + """urldefragauth should remove fragments.""" + url = f"http://example.com/{path}#{fragment}" + result = urldefragauth(url) + assert "#" not in result + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=5, + max_size=20, + ), + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=5, + max_size=20, + ), + ) + def test_urldefragauth_removes_auth( + self, user: str, password: str + ) -> None: + """urldefragauth should remove authentication.""" + # Use longer strings to avoid substring collisions with domain + assume("example" not in user and "example" not in password) + assume("com" not in user and "com" not in password) + assume("path" not in user and "path" not in password) + url = f"http://{user}:{password}@example.com/path" + result = urldefragauth(url) + # Check that the auth part is removed (@ should not be in result) + assert "@" not in result or not result.startswith("http://") + + +class TestRequoteURIProperties: + """Property-based tests for requote_uri function.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=100, + ) + ) + def test_requote_uri_idempotent(self, path: str) -> None: + """requote_uri should be idempotent for safe strings.""" + url = f"http://example.com/{path}" + first = requote_uri(url) + second = requote_uri(first) + assert first == second + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1)) + def test_requote_uri_returns_string(self, path: str) -> None: + """requote_uri should always return a string.""" + try: + url = f"http://example.com/{path}" + result = requote_uri(url) + assert isinstance(result, str) + except InvalidURL: + # Some paths may cause InvalidURL, which is acceptable + pass + + +class TestUnquoteUnreservedProperties: + """Property-based tests for unquote_unreserved function.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ) + ) + def test_unquote_unreserved_returns_string(self, uri: str) -> None: + """unquote_unreserved should return a string.""" + try: + result = unquote_unreserved(uri) + assert isinstance(result, str) + except InvalidURL: + # Invalid percent-escape sequences may raise InvalidURL + pass + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet="abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-._~", + min_size=1, + max_size=50, + ) + ) + def test_unquote_unreserved_unreserved_chars_unchanged( + self, uri: str + ) -> None: + """Unreserved characters should remain unchanged.""" + result = unquote_unreserved(uri) + assert result == uri + + +class TestPreprendSchemeIfNeededProperties: + """Property-based tests for prepend_scheme_if_needed function.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ) + ) + def test_prepend_scheme_adds_scheme_if_missing( + self, domain: str + ) -> None: + """prepend_scheme_if_needed should add scheme if missing.""" + url = f"{domain}.com/path" + result = prepend_scheme_if_needed(url, "http") + assert result.startswith("http://") + + @settings(max_examples=1000, deadline=None) + @given( + st.sampled_from(["http", "https", "ftp"]), + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=50, + ), + ) + def test_prepend_scheme_preserves_existing_scheme( + self, scheme: str, domain: str + ) -> None: + """prepend_scheme_if_needed should not replace existing scheme.""" + url = f"{scheme}://{domain}.com/path" + result = prepend_scheme_if_needed(url, "ftp") + assert result.startswith(f"{scheme}://") + + +class TestParseHeaderLinksProperties: + """Property-based tests for parse_header_links function.""" + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=0, max_size=0)) + def test_parse_header_links_empty_string(self, value: str) -> None: + """Empty string should return empty list.""" + assert parse_header_links(value) == [] + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=100, + ) + ) + def test_parse_header_links_returns_list(self, url: str) -> None: + """parse_header_links should return a list.""" + link = f"" + result = parse_header_links(link) + assert isinstance(result, list) + + +class TestParseDictHeaderProperties: + """Property-based tests for parse_dict_header function.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + ) + def test_parse_dict_header_simple_pair( + self, key: str, value: str + ) -> None: + """parse_dict_header should parse simple key=value pairs (ASCII only).""" + header = f'{key}="{value}"' + result = parse_dict_header(header) + assert isinstance(result, dict) + assert key in result + + @settings(max_examples=1000, deadline=None) + @given(st.text(min_size=1, max_size=20)) + def test_parse_dict_header_no_value(self, key: str) -> None: + """parse_dict_header should handle keys without values.""" + assume("=" not in key and "," not in key) + result = parse_dict_header(key) + assert isinstance(result, dict) + assert result.get(key) is None + + +class TestParseListHeaderProperties: + """Property-based tests for parse_list_header function.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.lists( + st.text( + alphabet=st.characters( + min_codepoint=ord("a"), max_codepoint=ord("z") + ), + min_size=1, + max_size=20, + ), + min_size=1, + max_size=10, + ) + ) + def test_parse_list_header_returns_list(self, items: list[str]) -> None: + """parse_list_header should return a list.""" + header = ", ".join(items) + result = parse_list_header(header) + assert isinstance(result, list) + assert len(result) == len(items) + + +class TestParseContentTypeHeaderProperties: + """Property-based tests for _parse_content_type_header function.""" + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet="abcdefghijklmnopqrstuvwxyz/", + min_size=3, + max_size=30, + ) + ) + def test_parse_content_type_header_returns_tuple( + self, content_type: str + ) -> None: + """_parse_content_type_header should return a tuple.""" + result = _parse_content_type_header(content_type) + assert isinstance(result, tuple) + assert len(result) == 2 + assert isinstance(result[0], str) + assert isinstance(result[1], dict) + + @settings(max_examples=1000, deadline=None) + @given( + st.text( + alphabet="abcdefghijklmnopqrstuvwxyz/", + min_size=3, + max_size=30, + ), + st.text( + alphabet="abcdefghijklmnopqrstuvwxyz-", + min_size=1, + max_size=20, + ), + ) + def test_parse_content_type_header_with_charset( + self, content_type: str, charset: str + ) -> None: + """_parse_content_type_header should parse charset parameter.""" + header = f"{content_type}; charset={charset}" + result = _parse_content_type_header(header) + assert isinstance(result[1], dict) + if "charset" in result[1]: + assert isinstance(result[1]["charset"], (str, bool)) + + +class TestGetEncodingFromHeadersProperties: + """Property-based tests for get_encoding_from_headers function.""" + + @settings(max_examples=1000, deadline=None) + @given(st.dictionaries(st.text(), st.text())) + def test_get_encoding_from_headers_with_caseinsensitive_dict( + self, headers_dict: dict + ) -> None: + """get_encoding_from_headers should work with CaseInsensitiveDict.""" + headers = CaseInsensitiveDict(headers_dict) + result = get_encoding_from_headers(headers) + assert result is None or isinstance(result, str) + + @settings(max_examples=1000, deadline=None) + @given( + st.sampled_from( + ["utf-8", "iso-8859-1", "utf-16", "ascii", "windows-1252"] + ) + ) + def test_get_encoding_from_headers_with_valid_charset( + self, charset: str + ) -> None: + """get_encoding_from_headers should extract valid charsets.""" + headers = CaseInsensitiveDict( + {"content-type": f"text/html; charset={charset}"} + ) + result = get_encoding_from_headers(headers) + assert result == charset +