import copy
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Union
from runhouse.globals import obj_store
from runhouse.resources.envs.utils import (
_process_env_vars,
run_setup_command,
run_with_logs,
)
from runhouse.resources.folders import Folder
from runhouse.resources.hardware import _get_cluster_from, Cluster
from runhouse.resources.packages import Package
from runhouse.resources.resource import Resource
logger = logging.getLogger(__name__)
class Env(Resource):
    """Runhouse environment: packages, setup commands, env vars, secrets and a working dir.

    .. note::
        To create an Env, please use the factory method :func:`env`.
    """

    RESOURCE_TYPE = "env"

    def __init__(
        self,
        name: Optional[str] = None,
        reqs: Optional[List[Union[str, Package]]] = None,
        setup_cmds: List[str] = None,
        env_vars: Optional[Union[Dict, str]] = None,
        working_dir: Optional[Union[str, Path]] = None,
        secrets: Optional[List[Union[str, "Secret"]]] = None,
        compute: Optional[Dict] = None,
        dryrun: bool = True,
        **kwargs,  # We have this here to ignore extra arguments when calling from_config
    ):
        """
        Runhouse Env object.

        .. note::
            To create an Env, please use the factory method :func:`env`.

        Args:
            name: Name of the env, forwarded to the ``Resource`` constructor.
            reqs: Requirements, as strings or ``Package`` objects. Defaults to an empty list.
            setup_cmds: Commands run by :meth:`_run_setup_cmds`.
            env_vars: Environment variables, as a dict or a string. Defaults to an empty dict.
            working_dir: Working directory; appended to :attr:`reqs` when set.
            secrets: Secrets, as names or secret objects. Defaults to an empty list.
            compute: Compute configuration dict. Defaults to an empty dict.
            dryrun: Forwarded to the ``Resource`` constructor.
            **kwargs: Ignored; accepted so ``from_config`` can pass extra config keys.
        """
        super().__init__(name=name, dryrun=dryrun)
        # Build fresh containers per instance: mutable defaults in the signature would be
        # shared (and mutated) across every Env created without these arguments.
        self._reqs = reqs if reqs is not None else []
        self.setup_cmds = setup_cmds
        self.env_vars = env_vars if env_vars is not None else {}
        self.working_dir = working_dir
        self.secrets = secrets if secrets is not None else []
        self.compute = compute if compute is not None else {}

    @property
    def env_name(self):
        """Name of the env, falling back to ``"base"`` when the env is unnamed."""
        return self.name or "base"

    @staticmethod
    def from_config(config: dict, dryrun: bool = False, _resolve_children: bool = True):
        """Create an Env object from a config dict.

        Dict entries in ``reqs`` and a dict ``working_dir`` are converted with
        ``Package.from_config``. A ``resource_subtype`` of ``"CondaEnv"`` yields a ``CondaEnv``.
        The caller's ``config`` is not modified.
        """
        # Work on a shallow copy so the caller's dict is left untouched.
        config = dict(config)
        config["reqs"] = [
            Package.from_config(req, dryrun=True, _resolve_children=_resolve_children)
            if isinstance(req, dict)
            else req
            for req in config.get("reqs", [])
        ]

        # ``working_dir`` may be missing from the config; treat that as "no working dir".
        working_dir = config.get("working_dir")
        if isinstance(working_dir, dict):
            working_dir = Package.from_config(
                working_dir, dryrun=True, _resolve_children=_resolve_children
            )
        config["working_dir"] = working_dir

        resource_subtype = config.get("resource_subtype")
        if resource_subtype == "CondaEnv":
            from runhouse import CondaEnv

            return CondaEnv(**config, dryrun=dryrun)

        return Env(**config, dryrun=dryrun)

    @staticmethod
    def _set_env_vars(env_vars):
        """Copy every key/value pair of ``env_vars`` into ``os.environ``."""
        for k, v in env_vars.items():
            os.environ[k] = v

    def config(self, condensed=True):
        """Return the config dict of this env, extending the parent ``Resource`` config."""
        config = super().config(condensed)
        self.save_attrs_to_config(
            config, ["setup_cmds", "env_vars", "env_name", "compute"]
        )
        config.update(
            {
                "reqs": [
                    self._resource_string_for_subconfig(package, condensed)
                    # ``_reqs`` can be reset to None through the ``reqs`` setter.
                    for package in (self._reqs or [])
                ],
                "working_dir": self._resource_string_for_subconfig(
                    self.working_dir, condensed
                ),
            }
        )
        return config

    @property
    def reqs(self):
        """Requirements of the env, with ``working_dir`` appended as the last item when set."""
        return (self._reqs or []) + ([self.working_dir] if self.working_dir else [])

    @reqs.setter
    def reqs(self, reqs):
        self._reqs = reqs

    def _reqs_to(self, system: Union[str, Cluster], path=None, mount=False):
        """Send self.reqs to the system (cluster or file system).

        Returns:
            A ``(reqs, working_dir)`` tuple; ``working_dir`` is ``None`` when the env has none.
        """
        new_reqs = []
        for req in self.reqs:
            if isinstance(req, str):
                new_req = Package.from_string(req)
                # Only swap the string for a Package when it refers to a folder;
                # other string requirements are passed through unchanged.
                if isinstance(new_req.install_target, Folder):
                    req = new_req

            if isinstance(req, Package) and isinstance(req.install_target, Folder):
                req = (
                    req.to(system, path=path, mount=mount)
                    if isinstance(system, Cluster)
                    else req.to(system, path=path)
                )
            new_reqs.append(req)

        if self.working_dir:
            # ``self.reqs`` places the working dir last, so split it back off.
            return new_reqs[:-1], new_reqs[-1]
        return new_reqs, None

    def _secrets_to(self, system: Union[str, Cluster]):
        """Send each secret of the env to ``system`` and return the resulting list."""
        from runhouse.resources.secrets import Secret

        new_secrets = []
        for secret in self.secrets or []:
            if isinstance(secret, str):
                secret = Secret.from_name(secret)

            if hasattr(secret, "path") and secret.path:
                new_secrets.append(secret.to(system=system))
            else:
                new_secrets.append(secret.to(system=system, env=self))
        return new_secrets

    def _install_reqs(self, cluster: Cluster = None, reqs: List = None):
        """Install ``reqs`` (``self.reqs`` when not given or empty) into this env.

        Raises:
            ValueError: If a requirement is neither a string nor an object with ``_install``.
        """
        reqs = reqs or self.reqs
        for package in reqs or []:
            if isinstance(package, str):
                pkg = Package.from_string(package)
                if pkg.install_method in ["reqs", "local"] and cluster:
                    pkg = pkg.to(cluster)
            elif hasattr(package, "_install"):
                pkg = package
            else:
                raise ValueError(f"package {package} not recognized")

            logger.debug(f"Installing package: {str(pkg)}")
            pkg._install(env=self, cluster=cluster)

    def _run_setup_cmds(self, cluster: Cluster = None, setup_cmds: List = None):
        """Run ``setup_cmds`` (``self.setup_cmds`` when not given or empty), if any."""
        setup_cmds = setup_cmds or self.setup_cmds

        if not setup_cmds:
            return

        for cmd in setup_cmds:
            # Prefix with the env's run command when one is defined.
            cmd = f"{self._run_cmd} {cmd}" if self._run_cmd else cmd
            run_setup_command(cmd, cluster=cluster)

    def install(self, force: bool = False, cluster: Cluster = None):
        """Locally install packages and run setup commands.

        The installation is skipped when an env with the same config (ignoring its name)
        is already recorded in ``obj_store.installed_envs``, unless ``force`` is set.
        """
        # Hash the config_for_rns to check if we need to install
        env_config = self.config()
        # Remove the name because auto-generated names will be different, but the installed components are the same
        env_config.pop("name", None)
        install_hash = hash(str(env_config))
        # Check the existing hash
        if install_hash in obj_store.installed_envs and not force:
            logger.debug("Env already installed, skipping")
            return

        self._install_reqs(cluster=cluster)
        self._run_setup_cmds(cluster=cluster)
        # Record the env only after a successful install, so a failed attempt is retried
        # on the next call instead of being skipped as "already installed".
        obj_store.installed_envs[install_hash] = self.name

    def _run_command(self, command: str, **kwargs):
        """Run command locally inside the environment"""
        if self._run_cmd:
            command = f"{self._run_cmd} {command}"
        logger.info(f"Running command in {self.name}: {command}")
        return run_with_logs(command, **kwargs)

    def to(
        self, system: Union[str, Cluster], path=None, mount=False, force_install=False
    ):
        """
        Send environment to the system (Cluster or file system).
        This includes installing packages and running setup commands if system is a cluster.

        Example:
            >>> env = rh.env(reqs=["numpy", "pip"])
            >>> cluster_env = env.to(my_cluster)
            >>> s3_env = env.to("s3", path="s3_bucket/my_env")
        """
        system = _get_cluster_from(system)
        new_env = copy.deepcopy(self)
        new_env.reqs, new_env.working_dir = self._reqs_to(system, path, mount)
        new_env.secrets = self._secrets_to(system)

        if isinstance(system, Cluster):
            # Named envs are put on the cluster; unnamed ones reuse the cluster's default env.
            key = (
                system.put_resource(new_env)
                if new_env.name
                else system.default_env.name
            )

            env_vars = _process_env_vars(self.env_vars)
            if env_vars:
                system.call(key, "_set_env_vars", env_vars)

            if new_env.name:
                system.call(key, "install", force=force_install)
            else:
                system.call(key, "_install_reqs", reqs=new_env.reqs)
                system.call(key, "_run_setup_cmds", setup_cmds=new_env.setup_cmds)

        return new_env

    @property
    def _activate_cmd(self):
        """Activation command for the env; empty for the base ``Env``."""
        return ""

    @property
    def _run_cmd(self):
        """Command prefix for running inside the env; empty for the base ``Env``."""
        return ""