about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/reflection.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/reflection.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/reflection.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/reflection.py677
1 files changed, 677 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/reflection.py b/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/reflection.py
new file mode 100644
index 00000000..3998be97
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/reflection.py
@@ -0,0 +1,677 @@
+# dialects/mysql/reflection.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+
+import re
+
+from .enumerated import ENUM
+from .enumerated import SET
+from .types import DATETIME
+from .types import TIME
+from .types import TIMESTAMP
+from ... import log
+from ... import types as sqltypes
+from ... import util
+
+
+class ReflectedState:
+    """Stores raw information about a SHOW CREATE TABLE statement."""
+
+    def __init__(self):
+        self.columns = []
+        self.table_options = {}
+        self.table_name = None
+        self.keys = []
+        self.fk_constraints = []
+        self.ck_constraints = []
+
+
+@log.class_logger
+class MySQLTableDefinitionParser:
+    """Parses the results of a SHOW CREATE TABLE statement."""
+
+    def __init__(self, dialect, preparer):
+        self.dialect = dialect
+        self.preparer = preparer
+        self._prep_regexes()
+
+    def parse(self, show_create, charset):
+        state = ReflectedState()
+        state.charset = charset
+        for line in re.split(r"\r?\n", show_create):
+            if line.startswith("  " + self.preparer.initial_quote):
+                self._parse_column(line, state)
+            # a regular table options line
+            elif line.startswith(") "):
+                self._parse_table_options(line, state)
+            # an ANSI-mode table options line
+            elif line == ")":
+                pass
+            elif line.startswith("CREATE "):
+                self._parse_table_name(line, state)
+            elif "PARTITION" in line:
+                self._parse_partition_options(line, state)
+            # Not present in real reflection, but may be if
+            # loading from a file.
+            elif not line:
+                pass
+            else:
+                type_, spec = self._parse_constraints(line)
+                if type_ is None:
+                    util.warn("Unknown schema content: %r" % line)
+                elif type_ == "key":
+                    state.keys.append(spec)
+                elif type_ == "fk_constraint":
+                    state.fk_constraints.append(spec)
+                elif type_ == "ck_constraint":
+                    state.ck_constraints.append(spec)
+                else:
+                    pass
+        return state
+
+    def _check_view(self, sql: str) -> bool:
+        return bool(self._re_is_view.match(sql))
+
+    def _parse_constraints(self, line):
+        """Parse a KEY or CONSTRAINT line.
+
+        :param line: A line of SHOW CREATE TABLE output
+        """
+
+        # KEY
+        m = self._re_key.match(line)
+        if m:
+            spec = m.groupdict()
+            # convert columns into name, length pairs
+            # NOTE: we may want to consider SHOW INDEX as the
+            # format of indexes in MySQL becomes more complex
+            spec["columns"] = self._parse_keyexprs(spec["columns"])
+            if spec["version_sql"]:
+                m2 = self._re_key_version_sql.match(spec["version_sql"])
+                if m2 and m2.groupdict()["parser"]:
+                    spec["parser"] = m2.groupdict()["parser"]
+            if spec["parser"]:
+                spec["parser"] = self.preparer.unformat_identifiers(
+                    spec["parser"]
+                )[0]
+            return "key", spec
+
+        # FOREIGN KEY CONSTRAINT
+        m = self._re_fk_constraint.match(line)
+        if m:
+            spec = m.groupdict()
+            spec["table"] = self.preparer.unformat_identifiers(spec["table"])
+            spec["local"] = [c[0] for c in self._parse_keyexprs(spec["local"])]
+            spec["foreign"] = [
+                c[0] for c in self._parse_keyexprs(spec["foreign"])
+            ]
+            return "fk_constraint", spec
+
+        # CHECK constraint
+        m = self._re_ck_constraint.match(line)
+        if m:
+            spec = m.groupdict()
+            return "ck_constraint", spec
+
+        # PARTITION and SUBPARTITION
+        m = self._re_partition.match(line)
+        if m:
+            # Punt!
+            return "partition", line
+
+        # No match.
+        return (None, line)
+
+    def _parse_table_name(self, line, state):
+        """Extract the table name.
+
+        :param line: The first line of SHOW CREATE TABLE
+        """
+
+        regex, cleanup = self._pr_name
+        m = regex.match(line)
+        if m:
+            state.table_name = cleanup(m.group("name"))
+
+    def _parse_table_options(self, line, state):
+        """Build a dictionary of all reflected table-level options.
+
+        :param line: The final line of SHOW CREATE TABLE output.
+        """
+
+        options = {}
+
+        if line and line != ")":
+            rest_of_line = line
+            for regex, cleanup in self._pr_options:
+                m = regex.search(rest_of_line)
+                if not m:
+                    continue
+                directive, value = m.group("directive"), m.group("val")
+                if cleanup:
+                    value = cleanup(value)
+                options[directive.lower()] = value
+                rest_of_line = regex.sub("", rest_of_line)
+
+        for nope in ("auto_increment", "data directory", "index directory"):
+            options.pop(nope, None)
+
+        for opt, val in options.items():
+            state.table_options["%s_%s" % (self.dialect.name, opt)] = val
+
+    def _parse_partition_options(self, line, state):
+        options = {}
+        new_line = line[:]
+
+        while new_line.startswith("(") or new_line.startswith(" "):
+            new_line = new_line[1:]
+
+        for regex, cleanup in self._pr_options:
+            m = regex.search(new_line)
+            if not m or "PARTITION" not in regex.pattern:
+                continue
+
+            directive = m.group("directive")
+            directive = directive.lower()
+            is_subpartition = directive == "subpartition"
+
+            if directive == "partition" or is_subpartition:
+                new_line = new_line.replace(") */", "")
+                new_line = new_line.replace(",", "")
+                if is_subpartition and new_line.endswith(")"):
+                    new_line = new_line[:-1]
+                if self.dialect.name == "mariadb" and new_line.endswith(")"):
+                    if (
+                        "MAXVALUE" in new_line
+                        or "MINVALUE" in new_line
+                        or "ENGINE" in new_line
+                    ):
+                        # final line of MariaDB partition endswith ")"
+                        new_line = new_line[:-1]
+
+                defs = "%s_%s_definitions" % (self.dialect.name, directive)
+                options[defs] = new_line
+
+            else:
+                directive = directive.replace(" ", "_")
+                value = m.group("val")
+                if cleanup:
+                    value = cleanup(value)
+                options[directive] = value
+            break
+
+        for opt, val in options.items():
+            part_def = "%s_partition_definitions" % (self.dialect.name)
+            subpart_def = "%s_subpartition_definitions" % (self.dialect.name)
+            if opt == part_def or opt == subpart_def:
+                # builds a string of definitions
+                if opt not in state.table_options:
+                    state.table_options[opt] = val
+                else:
+                    state.table_options[opt] = "%s, %s" % (
+                        state.table_options[opt],
+                        val,
+                    )
+            else:
+                state.table_options["%s_%s" % (self.dialect.name, opt)] = val
+
+    def _parse_column(self, line, state):
+        """Extract column details.
+
+        Falls back to a 'minimal support' variant if full parse fails.
+
+        :param line: Any column-bearing line from SHOW CREATE TABLE
+        """
+
+        spec = None
+        m = self._re_column.match(line)
+        if m:
+            spec = m.groupdict()
+            spec["full"] = True
+        else:
+            m = self._re_column_loose.match(line)
+            if m:
+                spec = m.groupdict()
+                spec["full"] = False
+        if not spec:
+            util.warn("Unknown column definition %r" % line)
+            return
+        if not spec["full"]:
+            util.warn("Incomplete reflection of column definition %r" % line)
+
+        name, type_, args = spec["name"], spec["coltype"], spec["arg"]
+
+        try:
+            col_type = self.dialect.ischema_names[type_]
+        except KeyError:
+            util.warn(
+                "Did not recognize type '%s' of column '%s'" % (type_, name)
+            )
+            col_type = sqltypes.NullType
+
+        # Column type positional arguments eg. varchar(32)
+        if args is None or args == "":
+            type_args = []
+        elif args[0] == "'" and args[-1] == "'":
+            type_args = self._re_csv_str.findall(args)
+        else:
+            type_args = [int(v) for v in self._re_csv_int.findall(args)]
+
+        # Column type keyword options
+        type_kw = {}
+
+        if issubclass(col_type, (DATETIME, TIME, TIMESTAMP)):
+            if type_args:
+                type_kw["fsp"] = type_args.pop(0)
+
+        for kw in ("unsigned", "zerofill"):
+            if spec.get(kw, False):
+                type_kw[kw] = True
+        for kw in ("charset", "collate"):
+            if spec.get(kw, False):
+                type_kw[kw] = spec[kw]
+        if issubclass(col_type, (ENUM, SET)):
+            type_args = _strip_values(type_args)
+
+            if issubclass(col_type, SET) and "" in type_args:
+                type_kw["retrieve_as_bitwise"] = True
+
+        type_instance = col_type(*type_args, **type_kw)
+
+        col_kw = {}
+
+        # NOT NULL
+        col_kw["nullable"] = True
+        # this can be "NULL" in the case of TIMESTAMP
+        if spec.get("notnull", False) == "NOT NULL":
+            col_kw["nullable"] = False
+        # For generated columns, the nullability is marked in a different place
+        if spec.get("notnull_generated", False) == "NOT NULL":
+            col_kw["nullable"] = False
+
+        # AUTO_INCREMENT
+        if spec.get("autoincr", False):
+            col_kw["autoincrement"] = True
+        elif issubclass(col_type, sqltypes.Integer):
+            col_kw["autoincrement"] = False
+
+        # DEFAULT
+        default = spec.get("default", None)
+
+        if default == "NULL":
+            # eliminates the need to deal with this later.
+            default = None
+
+        comment = spec.get("comment", None)
+
+        if comment is not None:
+            comment = cleanup_text(comment)
+
+        sqltext = spec.get("generated")
+        if sqltext is not None:
+            computed = dict(sqltext=sqltext)
+            persisted = spec.get("persistence")
+            if persisted is not None:
+                computed["persisted"] = persisted == "STORED"
+            col_kw["computed"] = computed
+
+        col_d = dict(
+            name=name, type=type_instance, default=default, comment=comment
+        )
+        col_d.update(col_kw)
+        state.columns.append(col_d)
+
+    def _describe_to_create(self, table_name, columns):
+        """Re-format DESCRIBE output as a SHOW CREATE TABLE string.
+
+        DESCRIBE is a much simpler reflection and is sufficient for
+        reflecting views for runtime use.  This method formats DDL
+        for columns only- keys are omitted.
+
+        :param columns: A sequence of DESCRIBE or SHOW COLUMNS 6-tuples.
+          SHOW FULL COLUMNS FROM rows must be rearranged for use with
+          this function.
+        """
+
+        buffer = []
+        for row in columns:
+            (name, col_type, nullable, default, extra) = (
+                row[i] for i in (0, 1, 2, 4, 5)
+            )
+
+            line = [" "]
+            line.append(self.preparer.quote_identifier(name))
+            line.append(col_type)
+            if not nullable:
+                line.append("NOT NULL")
+            if default:
+                if "auto_increment" in default:
+                    pass
+                elif col_type.startswith("timestamp") and default.startswith(
+                    "C"
+                ):
+                    line.append("DEFAULT")
+                    line.append(default)
+                elif default == "NULL":
+                    line.append("DEFAULT")
+                    line.append(default)
+                else:
+                    line.append("DEFAULT")
+                    line.append("'%s'" % default.replace("'", "''"))
+            if extra:
+                line.append(extra)
+
+            buffer.append(" ".join(line))
+
+        return "".join(
+            [
+                (
+                    "CREATE TABLE %s (\n"
+                    % self.preparer.quote_identifier(table_name)
+                ),
+                ",\n".join(buffer),
+                "\n) ",
+            ]
+        )
+
+    def _parse_keyexprs(self, identifiers):
+        """Unpack '"col"(2),"col" ASC'-ish strings into components."""
+
+        return [
+            (colname, int(length) if length else None, modifiers)
+            for colname, length, modifiers in self._re_keyexprs.findall(
+                identifiers
+            )
+        ]
+
+    def _prep_regexes(self):
+        """Pre-compile regular expressions."""
+
+        self._re_columns = []
+        self._pr_options = []
+
+        _final = self.preparer.final_quote
+
+        quotes = dict(
+            zip(
+                ("iq", "fq", "esc_fq"),
+                [
+                    re.escape(s)
+                    for s in (
+                        self.preparer.initial_quote,
+                        _final,
+                        self.preparer._escape_identifier(_final),
+                    )
+                ],
+            )
+        )
+
+        self._pr_name = _pr_compile(
+            r"^CREATE (?:\w+ +)?TABLE +"
+            r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +\($" % quotes,
+            self.preparer._unescape_identifier,
+        )
+
+        self._re_is_view = _re_compile(r"^CREATE(?! TABLE)(\s.*)?\sVIEW")
+
+        # `col`,`col2`(32),`col3`(15) DESC
+        #
+        self._re_keyexprs = _re_compile(
+            r"(?:"
+            r"(?:%(iq)s((?:%(esc_fq)s|[^%(fq)s])+)%(fq)s)"
+            r"(?:\((\d+)\))?(?: +(ASC|DESC))?(?=\,|$))+" % quotes
+        )
+
+        # 'foo' or 'foo','bar' or 'fo,o','ba''a''r'
+        self._re_csv_str = _re_compile(r"\x27(?:\x27\x27|[^\x27])*\x27")
+
+        # 123 or 123,456
+        self._re_csv_int = _re_compile(r"\d+")
+
+        # `colname` <type> [type opts]
+        #  (NOT NULL | NULL)
+        #   DEFAULT ('value' | CURRENT_TIMESTAMP...)
+        #   COMMENT 'comment'
+        #  COLUMN_FORMAT (FIXED|DYNAMIC|DEFAULT)
+        #  STORAGE (DISK|MEMORY)
+        self._re_column = _re_compile(
+            r"  "
+            r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
+            r"(?P<coltype>\w+)"
+            r"(?:\((?P<arg>(?:\d+|\d+,\d+|"
+            r"(?:'(?:''|[^'])*',?)+))\))?"
+            r"(?: +(?P<unsigned>UNSIGNED))?"
+            r"(?: +(?P<zerofill>ZEROFILL))?"
+            r"(?: +CHARACTER SET +(?P<charset>[\w_]+))?"
+            r"(?: +COLLATE +(?P<collate>[\w_]+))?"
+            r"(?: +(?P<notnull>(?:NOT )?NULL))?"
+            r"(?: +DEFAULT +(?P<default>"
+            r"(?:NULL|'(?:''|[^'])*'|[\-\w\.\(\)]+"
+            r"(?: +ON UPDATE [\-\w\.\(\)]+)?)"
+            r"))?"
+            r"(?: +(?:GENERATED ALWAYS)? ?AS +(?P<generated>\("
+            r".*\))? ?(?P<persistence>VIRTUAL|STORED)?"
+            r"(?: +(?P<notnull_generated>(?:NOT )?NULL))?"
+            r")?"
+            r"(?: +(?P<autoincr>AUTO_INCREMENT))?"
+            r"(?: +COMMENT +'(?P<comment>(?:''|[^'])*)')?"
+            r"(?: +COLUMN_FORMAT +(?P<colfmt>\w+))?"
+            r"(?: +STORAGE +(?P<storage>\w+))?"
+            r"(?: +(?P<extra>.*))?"
+            r",?$" % quotes
+        )
+
+        # Fallback, try to parse as little as possible
+        self._re_column_loose = _re_compile(
+            r"  "
+            r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
+            r"(?P<coltype>\w+)"
+            r"(?:\((?P<arg>(?:\d+|\d+,\d+|\x27(?:\x27\x27|[^\x27])+\x27))\))?"
+            r".*?(?P<notnull>(?:NOT )NULL)?" % quotes
+        )
+
+        # (PRIMARY|UNIQUE|FULLTEXT|SPATIAL) INDEX `name` (USING (BTREE|HASH))?
+        # (`col` (ASC|DESC)?, `col` (ASC|DESC)?)
+        # KEY_BLOCK_SIZE size | WITH PARSER name  /*!50100 WITH PARSER name */
+        self._re_key = _re_compile(
+            r"  "
+            r"(?:(?P<type>\S+) )?KEY"
+            r"(?: +%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s)?"
+            r"(?: +USING +(?P<using_pre>\S+))?"
+            r" +\((?P<columns>.+?)\)"
+            r"(?: +USING +(?P<using_post>\S+))?"
+            r"(?: +KEY_BLOCK_SIZE *[ =]? *(?P<keyblock>\S+))?"
+            r"(?: +WITH PARSER +(?P<parser>\S+))?"
+            r"(?: +COMMENT +(?P<comment>(\x27\x27|\x27([^\x27])*?\x27)+))?"
+            r"(?: +/\*(?P<version_sql>.+)\*/ *)?"
+            r",?$" % quotes
+        )
+
+        # https://forums.mysql.com/read.php?20,567102,567111#msg-567111
+        # It means if the MySQL version >= \d+, execute what's in the comment
+        self._re_key_version_sql = _re_compile(
+            r"\!\d+ " r"(?: *WITH PARSER +(?P<parser>\S+) *)?"
+        )
+
+        # CONSTRAINT `name` FOREIGN KEY (`local_col`)
+        # REFERENCES `remote` (`remote_col`)
+        # MATCH FULL | MATCH PARTIAL | MATCH SIMPLE
+        # ON DELETE CASCADE ON UPDATE RESTRICT
+        #
+        # unique constraints come back as KEYs
+        kw = quotes.copy()
+        kw["on"] = "RESTRICT|CASCADE|SET NULL|NO ACTION|SET DEFAULT"
+        self._re_fk_constraint = _re_compile(
+            r"  "
+            r"CONSTRAINT +"
+            r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
+            r"FOREIGN KEY +"
+            r"\((?P<local>[^\)]+?)\) REFERENCES +"
+            r"(?P<table>%(iq)s[^%(fq)s]+%(fq)s"
+            r"(?:\.%(iq)s[^%(fq)s]+%(fq)s)?) +"
+            r"\((?P<foreign>(?:%(iq)s[^%(fq)s]+%(fq)s(?: *, *)?)+)\)"
+            r"(?: +(?P<match>MATCH \w+))?"
+            r"(?: +ON DELETE (?P<ondelete>%(on)s))?"
+            r"(?: +ON UPDATE (?P<onupdate>%(on)s))?" % kw
+        )
+
+        # CONSTRAINT `CONSTRAINT_1` CHECK (`x` > 5)'
+        # testing on MariaDB 10.2 shows that the CHECK constraint
+        # is returned on a line by itself, so to match without worrying
+        # about parenthesis in the expression we go to the end of the line
+        self._re_ck_constraint = _re_compile(
+            r"  "
+            r"CONSTRAINT +"
+            r"%(iq)s(?P<name>(?:%(esc_fq)s|[^%(fq)s])+)%(fq)s +"
+            r"CHECK +"
+            r"\((?P<sqltext>.+)\),?" % kw
+        )
+
+        # PARTITION
+        #
+        # punt!
+        self._re_partition = _re_compile(r"(?:.*)(?:SUB)?PARTITION(?:.*)")
+
+        # Table-level options (COLLATE, ENGINE, etc.)
+        # Do the string options first, since they have quoted
+        # strings we need to get rid of.
+        for option in _options_of_type_string:
+            self._add_option_string(option)
+
+        for option in (
+            "ENGINE",
+            "TYPE",
+            "AUTO_INCREMENT",
+            "AVG_ROW_LENGTH",
+            "CHARACTER SET",
+            "DEFAULT CHARSET",
+            "CHECKSUM",
+            "COLLATE",
+            "DELAY_KEY_WRITE",
+            "INSERT_METHOD",
+            "MAX_ROWS",
+            "MIN_ROWS",
+            "PACK_KEYS",
+            "ROW_FORMAT",
+            "KEY_BLOCK_SIZE",
+            "STATS_SAMPLE_PAGES",
+        ):
+            self._add_option_word(option)
+
+        for option in (
+            "PARTITION BY",
+            "SUBPARTITION BY",
+            "PARTITIONS",
+            "SUBPARTITIONS",
+            "PARTITION",
+            "SUBPARTITION",
+        ):
+            self._add_partition_option_word(option)
+
+        self._add_option_regex("UNION", r"\([^\)]+\)")
+        self._add_option_regex("TABLESPACE", r".*? STORAGE DISK")
+        self._add_option_regex(
+            "RAID_TYPE",
+            r"\w+\s+RAID_CHUNKS\s*\=\s*\w+RAID_CHUNKSIZE\s*=\s*\w+",
+        )
+
+    _optional_equals = r"(?:\s*(?:=\s*)|\s+)"
+
+    def _add_option_string(self, directive):
+        regex = r"(?P<directive>%s)%s" r"'(?P<val>(?:[^']|'')*?)'(?!')" % (
+            re.escape(directive),
+            self._optional_equals,
+        )
+        self._pr_options.append(_pr_compile(regex, cleanup_text))
+
+    def _add_option_word(self, directive):
+        regex = r"(?P<directive>%s)%s" r"(?P<val>\w+)" % (
+            re.escape(directive),
+            self._optional_equals,
+        )
+        self._pr_options.append(_pr_compile(regex))
+
+    def _add_partition_option_word(self, directive):
+        if directive == "PARTITION BY" or directive == "SUBPARTITION BY":
+            regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\w+.*)" % (
+                re.escape(directive),
+                self._optional_equals,
+            )
+        elif directive == "SUBPARTITIONS" or directive == "PARTITIONS":
+            regex = r"(?<!\S)(?P<directive>%s)%s" r"(?P<val>\d+)" % (
+                re.escape(directive),
+                self._optional_equals,
+            )
+        else:
+            regex = r"(?<!\S)(?P<directive>%s)(?!\S)" % (re.escape(directive),)
+        self._pr_options.append(_pr_compile(regex))
+
+    def _add_option_regex(self, directive, regex):
+        regex = r"(?P<directive>%s)%s" r"(?P<val>%s)" % (
+            re.escape(directive),
+            self._optional_equals,
+            regex,
+        )
+        self._pr_options.append(_pr_compile(regex))
+
+
+_options_of_type_string = (
+    "COMMENT",
+    "DATA DIRECTORY",
+    "INDEX DIRECTORY",
+    "PASSWORD",
+    "CONNECTION",
+)
+
+
+def _pr_compile(regex, cleanup=None):
+    """Prepare a 2-tuple of compiled regex and callable."""
+
+    return (_re_compile(regex), cleanup)
+
+
+def _re_compile(regex):
+    """Compile a string to regex, I and UNICODE."""
+
+    return re.compile(regex, re.I | re.UNICODE)
+
+
+def _strip_values(values):
+    "Strip reflected values quotes"
+    strip_values = []
+    for a in values:
+        if a[0:1] == '"' or a[0:1] == "'":
+            # strip enclosing quotes and unquote interior
+            a = a[1:-1].replace(a[0] * 2, a[0])
+        strip_values.append(a)
+    return strip_values
+
+
+def cleanup_text(raw_text: str) -> str:
+    if "\\" in raw_text:
+        raw_text = re.sub(
+            _control_char_regexp, lambda s: _control_char_map[s[0]], raw_text
+        )
+    return raw_text.replace("''", "'")
+
+
+_control_char_map = {
+    "\\\\": "\\",
+    "\\0": "\0",
+    "\\a": "\a",
+    "\\b": "\b",
+    "\\t": "\t",
+    "\\n": "\n",
+    "\\v": "\v",
+    "\\f": "\f",
+    "\\r": "\r",
+    # '\\e':'\e',
+}
+_control_char_regexp = re.compile(
+    "|".join(re.escape(k) for k in _control_char_map)
+)