2024-10-17 11:16:33 +02:00
|
|
|
import arrow
|
|
|
|
|
2024-09-10 11:05:24 +02:00
|
|
|
from app import config, alias_utils
|
|
|
|
from app.db import Session
|
|
|
|
from app.events.event_dispatcher import GlobalDispatcher
|
2024-10-17 11:16:33 +02:00
|
|
|
from app.models import Alias, SyncEvent
|
2024-09-10 11:05:24 +02:00
|
|
|
from tests.utils import random_token
|
|
|
|
from .event_test_utils import (
|
|
|
|
OnMemoryDispatcher,
|
|
|
|
_create_linked_user,
|
2024-10-04 15:17:59 +02:00
|
|
|
_get_event_from_string,
|
2024-09-10 11:05:24 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
# Shared dispatcher instance for this module: setup_module() installs it as the
# global dispatcher, setup_function() clears it before each test, and the tests
# inspect its `memory` list to see which events were dispatched.
on_memory_dispatcher = OnMemoryDispatcher()
|
|
|
|
|
|
|
|
|
|
|
|
def setup_module():
    """Install the in-memory dispatcher and set a webhook URL for this module.

    Runs once before the tests in this module (pytest xunit-style hook).
    """
    # Events dispatched during the tests are recorded by on_memory_dispatcher.
    GlobalDispatcher.set_dispatcher(on_memory_dispatcher)
    # NOTE(review): presumably a webhook must be configured for events to be
    # dispatched at all -- confirm against the event dispatcher implementation.
    config.EVENT_WEBHOOK = "http://test"
|
|
|
|
|
|
|
|
|
|
|
|
def teardown_module():
    """Reset what setup_module() changed: global dispatcher and webhook URL.

    Runs once after the tests in this module (pytest xunit-style hook).
    """
    # Both values are reset to None rather than to whatever was set before.
    GlobalDispatcher.set_dispatcher(None)
    config.EVENT_WEBHOOK = None
|
|
|
|
|
|
|
|
|
|
|
|
def setup_function(func):
    """Clear the in-memory dispatcher before each test function.

    `func` is the test about to run (supplied by pytest); it is not used here.
    """
    on_memory_dispatcher.clear()
|
|
|
|
|
|
|
|
|
2024-10-17 11:16:33 +02:00
|
|
|
def test_event_taken_updates():
    """A new event has no taken_time; mark_as_taken() succeeds and sets it."""
    payload = "test".encode("utf-8")
    sync_event = SyncEvent.create(content=payload, flush=True)
    assert sync_event.taken_time is None

    was_taken = sync_event.mark_as_taken()
    assert was_taken
    assert sync_event.taken_time is not None
|
|
|
|
|
|
|
|
|
|
|
|
def test_event_mark_as_taken_does_nothing_for_taken_events():
    """mark_as_taken() reports failure for an event that already has a taken_time."""
    taken_at = arrow.utcnow()
    payload = "test".encode("utf-8")
    sync_event = SyncEvent.create(content=payload, taken_time=taken_at, flush=True)

    was_taken = sync_event.mark_as_taken()
    assert not was_taken
|
|
|
|
|
|
|
|
|
|
|
|
def test_event_mark_as_taken_does_nothing_for_not_before_events():
    """An event taken *after* the allow_taken_older_than cutoff is not re-taken."""
    taken_at = arrow.utcnow()
    payload = "test".encode("utf-8")
    sync_event = SyncEvent.create(content=payload, taken_time=taken_at, flush=True)

    # Cutoff is one minute before the event's taken_time.
    cutoff = taken_at.shift(minutes=-1)
    was_taken = sync_event.mark_as_taken(allow_taken_older_than=cutoff)
    assert not was_taken
|
|
|
|
|
|
|
|
|
|
|
|
def test_event_mark_as_taken_works_for_before_events():
    """An event taken *before* the allow_taken_older_than cutoff can be re-taken."""
    taken_at = arrow.utcnow()
    payload = "test".encode("utf-8")
    sync_event = SyncEvent.create(content=payload, taken_time=taken_at, flush=True)

    # Cutoff is one minute after the event's taken_time.
    cutoff = taken_at.shift(minutes=1)
    was_taken = sync_event.mark_as_taken(allow_taken_older_than=cutoff)
    assert was_taken
|
|
|
|
|
|
|
|
|
2024-09-10 11:05:24 +02:00
|
|
|
def test_fire_event_on_alias_creation():
    """Creating an alias without a note dispatches one alias_created event."""
    user, partner_user = _create_linked_user()
    alias = Alias.create_new_random(user)
    Session.flush()

    # Exactly one event must have been recorded by the in-memory dispatcher.
    dispatched = on_memory_dispatcher.memory
    assert len(dispatched) == 1

    event_content = _get_event_from_string(dispatched[0], user, partner_user)
    assert event_content.alias_created is not None

    created = event_content.alias_created
    assert created.id == alias.id
    assert created.email == alias.email
    # No note was passed at creation, so the event carries an empty string.
    assert created.note == ""
    assert created.enabled == alias.enabled
    assert created.created_at == int(alias.created_at.timestamp)
|
2024-09-10 11:05:24 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_fire_event_on_alias_creation_with_note():
    """Creating an alias with a note dispatches one alias_created event carrying it."""
    user, partner_user = _create_linked_user()
    expected_note = random_token(10)
    alias = Alias.create_new_random(user, note=expected_note)
    Session.flush()

    # Exactly one event must have been recorded by the in-memory dispatcher.
    dispatched = on_memory_dispatcher.memory
    assert len(dispatched) == 1

    event_content = _get_event_from_string(dispatched[0], user, partner_user)
    assert event_content.alias_created is not None

    created = event_content.alias_created
    assert created.id == alias.id
    assert created.email == alias.email
    assert created.note == expected_note
    assert created.enabled == alias.enabled
|
|
|
|
|
|
|
|
|
|
|
|
def test_fire_event_on_alias_deletion():
    """Deleting an alias dispatches exactly one alias_deleted event.

    The event is expected to carry the id and email of the deleted alias.
    """
    (user, pu) = _create_linked_user()
    alias = Alias.create_new_random(user)
    # Snapshot the fields we assert on *before* deleting: once the alias has
    # been deleted, the assertions should not depend on reading attributes
    # from an ORM instance whose row no longer exists.
    alias_id = alias.id
    alias_email = alias.email
    Session.flush()
    # Drop whatever was dispatched so far so only the deletion is inspected.
    on_memory_dispatcher.clear()

    alias_utils.delete_alias(alias, user)

    assert len(on_memory_dispatcher.memory) == 1
    event_data = on_memory_dispatcher.memory[0]
    event_content = _get_event_from_string(event_data, user, pu)
    assert event_content.alias_deleted is not None
    alias_deleted = event_content.alias_deleted
    assert alias_id == alias_deleted.id
    assert alias_email == alias_deleted.email
|
2024-09-10 11:05:24 +02:00
|
|
|
|
|
|
|
|
|
|
|
def test_fire_event_on_alias_status_change():
    """change_alias_status(enabled=True) dispatches one alias_status_change event."""
    user, partner_user = _create_linked_user()
    alias = Alias.create_new_random(user)
    Session.flush()
    # Drop whatever was dispatched so far so only the status change is inspected.
    on_memory_dispatcher.clear()

    alias_utils.change_alias_status(alias, enabled=True)

    dispatched = on_memory_dispatcher.memory
    assert len(dispatched) == 1

    event_content = _get_event_from_string(dispatched[0], user, partner_user)
    assert event_content.alias_status_change is not None

    status_change = event_content.alias_status_change
    assert status_change.id == alias.id
    assert status_change.email == alias.email
    assert status_change.created_at == int(alias.created_at.timestamp)
    assert status_change.enabled
|