Published 2022-09-22


Oz Tiram
Senior Software Engineer @ Spectro Cloud

This article originally appeared as the cover story in the July 2022 edition of
entwickler magazine (in German)


Kubernetes is the de facto platform for deploying applications in Linux
containers. Originally developed by Google to deploy web applications on a
cluster of computers, it is now an open source project. The developers of
Kubernetes allowed extending its API from very early versions, and today
Kubernetes can deploy more than just Linux containers. It can deploy virtual
machines using KubeVirt, FreeBSD jails, and even whole Kubernetes clusters
using the Cluster API.

Very early on, the Kubernetes developers realized that extensibility is key to
successful adoption. Early releases offered ThirdPartyResource for extending
the API. Version 1.7 introduced its successor, CustomResourceDefinition, which
has been the standard mechanism from version 1.8 onward.

While Golang is the dominant language in the Kubernetes ecosystem, nothing stops
you from writing components in other languages, as long as they fulfill the API.
For example, one can replace runc, written in Go, with crun, which is written in
C, as both implement the OCI Container Runtime specification. Or you can
replace the Kubelet with Krustlet, written in Rust, as it fulfills the Kubelet
API.
Hopefully, this is enough to convince you that you do not need to know Golang to
extend Kubernetes. In this article, we will see how to extend the Kubernetes API
and schedule our own workloads based on these newly defined API extensions using
Python.
If you are reading this magazine, there is a high chance you already know Python
or are at least interested in it. But that is not all: Python is the most
popular language according to some surveys, and it is also very popular in
introductory programming courses at universities. Hence, using Python to extend
Kubernetes is likely to be easy for you and for other team members, who might
already know Python and are less likely to know Go.

By now, you already know that Kubernetes can schedule more than just containers,
and that you can extend the API with your own CustomResourceDefinition.
However, you might still wonder: what is an operator?

An operator is a collection of domain-specific custom resources and a controller
program that reacts to changes in the cluster or in these specific resources.
For example, an operator can watch certain annotations on Pods or Deployments
and manipulate objects inside or outside the cluster when these annotations are
detected. This is how cert-manager and ExternalDNS work. Specifically, when you
add an annotation to an Ingress, a chain of actions is triggered inside and
outside the cluster. Along the way, a certificate request is sent to Let's
Encrypt and, if it is authenticated successfully, a new Secret containing a
certificate is created and used to secure access to the Ingress with HTTPS. The
key message here is that an operator can watch Kubernetes objects, built-in or
custom, and act on objects, which can be external or internal to the cluster,
bringing them to a desired state (see Fig. 1).
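
The watch-and-act idea can be tried out without a cluster. The following is only a minimal sketch: `desired` and `actual` are plain dictionaries standing in for state that a real operator would read from the API server, and `reconcile` is a hypothetical helper, not part of any Kubernetes library.

```python
def reconcile(desired, actual):
    """Return the actions needed to bring `actual` to `desired`.

    Both arguments map an object name to a replica count. They are
    stand-ins for state a real operator would read from the API server.
    """
    actions = []
    for name, replicas in sorted(desired.items()):
        if name not in actual:
            actions.append(("create", name, replicas))
        elif actual[name] != replicas:
            actions.append(("scale", name, replicas))
    for name in sorted(actual):
        if name not in desired:
            actions.append(("delete", name, None))
    return actions


desired = {"web": 3, "worker": 2}
actual = {"web": 1, "legacy": 1}
for action in reconcile(desired, actual):
    print(action)
```

A real controller would run such a comparison repeatedly and apply the resulting actions through the Kubernetes API instead of printing them.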


To develop a Kubernetes operator, you will need access to a working cluster. If
you do not have one, it’s easy to spawn a cluster using minikube.

$ minikube start -p entwickler.de --driver docker
😄 [entwickler.de] minikube v1.25.2 on Gentoo 2.8
✨ Using the docker driver based on user configuration
👍 Starting control plane node entwickler.de in cluster entwickler.de
🚜 Pulling base image ...
🔥 Creating docker container (CPUs=2, Memory=2848MB) ...
🐳 Preparing Kubernetes v1.23.3 on Docker 20.10.12 ...
    ▪ kubelet.housekeeping-interval=5m
    ▪ Generating certificates and keys ...
    ▪ Booting up control plane ...
    ▪ Configuring RBAC rules ...
🔎 Verifying Kubernetes components...
    ▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5
🌟 Enabled addons: storage-provisioner, default-storageclass
🏄 Done! kubectl is now configured to use "entwickler.de" cluster and "default" namespace by default

Now that you have your cluster running, you can verify that it is working
properly by doing:

$ kubectl get nodes


entwickler.de Ready control-plane,master 6m27s v1.23.3


We are going to write a small chaos engineering operator, which will wreak
havoc in the cluster. As with all operators, it needs a name. Ours is called
blackadder, after the chaotic baron in the BBC sitcom of the same name.

Chaos engineering is the discipline of experimenting on a system in order to
build confidence in the system's capability to withstand turbulent conditions in
production. There are some pretty mature operators which can create havoc in
Kubernetes. Do not run this operator in a production cluster, unless of course
all your applications are already cloud native and chaos resistant. Our
operator will randomly kill Pods and write garbage into ConfigMaps. In
addition, it will randomly scale Deployments to many replicas.

To write a Kubernetes operator, we can use the official Python client, an
alternative client, or any Python library that can communicate with the
kube-api-server via HTTP. For this article, I will be using pykube-ng, which
describes itself as a lightweight client library for the Kubernetes API.
Personally, I like using it because it feels more Pythonic than the official
Python client for Kubernetes.

We start by creating a CustomResourceDefinition:

$ cat k8s/blackadder-v1alpha1.yml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: chaosagents.blackadder.io
spec:
  group: blackadder.io
  scope: Cluster # a CRD can also be Namespaced
  names:
    plural: chaosagents
    singular: chaosagent
    kind: ChaosAgent
    shortNames:
    - ca
  versions:
  - name: v1alpha1 # you can serve multiple versions e.g v1beta2 or v1alpha1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              tantrumMode:
                type: boolean
              podTolerance:
                type: integer
    additionalPrinterColumns:
    - name: Tantrum
      type: boolean
      description: Kills Pods randomly
      jsonPath: .spec.tantrumMode
    - name: Tolerance
      type: integer
      description: Total number of Pod to tolerate before randomly killing Pods
      jsonPath: .spec.podTolerance

We apply this CRD manifest:

$ kubectl apply -f k8s/blackadder-v1alpha1.yml
customresourcedefinition.apiextensions.k8s.io/chaosagents.blackadder.io created

We then create an example chaos agent with:

$ cat k8s/edmund.yml
apiVersion: blackadder.io/v1alpha1
kind: ChaosAgent
metadata:
  name: princeedmund
spec:
  tantrumMode: true
  podTolerance: 10

When we apply this manifest and then view it, we see:

$ kubectl apply -f k8s/edmund.yml
chaosagent.blackadder.io/princeedmund created
$ kubectl get chaosagents.blackadder.io
NAME           TANTRUM   TOLERANCE
princeedmund   true      10

We can add more toggles, switches and definitions to our
CustomResourceDefinition until it looks like this:

$ kubectl get chaosagents.blackadder.io
NAME           TANTRUM   TOLERANCE   CANCER   IPSUM   EAGERNESS   PAUSE   EXCLUDED
princeedmund   true      10          false    true    20          30      ["kube-system"]

Note also that a CustomResourceDefinition can validate its input in various
ways. For example, we can define eagerness as an integer ranging from 1 to 100:

$ kubectl apply -f k8s/edmund-v1beta1.yml
The ChaosAgent "princeedmund" is invalid: spec.eagerness: Invalid value: 200: spec.eagerness in body should be less than or equal to 100
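
The range rule itself is enforced by the API server from the schema in the CRD. If you want to catch the same mistake before sending a manifest, a local check can mirror it. This is a sketch under assumptions: `validate_spec` is a hypothetical helper, the bounds 1 and 100 come from the text above, and the messages only loosely imitate those of the API server.

```python
def validate_spec(spec, minimum=1, maximum=100):
    """Return a list of error strings for the `eagerness` field.

    This only imitates the range rule described in the text. The real
    check is performed by the API server from the CRD schema.
    """
    errors = []
    eagerness = spec.get("eagerness")
    if eagerness is None:
        return errors  # the field is treated as optional in this sketch
    if isinstance(eagerness, bool) or not isinstance(eagerness, int):
        errors.append("spec.eagerness: must be an integer")
    elif eagerness < minimum:
        errors.append(
            f"spec.eagerness: should be greater than or equal to {minimum}")
    elif eagerness > maximum:
        errors.append(
            f"spec.eagerness: should be less than or equal to {maximum}")
    return errors


print(validate_spec({"eagerness": 200}))
print(validate_spec({"eagerness": 20}))
```

The `bool` check is there because Python treats `True` and `False` as integers, which is rarely what a manifest author means.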

You can find the complete definition of chaosagents.blackadder.io and the
manifests for creating agents in the source code repository accompanying this
article.

We have now created a new resource that is stored in Kubernetes and served by
kube-api-server. Hence, we can now create the controller logic. We begin by
drafting an algorithm in pseudocode which will explain the intended behavior of
the chaos agent:

client = connect_to_kubernetes()
# retrieves our agent configuration from the kube-api-server
chaos_agent = client.get_chaos_agent()
while True:
    pods = client.list_pods(exclude_namespaces)
    deployments = client.list_deployments(exclude_namespaces)
    configmaps = client.list_configmaps(exclude_namespaces)
    if chaos_agent.tantrum:
        randomly_kill_pods(pods, chaos_agent.tolerance, chaos_agent.eagerness)
    if chaos_agent.cancer:
        randomly_scale_deployments(deployments, chaos_agent.eagerness)
    if chaos_agent.ipsum:
        randomly_write_configmaps(configmaps, chaos_agent.eagerness)

The algorithm is pretty naive, but it has all the basics of a Kubernetes
operator. If you put it under a magnifying glass, there are obviously many
possible improvements. For example, in a cluster with hundreds of ConfigMaps and
Pods each cycle can take a long while to complete, especially if cancer mode,
which randomly scales up Deployments, is also active. However, we are not in the
business of premature optimization, so we will ignore these limitations and
continue to the actual implementation in Python.

The first thing we need to do, is to get a Kubernetes client so that we can
communicate with kube-api-server:

import pykube

# automatically detect the in-cluster token from
# /run/secrets/kubernetes.io/serviceaccount/token when running in a cluster,
# or load ~/.kube/config otherwise
config = pykube.KubeConfig.from_env()
api = pykube.HTTPClient(config)
With the client we just created, it’s easy to list objects stored in the
Kubernetes database. First, let’s create a couple of Pods and a Deployment:

$ kubectl run --image docker.io/nginx test -n kube-public
$ kubectl run --image docker.io/nginx test -n default
$ kubectl create deployment my-dep --image=nginx
Then we can list them using the interactive Python console, which is extremely
handy for discovering the API while prototyping:

$ python -m pykube
Pykube v22.7.0, loaded "/home/oznt/.kube/config" with context "entwickler.de".
Example commands:
  [d.name for d in Deployment.objects(api)]              # get names of deployments in default namespace
  list(DaemonSet.objects(api, namespace='kube-system'))  # list daemonsets in "kube-system"
  Pod.objects(api).get_by_name('mypod').labels           # labels of pod "mypod"
Use Ctrl-D to exit
>>> [f"{p.namespace}/{p.name}" for p in Pod.objects(api, namespace=pykube.all)
...  if p.namespace not in ["kube-system"]]
['default/my-dep-84885b44-29vg7', 'default/my-dep-84885b44-l5nkw',
 'default/my-dep-84885b44-p2gcd', 'default/test', 'kube-public/test']

The API is very intuitive, and getting ConfigMaps and Deployments works the same
way. We can also filter objects by labels or metadata, which we will see later.
However, pykube-ng does not have a predefined class for ChaosAgents. Using an
object factory, we can create one:

>>> ChaosAgent = pykube.object_factory(api, "blackadder.io/v1beta1", "ChaosAgent")
>>> ChaosAgent.objects(api, namespace=pykube.all)
<Query of ChaosAgent at 0x7f4997df18d0>
>>> list(ChaosAgent.objects(api, namespace=pykube.all))
[<ChaosAgent princeedmund1>]
>>> # To access the configuration of princeedmund1 we can do
>>> ca = list(ChaosAgent.objects(api, namespace=pykube.all))[0]
>>> ca.obj["spec"]
{'cancerMode': True, 'eagerness': 20, 'excludedNamespaces': ['kube-system'],
 'ipsumMode': True, 'pauseDuration': 30, 'podTolerance': 10, 'tantrumMode': True}
>>> # Personally, I find attribute access easier than dict keys, hence:
>>> import munch
>>> ca = munch.munchify(ca.obj)
>>> ca.spec.tantrumMode
True
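
If you would rather not add munch as a dependency, attribute access can be approximated with the standard library. This is a sketch, not a drop-in replacement: `to_namespace` is a hypothetical helper, and unlike a Munch object its result no longer supports dict-style access.

```python
from types import SimpleNamespace


def to_namespace(value):
    """Recursively convert dicts to objects with attribute access."""
    if isinstance(value, dict):
        return SimpleNamespace(
            **{key: to_namespace(item) for key, item in value.items()})
    if isinstance(value, list):
        return [to_namespace(item) for item in value]
    return value


# A dict shaped like the object shown in the session above.
obj = {"spec": {"tantrumMode": True, "podTolerance": 10,
                "excludedNamespaces": ["kube-system"]}}
ca = to_namespace(obj)
print(ca.spec.tantrumMode, ca.spec.podTolerance, ca.spec.excludedNamespaces)
```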

We now have all that is needed to create the complete controller of a

import time

import munch
import pykube

config = pykube.KubeConfig.from_env()
api = pykube.HTTPClient(config)
ChaosAgent = pykube.object_factory(api, "blackadder.io/v1beta1", "ChaosAgent")
# retrieves our agent configuration from the kube-api-server
agent = list(ChaosAgent.objects(api, namespace=pykube.all))[0]
agent.config = munch.munchify(agent.obj["spec"])
exclude_namespaces = agent.config.excludedNamespaces

def randomly_kill_pods(pods, tolerance, eagerness):
    pass

def randomly_scale_deployments(deployments, eagerness):
    pass

def randomly_write_configmaps(configmaps, eagerness):
    pass

while True:
    pods = api.list_pods(exclude_namespaces)
    deployments = api.list_deployments(exclude_namespaces)
    configmaps = api.list_configmaps(exclude_namespaces)
    if agent.config.tantrumMode:
        randomly_kill_pods(pods, agent.config.podTolerance, agent.config.eagerness)
    if agent.config.cancerMode:
        randomly_scale_deployments(deployments, agent.config.eagerness)
    if agent.config.ipsumMode:
        randomly_write_configmaps(configmaps, agent.config.eagerness)

Note, the api instance does not have a method for listing objects with the
signature I wrote. There are two ways we can filter objects. The first, naive
one, is like I already showed:

>>> [p for p in Pod.objects(api, namespace=pykube.all) if p.namespace not in
...  ["kube-system"]]
This works and might be easy to read if you are versed in Python and like list
comprehensions. However, it sends a lot of data between the kube-api-server and
the client. The better way is to select the objects based on field selectors
sent to the server:

>>> list(Pod.objects(api).filter(namespace=pykube.all,

We could repeat this for Deployments and ConfigMaps, but we should not. Instead,
we create a generic method and add it to the client class. We are leveraging
Python's dynamic nature to modify classes at runtime, which can be fun but
also dangerous:

from pykube import Pod, Deployment, ConfigMap
...
def list_objects(self, k8s_obj, exclude_namespaces):
    # build a field selector such as
    # "metadata.namespace!=kube-system,metadata.namespace!=chaos-operator"
    exclude_namespaces = ",".join("metadata.namespace!=" + ns
                                  for ns in exclude_namespaces)
    return list(
        k8s_obj.objects(self).filter(
            namespace=pykube.all,
            field_selector=exclude_namespaces
        ))
...
pykube.HTTPClient.list_objects = list_objects
api = pykube.HTTPClient(config)
...
while True:
    pods = api.list_objects(Pod, exclude_namespaces)
    deployments = api.list_objects(Deployment, exclude_namespaces)
    configmaps = api.list_objects(ConfigMap, exclude_namespaces)
    ...
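
To see what is actually sent as the field selector, the string-building step can be run on its own. The function name `namespace_exclusion_selector` is hypothetical; the join expression is the one used in `list_objects`.

```python
def namespace_exclusion_selector(namespaces):
    """Build a field selector string that excludes the given namespaces."""
    return ",".join(f"metadata.namespace!={ns}" for ns in namespaces)


selector = namespace_exclusion_selector(["kube-system", "chaos-operator"])
print(selector)
# metadata.namespace!=kube-system,metadata.namespace!=chaos-operator
```

The comma between the terms means that all conditions must hold, so an object is returned only if it lives in none of the excluded namespaces.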

The next steps are implementing the chaos functions. We begin with pod deletion:

import random
...
def randomly_kill_pods(pods, tolerance, eagerness):
    # leave the cluster alone while there are fewer pods than tolerated
    if len(pods) < tolerance:
        return
    for p in pods:
        if random.randint(0, 100) < eagerness:
            p.delete()
            print(f"Deleted {p.namespace}/{p.name}")
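
The effect of tolerance and eagerness can be explored safely without a cluster. In this sketch `FakePod` is a hypothetical stand-in for a pykube Pod that only records whether `delete()` was called; the function body repeats the logic shown above without the print.

```python
import random


class FakePod:
    """Hypothetical stand-in for a pykube Pod; records deletion only."""

    def __init__(self, namespace, name):
        self.namespace = namespace
        self.name = name
        self.deleted = False

    def delete(self):
        self.deleted = True


def randomly_kill_pods(pods, tolerance, eagerness):
    # Same logic as in the article: do nothing below the tolerance,
    # otherwise delete each pod with a chance governed by eagerness.
    if len(pods) < tolerance:
        return
    for p in pods:
        if random.randint(0, 100) < eagerness:
            p.delete()


pods = [FakePod("default", f"test-{i}") for i in range(20)]
random.seed(42)  # fixed seed so repeated runs behave the same
randomly_kill_pods(pods, tolerance=10, eagerness=20)
print(sum(p.deleted for p in pods), "of", len(pods), "pods deleted")
```

Note that `random.randint(0, 100)` includes both ends and so draws from 101 values. An eagerness of 20 therefore deletes a pod with probability 20/101, slightly less than 20 percent.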

The next chaos function is deployment scaling:

import requests
...
def randomly_scale_deployments(deployments, eagerness):
    for d in deployments:
        if random.randint(0, 100) < eagerness:
            # retry until the update succeeds
            while True:
                try:
                    if d.replicas < 128:
                        d.replicas = min(d.replicas * 2, 128)
                    d.update()
                    print(f"scaled {d.namespace}/{d.name} to {d.replicas}")
                    break
                except (requests.exceptions.HTTPError, pykube.exceptions.HTTPError):
                    print(f"error scaling {d.namespace}/{d.name} to {d.replicas}")
                    # fetch the current state of the object before retrying
                    d.reload()
                    continue
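
The doubling rule with its cap of 128 is easier to reason about in isolation. In this sketch `next_replicas` is a hypothetical helper that extracts only the arithmetic from the function above.

```python
def next_replicas(current, cap=128):
    """Double the replica count without exceeding `cap`."""
    if current >= cap:
        return current
    return min(current * 2, cap)


count = 1
history = [count]
while count < 128:
    count = next_replicas(count)
    history.append(count)
print(history)
```

One consequence worth knowing: a Deployment that was scaled down to 0 replicas stays at 0, because doubling zero is still zero.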

Finally, we implement the function which writes Lorem Ipsum snippets:

def randomly_write_configmaps(configmaps, eagerness):
    for cm in configmaps:
        print(f"Checking {cm.namespace}/{cm.name}")
        # immutable ConfigMaps cannot be changed, skip them
        if cm.obj.get("immutable"):
            continue
        if random.randint(0, 100) < eagerness:
            for k, v in cm.obj["data"].items():
                cm.obj["data"][k] = lorem.paragraph()
            print(f"Lorem Ipsum in {cm.namespace}/{cm.name}")
            # send the modified object to the kube-api-server
            cm.update()
With that, the controller code is complete. You can view the complete code in
controller.py in the code repository. If you are a seasoned Pythonista, you
probably think that this code can be tremendously improved, or that iterating
over a large set of objects is a perfect use case for concurrent.futures or
asyncio. You are probably right! However, these optimizations would obscure the
purpose of this article, which is learning about Kubernetes operators. Hence, I
will not demonstrate them here and leave them to you as an exercise.
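
As a possible starting point for that exercise, here is a sketch of fanning work out over a thread pool with the standard library. The `process` function is a placeholder for per-object work such as `p.delete()` or `d.update()`. Whether a single pykube client can be shared safely between threads is not something this article establishes, so treat that part as an open question to verify.

```python
from concurrent.futures import ThreadPoolExecutor


def process(item):
    """Placeholder for per-object work such as p.delete() or d.update()."""
    return f"processed {item}"


def process_all(items, max_workers=4):
    # executor.map returns results in the order of the input items.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(process, items))


print(process_all(["default/test", "kube-public/test"]))
```

Threads fit this workload because each call mostly waits for an HTTP response from the kube-api-server.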

If you followed until now and tried running controller.py, you will see output
similar to this:

$ python controller.py
Deleted default/my-dep-84885b44-bjg4t
Deleted default/my-dep-84885b44-ljvdn
...
Lorem Ipsum in kube-node-lease/kube-root-ca.crt
...
scaled default/my-dep to 4
...

This works on the local shell with your admin configuration file, which was
created for you by minikube. When you deploy the controller to the cluster you
will need to give the controller permissions to list, patch and delete Pod,
Deployment and ConfigMap objects.

Before we examine how to create these permissions, we create a Docker image and
deploy the controller to the cluster.

The Dockerfile uses a multistage build and pipenv to manage dependencies:
FROM docker.io/python:3.10 AS builder
RUN pip install --user pipenv
# Tell pipenv to create venv in the current directory
ENV PIPENV_VENV_IN_PROJECT=1
ADD Pipfile.lock Pipfile /usr/src/
WORKDIR /usr/src
RUN /root/.local/bin/pipenv sync
RUN /usr/src/.venv/bin/python3 -c "import pykube; print(pykube.__version__)"

FROM docker.io/python:3.10 AS runtime
RUN mkdir -v /usr/src/venv
COPY --from=builder /usr/src/.venv/ /usr/src/venv/
RUN /usr/src/venv/bin/python3 -c "import pykube; print(pykube.__version__)"
WORKDIR /usr/src/
COPY controller.py .
CMD ["./venv/bin/python", "-u", "controller.py"]

Then, you need to build the image and push it to a public or private repository:

$ docker build -t oz123/blackadder:0.1 .
Sending build context to Docker daemon 166.4kB
Step 1/13 : FROM docker.io/python:3.10 AS builder
3.10: Pulling from library/python
1339eaac5b67: Pull complete
4c78fa1b9799: Pull complete

$ docker push oz123/blackadder:0.1
The push refers to repository [docker.io/oz123/blackadder]
2ce87cdce319: Pushed
645d7db6379e: Pushing [==================================================>] 17.26MB
3c924eba81b8: Pushed
...

After that, create a namespace for the controller to be deployed in:

$ kubectl create namespace chaos-operator
namespace/chaos-operator created

Before we deploy the controller, we should exclude that namespace from the list
of watched namespaces:

$ kubectl patch chaosagents.blackadder.io princeedmund \
  --type merge --patch='{"spec": {"excludedNamespaces": ["kube-system", "chaos-operator"]}}'
chaosagent.blackadder.io/princeedmund patched

Now, we can create the deployment for the chaos controller:

$ kubectl create deployment blackadder --image=oz123/blackadder:0.1 --replicas=1 \
  -n chaos-operator
deployment.apps/blackadder created

When you look at the logs of the container, you will see that it crashed:

$ kubectl logs -n chaos-operator blackadder-65bc54f7f9-v56bp
Traceback (most recent call last):
  File "/usr/src/controller.py", line 35, in <module>
    agent = list(ChaosAgent.objects(api, namespace=pykube.all))[0]
  File "/usr/src/venv/lib/python3.10/site-packages/pykube/query.py", line 195, in __iter__
    return iter(self.query_cache["objects"])
  File "/usr/src/venv/lib/python3.10/site-packages/pykube/query.py", line 185, in query_cache
    cache["response"] = self.execute().json()
  File "/usr/src/venv/lib/python3.10/site-packages/pykube/query.py", line 160, in execute
    r.raise_for_status()
  File "/usr/src/venv/lib/python3.10/site-packages/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url:
That is because the default service account of the namespace has no permission
to list ChaosAgent objects.

To fix this, we need to define a ClusterRole and a ClusterRoleBinding which
binds that role to the service account running the controller.

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: blackadder
rules:
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["get", "list", "patch"]
- apiGroups: ["blackadder.io"]
  resources: ["chaosagents"]
  verbs: ["get", "list"]
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "delete"]
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list", "patch"]
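
To reason about what these rules permit, they can be mirrored in a small Python check. This is a deliberately simplified sketch: `is_allowed` is a hypothetical helper that matches exact names only and ignores wildcards, `resourceNames` and everything else real RBAC evaluation takes into account.

```python
RULES = [
    {"apiGroups": ["apps"], "resources": ["deployments"],
     "verbs": ["get", "list", "patch"]},
    {"apiGroups": ["blackadder.io"], "resources": ["chaosagents"],
     "verbs": ["get", "list"]},
    {"apiGroups": [""], "resources": ["pods"],
     "verbs": ["get", "list", "delete"]},
    {"apiGroups": [""], "resources": ["configmaps"],
     "verbs": ["get", "list", "patch"]},
]


def is_allowed(rules, api_group, resource, verb):
    """Simplified check: exact matches only, no wildcards or resourceNames."""
    return any(
        api_group in rule["apiGroups"]
        and resource in rule["resources"]
        and verb in rule["verbs"]
        for rule in rules
    )


print(is_allowed(RULES, "", "pods", "delete"))
print(is_allowed(RULES, "", "configmaps", "delete"))
```

Against a real cluster, `kubectl auth can-i delete pods --as system:serviceaccount:chaos-operator:default` asks the API server the same kind of question once the role is bound.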

The ClusterRoleBinding is defined with:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: blackadder
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: blackadder
subjects:
- apiGroup: rbac.authorization.k8s.io
  kind: User
  name: system:serviceaccount:chaos-operator:default

We apply these manifests:

$ kubectl apply -f k8s/clusterrole.yml
clusterrole.rbac.authorization.k8s.io/blackadder created
$ kubectl apply -f
clusterrolebinding.rbac.authorization.k8s.io/blackadder created

Once you restart the Pod, you will see it running and doing its job. Note that,
in the final version of the controller, the while True loop is moved into a main
function, such that the code looks like this:

# this is docker label oz123/blackadder:0.1.1
def main():
    while True:
        pods = api.list_objects(Pod, exclude_namespaces)
        deployments = api.list_objects(Deployment, exclude_namespaces)
        configmaps = api.list_objects(ConfigMap, exclude_namespaces)
        ...

if __name__ == "__main__":
    print("This is the blackadder version 0.1.1")
    print("Ready to start a havoc in your cluster")
    main()

When you watch the controller logs, you will see:

$ kubectl logs -n chaos-operator blackadder-7695b89559-8q4qp
This is the blackadder version 0.1.1
Ready to start a havoc in your cluster
Checking default/kube-root-ca.crt
Checking kube-node-lease/kube-root-ca.crt
Checking kube-public/cluster-info
...

With that, we are finished implementing the chaos controller part of the
Blackadder Operator.


We have seen that it is easy to create Kubernetes operators with Python.
Creating operators allows us to extend Kubernetes in ways that fit our needs,
and which the original developers of Kubernetes might not have thought of. We
can create controller logic for anything that Python can access inside or
outside the cluster, and by using a CustomResourceDefinition we can store
information in the Kubernetes database, which can be used to configure the
operator or to save data about the objects the controller works on.

I truly hope you enjoyed reading this article and that you are feeling
comfortable enough to embark on your own journey to extend Kubernetes with your
own Python Operators.



