diff --git a/cope2n-api/fwd_api/api/ctel_view.py b/cope2n-api/fwd_api/api/ctel_view.py
index 8bdf416..c76fad3 100755
--- a/cope2n-api/fwd_api/api/ctel_view.py
+++ b/cope2n-api/fwd_api/api/ctel_view.py
@@ -84,7 +84,7 @@ class CtelViewSet(viewsets.ViewSet):
         file_path = FileUtils.resize_and_save_file(file_name, new_request, file_obj, 100)
         S3_path = FileUtils.save_to_S3(file_name, new_request, file_path)
 
-        files: [{
+        files = [{
             "file_name": file_name,
             "file_path": file_path, # local path to file
             "file_type": ""
diff --git a/cope2n-api/fwd_api/celery_worker/internal_task.py b/cope2n-api/fwd_api/celery_worker/internal_task.py
index 4ef99ba..b424172 100755
--- a/cope2n-api/fwd_api/celery_worker/internal_task.py
+++ b/cope2n-api/fwd_api/celery_worker/internal_task.py
@@ -1,31 +1,34 @@
-import time
-import uuid
-import os
 import base64
+import copy
+import csv
+import json
+import os
+import time
 import traceback
+import uuid
 from multiprocessing.pool import ThreadPool
 
-from fwd_api.models import SubscriptionRequest, UserProfile
-from fwd_api.celery_worker.worker import app
+from celery.utils.log import get_task_logger
+from opentelemetry import trace
+
+from fwd import settings
 from fwd_api.celery_worker.task_warpper import VerboseTask
+from fwd_api.celery_worker.worker import app
+from fwd_api.constant.common import FileCategory
+from fwd_api.middleware.local_storage import get_current_trace_id
+from fwd_api.models import (FeedbackRequest, Report, SubscriptionRequest,
+                            SubscriptionRequestFile, UserProfile)
+from fwd_api.utils.accuracy import predict_result_to_ready
+
 from ..constant.common import FolderFileType, image_extensions
 from ..exception.exceptions import FileContentInvalidException
-from fwd_api.models import SubscriptionRequestFile, FeedbackRequest, Report
 from ..utils import file as FileUtils
 from ..utils import process as ProcessUtil
 from ..utils import s3 as S3Util
 from ..utils.accuracy import validate_feedback_file
-from fwd_api.constant.common import FileCategory
-from fwd_api.middleware.local_storage import get_current_trace_id
-import csv
-import json
-import copy
-
-from fwd_api.utils.accuracy import predict_result_to_ready
-from celery.utils.log import get_task_logger
-from fwd import settings
 
 logger = get_task_logger(__name__)
+tracer = trace.get_tracer(__name__)
 
 s3_client = S3Util.MinioS3Client(
     endpoint=settings.S3_ENDPOINT,
@@ -34,6 +37,7 @@ s3_client = S3Util.MinioS3Client(
     bucket_name=settings.S3_BUCKET_NAME
 )
 
+@tracer.start_as_current_span("process_pdf_file")
 def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: str, index_in_request: int) -> list:
     try:
         # Origin file
@@ -54,7 +58,7 @@ def process_pdf_file(file_name: str, file_path: str, request, user, doc_type: st
         request.save()
         return None
 
-
+@tracer.start_as_current_span("process_image_file")
 def process_image_file(file_name: str, file_path, request, user) -> list:
     new_request_file: SubscriptionRequestFile = SubscriptionRequestFile(file_path=file_path,
                                                                         request=request,
@@ -67,7 +71,7 @@ def process_image_file(file_name: str, file_path, request, user) -> list:
         'request_file_id': new_request_file.code
     }]
 
-@app.task(base=VerboseTask, name="csv_feedback")
+@app.task(base=VerboseTask, name="csv_feedback", track_started=True)
 def process_csv_feedback(csv_file_path, feedback_id):
     # load file to RAM
     status = {}
@@ -163,7 +167,7 @@ def process_csv_feedback(csv_file_path, feedback_id):
             logger.error(f"Unable to set S3: {e}")
     feedback_rq.save()
 
-@app.task(base=VerboseTask, name='do_pdf')
+@app.task(base=VerboseTask, name='do_pdf', track_started=True)
 def process_pdf(rq_id, sub_id, p_type, user_id, files):
     """
     files: [{
@@ -231,7 +235,7 @@ def process_pdf(rq_id, sub_id, p_type, user_id, files):
     for sub_rq_id, sub_id, urls, user_id, p_type, metadata in to_queue:
         ProcessUtil.send_to_queue2(sub_rq_id, sub_id, urls, user_id, p_type, metadata)
 
-@app.task(base=VerboseTask, name='upload_file_to_s3')
+@app.task(base=VerboseTask, name='upload_file_to_s3', track_started=True)
 def upload_file_to_s3(local_file_path, s3_key, request_id):
     if s3_client.s3_client is not None:
         try:
@@ -245,7 +249,7 @@ def upload_file_to_s3(local_file_path, s3_key, request_id):
     else:
         logger.info(f"S3 is not available, skipping,...")
 
-@app.task(base=VerboseTask, name='upload_feedback_to_s3')
+@app.task(base=VerboseTask, name='upload_feedback_to_s3', track_started=True)
 def upload_feedback_to_s3(local_file_path, s3_key, feedback_id):
     if s3_client.s3_client is not None:
         try:
@@ -259,7 +263,7 @@ def upload_feedback_to_s3(local_file_path, s3_key, feedback_id):
     else:
         logger.info(f"S3 is not available, skipping,...")
 
-@app.task(base=VerboseTask, name='upload_report_to_s3')
+@app.task(base=VerboseTask, name='upload_report_to_s3', track_started=True)
 def upload_report_to_s3(local_file_path, s3_key, report_id, delay):
     if s3_client.s3_client is not None:
         try:
@@ -275,7 +279,7 @@ def upload_report_to_s3(local_file_path, s3_key, report_id, delay):
     else:
         logger.info(f"S3 is not available, skipping,...")
 
-@app.task(base=VerboseTask, name='remove_local_file')
+@app.task(base=VerboseTask, name='remove_local_file', track_started=True)
 def remove_local_file(local_file_path, request_id):
     logger.info(f"Removing local file: {local_file_path}, ...")
     try:
@@ -283,7 +287,7 @@ def remove_local_file(local_file_path, request_id):
     except Exception as e:
         logger.info(f"Unable to remove local file: {e}")
 
-@app.task(base=VerboseTask, name='upload_obj_to_s3')
+@app.task(base=VerboseTask, name='upload_obj_to_s3', track_started=True)
 def upload_obj_to_s3(byte_obj, s3_key):
     if s3_client.s3_client is not None:
         obj = base64.b64decode(byte_obj)
diff --git a/cope2n-api/fwd_api/celery_worker/process_report_tasks.py b/cope2n-api/fwd_api/celery_worker/process_report_tasks.py
index a93faa7..07ef146 100755
--- a/cope2n-api/fwd_api/celery_worker/process_report_tasks.py
+++ b/cope2n-api/fwd_api/celery_worker/process_report_tasks.py
@@ -18,12 +18,15 @@ import json
 import copy
 import os
 
+from opentelemetry import trace
 from celery.utils.log import get_task_logger
 
 from fwd import settings
 
 redis_client = RedisUtils()
 logger = get_task_logger(__name__)
+tracer = trace.get_tracer(__name__)
+
 
 s3_client = S3Util.MinioS3Client(
     endpoint=settings.S3_ENDPOINT,
@@ -32,13 +35,14 @@ s3_client = S3Util.MinioS3Client(
     bucket_name=settings.S3_BUCKET_NAME
 )
 
+@tracer.start_as_current_span("mean_list")
 def mean_list(l):
     l = [x for x in l if x is not None]
     if len(l) == 0:
         return 0
     return sum(l)/len(l)
 
-@app.task(base=VerboseTask, name='make_a_report_2')
+@app.task(base=VerboseTask, name='make_a_report_2', track_started=True)
 def make_a_report_2(report_id, query_set):
     report_type = query_set.pop("report_type", "accuracy")
     if report_type == "accuracy":
@@ -48,7 +52,7 @@ def make_a_report_2(report_id, query_set):
     else:
         raise TypeError("Invalid report type")
 
-
+@tracer.start_as_current_span("create_accuracy_report")
 def create_accuracy_report(report_id, **kwargs):
     try:
         start_date = timezone.datetime.strptime(kwargs["start_date_str"], '%Y-%m-%dT%H:%M:%S%z')
@@ -245,7 +249,7 @@ def create_accuracy_report(report_id, **kwargs):
         traceback.print_exc()
         return 400
 
-
+@tracer.start_as_current_span("create_billing_report")
 def create_billing_report(report_id, **kwargs):
     try:
         start_date = timezone.datetime.strptime(
diff --git a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
index ac6f253..ad95116 100755
--- a/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
+++ b/cope2n-api/fwd_api/celery_worker/process_result_tasks.py
@@ -1,24 +1,26 @@
-import traceback
-import time
-import uuid
 import logging
-
+import time
+import traceback
+import uuid
 from copy import deepcopy
 
+from celery.utils.log import get_task_logger
+from opentelemetry import trace
+
+from fwd_api.celery_worker.task_warpper import VerboseTask
 from fwd_api.celery_worker.worker import app
-from fwd_api.models import SubscriptionRequest
+from fwd_api.constant.common import ProcessType
 from fwd_api.exception.exceptions import InvalidException
 from fwd_api.models import SubscriptionRequest, SubscriptionRequestFile
-from fwd_api.constant.common import ProcessType
-from fwd_api.utils.redis import RedisUtils
 from fwd_api.utils import process as ProcessUtil
-from fwd_api.celery_worker.task_warpper import VerboseTask
-from celery.utils.log import get_task_logger
+from fwd_api.utils.redis import RedisUtils
 
 logger = get_task_logger(__name__)
+tracer = trace.get_tracer(__name__)
 
 redis_client = RedisUtils()
 
+@tracer.start_as_current_span("aggregate_result")
 def aggregate_result(results):
     sorted_results = [None] * len(results)
     doc_types = []
@@ -58,16 +60,17 @@ def aggregate_result(results):
 
     return des_result
 
+@tracer.start_as_current_span("print_id")
 def print_id(rq_id):
     logger.info(" [x] Received {rq}".format(rq=rq_id))
 
-
+@tracer.start_as_current_span("to_status")
 def to_status(result):
     if 'status' in result and result['status'] not in [200, 201, 202]:
         return 4
     return 3
 
-
+@tracer.start_as_current_span("update_user")
 def update_user(rq: SubscriptionRequest):
     sub = rq.subscription
     predict_status = rq.status
@@ -75,7 +78,7 @@ def update_user(rq: SubscriptionRequest):
         sub.current_token += ProcessUtil.token_value(int(rq.process_type))
         sub.save()
 
-@app.task(base=VerboseTask, name='process_sap_invoice_result')
+@app.task(base=VerboseTask, name='process_sap_invoice_result', track_started=True)
 def process_invoice_sap_result(rq_id, result):
     try:
         rq: SubscriptionRequest = \
@@ -97,7 +100,7 @@ def process_invoice_sap_result(rq_id, result):
 
         return "FailInvoice"
 
-@app.task(base=VerboseTask, name='process_fi_invoice_result')
+@app.task(base=VerboseTask, name='process_fi_invoice_result', track_started=True)
 def process_invoice_fi_result(rq_id, result):
     try:
         rq: SubscriptionRequest = \
@@ -118,7 +121,7 @@ def process_invoice_fi_result(rq_id, result):
         traceback.print_exc()
         return "FailInvoice"
 
-@app.task(base=VerboseTask, name='process_manulife_invoice_result')
+@app.task(base=VerboseTask, name='process_manulife_invoice_result', track_started=True)
 def process_invoice_manulife_result(rq_id, result):
     try:
         rq: SubscriptionRequest = \
@@ -141,7 +144,7 @@ def process_invoice_manulife_result(rq_id, result):
 
 random_processor_name = None
 
-@app.task(base=VerboseTask, name='process_sbt_invoice_result')
+@app.task(base=VerboseTask, name='process_sbt_invoice_result', track_started=True)
 def process_invoice_sbt_result(rq_id, result, metadata):
     global random_processor_name
     if random_processor_name is None:
@@ -194,7 +197,7 @@
         rq.save()
         return "FailInvoice"
 
-
+@tracer.start_as_current_span("_update_subscription_rq_file")
 def _update_subscription_rq_file(request_id, index_in_request, doc_type, result):
     image = SubscriptionRequestFile.objects.filter(request=request_id, index_in_request=index_in_request, doc_type=doc_type).first()
     retailer_name = None
@@ -234,6 +237,7 @@ def _update_subscription_rq_file(request_id, index_in_request, doc_type, result)
     image.predict_result = _predict_result
     image.save()
 
+@tracer.start_as_current_span("__get_actual_predict_result")
 def __get_actual_predict_result(result: dict):
     predicted_res = result.get('content', {}).get('document', [])
     if len(predicted_res)==0:
diff --git a/cope2n-api/fwd_api/celery_worker/worker.py b/cope2n-api/fwd_api/celery_worker/worker.py
index 816a073..acbe75c 100755
--- a/cope2n-api/fwd_api/celery_worker/worker.py
+++ b/cope2n-api/fwd_api/celery_worker/worker.py
@@ -1,16 +1,52 @@
+import copy
 import os
 
 import django
+import environ
 from celery import Celery
-from celery.signals import setup_logging # noqa
-from kombu import Queue
-import copy
+from celery.signals import setup_logging, worker_process_init # noqa
+from kombu import Exchange, Queue
+from opentelemetry import metrics, trace
+from opentelemetry.exporter.otlp.proto.http.metric_exporter import \
+    OTLPMetricExporter
+from opentelemetry.exporter.otlp.proto.http.trace_exporter import \
+    OTLPSpanExporter
+from opentelemetry.instrumentation.celery import CeleryInstrumentor
+from opentelemetry.sdk.metrics import MeterProvider
+from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
+from opentelemetry.sdk.resources import SERVICE_NAME, Resource
+from opentelemetry.sdk.trace import TracerProvider
+from opentelemetry.sdk.trace.export import BatchSpanProcessor
 
 from fwd import settings
 
 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "fwd.settings")
 django.setup()
 
+env = environ.Env(
+    DEBUG=(bool, False)
+)
+
+tracer_endpoint = env.str("tracer_endpoint", "http://jaeger_collector:4318")
+service_name = "sbt_celery_backend"
+
+@worker_process_init.connect(weak=False)
+def init_celery_tracing(*args, **kwargs):
+    CeleryInstrumentor().instrument()
+    span_exporter = OTLPSpanExporter(endpoint=f"{tracer_endpoint}/v1/traces")
+    processor = BatchSpanProcessor(span_exporter=span_exporter)
+
+    attributes = {SERVICE_NAME: service_name}
+    resource = Resource(attributes=attributes)
+    trace_provider = TracerProvider(resource=resource)
+    trace_provider.add_span_processor(span_processor=processor)
+    trace.set_tracer_provider(tracer_provider=trace_provider)
+
+    reader = PeriodicExportingMetricReader(
+        OTLPMetricExporter(endpoint=f"{tracer_endpoint}/v1/metrics"))
+    meter_provider = MeterProvider(resource=resource, metric_readers=[reader])
+    metrics.set_meter_provider(meter_provider=meter_provider)
+
 app: Celery = Celery(
     'postman',
     broker=settings.BROKER_URL,
@@ -24,6 +59,7 @@ app.autodiscover_tasks()
 @setup_logging.connect
 def config_loggers(*args, **kwargs):
     from logging.config import dictConfig # noqa
+
     from django.conf import settings # noqa
 
     log_config = copy.deepcopy(settings.LOGGING)
diff --git a/cope2n-api/requirements.txt b/cope2n-api/requirements.txt
index 125350c..789be16 100755
--- a/cope2n-api/requirements.txt
+++ b/cope2n-api/requirements.txt
@@ -59,4 +59,17 @@ openpyxl==3.1.2
 # For sdsvkvu compatibility
 # torch==1.13.1+cu116
 # torchvision==0.14.1+cu116
-# --extra-index-url https://download.pytorch.org/whl/cu116
\ No newline at end of file
+# --extra-index-url https://download.pytorch.org/whl/cu116
+
+opentelemetry-api==1.27.0
+opentelemetry-distro==0.48b0
+opentelemetry-exporter-otlp==1.27.0
+opentelemetry-exporter-otlp-proto-common==1.27.0
+opentelemetry-exporter-otlp-proto-http==1.27.0
+opentelemetry-instrumentation==0.48b0
+opentelemetry-instrumentation-celery==0.48b0
+opentelemetry-instrumentation-django==0.48b0
+opentelemetry-proto==1.27.0
+opentelemetry-sdk==1.27.0
+opentelemetry-semantic-conventions==0.48b0
+opentelemetry-util-http==0.48b0
\ No newline at end of file
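
A minimal, self-contained sketch of the tracing pattern used in this patch (the service name, endpoint, and function names below are illustrative placeholders, not values taken from the diff). In the patch itself the provider is installed per worker process in the worker_process_init hook, so spans opened by the @tracer.start_as_current_span(...) helpers nest under the task span created by CeleryInstrumentor.

# Hypothetical stand-alone example; assumes opentelemetry-sdk and the
# OTLP/HTTP exporter from requirements.txt are installed.
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Wire a tracer provider to an OTLP collector, as init_celery_tracing does per worker process.
resource = Resource(attributes={SERVICE_NAME: "demo_service"})  # placeholder service name
provider = TracerProvider(resource=resource)
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")))  # placeholder endpoint
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

# Same decorator style as the helpers instrumented in the patch.
@tracer.start_as_current_span("child_step")
def child_step(x: int) -> int:
    return x * 2

# Stands in for the Celery task span that CeleryInstrumentor opens around each task run.
with tracer.start_as_current_span("parent_task"):
    child_step(21)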