Skip to content

Commit 1318fa8

Browse files
Merge pull request #831 from project-anuvaad/wfmrefactoring
Bulk Call - Auto Translation Completed Fix
2 parents 2c29298 + 452176a commit 1318fa8

File tree

4 files changed

+210
-2
lines changed

4 files changed

+210
-2
lines changed

anuvaad-etl/anuvaad-workflow-mgr/etl-wf-manager/configs/wfmconfig.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,8 @@
8181
#Specific variables
8282
granularity_list = ["manualEditingStartTime","manualEditingEndTime","parallelDocumentUpload","reviewerInProgress","reviewerCompleted"]
8383
workflowCodesTranslation = ["DP_WFLOW_FBT","WF_A_FCBMTKTR","DP_WFLOW_FBTTR","WF_A_FTTKTR"]
84+
85+
#Service URLs
86+
DOCUMENT_CONVERTER_SERVER_URL=os.environ.get("DOCUMENT_CONVERTER_SERVER_URL","http://anuvaad-etl-document-converter:5001/")
87+
ZUUL_ROUTES_FU_URL = os.environ.get("ZUUL_ROUTES_FU_URL","http://anuvaad-suploader:5001/")
88+
ZUUL_ROUTES_WFM_URL = os.environ.get("ZUUL_ROUTES_WFM_URL","http://anuvaad-etl-wf-manager:5001/")

anuvaad-etl/anuvaad-workflow-mgr/etl-wf-manager/controller/wfmcontroller.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,19 @@ def active_docs():
203203
log_exception("Something went wrong: " + str(e), None, e)
204204
return {"status": "FAILED", "message": "Something went wrong"}, 400
205205

206+
@wfmapp.route(context_path + '/v1/translate_pipeline', methods=["POST"])
207+
def translate_pipeline():
208+
try:
209+
service = WFMService()
210+
data = add_headers(request.get_json(), request)
211+
response = service.digitization_translation_pipeline(data)
212+
if not response:
213+
return {"response": response}, 400
214+
return {"response": response}, 200
215+
except Exception as e:
216+
log_exception("Something went wrong: " + str(e), None, e)
217+
return {"status": "FAILED", "message": "Something went wrong"}, 400
218+
206219
# Fetches required headers from the request and adds it to the body.
207220
def add_headers(data, api_request):
208221
headers = {
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
from anuvaad_auditor import log_info, log_error
2+
import json
3+
import requests
4+
from configs.wfmconfig import app_context
5+
from configs.wfmconfig import DOCUMENT_CONVERTER_SERVER_URL, ZUUL_ROUTES_FU_URL, ZUUL_ROUTES_WFM_URL
6+
import traceback
7+
8+
class PipelineCalls:
9+
10+
def document_export(self,user_id,record_id,filetype,headers):
11+
log_info("Performing Document Export",app_context)
12+
payload = json.dumps({
13+
"record_id": record_id,
14+
"user_id": user_id,
15+
"file_type": filetype
16+
})
17+
headers = {
18+
'Content-Type': 'application/json'
19+
}
20+
try:
21+
url = DOCUMENT_CONVERTER_SERVER_URL + "anuvaad-etl/document-converter/v0/document-exporter"
22+
response = requests.request("POST", url, headers=headers, data=payload)
23+
log_info(f"Document Export Response {response.status_code}",app_context)
24+
if response.status_code >=200 and response.status_code <=204:
25+
return response.json()["translated_document"]
26+
except Exception as e:
27+
log_error(f"Error during document conversion : {traceback.format_exc()}",app_context,e)
28+
29+
def download_file(self,download_path):
30+
log_info("Performing File Download",app_context)
31+
url = ZUUL_ROUTES_FU_URL + "anuvaad-api/file-uploader/v0/serve-file?filename=" + download_path
32+
try:
33+
response = requests.request("GET", url)
34+
if response.status_code >=200 and response.status_code <=204:
35+
return response.content
36+
except Exception as e:
37+
log_error(f"Error during file download : {traceback.format_exc()}",app_context,e)
38+
39+
40+
def upload_files(self,filepath,headers):
41+
# hit upload_file api and fetch file_id
42+
request_headers = {
43+
"x-user-id": headers["userID"],
44+
"x-org-id": headers["orgID"],
45+
"x-roles": headers["roles"],
46+
"x-request-id": headers["requestID"],
47+
"x-session-id": headers["sessionID"]
48+
}
49+
log_info("Performing Upload File",app_context)
50+
try:
51+
uploadfiles_body = {
52+
'file': open(filepath,'rb')
53+
}
54+
url = ZUUL_ROUTES_FU_URL + "anuvaad-api/file-uploader/v0/upload-file"
55+
req = requests.post(timeout=120,url=url,files=uploadfiles_body,headers=request_headers)
56+
if req.status_code >=200 and req.status_code <=204:
57+
file_id = req.json()["data"]
58+
return file_id
59+
else:
60+
return None
61+
except requests.exceptions.RequestException as e:
62+
log_error(f"Error during document conversion : {traceback.format_exc()}",app_context,e)
63+
64+
def translate(self,file_name,file_id,payload,headers):
65+
66+
request_headers = {
67+
"x-user-id": headers["userID"],
68+
"x-org-id": headers["orgID"],
69+
"x-roles": headers["roles"],
70+
"x-request-id": headers["requestID"],
71+
"x-session-id": headers["sessionID"]
72+
}
73+
74+
payload["jobName"] = file_name
75+
payload["files"][0]["path"] = file_id
76+
payload["files"][0]["type"] = file_id.split()[-1]
77+
78+
# Perform translation
79+
log_info(f"Performing Translation {file_id}",app_context)
80+
asyncwf_body = payload
81+
try:
82+
url = ZUUL_ROUTES_WFM_URL+"anuvaad-etl/wf-manager/v1/workflow/async/initiate"
83+
req = requests.post(timeout=120,url=url,json=asyncwf_body, headers=request_headers)
84+
if req.status_code >=200 and req.status_code <=204:
85+
resp = req.json()
86+
return resp
87+
else:
88+
return None
89+
except requests.exceptions.RequestException as e:
90+
log_error(f"Error during file download : {traceback.format_exc()}",app_context,e)
91+

anuvaad-etl/anuvaad-workflow-mgr/etl-wf-manager/service/wfmservice.py

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from anuvaad_auditor.errorhandler import post_error_wf, post_error, log_exception
1111
from anuvaad_auditor.loghandler import log_info, log_error
1212
from repository.redisrepo import REDISRepository
13+
from service.pipelinecalls import PipelineCalls
1314
from configs.wfmconfig import app_context, workflowCodesTranslation
1415
import datetime
1516

@@ -19,6 +20,7 @@
1920
wfmutils = WFMUtils()
2021
validator = WFMValidator()
2122
redisRepo = REDISRepository()
23+
pipelineCalls = PipelineCalls()
2224

2325
class WFMService:
2426
def __init__(self):
@@ -389,6 +391,9 @@ def get_job_details_bulk(self, req_criteria, skip_pagination, isReviewer=False):
389391
jobIDs.append(jobID)
390392
if len(jobIDs) > 0:
391393
criteria["jobID"] = {"$in": jobIDs}
394+
if 'inputFileName' in req_criteria.keys():
395+
jobName_pattern = req_criteria["inputFileName"]
396+
criteria["input.jobName"] = {"$regex": jobName_pattern}
392397
if 'orgIDs' in req_criteria.keys():
393398
if req_criteria["orgIDs"]:
394399
orgIDs = []
@@ -404,7 +409,13 @@ def get_job_details_bulk(self, req_criteria, skip_pagination, isReviewer=False):
404409
if currentStat:
405410
currentStatus.append(currentStat)
406411
if len(currentStatus) > 0:
407-
criteria["granularity.currentStatus"] = {"$in": currentStatus}
412+
if "auto_translation_completed" in currentStatus:
413+
criteria["$or"] = [
414+
{"granularity.currentStatus": {"$in": currentStatus}},
415+
{"granularity.currentStatus": {"$exists": False}}
416+
]
417+
else:
418+
criteria["granularity.currentStatus"] = {"$in": currentStatus}
408419
if 'filterByStartTime' in req_criteria.keys():
409420
if 'startTimeStamp' in req_criteria['filterByStartTime'].keys() and 'endTimeStamp' in req_criteria['filterByStartTime'].keys():
410421
criteria["startTime"] = { "$gte": req_criteria['filterByStartTime']['startTimeStamp'], "$lte": req_criteria['filterByStartTime']['endTimeStamp']}
@@ -581,4 +592,92 @@ def get_active_doc_count(self):
581592
response = redisRepo.get_active_count()
582593
return response
583594
except Exception as e:
584-
log_exception("Active Job Status Retrieval: {wf_async_input['jobID']} " + str(e), None, e)
595+
log_exception("Active Job Status Retrieval: {wf_async_input['jobID']} " + str(e), None, e)
596+
597+
def digitization_translation_pipeline(self,data):
598+
"""
599+
{
600+
"record_id": "A_FWLBOD20TESOT-hnHgN-1693227866067%7C0-16932280318450673.json",
601+
"user_id": "d225fb2cd78a45078518356548f396ff1686290705872",
602+
"file_type": "pdf",
603+
"file_name": "name.pdf"
604+
"translation_async_flow" : {
605+
"workflowCode": "WF_A_FCBMTKTR",
606+
"jobName": "1958_1_1150_1155_updated.pdf",
607+
"jobDescription": "",
608+
"files": [
609+
{
610+
"path": "01c440b8-8aac-4352-9b44-c3fbf1ddca6e.pdf",
611+
"type": "pdf",
612+
"locale": "en",
613+
"model": {
614+
"uuid": "687baea0-4512-4fb9-9264-5c7b368afc59",
615+
"is_primary": true,
616+
"model_id": 103,
617+
"model_name": "English-Hindi IndicTrans Model-1",
618+
"source_language_code": "en",
619+
"source_language_name": "English",
620+
"target_language_code": "hi",
621+
"target_language_name": "Hindi",
622+
"description": "AAI4B en-hi model-1(indictrans/fairseq)",
623+
"status": "ACTIVE",
624+
"connection_details": {
625+
"kafka": {
626+
"input_topic": "KAFKA_AAI4B_NMT_TRANSLATION_INPUT_TOPIC",
627+
"output_topic": "KAFKA_AAI4B_NMT_TRANSLATION_OUTPUT_TOPIC"
628+
},
629+
"translation": {
630+
"api_endpoint": "AAIB_NMT_TRANSLATE_ENDPOINT",
631+
"host": "AAI4B_NMT_HOST"
632+
},
633+
"interactive": {
634+
"api_endpoint": "AAIB_NMT_IT_ENDPOINT",
635+
"host": "AAI4B_NMT_HOST"
636+
}
637+
},
638+
"interactive_translation": true
639+
},
640+
"context": "JUDICIARY",
641+
"modifiedSentences": "a"
642+
}
643+
]
644+
}
645+
}
646+
"""
647+
try:
648+
if "record_id" not in data.keys():
649+
return {"status" : "Error", "reason":"record_id missing"}
650+
if data["file_type"] in ["jpg","bmp","png","svg","jpeg"]:
651+
data["record_id"] = data["record_id"].replace("%7C","|")
652+
data["file_type"] = "pdf"
653+
data["file_name"] = data["file_name"].replace(data["file_name"].split(".")[-1],"pdf")
654+
655+
document = pipelineCalls.document_export(data["user_id"],data["record_id"],data["file_type"],data["metadata"])
656+
if document is None:
657+
return {"status":"Error","reason":"Document Export Failed"}
658+
file_content = pipelineCalls.download_file(document)
659+
if file_content is None:
660+
return {"status":"Error","reason":"File Download Failed"}
661+
662+
if not os.path.exists("upload_files"):
663+
# If it doesn't exist, create it
664+
os.makedirs("upload_files")
665+
666+
with open("./upload_files/"+data["file_name"], "wb") as file:
667+
file.write(file_content)
668+
669+
file_id = pipelineCalls.upload_files("./upload_files/"+data["file_name"],data["metadata"])
670+
if file_id is None:
671+
return {"status":"Error","reason":"File Upload Failed"}
672+
673+
# Delete uploaded file
674+
try:
675+
os.remove("./upload_files/"+data["file_name"])
676+
except Exception as e:
677+
log_error(f"Exception during file deletion",app_context,e)
678+
679+
response = pipelineCalls.translate(data["file_name"],file_id,data["translation_async_flow"],data["metadata"])
680+
return response
681+
except Exception as e:
682+
log_error(f"Exception occurred {e}",e,app_context)
683+

0 commit comments

Comments
 (0)