Source code for seaquest.utils.pod
from random import randint
from time import sleep
from kubernetes import client
from .loggus import init_logger
logger = init_logger(__name__, level="debug")
[docs]
def make_pod_name_unique(api_instance: client.CoreV1Api, namespace: str, pod_name: str) -> str:
"""Make pod name unique by appending a random number if a pod with the same name exists
Parameters
----------
api_instance: kubernetes client
Kubernetes client
namespace: str
Namespace to get pods from
pod_name: str
Desired pod name
Returns
-------
str
Unique pod name
"""
all_pod_names = _get_list_of_pods(api_instance, namespace)
pod_name = "{pod}-{rand}".format(pod=pod_name, rand=randint(1000, 9999))
while pod_name in all_pod_names:
pod_name = "{pod}-{rand}".format(pod=pod_name, rand=randint(1000, 9999))
return pod_name
[docs]
def _get_list_of_pods(api_instance: client.CoreV1Api, namespace: str) -> list[str]:
"""Get list of pod names in a namespace
Parameters
----------
api_instance: kubernetes client
Kubernetes client
namespace: str
Namespace to get pods from
Returns
-------
list
List of pod names
"""
pods = api_instance.list_namespaced_pod(namespace=namespace)
pod_names = [pod.metadata.name for pod in pods.items]
return pod_names
[docs]
def _launch_pod(api_instance: client.CoreV1Api, namespace: str, pod_name:str, pvc_name: str) -> None:
"""Launch a pod that will be quickly deleted after the operations are done
Parameters
----------
api_instance: kubernetes client
Kubernetes client
namespace: str
Namespace to use for creating pod
pod_name: str
Name of pod
pvc_name: str
Name of pvc to bind the pod to
Returns
-------
None
"""
body = client.V1Pod(
api_version="v1",
kind="Pod",
metadata=client.V1ObjectMeta(name=pod_name, namespace=namespace),
spec=client.V1PodSpec(
containers=[
client.V1Container(
name="alpine",
image="alpine:latest",
command=['tail', '-f', '/dev/null'],
volume_mounts=[client.V1VolumeMount(mount_path="/{x}".format(x=pvc_name), name=pvc_name)]
)
],
volumes=[client.V1Volume(name=pvc_name, persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name))],
)
)
api_instance.create_namespaced_pod(namespace, body)
logger.info("Succesfully created pod {pod}".format(pod=pod_name))
[docs]
def _delete_pod(api_instance: client.CoreV1Api, namespace: str, pod_name: str) -> None:
"""Delete the temporary pod used to upload files to the pvc
Parameters
----------
api_instance: kubernetes client
Kubernetes client
namespace: str
Namespace to use for creating pvc
pod_name: str
Name of the pod to delete
Returns
-------
None
None
"""
all_pods = _get_list_of_pods(api_instance, namespace)
if pod_name not in all_pods:
logger.warning(f"Pod {pod_name} not found in namespace {namespace}. Skipping deletion.")
return
api_instance.delete_namespaced_pod(name=pod_name, namespace=namespace)
logger.info("Successfully deleted pod {pod}.".format(pod=pod_name))
[docs]
def _is_running(api_instance: client.CoreV1Api, pod_name: str, namespace: str, timeout: int) -> bool:
"""Check if the pod is done initializing and is running
Parameters
----------
api_instance: kubernetes client
Kubernetes client
pod_name: str
Name of the pod to check
namespace: str
Namespace to use for creating pvc
timeout: int
Time to wait for pod to reach running state until raising error
Returns
-------
bool
True if the pod is done initializing, False otherwise
"""
# Watch and streaming solutions didn't work so we use this for now ):
# (this snippet is also present in the official examples)
# decorators for timeout didn't work because function is unpicklable
while True:
resp = api_instance.read_namespaced_pod(name=pod_name, namespace=namespace)
if resp.status.phase == 'Running':
return True
sleep(1)
timeout -= 1
if timeout == 0:
raise TimeoutError("Timeout waiting for pod to be in running state.")
[docs]
def _wait_for_running_state(api_instance: client.CoreV1Api, namespace: str, pod_name: str, timeout: int) -> bool:
"""Wait for the pod to be in running state, else timeout after 60 seconds
Parameters
----------
api_instance: kubernetes client
Kubernetes client
namespace: str
Namespace to use for creating pvc
pod_name: str
Name of the pod to check
timeout:
Time to wait for pod to reach running state until raising error
Returns
-------
bool
True if the pod is done initializing, else raise TimeoutError
"""
try:
_is_running(api_instance, pod_name, namespace, timeout)
except TimeoutError:
logger.error("Timeout error: Pod {pod} failed to start in {t} seconds".format(pod=pod_name, t=timeout))
raise TimeoutError(f"Pod {pod_name} failed to start.")