Commit
·
f1351d2
1
Parent(s):
a64ddb6
Update file parsing progress info (#3780)
Browse files### What problem does this PR solve?
Refine the file parsing progress info
### Type of change
- [x] Refactoring
Signed-off-by: jinhai <[email protected]>
- rag/svr/task_executor.py +57 -43
rag/svr/task_executor.py
CHANGED
|
@@ -370,72 +370,86 @@ def run_raptor(row, chat_mdl, embd_mdl, callback=None):
|
|
| 370 |
return res, tk_count, vector_size
|
| 371 |
|
| 372 |
|
| 373 |
-
def do_handle_task(
|
| 374 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 375 |
try:
|
| 376 |
-
|
|
|
|
| 377 |
except Exception as e:
|
| 378 |
-
|
| 379 |
raise
|
| 380 |
-
|
|
|
|
|
|
|
| 381 |
try:
|
| 382 |
-
|
| 383 |
-
|
|
|
|
|
|
|
|
|
|
| 384 |
except Exception as e:
|
| 385 |
-
|
| 386 |
raise
|
| 387 |
else:
|
| 388 |
-
|
| 389 |
-
|
| 390 |
-
|
| 391 |
-
|
|
|
|
| 392 |
return
|
| 393 |
-
if not
|
| 394 |
-
|
| 395 |
return
|
| 396 |
# TODO: exception handler
|
| 397 |
-
## set_progress(
|
| 398 |
-
|
| 399 |
-
|
| 400 |
-
)
|
| 401 |
-
st = timer()
|
| 402 |
try:
|
| 403 |
-
tk_count, vector_size = embedding(
|
| 404 |
except Exception as e:
|
| 405 |
-
|
| 406 |
-
logging.exception("
|
| 407 |
tk_count = 0
|
| 408 |
raise
|
| 409 |
-
logging.info("Embedding
|
| 410 |
-
|
| 411 |
-
# logging.info(f"task_executor init_kb index {search.index_name(
|
| 412 |
-
init_kb(
|
| 413 |
-
chunk_count = len(set([
|
| 414 |
-
|
| 415 |
es_r = ""
|
| 416 |
es_bulk_size = 4
|
| 417 |
-
for b in range(0, len(
|
| 418 |
-
es_r = settings.docStoreConn.insert(
|
| 419 |
if b % 128 == 0:
|
| 420 |
-
|
| 421 |
-
logging.info("Indexing
|
| 422 |
if es_r:
|
| 423 |
-
|
| 424 |
-
|
| 425 |
-
settings.docStoreConn.delete({"doc_id": r["doc_id"]}, search.index_name(r["tenant_id"]), r["kb_id"])
|
| 426 |
logging.error('Insert chunk error: ' + str(es_r))
|
| 427 |
raise Exception('Insert chunk error: ' + str(es_r))
|
| 428 |
|
| 429 |
-
if TaskService.do_cancel(
|
| 430 |
-
settings.docStoreConn.delete({"doc_id":
|
| 431 |
return
|
| 432 |
|
| 433 |
-
|
| 434 |
-
DocumentService.increment_chunk_num(
|
| 435 |
-
|
| 436 |
-
logging.info(
|
| 437 |
-
"Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(
|
| 438 |
-
r["id"], tk_count, len(cks), timer() - st))
|
| 439 |
|
| 440 |
|
| 441 |
def handle_task():
|
|
|
|
| 370 |
return res, tk_count, vector_size
|
| 371 |
|
| 372 |
|
| 373 |
+
def do_handle_task(task):
|
| 374 |
+
task_id = task["id"]
|
| 375 |
+
task_from_page = task["from_page"]
|
| 376 |
+
task_to_page = task["to_page"]
|
| 377 |
+
task_tenant_id = task["tenant_id"]
|
| 378 |
+
task_embedding_id = task["embd_id"]
|
| 379 |
+
task_language = task["language"]
|
| 380 |
+
task_llm_id = task["llm_id"]
|
| 381 |
+
task_dataset_id = task["kb_id"]
|
| 382 |
+
task_doc_id = task["doc_id"]
|
| 383 |
+
task_document_name = task["name"]
|
| 384 |
+
task_parser_config = task["parser_config"]
|
| 385 |
+
|
| 386 |
+
# prepare the progress callback function
|
| 387 |
+
progress_callback = partial(set_progress, task_id, task_from_page, task_to_page)
|
| 388 |
try:
|
| 389 |
+
# bind embedding model
|
| 390 |
+
embedding_model = LLMBundle(task_tenant_id, LLMType.EMBEDDING, llm_name=task_embedding_id, lang=task_language)
|
| 391 |
except Exception as e:
|
| 392 |
+
progress_callback(-1, msg=f'Fail to bind embedding model: {str(e)}')
|
| 393 |
raise
|
| 394 |
+
|
| 395 |
+
# Either using RAPTOR or Standard chunking methods
|
| 396 |
+
if task.get("task_type", "") == "raptor":
|
| 397 |
try:
|
| 398 |
+
# bind LLM for raptor
|
| 399 |
+
chat_model = LLMBundle(task_tenant_id, LLMType.CHAT, llm_name=task_llm_id, lang=task_language)
|
| 400 |
+
|
| 401 |
+
# run RAPTOR
|
| 402 |
+
chunks, tk_count, vector_size = run_raptor(task, chat_model, embedding_model, progress_callback)
|
| 403 |
except Exception as e:
|
| 404 |
+
progress_callback(-1, msg=f'Fail to bind LLM used by RAPTOR: {str(e)}')
|
| 405 |
raise
|
| 406 |
else:
|
| 407 |
+
# Standard chunking methods
|
| 408 |
+
start_ts = timer()
|
| 409 |
+
chunks = build(task)
|
| 410 |
+
logging.info("Build document {}: {:.2f}s".format(task_document_name, timer() - start_ts))
|
| 411 |
+
if chunks is None:
|
| 412 |
return
|
| 413 |
+
if not chunks:
|
| 414 |
+
progress_callback(1., msg=f"No chunk built from {task_document_name}")
|
| 415 |
return
|
| 416 |
# TODO: exception handler
|
| 417 |
+
## set_progress(task["did"], -1, "ERROR: ")
|
| 418 |
+
progress_callback(msg="Generate {} chunks".format(len(chunks)))
|
| 419 |
+
start_ts = timer()
|
|
|
|
|
|
|
| 420 |
try:
|
| 421 |
+
tk_count, vector_size = embedding(chunks, embedding_model, task_parser_config, progress_callback)
|
| 422 |
except Exception as e:
|
| 423 |
+
progress_callback(-1, "Generate embedding error:{}".format(str(e)))
|
| 424 |
+
logging.exception("run_embedding got exception")
|
| 425 |
tk_count = 0
|
| 426 |
raise
|
| 427 |
+
logging.info("Embedding {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
|
| 428 |
+
progress_callback(msg="Embedding chunks ({:.2f}s)".format(timer() - start_ts))
|
| 429 |
+
# logging.info(f"task_executor init_kb index {search.index_name(task_tenant_id)} embedding_model {embedding_model.llm_name} vector length {vector_size}")
|
| 430 |
+
init_kb(task, vector_size)
|
| 431 |
+
chunk_count = len(set([chunk["id"] for chunk in chunks]))
|
| 432 |
+
start_ts = timer()
|
| 433 |
es_r = ""
|
| 434 |
es_bulk_size = 4
|
| 435 |
+
for b in range(0, len(chunks), es_bulk_size):
|
| 436 |
+
es_r = settings.docStoreConn.insert(chunks[b:b + es_bulk_size], search.index_name(task_tenant_id), task_dataset_id)
|
| 437 |
if b % 128 == 0:
|
| 438 |
+
progress_callback(prog=0.8 + 0.1 * (b + 1) / len(chunks), msg="")
|
| 439 |
+
logging.info("Indexing {} elapsed: {:.2f}".format(task_document_name, timer() - start_ts))
|
| 440 |
if es_r:
|
| 441 |
+
progress_callback(-1, "Insert chunk error, detail info please check log file. Please also check Elasticsearch/Infinity status!")
|
| 442 |
+
settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
|
|
|
|
| 443 |
logging.error('Insert chunk error: ' + str(es_r))
|
| 444 |
raise Exception('Insert chunk error: ' + str(es_r))
|
| 445 |
|
| 446 |
+
if TaskService.do_cancel(task_id):
|
| 447 |
+
settings.docStoreConn.delete({"doc_id": task_doc_id}, search.index_name(task_tenant_id), task_dataset_id)
|
| 448 |
return
|
| 449 |
|
| 450 |
+
progress_callback(1., msg="Finish Index ({:.2f}s)".format(timer() - start_ts))
|
| 451 |
+
DocumentService.increment_chunk_num(task_doc_id, task_dataset_id, tk_count, chunk_count, 0)
|
| 452 |
+
logging.info("Chunk doc({}), token({}), chunks({}), elapsed:{:.2f}".format(task_id, tk_count, len(chunks), timer() - start_ts))
|
|
|
|
|
|
|
|
|
|
| 453 |
|
| 454 |
|
| 455 |
def handle_task():
|