impuls

class impuls.App(name: str | None = None, workspace_directory: Path = PosixPath('_impuls_workspace'))

Bases: ABC

App is an abstract helper class for writing applications and scripts that use Impuls. It provides the glue code between main and running a Pipeline or Pipelines (returned by MultiFile):

import argparse

import impuls


class MyApp(impuls.App):
    def prepare(
        self,
        args: argparse.Namespace,
        options: impuls.PipelineOptions,
    ) -> impuls.Pipeline | impuls.multi_file.MultiFile[impuls.Resource]:
        ... # Prepare your own Pipeline or MultiFile

if __name__ == "__main__":
    MyApp().run()
add_arguments(parser: ArgumentParser) None

add_arguments may be overridden to add extra arguments to be parsed from the command line. The parsed arguments are then provided to the prepare() method.

Several default arguments are always added, namely:

  • -f / --force-run,

  • -c / --from-cache,

  • -v / --verbose.

The first two options are used to create PipelineOptions, while the last one is used when setting up logging.
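As a rough illustration of the flags listed above, the following sketch registers them with plain argparse, together with one extra argument. It does not use Impuls; the function add_default_arguments and the --output option are hypothetical, and how Impuls wires these flags internally is an assumption.

```python
import argparse


def add_default_arguments(parser: argparse.ArgumentParser) -> None:
    # The three flags the documentation says are always present.
    parser.add_argument("-f", "--force-run", action="store_true")
    parser.add_argument("-c", "--from-cache", action="store_true")
    parser.add_argument("-v", "--verbose", action="store_true")


def add_arguments(parser: argparse.ArgumentParser) -> None:
    # Hypothetical extra argument, similar to what an App subclass might add.
    parser.add_argument("-o", "--output", default="out.zip")


parser = argparse.ArgumentParser()
add_default_arguments(parser)
add_arguments(parser)

# Parse an explicit list instead of sys.argv, as run(args_str=...) allows.
args = parser.parse_args(["-f", "--output", "feed.zip"])
print(args.force_run, args.from_cache, args.verbose, args.output)
```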

after_run() None

after_run may be overridden to execute arbitrary actions after the Pipeline(s) are run. The default is to do nothing.

before_run() None

before_run may be overridden to execute arbitrary actions after prepare() is called, but before the Pipeline(s) are run. The default is to do nothing.

abstract prepare(args: Namespace, options: PipelineOptions) Pipeline | MultiFile[Resource]

prepare must be overridden and must return a Pipeline or a MultiFile to be run by the App.

final run(args_str: list[str] | None = None) None

run parses command-line arguments (either from the provided list or sys.argv), prepares the Pipeline(s) and runs them.

name: str
workspace_directory: Path
class impuls.DBConnection(path: str | PathLike[str])

Bases: object

DBConnection represents a connection with an Impuls database.

This is a thin wrapper around sqlite3.Connection that uses ImpulsBase interface to provide a dumb ORM engine.

Transactions

The database is run in an auto-commit mode - the user is fully responsible for managing transactions: unless .begin() is used, statements implicitly begin and commit a transaction.
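The auto-commit behavior can be observed with the standard sqlite3 module alone. The sketch below assumes a connection opened with isolation_level=None, which is one way to get auto-commit in Python; whether DBConnection is configured exactly this way is an assumption.

```python
import sqlite3

# isolation_level=None puts the sqlite3 module into auto-commit mode.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (x INTEGER)")

# Without an explicit BEGIN, the statement begins and commits on its own.
conn.execute("INSERT INTO t VALUES (1)")
autocommitted = not conn.in_transaction

# With an explicit BEGIN, the caller decides when to commit or roll back.
conn.execute("BEGIN")
conn.execute("INSERT INTO t VALUES (2)")
inside_transaction = conn.in_transaction
conn.execute("ROLLBACK")

rows = [x for (x,) in conn.execute("SELECT x FROM t")]
conn.close()
```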

ORM substitutions

Typed queries work by substituting five keywords in the passed SQL:

  • :table - replaced with the table name

  • :cols - replaced with comma-separated column names, in brackets

  • :vals - replaced with question marks (corresponding to table columns), in brackets

  • :set - replaced with “column_name=?, …”, without brackets

  • :where - replaced with “primary_key_column=? AND …”, without brackets.

The substitutions may be better explained by an example - to persist a CalendarException, it’s enough to write the following query: INSERT INTO :table VALUES :vals;.

Such a query will be automatically expanded to the following: INSERT INTO calendar_exceptions VALUES (?, ?, ?);

Similarly, UPDATE :table SET :set WHERE :where; on CalendarException turns into UPDATE calendar_exceptions SET calendar_id = ?, date = ?, exception_type = ? WHERE calendar_id = ? AND date = ?;
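The two expansions above can be reproduced with a small, self-contained sketch. The expand function and the module-level constants are hypothetical and only mimic the described behavior; the actual implementation obtains this information from the entity's sql_* methods.

```python
import re

# Table description for CalendarException, taken from the expansions above.
TABLE = "calendar_exceptions"
COLUMNS = ["calendar_id", "date", "exception_type"]
PRIMARY_KEY = ["calendar_id", "date"]


def expand(sql: str) -> str:
    """Replace the five ORM keywords with their SQL expansions."""
    substitutions = {
        "table": TABLE,
        "cols": "(" + ", ".join(COLUMNS) + ")",
        "vals": "(" + ", ".join("?" for _ in COLUMNS) + ")",
        "set": ", ".join(f"{column} = ?" for column in COLUMNS),
        "where": " AND ".join(f"{column} = ?" for column in PRIMARY_KEY),
    }
    return re.sub(
        r":(table|cols|vals|set|where)\b",
        lambda match: substitutions[match.group(1)],
        sql,
    )


insert_sql = expand("INSERT INTO :table VALUES :vals;")
update_sql = expand("UPDATE :table SET :set WHERE :where;")
```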

Warning

This class assumes that LiteralStrings returned by the entities’ sql_* methods are safe to use directly in SQL statements. It is the programmer’s responsibility to ensure this.

Closing the DB

DBConnection’s close() method releases resources held by the DBConnection. Any open transactions are not implicitly committed.

DBConnection can be used in a with statement - such a connection will be automatically closed upon exit from the with block. (Note that this behavior differs from that of sqlite3.Connection.)
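For comparison, this is how a plain sqlite3.Connection behaves in a with statement: the block manages the transaction but leaves the connection open. The sketch uses only the standard library; contextlib.closing is shown as one way to get close-on-exit semantics with sqlite3, not as what DBConnection does internally.

```python
import sqlite3
from contextlib import closing

conn = sqlite3.connect(":memory:")
with conn:  # commits or rolls back on exit, but does not close
    conn.execute("CREATE TABLE t (x INTEGER)")
# The connection is still usable after the with block.
still_open = conn.execute("SELECT count(*) FROM t").fetchone()[0] == 0

with closing(sqlite3.connect(":memory:")) as conn2:  # closes on exit
    conn2.execute("CREATE TABLE t (x INTEGER)")

try:
    conn2.execute("SELECT 1")
    closed = False
except sqlite3.ProgrammingError:
    # Operating on a closed connection is an error.
    closed = True

conn.close()
```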

New SQL functions

For convenience, several additional SQL functions are provided, apart from those described at https://www.sqlite.org/lang_corefunc.html:

  • unicode_lower - equivalent to Python’s str.lower

  • unicode_upper - equivalent to Python’s str.upper

  • unicode_casefold - equivalent to Python’s str.casefold

  • unicode_title - equivalent to Python’s str.title

  • re_sub - equivalent to Python’s re.sub
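Functions like these can be registered on a plain sqlite3 connection with create_function. The sketch below registers two of them to show the idea; it is not the Impuls registration code, and the assumption that the SQL re_sub takes its arguments in the same order as re.sub(pattern, repl, string) is mine.

```python
import re
import sqlite3

conn = sqlite3.connect(":memory:")

# Expose Python callables under the SQL names listed above.
conn.create_function("unicode_lower", 1, str.lower)
conn.create_function("re_sub", 3, re.sub)

# str.lower handles non-ASCII letters, unlike a default SQLite build's lower().
lowered = conn.execute("SELECT unicode_lower('ŻÓŁW')").fetchone()[0]
replaced = conn.execute("SELECT re_sub('[0-9]+', '#', 'bus 123')").fetchone()[0]
conn.close()
```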

__enter__() Self
__exit__(*_: Any) None
begin() None

Manually starts a transaction

classmethod cloned(from_: str | PathLike[str], in_: str | PathLike[str]) Self

Creates a new database inside in_ with the contents of from_. Returns a DBConnection to the new database.

close() None

Closes any handles used to communicate with the sqlite3 engine. Any open transactions are not implicitly committed.

commit() None

Commits an ongoing transaction, persisting any changes made to the DB

count(typ: Type[EntityT]) int

Returns the number of instances of the provided type

create(entity: Entity) None

Creates a new entity in the database

create_many(typ: Type[EntityT], entities: Iterable[EntityT]) None

Creates multiple entries in the database

classmethod create_with_schema(path: str | PathLike[str]) Self

Opens a new DB connection and executes DDL statements to prepare the database to hold Impuls model data.

raw_execute(sql: str, parameters: Sequence[None | int | float | str] = ()) UntypedQueryResult

Executes a “raw” SQL query - no ORM substitutions are made in the query. The parameters and results are passed unchanged to/from the sqlite3 module.

raw_execute_many(sql: str, parameters: Iterable[Sequence[None | int | float | str]]) UntypedQueryResult

Executes a “raw” SQL query multiple times - no ORM substitutions are made in the query.

The parameters and results are passed unchanged to/from the sqlite3 module.

Logically equivalent to:

for parameter in parameters:
    raw_execute(sql, parameter)

Except that results are collected into a single Cursor - which means SELECT queries can’t be used with this function.
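The underlying sqlite3 call with the same shape is executemany. A minimal sketch on an in-memory database follows; the stops table and its rows are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stops (id TEXT, name TEXT)")
rows = [("1", "Main St"), ("2", "Airport"), ("3", "Harbour")]

# One statement, executed once per parameter sequence.
cursor = conn.executemany("INSERT INTO stops VALUES (?, ?)", rows)
inserted = cursor.rowcount  # modifications are summed across all executions
total = conn.execute("SELECT count(*) FROM stops").fetchone()[0]
conn.close()
```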

released() Generator[str, None, None]

Returns the path of the database, temporarily closing the connection.

No operations on the database are permitted within the body of the context manager. This is intended for interfacing with other programs which expect a path to a SQLite database.
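The close-yield-reopen pattern described here can be sketched with sqlite3 and contextlib. The Handle class is a hypothetical stand-in, not the Impuls DBConnection, and a second sqlite3 connection plays the role of the external program.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager
from typing import Iterator


class Handle:
    """Hypothetical stand-in for a connection wrapper."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.conn = sqlite3.connect(path)

    @contextmanager
    def released(self) -> Iterator[str]:
        self.conn.close()  # let go of the file so another program can use it
        try:
            yield self.path
        finally:
            self.conn = sqlite3.connect(self.path)  # reopen afterwards


with tempfile.TemporaryDirectory() as tmp:
    handle = Handle(os.path.join(tmp, "demo.db"))
    handle.conn.execute("CREATE TABLE t (x INTEGER)")
    handle.conn.commit()

    with handle.released() as path:
        other = sqlite3.connect(path)  # stands in for an external program
        other.execute("INSERT INTO t VALUES (42)")
        other.commit()
        other.close()

    # Changes made while released are visible after reopening.
    value = handle.conn.execute("SELECT x FROM t").fetchone()[0]
    handle.conn.close()
```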

retrieve(typ: Type[EntityT], *pk: None | int | float | str) EntityT | None

Retrieves an object of type typ with given primary key (usually its ID) from the database.

Returns None if no such object is found.

retrieve_all(typ: Type[EntityT]) TypedQueryResult[EntityT]

Retrieves all objects of specific type from the database

retrieve_must(typ: Type[EntityT], *pk: None | int | float | str) EntityT

Retrieves an object of type typ with given primary key (usually its ID) from the database.

Raises EmptyQueryResult if no such object is found.

rollback() None

Rolls back an ongoing transaction, reverting any changes made to the DB

transaction() Generator[Self, None, None]

Abstracts transactions in a with block:

with database.transaction():
    do_something_on(database)

A transaction is opened at the entry to the with block. If an exception is raised in the body, the changes are rolled back. Otherwise, the changes are automatically committed.
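The commit-or-rollback behavior can be sketched as a context manager over a plain sqlite3 connection in auto-commit mode. This is an illustration of the pattern under that assumption, not the Impuls implementation; the free function transaction here is hypothetical.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def transaction(conn: sqlite3.Connection) -> Iterator[sqlite3.Connection]:
    conn.execute("BEGIN")
    try:
        yield conn
    except BaseException:
        conn.execute("ROLLBACK")  # revert changes if the body raised
        raise
    else:
        conn.execute("COMMIT")  # persist changes otherwise


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (x INTEGER)")

with transaction(conn):
    conn.execute("INSERT INTO t VALUES (1)")

try:
    with transaction(conn):
        conn.execute("INSERT INTO t VALUES (2)")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

rows = [x for (x,) in conn.execute("SELECT x FROM t")]
conn.close()
```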

typed_in_execute(sql: str, parameters: Entity) UntypedQueryResult

Executes a “typed” SQL query - ORM substitutions are made to the query.

The parameters object is automatically converted to format accepted by the sqlite3 module. Results are passed unchanged.

typed_in_execute_many(sql: str, typ: Type[EntityT], parameters: Iterable[EntityT]) UntypedQueryResult

Executes a “typed” SQL query - ORM substitutions are made to the query.

The parameters objects are automatically converted to format accepted by the sqlite3 module. Results are passed unchanged.

Logically equivalent to:

for parameter in parameters:
    typed_in_execute(sql, parameter)

Except that results are collected into a single Cursor - which means SELECT queries can’t be used with this function.

typed_out_execute(sql: str, typ: Type[EntityT], parameters: Sequence[None | int | float | str] = ()) TypedQueryResult[EntityT]

Executes a “typed” SQL query - ORM substitutions are made to the query.

The parameters are passed unchanged to the sqlite3 module. Results are automatically converted to instances of typ.

update(entity: Entity) None

Updates the attributes of an entity in the database

update_many(typ: Type[EntityT], entities: Iterable[EntityT]) None

Updates the attributes of multiple entries in the database

property in_transaction: bool

Proxy to sqlite3.Connection.in_transaction - should be True if there’s an ongoing transaction.

class impuls.HTTPResource(request: Request, session: Session | None = None)

Bases: ConcreteResource

HTTPResource is a Resource on a remote server, accessible using HTTP or HTTPS.

Due to limitations of the Last-Modified and If-Modified-Since headers, last_modified is precise only to the second. If the file server has updated the resource within less than a second, a conditional fetch may not catch such a change.
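The one-second granularity comes from the HTTP date format itself, which can be checked with the standard library. In the sketch below, two timestamps that differ only in microseconds produce the same header value; the timestamps are arbitrary.

```python
from datetime import datetime, timezone
from email.utils import format_datetime, parsedate_to_datetime

# Two modification times within the same second.
first = datetime(2024, 1, 1, 12, 0, 0, 100_000, tzinfo=timezone.utc)
second = datetime(2024, 1, 1, 12, 0, 0, 900_000, tzinfo=timezone.utc)

# HTTP dates carry no sub-second component.
header_first = format_datetime(first, usegmt=True)
header_second = format_datetime(second, usegmt=True)

# Parsing the header back yields a timestamp truncated to whole seconds.
round_tripped = parsedate_to_datetime(header_first)
```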

fetch(conditional: bool) Iterator[bytes]

fetch returns the content of the resource; preferably in chunks of FETCH_CHUNK_SIZE length.

The last_modified and fetch_time attributes of the Resource should be updated right before the first chunk is returned.

If the conditional is set, the Resource must raise InputNotModified if the resource was not modified since last_modified. In this case, last_modified and fetch_time must not be updated.

classmethod get(url: str, /, params: Mapping[str, str] | Sequence[tuple[str, str]] | None = None, headers: Mapping[str, str] | None = None) Self

get creates an HTTPResource performing a GET request to the provided URL.

Parameters:
  • params – Optional dictionary or a list of k-v tuples. Those parameters are appended to the URL.

  • headers – Optional dictionary of headers to send to the server.

classmethod post(url: str, /, params: Mapping[str, str] | Sequence[tuple[str, str]] | None = None, headers: Mapping[str, str] | None = None, data: Mapping[str, str] | Sequence[tuple[str, str]] | None = None, json: Any = None) Self

post creates an HTTPResource performing a POST request to the provided URL.

Parameters:
  • params – Optional dictionary or a list of k-v tuples. Those parameters are appended to the URL.

  • headers – Optional dictionary of headers to send to the server.

  • data – Optional body to attach to the request. May be a dictionary or a list of k-v tuples - in this case data is URL-form encoded before sending to the server.

  • json – Optional body to attach to the request, using JSON encoding. If both data and json are provided, data takes precedence.

request: Request
session: Session
class impuls.LocalResource(path: str | PathLike[str])

Bases: ConcreteResource

LocalResource is a Resource located on the local filesystem.

LocalResources are assumed to be always available, and thus don’t need to be cached.

This, however, introduces a few quirks in the last_modified and fetch_time fields when the resource is used within a Pipeline:

fetch_time is not the time when the file was last opened, and if the file is modified after the Pipeline has started, last_modified won’t be updated, although the new file content will be returned.

Those quirks should not be an issue as long as:

  • the file is not modified while the pipeline is running,

  • the pipeline does not rely on the actual access time of the file.

fetch(conditional: bool) Iterator[bytes]

fetch returns the content of the resource; preferably in chunks of FETCH_CHUNK_SIZE length.

The last_modified and fetch_time attributes of the Resource should be updated right before the first chunk is returned.

If the conditional is set, the Resource must raise InputNotModified if the resource was not modified since last_modified. In this case, last_modified and fetch_time must not be updated.

update_last_modified(fake_fetch_time: bool = False) bool

update_last_modified refreshes the last_modified attribute to the modification time of the file; without fetching it.

If fake_fetch_time is set to True (it defaults to False), fetch_time is also set to the last modification time.
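Reading a modification time without opening the file is a matter of calling stat. The following sketch shows one way to turn it into an aware datetime; the helper modification_time is hypothetical, and the choice of UTC is an assumption rather than something stated here.

```python
import os
import tempfile
from datetime import datetime, timezone
from pathlib import Path


def modification_time(path: Path) -> datetime:
    # Aware datetime built from the file's mtime; the content is never read.
    return datetime.fromtimestamp(path.stat().st_mtime, tz=timezone.utc)


with tempfile.TemporaryDirectory() as tmp:
    file = Path(tmp, "input.txt")
    file.write_text("hello")
    os.utime(file, (1_700_000_000, 1_700_000_000))  # set a known mtime
    last_modified = modification_time(file)
```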

path: Path
class impuls.Pipeline(tasks: list[Task], resources: Mapping[str, Resource] | None = None, options: PipelineOptions = PipelineOptions(force_run=False, from_cache=False, workspace_directory=PosixPath('_impuls_workspace')), name: str = '', db_path: str | PathLike[str] | None = None, run_on_existing_db: bool = False)

Bases: object

Pipeline encapsulates the process of downloading and processing multiple resources by a sequence of tasks.

Parameters:
  • tasks (list[Task]) – List of Task instances to be executed in the Pipeline

  • resources (Mapping[str, Resource] | None) – Additional Resource instances to be made available to the tasks being executed, by their name. Defaults to no additional resources.

  • options (PipelineOptions) – Detailed options controlling the behavior of the Pipeline, usually controllable by the end-user. See the documentation for the class itself for more details.

  • name (str) – Prefix to be used by Pipeline and Task loggers. Defaults to no prefix.

  • db_path (StrPath | None) – Path where the SQLite database with data should be stored. Defaults to impuls.db inside of the workspace directory. This is for advanced usage only; the SaveDB task should be used instead.

  • run_on_existing_db (bool) – Don’t clear the database before executing the Tasks; effectively assuming that the database stored at db_path exists and has the expected schema. Advanced usage only.

open_db() DBConnection

open_db opens a DBConnection to an empty database stored in the workspace, following the Impuls model.

In advanced usage, however, the database may be stored outside of the workspace and may not be empty.

prepare_resources() None

prepare_resources ensures that all resources are cached and available locally. Raises InputNotModified if none of the resources have changed since the previous run, or MultipleDataErrors with ResourceNotCached.

run() None

run ensures all resources are cached and then executes all tasks on a fresh database.

class impuls.PipelineOptions(force_run: bool = False, from_cache: bool = False, workspace_directory: Path = PosixPath('_impuls_workspace'))

Bases: object

PipelineOptions control the behavior of Pipeline.

force_run: bool = False

force_run, when set to True, suppresses the InputNotModified error and forces the Pipeline to run.

The default value is False, in which case Pipeline raises InputNotModified if none of the resources were modified.

This option has no effect if there are no resources or from_cache is set - in those cases the Pipeline runs unconditionally.
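The interaction between force_run, from_cache and unmodified inputs can be summarized as a small decision function. This is only a reading of the rules stated above, not code from Impuls; should_run and its parameters are hypothetical names.

```python
def should_run(
    force_run: bool,
    from_cache: bool,
    has_resources: bool,
    any_modified: bool,
) -> bool:
    """Return True if the pipeline would run, False if InputNotModified applies."""
    # With no resources, or with from_cache set, the run is unconditional.
    if not has_resources or from_cache:
        return True
    # Otherwise an unmodified input stops the run unless force_run is set.
    return force_run or any_modified


unmodified_default = should_run(False, False, has_resources=True, any_modified=False)
unmodified_forced = should_run(True, False, has_resources=True, any_modified=False)
unmodified_cached = should_run(False, True, has_resources=True, any_modified=False)
```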

from_cache: bool = False

from_cache, when set to True, causes the Pipeline to never fetch any resource, forcing it to use locally cached ones. If any Resource is not cached, MultipleDataErrors with ResourceNotCached will be raised.

Default value is False.

Setting from_cache also forces the Pipeline to run.

workspace_directory: Path = PosixPath('_impuls_workspace')

workspace_directory controls the directory where input resources are cached, and where tasks may store their workload to preserve it across runs.

If the given directory doesn’t exist, Pipeline attempts to create it (and its parents).

class impuls.Resource

Bases: ABC

Resource is the base abstract class describing any type of resource, which can be downloaded and used by the Pipeline.

This base class has an abstract method (fetch()) and 2 abstract, settable properties. The latter are implemented by impuls.resource.ConcreteResource (which stores those two properties) and impuls.resource.WrappedResource (which delegates the calls to another Resource). When creating a new type of resource, inherit from either of those two classes instead.

abstract fetch(conditional: bool) Iterator[bytes]

fetch returns the content of the resource; preferably in chunks of FETCH_CHUNK_SIZE length.

The last_modified and fetch_time attributes of the Resource should be updated right before the first chunk is returned.

If the conditional is set, the Resource must raise InputNotModified if the resource was not modified since last_modified. In this case, last_modified and fetch_time must not be updated.
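The fetch contract described above can be illustrated with a self-contained stand-in. Nothing below imports Impuls: InMemoryResource is hypothetical, InputNotModified is a local placeholder for the exception of the same name, and CHUNK_SIZE is an arbitrary value because the real FETCH_CHUNK_SIZE is not stated here.

```python
from datetime import datetime, timezone
from typing import Iterator, Optional

CHUNK_SIZE = 4  # hypothetical; chosen small to make the chunking visible


class InputNotModified(Exception):
    """Local stand-in for the Impuls exception of the same name."""


class InMemoryResource:
    def __init__(self, content: bytes, modified_at: datetime) -> None:
        self.content = content
        self.modified_at = modified_at
        self.last_modified: Optional[datetime] = None
        self.fetch_time: Optional[datetime] = None

    def fetch(self, conditional: bool) -> Iterator[bytes]:
        unchanged = (
            self.last_modified is not None
            and self.modified_at <= self.last_modified
        )
        if conditional and unchanged:
            raise InputNotModified()  # timestamps stay untouched
        # Update both timestamps right before the first chunk is yielded.
        self.last_modified = self.modified_at
        self.fetch_time = datetime.now(timezone.utc)
        for start in range(0, len(self.content), CHUNK_SIZE):
            yield self.content[start:start + CHUNK_SIZE]


resource = InMemoryResource(b"0123456789", datetime(2024, 1, 1, tzinfo=timezone.utc))
chunks = list(resource.fetch(conditional=False))
first_fetch_time = resource.fetch_time

try:
    list(resource.fetch(conditional=True))
    not_modified = False
except InputNotModified:
    not_modified = True
```

Because fetch is a generator, the check and the timestamp update run only when the caller asks for the first chunk.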

abstract property fetch_time: datetime

fetch_time contains the timestamp of the last successful call to fetch. Only available after a call to fetch. Must be an aware datetime instance.

abstract property last_modified: datetime

last_modified contains the last update time of the resource. Only available after a call to fetch. Must be an aware datetime instance.

class impuls.Task(name: str | None = None)

Bases: ABC

Task is the fundamental block of a Pipeline, responsible for actually working on the data.

abstract execute(r: TaskRuntime) None

execute processes the data in the runtime environment.

As of now, Tasks are guaranteed to run in a single thread with a single runtime, but execute may be called multiple times with different runtimes. Thus, it is safe for Task implementations to hold some execute-related state, but that state should be reset on entry to execute.
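The advice about resetting state can be shown with a minimal stand-in. CountingTask is hypothetical and does not inherit from the real Task; a plain list plays the role of the runtime.

```python
class CountingTask:
    """Hypothetical stand-in for a Task subclass."""

    def __init__(self) -> None:
        self.seen = 0

    def execute(self, runtime: list[str]) -> None:
        self.seen = 0  # reset execute-related state on entry
        for _ in runtime:
            self.seen += 1


task = CountingTask()
task.execute(["a", "b", "c"])
seen_after_first = task.seen
task.execute(["d"])
seen_after_second = task.seen
```

Without the reset, the second call would report a count carried over from the first.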

logger: Logger
name: str
class impuls.TaskRuntime(db: DBConnection, resources: Mapping[str, ManagedResource], options: PipelineOptions)

Bases: object

TaskRuntime is the argument passed to Task.execute(), with the runtime environment for the task to act upon.

db: DBConnection
options: PipelineOptions
resources: Mapping[str, ManagedResource]
impuls.initialize_logging(verbose: bool) None

Resets logging handlers to ensure that only a single logging.StreamHandler, using Impuls’s custom logs.ColoredFormatter, outputs onto the terminal (via stderr). Any other registered logging.Handlers printing to stdout or stderr are removed.
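A comparable reset can be sketched with the standard logging module. The function below is hypothetical and works on a named logger rather than the root logger; it uses a plain logging.Formatter in place of the colored one, and the mapping of verbose to DEBUG versus INFO is an assumption.

```python
import logging
import sys


def reset_stream_handlers(logger: logging.Logger, verbose: bool) -> None:
    # Remove handlers that already print to the terminal.
    for handler in list(logger.handlers):
        if isinstance(handler, logging.StreamHandler) and handler.stream in (
            sys.stdout,
            sys.stderr,
        ):
            logger.removeHandler(handler)
    # Install a single handler writing to stderr.
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)


demo = logging.getLogger("impuls_demo")
demo.addHandler(logging.StreamHandler(sys.stdout))
demo.addHandler(logging.StreamHandler(sys.stderr))
reset_stream_handlers(demo, verbose=True)
```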