Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions docs/configuration_samples/sample_kubernetes/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Running Singer on Kubernetes

This sample shows how to run Singer as a node-level logging agent (DaemonSet)
that tails logs for every pod on the node and routes each source to its own
Kafka topic.

## How Kubernetes mode works

With `singer.kubernetesEnabled=true`, Singer:

1. Watches `singer.kubernetes.podLogDirectory` (normally `/var/log/pods`) for
pod directories, which follow the kubelet naming convention
`<namespace>_<podname>_<poduid>`.
2. Polls the kubelet `/pods` API to detect pod deletions and to fetch pod
metadata (labels, annotations, etc.) selected by
`singer.kubernetes.podMetadataFields`.
3. For every pod, instantiates each log config in `conf.d/` whose `logDir`
(interpreted *relative to the pod's log directory*) matches, creating
independent log streams with their own watermarks.
4. Drains in-flight data before cleaning up when a pod is deleted.

## Tailing container stdout/stderr

The kubelet writes container output to
`/var/log/pods/<namespace>_<podname>_<poduid>/<container>/0.log` and rotates by
renaming (`0.log` → `0.log.<timestamp>`), which is compatible with Singer's
inode-based watermarks.

Use a wildcard `logDir` so every container directory becomes its own stream
(see `conf.d/container.stdout_logs.properties`):

```properties
logDir=/*
logStreamRegex=0.log
logFileMatchMode=prefix
```

Lines are shipped in the raw CRI format
(`<timestamp> <stdout|stderr> <P|F> <message>`); downstream consumers parse the
prefix if they need it.

## Per-source topic routing

`writer.kafka.topicTemplate` derives the topic from where the stream was
collected. Resolution happens once per stream, with this precedence:

1. **`topicTemplate`** — used when every variable resolves and the result is a
legal Kafka topic name (`[a-zA-Z0-9._-]`, max 249 chars). Variables:
- `%{namespace}` — the pod's Kubernetes namespace
- `%{container}` — the container directory name (kubelet stdout layout)
- `%{metadata:<key>}` — a pod metadata value fetched via
`singer.kubernetes.podMetadataFields`; the key is the last path segment of
the configured field (e.g. field `labels:app` → `%{metadata:app}`)
2. **`topic`** — the required static fallback; legacy `\N` capture-group
expansion against `logStreamRegex` still applies. Fallbacks are counted by
the `singer.writer.topic_template_fallback` metric.

Pod-level identifiers (pod name, pod uid) are deliberately **not** supported as
template variables: they change on every restart/reschedule and would create
unbounded topic cardinality.

Examples:

```properties
# one topic per namespace+container
writer.kafka.topicTemplate=logs_%{namespace}_%{container}

# self-serve routing: teams set a label/annotation on their pods
# (requires singer.kubernetes.podMetadataFields to include e.g. labels:logTopic)
writer.kafka.topicTemplate=%{metadata:logTopic}
```

To restrict which pods a config applies to at all, combine with the pod
allowlist (`podAllowlist` in the log config plus
`singer.kubernetes.podAllowlistMetadataKey` in `singer.properties`).

## DaemonSet deployment notes

- Mount `/var/log/pods` (read-only) from the host.
- Mount a writable host path for Singer's watermark files so a restart or
redeploy does not re-ship logs.
- For clusters where the kubelet read-only port (10255) is disabled, set
`singer.kubernetes.useSecureConnection=true`,
`singer.kubernetes.kubeletPort=10250`, point
`serviceAccountTokenPath`/`serviceAccountCaCertPath` at the standard service
account paths, expose the node IP as the `HOST_IP` environment variable via
the downward API, and grant the service account `get` on `nodes/proxy`.
- Expose the Ostrich port (`singer.ostrichPort`) if you scrape metrics.
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
###############################################################################
#
# Config for shipping container stdout/stderr logs from the standard kubelet
# pod log directory layout:
#
# /var/log/pods/<namespace>_<podname>_<poduid>/<container>/0.log
#
# logDir is relative to each pod's log directory. The wildcard "/*" matches
# every container directory, so each container gets its own log stream (and
# therefore its own resolved topic).
#
###############################################################################

logName=container_logs
logDir=/*
logStreamRegex=0.log
logFileMatchMode=prefix

# Configuration for processor.
processor.batchSize=200
processor.processingIntervalInSeconds=10
processor.processingIntervalInSecondsMax=20
processor.processingTimeSliceInSeconds=15

# Configuration for reader. Note: lines are shipped in the raw CRI format
# ("<timestamp> <stdout|stderr> <P|F> <message>"); downstream consumers are
# responsible for parsing the prefix if needed.
reader.type=text
reader.text.readerBufferSize=2097152
reader.text.maxMessageSize=1000000
reader.text.messageStartRegex=^.*$
reader.text.numMessagesPerLogMessage=1
reader.text.logMessageType=plain_text

# Configuration for writer.
#
# Topic resolution precedence (per log stream, decided once at stream creation):
# 1. topicTemplate - used when every variable resolves and the result is a
# legal Kafka topic name. Supported variables:
# %{namespace} - the pod's Kubernetes namespace
# %{container} - the container directory name
# %{metadata:<key>} - a pod metadata field fetched via
# singer.kubernetes.podMetadataFields, keyed by the
# last path segment (e.g. "labels:app" -> %{metadata:app})
# Pod-level identifiers (pod name / pod uid) are intentionally NOT
# supported: they change on every restart/reschedule and would create
# unbounded topic cardinality.
# 2. topic - the static fallback, always required. Also supports the legacy
# \N capture-group expansion against logStreamRegex.
writer.type=kafka
writer.kafka.topicTemplate=logs_%{namespace}_%{container}
writer.kafka.topic=logs_unrouted
writer.kafka.producerConfig.bootstrap.servers=localhost:9092
writer.kafka.producerConfig.acks=1
12 changes: 12 additions & 0 deletions docs/configuration_samples/sample_kubernetes/singer.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ singer.heartbeatEnabled = false
singer.kubernetesEnabled = true
singer.kubernetes.podLogDirectory = /var/log/pods

# Pod metadata fields to fetch from the kubelet, used for message headers and
# %{metadata:<key>} topic template variables (keyed by the last path segment,
# e.g. labels:app -> %{metadata:app})
# singer.kubernetes.podMetadataFields = labels:app,annotations:logTopic

# For clusters with the kubelet read-only port disabled (requires HOST_IP env
# via the downward API and RBAC "get" on nodes/proxy):
# singer.kubernetes.useSecureConnection = true
# singer.kubernetes.kubeletPort = 10250
# singer.kubernetes.serviceAccountTokenPath = /var/run/secrets/kubernetes.io/serviceaccount/token
# singer.kubernetes.serviceAccountCaCertPath = /var/run/secrets/kubernetes.io/serviceaccount/ca.crt

# Configuration for LogMonitor.
singer.monitor.monitorIntervalInSecs = 10

Expand Down
14 changes: 14 additions & 0 deletions singer-commons/src/main/thrift/config.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,20 @@ struct KafkaWriterConfig {
4: optional bool auditingEnabled = 0;
5: optional bool skipNoLeaderPartitions = 0;
6: optional i32 writeTimeoutInSeconds = 60;
// Template used to derive the topic per log stream when running in Kubernetes mode,
// e.g. "logs_%{namespace}_%{container}". Supported variables:
// %{namespace} - the Kubernetes namespace of the pod the stream belongs to
// %{container} - the first directory under the pod log directory; with the
// standard kubelet layout (/var/log/pods/<ns>_<pod>_<uid>/<container>/)
// this is the container name
// %{metadata:<key>} - a pod metadata value fetched via KubeConfig.podMetadataFields
// (e.g. a label or annotation), keyed by the last path segment
// Pod-level identifiers (pod name / pod uid) are intentionally not supported to avoid
// unbounded topic cardinality.
// Precedence: if every variable resolves and the result is a legal Kafka topic name,
// the resolved name is used; otherwise the writer falls back to "topic" (which still
// supports the existing \\N capture-group expansion against logStreamRegex).
7: optional string topicTemplate;
}

struct NoOpWriteConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,7 @@ public class SingerConfigDef {
// Pod allowlist config for Kubernetes log stream filtering
public static final String POD_ALLOWLIST = "podAllowlist";
public static final String POD_ALLOWLIST_METADATA_KEY = "podAllowlistMetadataKey";

// Per-stream Kafka topic template for Kubernetes log streams
public static final String TOPIC_TEMPLATE = "topicTemplate";
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class SingerMetrics {
public static final String NUM_COMMITED_TRANSACTIONS = SINGER_WRITER + "num_committed_transactions";
public static final String NUM_ABORTED_TRANSACTIONS = SINGER_WRITER + "num_aborted_transactions";
public static final String NUM_KAFKA_PRODUCERS = SINGER_WRITER + "num_kafka_producers";
public static final String TOPIC_TEMPLATE_RESOLVED = SINGER_WRITER + "topic_template_resolved";
public static final String TOPIC_TEMPLATE_FALLBACK = SINGER_WRITER + "topic_template_fallback";

public static final String SINGER_TRANSFORMER = SINGER_PREIX + "transformer.";
public static final String REGEX_BASED_MODIFIER = SINGER_TRANSFORMER + "regex_based_modifier.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.pinterest.singer.common.LogStreamWriter;
import com.pinterest.singer.common.errors.LogStreamWriterException;
import com.pinterest.singer.common.SingerMetrics;
import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.config.Decider;
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import com.pinterest.singer.processor.DefaultLogStreamProcessor;
Expand All @@ -46,6 +47,7 @@
import com.pinterest.singer.thrift.configuration.TextReaderConfig;
import com.pinterest.singer.thrift.configuration.ThriftReaderConfig;
import com.pinterest.singer.utils.SingerUtils;
import com.pinterest.singer.utils.TopicTemplateResolver;
import com.pinterest.singer.writer.NoOpLogStreamWriter;
import com.pinterest.singer.writer.KafkaWriter;
import com.pinterest.singer.writer.kafka.CommittableKafkaWriter;
Expand Down Expand Up @@ -76,6 +78,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -387,10 +390,7 @@ protected LogStreamWriter createKafkaWriter(LogStream logStream,
KafkaProducerConfig producerConfig = kafkaWriterConfig.getProducerConfig();

SingerLogConfig singerLogConfig = logStream.getSingerLog().getSingerLogConfig();
String topic = extractTopicNameFromLogStreamName(
logStream.getLogStreamName(),
singerLogConfig.getLogStreamRegex(),
kafkaWriterConfig.getTopic());
String topic = resolveTopic(logStream, singerLogConfig, kafkaWriterConfig);

boolean auditingEnabled = kafkaWriterConfig.isAuditingEnabled();
String auditTopic = null;
Expand Down Expand Up @@ -688,6 +688,46 @@ private void stopMonitoredLogs() {
}
}

/**
* Resolve the Kafka topic for a log stream.
*
* Precedence:
* 1. writer.kafka.topicTemplate, if configured and the stream has Kubernetes pod
* context, every template variable resolves, and the result is a legal topic name.
* 2. writer.kafka.topic, with the existing \N capture-group expansion against
* logStreamRegex (unchanged legacy behavior; also the fallback whenever the
* template cannot be resolved).
*/
@VisibleForTesting
protected static String resolveTopic(LogStream logStream, SingerLogConfig singerLogConfig,
KafkaWriterConfig kafkaWriterConfig)
throws ConfigurationException {
if (kafkaWriterConfig.isSetTopicTemplate() && !kafkaWriterConfig.getTopicTemplate().isEmpty()) {
String podLogDirectory = "";
SingerConfig singerConfig = SingerSettings.getSingerConfig();
if (singerConfig != null && singerConfig.isKubernetesEnabled()
&& singerConfig.getKubeConfig() != null) {
podLogDirectory = singerConfig.getKubeConfig().getPodLogDirectory();
}
Optional<String> resolved = TopicTemplateResolver
.resolve(kafkaWriterConfig.getTopicTemplate(), logStream, podLogDirectory);
if (resolved.isPresent()) {
OpenTsdbMetricConverter.incr(SingerMetrics.TOPIC_TEMPLATE_RESOLVED, 1,
"topic=" + resolved.get(), "logName=" + singerLogConfig.getName());
return resolved.get();
}
LOG.warn("Falling back to static topic {} for log stream {}: topicTemplate {} could not"
+ " be resolved", kafkaWriterConfig.getTopic(), logStream.getLogStreamDescriptor(),
kafkaWriterConfig.getTopicTemplate());
OpenTsdbMetricConverter.incr(SingerMetrics.TOPIC_TEMPLATE_FALLBACK, 1,
"topic=" + kafkaWriterConfig.getTopic(), "logName=" + singerLogConfig.getName());
}
return extractTopicNameFromLogStreamName(
logStream.getLogStreamName(),
singerLogConfig.getLogStreamRegex(),
kafkaWriterConfig.getTopic());
}

/**
* Expand a name by replacing placeholder (such as \1, \2) in the name with captured group
* from LogStream name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,13 @@ public void evaluateAndRegisterLogStreamOrWatcher(Path path, String podUid) {
SingerSettings.getSingerConfig().getKubeConfig().getPodLogDirectory(), path, podUid);

int directoryDepth = logConfigKey.split("/").length - 1;
// collect configs from all matching entries so wildcard patterns (e.g. "/*") and exact
// directory configs at the same depth don't shadow each other
Collection<SingerLogConfig> collection = new ArrayList<>();
for (Map.Entry<Pair<Integer, String>, Collection<SingerLogConfig>> entry : SingerSettings.getLogConfigMap().entrySet()) {
if (entry.getKey().getLeft() == directoryDepth && FilenameUtils.wildcardMatch(logConfigKey,
entry.getKey().getRight())) {
collection = entry.getValue();
break;
collection.addAll(entry.getValue());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,11 +1158,24 @@ private static KafkaWriterConfig parseKafkaWriterConfig(AbstractConfiguration ka
.getInt(SingerConfigDef.KAFKA_WRITE_TIMEOUT_IN_SECONDS);
}

String topicTemplate = null;
if (kafkaWriterConfiguration.containsKey(SingerConfigDef.TOPIC_TEMPLATE)) {
topicTemplate = kafkaWriterConfiguration.getString(SingerConfigDef.TOPIC_TEMPLATE);
try {
TopicTemplateResolver.validateTemplate(topicTemplate);
} catch (IllegalArgumentException e) {
throw new ConfigurationException("Invalid KafkaWriter topicTemplate: " + e.getMessage());
}
}

KafkaWriterConfig writerConfig = new KafkaWriterConfig(topic, producerConfig);
writerConfig.setAuditTopic(auditTopic);
writerConfig.setAuditingEnabled(auditingEnabled);
writerConfig.setSkipNoLeaderPartitions(skipNoLeaderPartitions);
writerConfig.setWriteTimeoutInSeconds(writeTimeoutInSeconds);
if (topicTemplate != null) {
writerConfig.setTopicTemplate(topicTemplate);
}
return writerConfig;
}

Expand Down
Loading