|
8 | 8 | from prisma import Prisma |
9 | 9 | from prisma.models import User, Profile |
10 | 10 |
|
11 | | -from ..utils import CURRENT_DATABASE |
| 11 | +from ..utils import CURRENT_DATABASE, RawQueries |
12 | 12 |
|
13 | 13 |
|
14 | 14 | @pytest.mark.asyncio |
@@ -215,126 +215,58 @@ async def test_transaction_already_closed(client: Prisma) -> None: |
215 | 215 |
|
216 | 216 |
|
217 | 217 | @pytest.mark.asyncio |
218 | | -@pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available') |
219 | | -async def test_read_uncommited_isolation_level(client: Prisma) -> None: |
220 | | - """A transaction isolation level is set to `READ_UNCOMMITED`""" |
221 | | - client2 = Prisma() |
222 | | - await client2.connect() |
223 | | - |
224 | | - user = await client.user.create(data={'name': 'Robert'}) |
225 | | - |
226 | | - async with client.tx(isolation_level=prisma.TransactionIsolationLevel.READ_UNCOMMITED) as tx1: |
227 | | - tx1_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
228 | | - tx1_count = await tx1.user.count() |
229 | | - |
230 | | - async with client2.tx() as tx2: |
231 | | - await tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id}) |
232 | | - await tx2.user.create(data={'name': 'Bobby'}) |
233 | | - |
234 | | - dirty_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
235 | | - |
236 | | - non_repeatable_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
237 | | - phantom_count = await tx1.user.count() |
238 | | - |
239 | | - # Have dirty read |
240 | | - assert tx1_user.name != dirty_user.name |
241 | | - # Have non-repeatable read |
242 | | - assert tx1_user.name != non_repeatable_user.name |
243 | | - # Have phantom read |
244 | | - assert tx1_count != phantom_count |
245 | | - |
246 | | - |
247 | | -@pytest.mark.asyncio |
248 | | -@pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available') |
249 | | -async def test_read_commited_isolation_level(client: Prisma) -> None: |
250 | | - """A transaction isolation level is set to `READ_COMMITED`""" |
251 | | - client2 = Prisma() |
252 | | - await client2.connect() |
253 | | - |
254 | | - user = await client.user.create(data={'name': 'Robert'}) |
255 | | - |
256 | | - async with client.tx(isolation_level=prisma.TransactionIsolationLevel.READ_COMMITED) as tx1: |
257 | | - tx1_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
258 | | - tx1_count = await tx1.user.count() |
259 | | - |
260 | | - async with client2.tx() as tx2: |
261 | | - await tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id}) |
262 | | - await tx2.user.create(data={'name': 'Bobby'}) |
263 | | - |
264 | | - dirty_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
265 | | - |
266 | | - non_repeatable_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
267 | | - phantom_count = await tx1.user.count() |
268 | | - |
269 | | - # No dirty read |
270 | | - assert tx1_user.name == dirty_user.name |
271 | | - # Have non-repeatable read |
272 | | - assert tx1_user.name != non_repeatable_user.name |
273 | | - # Have phantom read |
274 | | - assert tx1_count != phantom_count |
275 | | - |
276 | | - |
277 | | -@pytest.mark.asyncio |
278 | | -@pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available') |
279 | | -async def test_repeatable_read_isolation_level(client: Prisma) -> None: |
280 | | - """A transaction isolation level is set to `REPEATABLE_READ`""" |
281 | | - client2 = Prisma() |
282 | | - await client2.connect() |
283 | | - |
284 | | - user = await client.user.create(data={'name': 'Robert'}) |
285 | | - |
286 | | - async with client.tx(isolation_level=prisma.TransactionIsolationLevel.REPEATABLE_READ) as tx1: |
287 | | - tx1_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
288 | | - tx1_count = await tx1.user.count() |
289 | | - |
290 | | - async with client2.tx() as tx2: |
291 | | - await tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id}) |
292 | | - await tx2.user.create(data={'name': 'Bobby'}) |
293 | | - |
294 | | - dirty_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
295 | | - |
296 | | - non_repeatable_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
297 | | - phantom_count = await tx1.user.count() |
298 | | - |
299 | | - # No dirty read |
300 | | - assert tx1_user.name == dirty_user.name |
301 | | - # No non-repeatable read |
302 | | - assert tx1_user.name == non_repeatable_user.name |
303 | | - # Have phantom read |
304 | | - assert tx1_count != phantom_count |
305 | | - |
306 | | - |
307 | | -@pytest.mark.asyncio |
308 | | -@pytest.mark.skipif(True, reason='Available for SQL Server only') |
309 | | -async def test_snapshot_isolation_level() -> None: |
310 | | - """A transaction isolation level is set to `SNAPSHOT`""" |
311 | | - raise NotImplementedError |
312 | | - |
313 | | - |
314 | | -@pytest.mark.asyncio |
315 | | -async def test_serializable_isolation_level(client: Prisma) -> None: |
316 | | - """A transaction isolation level is set to `SERIALIZABLE`""" |
317 | | - client2 = Prisma() |
318 | | - await client2.connect() |
319 | | - |
320 | | - user = await client.user.create(data={'name': 'Robert'}) |
321 | | - |
322 | | - async with client.tx(isolation_level=prisma.TransactionIsolationLevel.SERIALIZABLE) as tx1: |
323 | | - tx1_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
324 | | - tx1_count = await tx1.user.count() |
325 | | - |
326 | | - async with client2.tx() as tx2: |
327 | | - await tx2.user.update(data={'name': 'Tegan'}, where={'id': user.id}) |
328 | | - await tx2.user.create(data={'name': 'Bobby'}) |
| 218 | +@pytest.mark.parametrize( |
| 219 | + ('input_level', 'expected_level'), |
| 220 | + [ |
| 221 | + pytest.param( |
| 222 | + prisma.TransactionIsolationLevel.READ_UNCOMMITTED, |
| 223 | + 'READ_UNCOMMITTED', |
| 224 | + id='read uncommitted', |
| 225 | + marks=pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available'), |
| 226 | + ), |
| 227 | + pytest.param( |
| 228 | + prisma.TransactionIsolationLevel.READ_COMMITTED, |
| 229 | + 'READ_COMMITTED', |
| 230 | + id='read committed', |
| 231 | + marks=pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available'), |
| 232 | + ), |
| 233 | + pytest.param( |
| 234 | + prisma.TransactionIsolationLevel.REPEATABLE_READ, |
| 235 | + 'REPEATABLE_READ', |
| 236 | + id='repeatable read', |
| 237 | + marks=pytest.mark.skipif(CURRENT_DATABASE in ['cockroachdb', 'sqlite'], reason='Not available'), |
| 238 | + ), |
| 239 | + pytest.param( |
| 240 | + prisma.TransactionIsolationLevel.SNAPSHOT, |
| 241 | + 'SNAPSHOT', |
| 242 | + id='snapshot', |
| 243 | + marks=pytest.mark.skipif(True, reason='Available for SQL Server only'), |
| 244 | + ), |
| 245 | + pytest.param( |
| 246 | + prisma.TransactionIsolationLevel.SERIALIZABLE, |
| 247 | + 'SERIALIZABLE', |
| 248 | + id='serializable', |
| 249 | + marks=pytest.mark.skipif( |
| 250 | + CURRENT_DATABASE == 'sqlite', reason='PRAGMA has only effect in shared-cache mode' |
| 251 | + ), |
| 252 | + ), |
| 253 | + ], |
| 254 | +) |
| 255 | +# TODO: remove after issue will be resolved |
| 256 | +@pytest.mark.skipif(CURRENT_DATABASE in ['mysql', 'mariadb'], reason='https://github.com/prisma/prisma/issues/22890') |
| 257 | +async def test_isolation_level( |
| 258 | + client: Prisma, raw_queries: RawQueries, input_level: prisma.TransactionIsolationLevel, expected_level: str |
| 259 | +) -> None: |
| 260 | + """A transaction isolation level is set correctly""" |
| 261 | + async with client.tx(isolation_level=input_level) as tx: |
| 262 | + results = await tx.query_raw(raw_queries.select_tx_isolation) |
329 | 263 |
|
330 | | - dirty_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
| 264 | + assert len(results) == 1 |
331 | 265 |
|
332 | | - non_repeatable_user = await tx1.user.find_first_or_raise(where={'id': user.id}) |
333 | | - phantom_count = await tx1.user.count() |
| 266 | + row = results[0] |
| 267 | + assert any(row) |
334 | 268 |
|
335 | | - # No dirty read |
336 | | - assert tx1_user.name == dirty_user.name |
337 | | - # No non-repeatable read |
338 | | - assert tx1_user.name == non_repeatable_user.name |
339 | | - # No phantom read |
340 | | - assert tx1_count == phantom_count |
| 269 | + level = next(iter(row.values())) |
| 270 | + # The result can depends on the database, so we do upper() and replace() |
| 271 | + level = str(level).upper().replace(' ', '_').replace('-', '_') |
| 272 | + assert level == expected_level |
0 commit comments