Run Jobs Using Python
This guide is for batch users who have a basic understanding of interacting with Kubernetes from Python. For more information, see the Kueue overview.
Before you begin
Check Administer cluster quotas for details of the initial cluster setup. You'll also need the kubernetes Python library (and requests) installed. We recommend using a virtual environment:
python -m venv env
source env/bin/activate
pip install kubernetes requests
Note that the following versions were used to develop these examples (a quick way to check your own versions is shown after the list):
- Python: 3.9.12
- kubernetes: 26.1.0
- requests: 2.31.0
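To confirm the versions in your own environment, here is a quick, optional check in Python:
import platform

import kubernetes
import requests

# Print the Python, kubernetes, and requests versions used for these examples
print("Python:", platform.python_version())
print("kubernetes:", kubernetes.__version__)
print("requests:", requests.__version__)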
You can install Kueue by following the installation instructions, or use the installation example below.
Kueue in Python
At its core, Kueue is a controller for a custom resource, so to interact with it from Python we don't need a custom SDK; we can use the generic functions provided by the Kubernetes Python library. In this guide, we provide several examples of interacting with Kueue in this way. If you'd like to request a new example or need help with a specific use case, please open an issue.
Examples
The following examples demonstrate different use cases for using Kueue in Python.
Install Kueue
This example demonstrates installing Kueue into an existing cluster. You can save this script to your local machine as install-kueue-queues.py.
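As a minimal sketch of this approach (assuming Kueue and its v1beta1 API are already installed in your cluster), you can list ClusterQueues with the generic CustomObjectsApi:
from kubernetes import client, config

# Load credentials from your local kubeconfig
config.load_kube_config()
crd_api = client.CustomObjectsApi()

# ClusterQueues are cluster-scoped custom resources in the kueue.x-k8s.io group
listing = crd_api.list_cluster_custom_object(
    group="kueue.x-k8s.io", version="v1beta1", plural="clusterqueues"
)
for queue in listing["items"]:
    print(queue["metadata"]["name"])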
#!/usr/bin/env python3

from kubernetes import utils, config, client
import tempfile
import requests
import argparse

# install-kueue-queues.py will:
# 1. install Kueue from the latest release or a specific version on GitHub
# This example demonstrates installing Kueue by downloading and applying its release manifest (YAML)

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Install Kueue Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--version",
        help="Version of Kueue to install (if undefined, install the latest release)",
        default=None,
    )
    return parser


def main():
    """
    Install Kueue.

    This will error if Kueue is already installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()
    install_kueue(args.version)


def get_install_url(version):
    """
    Get the install manifest URL.

    If a version is specified, use it. Otherwise install
    the manifest from the latest release.
    """
    if version is not None:
        return f"https://github.com/kubernetes-sigs/kueue/releases/download/v{version}/manifests.yaml"
    return "https://github.com/kubernetes-sigs/kueue/releases/latest/download/manifests.yaml"


def install_kueue(version):
    """
    Install Kueue of a particular version.
    """
    print("⭐️ Installing Kueue...")
    url = get_install_url(version)
    with tempfile.NamedTemporaryFile(delete=True) as install_yaml:
        res = requests.get(url)
        assert res.status_code == 200
        install_yaml.write(res.content)
        # Flush so the full manifest is on disk before it is read back
        install_yaml.flush()
        utils.create_from_yaml(api_client, install_yaml.name)


if __name__ == "__main__":
    main()
And then run it as follows:
python install-kueue-queues.py
⭐️ Installing Kueue...
You can also target a specific version:
python install-kueue-queues.py --version 0.8.0
Sample Job
For the next example, we assume a cluster with Kueue installed and the queues created (for example, by applying the single-clusterqueue-setup.yaml referenced later in this guide). Save the following script as sample-job.py:
#!/usr/bin/env python3

import argparse
from kubernetes import config, client

# sample-job.py
# This example will demonstrate full steps to submit a Job.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job",
        default="sample-job-",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="gcr.io/k8s-staging-perf-tests/sleep:v0.1.0",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["30s"],
    )
    return parser


def generate_job_crd(job_name, image, args):
    """
    Generate an equivalent job CRD to sample-job.yaml
    """
    metadata = client.V1ObjectMeta(
        generate_name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # Job container
    container = client.V1Container(
        image=image,
        name="dummy-job",
        args=args,
        resources={
            "requests": {
                "cpu": 1,
                "memory": "200Mi",
            }
        },
    )

    # Job template
    template = {"spec": {"containers": [container], "restartPolicy": "Never"}}
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=metadata,
        spec=client.V1JobSpec(
            parallelism=1, completions=3, suspend=True, template=template
        ),
    )


def main():
    """
    Run a job.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.args)
    batch_api = client.BatchV1Api()
    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    batch_api.create_namespaced_job("default", crd)
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()
And run it as follows:
python sample-job.py
📦️ Container image selected is gcr.io/k8s-staging-perf-tests/sleep:v0.1.0...
⭐️ Creating sample job with prefix sample-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
Or try changing the name (generateName) of the job:
python sample-job.py --job-name sleep-job-
📦️ Container image selected is gcr.io/k8s-staging-perf-tests/sleep:v0.1.0...
⭐️ Creating sample job with prefix sleep-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
You can also change the container image with --image, and the container args with --args. For more customization, you can edit the example script.
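As an illustration of the kind of edit you might make, here is a hypothetical variant of the container spec from generate_job_crd that requests more resources (the values are only examples):
from kubernetes import client

# Hypothetical tweak: request more CPU and memory than the defaults above
container = client.V1Container(
    image="gcr.io/k8s-staging-perf-tests/sleep:v0.1.0",
    name="dummy-job",
    args=["60s"],
    resources={
        "requests": {
            "cpu": 2,
            "memory": "512Mi",
        }
    },
)
print(container)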
Interact with Queues and Jobs
If you are developing an application that submits jobs and needs to interact with and check on them, you'll likely want to interact with the queues or jobs directly. After running the example above, you can test the following example to interact with the results. Write the following to a script named sample-queue-control.py.
#!/usr/bin/env python3

import argparse
from kubernetes import config, client

# sample-queue-control.py
# This will show how to interact with queues

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Interact with Queues",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--namespace",
        help="namespace to list for",
        default="default",
    )
    return parser


def main():
    """
    Get a listing of jobs in the queue
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    listing = crd_api.list_namespaced_custom_object(
        group="kueue.x-k8s.io",
        version="v1beta1",
        namespace=args.namespace,
        plural="localqueues",
    )
    list_queues(listing)

    listing = crd_api.list_namespaced_custom_object(
        group="batch",
        version="v1",
        namespace=args.namespace,
        plural="jobs",
    )
    list_jobs(listing)


def list_jobs(listing):
    """
    Iterate and show job metadata.
    """
    if not listing:
        print("💼️ There are no jobs.")
        return

    print("\n💼️ Jobs")
    for job in listing["items"]:
        jobname = job["metadata"]["name"]
        status = (
            "TBA" if "succeeded" not in job["status"] else job["status"]["succeeded"]
        )
        ready = job["status"]["ready"]
        print(f"Found job {jobname}")
        print(f" Succeeded: {status}")
        print(f" Ready: {ready}")


def list_queues(listing):
    """
    Helper function to iterate over and list queues.
    """
    if not listing:
        print("⛑️ There are no queues.")
        return

    print("\n⛑️ Local Queues")

    # This is listing queues
    for q in listing["items"]:
        print(f'Found queue {q["metadata"]["name"]}')
        print(f" Admitted workloads: {q['status']['admittedWorkloads']}")
        print(f" Pending workloads: {q['status']['pendingWorkloads']}")

        # And flavors with resources
        for f in q["status"]["flavorUsage"]:
            print(f' Flavor {f["name"]} has resources {f["resources"]}')


if __name__ == "__main__":
    main()
To make the output more interesting, we can first run a few random jobs:
python sample-job.py
python sample-job.py
python sample-job.py --job-name tacos
And then run the script to see the queues and the sample jobs you submitted above:
python sample-queue-control.py
⛑️ Local Queues
Found queue user-queue
Admitted workloads: 3
Pending workloads: 0
Flavor default-flavor has resources [{'name': 'cpu', 'total': '3'}, {'name': 'memory', 'total': '600Mi'}]
💼️ Jobs
Found job sample-job-8n5sb
Succeeded: 3
Ready: 0
Found job sample-job-gnxtl
Succeeded: 1
Ready: 0
Found job tacos46bqw
Succeeded: 1
Ready: 1
If you wanted to filter jobs to a specific queue, you can do this via the job label under `job["metadata"]["labels"]["kueue.x-k8s.io/queue-name"]` (a sketch of this follows the next snippet). And to get a specific job by name, you can do:
from kubernetes import client, config
# Interact with batch
config.load_kube_config()
batch_api = client.BatchV1Api()
# This is providing the name, and namespace
job = batch_api.read_namespaced_job("tacos46bqw", "default")
print(job)
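And here is a minimal sketch of filtering jobs by that queue label (assuming the default namespace and the user-queue name used throughout these examples):
from kubernetes import client, config

config.load_kube_config()
batch_api = client.BatchV1Api()

# Keep only jobs labeled for the "user-queue" LocalQueue
for job in batch_api.list_namespaced_job("default").items:
    labels = job.metadata.labels or {}
    if labels.get("kueue.x-k8s.io/queue-name") == "user-queue":
        print(job.metadata.name)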
See the BatchV1 API documentation for more calls.
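For example, here is a sketch of waiting for a job to complete using the watch helper (the job name is a placeholder; use one from your own cluster):
from kubernetes import client, config, watch

config.load_kube_config()
batch_api = client.BatchV1Api()

# Stream job events in the default namespace and stop once our job succeeds.
# "sample-job-8n5sb" is a placeholder; use a name from "kubectl get jobs".
w = watch.Watch()
for event in w.stream(batch_api.list_namespaced_job, namespace="default", timeout_seconds=120):
    job = event["object"]
    if job.metadata.name == "sample-job-8n5sb" and (job.status.succeeded or 0) > 0:
        print(f"Job {job.metadata.name} succeeded")
        w.stop()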
Flux Operator Job
For this example, we will use the Flux Operator to submit a job, specifically using its Python SDK to make this easy. Given the Python environment we created in the setup, we can install the SDK directly into it as follows:
pip install fluxoperator
We will also need to install the Flux operator:
kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml
Write the following script to sample-flux-operator-job.py:
#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import fluxoperator.models as models

# sample-flux-operator-job.py
# This example will demonstrate full steps to submit a Job via the Flux Operator.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Flux Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job (job prefix does not work here)",
        default="hello-world",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="ghcr.io/flux-framework/flux-restful-api",
    )
    parser.add_argument(
        "--tasks",
        help="Number of tasks",
        default=1,
        type=int,
    )
    parser.add_argument(
        "--quiet",
        help="Do not show extra flux output (only hello worlds!)",
        action="store_true",
        default=False,
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="echo",
    )
    parser.add_argument(
        "--args", nargs="+", help="args for container", default=["hello", "world"]
    )
    return parser


def generate_minicluster_crd(job_name, image, command, args, quiet=False, tasks=1):
    """
    Generate a minicluster CRD
    """
    container = models.MiniClusterContainer(
        image=image,
        command=command + " " + " ".join(args),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "2Gi",
            }
        },
    )

    # 4 pods and 4 tasks will echo hello-world x 4
    spec = models.MiniClusterSpec(
        job_labels={"kueue.x-k8s.io/queue-name": "user-queue"},
        containers=[container],
        size=4,
        tasks=tasks,
        logging={"quiet": quiet},
    )

    return models.MiniCluster(
        kind="MiniCluster",
        api_version="flux-framework.org/v1alpha1",
        metadata=client.V1ObjectMeta(
            generate_name=job_name,
            namespace="default",
        ),
        spec=spec,
    )


def main():
    """
    Run an example job using the Flux Operator.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    minicluster = generate_minicluster_crd(
        args.job_name, args.image, args.command, args.args, args.quiet, args.tasks
    )
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="flux-framework.org",
        version="v1alpha1",
        namespace="default",
        plural="miniclusters",
        body=minicluster,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get pods" to see pods'
    )


if __name__ == "__main__":
    main()
Now try running the example:
python sample-flux-operator-job.py
📦️ Container image selected is ghcr.io/flux-framework/flux-restful-api...
⭐️ Creating sample job with prefix hello-world...
Use:
"kubectl get queue" to see queue assignment
"kubectl get pods" to see pods
You'll be able to almost immediately see the MiniCluster job submitted to the local queue:
kubectl get queue
NAME         CLUSTERQUEUE    PENDING WORKLOADS   ADMITTED WORKLOADS
user-queue   cluster-queue   0                   1
And 4 pods running (we are creating a networked cluster with 4 nodes):
kubectl get pods
NAME                       READY   STATUS    RESTARTS   AGE
hello-world7qgqd-0-wp596   1/1     Running   0          7s
hello-world7qgqd-1-d7r87   1/1     Running   0          7s
hello-world7qgqd-2-rfn4t   1/1     Running   0          7s
hello-world7qgqd-3-blvtn   1/1     Running   0          7s
If you look at the logs of the lead broker pod (index 0 of the job above), there is a lot of output for debugging, and you can see "hello world" run at the end:
kubectl logs hello-world7qgqd-0-wp596
Flux Operator Lead Broker Output
🌀 Submit Mode: flux start -o --config /etc/flux/config -Scron.directory=/etc/flux/system/cron.d -Stbon.fanout=256 -Srundir=/run/flux -Sstatedir=/var/lib/flux -Slocal-uri=local:///run/flux/local -Slog-stderr-level=6 -Slog-stderr-mode=local flux submit -n 1 --quiet --watch echo hello world
broker.info[0]: start: none->join 0.399725ms
broker.info[0]: parent-none: join->init 0.030894ms
cron.info[0]: synchronizing cron tasks to event heartbeat.pulse
job-manager.info[0]: restart: 0 jobs
job-manager.info[0]: restart: 0 running jobs
job-manager.info[0]: restart: checkpoint.job-manager not found
broker.info[0]: rc1.0: running /etc/flux/rc1.d/01-sched-fluxion
sched-fluxion-resource.info[0]: version 0.27.0-15-gc90fbcc2
sched-fluxion-resource.warning[0]: create_reader: allowlist unsupported
sched-fluxion-resource.info[0]: populate_resource_db: loaded resources from core's resource.acquire
sched-fluxion-qmanager.info[0]: version 0.27.0-15-gc90fbcc2
broker.info[0]: rc1.0: running /etc/flux/rc1.d/02-cron
broker.info[0]: rc1.0: /etc/flux/rc1 Exited (rc=0) 0.5s
broker.info[0]: rc1-success: init->quorum 0.485239s
broker.info[0]: online: hello-world7qgqd-0 (ranks 0)
broker.info[0]: online: hello-world7qgqd-[0-3] (ranks 0-3)
broker.info[0]: quorum-full: quorum->run 0.354587s
hello world
broker.info[0]: rc2.0: flux submit -n 1 --quiet --watch echo hello world Exited (rc=0) 0.3s
broker.info[0]: rc2-success: run->cleanup 0.308392s
broker.info[0]: cleanup.0: flux queue stop --quiet --all --nocheckpoint Exited (rc=0) 0.1s
broker.info[0]: cleanup.1: flux cancel --user=all --quiet --states RUN Exited (rc=0) 0.1s
broker.info[0]: cleanup.2: flux queue idle --quiet Exited (rc=0) 0.1s
broker.info[0]: cleanup-success: cleanup->shutdown 0.252899s
broker.info[0]: children-complete: shutdown->finalize 47.6699ms
broker.info[0]: rc3.0: running /etc/flux/rc3.d/01-sched-fluxion
broker.info[0]: rc3.0: /etc/flux/rc3 Exited (rc=0) 0.2s
broker.info[0]: rc3-success: finalize->goodbye 0.212425s
broker.info[0]: goodbye: goodbye->exit 0.06917ms
If you submit and ask for four tasks, you will see "hello world" four times:
python sample-flux-operator-job.py --tasks 4
...
broker.info[0]: quorum-full: quorum->run 23.5812s
hello world
hello world
hello world
hello world
You can further customize the job, and you can ask questions on the Flux Operator issues board. Finally, for instructions on how to do this with YAML outside of Python, see Run a Flux MiniCluster.
MPI Operator Job
For this example, we will use the MPI Operator to submit a job, specifically using its Python SDK to make this easy. Given the Python environment we created in the setup, we can install the SDK directly into it as follows:
git clone --depth 1 https://github.com/kubeflow/mpi-operator /tmp/mpijob
cd /tmp/mpijob/sdk/python/v2beta1
python setup.py install
cd -
Importantly, for this to work, the MPI Operator must be installed before Kueue. Let's start with a fresh Kind cluster. We will also need to install the MPI operator and Kueue. Here we install the exact versions tested with this example:
kubectl apply -f https://github.com/kubeflow/mpi-operator/releases/download/v0.4.0/mpi-operator.yaml
kubectl apply -f https://github.com/kubernetes-sigs/kueue/releases/download/v0.4.0/manifests.yaml
See the mpi-operator release page and the Kueue release page for alternative versions. You'll need to wait until Kueue is ready. You can determine this as follows (a Python alternative is sketched after the kubectl command):
# Wait until you see all pods in the kueue-system are Running
kubectl get pods -n kueue-system
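If you'd rather wait from Python than poll kubectl by hand, here is a minimal sketch (assuming the kueue-system namespace):
import time

from kubernetes import client, config

config.load_kube_config()
core_api = client.CoreV1Api()

# Poll until every pod in kueue-system reports the Running phase
while True:
    pods = core_api.list_namespaced_pod("kueue-system").items
    if pods and all(pod.status.phase == "Running" for pod in pods):
        print("Kueue is ready")
        break
    time.sleep(5)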
When Kueue is ready:
kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/kueue/main/site/static/examples/admin/single-clusterqueue-setup.yaml
Now try running the example MPI job:
python sample-mpijob.py
📦️ Container image selected is mpioperator/mpi-pi:openmpi...
⭐️ Creating sample job with prefix pi...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs
#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import mpijob.models as models

# sample-mpijob.py
# This example will demonstrate full steps to submit a Job via the MPI Operator

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue MPI Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job (job prefix does not work here)",
        default="pi",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="mpioperator/mpi-pi:openmpi",
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="mpirun",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["-n", "2", "/home/mpiuser/pi"],
    )
    return parser


def generate_job_crd(job_name, image, command, args):
    """
    Generate an MPIJob CRD (equivalent to the example MPIJob YAML)
    """
    metadata = client.V1ObjectMeta(
        name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # containers for launcher and worker
    launcher_container = client.V1Container(
        image=image,
        name="mpi-launcher",
        command=[command],
        args=args,
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    worker_container = client.V1Container(
        image=image,
        name="mpi-worker",
        command=["/usr/sbin/sshd"],
        args=["-De", "-f", "/home/mpiuser/.sshd_config"],
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    # Create the Launcher and worker replica specs
    launcher = models.V2beta1ReplicaSpec(
        replicas=1,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[launcher_container])
        ),
    )

    worker = models.V2beta1ReplicaSpec(
        replicas=2,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[worker_container])
        ),
    )

    # runPolicy for jobspec
    policy = models.V2beta1RunPolicy(
        clean_pod_policy="Running", ttl_seconds_after_finished=60
    )

    # Create the jobspec
    jobspec = models.V2beta1MPIJobSpec(
        slots_per_worker=1,
        run_policy=policy,
        ssh_auth_mount_path="/home/mpiuser/.ssh",
        mpi_replica_specs={"Launcher": launcher, "Worker": worker},
    )
    return models.V2beta1MPIJob(
        metadata=metadata,
        api_version="kubeflow.org/v2beta1",
        kind="MPIJob",
        spec=jobspec,
    )


def main():
    """
    Run an MPI job. This requires the MPI Operator to be installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.command, args.args)
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="kubeflow.org",
        version="v2beta1",
        namespace="default",
        plural="mpijobs",
        body=crd,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()
After submitting, you can see that the queue has an admitted workload!
$ kubectl get queue
NAME         CLUSTERQUEUE    PENDING WORKLOADS   ADMITTED WORKLOADS
user-queue   cluster-queue   0                   1
And that the job "pi-launcher" has started:
$ kubectl get jobs
NAME          COMPLETIONS   DURATION   AGE
pi-launcher   0/1           9s         9s
The MPI Operator works by way of a central launcher that interacts with the worker nodes over ssh. We can inspect a worker and the launcher to get a sense of how both work:
$ kubectl logs pods/pi-worker-1
Server listening on 0.0.0.0 port 22.
Server listening on :: port 22.
Accepted publickey for mpiuser from 10.244.0.8 port 51694 ssh2: ECDSA SHA256:rgZdwufXolOkUPA1w0bf780BNJC8e4/FivJb1/F7OOI
Received disconnect from 10.244.0.8 port 51694:11: disconnected by user
Disconnected from user mpiuser 10.244.0.8 port 51694
Received signal 15; terminating.
The job is fairly quick, and we can see the output of pi in the launcher logs:
$ kubectl logs pods/pi-launcher-f4gqv
Warning: Permanently added 'pi-worker-0.pi-worker.default.svc,10.244.0.7' (ECDSA) to the list of known hosts.
Warning: Permanently added 'pi-worker-1.pi-worker.default.svc,10.244.0.9' (ECDSA) to the list of known hosts.
Rank 1 on host pi-worker-1
Workers: 2
Rank 0 on host pi-worker-0
pi is approximately 3.1410376000000002
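If you'd like to read these logs from Python rather than kubectl, here is a minimal sketch (the pod name is a placeholder; use one from kubectl get pods):
from kubernetes import client, config

config.load_kube_config()
core_api = client.CoreV1Api()

# Read the launcher pod log; replace the name with one from your own cluster
print(core_api.read_namespaced_pod_log(name="pi-launcher-f4gqv", namespace="default"))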
That looks like pi! 🎉️🥧️ If you are interested in running this example with YAML outside of Python, see Run an MPIJob.