From 4a52a71956a8d46fcb7294ac71734504bb09bcc2 Mon Sep 17 00:00:00 2001 From: S. Solomon Darnell Date: Fri, 28 Mar 2025 21:52:21 -0500 Subject: two version of R2R are here --- .../python3.12/site-packages/jsonpath/__init__.py | 337 +++++++++++++++++++++ 1 file changed, 337 insertions(+) create mode 100644 .venv/lib/python3.12/site-packages/jsonpath/__init__.py (limited to '.venv/lib/python3.12/site-packages/jsonpath') diff --git a/.venv/lib/python3.12/site-packages/jsonpath/__init__.py b/.venv/lib/python3.12/site-packages/jsonpath/__init__.py new file mode 100644 index 00000000..8c05f5aa --- /dev/null +++ b/.venv/lib/python3.12/site-packages/jsonpath/__init__.py @@ -0,0 +1,337 @@ +""" +Author : zhangxianbing +Date : 2020-12-27 09:22:14 +LastEditors : zhangxianbing +LastEditTime : 2022-03-14 10:33:55 +Description : JSONPath +""" +__version__ = "1.0.6" +__author__ = "zhangxianbing" + +import json +import logging +import os +import re +import sys +from collections import defaultdict +from typing import Union + + +def create_logger(name: str = None, level: Union[int, str] = logging.INFO): + """Get or create a logger used for local debug.""" + + formater = logging.Formatter( + f"%(asctime)s-%(levelname)s-[{name}] %(message)s", datefmt="[%Y-%m-%d %H:%M:%S]" + ) + + handler = logging.StreamHandler() + handler.setLevel(level) + handler.setFormatter(formater) + + logger = logging.getLogger(name) + logger.setLevel(level) + logger.addHandler(handler) + + return logger + + +logger = create_logger("jsonpath", os.getenv("PYLOGLEVEL", "INFO")) + + +class ExprSyntaxError(Exception): + pass + + +class JSONPath: + RESULT_TYPE = { + "VALUE": "A list of specific values.", + "PATH": "All path of specific values.", + } + + # common patterns + SEP = ";" + REP_DOUBLEDOT = re.compile(r"\.\.") + REP_DOT = re.compile(r"(?=|==|!=|>|<| in| not| is)|len\(@\.(.*?)\)" + ) + + # annotations + f: list + segments: list + lpath: int + subx = defaultdict(list) + result: list + result_type: str + eval_func: callable + + def __init__(self, expr: str): + expr = self._parse_expr(expr) + self.segments = expr.split(JSONPath.SEP) + self.lpath = len(self.segments) + logger.debug(f"segments : {self.segments}") + + self.caller_globals = sys._getframe(1).f_globals + + def parse(self, obj, result_type="VALUE", eval_func=eval): + if not isinstance(obj, (list, dict)): + raise TypeError("obj must be a list or a dict.") + + if result_type not in JSONPath.RESULT_TYPE: + raise ValueError( + f"result_type must be one of {tuple(JSONPath.RESULT_TYPE.keys())}" + ) + self.result_type = result_type + self.eval_func = eval_func + + self.result = [] + self._trace(obj, 0, "$") + + return self.result + + def search(self, obj, result_type="VALUE"): + return self.parse(obj, result_type) + + def _parse_expr(self, expr): + logger.debug(f"before expr : {expr}") + # pick up special patterns + expr = JSONPath.REP_GET_QUOTE.sub(self._get_quote, expr) + expr = JSONPath.REP_GET_BACKQUOTE.sub(self._get_backquote, expr) + expr = JSONPath.REP_GET_BRACKET.sub(self._get_bracket, expr) + expr = JSONPath.REP_GET_PAREN.sub(self._get_paren, expr) + # split + expr = JSONPath.REP_DOUBLEDOT.sub(f"{JSONPath.SEP}..{JSONPath.SEP}", expr) + expr = JSONPath.REP_DOT.sub(JSONPath.SEP, expr) + # put back + expr = JSONPath.REP_PUT_PAREN.sub(self._put_paren, expr) + expr = JSONPath.REP_PUT_BRACKET.sub(self._put_bracket, expr) + expr = JSONPath.REP_PUT_BACKQUOTE.sub(self._put_backquote, expr) + expr = JSONPath.REP_PUT_QUOTE.sub(self._put_quote, expr) + if expr.startswith("$;"): + expr = expr[2:] + + logger.debug(f"after expr : {expr}") + return expr + + # TODO abstract get and put procedures + def _get_quote(self, m): + n = len(self.subx["#Q"]) + self.subx["#Q"].append(m.group(1)) + return f"#Q{n}" + + def _put_quote(self, m): + return self.subx["#Q"][int(m.group(1))] + + def _get_backquote(self, m): + n = len(self.subx["#BQ"]) + self.subx["#BQ"].append(m.group(1)) + return f"`#BQ{n}`" + + def _put_backquote(self, m): + return self.subx["#BQ"][int(m.group(1))] + + def _get_bracket(self, m): + n = len(self.subx["#B"]) + self.subx["#B"].append(m.group(1)) + return f".#B{n}" + + def _put_bracket(self, m): + return self.subx["#B"][int(m.group(1))] + + def _get_paren(self, m): + n = len(self.subx["#P"]) + self.subx["#P"].append(m.group(1)) + return f"(#P{n})" + + def _put_paren(self, m): + return self.subx["#P"][int(m.group(1))] + + @staticmethod + def _gen_obj(m): + ret = "__obj" + for e in m.group(1).split("."): + ret += '["%s"]' % e + return ret + + @staticmethod + def _traverse(f, obj, i: int, path: str, *args): + if isinstance(obj, list): + for idx, v in enumerate(obj): + f(v, i, f"{path}{JSONPath.SEP}{idx}", *args) + elif isinstance(obj, dict): + for k, v in obj.items(): + f(v, i, f"{path}{JSONPath.SEP}{k}", *args) + + @staticmethod + def _getattr(obj: dict, path: str, *, convert_number_str=False): + r = obj + for k in path.split("."): + try: + r = r.get(k) + except (AttributeError, KeyError) as err: + logger.error(err) + return None + if convert_number_str and isinstance(r, str): + try: + if r.isdigit(): + return int(r) + return float(r) + except ValueError: + pass + return r + + @staticmethod + def _sorter(obj, sortbys): + for sortby in sortbys.split(",")[::-1]: + if sortby.startswith("~"): + obj.sort( + key=lambda t, k=sortby: JSONPath._getattr( + t[1], k[1:], convert_number_str=True + ), + reverse=True, + ) + else: + obj.sort( + key=lambda t, k=sortby: JSONPath._getattr( + t[1], k, convert_number_str=True + ) + ) + + def _filter(self, obj, i: int, path: str, step: str): + r = False + try: + r = self.eval_func(step, None, {"__obj": obj}) + except Exception as err: + logger.error(err) + if r: + self._trace(obj, i, path) + + def _trace(self, obj, i: int, path): + """Perform operation on object. + + Args: + obj ([type]): current operating object + i (int): current operation specified by index in self.segments + """ + + # store + if i >= self.lpath: + if self.result_type == "VALUE": + self.result.append(obj) + elif self.result_type == "PATH": + self.result.append(path) + logger.debug(f"path: {path} | value: {obj}") + return + + step = self.segments[i] + + # wildcard + if step == "*": + self._traverse(self._trace, obj, i + 1, path) + return + + # recursive descent + if step == "..": + self._trace(obj, i + 1, path) + self._traverse(self._trace, obj, i, path) + return + + # get value from list + if isinstance(obj, list) and step.isdigit(): + ikey = int(step) + if ikey < len(obj): + self._trace(obj[ikey], i + 1, f"{path}{JSONPath.SEP}{step}") + return + + # get value from dict + if isinstance(obj, dict) and step in obj: + self._trace(obj[step], i + 1, f"{path}{JSONPath.SEP}{step}") + return + + # slice + if isinstance(obj, list) and JSONPath.REP_SLICE_CONTENT.fullmatch(step): + obj = list(enumerate(obj)) + vals = self.eval_func(f"obj[{step}]") + for idx, v in vals: + self._trace(v, i + 1, f"{path}{JSONPath.SEP}{idx}") + return + + # select + if isinstance(obj, dict) and JSONPath.REP_SELECT_CONTENT.fullmatch(step): + for k in step.split(","): + if k in obj: + self._trace(obj[k], i + 1, f"{path}{JSONPath.SEP}{k}") + return + + # filter + if step.startswith("?(") and step.endswith(")"): + step = step[2:-1] + step = JSONPath.REP_FILTER_CONTENT.sub(self._gen_obj, step) + self._traverse(self._filter, obj, i + 1, path, step) + return + + # sorter + if step.startswith("/(") and step.endswith(")"): + if isinstance(obj, list): + obj = list(enumerate(obj)) + self._sorter(obj, step[2:-1]) + for idx, v in obj: + self._trace(v, i + 1, f"{path}{JSONPath.SEP}{idx}") + elif isinstance(obj, dict): + obj = list(obj.items()) + self._sorter(obj, step[2:-1]) + for k, v in obj: + self._trace(v, i + 1, f"{path}{JSONPath.SEP}{k}") + else: + raise ExprSyntaxError("sorter must acting on list or dict") + return + + # field-extractor + if step.startswith("(") and step.endswith(")"): + if isinstance(obj, dict): + obj_ = {} + for k in step[1:-1].split(","): + obj_[k] = self._getattr(obj, k) + self._trace(obj_, i + 1, path) + else: + raise ExprSyntaxError("field-extractor must acting on dict") + + return + + +def compile(expr): + return JSONPath(expr) + + +# global cache +_jsonpath_cache = {} + + +def search(expr, data): + global _jsonpath_cache + if expr not in _jsonpath_cache: + _jsonpath_cache[expr] = JSONPath(expr) + return _jsonpath_cache[expr].parse(data) + + +if __name__ == "__main__": + with open("test/data/2.json", "rb") as f: + d = json.load(f) + D = JSONPath("$.scores[/(score)].(score)").parse(d, "VALUE") + print(D) + for v in D: + print(v) -- cgit v1.2.3