diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/apscheduler/util.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/apscheduler/util.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/apscheduler/util.py | 461 |
1 files changed, 461 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/util.py b/.venv/lib/python3.12/site-packages/apscheduler/util.py new file mode 100644 index 00000000..82eb8c07 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/apscheduler/util.py @@ -0,0 +1,461 @@ +"""This module contains several handy functions primarily meant for internal use.""" + +__all__ = ( + "asint", + "asbool", + "astimezone", + "convert_to_datetime", + "datetime_to_utc_timestamp", + "utc_timestamp_to_datetime", + "datetime_ceil", + "get_callable_name", + "obj_to_ref", + "ref_to_obj", + "maybe_ref", + "check_callable_args", + "normalize", + "localize", + "undefined", +) + +import re +import sys +from calendar import timegm +from datetime import date, datetime, time, timedelta, timezone, tzinfo +from functools import partial +from inspect import isbuiltin, isclass, isfunction, ismethod, signature + +if sys.version_info < (3, 14): + from asyncio import iscoroutinefunction +else: + from inspect import iscoroutinefunction + +if sys.version_info < (3, 9): + from backports.zoneinfo import ZoneInfo +else: + from zoneinfo import ZoneInfo + + +class _Undefined: + def __nonzero__(self): + return False + + def __bool__(self): + return False + + def __repr__(self): + return "<undefined>" + + +undefined = ( + _Undefined() +) #: a unique object that only signifies that no value is defined + + +def asint(text): + """ + Safely converts a string to an integer, returning ``None`` if the string is ``None``. + + :type text: str + :rtype: int + + """ + if text is not None: + return int(text) + + +def asbool(obj): + """ + Interprets an object as a boolean value. + + :rtype: bool + + """ + if isinstance(obj, str): + obj = obj.strip().lower() + if obj in ("true", "yes", "on", "y", "t", "1"): + return True + + if obj in ("false", "no", "off", "n", "f", "0"): + return False + + raise ValueError(f'Unable to interpret value "{obj}" as boolean') + + return bool(obj) + + +def astimezone(obj): + """ + Interprets an object as a timezone. + + :rtype: tzinfo + + """ + if isinstance(obj, str): + if obj == "UTC": + return timezone.utc + + return ZoneInfo(obj) + + if isinstance(obj, tzinfo): + if obj.tzname(None) == "local": + raise ValueError( + "Unable to determine the name of the local timezone -- you must " + "explicitly specify the name of the local timezone. Please refrain " + "from using timezones like EST to prevent problems with daylight " + "saving time. Instead, use a locale based timezone name (such as " + "Europe/Helsinki)." + ) + elif isinstance(obj, ZoneInfo): + return obj + elif hasattr(obj, "zone"): + # pytz timezones + if obj.zone: + return ZoneInfo(obj.zone) + + return timezone(obj._offset) + + return obj + + if obj is not None: + raise TypeError(f"Expected tzinfo, got {obj.__class__.__name__} instead") + + +def asdate(obj): + if isinstance(obj, str): + return date.fromisoformat(obj) + + return obj + + +_DATE_REGEX = re.compile( + r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})" + r"(?:[ T](?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})" + r"(?:\.(?P<microsecond>\d{1,6}))?" + r"(?P<timezone>Z|[+-]\d\d:\d\d)?)?$" +) + + +def convert_to_datetime(input, tz, arg_name): + """ + Converts the given object to a timezone aware datetime object. + + If a timezone aware datetime object is passed, it is returned unmodified. + If a native datetime object is passed, it is given the specified timezone. + If the input is a string, it is parsed as a datetime with the given timezone. + + Date strings are accepted in three different forms: date only (Y-m-d), date with + time (Y-m-d H:M:S) or with date+time with microseconds (Y-m-d H:M:S.micro). + Additionally you can override the time zone by giving a specific offset in the + format specified by ISO 8601: Z (UTC), +HH:MM or -HH:MM. + + :param str|datetime input: the datetime or string to convert to a timezone aware + datetime + :param datetime.tzinfo tz: timezone to interpret ``input`` in + :param str arg_name: the name of the argument (used in an error message) + :rtype: datetime + + """ + if input is None: + return + elif isinstance(input, datetime): + datetime_ = input + elif isinstance(input, date): + datetime_ = datetime.combine(input, time()) + elif isinstance(input, str): + m = _DATE_REGEX.match(input) + if not m: + raise ValueError("Invalid date string") + + values = m.groupdict() + tzname = values.pop("timezone") + if tzname == "Z": + tz = timezone.utc + elif tzname: + hours, minutes = (int(x) for x in tzname[1:].split(":")) + sign = 1 if tzname[0] == "+" else -1 + tz = timezone(sign * timedelta(hours=hours, minutes=minutes)) + + values = {k: int(v or 0) for k, v in values.items()} + datetime_ = datetime(**values) + else: + raise TypeError(f"Unsupported type for {arg_name}: {input.__class__.__name__}") + + if datetime_.tzinfo is not None: + return datetime_ + if tz is None: + raise ValueError( + f'The "tz" argument must be specified if {arg_name} has no timezone information' + ) + if isinstance(tz, str): + tz = astimezone(tz) + + return localize(datetime_, tz) + + +def datetime_to_utc_timestamp(timeval): + """ + Converts a datetime instance to a timestamp. + + :type timeval: datetime + :rtype: float + + """ + if timeval is not None: + return timegm(timeval.utctimetuple()) + timeval.microsecond / 1000000 + + +def utc_timestamp_to_datetime(timestamp): + """ + Converts the given timestamp to a datetime instance. + + :type timestamp: float + :rtype: datetime + + """ + if timestamp is not None: + return datetime.fromtimestamp(timestamp, timezone.utc) + + +def timedelta_seconds(delta): + """ + Converts the given timedelta to seconds. + + :type delta: timedelta + :rtype: float + + """ + return delta.days * 24 * 60 * 60 + delta.seconds + delta.microseconds / 1000000.0 + + +def datetime_ceil(dateval): + """ + Rounds the given datetime object upwards. + + :type dateval: datetime + + """ + if dateval.microsecond > 0: + return dateval + timedelta(seconds=1, microseconds=-dateval.microsecond) + return dateval + + +def datetime_repr(dateval): + return dateval.strftime("%Y-%m-%d %H:%M:%S %Z") if dateval else "None" + + +def timezone_repr(timezone: tzinfo) -> str: + if isinstance(timezone, ZoneInfo): + return timezone.key + + return repr(timezone) + + +def get_callable_name(func): + """ + Returns the best available display name for the given function/callable. + + :rtype: str + + """ + if ismethod(func): + self = func.__self__ + cls = self if isclass(self) else type(self) + return f"{cls.__qualname__}.{func.__name__}" + elif isclass(func) or isfunction(func) or isbuiltin(func): + return func.__qualname__ + elif hasattr(func, "__call__") and callable(func.__call__): + # instance of a class with a __call__ method + return type(func).__qualname__ + + raise TypeError( + f"Unable to determine a name for {func!r} -- maybe it is not a callable?" + ) + + +def obj_to_ref(obj): + """ + Returns the path to the given callable. + + :rtype: str + :raises TypeError: if the given object is not callable + :raises ValueError: if the given object is a :class:`~functools.partial`, lambda or a nested + function + + """ + if isinstance(obj, partial): + raise ValueError("Cannot create a reference to a partial()") + + name = get_callable_name(obj) + if "<lambda>" in name: + raise ValueError("Cannot create a reference to a lambda") + if "<locals>" in name: + raise ValueError("Cannot create a reference to a nested function") + + if ismethod(obj): + module = obj.__self__.__module__ + else: + module = obj.__module__ + + return f"{module}:{name}" + + +def ref_to_obj(ref): + """ + Returns the object pointed to by ``ref``. + + :type ref: str + + """ + if not isinstance(ref, str): + raise TypeError("References must be strings") + if ":" not in ref: + raise ValueError("Invalid reference") + + modulename, rest = ref.split(":", 1) + try: + obj = __import__(modulename, fromlist=[rest]) + except ImportError as exc: + raise LookupError( + f"Error resolving reference {ref}: could not import module" + ) from exc + + try: + for name in rest.split("."): + obj = getattr(obj, name) + return obj + except Exception: + raise LookupError(f"Error resolving reference {ref}: error looking up object") + + +def maybe_ref(ref): + """ + Returns the object that the given reference points to, if it is indeed a reference. + If it is not a reference, the object is returned as-is. + + """ + if not isinstance(ref, str): + return ref + return ref_to_obj(ref) + + +def check_callable_args(func, args, kwargs): + """ + Ensures that the given callable can be called with the given arguments. + + :type args: tuple + :type kwargs: dict + + """ + pos_kwargs_conflicts = [] # parameters that have a match in both args and kwargs + positional_only_kwargs = [] # positional-only parameters that have a match in kwargs + unsatisfied_args = [] # parameters in signature that don't have a match in args or kwargs + unsatisfied_kwargs = [] # keyword-only arguments that don't have a match in kwargs + unmatched_args = list( + args + ) # args that didn't match any of the parameters in the signature + # kwargs that didn't match any of the parameters in the signature + unmatched_kwargs = list(kwargs) + # indicates if the signature defines *args and **kwargs respectively + has_varargs = has_var_kwargs = False + + try: + sig = signature(func, follow_wrapped=False) + except ValueError: + # signature() doesn't work against every kind of callable + return + + for param in sig.parameters.values(): + if param.kind == param.POSITIONAL_OR_KEYWORD: + if param.name in unmatched_kwargs and unmatched_args: + pos_kwargs_conflicts.append(param.name) + elif unmatched_args: + del unmatched_args[0] + elif param.name in unmatched_kwargs: + unmatched_kwargs.remove(param.name) + elif param.default is param.empty: + unsatisfied_args.append(param.name) + elif param.kind == param.POSITIONAL_ONLY: + if unmatched_args: + del unmatched_args[0] + elif param.name in unmatched_kwargs: + unmatched_kwargs.remove(param.name) + positional_only_kwargs.append(param.name) + elif param.default is param.empty: + unsatisfied_args.append(param.name) + elif param.kind == param.KEYWORD_ONLY: + if param.name in unmatched_kwargs: + unmatched_kwargs.remove(param.name) + elif param.default is param.empty: + unsatisfied_kwargs.append(param.name) + elif param.kind == param.VAR_POSITIONAL: + has_varargs = True + elif param.kind == param.VAR_KEYWORD: + has_var_kwargs = True + + # Make sure there are no conflicts between args and kwargs + if pos_kwargs_conflicts: + raise ValueError( + "The following arguments are supplied in both args and kwargs: {}".format( + ", ".join(pos_kwargs_conflicts) + ) + ) + + # Check if keyword arguments are being fed to positional-only parameters + if positional_only_kwargs: + raise ValueError( + "The following arguments cannot be given as keyword arguments: {}".format( + ", ".join(positional_only_kwargs) + ) + ) + + # Check that the number of positional arguments minus the number of matched kwargs + # matches the argspec + if unsatisfied_args: + raise ValueError( + "The following arguments have not been supplied: {}".format( + ", ".join(unsatisfied_args) + ) + ) + + # Check that all keyword-only arguments have been supplied + if unsatisfied_kwargs: + raise ValueError( + "The following keyword-only arguments have not been supplied in kwargs: " + "{}".format(", ".join(unsatisfied_kwargs)) + ) + + # Check that the callable can accept the given number of positional arguments + if not has_varargs and unmatched_args: + raise ValueError( + f"The list of positional arguments is longer than the target callable can " + f"handle (allowed: {len(args) - len(unmatched_args)}, given in args: " + f"{len(args)})" + ) + + # Check that the callable can accept the given keyword arguments + if not has_var_kwargs and unmatched_kwargs: + raise ValueError( + "The target callable does not accept the following keyword arguments: " + "{}".format(", ".join(unmatched_kwargs)) + ) + + +def iscoroutinefunction_partial(f): + while isinstance(f, partial): + f = f.func + + # The asyncio version of iscoroutinefunction includes testing for @coroutine + # decorations vs. the inspect version which does not. + return iscoroutinefunction(f) + + +def normalize(dt): + return datetime.fromtimestamp(dt.timestamp(), dt.tzinfo) + + +def localize(dt, tzinfo): + if hasattr(tzinfo, "localize"): + return tzinfo.localize(dt) + + return normalize(dt.replace(tzinfo=tzinfo)) |