Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ build:
python -m build

test:
python test.py
python -m unittest
71 changes: 68 additions & 3 deletions alfred/rest/jobs/v1.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Native imports
from typing import Text
import json
from typing import Any, Dict, Text

# Project imports
from alfred.rest.jobs.typed import CreateJobDict
Expand Down Expand Up @@ -29,7 +30,7 @@ def get(self, job_id: Text):
- job_id: Unique identifier of the Job.
"""
parsed_resp, _ = self.http_client.get(f"/api/job/detail/{job_id}")
return parsed_resp
return self.__normalize_job_response(parsed_resp)

def get_all(self, page_size: int = None, current_page: int = None):
"""
Expand All @@ -45,4 +46,68 @@ def get_all(self, page_size: int = None, current_page: int = None):
if current_page:
params["currentPage"] = current_page
parsed_resp, _ = self.http_client.get("/api/job/all", params=params)
return parsed_resp
return self.__normalize_job_response(parsed_resp)

def __normalize_job_response(self, payload: Any):
"""
Normalize job payloads returned by job endpoints.

Supports direct job objects as well as wrapped responses where jobs
are returned under a `result` key.
"""
if isinstance(payload, list):
return [self.__normalize_job(item) for item in payload]

if not isinstance(payload, dict):
return payload

response = dict(payload)
result = response.get("result")

if isinstance(result, list):
response["result"] = [self.__normalize_job(item) for item in result]
return response

if isinstance(result, dict):
response["result"] = self.__normalize_job(result)
return response

return self.__normalize_job(response)

def __normalize_job(self, job: Any):
"""
Normalize a single job object and coerce its metadata to a dictionary.
"""
if not isinstance(job, dict):
return job

normalized_job = dict(job)
normalized_job["metadata"] = self.__normalize_metadata(
normalized_job.get("metadata")
)
return normalized_job

@staticmethod
def __normalize_metadata(metadata: Any) -> Dict[str, Any]:
"""
Convert upstream job metadata into a dictionary when possible.

Metadata may arrive as a JSON-encoded string, an already parsed
dictionary, or an empty/invalid value. Non-dictionary results are
normalized to an empty dictionary.
"""
if isinstance(metadata, dict):
return metadata

if not metadata or not isinstance(metadata, str):
return {}

try:
parsed_metadata = json.loads(metadata)
except (TypeError, ValueError, json.JSONDecodeError):
return {}

if isinstance(parsed_metadata, dict):
return parsed_metadata

return {}
10 changes: 0 additions & 10 deletions test.py

This file was deleted.

94 changes: 49 additions & 45 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,90 +1,94 @@
import unittest
import json
import os
import unittest
from pathlib import Path

from alfred.base.config import Configuration
from alfred.http.typed import AuthConfiguration, HttpConfiguration
from alfred.http.http_client import HttpClient
from alfred.http.typed import AuthConfiguration, HttpConfiguration
from alfred.rest.files.typed import UploadLocalFilePayload
from alfred.rest.files.v1 import Files
from alfred.rest.jobs.typed import CreateJobDict
from alfred.rest.sessions import SessionsFactory
from alfred.rest.jobs.v1 import Jobs
from alfred.rest.sessions import SessionsFactory


class TestMain(unittest.TestCase):
_alfred_api_key = os.getenv("ALFRED_API_KEY", "").strip()
_alfred_base_url = os.getenv("ALFRED_BASE_URL", "https://app.tagshelf.com").strip()
_alfred_test_file_id = os.getenv("ALFRED_TEST_FILE_ID", "").strip()
_alfred_test_upload_file = os.getenv("ALFRED_TEST_UPLOAD_FILE", "").strip()

@classmethod
def setUpClass(cls):
if not cls._alfred_api_key:
raise unittest.SkipTest(
"Set ALFRED_API_KEY to run integration tests."
)

cls._config = Configuration.v1({"base_url": cls._alfred_base_url})
cls._auth_config = AuthConfiguration(api_key=cls._alfred_api_key)
cls._http_config = HttpConfiguration({"timeout": 10})
cls._http_client = HttpClient(
cls._config.get("base_url"), cls._auth_config, cls._http_config
)
cls._session_factory = SessionsFactory.create(
cls._config.get("version", 1), cls._http_client
)

# Configuration for integration tests
_alfred_api_key = "" # Insert your API key here <--------------------
_alfred_base_url = "https://app.tagshelf.com"

if not _alfred_api_key.strip():
raise ValueError("ALFRED_API_KEY must be set for integration tests.")

# Local test configuration
_config = Configuration.v1({"base_url": _alfred_base_url})
_auth_config = AuthConfiguration(
api_key= _alfred_api_key
)
_http_config = HttpConfiguration({"timeout": 10})
_http_client = HttpClient(_config.get("base_url"), _auth_config, _http_config)
_session_factory = SessionsFactory.create(_config.get("version", 1), _http_client)

"""
To run this test you can execute the following command:
python -m unittest tests.TestMain.test_get_file
"""
def test_get_file(self):
"""
Test case for getting a file by ID.
Run with:
python -m unittest tests.TestMain.test_get_file
"""
file_id = "" # Insert your file ID here <--------------------
if not self._alfred_test_file_id:
self.skipTest("Set ALFRED_TEST_FILE_ID to run this test.")

if not file_id.strip():
raise ValueError("You must set a valid file_id to run this test.")

file_service = Files(self._http_client)

file_response = file_service.get(file_id)
file_response = file_service.get(self._alfred_test_file_id)
file_string_response = json.dumps(file_response, indent=2)

print(f"File Response: \n{file_string_response}")

"""
To run this test you can execute the following command:
python -m unittest tests.TestMain.test_upload_file
"""
def test_upload_file(self):
"""
Test case for uploading a local file.
Run with:
python -m unittest tests.TestMain.test_upload_file
"""
fileService = Files(self._http_client)
if not self._alfred_test_upload_file:
self.skipTest("Set ALFRED_TEST_UPLOAD_FILE to run this test.")

file_path = Path(self._alfred_test_upload_file)
if not file_path.exists():
self.skipTest(f"Upload file does not exist: {file_path}")

file_service = Files(self._http_client)
raw_session = self._session_factory.create()
session_id = raw_session.get("session_id")

print(f"Session ID: {session_id}")

file_path = Path(__file__).parent / "test_files" / "file-name.jpeg"
with file_path.open("rb") as file:

payload: UploadLocalFilePayload = {
"file": file,
"filename": "file-name",
"session_id": session_id
"filename": file_path.name,
"session_id": session_id,
}
upload_response = fileService.upload_file(payload)

upload_response = file_service.upload_file(payload)
upload_string_response = json.dumps(upload_response, indent=2)
print(f"File Upload Response: \n{upload_string_response}")

job_service = Jobs(self._http_client)

payload: CreateJobDict = {
"session_id": session_id,
"channel": "test"
"channel": "test",
}

job_response = job_service.create(payload)
job_string_response = json.dumps(job_response, indent=2)
print(f"Job Creation Response: \n{job_string_response}")

if __name__ == '__main__':

if __name__ == "__main__":
unittest.main()
96 changes: 96 additions & 0 deletions tests/test_smoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import unittest

from alfred.rest.jobs.v1 import Jobs


class FakeHttpClient:
def __init__(self, post_response=None, get_response=None):
self.post_response = post_response
self.get_response = get_response

def post(self, *_args, **_kwargs):
return self.post_response, None

def get(self, *_args, **_kwargs):
return self.get_response, None


class TestJobsSmoke(unittest.TestCase):
def test_create_returns_raw_job_creation_response(self):
jobs = Jobs(
FakeHttpClient(
post_response={
"id": "job-1",
}
)
)

result = jobs.create({})

self.assertEqual(result, {"id": "job-1"})

def test_get_normalizes_invalid_job_metadata_to_empty_dict(self):
jobs = Jobs(
FakeHttpClient(
get_response={
"id": "job-1",
"metadata": "not-json",
}
)
)

result = jobs.get("job-1")

self.assertEqual(result.get("metadata"), {})

def test_get_normalizes_wrapped_single_job_metadata(self):
jobs = Jobs(
FakeHttpClient(
get_response={
"result": {
"id": "job-1",
"metadata": "{\"metadata_key\": \"metadata_value\"}",
}
}
)
)

result = jobs.get("job-1")

self.assertEqual(
result.get("result"),
{"id": "job-1", "metadata": {"metadata_key": "metadata_value"}},
)

def test_get_all_normalizes_metadata_in_paginated_results(self):
jobs = Jobs(
FakeHttpClient(
get_response={
"result": [
{
"id": "job-1",
"metadata": "{\"metadata_key\": \"metadata_value\"}",
},
{
"id": "job-2",
"metadata": "",
},
],
"total": 2,
}
)
)

result = jobs.get_all()

self.assertEqual(
result.get("result"),
[
{"id": "job-1", "metadata": {"metadata_key": "metadata_value"}},
{"id": "job-2", "metadata": {}},
],
)


if __name__ == "__main__":
unittest.main()
Loading