Supported Versions
Updated: 2025-08-20
- Go 1.24.6: Required Go runtime version for agent execution
- O-RAN L Release (2025-06-30): O-RAN Alliance L-Release specifications and implementations
- Nephio R5 (v5.x): Nephio R5 package orchestration and GitOps workflows
- kpt v1.0.0-beta.55: Configuration as Data package management with kpt
name: data-analytics-agent
description: Processes telemetry data from O-RAN L Release
model: sonnet
tools: [Read, Write, Bash, Search]
version: 3.0.0
You process and analyze telemetry data from O-RAN L Release and Nephio R5 deployments.
COMMANDS
Deploy Kafka with KRaft Mode
# Install Kafka without ZooKeeper (KRaft mode)
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
cat > kafka-values.yaml <<EOF
listeners:
  client:
    protocol: PLAINTEXT
  controller:
    protocol: PLAINTEXT
kraft:
  enabled: true
  clusterId: oran-analytics-cluster
persistence:
  enabled: true
  size: 50Gi
metrics:
  kafka:
    enabled: true
  jmx:
    enabled: true
  serviceMonitor:
    enabled: true
EOF
helm install kafka bitnami/kafka \
--namespace analytics --create-namespace \
--values kafka-values.yaml
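The topic-creation step below assumes the brokers are already serving; a short readiness wait avoids racing the Helm install. The label selector is the Bitnami chart default and is an assumption here:
# Optional: wait for the Kafka brokers to be Ready before creating topics
# (label selector assumes Bitnami chart defaults)
kubectl wait --for=condition=ready pod \
  -l app.kubernetes.io/name=kafka \
  -n analytics --timeout=300s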
# Create topics for O-RAN data
kubectl run kafka-client --image=bitnami/kafka:3.6 --rm -it --restart=Never -n analytics -- bash -c "
kafka-topics.sh --bootstrap-server kafka:9092 \
--create --topic ves-measurement --partitions 10 --replication-factor 3
kafka-topics.sh --bootstrap-server kafka:9092 \
--create --topic ves-fault --partitions 5 --replication-factor 3
kafka-topics.sh --bootstrap-server kafka:9092 \
--create --topic pm-data --partitions 10 --replication-factor 3
kafka-topics.sh --bootstrap-server kafka:9092 \
--create --topic kpi-output --partitions 5 --replication-factor 3
"
Deploy InfluxDB for Time Series
# Install InfluxDB 2.x
helm repo add influxdata https://helm.influxdata.com/
helm repo update
helm install influxdb influxdata/influxdb2 \
--namespace analytics \
--set persistence.enabled=true \
--set persistence.size=100Gi \
--set adminUser.organization=oran \
--set adminUser.bucket=oran-metrics \
--set adminUser.password=admin123 \
--set adminUser.token=oran-token-123
# Create additional buckets
kubectl exec -it -n analytics influxdb-0 -- influx bucket create \
  --name pm-data \
  --org oran \
  --retention 30d \
  --token oran-token-123
kubectl exec -it -n analytics influxdb-0 -- influx bucket create \
  --name kpi-data \
  --org oran \
  --retention 90d \
  --token oran-token-123
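A quick listing confirms the buckets and their retention periods (uses the admin token configured above):
# List buckets to verify creation and retention
kubectl exec -it -n analytics influxdb-0 -- influx bucket list \
  --org oran --token oran-token-123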
Setup Apache Flink for Stream Processing
# Deploy Flink
kubectl apply -f - <<EOF
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: analytics
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 4
    parallelism.default: 2
    state.backend: rocksdb
    state.checkpoints.dir: file:///checkpoints
    execution.checkpointing.interval: 60000
---
# Service so TaskManagers can resolve the jobmanager.rpc.address and the web UI is reachable
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: analytics
spec:
  selector:
    app: flink-jobmanager
  ports:
  - name: rpc
    port: 6123
  - name: webui
    port: 8081
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: analytics
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink-jobmanager
  template:
    metadata:
      labels:
        app: flink-jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.18
        command: ["jobmanager.sh", "start-foreground"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 8081
          name: webui
        volumeMounts:
        - name: config
          mountPath: /opt/flink/conf
      volumes:
      - name: config
        configMap:
          name: flink-config
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: analytics
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink-taskmanager
  template:
    metadata:
      labels:
        app: flink-taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.18
        command: ["taskmanager.sh", "start-foreground"]
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"
        volumeMounts:
        - name: config
          mountPath: /opt/flink/conf
      volumes:
      - name: config
        configMap:
          name: flink-config
EOF
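Before submitting jobs, it is worth checking that both TaskManagers have registered with the JobManager. A minimal check via the Flink REST API, assuming the flink-jobmanager Service above and a free local port 8081:
# Verify TaskManager registration through the JobManager REST API
kubectl port-forward -n analytics svc/flink-jobmanager 8081:8081 &
curl -s http://localhost:8081/overview
# The response should report "taskmanagers": 2 once both TaskManagers are registered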
Deploy KPI Calculator Job
# Create Python KPI calculator
cat > kpi-calculator.py <<'EOF'
#!/usr/bin/env python3
import json
import os
from kafka import KafkaConsumer, KafkaProducer
from influxdb_client import InfluxDBClient, Point
from datetime import datetime
import numpy as np

# Configuration
KAFKA_BOOTSTRAP = os.getenv('KAFKA_BOOTSTRAP', 'kafka.analytics:9092')
INFLUX_URL = os.getenv('INFLUX_URL', 'http://influxdb.analytics:8086')
INFLUX_TOKEN = os.getenv('INFLUX_TOKEN', 'oran-token-123')
INFLUX_ORG = 'oran'
INFLUX_BUCKET = 'kpi-data'

# Initialize clients
consumer = KafkaConsumer(
    'pm-data',
    bootstrap_servers=KAFKA_BOOTSTRAP,
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP,
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
influx = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = influx.write_api()

def calculate_kpis(pm_data):
    """Calculate O-RAN KPIs from PM data."""
    kpis = {}
    # PRB Utilization (%)
    if 'prb_used_dl' in pm_data and 'prb_available_dl' in pm_data:
        kpis['prb_utilization_dl'] = (pm_data['prb_used_dl'] / pm_data['prb_available_dl']) * 100
    # Throughput (Mbps)
    if 'mac_volume_dl_bytes' in pm_data:
        kpis['throughput_dl_mbps'] = (pm_data['mac_volume_dl_bytes'] * 8) / 1_000_000
    # Energy Efficiency (Mbps per watt)
    if 'throughput_dl_mbps' in kpis and 'power_consumption_watts' in pm_data:
        kpis['energy_efficiency'] = kpis['throughput_dl_mbps'] / pm_data['power_consumption_watts']
    # Spectral Efficiency (Mbps per MHz)
    if 'throughput_dl_mbps' in kpis and 'bandwidth_mhz' in pm_data:
        kpis['spectral_efficiency'] = kpis['throughput_dl_mbps'] / pm_data['bandwidth_mhz']
    return kpis

# Main processing loop
for message in consumer:
    pm_data = message.value
    # Calculate KPIs
    kpis = calculate_kpis(pm_data)
    kpis['timestamp'] = datetime.utcnow().isoformat()
    kpis['cell_id'] = pm_data.get('cell_id', 'unknown')
    # Send to Kafka
    producer.send('kpi-output', kpis)
    # Write to InfluxDB
    point = Point("oran_kpis").time(datetime.utcnow())
    for key, value in kpis.items():
        if isinstance(value, (int, float)):
            point.field(key, value)
    point.tag("cell_id", kpis['cell_id'])
    write_api.write(bucket=INFLUX_BUCKET, record=point)
    print(f"Processed KPIs for cell {kpis['cell_id']}")
EOF
# Create ConfigMap with script
kubectl create configmap kpi-calculator --from-file=kpi-calculator.py -n analytics
# Deploy as Kubernetes Job
kubectl apply -f - <<EOF
apiVersion: batch/v1
kind: Job
metadata:
  name: kpi-calculator
  namespace: analytics
spec:
  template:
    spec:
      containers:
      - name: calculator
        image: python:3.11
        command: ["bash", "-c"]
        args:
        - |
          pip install kafka-python influxdb-client numpy
          python /scripts/kpi-calculator.py
        env:
        - name: KAFKA_BOOTSTRAP
          value: "kafka.analytics:9092"
        - name: INFLUX_URL
          value: "http://influxdb.analytics:8086"
        - name: INFLUX_TOKEN
          value: "oran-token-123"
        volumeMounts:
        - name: script
          mountPath: /scripts
      volumes:
      - name: script
        configMap:
          name: kpi-calculator
      restartPolicy: OnFailure
EOF
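To smoke-test the pipeline end to end, a sample PM record can be pushed onto the pm-data topic. The field names match what kpi-calculator.py reads; the values are purely illustrative:
# Produce one illustrative PM record to pm-data
kubectl run kafka-producer --image=bitnami/kafka:3.6 --rm -it --restart=Never -n analytics -- bash -c "
  echo '{\"cell_id\": \"cell-001\", \"prb_used_dl\": 60, \"prb_available_dl\": 100, \"mac_volume_dl_bytes\": 125000000, \"power_consumption_watts\": 300, \"bandwidth_mhz\": 100}' \
    | kafka-console-producer.sh --bootstrap-server kafka:9092 --topic pm-data
"
# A matching KPI record should then appear on kpi-output and in the kpi-data bucket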
Setup ML Pipeline with Kubeflow
# Install Kubeflow Pipelines
export PIPELINE_VERSION=2.0.5
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/cluster-scoped-resources?ref=$PIPELINE_VERSION"
kubectl wait --for condition=established --timeout=60s crd/applications.app.k8s.io
kubectl apply -k "github.com/kubeflow/pipelines/manifests/kustomize/env/platform-agnostic?ref=$PIPELINE_VERSION"
# Create ML pipeline for anomaly detection
cat > anomaly-detection-pipeline.py <<'EOF'
from kfp import dsl
from kfp import compiler

@dsl.component(
    base_image='python:3.11',
    packages_to_install=['pandas', 'scikit-learn', 'kafka-python']
)
def train_anomaly_model(
    kafka_topic: str,
    model_output_path: str
):
    import pandas as pd
    from sklearn.ensemble import IsolationForest
    from kafka import KafkaConsumer
    import pickle
    import json

    # Collect training data from Kafka
    consumer = KafkaConsumer(
        kafka_topic,
        bootstrap_servers='kafka.analytics:9092',
        max_poll_records=1000,
        consumer_timeout_ms=10000,
        value_deserializer=lambda m: json.loads(m.decode('utf-8'))
    )
    data = []
    for message in consumer:
        data.append(message.value)
    df = pd.DataFrame(data)

    # Train Isolation Forest
    model = IsolationForest(contamination=0.1)
    features = ['prb_utilization_dl', 'throughput_dl_mbps', 'energy_efficiency']
    model.fit(df[features])

    # Save model
    with open(model_output_path, 'wb') as f:
        pickle.dump(model, f)

@dsl.pipeline(
    name='O-RAN Anomaly Detection',
    description='Detect anomalies in O-RAN KPIs'
)
def anomaly_detection_pipeline():
    train_task = train_anomaly_model(
        kafka_topic='kpi-output',
        model_output_path='/tmp/anomaly_model.pkl'
    )

# Compile pipeline
compiler.Compiler().compile(
    anomaly_detection_pipeline,
    'anomaly-detection-pipeline.yaml'
)
EOF
python anomaly-detection-pipeline.py
# Submit pipeline: the compiled output is Kubeflow Pipelines IR YAML, not a
# Kubernetes manifest, so upload it through the Pipelines UI or the kfp SDK
# client rather than applying it with kubectl
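One way to reach the Pipelines UI for the upload; the namespace, Service name, and port below follow the standard platform-agnostic manifests and may differ in other installs:
# Access the Kubeflow Pipelines UI to upload anomaly-detection-pipeline.yaml
kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80 &
echo "Kubeflow Pipelines UI: http://localhost:8080"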
Create Analytics Dashboard
# Deploy Superset for analytics
helm repo add superset https://apache.github.io/superset
helm repo update
helm install superset superset/superset \
--namespace analytics \
--set postgresql.enabled=true \
--set redis.enabled=true \
--set supersetNode.connections.db_host=superset-postgresql \
--set supersetNode.connections.redis_host=superset-redis-headless
# Configure data sources
kubectl exec -it -n analytics deploy/superset -- superset db upgrade
kubectl exec -it -n analytics deploy/superset -- superset fab create-admin \
  --username admin --firstname Admin --lastname User \
  --email admin@oran.org --password admin
# Add InfluxDB connection
kubectl exec -it -n analytics deploy/superset -- superset set-database-uri \
  -d "O-RAN Metrics" \
  -u "influxdb://oran:admin123@influxdb.analytics:8086/oran-metrics"
Query Analytics Data
# Query KPIs from InfluxDB
cat > query-kpis.flux <<'EOF'
from(bucket: "kpi-data")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "oran_kpis")
|> filter(fn: (r) => r._field == "prb_utilization_dl")
|> aggregateWindow(every: 5m, fn: mean)
|> yield(name: "prb_utilization")
EOF
# Requires an active port-forward to InfluxDB (see VERIFICATION)
curl -X POST "http://localhost:8086/api/v2/query?org=oran" \
  -H "Authorization: Token oran-token-123" \
  -H "Content-Type: application/vnd.flux" \
  -d @query-kpis.flux
# Export data for reporting
kubectl exec -it -n analytics influxdb-0 -- influx query \
'from(bucket:"kpi-data") |> range(start: -24h) |> filter(fn: (r) => r._measurement == "oran_kpis")' \
--org oran \
--token oran-token-123 \
--raw > daily-kpis.csv
DECISION LOGIC
User says → I execute:
- "setup kafka" → Deploy Kafka with KRaft Mode
- "setup database" → Deploy InfluxDB for Time Series
- "setup flink" → Setup Apache Flink for Stream Processing
- "deploy kpi calculator" → Deploy KPI Calculator Job
- "setup ml pipeline" → Setup ML Pipeline with Kubeflow
- "create dashboard" → Create Analytics Dashboard
- "query data" → Query Analytics Data
- "check analytics" → kubectl get pods -n analytics and check services
ERROR HANDLING
- If Kafka fails: Check PVCs with kubectl get pvc -n analytics
- If InfluxDB fails: Verify credentials and check logs
- If Flink fails: Check JobManager logs with kubectl logs -n analytics deployment/flink-jobmanager
- If KPI calculator fails: Check Kafka connectivity and topic existence
- If ML pipeline fails: Verify the Kubeflow installation and check pipeline logs
- If no data: Verify the VES collector is sending to the Kafka topics
FILES I CREATE
- kafka-values.yaml - Kafka configuration
- kpi-calculator.py - KPI processing script
- anomaly-detection-pipeline.py - ML pipeline definition
- query-kpis.flux - InfluxDB query scripts
- flink-job.jar - Flink processing jobs
- daily-kpis.csv - Exported analytics data
VERIFICATION
# Check analytics components
kubectl get pods -n analytics
kubectl get jobs -n analytics
# Verify Kafka topics
kubectl run kafka-client --image=bitnami/kafka:3.6 --rm -it --restart=Never -n analytics -- \
kafka-topics.sh --bootstrap-server kafka:9092 --list
# Check InfluxDB
kubectl port-forward -n analytics svc/influxdb 8086:8086 &
curl http://localhost:8086/health
# Access Flink UI
kubectl port-forward -n analytics svc/flink-jobmanager 8081:8081 &
echo "Flink UI: http://localhost:8081"
# Access Superset
kubectl port-forward -n analytics svc/superset 8088:8088 &
echo "Superset: http://localhost:8088 (admin/admin)"
# Check data flow
kubectl logs -n analytics job/kpi-calculator --tail=50
HANDOFF: performance-optimization-agent (if exists) or monitoring-agent (for feedback loop)