| | """ |
| | SPARKNET API Integration Tests - Phase 1B |
| | |
| | Comprehensive test suite for REST API endpoints: |
| | - Document API (/api/documents) |
| | - RAG API (/api/rag) |
| | - Auth API (/api/auth) |
| | - Health/Status endpoints |
| | |
| | Uses FastAPI TestClient for synchronous testing without running the server. |
| | """ |
| |
|
| | import pytest |
| | import json |
| | import io |
| | import os |
| | import sys |
| | from pathlib import Path |
| | from typing import Dict, Any, Optional |
| | from unittest.mock import patch, MagicMock, AsyncMock |
| |
|
| | |
| | sys.path.insert(0, str(Path(__file__).parent.parent.parent)) |
| |
|
| | from fastapi.testclient import TestClient |
| |
|
| |
|
| | |
| | |
| | |
| |
|
@pytest.fixture(scope="module")
def mock_components():
    """Build module-scoped mocks standing in for the SPARKNET components.

    Returns a dict with keys ``embeddings``, ``store``, ``llm_client`` and
    ``workflow`` whose call results mirror the shapes the API expects.
    """
    embeddings = MagicMock()
    # 1024-dim dummy vectors matching the production embedding size.
    embeddings.embed_documents.return_value = [[0.1] * 1024]
    embeddings.embed_query.return_value = [0.1] * 1024

    store = MagicMock()
    # MagicMock auto-creates `_collection`; just pin the count it reports.
    store._collection.count.return_value = 100
    store.search.return_value = []
    store.add_documents.return_value = ["doc_1"]

    llm_client = MagicMock()
    llm_client.generate.return_value = "Mock response"
    llm_client.get_llm.return_value = MagicMock()

    workflow = MagicMock()
    # `run` is awaited by the API, so it needs an AsyncMock.
    workflow.run = AsyncMock(return_value={
        "response": "Test response",
        "sources": [],
        "confidence": 0.9,
    })

    return {
        "embeddings": embeddings,
        "store": store,
        "llm_client": llm_client,
        "workflow": workflow,
    }
| |
|
| |
|
@pytest.fixture(scope="module")
def client(mock_components):
    """Create TestClient with mocked dependencies.

    ``patch.dict`` swaps the mocked components into ``api.main.app_state``
    before ``app`` is imported, so any startup wiring observes the mocks
    instead of initialising real services. The patch is reverted when the
    ``with`` block exits (i.e. after all tests in the module complete,
    since this fixture is module-scoped).
    """
    with patch.dict("api.main.app_state", {
        "start_time": 1000000,  # fixed value so uptime reporting is deterministic
        "embeddings": mock_components["embeddings"],
        "store": mock_components["store"],
        "llm_client": mock_components["llm_client"],
        "workflow": mock_components["workflow"],
        "rag_ready": True,  # advertise the RAG pipeline as available
        "workflows": {},
        "patents": {},
        "planner": MagicMock(),
        "critic": MagicMock(),
        "memory": MagicMock(),
        "vision_ocr": None,  # OCR deliberately disabled for these tests
    }):
        # Import inside the patch so the app sees the patched state on import.
        from api.main import app
        # Context-manager form runs the app's startup/shutdown lifespan events.
        with TestClient(app) as test_client:
            yield test_client
| |
|
| |
|
@pytest.fixture
def auth_headers(client) -> Dict[str, str]:
    """Return Authorization headers for a valid token, or {} when auth is unavailable."""
    resp = client.post(
        "/api/auth/token",
        data={"username": "admin", "password": "admin123"},
    )
    # Guard clause: no token means the auth subsystem is not configured here.
    if resp.status_code != 200:
        return {}
    return {"Authorization": f"Bearer {resp.json()['access_token']}"}
| |
|
| |
|
@pytest.fixture
def sample_pdf_file():
    """Create a sample PDF file for upload tests.

    Returns an in-memory ``io.BytesIO`` holding a minimal hand-written
    single-page PDF (catalog -> pages -> page -> one text content stream
    reading "Test Document"). NOTE(review): the xref byte offsets are
    approximate — lenient parsers accept this, a strict validator may
    not; confirm if strict PDF parsing is ever exercised.
    """
    pdf_content = b"""%PDF-1.4
1 0 obj << /Type /Catalog /Pages 2 0 R >> endobj
2 0 obj << /Type /Pages /Kids [3 0 R] /Count 1 >> endobj
3 0 obj << /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] /Contents 4 0 R >> endobj
4 0 obj << /Length 44 >> stream
BT /F1 12 Tf 100 700 Td (Test Document) Tj ET
endstream endobj
xref
0 5
0000000000 65535 f
0000000009 00000 n
0000000058 00000 n
0000000115 00000 n
0000000214 00000 n
trailer << /Size 5 /Root 1 0 R >>
startxref
306
%%EOF"""
    return io.BytesIO(pdf_content)
| |
|
| |
|
@pytest.fixture
def sample_text_file():
    """Create a sample text file for upload tests.

    Returns an in-memory ``io.BytesIO`` containing a short markdown-style
    document with three ``##`` sections, suitable for exercising the
    chunking/processing pipeline.
    """
    content = b"""SPARKNET Test Document

This is a sample document for testing the document processing pipeline.

## Section 1: Introduction
The SPARKNET framework provides AI-powered document intelligence.

## Section 2: Features
- Multi-agent RAG pipeline
- Table extraction
- Evidence grounding

## Section 3: Conclusion
This document tests the upload and processing functionality.
"""
    return io.BytesIO(content)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class TestHealthEndpoints:
    """Tests for the health and status endpoints."""

    def test_root_endpoint(self, client):
        """Root endpoint should report the service as operational."""
        resp = client.get("/")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "operational"
        assert body["service"] == "SPARKNET API"
        assert "version" in body

    def test_health_endpoint(self, client):
        """Health endpoint should report status for every core component."""
        resp = client.get("/api/health")
        assert resp.status_code == 200

        body = resp.json()
        for field in ("status", "components", "statistics", "uptime_seconds"):
            assert field in body

        # Each core component must appear in the components map.
        for component in ("rag", "embeddings", "vector_store", "llm_client"):
            assert component in body["components"]

    def test_status_endpoint(self, client):
        """Status endpoint should return comprehensive service info."""
        resp = client.get("/api/status")
        assert resp.status_code == 200

        body = resp.json()
        assert body["status"] == "operational"
        assert "statistics" in body
        assert "models" in body
|
| |
|
| | |
| | |
| | |
| |
|
class TestAuthEndpoints:
    """Tests for the authentication endpoints."""

    def test_get_token_valid_credentials(self, client):
        """A valid login should yield a bearer access token."""
        resp = client.post(
            "/api/auth/token",
            data={"username": "admin", "password": "admin123"},
        )

        # Auth may be unavailable in some environments; validate only on success.
        if resp.status_code == 200:
            body = resp.json()
            assert "access_token" in body
            assert body["token_type"] == "bearer"

    def test_get_token_invalid_credentials(self, client):
        """Bad credentials must not produce a token."""
        resp = client.post(
            "/api/auth/token",
            data={"username": "invalid", "password": "wrong"},
        )
        assert resp.status_code in (401, 500)

    def test_get_current_user(self, client, auth_headers):
        """/me should return the authenticated user's info."""
        if not auth_headers:
            pytest.skip("Auth not available")

        resp = client.get("/api/auth/me", headers=auth_headers)
        assert resp.status_code == 200
        assert "username" in resp.json()

    def test_protected_endpoint_without_token(self, client):
        """Protected endpoints must reject unauthenticated requests."""
        assert client.get("/api/auth/me").status_code == 401
| |
|
| |
|
| | |
| | |
| | |
| |
|
class TestDocumentEndpoints:
    """Tests for the document management endpoints."""

    def test_list_documents_empty(self, client):
        """Listing documents should return a JSON array even when empty."""
        resp = client.get("/api/documents")
        assert resp.status_code == 200
        assert isinstance(resp.json(), list)

    def test_upload_text_document(self, client, sample_text_file):
        """Uploading a text file should succeed and echo its metadata."""
        resp = client.post(
            "/api/documents/upload",
            files={"file": ("test.txt", sample_text_file, "text/plain")},
        )
        assert resp.status_code == 200

        body = resp.json()
        assert "document_id" in body
        assert body["filename"] == "test.txt"
        assert body["status"] in ("uploaded", "processing", "processed")

    def test_upload_pdf_document(self, client, sample_pdf_file):
        """Uploading a PDF should succeed and echo its metadata."""
        resp = client.post(
            "/api/documents/upload",
            files={"file": ("test.pdf", sample_pdf_file, "application/pdf")},
        )
        assert resp.status_code == 200

        body = resp.json()
        assert "document_id" in body
        assert body["filename"] == "test.pdf"

    def test_upload_unsupported_format(self, client):
        """Unsupported file types must be rejected with 400 or 415."""
        bogus = io.BytesIO(b"fake executable content")

        resp = client.post(
            "/api/documents/upload",
            files={"file": ("test.exe", bogus, "application/octet-stream")},
        )
        assert resp.status_code in (400, 415)

    def test_get_document_not_found(self, client):
        """Fetching a missing document must return 404."""
        assert client.get("/api/documents/nonexistent_id").status_code == 404

    def test_document_workflow(self, client, sample_text_file):
        """Exercise the full path: upload -> detail -> chunks -> index -> delete."""
        upload = client.post(
            "/api/documents/upload",
            files={"file": ("workflow_test.txt", sample_text_file, "text/plain")},
        )
        assert upload.status_code == 200
        doc_id = upload.json()["document_id"]

        assert client.get(f"/api/documents/{doc_id}/detail").status_code == 200
        assert client.get(f"/api/documents/{doc_id}/chunks").status_code == 200

        # Indexing may legitimately fail if the document isn't processed yet.
        assert client.post(f"/api/documents/{doc_id}/index").status_code in (200, 400, 422)

        assert client.delete(f"/api/documents/{doc_id}").status_code == 200
| |
|
| |
|
| | |
| | |
| | |
| |
|
class TestRAGEndpoints:
    """Tests for the RAG query and search endpoints."""

    def test_rag_query_basic(self, client):
        """A simple query should be accepted or fail with a server-side code."""
        resp = client.post(
            "/api/rag/query",
            json={"query": "What is SPARKNET?", "max_sources": 5},
        )

        # The RAG pipeline may be unavailable; any of these is acceptable.
        assert resp.status_code in (200, 500, 503)

        if resp.status_code == 200:
            body = resp.json()
            assert "response" in body or "error" in body

    def test_rag_query_with_filters(self, client):
        """Queries with document filters should be accepted."""
        resp = client.post(
            "/api/rag/query",
            json={
                "query": "Test query",
                "document_ids": ["doc_1", "doc_2"],
                "max_sources": 3,
                "min_confidence": 0.5,
            },
        )
        assert resp.status_code in (200, 500, 503)

    def test_rag_search_semantic(self, client):
        """Semantic search (no synthesis) should return a results payload."""
        resp = client.post(
            "/api/rag/search",
            json={"query": "document processing", "top_k": 10},
        )
        assert resp.status_code in (200, 500, 503)

        if resp.status_code == 200:
            body = resp.json()
            assert "results" in body or "error" in body

    def test_rag_store_status(self, client):
        """Vector store status endpoint should respond."""
        resp = client.get("/api/rag/store/status")
        assert resp.status_code in (200, 500)

        if resp.status_code == 200:
            assert "status" in resp.json()

    def test_rag_cache_stats(self, client):
        """Cache statistics endpoint should respond (may not exist)."""
        resp = client.get("/api/rag/cache/stats")
        assert resp.status_code in (200, 404, 500)

    def test_rag_query_empty_query(self, client):
        """Empty query strings must fail request validation with 422."""
        resp = client.post("/api/rag/query", json={"query": ""})
        assert resp.status_code == 422
| |
|
| |
|
| | |
| | |
| | |
| |
|
class TestDocumentProcessing:
    """Tests for the document processing functionality."""

    def test_process_document_endpoint(self, client, sample_text_file):
        """Triggering processing on a freshly uploaded document should be accepted."""
        upload = client.post(
            "/api/documents/upload",
            files={"file": ("process_test.txt", sample_text_file, "text/plain")},
        )
        if upload.status_code != 200:
            pytest.skip("Upload failed")

        doc_id = upload.json()["document_id"]

        # 202 = accepted for async processing; 400 = already processed/invalid.
        resp = client.post(f"/api/documents/{doc_id}/process")
        assert resp.status_code in (200, 202, 400)

    def test_batch_index_documents(self, client):
        """Batch indexing should respond deterministically even for unknown ids."""
        resp = client.post(
            "/api/documents/batch-index",
            json={"document_ids": ["doc_1", "doc_2", "doc_3"]},
        )
        assert resp.status_code in (200, 400, 404)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class TestErrorHandling:
    """Tests for API error handling on malformed requests."""

    def test_invalid_json_body(self, client):
        """Malformed JSON bodies should fail validation with 422."""
        resp = client.post(
            "/api/rag/query",
            content="not valid json",
            headers={"Content-Type": "application/json"},
        )
        assert resp.status_code == 422

    def test_missing_required_fields(self, client):
        """An empty JSON object is missing required fields and must 422."""
        resp = client.post("/api/rag/query", json={})
        assert resp.status_code == 422

    def test_invalid_document_id_format(self, client):
        """Hostile or oversized document ids must not resolve to a document."""
        # Path-traversal attempt must be rejected, not served.
        resp = client.get("/api/documents/../../etc/passwd")
        assert resp.status_code in (400, 404)

        # Excessively long id.
        resp = client.get(f"/api/documents/{'a' * 1000}")
        assert resp.status_code in (400, 404)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class TestConcurrency:
    """Tests for concurrent request handling."""

    def test_multiple_health_checks(self, client):
        """Ten parallel health checks across five workers should all succeed."""
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as pool:
            responses = list(pool.map(lambda _: client.get("/api/health"), range(10)))

        assert all(r.status_code == 200 for r in responses)

    def test_multiple_document_uploads(self, client):
        """Simultaneous uploads should each receive a definitive response."""
        import concurrent.futures

        def upload(idx):
            payload = io.BytesIO(f"Test content {idx}".encode())
            return client.post(
                "/api/documents/upload",
                files={"file": (f"test_{idx}.txt", payload, "text/plain")},
            )

        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
            responses = list(pool.map(upload, range(5)))

        # 500 is tolerated: uploads may race on shared state under load.
        assert all(r.status_code in (200, 500) for r in responses)
| |
|
| |
|
| | |
| | |
| | |
| |
|
class TestIntegrationWorkflows:
    """End-to-end integration workflows across multiple endpoints."""

    def test_document_to_rag_query_workflow(self, client, sample_text_file):
        """Upload a document, index it, query against it, then clean up."""
        upload = client.post(
            "/api/documents/upload",
            files={"file": ("integration_test.txt", sample_text_file, "text/plain")},
        )
        if upload.status_code != 200:
            pytest.skip("Upload failed, skipping workflow test")

        doc_id = upload.json()["document_id"]

        # The uploaded document must be retrievable by id.
        assert client.get(f"/api/documents/{doc_id}").status_code == 200

        # Skip the rest if indexing isn't available in this environment.
        if client.post(f"/api/documents/{doc_id}/index").status_code != 200:
            pytest.skip("Indexing not available")

        query = client.post(
            "/api/rag/query",
            json={
                "query": "What does this document contain?",
                "document_ids": [doc_id],
            },
        )
        assert query.status_code in (200, 500, 503)

        # Best-effort cleanup; the result is intentionally ignored.
        client.delete(f"/api/documents/{doc_id}")
| |
|
| |
|
| | |
| | |
| | |
| |
|
@pytest.mark.slow
class TestPerformance:
    """Performance tests (marked as slow)."""

    def test_large_document_upload(self, client):
        """A ~1 MB text upload should succeed or be rejected as too large (413)."""
        payload = io.BytesIO(b"Test content line\n" * 60000)

        resp = client.post(
            "/api/documents/upload",
            files={"file": ("large_test.txt", payload, "text/plain")},
        )
        assert resp.status_code in (200, 413)

    def test_rapid_query_requests(self, client):
        """Twenty consecutive queries should complete within 30 seconds."""
        import time

        start = time.time()
        responses = [
            client.post("/api/rag/query", json={"query": f"Test query {i}"})
            for i in range(20)
        ]
        elapsed = time.time() - start

        # Soft latency budget for the whole batch.
        assert elapsed < 30

        # At least 80% must yield a definitive (non-hung) response.
        handled = sum(1 for r in responses if r.status_code in (200, 500, 503))
        assert handled >= len(responses) * 0.8
| |
|
| |
|
| | |
| | |
| | |
| |
|
if __name__ == "__main__":
    # Allow running this module directly (without the pytest CLI):
    # verbose output, shortened tracebacks.
    pytest.main([__file__, "-v", "--tb=short"])
| |
|