Mapper#

Mapper is a built-in Module for parallelizing a function or module method over a list of inputs. It holds a pool of replicas of the function or module, distributes the inputs to the replicas, and collects the results. The replicas are either created by the mapper automatically, or they can be created by the user and passed into the mapper (or a mix of both). The advantage of that flexibility is that the mapper can call replicas on totally different infrastructure (e.g. if you have two different clusters).

When the mapper creates the replicas, it creates duplicate envs of the original mapped module’s env (e.g. env_replica_1), and sends the module into the replica env on the same cluster, thus creating many modules each in separate processes (and potentially separate nodes). Keep in mind that you must specify the compute resources (e.g. compute={“CPU”: 0.5}) in the env constructor if you have a multinode cluster and want the replica envs to overflow onto worker nodes.

The mapper then simply calls each in a threadpool and collects the results. By default, the threadpool is the same size as the number of replicas, so each thread blocks until a replica is available. You can control this by setting concurrency, which is the number of simultaneous calls that can be made to any replica (e.g. if concurrency is 2, then 2 threads will be calling each replica at the same time).

The mapper can either sit locally or on a cluster, but you should generally put it on the cluster if you can. If the mapper is local, you’ll need to send the mapped module to the cluster before passing it to the mapper, and the mapper will create each new replica on the cluster remotely, which will take longer. The communication between the mapper and the replicas will also be faster and more dependable if they are on the same cluster. Just note that if you’re sending or returning a large amount of data, it may take time to transfer before or after you see the loading bar when the mapper is actually processing. Generally you’ll get the best performance if you read and write to blob storage or the filesystem rather than sending the data around.

Mapper Factory Method#

runhouse.mapper(module: Module | Callable, method: str | None = None, replicas: None | int | List[Module] = None, concurrency: int = 1, **kwargs) Mapper[source]#

A factory method for creating Mapper modules. A mapper is a module that can map a function or module method over a list of inputs in various ways.

Parameters:
  • module (Module) – The module or function to be mapped.

  • method (Optional[str], optional) – The method of the module to be called. If the module is already a callable, this value defaults to "call".

  • concurrency (int, optional) – The number of concurrent calls to each replica, executed in separate threads. Defaults to 1.

  • replicas (Optional[List[Module]], optional) – List of user-specified replicas, or an int specifying the number of replicas to be automatically created. Defaults to None.

Returns:

The resulting Mapper object.

Return type:

Mapper

Example

>>> def local_sum(arg1, arg2, arg3):
>>>     return arg1 + arg2 + arg3
>>>
>>> # Option 1: Pass a function directly to the mapper, and send both to the cluster
>>> mapper = rh.mapper(local_sum, replicas=2).to(my_cluster)
>>> mapper.map([1, 2], [1, 4], [2, 3])
>>> # Option 2: Create a remote module yourself and pass it to the mapper, which is still local
>>> remote_fn = rh.function(local_sum).to(my_cluster, env=my_fn_env)
>>> mapper = rh.mapper(remote_fn, replicas=2)
>>> mapper.map([1, 2], [1, 4], [2, 3])
>>> # output: [4, 9]
>>> # Option 3: Create a remote module and mapper for greater flexibility, and send both to the cluster
>>> # You can map over a "class" module (stateless) or an "instance" module to preserve state
>>> remote_class = rh.module(cls=MyClass).to(system=cluster, env=my_module_env)
>>> stateless_mapper = rh.mapper(remote_class, method="my_class_method", replicas=2).to(cluster)
>>> mapper.map([1, 2], [1, 4], [2, 3])
>>> remote_app = remote_class()
>>> stateful_mapper = rh.mapper(remote_app, method="my_instance_method", replicas=2).to(cluster)
>>> mapper.map([1, 2], [1, 4], [2, 3])

Mapper Class#

class runhouse.Mapper(module: Module | None = None, method: str | None = None, replicas: None | int | List[Module] = None, concurrency=1, **kwargs)[source]#
__init__(module: Module | None = None, method: str | None = None, replicas: None | int | List[Module] = None, concurrency=1, **kwargs)[source]#

Runhouse Mapper object. It is used for mapping a function or module method over a list of inputs, across a series of replicas.

Note

To create a Mapper, please use the factory method mapper().

call(*args, method: str | None = None, **kwargs)[source]#

Call the function or method on a single replica.

Example

>>> def local_sum(arg1, arg2, arg3):
>>>     return arg1 + arg2 + arg3
>>>
>>> remote_fn = rh.function(local_sum).to(my_cluster)
>>> mapper = rh.mapper(remote_fn, replicas=2)
>>> for i in range(10):
>>>     mapper.call(i, 1, 2)
>>>     # output: 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, run in round-robin replica order
map(*args, method: str | None = None, retries: int = 0, **kwargs)[source]#

Map the function or method over a list of arguments.

Example

>>> mapper = rh.mapper(local_sum, replicas=2).to(my_cluster)
>>> mapper.map([1, 2], [1, 4], [2, 3], retries=3)
>>> # If you're mapping over a remote module, you can choose not to specify which method to call initially
>>> # so you can call different methods in different maps (note that our replicas can hold state!)
>>> # Note that in the example below we're careful to use the same number of replicas as data
>>> # we have to process, or the state in a replica would be overwritten by the next call.
>>> shards = len(source_paths)
>>> mapper = rh.mapper(remote_module, replicas=shards).to(my_cluster)
>>> mapper.map(*source_paths, method="load_data")
>>> mapper.map([]*shards, method="process_data")  # Calls each replica once with no args
>>> mapper.map(*output_paths, method="save_data")
starmap(arg_list: List, method: str | None = None, retries: int = 0, **kwargs)[source]#

Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments. An iterable of [(1,2), (3, 4)] results in func(1,2), func(3,4)].

Example

>>> def local_sum(arg1, arg2, arg3):
>>>     return arg1 + arg2 + arg3
>>>
>>> remote_fn = rh.function(local_sum).to(my_cluster)
>>> mapper = rh.mapper(remote_fn, replicas=2)
>>> arg_list = [(1,2), (3, 4)]
>>> # runs the function twice, once with args (1, 2) and once with args (3, 4)
>>> mapper.starmap(arg_list)
to(system: str | Cluster, env: str | List[str] | Env | None = None, name: str | None = None, force_install: bool = False)[source]#

Put a copy of the Mapper and its internal module on the destination system and env, and return the new mapper.

Example

>>> local_mapper = rh.mapper(my_module, replicas=2)
>>> cluster_mapper = local_mapper.to(my_cluster)