Blog
April 5, 2017 Marie H.

Two-Way Slack Bots for Kubernetes Operations

Photo by Scott Webb on Unsplash

ChatOps isn't a new idea — GitHub's Hubot made it mainstream years ago. But most implementations I've seen are one-directional: your tools post into Slack, and that's it. You still SSH somewhere or run kubectl locally to actually do anything.

A two-way bot changes that. Events flow from Kubernetes into Slack. Operators respond with commands that trigger real Kubernetes actions. The chat channel becomes an audit log and a control plane at the same time. Done right, it's genuinely useful. Done wrong, it's a footgun pointed at your production cluster.

Here's how I set one up.

Inbound: Kubernetes Events Into Slack

The Kubernetes Python client has a watch API that lets you stream events as they happen in the cluster. You connect it to a Slack incoming webhook and you've got a live feed of cluster activity in your channel.

The events I care about most: pods entering CrashLoopBackOff, nodes going NotReady, OOMKilled containers, and deployment rollouts completing. Everything else is noise.

from kubernetes import client, config, watch
import os
import requests
import threading

config.load_incluster_config()  # running inside the cluster
v1 = client.CoreV1Api()

# Read the webhook URL from the environment (wired up via a Secret below),
# with a placeholder fallback for local testing.
SLACK_WEBHOOK = os.environ.get(
    'SLACK_WEBHOOK_URL', 'https://hooks.slack.com/services/T000/B000/xxxx')
SLACK_CHANNEL = '#ops-alerts'

SEVERITY_COLORS = {
    'critical': '#FF0000',
    'warning': '#FFA500',
    'info': '#36A64F',
}

def post_to_slack(text, color='info', fields=None):
    attachment = {
        'color': SEVERITY_COLORS.get(color, SEVERITY_COLORS['info']),
        'text': text,
        'mrkdwn_in': ['text'],
    }
    if fields:
        attachment['fields'] = fields
    requests.post(SLACK_WEBHOOK, json={
        'channel': SLACK_CHANNEL,
        'attachments': [attachment],
    }, timeout=10)

def watch_pods():
    w = watch.Watch()
    for event in w.stream(v1.list_pod_for_all_namespaces, timeout_seconds=0):
        pod = event['object']
        name = pod.metadata.name
        namespace = pod.metadata.namespace

        if not pod.status.container_statuses:
            continue

        for cs in pod.status.container_statuses:
            # CrashLoopBackOff
            if cs.state.waiting and cs.state.waiting.reason == 'CrashLoopBackOff':
                post_to_slack(
                    text='*CrashLoopBackOff* detected',
                    color='critical',
                    fields=[
                        {'title': 'Pod', 'value': name, 'short': True},
                        {'title': 'Namespace', 'value': namespace, 'short': True},
                        {'title': 'Container', 'value': cs.name, 'short': True},
                        {'title': 'Restarts', 'value': str(cs.restart_count), 'short': True},
                    ]
                )
            # OOMKilled
            if cs.last_state.terminated and cs.last_state.terminated.reason == 'OOMKilled':
                post_to_slack(
                    text='*OOMKilled* — container ran out of memory',
                    color='critical',
                    fields=[
                        {'title': 'Pod', 'value': name, 'short': True},
                        {'title': 'Namespace', 'value': namespace, 'short': True},
                        {'title': 'Container', 'value': cs.name, 'short': True},
                    ]
                )

def watch_nodes():
    w = watch.Watch()
    for event in w.stream(v1.list_node, timeout_seconds=0):
        node = event['object']
        node_name = node.metadata.name
        for condition in node.status.conditions:
            if condition.type == 'Ready' and condition.status == 'False':
                post_to_slack(
                    text='*Node NotReady*',
                    color='critical',
                    fields=[{'title': 'Node', 'value': node_name, 'short': True}]
                )

if __name__ == '__main__':
    t1 = threading.Thread(target=watch_pods, daemon=True)
    t2 = threading.Thread(target=watch_nodes, daemon=True)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

One caveat: a watch stream does not reconnect automatically. The API server closes watch connections periodically, and when that happens the stream ends and the function returns, which kills the thread. If you want this running continuously, wrap each watcher in a retry loop. Running it inside the cluster with load_incluster_config() is cleaner than managing external kubeconfig files.
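A minimal supervisor sketch for that retry loop — the run_forever helper is my own naming, not part of the Kubernetes client:

```python
import time

def run_forever(watch_fn, backoff_seconds=5):
    """Restart watch_fn whenever its stream ends or raises.

    Kubernetes watch connections are routinely closed by the API
    server; this keeps each watcher thread alive across drops.
    """
    while True:
        try:
            watch_fn()
        except Exception as exc:
            print('watch failed, restarting: {}'.format(exc))
        time.sleep(backoff_seconds)
```

In the __main__ block, point each thread at the wrapper instead of the watch function directly: threading.Thread(target=run_forever, args=(watch_pods,), daemon=True).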

Outbound: Slack Commands Into Kubernetes

This is the interesting half. The bot listens for messages mentioning it — either via the Slack RTM (Real Time Messaging) API or via slash commands over a Flask webhook. I went with RTM because it feels more natural in a channel.

The commands I implemented:

  • @opsbot scale nginx default 3 — scale deployment nginx in namespace default to 3 replicas
  • @opsbot pods default — list pods in a namespace
  • @opsbot restart api default — rolling restart via an annotation patch
  • @opsbot logs api-pod-abc — fetch the last 50 log lines

from slackclient import SlackClient
from kubernetes import client, config
import os
import time
import re

config.load_incluster_config()
apps_v1 = client.AppsV1beta1Api()
core_v1 = client.CoreV1Api()

# Read credentials from the environment rather than hardcoding them —
# these match the env vars injected by the Deployment manifest below.
BOT_TOKEN = os.environ['SLACK_BOT_TOKEN']
BOT_ID = os.environ.get('SLACK_BOT_ID', 'U0XXXXXXX')

slack = SlackClient(BOT_TOKEN)

def handle_command(command, channel):
    parts = command.strip().split()
    if not parts:
        return

    action = parts[0].lower()
    response = 'Unknown command. Try: scale, pods, restart, logs'

    try:
        if action == 'scale' and len(parts) == 4:
            _, deployment, namespace, replicas = parts
            apps_v1.patch_namespaced_deployment_scale(
                name=deployment,
                namespace=namespace,
                body={'spec': {'replicas': int(replicas)}}
            )
            response = 'Scaled `{}` to {} replicas in `{}`.'.format(
                deployment, replicas, namespace)

        elif action == 'pods' and len(parts) == 2:
            namespace = parts[1]
            pods = core_v1.list_namespaced_pod(namespace=namespace)
            lines = []
            for p in pods.items:
                phase = p.status.phase
                name = p.metadata.name
                lines.append('{} — {}'.format(name, phase))
            response = '```\n{}\n```'.format('\n'.join(lines)) if lines else 'No pods found.'

        elif action == 'restart' and len(parts) == 3:
            _, deployment, namespace = parts
            import datetime
            patch = {
                'spec': {
                    'template': {
                        'metadata': {
                            'annotations': {
                                'kubectl.kubernetes.io/restartedAt':
                                    datetime.datetime.utcnow().isoformat()
                            }
                        }
                    }
                }
            }
            apps_v1.patch_namespaced_deployment(
                name=deployment, namespace=namespace, body=patch)
            response = 'Rolling restart triggered for `{}` in `{}`.'.format(
                deployment, namespace)

        elif action == 'logs' and len(parts) == 2:
            pod_name = parts[1]
            # default namespace — extend to parse namespace if needed
            log = core_v1.read_namespaced_pod_log(
                name=pod_name, namespace='default', tail_lines=50)
            response = '```\n{}\n```'.format(log[-3000:])  # Slack message size limit

    except Exception as e:
        response = 'Error: {}'.format(str(e))

    slack.api_call('chat.postMessage', channel=channel, text=response)

def parse_mention(text):
    """Strip the @opsbot mention and return the command."""
    pattern = r'<@' + BOT_ID + r'>\s*(.*)'
    match = re.match(pattern, text, re.DOTALL)
    if match:
        return match.group(1).strip()
    return None

if slack.rtm_connect(with_team_state=False):
    print('OpsBot connected to Slack RTM')
    while True:
        events = slack.rtm_read()
        for event in events:
            if event.get('type') == 'message' and 'text' in event:
                command = parse_mention(event['text'])
                if command:
                    handle_command(command, event['channel'])
        time.sleep(0.5)
else:
    print('RTM connection failed')

The RTM loop is simple: read events every 500ms, check if any are messages mentioning the bot, parse the command, dispatch it. The slackclient 1.x library's rtm_connect() and rtm_read() handle the WebSocket connection.
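One cheap safeguard worth adding before dispatch: only accept commands from a vetted set of operators. A sketch, with placeholder Slack user IDs you would replace with real ones from your workspace:

```python
ALLOWED_USERS = {'U0AAAAAAA', 'U0BBBBBBB'}  # placeholder Slack user IDs

def is_authorized(event):
    # Slack message events carry the sender's user ID under 'user';
    # only dispatch commands that come from the operator allowlist.
    return event.get('user') in ALLOWED_USERS
```

In the RTM loop, check is_authorized(event) before calling handle_command, and post a polite refusal otherwise.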

RBAC: Give the Bot Only What It Needs

This is non-negotiable. The bot runs as a pod in your cluster, which means it needs a ServiceAccount. Do not give it cluster-admin. Define exactly the permissions it needs and nothing more.

apiVersion: v1
kind: ServiceAccount
metadata:
  name: opsbot
  namespace: ops
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: opsbot-role
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["apps", "extensions"]
    resources: ["deployments", "deployments/scale"]
    verbs: ["get", "list", "patch", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: opsbot-binding
  namespace: default
subjects:
  - kind: ServiceAccount
    name: opsbot
    namespace: ops
roleRef:
  kind: Role
  name: opsbot-role
  apiGroup: rbac.authorization.k8s.io

This Role grants read access to pods and logs, and patch/update access to deployments — all scoped to the default namespace. It does not grant secret access and does not grant any cluster-wide write access. One caveat: the inbound watcher from the first half lists pods across all namespaces and watches nodes, and that genuinely does require a ClusterRole. Keep that ClusterRole strictly read-only, and keep the command-handling Role scoped to the namespaces the bot is allowed to change.
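For the event watcher, a read-only ClusterRole along these lines would cover the all-namespace pod watch and the node watch (the names here are my own choices):

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: opsbot-watcher
rules:
  - apiGroups: [""]
    resources: ["pods", "nodes"]
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: opsbot-watcher-binding
subjects:
  - kind: ServiceAccount
    name: opsbot
    namespace: ops
roleRef:
  kind: ClusterRole
  name: opsbot-watcher
  apiGroup: rbac.authorization.k8s.io
```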

Running the Bot as a Deployment

Dog-food the platform. Run the bot inside the cluster it's managing.

apiVersion: apps/v1beta1
kind: Deployment
metadata:
  name: opsbot
  namespace: ops
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: opsbot
    spec:
      serviceAccountName: opsbot
      containers:
        - name: opsbot
          image: your-registry/opsbot:latest
          env:
            - name: SLACK_BOT_TOKEN
              valueFrom:
                secretKeyRef:
                  name: opsbot-secrets
                  key: bot-token
            - name: SLACK_BOT_ID
              value: "U0XXXXXXX"
            - name: SLACK_WEBHOOK_URL
              valueFrom:
                secretKeyRef:
                  name: opsbot-secrets
                  key: webhook-url
          resources:
            requests:
              cpu: 50m
              memory: 64Mi
            limits:
              cpu: 200m
              memory: 128Mi

One replica is fine — this isn't a high-availability workload. Store the Slack tokens in a Secret, not in the Deployment manifest. The serviceAccountName field wires up the RBAC permissions you defined.
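The opsbot-secrets Secret referenced above can be declared like this — a sketch using stringData, which lets you write the values unencoded and have the API server base64-encode them on write:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: opsbot-secrets
  namespace: ops
type: Opaque
stringData:
  bot-token: xoxb-your-bot-token
  webhook-url: https://hooks.slack.com/services/T000/B000/xxxx
```

Keep this manifest out of version control, or template the values in at deploy time.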

Honest Take

Two-way bots are genuinely useful for day-to-day operations. Having @opsbot pods production available in Slack means people check pod status without needing local cluster access set up. The inbound alerting means CrashLoopBackOff events show up in a channel where the whole team sees them, not just whoever happens to be watching Nagios.

But unrestricted production access via chat commands is not something I'd run without approval workflows. The scale and restart commands above will execute immediately for anyone in the channel with no confirmation step. That's fine for a small team where everyone has context. It's not fine for a larger org where someone might accidentally scale production down to zero.

The right pattern is to add a confirmation step for destructive operations: the bot proposes the action, someone reacts with a checkmark emoji or types confirm, then it executes. Some teams go further and require approval from a second person. That's the ChatOps pattern done properly — not just "chat as a terminal," but chat as a collaborative, auditable control plane.
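A sketch of that confirmation flow, using helper names of my own (propose_or_run, confirm, a pending dict): destructive commands get queued under a short token, and only run once a different operator confirms.

```python
import uuid

DESTRUCTIVE = {'scale', 'restart'}
pending = {}  # confirmation token -> (command, requesting user)

def propose_or_run(command, user):
    """Return a confirmation prompt for destructive commands,
    or None when the command is safe to dispatch immediately."""
    action = command.split()[0].lower()
    if action not in DESTRUCTIVE:
        return None  # caller dispatches handle_command right away
    token = uuid.uuid4().hex[:6]
    pending[token] = (command, user)
    return 'Type `confirm {}` to run: `{}`'.format(token, command)

def confirm(token, user):
    """Execute a pending command, requiring a second person to confirm."""
    if token not in pending:
        return 'Nothing pending under `{}`.'.format(token)
    command, requested_by = pending[token]
    if user == requested_by:
        return 'A different operator must confirm this.'
    del pending[token]
    # in the real bot, this is where handle_command(command, channel) runs
    return 'Running: `{}`'.format(command)
```

Wire propose_or_run in ahead of handle_command in the RTM loop, and add a confirm branch that passes the token and the confirming user's ID.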

The code above is the foundation. Layer approvals on top before you point it at anything you care about.