import io
import pathlib
import tarfile
from random import randint
from kubernetes import client, stream
from .utils import pod
from .utils.loggus import init_logger
logger = init_logger(__name__ if __name__ != "__main__" else pathlib.Path(__file__).stem, level="debug")
[docs]
def _check_pvc_exists(api_instance: client.CoreV1Api, namespace: str, pvc_name: str):
"""Create a persistent volume claim (pvc) if it does not exist
Parameters
----------
api_instance: kubernetes client
Kubernetes client
namespace: str
Kubernetes Namespace
pvc_name: str
Name used in the creation of the pvc
Returns
-------
Bool
Wether the PVC exists or not
"""
try:
pvcs = api_instance.list_namespaced_persistent_volume_claim(namespace=namespace)
except Exception as e:
logger.error("Could not list pvcs in namespace {n}. Erorr: {e}".format(n=namespace, e=e))
raise e
for pvc in pvcs.items:
if pvc.metadata.name == pvc_name:
return True
return False
[docs]
def _create_pvc(api_instance: client.CoreV1Api, namespace: str, pvc_name: str) -> None:
"""Create a persistent volume claim (pvc) if it does not exist
Parameters
----------
api_instance: kubernetes client
Kubernetes client
namespace: str
Kubernetes Namespace
pvc_name: str
Name used in the creation of the pvc
Returns
-------
None
None
"""
if _check_pvc_exists(api_instance, namespace, pvc_name):
logger.info(f"PVC {pvc_name} already exists in namespace {namespace}. Skipping creation.")
return
# create pvc
logger.info("Creating PVC {pvc} in namespace {namespace}.".format(pvc=pvc_name, namespace=namespace))
body = client.V1PersistentVolume(
api_version="v1",
kind="PersistentVolumeClaim",
metadata=client.V1ObjectMeta(name=pvc_name, namespace=namespace),
spec=client.V1PersistentVolumeClaimSpec(
access_modes=["ReadWriteMany"],
resources=client.V1ResourceRequirements(requests={"storage": "100Gi"}), #TODO: make this configurable
storage_class_name="rook-cephfs", #TODO: make this configurable
)
)
api_instance.create_namespaced_persistent_volume_claim(namespace=namespace, body=body)
logger.info("PVC {pvc} successfully created in namespace {namespace}".format(pvc=pvc_name, namespace=namespace))
[docs]
def _delete_pvc(api_instance: client.CoreV1Api, namespace: str, pvc_name: str) -> None:
"""Delete a persistent volume claim (pvc) if it exists
Parameters
----------
api_instance: kubernetes client
Kubernetes client
namespace: str
Kubernetes Namespace
pvc_name: str
Name used of the pvc to delete
Returns
-------
None
None
"""
# check if pvc already exists
if not _check_pvc_exists(api_instance, namespace, pvc_name):
logger.info(f"PVC {pvc_name} not found in namespace {namespace}. Skipping deletion.")
return
try:
_ = api_instance.delete_namespaced_persistent_volume_claim(
name=pvc_name,
namespace=namespace,
body=client.V1DeleteOptions(
propagation_policy='Foreground',
)
)
logger.info(f"PVC '{pvc_name}' deleted successfully.")
except Exception as e:
logger.error("Could not delete PVC {pvc_name}. Please delete manually! Error: {e}".format(pvc_name=pvc_name, e=e))
# taken from https://github.com/kubernetes-client/python/issues/476
# I don't know why there isn't a built-in function for this yet
[docs]
def _copy_files_to_pod(api_instance: client.CoreV1Api, namespace: str, pod_name: str, source_path: pathlib.Path, dest_path: pathlib.Path) -> None:
"""Copy files to the temporary pod
Parameters
----------
api_instance: kubernetes client
Kubernetes client
namespace: str
Namespace to use for creating pvc
pod_name: str
Name of the pod to copy files to
source_path: pathlib.Path
Path to the files to copy
dest_path: pathlib.Path
Mount path of the pvc in the pod
Returns
-------
None
None"""
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode='w:tar') as tar: # To compress set 'w:gz'
tar.add(source_path, arcname=dest_path.joinpath(source_path.name))
buf.seek(0)
exec_command = ['tar', 'xvf', '-', '-C', '/'] # To decompress set 'xzvf'
resp = stream.stream(api_instance.connect_get_namespaced_pod_exec, pod_name, namespace,
command=exec_command,
stderr=True, stdin=True,
stdout=True, tty=False,
_preload_content=False)
# copy data to pod in chunks to avoid issues with larger files
chunk_size = 10 * 1024 * 1024
while resp.is_open():
resp.update(timeout=1)
# if resp.peek_stdout():
# logger.debug(f"STDOUT: {resp.read_stdout()}")
if resp.peek_stderr():
logger.error(f"STDERR: {resp.read_stderr()}")
raise RuntimeError(f"Error copying files to pod: {resp.read_stderr()}")
if read := buf.read(chunk_size):
#logger.debug(f"Uploading chunk: {read}")
logger.debug(f"Uploading chunkof size: {len(read)}")
resp.write_stdin(read)
else:
break
resp.close()
logger.info("Files succesfully transfered to kube pvc!")
[docs]
def _update_file_dest_name(api_instance: client.CoreV1Api, namespace: str, pod_name: str, old_name: pathlib.Path, new_name: pathlib.Path) -> None:
"""Copy files to the temporary pod
Parameters
----------
api_instance: kubernetes client
Kubernetes client
namespace: str
Namespace to use for creating pvc
pod_name: str
Name of the pod to copy files to
old_name: str
Name of the directory where the files have been copied
new_name: str
Name of the new directory
Returns
-------
None
None"""
exec_command = ["/bin/sh", "-c", 'cp -R ./{o}/* ./{n} && rm -R ./{o}/*'.format(o=old_name, n=new_name)]
resp = stream.stream(api_instance.connect_get_namespaced_pod_exec, pod_name, namespace,
command=exec_command,
stderr=True, stdin=True,
stdout=True, tty=False,
_preload_content=False)
while resp.is_open():
resp.update(timeout=1)
if resp.peek_stdout():
print(f"STDOUT: \n{resp.read_stdout()}")
if resp.peek_stderr():
print(f"STDERR: \n{resp.read_stderr()}")
resp.close()
logger.info("Files directory successfully renamed to avoid conflicts!")
[docs]
def upload_files_to_pvc(namespace: str, prefix: str, pvc: str, files_path: list[(pathlib.Path, pathlib.Path)]) -> str:
"""Upload model and data files to the pvc
Parameters
----------
kube: kubernetes client
Kubernetes client
pvc: str
Name of pvc to use for data and model storage
namespace: str
Namespace to use for creating pvc
Returns
-------
None
None
"""
api_instance = client.CoreV1Api()
pvc_name = "{x}-{y}".format(x=prefix, y=pvc) if prefix else pvc
pod_name = "{x}-seaquest-tmp-uploader".format(x=prefix)
pod_name = pod.make_pod_name_unique(api_instance, namespace, pod_name)
try:
_create_pvc(api_instance, namespace, pvc_name)
logger.info("Creating pod to move data to pvc ...")
pod._launch_pod(api_instance, namespace, pod_name, pvc_name)
pod._wait_for_running_state(api_instance, namespace, pod_name, 120) #TODO: make this configurable
logger.info("Attempting to upload files to pvc ...")
for (file_path, dest_dir) in files_path:
dest_path = pathlib.PurePosixPath(pvc_name)
if dest_dir is not None:
dest_path = dest_path.joinpath(dest_dir)
_copy_files_to_pod(api_instance, namespace, pod_name, file_path, dest_path)
# change dir name to avoid conflicts in case of multiple jobs with the same model and different config (e.g. different data files)
# model_dir in _generate_runner_config should also be changed if this part changes
new_name = pathlib.PurePosixPath(pvc_name).joinpath("_".join([files_path[1][1], files_path[1][0].stem]))
_update_file_dest_name(api_instance, namespace, pod_name, dest_path, new_name)
except Exception as e: # TODO: handle stuff here
pod._delete_pod(api_instance, namespace, pod_name)
logger.error(e)
raise e
pod._delete_pod(api_instance, namespace, pod_name)
return pvc_name