ChatOps isn't a new idea — GitHub's Hubot made it mainstream years ago. But most implementations I've seen are one-directional: your tools post into Slack, and that's it. You still SSH somewhere or run kubectl locally to actually do anything.
A two-way bot changes that. Events flow from Kubernetes into Slack. Operators respond with commands that trigger real Kubernetes actions. The chat channel becomes an audit log and a control plane at the same time. Done right, it's genuinely useful. Done wrong, it's a footgun pointed at your production cluster.
Here's how I set one up.
Inbound: Kubernetes Events Into Slack
The Kubernetes Python client has a watch API that lets you stream events as they happen in the cluster. You connect it to a Slack incoming webhook and you've got a live feed of cluster activity in your channel.
The events I care about most: pods entering CrashLoopBackOff, nodes going NotReady, OOMKilled containers, and deployment rollouts completing. Everything else is noise.
from kubernetes import client, config, watch
import os
import threading
import time

import requests

config.load_incluster_config()  # running inside the cluster
v1 = client.CoreV1Api()

# The webhook URL comes from the Secret wired into the Deployment below
SLACK_WEBHOOK = os.environ['SLACK_WEBHOOK_URL']
SLACK_CHANNEL = '#ops-alerts'  # only legacy webhooks honor a channel override

SEVERITY_COLORS = {
    'critical': '#FF0000',
    'warning': '#FFA500',
    'info': '#36A64F',
}


def post_to_slack(text, color='info', fields=None):
    attachment = {
        'color': SEVERITY_COLORS.get(color, '#36A64F'),
        'text': text,
        'mrkdwn_in': ['text'],
    }
    if fields:
        attachment['fields'] = fields
    requests.post(SLACK_WEBHOOK, json={
        'channel': SLACK_CHANNEL,
        'attachments': [attachment],
    })


def watch_pods():
    w = watch.Watch()
    while True:  # the stream ends when the API server drops the connection
        try:
            for event in w.stream(v1.list_pod_for_all_namespaces):
                pod = event['object']
                name = pod.metadata.name
                namespace = pod.metadata.namespace
                if not pod.status.container_statuses:
                    continue
                for cs in pod.status.container_statuses:
                    # CrashLoopBackOff
                    if cs.state.waiting and cs.state.waiting.reason == 'CrashLoopBackOff':
                        post_to_slack(
                            text='*CrashLoopBackOff* detected',
                            color='critical',
                            fields=[
                                {'title': 'Pod', 'value': name, 'short': True},
                                {'title': 'Namespace', 'value': namespace, 'short': True},
                                {'title': 'Container', 'value': cs.name, 'short': True},
                                {'title': 'Restarts', 'value': str(cs.restart_count), 'short': True},
                            ]
                        )
                    # OOMKilled
                    if cs.last_state.terminated and cs.last_state.terminated.reason == 'OOMKilled':
                        post_to_slack(
                            text='*OOMKilled* — container ran out of memory',
                            color='critical',
                            fields=[
                                {'title': 'Pod', 'value': name, 'short': True},
                                {'title': 'Namespace', 'value': namespace, 'short': True},
                                {'title': 'Container', 'value': cs.name, 'short': True},
                            ]
                        )
        except Exception:
            time.sleep(5)  # back off briefly, then re-establish the watch


def watch_nodes():
    w = watch.Watch()
    while True:
        try:
            for event in w.stream(v1.list_node):
                node = event['object']
                node_name = node.metadata.name
                for condition in node.status.conditions:
                    # condition.status is a string: 'True', 'False', or 'Unknown'
                    if condition.type == 'Ready' and condition.status != 'True':
                        post_to_slack(
                            text='*Node NotReady*',
                            color='critical',
                            fields=[{'title': 'Node', 'value': node_name, 'short': True}]
                        )
        except Exception:
            time.sleep(5)


if __name__ == '__main__':
    t1 = threading.Thread(target=watch_pods, daemon=True)
    t2 = threading.Thread(target=watch_nodes, daemon=True)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
One thing to know: the watch.Watch() stream does not reconnect automatically. The API server periodically closes watch connections, and when that happens the generator simply ends, so wrap each stream in a retry loop that backs off and reconnects. Running the watcher inside the cluster with load_incluster_config() is cleaner than managing external kubeconfig files.
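One wrinkle the watcher doesn't handle: a pod stuck in CrashLoopBackOff produces a status update on every restart attempt, so the channel fills with duplicate alerts. A small cooldown cache fixes that. This is a minimal sketch; the AlertDeduper name, the 15-minute window, and the injectable clock are my choices, not part of the bot above:

```python
import time


class AlertDeduper:
    """Suppress repeat alerts for the same key within a cooldown window."""

    def __init__(self, cooldown_seconds=900, clock=time.time):
        self.cooldown = cooldown_seconds
        self.clock = clock      # injectable so tests don't need to sleep
        self.last_sent = {}     # key -> timestamp of the last alert we sent

    def should_alert(self, key):
        now = self.clock()
        last = self.last_sent.get(key)
        if last is not None and now - last < self.cooldown:
            return False        # still inside the cooldown window
        self.last_sent[key] = now
        return True


# Usage: gate each post_to_slack call on the deduper, keyed by what
# makes the alert unique (reason, namespace, pod name).
deduper = AlertDeduper(cooldown_seconds=900)
key = ('CrashLoopBackOff', 'default', 'api-pod-abc')
if deduper.should_alert(key):
    pass  # post_to_slack(...) would go here
```

Keying on (reason, namespace, pod) means a second failing pod still alerts immediately; only repeats of the same failure are suppressed.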
Outbound: Slack Commands Into Kubernetes
This is the interesting half. The bot listens for messages mentioning it, either via the Slack RTM (Real Time Messaging) API or via slash commands over a Flask webhook. I went with RTM because it feels more natural in a channel. Be aware that Slack has since deprecated RTM in favor of the Events API and Socket Mode; it still works for classic Slack apps, but a new bot should probably use Socket Mode instead.
The commands I implemented:
- @opsbot scale &lt;deployment&gt; &lt;namespace&gt; &lt;replicas&gt; — scale a deployment
- @opsbot pods &lt;namespace&gt; — list pods in a namespace
- @opsbot restart &lt;deployment&gt; &lt;namespace&gt; — rolling restart via annotation patch
- @opsbot logs &lt;pod&gt; — fetch the last 50 log lines (default namespace)
from slackclient import SlackClient  # slackclient 1.x
from kubernetes import client, config
import datetime
import os
import re
import time

config.load_incluster_config()
apps_v1 = client.AppsV1Api()
core_v1 = client.CoreV1Api()

# Token and bot ID come from the Secret and env vars in the Deployment below
BOT_TOKEN = os.environ['SLACK_BOT_TOKEN']
BOT_ID = os.environ['SLACK_BOT_ID']  # e.g. U0XXXXXXX

slack = SlackClient(BOT_TOKEN)


def handle_command(command, channel):
    parts = command.strip().split()
    if not parts:
        return
    action = parts[0].lower()
    response = 'Unknown command. Try: scale, pods, restart, logs'
    try:
        if action == 'scale' and len(parts) == 4:
            _, deployment, namespace, replicas = parts
            apps_v1.patch_namespaced_deployment_scale(
                name=deployment,
                namespace=namespace,
                body={'spec': {'replicas': int(replicas)}}
            )
            response = 'Scaled `{}` to {} replicas in `{}`.'.format(
                deployment, replicas, namespace)
        elif action == 'pods' and len(parts) == 2:
            namespace = parts[1]
            pods = core_v1.list_namespaced_pod(namespace=namespace)
            lines = []
            for p in pods.items:
                lines.append('{} — {}'.format(p.metadata.name, p.status.phase))
            response = '```\n{}\n```'.format('\n'.join(lines)) if lines else 'No pods found.'
        elif action == 'restart' and len(parts) == 3:
            _, deployment, namespace = parts
            patch = {
                'spec': {
                    'template': {
                        'metadata': {
                            'annotations': {
                                'kubectl.kubernetes.io/restartedAt':
                                    datetime.datetime.utcnow().isoformat()
                            }
                        }
                    }
                }
            }
            apps_v1.patch_namespaced_deployment(
                name=deployment, namespace=namespace, body=patch)
            response = 'Rolling restart triggered for `{}` in `{}`.'.format(
                deployment, namespace)
        elif action == 'logs' and len(parts) == 2:
            pod_name = parts[1]
            # default namespace — extend to parse a namespace if needed
            log = core_v1.read_namespaced_pod_log(
                name=pod_name, namespace='default', tail_lines=50)
            response = '```\n{}\n```'.format(log[-3000:])  # stay under Slack's message size limit
    except Exception as e:
        response = 'Error: {}'.format(str(e))
    slack.api_call('chat.postMessage', channel=channel, text=response)


def parse_mention(text):
    """Strip the @opsbot mention and return the command."""
    match = re.match(r'<@' + BOT_ID + r'>\s*(.*)', text, re.DOTALL)
    if match:
        return match.group(1).strip()
    return None


if slack.rtm_connect(with_team_state=False):
    print('OpsBot connected to Slack RTM')
    while True:
        for event in slack.rtm_read():
            if event.get('type') == 'message' and 'text' in event:
                command = parse_mention(event['text'])
                if command:
                    handle_command(command, event['channel'])
        time.sleep(0.5)
else:
    print('RTM connection failed')
The RTM loop is simple: read events every 500ms, check if any are messages mentioning the bot, parse the command, dispatch it. The slackclient 1.x library's rtm_connect() and rtm_read() handle the WebSocket connection.
RBAC: Give the Bot Only What It Needs
This is non-negotiable. The bot runs as a pod in your cluster, which means it needs a ServiceAccount. Do not give it cluster-admin. Define exactly the permissions it needs and nothing more.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: opsbot
  namespace: ops
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: opsbot-role
  namespace: default
rules:
- apiGroups: [""]
  resources: ["pods", "pods/log"]
  verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
  resources: ["deployments", "deployments/scale"]
  verbs: ["get", "list", "patch", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: opsbot-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: opsbot
  namespace: ops
roleRef:
  kind: Role
  name: opsbot-role
  apiGroup: rbac.authorization.k8s.io
This Role grants read access to pods and logs, and patch/update access to deployments, all scoped to the default namespace. It grants no secret access and nothing cluster-wide for writes. One caveat: the inbound watcher from the first half does need cluster-wide visibility, because list_pod_for_all_namespaces and the node watch can't be satisfied by a namespaced Role. Keep that cluster-wide grant strictly read-only, keep the write permissions in the namespace-scoped Role, and reach for a ClusterRole with write verbs only if you genuinely need cross-namespace control.
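The read-only cluster-wide grant for the watcher might look like this. A sketch, assuming the opsbot-watcher names (the binding targets the same ServiceAccount as above):

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: opsbot-watcher
rules:
- apiGroups: [""]
  resources: ["pods", "nodes"]
  verbs: ["get", "list", "watch"]   # read-only: no patch, no update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: opsbot-watcher-binding
subjects:
- kind: ServiceAccount
  name: opsbot
  namespace: ops
roleRef:
  kind: ClusterRole
  name: opsbot-watcher
  apiGroup: rbac.authorization.k8s.io
```

The split keeps the blast radius sensible: the bot can see everything but can only change things in the namespace where you've explicitly bound the write Role.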
Running the Bot as a Deployment
Dog-food the platform. Run the bot inside the cluster it's managing.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: opsbot
  namespace: ops
spec:
  replicas: 1
  selector:
    matchLabels:
      app: opsbot
  template:
    metadata:
      labels:
        app: opsbot
    spec:
      serviceAccountName: opsbot
      containers:
      - name: opsbot
        image: your-registry/opsbot:latest
        env:
        - name: SLACK_BOT_TOKEN
          valueFrom:
            secretKeyRef:
              name: opsbot-secrets
              key: bot-token
        - name: SLACK_BOT_ID
          value: "U0XXXXXXX"
        - name: SLACK_WEBHOOK_URL
          valueFrom:
            secretKeyRef:
              name: opsbot-secrets
              key: webhook-url
        resources:
          requests:
            cpu: 50m
            memory: 64Mi
          limits:
            cpu: 200m
            memory: 128Mi
One replica is fine — this isn't a high-availability workload. Store the Slack tokens in a Secret, not in the Deployment manifest. The serviceAccountName field wires up the RBAC permissions you defined.
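Creating that Secret is a single kubectl command; the literal values below are placeholders for your real token and webhook URL:

```shell
# Create the opsbot-secrets Secret referenced by the Deployment above.
kubectl create secret generic opsbot-secrets \
  --namespace ops \
  --from-literal=bot-token='xoxb-your-bot-token' \
  --from-literal=webhook-url='https://hooks.slack.com/services/T000/B000/xxxx'
```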
Honest Take
Two-way bots are genuinely useful for day-to-day operations. Having @opsbot pods production available in Slack means people check pod status without needing local cluster access set up. The inbound alerting means CrashLoopBackOff events show up in a channel where the whole team sees them, not just whoever happens to be watching Nagios.
But unrestricted production access via chat commands is not something I'd run without approval workflows. The scale and restart commands above will execute immediately for anyone in the channel with no confirmation step. That's fine for a small team where everyone has context. It's not fine for a larger org where someone might accidentally scale production down to zero.
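One cheap mitigation short of full approvals is a per-user allowlist for destructive commands, checked against the Slack user ID on the incoming message. A sketch; the ALLOWED_USERS and DESTRUCTIVE names and the placeholder user IDs are mine:

```python
# Hypothetical allowlist gate for destructive commands. The user IDs are
# placeholders; `event` is a Slack RTM message dict with a 'user' field.
ALLOWED_USERS = {'U0AAAAAAA', 'U0BBBBBBB'}
DESTRUCTIVE = {'scale', 'restart'}


def is_authorized(event, command):
    parts = command.strip().split()
    action = parts[0].lower() if parts else ''
    if action not in DESTRUCTIVE:
        return True                      # read-only commands are open to all
    return event.get('user') in ALLOWED_USERS


# Usage inside the RTM loop, before dispatching to handle_command:
event = {'type': 'message', 'user': 'U0CCCCCCC', 'text': 'scale api prod 0'}
print(is_authorized(event, 'scale api prod 0'))  # False: not on the allowlist
```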
The right pattern is to add a confirmation step for destructive operations: the bot proposes the action, someone reacts with a checkmark emoji or types confirm, then it executes. Some teams go further and require approval from a second person. That's the ChatOps pattern done properly — not just "chat as a terminal," but chat as a collaborative, auditable control plane.
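That confirmation step can be sketched with an in-memory pending-action table: the bot stores the deferred Kubernetes call, posts a short ID, and only executes when someone confirms within a time window. The PendingActions name and 60-second TTL are my choices; a real bot would also record who confirmed:

```python
import time
import uuid


class PendingActions:
    """Hold proposed actions until someone confirms them.
    Unconfirmed actions expire after ttl_seconds."""

    def __init__(self, ttl_seconds=60, clock=time.time):
        self.ttl = ttl_seconds
        self.clock = clock
        self.pending = {}  # action_id -> (description, callable, proposed_at)

    def propose(self, description, action):
        action_id = uuid.uuid4().hex[:8]
        self.pending[action_id] = (description, action, self.clock())
        # The bot would post: "Run `<description>`? Reply: confirm <action_id>"
        return action_id

    def confirm(self, action_id):
        entry = self.pending.pop(action_id, None)
        if entry is None:
            return 'Nothing pending with that id.'
        description, action, proposed_at = entry
        if self.clock() - proposed_at > self.ttl:
            return 'Proposal expired, propose it again.'
        action()  # execute the deferred Kubernetes call
        return 'Done: {}'.format(description)


# Usage: wrap the destructive branches of handle_command in propose/confirm.
pending = PendingActions()
executed = []
action_id = pending.propose(
    'scale nginx to 0 in production',
    lambda: executed.append('scaled'))
print(pending.confirm(action_id))  # runs the deferred action
```

Because the deferred call is just a closure over the kubernetes client call, the scale and restart branches above slot in without restructuring the dispatcher.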
The code above is the foundation. Layer approvals on top before you point it at anything you care about.