about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/jsonpath
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/jsonpath')
-rw-r--r--.venv/lib/python3.12/site-packages/jsonpath/__init__.py337
1 files changed, 337 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/jsonpath/__init__.py b/.venv/lib/python3.12/site-packages/jsonpath/__init__.py
new file mode 100644
index 00000000..8c05f5aa
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/jsonpath/__init__.py
@@ -0,0 +1,337 @@
+"""
+Author       : zhangxianbing
+Date         : 2020-12-27 09:22:14
+LastEditors  : zhangxianbing
+LastEditTime : 2022-03-14 10:33:55
+Description  : JSONPath
+"""
+__version__ = "1.0.6"
+__author__ = "zhangxianbing"
+
+import json
+import logging
+import os
+import re
+import sys
+from collections import defaultdict
+from typing import Union
+
+
+def create_logger(name: str = None, level: Union[int, str] = logging.INFO):
+    """Get or create a logger used for local debug."""
+
+    formater = logging.Formatter(
+        f"%(asctime)s-%(levelname)s-[{name}] %(message)s", datefmt="[%Y-%m-%d %H:%M:%S]"
+    )
+
+    handler = logging.StreamHandler()
+    handler.setLevel(level)
+    handler.setFormatter(formater)
+
+    logger = logging.getLogger(name)
+    logger.setLevel(level)
+    logger.addHandler(handler)
+
+    return logger
+
+
+logger = create_logger("jsonpath", os.getenv("PYLOGLEVEL", "INFO"))
+
+
+class ExprSyntaxError(Exception):
+    pass
+
+
+class JSONPath:
+    RESULT_TYPE = {
+        "VALUE": "A list of specific values.",
+        "PATH": "All path of specific values.",
+    }
+
+    # common patterns
+    SEP = ";"
+    REP_DOUBLEDOT = re.compile(r"\.\.")
+    REP_DOT = re.compile(r"(?<!\.)\.(?!\.)")
+
+    # save special patterns
+    REP_GET_QUOTE = re.compile(r"['](.*?)[']")
+    REP_PUT_QUOTE = re.compile(r"#Q(\d+)")
+    REP_GET_BACKQUOTE = re.compile(r"[`](.*?)[`]")
+    REP_PUT_BACKQUOTE = re.compile(r"#BQ(\d+)")
+    REP_GET_BRACKET = re.compile(r"[\[](.*?)[\]]")
+    REP_PUT_BRACKET = re.compile(r"#B(\d+)")
+    REP_GET_PAREN = re.compile(r"[\(](.*?)[\)]")
+    REP_PUT_PAREN = re.compile(r"#P(\d+)")
+
+    # operators
+    REP_SLICE_CONTENT = re.compile(r"^(-?\d*)?:(-?\d*)?(:-?\d*)?$")
+    REP_SELECT_CONTENT = re.compile(r"^([\w.']+)(, ?[\w.']+)+$")
+    REP_FILTER_CONTENT = re.compile(
+        r"@\.(.*?)(?=<=|>=|==|!=|>|<| in| not| is)|len\(@\.(.*?)\)"
+    )
+
+    # annotations
+    f: list
+    segments: list
+    lpath: int
+    subx = defaultdict(list)
+    result: list
+    result_type: str
+    eval_func: callable
+
+    def __init__(self, expr: str):
+        expr = self._parse_expr(expr)
+        self.segments = expr.split(JSONPath.SEP)
+        self.lpath = len(self.segments)
+        logger.debug(f"segments  : {self.segments}")
+
+        self.caller_globals = sys._getframe(1).f_globals
+
+    def parse(self, obj, result_type="VALUE", eval_func=eval):
+        if not isinstance(obj, (list, dict)):
+            raise TypeError("obj must be a list or a dict.")
+
+        if result_type not in JSONPath.RESULT_TYPE:
+            raise ValueError(
+                f"result_type must be one of {tuple(JSONPath.RESULT_TYPE.keys())}"
+            )
+        self.result_type = result_type
+        self.eval_func = eval_func
+
+        self.result = []
+        self._trace(obj, 0, "$")
+
+        return self.result
+
+    def search(self, obj, result_type="VALUE"):
+        return self.parse(obj, result_type)
+
+    def _parse_expr(self, expr):
+        logger.debug(f"before expr : {expr}")
+        # pick up special patterns
+        expr = JSONPath.REP_GET_QUOTE.sub(self._get_quote, expr)
+        expr = JSONPath.REP_GET_BACKQUOTE.sub(self._get_backquote, expr)
+        expr = JSONPath.REP_GET_BRACKET.sub(self._get_bracket, expr)
+        expr = JSONPath.REP_GET_PAREN.sub(self._get_paren, expr)
+        # split
+        expr = JSONPath.REP_DOUBLEDOT.sub(f"{JSONPath.SEP}..{JSONPath.SEP}", expr)
+        expr = JSONPath.REP_DOT.sub(JSONPath.SEP, expr)
+        # put back
+        expr = JSONPath.REP_PUT_PAREN.sub(self._put_paren, expr)
+        expr = JSONPath.REP_PUT_BRACKET.sub(self._put_bracket, expr)
+        expr = JSONPath.REP_PUT_BACKQUOTE.sub(self._put_backquote, expr)
+        expr = JSONPath.REP_PUT_QUOTE.sub(self._put_quote, expr)
+        if expr.startswith("$;"):
+            expr = expr[2:]
+
+        logger.debug(f"after expr  : {expr}")
+        return expr
+
+    # TODO abstract get and put procedures
+    def _get_quote(self, m):
+        n = len(self.subx["#Q"])
+        self.subx["#Q"].append(m.group(1))
+        return f"#Q{n}"
+
+    def _put_quote(self, m):
+        return self.subx["#Q"][int(m.group(1))]
+
+    def _get_backquote(self, m):
+        n = len(self.subx["#BQ"])
+        self.subx["#BQ"].append(m.group(1))
+        return f"`#BQ{n}`"
+
+    def _put_backquote(self, m):
+        return self.subx["#BQ"][int(m.group(1))]
+
+    def _get_bracket(self, m):
+        n = len(self.subx["#B"])
+        self.subx["#B"].append(m.group(1))
+        return f".#B{n}"
+
+    def _put_bracket(self, m):
+        return self.subx["#B"][int(m.group(1))]
+
+    def _get_paren(self, m):
+        n = len(self.subx["#P"])
+        self.subx["#P"].append(m.group(1))
+        return f"(#P{n})"
+
+    def _put_paren(self, m):
+        return self.subx["#P"][int(m.group(1))]
+
+    @staticmethod
+    def _gen_obj(m):
+        ret = "__obj"
+        for e in m.group(1).split("."):
+            ret += '["%s"]' % e
+        return ret
+
+    @staticmethod
+    def _traverse(f, obj, i: int, path: str, *args):
+        if isinstance(obj, list):
+            for idx, v in enumerate(obj):
+                f(v, i, f"{path}{JSONPath.SEP}{idx}", *args)
+        elif isinstance(obj, dict):
+            for k, v in obj.items():
+                f(v, i, f"{path}{JSONPath.SEP}{k}", *args)
+
+    @staticmethod
+    def _getattr(obj: dict, path: str, *, convert_number_str=False):
+        r = obj
+        for k in path.split("."):
+            try:
+                r = r.get(k)
+            except (AttributeError, KeyError) as err:
+                logger.error(err)
+                return None
+        if convert_number_str and isinstance(r, str):
+            try:
+                if r.isdigit():
+                    return int(r)
+                return float(r)
+            except ValueError:
+                pass
+        return r
+
+    @staticmethod
+    def _sorter(obj, sortbys):
+        for sortby in sortbys.split(",")[::-1]:
+            if sortby.startswith("~"):
+                obj.sort(
+                    key=lambda t, k=sortby: JSONPath._getattr(
+                        t[1], k[1:], convert_number_str=True
+                    ),
+                    reverse=True,
+                )
+            else:
+                obj.sort(
+                    key=lambda t, k=sortby: JSONPath._getattr(
+                        t[1], k, convert_number_str=True
+                    )
+                )
+
+    def _filter(self, obj, i: int, path: str, step: str):
+        r = False
+        try:
+            r = self.eval_func(step, None, {"__obj": obj})
+        except Exception as err:
+            logger.error(err)
+        if r:
+            self._trace(obj, i, path)
+
+    def _trace(self, obj, i: int, path):
+        """Perform operation on object.
+
+        Args:
+            obj ([type]): current operating object
+            i (int): current operation specified by index in self.segments
+        """
+
+        # store
+        if i >= self.lpath:
+            if self.result_type == "VALUE":
+                self.result.append(obj)
+            elif self.result_type == "PATH":
+                self.result.append(path)
+            logger.debug(f"path: {path} | value: {obj}")
+            return
+
+        step = self.segments[i]
+
+        # wildcard
+        if step == "*":
+            self._traverse(self._trace, obj, i + 1, path)
+            return
+
+        # recursive descent
+        if step == "..":
+            self._trace(obj, i + 1, path)
+            self._traverse(self._trace, obj, i, path)
+            return
+
+        # get value from list
+        if isinstance(obj, list) and step.isdigit():
+            ikey = int(step)
+            if ikey < len(obj):
+                self._trace(obj[ikey], i + 1, f"{path}{JSONPath.SEP}{step}")
+            return
+
+        # get value from dict
+        if isinstance(obj, dict) and step in obj:
+            self._trace(obj[step], i + 1, f"{path}{JSONPath.SEP}{step}")
+            return
+
+        # slice
+        if isinstance(obj, list) and JSONPath.REP_SLICE_CONTENT.fullmatch(step):
+            obj = list(enumerate(obj))
+            vals = self.eval_func(f"obj[{step}]")
+            for idx, v in vals:
+                self._trace(v, i + 1, f"{path}{JSONPath.SEP}{idx}")
+            return
+
+        # select
+        if isinstance(obj, dict) and JSONPath.REP_SELECT_CONTENT.fullmatch(step):
+            for k in step.split(","):
+                if k in obj:
+                    self._trace(obj[k], i + 1, f"{path}{JSONPath.SEP}{k}")
+            return
+
+        # filter
+        if step.startswith("?(") and step.endswith(")"):
+            step = step[2:-1]
+            step = JSONPath.REP_FILTER_CONTENT.sub(self._gen_obj, step)
+            self._traverse(self._filter, obj, i + 1, path, step)
+            return
+
+        # sorter
+        if step.startswith("/(") and step.endswith(")"):
+            if isinstance(obj, list):
+                obj = list(enumerate(obj))
+                self._sorter(obj, step[2:-1])
+                for idx, v in obj:
+                    self._trace(v, i + 1, f"{path}{JSONPath.SEP}{idx}")
+            elif isinstance(obj, dict):
+                obj = list(obj.items())
+                self._sorter(obj, step[2:-1])
+                for k, v in obj:
+                    self._trace(v, i + 1, f"{path}{JSONPath.SEP}{k}")
+            else:
+                raise ExprSyntaxError("sorter must acting on list or dict")
+            return
+
+        # field-extractor
+        if step.startswith("(") and step.endswith(")"):
+            if isinstance(obj, dict):
+                obj_ = {}
+                for k in step[1:-1].split(","):
+                    obj_[k] = self._getattr(obj, k)
+                self._trace(obj_, i + 1, path)
+            else:
+                raise ExprSyntaxError("field-extractor must acting on dict")
+
+            return
+
+
+def compile(expr):
+    return JSONPath(expr)
+
+
+# global cache
+_jsonpath_cache = {}
+
+
+def search(expr, data):
+    global _jsonpath_cache
+    if expr not in _jsonpath_cache:
+        _jsonpath_cache[expr] = JSONPath(expr)
+    return _jsonpath_cache[expr].parse(data)
+
+
+if __name__ == "__main__":
+    with open("test/data/2.json", "rb") as f:
+        d = json.load(f)
+    D = JSONPath("$.scores[/(score)].(score)").parse(d, "VALUE")
+    print(D)
+    for v in D:
+        print(v)