Source code for seaquest.launcher

import itertools
import pathlib

from kubernetes import client
from yaml import safe_dump

from seaquest.utils.loggus import init_logger

# Module-level logger. When this file is executed directly, __name__ is
# "__main__", so the file's stem is used as the logger name instead.
logger = init_logger(__name__ if __name__ != "__main__" else pathlib.Path(__file__).stem, level="debug")


def _prepare_afinity(job_config: dict) -> client.V1Affinity:
    """Build the node affinity object for the k8s job.

    The affinity contains a single required node-selector term matching the
    ``nvidia.com/gpu.product`` key against the configured graphics card.

    Parameters
    ----------
    job_config: dict
        Dictionary of parsed job configuration arguments; its
        ``"graphics-card"`` entry is read.

    Returns
    -------
    affinity: client.V1Affinity
        kubernetes affinity object
    """
    # TODO: make this configurable
    gpu_requirement = client.V1NodeSelectorRequirement(
        key="nvidia.com/gpu.product",
        operator="In",
        values=[job_config["graphics-card"]],
    )
    selector_term = client.V1NodeSelectorTerm(match_expressions=[gpu_requirement])
    node_selector = client.V1NodeSelector(node_selector_terms=[selector_term])
    node_affinity = client.V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=node_selector,
    )
    return client.V1Affinity(node_affinity=node_affinity)
def _prepare_containers(config: dict, job_name: str, pvc: str, arguments: dict) -> list[client.V1Container]:
    """Create the container objects for the k8s job.

    The single container mounts the PVC at ``/<pvc>`` and runs a shell script
    that copies the model directory into a per-job directory, removes the
    original, installs the model's requirements and finally starts
    ``seaquest.runner`` with the supplied arguments.

    Parameters
    ----------
    config: dict
        Resource configuration; its ``"limits"`` and ``"requests"`` entries
        are passed to the container's resource requirements.
    job_name: str
        The name of the job; also used as the name of the per-job working
        directory created inside the PVC.
    pvc: str
        Name of the PVC; used as the volume mount name and as the mount
        directory (``/<pvc>``).
    arguments: dict
        Mapping of command-line flag to string value passed to the runner
        module that runs inside the job. Must contain ``"-od"`` (output
        directory name) and ``"-md"`` (model directory). The mapping is not
        modified.

    Returns
    -------
    containers: list[client.V1Container]
        list of kubernetes container objects

    Raises
    ------
    KeyError
        If ``arguments`` lacks ``"-od"``/``"-md"`` or ``config`` lacks
        ``"limits"``/``"requests"``.
    """
    mount_path = "/{pvc}".format(pvc=pvc)
    model_dir = arguments["-md"]
    output_dir_name = arguments["-od"]
    requirements_path = str(pathlib.PurePosixPath(model_dir).joinpath("requirements.txt"))

    # Work on a copy so the caller's dictionary is not changed as a side effect.
    # The runner receives the output directory as an absolute path in the PVC.
    runner_arguments = dict(arguments)
    runner_arguments["-od"] = str(pathlib.PurePosixPath("/").joinpath(pvc).joinpath(output_dir_name))
    list_arguments = " ".join(itertools.chain.from_iterable(runner_arguments.items()))

    # Each step only runs if the previous one succeeded (joined with "&&").
    # TODO: make this prettier (perhaps a shell script)
    steps = [
        # Use the absolute mount path so the script does not depend on the
        # image's working directory.
        f"cd {mount_path}",
        f"mkdir {job_name} -p",
        f"cp -R {model_dir} {job_name}",
        f"rm -R {model_dir}",
        f"mkdir {output_dir_name} -p",
        f"cd {job_name}",
        f"python -m pip install -r {requirements_path} --no-build-isolation",
        f"python -m seaquest.runner -cf {mount_path}/{job_name}/{model_dir}/runner.yaml {list_arguments}",
    ]

    return [client.V1Container(
        name="program",
        # TODO: make this configurable
        image="iancuonescu/seaquest:latest",
        volume_mounts=[client.V1VolumeMount(name=pvc, mount_path=mount_path)],
        resources=client.V1ResourceRequirements(
            limits=config["limits"],
            requests=config["requests"],
        ),
        command=["/bin/sh", "-c"],
        args=[" && ".join(steps)],
    )]
def _prepare_volumes(pvc: str) -> list[client.V1Volume]:
    """Build the volume objects for the k8s job.

    Parameters
    ----------
    pvc: str
        Name of the PVC; used both as the volume name and as the claim the
        volume is backed by.

    Returns
    -------
    volumes: list[client.V1Volume]
        List (with a single entry) of kubernetes volume objects to mount to
        the job
    """
    claim_source = client.V1PersistentVolumeClaimVolumeSource(pvc)
    pvc_volume = client.V1Volume(name=pvc, persistent_volume_claim=claim_source)
    return [pvc_volume]
def _prepare_job_spec(job_config: dict, job_name: str, pvc: str, arguments: dict) -> client.V1JobSpec:
    """Assemble the specification for the k8s job.

    Parameters
    ----------
    job_config: dict
        Dictionary of job configuration parameters. Its ``"resources"`` entry
        is forwarded to the container builder; an affinity is only attached
        when a ``"graphics-card"`` key is present.
    job_name: str
        Name of the job
    pvc: str
        Name of the PVC
    arguments: dict
        Arguments to pass to the runner module that runs inside the job

    Returns
    -------
    spec: client.V1JobSpec
        Kubernetes job spec object
    """
    affinity = None
    if "graphics-card" in job_config:
        affinity = _prepare_afinity(job_config)

    containers = _prepare_containers(job_config["resources"], job_name, pvc, arguments)
    volumes = _prepare_volumes(pvc)

    pod_spec = client.V1PodSpec(
        affinity=affinity,
        containers=containers,
        volumes=volumes,
        restart_policy="Never",
    )

    # TODO: make backoff_limit configurable?
    # TODO: make ttl_seconds_after_finished configurable (currently unset)
    return client.V1JobSpec(
        backoff_limit=0,
        template=client.V1PodTemplateSpec(spec=pod_spec),
    )
def _launch_job(api_instance: client.BatchV1Api, namespace: str, job_name: str, job_config: dict, pvc: str, arguments: dict) -> None:
    """Launch a kubernetes job based on the provided configuration.

    Parameters
    ----------
    api_instance: kubernetes client
        Kubernetes batch API client used to create the job
    namespace: str
        Kubernetes namespace
    job_name: str
        Name of the job
    job_config: dict
        Dictionary of job configuration parameters
    pvc: str
        Name of the PVC
    arguments: dict
        Arguments to pass to the runner module that runs inside the job

    Returns
    -------
    None
    """
    logger.info(f"Launching job {job_name} ...")

    job_metadata = client.V1ObjectMeta(name=job_name, namespace=namespace)
    job_spec = _prepare_job_spec(job_config, job_name, pvc, arguments)
    job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=job_metadata,
        spec=job_spec,
        status=client.V1JobStatus(),  # None?
    )
    api_instance.create_namespaced_job(namespace, job)

    logger.info(f"Succesfully launched job {job_name}!")
def create_jobs(api_instance: client.BatchV1Api, num_jobs: int, namespace: str, job_config: dict, prefix: str, suffix: str, model_name: str, model_fun: str, pvc: str, model_dir: str, data_file: str) -> list:
    """Launch ``num_jobs`` kubernetes jobs based on the specified configuration.

    Every job gets a unique, lower-cased name of the form
    ``<prefix>-<model>-<fun>-<data_file>-job[-<suffix>]-<idx>``.

    Parameters
    ----------
    api_instance: kubernetes client
        Kubernetes client
    num_jobs: int
        Number of jobs to launch
    namespace: str
        Kubernetes namespace
    job_config: dict
        Dictionary of job configuration parameters
    prefix: str
        Prefix for the job name
    suffix: str
        Suffix for the job name; omitted from the name when ``None``
    model_name: str
        Name of the model to experiment with
    model_fun: str
        Name of the function called (either 'train' or 'infer')
    pvc: str
        Name of the PVC
    model_dir: str
        Name of the model directory
    data_file: str
        Name of the data file

    Returns
    -------
    all_created_jobs: list
        A list of names of the successfully launched jobs

    Raises
    ------
    Exception
        Any error raised while launching a job is logged as critical and
        re-raised; remaining jobs are not launched.
    """
    suffix_part = "" if suffix is None else f"-{suffix}"
    launched = []

    for idx in range(num_jobs):
        # kube convention: job names are lower case
        job_name = f"{prefix}-{model_name}-{model_fun}-{data_file}-job{suffix_part}-{idx}".lower()
        runner_arguments = {
            "-od": f"{job_name}_output",
            "-md": model_dir,
        }

        try:
            _launch_job(api_instance, namespace, job_name, job_config, pvc, runner_arguments)
        except Exception as e:
            logger.critical(f"Failed to launch job number {idx} due to error {e}")
            raise e

        launched.append(job_name)

    return launched