impuls.tasks¶
- class impuls.tasks.AddEntity(entity: Entity, task_name: str = 'AddEntity')¶
Bases: Task
AddEntity is a simple task that adds the provided entity to the DB.
- execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
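The state-reset guidance can be illustrated with a minimal, self-contained sketch. `FakeRuntime` and `CollectRows` are hypothetical stand-ins invented for this example; a real task would subclass the library's Task class and receive a TaskRuntime.

```python
from dataclasses import dataclass, field


@dataclass
class FakeRuntime:
    """Hypothetical stand-in for TaskRuntime; holds only a list of rows."""

    rows: list[str] = field(default_factory=list)


class CollectRows:
    """Hypothetical task-like class that keeps execute-related state."""

    def __init__(self) -> None:
        self.seen: set[str] = set()

    def execute(self, r: FakeRuntime) -> None:
        # Reset execute-related state on entry, because execute may be
        # called again later with a different runtime.
        self.seen.clear()
        for row in r.rows:
            self.seen.add(row)


task = CollectRows()
task.execute(FakeRuntime(rows=["a", "b"]))
task.execute(FakeRuntime(rows=["c"]))
print(sorted(task.seen))  # ['c']
```

Without the `clear()` call, the second run would still see rows left over from the first runtime.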
- class impuls.tasks.ExecuteSQL(task_name: str, statement: str)¶
Bases: Task
The ExecuteSQL task simply executes the provided statement.
- execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
- statement: str¶
- class impuls.tasks.GenerateTripHeadsign(name: str | None = None)¶
Bases: Task
GenerateTripHeadsign is a task which fills the trip_headsign field for all Trips which don’t already have a headsign.
The generated headsign is the name of the last stop of the trip. This step will break if there are trips without any stops.
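As a rough illustration of the rule described above (headsign = name of the last stop), here is a sketch using plain `sqlite3`. The table and column names are a simplified, hypothetical schema rather than the real Impuls one, and treating both NULL and the empty string as "no headsign" is an assumption.

```python
import sqlite3

# Hypothetical, simplified schema; the real Impuls schema may differ.
db = sqlite3.connect(":memory:")
db.executescript(
    """
    CREATE TABLE stops (stop_id TEXT PRIMARY KEY, name TEXT);
    CREATE TABLE trips (trip_id TEXT PRIMARY KEY, headsign TEXT);
    CREATE TABLE stop_times (trip_id TEXT, stop_id TEXT, stop_sequence INTEGER);
    INSERT INTO stops VALUES ('A', 'Alpha'), ('B', 'Bravo'), ('C', 'Charlie');
    INSERT INTO trips VALUES ('T1', NULL), ('T2', 'Custom');
    INSERT INTO stop_times VALUES
        ('T1', 'A', 0), ('T1', 'B', 1), ('T1', 'C', 2),
        ('T2', 'C', 0), ('T2', 'A', 1);
    """
)


def last_stop_name(trip_id: str) -> str:
    """Return the name of the stop with the highest stop_sequence."""
    row = db.execute(
        "SELECT s.name FROM stop_times st "
        "JOIN stops s ON s.stop_id = st.stop_id "
        "WHERE st.trip_id = ? ORDER BY st.stop_sequence DESC LIMIT 1",
        (trip_id,),
    ).fetchone()
    if row is None:
        # Mirrors the documented caveat: trips without stops break this step.
        raise ValueError(f"trip {trip_id} has no stops")
    return row[0]


# Only trips without a headsign are touched (assumption: NULL or empty).
missing = [
    r[0]
    for r in db.execute(
        "SELECT trip_id FROM trips WHERE headsign IS NULL OR headsign = ''"
    ).fetchall()
]
for trip_id in missing:
    db.execute(
        "UPDATE trips SET headsign = ? WHERE trip_id = ?",
        (last_stop_name(trip_id), trip_id),
    )

headsigns = dict(db.execute("SELECT trip_id, headsign FROM trips ORDER BY trip_id"))
print(headsigns)  # {'T1': 'Charlie', 'T2': 'Custom'}
```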
- execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
- class impuls.tasks.LoadBusManMDB(resource: str, agency_id: str, ignore_route_id: bool = False, ignore_stop_id: bool = False)¶
Bases: Task
LoadBusManMDB loads data into the database from a BusMan MDB database.
Only the following entities are loaded: Route, Stop, Calendar, Trip and StopTime. Agency has to be manually curated beforehand (e.g. with the AddEntity task).
The imported Calendar entities will be empty after the import. Providing dates for calendars must be done manually afterwards.
Most MDB databases seen in the wild have no stop positions. This step will set the latitude and longitude to 0. Further curation is usually necessary.
Parameters:
resource: name of the resource with the MDB file
agency_id: ID of the manually curated Agency
ignore_route_id: use route_short_name as the ID, instead of the BusMan internal ID
ignore_stop_id: use stop_code as the ID, instead of the BusMan internal ID
This task additionally requires mdbtools to be installed. This package is available in most package managers.
- execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
- load_calendars(mdb_path: Path, db: DBConnection) None¶
- load_routes(mdb_path: Path, db: DBConnection) None¶
- load_stop_times(mdb_path: Path, db: DBConnection) None¶
- load_stops(mdb_path: Path, db: DBConnection) None¶
- load_trips(mdb_path: Path, db: DBConnection) None¶
- agency_id: str¶
- ignore_route_id: bool¶
- ignore_stop_id: bool¶
- resource: str¶
- class impuls.tasks.LoadDB(resource: str)¶
Bases: Task
LoadDB overwrites the runtime database with data from a database in the provided resource. The database must have been created by Impuls, usually by the SaveDB task or by the runtime as the impuls.db file in the workspace_directory. Mismatched schemas will cause problems later down the line.
- execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
- resource: str¶
- class impuls.tasks.LoadGTFS(resource: str)¶
Bases: Task
LoadGTFS attempts to load GTFS data from a ZIP archive.
The loader only supports a subset of the GTFS schema. Due to implementation details, some invalid values may be accepted and some valid values may be rejected. In particular:
stops.txt location_types 3, 4, and 5 will cause an error,
parent_station may only refer to stop_ids defined in earlier lines,
agency_id in fare_attributes.txt is required if it’s present in agency.txt, even if there’s only one agency defined in the dataset.
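Because parent_station may only refer to stop_ids defined on earlier lines, it can help to check stops.txt before loading. The following is a hypothetical pre-flight check written for this example (it is not part of impuls); it reports stops whose parent has not been seen yet.

```python
import csv
import io

# Sample stops.txt content; P1 references S1 before S1 is defined.
STOPS_TXT = """stop_id,stop_name,location_type,parent_station
P1,Platform 1,0,S1
S1,Main Station,1,
P2,Platform 2,0,S1
"""


def forward_parent_references(stops_txt: str) -> list[str]:
    """Return stop_ids whose parent_station is not defined on an earlier line."""
    seen: set[str] = set()
    offenders: list[str] = []
    for row in csv.DictReader(io.StringIO(stops_txt)):
        parent = row.get("parent_station") or ""
        if parent and parent not in seen:
            offenders.append(row["stop_id"])
        seen.add(row["stop_id"])
    return offenders


offenders = forward_parent_references(STOPS_TXT)
print(offenders)  # ['P1']
```

Reordering the file so that stations precede their child stops would make such a file acceptable under this constraint.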
- execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
- extract_gtfs_to(zip_path: str | PathLike[str], dir_path: str | PathLike[str]) None¶
- resource: str¶
- class impuls.tasks.ModifyRoutesFromCSV(resource: str, must_curate_all: bool = False, silent: bool = False)¶
Bases: ModifyFromCSV
ModifyRoutesFromCSV implements the ModifyFromCSV step for Routes.
The CSV file pointed to by the provided resource must have a header row and must have a route_id field.
The following fields may be present, and will be used to update the metadata of the matching Route:
route_short_name
route_long_name
route_type
route_color
route_text_color
route_sort_order
- Parameters:
resource (str) – name of the resource with data (in CSV).
must_curate_all (bool) – if True, then this task will fail if some entities weren’t curated. Defaults to False.
silent (bool) – if True, doesn’t warn every time an entity from CSV isn’t found in the DB.
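To make the expected CSV shape and the must_curate_all behavior more concrete, here is a sketch that mimics the documented semantics on plain dictionaries. `apply_curation` and the `routes` dictionary are hypothetical stand-ins for this example, not the impuls implementation.

```python
import csv
import io

# A curation file: header row, mandatory route_id, optional fields to update.
CURATION_CSV = """route_id,route_short_name,route_color
R1,1,FF0000
R9,9,00FF00
"""

# Hypothetical in-memory stand-in for the routes stored in the DB.
routes = {
    "R1": {"route_short_name": "", "route_color": ""},
    "R2": {"route_short_name": "2", "route_color": ""},
}


def apply_curation(csv_text, records, must_curate_all=False):
    """Update matching records.

    Returns (ids present in the CSV but not in records,
             ids present in records but not in the CSV).
    """
    curated = set()
    unknown = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        key = row.pop("route_id")
        if key not in records:
            # A non-silent task would warn about this entry.
            unknown.append(key)
            continue
        records[key].update(row)
        curated.add(key)
    not_curated = sorted(set(records) - curated)
    if must_curate_all and not_curated:
        raise ValueError(f"entities without curation: {not_curated}")
    return unknown, not_curated


unknown, not_curated = apply_curation(CURATION_CSV, routes)
print(routes["R1"])   # {'route_short_name': '1', 'route_color': 'FF0000'}
print(unknown)        # ['R9']
print(not_curated)    # ['R2']
```

In this sketch, passing `must_curate_all=True` would raise because R2 has no row in the CSV.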
- static csv_column_mapping() Mapping[str, CSVFieldData]¶
csv_column_mapping returns the mapping from a CSV column name to metadata about the column: the corresponding entity field and a converter from string to a value of an appropriate type.
- static model_class() Type[Entity]¶
model_class returns the type from impuls.model whose entities are going to be modified.
- static primary_key_csv_column() str¶
primary_key_csv_column returns the CSV column name which contains the primary key.
- static query_for_all_ids() str¶
query_for_all_ids returns an SQL query string which returns all the known IDs of all entities of the given type.
- class impuls.tasks.ModifyStopsFromCSV(resource: str, must_curate_all: bool = False, silent: bool = False)¶
Bases: ModifyFromCSV
ModifyStopsFromCSV implements the ModifyFromCSV step for Stops.
The CSV file pointed to by the provided resource must have a header row and must have a stop_id field.
The following fields may be present, and will be used to update the metadata of the matching Stop:
stop_name
stop_code
stop_lat
stop_lon
zone_id
wheelchair_boarding
platform_code
- Parameters:
resource (str) – name of the resource with data (in CSV).
must_curate_all (bool) – if True, then this task will fail if some entities weren’t curated. Defaults to False.
silent (bool) – if True, doesn’t warn every time an entity from CSV isn’t found in the DB.
- static csv_column_mapping() Mapping[str, CSVFieldData]¶
csv_column_mapping returns the mapping from a CSV column name to metadata about the column: the corresponding entity field and a converter from string to a value of an appropriate type.
- static model_class() Type[Entity]¶
model_class returns the type from impuls.model whose entities are going to be modified.
- static primary_key_csv_column() str¶
primary_key_csv_column returns the CSV column name which contains the primary key.
- static query_for_all_ids() str¶
query_for_all_ids returns an SQL query string which returns all the known IDs of all entities of the given type.
- class impuls.tasks.RemoveUnusedEntities¶
Bases: Task
RemoveUnusedEntities removes entities from the database which serve no purpose:
Calendars without any active dates,
Stops (with LocationType.STOP) with no StopTimes,
Stations (with LocationType.STATION) with no child Stops,
Routes with no Trips,
- drop_agencies_without_routes(db: DBConnection) None¶
- drop_calendars_without_dates(db: DBConnection) None¶
- drop_calendars_without_trips(db: DBConnection) None¶
- drop_routes_without_trips(db: DBConnection) None¶
- drop_stations_without_stops(db: DBConnection) None¶
- drop_stops_without_stop_times(db: DBConnection) None¶
- drop_trips_with_at_most_one_stop(db: DBConnection) None¶
- execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
- class impuls.tasks.SaveDB(to: str | PathLike[str])¶
Bases: Task
SaveDB saves the contained data as-is to a database at a provided path.
- execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
- to: Path¶
- class impuls.tasks.SaveGTFS(headers: Mapping[str, Sequence[str]], target: str | PathLike[str], emit_empty_calendars: bool = False)¶
Bases: Task
SaveGTFS exports the contained data as a GTFS zip file at the provided path.
headers is a mapping from a GTFS table (excluding the .txt extension) to a sequence of column names. SaveGTFS doesn’t validate the provided mapping, so the caller must ensure all required columns and files are provided.
When emit_empty_calendars is set to True (default is False), empty calendars will still be generated in the calendar.txt file.
- create_zip(dir: str | PathLike[str]) None¶
- dump_tables(gtfs_dir: str | PathLike[str], db: DBConnection) None¶
- execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
- emit_empty_calendars: bool¶
- headers: Mapping[str, Sequence[str]]¶
- target: Path¶
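Since SaveGTFS does not validate the headers mapping, a caller may want a small sanity check of its own. The sketch below shows what such a mapping might look like and a hypothetical `check_headers` helper (not part of impuls). The column names are standard GTFS fields; whether SaveGTFS supports each of them is not verified here.

```python
from collections.abc import Mapping, Sequence

# Keys are GTFS table names without the .txt extension.
headers: Mapping[str, Sequence[str]] = {
    "agency": ("agency_id", "agency_name", "agency_url", "agency_timezone"),
    "routes": ("route_id", "agency_id", "route_short_name", "route_long_name", "route_type"),
    "stops": ("stop_id", "stop_name", "stop_lat", "stop_lon"),
    "calendar_dates": ("service_id", "date", "exception_type"),
    "trips": ("route_id", "service_id", "trip_id", "trip_headsign"),
    "stop_times": ("trip_id", "stop_sequence", "stop_id", "arrival_time", "departure_time"),
}


def check_headers(h: Mapping[str, Sequence[str]]) -> list[str]:
    """Hypothetical caller-side check; returns a list of problem descriptions."""
    problems: list[str] = []
    for table, columns in h.items():
        if table.endswith(".txt"):
            problems.append(f"{table}: table name should not include .txt")
        if not columns:
            problems.append(f"{table}: no columns listed")
        if len(set(columns)) != len(columns):
            problems.append(f"{table}: duplicate column names")
    return problems


print(check_headers(headers))  # []
```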
- class impuls.tasks.SplitTripLegs(route_selector: Routes = Routes(agency_id=None, type=<Type.RAIL: 2>, ids=None), replacement_bus_short_name_pattern: re.Pattern[str] | None = None)¶
Bases: Task
SplitTripLegs splits Trips into multiple legs with different attributes, generating new Routes and Transfers on the go.
This task can be customized by subclassing and overriding specific methods.
The default configuration is meant for separating out bus replacement services for trains. Bus replacement service departures are identified by StopTime.platform set to BUS. Bus legs get assigned to a copy of the original Route with the type updated and ID suffixed by _BUS. TIMED transfers are also generated. In this configuration data is a boolean flag set on bus departures.
- arrival_only(stop_time: StopTime, previous_data: Any) StopTime¶
Creates a copy of a StopTime for an arrival-only, last stop of a trip. The second argument is the return value of get_departure_data() of the preceding StopTime. See compute_legs() for details.
The default behavior is to copy the stop_time, set its departure_time to be the same as the arrival_time, and ensure the platform is set to "BUS" if and only if previous_data is truthy.
- compute_legs(original_trip: Trip, stop_times: list[StopTime]) list[tuple[list[StopTime], Any]]¶
Splits the provided list of StopTimes into multiple legs.
The default algorithm keeps track of the return value of get_departure_data(), and creates new legs when that value changes. The first StopTime with new data is assumed to belong to both legs - the result of arrival_only() is appended to the previous leg, while the result of departure_only() is appended to the current leg. As an example, the following stop_times:
StopTime(0, data=False)
StopTime(1, data=False)
StopTime(2, data=True)
StopTime(3, data=True)
StopTime(4, data=False)
StopTime(5, data=False)
are separated into the following legs:
Leg 0, data=False:
StopTime(0)
StopTime(1)
arrival_only(StopTime(2), data=False)
Leg 1, data=True:
departure_only(StopTime(2), data=True)
StopTime(3)
arrival_only(StopTime(4), data=True)
Leg 2, data=False:
departure_only(StopTime(4), data=False)
StopTime(5)
As a special case, if whole_trip_is_replacement_bus() returns true, this function short-circuits to returning [(stop_times, True)].
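The leg-splitting rule can be reproduced with a short, self-contained sketch. `FakeStopTime` is a hypothetical stand-in for the real StopTime model, the platform reset in `_platform_for` is an assumption, and the whole_trip_is_replacement_bus() short-circuit and edge cases (such as a data change on the very last stop) are left out.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class FakeStopTime:
    """Hypothetical stand-in for impuls.model.StopTime."""

    stop_sequence: int
    arrival_time: int
    departure_time: int
    platform: str = ""


def get_departure_data(st: FakeStopTime) -> Any:
    # Documented default: flag bus replacement departures.
    return st.platform == "BUS"


def _platform_for(st: FakeStopTime, is_bus: Any) -> str:
    # Assumption: a non-bus copy of a "BUS" stop time gets an empty platform.
    if is_bus:
        return "BUS"
    return "" if st.platform == "BUS" else st.platform


def arrival_only(st: FakeStopTime, previous_data: Any) -> FakeStopTime:
    return replace(st, departure_time=st.arrival_time, platform=_platform_for(st, previous_data))


def departure_only(st: FakeStopTime, current_data: Any) -> FakeStopTime:
    return replace(st, arrival_time=st.departure_time, platform=_platform_for(st, current_data))


def compute_legs(stop_times: list[FakeStopTime]) -> list[tuple[list[FakeStopTime], Any]]:
    legs: list[tuple[list[FakeStopTime], Any]] = []
    current: list[FakeStopTime] = []
    previous_data: Any = None
    for i, st in enumerate(stop_times):
        data = get_departure_data(st)
        if i > 0 and data != previous_data:
            # The stop where data changes closes one leg and opens the next.
            current.append(arrival_only(st, previous_data))
            legs.append((current, previous_data))
            current = [departure_only(st, data)]
        else:
            current.append(st)
        previous_data = data
    if current:
        legs.append((current, previous_data))
    return legs


platforms = ["", "", "BUS", "BUS", "", ""]
stop_times = [FakeStopTime(i, 100 * i, 100 * i + 10, p) for i, p in enumerate(platforms)]
legs = compute_legs(stop_times)
for stops, data in legs:
    print(data, [st.stop_sequence for st in stops])
# False [0, 1, 2]
# True [2, 3, 4]
# False [4, 5]
```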
- departure_only(stop_time: StopTime, current_data: Any) StopTime¶
Creates a copy of a StopTime for a departure-only, first stop of a trip. The second argument is the return value of get_departure_data() of this StopTime. See compute_legs() for details.
The default behavior is to copy the stop_time, set its arrival_time to be the same as the departure_time, and ensure the platform is set to "BUS" if and only if current_data is truthy.
- final execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
- get_departure_data(stop_time: StopTime) Any¶
Extracts leg-identifying data of the departure represented by the provided StopTime. The default behavior is to flag bus replacement service by returning stop_time.platform == "BUS".
- get_transfer(trip_a: Trip, trip_b: Trip, transfer_stop_id: str) Transfer | None¶
Creates a Transfer object linking two legs of a trip. Defaults to creating a TIMED transfer.
- process_trip(trip_id: str, db: DBConnection) None¶
Called by execute() on every selected trip. Default is to retrieve the objects from the database and then call compute_legs() and either update_trip_with_single_leg() or replace_trip_by_legs(), depending on whether there is one leg or more.
- replace_trip_by_legs(original_trip: Trip, legs: list[tuple[list[StopTime], Any]], db: DBConnection) None¶
Replaces an existing Trip by multiple instances, as represented by legs. Called by process_trip() for trips with multiple legs.
The default implementation removes the original_trip, and then:
creates a new trip for each leg, as modified by update_trip(), with the ID suffixed by _0, _1, _2, …;
creates Transfers between every leg, as returned by get_transfer().
- save_bus_replacement_route_in_db(original_route_id: str, new_route_id: str, db: DBConnection) None¶
Saves a bus replacement route in the database. The default behavior is to create a copy of the original Route, call update_bus_replacement_route(), followed by db.create.
- select_trip_ids(db: DBConnection) Iterable[str]¶
Selects which trips should be processed by this step. Defaults to all trips belonging to routes selected by route_selector.
- update_bus_replacement_route(route: Route) None¶
Updates the attributes of a bus-replacement route. Defaults to setting the type to BUS.
- update_trip(trip: Trip, data: Any, db: DBConnection) None¶
Modifies the attributes of a Trip representing a single leg. Called by update_trip_with_single_leg() and replace_trip_by_legs().
The default behavior depends on the value of data. If it is truthy, the trip’s route_id is suffixed by _BUS, and a new route is created by calling save_bus_replacement_route_in_db() (if it was not created before, as indicated by the added_routes set). Otherwise, the trip is left as-is.
- update_trip_with_single_leg(trip: Trip, data: Any, db: DBConnection) None¶
Called by process_trip() for trips with a single leg. The default implementation simply calls update_trip() followed by db.update if data is truthy.
- whole_trip_is_replacement_bus(trip: Trip) bool¶
Returns True if the whole Trip is operated by a replacement bus service. Defaults to searching for the replacement_bus_short_name_pattern in Trip.short_name. If replacement_bus_short_name_pattern is None, returns False.
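The documented default can be sketched as a plain function over the trip's short name. The pattern below is a made-up example, and the use of `Pattern.search` (a match anywhere in the string) is an assumption based on the word "searching" in the description.

```python
import re
from typing import Optional


def whole_trip_is_replacement_bus(
    short_name: str,
    pattern: Optional[re.Pattern[str]],
) -> bool:
    """Sketch of the documented default; takes the short name directly."""
    if pattern is None:
        return False
    return pattern.search(short_name) is not None


# Hypothetical pattern: short names containing the standalone word "BUS".
bus_pattern = re.compile(r"\bBUS\b", re.IGNORECASE)

print(whole_trip_is_replacement_bus("BUS 123", bus_pattern))  # True
print(whole_trip_is_replacement_bus("IC 123", bus_pattern))   # False
print(whole_trip_is_replacement_bus("BUS 123", None))         # False
```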
- added_routes: set[str]¶
- replacement_bus_short_name_pattern: Pattern[str] | None¶
Trip.short_name pattern indicating that the whole trip is operated by a bus replacement service.
- class impuls.tasks.TruncateCalendars(target: EmptyDateRange | InfiniteDateRange | LeftUnboundedDateRange | RightUnboundedDateRange | BoundedDateRange, fail_on_empty: bool = True)¶
Bases: Task
TruncateCalendars removes any services beyond the provided range.
For simplicity, all Calendars are converted to exception-based (all active dates represented by CalendarException).
- apply_changes(db: DBConnection) None¶
- check_if_empty() None¶
- clear_state() None¶
- compute_changes(db: DBConnection) None¶
- compute_truncated_days_of(calendar: Calendar, db: DBConnection) set[Date]¶
- drop_calendars(db: DBConnection) None¶
- execute(r: TaskRuntime) None¶
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
- make_all_calendars_use_exceptions(db: DBConnection) None¶
- set_exceptions_on_calendars(db: DBConnection) None¶
- fail_on_empty: bool¶
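The core idea of TruncateCalendars, once every calendar is expressed as a set of active dates, can be sketched with plain `datetime.date` objects. `truncate_active_dates` is a hypothetical helper, and treating the range as inclusive on both ends is an assumption; the library's date range classes are not used here.

```python
from datetime import date


def truncate_active_dates(active: set[date], start: date, end: date) -> set[date]:
    """Keep only the dates within the inclusive [start, end] range."""
    return {d for d in active if start <= d <= end}


# Hypothetical calendars, each already reduced to its set of active dates.
calendars = {
    "WD": {date(2024, 1, 30), date(2024, 2, 1), date(2024, 2, 2)},
    "HOL": {date(2023, 12, 25)},
}
start, end = date(2024, 2, 1), date(2024, 2, 29)

truncated = {
    service_id: truncate_active_dates(days, start, end)
    for service_id, days in calendars.items()
}
emptied = sorted(service_id for service_id, days in truncated.items() if not days)

print(sorted(truncated["WD"]))  # [datetime.date(2024, 2, 1), datetime.date(2024, 2, 2)]
print(emptied)                  # ['HOL']
```

Calendars left with no active dates, like HOL in this sketch, are the natural candidates for removal after truncation.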