impuls.multi_file
- class impuls.multi_file.MultiFile(options: PipelineOptions, intermediate_provider: IntermediateFeedProvider[ResourceT_co], intermediate_pipeline_tasks_factory: Callable[[IntermediateFeed[LocalResource]], list[Task]], pre_merge_pipeline_tasks_factory: Callable[[IntermediateFeed[LocalResource]], list[Task]] = empty_tasks_factory, final_pipeline_tasks_factory: Callable[[list[IntermediateFeed[LocalResource]]], list[Task]] = empty_tasks_factory)
Bases: Generic[ResourceT_co]
MultiFile prepares Pipelines and multiple Resource objects for creating a single, continuous database when the source data is available in multiple disjoint files.
This solves a common problem. Say the source data has the following files:
2023-04-01.txt
2023-04-17.txt
2023-05-01.txt
But the result is supposed to be a single GTFS feed.
To avoid ambiguity, the following terminology is used:
“intermediate” and “version” refer to a single, disjoint feed
“intermediate input” refers to an intermediate feed in an arbitrary format
“intermediate database” refers to an intermediate feed stored as an Impuls database
“final” refers to the coherent, merged feed
MultiFile preserves intermediate inputs across runs to avoid re-downloading them. Intermediate databases are also preserved across runs, reducing the need to re-create them. If none of the IntermediateFeed objects have changed, InputNotModified is raised and no actual work is performed.
Special folders in the workspace directory are used to store intermediate inputs and intermediate databases. Running multiple programs that access the same workspace can cause unexpected issues and is not supported.
Unfortunately, MultiFile currently ignores changes in additional Resources; only changes in the intermediate inputs cause Pipelines to be created.
Several Pipeline options change their meaning in the MultiFile context:
from_cache: nothing is ever fetched. additional_resources must be either cached or local, and all cached intermediate inputs are used, bypassing the IntermediateFeedProvider. If the intermediate databases are up-to-date, only the final pipeline is created, unless force_run is also set to True.
force_run: any cached intermediate databases are ignored; in other words, every intermediate input gets a corresponding intermediate pipeline. The final pipeline is also created.
The process of creating all of the necessary pipelines can be summarized in 5 steps:
1. Figure out which intermediate feeds are needed
2. Remove stale and no-longer-needed cached intermediate inputs and databases
3. Fetch missing intermediate inputs
4. Prepare intermediate pipelines for missing local feeds
5. Prepare final pipeline for merging intermediate feeds
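As a rough illustration of steps 1 to 4, the cache bookkeeping can be sketched as set arithmetic over version strings. This is a minimal sketch, not the impuls implementation: plan_run and Plan are hypothetical names, `modified` assumes the caller has already compared upstream last_modified times against the cached metadata, and step 5 (the final pipeline) is not modeled.

```python
from collections.abc import Set as AbstractSet
from dataclasses import dataclass


@dataclass(frozen=True)
class Plan:
    """Hypothetical summary of steps 2-4 for one run (not an impuls class)."""
    remove: frozenset[str]  # cached versions to delete (step 2)
    fetch: frozenset[str]   # versions whose intermediate input must be fetched (step 3)
    build: frozenset[str]   # versions needing an intermediate pipeline (step 4)


def plan_run(
    needed: AbstractSet[str],
    cached_inputs: AbstractSet[str],
    cached_dbs: AbstractSet[str],
    modified: AbstractSet[str] = frozenset(),
    force_run: bool = False,
) -> Plan:
    """Hypothetical planner working on version strings only.

    `needed` is the result of step 1 (e.g. versions from a provider's needed()).
    `modified` holds versions whose upstream file changed since it was cached.
    """
    cached = cached_inputs | cached_dbs
    stale = modified & needed
    # Step 2: drop cached data that is no longer needed or whose source changed.
    remove = (cached - needed) | (cached & stale)
    # Step 3: fetch needed inputs that are missing from the cache or stale.
    fetch = (needed - cached_inputs) | stale
    # Step 4: build a database for every needed version without a usable one;
    # force_run ignores cached databases entirely.
    usable_dbs = cached_dbs - stale
    build = set(needed) if force_run else needed - usable_dbs
    return Plan(frozenset(remove), frozenset(fetch), frozenset(build))
```

For example, if 2023-04-01 drops out of the needed set and 2023-05-01 appears, the plan removes the former, fetches the latter, and builds only the latter's database.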
- final_pipeline_tasks_factory() list[Task]
Factory for tasks applied to the final pipeline.
A Merge task is prepended to the returned list, based on the needed intermediate feeds (as returned by an IntermediateFeedProvider).
The factory must not modify the provided list of intermediate feeds.
The returned objects may be mutated, so this function should always return new Task instances in a new list.
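A minimal sketch of a factory that honors this contract. StubTask is a hypothetical stand-in for impuls.Task, and plain version strings replace IntermediateFeed objects; the point is only that each call builds fresh objects in a fresh list and leaves its argument untouched.

```python
from dataclasses import dataclass, field


@dataclass
class StubTask:
    """Hypothetical stand-in for impuls.Task; its state may change while running."""
    name: str
    state: dict[str, int] = field(default_factory=dict)


def final_tasks_factory(versions: list[str]) -> list[StubTask]:
    # Only read the provided feeds (plain version strings here); never modify them.
    summary = StubTask(f"report {len(versions)} versions")
    # Fresh task objects in a fresh list on every call.
    return [summary, StubTask("save")]
```

Returning a module-level list instead would be a trap: a task prepended by the caller (such as Merge) or state left on a task by one pipeline would leak into the next one.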
- intermediate_dbs_path() Path
- intermediate_inputs_path() Path
- pre_merge_pipeline_tasks_factory() list[Task]
Factory for tasks applied right before merging. These tasks run as a sub-pipeline of the final pipeline; see impuls.tasks.merge.DatabaseToMerge.pre_merge_pipeline.
An impuls.tasks.TruncateCalendars task is prepended to the returned list, based on the start_date attribute of the next intermediate feed.
The returned objects may be mutated, so this function should always return new Task instances in a new list.
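To illustrate how the next feed's start_date can bound a version, the sketch below computes, for a sorted list of start dates, the last day each version would be kept. It assumes a version is cut off on the day before its successor starts and that the newest version is not truncated; last_valid_days is a hypothetical helper, and the arguments impuls actually passes to TruncateCalendars may differ.

```python
from datetime import date, timedelta
from typing import Optional


def last_valid_days(start_dates: list[date]) -> list[Optional[date]]:
    """Hypothetical helper: the last day each version would be kept.

    Assumes `start_dates` is sorted and that a version is cut off on the day
    before the next version's start_date. The newest version has no next
    feed, so it gets None (no truncation).
    """
    cutoffs: list[Optional[date]] = []
    for i in range(len(start_dates)):
        if i + 1 < len(start_dates):
            cutoffs.append(start_dates[i + 1] - timedelta(days=1))
        else:
            cutoffs.append(None)
    return cutoffs
```

For the three example files, this yields 2023-04-16, 2023-04-30 and None.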
- prepare() Pipelines
Returns the Pipelines necessary to produce a single, merged feed.
Raises InputNotModified if the intermediate inputs have not changed, unless options such as from_cache or force_run say otherwise.
- additional_resources: Mapping[str, Resource]
Additional resources, made available for all intermediate and final pipelines.
- distance_between_similar_stops_m: float = 10.0
Passed through to the Merge task: the maximum distance, in meters, for stops to be considered similar.
- feed_version_separator: str = '/'
Passed through to the Merge task: the delimiter between the feed_version values combined into a single FeedInfo object.
- intermediate_pipeline_tasks_factory: Callable[[IntermediateFeed[LocalResource]], list[Task]]
Factory for tasks needed to turn an intermediate input to an intermediate database.
The returned objects may be mutated, so this function should always return new Task instances in a new list.
- intermediate_provider: IntermediateFeedProvider[ResourceT_co]
intermediate_provider is responsible for calculating which intermediate feeds are required to create the final database.
- merge_separator: str = ':'
Passed through to the Merge task: the delimiter placed between a unique prefix and the original id fields.
- options: PipelineOptions
Options for the MultiFile process and the created Pipelines.
- class impuls.multi_file.CachedFeedMetadata
Bases: TypedDict
JSON object used to preserve IntermediateFeed data across runs, stored in workspace/intermediate_inputs/{version}.metadata.
- fetch_time: float
- last_modified: float
- start_date: str
- version: str
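A minimal sketch of how such metadata could be written to and read back from {version}.metadata with the standard json module. The TypedDict below only mirrors the documented fields; the ISO start_date format, the reading of the float fields as Unix timestamps, and the save_metadata/load_metadata helpers are assumptions for illustration.

```python
import json
from pathlib import Path
from typing import TypedDict


class CachedFeedMetadata(TypedDict):
    # Local mirror of the documented fields (not imported from impuls).
    version: str
    start_date: str       # assumed ISO format, e.g. "2023-04-17"
    last_modified: float  # assumed Unix timestamp
    fetch_time: float     # assumed Unix timestamp


def save_metadata(workspace: Path, meta: CachedFeedMetadata) -> Path:
    # Hypothetical helper: store the metadata next to the intermediate input.
    path = workspace / "intermediate_inputs" / f"{meta['version']}.metadata"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(meta), encoding="utf-8")
    return path


def load_metadata(path: Path) -> CachedFeedMetadata:
    # Hypothetical helper: a TypedDict is a plain dict at runtime.
    return json.loads(path.read_text(encoding="utf-8"))
```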
- class impuls.multi_file.IntermediateFeed(resource: ResourceT_co, resource_name: str, version: str, start_date: Date)
Bases: Generic[ResourceT_co]
IntermediateFeed represents self-contained schedules for a set period of time: a single version of timetables.
- as_cached_feed_metadata() CachedFeedMetadata
Returns attributes of this IntermediateFeed as CachedFeedMetadata for preserving them across runs.
- as_local_resource(stored_at: Path) IntermediateFeed[LocalResource]
as_local_resource returns an IntermediateFeed with the same attributes, but with the resource replaced by a LocalResource stored at the provided path. Resource metadata (last_modified and fetch_time) is also copied.
- static from_cached_feed_metadata(r: LocalResource, d: CachedFeedMetadata) IntermediateFeed[LocalResource]
Creates an IntermediateFeed from loaded metadata and a LocalResource.
- resource: ResourceT_co
resource is a Resource providing the schedule data in an arbitrary format. This resource’s last_modified time must be filled in by the IntermediateFeedProvider and must be available before the first call to fetch.
- resource_name: str
resource_name is a string used for identifying the resource. This should be the version string plus an appropriate file extension.
- version: str
version is an arbitrary string identifying the feed.
- class impuls.multi_file.IntermediateFeedProvider(*args, **kwargs)
Bases: Protocol[ResourceT]
IntermediateFeedProvider is an abstraction over an external repository of versioned schedules. The provider is responsible for communicating with the external repository and figuring out which feeds are needed to create a complete database.
In most cases, this boils down to calling an external API that enumerates all files required to form a coherent and continuous dataset, and returning a list of IntermediateFeed objects with the same Resource type.
- needed() list[IntermediateFeed[ResourceT]]
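A provider only needs a needed() method. The sketch below uses typing.Protocol for structural typing and a hypothetical ListingProvider that turns a file listing like the one in the introduction into feeds sorted by start date. Feed is a simplified stand-in for IntermediateFeed with no resource attached; a real provider would query its external API and set each resource's last_modified.

```python
from dataclasses import dataclass
from datetime import date
from typing import Protocol


@dataclass
class Feed:
    """Hypothetical stand-in for IntermediateFeed, without the resource."""
    resource_name: str
    version: str
    start_date: date


class FeedProvider(Protocol):
    """Mirrors the shape of IntermediateFeedProvider: a single needed() method."""

    def needed(self) -> list[Feed]: ...


class ListingProvider:
    """Hypothetical provider built from file names such as 2023-04-01.txt.

    A real provider would obtain the listing from an external API.
    """

    def __init__(self, file_names: list[str]) -> None:
        self.file_names = file_names

    def needed(self) -> list[Feed]:
        feeds = []
        for name in self.file_names:
            version = name.removesuffix(".txt")  # "2023-04-17.txt" -> "2023-04-17"
            feeds.append(Feed(name, version, date.fromisoformat(version)))
        return sorted(feeds, key=lambda f: f.start_date)
```

Because Protocol is structural, ListingProvider satisfies FeedProvider without inheriting from it.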
- class impuls.multi_file.Pipelines(intermediates: list[Pipeline], final: Pipeline)
Bases: NamedTuple
Pipelines is the result of resolving a multi-file feed.
The final pipeline creates a single merged database from feeds indicated by an IntermediateFeedProvider.
Intermediate pipelines are used to create databases corresponding to missing individual feeds. The intermediate pipelines export database resources required by the final pipeline.
- run() None
run runs all pipelines in order: first the intermediate pipelines, then the final pipeline.
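Since the intermediate pipelines export the databases that the final pipeline consumes, they have to run first. A minimal sketch of that ordering with a stand-in NamedTuple that mirrors only the documented fields; Runnable and RecordingPipeline are hypothetical helpers, not impuls classes.

```python
from typing import NamedTuple, Protocol


class Runnable(Protocol):
    def run(self) -> None: ...


class Pipelines(NamedTuple):
    """Stand-in with the documented fields of impuls.multi_file.Pipelines."""

    intermediates: list[Runnable]
    final: Runnable

    def run(self) -> None:
        # Intermediate pipelines go first: they export the databases
        # that the final pipeline merges.
        for pipeline in self.intermediates:
            pipeline.run()
        self.final.run()


class RecordingPipeline:
    """Hypothetical pipeline that only records that it was run."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    def run(self) -> None:
        self.log.append(self.name)
```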
- impuls.multi_file.prune_outdated_feeds(feeds: list[IntermediateFeed[ResourceT]], today: Date) None
Removes feeds which end before today. The provided list is modified in place.
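A sketch of the pruning rule under one assumption not stated here: feeds are ordered by start_date and each feed ends when the next one begins. Under that assumption, every feed older than the newest one already in effect on `today` can be dropped. Feed is a hypothetical stand-in for IntermediateFeed, and datetime.date stands in for impuls's Date.

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class Feed:
    """Hypothetical stand-in for IntermediateFeed with only the fields used here."""
    version: str
    start_date: date


def prune_outdated_feeds(feeds: list[Feed], today: date) -> None:
    """Sketch: remove feeds which end before `today`, modifying `feeds` in place.

    Assumes a feed ends when the next feed (by start_date) begins.
    """
    feeds.sort(key=lambda f: f.start_date)
    keep_from = 0
    for i, feed in enumerate(feeds):
        if feed.start_date > today:
            break  # this and all later feeds have not started yet
        keep_from = i  # newest feed already in effect on `today`
    # Everything before the feed in effect today has already ended.
    del feeds[:keep_from]
```

With the three example files and today set to 2023-04-20, only the 2023-04-01 feed is removed.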