15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,20 @@
# Changelog

## [1.3.1]

### Changed
- Upgraded `pyTigerGraph` dependency to `>=2.0.3`
- Improved ingestion statistics: loading job results now parsed for accurate document counts and rejected line tracking
- Clarified file preparation log message to distinguish JSONL copies from converted files

### Fixed
- **WebSocket chat endpoint no longer crashes on early client disconnect**
- `WebSocketDisconnect` caught separately during auth and conversation ID phases
- Prevents `ASGI application` error when client closes before sending credentials
- **Loading jobs auto-recreated before ingestion** if missing (e.g., after schema drop or reinitialization)
- Checks for required loading job before JSONL ingestion loop
- Recreates from GSQL template if not found; fails with clear error if recreation fails

## [1.3.0]

### Added
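The WebSocket fix above follows the standard FastAPI pattern of catching `WebSocketDisconnect` separately during each early phase of the handshake. A minimal sketch of that pattern, with a hypothetical `/chat` route and message shapes (not taken from this PR):

```python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/chat")
async def chat(ws: WebSocket):
    await ws.accept()
    # Phase 1: auth and conversation ID. Catching WebSocketDisconnect here,
    # instead of letting it bubble out of the endpoint, is what prevents the
    # "ASGI application" error when the client closes before sending credentials.
    try:
        credentials = await ws.receive_json()
        conversation_id = await ws.receive_text()
    except WebSocketDisconnect:
        return  # client left early; nothing to clean up
    # Phase 2: the normal chat loop, with its own disconnect handling.
    try:
        while True:
            msg = await ws.receive_text()
            await ws.send_text(f"echo: {msg}")
    except WebSocketDisconnect:
        pass
```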
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
1.3.0
1.3.1
3 changes: 1 addition & 2 deletions common/gsql/graphrag/louvain/stream_community.gsql
@@ -1,9 +1,8 @@
CREATE OR REPLACE DISTRIBUTED QUERY stream_community(UINT iter) {
Comms = {Community.*};

// Get communities of the current iteration
Comms = SELECT s FROM Comms:s
WHERE s.iteration == iter;

PRINT Comms;
PRINT Comms[Comms.id];
}
24 changes: 14 additions & 10 deletions common/metrics/tg_proxy.py
@@ -1,3 +1,4 @@
import json
import time
import re
from pyTigerGraph import TigerGraphConnection
@@ -113,14 +114,17 @@ def _runInstalledQuery(self, query_name, params, sizeLimit=None, usePost=False):
return result

def __del__(self):
if self.auth_mode == "pwd" and self._tg_connection.apiToken != '':
tg_version = self._tg_connection.getVer()
ver = tg_version.split(".")
resp = self._tg_connection._delete(
self._tg_connection.gsUrl + "/gsql/v1/tokens" if int(ver[0]) >= 4 else self._tg_connection.restppUrl + "/requesttoken",
authMode="pwd",
data=str({"token": self._tg_connection.apiToken}),
resKey=None,
jsonData=True
)
try:
if self.auth_mode == "pwd" and self._tg_connection.apiToken != '':
tg_version = self._tg_connection.getVer()
ver = tg_version.split(".")
resp = self._tg_connection._delete(
self._tg_connection.gsUrl + "/gsql/v1/tokens" if int(ver[0]) >= 4 else self._tg_connection.restppUrl + "/requesttoken",
authMode="pwd",
data=json.dumps({"token": self._tg_connection.apiToken}),
resKey=None,
jsonData=True
)
except Exception:
pass
metrics.tg_active_connections.dec()
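Two things changed here besides the indentation: the payload is now serialized with `json.dumps` instead of `str`, and the whole token-revocation call is wrapped in `try/except` so `__del__` cannot raise during interpreter teardown, when network calls may fail for unrelated reasons. The serialization fix matters because Python's `dict` repr uses single quotes, which is not valid JSON:

```python
import json

payload = {"token": "abc123"}
print(str(payload))         # {'token': 'abc123'}  <- single quotes, not valid JSON
print(json.dumps(payload))  # {"token": "abc123"}  <- what the server expects
```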
2 changes: 1 addition & 1 deletion common/requirements.txt
@@ -138,7 +138,7 @@ python-multipart==0.0.20
python-iso639==2025.2.18
python-magic==0.4.27
pyTigerDriver==1.0.15
pyTigerGraph==1.9.1
pyTigerGraph>=2.0.3
pytz==2025.2
PyYAML==6.0.2
rapidfuzz==3.13.0
4 changes: 3 additions & 1 deletion common/utils/text_extractors.py
@@ -137,6 +137,8 @@ def __init__(self):
'.xml': 'application/xml',
'.jpeg': 'image/jpeg',
'.jpg': 'image/jpeg',
'.png': 'image/png',
'.gif': 'image/gif',
'.jsonl': 'application/x-jsonlines'
}

@@ -288,7 +290,7 @@ async def process_with_semaphore(file_path):
'error': result.get('error', 'Unknown error')
})

logger.info(f"Processed {len(processed_files_info)} files, extracted {total_docs} total documents")
logger.info(f"Prepared {len(processed_files_info)} files ({len(jsonl_files_copied)} JSONL copied, {len(files_to_process)} converted), {total_docs} total documents")
logger.info(f"Created {len([f for f in processed_files_info if f.get('status') == 'success'])} JSONL files in {temp_folder}")

return {
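The extension map now covers `.png` and `.gif` alongside the existing image types. A small lookup sketch under the same convention; the standalone map and helper below are hypothetical mirrors of the class attribute shown in the diff, not code from this PR:

```python
import os

# Hypothetical copy of the extractor's extension -> MIME map (excerpt).
EXT_TO_MIME = {
    '.jpeg': 'image/jpeg',
    '.jpg': 'image/jpeg',
    '.png': 'image/png',
    '.gif': 'image/gif',
    '.jsonl': 'application/x-jsonlines',
}

def guess_mime(path: str, default: str = 'application/octet-stream') -> str:
    _, ext = os.path.splitext(path.lower())
    return EXT_TO_MIME.get(ext, default)

print(guess_mime("chart.PNG"))   # image/png
print(guess_mime("notes.txt"))   # application/octet-stream
```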
41 changes: 41 additions & 0 deletions ecc/app/graphrag/graph_rag.py
@@ -23,9 +23,11 @@
from aiochannel import Channel, ChannelClosed
from graphrag import workers
from graphrag.util import (
COMMUNITY_QUERIES,
check_vertex_has_desc,
http_timeout,
init,
install_queries,
load_q,
loading_event,
make_headers,
@@ -517,8 +519,12 @@ async def run(graphname: str, conn: AsyncTigerGraphConnection):
type_end = time.perf_counter()

# Community Detection
# Ensure community queries are installed. Per TG docs, only DROP operations
# invalidate queries (not ADD/ALTER), but partial init failures or manual
# schema edits could still leave queries missing.
community_start = time.perf_counter()
if community_detection_switch:
await install_queries(COMMUNITY_QUERIES, conn)
logger.info("Community Processing Start")
comm_process_chan = Channel()
upsert_chan = Channel()
@@ -551,3 +557,38 @@ async def run(graphname: str, conn: AsyncTigerGraphConnection):
f"DONE. graphrag community initializer dT: {community_end-community_start}"
)
logger.info(f"DONE. graphrag.run() total time elaplsed: {end-init_start}")

# Verify all required queries and loading jobs are still intact after the
# pipeline. Per TG docs, only DROP invalidates queries, and ALTER/DROP
# invalidates loading jobs. Log any missing ones to catch unexpected side
# effects from schema changes.
from graphrag.util import REQUIRED_QUERIES
installed = set(
q.split("/")[-1]
for q in await conn.getEndpoints(dynamic=True)
if f"/{conn.graphname}/" in q
)
expected = {q.split("/")[-1] for q in REQUIRED_QUERIES}
missing = expected - installed
if missing:
logger.error(
f"Queries missing after ECC pipeline: {sorted(missing)}."
)
else:
logger.info("Post-pipeline check: all required queries are installed.")

current_schema = await conn.gsql(f"USE GRAPH {conn.graphname}\nls")
expected_jobs = [
"load_documents_content_json",
"load_documents_content_json_with_images",
]
missing_jobs = [
j for j in expected_jobs
if f"- CREATE LOADING JOB {j} {{" not in current_schema
]
if missing_jobs:
logger.error(
f"Loading jobs missing after ECC pipeline: {missing_jobs}."
)
else:
logger.info("Post-pipeline check: all loading jobs are intact.")
50 changes: 27 additions & 23 deletions ecc/app/graphrag/util.py
@@ -18,7 +18,6 @@
import logging
import re
import traceback
from glob import glob

import httpx
from graphrag import reusable_channel, workers
@@ -46,6 +45,24 @@
# the base concurrency since each worker is mostly waiting on I/O (LLM/embedding API calls).
_worker_concurrency = _default_concurrency * 2
tg_sem = asyncio.Semaphore(_default_concurrency)

COMMUNITY_QUERIES = [
"common/gsql/graphrag/louvain/graphrag_louvain_init",
"common/gsql/graphrag/louvain/graphrag_louvain_communities",
"common/gsql/graphrag/louvain/modularity",
"common/gsql/graphrag/louvain/stream_community",
"common/gsql/graphrag/get_community_children",
"common/gsql/graphrag/communities_have_desc",
]

REQUIRED_QUERIES = [
"common/gsql/graphrag/StreamIds",
"common/gsql/graphrag/StreamDocContent",
"common/gsql/graphrag/StreamChunkContent",
"common/gsql/graphrag/SetEpochProcessing",
"common/gsql/graphrag/get_vertices_or_remove",
"common/gsql/supportai/create_entity_type_relationships",
]
load_q = reusable_channel.ReuseableChannel()

# will pause workers until the event is false
@@ -77,7 +94,8 @@ async def install_queries(
async with tg_sem:
res = await conn.gsql(query)
logger.info(f"INSTALL QUERY ALL returned: {str(res)[:200]}")
if isinstance(res, str) and "error" in res.lower():
res_lower = res.lower() if isinstance(res, str) else ""
if "error" in res_lower or "does not exist" in res_lower or "failed" in res_lower:
raise Exception(res)

max_wait = 600 # seconds
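The failure check is broadened because GSQL can report problems without the literal word "error". The predicate now used in all four `install_query`/`install_queries` variants boils down to this (the sample responses below are illustrative, not captured GSQL output):

```python
def gsql_failed(res) -> bool:
    # Non-string responses fall through to False, matching the original guard.
    res_lower = res.lower() if isinstance(res, str) else ""
    return ("error" in res_lower
            or "does not exist" in res_lower
            or "failed" in res_lower)

print(gsql_failed("Semantic Check Error: ..."))       # True
print(gsql_failed("Query StreamIds does not exist"))  # True
print(gsql_failed("Queries installed successfully"))  # False
```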
@@ -115,26 +133,9 @@ async def init(
Returns:
(extractor, embedding_store)
"""
# install requried queries
requried_queries = [
"common/gsql/graphrag/StreamIds",
"common/gsql/graphrag/StreamDocContent",
"common/gsql/graphrag/StreamChunkContent",
"common/gsql/graphrag/SetEpochProcessing",
"common/gsql/graphrag/get_community_children",
"common/gsql/graphrag/communities_have_desc",
"common/gsql/graphrag/get_vertices_or_remove",
"common/gsql/graphrag/louvain/graphrag_louvain_init",
"common/gsql/graphrag/louvain/graphrag_louvain_communities",
"common/gsql/graphrag/louvain/modularity",
"common/gsql/graphrag/louvain/stream_community",
"common/gsql/supportai/create_entity_type_relationships"
]
# add louvain to queries
q = [x.split(".gsql")[0] for x in glob("common/gsql/graphrag/louvain/*")]
requried_queries.extend(q)
logger.info(f"Installing queries needed for GraphRAG all together")
await install_queries(requried_queries, conn)
# install required queries
logger.info("Installing queries needed for GraphRAG all together")
await install_queries(REQUIRED_QUERIES, conn)

# extractor
graph_cfg = get_graphrag_config(conn.graphname)
@@ -241,7 +242,10 @@ async def upsert_batch(conn: AsyncTigerGraphConnection, data: str):
async def check_vertex_exists(conn, v_id: str):
async with tg_sem:
try:
res = await conn.getVerticesById("Entity", v_id)
from urllib.parse import quote
url = (conn.restppUrl + "/graph/" + conn.graphname
+ "/vertices/Entity/" + quote(v_id, safe=""))
res = await conn._req("GET", url, params={"select": "description"})

except Exception as e:
if "is not a valid vertex id" not in str(e):
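`check_vertex_exists` now builds the REST++ URL by hand and percent-encodes the vertex ID with `quote(v_id, safe="")`, which protects against entity IDs containing URL-significant characters (`/`, `?`, `#`, `&`, spaces) that would otherwise corrupt the path. Passing `params={"select": "description"}` also trims the response to the one attribute the callers read. The encoding in isolation:

```python
from urllib.parse import quote

v_id = "AT&T / Bell Labs #1"
print(quote(v_id, safe=""))  # AT%26T%20%2F%20Bell%20Labs%20%231
```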
33 changes: 26 additions & 7 deletions ecc/app/graphrag/workers.py
@@ -53,7 +53,8 @@ async def install_query(
async with util.tg_sem:
res = await conn.gsql(query)

if "error" in res:
res_lower = res.lower() if isinstance(res, str) else ""
if "error" in res_lower or "does not exist" in res_lower or "failed" in res_lower:
LogWriter.error(res)
return {
"result": None,
@@ -209,15 +210,33 @@ async def embed(
logger.error(f"Failed to add embeddings for {v_id}: {e}")


def _is_near_duplicate(new_desc, existing_descs, threshold=0.85):
from difflib import SequenceMatcher
new_lower = new_desc.lower()
new_len = len(new_lower)
sm = SequenceMatcher(None, new_lower)
for existing in existing_descs:
ex_lower = existing.lower()
ex_len = len(ex_lower)
if not (new_len + ex_len) or 2 * min(new_len, ex_len) / (new_len + ex_len) < threshold:
continue
sm.set_seq2(ex_lower)
if sm.quick_ratio() >= threshold and sm.ratio() >= threshold:
return True
return False


async def get_vert_desc(conn, v_id, node: Node):
desc = [node.properties.get("description", "")]
new_desc = node.properties.get("description", "")
exists = await util.check_vertex_exists(conn, v_id)
# if vertex exists, get description content and append this description to it
if not exists.get("error", False):
# deduplicate descriptions
desc.extend(exists["resp"][0]["attributes"]["description"])
desc = list(set(desc))
return desc
resp = exists.get("resp")
if resp and len(resp) > 0 and "attributes" in resp[0]:
existing_descs = resp[0]["attributes"].get("description", [])
if not new_desc or _is_near_duplicate(new_desc, existing_descs):
return existing_descs if existing_descs else [new_desc]
return existing_descs + [new_desc]
return [new_desc]


extract_sem = asyncio.Semaphore(util._worker_concurrency)
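The length prefilter in `_is_near_duplicate` relies on `2 * min(len(a), len(b)) / (len(a) + len(b))` being an upper bound on `SequenceMatcher.ratio()` (it is exactly what `real_quick_ratio()` computes), so pairs that cannot possibly reach the threshold are skipped before any matching work; `quick_ratio()` then acts as a second, cheaper upper bound before the full `ratio()`. A usage sketch, assuming the function is importable from `graphrag.workers`:

```python
from graphrag.workers import _is_near_duplicate

existing = ["A multinational technology company based in California."]

# Near-identical wording is treated as a duplicate at the 0.85 default.
print(_is_near_duplicate(
    "A multinational technology company based in California", existing))  # True

# A much shorter description fails the length prefilter outright:
# 2 * 15 / (15 + 55) is about 0.43, so no character matching even runs.
print(_is_near_duplicate("A tech company.", existing))  # False
```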
8 changes: 7 additions & 1 deletion ecc/app/main.py
@@ -216,7 +216,13 @@ async def run_with_tracking(task_key: str, run_func, graphname: str, conn):
try:
running_tasks[task_key] = {"status": "running", "started_at": time.time()}
LogWriter.info(f"Starting ECC task: {task_key}")


# Verify the graph still exists before doing any work
try:
await conn.getVertexTypes()
except Exception:
raise Exception(f"Graph '{graphname}' does not exist or is not accessible")

# Reload config at the start of each job to ensure latest settings are used
LogWriter.info("Reloading configuration for new job...")
from common.config import reload_llm_config, reload_graphrag_config, reload_db_config
8 changes: 6 additions & 2 deletions ecc/app/supportai/util.py
@@ -64,7 +64,8 @@ async def install_queries(
async with tg_sem:
res = await conn.gsql(query)
logger.info(f"INSTALL QUERY ALL returned: {str(res)[:200]}")
if isinstance(res, str) and "error" in res.lower():
res_lower = res.lower() if isinstance(res, str) else ""
if "error" in res_lower or "does not exist" in res_lower or "failed" in res_lower:
raise Exception(res)

max_wait = 300 # seconds
@@ -215,7 +216,10 @@ async def upsert_vertex(
async def check_vertex_exists(conn, v_id: str):
async with tg_sem:
try:
res = await conn.getVerticesById("Entity", v_id)
from urllib.parse import quote
url = (conn.restppUrl + "/graph/" + conn.graphname
+ "/vertices/Entity/" + quote(v_id, safe=""))
res = await conn._req("GET", url, params={"select": "description"})

except Exception as e:
err = traceback.format_exc()
35 changes: 27 additions & 8 deletions ecc/app/supportai/workers.py
@@ -52,7 +52,8 @@ async def install_query(
async with util.tg_sem:
res = await conn.gsql(query)

if "error" in res:
res_lower = res.lower() if isinstance(res, str) else ""
if "error" in res_lower or "does not exist" in res_lower or "failed" in res_lower:
LogWriter.error(res)
return {
"result": None,
@@ -164,15 +165,33 @@ async def embed(
await embed_store.aadd_embeddings([(content, [])], [{"vertex_id": v_id}])


def _is_near_duplicate(new_desc, existing_descs, threshold=0.85):
from difflib import SequenceMatcher
new_lower = new_desc.lower()
new_len = len(new_lower)
sm = SequenceMatcher(None, new_lower)
for existing in existing_descs:
ex_lower = existing.lower()
ex_len = len(ex_lower)
if not (new_len + ex_len) or 2 * min(new_len, ex_len) / (new_len + ex_len) < threshold:
continue
sm.set_seq2(ex_lower)
if sm.quick_ratio() >= threshold and sm.ratio() >= threshold:
return True
return False


async def get_vert_desc(conn, v_id, node: Node):
desc = [node.properties.get("description", "")]
new_desc = node.properties.get("description", "")
exists = await util.check_vertex_exists(conn, v_id)
# if vertex exists, get description content and append this description to it
if not exists["error"]:
# deduplicate descriptions
desc.extend(exists["results"][0]["attributes"]["description"])
desc = list(set(desc))
return desc
if not exists.get("error", False):
resp = exists.get("resp")
if resp and len(resp) > 0 and "attributes" in resp[0]:
existing_descs = resp[0]["attributes"].get("description", [])
if not new_desc or _is_near_duplicate(new_desc, existing_descs):
return existing_descs if existing_descs else [new_desc]
return existing_descs + [new_desc]
return [new_desc]


async def extract(