pipeline¶
Functional pipeline steps and utilities.
- undertale.pipeline.Cluster(type: str, parallelism: int = 1, cores: int = 1, memory: str = '16GB', partition: str | None = None, logs: str | None = None)¶
A factory method returning a Dask-compatible Cluster.
- Parameters:
type – The type of Cluster to create. See CLUSTER_TYPES.
parallelism – The maximum degree of parallelism.
cores – The number of cores per worker (SLURM).
memory – The maximum amount of memory per worker.
partition – The SLURM partition to use.
logs – A path to a logging directory; if not provided, the current directory is used.
- Returns:
An instance of the requested Cluster type. Can be used as a context manager.
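To illustrate the factory pattern described above, here is a minimal pure-Python stand-in. All names in it (LocalCluster, SlurmCluster, the contents of CLUSTER_TYPES, make_cluster) are hypothetical; the real Cluster dispatches to Dask cluster classes, which are omitted here to keep the sketch self-contained.

```python
class LocalCluster:
    """Hypothetical stand-in for a Dask-compatible local cluster."""

    def __init__(self, parallelism=1, **kwargs):
        self.parallelism = parallelism

    # Context-manager support, mirroring "can be used as a context manager".
    def __enter__(self):
        return self

    def __exit__(self, *exc):
        return False


class SlurmCluster(LocalCluster):
    """Hypothetical stand-in for a SLURM-backed cluster."""

    def __init__(self, parallelism=1, cores=1, memory="16GB", partition=None, **kwargs):
        super().__init__(parallelism)
        self.cores, self.memory, self.partition = cores, memory, partition


# A type-keyed registry, analogous to CLUSTER_TYPES.
CLUSTER_TYPES = {"local": LocalCluster, "slurm": SlurmCluster}


def make_cluster(type: str, **kwargs):
    """Dispatch on the requested type, as a factory like Cluster would."""
    try:
        return CLUSTER_TYPES[type](**kwargs)
    except KeyError:
        raise ValueError(f"unknown cluster type: {type!r}") from None
```

The real factory additionally wires the SLURM-specific parameters (cores, memory, partition, logs) through to the underlying Dask cluster class.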
- undertale.pipeline.Client(*args, **kwargs)¶
Wraps a Dask Client.
- undertale.pipeline.merge(client: Client, objects: List) → Future¶
Merge a list of objects into a single future.
This is not functionality that Dask provides natively, but it is useful if you need to express a joint dependency on several objects or futures.
- Parameters:
client – The Dask Client where tasks should be issued.
objects – A list of objects to be merged; elements may be Futures or plain objects.
- Returns:
A single Future wrapping the list of objects.
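The semantics of merge can be sketched with the standard library: resolve each element (Future or plain object) and wrap the resulting list in a single future. The real merge works on Dask futures; concurrent.futures stands in here, and this merge function is an illustrative analogue, not the actual implementation.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def merge(executor, objects):
    """Wrap a mixed list of futures and plain objects in one future."""

    def gather():
        # Resolve futures in place; pass plain objects through unchanged.
        return [o.result() if isinstance(o, Future) else o for o in objects]

    return executor.submit(gather)


with ThreadPoolExecutor(max_workers=4) as ex:
    futures = [ex.submit(lambda x=i: x * x) for i in range(3)]
    merged = merge(ex, futures + [99])
    print(merged.result())  # [0, 1, 4, 99]
```

Anything that depends on `merged` now transitively depends on every element of the list, which is the joint-dependency use case described above.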
- undertale.pipeline.fanout(client: Client, function: Callable, inputs: Future, output: str, *args, **kwargs) → Future¶
Executes a given callable across all elements of some iterable Future.
The given callable should expect a single input file path and generate a single output file, returning its path.
This is equivalent to the following:
[ function(i, output, *args, **kwargs) for i in inputs ]
- Parameters:
client – The Dask Client where tasks should be issued.
function – A callable function to run on each input.
inputs – A Future containing the list of objects over which to run.
output – The output location where processed results should go.
*args – Any other arguments to be passed to function.
**kwargs – Any other keyword arguments to be passed to function.
- Returns:
A future representing a list of results returned by function applied to each of inputs.
Warning
This function will block on inputs and then materialize it. You should not plan to pass a lot of data through inputs; instead, pass a list of filenames to process.
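The behavior described above, including the blocking materialization of inputs, can be sketched with the standard library. Dask specifics are replaced with concurrent.futures, and the per-file process function is hypothetical; this is an analogue of fanout's semantics, not its implementation.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def fanout(executor, function, inputs, output, *args, **kwargs):
    """Run function over each element of an iterable future."""
    # Block on the inputs future and materialize the full list (this is
    # the behavior the warning above cautions about).
    paths = inputs.result() if isinstance(inputs, Future) else inputs
    # One task per input, equivalent to the list comprehension shown above.
    tasks = [executor.submit(function, i, output, *args, **kwargs) for i in paths]
    # A single future wrapping all per-input results.
    return executor.submit(lambda: [t.result() for t in tasks])


def process(path, output):
    # Hypothetical stand-in: a real function would read the input file and
    # write an output file, returning the output path.
    return f"{output}/{path}.done"


with ThreadPoolExecutor(max_workers=4) as ex:
    inputs = ex.submit(lambda: ["a.bin", "b.bin"])
    fanned = fanout(ex, process, inputs, "out")
    print(fanned.result())  # ['out/a.bin.done', 'out/b.bin.done']
```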
- undertale.pipeline.flush(client: Client) → None¶
Flush the Dask client and gracefully shut down workers.
This should be run at the end of pipelines to ensure graceful worker shutdown and work completion.
- Parameters:
client – The Dask Client where tasks are run.
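A standard-library analogue of what flush does at the end of a pipeline: wait for all outstanding work, then shut the worker pool down cleanly. The real flush operates on a Dask Client; ThreadPoolExecutor stands in here.

```python
from concurrent.futures import ThreadPoolExecutor

results = []
ex = ThreadPoolExecutor(max_workers=2)
for i in range(4):
    ex.submit(results.append, i * 2)

# Analogous to flush(client): block until all submitted work completes,
# then release the workers gracefully.
ex.shutdown(wait=True)
print(sorted(results))  # [0, 2, 4, 6]
```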
Modules
- Binary segmentation, disassembly, and decompilation.
- C/C++ compilation.
- Custom Dask wrappers and utilities.
- JSON parsing.
- Parquet parsing.
- Tar file parsing.