Question: How can I test my scheduler correctly? #129

@azmovi

I created this minimal reproducible example (MRE) in the hope that someone can help me fix this test. I tried, but I couldn't work out from the documentation how to do it.

  • pyproject.toml
[project]
name = "test"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
    "faststream[rabbit,rabbitmq]>=0.6.5",
    "taskiq-faststream[otel,rabbit]>=0.4.0",
    "ipdb>=0.13.13",
    "pytest>=9.0.2",
    "pytest-asyncio>=1.3.0",
    "freezegun>=1.5.5",
]
  • Scheduler + test
from datetime import UTC, datetime, timedelta
import pytest
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from freezegun import freeze_time
from taskiq_faststream.types import ScheduledTask
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler


broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
taskiq_broker = BrokerWrapper(broker)


@broker.subscriber('test')
async def return_hello():
    return 'hello'

midnight_schedule = [ScheduledTask(cron='0 0 * * *')]
scheduled_task = taskiq_broker.task(queue='test', schedule=midnight_schedule)

scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)

def get_today_midnight() -> datetime:
    today = datetime.now(tz=UTC)
    return today.replace(hour=0, minute=0, second=0, microsecond=0)



@pytest.mark.asyncio
async def test_task():
    async with TestRabbitBroker(broker) as br:
        midnight = get_today_midnight()
        with freeze_time(midnight) as frozen_time:
            await scheduler.startup()
            next_midnight = midnight + timedelta(hours=1)
            frozen_time.move_to(next_midnight)
            return_hello.mock.assert_called()
  • Pytest output: E AssertionError: Expected 'mock' to have been called.

  • One thing I tried was replacing broker with taskiq_broker, but that didn't work either and raised a different error:
    Pytest output: E AttributeError: <taskiq_faststream.broker.BrokerWrapper object at 0x7c554cc217f0> does not have the attribute '_channel'
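
A note on the time arithmetic in the test: the variable named next_midnight is computed as midnight + timedelta(hours=1), which is 01:00 on the same day rather than the next 00:00. If the intent is to move the frozen clock to the next firing of the '0 0 * * *' cron, the date calculation could look roughly like the sketch below. It uses only the standard library, and the helper names get_midnight and get_next_midnight are hypothetical (they are not part of taskiq, taskiq-faststream, or FastStream).

```python
from datetime import datetime, timedelta, timezone

# timezone.utc is the same object as datetime.UTC on Python 3.11+.
UTC = timezone.utc


def get_midnight(now: datetime) -> datetime:
    """Return 00:00 of the day that `now` falls in (hypothetical helper)."""
    return now.replace(hour=0, minute=0, second=0, microsecond=0)


def get_next_midnight(now: datetime) -> datetime:
    """Return 00:00 of the following day (hypothetical helper).

    For a '0 0 * * *' cron evaluated in UTC, this is the first tick
    strictly after today's midnight.
    """
    return get_midnight(now) + timedelta(days=1)


# Fixed example timestamp so the result does not depend on the wall clock.
now = datetime(2025, 1, 15, 13, 45, tzinfo=UTC)
midnight = get_midnight(now)

one_hour_later = midnight + timedelta(hours=1)  # what the test currently computes
next_midnight = get_next_midnight(now)          # what the variable name suggests

print(one_hour_later)
print(next_midnight)
```

This only covers the date arithmetic. Whether the scheduler actually dispatches the task while the clock is frozen is a separate question that this sketch does not answer.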
