Add support for allowed redirect domains

This commit is contained in:
Carlos Quintana 2022-02-16 09:38:55 +01:00
parent 39222cf868
commit a44acf1846
No known key found for this signature in database
GPG Key ID: 9A3A2DE1C3E2A4B1
5 changed files with 47 additions and 9 deletions

View File

@ -418,3 +418,5 @@ PHONE_PROVIDER_2_SECRET = os.environ.get("PHONE_PROVIDER_2_SECRET")
ZENDESK_HOST = os.environ.get("ZENDESK_HOST") ZENDESK_HOST = os.environ.get("ZENDESK_HOST")
ZENDESK_API_TOKEN = os.environ.get("ZENDESK_API_TOKEN") ZENDESK_API_TOKEN = os.environ.get("ZENDESK_API_TOKEN")
ZENDESK_ENABLED = "ZENDESK_ENABLED" in os.environ ZENDESK_ENABLED = "ZENDESK_ENABLED" in os.environ
ALLOWED_REDIRECT_DOMAINS = sl_getenv("ALLOWED_REDIRECT_DOMAINS", list) or []

View File

@ -3,11 +3,11 @@ import string
import time import time
import urllib.parse import urllib.parse
from functools import wraps from functools import wraps
from typing import Optional from typing import List, Optional
from unidecode import unidecode from unidecode import unidecode
from .config import WORDS_FILE_PATH from .config import WORDS_FILE_PATH, ALLOWED_REDIRECT_DOMAINS
from .log import LOG from .log import LOG
with open(WORDS_FILE_PATH) as f: with open(WORDS_FILE_PATH) as f:
@ -75,12 +75,37 @@ def sanitize_email(email_address: str, not_lower=False) -> str:
return email_address return email_address
class NextUrlSanitizer:
def __init__(self, allowed_domains: List[str]):
self.allowed_domains = allowed_domains
def sanitize(self, url: Optional[str]) -> Optional[str]:
if not url:
return None
# Relative redirect
if url[0] == "/":
return url
return self.__handle_absolute_redirect(url)
def __handle_absolute_redirect(self, url: str) -> Optional[str]:
if not self.__is_absolute_url(url):
# Unknown url, something like &next=something.example.com
return None
parsed = urllib.parse.urlparse(url)
if parsed.hostname in self.allowed_domains:
return url
# Not allowed domain
return None
def __is_absolute_url(self, url: str) -> bool:
return url.startswith(
("http://", "https://", "http%3A%2F%2F", "https%3A%2F%2F")
)
def sanitize_next_url(url: Optional[str]) -> Optional[str]: def sanitize_next_url(url: Optional[str]) -> Optional[str]:
if not url: sanitizer = NextUrlSanitizer(ALLOWED_REDIRECT_DOMAINS)
return None return sanitizer.sanitize(url)
if url[0] != "/":
return None
return url
def query2str(query): def query2str(query):

View File

@ -171,4 +171,7 @@ DISABLE_ONBOARDING=true
# TEMP_DIR = /tmp # TEMP_DIR = /tmp
#ALIAS_AUTOMATIC_DISABLE=true #ALIAS_AUTOMATIC_DISABLE=true
# domains that can be present in the &next= section when using absolute urls
ALLOWED_REDIRECT_DOMAINS=[]

View File

@ -51,4 +51,5 @@ FACEBOOK_CLIENT_SECRET=to_fill
PGP_SENDER_PRIVATE_KEY_PATH=local_data/private-pgp.asc PGP_SENDER_PRIVATE_KEY_PATH=local_data/private-pgp.asc
ALIAS_AUTOMATIC_DISABLE=true ALIAS_AUTOMATIC_DISABLE=true
ALLOWED_REDIRECT_DOMAINS=["test.simplelogin.local"]

View File

@ -1,3 +1,4 @@
from app.config import ALLOWED_REDIRECT_DOMAINS
from app.utils import random_string, random_words, sanitize_next_url from app.utils import random_string, random_words, sanitize_next_url
@ -15,12 +16,18 @@ def test_sanitize_url():
cases = [ cases = [
{"url": None, "expected": None}, {"url": None, "expected": None},
{"url": "", "expected": None}, {"url": "", "expected": None},
{"url": "http://unknown.domain", "expected": None},
{"url": "https://badzone.org", "expected": None}, {"url": "https://badzone.org", "expected": None},
{"url": "/", "expected": "/"}, {"url": "/", "expected": "/"},
{"url": "/auth", "expected": "/auth"}, {"url": "/auth", "expected": "/auth"},
{"url": "/some/path", "expected": "/some/path"}, {"url": "/some/path", "expected": "/some/path"},
] ]
for domain in ALLOWED_REDIRECT_DOMAINS:
cases.append({"url": f"http://{domain}", "expected": f"http://{domain}"})
cases.append({"url": f"https://{domain}", "expected": f"https://{domain}"})
cases.append({"url": domain, "expected": None})
for case in cases: for case in cases:
res = sanitize_next_url(case["url"]) res = sanitize_next_url(case["url"])
assert res == case["expected"] assert res == case["expected"]