impuls.tasks

class impuls.tasks.AddEntity(entity: Entity, task_name: str = 'AddEntity')

Bases: Task

AddEntity is a simple task that adds the provided entity to the DB.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
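The reset-on-entry rule can be illustrated with a minimal, self-contained sketch. It does not import impuls: FakeRuntime and CountingTask are hypothetical stand-ins invented for this illustration, not part of the library.

```python
class FakeRuntime:
    """Hypothetical stand-in for TaskRuntime; holds only what the sketch needs."""

    def __init__(self, items: list) -> None:
        self.items = items


class CountingTask:
    """Hypothetical task that keeps execute-related state on the instance."""

    def __init__(self) -> None:
        self.seen: set = set()

    def execute(self, r: FakeRuntime) -> None:
        # Reset state on entry: execute may be called again with another runtime.
        self.seen.clear()
        for item in r.items:
            self.seen.add(item)


task = CountingTask()
task.execute(FakeRuntime(["a", "b"]))
task.execute(FakeRuntime(["c"]))
print(sorted(task.seen))  # ['c']
```

Without the clear() call, the second run would still see the items collected by the first one.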

entity: Entity
class impuls.tasks.AssignDirections(outbound_stop_pairs: Iterable[tuple[str, str]], routes: Routes = Routes(agency_id=None, type=None, ids=None), overwrite: bool = False, task_name: str | None = None)

Bases: Task

AssignDirections sets Trip.direction based on a list of stop pairs defining the outbound direction.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

find_direction_of_trip(db: DBConnection, trip_id: str) tuple[Literal[0, 1], str]

Attempts to assign a direction to the provided trip.

Returns a tuple of the assigned direction and the trip_id, so that the returned tuple can be directly used in an UPDATE statement.

If the direction can’t be assigned, raises DataError.

get_trip_ids_to_process(db: DBConnection) list[str]

Returns a list of all trip_ids for which this step should run.

This includes all trips belonging to the routes selected by the configured selector, without any direction (unless overwrite is set).

get_unambiguous_stop_sequences(db: DBConnection, trip_id: str) dict[str, int]

Returns a mapping of stop_id to stop_sequence for the provided trip, excluding any ambiguous stops - stops at which the trip calls more than once.
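A minimal sketch of this filtering, assuming the trip's calls are available as (stop_id, stop_sequence) tuples instead of being read from the database. The function name and input shape are illustrative only.

```python
from collections import Counter


def unambiguous_stop_sequences(stop_times: list) -> dict:
    """Maps stop_id to stop_sequence, skipping stops visited more than once."""
    visits = Counter(stop_id for stop_id, _ in stop_times)
    return {stop_id: seq for stop_id, seq in stop_times if visits[stop_id] == 1}


# A loop trip which calls at stop "A" twice
calls = [("A", 0), ("B", 1), ("C", 2), ("A", 3)]
print(unambiguous_stop_sequences(calls))  # {'B': 1, 'C': 2}
```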

outbound_stop_pairs: list[tuple[str, str]]

Stop ID pairs defining the outbound direction.

If a trip stops at the first stop of a pair before stopping at the second stop, it will be assigned to the outbound direction. Otherwise, if a trip stops at the second stop before stopping at the first stop, it will be assigned the inbound direction.

Only unambiguous stops (where the trip calls exactly once) are considered.

All selected trips must match at least one pair, otherwise this step fails with a MultipleDataErrors.

This may be easier to understand from the algorithm’s pseudocode:

def assign_direction_id(trip_stop_id_to_stop_sequence: Mapping[str, int]) -> Direction:
    for stop_a, stop_b in outbound_stop_pairs:
        idx_a = trip_stop_id_to_stop_sequence.get(stop_a)
        idx_b = trip_stop_id_to_stop_sequence.get(stop_b)
        if idx_a is not None and idx_b is not None:
            return Direction.OUTBOUND if idx_a < idx_b else Direction.INBOUND
    raise DataError(...)
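The pseudocode above can be turned into a runnable sketch. Direction and DataError below are local stand-ins rather than the library's own classes, the enum values 0 and 1 are an assumption based on the GTFS direction_id convention, and outbound_stop_pairs is passed as an argument instead of being read from the task.

```python
from collections.abc import Mapping, Sequence
from enum import IntEnum


class Direction(IntEnum):
    # Stand-in enum; the values 0 and 1 are assumed, not taken from the library
    OUTBOUND = 0
    INBOUND = 1


class DataError(ValueError):
    """Stand-in for the library's DataError."""


def assign_direction_id(
    stop_sequences: Mapping[str, int],
    outbound_stop_pairs: Sequence[tuple],
) -> Direction:
    # The first pair with both stops present on the trip decides the direction
    for stop_a, stop_b in outbound_stop_pairs:
        idx_a = stop_sequences.get(stop_a)
        idx_b = stop_sequences.get(stop_b)
        if idx_a is not None and idx_b is not None:
            return Direction.OUTBOUND if idx_a < idx_b else Direction.INBOUND
    raise DataError("no stop pair matches the trip")


pairs = [("north", "south")]
print(assign_direction_id({"north": 1, "mid": 2, "south": 3}, pairs).name)  # OUTBOUND
print(assign_direction_id({"south": 1, "north": 2}, pairs).name)  # INBOUND
```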
overwrite: bool

Should the step overwrite existing directions? Defaults to False, preserving any existing Trip.direction.

routes: Routes

Selects routes for which direction assignment should run. Defaults to all routes.

class impuls.tasks.ExecuteSQL(task_name: str, statement: str)

Bases: Task

ExecuteSQL task simply executes the provided statement.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

statement: str
class impuls.tasks.ExtendCalendars(duration_days: int = 30, start_date: Date | None = None, fallback_weekdays: Sequence[Sequence[int]] = [[1, 2, 3, 4], [2, 3, 0, 4], [1, 3, 0, 4], [1, 2, 0, 4], [3, 2, 1, 0], [6], [5]], holidays: Container[date] | None = None)

Bases: Task

ExtendCalendars ensures the GTFS has calendar data for duration_days starting from start_date.

This is done by copying the services from the latest covered Monday over every Monday without services, and so on for every weekday. fallback_weekdays allows alternative weekdays to be used as the source for copying over a specific weekday.

Holidays are never used as the source for copying schedules, and Sunday schedules are used when a target day is a holiday. A set of holiday dates must be explicitly provided in the constructor.

As a side effect, all Calendars are converted to exception-based (all active dates represented by CalendarException).

compute_coverages() None
execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

extend_calendars() None
fill_extend_template_with_fallback_days() None
find_base_extend_template() None
find_extend_template() None
find_fallback_day_for_weekday(weekday: int) Date | None
is_extension_necessary() bool
is_holiday(day: Date) bool
load_calendar(db: DBConnection) None
update_calendars(db: DBConnection) None
DEFAULT_FALLBACK_WEEKDAYS: Final[Sequence[Sequence[int]]] = [[1, 2, 3, 4], [2, 3, 0, 4], [1, 3, 0, 4], [1, 2, 0, 4], [3, 2, 1, 0], [6], [5]]
WEEKDAY_NAMES: Final[Sequence[str]] = ('mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun')
duration_days: int

Expected period of validity of the GTFS starting from the computed start date. Defaults to 30.

fallback_weekdays: Sequence[Sequence[int]]

Lookup table for alternative weekdays which can be used to extend coverage.

For example, the algorithm copies the latest Monday calendars over every Monday which should have coverage, but doesn’t. However, if the dataset contains absolutely no usable Mondays with service, the first weekday with service in fallback_weekdays[0] will be used as the source when extending calendars over uncovered Mondays.

Defaults to the following fallbacks (see DEFAULT_FALLBACK_WEEKDAYS):

  • Monday: fallback to Tuesday, Wednesday, Thursday or Friday;

  • Tuesday: fallback to Wednesday, Thursday, Monday or Friday;

  • Wednesday: fallback to Tuesday, Thursday, Monday or Friday;

  • Thursday: fallback to Tuesday, Wednesday, Monday or Friday;

  • Friday: fallback to Thursday, Wednesday, Tuesday or Monday;

  • Saturday: fallback to Sunday;

  • Sunday: fallback to Saturday.
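The lookup described above can be sketched as follows. The table is copied from DEFAULT_FALLBACK_WEEKDAYS (0 is Monday, 6 is Sunday); pick_source_weekday is a hypothetical helper written for this illustration and works on a plain set of weekdays with service rather than on real calendar data.

```python
from collections.abc import Sequence
from typing import Optional

DEFAULT_FALLBACK_WEEKDAYS: Sequence = [
    [1, 2, 3, 4], [2, 3, 0, 4], [1, 3, 0, 4], [1, 2, 0, 4], [3, 2, 1, 0], [6], [5],
]


def pick_source_weekday(
    weekday: int,
    weekdays_with_service: set,
    fallbacks: Sequence = DEFAULT_FALLBACK_WEEKDAYS,
) -> Optional[int]:
    """Returns the weekday to copy services from, or None if nothing is usable."""
    if weekday in weekdays_with_service:
        return weekday
    # Otherwise, take the first fallback weekday which has any service
    for candidate in fallbacks[weekday]:
        if candidate in weekdays_with_service:
            return candidate
    return None


# Only Wednesdays and Saturdays have service: Monday falls back to Wednesday
print(pick_source_weekday(0, {2, 5}))  # 2
```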

holidays: Container[date]

Set of days which are never used as the source for copying services, and where Sunday schedules will be used when extending coverage.

A good idea is to provide an object from the holidays module.

When omitted in the constructor, defaults to a new, empty set. This may be exploited by sub-classes, which lazily load holidays from a resource, like ExtendCalendarsFromPolishExceptions.

Note that extra logic may be implemented by overriding is_holiday(), which defaults to a simple day in self.holidays.
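A sketch of such an override, using a small stand-in class instead of ExtendCalendars itself. HolidayChecker and ChristmasEveAsHoliday are hypothetical names, and datetime.date is used in place of the library's own Date type.

```python
from datetime import date


class HolidayChecker:
    """Hypothetical stand-in mirroring the holidays attribute and is_holiday()."""

    def __init__(self, holidays=None) -> None:
        # Mirrors the documented default: a new, empty set when omitted
        self.holidays = holidays if holidays is not None else set()

    def is_holiday(self, day: date) -> bool:
        return day in self.holidays


class ChristmasEveAsHoliday(HolidayChecker):
    """Treats every 24 December as a holiday, on top of the provided set."""

    def is_holiday(self, day: date) -> bool:
        return (day.month, day.day) == (12, 24) or super().is_holiday(day)


checker = ChristmasEveAsHoliday({date(2024, 1, 1)})
print(checker.is_holiday(date(2024, 12, 24)))  # True
print(checker.is_holiday(date(2024, 6, 3)))  # False
```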

start_date: Date | None

The start day of the expected coverage. If None, defaults to the earliest date covered. Most commonly overridden with Date.today().

class impuls.tasks.ExtendCalendarsFromPolishExceptions(resource_name: str, region: PolishRegion, duration_days: int = 30, start_date: Date | None = None, fallback_weekdays: Sequence[Sequence[int]] = [[1, 2, 3, 4], [2, 3, 0, 4], [1, 3, 0, 4], [1, 2, 0, 4], [3, 2, 1, 0], [6], [5]])

Bases: ExtendCalendars

ExtendCalendarsFromPolishExceptions is an extension of ExtendCalendars which lazily-loads holidays from impuls.tools.polish_calendar_exceptions.RESOURCE.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

load_holidays(r: ManagedResource) None
class impuls.tasks.GenerateTripHeadsign(name: str | None = None)

Bases: Task

GenerateTripHeadsign is a task which fills the trip_headsign field for all Trips which don’t already have a headsign.

The generated headsign is the name of the last stop of the trip. This step will break if there are trips without any stops.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

class impuls.tasks.LoadBusManMDB(resource: str, agency_id: str, ignore_route_id: bool = False, ignore_stop_id: bool = False, save_blocks: bool = False)

Bases: Task

LoadBusManMDB loads data into the database from a BusMan MDB database.

Only the following entities are loaded: Route, Stop, Calendar, Trip and StopTime.

Agency has to be manually curated beforehand (e.g. with AddEntity task).

The imported Calendar entities will be empty after the import. Providing dates for calendars must be done manually afterwards.

Most MDB databases seen in the wild have no stop positions. This step will set the latitude and longitude to 0. Further curation is usually necessary.

Parameters:

  • resource: name of the resource with MDB file

  • agency_id: ID of the manually curated Agency

  • ignore_route_id: use route_short_name as the ID, instead of the BusMan internal ID

  • ignore_stop_id: use stop_code as the ID, instead of the BusMan internal ID

  • save_blocks: use the tTeams table to fill block_id and block_short_name

This task additionally requires mdbtools to be installed. This package is available in most package managers.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

load_calendars(mdb_path: Path, db: DBConnection) None
load_routes(mdb_path: Path, db: DBConnection) None
load_stop_times(mdb_path: Path, db: DBConnection) None
load_stops(mdb_path: Path, db: DBConnection) None
load_trips(mdb_path: Path, db: DBConnection) None
agency_id: str
ignore_route_id: bool
ignore_stop_id: bool
resource: str
save_blocks: bool
class impuls.tasks.LoadDB(resource: str)

Bases: Task

LoadDB overwrites the runtime database with data from a database in the provided resource. The database must have been created by Impuls, usually by the SaveDB task or by the runtime as the impuls.db file in the workspace_directory. Mismatched schemas will cause problems later down the line.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

resource: str
class impuls.tasks.LoadGTFS(resource: str, extra_fields: bool = False, extra_files: Sequence[str] | None = None)

Bases: Task

LoadGTFS attempts to load GTFS data from a ZIP archive.

The loader only supports a subset of the GTFS schema. Due to implementation details, some invalid values may be accepted and some valid values may be rejected. In particular:

  • stops.txt location_types 3, 4, and 5 will cause an error,

  • parent_station may only refer to stop_ids defined in earlier lines,

  • agency_id in fare_attributes.txt is required if it’s present in agency.txt, even if there’s only one agency defined in the dataset.

The additional parameter, extra_fields, controls how unrecognized columns are processed. Default (False) is to ignore them. If set to True, extra columns will be encoded to a string-to-string JSON object and placed in the extra_fields_json column, if that is available.
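The handling of unrecognized columns can be sketched like this. split_extra_fields and the set of known columns are hypothetical; in particular, what the loader stores when a row has no extra columns is not stated here, so the empty-object result below is only an assumption of the sketch.

```python
import json


def split_extra_fields(row: dict, known_columns: set) -> tuple:
    """Splits a CSV row into recognized fields and a JSON string of the rest."""
    recognized = {k: v for k, v in row.items() if k in known_columns}
    extra = {k: v for k, v in row.items() if k not in known_columns}
    # String-to-string JSON object, as described for extra_fields_json
    return recognized, json.dumps(extra, sort_keys=True)


KNOWN = {"stop_id", "stop_name"}  # illustrative subset of stops.txt columns
row = {"stop_id": "1", "stop_name": "Main St", "shelter": "yes"}
fields, extra_json = split_extra_fields(row, KNOWN)
print(fields)  # {'stop_id': '1', 'stop_name': 'Main St'}
print(extra_json)  # {"shelter": "yes"}
```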

extra_files is a list of extra files (including any extensions!) to be loaded using the generic ExtraTableRow schema. Note that ExtraTableRow.table_name will be saved as provided in the extra_files list, that is including any filename extensions.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

extract_gtfs_to(zip_path: str | PathLike[str], dir_path: str | PathLike[str]) None
extra_fields: bool
extra_files: Sequence[str]
resource: str
class impuls.tasks.ModifyRoutesFromCSV(resource: str, must_curate_all: bool = False, silent: bool = False)

Bases: ModifyFromCSV

ModifyRoutesFromCSV implements the ModifyFromCSV step for Routes.

The CSV file pointed to by the provided resource must have a header row and must have a route_id field.

The following fields may be present, and will be used to update the metadata of the matching Route:

  • route_short_name

  • route_long_name

  • route_type

  • route_color

  • route_text_color

  • route_sort_order

Parameters:
  • resource (str) – name of the resource with data (in CSV).

  • must_curate_all (bool) – if True, then this task will fail if some entities weren’t curated. Defaults to False.

  • silent (bool) – if True, doesn’t warn every time an entity from CSV isn’t found in the DB.
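A rough sketch of how such a curation file is applied, operating on a plain dictionary of routes instead of the database. The curate function, the sample data and the ValueError are illustrative assumptions; the sketch also keeps all values as strings, while the real task converts them to the appropriate field types.

```python
import csv
import io

CSV_DATA = """route_id,route_short_name,route_color
1,A,FF0000
99,Z,00FF00
"""


def curate(routes: dict, csv_text: str, must_curate_all: bool = False) -> list:
    """Updates routes in place; returns route_ids from the CSV missing in routes."""
    missing = []
    curated = set()
    for row in csv.DictReader(io.StringIO(csv_text)):
        route_id = row.pop("route_id")
        if route_id not in routes:
            missing.append(route_id)  # the real task warns here, unless silent
            continue
        routes[route_id].update(row)
        curated.add(route_id)
    not_curated = set(routes) - curated
    if must_curate_all and not_curated:
        raise ValueError(f"routes without curation: {sorted(not_curated)}")
    return missing


routes = {
    "1": {"route_short_name": "1", "route_long_name": "Main Line"},
    "2": {"route_short_name": "2", "route_long_name": "Branch"},
}
print(curate(routes, CSV_DATA))  # ['99']
print(routes["1"]["route_short_name"])  # A
```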

static csv_column_mapping() Mapping[str, CSVFieldData]

csv_column_mapping returns the mapping from a CSV column name to metadata about the column - the corresponding entity field and a converter from string to a value of an appropriate type.

static model_class() Type[Entity]

model_class returns the type from impuls.model whose entities are going to be modified.

static primary_key_csv_column() str

primary_key_csv_column returns the name of the CSV column which contains the primary key.

static query_for_all_ids() str

query_for_all_ids returns an SQL query string which returns all the known IDs of all entities of given type.

class impuls.tasks.ModifyStopsFromCSV(resource: str, must_curate_all: bool = False, silent: bool = False)

Bases: ModifyFromCSV

ModifyStopsFromCSV implements the ModifyFromCSV step for Stops.

The CSV file pointed to by the provided resource must have a header row and must have a stop_id field.

The following fields may be present, and will be used to update the metadata of the matching Stop:

  • stop_name

  • stop_code

  • stop_lat

  • stop_lon

  • zone_id

  • wheelchair_boarding

  • platform_code

Parameters:
  • resource (str) – name of the resource with data (in CSV).

  • must_curate_all (bool) – if True, then this task will fail if some entities weren’t curated. Defaults to False.

  • silent (bool) – if True, doesn’t warn every time an entity from CSV isn’t found in the DB.

static csv_column_mapping() Mapping[str, CSVFieldData]

csv_column_mapping returns the mapping from a CSV column name to metadata about the column - the corresponding entity field and a converter from string to a value of an appropriate type.

static model_class() Type[Entity]

model_class returns the type from impuls.model whose entities are going to be modified.

static primary_key_csv_column() str

primary_key_csv_column returns the name of the CSV column which contains the primary key.

static query_for_all_ids() str

query_for_all_ids returns an SQL query string which returns all the known IDs of all entities of given type.

class impuls.tasks.RemoveUnusedEntities

Bases: Task

RemoveUnusedEntities removes entities from the database which serve no purpose:

drop_agencies_without_routes(db: DBConnection) None
drop_calendars_without_dates(db: DBConnection) None
drop_calendars_without_trips(db: DBConnection) None
drop_routes_without_trips(db: DBConnection) None
drop_stations_without_stops(db: DBConnection) None
drop_stops_without_stop_times(db: DBConnection) None
drop_trips_with_at_most_one_stop(db: DBConnection) None
execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

class impuls.tasks.SaveDB(to: str | PathLike[str])

Bases: Task

SaveDB saves the contained data as-is to a database at a provided path.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

to: Path
class impuls.tasks.SaveGTFS(headers: Mapping[str, Sequence[str]], target: str | PathLike[str], emit_empty_calendars: bool = False, ensure_order: bool = False)

Bases: Task

SaveGTFS exports the contained data as a GTFS zip file at the provided path.

headers is a mapping from a GTFS table to a sequence of column names.

All keys must include the .txt extension. If a GTFS table exists both as a standalone SQL table and as an ExtraTableRow, the former will always be preferred. For example:

headers = {
    "agency.txt": ("agency_id", "agency_name", "agency_url", "agency_timezone", "agency_lang"),
    "routes.txt": ("agency_id", "route_id", "route_short_name", "route_long_name", "route_type"),
    "extra_file.txt": ("custom_column_1", "custom_column_2"),
}

SaveGTFS doesn’t validate the provided mapping, so the caller must ensure all required columns and files are provided.
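How a headers mapping can drive the export is sketched below with the standard csv and zipfile modules. This is not the task's implementation: dump_tables_to_zip is a hypothetical helper which takes rows as in-memory dictionaries instead of reading them from the database, and returns the archive as bytes instead of writing it to target.

```python
import csv
import io
import zipfile


def dump_tables_to_zip(headers: dict, tables: dict) -> bytes:
    """Writes one CSV file per headers entry into an in-memory ZIP archive."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        for file_name, columns in headers.items():
            text = io.StringIO(newline="")
            # Only the listed columns are written, in the listed order
            writer = csv.DictWriter(text, fieldnames=columns, extrasaction="ignore")
            writer.writeheader()
            writer.writerows(tables.get(file_name, []))
            archive.writestr(file_name, text.getvalue())
    return buffer.getvalue()


headers = {"agency.txt": ("agency_id", "agency_name")}
tables = {"agency.txt": [{"agency_id": "0", "agency_name": "Example", "agency_phone": "123"}]}
data = dump_tables_to_zip(headers, tables)
with zipfile.ZipFile(io.BytesIO(data)) as archive:
    print(archive.namelist())  # ['agency.txt']
```

As in the task itself, nothing here checks that the requested columns make up a valid GTFS feed.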

The table names are used as file names and must not contain any path separators, or any other characters or character sequences disallowed by the current OS and file system.

When emit_empty_calendars is set to True (default is False), empty calendars will still be generated in the calendar.txt file.

When ensure_order is set to True, most output tables will be sorted by their primary key. This might slow down the task. The default is False, which saves tables in arbitrary order.

create_zip(dir: str | PathLike[str]) None
dump_tables(gtfs_dir: str | PathLike[str], db: DBConnection) None
execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

emit_empty_calendars: bool
ensure_order: bool
headers: Mapping[str, Sequence[str]]
target: Path
class impuls.tasks.SimplifyCalendars(generate_new_ids: bool = True, id_prefix: str = '')

Bases: Task

SimplifyCalendars removes duplicate Calendars, so that no two Calendars have the same active day sets.

If generate_new_ids is set (the default) the step will also overwrite the used calendar_id values to sequential numbers, prefixed by id_prefix (which defaults to an empty string).
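The de-duplication idea can be sketched on plain dictionaries. fold_duplicates is a hypothetical helper; numbering the new IDs from 0 is an assumption of this sketch, as the text above only says the IDs are sequential numbers.

```python
from datetime import date


def fold_duplicates(day_sets: dict, id_prefix: str = "") -> dict:
    """Maps every old calendar_id to a new ID shared by identical day sets."""
    new_id_by_days: dict = {}
    mapping: dict = {}
    for old_id, days in day_sets.items():
        key = frozenset(days)
        if key not in new_id_by_days:
            # First time this set of active days is seen: allocate the next ID
            new_id_by_days[key] = f"{id_prefix}{len(new_id_by_days)}"
        mapping[old_id] = new_id_by_days[key]
    return mapping


day_sets = {
    "wk": {date(2024, 6, 3), date(2024, 6, 4)},
    "wk_copy": {date(2024, 6, 4), date(2024, 6, 3)},
    "sat": {date(2024, 6, 8)},
}
print(fold_duplicates(day_sets, id_prefix="C"))
# {'wk': 'C0', 'wk_copy': 'C0', 'sat': 'C1'}
```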

compute_day_sets(db: DBConnection) dict[str, set[Date]]
execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

fold_duplicate_calendars(day_sets: Mapping[str, Iterable[Date]], db: DBConnection) list[str]
reassign_calendar_ids(ids: Iterable[str], db: DBConnection) None
generate_new_ids: bool
id_prefix: str
class impuls.tasks.SplitTripLegs(route_selector: Routes = Routes(agency_id=None, type=<Type.RAIL: 2>, ids=None), replacement_bus_short_name_pattern: Pattern[str] | None = None)

Bases: Task

SplitTripLegs splits Trips into multiple legs with different attributes, generating new Routes and Transfers on the go.

This task can be customized by subclassing and overriding specific methods.

The default configuration is meant for separating out bus replacement services for trains. Bus replacement service departures are identified by StopTime.platform set to BUS. Bus legs get assigned to a copy of the original Route with the type updated and ID suffixed by _BUS. TIMED transfers are also generated. In this configuration, data is a boolean flag set on bus departures.

arrival_only(stop_time: StopTime, previous_data: Any) StopTime

Creates a copy of a StopTime for an arrival-only, last stop of a trip. The second argument is the return value of get_departure_data() of the preceding StopTime. See compute_legs() for details.

The default behavior is to copy the stop_time, set its departure_time to be the same as the arrival_time, and ensure the platform is set to "BUS" if and only if previous_data is truthy.

compute_legs(original_trip: Trip, stop_times: list[StopTime]) list[tuple[list[StopTime], Any]]

Splits the provided list of StopTimes into multiple legs.

The default algorithm keeps track of the return value of get_departure_data(), and creates a new leg whenever that value changes. The first StopTime with new data is assumed to belong to both legs - the result of arrival_only() is appended to the previous leg, while the result of departure_only() is appended to the current leg. As an example, the following stop_times:

  • StopTime(0, data=False)

  • StopTime(1, data=False)

  • StopTime(2, data=True)

  • StopTime(3, data=True)

  • StopTime(4, data=False)

  • StopTime(5, data=False)

Are separated into the following legs:

  • Leg 0, data=False:

    • StopTime(0)

    • StopTime(1)

    • arrival_only(StopTime(2), data=False)

  • Leg 1, data=True:

    • departure_only(StopTime(2), data=True)

    • StopTime(3)

    • arrival_only(StopTime(4), data=True)

  • Leg 2, data=False:

    • departure_only(StopTime(4), data=False)

    • StopTime(5)

As a special case, if whole_trip_is_replacement_bus() returns true, this function short-circuits to returning [(stop_times, True)].
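The splitting rule can be reproduced in a self-contained sketch. The StopTime dataclass is a simplified stand-in for the library's model, times are plain integers, and the whole_trip_is_replacement_bus() short-circuit is left out. The sketch also does not special-case the final stop, which the real implementation may treat differently.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class StopTime:  # simplified stand-in, not impuls.model.StopTime
    stop_sequence: int
    arrival_time: int
    departure_time: int
    platform: str = ""


def get_departure_data(stop_time: StopTime) -> bool:
    return stop_time.platform == "BUS"


def _with_bus_flag(stop_time: StopTime, is_bus: bool) -> StopTime:
    # Platform is "BUS" if and only if the leg data is truthy
    if is_bus:
        return replace(stop_time, platform="BUS")
    if stop_time.platform == "BUS":
        return replace(stop_time, platform="")
    return stop_time


def arrival_only(stop_time: StopTime, previous_data: bool) -> StopTime:
    copy = replace(stop_time, departure_time=stop_time.arrival_time)
    return _with_bus_flag(copy, previous_data)


def departure_only(stop_time: StopTime, current_data: bool) -> StopTime:
    copy = replace(stop_time, arrival_time=stop_time.departure_time)
    return _with_bus_flag(copy, current_data)


def compute_legs(stop_times: list) -> list:
    legs, current, current_data = [], [], None
    for stop_time in stop_times:
        data = get_departure_data(stop_time)
        if not current:
            current, current_data = [stop_time], data
        elif data != current_data:
            # The boundary stop closes the previous leg and opens the next one
            current.append(arrival_only(stop_time, current_data))
            legs.append((current, current_data))
            current, current_data = [departure_only(stop_time, data)], data
        else:
            current.append(stop_time)
    if current:
        legs.append((current, current_data))
    return legs


platforms = ["", "", "BUS", "BUS", "", ""]
stop_times = [StopTime(i, 100 * i, 100 * i + 30, p) for i, p in enumerate(platforms)]
for leg, data in compute_legs(stop_times):
    print(data, [st.stop_sequence for st in leg])
# False [0, 1, 2]
# True [2, 3, 4]
# False [4, 5]
```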

departure_only(stop_time: StopTime, current_data: Any) StopTime

Creates a copy of a StopTime for a departure-only, first stop of a trip. The second argument is the return value of get_departure_data() of this StopTime. See compute_legs() for details.

The default behavior is to copy the stop_time, set its arrival_time to be the same as the departure_time, and ensure the platform is set to "BUS" if and only if current_data is truthy.

final execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

get_departure_data(stop_time: StopTime) Any

Extracts leg-identifying data of the departure represented by the provided StopTime. The default behavior is to flag bus replacement service by returning stop_time.platform == "BUS".

get_transfer(trip_a: Trip, trip_b: Trip, transfer_stop_id: str) Transfer | None

Creates a Transfer object linking two legs of a trip. Defaults to creating a TIMED transfer.

process_trip(trip_id: str, db: DBConnection) None

Called by execute() on every selected trip. The default is to retrieve the objects from the database, call compute_legs(), and then call either update_trip_with_single_leg() or replace_trip_by_legs(), depending on whether there is one leg or several.

replace_trip_by_legs(original_trip: Trip, legs: list[tuple[list[StopTime], Any]], db: DBConnection) None

Replaces an existing Trip by multiple instances, as represented by legs. Called by process_trip() for trips with multiple legs.

The default implementation removes the original_trip, and then:

  • creates a new trip for each leg, as modified by update_trip(), with the ID suffixed by _0, _1, _2, …;

  • re-inserts StopTimes with only their trip_id changed;

  • creates Transfers between the legs, as returned by get_transfer().

save_bus_replacement_route_in_db(original_route_id: str, new_route_id: str, db: DBConnection) None

Saves a bus replacement route in the database. The default behavior is to create a copy of the original Route, call update_bus_replacement_route(), followed by db.create.

select_trip_ids(db: DBConnection) Iterable[str]

Selects which trips should be processed by this step. Defaults to all trips belonging to routes selected by route_selector.

update_bus_replacement_route(route: Route) None

Updates the attributes of a bus-replacement route. Defaults to setting the type to BUS.

update_trip(trip: Trip, data: Any, db: DBConnection) None

Modifies the attributes of a Trip representing a single leg. Called by update_trip_with_single_leg() and replace_trip_by_legs().

The default behavior depends on the value of data. If it is truthy, the trip’s route_id is suffixed by _BUS, and a new route is created by calling save_bus_replacement_route_in_db() (if it was not created before, as indicated by the added_routes set). Otherwise, the trip is left as-is.

update_trip_with_single_leg(trip: Trip, data: Any, db: DBConnection) None

Called by process_trip() for trips with a single leg. The default implementation simply calls update_trip() followed by db.update if data is truthy.

whole_trip_is_replacement_bus(trip: Trip) bool

Returns True if the whole Trip is operated by a replacement bus service. Defaults to searching the replacement_bus_short_name_pattern in the Trip.short_name. If replacement_bus_short_name_pattern is None, returns False.
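A minimal sketch of this check, written as a free function operating on the short name directly. The pattern used in the example is purely illustrative and does not come from the library.

```python
import re
from typing import Optional


def whole_trip_is_replacement_bus(short_name: str, pattern: Optional[re.Pattern]) -> bool:
    """Searches (not full-matches) the pattern in the trip's short name."""
    if pattern is None:
        return False
    return pattern.search(short_name) is not None


bus_pattern = re.compile(r"\bBUS\b", re.IGNORECASE)  # hypothetical pattern
print(whole_trip_is_replacement_bus("R 1234 bus", bus_pattern))  # True
print(whole_trip_is_replacement_bus("R 1234", bus_pattern))  # False
print(whole_trip_is_replacement_bus("R 1234 bus", None))  # False
```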

Leg

alias of tuple[list[StopTime], Any]

added_routes: set[str]
replacement_bus_short_name_pattern: Pattern[str] | None

Trip.short_name pattern indicating that the whole trip is operated by a bus replacement service.

route_selector: Routes

Selects which routes’ trips should be separated by this step.

class impuls.tasks.TruncateCalendars(target: EmptyDateRange | InfiniteDateRange | LeftUnboundedDateRange | RightUnboundedDateRange | BoundedDateRange, fail_on_empty: bool = True)

Bases: Task

TruncateCalendars removes any services beyond the provided range.

For simplicity, all Calendars are converted to exception-based (all active dates represented by CalendarException).
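The effect on exception-based calendars can be sketched on plain sets of dates. truncate_day_sets is a hypothetical helper: it only covers a bounded range with both ends inclusive (an assumption), drops calendars left without any dates, and raises a plain ValueError where the task would fail when fail_on_empty is set.

```python
from datetime import date


def truncate_day_sets(day_sets: dict, start: date, end: date, fail_on_empty: bool = True) -> dict:
    """Keeps only the active dates within [start, end] for every calendar."""
    truncated = {}
    for calendar_id, days in day_sets.items():
        kept = {day for day in days if start <= day <= end}
        if kept:
            truncated[calendar_id] = kept
    if fail_on_empty and not truncated:
        raise ValueError("no services left after truncation")
    return truncated


day_sets = {
    "A": {date(2024, 5, 31), date(2024, 6, 1), date(2024, 6, 2)},
    "B": {date(2024, 7, 1)},
}
result = truncate_day_sets(day_sets, date(2024, 6, 1), date(2024, 6, 30))
print(sorted(result))  # ['A']
print(sorted(result["A"]))  # [datetime.date(2024, 6, 1), datetime.date(2024, 6, 2)]
```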

apply_changes(db: DBConnection) None
check_if_empty() None
clear_state() None
compute_changes(db: DBConnection) None
compute_truncated_days_of(calendar: Calendar, db: DBConnection) set[Date]
drop_calendars(db: DBConnection) None
execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

make_all_calendars_use_exceptions(db: DBConnection) None
set_exceptions_on_calendars(db: DBConnection) None
fail_on_empty: bool
target: EmptyDateRange | InfiniteDateRange | LeftUnboundedDateRange | RightUnboundedDateRange | BoundedDateRange