omni.pipelines.common

Functions

get_func_hash(func)

Hashes a function into unique bytes.

redefine_result_key(dictionary, from_key, to_key)

Redefines a result key in the dictionary to a different key.

Classes

Pipeline(definition)

This class defines a processing pipeline linking multiple Stages.

Stage(function_to_call[, stage_outputs, ...])

Stage object representing one node of a processing pipeline.

class omni.pipelines.common.Pipeline(definition: List)

This class defines a processing pipeline linking multiple Stages.

The definition is a list of tuples, each pairing an input with the Stage that consumes it. The input is the keyword "start", a single Stage, or a tuple of Stages (see Examples).

Parameters:
definition: List

A list of tuples containing Stage objects that define how a pipeline is connected.

Examples

>>> # the start keyword is for stages that do not have input from other stages
>>> spec = [("start", stage0), (stage0, stage1), ((stage0, stage1), stage2)]
>>> # last entry has multiple inputs to another stage
>>> # create the pipeline object
>>> pipeline = Pipeline(spec)
>>> # run the pipeline
>>> pipeline.run()

Stages run in the order in which they are defined (in the example above: stage0 -> stage1 -> stage2).
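The following toy model illustrates how a definition list of this shape can be walked in order. It is a sketch under stated assumptions, not the library's implementation: `run_definition` is a hypothetical helper, plain functions stand in for Stage objects, and results are passed positionally, which may differ from how Pipeline actually routes values between stages.

```python
from typing import Any, Dict


def run_definition(definition, *args, **kwargs) -> Dict[str, Any]:
    """Hypothetical stand-in for Pipeline.run; walks the definition in order."""
    results: Dict[str, Any] = {}
    for source, func in definition:
        if source == "start":
            # Entries linked to "start" receive the arguments given to the run call.
            results[func.__name__] = func(*args, **kwargs)
        else:
            # A single upstream function or a tuple of them feeds this entry.
            upstream = source if isinstance(source, tuple) else (source,)
            inputs = [results[f.__name__] for f in upstream]
            results[func.__name__] = func(*inputs)
    return results


def load(x):
    return x + 1


def double(value):
    return value * 2


def combine(a, b):
    return a + b


spec = [("start", load), (load, double), ((load, double), combine)]
out = run_definition(spec, 1)
print(out)
```

Because entries are processed in list order, every upstream result already exists by the time a later entry asks for it.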

Attributes:
results

Dict: A dictionary of the output return values for the pipeline.

Methods

run:

Runs the pipeline.

property results: Dict

Dict: A dictionary of the output return values for the pipeline.

run(*args, **kwargs) -> None

Runs the pipeline. Any stage linked to the "start" keyword accepts the input args/kwargs from this run method call.

class omni.pipelines.common.Stage(function_to_call: Callable, stage_outputs: Optional[List] = None, hash_output: Optional[str] = None, stage_name: Optional[str] = None, **kwargs)

Stage object representing one node of a processing pipeline.

Constructs a stage object that wraps a callable for use in a pipeline. When the run method is called, the callable is executed and its return value is stored in a result dictionary that can be accessed through the results property. The return value is parsed into this dictionary based on the list provided by stage_outputs (the position of each string in the list corresponds to a positional return value).
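The positional mapping between stage_outputs and return values can be pictured with the sketch below. `label_outputs` is a hypothetical helper written for illustration; how Stage handles a single non-tuple return value or a length mismatch is an assumption here, not documented behavior.

```python
from typing import Any, Dict, List, Optional


def label_outputs(return_value: Any, stage_outputs: Optional[List[str]]) -> Dict[str, Any]:
    """Hypothetical helper: pair each output label with the return value at the same position."""
    if not stage_outputs:
        return {}
    # Assumption: a single non-tuple return value is treated as one positional value.
    values = return_value if isinstance(return_value, tuple) else (return_value,)
    return dict(zip(stage_outputs, values))


def split_name(full_name: str):
    first, last = full_name.split(" ", 1)
    return first, last


labeled = label_outputs(split_name("Ada Lovelace"), ["first", "last"])
print(labeled)
```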

Note: Stage objects convert their results to JSON so they can be written to file. This conversion changes certain data structures (e.g. tuples become lists), so values read back may differ from what the function returned. The current workaround is to return only JSON-compatible data structures from your functions.
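The effect of the JSON conversion can be reproduced with the standard library alone. The snippet below does not use Stage; it only shows what a round trip through Python's `json` module does to a tuple and to integer dictionary keys, assuming the stage serializes results in a comparable way.

```python
import json

# A stage-like return value that mixes a tuple and a dict with integer keys.
result = {"shape": (64, 64, 32), "labels": {1: "gray", 2: "white"}}

# Round trip through JSON, similar to writing results to file and reading them back.
restored = json.loads(json.dumps(result))

print(type(restored["shape"]).__name__)  # the tuple comes back as a list
print(list(restored["labels"]))          # integer keys come back as strings
```

Returning lists and string-keyed dictionaries from the wrapped function avoids the mismatch.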

Parameters:
function_to_call: Callable

A callable to wrap that the stage executes.

stage_outputs: List[str]

A list of strings to label each return value from the function_to_call.

hash_output: str

A directory that should be created or checked to cache the stage execution.

stage_name: str

Override the name for the stage.

Attributes:
args

Dict: A dictionary of only the provided input arguments to the stage on construction.

input_args

Dict: A dictionary of all input arguments to the stage.

inputs

List[str]: A list of input argument names for the stage.

outputs

List[str]: A list of output argument names for the stage.

results

Dict: A dictionary of the output return values for the stage.

state

bool: A flag that specifies whether the current stage has been run.

Methods

run:

Run the stage.

property args: Dict

Dict: A dictionary of only the provided input arguments to the stage on construction.

property input_args: Dict

Dict: A dictionary of all input arguments to the stage. It is only populated after the run method is invoked.

property inputs: List[str]

List[str]: A list of input argument names for the stage.

property outputs: List[str]

List[str]: A list of output argument names for the stage.

property results: Dict

Dict: A dictionary of the output return values for the stage.

run(*args, force_skip_stage: bool = False, force_run_stage: bool = False, force_hash_write: bool = False, **kwargs) -> Dict

Call function and save outputs in result dictionary.

Runs the wrapped function_to_call with the given args/kwargs. Any kwargs specified on construction of the stage object take precedence over arguments passed to this run method. The stage's hashes are also checked to decide whether the stage needs to run.

Parameters:
force_skip_stage: bool

Specifies whether this stage should be forcefully skipped.

force_run_stage: bool

Specifies whether this stage should be forcefully run. Takes precedence over force_skip_stage.

force_hash_write: bool

Specifies whether the hash for this stage should be written out even if the stage has not run.
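The two precedence rules described for run can be sketched as follows. Both helpers are hypothetical and written only for illustration; in particular, `hash_matches` stands in for whatever cache check the stage performs, whose details are not documented here.

```python
from typing import Any, Dict


def merge_call_kwargs(construction_kwargs: Dict[str, Any], run_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: kwargs given at construction override those given to run."""
    # Later entries win in a dict merge, so construction kwargs are unpacked last.
    return {**run_kwargs, **construction_kwargs}


def should_run(hash_matches: bool, force_skip_stage: bool = False, force_run_stage: bool = False) -> bool:
    """Hypothetical helper: force_run_stage is checked first, so it beats force_skip_stage."""
    if force_run_stage:
        return True
    if force_skip_stage:
        return False
    # Assumption: without force flags, a matching hash means the cached result is reused.
    return not hash_matches


merged = merge_call_kwargs({"threshold": 0.5}, {"threshold": 0.9, "verbose": True})
print(merged)
print(should_run(hash_matches=True, force_skip_stage=True, force_run_stage=True))
```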

property state: bool

bool: A flag that specifies whether the current stage has been run (The callable has executed).

omni.pipelines.common.get_func_hash(func: Union[Callable, code]) -> bytes

Hashes a function into unique bytes.

This function collects the relevant bytes from the function's code object for hashing, in the following order:

consts, methods, code

In consts, the docstring of the code object is removed, and any embedded code objects are appended to the end of the byte array.

Parameters:
func: Union[Callable, CodeType]

Function to hash.

Returns:
bytes

Unique bytes representing the function.
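A rough sketch of this idea, built only on standard code-object attributes, is shown below. `sketch_func_bytes` is a hypothetical stand-in and not the library's implementation: reading "methods" as `co_names`, serializing constants with `repr`, and supporting only plain Python functions are all assumptions.

```python
import types
from typing import Callable, Union


def sketch_func_bytes(func: Union[Callable, types.CodeType]) -> bytes:
    """Hypothetical stand-in for get_func_hash; assumes a plain Python function or code object."""
    if isinstance(func, types.CodeType):
        code, doc = func, None
    else:
        code, doc = func.__code__, func.__doc__

    consts = b""
    embedded = b""
    for const in code.co_consts:
        if isinstance(const, types.CodeType):
            # Embedded code objects (nested functions, lambdas) are collected separately.
            embedded += sketch_func_bytes(const)
        elif isinstance(const, str) and const == doc:
            # Skip the docstring so that documentation edits do not change the bytes.
            continue
        else:
            consts += repr(const).encode()

    # Assumption: "methods" refers to the names referenced by the code (co_names).
    names = " ".join(code.co_names).encode()
    # Order: consts, names, raw bytecode, then embedded code objects at the end.
    return consts + names + code.co_code + embedded
```

Under these assumptions, two functions that differ only in their docstring produce the same bytes, while a change to the body produces different bytes. The result could be passed to `hashlib.sha256` if a fixed-length digest is preferred.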

omni.pipelines.common.redefine_result_key(dictionary: Dict, from_key: str, to_key: str) -> Dict

Redefines a result key in the dictionary to a different key.

Parameters:
dictionary: Dict

Dictionary whose key is to be renamed.

from_key: str

Key to change.

to_key: str

Key to replace with.

Returns:
Dict

New dictionary with the key replaced.

Examples

>>> dictionary = {"hello": 1, "test": 2}
>>> new_dict = redefine_result_key(dictionary, "hello", "testing")
>>> # new_dict is now: {"testing": 1, "test": 2}
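For reference, equivalent behavior can be sketched in a few lines. `rename_key` is a hypothetical name; the sketch assumes the input dictionary is left unmodified and that a missing from_key leaves the contents unchanged, neither of which is stated in this documentation.

```python
from typing import Any, Dict


def rename_key(dictionary: Dict[str, Any], from_key: str, to_key: str) -> Dict[str, Any]:
    """Hypothetical equivalent: build a new dict in which from_key is renamed to to_key."""
    return {(to_key if key == from_key else key): value for key, value in dictionary.items()}


dictionary = {"hello": 1, "test": 2}
new_dict = rename_key(dictionary, "hello", "testing")
print(new_dict)
```

A rename like this is useful when the output label of one stage does not match the argument name expected by the next.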