import json
import os
import fire
import requests
from bs4 import BeautifulSoup, Comment
from r2r import (
EntityType,
R2RClient,
R2RPromptProvider,
Relation,
update_kg_prompt,
)


def escape_braces(text):
    """Escape curly braces so the text can pass safely through str.format-style prompt templates."""
return text.replace("{", "{{").replace("}", "}}")


def get_all_yc_co_directory_urls():
    """Load the YC company URL dump and map each company slug to its directory URL."""
this_file_path = os.path.abspath(os.path.dirname(__file__))
yc_company_dump_path = os.path.join(
this_file_path, "..", "data", "yc_companies.txt"
)
with open(yc_company_dump_path, "r") as f:
urls = f.readlines()
urls = [url.strip() for url in urls]
return {url.split("/")[-1]: url for url in urls}


# Function to fetch and clean HTML content
def fetch_and_clean_yc_co_data(url):
# Fetch the HTML content from the URL
response = requests.get(url)
response.raise_for_status() # Raise an error for bad status codes
html_content = response.text
# Parse the HTML content with BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser")
# Remove all <script>, <style>, <meta>, <link>, <header>, <nav>, and <footer> elements
for element in soup(
["script", "style", "meta", "link", "header", "nav", "footer"]
):
element.decompose()
# Remove comments
    for comment in soup.find_all(string=lambda text: isinstance(text, Comment)):
comment.extract()
# Select the main content (you can adjust the selector based on the structure of your target pages)
main_content = soup.select_one("main") or soup.body
if main_content:
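        # Collect span/anchor text separately; it is appended below as the "Metadata" section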
spans = main_content.find_all(["span", "a"])
proc_spans = []
for span in spans:
proc_spans.append(span.get_text(separator=" ", strip=True))
span_text = "\n".join(proc_spans)
# Extract the text content from the main content
paragraphs = main_content.find_all(
["p", "h1", "h2", "h3", "h4", "h5", "h6", "li"]
)
cleaned_text = (
"### Bulk:\n\n"
+ "\n\n".join(
paragraph.get_text(separator=" ", strip=True)
for paragraph in paragraphs
)
+ "\n\n### Metadata:\n\n"
+ span_text
)
return cleaned_text
else:
return "Main content not found"


def execute_query(provider, query, params=None):
    """Run a query through the provider's database session and return the records as dicts."""
    # Avoid the mutable default-argument pitfall
    params = params or {}
print(f"Executing query: {query}")
with provider.client.session(database=provider._database) as session:
result = session.run(query, params)
return [record.data() for record in result]


def main(
max_entries=50,
local_mode=True,
base_url="http://localhost:8000",
):
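    """Scrape YC company pages, ingest them into R2R, and query the resulting knowledge graph."""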
# Specify the entity types for the KG extraction prompt
entity_types = [
EntityType("COMPANY"),
EntityType("SCHOOL"),
EntityType("LOCATION"),
EntityType("PERSON"),
EntityType("DATE"),
EntityType("OTHER"),
EntityType("QUANTITY"),
EntityType("EVENT"),
EntityType("INDUSTRY"),
EntityType("MEDIA"),
]
# Specify the relations for the KG construction
relations = [
# Founder Relations
Relation("EDUCATED_AT"),
Relation("WORKED_AT"),
Relation("FOUNDED"),
# Company relations
Relation("RAISED"),
Relation("REVENUE"),
Relation("TEAM_SIZE"),
Relation("LOCATION"),
Relation("ACQUIRED_BY"),
Relation("ANNOUNCED"),
Relation("INDUSTRY"),
# Product relations
Relation("PRODUCT"),
Relation("FEATURES"),
Relation("TECHNOLOGY"),
# Additional relations
Relation("HAS"),
Relation("AS_OF"),
Relation("PARTICIPATED"),
Relation("ASSOCIATED"),
]
client = R2RClient(base_url=base_url)
r2r_prompts = R2RPromptProvider()
prompt_base = (
"zero_shot_ner_kg_extraction"
if local_mode
else "few_shot_ner_kg_extraction"
)
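    # Use the zero-shot NER prompt in local mode and the few-shot variant otherwise,
    # then push the entity types and relations into that KG extraction prompt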
update_kg_prompt(client, r2r_prompts, prompt_base, entity_types, relations)
url_map = get_all_yc_co_directory_urls()
    i = 0
    # Fetch, clean, and ingest the data for each company
    for company, url in url_map.items():
        if i >= max_entries:
            break
        i += 1
        company_data = fetch_and_clean_yc_co_data(url)
try:
# Ingest as a text document
file_name = f"{company}.txt"
with open(file_name, "w") as f:
f.write(company_data)
client.ingest_files(
[file_name],
metadatas=[{"title": company}],
)
os.remove(file_name)
        except Exception as e:
            print(f"Failed to ingest {company}: {e}")
            continue
print(client.inspect_knowledge_graph(1_000)["results"])
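    # When not running locally, also update the KG agent prompt with the same schema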
if not local_mode:
update_kg_prompt(
client, r2r_prompts, "kg_agent", entity_types, relations
)
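    # Query the knowledge graph, first via search and then via RAG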
result = client.search(
query="Find up to 10 founders that worked at Google",
use_kg_search=True,
)["results"]
print("result:\n", result)
print("Search Result:\n", result["kg_search_results"])
result = client.rag(
query="Find up to 10 founders that worked at Google",
use_kg_search=True,
)
print("RAG Result:\n", result)


if __name__ == "__main__":
fire.Fire(main)