1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
import os
import sys
import json
import time
import configparser
from r2r import R2R, Document, GenerationConfig, R2RClient
class DocOps:
    """Helpers for flattening nested response objects and writing them out.

    The helpers keep no per-instance state and are exposed as static
    methods, so they can be called on the class
    (``DocOps.extract_response(...)``) as well as on an instance.
    """

    _type = ''

    # Maps a key found somewhere in a response to the output field it feeds
    # ("name") and tells whether values are collected in a list
    # ("append": 1) or overwrite a single value ("append": 0).
    values_key = {
        "text":            {"name": "contexts",      "append": 1},
        "associatedQuery": {"name": "question",      "append": 0},
        "id":              {"name": "id",            "append": 1},
        "title":           {"name": "titles",        "append": 1},
        "document_id":     {"name": "document_id",   "append": 1},
        "extraction_id":   {"name": "extraction_id", "append": 1},
        "content":         {"name": "answer",        "append": 0},
    }

    def __init__(self):
        # NOTE(review): the tag reads 'QuestionList' although this class is
        # DocOps; left unchanged because nothing visible here reads it.
        self._type = 'QuestionList'

    @staticmethod
    def writeDatasetFile(responses, outp_file):
        """Serialise *responses* as indented JSON and append it to a file.

        Args:
            responses: Any JSON-serialisable object.
            outp_file: Path of the file to append to; it is created when it
                does not exist yet.

        Note:
            The text is appended without a separator, so writing to the same
            file more than once yields concatenated JSON documents.
        """
        print(outp_file)
        output = json.dumps(responses, indent=2)
        # Mode "a" both creates a missing file and appends to an existing
        # one, so no existence check is needed.
        with open(outp_file, "a") as the_data:
            the_data.write(output)

    @staticmethod
    def get_ragas_out_dict():
        """Return a fresh, empty output record.

        The field names match the "name" entries of ``values_key``: list
        fields start as empty lists, single-value fields as empty strings.
        """
        return {
            "titles": [],
            "extraction_id": [],
            "document_id": [],
            "id": [],
            "contexts": [],
            "answer": "",
            "question": "",
        }

    @staticmethod
    def extract_response(obj, values_key, thedict, verbose=0):
        """Walk a nested dict/list structure and collect mapped values.

        Every dict key that appears in *values_key* has its value stored in
        *thedict* under the mapped field name; every other value is searched
        recursively. Objects that are neither dicts nor lists are ignored.

        Args:
            obj: The (possibly nested) structure to search.
            values_key: Mapping of source key to
                ``{"name": <target field>, "append": <0 or 1>}``.
            thedict: Output record, modified in place. Fields mapped with
                ``"append": 1`` must already hold a list.
            verbose: When truthy, print each key/value pair that is stored.
        """
        if isinstance(obj, dict):
            for key, val in obj.items():
                if key in values_key:
                    field = values_key[key]
                    # Only strings can be whitespace-normalised; any other
                    # value (None, numbers, ...) is stored unchanged rather
                    # than raising AttributeError.
                    if isinstance(val, str):
                        cleaned = val.replace("\n", " ").strip()
                    else:
                        cleaned = val
                    if field["append"]:
                        thedict[field["name"]].append(cleaned)
                    else:
                        thedict[field["name"]] = cleaned
                    if verbose:
                        print("Key -> {0}\tValue -> {1}".format(key, val))
                else:
                    # Trace output for single-key wrapper dicts.
                    if len(obj) == 1:
                        print(key, " --> ", val)
                    # Qualified call: a bare name would not resolve from
                    # inside the class body scope.
                    DocOps.extract_response(val, values_key, thedict, verbose)
        elif isinstance(obj, list):
            for item in obj:
                DocOps.extract_response(item, values_key, thedict, verbose)
class QuestionList:
    """Question lists loaded from a JSON file, grouped by level and domain.

    The file is expected to hold a JSON array of objects, each carrying the
    keys ``level``, ``domain`` and ``query``; the ``query`` value is stored
    as the question list for that level/domain pair.
    """

    _verbose = 0
    _doc = ''
    _fname = ''

    # Template of the known level/domain slots. It is copied per instance in
    # __init__ so that loading one file never leaks into another instance.
    _question_list = {
        "domainexpert": {
            "gn": [],
            "aging": [],
            "diabetes": []
        },
        "citizenscientist": {
            "gn": [],
            "aging": [],
            "diabetes": []
        }
    }

    def __init__(self, the_file, verbose=0):
        """Load and parse the question file.

        Args:
            the_file: Path of the JSON question file.
            verbose: Truthy to print progress messages while parsing.
        """
        print('QuestionList has been initialized {0}, verbosity is {1}'.format(the_file, verbose))
        self._fname = the_file
        self._verbose = verbose
        # Give every instance its own copy of the template; mutating the
        # class-level dict would share the questions across all instances.
        self._question_list = {
            level: {domain: list(queries) for domain, queries in domains.items()}
            for level, domains in type(self)._question_list.items()
        }
        self.read_document()
        self.parse_document()

    def read_document(self):
        """Read the JSON file named by ``self._fname`` into ``self._doc``."""
        # JSON text is UTF-8 by specification; do not rely on the locale.
        with open(self._fname, "r", encoding="utf-8") as r_file:
            self._doc = json.load(r_file)

    @staticmethod
    def reset_responses():
        """Return a fresh, empty responses record (four empty lists)."""
        return {
            'question': [],
            'answer': [],
            'contexts': [],
            'task_id': []
        }

    def parse_document(self):
        """Store every entry of the loaded document under its level/domain.

        Raises:
            KeyError: If an entry lacks ``level``, ``domain`` or ``query``.
        """
        # The verbosity is a level, not a 0/1 index: test its truthiness so
        # that values above 1 do not raise IndexError.
        if self._verbose:
            print('\nParse question list')
        for item in self._doc:
            level = item['level']
            domain = item['domain']
            # New domains were always accepted; accept new levels as well
            # instead of failing with KeyError.
            self._question_list.setdefault(level, {})[domain] = item['query']

    def print_list(self, the_lst):
        """Print the items of *the_lst*, one per line, numbered from 1."""
        for ndx, item in enumerate(the_lst, start=1):
            print('\t[{0}] {1}'.format(ndx, item))

    def _print(self):
        """Dump all stored question lists as indented JSON."""
        print(json.dumps(self._question_list, indent=2))

    def get(self, level, domain):
        """Return the question list stored for *level* and *domain*.

        Raises:
            KeyError: If the level or the domain is unknown.
        """
        return self._question_list[level][domain]
|