Skip to content

Commit 4b1e0fd

Browse files
committed
kafka sink
1 parent cec7061 commit 4b1e0fd

12 files changed

Lines changed: 151 additions & 4 deletions

File tree

playbooks/deployment_babysitter.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@
1010

1111

1212
class DeploymentBabysitterConfig(BaseModel):
    """Configuration for the deployment babysitter playbook.

    Populated from the playbook's configuration; all fields have defaults
    so an empty config is valid.
    """

    # Slack channel to notify; empty string disables the Slack notification.
    slack_channel: str = ""
    # Dotted field paths on the deployment whose diffs should be reported.
    # NOTE: Tuple[str, ...] (variable length) — the previous Tuple[str]
    # means "a 1-tuple of str" to pydantic, which would reject any
    # user-supplied list that isn't exactly one element long.
    fields_to_monitor: Tuple[str, ...] = (
        "status.readyReplicas",
        "message",
        "reason",
        "spec",
    )
    # Optional extra sinks (e.g. kafka) that receive the change report.
    sinks: List[SinkConfigBase] = None
2021

2122

2223
# TODO: filter out all the managed fields crap
@@ -40,6 +41,7 @@ def babysitter_get_blocks(diffs: List[DiffDetail]):
4041
@on_deployment_all_changes
4142
def deployment_babysitter(event: DeploymentEvent, config: DeploymentBabysitterConfig):
4243
"""Track changes to a deployment and send the changes in slack."""
44+
filtered_diffs = None
4345
if event.operation == K8sOperationType.UPDATE:
4446
all_diffs = event.obj.diff(event.old_obj)
4547
filtered_diffs = list(filter(lambda x: babysitter_should_include_diff(x, config), all_diffs))
@@ -48,5 +50,22 @@ def deployment_babysitter(event: DeploymentEvent, config: DeploymentBabysitterCo
4850
event.report_attachment_blocks.extend(babysitter_get_blocks(filtered_diffs))
4951

5052
event.report_title = f"Deployment {event.obj.metadata.name} {event.operation.value}d in namespace {event.obj.metadata.namespace}"
51-
event.slack_channel = config.slack_channel
52-
send_to_slack(event)
53+
if config.slack_channel:
54+
event.slack_channel = config.slack_channel
55+
send_to_slack(event)
56+
57+
if config.sinks:
58+
data = {
59+
"deployment": event.obj.metadata.name,
60+
"deployment_namespace": event.obj.metadata.namespace,
61+
"message": "Deployment properties change",
62+
"changed_properties": [{
63+
"property": ".".join(diff.path),
64+
"old": diff.other_value,
65+
"new": diff.value
66+
} for diff in filtered_diffs]
67+
}
68+
for sink_config in config.sinks:
69+
SinkFactory.get_sink(sink_config).write(data)
70+
71+

playbooks/grafana_enrichment.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,43 @@ def add_deployment_lines_to_grafana(event: DeploymentEvent, action_params: Param
2929
grafana.add_line_to_dashboard(action_params.grafana_dashboard_uid, msg, tags=[event.obj.metadata.name])
3030

3131

32+
class ImageChangesParams(BaseModel):
    """Parameters for report_image_changes."""

    # Sink configurations; each one is resolved to a concrete sink
    # via SinkFactory when a report is written.
    sinks: List[SinkConfigBase]
34+
35+
@on_deployment_update
def report_image_changes(event: DeploymentEvent, action_params: ImageChangesParams):
    """Send a report to the configured sinks whenever a deployment's container images change."""
    new_images = event.obj.get_images()
    old_images = event.old_obj.get_images()
    if new_images == old_images:
        # no image change at all - nothing to report
        return

    changed_properties = []
    if new_images.keys() != old_images.keys():
        # containers were added, removed, or renamed - report the full maps
        msg = f"number or names of images changed: new - {new_images} old - {old_images}"
    else:
        # same container set: describe every tag that differs
        msg = ""
        for name, new_tag in new_images.items():
            old_tag = old_images[name]
            if new_tag == old_tag:
                continue
            msg += f"image name: {name} new tag: {new_tag} old tag {old_tag}"
            changed_properties.append({
                "property": "image",
                "old": f"{name}:{old_tag}",
                "new": f"{name}:{new_tag}",
            })

    data = {
        "deployment": event.obj.metadata.name,
        "deployment_namespace": event.obj.metadata.namespace,
        "message": msg,
        "changed_properties": changed_properties,
    }
    # fan the same report out to every configured sink
    for sink_config in action_params.sinks:
        SinkFactory.get_sink(sink_config).write(data)
67+
68+
3269
@on_pod_create
3370
def test_pod_orm(event : PodEvent):
3471
logging.info('running test_pod_orm')

src/poetry.lock

Lines changed: 16 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ dulwich = "^0.20.23"
2727
better-exceptions = "^0.3.3"
2828
CairoSVG = "^2.5.2"
2929
tabulate = "^0.8.9"
30+
kafka-python = "^2.0.2"
3031

3132
[tool.poetry.dev-dependencies]
3233
hikaru = {git = "https://github.com/aantn/hikaru.git", rev = "fix_datetimes"}

src/robusta/api/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
from ..integrations.manual.triggers import *
1515
from ..integrations.scheduled.triggers import *
1616
from ..integrations.git.git_repo_manager import *
17+
from ..integrations.sinks.sink_base import *
18+
from ..integrations.sinks.sink_config import *
19+
from ..integrations.sinks.sink_factory import *
20+
from ..integrations.sinks.kafka import *
1721
from ..core.persistency.in_memory import get_persistent_data
1822
from ..utils.rate_limiter import RateLimiter
1923
from ..runner.object_updater import *

src/robusta/integrations/sinks/kafka/__init__.py

Whitespace-only changes.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import json
2+
3+
from ..sink_base import SinkBase
4+
from kafka import KafkaProducer
5+
6+
7+
class KafkaSink(SinkBase):
    """Sink that publishes report dicts to a Kafka topic as UTF-8 JSON."""

    def __init__(self, producer: KafkaProducer, topic: str):
        # the producer instance may be shared with other sinks that
        # target the same broker (it is cached per kafka_url elsewhere)
        self.producer = producer
        self.topic = topic

    def write(self, data: dict):
        """Serialize *data* as JSON and send it to the configured topic."""
        payload = json.dumps(data).encode("utf-8")
        self.producer.send(self.topic, value=payload)
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from pydantic import BaseModel
2+
3+
4+
class KafkaSinkConfig(BaseModel):
    """Connection details for a Kafka sink."""

    # broker address, used as the producer's bootstrap server
    kafka_url: str
    # topic the reports are published to
    topic: str
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import threading
2+
from collections import defaultdict
3+
4+
from kafka import KafkaProducer
5+
6+
from .kafka_sink import KafkaSink
7+
from ..sink_base import SinkBase
8+
9+
10+
class KafkaSinkManager:
    """Builds KafkaSinks while sharing one KafkaProducer per broker URL.

    A producer is created at most once per kafka_url and cached, so many
    sinks (different topics) can reuse the same connection. Access to the
    cache is guarded by a class-level lock.
    """

    manager_lock = threading.Lock()
    # kafka_url -> KafkaProducer. A plain dict: the previous
    # defaultdict(None) behaved identically, since a None factory never
    # creates default entries and lookups go through .get() anyway.
    producers_map: dict = {}

    @staticmethod
    def get_kafka_sink(kafka_url: str, topic: str) -> SinkBase:
        """Return a KafkaSink for *topic*, reusing a cached producer for *kafka_url*."""
        with KafkaSinkManager.manager_lock:
            producer = KafkaSinkManager.producers_map.get(kafka_url)
            if producer is None:
                producer = KafkaProducer(bootstrap_servers=kafka_url)
                KafkaSinkManager.producers_map[kafka_url] = producer
            return KafkaSink(producer, topic)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
2+
class SinkBase:
    """Base interface for report sinks; concrete sinks override write()."""

    def write(self, data: dict):
        """Deliver *data* to the sink's destination. No-op in the base class."""
        pass

0 commit comments

Comments
 (0)