pipeline

Functional pipeline steps and utilities.

undertale.pipeline.Cluster(type: str, parallelism: int = 1, cores: int = 1, memory: str = '16GB', partition: str | None = None, logs: str | None = None)

A factory method returning a Dask-compatible Cluster.

Parameters:
  • type – The type of Cluster to create. See CLUSTER_TYPES.

  • parallelism – The maximum degree of parallelism.

  • cores – The number of cores per worker (SLURM).

  • memory – The maximum amount of memory per worker.

  • partition – The SLURM partition to use.

  • logs – A path to a logging directory. If not provided, the current directory is used.

Returns:

An instance of the requested Cluster type. Can be used as a context manager.
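The factory contract can be illustrated with a minimal, dependency-free sketch. The factory looks up `type`, passes the remaining options through, and returns an object that works as a context manager. Everything here is hypothetical: ClusterSketch, its two subclasses, make_cluster, and the "local"/"slurm" keys are stand-ins, not undertale's actual classes or the real contents of CLUSTER_TYPES. No workers are started.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Type


@dataclass
class ClusterSketch:
    """Hypothetical stand-in for a Dask-compatible cluster (not undertale's class)."""

    parallelism: int = 1
    cores: int = 1
    memory: str = "16GB"
    partition: Optional[str] = None
    logs: Optional[str] = None
    running: bool = False

    def __enter__(self) -> "ClusterSketch":
        # A real cluster would start (or request) its workers here.
        self.running = True
        return self

    def __exit__(self, *exc_info) -> bool:
        # ...and release them here; returning False lets exceptions propagate.
        self.running = False
        return False


class LocalClusterSketch(ClusterSketch):
    """Hypothetical: workers on the current machine."""


class SlurmClusterSketch(ClusterSketch):
    """Hypothetical: workers submitted as SLURM jobs."""


# Hypothetical registry; the real keys are whatever undertale's CLUSTER_TYPES defines.
CLUSTER_TYPES: Dict[str, Type[ClusterSketch]] = {
    "local": LocalClusterSketch,
    "slurm": SlurmClusterSketch,
}


def make_cluster(type: str, **options) -> ClusterSketch:
    """Look up the requested cluster type and construct it with the remaining options."""
    try:
        cls = CLUSTER_TYPES[type]
    except KeyError:
        raise ValueError(
            f"unknown cluster type {type!r}; expected one of {sorted(CLUSTER_TYPES)}"
        ) from None
    return cls(**options)
```

With this sketch, `with make_cluster("slurm", parallelism=4, partition="gpu") as cluster:` yields a running SlurmClusterSketch, which is marked stopped when the block exits. An unknown type fails immediately with a ValueError that lists the valid choices, rather than raising a bare KeyError.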

undertale.pipeline.Client(*args, **kwargs)

Wraps a Dask Client.

undertale.pipeline.merge(client: Client, objects: List) → Future

Merge a list of objects into a single future.

Dask does not provide this natively. It is useful when you need to express a joint dependency on several objects or futures.

Parameters:
  • client – The Dask Client where tasks should be issued.

  • objects – A list of objects to merge. Elements may be Futures.

Returns:

A single Future wrapping the list of objects.
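The following minimal analog makes the merge semantics concrete. It uses the standard library's concurrent.futures instead of Dask. merge_sketch is a hypothetical name, and this is not how undertale implements merge. It only shows the idea: a single Future that resolves to a list once every input, whether a plain value or a Future, is available.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, List


def merge_sketch(executor: ThreadPoolExecutor, objects: List[Any]) -> Future:
    """Hypothetical analog of merge(): one Future over a mixed list.

    Plain values pass through unchanged and Futures are replaced by their
    results, so the returned Future completes only after every input does.
    """
    items = list(objects)  # snapshot, so later mutation of `objects` has no effect

    def gather() -> List[Any]:
        # Blocks this worker thread until each input Future has a result.
        return [o.result() if isinstance(o, Future) else o for o in items]

    return executor.submit(gather)
```

gather occupies a worker thread while it waits. For that reason, submit the inputs before calling merge_sketch so that the executor's FIFO queue hands them to workers first.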

undertale.pipeline.fanout(client: Client, function: Callable, inputs: Future, output: str, *args, **kwargs) → Future

Execute a callable on every element of an iterable Future.

The callable should take a single input file path (followed by output and any extra arguments), write a single output file, and return that file's path.

This is equivalent to the following:

[function(i, output, *args, **kwargs) for i in inputs]

Parameters:
  • client – The Dask Client where tasks should be issued.

  • function – A callable function to run on each input.

  • inputs – A Future containing a list of objects over which to run.

  • output – The output location where processed results should go.

  • *args – Any other arguments to be passed to function.

  • **kwargs – Any other keyword arguments to be passed to function.

Returns:

A Future resolving to the list of results returned by function for each element of inputs.

Warning

This function blocks until inputs resolves and then materializes its contents. Avoid passing large amounts of data through inputs. Pass a list of filenames to process instead.
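The fanout pattern can be sketched with concurrent.futures, including the blocking materialization that the Warning mentions. fanout_sketch and process are hypothetical names. The sketch illustrates the calling convention only and is not undertale's Dask-based implementation.

```python
import os
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List


def fanout_sketch(
    executor: ThreadPoolExecutor,
    function: Callable[..., Any],
    inputs: Future,
    output: str,
    *args: Any,
    **kwargs: Any,
) -> Future:
    """Hypothetical analog of fanout() built on concurrent.futures."""
    # Block until the input list exists and pull it into this process. This is
    # why inputs should be a short list of file names, not the data itself.
    items = list(inputs.result())

    # One task per element: function(item, output, *args, **kwargs).
    tasks = [executor.submit(function, item, output, *args, **kwargs) for item in items]

    def gather() -> List[Any]:
        # Submitted after every per-item task, so the FIFO queue has already
        # handed each of them to a worker before this one waits on them.
        return [t.result() for t in tasks]

    return executor.submit(gather)


def process(path: str, output: str, suffix: str = ".out") -> str:
    """Hypothetical step: a real one would write the output file before returning its path."""
    return os.path.join(output, os.path.basename(path) + suffix)
```

For example, fanout_sketch(executor, process, inputs, "/tmp/out", suffix=".json") calls process(path, "/tmp/out", suffix=".json") for each path. It returns a Future over the resulting output paths.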

undertale.pipeline.flush(client: Client) → None

Flush the Dask client and gracefully shut down workers.

Run this at the end of a pipeline to ensure that all submitted work completes and that workers shut down gracefully.

Parameters:
  • client – The Dask Client where tasks are run.
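The following minimal illustration shows why flush belongs at the end of a pipeline. It again uses concurrent.futures rather than Dask. flush_sketch and run_pipeline are hypothetical, and ThreadPoolExecutor.shutdown(wait=True) stands in for whatever undertale's flush actually does with the Dask client.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def flush_sketch(executor: ThreadPoolExecutor) -> None:
    """Hypothetical analog of flush(): drain outstanding work, then stop workers."""
    # wait=True blocks until every already-submitted task has finished and the
    # worker threads have exited; later submit() calls raise RuntimeError.
    executor.shutdown(wait=True)


def run_pipeline(values: List[int]) -> List[int]:
    executor = ThreadPoolExecutor(max_workers=4)
    futures = []
    try:
        futures = [executor.submit(pow, v, 2) for v in values]
    finally:
        # Flush unconditionally so workers are not leaked if a step raises.
        flush_sketch(executor)
    # Every task has completed, so result() returns (or re-raises) immediately.
    return [f.result() for f in futures]
```

Because the flush sits in a finally block, workers are released even when building the pipeline fails partway through.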

Modules

binary

Binary segmentation, disassembly, and decompilation.

cpp

C/C++ compilation.

dask

Custom Dask wrappers and utilities.

json

JSON parsing.

parquet

Parquet parsing.

tarfile

Tar file parsing.