# pylint: skip-file
"""HTTP client for the GeneNetworkQA question-answering API."""
import datetime
import json
import os
import string
import time
from urllib.parse import urljoin, quote
from urllib.request import urlretrieve

import requests
from requests import HTTPError, Session
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

from errors.rag_err import UnprocessableEntity, LLMError

basedir = os.path.join(os.path.dirname(__file__))


class TimeoutHTTPAdapter(HTTPAdapter):
    """Transport adapter that applies a default timeout to every request."""

    def __init__(self, timeout, *args, **kwargs):
        """TimeoutHTTPAdapter constructor.

        Args:
            timeout (int): How many seconds to wait for the server to send
                data before giving up.
            *args: Forwarded unchanged to ``HTTPAdapter``.
            **kwargs: Forwarded unchanged to ``HTTPAdapter``.
        """
        self.timeout = timeout
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        """Override :obj:`HTTPAdapter` send method to add a default timeout.

        The adapter's timeout is only used when the caller did not supply
        one (missing or ``None``).
        """
        if kwargs.get("timeout") is None:
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)


class GeneNetworkQAClient(Session):
    """GeneNetworkQA Client

    This class provides a client object interface to the GeneNetworkQA API.
    It extends the `requests.Session` class and includes authorization,
    base URL, request timeouts, and request retries.

    Args:
        account (str): Base address subdomain. Accepted for interface
            compatibility; the constructor does not currently use it.
        api_key (str): API key, sent as a ``Bearer`` token.
        version (str, optional): API version, defaults to "v3". Accepted
            for interface compatibility; not currently used.
        timeout (int, optional): Timeout value in seconds, defaults to 30.
        total_retries (int, optional): Total retries value, defaults to 5.
        backoff_factor (int, optional): Retry backoff factor value,
            defaults to 30.

    Usage:
        from genenetworkqa import GeneNetworkQAClient
        gnqa = GeneNetworkQAClient(account="account-name",
                                   api_key="XXXXXXXXXXXXXXXXXXX...")
    """

    BASE_URL = 'https://genenetwork.fahamuai.com/api/tasks'

    def __init__(self, account, api_key, version="v3", timeout=30,
                 total_retries=5, backoff_factor=30):
        super().__init__()
        self.headers.update({"Authorization": "Bearer " + api_key})
        self.answer_url = f"{self.BASE_URL}/answers"
        self.feedback_url = f"{self.BASE_URL}/feedback"
        retry_policy = Retry(
            total=total_retries,
            status_forcelist=[429, 500, 502, 503, 504],
            backoff_factor=backoff_factor,
        )
        adapter = TimeoutHTTPAdapter(timeout=timeout,
                                     max_retries=retry_policy)
        # The same adapter instance serves both schemes.
        self.mount("https://", adapter)
        self.mount("http://", adapter)

    @staticmethod
    def format_bibliography_info(bib_info):
        """Normalise a bibliography entry.

        Args:
            bib_info: A string (a trailing '.txt' is removed) or a dict
                (its 'author', 'title', 'year' and 'doi' values are joined
                with '.' and followed by a single space; missing keys
                become empty strings).

        Returns:
            The formatted string, or ``bib_info`` unchanged when it is
            neither a string nor a dict.
        """
        if isinstance(bib_info, str):
            return bib_info.removesuffix('.txt')
        if isinstance(bib_info, dict):
            fields = (bib_info.get(key, '')
                      for key in ('author', 'title', 'year', 'doi'))
            return "{0}.{1}.{2}.{3} ".format(*fields)
        return bib_info

    @staticmethod
    def ask_the_documents(extend_url, my_auth):
        """Post a question and fetch its answer without a session.

        Args:
            extend_url (str): Text appended to ``BASE_URL`` for the POST.
            my_auth (dict): Headers sent with both requests.

        Returns:
            tuple: ``(response, 1)`` on success, or
            ``(error_message, 0)`` when a response status is not 200.

        Raises:
            RuntimeError: If the initial POST fails.
            requests.exceptions.RequestException: If the follow-up GET
                fails.
        """
        # Helpers are reached through the class: inside a static method the
        # bare names are looked up as module globals, which do not exist.
        client = GeneNetworkQAClient
        try:
            response = requests.post(
                client.BASE_URL + extend_url, data={}, headers=my_auth)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Error making the request: {e}") from e
        if response.status_code != 200:
            return client.negative_status_msg(response), 0
        task_query = client.get_task_id_from_result(response)
        response = client.get_answer_using_task_id(task_query, my_auth)
        if response.status_code != 200:
            return client.negative_status_msg(response), 0
        return response, 1

    @staticmethod
    def negative_status_msg(response):
        """Return a one-line error message built from a response's
        status code and reason."""
        return (f"Error: Status code -{response.status_code}- "
                f"Reason::{response.reason}")

    def ask(self, exUrl, *args, **kwargs):
        """POST a question to ``BASE_URL + exUrl``.

        Extra positional and keyword arguments are forwarded to
        :meth:`custom_request`.

        Returns:
            tuple: ``(response, parsed_json_body)`` when the status is 200,
            otherwise ``(error_message, 0)``.
        """
        res = self.custom_request('POST', self.BASE_URL + exUrl,
                                  *args, **kwargs)
        if res.status_code != 200:
            return self.negative_status_msg(res), 0
        return res, self.getTaskIDFromResult(res)

    def _fetch_answer(self, query, *args, **kwargs):
        """GET ``query`` and wrap the outcome as ``(result, flag)``.

        Returns ``(response, 1)`` for a 200 status, otherwise prints the
        response and returns ``(error_message, 0)``.
        """
        res = self.custom_request('GET', query, *args, **kwargs)
        if res.status_code != 200:
            print('The result is {0}'.format(res))
            return self.negative_status_msg(res), 0
        return res, 1

    def answer(self, taskid, *args, **kwargs):
        """Fetch the answer for a bare task id value.

        Returns:
            tuple: ``(response, 1)`` on success, ``(error_message, 0)``
            otherwise.
        """
        query = self.answer_url + self.extendForTaskID(taskid)
        return self._fetch_answer(query, *args, **kwargs)

    def get_answer(self, taskid, *args, **kwargs):
        """Fetch the answer for a mapping holding a 'task_id' entry
        (the second value returned by :meth:`ask`).

        Returns:
            tuple: ``(response, 1)`` on success, ``(error_message, 0)``
            otherwise.
        """
        query = self.answer_url + self.extendTaskID(taskid)
        return self._fetch_answer(query, *args, **kwargs)

    def custom_request(self, method, url, *args, max_retries=50,
                       retry_delay=3, **kwargs):
        """Send a request, polling GET requests until data is available.

        A GET whose JSON body has no 'data' value is repeated after
        ``retry_delay`` seconds, up to ``max_retries`` attempts in total.
        Any other successful response is returned immediately.

        Args:
            method (str): HTTP method name.
            url (str): Target URL.
            *args: Forwarded to ``Session.request``.
            max_retries (int, optional): Maximum number of attempts,
                defaults to 50. Must be at least 1.
            retry_delay (int, optional): Seconds to wait between attempts,
                defaults to 3.
            **kwargs: Forwarded to ``Session.request``.

        Returns:
            requests.Response: The first response that needs no further
            polling, or the last response once attempts are exhausted.

        Raises:
            ValueError: If ``max_retries`` is less than 1.
            LLMError: If the server answers with status 500 or 404.
            requests.exceptions.RequestException: For any other request
                failure.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")
        polling = method.lower() == "get"
        response = None
        for attempt in range(max_retries):
            try:
                response = super().request(method, url, *args, **kwargs)
                response.raise_for_status()
            except requests.exceptions.HTTPError as error:
                status = error.response.status_code
                if status == 500:
                    raise LLMError(
                        error.request, error.response,
                        f"Response Error,status_code:{status},"
                        "Reason: Use of Invalid Token") from error
                if status == 404:
                    # NOTE(review): the message reports BASE_URL rather than
                    # the URL actually requested; kept as-is in case callers
                    # match on it.
                    raise LLMError(
                        error.request, error.response,
                        "404 Client Error: Not Found for url: "
                        f"{self.BASE_URL}") from error
                raise
            # raise_for_status() passed, so the response is a success; only
            # a GET without data yet needs another round.
            if not (polling and response.json().get("data") is None):
                return response
            if attempt < max_retries - 1:
                time.sleep(retry_delay)
        return response

    @staticmethod
    def get_task_id_from_result(response):
        """Build a '?task_id=...' query string from a response's JSON body.

        A missing 'task_id' entry yields an empty value.
        """
        payload = json.loads(response.text)
        return f"?task_id={payload.get('task_id', '')}"

    @staticmethod
    def get_answer_using_task_id(extend_url, my_auth):
        """GET the answers endpoint with ``extend_url`` appended.

        Args:
            extend_url (str): Query string appended to the answers URL.
            my_auth (dict): Headers sent with the request.

        Returns:
            requests.Response: The successful response.

        Raises:
            requests.exceptions.RequestException: If the request fails or
                returns an error status.
        """
        answers_endpoint = f"{GeneNetworkQAClient.BASE_URL}/answers"
        response = requests.get(
            answers_endpoint + extend_url, data={}, headers=my_auth)
        response.raise_for_status()
        return response

    @staticmethod
    def filter_response_text(val):
        """Drop characters outside ``string.printable`` and parse as JSON.

        Args:
            val (str): Input string to be filtered and parsed.

        Returns:
            The parsed JSON value.
        """
        printable = set(string.printable)
        return json.loads(''.join(ch for ch in val if ch in printable))

    def getTaskIDFromResult(self, res):
        """Return the JSON-decoded body of ``res``."""
        return json.loads(res.text)

    def extendTaskID(self, task_id):
        """Build a '?task_id=...' query string from a mapping that has a
        'task_id' entry."""
        return '?task_id=' + str(task_id['task_id'])

    def extendForTaskID(self, task_id):
        """Build a '?task_id=...' query string from a bare task id."""
        return '?task_id=' + str(task_id)

    def get_gnqa(self, query):
        """Ask a question and return its answer and context.

        Args:
            query (str): The question; it is URL-quoted before sending.

        Returns:
            tuple: ``(answer, context)`` on success. On failure,
            ``(result, "Unfortunately, I have nothing.")`` where ``result``
            is the error message or the last response received.
        """
        fallback = "Unfortunately, I have nothing."
        res, task_id = self.ask('?ask=' + quote(query))
        if task_id == 0:
            # ask() signals failure with 0; there is no task to look up.
            return res, fallback
        res, success = self.get_answer(task_id)
        if success != 1:
            return res, fallback
        data = self.filter_response_text(res.text).get('data', {})
        if data is None:
            # Polling ran out before the server produced any data.
            return res, fallback
        return data.get('answer', ''), data.get('context', '')