import asyncio
import os
import uuid

import pytest
from fastapi.datastructures import UploadFile

from r2r import (
    Document,
    KVLoggingSingleton,
    R2RConfig,
    R2REngine,
    R2RPipeFactory,
    R2RPipelineFactory,
    R2RProviderFactory,
    VectorSearchSettings,
    generate_id_from_label,
)
from r2r.base.abstractions.llm import GenerationConfig


@pytest.fixture(scope="session", autouse=True)
def event_loop_policy():
    asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())


@pytest.fixture(scope="function")
def event_loop():
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()
    asyncio.set_event_loop(None)


@pytest.fixture(scope="session", autouse=True)
async def cleanup_tasks():
    # Cancel any tasks still pending at the end of the session.
    yield
    for task in asyncio.all_tasks():
        if task is not asyncio.current_task():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass


@pytest.fixture(scope="function")
def app(request):
    # Build an R2REngine against the vector DB provider selected by the
    # test parametrization, using a unique logging path per test.
    config = R2RConfig.from_json()
    config.logging.provider = "local"
    config.logging.logging_path = uuid.uuid4().hex

    vector_db_provider = request.param
    if vector_db_provider == "pgvector":
        config.vector_database.provider = "pgvector"
        config.vector_database.extra_fields["vecs_collection"] = (
            config.logging.logging_path
        )

    try:
        providers = R2RProviderFactory(config).create_providers()
        pipes = R2RPipeFactory(config, providers).create_pipes()
        pipelines = R2RPipelineFactory(config, pipes).create_pipelines()

        r2r = R2REngine(
            config=config,
            providers=providers,
            pipelines=pipelines,
        )

        try:
            KVLoggingSingleton.configure(config.logging)
        except Exception:
            # The singleton may already be configured; just point it at
            # this test's logging path.
            KVLoggingSingleton._config.logging_path = (
                config.logging.logging_path
            )

        yield r2r
    finally:
        # Remove the per-test log file.
        if os.path.exists(config.logging.logging_path):
            os.remove(config.logging.logging_path)


@pytest.fixture
def logging_connection():
    return KVLoggingSingleton()


@pytest.mark.parametrize("app", ["pgvector"], indirect=True)
@pytest.mark.asyncio
async def test_ingest_txt_document(app, logging_connection):
    try:
        await app.aingest_documents(
            [
                Document(
                    id=generate_id_from_label("doc_1"),
                    data="The quick brown fox jumps over the lazy dog.",
                    type="txt",
                    metadata={"author": "John Doe"},
                ),
            ]
        )
    except asyncio.CancelledError:
        pass


@pytest.mark.parametrize("app", ["pgvector"], indirect=True)
@pytest.mark.asyncio
async def test_ingest_txt_file(app, logging_connection):
    try:
        # Prepare the test data
        metadata = {"author": "John Doe"}
        files = [
            UploadFile(
                filename="test.txt",
                file=open(
                    os.path.join(
                        os.path.dirname(__file__),
                        "..",
                        "r2r",
                        "examples",
                        "data",
                        "test.txt",
                    ),
                    "rb",
                ),
            )
        ]
        # Set file size manually
        for file in files:
            file.file.seek(0, 2)  # Move to the end of the file
            file.size = file.file.tell()  # Get the file size
            file.file.seek(0)  # Move back to the start of the file

        await app.aingest_files(metadatas=[metadata], files=files)
    except asyncio.CancelledError:
        pass


@pytest.mark.parametrize("app", ["pgvector"], indirect=True)
@pytest.mark.asyncio
async def test_ingest_search_txt_file(app, logging_connection):
    try:
        # Prepare the test data
        metadata = {}
        files = [
            UploadFile(
                filename="aristotle.txt",
                file=open(
                    os.path.join(
                        os.path.dirname(__file__),
                        "..",
                        "r2r",
                        "examples",
                        "data",
                        "aristotle.txt",
                    ),
                    "rb",
                ),
            ),
        ]
        # Set file size manually
        for file in files:
            file.file.seek(0, 2)  # Move to the end of the file
            file.size = file.file.tell()  # Get the file size
            file.file.seek(0)  # Move back to the start of the file

        await app.aingest_files(metadatas=[metadata], files=files)

        search_results = await app.asearch("who was aristotle?")
        assert len(search_results["vector_search_results"]) == 10
        assert (
            "was an Ancient Greek philosopher and polymath"
            in search_results["vector_search_results"][0]["metadata"]["text"]
        )
        search_results = await app.asearch(
            "who was aristotle?",
            vector_search_settings=VectorSearchSettings(search_limit=20),
        )
        assert len(search_results["vector_search_results"]) == 20
        assert (
            "was an Ancient Greek philosopher and polymath"
            in search_results["vector_search_results"][0]["metadata"]["text"]
        )

        run_info = await logging_connection.get_run_info(
            log_type_filter="search"
        )
        assert len(run_info) == 2, f"Expected 2 runs, but got {len(run_info)}"

        logs = await logging_connection.get_logs(
            [run.run_id for run in run_info], 100
        )
        assert len(logs) == 6, f"Expected 6 logs, but got {len(logs)}"

        ## test stream
        response = await app.arag(
            query="Who was aristotle?",
            rag_generation_config=GenerationConfig(
                **{"model": "gpt-3.5-turbo", "stream": True}
            ),
        )
        collector = ""
        async for chunk in response:
            collector += chunk
        assert "Aristotle" in collector
        assert "Greek" in collector
        assert "philosopher" in collector
        assert "polymath" in collector
        assert "Ancient" in collector
    except asyncio.CancelledError:
        pass


@pytest.mark.parametrize("app", ["pgvector"], indirect=True)
@pytest.mark.asyncio
async def test_ingest_search_then_delete(app, logging_connection):
    try:
        # Ingest a document
        await app.aingest_documents(
            [
                Document(
                    id=generate_id_from_label("doc_1"),
                    data="The quick brown fox jumps over the lazy dog.",
                    type="txt",
                    metadata={"author": "John Doe"},
                ),
            ]
        )

        # Search for the document
        search_results = await app.asearch("who was aristotle?")

        # Verify that the search results are not empty
        assert (
            len(search_results["vector_search_results"]) > 0
        ), "Expected search results, but got none"
        assert (
            search_results["vector_search_results"][0]["metadata"]["text"]
            == "The quick brown fox jumps over the lazy dog."
        )
        # Delete the document
        delete_result = await app.adelete(["author"], ["John Doe"])

        # Verify the deletion was successful
        expected_deletion_message = "deleted successfully"
        assert (
            expected_deletion_message in delete_result
        ), f"Expected successful deletion message, but got {delete_result}"

        # Search for the document again
        search_results_2 = await app.asearch("who was aristotle?")

        # Verify that the search results are empty
        assert (
            len(search_results_2["vector_search_results"]) == 0
        ), f"Expected no search results, but got {search_results_2['vector_search_results']}"
    except asyncio.CancelledError:
        pass


@pytest.mark.parametrize("app", ["local", "pgvector"], indirect=True)
@pytest.mark.asyncio
async def test_ingest_user_documents(app, logging_connection):
    try:
        user_id_0 = generate_id_from_label("user_0")
        user_id_1 = generate_id_from_label("user_1")

        try:
            await app.aingest_documents(
                [
                    Document(
                        id=generate_id_from_label("doc_01"),
                        data="The quick brown fox jumps over the lazy dog.",
                        type="txt",
                        metadata={"author": "John Doe", "user_id": user_id_0},
                    ),
                    Document(
                        id=generate_id_from_label("doc_11"),
                        data="The lazy dog jumps over the quick brown fox.",
                        type="txt",
                        metadata={"author": "John Doe", "user_id": user_id_1},
                    ),
                ]
            )
            user_id_results = await app.ausers_overview(
                [user_id_0, user_id_1]
            )
            assert set([stats.user_id for stats in user_id_results]) == set(
                [user_id_0, user_id_1]
            ), f"Expected user ids {user_id_0} and {user_id_1}, but got {user_id_results}"

            user_0_docs = await app.adocuments_overview(user_ids=[user_id_0])
            user_1_docs = await app.adocuments_overview(user_ids=[user_id_1])

            assert (
                len(user_0_docs) == 1
            ), f"Expected 1 document for user {user_id_0}, but got {len(user_0_docs)}"
            assert (
                len(user_1_docs) == 1
            ), f"Expected 1 document for user {user_id_1}, but got {len(user_1_docs)}"
            assert user_0_docs[0].document_id == generate_id_from_label(
                "doc_01"
            ), f"Expected document id {str(generate_id_from_label('doc_01'))} for user {user_id_0}, but got {user_0_docs[0].document_id}"
            assert user_1_docs[0].document_id == generate_id_from_label(
                "doc_11"
            ), f"Expected document id {str(generate_id_from_label('doc_11'))} for user {user_id_1}, but got {user_1_docs[0].document_id}"
        finally:
            # Clean up the ingested documents
            await app.adelete(
                ["document_id", "document_id"],
                [
                    str(generate_id_from_label("doc_01")),
                    str(generate_id_from_label("doc_11")),
                ],
            )
    except asyncio.CancelledError:
        pass


@pytest.mark.parametrize("app", ["pgvector"], indirect=True)
@pytest.mark.asyncio
async def test_delete_by_id(app, logging_connection):
    try:
        await app.aingest_documents(
            [
                Document(
                    id=generate_id_from_label("doc_1"),
                    data="The quick brown fox jumps over the lazy dog.",
                    type="txt",
                    metadata={"author": "John Doe"},
                ),
            ]
        )
        search_results = await app.asearch("who was aristotle?")
        assert len(search_results["vector_search_results"]) > 0

        await app.adelete(
            ["document_id"], [str(generate_id_from_label("doc_1"))]
        )

        search_results = await app.asearch("who was aristotle?")
        assert len(search_results["vector_search_results"]) == 0
    except asyncio.CancelledError:
        pass


@pytest.mark.parametrize("app", ["pgvector"], indirect=True)
@pytest.mark.asyncio
async def test_double_ingest(app, logging_connection):
    try:
        await app.aingest_documents(
            [
                Document(
                    id=generate_id_from_label("doc_1"),
                    data="The quick brown fox jumps over the lazy dog.",
                    type="txt",
                    metadata={"author": "John Doe"},
                ),
            ]
        )
        search_results = await app.asearch("who was aristotle?")
        assert len(search_results["vector_search_results"]) == 1

        # Ingesting the same document id a second time should raise
        with pytest.raises(Exception):
            await app.aingest_documents(
                [
                    Document(
                        id=generate_id_from_label("doc_1"),
                        data="The quick brown fox jumps over the lazy dog.",
                        type="txt",
                        metadata={"author": "John Doe"},
                    ),
                ]
            )
    except asyncio.CancelledError:
        pass