diff --git a/CHANGELOG.md b/CHANGELOG.md
index bc07a72..9b34570 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,20 @@
 # Changelog
 
+## [1.3.1]
+
+### Changed
+- Upgraded `pyTigerGraph` dependency to `>=2.0.3`
+- Improved ingestion statistics: loading job results now parsed for accurate document counts and rejected line tracking
+- Clarified file preparation log message to distinguish JSONL copies from converted files
+
+### Fixed
+- **WebSocket chat endpoint no longer crashes on early client disconnect**
+  - `WebSocketDisconnect` caught separately during auth and conversation ID phases
+  - Prevents `ASGI application` error when client closes before sending credentials
+- **Loading jobs auto-recreated before ingestion** if missing (e.g., after schema drop or reinitialization)
+  - Checks for required loading job before JSONL ingestion loop
+  - Recreates from GSQL template if not found; fails with clear error if recreation fails
+
 ## [1.3.0]
 
 ### Added
diff --git a/VERSION b/VERSION
index f0bb29e..3a3cd8c 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.3.0
+1.3.1
diff --git a/common/gsql/graphrag/louvain/stream_community.gsql b/common/gsql/graphrag/louvain/stream_community.gsql
index fd99f67..e04f7fc 100644
--- a/common/gsql/graphrag/louvain/stream_community.gsql
+++ b/common/gsql/graphrag/louvain/stream_community.gsql
@@ -1,9 +1,8 @@
 CREATE OR REPLACE DISTRIBUTED QUERY stream_community(UINT iter) {
   Comms = {Community.*};
 
-  // Get communities of the current iteration
   Comms = SELECT s FROM Comms:s
           WHERE s.iteration == iter;
 
-  PRINT Comms;
+  PRINT Comms[Comms.id];
 }
diff --git a/common/metrics/tg_proxy.py b/common/metrics/tg_proxy.py
index a9a325f..76c2029 100644
--- a/common/metrics/tg_proxy.py
+++ b/common/metrics/tg_proxy.py
@@ -1,3 +1,4 @@
+import json
 import time
 import re
 from pyTigerGraph import TigerGraphConnection
@@ -113,14 +114,17 @@ def _runInstalledQuery(self, query_name, params, sizeLimit=None, usePost=False):
         return result
 
     def __del__(self):
-        if self.auth_mode == "pwd" and self._tg_connection.apiToken != '':
-            tg_version = self._tg_connection.getVer()
-            ver = tg_version.split(".")
-            resp = self._tg_connection._delete(
-                self._tg_connection.gsUrl + "/gsql/v1/tokens" if int(ver[0]) >= 4 else self._tg_connection.restppUrl + "/requesttoken",
-                authMode="pwd",
-                data=str({"token": self._tg_connection.apiToken}),
-                resKey=None,
-                jsonData=True
-            )
+        try:
+            if self.auth_mode == "pwd" and self._tg_connection.apiToken != '':
+                tg_version = self._tg_connection.getVer()
+                ver = tg_version.split(".")
+                resp = self._tg_connection._delete(
+                    self._tg_connection.gsUrl + "/gsql/v1/tokens" if int(ver[0]) >= 4 else self._tg_connection.restppUrl + "/requesttoken",
+                    authMode="pwd",
+                    data=json.dumps({"token": self._tg_connection.apiToken}),
+                    resKey=None,
+                    jsonData=True
+                )
+        except Exception:
+            pass
         metrics.tg_active_connections.dec()
diff --git a/common/requirements.txt b/common/requirements.txt
index 12c9fcf..0a7c34f 100644
--- a/common/requirements.txt
+++ b/common/requirements.txt
@@ -138,7 +138,7 @@ python-multipart==0.0.20
 python-iso639==2025.2.18
 python-magic==0.4.27
 pyTigerDriver==1.0.15
-pyTigerGraph==1.9.1
+pyTigerGraph>=2.0.3
 pytz==2025.2
 PyYAML==6.0.2
 rapidfuzz==3.13.0
diff --git a/common/utils/text_extractors.py b/common/utils/text_extractors.py
index 2b3e78d..449ace5 100644
--- a/common/utils/text_extractors.py
+++ b/common/utils/text_extractors.py
@@ -137,6 +137,8 @@ def __init__(self):
             '.xml': 'application/xml',
             '.jpeg': 'image/jpeg',
             '.jpg': 'image/jpeg',
+            '.png': 'image/png',
+            '.gif': 'image/gif',
             '.jsonl': 'application/x-jsonlines'
         }
 
@@ -288,7 +290,7 @@ async def process_with_semaphore(file_path):
                     'error': result.get('error', 'Unknown error')
                 })
 
-        logger.info(f"Processed {len(processed_files_info)} files, extracted {total_docs} total documents")
+        logger.info(f"Prepared {len(processed_files_info)} files ({len(jsonl_files_copied)} JSONL copied, {len(files_to_process)} converted), {total_docs} total documents")
        logger.info(f"Created {len([f for f in processed_files_info if f.get('status') == 'success'])} JSONL files in {temp_folder}")
 
        return {
diff --git a/ecc/app/graphrag/graph_rag.py b/ecc/app/graphrag/graph_rag.py
index 49a5760..c7ef5be 100644
--- a/ecc/app/graphrag/graph_rag.py
+++ b/ecc/app/graphrag/graph_rag.py
@@ -23,9 +23,11 @@ from aiochannel import Channel, ChannelClosed
 
 from graphrag import workers
 from graphrag.util import (
+    COMMUNITY_QUERIES,
     check_vertex_has_desc,
     http_timeout,
     init,
+    install_queries,
     load_q,
     loading_event,
     make_headers,
@@ -517,8 +519,12 @@ async def run(graphname: str, conn: AsyncTigerGraphConnection):
     type_end = time.perf_counter()
 
     # Community Detection
+    # Ensure community queries are installed. Per TG docs, only DROP operations
+    # invalidate queries (not ADD/ALTER), but partial init failures or manual
+    # schema edits could still leave queries missing.
     community_start = time.perf_counter()
     if community_detection_switch:
+        await install_queries(COMMUNITY_QUERIES, conn)
         logger.info("Community Processing Start")
         comm_process_chan = Channel()
         upsert_chan = Channel()
@@ -551,3 +557,38 @@ async def run(graphname: str, conn: AsyncTigerGraphConnection):
         f"DONE. graphrag community initializer dT: {community_end-community_start}"
     )
     logger.info(f"DONE. graphrag.run() total time elaplsed: {end-init_start}")
+
+    # Verify all required queries and loading jobs are still intact after the
+    # pipeline. Per TG docs, only DROP invalidates queries, and ALTER/DROP
+    # invalidates loading jobs. Log any missing ones to catch unexpected side
+    # effects from schema changes.
+    from graphrag.util import REQUIRED_QUERIES
+    installed = set(
+        q.split("/")[-1]
+        for q in await conn.getEndpoints(dynamic=True)
+        if f"/{conn.graphname}/" in q
+    )
+    expected = {q.split("/")[-1] for q in REQUIRED_QUERIES}
+    missing = expected - installed
+    if missing:
+        logger.error(
+            f"Queries missing after ECC pipeline: {sorted(missing)}."
+        )
+    else:
+        logger.info("Post-pipeline check: all required queries are installed.")
+
+    current_schema = await conn.gsql(f"USE GRAPH {conn.graphname}\nls")
+    expected_jobs = [
+        "load_documents_content_json",
+        "load_documents_content_json_with_images",
+    ]
+    missing_jobs = [
+        j for j in expected_jobs
+        if f"- CREATE LOADING JOB {j} {{" not in current_schema
+    ]
+    if missing_jobs:
+        logger.error(
+            f"Loading jobs missing after ECC pipeline: {missing_jobs}."
+        )
+    else:
+        logger.info("Post-pipeline check: all loading jobs are intact.")
diff --git a/ecc/app/graphrag/util.py b/ecc/app/graphrag/util.py
index f12157f..094d95b 100644
--- a/ecc/app/graphrag/util.py
+++ b/ecc/app/graphrag/util.py
@@ -18,7 +18,6 @@ import logging
 import re
 import traceback
-from glob import glob
 
 import httpx
 from graphrag import reusable_channel, workers
 
@@ -46,6 +45,24 @@
 # the base concurrency since each worker is mostly waiting on I/O (LLM/embedding API calls).
 _worker_concurrency = _default_concurrency * 2
 tg_sem = asyncio.Semaphore(_default_concurrency)
+
+COMMUNITY_QUERIES = [
+    "common/gsql/graphrag/louvain/graphrag_louvain_init",
+    "common/gsql/graphrag/louvain/graphrag_louvain_communities",
+    "common/gsql/graphrag/louvain/modularity",
+    "common/gsql/graphrag/louvain/stream_community",
+    "common/gsql/graphrag/get_community_children",
+    "common/gsql/graphrag/communities_have_desc",
+]
+
+REQUIRED_QUERIES = [
+    "common/gsql/graphrag/StreamIds",
+    "common/gsql/graphrag/StreamDocContent",
+    "common/gsql/graphrag/StreamChunkContent",
+    "common/gsql/graphrag/SetEpochProcessing",
+    "common/gsql/graphrag/get_vertices_or_remove",
+    "common/gsql/supportai/create_entity_type_relationships",
+]
 
 load_q = reusable_channel.ReuseableChannel()
 # will pause workers until the event is false
@@ -77,7 +94,8 @@ async def install_queries(
     async with tg_sem:
         res = await conn.gsql(query)
     logger.info(f"INSTALL QUERY ALL returned: {str(res)[:200]}")
-    if isinstance(res, str) and "error" in res.lower():
+    res_lower = res.lower() if isinstance(res, str) else ""
+    if "error" in res_lower or "does not exist" in res_lower or "failed" in res_lower:
         raise Exception(res)
 
     max_wait = 600  # seconds
@@ -115,26 +133,9 @@ async def init(
     Returns:
         (extractor, embedding_store)
     """
-    # install requried queries
-    requried_queries = [
-        "common/gsql/graphrag/StreamIds",
-        "common/gsql/graphrag/StreamDocContent",
-        "common/gsql/graphrag/StreamChunkContent",
-        "common/gsql/graphrag/SetEpochProcessing",
-        "common/gsql/graphrag/get_community_children",
-        "common/gsql/graphrag/communities_have_desc",
-        "common/gsql/graphrag/get_vertices_or_remove",
-        "common/gsql/graphrag/louvain/graphrag_louvain_init",
-        "common/gsql/graphrag/louvain/graphrag_louvain_communities",
-        "common/gsql/graphrag/louvain/modularity",
-        "common/gsql/graphrag/louvain/stream_community",
-        "common/gsql/supportai/create_entity_type_relationships"
-    ]
-    # add louvain to queries
-    q = [x.split(".gsql")[0] for x in glob("common/gsql/graphrag/louvain/*")]
-    requried_queries.extend(q)
-    logger.info(f"Installing queries needed for GraphRAG all together")
-    await install_queries(requried_queries, conn)
+    # install required queries
+    logger.info("Installing queries needed for GraphRAG all together")
+    await install_queries(REQUIRED_QUERIES, conn)
 
     # extractor
     graph_cfg = get_graphrag_config(conn.graphname)
@@ -241,7 +242,10 @@ async def upsert_batch(conn: AsyncTigerGraphConnection, data: str):
 
 async def check_vertex_exists(conn, v_id: str):
     async with tg_sem:
         try:
-            res = await conn.getVerticesById("Entity", v_id)
+            from urllib.parse import quote
+            url = (conn.restppUrl + "/graph/" + conn.graphname +
+                   "/vertices/Entity/" + quote(v_id, safe=""))
+            res = await conn._req("GET", url, params={"select": "description"})
         except Exception as e:
             if "is not a valid vertex id" not in str(e):
diff --git a/ecc/app/graphrag/workers.py b/ecc/app/graphrag/workers.py
index c0b35cc..516b83c 100644
--- a/ecc/app/graphrag/workers.py
+++ b/ecc/app/graphrag/workers.py
@@ -53,7 +53,8 @@ async def install_query(
     async with util.tg_sem:
         res = await conn.gsql(query)
 
-    if "error" in res:
+    res_lower = res.lower() if isinstance(res, str) else ""
+    if "error" in res_lower or "does not exist" in res_lower or "failed" in res_lower:
         LogWriter.error(res)
         return {
             "result": None,
@@ -209,15 +210,33 @@ async def embed(
         logger.error(f"Failed to add embeddings for {v_id}: {e}")
 
 
+def _is_near_duplicate(new_desc, existing_descs, threshold=0.85):
+    from difflib import SequenceMatcher
+    new_lower = new_desc.lower()
+    new_len = len(new_lower)
+    sm = SequenceMatcher(None, new_lower)
+    for existing in existing_descs:
+        ex_lower = existing.lower()
+        ex_len = len(ex_lower)
+        if not (new_len + ex_len) or 2 * min(new_len, ex_len) / (new_len + ex_len) < threshold:
+            continue
+        sm.set_seq2(ex_lower)
+        if sm.quick_ratio() >= threshold and sm.ratio() >= threshold:
+            return True
+    return False
+
+
 async def get_vert_desc(conn, v_id, node: Node):
-    desc = [node.properties.get("description", "")]
+    new_desc = node.properties.get("description", "")
     exists = await util.check_vertex_exists(conn, v_id)
-    # if vertex exists, get description content and append this description to it
     if not exists.get("error", False):
-        # deduplicate descriptions
-        desc.extend(exists["resp"][0]["attributes"]["description"])
-        desc = list(set(desc))
-    return desc
+        resp = exists.get("resp")
+        if resp and len(resp) > 0 and "attributes" in resp[0]:
+            existing_descs = resp[0]["attributes"].get("description", [])
+            if not new_desc or _is_near_duplicate(new_desc, existing_descs):
+                return existing_descs if existing_descs else [new_desc]
+            return existing_descs + [new_desc]
+    return [new_desc]
 
 
 extract_sem = asyncio.Semaphore(util._worker_concurrency)
diff --git a/ecc/app/main.py b/ecc/app/main.py
index d15ac75..58751bb 100644
--- a/ecc/app/main.py
+++ b/ecc/app/main.py
@@ -216,7 +216,13 @@ async def run_with_tracking(task_key: str, run_func, graphname: str, conn):
     try:
         running_tasks[task_key] = {"status": "running", "started_at": time.time()}
         LogWriter.info(f"Starting ECC task: {task_key}")
-        
+
+        # Verify the graph still exists before doing any work
+        try:
+            await conn.getVertexTypes()
+        except Exception:
+            raise Exception(f"Graph '{graphname}' does not exist or is not accessible")
+
         # Reload config at the start of each job to ensure latest settings are used
         LogWriter.info("Reloading configuration for new job...")
         from common.config import reload_llm_config, reload_graphrag_config, reload_db_config
diff --git a/ecc/app/supportai/util.py b/ecc/app/supportai/util.py
index 6269624..3e3f07a 100644
--- a/ecc/app/supportai/util.py
+++ b/ecc/app/supportai/util.py
@@ -64,7 +64,8 @@ async def install_queries(
     async with tg_sem:
         res = await conn.gsql(query)
     logger.info(f"INSTALL QUERY ALL returned: {str(res)[:200]}")
-    if isinstance(res, str) and "error" in res.lower():
+    res_lower = res.lower() if isinstance(res, str) else ""
+    if "error" in res_lower or "does not exist" in res_lower or "failed" in res_lower:
         raise Exception(res)
 
     max_wait = 300  # seconds
@@ -215,7 +216,10 @@ async def upsert_vertex(
 
 async def check_vertex_exists(conn, v_id: str):
     async with tg_sem:
         try:
-            res = await conn.getVerticesById("Entity", v_id)
+            from urllib.parse import quote
+            url = (conn.restppUrl + "/graph/" + conn.graphname +
+                   "/vertices/Entity/" + quote(v_id, safe=""))
+            res = await conn._req("GET", url, params={"select": "description"})
         except Exception as e:
             err = traceback.format_exc()
diff --git a/ecc/app/supportai/workers.py b/ecc/app/supportai/workers.py
index 2c62169..07104fb 100644
--- a/ecc/app/supportai/workers.py
+++ b/ecc/app/supportai/workers.py
@@ -52,7 +52,8 @@ async def install_query(
     async with util.tg_sem:
         res = await conn.gsql(query)
 
-    if "error" in res:
+    res_lower = res.lower() if isinstance(res, str) else ""
+    if "error" in res_lower or "does not exist" in res_lower or "failed" in res_lower:
         LogWriter.error(res)
         return {
             "result": None,
@@ -164,15 +165,33 @@ async def embed(
         await embed_store.aadd_embeddings([(content, [])], [{"vertex_id": v_id}])
 
 
+def _is_near_duplicate(new_desc, existing_descs, threshold=0.85):
+    from difflib import SequenceMatcher
+    new_lower = new_desc.lower()
+    new_len = len(new_lower)
+    sm = SequenceMatcher(None, new_lower)
+    for existing in existing_descs:
+        ex_lower = existing.lower()
+        ex_len = len(ex_lower)
+        if not (new_len + ex_len) or 2 * min(new_len, ex_len) / (new_len + ex_len) < threshold:
+            continue
+        sm.set_seq2(ex_lower)
+        if sm.quick_ratio() >= threshold and sm.ratio() >= threshold:
+            return True
+    return False
+
+
 async def get_vert_desc(conn, v_id, node: Node):
-    desc = [node.properties.get("description", "")]
+    new_desc = node.properties.get("description", "")
     exists = await util.check_vertex_exists(conn, v_id)
-    # if vertex exists, get description content and append this description to it
-    if not exists["error"]:
-        # deduplicate descriptions
-        desc.extend(exists["results"][0]["attributes"]["description"])
-        desc = list(set(desc))
-    return desc
+    if not exists.get("error", False):
+        resp = exists.get("resp")
+        if resp and len(resp) > 0 and "attributes" in resp[0]:
+            existing_descs = resp[0]["attributes"].get("description", [])
+            if not new_desc or _is_near_duplicate(new_desc, existing_descs):
+                return existing_descs if existing_descs else [new_desc]
+            return existing_descs + [new_desc]
+    return [new_desc]
 
 
 async def extract(
diff --git a/graphrag/app/agent/agent_graph.py b/graphrag/app/agent/agent_graph.py
index fc9fc96..5341cf0 100644
--- a/graphrag/app/agent/agent_graph.py
+++ b/graphrag/app/agent/agent_graph.py
@@ -88,9 +88,11 @@ def __init__(
         self.supportai_enabled = True
         self.supportai_retriever = supportai_retriever.lower().replace(" ", "")
         try:
-            self.db_connection.getQueryMetadata("StreamDocContent")
-        except TigerGraphException as e:
-            logger.info(f"StreamDocContent not found in the graph {self.db_connection.graphname}. Disabling supportai.")
+            vtypes = self.db_connection.getVertexTypes()
+            if "DocumentChunk" not in vtypes:
+                raise ValueError("DocumentChunk vertex type not found")
+        except Exception as e:
+            logger.info(f"SupportAI schema not found in graph {self.db_connection.graphname}. Disabling supportai.")
             self.supportai_enabled = False
 
     def emit_progress(self, msg):
@@ -372,9 +374,6 @@ def hybrid_search(self, state):
         state["context"] = {
             "function_call": query_name,
             "result": step[0],
-            "query_output_format": self.db_connection.getQueryMetadata(
-                query_name
-            )["output"],
         }
         state["lookup_source"] = "supportai"
         return state
@@ -401,9 +400,6 @@ def similarity_search(self, state):
         state["context"] = {
             "function_call": query_name,
             "result": step[0],
-            "query_output_format": self.db_connection.getQueryMetadata(
-                query_name
-            )["output"],
         }
         state["lookup_source"] = "supportai"
         return state
@@ -429,9 +425,6 @@ def sibling_search(self, state):
         state["context"] = {
             "function_call": query_name,
             "result": step[0],
-            "query_output_format": self.db_connection.getQueryMetadata(
-                query_name
-            )["output"],
         }
         state["lookup_source"] = "supportai"
         return state
@@ -458,9 +451,6 @@ def community_search(self, state):
         state["context"] = {
             "function_call": query_name,
             "result": step[0],
-            "query_output_format": self.db_connection.getQueryMetadata(
-                query_name
-            )["output"],
         }
         state["lookup_source"] = "supportai"
         return state
diff --git a/graphrag/app/routers/ui.py b/graphrag/app/routers/ui.py
index 30971cd..400435d 100644
--- a/graphrag/app/routers/ui.py
+++ b/graphrag/app/routers/ui.py
@@ -77,7 +77,6 @@ route_prefix = "/ui"
 # APIRouter's prefix doesn't work with the websocket, so it has to be done here
 router = APIRouter(tags=["UI"])
 security = HTTPBasic()
-GRAPH_NAME_RE = re.compile(r"- Graph (.*)\(")
 llm_config_lock = asyncio.Lock()
 
 # Cache for user role lookups (avoids repeated GSQL calls)
@@ -269,12 +268,8 @@ def auth(usr: str, password: str, conn=None) -> tuple[list[str], TigerGraphConne
         )
 
     try:
-        # parse user info
-        info = conn.gsql("LS USER")
-        graphs = []
-        for m in GRAPH_NAME_RE.finditer(info):
-            groups = m.groups()
-            graphs.extend(groups)
+        graph_list = conn.listGraphs()
+        graphs = [g["graphName"] for g in graph_list if "graphName" in g]
 
     except requests.exceptions.HTTPError as e:
         raise HTTPException(
@@ -1124,16 +1119,26 @@ async def chat(
         logger.error("WebSocket authentication timeout - no credentials received")
         await websocket.close(code=1008, reason="Authentication timeout")
         return
+    except WebSocketDisconnect:
+        logger.info("WebSocket disconnected during authentication")
+        return
     except Exception as e:
         logger.error(f"Authentication failed: {e}")
-        await websocket.close(code=1008, reason=f"Authentication failed")
+        try:
+            await websocket.close(code=1008, reason="Authentication failed")
+        except Exception:
+            pass
         return
    # Get RAG pattern
     rag_pattern = rag_pattern or "hybridsearch"
-    
+
     # Get conversation ID
-    conversation_id = await websocket.receive_text()
+    try:
+        conversation_id = await websocket.receive_text()
+    except WebSocketDisconnect:
+        logger.info("WebSocket disconnected before conversation ID received")
+        return
     logger.info(
         f"WebSocket conversation_id received: {conversation_id or 'empty'} "
         f"(graph={graphname}, rag_pattern={rag_pattern})"
     )
@@ -2348,11 +2353,10 @@ async def test_db_connection(
             graphname="",
         )
 
-        # Test connection by listing users
         if db_test_config.get("getToken", False):
             test_conn.getToken()
-        
-        test_conn.gsql("LS USER")
+
+        test_conn.listGraphs()
 
         return {
             "status": "success",
diff --git a/graphrag/app/supportai/supportai.py b/graphrag/app/supportai/supportai.py
index 6fb992c..f38eba9 100644
--- a/graphrag/app/supportai/supportai.py
+++ b/graphrag/app/supportai/supportai.py
@@ -520,6 +520,32 @@ def create_ingest(
     return res
 
 
+_LOADING_JOB_GSQL_FILES = {
+    "load_documents_content_json_with_images": "common/gsql/supportai/SupportAI_InitialLoadJSON_WithImages.gsql",
+    "load_documents_content_json": "common/gsql/supportai/SupportAI_InitialLoadJSON.gsql",
+}
+
+
+def _ensure_loading_jobs(conn: TigerGraphConnection, graphname: str, load_job_id: str) -> None:
+    """Check that the required loading job exists; recreate it if missing."""
+    current_schema = conn.gsql(f"USE GRAPH {graphname}\n ls")
+    marker = f"- CREATE LOADING JOB {load_job_id} {{"
+    if marker in current_schema:
+        return
+
+    gsql_file = _LOADING_JOB_GSQL_FILES.get(load_job_id)
+    if not gsql_file:
+        raise Exception(f"Loading job '{load_job_id}' not found and no GSQL template available to recreate it")
+
+    logger.info(f"Loading job '{load_job_id}' missing — recreating from {gsql_file}")
+    with open(gsql_file, "r") as f:
+        q_body = f.read()
+    result = conn.gsql(f"USE GRAPH {graphname}\nBEGIN\n{q_body}\nEND\n")
+    logger.info(f"Loading job creation result: {result}")
+    if isinstance(result, str) and ("error" in result.lower() or "failed" in result.lower()):
+        raise Exception(f"Failed to recreate loading job '{load_job_id}': {result}")
+
+
 def ingest(
     graphname: str,
     loader_info: LoadingInfo,
@@ -658,7 +684,7 @@ def ingest(
     elif ingest_config.get("data_source") == "server":
         try:
             data_source_id = ingest_config.get("data_source_id", "DocumentContent")
-            
+
             # Read from temporary folder containing JSONL files (one per input file)
             data_path = ingest_config.get("data_path")
             if not data_path or not os.path.exists(data_path):
@@ -668,30 +694,55 @@ def ingest(
             if not jsonl_files:
                 raise Exception(f"No JSONL files found in: {data_path}")
             logger.info(f"Found {len(jsonl_files)} JSONL files to ingest from: {data_path}")
-            
+
+            # Ensure loading job exists — recreate if missing (e.g. after schema drop)
+            _ensure_loading_jobs(conn, graphname, loader_info.load_job_id)
+
             total_doc_count = 0
             ingested_files = []
-            
+
             # Process each JSONL file separately
             for jsonl_filename in jsonl_files:
                 jsonl_file = os.path.join(data_path, jsonl_filename)
                 logger.info(f"Processing JSONL file: {jsonl_filename}")
-                
+
                 try:
                     # Load documents directly from file - more memory efficient
-                    conn.runLoadingJobWithFile(jsonl_file, data_source_id, loader_info.load_job_id)
-
-                    # Count documents for reporting
-                    with open(jsonl_file, 'r', encoding='utf-8') as f:
-                        doc_count = sum(1 for line in f if line.strip())
+                    load_result = conn.runLoadingJobWithFile(jsonl_file, data_source_id, loader_info.load_job_id)
+                    logger.info(f"Loading job raw result for {jsonl_filename}: {load_result}")
+
+                    # Parse loading job statistics
+                    valid_lines = 0
+                    rejected_lines = 0
+                    doc_count = 0
+                    if load_result:
+                        for entry in load_result:
+                            stats = entry.get("statistics", {})
+                            parsing = stats.get("parsingStatistics", stats)
+                            file_level = parsing.get("fileLevel", {})
+                            valid_lines += file_level.get("validLine", stats.get("validLine", 0))
+                            rejected_lines += file_level.get("invalidLine", stats.get("invalidLine", 0))
+                            obj_level = parsing.get("objectLevel", stats)
+                            for v in obj_level.get("vertex", []):
+                                if v.get("typeName") == "Document":
+                                    doc_count += v.get("validObject", 0)
+                    if doc_count == 0:
+                        # Fallback: count lines in JSONL file
+                        with open(jsonl_file, 'r', encoding='utf-8') as f:
+                            doc_count = sum(1 for line in f if line.strip())
                     total_doc_count += doc_count
                     ingested_files.append({
                         'jsonl_file': jsonl_filename,
                         'document_count': doc_count,
+                        'valid_lines': valid_lines,
+                        'rejected_lines': rejected_lines,
                         'status': 'success'
                     })
-                    logger.info(f"Successfully ingested {doc_count} documents from {jsonl_filename}")
-                    
+                    logger.info(
+                        f"Successfully ingested {doc_count} documents from {jsonl_filename} "
+                        f"(validLine={valid_lines}, rejectedLine={rejected_lines})"
+                    )
+
                 except Exception as file_error:
                     logger.error(f"Failed to ingest {jsonl_filename}: {file_error}")
                     ingested_files.append({