From 4a52a71956a8d46fcb7294ac71734504bb09bcc2 Mon Sep 17 00:00:00 2001
From: S. Solomon Darnell
Date: Fri, 28 Mar 2025 21:52:21 -0500
Subject: two versions of R2R are here

---
 .../site-packages/firecrawl/firecrawl.py | 1286 ++++++++++++++++++++
 1 file changed, 1286 insertions(+)
 create mode 100644 .venv/lib/python3.12/site-packages/firecrawl/firecrawl.py

diff --git a/.venv/lib/python3.12/site-packages/firecrawl/firecrawl.py b/.venv/lib/python3.12/site-packages/firecrawl/firecrawl.py
new file mode 100644
index 00000000..f4ddb91e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/firecrawl/firecrawl.py
@@ -0,0 +1,1286 @@
+"""
+FirecrawlApp Module
+
+This module provides a class `FirecrawlApp` for interacting with the Firecrawl API.
+It includes methods to scrape URLs, perform searches, initiate and monitor crawl jobs,
+and check the status of these jobs. The module uses requests for HTTP communication
+and handles retries for certain HTTP status codes.
+
+Classes:
+    - FirecrawlApp: Main class for interacting with the Firecrawl API.
+"""
+import logging
+import os
+import time
+from typing import Any, Dict, Optional, List, Union, Callable
+import json
+
+import requests
+import pydantic
+import websockets
+
+logger : logging.Logger = logging.getLogger("firecrawl")
+
+class SearchParams(pydantic.BaseModel):
+    query: str
+    limit: Optional[int] = 5
+    tbs: Optional[str] = None
+    filter: Optional[str] = None
+    lang: Optional[str] = "en"
+    country: Optional[str] = "us"
+    location: Optional[str] = None
+    origin: Optional[str] = "api"
+    timeout: Optional[int] = 60000
+    scrapeOptions: Optional[Dict[str, Any]] = None
+
+class GenerateLLMsTextParams(pydantic.BaseModel):
+    """
+    Parameters for the LLMs.txt generation operation.
+    """
+    maxUrls: Optional[int] = 10
+    showFullText: Optional[bool] = False
+    __experimental_stream: Optional[bool] = None
+
+class DeepResearchParams(pydantic.BaseModel):
+    """
+    Parameters for the deep research operation.
+    """
+    maxDepth: Optional[int] = 7
+    timeLimit: Optional[int] = 270
+    maxUrls: Optional[int] = 20
+    __experimental_streamSteps: Optional[bool] = None
+
+class DeepResearchResponse(pydantic.BaseModel):
+    """
+    Response from the deep research operation.
+    """
+    success: bool
+    id: str
+    error: Optional[str] = None
+
+class DeepResearchStatusResponse(pydantic.BaseModel):
+    """
+    Status response from the deep research operation.
+    """
+    success: bool
+    data: Optional[Dict[str, Any]] = None
+    status: str
+    error: Optional[str] = None
+    expiresAt: str
+    currentDepth: int
+    maxDepth: int
+    activities: List[Dict[str, Any]]
+    sources: List[Dict[str, Any]]
+    summaries: List[str]
+
+class FirecrawlApp:
+    class SearchResponse(pydantic.BaseModel):
+        """
+        Response from the search operation.
+        """
+        success: bool
+        data: List[Dict[str, Any]]
+        warning: Optional[str] = None
+        error: Optional[str] = None
+
+    class ExtractParams(pydantic.BaseModel):
+        """
+        Parameters for the extract operation.
+        """
+        prompt: Optional[str] = None
+        schema_: Optional[Any] = pydantic.Field(None, alias='schema')
+        system_prompt: Optional[str] = None
+        allow_external_links: Optional[bool] = False
+        enable_web_search: Optional[bool] = False
+        # Just for backwards compatibility
+        enableWebSearch: Optional[bool] = False
+        show_sources: Optional[bool] = False
+
+
+
+
+    class ExtractResponse(pydantic.BaseModel):
+        """
+        Response from the extract operation.
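# Editor's note: a minimal sketch of how the parameter models defined above can be
# constructed and serialized before being sent to the API. The field values shown
# (query text, maxDepth, maxUrls) are illustrative placeholders, not values taken
# from this patch.
from firecrawl.firecrawl import SearchParams, DeepResearchParams, GenerateLLMsTextParams

search_params = SearchParams(query="open source crawlers", limit=3, lang="en", country="us")
research_params = DeepResearchParams(maxDepth=5, timeLimit=180, maxUrls=10)
llmstxt_params = GenerateLLMsTextParams(maxUrls=10, showFullText=True)

# .dict(exclude_none=True) mirrors how the SDK itself serializes these models
# before POSTing them (see search(), async_deep_research(), async_generate_llms_text()).
print(search_params.dict(exclude_none=True))
print(research_params.dict(exclude_none=True))
print(llmstxt_params.dict(exclude_none=True))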
+ """ + success: bool + data: Optional[Any] = None + error: Optional[str] = None + + def __init__(self, api_key: Optional[str] = None, api_url: Optional[str] = None) -> None: + """ + Initialize the FirecrawlApp instance with API key, API URL. + + Args: + api_key (Optional[str]): API key for authenticating with the Firecrawl API. + api_url (Optional[str]): Base URL for the Firecrawl API. + """ + self.api_key = api_key or os.getenv('FIRECRAWL_API_KEY') + self.api_url = api_url or os.getenv('FIRECRAWL_API_URL', 'https://api.firecrawl.dev') + + # Only require API key when using cloud service + if 'api.firecrawl.dev' in self.api_url and self.api_key is None: + logger.warning("No API key provided for cloud service") + raise ValueError('No API key provided') + + logger.debug(f"Initialized FirecrawlApp with API URL: {self.api_url}") + + def scrape_url(self, url: str, params: Optional[Dict[str, Any]] = None) -> Any: + """ + Scrape the specified URL using the Firecrawl API. + + Args: + url (str): The URL to scrape. + params (Optional[Dict[str, Any]]): Additional parameters for the scrape request. + + Returns: + Any: The scraped data if the request is successful. + + Raises: + Exception: If the scrape request fails. + """ + + headers = self._prepare_headers() + + # Prepare the base scrape parameters with the URL + scrape_params = {'url': url} + + # If there are additional params, process them + if params: + # Handle extract (for v1) + extract = params.get('extract', {}) + if extract: + if 'schema' in extract and hasattr(extract['schema'], 'schema'): + extract['schema'] = extract['schema'].schema() + scrape_params['extract'] = extract + + # Include any other params directly at the top level of scrape_params + for key, value in params.items(): + if key not in ['extract']: + scrape_params[key] = value + + json = params.get("jsonOptions", {}) + if json: + if 'schema' in json and hasattr(json['schema'], 'schema'): + json['schema'] = json['schema'].schema() + scrape_params['jsonOptions'] = json + + # Include any other params directly at the top level of scrape_params + for key, value in params.items(): + if key not in ['jsonOptions']: + scrape_params[key] = value + + + endpoint = f'/v1/scrape' + # Make the POST request with the prepared headers and JSON data + response = requests.post( + f'{self.api_url}{endpoint}', + headers=headers, + json=scrape_params, + timeout=(scrape_params["timeout"] + 5000 if "timeout" in scrape_params else None), + ) + if response.status_code == 200: + try: + response = response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + if response['success'] and 'data' in response: + return response['data'] + elif "error" in response: + raise Exception(f'Failed to scrape URL. Error: {response["error"]}') + else: + raise Exception(f'Failed to scrape URL. Error: {response}') + else: + self._handle_error(response, 'scrape URL') + + def search(self, query: str, params: Optional[Union[Dict[str, Any], SearchParams]] = None) -> Dict[str, Any]: + """ + Search for content using the Firecrawl API. + + Args: + query (str): The search query string. + params (Optional[Union[Dict[str, Any], SearchParams]]): Additional search parameters. + + Returns: + Dict[str, Any]: The search response containing success status and search results. 
+ """ + if params is None: + params = {} + + if isinstance(params, dict): + search_params = SearchParams(query=query, **params) + else: + search_params = params + search_params.query = query + + response = requests.post( + f"{self.api_url}/v1/search", + headers={"Authorization": f"Bearer {self.api_key}"}, + json=search_params.dict(exclude_none=True) + ) + + if response.status_code != 200: + raise Exception(f"Request failed with status code {response.status_code}") + + try: + return response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + + def crawl_url(self, url: str, + params: Optional[Dict[str, Any]] = None, + poll_interval: Optional[int] = 2, + idempotency_key: Optional[str] = None) -> Any: + """ + Initiate a crawl job for the specified URL using the Firecrawl API. + + Args: + url (str): The URL to crawl. + params (Optional[Dict[str, Any]]): Additional parameters for the crawl request. + poll_interval (Optional[int]): Time in seconds between status checks when waiting for job completion. Defaults to 2 seconds. + idempotency_key (Optional[str]): A unique uuid key to ensure idempotency of requests. + + Returns: + Dict[str, Any]: A dictionary containing the crawl results. The structure includes: + - 'success' (bool): Indicates if the crawl was successful. + - 'status' (str): The final status of the crawl job (e.g., 'completed'). + - 'completed' (int): Number of scraped pages that completed. + - 'total' (int): Total number of scraped pages. + - 'creditsUsed' (int): Estimated number of API credits used for this crawl. + - 'expiresAt' (str): ISO 8601 formatted date-time string indicating when the crawl data expires. + - 'data' (List[Dict]): List of all the scraped pages. + + Raises: + Exception: If the crawl job initiation or monitoring fails. + """ + endpoint = f'/v1/crawl' + headers = self._prepare_headers(idempotency_key) + json_data = {'url': url} + if params: + json_data.update(params) + response = self._post_request(f'{self.api_url}{endpoint}', json_data, headers) + if response.status_code == 200: + try: + id = response.json().get('id') + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + return self._monitor_job_status(id, headers, poll_interval) + + else: + self._handle_error(response, 'start crawl job') + + + def async_crawl_url(self, url: str, params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> Dict[str, Any]: + """ + Initiate a crawl job asynchronously. + + Args: + url (str): The URL to crawl. + params (Optional[Dict[str, Any]]): Additional parameters for the crawl request. + idempotency_key (Optional[str]): A unique uuid key to ensure idempotency of requests. + + Returns: + Dict[str, Any]: A dictionary containing the crawl initiation response. The structure includes: + - 'success' (bool): Indicates if the crawl initiation was successful. + - 'id' (str): The unique identifier for the crawl job. + - 'url' (str): The URL to check the status of the crawl job. 
+ """ + endpoint = f'/v1/crawl' + headers = self._prepare_headers(idempotency_key) + json_data = {'url': url} + if params: + json_data.update(params) + response = self._post_request(f'{self.api_url}{endpoint}', json_data, headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + else: + self._handle_error(response, 'start crawl job') + + def check_crawl_status(self, id: str) -> Any: + """ + Check the status of a crawl job using the Firecrawl API. + + Args: + id (str): The ID of the crawl job. + + Returns: + Any: The status of the crawl job. + + Raises: + Exception: If the status check request fails. + """ + endpoint = f'/v1/crawl/{id}' + + headers = self._prepare_headers() + response = self._get_request(f'{self.api_url}{endpoint}', headers) + if response.status_code == 200: + try: + status_data = response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + if status_data['status'] == 'completed': + if 'data' in status_data: + data = status_data['data'] + while 'next' in status_data: + if len(status_data['data']) == 0: + break + next_url = status_data.get('next') + if not next_url: + logger.warning("Expected 'next' URL is missing.") + break + try: + status_response = self._get_request(next_url, headers) + if status_response.status_code != 200: + logger.error(f"Failed to fetch next page: {status_response.status_code}") + break + try: + next_data = status_response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + data.extend(next_data.get('data', [])) + status_data = next_data + except Exception as e: + logger.error(f"Error during pagination request: {e}") + break + status_data['data'] = data + + response = { + 'status': status_data.get('status'), + 'total': status_data.get('total'), + 'completed': status_data.get('completed'), + 'creditsUsed': status_data.get('creditsUsed'), + 'expiresAt': status_data.get('expiresAt'), + 'data': status_data.get('data') + } + + if 'error' in status_data: + response['error'] = status_data['error'] + + if 'next' in status_data: + response['next'] = status_data['next'] + + return { + 'success': False if 'error' in status_data else True, + **response + } + else: + self._handle_error(response, 'check crawl status') + + def check_crawl_errors(self, id: str) -> Dict[str, Any]: + """ + Returns information about crawl errors. + + Args: + id (str): The ID of the crawl job. + + Returns: + Dict[str, Any]: Information about crawl errors. + """ + headers = self._prepare_headers() + response = self._get_request(f'{self.api_url}/v1/crawl/{id}/errors', headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + else: + self._handle_error(response, "check crawl errors") + + def cancel_crawl(self, id: str) -> Dict[str, Any]: + """ + Cancel an asynchronous crawl job using the Firecrawl API. + + Args: + id (str): The ID of the crawl job to cancel. + + Returns: + Dict[str, Any]: The response from the cancel crawl request. 
+ """ + headers = self._prepare_headers() + response = self._delete_request(f'{self.api_url}/v1/crawl/{id}', headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + else: + self._handle_error(response, "cancel crawl job") + + def crawl_url_and_watch(self, url: str, params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> 'CrawlWatcher': + """ + Initiate a crawl job and return a CrawlWatcher to monitor the job via WebSocket. + + Args: + url (str): The URL to crawl. + params (Optional[Dict[str, Any]]): Additional parameters for the crawl request. + idempotency_key (Optional[str]): A unique uuid key to ensure idempotency of requests. + + Returns: + CrawlWatcher: An instance of CrawlWatcher to monitor the crawl job. + """ + crawl_response = self.async_crawl_url(url, params, idempotency_key) + if crawl_response['success'] and 'id' in crawl_response: + return CrawlWatcher(crawl_response['id'], self) + else: + raise Exception("Crawl job failed to start") + + def map_url(self, url: str, params: Optional[Dict[str, Any]] = None) -> Any: + """ + Perform a map search using the Firecrawl API. + + Args: + url (str): The URL to perform the map search on. + params (Optional[Dict[str, Any]]): Additional parameters for the map search. + + Returns: + List[str]: A list of URLs discovered during the map search. + """ + endpoint = f'/v1/map' + headers = self._prepare_headers() + + # Prepare the base scrape parameters with the URL + json_data = {'url': url} + if params: + json_data.update(params) + + # Make the POST request with the prepared headers and JSON data + response = requests.post( + f'{self.api_url}{endpoint}', + headers=headers, + json=json_data, + ) + if response.status_code == 200: + try: + response = response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + if response['success'] and 'links' in response: + return response + elif 'error' in response: + raise Exception(f'Failed to map URL. Error: {response["error"]}') + else: + raise Exception(f'Failed to map URL. Error: {response}') + else: + self._handle_error(response, 'map') + + def batch_scrape_urls(self, urls: List[str], + params: Optional[Dict[str, Any]] = None, + poll_interval: Optional[int] = 2, + idempotency_key: Optional[str] = None) -> Any: + """ + Initiate a batch scrape job for the specified URLs using the Firecrawl API. + + Args: + urls (List[str]): The URLs to scrape. + params (Optional[Dict[str, Any]]): Additional parameters for the scraper. + poll_interval (Optional[int]): Time in seconds between status checks when waiting for job completion. Defaults to 2 seconds. + idempotency_key (Optional[str]): A unique uuid key to ensure idempotency of requests. + + Returns: + Dict[str, Any]: A dictionary containing the scrape results. The structure includes: + - 'success' (bool): Indicates if the batch scrape was successful. + - 'status' (str): The final status of the batch scrape job (e.g., 'completed'). + - 'completed' (int): Number of scraped pages that completed. + - 'total' (int): Total number of scraped pages. + - 'creditsUsed' (int): Estimated number of API credits used for this batch scrape. + - 'expiresAt' (str): ISO 8601 formatted date-time string indicating when the batch scrape data expires. + - 'data' (List[Dict]): List of all the scraped pages. + + Raises: + Exception: If the batch scrape job initiation or monitoring fails. 
+ """ + endpoint = f'/v1/batch/scrape' + headers = self._prepare_headers(idempotency_key) + json_data = {'urls': urls} + if params: + json_data.update(params) + response = self._post_request(f'{self.api_url}{endpoint}', json_data, headers) + if response.status_code == 200: + try: + id = response.json().get('id') + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + return self._monitor_job_status(id, headers, poll_interval) + + else: + self._handle_error(response, 'start batch scrape job') + + + def async_batch_scrape_urls(self, urls: List[str], params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> Dict[str, Any]: + """ + Initiate a crawl job asynchronously. + + Args: + urls (List[str]): The URLs to scrape. + params (Optional[Dict[str, Any]]): Additional parameters for the scraper. + idempotency_key (Optional[str]): A unique uuid key to ensure idempotency of requests. + + Returns: + Dict[str, Any]: A dictionary containing the batch scrape initiation response. The structure includes: + - 'success' (bool): Indicates if the batch scrape initiation was successful. + - 'id' (str): The unique identifier for the batch scrape job. + - 'url' (str): The URL to check the status of the batch scrape job. + """ + endpoint = f'/v1/batch/scrape' + headers = self._prepare_headers(idempotency_key) + json_data = {'urls': urls} + if params: + json_data.update(params) + response = self._post_request(f'{self.api_url}{endpoint}', json_data, headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + else: + self._handle_error(response, 'start batch scrape job') + + def batch_scrape_urls_and_watch(self, urls: List[str], params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> 'CrawlWatcher': + """ + Initiate a batch scrape job and return a CrawlWatcher to monitor the job via WebSocket. + + Args: + urls (List[str]): The URLs to scrape. + params (Optional[Dict[str, Any]]): Additional parameters for the scraper. + idempotency_key (Optional[str]): A unique uuid key to ensure idempotency of requests. + + Returns: + CrawlWatcher: An instance of CrawlWatcher to monitor the batch scrape job. + """ + crawl_response = self.async_batch_scrape_urls(urls, params, idempotency_key) + if crawl_response['success'] and 'id' in crawl_response: + return CrawlWatcher(crawl_response['id'], self) + else: + raise Exception("Batch scrape job failed to start") + + def check_batch_scrape_status(self, id: str) -> Any: + """ + Check the status of a batch scrape job using the Firecrawl API. + + Args: + id (str): The ID of the batch scrape job. + + Returns: + Any: The status of the batch scrape job. + + Raises: + Exception: If the status check request fails. 
+ """ + endpoint = f'/v1/batch/scrape/{id}' + + headers = self._prepare_headers() + response = self._get_request(f'{self.api_url}{endpoint}', headers) + if response.status_code == 200: + try: + status_data = response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + if status_data['status'] == 'completed': + if 'data' in status_data: + data = status_data['data'] + while 'next' in status_data: + if len(status_data['data']) == 0: + break + next_url = status_data.get('next') + if not next_url: + logger.warning("Expected 'next' URL is missing.") + break + try: + status_response = self._get_request(next_url, headers) + if status_response.status_code != 200: + logger.error(f"Failed to fetch next page: {status_response.status_code}") + break + try: + next_data = status_response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + data.extend(next_data.get('data', [])) + status_data = next_data + except Exception as e: + logger.error(f"Error during pagination request: {e}") + break + status_data['data'] = data + + response = { + 'status': status_data.get('status'), + 'total': status_data.get('total'), + 'completed': status_data.get('completed'), + 'creditsUsed': status_data.get('creditsUsed'), + 'expiresAt': status_data.get('expiresAt'), + 'data': status_data.get('data') + } + + if 'error' in status_data: + response['error'] = status_data['error'] + + if 'next' in status_data: + response['next'] = status_data['next'] + + return { + 'success': False if 'error' in status_data else True, + **response + } + else: + self._handle_error(response, 'check batch scrape status') + + def check_batch_scrape_errors(self, id: str) -> Dict[str, Any]: + """ + Returns information about batch scrape errors. + + Args: + id (str): The ID of the crawl job. + + Returns: + Dict[str, Any]: Information about crawl errors. + """ + headers = self._prepare_headers() + response = self._get_request(f'{self.api_url}/v1/batch/scrape/{id}/errors', headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + else: + self._handle_error(response, "check batch scrape errors") + + def extract(self, urls: Optional[List[str]] = None, params: Optional[ExtractParams] = None) -> Any: + """ + Extracts information from a URL using the Firecrawl API. + + Args: + urls (Optional[List[str]]): The URLs to extract information from. + params (Optional[ExtractParams]): Additional parameters for the extract request. + + Returns: + Union[ExtractResponse, ErrorResponse]: The response from the extract operation. 
+ """ + headers = self._prepare_headers() + + if not params or (not params.get('prompt') and not params.get('schema')): + raise ValueError("Either prompt or schema is required") + + if not urls and not params.get('prompt'): + raise ValueError("Either urls or prompt is required") + + schema = params.get('schema') + if schema: + if hasattr(schema, 'model_json_schema'): + # Convert Pydantic model to JSON schema + schema = schema.model_json_schema() + # Otherwise assume it's already a JSON schema dict + + request_data = { + 'urls': urls, + 'allowExternalLinks': params.get('allow_external_links', params.get('allowExternalLinks', False)), + 'enableWebSearch': params.get('enable_web_search', params.get('enableWebSearch', False)), + 'showSources': params.get('show_sources', params.get('showSources', False)), + 'schema': schema, + 'origin': 'api-sdk' + } + + if not request_data['urls']: + request_data['urls'] = [] + # Only add prompt and systemPrompt if they exist + if params.get('prompt'): + request_data['prompt'] = params['prompt'] + if params.get('system_prompt'): + request_data['systemPrompt'] = params['system_prompt'] + elif params.get('systemPrompt'): # Check legacy field name + request_data['systemPrompt'] = params['systemPrompt'] + + try: + # Send the initial extract request + response = self._post_request( + f'{self.api_url}/v1/extract', + request_data, + headers + ) + if response.status_code == 200: + try: + data = response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + if data['success']: + job_id = data.get('id') + if not job_id: + raise Exception('Job ID not returned from extract request.') + + # Poll for the extract status + while True: + status_response = self._get_request( + f'{self.api_url}/v1/extract/{job_id}', + headers + ) + if status_response.status_code == 200: + try: + status_data = status_response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + if status_data['status'] == 'completed': + if status_data['success']: + return status_data + else: + raise Exception(f'Failed to extract. Error: {status_data["error"]}') + elif status_data['status'] in ['failed', 'cancelled']: + raise Exception(f'Extract job {status_data["status"]}. Error: {status_data["error"]}') + else: + self._handle_error(status_response, "extract-status") + + time.sleep(2) # Polling interval + else: + raise Exception(f'Failed to extract. Error: {data["error"]}') + else: + self._handle_error(response, "extract") + except Exception as e: + raise ValueError(str(e), 500) + + return {'success': False, 'error': "Internal server error."} + + def get_extract_status(self, job_id: str) -> Dict[str, Any]: + """ + Retrieve the status of an extract job. + + Args: + job_id (str): The ID of the extract job. + + Returns: + Dict[str, Any]: The status of the extract job. + + Raises: + ValueError: If there is an error retrieving the status. + """ + headers = self._prepare_headers() + try: + response = self._get_request(f'{self.api_url}/v1/extract/{job_id}', headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + else: + self._handle_error(response, "get extract status") + except Exception as e: + raise ValueError(str(e), 500) + + def async_extract(self, urls: List[str], params: Optional[Dict[str, Any]] = None, idempotency_key: Optional[str] = None) -> Dict[str, Any]: + """ + Initiate an asynchronous extract job. 
+ + Args: + urls (List[str]): The URLs to extract data from. + params (Optional[Dict[str, Any]]): Additional parameters for the extract request. + idempotency_key (Optional[str]): A unique key to ensure idempotency of requests. + + Returns: + Dict[str, Any]: The response from the extract operation. + + Raises: + ValueError: If there is an error initiating the extract job. + """ + headers = self._prepare_headers(idempotency_key) + + schema = params.get('schema') if params else None + if schema: + if hasattr(schema, 'model_json_schema'): + # Convert Pydantic model to JSON schema + schema = schema.model_json_schema() + # Otherwise assume it's already a JSON schema dict + + jsonData = {'urls': urls, **(params or {})} + request_data = { + **jsonData, + 'allowExternalLinks': params.get('allow_external_links', False) if params else False, + 'schema': schema, + 'origin': 'api-sdk' + } + + try: + response = self._post_request(f'{self.api_url}/v1/extract', request_data, headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + else: + self._handle_error(response, "async extract") + except Exception as e: + raise ValueError(str(e), 500) + + def generate_llms_text(self, url: str, params: Optional[Union[Dict[str, Any], GenerateLLMsTextParams]] = None) -> Dict[str, Any]: + """ + Generate LLMs.txt for a given URL and poll until completion. + + Args: + url (str): The URL to generate LLMs.txt from. + params (Optional[Union[Dict[str, Any], GenerateLLMsTextParams]]): Parameters for the LLMs.txt generation. + + Returns: + Dict[str, Any]: A dictionary containing the generation results. The structure includes: + - 'success' (bool): Indicates if the generation was successful. + - 'status' (str): The final status of the generation job. + - 'data' (Dict): The generated LLMs.txt data. + - 'error' (Optional[str]): Error message if the generation failed. + - 'expiresAt' (str): ISO 8601 formatted date-time string indicating when the data expires. + + Raises: + Exception: If the generation job fails or an error occurs during status checks. + """ + if params is None: + params = {} + + if isinstance(params, dict): + generation_params = GenerateLLMsTextParams(**params) + else: + generation_params = params + + response = self.async_generate_llms_text(url, generation_params) + if not response.get('success') or 'id' not in response: + return response + + job_id = response['id'] + while True: + status = self.check_generate_llms_text_status(job_id) + + if status['status'] == 'completed': + return status + elif status['status'] == 'failed': + raise Exception(f'LLMs.txt generation failed. Error: {status.get("error")}') + elif status['status'] != 'processing': + break + + time.sleep(2) # Polling interval + + return {'success': False, 'error': 'LLMs.txt generation job terminated unexpectedly'} + + def async_generate_llms_text(self, url: str, params: Optional[Union[Dict[str, Any], GenerateLLMsTextParams]] = None) -> Dict[str, Any]: + """ + Initiate an asynchronous LLMs.txt generation operation. + + Args: + url (str): The URL to generate LLMs.txt from. + params (Optional[Union[Dict[str, Any], GenerateLLMsTextParams]]): Parameters for the LLMs.txt generation. + + Returns: + Dict[str, Any]: A dictionary containing the generation initiation response. The structure includes: + - 'success' (bool): Indicates if the generation initiation was successful. + - 'id' (str): The unique identifier for the generation job. 
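# Editor's note: a sketch of the blocking generate_llms_text() helper documented above.
# maxUrls and showFullText map onto the GenerateLLMsTextParams model defined at the top
# of this file; the URL is a placeholder. For the non-blocking variant, see the
# polling sketch that follows the next section.
from firecrawl.firecrawl import FirecrawlApp, GenerateLLMsTextParams

app = FirecrawlApp(api_key="fc-YOUR-API-KEY")

result = app.generate_llms_text(
    "https://example.com",
    params=GenerateLLMsTextParams(maxUrls=5, showFullText=False),
)
if result.get("success"):
    print(result.get("data"))  # generated LLMs.txt payload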
+ + Raises: + Exception: If the generation job initiation fails. + """ + if params is None: + params = {} + + if isinstance(params, dict): + generation_params = GenerateLLMsTextParams(**params) + else: + generation_params = params + + headers = self._prepare_headers() + json_data = {'url': url, **generation_params.dict(exclude_none=True)} + + try: + response = self._post_request(f'{self.api_url}/v1/llmstxt', json_data, headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception('Failed to parse Firecrawl response as JSON.') + else: + self._handle_error(response, 'start LLMs.txt generation') + except Exception as e: + raise ValueError(str(e)) + + return {'success': False, 'error': 'Internal server error'} + + def check_generate_llms_text_status(self, id: str) -> Dict[str, Any]: + """ + Check the status of a LLMs.txt generation operation. + + Args: + id (str): The ID of the LLMs.txt generation operation. + + Returns: + Dict[str, Any]: The current status and results of the generation operation. + + Raises: + Exception: If the status check fails. + """ + headers = self._prepare_headers() + try: + response = self._get_request(f'{self.api_url}/v1/llmstxt/{id}', headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception('Failed to parse Firecrawl response as JSON.') + elif response.status_code == 404: + raise Exception('LLMs.txt generation job not found') + else: + self._handle_error(response, 'check LLMs.txt generation status') + except Exception as e: + raise ValueError(str(e)) + + return {'success': False, 'error': 'Internal server error'} + + def _prepare_headers(self, idempotency_key: Optional[str] = None) -> Dict[str, str]: + """ + Prepare the headers for API requests. + + Args: + idempotency_key (Optional[str]): A unique key to ensure idempotency of requests. + + Returns: + Dict[str, str]: The headers including content type, authorization, and optionally idempotency key. + """ + if idempotency_key: + return { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {self.api_key}', + 'x-idempotency-key': idempotency_key + } + + return { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {self.api_key}', + } + + def _post_request(self, url: str, + data: Dict[str, Any], + headers: Dict[str, str], + retries: int = 3, + backoff_factor: float = 0.5) -> requests.Response: + """ + Make a POST request with retries. + + Args: + url (str): The URL to send the POST request to. + data (Dict[str, Any]): The JSON data to include in the POST request. + headers (Dict[str, str]): The headers to include in the POST request. + retries (int): Number of retries for the request. + backoff_factor (float): Backoff factor for retries. + + Returns: + requests.Response: The response from the POST request. + + Raises: + requests.RequestException: If the request fails after the specified retries. + """ + for attempt in range(retries): + response = requests.post(url, headers=headers, json=data, timeout=((data["timeout"] + 5000) if "timeout" in data else None)) + if response.status_code == 502: + time.sleep(backoff_factor * (2 ** attempt)) + else: + return response + return response + + def _get_request(self, url: str, + headers: Dict[str, str], + retries: int = 3, + backoff_factor: float = 0.5) -> requests.Response: + """ + Make a GET request with retries. + + Args: + url (str): The URL to send the GET request to. + headers (Dict[str, str]): The headers to include in the GET request. 
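# Editor's note: a manual-polling sketch pairing async_generate_llms_text() with
# check_generate_llms_text_status() from the section above. Status values 'completed',
# 'failed', and 'processing' are the ones the blocking helper itself checks for; the
# URL and sleep interval are placeholders.
import time
from firecrawl.firecrawl import FirecrawlApp

app = FirecrawlApp(api_key="fc-YOUR-API-KEY")
job = app.async_generate_llms_text("https://example.com")

while True:
    status = app.check_generate_llms_text_status(job["id"])
    if status.get("status") == "completed":
        print(status.get("data"))
        break
    if status.get("status") == "failed":
        raise RuntimeError(status.get("error"))
    time.sleep(2)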
+ retries (int): Number of retries for the request. + backoff_factor (float): Backoff factor for retries. + + Returns: + requests.Response: The response from the GET request. + + Raises: + requests.RequestException: If the request fails after the specified retries. + """ + for attempt in range(retries): + response = requests.get(url, headers=headers) + if response.status_code == 502: + time.sleep(backoff_factor * (2 ** attempt)) + else: + return response + return response + + def _delete_request(self, url: str, + headers: Dict[str, str], + retries: int = 3, + backoff_factor: float = 0.5) -> requests.Response: + """ + Make a DELETE request with retries. + + Args: + url (str): The URL to send the DELETE request to. + headers (Dict[str, str]): The headers to include in the DELETE request. + retries (int): Number of retries for the request. + backoff_factor (float): Backoff factor for retries. + + Returns: + requests.Response: The response from the DELETE request. + + Raises: + requests.RequestException: If the request fails after the specified retries. + """ + for attempt in range(retries): + response = requests.delete(url, headers=headers) + if response.status_code == 502: + time.sleep(backoff_factor * (2 ** attempt)) + else: + return response + return response + + def _monitor_job_status(self, id: str, headers: Dict[str, str], poll_interval: int) -> Any: + """ + Monitor the status of a crawl job until completion. + + Args: + id (str): The ID of the crawl job. + headers (Dict[str, str]): The headers to include in the status check requests. + poll_interval (int): Secounds between status checks. + Returns: + Any: The crawl results if the job is completed successfully. + + Raises: + Exception: If the job fails or an error occurs during status checks. + """ + while True: + api_url = f'{self.api_url}/v1/crawl/{id}' + + status_response = self._get_request(api_url, headers) + if status_response.status_code == 200: + try: + status_data = status_response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + if status_data['status'] == 'completed': + if 'data' in status_data: + data = status_data['data'] + while 'next' in status_data: + if len(status_data['data']) == 0: + break + status_response = self._get_request(status_data['next'], headers) + try: + status_data = status_response.json() + except: + raise Exception(f'Failed to parse Firecrawl response as JSON.') + data.extend(status_data.get('data', [])) + status_data['data'] = data + return status_data + else: + raise Exception('Crawl job completed but no data was returned') + elif status_data['status'] in ['active', 'paused', 'pending', 'queued', 'waiting', 'scraping']: + poll_interval=max(poll_interval,2) + time.sleep(poll_interval) # Wait for the specified interval before checking again + else: + raise Exception(f'Crawl job failed or was stopped. Status: {status_data["status"]}') + else: + self._handle_error(status_response, 'check crawl status') + + def _handle_error(self, response: requests.Response, action: str) -> None: + """ + Handle errors from API responses. + + Args: + response (requests.Response): The response object from the API request. + action (str): Description of the action that was being performed. + + Raises: + Exception: An exception with a message containing the status code and error details from the response. 
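# Editor's note: the retry helpers above sleep for backoff_factor * (2 ** attempt)
# seconds after each 502 response. This standalone sketch just prints that schedule for
# the default retries=3, backoff_factor=0.5 so the behaviour is easy to verify.
retries = 3
backoff_factor = 0.5

for attempt in range(retries):
    delay = backoff_factor * (2 ** attempt)
    print(f"attempt {attempt + 1}: wait {delay:.1f}s before retrying on HTTP 502")
# -> 0.5s, 1.0s, 2.0s; any other status code is returned to the caller immediately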
+ """ + try: + error_message = response.json().get('error', 'No error message provided.') + error_details = response.json().get('details', 'No additional error details provided.') + except: + raise requests.exceptions.HTTPError(f'Failed to parse Firecrawl error response as JSON. Status code: {response.status_code}', response=response) + + + if response.status_code == 402: + message = f"Payment Required: Failed to {action}. {error_message} - {error_details}" + elif response.status_code == 408: + message = f"Request Timeout: Failed to {action} as the request timed out. {error_message} - {error_details}" + elif response.status_code == 409: + message = f"Conflict: Failed to {action} due to a conflict. {error_message} - {error_details}" + elif response.status_code == 500: + message = f"Internal Server Error: Failed to {action}. {error_message} - {error_details}" + else: + message = f"Unexpected error during {action}: Status code {response.status_code}. {error_message} - {error_details}" + + # Raise an HTTPError with the custom message and attach the response + raise requests.exceptions.HTTPError(message, response=response) + + def deep_research(self, query: str, params: Optional[Union[Dict[str, Any], DeepResearchParams]] = None, + on_activity: Optional[Callable[[Dict[str, Any]], None]] = None, + on_source: Optional[Callable[[Dict[str, Any]], None]] = None) -> Dict[str, Any]: + """ + Initiates a deep research operation on a given query and polls until completion. + + Args: + query (str): The query to research. + params (Optional[Union[Dict[str, Any], DeepResearchParams]]): Parameters for the deep research operation. + on_activity (Optional[Callable[[Dict[str, Any]], None]]): Optional callback to receive activity updates in real-time. + + Returns: + Dict[str, Any]: The final research results. + + Raises: + Exception: If the research operation fails. + """ + if params is None: + params = {} + + if isinstance(params, dict): + research_params = DeepResearchParams(**params) + else: + research_params = params + + response = self.async_deep_research(query, research_params) + if not response.get('success') or 'id' not in response: + return response + + job_id = response['id'] + last_activity_count = 0 + last_source_count = 0 + + while True: + status = self.check_deep_research_status(job_id) + + if on_activity and 'activities' in status: + new_activities = status['activities'][last_activity_count:] + for activity in new_activities: + on_activity(activity) + last_activity_count = len(status['activities']) + + if on_source and 'sources' in status: + new_sources = status['sources'][last_source_count:] + for source in new_sources: + on_source(source) + last_source_count = len(status['sources']) + + if status['status'] == 'completed': + return status + elif status['status'] == 'failed': + raise Exception(f'Deep research failed. Error: {status.get("error")}') + elif status['status'] != 'processing': + break + + time.sleep(2) # Polling interval + + return {'success': False, 'error': 'Deep research job terminated unexpectedly'} + + def async_deep_research(self, query: str, params: Optional[Union[Dict[str, Any], DeepResearchParams]] = None) -> Dict[str, Any]: + """ + Initiates an asynchronous deep research operation. + + Args: + query (str): The query to research. + params (Optional[Union[Dict[str, Any], DeepResearchParams]]): Parameters for the deep research operation. + + Returns: + Dict[str, Any]: The response from the deep research initiation. + + Raises: + Exception: If the research initiation fails. 
+ """ + if params is None: + params = {} + + if isinstance(params, dict): + research_params = DeepResearchParams(**params) + else: + research_params = params + + headers = self._prepare_headers() + json_data = {'query': query, **research_params.dict(exclude_none=True)} + + try: + response = self._post_request(f'{self.api_url}/v1/deep-research', json_data, headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception('Failed to parse Firecrawl response as JSON.') + else: + self._handle_error(response, 'start deep research') + except Exception as e: + raise ValueError(str(e)) + + return {'success': False, 'error': 'Internal server error'} + + def check_deep_research_status(self, id: str) -> Dict[str, Any]: + """ + Check the status of a deep research operation. + + Args: + id (str): The ID of the deep research operation. + + Returns: + Dict[str, Any]: The current status and results of the research operation. + + Raises: + Exception: If the status check fails. + """ + headers = self._prepare_headers() + try: + response = self._get_request(f'{self.api_url}/v1/deep-research/{id}', headers) + if response.status_code == 200: + try: + return response.json() + except: + raise Exception('Failed to parse Firecrawl response as JSON.') + elif response.status_code == 404: + raise Exception('Deep research job not found') + else: + self._handle_error(response, 'check deep research status') + except Exception as e: + raise ValueError(str(e)) + + return {'success': False, 'error': 'Internal server error'} + +class CrawlWatcher: + def __init__(self, id: str, app: FirecrawlApp): + self.id = id + self.app = app + self.data: List[Dict[str, Any]] = [] + self.status = "scraping" + self.ws_url = f"{app.api_url.replace('http', 'ws')}/v1/crawl/{id}" + self.event_handlers = { + 'done': [], + 'error': [], + 'document': [] + } + + async def connect(self): + async with websockets.connect(self.ws_url, extra_headers={"Authorization": f"Bearer {self.app.api_key}"}) as websocket: + await self._listen(websocket) + + async def _listen(self, websocket): + async for message in websocket: + msg = json.loads(message) + await self._handle_message(msg) + + def add_event_listener(self, event_type: str, handler): + if event_type in self.event_handlers: + self.event_handlers[event_type].append(handler) + + def dispatch_event(self, event_type: str, detail: Dict[str, Any]): + if event_type in self.event_handlers: + for handler in self.event_handlers[event_type]: + handler(detail) + + async def _handle_message(self, msg: Dict[str, Any]): + if msg['type'] == 'done': + self.status = 'completed' + self.dispatch_event('done', {'status': self.status, 'data': self.data, 'id': self.id}) + elif msg['type'] == 'error': + self.status = 'failed' + self.dispatch_event('error', {'status': self.status, 'data': self.data, 'error': msg['error'], 'id': self.id}) + elif msg['type'] == 'catchup': + self.status = msg['data']['status'] + self.data.extend(msg['data'].get('data', [])) + for doc in self.data: + self.dispatch_event('document', {'data': doc, 'id': self.id}) + elif msg['type'] == 'document': + self.data.append(msg['data']) + self.dispatch_event('document', {'data': msg['data'], 'id': self.id}) -- cgit v1.2.3