Description
I created this minimal reproducible example (MRE) so that someone can help me fix this test. I tried to work it out myself, but couldn't figure out from the documentation how to test a scheduled task.
- pyproject.toml
[project]
name = "test"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
    "faststream[rabbit,rabbitmq]>=0.6.5",
    "taskiq-faststream[otel,rabbit]>=0.4.0",
    "ipdb>=0.13.13",
    "pytest>=9.0.2",
    "pytest-asyncio>=1.3.0",
    "freezegun>=1.5.5",
]
- Scheduler + test
from datetime import UTC, datetime, timedelta

import pytest
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from freezegun import freeze_time
from taskiq_faststream.types import ScheduledTask
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_faststream import BrokerWrapper, StreamScheduler

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
taskiq_broker = BrokerWrapper(broker)


@broker.subscriber('test')
async def return_hello():
    return 'hello'


midnight_schedule = [ScheduledTask(cron='0 0 * * *')]
scheduled_task = taskiq_broker.task(queue='test', schedule=midnight_schedule)

scheduler = StreamScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)


def get_today_midnight() -> datetime:
    today = datetime.now(tz=UTC)
    return today.replace(hour=0, minute=0, second=0, microsecond=0)


@pytest.mark.asyncio
async def test_task():
    async with TestRabbitBroker(broker) as br:
        midnight = get_today_midnight()
        with freeze_time(midnight) as frozen_time:
            await scheduler.startup()
            next_midnight = midnight + timedelta(hours=1)
            frozen_time.move_to(next_midnight)
        return_hello.mock.assert_called()
- Pytest output:
E AssertionError: Expected 'mock' to have been called.
- One thing I tried was passing taskiq_broker to TestRabbitBroker instead of broker, but that didn't work either and raises a different error:
Pytest output:
E AttributeError: <taskiq_faststream.broker.BrokerWrapper object at 0x7c554cc217f0> does not have the attribute '_channel'
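A side note on the time arithmetic in the test, as a stdlib-only sketch. It does not touch taskiq or FastStream, the fixed date is an arbitrary assumption, and `get_midnight` is a hypothetical variant of `get_today_midnight` that takes the current time as an argument. It shows that `midnight + timedelta(hours=1)` lands on 01:00 of the same day, whereas the next wall-clock time matching `0 0 * * *` after the frozen midnight is 24 hours later. Whether this is related to the failing assertion is not confirmed.

```python
from datetime import datetime, timedelta, timezone


def get_midnight(now: datetime) -> datetime:
    """Return 00:00:00 on the same calendar day as `now`."""
    return now.replace(hour=0, minute=0, second=0, microsecond=0)


# Fixed example date, chosen only for illustration (an assumption).
now = datetime(2025, 1, 15, 13, 30, tzinfo=timezone.utc)
midnight = get_midnight(now)

# The value the test calls `next_midnight`: one hour after midnight.
one_hour_later = midnight + timedelta(hours=1)

# The next moment with hour == 0 and minute == 0 after `midnight`.
next_cron_match = midnight + timedelta(days=1)

print(one_hour_later.isoformat())   # 2025-01-15T01:00:00+00:00
print(next_cron_match.isoformat())  # 2025-01-16T00:00:00+00:00
```

So the name `next_midnight` in the test may be misleading: the frozen clock moves from 00:00 to 01:00 and never reaches the following midnight.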