impuls.tasks

class impuls.tasks.AddEntity(entity: Entity, task_name: str = 'AddEntity')

Bases: Task

AddEntity is a simple task that adds the provided entity to the DB.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
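
The reset-on-entry rule can be illustrated with a minimal sketch. CountingTask and the plain list standing in for the runtime are hypothetical and do not use the impuls API; the only point is that execute-related state is cleared at the top of execute, so repeated calls do not leak state into each other.

```python
class CountingTask:
    """Hypothetical stand-in for a Task; not part of impuls."""

    def __init__(self) -> None:
        self.seen: int = 0  # execute-related state

    def execute(self, rows: list[str]) -> None:
        # Reset the state on entry, so a second call starts from scratch.
        self.seen = 0
        for _ in rows:
            self.seen += 1


task = CountingTask()
task.execute(["a", "b", "c"])
first_run = task.seen
task.execute(["d"])
second_run = task.seen
print(first_run, second_run)
```

Without the reset, the second call would report the rows of both calls combined.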

entity: Entity
class impuls.tasks.ExecuteSQL(task_name: str, statement: str)

Bases: Task

ExecuteSQL task simply executes the provided statement.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

statement: str
class impuls.tasks.GenerateTripHeadsign(name: str | None = None)

Bases: Task

GenerateTripHeadsign is a task which fills the trip_headsign field for all Trips which don’t already have a headsign.

The generated headsign is the name of the last stop of the trip. This step will break if there are trips without any stops.
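
The "name of the last stop" rule can be sketched with plain sqlite3. The table and column names below are a simplified, hypothetical schema chosen for illustration; they are not claimed to match the Impuls database schema, and this is not the task's actual implementation.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript(
    """
    CREATE TABLE stops (stop_id TEXT PRIMARY KEY, name TEXT);
    CREATE TABLE trips (trip_id TEXT PRIMARY KEY, headsign TEXT);
    CREATE TABLE stop_times (trip_id TEXT, stop_id TEXT, stop_sequence INTEGER);
    INSERT INTO stops VALUES ('A', 'Alpha'), ('B', 'Bravo'), ('C', 'Charlie');
    INSERT INTO trips VALUES ('T1', NULL), ('T2', 'Custom');
    INSERT INTO stop_times VALUES
        ('T1', 'A', 0), ('T1', 'B', 1), ('T1', 'C', 2),
        ('T2', 'A', 0), ('T2', 'B', 1);
    """
)

# Fill only the missing headsigns with the name of the last stop of the trip.
# A trip without any stop_times would get NULL from the subquery.
db.execute(
    """
    UPDATE trips SET headsign = (
        SELECT s.name
        FROM stop_times AS st
        JOIN stops AS s ON s.stop_id = st.stop_id
        WHERE st.trip_id = trips.trip_id
        ORDER BY st.stop_sequence DESC
        LIMIT 1
    )
    WHERE headsign IS NULL OR headsign = ''
    """
)

headsigns = dict(db.execute("SELECT trip_id, headsign FROM trips"))
print(headsigns)
```

The trip that already has a headsign (T2) is left untouched; only T1 is filled in.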

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

class impuls.tasks.LoadBusManMDB(resource: str, agency_id: str, ignore_route_id: bool = False, ignore_stop_id: bool = False)

Bases: Task

LoadBusManMDB loads data into the database from a BusMan MDB database.

Only the following entities are loaded: Route, Stop, Calendar, Trip and StopTime.

Agency has to be manually curated beforehand (e.g. with AddEntity task).

The imported Calendar entities will be empty after the import. Providing dates for calendars must be done manually afterwards.

Most MDB databases seen in the wild have no stop positions. This step will set the latitude and longitude to 0. Further curation is usually necessary.

Parameters:

  • resource: name of the resource with MDB file

  • agency_id: ID of the manually curated Agency

  • ignore_route_id: use route_short_name as the ID, instead of the BusMan internal ID

  • ignore_stop_id: use stop_code as the ID, instead of the BusMan internal ID

This task additionally requires mdbtools to be installed. This package is available in most package managers.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

load_calendars(mdb_path: Path, db: DBConnection) None
load_routes(mdb_path: Path, db: DBConnection) None
load_stop_times(mdb_path: Path, db: DBConnection) None
load_stops(mdb_path: Path, db: DBConnection) None
load_trips(mdb_path: Path, db: DBConnection) None
agency_id: str
ignore_route_id: bool
ignore_stop_id: bool
resource: str
class impuls.tasks.LoadDB(resource: str)

Bases: Task

LoadDB overwrites the runtime database with data from a database in the provided resource. The database must have been created by Impuls, usually by the SaveDB task or by the runtime as the impuls.db file in the workspace_directory. Mismatched schemas will cause problems later down the line.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

resource: str
class impuls.tasks.LoadGTFS(resource: str)

Bases: Task

LoadGTFS attempts to load GTFS data from a ZIP archive.

The loader only supports a subset of the GTFS schema. Due to implementation details, some invalid values may be accepted and some valid values may be rejected. In particular:

  • stops.txt location_types 3, 4, and 5 will cause an error,

  • parent_station may only refer to stop_ids defined in earlier lines,

  • agency_id in fare_attributes.txt is required if it’s present in agency.txt, even if there’s only one agency defined in the dataset.
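
Because parent_station may only refer to stop_ids defined on earlier lines, it can help to check a feed before loading it. The helper below is a hypothetical pre-check written against the standard csv module; it is not part of impuls and only covers the ordering constraint, not the other limitations listed above.

```python
import csv
import io


def find_forward_parent_refs(stops_txt: str) -> list[str]:
    """Return stop_ids whose parent_station is not defined on an earlier line."""
    seen: set[str] = set()
    offenders: list[str] = []
    for row in csv.DictReader(io.StringIO(stops_txt)):
        parent = row.get("parent_station") or ""
        if parent and parent not in seen:
            offenders.append(row["stop_id"])
        seen.add(row["stop_id"])
    return offenders


sample = (
    "stop_id,stop_name,location_type,parent_station\n"
    "P1,Platform 1,0,S1\n"  # refers to S1 before S1 is defined
    "S1,Central,1,\n"
    "P2,Platform 2,0,S1\n"  # fine, S1 is already known
)
offenders = find_forward_parent_refs(sample)
print(offenders)
```

Sorting stops.txt so that stations come before their child stops is one way to satisfy the constraint.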

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

extract_gtfs_to(zip_path: str | PathLike[str], dir_path: str | PathLike[str]) None
resource: str
class impuls.tasks.ModifyRoutesFromCSV(resource: str, must_curate_all: bool = False, silent: bool = False)

Bases: ModifyFromCSV

ModifyRoutesFromCSV implements the ModifyFromCSV step for Routes.

The CSV file pointed to by the provided resource must have a header row and must have a route_id field.

The following fields may be present, and will be used to update the metadata of the matching Route:

  • route_short_name

  • route_long_name

  • route_type

  • route_color

  • route_text_color

  • route_sort_order

Parameters:
  • resource (str) – name of the resource with data (in CSV).

  • must_curate_all (bool) – if True, then this task will fail if some entities weren’t curated. Defaults to False.

  • silent (bool) – if True, doesn’t warn every time an entity from CSV isn’t found in the DB.
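
The interplay of the CSV file with must_curate_all and silent can be sketched as follows. The CSV content and the set of known route IDs are made up, and the bookkeeping below is an assumption about what the parameters describe, not the actual ModifyFromCSV implementation.

```python
import csv
import io

# Hypothetical curation file: a header row, route_id, and some optional fields.
curation_csv = (
    "route_id,route_short_name,route_color\n"
    "R1,1,FF0000\n"
    "R9,9,00FF00\n"
)

# Hypothetical IDs of routes already present in the database.
known_ids = {"R1", "R2"}

curated: set[str] = set()
missing_in_db: list[str] = []
for row in csv.DictReader(io.StringIO(curation_csv)):
    if row["route_id"] in known_ids:
        curated.add(row["route_id"])
    else:
        # Entities from the CSV not found in the DB: warned about unless silent=True.
        missing_in_db.append(row["route_id"])

# Entities in the DB that the CSV did not touch: an error if must_curate_all=True.
not_curated = known_ids - curated
print(missing_in_db, not_curated)
```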

static csv_column_mapping() Mapping[str, CSVFieldData]

csv_column_mapping returns the mapping from a CSV column name to metadata about the column: the corresponding entity field and a converter from string to a value of an appropriate type.

static model_class() Type[Entity]

model_class returns the type from impuls.model whose entities are going to be modified.

static primary_key_csv_column() str

primary_key_csv_column returns the name of the CSV column which contains the primary key.

static query_for_all_ids() str

query_for_all_ids returns an SQL query string which selects the IDs of all known entities of the given type.

class impuls.tasks.ModifyStopsFromCSV(resource: str, must_curate_all: bool = False, silent: bool = False)

Bases: ModifyFromCSV

ModifyStopsFromCSV implements the ModifyFromCSV step for Stops.

The CSV file pointed to by the provided resource must have a header row and must have a stop_id field.

The following fields may be present, and will be used to update the metadata of the matching Stop:

  • stop_name

  • stop_code

  • stop_lat

  • stop_lon

  • zone_id

  • wheelchair_boarding

  • platform_code

Parameters:
  • resource (str) – name of the resource with data (in CSV).

  • must_curate_all (bool) – if True, then this task will fail if some entities weren’t curated. Defaults to False.

  • silent (bool) – if True, doesn’t warn every time an entity from CSV isn’t found in the DB.

static csv_column_mapping() Mapping[str, CSVFieldData]

csv_column_mapping returns the mapping from a CSV column name to metadata about the column: the corresponding entity field and a converter from string to a value of an appropriate type.

static model_class() Type[Entity]

model_class returns the type from impuls.model whose entities are going to be modified.

static primary_key_csv_column() str

primary_key_csv_column returns the name of the CSV column which contains the primary key.

static query_for_all_ids() str

query_for_all_ids returns an SQL query string which selects the IDs of all known entities of the given type.

class impuls.tasks.RemoveUnusedEntities

Bases: Task

RemoveUnusedEntities removes entities from the database which serve no purpose:

drop_agencies_without_routes(db: DBConnection) None
drop_calendars_without_dates(db: DBConnection) None
drop_calendars_without_trips(db: DBConnection) None
drop_routes_without_trips(db: DBConnection) None
drop_stations_without_stops(db: DBConnection) None
drop_stops_without_stop_times(db: DBConnection) None
drop_trips_with_at_most_one_stop(db: DBConnection) None
execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

class impuls.tasks.SaveDB(to: str | PathLike[str])

Bases: Task

SaveDB saves the contained data as-is to a database at a provided path.

execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

to: Path
class impuls.tasks.SaveGTFS(headers: Mapping[str, Sequence[str]], target: str | PathLike[str], emit_empty_calendars: bool = False)

Bases: Task

SaveGTFS exports the contained data as a GTFS zip file at the provided path.

headers is a mapping from a GTFS table (excluding the .txt extension) to a sequence of column names. SaveGTFS doesn’t validate the provided mapping, so the caller must ensure all required columns and files are provided.

When emit_empty_calendars is set to True (default is False), empty calendars will still be generated in the calendar.txt file.
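
A headers mapping could look roughly like the sketch below. The column names are standard GTFS fields; which tables and columns Impuls is actually able to export is not stated here, so treat the exact selection as an assumption. The final check only guards against the easy mistake of including the .txt extension in a key.

```python
from collections.abc import Mapping, Sequence

# Keys are GTFS table names without the ".txt" extension.
headers: Mapping[str, Sequence[str]] = {
    "agency": ("agency_id", "agency_name", "agency_url", "agency_timezone"),
    "routes": ("route_id", "agency_id", "route_short_name", "route_long_name", "route_type"),
    "stops": ("stop_id", "stop_name", "stop_lat", "stop_lon"),
    "calendar_dates": ("service_id", "date", "exception_type"),
    "trips": ("route_id", "service_id", "trip_id", "trip_headsign"),
    "stop_times": ("trip_id", "arrival_time", "departure_time", "stop_id", "stop_sequence"),
}

# SaveGTFS does not validate the mapping, so a small sanity check of our own.
bad_keys = [table for table in headers if table.endswith(".txt")]
print(bad_keys)
```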

create_zip(dir: str | PathLike[str]) None
dump_tables(gtfs_dir: str | PathLike[str], db: DBConnection) None
execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

emit_empty_calendars: bool
headers: Mapping[str, Sequence[str]]
target: Path
class impuls.tasks.SplitTripLegs(route_selector: ~impuls.selector.Routes = Routes(agency_id=None, type=<Type.RAIL: 2>, ids=None), replacement_bus_short_name_pattern: ~re.Pattern[str] | None = None)

Bases: Task

SplitTripLegs splits Trips into multiple legs with different attributes, generating new Routes and Transfers on the go.

This task can be customized by subclassing and overriding specific methods.

The default configuration is meant for separating out bus replacement services for trains. Bus replacement service departures are identified by StopTime.platform set to BUS. Bus legs get assigned to a copy of the original Route with the type updated and the ID suffixed by _BUS. TIMED transfers are also generated. In this configuration, data is a boolean flag set on bus departures.

arrival_only(stop_time: StopTime, previous_data: Any) StopTime

Creates a copy of a StopTime for an arrival-only, last stop of a trip. The second argument is the return value of get_departure_data() of the preceding StopTime. See compute_legs() for details.

The default behavior is to copy the stop_time, set its departure_time to be the same as the arrival_time, and ensure the platform is set to "BUS" if and only if previous_data is truthy.

compute_legs(original_trip: Trip, stop_times: list[StopTime]) list[tuple[list[StopTime], Any]]

Splits the provided list of StopTimes into multiple legs.

The default algorithm keeps track of the return value of get_departure_data(), and creates a new leg whenever that value changes. The first StopTime with new data is assumed to belong to both legs: the result of arrival_only() is appended to the previous leg, while the result of departure_only() is appended to the current leg. As an example, the following stop_times:

  • StopTime(0, data=False)

  • StopTime(1, data=False)

  • StopTime(2, data=True)

  • StopTime(3, data=True)

  • StopTime(4, data=False)

  • StopTime(5, data=False)

Are separated into the following legs:

  • Leg 0, data=False:

    • StopTime(0)

    • StopTime(1)

    • arrival_only(StopTime(2), data=False)

  • Leg 1, data=True:

    • departure_only(StopTime(2), data=True)

    • StopTime(3)

    • arrival_only(StopTime(4), data=True)

  • Leg 2, data=False:

    • departure_only(StopTime(4), data=False)

    • StopTime(5)

As a special case, if whole_trip_is_replacement_bus() returns true, this function short-circuits to returning [(stop_times, True)].
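
The splitting rule described above can be reproduced with a small self-contained sketch. The Stop dataclass is a hypothetical stand-in for StopTime, and the role field merely marks where arrival_only() and departure_only() would be applied; the whole-trip short-circuit and any other edge cases handled by the real method are left out.

```python
from dataclasses import dataclass, replace
from typing import Any


@dataclass(frozen=True)
class Stop:
    """Hypothetical stand-in for StopTime; data plays the role of get_departure_data()."""

    seq: int
    data: Any
    role: str = "both"


def split_legs(stops: list[Stop]) -> list[tuple[list[Stop], Any]]:
    if not stops:
        return []
    legs: list[tuple[list[Stop], Any]] = []
    current = [stops[0]]
    data = stops[0].data
    for stop in stops[1:]:
        if stop.data != data:
            # The boundary stop closes the previous leg and opens the next one.
            current.append(replace(stop, role="arrival_only"))
            legs.append((current, data))
            data = stop.data
            current = [replace(stop, role="departure_only")]
        else:
            current.append(stop)
    legs.append((current, data))
    return legs


stops = [Stop(i, d) for i, d in enumerate([False, False, True, True, False, False])]
summary = [(data, [(s.seq, s.role) for s in leg]) for leg, data in split_legs(stops)]
for entry in summary:
    print(entry)
```

Running it on the six stop_times from the example yields the same three legs, with stops 2 and 4 appearing once as arrival-only and once as departure-only.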

departure_only(stop_time: StopTime, current_data: Any) StopTime

Creates a copy of a StopTime for a departure-only, first stop of a trip. The second argument is the return value of get_departure_data() of this StopTime. See compute_legs() for details.

The default behavior is to copy the stop_time, set its arrival_time to be the same as the departure_time, and ensure the platform is set to "BUS" if and only if current_data is truthy.

final execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

get_departure_data(stop_time: StopTime) Any

Extracts leg-identifying data of the departure represented by the provided StopTime. The default behavior is to flag bus replacement service by returning stop_time.platform == "BUS".

get_transfer(trip_a: Trip, trip_b: Trip, transfer_stop_id: str) Transfer | None

Creates a Transfer object linking two legs of a trip. Defaults to creating a TIMED transfer.

process_trip(trip_id: str, db: DBConnection) None

Called by execute() on every selected trip. The default is to retrieve the objects from the database, call compute_legs(), and then call either update_trip_with_single_leg() or replace_trip_by_legs(), depending on whether there is one leg or more.

replace_trip_by_legs(original_trip: Trip, legs: list[tuple[list[StopTime], Any]], db: DBConnection) None

Replaces an existing Trip by multiple instances, as represented by legs. Called by process_trip() for trips with multiple legs.

The default implementation removes the original_trip, and then for every leg:

  • creates a new trip, as modified by update_trip(), with the ID suffixed by _0, _1, _2, …;

  • re-inserts StopTimes with only their trip_id changed;

  • creates Transfers between the legs, as returned by get_transfer().

save_bus_replacement_route_in_db(original_route_id: str, new_route_id: str, db: DBConnection) None

Saves a bus replacement route in the database. The default behavior is to create a copy of the original Route, call update_bus_replacement_route(), followed by db.create.

select_trip_ids(db: DBConnection) Iterable[str]

Selects which trips should be processed by this step. Defaults to all trips belonging to routes selected by route_selector.

update_bus_replacement_route(route: Route) None

Updates the attributes of a bus-replacement route. Defaults to setting the type to BUS.

update_trip(trip: Trip, data: Any, db: DBConnection) None

Modifies the attributes of a Trip representing a single leg. Called by update_trip_with_single_leg() and replace_trip_by_legs().

The default behavior depends on the value of data. If it is truthy, the trip’s route_id is suffixed by _BUS, and a new route is created by calling save_bus_replacement_route_in_db() (if it was not created before, as indicated by the added_routes set). Otherwise, the trip is left as-is.

update_trip_with_single_leg(trip: Trip, data: Any, db: DBConnection) None

Called by process_trip() for trips with a single leg. The default implementation simply calls update_trip() followed by db.update if data is truthy.

whole_trip_is_replacement_bus(trip: Trip) bool

Returns True if the whole Trip is operated by a replacement bus service. Defaults to searching the replacement_bus_short_name_pattern in the Trip.short_name. If replacement_bus_short_name_pattern is None, returns False.
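
The default check amounts to a regular expression search with a None guard. The function below is a hypothetical free-standing version operating on a plain string, and the "ZKA" pattern is an invented example; neither comes from impuls.

```python
import re
from typing import Optional


def is_whole_trip_bus(short_name: str, pattern: Optional[re.Pattern[str]]) -> bool:
    """Mimics the described default: no pattern means no whole-trip replacement."""
    if pattern is None:
        return False
    # search() matches anywhere in the string, unlike match() or fullmatch().
    return pattern.search(short_name) is not None


# Invented pattern: trips whose short name contains the word "ZKA".
bus_pattern = re.compile(r"\bZKA\b")

results = (
    is_whole_trip_bus("ZKA 12345", bus_pattern),
    is_whole_trip_bus("12345", bus_pattern),
    is_whole_trip_bus("ZKA 12345", None),
)
print(results)
```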

Leg

alias of tuple[list[StopTime], Any]

added_routes: set[str]
replacement_bus_short_name_pattern: Pattern[str] | None

Trip.short_name pattern indicating that the whole trip is operated by a bus replacement service.

route_selector: Routes

Selects which routes’ trips should be separated by this step.

class impuls.tasks.TruncateCalendars(target: EmptyDateRange | InfiniteDateRange | LeftUnboundedDateRange | RightUnboundedDateRange | BoundedDateRange, fail_on_empty: bool = True)

Bases: Task

TruncateCalendars removes any services beyond the provided range.

For simplicity, all Calendars are converted to exception-based (all active dates represented by CalendarException).
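
The conversion to exception-based calendars followed by truncation can be sketched with sets of dates. The function names and the (start, end) pair standing in for a bounded target range are hypothetical; this illustrates the idea only and does not use the Impuls Calendar, CalendarException or date range types.

```python
from datetime import date, timedelta


def active_dates(
    start: date,
    end: date,
    weekdays: set[int],
    added: set[date],
    removed: set[date],
) -> set[date]:
    """Expand a weekly pattern plus exceptions into an explicit set of active dates."""
    days: set[date] = set()
    day = start
    while day <= end:
        if day.weekday() in weekdays:  # Monday is 0
            days.add(day)
        day += timedelta(days=1)
    return (days | added) - removed


def truncate(days: set[date], first: date, last: date) -> set[date]:
    """Keep only the dates inside the inclusive target range."""
    return {day for day in days if first <= day <= last}


# Mondays between 2024-01-01 and 2024-01-14, plus one added and one removed date.
all_days = active_dates(
    date(2024, 1, 1),
    date(2024, 1, 14),
    weekdays={0},
    added={date(2024, 1, 10)},
    removed={date(2024, 1, 8)},
)
kept = truncate(all_days, date(2024, 1, 5), date(2024, 1, 31))
print(sorted(all_days), sorted(kept))
```

A calendar whose truncated set ends up empty has no service left in the target range.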

apply_changes(db: DBConnection) None
check_if_empty() None
clear_state() None
compute_changes(db: DBConnection) None
compute_truncated_days_of(calendar: Calendar, db: DBConnection) set[Date]
drop_calendars(db: DBConnection) None
execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.

make_all_calendars_use_exceptions(db: DBConnection) None
set_exceptions_on_calendars(db: DBConnection) None
fail_on_empty: bool
target: EmptyDateRange | InfiniteDateRange | LeftUnboundedDateRange | RightUnboundedDateRange | BoundedDateRange