diff --git a/core/config.py b/core/config.py index 3eec09b..fa62818 100644 --- a/core/config.py +++ b/core/config.py @@ -11,10 +11,12 @@ class Settings(BaseSettings): ALLOWED_HOSTS: str REFRESH_TOKEN_EXPIRE_DAYS: int ACCESS_TOKEN_EXPIRE_MINUTES: int - DEBUG: bool + DEVELOPMENT: bool + TESTING: bool # Databases POSTGRESQL_URL: str + SQLITE_TEST_URL: str # Media USER_AVATAR_PATH: str diff --git a/core/versioning.py b/core/versioning.py index fda00dc..928aaf4 100644 --- a/core/versioning.py +++ b/core/versioning.py @@ -30,7 +30,7 @@ def __init__( }, } - if settings.DEBUG: + if settings.DEVELOPMENT: self.app = FastAPI(**__kwargs) else: self.app = FastAPI(docs_url=None, redoc_url=None, **__kwargs) diff --git a/db/postgresql.py b/db/postgresql.py index 8d3af23..e9f34d1 100644 --- a/db/postgresql.py +++ b/db/postgresql.py @@ -5,8 +5,11 @@ from core import settings -database = Database(settings.POSTGRESQL_URL) metadata = MetaData() +if not settings.TESTING: + database = Database(url=settings.POSTGRESQL_URL) +else: + database = Database(url=settings.SQLITE_TEST_URL, force_rollback=True) class MainMeta(ModelMeta): @@ -25,11 +28,11 @@ async def connect_to_postgresql(): except Exception as e: logger.error(f"PostgreSQL Connection Failed {e}.") else: - logger.info("PostgreSQL Connected.") + logger.info(f"PostgreSQL Connected.") async def close_postgresql_connection(): """Shutdown Event Handler for Disconnect to PostgreSQL Database""" await database.disconnect() - logger.info("PostgreSQL Closed.") + logger.info(f"PostgreSQL Closed.") diff --git a/requirements.txt b/requirements.txt index 563df35..039f005 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ argon2-cffi==21.3.0 argon2-cffi-bindings==21.2.0 asgiref==3.5.0 asyncpg==0.25.0 +attrs==21.4.0 bcrypt==3.2.0 certifi==2021.10.8 cffi==1.15.0 @@ -19,11 +20,14 @@ email-validator==1.1.3 fastapi==0.75.0 fastapi-utils==0.2.1 greenlet==1.1.2 -h11==0.13.0 +h11==0.12.0 +httpcore==0.14.7 httptools==0.2.0 +httpx==0.22.0 idna==3.3 importlib-metadata==4.11.3 importlib-resources==5.4.0 +iniconfig==1.1.1 itsdangerous==2.1.1 Jinja2==3.1.0 Mako==1.2.0 @@ -31,22 +35,30 @@ MarkupSafe==2.1.1 orjson==3.6.7 orm==0.3.1 ormar==0.10.25 +packaging==21.3 passlib==1.7.4 +pluggy==1.0.0 psycopg2-binary==2.9.3 +py==1.11.0 pyasn1==0.4.8 pycparser==2.21 pydantic==1.9.0 +pyparsing==3.0.7 +pytest==7.1.1 +pytest-asyncio==0.18.3 python-dotenv==0.19.2 python-jose==3.3.0 python-magic==0.4.25 python-multipart==0.0.5 PyYAML==5.4.1 requests==2.27.1 +rfc3986==1.5.0 rsa==4.8 six==1.16.0 sniffio==1.2.0 SQLAlchemy==1.4.31 starlette==0.17.1 +tomli==2.0.1 typesystem==0.3.1 typing-extensions==4.1.1 ujson==4.3.0 diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..6a9fa00 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,5 @@ +[tool:pytest] +asyncio_mode = auto +filterwarnings = + ignore:.*U.*mode is deprecated:DeprecationWarning +addopts = -p no:warnings diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/__init__.py b/tests/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/v1_0_0/__init__.py b/tests/api/v1_0_0/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/v1_0_0/test_user.py b/tests/api/v1_0_0/test_user.py new file mode 100644 index 0000000..4fba175 --- /dev/null +++ b/tests/api/v1_0_0/test_user.py @@ -0,0 +1,123 @@ +from fastapi import status +from httpx import AsyncClient +from typing import Dict, Any +from core import Level +from ...conftest import VERSIONS, FIRST_ADMIN + + +URL_STR: str = VERSIONS.get("1.0.0") + "user/" + + +class TestUserRoutes: + """Test Cases Class for Test APIs of User Entity Model Routes""" + + async def test_login_failed_username_user( + self, + client: AsyncClient, + ): + response = await client.post( + url=URL_STR + "login/", + data={ + "username": "wrongusername", + "password": FIRST_ADMIN.get("password"), + }, + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + async def test_login_failed_password_user( + self, + client: AsyncClient, + ): + response = await client.post( + url=URL_STR + "login/", + data={ + "username": FIRST_ADMIN.get("username"), + "password": "wrongpassword", + }, + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + async def test_login_successfull_user( + self, + client: AsyncClient, + ): + response = await client.post(url=URL_STR + "login/", data=FIRST_ADMIN) + + assert response.status_code == status.HTTP_200_OK + + content: Dict[str, Any] = response.json() + + assert "token_type" in content + assert "access_token" in content + assert "refresh_token" in content + + async def test_refresh_failed_user( + self, + client: AsyncClient, + ): + response = await client.post( + url=URL_STR + "refresh/", + json={ + "refresh": "fakerefreshtoken", + }, + ) + + assert response.status_code == status.HTTP_400_BAD_REQUEST + + async def test_refresh_successfull_user( + self, + client: AsyncClient, + ): + response = await client.post(url=URL_STR + "login/", data=FIRST_ADMIN) + tokens: Dict[str, Any] = response.json() + + response = await client.post( + url=URL_STR + "refresh/", + json={ + "refresh": tokens.get("refresh_token"), + }, + ) + + assert response.status_code == status.HTTP_200_OK + + content: Dict[str, Any] = response.json() + + assert "token_type" in content + assert "access_token" in content + + async def test_list_user( + self, + client: AsyncClient, + admin_token_headers: Dict[str, str], + ): + response = await client.get(URL_STR, headers=admin_token_headers) + + assert response.status_code == status.HTTP_200_OK + + content: Dict[str, Any] = response.json() + + assert content["count"] == 1 + assert content["next"] is None + assert content["previous"] is None + + assert content["results"][0]["mobile"] == FIRST_ADMIN.get("username") + assert content["results"][0]["level"] == Level.ADMIN.value + assert content["results"][0]["is_active"] is True + + async def test_create_user( + self, + client: AsyncClient, + admin_token_headers: Dict[str, str], + ): + response = await client.post( + url=URL_STR, + headers=admin_token_headers, + json={ + "mobile": "9123456789", + "password": "secretpassword", + }, + ) + + assert response.status_code == status.HTTP_201_CREATED diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..6ed9709 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,86 @@ +from httpx import AsyncClient +from pytest_asyncio import fixture +from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine +from asyncio import get_event_loop +from typing import Dict, Generator +from core import settings, Level +from db import metadata +from models import User +from schemas import UserInDBSchema +from main import app + + +engine: AsyncEngine = create_async_engine(settings.SQLITE_TEST_URL, echo=True) + +FIRST_ADMIN = { + "username": "9928917653", + "password": "sepehrbazyar", +} + +VERSIONS = { + version: f"/v{version}/" for version in [ + "1.0.0", + ] +} + + +@fixture(autouse=True, scope="session") +async def create_test_database(): + """SetUp & TearDown Fixture to Create & Delete Database by Metadata""" + + async with engine.begin() as conn: + await conn.run_sync(metadata.create_all) + yield + await conn.run_sync(metadata.drop_all) + + +@fixture(autouse=True, scope="function") +async def clean_tables_database(): + """Clearing Database Tables to Delete All Rows for Each Test Case""" + + try: + yield + finally: + async with engine.begin() as conn: + for table in reversed(metadata.sorted_tables): + await conn.execute(table.delete()) + + +@fixture(scope="session") +async def client() -> Generator[AsyncClient, None, None]: + """Session Generator Fixture to Yielding Async Client for Requesting APIs""" + + async with AsyncClient(app=app, base_url=settings.BASE_URL) as client: + yield client + + +@fixture(autouse=True, scope="function") +async def insert_first_admin(): + """Fixture to Inserting First Admin User for Access Protected Routes View""" + + form = UserInDBSchema( + level=Level.ADMIN, + mobile=FIRST_ADMIN.get("username"), + password=FIRST_ADMIN.get("password"), + ) + await User.sign_up(form=form) + + +@fixture(scope="function") +async def admin_token_headers(client: AsyncClient) -> Dict[str, str]: + """Sent Authorize Data to Login Super Admin Get Access Token""" + + response = await client.post(f"/user/login/", data=FIRST_ADMIN) + tokens: Dict[str, str] = response.json() + return { + "Authorization": f"Bearer {tokens.get('access_token')}", + } + + +@fixture(autouse=True, scope="session") +def event_loop(): + """Override the Event Loop Tests for Set Async Fixture Tests""" + + loop = get_event_loop() + yield loop + loop.close() diff --git a/tests/models/__init__.py b/tests/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/models/test_user.py b/tests/models/test_user.py new file mode 100644 index 0000000..7bcf354 --- /dev/null +++ b/tests/models/test_user.py @@ -0,0 +1,164 @@ +from pytest import MonkeyPatch, raises +from pydantic import ValidationError +from uuid import UUID +from pathlib import Path +from base64 import b64encode +from core import settings, pwd_context, Level +from models import User +from schemas import ( + UserInDBSchema, + UserUpdateSchema, + ChangePasswordSchema, +) + + +class UserConstants: + """Constants Parameters of User for Inheritanced in Child Test Case Classes""" + + _MOBILE, _PASSWORD = "9123456789", "qwerty1234" + _AVATAR = "iVBORw0KGgoAAAANSUhEUgAAASIAAAEiAQMAAABncE31AAAABlBMVEXMADPMmQAZ5W44AAAFPklEQVR4nJSawa3kNgyGbfgwR5fgFlJAAL9StoQccwgiBznkuCWkhRQQYKeUV8Ic9zBYBpIlm7Qo+YsPgwf6w7OGI1P8SQ7sCiLP8vdHA9gM9dN5b9ya1M8n9WhTy0kt7Scuz/Nem5pfx7J+dKj3sawO9ZDj2e8eVf4MHWqSvLBRvveod/mnHWqUvLBFXl3qme90qEH2u6PIZ5d678vqUmFf2HKx1dTzsuccak0LGyXe6FLvtKwutUhc2P7Zp57R1KVmEXnFZf24o96POyoCEh/7rqnNUuJQ5poi8U1EbRznmsoTu1T6eumL3lHpO3apIT4sPvbzjkpe7VMhAqJd2KCe1tHetUYXrpUfayrtjD61xN951hvHp/a92qfm+NtMeuP4VHo7bqhH8qfeOD71tNHLvabkg7W7cSIVP5cbao/Nc3/jDOMec26oIT1r6m+cYdij3B31JX32N045Ij8aZ2W6NvW/vrSgeHIe6/qlRU2b+o6/tqjHpvz1W4uaN+X731vUsqnfMbSodVN7IrRekLCp/RVanpVN7dXQ+JVG2dS+D419MSWqvEOhsRMfsqn3sbX350SVdzs03rZlDyQ5ToTG+72WcLPulO+wUM6I/OE7LJ8RORYGP2qOkTrjavDj9JSpHKODfzI8RDYV74MfUuZM5bMj+OEpekmdQ8EPiGuhvhXKc1g0q/OxEfSlUEuhHIelVEOd28E92KZC5RwguEdpSkhUPhHEc9hcqJybBPEcFm9tKs+xCYpyV6JKzhTEc1jIt0r+FdyzVDKVn56oymFjolReGLxMYMrUkWMGL/dIv80fKl8N4jhszlRxS6Iqh0V3yZ8qj45U5bB1p86cPFFXhyXjXyq/T4arw2SnjgXs1KfjLvmqdEeiXo675KvSMIn67rhL/lbZc5DaYfNOKc8k6uKw5C759/rPLw5L7pJ/rl/ncFjrHLR2VzNWdlczVlrS1YyVlnQ1Y6UlXc1YaUlXM1Za0tWMlZZ0NWOlJV3NWGlJVzNWWtLVjJWWdDVjrSU9zThUdk8zDpXd04xDZfc041DZPc04VHZPMw6V3dOMQ2X3NGNt9zRjbfc0Y233NGNtt5oxHC+rtVvNeFLWbjXjhTrsVjOelLVbzWgppSWNZjypi5Y0mtFSSksazagoqyWNZrSU0pJGMyrKakmjGS2ltKTRjIqyWtJoRkNpLWk0o6KsljSa0VBaSxrNqCirJY1mNJTWkkYzKspqSaMZDaXsVjMqympJoxk1ZbWk1oyKumhJrRk1ZbWk1oyaslryQ31qStvp/2LrQt+R+Yv5nv2ObE+w/cX2Ktv37B1i7yN7t1mcYDGHxS8WC1lcRTGaxXt2drBziJ1pXk21trOzlp3bLAdg+QTLTView3Imln+xXI7lhSjHZPkqy31ZHs1ycpbfM63AdAfTMEwPMW3FdBrTfPnR6c3TW6e23GvRvMBku9O1TCMzvc20O6sDsJoCrE+wWgerm7AaDKvnsNoQqzOxmhWsf7FaGqvLsRofqxey2iOrY7KaKKyvslotq/uyGjKrR7PaNquTs5o7rN+zXgDrK7AeBet3sN4J68Owng7sD7FeE+tbsR4Y66ex3hzr87Ge4VBpxtbFepmsL4p6rKxfy3q/rI/MetKsv8165azvznr4bB6AzRawOQU288DmJ9gsBpvrQDMibN6Eza6wOZjmTE09xSP+FA+b9fm/c0NsBonNM7HZKDZnxWa22PwXmyVjc2loxo3Ny7HZOzbHx2YC2Xwhm1Vkc49shpLNY7LZTjYn2p45Rdd/AQAA//9/35AUGS3FfQAAAABJRU5ErkJggg==" # noqa: E501 + + +class TestUserSchema(UserConstants): + """Test Cases Class to Schemas of User Class Model Entity Pydantic""" + + def test_hashed_password_user(self): + form = UserInDBSchema( + mobile=self._MOBILE, + password=self._PASSWORD, + ) + + assert pwd_context.verify(self._PASSWORD, form.password) + + def test_empty_update_user(self): + with raises(ValidationError): + UserUpdateSchema() + + def test_avatar_invalid_string_user(self): + with raises(ValidationError): + UserUpdateSchema(avatar="invalid_avatar") + + def test_avatar_invalid_encoded_user(self): + with raises(ValidationError): + UserUpdateSchema(avatar=b64encode("invalid_avatar".encode()).decode()) + + def test_avatar_valid_encoded_user(self): + UserUpdateSchema(avatar=self._AVATAR) + + def test_unmatched_passwords_user(self): + with raises(ValidationError): + ChangePasswordSchema( + old_password="oldpassword", + new_password="newpassword", + confirm_password="unmatchpassword", + ) + + +class TestUserModel(UserConstants): + """Test Cases Class to Methods of User Class Model Entity ORM""" + + async def test_create_simple_user(self): + form = UserInDBSchema(mobile=self._MOBILE, password=self._PASSWORD) + user = await User.sign_up(form=form) + + assert user is not None + assert isinstance(user.id, UUID) + + async def test_create_duplicate_user(self): + form = UserInDBSchema(mobile=self._MOBILE, password=self._PASSWORD) + + user1 = await User.sign_up(form=form) + assert user1 is not None + + user2 = await User.sign_up(form=form) + assert user2 is None + + async def test_create_admin_user(self): + form = UserInDBSchema( + level=Level.ADMIN, + mobile=self._MOBILE, + password=self._PASSWORD, + ) + user = await User.sign_up(form=form) + + assert user.level_ is Level.ADMIN + assert isinstance(user.level, str) + + async def test_create_avatar_user(self, monkeypatch: MonkeyPatch): + get_path = lambda mobile: str(Path(settings.USER_AVATAR_PATH) / f"{mobile}.png") + + async def mock_save_avatar(phone_number: str, avatar: bytes) -> str: + """Mocked Saved Binary Buffer Avatar Method with Returned the Path""" + + return get_path(mobile=phone_number) + + monkeypatch.setattr(target=User, name="save_avatar", value=mock_save_avatar) + + form = UserInDBSchema( + avatar=self._AVATAR, + mobile=self._MOBILE, + password=self._PASSWORD, + ) + user = await User.sign_up(form=form) + + assert user.avatar == get_path(mobile=self._MOBILE) + + async def test_login_password_user(self): + form = UserInDBSchema(mobile=self._MOBILE, password=self._PASSWORD) + user = await User.sign_up(form=form) + + assert await user.sign_in(password=self._PASSWORD) is True + assert await user.sign_in(password="wrongpassword") is False + + async def test_edit_successfull_activate_user(self): + form = UserInDBSchema(mobile=self._MOBILE, password=self._PASSWORD) + user = await User.sign_up(form=form) + + edited_form = UserUpdateSchema(is_active=False) + result = await user.edit(update_form=edited_form) + + assert result is True + + async def test_edit_failed_phone_number_user(self): + form1 = UserInDBSchema(mobile=self._MOBILE, password=self._PASSWORD) + user1 = await User.sign_up(form=form1) + + form2 = UserInDBSchema(mobile="9987654321", password=self._PASSWORD) + user2 = await User.sign_up(form=form2) + + edited_form = UserUpdateSchema(mobile=self._MOBILE) + result = await user2.edit(update_form=edited_form) + + assert result is False + + async def test_change_password_failed_user(self): + form = UserInDBSchema(mobile=self._MOBILE, password=self._PASSWORD) + user = await User.sign_up(form=form) + + new_password = "newpassword" + passwords_form = ChangePasswordSchema( + old_password="wrongpassword", + new_password=new_password, + confirm_password=new_password, + ) + result = await user.change_password(passwords=passwords_form) + + assert result is False + assert await user.sign_in(password=new_password) is False + assert await user.sign_in(password=self._PASSWORD) is True + + async def test_change_password_successfull_user(self): + form = UserInDBSchema(mobile=self._MOBILE, password=self._PASSWORD) + user = await User.sign_up(form=form) + + new_password = "newpassword" + passwords_form = ChangePasswordSchema( + old_password=self._PASSWORD, + new_password=new_password, + confirm_password=new_password, + ) + result = await user.change_password(passwords=passwords_form) + + assert result is True + assert await user.sign_in(password=new_password) is True + assert await user.sign_in(password=self._PASSWORD) is False