Store hmaced partner api tokens (#1028)

* Store hmaced partner api tokens

* MR comments
This commit is contained in:
Carlos Quintana 2022-06-02 11:24:04 +02:00 committed by GitHub
parent 7ba9bcb9e2
commit dba56f0dae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 107 additions and 1 deletions

View File

@ -473,3 +473,6 @@ def setup_nameservers():
NAMESERVERS = setup_nameservers()
DISABLE_CREATE_CONTACTS_FOR_FREE_USERS = False
PARTNER_API_TOKEN_SECRET = os.environ.get("PARTNER_API_TOKEN_SECRET") or (
FLASK_SECRET + "partnerapitoken"
)

View File

@ -1,4 +1,9 @@
from __future__ import annotations
import base64
import enum
import hashlib
import hmac
import os
import random
import uuid
@ -18,6 +23,7 @@ from sqlalchemy import text, desc, CheckConstraint, Index, Column
from sqlalchemy.dialects.postgresql import TSVECTOR
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import deferred
from sqlalchemy.sql import and_
from sqlalchemy_utils import ArrowType
from app import s3
@ -37,6 +43,7 @@ from app.config import (
MAX_NB_DIRECTORY,
ROOT_DIR,
NOREPLY,
PARTNER_API_TOKEN_SECRET,
)
from app.db import Session
from app.errors import (
@ -3075,16 +3082,56 @@ class Partner(Base, ModelMixin):
name = sa.Column(sa.String(128), unique=True, nullable=False)
contact_email = sa.Column(sa.String(128), unique=True, nullable=False)
@staticmethod
def find_by_token(token: str) -> Optional[Partner]:
hmaced = PartnerApiToken.hmac_token(token)
res = (
Session.query(Partner, PartnerApiToken)
.filter(
and_(
PartnerApiToken.token == hmaced,
Partner.id == PartnerApiToken.partner_id,
)
)
.first()
)
if res:
partner, partner_api_token = res
return partner
return None
class PartnerApiToken(Base, ModelMixin):
__tablename__ = "partner_api_token"
token = sa.Column(sa.String(32), unique=True, nullable=False, index=True)
token = sa.Column(sa.String(50), unique=True, nullable=False, index=True)
partner_id = sa.Column(
sa.ForeignKey("partner.id", ondelete="cascade"), nullable=False, index=True
)
expiration_time = sa.Column(ArrowType, unique=False, nullable=True)
@staticmethod
def generate(
partner_id: int, expiration_time: Optional[ArrowType]
) -> Tuple[PartnerApiToken, str]:
raw_token = random_string(32)
encoded = PartnerApiToken.hmac_token(raw_token)
instance = PartnerApiToken.create(
token=encoded, partner_id=partner_id, expiration_time=expiration_time
)
return instance, raw_token
@staticmethod
def hmac_token(token: str) -> str:
as_str = base64.b64encode(
hmac.new(
PARTNER_API_TOKEN_SECRET.encode("utf-8"),
token.encode("utf-8"),
hashlib.sha3_256,
).digest()
).decode("utf-8")
return as_str.rstrip("=")
class PartnerUser(Base, ModelMixin):
__tablename__ = "partner_user"

View File

@ -186,3 +186,4 @@ ALLOWED_REDIRECT_DOMAINS=[]
# DNS nameservers to be used by the app
# Multiple nameservers can be specified, separated by ','
NAMESERVERS="1.1.1.1"
PARTNER_API_TOKEN_SECRET="changeme"

View File

@ -0,0 +1,30 @@
"""update partner_api_token token length
Revision ID: 2b1d3cd93e4b
Revises: 088f23324464
Create Date: 2022-05-25 16:43:33.017076
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2b1d3cd93e4b'
down_revision = '088f23324464'
branch_labels = None
depends_on = None
def upgrade():
op.alter_column('partner_api_token', 'token',
existing_type=sa.String(length=32),
type_=sa.String(length=50),
nullable=False)
def downgrade():
op.alter_column('partner_api_token', 'token',
existing_type=sa.String(length=50),
type_=sa.String(length=32),
nullable=False)

View File

@ -0,0 +1,25 @@
from app.models import Partner, PartnerApiToken
from app.utils import random_string
def test_generate_partner_api_token(flask_client):
partner = Partner.create(
name=random_string(10),
contact_email="{s}@{s}.com".format(s=random_string(10)),
commit=True,
)
partner_api_token, token = PartnerApiToken.generate(partner.id, None)
assert token is not None
assert len(token) > 0
assert partner_api_token.partner_id == partner.id
assert partner_api_token.expiration_time is None
hmaced = PartnerApiToken.hmac_token(token)
assert hmaced == partner_api_token.token
retrieved_partner = Partner.find_by_token(token)
assert retrieved_partner is not None
assert retrieved_partner.id == partner.id