Compare commits

...

4 Commits

Author SHA1 Message Date
Daniel Mühlbachler-Pietrzykowski ff0f6db756
Merge 5e1e9d6c55 into 015036b499 2024-04-17 17:35:33 +05:30
Adrià Casajús 015036b499
Prevent proton mailboxes from enabling pgp encryption (#2086) 2024-04-12 15:19:41 +02:00
Son Nguyen Kim d5df91aab6
Premium user can enable data breach monitoring (#2084)
* add User.enable_data_breach_check column

* user can turn on/off the data breach check

* only run data breach check for user who enables it

* add tips to run tests using a local DB (without docker)

* refactor True check

* trim trailing space

* fix test

* Apply suggestions from code review

Co-authored-by: Adrià Casajús <acasajus@users.noreply.github.com>

* format

---------

Co-authored-by: Son NK <son@simplelogin.io>
Co-authored-by: Adrià Casajús <acasajus@users.noreply.github.com>
2024-04-12 10:39:23 +02:00
Daniel Muehlbachler-Pietrzykowski 5e1e9d6c55
feat: use oidc well-known url 2024-03-24 11:01:51 +01:00
14 changed files with 319 additions and 45 deletions

View File

@ -68,6 +68,12 @@ For most tests, you will need to have ``redis`` installed and started on your ma
sh scripts/run-test.sh sh scripts/run-test.sh
``` ```
You can also run tests using a local Postgres DB to speed things up. This can be done by
- creating an empty test DB and running the database migration by `dropdb test && createdb test && DB_URI=postgresql://localhost:5432/test alembic upgrade head`
- replacing the `DB_URI` in `test.env` file by `DB_URI=postgresql://localhost:5432/test`
## Run the code locally ## Run the code locally
Install npm packages Install npm packages

View File

@ -3,11 +3,13 @@ from flask_login import login_user
from app.auth.base import auth_bp from app.auth.base import auth_bp
from app.db import Session from app.db import Session
from app.extensions import limiter
from app.log import LOG from app.log import LOG
from app.models import EmailChange, ResetPasswordCode from app.models import EmailChange, ResetPasswordCode
@auth_bp.route("/change_email", methods=["GET", "POST"]) @auth_bp.route("/change_email", methods=["GET", "POST"])
@limiter.limit("3/hour")
def change_email(): def change_email():
code = request.args.get("code") code = request.args.get("code")

View File

@ -1,6 +1,8 @@
from flask import request, session, redirect, flash, url_for from flask import request, session, redirect, flash, url_for
from requests_oauthlib import OAuth2Session from requests_oauthlib import OAuth2Session
import requests
from app import config from app import config
from app.auth.base import auth_bp from app.auth.base import auth_bp
from app.auth.views.login_utils import after_login from app.auth.views.login_utils import after_login
@ -16,7 +18,7 @@ from app.db import Session
from app.email_utils import send_welcome_email from app.email_utils import send_welcome_email
from app.log import LOG from app.log import LOG
from app.models import User, SocialAuth from app.models import User, SocialAuth
from app.utils import encode_url, sanitize_email, sanitize_next_url from app.utils import sanitize_email, sanitize_next_url
# need to set explicitly redirect_uri instead of leaving the lib to pre-fill redirect_uri # need to set explicitly redirect_uri instead of leaving the lib to pre-fill redirect_uri
@ -24,6 +26,7 @@ from app.utils import encode_url, sanitize_email, sanitize_next_url
_redirect_uri = URL + "/auth/oidc/callback" _redirect_uri = URL + "/auth/oidc/callback"
SESSION_STATE_KEY = "oauth_state" SESSION_STATE_KEY = "oauth_state"
SESSION_NEXT_KEY = "oauth_redirect_next"
@auth_bp.route("/oidc/login") @auth_bp.route("/oidc/login")
@ -32,18 +35,21 @@ def oidc_login():
return redirect(url_for("auth.login")) return redirect(url_for("auth.login"))
next_url = sanitize_next_url(request.args.get("next")) next_url = sanitize_next_url(request.args.get("next"))
if next_url:
redirect_uri = _redirect_uri + "?next=" + encode_url(next_url) auth_url = OIDC_AUTHORIZATION_URL
else: if config.OIDC_WELL_KNOWN_URL is not None:
redirect_uri = _redirect_uri auth_url = requests.get(config.OIDC_WELL_KNOWN_URL).json()[
"authorization_endpoint"
]
oidc = OAuth2Session( oidc = OAuth2Session(
config.OIDC_CLIENT_ID, scope=[OIDC_SCOPES], redirect_uri=redirect_uri config.OIDC_CLIENT_ID, scope=[OIDC_SCOPES], redirect_uri=_redirect_uri
) )
authorization_url, state = oidc.authorization_url(OIDC_AUTHORIZATION_URL) authorization_url, state = oidc.authorization_url(auth_url)
# State is used to prevent CSRF, keep this for later. # State is used to prevent CSRF, keep this for later.
session[SESSION_STATE_KEY] = state session[SESSION_STATE_KEY] = state
session[SESSION_NEXT_KEY] = next_url
return redirect(authorization_url) return redirect(authorization_url)
@ -60,6 +66,13 @@ def oidc_callback():
flash("Please use another sign in method then", "warning") flash("Please use another sign in method then", "warning")
return redirect("/") return redirect("/")
token_url = OIDC_TOKEN_URL
user_info_url = OIDC_USER_INFO_URL
if config.OIDC_WELL_KNOWN_URL is not None:
oidc_configuration = requests.get(config.OIDC_WELL_KNOWN_URL).json()
user_info_url = oidc_configuration["userinfo_endpoint"]
token_url = oidc_configuration["token_endpoint"]
oidc = OAuth2Session( oidc = OAuth2Session(
config.OIDC_CLIENT_ID, config.OIDC_CLIENT_ID,
state=session[SESSION_STATE_KEY], state=session[SESSION_STATE_KEY],
@ -67,12 +80,12 @@ def oidc_callback():
redirect_uri=_redirect_uri, redirect_uri=_redirect_uri,
) )
oidc.fetch_token( oidc.fetch_token(
OIDC_TOKEN_URL, token_url,
client_secret=config.OIDC_CLIENT_SECRET, client_secret=config.OIDC_CLIENT_SECRET,
authorization_response=request.url, authorization_response=request.url,
) )
oidc_user_data = oidc.get(OIDC_USER_INFO_URL) oidc_user_data = oidc.get(user_info_url)
if oidc_user_data.status_code != 200: if oidc_user_data.status_code != 200:
LOG.e( LOG.e(
f"cannot get oidc user data {oidc_user_data.status_code} {oidc_user_data.text}" f"cannot get oidc user data {oidc_user_data.status_code} {oidc_user_data.text}"
@ -111,7 +124,8 @@ def oidc_callback():
Session.commit() Session.commit()
# The activation link contains the original page, for ex authorize page # The activation link contains the original page, for ex authorize page
next_url = sanitize_next_url(request.args.get("next")) if request.args else None next_url = session[SESSION_NEXT_KEY]
session[SESSION_NEXT_KEY] = None
return after_login(user, next_url) return after_login(user, next_url)

View File

@ -245,6 +245,7 @@ FACEBOOK_CLIENT_ID = os.environ.get("FACEBOOK_CLIENT_ID")
FACEBOOK_CLIENT_SECRET = os.environ.get("FACEBOOK_CLIENT_SECRET") FACEBOOK_CLIENT_SECRET = os.environ.get("FACEBOOK_CLIENT_SECRET")
CONNECT_WITH_OIDC_ICON = os.environ.get("CONNECT_WITH_OIDC_ICON") CONNECT_WITH_OIDC_ICON = os.environ.get("CONNECT_WITH_OIDC_ICON")
OIDC_WELL_KNOWN_URL = os.environ.get("OIDC_WELL_KNOWN_URL")
OIDC_AUTHORIZATION_URL = os.environ.get("OIDC_AUTHORIZATION_URL") OIDC_AUTHORIZATION_URL = os.environ.get("OIDC_AUTHORIZATION_URL")
OIDC_USER_INFO_URL = os.environ.get("OIDC_USER_INFO_URL") OIDC_USER_INFO_URL = os.environ.get("OIDC_USER_INFO_URL")
OIDC_TOKEN_URL = os.environ.get("OIDC_TOKEN_URL") OIDC_TOKEN_URL = os.environ.get("OIDC_TOKEN_URL")

View File

@ -179,8 +179,15 @@ def mailbox_detail_route(mailbox_id):
elif request.form.get("form-name") == "toggle-pgp": elif request.form.get("form-name") == "toggle-pgp":
if request.form.get("pgp-enabled") == "on": if request.form.get("pgp-enabled") == "on":
mailbox.disable_pgp = False if mailbox.is_proton():
flash(f"PGP is enabled on {mailbox.email}", "success") mailbox.disable_pgp = True
flash(
"Enabling PGP for a Proton Mail mailbox is redundant and does not add any security benefit",
"info",
)
else:
mailbox.disable_pgp = False
flash(f"PGP is enabled on {mailbox.email}", "info")
else: else:
mailbox.disable_pgp = True mailbox.disable_pgp = True
flash(f"PGP is disabled on {mailbox.email}", "info") flash(f"PGP is disabled on {mailbox.email}", "info")

View File

@ -227,6 +227,21 @@ def setting():
Session.commit() Session.commit()
flash("Your preference has been updated", "success") flash("Your preference has been updated", "success")
return redirect(url_for("dashboard.setting")) return redirect(url_for("dashboard.setting"))
elif request.form.get("form-name") == "enable_data_breach_check":
if not current_user.is_premium():
flash("Only premium plan can enable data breach monitoring", "warning")
return redirect(url_for("dashboard.setting"))
choose = request.form.get("enable_data_breach_check")
if choose == "on":
LOG.i("User {current_user} has enabled data breach monitoring")
current_user.enable_data_breach_check = True
flash("Data breach monitoring is enabled", "success")
else:
LOG.i("User {current_user} has disabled data breach monitoring")
current_user.enable_data_breach_check = False
flash("Data breach monitoring is disabled", "info")
Session.commit()
return redirect(url_for("dashboard.setting"))
elif request.form.get("form-name") == "sender-in-ra": elif request.form.get("form-name") == "sender-in-ra":
choose = request.form.get("enable") choose = request.form.get("enable")
if choose == "on": if choose == "on":

View File

@ -525,6 +525,11 @@ class User(Base, ModelMixin, UserMixin, PasswordOracle):
sa.Boolean, default=True, nullable=False, server_default="1" sa.Boolean, default=True, nullable=False, server_default="1"
) )
# user opted in for data breach check
enable_data_breach_check = sa.Column(
sa.Boolean, default=False, nullable=False, server_default="0"
)
# bitwise flags. Allow for future expansion # bitwise flags. Allow for future expansion
flags = sa.Column( flags = sa.Column(
sa.BigInteger, sa.BigInteger,

View File

@ -1070,6 +1070,7 @@ def get_alias_to_check_hibp(
Alias.id >= min_alias_id, Alias.id >= min_alias_id,
Alias.id < max_alias_id, Alias.id < max_alias_id,
User.disabled == False, # noqa: E712 User.disabled == False, # noqa: E712
User.enable_data_breach_check,
or_( or_(
User.lifetime, User.lifetime,
ManualSubscription.end_at > now, ManualSubscription.end_at > now,

View File

@ -118,6 +118,7 @@ WORDS_FILE_PATH=local_data/test_words.txt
# Login with OIDC # Login with OIDC
# CONNECT_WITH_OIDC_ICON=fa-github # CONNECT_WITH_OIDC_ICON=fa-github
# OIDC_WELL_KNOWN_URL=to_fill
# OIDC_AUTHORIZATION_URL=to_fill # OIDC_AUTHORIZATION_URL=to_fill
# OIDC_USER_INFO_URL=to_fill # OIDC_USER_INFO_URL=to_fill
# OIDC_TOKEN_URL=to_fill # OIDC_TOKEN_URL=to_fill

View File

@ -0,0 +1,29 @@
"""empty message
Revision ID: fa2f19bb4e5a
Revises: 52510a633d6f
Create Date: 2024-04-09 13:12:26.305340
"""
import sqlalchemy_utils
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'fa2f19bb4e5a'
down_revision = '52510a633d6f'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('users', sa.Column('enable_data_breach_check', sa.Boolean(), server_default='0', nullable=False))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column('users', 'enable_data_breach_check')
# ### end Alembic commands ###

View File

@ -249,6 +249,42 @@
</div> </div>
</div> </div>
<!-- END Random alias --> <!-- END Random alias -->
<!-- Data breach check -->
<div class="card" id="data-breach">
<div class="card-body">
<div class="card-title">Data breach monitoring</div>
<div class="mt-1 mb-3">
{% if not current_user.is_premium() %}
<div class="alert alert-info" role="alert">
This feature is only available on Premium plan.
<a href="{{ url_for('dashboard.pricing') }}"
target="_blank"
rel="noopener noreferrer">
Upgrade<i class="fe fe-external-link"></i>
</a>
</div>
{% endif %}
If enabled, we will inform you via email if one of your aliases appears in a data breach.
<br>
SimpleLogin uses <a href="https://haveibeenpwned.com/">HaveIBeenPwned</a> API for checking for data breaches.
</div>
<form method="post" action="#data-breach">
{{ csrf_form.csrf_token }}
<input type="hidden" name="form-name" value="enable_data_breach_check">
<div class="form-check">
<input type="checkbox"
id="enable_data_breach_check"
name="enable_data_breach_check"
{% if current_user.enable_data_breach_check %} checked{% endif %}
class="form-check-input">
<label for="enable_data_breach_check">Enable data breach monitoring</label>
</div>
<button type="submit" class="btn btn-outline-primary">Update</button>
</form>
</div>
</div>
<!-- END Data breach check -->
<!-- Sender Format --> <!-- Sender Format -->
<div class="card" id="sender-format"> <div class="card" id="sender-format">
<div class="card-body"> <div class="card-body">
@ -285,7 +321,9 @@
No Name (i.e. only reverse-alias) No Name (i.e. only reverse-alias)
</option> </option>
</select> </select>
<button class="btn btn-outline-primary mt-3">Update</button> <button class="btn btn-outline-primary mt-3">
Update
</button>
</form> </form>
</div> </div>
</div> </div>
@ -295,7 +333,9 @@
<div class="card-body"> <div class="card-body">
<div class="card-title"> <div class="card-title">
Reverse Alias Replacement Reverse Alias Replacement
<div class="badge badge-warning">Experimental</div> <div class="badge badge-warning">
Experimental
</div>
</div> </div>
<div class="mb-3"> <div class="mb-3">
When replying to a forwarded email, the <b>reverse-alias</b> can be automatically included When replying to a forwarded email, the <b>reverse-alias</b> can be automatically included
@ -312,9 +352,13 @@
name="replace-ra" name="replace-ra"
{% if current_user.replace_reverse_alias %} checked{% endif %} {% if current_user.replace_reverse_alias %} checked{% endif %}
class="form-check-input"> class="form-check-input">
<label for="replace-ra">Enable replacing reverse alias</label> <label for="replace-ra">
Enable replacing reverse alias
</label>
</div> </div>
<button type="submit" class="btn btn-outline-primary">Update</button> <button type="submit" class="btn btn-outline-primary">
Update
</button>
</form> </form>
</div> </div>
</div> </div>

View File

@ -1,5 +1,5 @@
from app import config from app import config
from flask import url_for from flask import url_for, session
from urllib.parse import parse_qs from urllib.parse import parse_qs
from urllib3.util import parse_url from urllib3.util import parse_url
from app.auth.views.oidc import create_user from app.auth.views.oidc import create_user
@ -11,6 +11,64 @@ from app.config import URL, OIDC_CLIENT_ID
def test_oidc_login(flask_client): def test_oidc_login(flask_client):
config.OIDC_WELL_KNOWN_URL = None
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
with flask_client:
r = flask_client.get(
url_for("auth.oidc_login"),
follow_redirects=False,
)
location = r.headers.get("Location")
assert location is not None
parsed = parse_url(location)
query = parse_qs(parsed.query)
expected_redirect_url = f"{URL}/auth/oidc/callback"
assert "code" == query["response_type"][0]
assert OIDC_CLIENT_ID == query["client_id"][0]
assert expected_redirect_url == query["redirect_uri"][0]
assert session["oauth_redirect_next"] is None
def test_oidc_login_next_url(flask_client):
config.OIDC_WELL_KNOWN_URL = None
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
with flask_client:
r = flask_client.get(
url_for("auth.oidc_login", next="/dashboard/settings/"),
follow_redirects=False,
)
location = r.headers.get("Location")
assert location is not None
parsed = parse_url(location)
query = parse_qs(parsed.query)
expected_redirect_url = f"{URL}/auth/oidc/callback"
assert "code" == query["response_type"][0]
assert OIDC_CLIENT_ID == query["client_id"][0]
assert expected_redirect_url == query["redirect_uri"][0]
assert session["oauth_redirect_next"] == "/dashboard/settings/"
@patch("requests.get")
def test_oidc_login_with_well_known_url(mock_get, flask_client):
config.OIDC_WELL_KNOWN_URL = None
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
mock_get.return_value.json.return_value = {
"authorization_endpoint": "http://localhost:7777/authorization-endpoint",
}
config.OIDC_WELL_KNOWN_URL = "http://localhost:7777/well-known-url"
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_login"), url_for("auth.oidc_login"),
follow_redirects=False, follow_redirects=False,
@ -30,6 +88,9 @@ def test_oidc_login(flask_client):
def test_oidc_login_no_client_id(flask_client): def test_oidc_login_no_client_id(flask_client):
config.OIDC_CLIENT_ID = None config.OIDC_CLIENT_ID = None
config.OIDC_WELL_KNOWN_URL = None
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_login"), url_for("auth.oidc_login"),
@ -49,6 +110,9 @@ def test_oidc_login_no_client_id(flask_client):
def test_oidc_login_no_client_secret(flask_client): def test_oidc_login_no_client_secret(flask_client):
config.OIDC_CLIENT_SECRET = None config.OIDC_CLIENT_SECRET = None
config.OIDC_WELL_KNOWN_URL = None
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_login"), url_for("auth.oidc_login"),
@ -67,8 +131,10 @@ def test_oidc_login_no_client_secret(flask_client):
def test_oidc_callback_no_oauth_state(flask_client): def test_oidc_callback_no_oauth_state(flask_client):
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = None
session["oauth_state"] = None with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = None
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_callback"), url_for("auth.oidc_callback"),
@ -79,8 +145,10 @@ def test_oidc_callback_no_oauth_state(flask_client):
def test_oidc_callback_no_client_id(flask_client): def test_oidc_callback_no_client_id(flask_client):
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = None
session["oauth_state"] = "state" with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
config.OIDC_CLIENT_ID = None config.OIDC_CLIENT_ID = None
r = flask_client.get( r = flask_client.get(
@ -97,13 +165,15 @@ def test_oidc_callback_no_client_id(flask_client):
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
config.OIDC_CLIENT_ID = "to_fill" config.OIDC_CLIENT_ID = "to_fill"
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
def test_oidc_callback_no_client_secret(flask_client): def test_oidc_callback_no_client_secret(flask_client):
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = None
session["oauth_state"] = "state" with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
config.OIDC_CLIENT_SECRET = None config.OIDC_CLIENT_SECRET = None
r = flask_client.get( r = flask_client.get(
@ -120,16 +190,18 @@ def test_oidc_callback_no_client_secret(flask_client):
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
config.OIDC_CLIENT_SECRET = "to_fill" config.OIDC_CLIENT_SECRET = "to_fill"
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
@patch("requests_oauthlib.OAuth2Session.fetch_token") @patch("requests_oauthlib.OAuth2Session.fetch_token")
@patch("requests_oauthlib.OAuth2Session.get") @patch("requests_oauthlib.OAuth2Session.get")
def test_oidc_callback_invalid_user(mock_get, mock_fetch_token, flask_client): def test_oidc_callback_invalid_user(mock_get, mock_fetch_token, flask_client):
mock_get.return_value = MockResponse(400, {}) mock_get.return_value = MockResponse(400, {})
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = None
session["oauth_state"] = "state" with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_callback"), url_for("auth.oidc_callback"),
@ -145,16 +217,18 @@ def test_oidc_callback_invalid_user(mock_get, mock_fetch_token, flask_client):
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
assert mock_get.called assert mock_get.called
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
@patch("requests_oauthlib.OAuth2Session.fetch_token") @patch("requests_oauthlib.OAuth2Session.fetch_token")
@patch("requests_oauthlib.OAuth2Session.get") @patch("requests_oauthlib.OAuth2Session.get")
def test_oidc_callback_no_email(mock_get, mock_fetch_token, flask_client): def test_oidc_callback_no_email(mock_get, mock_fetch_token, flask_client):
mock_get.return_value = MockResponse(200, {}) mock_get.return_value = MockResponse(200, {})
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = None
session["oauth_state"] = "state" with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_callback"), url_for("auth.oidc_callback"),
@ -180,8 +254,10 @@ def test_oidc_callback_disabled_registration(mock_get, mock_fetch_token, flask_c
config.DISABLE_REGISTRATION = True config.DISABLE_REGISTRATION = True
email = random_string() email = random_string()
mock_get.return_value = MockResponse(200, {"email": email}) mock_get.return_value = MockResponse(200, {"email": email})
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = None
session["oauth_state"] = "state" with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
r = flask_client.get( r = flask_client.get(
url_for("auth.oidc_callback"), url_for("auth.oidc_callback"),
@ -198,8 +274,8 @@ def test_oidc_callback_disabled_registration(mock_get, mock_fetch_token, flask_c
assert mock_get.called assert mock_get.called
config.DISABLE_REGISTRATION = False config.DISABLE_REGISTRATION = False
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
@patch("requests_oauthlib.OAuth2Session.fetch_token") @patch("requests_oauthlib.OAuth2Session.fetch_token")
@ -213,8 +289,10 @@ def test_oidc_callback_registration(mock_get, mock_fetch_token, flask_client):
config.OIDC_NAME_FIELD: "name", config.OIDC_NAME_FIELD: "name",
}, },
) )
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = None
session["oauth_state"] = "state" with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
user = User.get_by(email=email) user = User.get_by(email=email)
assert user is None assert user is None
@ -237,8 +315,8 @@ def test_oidc_callback_registration(mock_get, mock_fetch_token, flask_client):
assert user is not None assert user is not None
assert user.email == email assert user.email == email
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
@patch("requests_oauthlib.OAuth2Session.fetch_token") @patch("requests_oauthlib.OAuth2Session.fetch_token")
@ -251,8 +329,10 @@ def test_oidc_callback_login(mock_get, mock_fetch_token, flask_client):
"email": email, "email": email,
}, },
) )
with flask_client.session_transaction() as session: config.OIDC_WELL_KNOWN_URL = None
session["oauth_state"] = "state" with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = None
sess["oauth_state"] = "state"
user = User.create( user = User.create(
email=email, email=email,
@ -277,8 +357,50 @@ def test_oidc_callback_login(mock_get, mock_fetch_token, flask_client):
assert expected_redirect_url == parsed.path assert expected_redirect_url == parsed.path
assert mock_get.called assert mock_get.called
with flask_client.session_transaction() as session: with flask_client.session_transaction() as sess:
session["oauth_state"] = None sess["oauth_state"] = None
@patch("requests_oauthlib.OAuth2Session.fetch_token")
@patch("requests_oauthlib.OAuth2Session.get")
def test_oidc_callback_login_with_next_url(mock_get, mock_fetch_token, flask_client):
email = random_string()
mock_get.return_value = MockResponse(
200,
{
"email": email,
},
)
config.OIDC_WELL_KNOWN_URL = None
with flask_client.session_transaction() as sess:
sess["oauth_redirect_next"] = "/dashboard/settings/"
sess["oauth_state"] = "state"
user = User.create(
email=email,
name="name",
password="",
activated=True,
)
user = User.get_by(email=email)
assert user is not None
r = flask_client.get(
url_for("auth.oidc_callback"),
follow_redirects=False,
)
location = r.headers.get("Location")
assert location is not None
parsed = parse_url(location)
expected_redirect_url = "/dashboard/settings/"
assert expected_redirect_url == parsed.path
assert mock_get.called
with flask_client.session_transaction() as sess:
sess["oauth_state"] = None
def test_create_user(): def test_create_user():

View File

@ -31,6 +31,7 @@ def test_get_alias_for_free_user_has_no_alias():
def test_get_alias_for_lifetime_with_null_hibp_date(): def test_get_alias_for_lifetime_with_null_hibp_date():
user = create_new_user() user = create_new_user()
user.lifetime = True user.lifetime = True
user.enable_data_breach_check = True
alias_id = Alias.create_new_random(user).id alias_id = Alias.create_new_random(user).id
Session.commit() Session.commit()
aliases = list( aliases = list(
@ -42,6 +43,7 @@ def test_get_alias_for_lifetime_with_null_hibp_date():
def test_get_alias_for_lifetime_with_old_hibp_date(): def test_get_alias_for_lifetime_with_old_hibp_date():
user = create_new_user() user = create_new_user()
user.lifetime = True user.lifetime = True
user.enable_data_breach_check = True
alias = Alias.create_new_random(user) alias = Alias.create_new_random(user)
alias.hibp_last_check = arrow.now().shift(days=-1) alias.hibp_last_check = arrow.now().shift(days=-1)
alias_id = alias.id alias_id = alias.id
@ -97,6 +99,7 @@ sub_generator_list = [
@pytest.mark.parametrize("sub_generator", sub_generator_list) @pytest.mark.parametrize("sub_generator", sub_generator_list)
def test_get_alias_for_sub(sub_generator): def test_get_alias_for_sub(sub_generator):
user = create_new_user() user = create_new_user()
user.enable_data_breach_check = True
sub_generator(user) sub_generator(user)
alias_id = Alias.create_new_random(user).id alias_id = Alias.create_new_random(user).id
Session.commit() Session.commit()
@ -140,3 +143,26 @@ def test_already_checked_is_not_checked():
cron.get_alias_to_check_hibp(arrow.now(), [user.id], alias_id, alias_id + 1) cron.get_alias_to_check_hibp(arrow.now(), [user.id], alias_id, alias_id + 1)
) )
assert len(aliases) == 0 assert len(aliases) == 0
def test_outed_in_user_is_checked():
user = create_new_user()
user.lifetime = True
user.enable_data_breach_check = True
alias_id = Alias.create_new_random(user).id
Session.commit()
aliases = list(
cron.get_alias_to_check_hibp(arrow.now(), [], alias_id, alias_id + 1)
)
assert len(aliases) == 1
def test_outed_out_user_is_not_checked():
user = create_new_user()
user.lifetime = True
alias_id = Alias.create_new_random(user).id
Session.commit()
aliases = list(
cron.get_alias_to_check_hibp(arrow.now(), [], alias_id, alias_id + 1)
)
assert len(aliases) == 0

View File

@ -51,6 +51,7 @@ FACEBOOK_CLIENT_SECRET=to_fill
# Login with OIDC # Login with OIDC
CONNECT_WITH_OIDC_ICON=fa-github CONNECT_WITH_OIDC_ICON=fa-github
# OIDC_WELL_KNOWN_URL=to_fill
OIDC_AUTHORIZATION_URL=to_fill OIDC_AUTHORIZATION_URL=to_fill
OIDC_USER_INFO_URL=to_fill OIDC_USER_INFO_URL=to_fill
OIDC_TOKEN_URL=to_fill OIDC_TOKEN_URL=to_fill