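Run Jobs Using Python

Run Kueue jobs programmatically with Python

This guide is for batch users who have a basic understanding of interacting with Kubernetes from Python. For more information, see the Kueue overview.

Before you begin

See Administer cluster quotas for details on the initial cluster setup. You will also need the Kubernetes Python client installed. We recommend using a virtual environment.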

python -m venv env
source env/bin/activate
pip install kubernetes requests

Note that the following versions were used to develop these examples:

  • Python: 3.9.12
  • kubernetes: 26.1.0
  • requests: 2.31.0
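
To compare your environment against the versions listed above, a minimal sketch using only the standard library is shown here. The helper name `installed_version` is hypothetical and not part of Kueue or the Kubernetes client.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version of a package, or 'not installed'."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return "not installed"


if __name__ == "__main__":
    # The two packages installed with pip in the setup step
    for name in ("kubernetes", "requests"):
        print(f"{name}: {installed_version(name)}")
```

Newer versions than the ones listed will probably work, but the examples were only developed against those versions.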

You can install Kueue by following the installation instructions, or use the install example below.

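Kueue in Python

At its core, Kueue is a controller for a set of custom resources, so to interact with it from Python we do not need a custom SDK; we can use the generic functions provided by the Kubernetes Python library. This guide provides several examples of interacting with Kueue in this way. If you would like to request a new example or need help with a specific use case, please open an issue.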
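
As a small illustration of the generic approach, the sketch below writes a LocalQueue as a plain dict and submits it through `CustomObjectsApi`. The queue names `user-queue` and `cluster-queue` are assumptions borrowed from the sample output later in this guide, and the two helper functions are hypothetical names, not part of any library.

```python
def local_queue_manifest(name, cluster_queue, namespace="default"):
    """Build a LocalQueue object (kueue.x-k8s.io/v1beta1) as a plain dict."""
    return {
        "apiVersion": "kueue.x-k8s.io/v1beta1",
        "kind": "LocalQueue",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {"clusterQueue": cluster_queue},
    }


def create_local_queue(manifest):
    """Submit the manifest; needs a reachable cluster with Kueue installed."""
    # Imported here so the manifest helper stays usable without the client
    from kubernetes import client, config

    config.load_kube_config()
    return client.CustomObjectsApi().create_namespaced_custom_object(
        group="kueue.x-k8s.io",
        version="v1beta1",
        namespace=manifest["metadata"]["namespace"],
        plural="localqueues",
        body=manifest,
    )


if __name__ == "__main__":
    # Only prints the manifest; call create_local_queue() to apply it
    print(local_queue_manifest("user-queue", "cluster-queue"))
```

The same group, version, and plural are what the listing example in this guide uses to read queues back.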

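Examples

The following examples demonstrate different use cases for Kueue in Python.

Install Kueue

This example demonstrates installing Kueue to an existing cluster. You can save this script to your local machine as install-kueue-queues.py.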

#!/usr/bin/env python3

from kubernetes import utils, config, client
import tempfile
import requests
import argparse

# install-kueue-queues.py will:
# 1. install Kueue from the latest or a specific version on GitHub
# This example demonstrates downloading the Kueue manifest and applying it to the cluster

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Install Kueue Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--version",
        help="Version of Kueue to install, e.g. 0.8.0 (if undefined, will install from main branch)",
        default=None,
    )
    return parser


def main():
    """
    Install Kueue and the Queue components.

    This will error if they are already installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()
    install_kueue(args.version)


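def get_install_url(version):
    """
    Get the install URL.

    If a version is specified, use its release manifest. Otherwise fall
    back to the main branch.
    """
    if version is not None:
        # Accept both "0.8.0" and "v0.8.0"; the release tag carries a single "v"
        version = version.lstrip("v")
        return f"https://github.com/kubernetes-sigs/kueue/releases/download/v{version}/manifests.yaml"
    # NOTE: this is a kustomize-style reference rather than a rendered
    # manifest, so downloading it directly may fail; prefer --version.
    return "https://github.com/kubernetes-sigs/kueue/config/default?ref=main"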


def install_kueue(version):
    """
    Install a particular version of Kueue.
    """
    print("⭐️ Installing Kueue...")
    url = get_install_url(version)
    with tempfile.NamedTemporaryFile(delete=True) as install_yaml:
        res = requests.get(url)
        # Fail with a descriptive HTTP error instead of a bare assert
        res.raise_for_status()
        install_yaml.write(res.content)
        # Flush so the manifest is fully on disk before it is read back by name
        install_yaml.flush()
        utils.create_from_yaml(api_client, install_yaml.name)


if __name__ == "__main__":
    main()

Then run it as follows:

python install-kueue-queues.py
⭐️ Installing Kueue...

You can also target a specific version:

python install-kueue-queues.py --version 0.8.0

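Sample Job

For the next example, we start with a cluster that has Kueue installed. The script below labels its Job for a local queue named user-queue, so create your queues first (see Administer cluster quotas). Save the script as sample-job.py.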

#!/usr/bin/env python3

import argparse
from kubernetes import config, client

# sample-job.py
# This example will demonstrate full steps to submit a Job.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job",
        default="sample-job-",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="gcr.io/k8s-staging-perf-tests/sleep:v0.1.0",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["30s"],
    )
    return parser


def generate_job_crd(job_name, image, args):
    """
    Generate an equivalent job CRD to sample-job.yaml
    """
    metadata = client.V1ObjectMeta(
        generate_name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # Job container
    container = client.V1Container(
        image=image,
        name="dummy-job",
        args=args,
        resources={
            "requests": {
                "cpu": 1,
                "memory": "200Mi",
            }
        },
    )

    # Job template
    template = {"spec": {"containers": [container], "restartPolicy": "Never"}}
    return client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=metadata,
        spec=client.V1JobSpec(
            parallelism=1, completions=3, suspend=True, template=template
        ),
    )


def main():
    """
    Run a job.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.args)
    batch_api = client.BatchV1Api()
    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    batch_api.create_namespaced_job("default", crd)
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()

And run it as follows:

python sample-job.py
📦️ Container image selected is gcr.io/k8s-staging-perf-tests/sleep:v0.1.0...
⭐️ Creating sample job with prefix sample-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs

Or try changing the name (generateName) of the job:

python sample-job.py --job-name sleep-job-
📦️ Container image selected is gcr.io/k8s-staging-perf-tests/sleep:v0.1.0...
⭐️ Creating sample job with prefix sleep-job-...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs

You can also change the container image with --image and the arguments with --args. For further customization, you can edit the example script.
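
If you edit the script, it can help to see the same Job written as a plain dict, which mirrors the YAML form more closely than the typed models. The sketch below is an equivalent manifest under the defaults of sample-job.py; `job_manifest` is a hypothetical helper name. The Python client generally accepts such a dict as the body of `create_namespaced_job`.

```python
def job_manifest(
    prefix="sample-job-",
    queue="user-queue",
    image="gcr.io/k8s-staging-perf-tests/sleep:v0.1.0",
    args=("30s",),
):
    """Build a suspended batch/v1 Job labeled for a Kueue local queue."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "generateName": prefix,
            # This label is how the Job is assigned to a local queue
            "labels": {"kueue.x-k8s.io/queue-name": queue},
        },
        "spec": {
            "parallelism": 1,
            "completions": 3,
            # Created suspended, as in sample-job.py
            "suspend": True,
            "template": {
                "spec": {
                    "containers": [
                        {
                            "name": "dummy-job",
                            "image": image,
                            "args": list(args),
                            "resources": {
                                "requests": {"cpu": "1", "memory": "200Mi"}
                            },
                        }
                    ],
                    "restartPolicy": "Never",
                }
            },
        },
    }


if __name__ == "__main__":
    import json

    print(json.dumps(job_manifest(prefix="sleep-job-"), indent=2))
```

Changing the `queue` argument is the only step needed to target a different local queue.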

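Interact with Queues and Jobs

If you are developing an application that submits jobs and needs to interact with and inspect them, you may want to interact with queues or jobs directly. After running the example above, you can try the following example to interact with the results. Write the following to a script called sample-queue-control.py.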

#!/usr/bin/env python3

import argparse
from kubernetes import config, client

# sample-queue-control.py
# This will show how to interact with queues

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Interact with Queues",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--namespace",
        help="namespace to list for",
        default="default",
    )
    return parser


def main():
    """
    Get a listing of jobs in the queue
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    listing = crd_api.list_namespaced_custom_object(
        group="kueue.x-k8s.io",
        version="v1beta1",
        namespace=args.namespace,
        plural="localqueues",
    )
    list_queues(listing)

    listing = crd_api.list_namespaced_custom_object(
        group="batch",
        version="v1",
        namespace=args.namespace,
        plural="jobs",
    )
    list_jobs(listing)


def list_jobs(listing):
    """
    Iterate and show job metadata.
    """
    # The API returns a dict even when empty, so check the items themselves
    if not listing.get("items"):
        print("💼️ There are no jobs.")
        return

    print("\n💼️ Jobs")
    for job in listing["items"]:
        jobname = job["metadata"]["name"]
        # Status fields are omitted until the job controller sets them
        status = job.get("status", {})
        succeeded = status.get("succeeded", "TBA")
        ready = status.get("ready", "TBA")
        print(f"Found job {jobname}")
        print(f"  Succeeded: {succeeded}")
        print(f"  Ready: {ready}")


def list_queues(listing):
    """
    Helper function to iterate over and list queues.
    """
    # The API returns a dict even when empty, so check the items themselves
    if not listing.get("items"):
        print("⛑️  There are no queues.")
        return

    print("\n⛑️  Local Queues")

    # This is listing queues
    for q in listing["items"]:
        # Status may be incomplete on a freshly created queue
        status = q.get("status", {})
        print(f'Found queue {q["metadata"]["name"]}')
        print(f"  Admitted workloads: {status.get('admittedWorkloads')}")
        print(f"  Pending workloads: {status.get('pendingWorkloads')}")

        # And flavors with resources
        for f in status.get("flavorUsage", []):
            print(f'  Flavor {f["name"]} has resources {f["resources"]}')


if __name__ == "__main__":
    main()

To make the output more interesting, we can run a few random jobs first:

python sample-job.py
python sample-job.py
python sample-job.py --job-name tacos

Then run the script to see the queue and the sample jobs you submitted earlier.

python sample-queue-control.py
⛑️  Local Queues
Found queue user-queue
  Admitted workloads: 3
  Pending workloads: 0
  Flavor default-flavor has resources [{'name': 'cpu', 'total': '3'}, {'name': 'memory', 'total': '600Mi'}]

💼️ Jobs
Found job sample-job-8n5sb
  Succeeded: 3
  Ready: 0
Found job sample-job-gnxtl
  Succeeded: 1
  Ready: 0
Found job tacos46bqw
  Succeeded: 1
  Ready: 1

If you want to filter jobs to a specific queue, you can do so using the job label under `job["metadata"]["labels"]["kueue.x-k8s.io/queue-name"]`. To list a specific job by name, you can do:

from kubernetes import client, config

# Interact with batch
config.load_kube_config()
batch_api = client.BatchV1Api()

# This is providing the name, and namespace
job = batch_api.read_namespaced_job("tacos46bqw", "default")
print(job)

See the BatchV1 API documentation for more calls.
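
The queue-label filter mentioned above can be sketched as follows. It works on the dict returned by `list_namespaced_custom_object`, as used in sample-queue-control.py. The helper names `jobs_in_queue` and `queue_selector` are hypothetical; the selector string is meant for the `label_selector` keyword that the client's list calls accept, which lets the API server do the filtering.

```python
QUEUE_LABEL = "kueue.x-k8s.io/queue-name"


def jobs_in_queue(listing, queue_name):
    """Return names of jobs in a listing dict whose queue label matches."""
    names = []
    for job in listing.get("items", []):
        # Jobs without labels carry no "labels" key (or a null value)
        labels = job.get("metadata", {}).get("labels") or {}
        if labels.get(QUEUE_LABEL) == queue_name:
            names.append(job["metadata"]["name"])
    return names


def queue_selector(queue_name):
    """Build a label selector string for server-side filtering."""
    return f"{QUEUE_LABEL}={queue_name}"


if __name__ == "__main__":
    # Hand-written sample data shaped like an API listing
    sample = {
        "items": [
            {"metadata": {"name": "a", "labels": {QUEUE_LABEL: "user-queue"}}},
            {"metadata": {"name": "b", "labels": {QUEUE_LABEL: "other-queue"}}},
            {"metadata": {"name": "c"}},
        ]
    }
    print(jobs_in_queue(sample, "user-queue"))
    print(queue_selector("user-queue"))
```

Client-side filtering is convenient when you already hold the listing; the selector avoids transferring jobs you do not need.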

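Flux Operator Job

For this example, we will use the Flux Operator to submit a job, and specifically use its Python SDK to do so easily. Given the Python environment we created in the setup section above, we can install this Python SDK directly into it as follows: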

pip install fluxoperator

We also need to install the Flux Operator:

kubectl apply -f https://raw.githubusercontent.com/flux-framework/flux-operator/main/examples/dist/flux-operator.yaml

Write the following script to sample-flux-operator-job.py:

#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import fluxoperator.models as models

# sample-flux-operator-job.py
# This example will demonstrate full steps to submit a Job via the Flux Operator.

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue Flux Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="generateName field to set for job (job prefix does not work here)",
        default="hello-world",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="ghcr.io/flux-framework/flux-restful-api",
    )
    parser.add_argument(
        "--tasks",
        help="Number of tasks",
        default=1,
        type=int,
    )
    parser.add_argument(
        "--quiet",
        help="Do not show extra flux output (only hello worlds!)",
        action="store_true",
        default=False,
    )

    parser.add_argument(
        "--command",
        help="command to run",
        default="echo",
    )
    parser.add_argument(
        "--args", nargs="+", help="args for container", default=["hello", "world"]
    )
    return parser


def generate_minicluster_crd(job_name, image, command, args, quiet=False, tasks=1):
    """
    Generate a minicluster CRD
    """
    container = models.MiniClusterContainer(
        # Pass the image through so that --image takes effect
        image=image,
        command=command + " " + " ".join(args),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "2Gi",
            }
        },
    )

    # 4 pods and 4 tasks will echo hello-world x 4
    spec = models.MiniClusterSpec(
        job_labels={"kueue.x-k8s.io/queue-name": "user-queue"},
        containers=[container],
        size=4,
        tasks=tasks,
        logging={"quiet": quiet},
    )

    return models.MiniCluster(
        kind="MiniCluster",
        api_version="flux-framework.org/v1alpha1",
        metadata=client.V1ObjectMeta(
            generate_name=job_name,
            namespace="default",
        ),
        spec=spec,
    )


def main():
    """
    Run an example job using the Flux Operator.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    minicluster = generate_minicluster_crd(
        args.job_name, args.image, args.command, args.args, args.quiet, args.tasks
    )
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="flux-framework.org",
        version="v1alpha1",
        namespace="default",
        plural="miniclusters",
        body=minicluster,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get pods" to see pods'
    )


if __name__ == "__main__":
    main()

Now try running the example:

python sample-flux-operator-job.py
📦️ Container image selected is ghcr.io/flux-framework/flux-restful-api...
⭐️ Creating sample job with prefix hello-world...
Use:
"kubectl get queue" to see queue assignment
"kubectl get pods" to see pods

Almost immediately, you can see that the MiniCluster job has been admitted to the local queue:

kubectl get queue
NAME         CLUSTERQUEUE    PENDING WORKLOADS   ADMITTED WORKLOADS
user-queue   cluster-queue   0                   1

And four pods are running (we are creating a networked cluster with 4 nodes):

kubectl get pods
NAME                       READY   STATUS      RESTARTS   AGE
hello-world7qgqd-0-wp596   1/1     Running     0          7s
hello-world7qgqd-1-d7r87   1/1     Running     0          7s
hello-world7qgqd-2-rfn4t   1/1     Running     0          7s
hello-world7qgqd-3-blvtn   1/1     Running     0          7s

If you look at the logs of the lead broker pod (index 0 of the job above), there is a lot of debugging output, and you can see "hello world" running at the end:

kubectl logs hello-world7qgqd-0-wp596 
Flux Operator lead broker output
🌀 Submit Mode: flux start -o --config /etc/flux/config -Scron.directory=/etc/flux/system/cron.d   -Stbon.fanout=256   -Srundir=/run/flux    -Sstatedir=/var/lib/flux   -Slocal-uri=local:///run/flux/local     -Slog-stderr-level=6    -Slog-stderr-mode=local  flux submit  -n 1 --quiet  --watch echo hello world
broker.info[0]: start: none->join 0.399725ms
broker.info[0]: parent-none: join->init 0.030894ms
cron.info[0]: synchronizing cron tasks to event heartbeat.pulse
job-manager.info[0]: restart: 0 jobs
job-manager.info[0]: restart: 0 running jobs
job-manager.info[0]: restart: checkpoint.job-manager not found
broker.info[0]: rc1.0: running /etc/flux/rc1.d/01-sched-fluxion
sched-fluxion-resource.info[0]: version 0.27.0-15-gc90fbcc2
sched-fluxion-resource.warning[0]: create_reader: allowlist unsupported
sched-fluxion-resource.info[0]: populate_resource_db: loaded resources from core's resource.acquire
sched-fluxion-qmanager.info[0]: version 0.27.0-15-gc90fbcc2
broker.info[0]: rc1.0: running /etc/flux/rc1.d/02-cron
broker.info[0]: rc1.0: /etc/flux/rc1 Exited (rc=0) 0.5s
broker.info[0]: rc1-success: init->quorum 0.485239s
broker.info[0]: online: hello-world7qgqd-0 (ranks 0)
broker.info[0]: online: hello-world7qgqd-[0-3] (ranks 0-3)
broker.info[0]: quorum-full: quorum->run 0.354587s
hello world
broker.info[0]: rc2.0: flux submit -n 1 --quiet --watch echo hello world Exited (rc=0) 0.3s
broker.info[0]: rc2-success: run->cleanup 0.308392s
broker.info[0]: cleanup.0: flux queue stop --quiet --all --nocheckpoint Exited (rc=0) 0.1s
broker.info[0]: cleanup.1: flux cancel --user=all --quiet --states RUN Exited (rc=0) 0.1s
broker.info[0]: cleanup.2: flux queue idle --quiet Exited (rc=0) 0.1s
broker.info[0]: cleanup-success: cleanup->shutdown 0.252899s
broker.info[0]: children-complete: shutdown->finalize 47.6699ms
broker.info[0]: rc3.0: running /etc/flux/rc3.d/01-sched-fluxion
broker.info[0]: rc3.0: /etc/flux/rc3 Exited (rc=0) 0.2s
broker.info[0]: rc3-success: finalize->goodbye 0.212425s
broker.info[0]: goodbye: goodbye->exit 0.06917ms

If you submit and ask for four tasks, you will see "hello world" four times:

python sample-flux-operator-job.py --tasks 4
...
broker.info[0]: quorum-full: quorum->run 23.5812s
hello world
hello world
hello world
hello world
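
To pull only the job's own output out of a lead broker log like the ones above, a minimal sketch is shown here. It assumes that Flux diagnostics always take the form `name.level[rank]: message`, which is inferred from the sample logs in this guide rather than from a Flux specification. `job_output` is a hypothetical helper name.

```python
import re

# Assumption: diagnostics look like "broker.info[0]: ..." as in the logs above
DIAGNOSTIC = re.compile(r"^[\w.-]+\.(debug|info|warning|err)\[\d+\]: ")


def job_output(log_text):
    """Return non-empty log lines that do not look like Flux diagnostics."""
    return [
        line
        for line in log_text.splitlines()
        if line.strip() and not DIAGNOSTIC.match(line)
    ]


if __name__ == "__main__":
    sample = "\n".join(
        [
            "broker.info[0]: quorum-full: quorum->run 23.5812s",
            "hello world",
            "hello world",
        ]
    )
    print(job_output(sample))
```

The log text can come from `kubectl logs` or from `CoreV1Api.read_namespaced_pod_log` in the Kubernetes Python client. Header lines that do not follow the diagnostic pattern, such as the "Submit Mode" line, pass through this filter unchanged.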

You can customize the job further, and you can ask questions on the Flux Operator issues board. Finally, for instructions on how to do this with YAML outside of Python, see Run a Flux MiniCluster.

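MPI Operator Job

For this example, we will use the MPI Operator to submit a job, and specifically use its Python SDK to do so easily. Given the Python environment we created in the setup section above, we can install this Python SDK directly into it as follows: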

git clone --depth 1 https://github.com/kubeflow/mpi-operator /tmp/mpijob
cd /tmp/mpijob/sdk/python/v2beta1
python setup.py install
cd -

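Importantly, the MPI Operator must be installed before Kueue for this to work! Let's start from a fresh Kind cluster. We also need to install the MPI Operator and Kueue. Here we install the exact versions that were tested with this example: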

kubectl apply -f https://github.com/kubeflow/mpi-operator/releases/download/v0.4.0/mpi-operator.yaml
kubectl apply -f https://github.com/kubernetes-sigs/kueue/releases/download/v0.4.0/manifests.yaml

See the mpi-operator release page and the Kueue release page for other versions. You need to wait until Kueue is ready. You can determine this as follows:

# Wait until you see all pods in the kueue-system are Running
kubectl get pods -n kueue-system
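
The same wait can be scripted. The sketch below polls pod phases until every pod reports Running; the function names are hypothetical, and the timeout and interval values are arbitrary assumptions. Note that the Running phase is a weaker signal than full readiness, so treat this as an approximation of the manual check.

```python
import time


def all_running(phases):
    """True when there is at least one pod and every phase is 'Running'."""
    return bool(phases) and all(phase == "Running" for phase in phases)


def wait_until_running(get_phases, timeout=300.0, interval=5.0, sleep=time.sleep):
    """Poll get_phases() until all pods run or the timeout elapses."""
    waited = 0.0
    while True:
        if all_running(get_phases()):
            return True
        if waited >= timeout:
            return False
        sleep(interval)
        waited += interval


def kueue_pod_phases(namespace="kueue-system"):
    """Fetch pod phases from the cluster (needs the kubernetes package)."""
    from kubernetes import client, config

    config.load_kube_config()
    pods = client.CoreV1Api().list_namespaced_pod(namespace)
    return [pod.status.phase for pod in pods.items]


if __name__ == "__main__":
    ready = wait_until_running(kueue_pod_phases)
    print("Kueue pods running" if ready else "Timed out waiting for Kueue")
```

Passing the phase lookup in as a callable keeps the polling logic testable without a cluster.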

When Kueue is ready, create the queues:

kubectl apply -f https://raw.githubusercontent.com/kubernetes-sigs/kueue/main/site/static/examples/admin/single-clusterqueue-setup.yaml

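Now try running the example MPI job. The output of the run is shown first, followed by the sample-mpijob.py script itself.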

python sample-mpijob.py
📦️ Container image selected is mpioperator/mpi-pi:openmpi...
⭐️ Creating sample job with prefix pi...
Use:
"kubectl get queue" to see queue assignment
"kubectl get jobs" to see jobs

The script, saved as sample-mpijob.py, is as follows:

#!/usr/bin/env python3

import argparse
from kubernetes import config, client
import mpijob.models as models

# sample-mpijob.py
# This example will demonstrate full steps to submit a Job via the MPI Operator

# Make sure your cluster is running!
config.load_kube_config()
crd_api = client.CustomObjectsApi()
api_client = crd_api.api_client


def get_parser():
    parser = argparse.ArgumentParser(
        description="Submit Kueue MPI Operator Job Example",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        "--job-name",
        help="name to set for the MPIJob (a generateName prefix does not work here)",
        default="pi",
    )
    parser.add_argument(
        "--image",
        help="container image to use",
        default="mpioperator/mpi-pi:openmpi",
    )
    parser.add_argument(
        "--command",
        help="command to run",
        default="mpirun",
    )
    parser.add_argument(
        "--args",
        nargs="+",
        help="args for container",
        default=["-n", "2", "/home/mpiuser/pi"],
    )
    return parser


def generate_job_crd(job_name, image, command, args):
    """
    Generate an equivalent job CRD to sample-job.yaml
    """
    metadata = client.V1ObjectMeta(
        name=job_name, labels={"kueue.x-k8s.io/queue-name": "user-queue"}
    )

    # containers for launcher and worker
    launcher_container = client.V1Container(
        image=image,
        name="mpi-launcher",
        command=[command],
        args=args,
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    worker_container = client.V1Container(
        image=image,
        name="mpi-worker",
        command=["/usr/sbin/sshd"],
        args=["-De", "-f", "/home/mpiuser/.sshd_config"],
        security_context=client.V1SecurityContext(run_as_user=1000),
        resources={
            "limits": {
                "cpu": 1,
                "memory": "1Gi",
            }
        },
    )

    # Create the Launcher and worker replica specs
    launcher = models.V2beta1ReplicaSpec(
        replicas=1,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[launcher_container])
        ),
    )

    worker = models.V2beta1ReplicaSpec(
        replicas=2,
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[worker_container])
        ),
    )

    # runPolicy for jobspec
    policy = models.V2beta1RunPolicy(
        clean_pod_policy="Running", ttl_seconds_after_finished=60
    )

    # Create the jobspec
    jobspec = models.V2beta1MPIJobSpec(
        slots_per_worker=1,
        run_policy=policy,
        ssh_auth_mount_path="/home/mpiuser/.ssh",
        mpi_replica_specs={"Launcher": launcher, "Worker": worker},
    )
    return models.V2beta1MPIJob(
        metadata=metadata,
        api_version="kubeflow.org/v2beta1",
        kind="MPIJob",
        spec=jobspec,
    )


def main():
    """
    Run an MPI job. This requires the MPI Operator to be installed.
    """
    parser = get_parser()
    args, _ = parser.parse_known_args()

    # Generate a CRD spec
    crd = generate_job_crd(args.job_name, args.image, args.command, args.args)
    crd_api = client.CustomObjectsApi()

    print(f"📦️ Container image selected is {args.image}...")
    print(f"⭐️ Creating sample job with prefix {args.job_name}...")
    crd_api.create_namespaced_custom_object(
        group="kubeflow.org",
        version="v2beta1",
        namespace="default",
        plural="mpijobs",
        body=crd,
    )
    print(
        'Use:\n"kubectl get queue" to see queue assignment\n"kubectl get jobs" to see jobs'
    )


if __name__ == "__main__":
    main()

After submitting, you can see that the queue has one admitted workload!

$ kubectl get queue
NAME         CLUSTERQUEUE    PENDING WORKLOADS   ADMITTED WORKLOADS
user-queue   cluster-queue   0                   1

And the job "pi-launcher" has started:

$ kubectl get jobs
NAME          COMPLETIONS   DURATION   AGE
pi-launcher   0/1           9s         9s

The MPI Operator works through a central launcher that interacts with the worker nodes over ssh. We can inspect a worker and the launcher to see how each works:

$ kubectl logs pods/pi-worker-1 
Server listening on 0.0.0.0 port 22.
Server listening on :: port 22.
Accepted publickey for mpiuser from 10.244.0.8 port 51694 ssh2: ECDSA SHA256:rgZdwufXolOkUPA1w0bf780BNJC8e4/FivJb1/F7OOI
Received disconnect from 10.244.0.8 port 51694:11: disconnected by user
Disconnected from user mpiuser 10.244.0.8 port 51694
Received signal 15; terminating.

The job is fairly quick, and we can see the output of pi in the launcher:

$ kubectl logs pods/pi-launcher-f4gqv 
Warning: Permanently added 'pi-worker-0.pi-worker.default.svc,10.244.0.7' (ECDSA) to the list of known hosts.
Warning: Permanently added 'pi-worker-1.pi-worker.default.svc,10.244.0.9' (ECDSA) to the list of known hosts.
Rank 1 on host pi-worker-1
Workers: 2
Rank 0 on host pi-worker-0
pi is approximately 3.1410376000000002

That looks like pi! 🎉️🥧️ If you are interested in running this example with YAML outside of Python, see Run an MPIJob.