Added too many exceptions test (#1378)

Co-authored-by: Adrià Casajús <adria.casajus@proton.ch>
This commit is contained in:
Adrià Casajús 2022-10-27 14:04:03 +02:00 committed by GitHub
parent 02f42821c5
commit 6d8fba0320
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 29 additions and 0 deletions

View File

@ -1,3 +1,8 @@
import threading
import time
from typing import Optional
import werkzeug.exceptions
from flask_login import login_user
from app.parallel_limiter import _InnerLock
@ -11,3 +16,27 @@ def test_parallel_limiter(flask_app):
pl = _InnerLock("test", max_wait_secs=1)
for loop_id in range(10):
assert pl(lambda x: x)(loop_id) == loop_id
def sleep_thread(pl: _InnerLock, sem: Optional[threading.Semaphore] = None):
if sem is not None:
sem.release()
pl(time.sleep)(1)
def test_too_many_requests(flask_app):
user = create_new_user()
with flask_app.test_request_context():
login_user(user)
sem = threading.Semaphore(0)
pl = _InnerLock("test", max_wait_secs=5)
t = threading.Thread(target=sleep_thread, args=(pl, sem))
t.daemon = True
t.start()
sem.acquire()
try:
got_exception = False
pl(sleep_thread)(pl)
except werkzeug.exceptions.TooManyRequests:
got_exception = True
assert got_exception