impuls.tasks.merge

class impuls.tasks.merge.DatabaseToMerge(resource_name: str, prefix: str, pre_merge_pipeline: Pipeline | None = None)

Bases: object

DatabaseToMerge represents a database to be merged into the runtime database.

pre_merge_pipeline: Pipeline | None = None

Pipeline to run just before merging. This pipeline runs on a temporary copy of the database resource, so any changes it makes do not persist across runs.

prefix: str

Prefix to add before IDs of copied entities.

resource_name: str

Name of the resource with the Impuls DB to be merged.

class impuls.tasks.merge.Merge(databases_to_merge: list[DatabaseToMerge], separator: str = ':', feed_version_separator: str = '/', distance_between_similar_stops_m: float = 10.0)

Bases: Task

The Merge task inserts data from the provided Impuls databases into the current one.

The user must ensure that f"{db_to_merge.prefix}{separator}{id}" generates unique ids across all data. This especially applies if the runtime database already has data.
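The uniqueness requirement can be checked before running the task. The following is a minimal sketch and not part of Impuls: prefixed_id and find_collisions are hypothetical helpers that only mirror the f-string above, and the sample prefixes and IDs are made up.

```python
from collections import Counter


def prefixed_id(prefix: str, entity_id: str, separator: str = ":") -> str:
    """Mirror the f"{prefix}{separator}{id}" scheme described above."""
    return f"{prefix}{separator}{entity_id}"


def find_collisions(
    existing_ids: set[str],
    incoming: dict[str, list[str]],
    separator: str = ":",
) -> set[str]:
    """Return the IDs that would occur more than once after merging.

    `incoming` maps a (hypothetical) prefix to the IDs found in that database.
    """
    counts = Counter(existing_ids)
    for prefix, ids in incoming.items():
        counts.update(prefixed_id(prefix, i, separator) for i in ids)
    return {i for i, n in counts.items() if n > 1}


# The runtime database already holds "bus:1", which clashes with
# ID "1" coming from the database merged under the prefix "bus".
collisions = find_collisions(
    existing_ids={"bus:1"},
    incoming={"bus": ["1", "2"], "tram": ["1"]},
)
```

Here collisions contains only "bus:1", which illustrates why pre-existing data in the runtime database needs particular attention when choosing prefixes.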

This task also performs merging of some entity types, provided they have the same ID:

  • Agency and Attribution instances are always merged - attributes of the first encountered instance are kept.

  • Calendar, CalendarException, FareAttribute, FareRule, ShapePoint, Trip, StopTime, Frequency and Transfer instances are never merged - incoming ids are always prefixed by the DatabaseToMerge.prefix and separator.

  • Route instances are merged if they have the same agency_id, short_name, type and color. Other attributes of the first encountered instance are kept. If the comparison attributes do not match, the incoming id will have a numeric suffix added.

  • Stop instances are merged if they have the same name, code, zone_id, location_type, parent_station, wheelchair_boarding, platform_code and are within distance_between_similar_stops_m (default 10 meters) of each other. Other attributes of the first encountered instance are kept. If the comparison attributes do not match, the incoming id will have a numeric suffix added.

  • Translation merging depends on the selector:

    • all feed_info translations are completely ignored, because the logic of FeedInfo merging is too complex;

    • field_value based translations are always merged - attributes of the first encountered instance are kept;

    • record_id based agency and attributions translations are always merged - attributes of the first encountered instance are kept;

    • record_id based stops and routes translations are merged - attributes of the first encountered instance are kept. Any id changes caused by Stop and Route merging also apply to the record_id;

    • record_id based trips and stop_times translations are never merged - incoming record_id is always prefixed by the DatabaseToMerge.prefix and separator.

  • FeedInfo is treated specially:

    • If it exists in the current (runtime) database, it is kept as-is, and any other instances are ignored.

    • If all to-merge databases have a FeedInfo object, then the attributes of the first encountered one are kept, except that:

      • feed_start_date is set to the min of all encountered feed_start_date

      • feed_end_date is set to the max of all encountered feed_end_date

      • feed_version is set to all encountered feed_versions, separated with feed_version_separator

    • Otherwise, no FeedInfo object will be created.

    The first case is meant for merging smaller helper datasets into an already-loaded major database. The second case serves merging versioned datasets, and the last case prevents any inconsistencies in the FeedInfo object.

  • Stop.zone_id is left untouched - effectively merging zones across datasets.

  • Trip.block_id and Trip.shape_id are prefixed with DatabaseToMerge.prefix - effectively never merging blocks or shapes across datasets.
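The three FeedInfo cases listed above can be sketched as follows. This is an illustration only, not the Impuls implementation: FeedInfoSketch is a hypothetical stand-in for the real model with just the fields discussed here, dates are assumed to always be present, and returning nothing for an empty list of incoming databases is an assumption of this sketch.

```python
from dataclasses import dataclass, replace
from datetime import date
from typing import Optional


@dataclass(frozen=True)
class FeedInfoSketch:
    """Hypothetical stand-in holding only the fields discussed above."""

    publisher_name: str
    feed_start_date: date
    feed_end_date: date
    feed_version: str


def combine_feed_info(
    runtime: Optional[FeedInfoSketch],
    incoming: list[Optional[FeedInfoSketch]],
    feed_version_separator: str = "/",
) -> Optional[FeedInfoSketch]:
    # Case 1: the runtime database already has a FeedInfo - keep it as-is.
    if runtime is not None:
        return runtime
    # Case 3: some incoming database lacks a FeedInfo - create none.
    # (Treating an empty list the same way is an assumption of this sketch.)
    if not incoming or any(fi is None for fi in incoming):
        return None
    # Case 2: keep the first instance, widen the dates, join the versions.
    present = [fi for fi in incoming if fi is not None]
    return replace(
        present[0],
        feed_start_date=min(fi.feed_start_date for fi in present),
        feed_end_date=max(fi.feed_end_date for fi in present),
        feed_version=feed_version_separator.join(fi.feed_version for fi in present),
    )


a = FeedInfoSketch("Agency A", date(2024, 1, 1), date(2024, 6, 30), "2024-01")
b = FeedInfoSketch("Agency B", date(2024, 3, 1), date(2024, 12, 31), "2024-03")
combined = combine_feed_info(None, [a, b])
```

In this made-up example, combined keeps the publisher name of a, spans from the earliest start date to the latest end date, and carries both versions joined by the separator.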

collect_incoming_feed_info(db: DBConnection) -> None
execute(r: TaskRuntime) -> None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times in different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
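The advice about resetting state can be illustrated with a small stand-alone sketch. CountingTaskSketch is hypothetical: it does not subclass the real Task, and its execute takes a plain list instead of a TaskRuntime so that the example runs without Impuls installed.

```python
class CountingTaskSketch:
    """Hypothetical task-like class that keeps execute-related state."""

    def __init__(self) -> None:
        self.seen_ids: set[str] = set()

    def execute(self, incoming_ids: list[str]) -> int:
        # Reset the state on entry, so that a previous call (possibly
        # made with a different runtime) cannot leak into this one.
        self.seen_ids = set()
        for entity_id in incoming_ids:
            self.seen_ids.add(entity_id)
        return len(self.seen_ids)


task = CountingTaskSketch()
first = task.execute(["a", "b"])
second = task.execute(["c"])
```

Without the reset at the top of execute, the second call would still see the IDs collected by the first one.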

initialize_known_feed_info(db: DBConnection) -> None
initialize_known_objects(db: DBConnection) -> None
initialize_known_routes(db: DBConnection) -> None
initialize_known_stops(db: DBConnection) -> None
insert_feed_info(db: DBConnection) -> None
merge(db: DBConnection, incoming_path: str, incoming_prefix: str, pre_merge_pipeline: Pipeline | None = None) -> None
merge_agencies(db: DBConnection) -> None
merge_attributions(db: DBConnection) -> None
merge_calendar_exceptions(db: DBConnection) -> None
merge_calendars(db: DBConnection, incoming_prefix: str) -> None
merge_fares(db: DBConnection, incoming_prefix: str) -> None
merge_frequencies(db: DBConnection) -> None
merge_routes(db: DBConnection) -> None
merge_shapes(db: DBConnection, incoming_prefix: str) -> None
merge_stop_times(db: DBConnection) -> None
merge_stops(db: DBConnection) -> None
merge_transfers(db: DBConnection) -> None
merge_translations(db: DBConnection) -> None
merge_trips(db: DBConnection, incoming_prefix: str) -> None
merge_with_attached(db: DBConnection, incoming_prefix: str) -> None
resolve_route_conflicts(db: DBConnection) -> ConflictResolution
resolve_stop_conflicts(db: DBConnection) -> ConflictResolution
static run_pre_merge_pipeline(on: str, pipeline: Pipeline | None) -> None
databases_to_merge: list[DatabaseToMerge]

List of databases to merge into the runtime DB.

distance_between_similar_stops_m: float

Maximum distance, in meters, between two stops with the same StopHash for them to be merged. Defaults to 10 meters.
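The stop comparison could look roughly like the sketch below. It rests on several assumptions: stops are plain dicts with hypothetical "lat" and "lon" keys, the distance is computed with the haversine formula, and the threshold is inclusive. How Impuls actually measures the distance and builds the StopHash is not stated here.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius; an assumption of this sketch

# Attributes that have to match, as listed in the Merge description.
COMPARED_KEYS = (
    "name", "code", "zone_id", "location_type",
    "parent_station", "wheelchair_boarding", "platform_code",
)


def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in meters between two WGS84 points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    )
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def are_similar_stops(
    a: dict,
    b: dict,
    distance_between_similar_stops_m: float = 10.0,
) -> bool:
    # All compared attributes must be equal...
    if any(a.get(key) != b.get(key) for key in COMPARED_KEYS):
        return False
    # ...and the stops must lie close enough to each other.
    distance = haversine_m(a["lat"], a["lon"], b["lat"], b["lon"])
    return distance <= distance_between_similar_stops_m


near_a = {"name": "Main St", "code": "01", "lat": 52.0, "lon": 21.0}
near_b = {"name": "Main St", "code": "01", "lat": 52.00005, "lon": 21.0}
far_c = {"name": "Main St", "code": "01", "lat": 52.001, "lon": 21.0}
```

With these made-up coordinates, near_a and near_b are about 5.6 meters apart and count as similar under the default threshold, while far_c is roughly 111 meters away from near_a and does not.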

feed_version_separator: str

Separator for FeedInfo.version when a new FeedInfo is created based on the incoming databases. Defaults to /.

separator: str

Separator inserted between DatabaseToMerge.prefix and entity IDs. Defaults to :.

class impuls.tasks.merge.ConflictResolution(ids_to_change: list[tuple[str, str]], total: int, merged: int)

Bases: NamedTuple

ConflictResolution describes how IDs from an incoming DB need to be changed to avoid merge conflicts. Not all entities will have an entry in the ids_to_change list - such entities don’t need to be merged, as an entity in the runtime database already contains a similar-enough object.

ids_to_change: list[tuple[str, str]]

(new_id, old_id) pairs of ids that need to be changed

merged: int

Alias for field number 2

total: int

Alias for field number 1
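To show how such a tuple might be consumed, the sketch below mirrors the documented fields in a stand-in class. ConflictResolutionSketch is not the Impuls class, and the IDs are made up; only the field order and the (new_id, old_id) pair layout are taken from the description above.

```python
from typing import NamedTuple


class ConflictResolutionSketch(NamedTuple):
    """Stand-in with the same field order as the documented class."""

    ids_to_change: list[tuple[str, str]]
    total: int
    merged: int


resolution = ConflictResolutionSketch(
    ids_to_change=[("stop_a_renamed", "stop_a")],  # (new_id, old_id), made-up IDs
    total=5,
    merged=3,
)

# Fields are reachable by name or by position (total is field number 1).
total_by_index = resolution[1]

# Turn the (new_id, old_id) pairs into an old-to-new lookup table.
old_to_new = {old_id: new_id for new_id, old_id in resolution.ids_to_change}
```

Building an old-to-new mapping like this is just one convenient way to apply the renames; the pair order matters, since the new ID comes first.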