about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/apscheduler/triggers/cron/__init__.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/apscheduler/triggers/cron/__init__.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/apscheduler/triggers/cron/__init__.py')
-rw-r--r--.venv/lib/python3.12/site-packages/apscheduler/triggers/cron/__init__.py289
1 files changed, 289 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/apscheduler/triggers/cron/__init__.py b/.venv/lib/python3.12/site-packages/apscheduler/triggers/cron/__init__.py
new file mode 100644
index 00000000..03be8196
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/apscheduler/triggers/cron/__init__.py
@@ -0,0 +1,289 @@
+from datetime import datetime, timedelta
+
+from tzlocal import get_localzone
+
+from apscheduler.triggers.base import BaseTrigger
+from apscheduler.triggers.cron.fields import (
+    DEFAULT_VALUES,
+    BaseField,
+    DayOfMonthField,
+    DayOfWeekField,
+    MonthField,
+    WeekField,
+)
+from apscheduler.util import (
+    astimezone,
+    convert_to_datetime,
+    datetime_ceil,
+    datetime_repr,
+)
+
+
+class CronTrigger(BaseTrigger):
+    """
+    Triggers when current time matches all specified time constraints,
+    similarly to how the UNIX cron scheduler works.
+
+    :param int|str year: 4-digit year
+    :param int|str month: month (1-12)
+    :param int|str day: day of month (1-31)
+    :param int|str week: ISO week (1-53)
+    :param int|str day_of_week: number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
+    :param int|str hour: hour (0-23)
+    :param int|str minute: minute (0-59)
+    :param int|str second: second (0-59)
+    :param datetime|str start_date: earliest possible date/time to trigger on (inclusive)
+    :param datetime|str end_date: latest possible date/time to trigger on (inclusive)
+    :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (defaults
+        to scheduler timezone)
+    :param int|None jitter: delay the job execution by ``jitter`` seconds at most
+
+    .. note:: The first weekday is always **monday**.
+    """
+
+    FIELD_NAMES = (
+        "year",
+        "month",
+        "day",
+        "week",
+        "day_of_week",
+        "hour",
+        "minute",
+        "second",
+    )
+    FIELDS_MAP = {
+        "year": BaseField,
+        "month": MonthField,
+        "week": WeekField,
+        "day": DayOfMonthField,
+        "day_of_week": DayOfWeekField,
+        "hour": BaseField,
+        "minute": BaseField,
+        "second": BaseField,
+    }
+
+    __slots__ = "timezone", "start_date", "end_date", "fields", "jitter"
+
+    def __init__(
+        self,
+        year=None,
+        month=None,
+        day=None,
+        week=None,
+        day_of_week=None,
+        hour=None,
+        minute=None,
+        second=None,
+        start_date=None,
+        end_date=None,
+        timezone=None,
+        jitter=None,
+    ):
+        if timezone:
+            self.timezone = astimezone(timezone)
+        elif isinstance(start_date, datetime) and start_date.tzinfo:
+            self.timezone = astimezone(start_date.tzinfo)
+        elif isinstance(end_date, datetime) and end_date.tzinfo:
+            self.timezone = astimezone(end_date.tzinfo)
+        else:
+            self.timezone = get_localzone()
+
+        self.start_date = convert_to_datetime(start_date, self.timezone, "start_date")
+        self.end_date = convert_to_datetime(end_date, self.timezone, "end_date")
+
+        self.jitter = jitter
+
+        values = dict(
+            (key, value)
+            for (key, value) in locals().items()
+            if key in self.FIELD_NAMES and value is not None
+        )
+        self.fields = []
+        assign_defaults = False
+        for field_name in self.FIELD_NAMES:
+            if field_name in values:
+                exprs = values.pop(field_name)
+                is_default = False
+                assign_defaults = not values
+            elif assign_defaults:
+                exprs = DEFAULT_VALUES[field_name]
+                is_default = True
+            else:
+                exprs = "*"
+                is_default = True
+
+            field_class = self.FIELDS_MAP[field_name]
+            field = field_class(field_name, exprs, is_default)
+            self.fields.append(field)
+
+    @classmethod
+    def from_crontab(cls, expr, timezone=None):
+        """
+        Create a :class:`~CronTrigger` from a standard crontab expression.
+
+        See https://en.wikipedia.org/wiki/Cron for more information on the format accepted here.
+
+        :param expr: minute, hour, day of month, month, day of week
+        :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (
+            defaults to scheduler timezone)
+        :return: a :class:`~CronTrigger` instance
+
+        """
+        values = expr.split()
+        if len(values) != 5:
+            raise ValueError(f"Wrong number of fields; got {len(values)}, expected 5")
+
+        return cls(
+            minute=values[0],
+            hour=values[1],
+            day=values[2],
+            month=values[3],
+            day_of_week=values[4],
+            timezone=timezone,
+        )
+
+    def _increment_field_value(self, dateval, fieldnum):
+        """
+        Increments the designated field and resets all less significant fields to their minimum
+        values.
+
+        :type dateval: datetime
+        :type fieldnum: int
+        :return: a tuple containing the new date, and the number of the field that was actually
+            incremented
+        :rtype: tuple
+        """
+
+        values = {}
+        i = 0
+        while i < len(self.fields):
+            field = self.fields[i]
+            if not field.REAL:
+                if i == fieldnum:
+                    fieldnum -= 1
+                    i -= 1
+                else:
+                    i += 1
+                continue
+
+            if i < fieldnum:
+                values[field.name] = field.get_value(dateval)
+                i += 1
+            elif i > fieldnum:
+                values[field.name] = field.get_min(dateval)
+                i += 1
+            else:
+                value = field.get_value(dateval)
+                maxval = field.get_max(dateval)
+                if value == maxval:
+                    fieldnum -= 1
+                    i -= 1
+                else:
+                    values[field.name] = value + 1
+                    i += 1
+
+        difference = datetime(**values) - dateval.replace(tzinfo=None)
+        dateval = datetime.fromtimestamp(
+            dateval.timestamp() + difference.total_seconds(), self.timezone
+        )
+        return dateval, fieldnum
+
+    def _set_field_value(self, dateval, fieldnum, new_value):
+        values = {}
+        for i, field in enumerate(self.fields):
+            if field.REAL:
+                if i < fieldnum:
+                    values[field.name] = field.get_value(dateval)
+                elif i > fieldnum:
+                    values[field.name] = field.get_min(dateval)
+                else:
+                    values[field.name] = new_value
+
+        return datetime(**values, tzinfo=self.timezone, fold=dateval.fold)
+
+    def get_next_fire_time(self, previous_fire_time, now):
+        if previous_fire_time:
+            start_date = min(now, previous_fire_time + timedelta(microseconds=1))
+            if start_date == previous_fire_time:
+                start_date += timedelta(microseconds=1)
+        else:
+            start_date = max(now, self.start_date) if self.start_date else now
+
+        fieldnum = 0
+        next_date = datetime_ceil(start_date).astimezone(self.timezone)
+        while 0 <= fieldnum < len(self.fields):
+            field = self.fields[fieldnum]
+            curr_value = field.get_value(next_date)
+            next_value = field.get_next_value(next_date)
+
+            if next_value is None:
+                # No valid value was found
+                next_date, fieldnum = self._increment_field_value(
+                    next_date, fieldnum - 1
+                )
+            elif next_value > curr_value:
+                # A valid, but higher than the starting value, was found
+                if field.REAL:
+                    next_date = self._set_field_value(next_date, fieldnum, next_value)
+                    fieldnum += 1
+                else:
+                    next_date, fieldnum = self._increment_field_value(
+                        next_date, fieldnum
+                    )
+            else:
+                # A valid value was found, no changes necessary
+                fieldnum += 1
+
+            # Return if the date has rolled past the end date
+            if self.end_date and next_date > self.end_date:
+                return None
+
+        if fieldnum >= 0:
+            next_date = self._apply_jitter(next_date, self.jitter, now)
+            return min(next_date, self.end_date) if self.end_date else next_date
+
+    def __getstate__(self):
+        return {
+            "version": 2,
+            "timezone": self.timezone,
+            "start_date": self.start_date,
+            "end_date": self.end_date,
+            "fields": self.fields,
+            "jitter": self.jitter,
+        }
+
+    def __setstate__(self, state):
+        # This is for compatibility with APScheduler 3.0.x
+        if isinstance(state, tuple):
+            state = state[1]
+
+        if state.get("version", 1) > 2:
+            raise ValueError(
+                f"Got serialized data for version {state['version']} of "
+                f"{self.__class__.__name__}, but only versions up to 2 can be handled"
+            )
+
+        self.timezone = astimezone(state["timezone"])
+        self.start_date = state["start_date"]
+        self.end_date = state["end_date"]
+        self.fields = state["fields"]
+        self.jitter = state.get("jitter")
+
+    def __str__(self):
+        options = [f"{f.name}='{f}'" for f in self.fields if not f.is_default]
+        return "cron[{}]".format(", ".join(options))
+
+    def __repr__(self):
+        options = [f"{f.name}='{f}'" for f in self.fields if not f.is_default]
+        if self.start_date:
+            options.append(f"start_date={datetime_repr(self.start_date)!r}")
+        if self.end_date:
+            options.append(f"end_date={datetime_repr(self.end_date)!r}")
+        if self.jitter:
+            options.append(f"jitter={self.jitter}")
+
+        return "<{} ({}, timezone='{}')>".format(
+            self.__class__.__name__,
+            ", ".join(options),
+            self.timezone,
+        )