impuls.multi_file

class impuls.multi_file.MultiFile(options: PipelineOptions, intermediate_provider: IntermediateFeedProvider[ResourceT_co], intermediate_pipeline_tasks_factory: Callable[[IntermediateFeed[LocalResource]], list[Task]], pre_merge_pipeline_tasks_factory: Callable[[IntermediateFeed[LocalResource]], list[Task]] = empty_tasks_factory, final_pipeline_tasks_factory: Callable[[list[IntermediateFeed[LocalResource]]], list[Task]] = empty_tasks_factory)

Bases: Generic[ResourceT_co]

MultiFile prepares Pipelines and multiple Resource objects for creating a single, continuous database, when the source data is available in multiple disjoint files.

This is a solution to a common problem. Say the source data has the following files:

  • 2023-04-01.txt

  • 2023-04-17.txt

  • 2023-05-01.txt

But the result is supposed to be a single GTFS feed.

The rest of this page uses the following terminology:

  • “intermediate” and “version” refer to a single, disjoint feed

  • “intermediate input” refers to an intermediate feed in an arbitrary format

  • “intermediate database” refers to an intermediate feed stored as an Impuls database

  • “final” refers to the coherent, merged feed

MultiFile preserves intermediate inputs across runs, which avoids re-downloading them. Intermediate databases are also preserved across runs, reducing the need to re-create them. If none of the IntermediateFeeds have changed, InputNotModified is raised and no actual work is performed.

Special folders in the workspace directory will be used to store intermediate inputs and intermediate databases. Running multiple programs accessing the same workspace can cause unexpected issues and is not supported.

Unfortunately, as of now, MultiFile ignores changes in additional Resources - only changes in the intermediate inputs cause Pipelines to be created.

Several Pipeline options change their meaning in the MultiFile context:

  • from_cache: nothing is ever fetched. additional_resources must be either cached or local, and all cached intermediate inputs are used, bypassing the IntermediateFeedProvider. If the intermediate databases are up-to-date, only the final pipeline is created, unless force_run is also set to True.

  • force_run: any cached intermediate databases are ignored - in other words every intermediate input will have a corresponding intermediate pipeline created. The final pipeline is also created.
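
The interaction of these two options can be read as a small decision procedure. The sketch below is one reading of the rules above, not the library's code: plan_intermediates, its parameters and the local InputNotModified class are hypothetical stand-ins, and versions are plain strings.

```python
class InputNotModified(Exception):
    """Local stand-in for the exception raised when nothing has changed."""


def plan_intermediates(
    versions: list[str],
    up_to_date_dbs: set[str],
    inputs_changed: bool,
    from_cache: bool = False,
    force_run: bool = False,
) -> list[str]:
    """Return the versions that need an intermediate pipeline.

    Whenever this function returns, a final pipeline is created as well.
    """
    if force_run:
        # force_run: cached intermediate databases are ignored entirely.
        return list(versions)

    missing = [v for v in versions if v not in up_to_date_dbs]
    if not missing and not inputs_changed and not from_cache:
        # Nothing to do, and no option asks for a run anyway.
        raise InputNotModified()

    # With from_cache and up-to-date databases this list is empty:
    # only the final pipeline would be created.
    return missing
```

With from_cache=True and every database up-to-date, the sketch returns an empty list, which matches "only the final pipeline is created".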

The process of creating all of the necessary pipelines can be summarized in 5 steps:

  1. Figure out which intermediate feeds are needed

  2. Remove stale and no-longer-needed cached intermediate inputs and databases

  3. Fetch missing intermediate inputs

  4. Prepare intermediate pipelines for missing local feeds

  5. Prepare final pipeline for merging intermediate feeds
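
Steps 1 to 4 amount to comparing the needed feeds against what is already cached. The following is a minimal sketch of that comparison under stated assumptions: Feed and plan are hypothetical names, and the staleness rule (a cached input is stale when it is older than the source's last_modified) is an assumption rather than MultiFile's documented criterion.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Feed:
    """Hypothetical stand-in for IntermediateFeed: version and last_modified only."""

    version: str
    last_modified: float


def plan(
    needed: list[Feed],
    cached_inputs: dict[str, float],
    cached_dbs: set[str],
) -> tuple[list[str], list[str], list[str]]:
    """Return (to_remove, to_fetch, to_build) for steps 2, 3 and 4."""
    # Step 1 is the `needed` argument, as returned by a provider.
    needed_by_version = {f.version: f for f in needed}

    # Step 2: drop cached inputs that are no longer needed or are stale
    # (assumed rule: cached copy older than the source).
    to_remove = sorted(
        v
        for v, cached_mtime in cached_inputs.items()
        if v not in needed_by_version
        or cached_mtime < needed_by_version[v].last_modified
    )

    # Step 3: fetch every needed input without a valid cached copy.
    valid_cache = set(cached_inputs) - set(to_remove)
    to_fetch = [f.version for f in needed if f.version not in valid_cache]

    # Step 4: build databases for fresh inputs and for inputs lacking a database.
    to_build = [
        f.version
        for f in needed
        if f.version in to_fetch or f.version not in cached_dbs
    ]
    # Step 5 (not shown): the final pipeline merges all needed versions.
    return to_remove, to_fetch, to_build
```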

final_pipeline_tasks_factory() list[Task]

Factory for tasks applied on the final pipeline.

A Merge task is prepended to the returned list, based on the needed intermediate feeds (as returned by an IntermediateFeedProvider).

The factory must not modify the provided list of intermediate feeds.

The returned objects might be mutated - this function should always return new instances of Tasks in a new list.
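
The "new instances in a new list" requirement is easiest to see side by side with its violation. In this sketch RenameAgency is a hypothetical stand-in for a Task (real tasks live in impuls.tasks), and both factory functions are illustrative only.

```python
class RenameAgency:
    """Hypothetical stand-in for a Task."""

    def __init__(self, name: str) -> None:
        self.name = name


def final_tasks(feeds: list) -> list:
    # Builds a fresh list with fresh task objects on every call,
    # and only reads `feeds` without modifying it.
    return [RenameAgency(f"Merged from {len(feeds)} feeds")]


SHARED = [RenameAgency("shared")]


def bad_final_tasks(feeds: list) -> list:
    # Anti-pattern: hands out the same list and the same task objects,
    # so a mutation made after one call leaks into the next call.
    return SHARED
```

The same rule applies to the other task factories accepted by MultiFile.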

intermediate_dbs_path() Path

intermediate_inputs_path() Path

pre_merge_pipeline_tasks_factory() list[Task]

Factory for tasks applied right before merging. Runs as a sub-pipeline of the final pipeline, see impuls.tasks.merge.DatabaseToMerge.pre_merge_pipeline.

An impuls.tasks.TruncateCalendars task is prepended to the returned list, based on the start_date attribute of the next intermediate feed.

The returned objects might be mutated - this function should always return new instances of Tasks in a new list.
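
To illustrate how the next feed's start_date bounds the previous feed, here is a rough sketch. truncation_end_dates is a hypothetical helper, and the assumption that a feed stays valid until the day before the next feed starts is a guess at the boundary, which the text above does not spell out.

```python
from datetime import date, timedelta
from typing import Optional


def truncation_end_dates(start_dates: list[date]) -> list[Optional[date]]:
    """For each feed, the last day it stays valid, or None for the last feed.

    Assumes `start_dates` is sorted in ascending order, and that a feed stops
    being valid the day before the next feed starts.
    """
    ends: list[Optional[date]] = []
    for i in range(len(start_dates)):
        if i + 1 < len(start_dates):
            ends.append(start_dates[i + 1] - timedelta(days=1))
        else:
            # The last feed has no successor, so nothing bounds it.
            ends.append(None)
    return ends
```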

prepare() Pipelines

Returns the Pipelines necessary to produce a single, merged feed.

Raises InputNotModified if the intermediate inputs have not changed, barring other options like from_cache or force_run.

additional_resources: Mapping[str, Resource]

Additional resources, made available for all intermediate and final pipelines.

distance_between_similar_stops_m: float = 10.0

Passed through to the Merge task - maximum distance for stops to be considered similar.

feed_version_separator: str = '/'

Passed through to the Merge task - used for delimiting feed_version in a single FeedInfo object.

intermediate_pipeline_tasks_factory: Callable[[IntermediateFeed[LocalResource]], list[Task]]

Factory for tasks needed to turn an intermediate input to an intermediate database.

The returned objects might be mutated - this function should always return new instances of Tasks in a new list.

intermediate_provider: IntermediateFeedProvider[ResourceT_co]

intermediate_provider is responsible for calculating which intermediate feeds are required to create the final database.

merge_separator: str = ':'

Passed through to the Merge task - used for delimiting id fields and a unique prefix.
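
As a rough illustration of what the two separators (feed_version_separator and merge_separator) delimit, consider the sketch below. Both helpers are hypothetical, and the exact composition (all versions joined in order, prefix placed before the original id, the version used as the prefix) is an assumption. The Merge task documentation is the authority on the real output.

```python
def merged_feed_version(versions: list[str], feed_version_separator: str = "/") -> str:
    # Assumed: the merged FeedInfo lists every intermediate version, in order.
    return feed_version_separator.join(versions)


def prefixed_id(prefix: str, original_id: str, merge_separator: str = ":") -> str:
    # Assumed order: unique prefix first, then the original id.
    return f"{prefix}{merge_separator}{original_id}"
```

Under these assumptions, a separator should be chosen so that it cannot appear inside a version string or an id, otherwise the joined value becomes ambiguous.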

options: PipelineOptions

Options for the MultiFile process and created Pipelines.

class impuls.multi_file.CachedFeedMetadata

Bases: TypedDict

JSON object used to preserve IntermediateFeed data across runs, stored in workspace/intermediate_inputs/{version}.metadata

fetch_time: float

last_modified: float

start_date: str

version: str

class impuls.multi_file.IntermediateFeed(resource: ResourceT_co, resource_name: str, version: str, start_date: Date)

Bases: Generic[ResourceT_co]

IntermediateFeed represents self-contained schedules for a set period of time - a single version of timetables.

as_cached_feed_metadata() CachedFeedMetadata

Returns attributes of this IntermediateFeed as CachedFeedMetadata for preserving them across runs.

as_local_resource(stored_at: Path) IntermediateFeed[LocalResource]

as_local_resource returns the same IntermediateFeed, but with the resource replaced by a LocalResource stored at the provided path. Resource metadata (last_modified and fetch_time) is also copied.

static from_cached_feed_metadata(r: LocalResource, d: CachedFeedMetadata) IntermediateFeed[LocalResource]

Creates an IntermediateFeed from loaded metadata and a LocalResource.
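
The round trip through a metadata file can be sketched with the standard library alone. The TypedDict below is a local copy of the documented CachedFeedMetadata fields so that the example runs without Impuls. save_metadata and load_metadata are hypothetical helpers, and the ISO 8601 encoding of start_date is an assumption.

```python
import json
from datetime import date
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import TypedDict


class CachedFeedMetadata(TypedDict):
    """Local copy of the documented fields."""

    fetch_time: float
    last_modified: float
    start_date: str
    version: str


def save_metadata(directory: Path, metadata: CachedFeedMetadata) -> Path:
    # File name pattern taken from the documentation: {version}.metadata
    target = directory / f"{metadata['version']}.metadata"
    target.write_text(json.dumps(metadata), encoding="utf-8")
    return target


def load_metadata(path: Path) -> CachedFeedMetadata:
    return json.loads(path.read_text(encoding="utf-8"))


with TemporaryDirectory() as tmp:
    original: CachedFeedMetadata = {
        "fetch_time": 1682935200.0,
        "last_modified": 1682899200.0,
        "start_date": date(2023, 5, 1).isoformat(),  # ISO 8601 is an assumption
        "version": "2023-05-01",
    }
    stored_at = save_metadata(Path(tmp), original)
    restored = load_metadata(stored_at)
```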

resource: ResourceT_co

resource is a Resource holding the schedule data of this feed, in an arbitrary format. This resource’s last_modified time must be filled in by the IntermediateFeedProvider - and must be available before the first call to fetch.

resource_name: str

resource_name is a string used for identifying the resource. This should be the version string plus an appropriate file extension.

start_date: Date

start_date represents the first day for which this feed’s schedules are valid.

version: str

version is an arbitrary string identifying the feed.

class impuls.multi_file.IntermediateFeedProvider(*args, **kwargs)

Bases: Protocol[ResourceT]

IntermediateFeedProvider is an abstraction over an external repository of versioned schedules. The provider is responsible for communicating with the external repository and figuring out which feeds are needed to create a complete database.

In most cases, this boils down to calling an external API enumerating all files required to form a coherent and continuous dataset, and returning a list of IntermediateFeed with the same Resource type.
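
A provider for the three-file example from the MultiFile description could look roughly like the sketch below. It uses hypothetical stand-ins (FeedStub, ProviderStub, FileListingProvider) instead of the real IntermediateFeed and Resource classes, and derives both version and start_date from the file name, which is an assumption about how the source names its files.

```python
from dataclasses import dataclass
from datetime import date
from typing import Protocol


@dataclass
class FeedStub:
    """Hypothetical stand-in for IntermediateFeed, without a real Resource."""

    resource_name: str
    version: str
    start_date: date


class ProviderStub(Protocol):
    """Mirrors the documented protocol: a single needed() method."""

    def needed(self) -> list[FeedStub]: ...


class FileListingProvider:
    """Derives feeds from file names such as 2023-04-01.txt."""

    def __init__(self, file_names: list[str]) -> None:
        # A real provider would obtain this listing from an external API.
        self.file_names = file_names

    def needed(self) -> list[FeedStub]:
        feeds = []
        for name in sorted(self.file_names):
            # resource_name is the version plus a file extension.
            version = name.rsplit(".", 1)[0]
            feeds.append(FeedStub(name, version, date.fromisoformat(version)))
        return feeds


provider: ProviderStub = FileListingProvider(
    ["2023-05-01.txt", "2023-04-01.txt", "2023-04-17.txt"]
)
feeds = provider.needed()
```

Because the protocol is structural, FileListingProvider does not need to inherit from anything. Having a matching needed() method is enough.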

needed() list[IntermediateFeed[ResourceT]]

class impuls.multi_file.Pipelines(intermediates: list[Pipeline], final: Pipeline)

Bases: NamedTuple

Pipelines is the result of resolving a multi-file feed.

The final pipeline creates a single merged database from feeds indicated by an IntermediateFeedProvider.

Intermediate pipelines are used to create databases corresponding to missing individual feeds. The intermediate pipelines export database resources required by the final pipeline.

run() None

run runs all pipelines in order: first every intermediate pipeline, then the final pipeline.
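
The ordering matters because the final pipeline consumes the databases exported by the intermediate ones. A minimal sketch of such a container follows. PipelineStub and PipelinesStub are hypothetical stand-ins that only record the order of execution.

```python
from typing import NamedTuple


class PipelineStub:
    """Hypothetical stand-in for Pipeline that only records when it ran."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    def run(self) -> None:
        self.log.append(self.name)


class PipelinesStub(NamedTuple):
    intermediates: list[PipelineStub]
    final: PipelineStub

    def run(self) -> None:
        # Intermediates first: the final pipeline needs their databases.
        for pipeline in self.intermediates:
            pipeline.run()
        self.final.run()


log: list[str] = []
pipelines = PipelinesStub(
    intermediates=[PipelineStub("2023-04-17", log), PipelineStub("2023-05-01", log)],
    final=PipelineStub("final", log),
)
pipelines.run()
```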

final: Pipeline

Final Pipeline, taking all intermediate databases and merging them all together.

intermediates: list[Pipeline]

Pipelines creating all intermediate databases.

impuls.multi_file.empty_tasks_factory(*_: Any) list[Task]

Returns an empty task list.

impuls.multi_file.prune_outdated_feeds(feeds: list[IntermediateFeed[ResourceT]], today: Date) None

Removes feeds which end before today.
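
Since the function returns None, the list is presumably modified in place. The sketch below shows one way such pruning could work. FeedStub and prune_outdated are hypothetical, and the rules that a feed ends the day before the next one starts, and that the list is sorted first, are assumptions rather than the library's documented behaviour.

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class FeedStub:
    """Hypothetical stand-in for IntermediateFeed."""

    version: str
    start_date: date


def prune_outdated(feeds: list[FeedStub], today: date) -> None:
    """Drop, in place, every feed that was superseded before `today`.

    Assumes a feed ends the day before the next feed starts,
    and that the last feed never ends.
    """
    feeds.sort(key=lambda f: f.start_date)
    # Index of the last feed that has already started; it is still in effect.
    current = 0
    for i, feed in enumerate(feeds):
        if feed.start_date <= today:
            current = i
    # Everything before the feed currently in effect has already ended.
    del feeds[:current]
```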