impuls
- class impuls.App(name: str | None = None, workspace_directory: Path = PosixPath('_impuls_workspace'))
Bases: ABC
App is a helper abstract class for writing applications/scripts using Impuls. It provides the glue code between the script's entry point (main) and running a Pipeline or Pipelines (returned by MultiFile):
```python
import argparse

import impuls
import impuls.multi_file


class MyApp(impuls.App):
    def prepare(
        self,
        args: argparse.Namespace,
        options: impuls.PipelineOptions,
    ) -> impuls.Pipeline | impuls.multi_file.MultiFile[impuls.Resource]:
        ...  # Prepare your own Pipeline or MultiFile


if __name__ == "__main__":
    MyApp().run()
```
- add_arguments(parser: ArgumentParser) → None
add_arguments may be overridden to add extra arguments to be parsed from the command line. run() parses those arguments and passes the result to prepare() as args.
Several default arguments are always added, namely: -f/--force-run, -c/--from-cache, -v/--verbose and -I/--input-not-modified-exit-code.
The first two options are used to create PipelineOptions, while -v/--verbose is used when setting up logging.
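The sketch below illustrates, with plain argparse, how the always-present flags and an app-specific extra argument end up in one Namespace. It is not App's actual code: add_default_arguments, the --feed-url option and the int type of -I are hypothetical assumptions for illustration only.

```python
import argparse


def add_default_arguments(parser: argparse.ArgumentParser) -> None:
    # Hypothetical mirror of the always-present flags; the -I type is an assumption.
    parser.add_argument("-f", "--force-run", action="store_true")
    parser.add_argument("-c", "--from-cache", action="store_true")
    parser.add_argument("-v", "--verbose", action="store_true")
    parser.add_argument("-I", "--input-not-modified-exit-code", type=int)


def add_arguments(parser: argparse.ArgumentParser) -> None:
    # What an App subclass might register in its add_arguments override
    # (--feed-url is a made-up, app-specific option).
    parser.add_argument("--feed-url", default="https://example.com/feed.zip")


parser = argparse.ArgumentParser(prog="my_app")
add_default_arguments(parser)
add_arguments(parser)
args = parser.parse_args(["-f", "--feed-url", "https://example.org/feed.zip"])

# -f and -c map onto PipelineOptions fields; -v would go to logging setup.
options = {"force_run": args.force_run, "from_cache": args.from_cache}
```

In a real App, the extra argument would then be read from args inside prepare().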
- after_run() → None
after_run may be overridden to execute arbitrary actions after the Pipeline(s) are run. Default is to do nothing.
- before_run() → None
before_run may be overridden to execute arbitrary actions after prepare() is called, but before the Pipeline(s) are run. Default is to do nothing.
- abstract prepare(args: Namespace, options: PipelineOptions) → Pipeline | MultiFile[Resource]
prepare must be overridden and must return a Pipeline or a MultiFile to be run by the App.
- final run(args_str: list[str] | None = None) → None
run parses command-line arguments (either from the provided list or sys.argv), prepares the Pipeline(s) and runs them.
- name: str
- workspace_directory: Path
- class impuls.DBConnection(path: str | PathLike[str])
Bases: object
DBConnection represents a connection with an Impuls database.
This is a thin wrapper around sqlite3.Connection that uses the ImpulsBase interface to provide a dumb ORM engine.
Transactions
The database runs in auto-commit mode, and the user is fully responsible for managing transactions: unless .begin() is used, each statement implicitly begins and commits its own transaction.
ORM substitutions
Typed queries work by substituting five keywords in the passed SQL:
:table - replaced with the table name,
:cols - replaced with comma-separated column names, in brackets,
:vals - replaced with question marks (corresponding to table columns), in brackets,
:set - replaced with “column_name = ?, …”, without brackets,
:where - replaced with “primary_key_column = ? AND …”, without brackets.
The substitutions may be better explained by an example. To persist a CalendarException, it’s enough to write the following query:
INSERT INTO :table VALUES :vals;
Such a query will be automatically expanded to the following:
INSERT INTO calendar_exceptions VALUES (?, ?, ?);
Similarly, UPDATE :table SET :set WHERE :where; on CalendarException turns into:
UPDATE calendar_exceptions SET calendar_id = ?, date = ?, exception_type = ? WHERE calendar_id = ? AND date = ?;
Warning
This class assumes that the LiteralStrings returned by the entities’ sql_* methods are safe to use directly in SQL statements. It is the programmer’s responsibility to ensure this.
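To make the substitution rules concrete, here is a standalone sketch that expands the five keywords from a table description. TableInfo and expand are hypothetical names; the real DBConnection takes this information from the entity's sql_* methods, and this is not its implementation.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class TableInfo:
    # Hypothetical stand-in for what an entity's sql_* methods provide.
    name: str
    columns: tuple[str, ...]
    primary_key: tuple[str, ...]


def expand(sql: str, t: TableInfo) -> str:
    substitutions = {
        ":table": t.name,
        ":cols": "(" + ", ".join(t.columns) + ")",
        ":vals": "(" + ", ".join("?" for _ in t.columns) + ")",
        ":set": ", ".join(f"{c} = ?" for c in t.columns),
        ":where": " AND ".join(f"{c} = ?" for c in t.primary_key),
    }
    # \b ensures only whole keywords are replaced (":tables" would be left alone).
    return re.sub(r":(table|cols|vals|set|where)\b", lambda m: substitutions[m.group(0)], sql)


calendar_exceptions = TableInfo(
    name="calendar_exceptions",
    columns=("calendar_id", "date", "exception_type"),
    primary_key=("calendar_id", "date"),
)

insert_sql = expand("INSERT INTO :table VALUES :vals;", calendar_exceptions)
update_sql = expand("UPDATE :table SET :set WHERE :where;", calendar_exceptions)
cols_sql = expand("INSERT INTO :table :cols VALUES :vals;", calendar_exceptions)
```

The expanded statements only contain ? placeholders, so the entity's values are still bound as parameters rather than spliced into the SQL text.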
Closing the DB
DBConnection’s close() method releases resources held by the DBConnection. Open transactions are not implicitly committed.
DBConnection can be used in a with statement, and such a connection will be automatically closed upon exit from the with block. (Note that this behavior differs from sqlite3.Connection, whose context manager commits or rolls back the transaction but does not close the connection.)
New SQL functions
For convenience, several additional SQL functions are provided, apart from those described at https://www.sqlite.org/lang_corefunc.html:
unicode_lower - equivalent to Python’s str.lower
unicode_upper - equivalent to Python’s str.upper
unicode_casefold - equivalent to Python’s str.casefold
unicode_title - equivalent to Python’s str.title
re_sub - equivalent to Python’s re.sub
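One way such functions can be exposed to SQLite is the standard library's sqlite3.Connection.create_function. The sketch below is an assumption about the technique, not DBConnection's code; in particular, it assumes re_sub takes its arguments in re.sub's order (pattern, replacement, string) and that NULL inputs should yield NULL.

```python
import re
import sqlite3
from collections.abc import Callable
from typing import Any


def null_safe(fn: Callable[..., Any]) -> Callable[..., Any]:
    # SQL NULL arrives as None; return NULL instead of raising inside Python.
    def wrapper(*args: Any) -> Any:
        return None if any(a is None for a in args) else fn(*args)

    return wrapper


conn = sqlite3.connect(":memory:")
conn.create_function("unicode_lower", 1, null_safe(str.lower))
conn.create_function("unicode_upper", 1, null_safe(str.upper))
conn.create_function("unicode_casefold", 1, null_safe(str.casefold))
conn.create_function("unicode_title", 1, null_safe(str.title))
conn.create_function("re_sub", 3, null_safe(re.sub))  # assumed order: pattern, repl, string

row = conn.execute(
    "SELECT unicode_lower(?), unicode_casefold(?), unicode_title(?), re_sub(?, ?, ?)",
    ("ŁÓDŹ", "Straße", "main street", r"\s+", " ", "a   b"),
).fetchone()
null_result = conn.execute("SELECT unicode_upper(NULL)").fetchone()[0]
```

Unlike SQLite's built-in lower() and upper(), which only fold ASCII letters unless SQLite was built with ICU, these Python-backed functions handle the full Unicode range.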
- __enter__() → Self
- __exit__(*_: Any) → None
- begin() → None
Manually starts a transaction.
- classmethod cloned(from_: str | PathLike[str], in_: str | PathLike[str]) → Self
Creates a new database inside in_ with the contents of from_. Returns a DBConnection to the new database.
- close() → None
Closes any handles used to communicate with the sqlite3 engine. Any open transactions are not implicitly committed.
- commit() → None
Commits an ongoing transaction, persisting any changes made to the DB.
- count(typ: Type[EntityT]) → int
Returns the number of instances of the provided type.
- create_many(typ: Type[EntityT], entities: Iterable[EntityT]) → None
Creates multiple entries in the database.
- classmethod create_with_schema(path: str | PathLike[str]) → Self
Opens a new DB connection and executes DDL statements to prepare the database to hold Impuls model data.
- raw_execute(sql: str, parameters: Sequence[None | int | float | str] = ()) → UntypedQueryResult
Executes a “raw” SQL query - no ORM substitutions are made in the query. The parameters and results are passed unchanged to/from the sqlite3 module.
- raw_execute_many(sql: str, parameters: Iterable[Sequence[None | int | float | str]]) → UntypedQueryResult
Executes a “raw” SQL query multiple times - no ORM substitutions are made in the query.
The parameters and results are passed unchanged to/from the sqlite3 module.
Logically equivalent to:
```python
for parameter in parameters:
    raw_execute(sql, parameter)
```
except that results are collected into a single Cursor, which means SELECT queries can’t be used with this function.
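For comparison, the standard sqlite3 module offers the same loop-versus-batch distinction through executemany. This sketch only shows the stdlib behavior that the description mirrors (the agencies table is made up); it does not claim raw_execute_many is implemented this way.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE agencies (agency_id TEXT PRIMARY KEY, name TEXT)")

sql = "INSERT INTO agencies VALUES (?, ?)"

# Loop form: one statement execution per parameter sequence.
for parameter in [("1", "City Bus"), ("2", "Tram Co")]:
    conn.execute(sql, parameter)

# Batched form: a single Cursor accumulates the whole batch, so its rowcount
# is the sum over all executions and there is no per-row result set to read.
cursor = conn.executemany(sql, [("3", "Ferry Ltd"), ("4", "Rail Inc")])
total = conn.execute("SELECT COUNT(*) FROM agencies").fetchone()[0]
```

The stdlib executemany also refuses read-only statements such as SELECT, raising sqlite3.ProgrammingError.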
- released() → Generator[str, None, None]
Context manager returning the path of the database, temporarily closing the connection.
No operations on the database are permitted within the body of the context manager. This is intended for interfacing with other programs which expect a path to a SQLite database.
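The released() pattern can be sketched with contextlib.contextmanager: close the connection, hand out the path, and reopen afterwards. The Holder class below is a hypothetical minimal stand-in written for illustration, not DBConnection itself.

```python
from __future__ import annotations

import sqlite3
import tempfile
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path


class Holder:
    # Hypothetical minimal connection holder illustrating the released() pattern.
    def __init__(self, path: Path) -> None:
        self.path = path
        self.conn: sqlite3.Connection | None = sqlite3.connect(path)

    @contextmanager
    def released(self) -> Iterator[str]:
        if self.conn is None:
            raise RuntimeError("connection already released")
        self.conn.close()
        self.conn = None  # using the connection inside the block is impossible
        try:
            yield str(self.path)
        finally:
            self.conn = sqlite3.connect(self.path)  # reopen on exit, even after errors


with tempfile.TemporaryDirectory() as tmp:
    db = Holder(Path(tmp) / "impuls.db")
    db.conn.execute("CREATE TABLE t (x INTEGER)")
    db.conn.commit()
    with db.released() as path:
        # Stand-in for an external program that wants a plain SQLite file path.
        other = sqlite3.connect(path)
        other.execute("INSERT INTO t VALUES (42)")
        other.commit()
        other.close()
    value = db.conn.execute("SELECT x FROM t").fetchone()[0]
    db.conn.close()
```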
- retrieve(typ: Type[EntityT], *pk: None | int | float | str) → EntityT | None
Retrieves an object of type typ with the given primary key (usually its ID) from the database. Returns None if no such object is found.
- retrieve_all(typ: Type[EntityT]) → TypedQueryResult[EntityT]
Retrieves all objects of a specific type from the database.
- retrieve_must(typ: Type[EntityT], *pk: None | int | float | str) → EntityT
Retrieves an object of type typ with the given primary key (usually its ID) from the database. Raises EmptyQueryResult if no such object is found.
- rollback() → None
Rolls back an ongoing transaction, reverting any changes made to the DB.
- transaction() → Generator[Self, None, None]
Abstracts transactions in a with block:
```python
with database.transaction():
    do_something_on(database)
```
A transaction is opened at the entry to the with block. If an exception is raised in the body, the changes are rolled back. Otherwise, the changes are automatically committed.
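The auto-commit mode and the transaction() semantics can be reproduced with the standard sqlite3 module. In this sketch, isolation_level=None puts sqlite3 into auto-commit mode, and a hypothetical transaction() helper issues BEGIN, COMMIT and ROLLBACK explicitly. It illustrates the described behavior and is not DBConnection's source.

```python
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager

# isolation_level=None: every statement commits on its own unless BEGIN is issued.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE stops (stop_id TEXT PRIMARY KEY, name TEXT)")


@contextmanager
def transaction(db: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
    db.execute("BEGIN")
    try:
        yield db
    except BaseException:
        db.execute("ROLLBACK")  # any exception in the body discards the changes
        raise
    else:
        db.execute("COMMIT")  # normal exit persists the changes


with transaction(conn):
    conn.execute("INSERT INTO stops VALUES ('A', 'Alpha')")

try:
    with transaction(conn):
        conn.execute("INSERT INTO stops VALUES ('B', 'Beta')")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

rows = conn.execute("SELECT stop_id FROM stops ORDER BY stop_id").fetchall()
```

Only the first insert survives, because the second block was rolled back when the exception propagated.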
- typed_in_execute(sql: str, parameters: Entity) → UntypedQueryResult
Executes a “typed” SQL query - ORM substitutions are made to the query.
The parameters object is automatically converted to the format accepted by the sqlite3 module. Results are passed unchanged.
- typed_in_execute_many(sql: str, typ: Type[EntityT], parameters: Iterable[EntityT]) → UntypedQueryResult
Executes a “typed” SQL query - ORM substitutions are made to the query.
The parameters objects are automatically converted to the format accepted by the sqlite3 module. Results are passed unchanged. Logically equivalent to:
```python
for parameter in parameters:
    typed_in_execute(sql, parameter)
```
except that results are collected into a single Cursor, which means SELECT queries can’t be used with this function.
- typed_out_execute(sql: str, typ: Type[EntityT], parameters: Sequence[None | int | float | str] = ()) → TypedQueryResult[EntityT]
Executes a “typed” SQL query - ORM substitutions are made to the query.
The parameters are passed unchanged to the sqlite3 module. Results are automatically converted to instances of typ (ImpulsBase objects).
- update_many(typ: Type[EntityT], entities: Iterable[EntityT]) → None
Updates the attributes of multiple entries in the database.
- property in_transaction: bool
Proxy to sqlite3.Connection.in_transaction; True if there’s an ongoing transaction.
- class impuls.HTTPResource(request: Request, session: Session | None = None)
Bases: ConcreteResource
HTTPResource is a Resource on a remote server, accessible using HTTP or HTTPS.
Due to a limitation of the Last-Modified and If-Modified-Since headers, last_modified is precise only to the second. If the file server updates the resource again within the same second, a conditional fetch may not catch such a change.
- fetch(conditional: bool) → Iterator[bytes]
fetch returns the content of the resource, preferably in chunks of FETCH_CHUNK_SIZE length. The last_modified and fetch_time attributes of the resource should be updated right before the first chunk is returned.
If conditional is set, the Resource must raise InputNotModified if the resource was not modified since last_modified. In this case, last_modified and fetch_time must not be updated.
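The one-second limitation follows from the HTTP date format, which carries whole seconds only. This sketch uses email.utils to build an If-Modified-Since value and models a server's comparison; modified_since is a hypothetical helper for illustration, not part of HTTPResource.

```python
from datetime import datetime, timezone
from email.utils import format_datetime, parsedate_to_datetime

last_modified = datetime(2024, 5, 1, 12, 30, 15, 750_000, tzinfo=timezone.utc)

# HTTP dates carry whole seconds only, so the fractional part is lost.
header = format_datetime(last_modified.replace(microsecond=0), usegmt=True)


def modified_since(resource_mtime: datetime, if_modified_since: str) -> bool:
    # Hypothetical server-side check at one-second granularity.
    return resource_mtime.replace(microsecond=0) > parsedate_to_datetime(if_modified_since)


same_second_update = datetime(2024, 5, 1, 12, 30, 15, 900_000, tzinfo=timezone.utc)
next_second_update = datetime(2024, 5, 1, 12, 30, 16, tzinfo=timezone.utc)
missed = not modified_since(same_second_update, header)  # sub-second change goes unnoticed
noticed = modified_since(next_second_update, header)
```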
- classmethod get(url: str, /, params: Mapping[str, str] | Sequence[tuple[str, str]] | None = None, headers: Mapping[str, str] | None = None) → Self
get creates an HTTPResource performing a GET request to the provided URL.
- Parameters:
params – Optional dictionary or list of k-v tuples. These parameters are appended to the URL.
headers – Optional dictionary of headers to send to the server.
- load_extra_metadata(metadata: dict[str, Any]) → None
Invoked by the Impuls resource mechanism to load extra metadata returned by save_extra_metadata(). Not called if a resource has no extra metadata.
- classmethod post(url: str, /, params: Mapping[str, str] | Sequence[tuple[str, str]] | None = None, headers: Mapping[str, str] | None = None, data: str | bytes | Mapping[str, str] | Sequence[tuple[str, str]] | None = None, json: Any = None) → Self
post creates an HTTPResource performing a POST request to the provided URL.
- Parameters:
params – Optional dictionary or list of k-v tuples. These parameters are appended to the URL.
headers – Optional dictionary of headers to send to the server.
data – Optional body to attach to the request. Apart from a string or bytes, this may be a dictionary or a list of k-v tuples; in this case data is URL-form encoded before being sent to the server.
json – Optional body to attach to the request, using JSON encoding. If both data and json are provided, data takes precedence.
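The body rules for post can be summarized as a small decision function. encode_body is a hypothetical stdlib-only helper written to illustrate the documented precedence (data over json, form-encoding for dictionaries and k-v lists); the content types it returns are assumptions, and this is not how HTTPResource builds its request.

```python
from __future__ import annotations

import json as jsonlib
from collections.abc import Mapping, Sequence
from typing import Any
from urllib.parse import urlencode


def encode_body(
    data: str | bytes | Mapping[str, str] | Sequence[tuple[str, str]] | None = None,
    json: Any = None,
) -> tuple[bytes | None, str | None]:
    """Hypothetical helper returning (body, content_type)."""
    if data is not None:
        # data takes precedence over json.
        if isinstance(data, bytes):
            return data, None
        if isinstance(data, str):
            return data.encode("utf-8"), None
        # Dictionaries and lists of key-value tuples are URL-form encoded.
        return urlencode(data).encode("ascii"), "application/x-www-form-urlencoded"
    if json is not None:
        return jsonlib.dumps(json).encode("utf-8"), "application/json"
    return None, None


form = encode_body(data={"stop": "A B"})
pairs = encode_body(data=[("a", "1"), ("a", "2")])
both = encode_body(data="raw", json={"x": 1})
only_json = encode_body(json={"x": 1})
```

A list of tuples is useful when the same key must appear more than once, which a dictionary cannot express.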
- save_extra_metadata() → dict[str, Any] | None
Serializes any extra metadata into a JSON-compatible dictionary to be preserved across runs.
If an empty dictionary or None is returned, extra metadata is not saved.
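As an example of the save/load round trip, the sketch below persists an ETag between runs through JSON. EtagMetadata is a hypothetical class that only mimics the two hooks' contract; where and how Impuls stores the returned dictionary is not shown here.

```python
from __future__ import annotations

import json
from typing import Any


class EtagMetadata:
    # Hypothetical resource fragment: persists an ETag between runs.
    def __init__(self) -> None:
        self.etag: str | None = None

    def save_extra_metadata(self) -> dict[str, Any] | None:
        # Returning None means "nothing to save".
        return {"etag": self.etag} if self.etag is not None else None

    def load_extra_metadata(self, metadata: dict[str, Any]) -> None:
        self.etag = metadata.get("etag")


first_run = EtagMetadata()
first_run.etag = '"abc123"'
stored = json.dumps(first_run.save_extra_metadata())  # what might be written to disk

second_run = EtagMetadata()
second_run.load_extra_metadata(json.loads(stored))
```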
- etag: str | None
- request: Request
- session: Session
- class impuls.LocalResource(path: str | PathLike[str])
Bases: ConcreteResource
LocalResource is a Resource located on the local filesystem. LocalResources are assumed to be always available, and thus don’t need to be cached.
This, however, introduces a few issues with the last_modified and fetch_time fields within the Pipeline:
fetch_time is always the same as last_modified,
last_modified is only updated before the pipeline starts.
fetch_time thus is not the time when the file was last opened, and if the file was modified after the Pipeline has started, last_modified won’t be updated, but the new file content will be returned. These quirks should not be an issue as long as:
the file is not modified while the pipeline is running,
the pipeline does not rely on the actual access time of the file.
- fetch(conditional: bool) → Iterator[bytes]
fetch returns the content of the resource, preferably in chunks of FETCH_CHUNK_SIZE length. The last_modified and fetch_time attributes of the resource should be updated right before the first chunk is returned.
If conditional is set, the Resource must raise InputNotModified if the resource was not modified since last_modified. In this case, last_modified and fetch_time must not be updated.
- update_last_modified(fake_fetch_time: bool = False) → bool
update_last_modified refreshes the last_modified attribute to the modification time of the file, without fetching it.
If fake_fetch_time is set to True (it defaults to False), fetch_time is also set to the last modification time.
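The timestamp bookkeeping can be sketched with os.stat. LocalFileTimes is a hypothetical stand-in; the conversion to an aware UTC datetime follows the Resource contract, while treating the bool return value as "did last_modified change" is purely an assumption of this sketch.

```python
from __future__ import annotations

import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path


class LocalFileTimes:
    # Hypothetical stand-in for the timestamp bookkeeping described above.
    def __init__(self, path: Path) -> None:
        self.path = path
        self.last_modified = datetime.min.replace(tzinfo=timezone.utc)
        self.fetch_time = datetime.min.replace(tzinfo=timezone.utc)

    def update_last_modified(self, fake_fetch_time: bool = False) -> bool:
        # st_mtime is seconds since the epoch; convert it to an aware UTC datetime.
        mtime = datetime.fromtimestamp(self.path.stat().st_mtime, tz=timezone.utc)
        changed = mtime != self.last_modified  # assumption: bool reports a change
        self.last_modified = mtime
        if fake_fetch_time:
            self.fetch_time = mtime
        return changed


with tempfile.TemporaryDirectory() as tmp:
    file = Path(tmp) / "stops.txt"
    file.write_text("stop_id\nA\n")
    os.utime(file, (1_700_000_000, 1_700_000_000))  # pin atime and mtime
    times = LocalFileTimes(file)
    first = times.update_last_modified(fake_fetch_time=True)
    second = times.update_last_modified()
```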
- path: Path
- class impuls.Pipeline(tasks: list[Task], resources: Mapping[str, Resource] | None = None, options: PipelineOptions = PipelineOptions(force_run=False, from_cache=False, workspace_directory=PosixPath('_impuls_workspace')), name: str = '', db_path: str | PathLike[str] | None = None, run_on_existing_db: bool = False)
Bases: object
Pipeline encapsulates the process of downloading and processing multiple resources by a sequence of tasks.
- Parameters:
tasks (list[Task]) – List of Task instances to be executed in the Pipeline.
resources (Mapping[str, Resource] | None) – Additional Resource instances to be made available to the tasks being executed, by their name. Defaults to no additional resources.
options (PipelineOptions) – Detailed options controlling the behavior of the Pipeline, usually controllable by the end-user. See the documentation for the class itself for more details.
name (str) – Prefix to be used by Pipeline and Task loggers. Defaults to no prefix.
db_path (StrPath | None) – Path where the SQLite database with data should be stored. Defaults to impuls.db inside the workspace directory. For advanced usage only; normally the SaveDB task should be used instead.
run_on_existing_db (bool) – Don’t clear the database before executing the Tasks, effectively assuming that the database stored at db_path exists and has the expected schema. Advanced usage only.
- open_db() → DBConnection
open_db opens a DBConnection to an empty database stored in the workspace, following the Impuls model. (Strictly speaking, the database may be stored outside the workspace or may not be empty, but this is reserved for advanced usage only.)
- prepare_resources() → None
prepare_resources ensures that all resources are cached and available locally. Raises InputNotModified if none of the resources have changed since the previous run, or MultipleDataErrors with ResourceNotCached.
- run() → None
run ensures all resources are cached and then executes all tasks on a fresh database.
- class impuls.PipelineOptions(force_run: bool = False, from_cache: bool = False, workspace_directory: Path = PosixPath('_impuls_workspace'))
Bases: object
PipelineOptions control the behavior of the Pipeline.
- force_run: bool = False
force_run, when set to True, suppresses the InputNotModified error and forces the Pipeline to run.
The default value is False, and the Pipeline raises InputNotModified if none of the resources were modified.
This option has no effect if there are no resources or from_cache is set; in those cases the Pipeline runs unconditionally.
- from_cache: bool = False
from_cache, when set to True, causes the Pipeline to never fetch any resource, forcing it to use locally cached ones. If any Resource is not cached, MultipleDataErrors with ResourceNotCached will be raised.
The default value is False.
Setting from_cache also forces the Pipeline to run.
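The interplay of force_run, from_cache and resource changes reduces to a small decision table. pipeline_should_run is a hypothetical function encoding the rules stated above; a False result corresponds to the Pipeline raising InputNotModified. It is not Impuls code.

```python
def pipeline_should_run(
    force_run: bool,
    from_cache: bool,
    has_resources: bool,
    any_resource_modified: bool,
) -> bool:
    """Hypothetical decision table; False means InputNotModified would be raised."""
    if from_cache or not has_resources:
        return True  # runs unconditionally; force_run has no effect here
    return force_run or any_resource_modified


nothing_changed = pipeline_should_run(False, False, True, False)
forced = pipeline_should_run(True, False, True, False)
cached = pipeline_should_run(False, True, True, False)
no_resources = pipeline_should_run(False, False, False, False)
changed = pipeline_should_run(False, False, True, True)
```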
- class impuls.Resource
Bases: ABC
Resource is the base abstract class describing any type of resource which can be downloaded and used by the Pipeline.
This base class has an abstract method (fetch()) and two abstract, settable properties (fetch_time and last_modified). The latter are implemented by impuls.resource.ConcreteResource (which stores those two properties) and impuls.resource.WrappedResource (which delegates the calls to another Resource). When creating a new type of resource, inherit from either of those two classes instead.
Extra attributes can be preserved across runs by overriding the save_extra_metadata() and load_extra_metadata() methods.
- abstract fetch(conditional: bool) → Iterator[bytes]
fetch returns the content of the resource, preferably in chunks of FETCH_CHUNK_SIZE length. The last_modified and fetch_time attributes of the resource should be updated right before the first chunk is returned.
If conditional is set, the Resource must raise InputNotModified if the resource was not modified since last_modified. In this case, last_modified and fetch_time must not be updated.
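The fetch contract can be illustrated with a self-contained in-memory resource. Everything here is hypothetical: InputNotModified is a local stand-in for Impuls's exception, the 4-byte FETCH_CHUNK_SIZE is chosen only to make chunking visible, and a real resource would inherit from ConcreteResource rather than define its own attributes.

```python
from __future__ import annotations

from collections.abc import Iterator
from datetime import datetime, timezone

FETCH_CHUNK_SIZE = 4  # assumption: tiny value so the chunking is observable


class InputNotModified(Exception):
    """Local stand-in for Impuls's exception of the same name."""


class InMemoryResource:
    # Hypothetical resource following the fetch() contract described above.
    def __init__(self, content: bytes, modified_at: datetime) -> None:
        self.content = content
        self.modified_at = modified_at  # aware datetime of the "remote" change
        self.last_modified = datetime.min.replace(tzinfo=timezone.utc)
        self.fetch_time = datetime.min.replace(tzinfo=timezone.utc)

    def fetch(self, conditional: bool) -> Iterator[bytes]:
        if conditional and self.modified_at <= self.last_modified:
            raise InputNotModified  # timestamps stay untouched
        # Update timestamps right before the first chunk is yielded.
        self.last_modified = self.modified_at
        self.fetch_time = datetime.now(timezone.utc)
        for i in range(0, len(self.content), FETCH_CHUNK_SIZE):
            yield self.content[i : i + FETCH_CHUNK_SIZE]


res = InMemoryResource(b"stop_id\n", datetime(2024, 1, 1, tzinfo=timezone.utc))
chunks = list(res.fetch(conditional=True))
saved_fetch_time = res.fetch_time
try:
    list(res.fetch(conditional=True))
    raised = False
except InputNotModified:
    raised = True
```

Because fetch is a generator, its body (and thus the timestamp update) only runs when iteration starts, which naturally places the update right before the first chunk.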
- load_extra_metadata(metadata: dict[str, Any]) → None
Invoked by the Impuls resource mechanism to load extra metadata returned by save_extra_metadata(). Not called if a resource has no extra metadata.
- save_extra_metadata() → dict[str, Any] | None
Serializes any extra metadata into a JSON-compatible dictionary to be preserved across runs.
If an empty dictionary or None is returned, extra metadata is not saved.
- abstract property fetch_time: datetime
fetch_time contains the timestamp of the last successful call to fetch. Only available after a call to fetch. Must be an aware datetime instance.
- abstract property last_modified: datetime
last_modified contains the last update time of the resource. Only available after a call to fetch. Must be an aware datetime instance.
- class impuls.Task(name: str | None = None)
Bases: ABC
Task is the fundamental block of a Pipeline, responsible for actually working on the data.
- abstract execute(r: TaskRuntime) → None
execute processes the data in the runtime environment.
As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
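The reset-on-entry advice can be shown with a standalone task-like class. FakeRuntime and CountDistinctStops are hypothetical stand-ins (a real task would subclass impuls.Task and read from TaskRuntime.db); the point is only that state initialized in execute does not leak between calls.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeRuntime:
    # Hypothetical stand-in for TaskRuntime: just a list of stop IDs to process.
    rows: list[str] = field(default_factory=list)


class CountDistinctStops:
    """Hypothetical task that keeps per-execute state and resets it on entry."""

    def __init__(self) -> None:
        self.seen: set[str] = set()

    def execute(self, r: FakeRuntime) -> None:
        self.seen = set()  # reset: execute may be called again with another runtime
        for row in r.rows:
            self.seen.add(row)


task = CountDistinctStops()
task.execute(FakeRuntime(["A", "B", "A"]))
after_first = len(task.seen)
task.execute(FakeRuntime(["C"]))
after_second = len(task.seen)  # would be 3 without the reset
```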
- logger: Logger
- name: str
- class impuls.TaskRuntime(db: DBConnection, resources: Mapping[str, ManagedResource], options: PipelineOptions)
Bases: object
TaskRuntime is the argument passed to Task.execute(), with the runtime environment for the task to act upon.
- db: DBConnection
- options: PipelineOptions
- resources: Mapping[str, ManagedResource]
- impuls.initialize_logging(verbose: bool) → None
Resets logging handlers to ensure that only a single logging.StreamHandler, using Impuls’s custom ColoredFormatter (from its logs module), outputs onto the terminal (via stderr). Any other registered logging.Handlers printing to stdout or stderr are removed.
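The handler reset can be approximated with the standard logging module. initialize_logging_sketch is a hypothetical name; it swaps in a plain logging.Formatter for Impuls's ColoredFormatter, and mapping verbose to DEBUG versus INFO is an assumption rather than documented behavior.

```python
import logging
import sys


def initialize_logging_sketch(verbose: bool) -> None:
    root = logging.getLogger()
    # Drop every handler writing to the terminal; file handlers are kept
    # because their stream is neither stdout nor stderr.
    for handler in list(root.handlers):
        if isinstance(handler, logging.StreamHandler) and handler.stream in (sys.stdout, sys.stderr):
            root.removeHandler(handler)
    handler = logging.StreamHandler(sys.stderr)
    # A plain Formatter stands in for Impuls's ColoredFormatter.
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    root.addHandler(handler)
    root.setLevel(logging.DEBUG if verbose else logging.INFO)  # assumed level mapping


logging.basicConfig(stream=sys.stdout)  # a pre-existing terminal handler
initialize_logging_sketch(verbose=True)
initialize_logging_sketch(verbose=True)  # calling twice still leaves one terminal handler

terminal_handlers = [
    h
    for h in logging.getLogger().handlers
    if isinstance(h, logging.StreamHandler) and h.stream in (sys.stdout, sys.stderr)
]
```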