omni.pipelines.common
Functions
get_func_hash | Hashes a function into unique bytes.
redefine_result_key | Redefines a result key in the dictionary to a different key.
Classes
Pipeline | This class defines a processing pipeline linking multiple Stages.
Stage | Stage object representing one node of a processing pipeline.
- class omni.pipelines.common.Pipeline(definition: List)
This class defines a processing pipeline linking multiple Stages.
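The definition is a list of tuples of the form (input, stage), where input is the "start" keyword, a single Stage object, or a tuple of Stage objects, and stage is the Stage that receives that input.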
- Parameters:
- definition: List
A list of tuples containing Stage objects that define how a pipeline is connected.
Examples
>>> # the start keyword is for stages that do not have input from other stages
>>> spec = [("start", stage0), (stage0, stage1), ((stage0, stage1), stage2)]
>>> # last entry has multiple inputs to another stage
>>> # create the pipeline object
>>> pipeline = Pipeline(spec)
>>> # run the pipeline
>>> pipeline.run()
Stages are run in the order in which they are defined (in the example above: stage0 -> stage1 -> stage2).
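To make the shape of the definition list concrete, the following is a toy stand-in, not the omni implementation. `ToyStage` and `run_definition` are hypothetical names, and the sketch assumes that each stage's results dictionary is passed to downstream stages as keyword arguments, which the documentation above does not state.

```python
from typing import Callable, Dict, List, Tuple, Union


class ToyStage:
    """Hypothetical stand-in for Stage: wraps a callable that returns a dict."""

    def __init__(self, func: Callable[..., Dict]):
        self.func = func
        self.results: Dict = {}

    def run(self, **kwargs) -> Dict:
        self.results = self.func(**kwargs)
        return self.results


Inputs = Union[str, ToyStage, Tuple[ToyStage, ...]]


def run_definition(definition: List[Tuple[Inputs, ToyStage]]) -> Dict:
    """Run each (inputs, stage) entry in listed order and collect all results."""
    collected: Dict = {}
    for inputs, stage in definition:
        kwargs: Dict = {}
        if not isinstance(inputs, str):  # anything other than the "start" keyword
            upstream = inputs if isinstance(inputs, tuple) else (inputs,)
            for parent in upstream:
                # Assumption: upstream results become keyword arguments.
                kwargs.update(parent.results)
        collected.update(stage.run(**kwargs))
    return collected


stage0 = ToyStage(lambda: {"a": 2})
stage1 = ToyStage(lambda a: {"b": a * 10})
stage2 = ToyStage(lambda a, b: {"total": a + b})

spec = [("start", stage0), (stage0, stage1), ((stage0, stage1), stage2)]
results = run_definition(spec)
```

The last entry of `spec` shows the multi-input form: `stage2` only runs after both `stage0` and `stage1` have produced results, which is why the order of the list matters.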
- Attributes:
results
Dict: A dictionary of the output return values for the pipeline.
Methods
run:
Runs the pipeline.
- property results: Dict
Dict: A dictionary of the output return values for the pipeline.
- class omni.pipelines.common.Stage(function_to_call: Callable, stage_outputs: Optional[List] = None, hash_output: Optional[str] = None, stage_name: Optional[str] = None, **kwargs)
Stage object representing one node of a processing pipeline.
Constructs a stage object that wraps a callable for use in a pipeline. When the run method is called, the callable is executed and its return value is stored in a result dictionary that can be accessed through the results property. The return value is parsed into this dictionary based on the list provided by stage_outputs (the position of each string in the list corresponds to the position of each return value).
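The positional labeling described above can be sketched in plain Python. `label_outputs` is a hypothetical helper written for illustration; it is not part of omni, and the handling of a single return value is an assumption.

```python
from typing import Any, Dict, List


def label_outputs(returned: Any, stage_outputs: List[str]) -> Dict[str, Any]:
    """Pair each return value with the label at the same position."""
    if len(stage_outputs) == 1:
        # Assumption: a single label names the whole return value.
        returned = (returned,)
    return dict(zip(stage_outputs, returned))


# divmod returns a (quotient, remainder) pair, so two labels are needed.
labeled = label_outputs(divmod(17, 5), ["quotient", "remainder"])
single = label_outputs(42, ["answer"])
```

Here `labeled` maps "quotient" to 3 and "remainder" to 2, mirroring how the first string in stage_outputs names the first return value.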
Note: Stage objects convert their results to JSON so they can be written to file. This conversion changes certain data structures (e.g. tuples become lists) and can give unexpected results. The current workaround is to use JSON-compatible data structures for your function's return values.
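The effect of a JSON round trip can be checked with the standard library alone. This sketch uses only Python's `json` module and does not touch omni; the `results` dictionary is made-up illustration data.

```python
import json

# Made-up results containing structures that JSON cannot represent exactly.
results = {"shape": (3, 4), "ids": {1: "a"}}

restored = json.loads(json.dumps(results))

# Tuples come back as lists, and non-string dictionary keys come back as strings.
shape_is_list = isinstance(restored["shape"], list)
keys_are_strings = list(restored["ids"].keys()) == ["1"]
```

Returning lists and string-keyed dictionaries from the wrapped function in the first place avoids a mismatch between fresh results and results read back from file.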
- Parameters:
- function_to_call: Callable
A callable to wrap that the stage executes.
- stage_outputs: List[str]
A list of strings to label each return value from the function_to_call.
- hash_output: str
A directory that should be created or checked to cache the stage execution.
- stage_name: str
Override the name for the stage.
- Attributes:
args
Dict: A dictionary of only the provided input arguments to the stage on construction.
input_args
Dict: A dictionary of all input arguments to the stage.
inputs
List[str]: A list of input argument names for the stage.
outputs
List[str]: A list of output argument names for the stage.
results
Dict: A dictionary of the output return values for the stage.
state
bool: A flag that specifies whether the current stage has been run.
Methods
run:
Run the stage.
- property args: Dict
Dict: A dictionary of only the provided input arguments to the stage on construction.
- property input_args: Dict
Dict: A dictionary of all input arguments to the stage. It is only populated after the run method is invoked.
- property inputs: List[str]
List[str]: A list of input argument names for the stage.
- property outputs: List[str]
List[str]: A list of output argument names for the stage.
- property results: Dict
Dict: A dictionary of the output return values for the stage.
- run(*args, force_skip_stage: bool = False, force_run_stage: bool = False, force_hash_write: bool = False, **kwargs) -> Dict
Call function and save outputs in result dictionary.
Runs the wrapped function_to_call with the given args/kwargs. Any kwargs specified on construction of the stage object take precedence over arguments passed to the run method. Hashes for the stage are also checked to determine whether the stage should be run.
- Parameters:
- force_skip_stage: bool
Specifies whether this stage should be forcefully skipped.
- force_run_stage: bool
Specifies whether this stage should be forcefully run. Has precedence over force_skip_stage.
- force_hash_write: bool
Specifies whether the hash for this stage should be written out even if the stage has not run.
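The two precedence rules above can be sketched as follows. `merge_call_kwargs` and `resolve_flags` are hypothetical helpers, not omni functions, and the "check hash" fallback is only a label for whatever hash comparison the library performs, which is not described here.

```python
from typing import Any, Dict


def merge_call_kwargs(construction_kwargs: Dict[str, Any],
                      run_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Combine kwargs so that values given at construction win on conflict."""
    merged = dict(run_kwargs)
    merged.update(construction_kwargs)  # construction-time values override
    return merged


def resolve_flags(force_skip_stage: bool = False,
                  force_run_stage: bool = False) -> str:
    """Return the action implied by the flags; force_run_stage wins."""
    if force_run_stage:
        return "run"
    if force_skip_stage:
        return "skip"
    return "check hash"  # placeholder for the library's hash check


merged = merge_call_kwargs({"threshold": 0.5}, {"threshold": 0.9, "path": "in.nii"})
both_flags = resolve_flags(force_skip_stage=True, force_run_stage=True)
```

In this sketch `merged["threshold"]` is 0.5, because the construction-time value overrides the one passed at run time, and `both_flags` is "run".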
- property state: bool
bool: A flag that specifies whether the current stage has been run (The callable has executed).
- omni.pipelines.common.get_func_hash(func: Union[Callable, CodeType]) -> bytes
Hashes a function into unique bytes.
This function collects the relevant bytes from the function's code object for hashing, in the following order:
consts, methods, code
In consts, the docstring of the code object is removed, and any embedded code objects are appended to the end of the byte array.
- Parameters:
- func: Union[Callable, CodeType]
Function to hash.
- Returns:
- bytes
Unique bytes representing the function.
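A rough sketch of this kind of hashing, built on CPython code object attributes, is shown below. It is not the omni implementation: `collect_code_bytes` and `func_hash_sketch` are hypothetical names, "methods" is assumed to mean `co_names`, and the use of `repr` and SHA-256 is an arbitrary choice for illustration.

```python
import hashlib
from types import CodeType
from typing import Callable, Optional, Union


def collect_code_bytes(code: CodeType, docstring: Optional[str] = None) -> bytes:
    """Gather bytes in the order consts, names, code; nested code goes last."""
    const_bytes = b""
    nested_bytes = b""
    for const in code.co_consts:
        if isinstance(const, CodeType):
            # Embedded code objects (inner functions, lambdas) are appended at the end.
            nested_bytes += collect_code_bytes(const)
        elif isinstance(const, str) and const == docstring:
            # Skip the docstring so documentation edits do not change the hash.
            continue
        else:
            const_bytes += repr(const).encode("utf-8")
    name_bytes = " ".join(code.co_names).encode("utf-8")
    return const_bytes + name_bytes + code.co_code + nested_bytes


def func_hash_sketch(func: Union[Callable, CodeType]) -> bytes:
    """Hash a function or a raw code object into a fixed-size digest."""
    if isinstance(func, CodeType):
        return hashlib.sha256(collect_code_bytes(func)).digest()
    return hashlib.sha256(collect_code_bytes(func.__code__, func.__doc__)).digest()


def add_one(x):
    """Add one."""
    return x + 1


def add_one_redocumented(x):
    """Return x incremented by one."""
    return x + 1


def add_two(x):
    """Add two."""
    return x + 2
```

With this sketch, `add_one` and `add_one_redocumented` produce the same digest because only their docstrings differ, while `add_two` produces a different one. That is the property that lets a cached stage be reused after a documentation-only edit.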
- omni.pipelines.common.redefine_result_key(dictionary: Dict, from_key: str, to_key: str) -> Dict
Redefines a result key in the dictionary to a different key.
- Parameters:
- dictionary: Dict
Dictionary to change key of.
- from_key: str
Key to change.
- to_key: str
Key to replace with.
- Returns:
- Dict
New dictionary with replaced keys.
Examples
>>> dictionary = {"hello": 1, "test": 2}
>>> new_dict = redefine_result_key(dictionary, "hello", "testing")
>>> # new_dict is now: {"testing": 1, "test": 2}
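For readers without omni installed, the behavior shown in the example can be approximated with a dictionary comprehension. `rename_key_sketch` is a hypothetical stand-in, and it assumes the input dictionary is left unmodified, which the documentation does not confirm for the real function.

```python
from typing import Any, Dict


def rename_key_sketch(dictionary: Dict[str, Any], from_key: str, to_key: str) -> Dict[str, Any]:
    """Return a new dict in which from_key is replaced by to_key."""
    return {
        (to_key if key == from_key else key): value
        for key, value in dictionary.items()
    }


original = {"hello": 1, "test": 2}
renamed = rename_key_sketch(original, "hello", "testing")
```

A renaming step like this can be useful when one stage's output label does not match the argument name a downstream function expects.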