diff --git a/docs/configuration_samples/sample_kubernetes/README.md b/docs/configuration_samples/sample_kubernetes/README.md new file mode 100644 index 00000000..fb868770 --- /dev/null +++ b/docs/configuration_samples/sample_kubernetes/README.md @@ -0,0 +1,88 @@ +# Running Singer on Kubernetes + +This sample shows how to run Singer as a node-level logging agent (DaemonSet) +that tails logs for every pod on the node and routes each source to its own +Kafka topic. + +## How Kubernetes mode works + +With `singer.kubernetesEnabled=true`, Singer: + +1. Watches `singer.kubernetes.podLogDirectory` (normally `/var/log/pods`) for + pod directories, which follow the kubelet naming convention + `__`. +2. Polls the kubelet `/pods` API to detect pod deletions and to fetch pod + metadata (labels, annotations, etc.) selected by + `singer.kubernetes.podMetadataFields`. +3. For every pod, instantiates each log config in `conf.d/` whose `logDir` + (interpreted *relative to the pod's log directory*) matches, creating + independent log streams with their own watermarks. +4. Drains in-flight data before cleaning up when a pod is deleted. + +## Tailing container stdout/stderr + +The kubelet writes container output to +`/var/log/pods/__//0.log` and rotates by +renaming (`0.log` → `0.log.`), which is compatible with Singer's +inode-based watermarks. + +Use a wildcard `logDir` so every container directory becomes its own stream +(see `conf.d/container.stdout_logs.properties`): + +```properties +logDir=/* +logStreamRegex=0.log +logFileMatchMode=prefix +``` + +Lines are shipped in the raw CRI format +(` `); downstream consumers parse the +prefix if they need it. + +## Per-source topic routing + +`writer.kafka.topicTemplate` derives the topic from where the stream was +collected. Resolution happens once per stream, with this precedence: + +1. **`topicTemplate`** — used when every variable resolves and the result is a + legal Kafka topic name (`[a-zA-Z0-9._-]`, max 249 chars). Variables: + - `%{namespace}` — the pod's Kubernetes namespace + - `%{container}` — the container directory name (kubelet stdout layout) + - `%{metadata:}` — a pod metadata value fetched via + `singer.kubernetes.podMetadataFields`; the key is the last path segment of + the configured field (e.g. field `labels:app` → `%{metadata:app}`) +2. **`topic`** — the required static fallback; legacy `\N` capture-group + expansion against `logStreamRegex` still applies. Fallbacks are counted by + the `singer.writer.topic_template_fallback` metric. + +Pod-level identifiers (pod name, pod uid) are deliberately **not** supported as +template variables: they change on every restart/reschedule and would create +unbounded topic cardinality. + +Examples: + +```properties +# one topic per namespace+container +writer.kafka.topicTemplate=logs_%{namespace}_%{container} + +# self-serve routing: teams set a label/annotation on their pods +# (requires singer.kubernetes.podMetadataFields to include e.g. labels:logTopic) +writer.kafka.topicTemplate=%{metadata:logTopic} +``` + +To restrict which pods a config applies to at all, combine with the pod +allowlist (`podAllowlist` in the log config plus +`singer.kubernetes.podAllowlistMetadataKey` in `singer.properties`). + +## DaemonSet deployment notes + +- Mount `/var/log/pods` (read-only) from the host. +- Mount a writable host path for Singer's watermark files so a restart or + redeploy does not re-ship logs. +- For clusters where the kubelet read-only port (10255) is disabled, set + `singer.kubernetes.useSecureConnection=true`, + `singer.kubernetes.kubeletPort=10250`, point + `serviceAccountTokenPath`/`serviceAccountCaCertPath` at the standard service + account paths, expose the node IP as the `HOST_IP` environment variable via + the downward API, and grant the service account `get` on `nodes/proxy`. +- Expose the Ostrich port (`singer.ostrichPort`) if you scrape metrics. diff --git a/docs/configuration_samples/sample_kubernetes/conf.d/container.stdout_logs.properties b/docs/configuration_samples/sample_kubernetes/conf.d/container.stdout_logs.properties new file mode 100644 index 00000000..39eaab7b --- /dev/null +++ b/docs/configuration_samples/sample_kubernetes/conf.d/container.stdout_logs.properties @@ -0,0 +1,54 @@ +############################################################################### +# +# Config for shipping container stdout/stderr logs from the standard kubelet +# pod log directory layout: +# +# /var/log/pods/__//0.log +# +# logDir is relative to each pod's log directory. The wildcard "/*" matches +# every container directory, so each container gets its own log stream (and +# therefore its own resolved topic). +# +############################################################################### + +logName=container_logs +logDir=/* +logStreamRegex=0.log +logFileMatchMode=prefix + +# Configuration for processor. +processor.batchSize=200 +processor.processingIntervalInSeconds=10 +processor.processingIntervalInSecondsMax=20 +processor.processingTimeSliceInSeconds=15 + +# Configuration for reader. Note: lines are shipped in the raw CRI format +# (" "); downstream consumers are +# responsible for parsing the prefix if needed. +reader.type=text +reader.text.readerBufferSize=2097152 +reader.text.maxMessageSize=1000000 +reader.text.messageStartRegex=^.*$ +reader.text.numMessagesPerLogMessage=1 +reader.text.logMessageType=plain_text + +# Configuration for writer. +# +# Topic resolution precedence (per log stream, decided once at stream creation): +# 1. topicTemplate - used when every variable resolves and the result is a +# legal Kafka topic name. Supported variables: +# %{namespace} - the pod's Kubernetes namespace +# %{container} - the container directory name +# %{metadata:} - a pod metadata field fetched via +# singer.kubernetes.podMetadataFields, keyed by the +# last path segment (e.g. "labels:app" -> %{metadata:app}) +# Pod-level identifiers (pod name / pod uid) are intentionally NOT +# supported: they change on every restart/reschedule and would create +# unbounded topic cardinality. +# 2. topic - the static fallback, always required. Also supports the legacy +# \N capture-group expansion against logStreamRegex. +writer.type=kafka +writer.kafka.topicTemplate=logs_%{namespace}_%{container} +writer.kafka.topic=logs_unrouted +writer.kafka.producerConfig.bootstrap.servers=localhost:9092 +writer.kafka.producerConfig.acks=1 diff --git a/docs/configuration_samples/sample_kubernetes/singer.properties b/docs/configuration_samples/sample_kubernetes/singer.properties index b342c3f7..792a3bfe 100644 --- a/docs/configuration_samples/sample_kubernetes/singer.properties +++ b/docs/configuration_samples/sample_kubernetes/singer.properties @@ -5,6 +5,18 @@ singer.heartbeatEnabled = false singer.kubernetesEnabled = true singer.kubernetes.podLogDirectory = /var/log/pods +# Pod metadata fields to fetch from the kubelet, used for message headers and +# %{metadata:} topic template variables (keyed by the last path segment, +# e.g. labels:app -> %{metadata:app}) +# singer.kubernetes.podMetadataFields = labels:app,annotations:logTopic + +# For clusters with the kubelet read-only port disabled (requires HOST_IP env +# via the downward API and RBAC "get" on nodes/proxy): +# singer.kubernetes.useSecureConnection = true +# singer.kubernetes.kubeletPort = 10250 +# singer.kubernetes.serviceAccountTokenPath = /var/run/secrets/kubernetes.io/serviceaccount/token +# singer.kubernetes.serviceAccountCaCertPath = /var/run/secrets/kubernetes.io/serviceaccount/ca.crt + # Configuration for LogMonitor. singer.monitor.monitorIntervalInSecs = 10 diff --git a/singer-commons/src/main/thrift/config.thrift b/singer-commons/src/main/thrift/config.thrift index 7dde461a..fb027ef0 100644 --- a/singer-commons/src/main/thrift/config.thrift +++ b/singer-commons/src/main/thrift/config.thrift @@ -140,6 +140,20 @@ struct KafkaWriterConfig { 4: optional bool auditingEnabled = 0; 5: optional bool skipNoLeaderPartitions = 0; 6: optional i32 writeTimeoutInSeconds = 60; + // Template used to derive the topic per log stream when running in Kubernetes mode, + // e.g. "logs_%{namespace}_%{container}". Supported variables: + // %{namespace} - the Kubernetes namespace of the pod the stream belongs to + // %{container} - the first directory under the pod log directory; with the + // standard kubelet layout (/var/log/pods/__//) + // this is the container name + // %{metadata:} - a pod metadata value fetched via KubeConfig.podMetadataFields + // (e.g. a label or annotation), keyed by the last path segment + // Pod-level identifiers (pod name / pod uid) are intentionally not supported to avoid + // unbounded topic cardinality. + // Precedence: if every variable resolves and the result is a legal Kafka topic name, + // the resolved name is used; otherwise the writer falls back to "topic" (which still + // supports the existing \\N capture-group expansion against logStreamRegex). + 7: optional string topicTemplate; } struct NoOpWriteConfig { diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java index 14d187e0..58eae4cd 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerConfigDef.java @@ -134,4 +134,7 @@ public class SingerConfigDef { // Pod allowlist config for Kubernetes log stream filtering public static final String POD_ALLOWLIST = "podAllowlist"; public static final String POD_ALLOWLIST_METADATA_KEY = "podAllowlistMetadataKey"; + + // Per-stream Kafka topic template for Kubernetes log streams + public static final String TOPIC_TEMPLATE = "topicTemplate"; } \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java index a5d8252a..bbd2a3b2 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java @@ -89,6 +89,8 @@ public class SingerMetrics { public static final String NUM_COMMITED_TRANSACTIONS = SINGER_WRITER + "num_committed_transactions"; public static final String NUM_ABORTED_TRANSACTIONS = SINGER_WRITER + "num_aborted_transactions"; public static final String NUM_KAFKA_PRODUCERS = SINGER_WRITER + "num_kafka_producers"; + public static final String TOPIC_TEMPLATE_RESOLVED = SINGER_WRITER + "topic_template_resolved"; + public static final String TOPIC_TEMPLATE_FALLBACK = SINGER_WRITER + "topic_template_fallback"; public static final String SINGER_TRANSFORMER = SINGER_PREIX + "transformer."; public static final String REGEX_BASED_MODIFIER = SINGER_TRANSFORMER + "regex_based_modifier."; diff --git a/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java b/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java index 6f69f599..3a7da8c2 100644 --- a/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java +++ b/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java @@ -24,6 +24,7 @@ import com.pinterest.singer.common.LogStreamWriter; import com.pinterest.singer.common.errors.LogStreamWriterException; import com.pinterest.singer.common.SingerMetrics; +import com.pinterest.singer.common.SingerSettings; import com.pinterest.singer.config.Decider; import com.pinterest.singer.metrics.OpenTsdbMetricConverter; import com.pinterest.singer.processor.DefaultLogStreamProcessor; @@ -46,6 +47,7 @@ import com.pinterest.singer.thrift.configuration.TextReaderConfig; import com.pinterest.singer.thrift.configuration.ThriftReaderConfig; import com.pinterest.singer.utils.SingerUtils; +import com.pinterest.singer.utils.TopicTemplateResolver; import com.pinterest.singer.writer.NoOpLogStreamWriter; import com.pinterest.singer.writer.KafkaWriter; import com.pinterest.singer.writer.kafka.CommittableKafkaWriter; @@ -76,6 +78,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -387,10 +390,7 @@ protected LogStreamWriter createKafkaWriter(LogStream logStream, KafkaProducerConfig producerConfig = kafkaWriterConfig.getProducerConfig(); SingerLogConfig singerLogConfig = logStream.getSingerLog().getSingerLogConfig(); - String topic = extractTopicNameFromLogStreamName( - logStream.getLogStreamName(), - singerLogConfig.getLogStreamRegex(), - kafkaWriterConfig.getTopic()); + String topic = resolveTopic(logStream, singerLogConfig, kafkaWriterConfig); boolean auditingEnabled = kafkaWriterConfig.isAuditingEnabled(); String auditTopic = null; @@ -688,6 +688,46 @@ private void stopMonitoredLogs() { } } + /** + * Resolve the Kafka topic for a log stream. + * + * Precedence: + * 1. writer.kafka.topicTemplate, if configured and the stream has Kubernetes pod + * context, every template variable resolves, and the result is a legal topic name. + * 2. writer.kafka.topic, with the existing \N capture-group expansion against + * logStreamRegex (unchanged legacy behavior; also the fallback whenever the + * template cannot be resolved). + */ + @VisibleForTesting + protected static String resolveTopic(LogStream logStream, SingerLogConfig singerLogConfig, + KafkaWriterConfig kafkaWriterConfig) + throws ConfigurationException { + if (kafkaWriterConfig.isSetTopicTemplate() && !kafkaWriterConfig.getTopicTemplate().isEmpty()) { + String podLogDirectory = ""; + SingerConfig singerConfig = SingerSettings.getSingerConfig(); + if (singerConfig != null && singerConfig.isKubernetesEnabled() + && singerConfig.getKubeConfig() != null) { + podLogDirectory = singerConfig.getKubeConfig().getPodLogDirectory(); + } + Optional resolved = TopicTemplateResolver + .resolve(kafkaWriterConfig.getTopicTemplate(), logStream, podLogDirectory); + if (resolved.isPresent()) { + OpenTsdbMetricConverter.incr(SingerMetrics.TOPIC_TEMPLATE_RESOLVED, 1, + "topic=" + resolved.get(), "logName=" + singerLogConfig.getName()); + return resolved.get(); + } + LOG.warn("Falling back to static topic {} for log stream {}: topicTemplate {} could not" + + " be resolved", kafkaWriterConfig.getTopic(), logStream.getLogStreamDescriptor(), + kafkaWriterConfig.getTopicTemplate()); + OpenTsdbMetricConverter.incr(SingerMetrics.TOPIC_TEMPLATE_FALLBACK, 1, + "topic=" + kafkaWriterConfig.getTopic(), "logName=" + singerLogConfig.getName()); + } + return extractTopicNameFromLogStreamName( + logStream.getLogStreamName(), + singerLogConfig.getLogStreamRegex(), + kafkaWriterConfig.getTopic()); + } + /** * Expand a name by replacing placeholder (such as \1, \2) in the name with captured group * from LogStream name. diff --git a/singer/src/main/java/com/pinterest/singer/monitor/RecursiveFSEventProcessor.java b/singer/src/main/java/com/pinterest/singer/monitor/RecursiveFSEventProcessor.java index 25fba2c5..d5aa31b8 100644 --- a/singer/src/main/java/com/pinterest/singer/monitor/RecursiveFSEventProcessor.java +++ b/singer/src/main/java/com/pinterest/singer/monitor/RecursiveFSEventProcessor.java @@ -150,12 +150,13 @@ public void evaluateAndRegisterLogStreamOrWatcher(Path path, String podUid) { SingerSettings.getSingerConfig().getKubeConfig().getPodLogDirectory(), path, podUid); int directoryDepth = logConfigKey.split("/").length - 1; + // collect configs from all matching entries so wildcard patterns (e.g. "/*") and exact + // directory configs at the same depth don't shadow each other Collection collection = new ArrayList<>(); for (Map.Entry, Collection> entry : SingerSettings.getLogConfigMap().entrySet()) { if (entry.getKey().getLeft() == directoryDepth && FilenameUtils.wildcardMatch(logConfigKey, entry.getKey().getRight())) { - collection = entry.getValue(); - break; + collection.addAll(entry.getValue()); } } diff --git a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java index cdcb6bd0..71615f87 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java @@ -1158,11 +1158,24 @@ private static KafkaWriterConfig parseKafkaWriterConfig(AbstractConfiguration ka .getInt(SingerConfigDef.KAFKA_WRITE_TIMEOUT_IN_SECONDS); } + String topicTemplate = null; + if (kafkaWriterConfiguration.containsKey(SingerConfigDef.TOPIC_TEMPLATE)) { + topicTemplate = kafkaWriterConfiguration.getString(SingerConfigDef.TOPIC_TEMPLATE); + try { + TopicTemplateResolver.validateTemplate(topicTemplate); + } catch (IllegalArgumentException e) { + throw new ConfigurationException("Invalid KafkaWriter topicTemplate: " + e.getMessage()); + } + } + KafkaWriterConfig writerConfig = new KafkaWriterConfig(topic, producerConfig); writerConfig.setAuditTopic(auditTopic); writerConfig.setAuditingEnabled(auditingEnabled); writerConfig.setSkipNoLeaderPartitions(skipNoLeaderPartitions); writerConfig.setWriteTimeoutInSeconds(writeTimeoutInSeconds); + if (topicTemplate != null) { + writerConfig.setTopicTemplate(topicTemplate); + } return writerConfig; } diff --git a/singer/src/main/java/com/pinterest/singer/utils/TopicTemplateResolver.java b/singer/src/main/java/com/pinterest/singer/utils/TopicTemplateResolver.java new file mode 100644 index 00000000..d8a8ab10 --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/utils/TopicTemplateResolver.java @@ -0,0 +1,221 @@ +/** + * Copyright 2026 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.utils; + +import com.pinterest.singer.common.LogStream; +import com.pinterest.singer.common.SingerLog; + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Resolves a Kafka topic template (writer.kafka.topicTemplate) for a Kubernetes pod + * log stream. + * + * Supported variables: + * %{namespace} - the Kubernetes namespace, parsed from the pod log directory name + * which follows the kubelet convention namespace_podname[_uid] + * %{container} - the first directory under the pod's log directory; with the + * standard kubelet layout /var/log/pods/<ns>_<pod>_<uid>/<container>/ + * this is the container name + * %{metadata:key} - a pod metadata value fetched via KubeConfig.podMetadataFields + * (e.g. "labels:app" is looked up as %{metadata:app}) + * + * Pod-level identifiers (pod name, pod uid) are intentionally unsupported: they change on + * every pod restart / reschedule and would create unbounded Kafka topic cardinality. + * + * Resolution is all-or-nothing: if any variable cannot be resolved, or the resolved string + * is not a legal Kafka topic name, an empty Optional is returned and the caller is expected + * to fall back to the statically configured topic. + */ +public class TopicTemplateResolver { + + private static final Logger LOG = LoggerFactory.getLogger(TopicTemplateResolver.class); + + public static final String VAR_NAMESPACE = "namespace"; + public static final String VAR_CONTAINER = "container"; + public static final String VAR_METADATA = "metadata"; + + private static final Pattern VARIABLE_PATTERN = Pattern.compile("%\\{([^}]*)\\}"); + // Kafka legal topic characters + private static final Pattern LEGAL_TOPIC_PATTERN = Pattern.compile("[a-zA-Z0-9._-]+"); + private static final int MAX_TOPIC_LENGTH = 249; + + private TopicTemplateResolver() { + } + + /** + * Validate a topic template at configuration load time. + * + * @throws IllegalArgumentException if the template references an unsupported variable, + * contains malformed variable syntax, or has literal characters that are not + * legal in a Kafka topic name. + */ + public static void validateTemplate(String template) { + if (template == null || template.isEmpty()) { + throw new IllegalArgumentException("topicTemplate must not be empty"); + } + Matcher matcher = VARIABLE_PATTERN.matcher(template); + StringBuffer literal = new StringBuffer(); + boolean hasVariable = false; + while (matcher.find()) { + hasVariable = true; + String variable = matcher.group(1); + String name = variable.contains(":") + ? variable.substring(0, variable.indexOf(':')) : variable; + String argument = variable.contains(":") + ? variable.substring(variable.indexOf(':') + 1) : null; + switch (name) { + case VAR_NAMESPACE: + case VAR_CONTAINER: + if (argument != null) { + throw new IllegalArgumentException("topicTemplate variable %{" + name + + "} does not take an argument: %{" + variable + "}"); + } + break; + case VAR_METADATA: + if (argument == null || argument.isEmpty()) { + throw new IllegalArgumentException( + "topicTemplate variable %{metadata:} requires a key: %{" + variable + "}"); + } + break; + case "podName": + case "podUid": + case "pod": + throw new IllegalArgumentException("topicTemplate variable %{" + name + + "} is not supported: pod-level identifiers change on every pod restart and" + + " would create unbounded topic cardinality. Allowed variables: %{namespace}," + + " %{container}, %{metadata:}"); + default: + throw new IllegalArgumentException("Unsupported topicTemplate variable %{" + variable + + "}. Allowed variables: %{namespace}, %{container}, %{metadata:}"); + } + matcher.appendReplacement(literal, ""); + } + matcher.appendTail(literal); + if (!hasVariable) { + throw new IllegalArgumentException("topicTemplate \"" + template + + "\" contains no %{...} variables; use writer.kafka.topic for static topics"); + } + String literalPart = literal.toString(); + if (literalPart.contains("%{") || literalPart.contains("}")) { + throw new IllegalArgumentException( + "topicTemplate \"" + template + "\" has malformed %{...} variable syntax"); + } + if (!literalPart.isEmpty() && !LEGAL_TOPIC_PATTERN.matcher(literalPart).matches()) { + throw new IllegalArgumentException("topicTemplate \"" + template + + "\" contains characters that are not legal in a Kafka topic name;" + + " legal characters are [a-zA-Z0-9._-]"); + } + } + + /** + * Resolve a topic template for the given log stream. + * + * @param template a template previously validated by {@link #validateTemplate} + * @param logStream the log stream the writer is being created for + * @param podLogDirectory the configured Kubernetes pod log directory + * @return the resolved topic, or empty if the stream is not a pod stream, any variable + * is unresolvable, or the result is not a legal Kafka topic name + */ + public static Optional resolve(String template, LogStream logStream, + String podLogDirectory) { + SingerLog singerLog = logStream.getSingerLog(); + String podUid = singerLog.getPodUid(); + if (podUid == null || podUid.isEmpty()) { + LOG.debug("topicTemplate is only supported for Kubernetes pod log streams, stream {}" + + " has no pod context", logStream.getLogStreamDescriptor()); + return Optional.empty(); + } + Matcher matcher = VARIABLE_PATTERN.matcher(template); + StringBuffer resolved = new StringBuffer(); + while (matcher.find()) { + String variable = matcher.group(1); + String value; + if (VAR_NAMESPACE.equals(variable)) { + value = extractNamespace(podUid); + } else if (VAR_CONTAINER.equals(variable)) { + value = extractContainerName(logStream.getLogDir(), podLogDirectory, podUid); + } else if (variable.startsWith(VAR_METADATA + ":")) { + String key = variable.substring(VAR_METADATA.length() + 1); + value = singerLog.getMetadata(key).map(TopicTemplateResolver::decode).orElse(null); + } else { + // validateTemplate rejects these at config load; defensive for direct callers + value = null; + } + if (value == null || value.isEmpty()) { + LOG.warn("Could not resolve topicTemplate variable %{{{}}} for stream {}", variable, + logStream.getLogStreamDescriptor()); + return Optional.empty(); + } + matcher.appendReplacement(resolved, Matcher.quoteReplacement(value)); + } + matcher.appendTail(resolved); + String topic = resolved.toString(); + if (!isLegalTopicName(topic)) { + LOG.warn("Resolved topic \"{}\" for stream {} is not a legal Kafka topic name", topic, + logStream.getLogStreamDescriptor()); + return Optional.empty(); + } + return Optional.of(topic); + } + + /** + * Pod log directories follow the kubelet convention namespace_podname[_uid]; namespaces + * are DNS labels and cannot contain underscores, so the namespace is the segment before + * the first underscore. + */ + public static String extractNamespace(String podUid) { + int separatorIndex = podUid.indexOf('_'); + return separatorIndex > 0 ? podUid.substring(0, separatorIndex) : null; + } + + /** + * The container name is the first path segment of the stream's directory below + * the pod's log directory, per the kubelet layout + * /var/log/pods/<ns>_<pod>_<uid>/<container>/0.log + */ + public static String extractContainerName(String streamLogDir, String podLogDirectory, + String podUid) { + if (streamLogDir == null || podLogDirectory == null || podLogDirectory.isEmpty()) { + return null; + } + String podRoot = new File(new File(podLogDirectory), podUid).toPath().normalize().toString(); + String normalizedStreamDir = new File(streamLogDir).toPath().normalize().toString(); + if (!normalizedStreamDir.startsWith(podRoot + File.separator)) { + return null; + } + String relative = normalizedStreamDir.substring(podRoot.length() + 1); + int separatorIndex = relative.indexOf(File.separatorChar); + return separatorIndex == -1 ? relative : relative.substring(0, separatorIndex); + } + + public static boolean isLegalTopicName(String topic) { + return !topic.isEmpty() && topic.length() <= MAX_TOPIC_LENGTH && !topic.equals(".") + && !topic.equals("..") && LEGAL_TOPIC_PATTERN.matcher(topic).matches(); + } + + private static String decode(ByteBuffer buffer) { + return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString(); + } +} diff --git a/singer/src/test/java/com/pinterest/singer/kubernetes/TestContainerLogStreams.java b/singer/src/test/java/com/pinterest/singer/kubernetes/TestContainerLogStreams.java new file mode 100644 index 00000000..0d588397 --- /dev/null +++ b/singer/src/test/java/com/pinterest/singer/kubernetes/TestContainerLogStreams.java @@ -0,0 +1,234 @@ +/** + * Copyright 2026 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.kubernetes; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Executors; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.pinterest.singer.SingerTestBase; +import com.pinterest.singer.common.LogStream; +import com.pinterest.singer.common.SingerSettings; +import com.pinterest.singer.monitor.LogStreamManager; +import com.pinterest.singer.thrift.configuration.FileNameMatchMode; +import com.pinterest.singer.thrift.configuration.KubeConfig; +import com.pinterest.singer.thrift.configuration.SingerConfig; +import com.pinterest.singer.thrift.configuration.SingerLogConfig; +import com.pinterest.singer.utils.TopicTemplateResolver; + +/** + * Tests for tailing container stdout logs from the kubelet pod log directory layout + * (/var/log/pods/<namespace>_<pod>_<uid>/<container>/0.log) using a + * wildcard logDir ("/*"). The SingerLog is registered under the literal wildcard path and + * LogConfigUtils.findDirectories expands it, so each container directory must end up with + * its own LogStream whose directory is concrete — that is what lets the topic template + * resolver derive %{container} for per-container topic routing. + */ +public class TestContainerLogStreams { + + private SingerConfig config; + private KubeConfig kubeConfig; + private String podLogPath; + private Path tempDir; + + // Pod directory name from pods-goodresponse.json: namespace_name_uid + private static final String POD_NGINX_1 = "default_nginx-deployment-5c689d7589-abcde_12345678-1234-1234-1234-1234567890ab"; + + @BeforeClass + public static void beforeClass() throws IOException { + TestKubeService.ensureServerRunning(); + } + + @AfterClass + public static void afterClass() { + TestKubeService.removePodsContext(); + } + + @Before + public void before() throws IOException { + TestKubeService.removePodsContext(); + TestKubeService.registerGoodResponse(); + + LogStreamManager.getInstance().getSingerLogPaths().clear(); + SingerSettings.getFsMonitorMap().clear(); + // logConfigMap is not cleared by SingerSettings.reset(); without this, configs + // accumulate across test methods and bleed into each other's assertions + SingerSettings.getLogConfigMap().clear(); + LogStreamManager.reset(); + KubeService.reset(); + PodMetadataFetcher.reset(); + SingerSettings.reset(); + + SingerSettings.setBackgroundTaskExecutor( + Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build())); + + config = new SingerConfig(); + config.setKubernetesEnabled(true); + SingerSettings.setSingerConfig(config); + + kubeConfig = new KubeConfig(); + config.setKubeConfig(kubeConfig); + + tempDir = Files.createTempDirectory("container_log_test"); + podLogPath = tempDir.toAbsolutePath().toString(); + kubeConfig.setPodLogDirectory(podLogPath); + } + + @After + public void after() { + TestKubeService.removePodsContext(); + SingerSettings.getFsMonitorMap().clear(); + SingerSettings.getLogConfigMap().clear(); + if (tempDir != null) { + deleteDirectory(tempDir.toFile()); + } + LogStreamManager.reset(); + KubeService.reset(); + PodMetadataFetcher.reset(); + SingerSettings.reset(); + } + + @Test + public void testWildcardLogDirCreatesStreamPerContainerDirectory() throws Exception { + SingerLogConfig logConfig = createLogConfig("container_logs", "/*", "0.log"); + config.setLogConfigs(Arrays.asList(logConfig)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + createContainerLog(POD_NGINX_1, "web"); + createContainerLog(POD_NGINX_1, "sidecar"); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + // the SingerLog is registered under the literal wildcard path + assertTrue("wildcard path should be registered", + lsm.getSingerLogPaths().containsKey(podLogPath + "/" + POD_NGINX_1 + "/*")); + + // findDirectories must have expanded the wildcard into one LogStream per + // container directory, each with its concrete directory + String webPath = podLogPath + "/" + POD_NGINX_1 + "/web"; + String sidecarPath = podLogPath + "/" + POD_NGINX_1 + "/sidecar"; + assertTrue("web container should have a log stream", + lsm.getDirStreams().containsKey(webPath)); + assertTrue("sidecar container should have a log stream", + lsm.getDirStreams().containsKey(sidecarPath)); + + // end to end: each container's stream resolves to its own topic + Set resolvedTopics = new HashSet<>(); + for (String containerPath : new String[] { webPath, sidecarPath }) { + Collection streams = lsm.getDirStreams().get(containerPath); + assertEquals(1, streams.size()); + LogStream stream = streams.iterator().next(); + assertEquals(containerPath, stream.getLogDir()); + resolvedTopics.add(TopicTemplateResolver + .resolve("logs_%{namespace}_%{container}", stream, podLogPath).get()); + } + assertEquals(new HashSet<>(Arrays.asList("logs_default_web", "logs_default_sidecar")), + resolvedTopics); + + instance.stop(); + } + + @Test + public void testWildcardAndExactConfigsDoNotShadowEachOther() throws Exception { + SingerLogConfig wildcardConfig = createLogConfig("all_containers", "/*", "0.log"); + SingerLogConfig exactConfig = createLogConfig("web_only", "/web", "0.log"); + config.setLogConfigs(Arrays.asList(wildcardConfig, exactConfig)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + createContainerLog(POD_NGINX_1, "web"); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + // before the matching fix, the wildcard entry shadowed the exact entry and the + // exact config was never initialized + assertTrue("wildcard config should be registered", + lsm.getSingerLogPaths().containsKey(podLogPath + "/" + POD_NGINX_1 + "/*")); + assertTrue("exact-directory config should be registered too", + lsm.getSingerLogPaths().containsKey(podLogPath + "/" + POD_NGINX_1 + "/web")); + + instance.stop(); + } + + @Test + public void testNonMatchingDirectoriesAreNotRegistered() throws Exception { + SingerLogConfig exactConfig = createLogConfig("web_only", "/web", "0.log"); + config.setLogConfigs(Arrays.asList(exactConfig)); + SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + + createContainerLog(POD_NGINX_1, "sidecar"); + + LogStreamManager lsm = LogStreamManager.getInstance(); + KubeService instance = KubeService.getInstance(); + instance.start(); + Thread.sleep(SingerTestBase.FILE_EVENT_WAIT_TIME_MS); + + assertFalse(lsm.getSingerLogPaths() + .containsKey(podLogPath + "/" + POD_NGINX_1 + "/sidecar")); + + instance.stop(); + } + + private SingerLogConfig createLogConfig(String name, String logDir, String regex) { + SingerLogConfig logConfig = new SingerLogConfig(); + logConfig.setName(name); + logConfig.setLogDir(logDir); + logConfig.setLogStreamRegex(regex); + logConfig.setFilenameMatchMode(FileNameMatchMode.PREFIX); + return logConfig; + } + + private void createContainerLog(String podUid, String container) throws IOException { + new File(podLogPath + "/" + podUid + "/" + container).mkdirs(); + new File(podLogPath + "/" + podUid + "/" + container + "/0.log").createNewFile(); + } + + private static boolean deleteDirectory(File dir) { + if (dir.isDirectory()) { + String[] children = dir.list(); + if (children != null) { + for (String child : children) { + if (!deleteDirectory(new File(dir, child))) { + return false; + } + } + } + } + return dir.delete(); + } +} diff --git a/singer/src/test/java/com/pinterest/singer/monitor/TestTopicTemplateRouting.java b/singer/src/test/java/com/pinterest/singer/monitor/TestTopicTemplateRouting.java new file mode 100644 index 00000000..acb28bd8 --- /dev/null +++ b/singer/src/test/java/com/pinterest/singer/monitor/TestTopicTemplateRouting.java @@ -0,0 +1,126 @@ +/** + * Copyright 2026 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.monitor; + +import static org.junit.Assert.assertEquals; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.pinterest.singer.common.LogStream; +import com.pinterest.singer.common.SingerLog; +import com.pinterest.singer.common.SingerSettings; +import com.pinterest.singer.thrift.configuration.KafkaProducerConfig; +import com.pinterest.singer.thrift.configuration.KafkaWriterConfig; +import com.pinterest.singer.thrift.configuration.KubeConfig; +import com.pinterest.singer.thrift.configuration.SingerConfig; +import com.pinterest.singer.thrift.configuration.SingerLogConfig; +import com.pinterest.singer.utils.SingerUtils; + +/** + * Tests the topic resolution precedence in DefaultLogMonitor: + * 1. writer.kafka.topicTemplate when it fully resolves to a legal topic name + * 2. writer.kafka.topic (with legacy \N capture-group expansion) otherwise + */ +public class TestTopicTemplateRouting { + + private static final String POD_LOG_DIRECTORY = "/var/log/pods"; + private static final String POD_UID = "payments_api-7f8d5b7c6-mnopq_98765432-7654-4321"; + + @Before + public void before() { + SingerSettings.reset(); + SingerConfig singerConfig = new SingerConfig(); + singerConfig.setKubernetesEnabled(true); + KubeConfig kubeConfig = new KubeConfig(); + kubeConfig.setPodLogDirectory(POD_LOG_DIRECTORY); + singerConfig.setKubeConfig(kubeConfig); + SingerSettings.setSingerConfig(singerConfig); + } + + @After + public void after() { + SingerSettings.reset(); + } + + private LogStream buildContainerStream(String container) { + SingerLogConfig logConfig = new SingerLogConfig(); + logConfig.setName(POD_UID + "..container_logs"); + logConfig.setLogDir(POD_LOG_DIRECTORY + "/" + POD_UID + "/" + container); + logConfig.setLogStreamRegex("0\\.log"); + SingerLog singerLog = new SingerLog(logConfig, POD_UID); + singerLog.addMetadata("app", SingerUtils.getByteBuf("paymentservice")); + return new LogStream(singerLog, "0.log"); + } + + @Test + public void testTemplateTakesPrecedenceWhenResolvable() throws Exception { + LogStream stream = buildContainerStream("web"); + KafkaWriterConfig writerConfig = + new KafkaWriterConfig("fallback_topic", new KafkaProducerConfig()); + writerConfig.setTopicTemplate("logs_%{namespace}_%{container}"); + assertEquals("logs_payments_web", DefaultLogMonitor.resolveTopic(stream, + stream.getSingerLog().getSingerLogConfig(), writerConfig)); + } + + @Test + public void testMetadataVariableResolution() throws Exception { + LogStream stream = buildContainerStream("web"); + KafkaWriterConfig writerConfig = + new KafkaWriterConfig("fallback_topic", new KafkaProducerConfig()); + writerConfig.setTopicTemplate("logs_%{metadata:app}"); + assertEquals("logs_paymentservice", DefaultLogMonitor.resolveTopic(stream, + stream.getSingerLog().getSingerLogConfig(), writerConfig)); + } + + @Test + public void testFallbackToStaticTopicWhenVariableUnresolvable() throws Exception { + LogStream stream = buildContainerStream("web"); + KafkaWriterConfig writerConfig = + new KafkaWriterConfig("fallback_topic", new KafkaProducerConfig()); + writerConfig.setTopicTemplate("logs_%{metadata:nonexistent}"); + assertEquals("fallback_topic", DefaultLogMonitor.resolveTopic(stream, + stream.getSingerLog().getSingerLogConfig(), writerConfig)); + } + + @Test + public void testFallbackForNonKubernetesStream() throws Exception { + SingerLogConfig logConfig = new SingerLogConfig(); + logConfig.setName("host_log"); + logConfig.setLogDir("/var/log/hostapp"); + logConfig.setLogStreamRegex("app\\.log"); + LogStream stream = new LogStream(new SingerLog(logConfig), "app.log"); + KafkaWriterConfig writerConfig = + new KafkaWriterConfig("fallback_topic", new KafkaProducerConfig()); + writerConfig.setTopicTemplate("logs_%{namespace}"); + assertEquals("fallback_topic", + DefaultLogMonitor.resolveTopic(stream, logConfig, writerConfig)); + } + + @Test + public void testLegacyCaptureGroupExpansionStillWorks() throws Exception { + SingerLogConfig logConfig = new SingerLogConfig(); + logConfig.setName("host_log"); + logConfig.setLogDir("/var/log/hostapp"); + logConfig.setLogStreamRegex("app_(.*)\\.log"); + LogStream stream = new LogStream(new SingerLog(logConfig), "app_web.log"); + KafkaWriterConfig writerConfig = + new KafkaWriterConfig("logs-\\1", new KafkaProducerConfig()); + assertEquals("logs-web", + DefaultLogMonitor.resolveTopic(stream, logConfig, writerConfig)); + } +} diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java index af9aa031..3ca9cb54 100644 --- a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java +++ b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java @@ -47,6 +47,7 @@ import com.pinterest.singer.common.SingerConfigDef; import com.pinterest.singer.config.ConfigFileWatcher; import com.pinterest.singer.thrift.configuration.KafkaProducerConfig; +import com.pinterest.singer.thrift.configuration.KafkaWriterConfig; import com.pinterest.singer.thrift.configuration.LogStreamProcessorConfig; import com.pinterest.singer.thrift.configuration.MemqWriterConfig; import com.pinterest.singer.thrift.configuration.RealpinWriterConfig; @@ -504,4 +505,62 @@ public void testRegexTransformerConfigurations() throws Exception { assertNotNull(regexBasedModifierConfig); } + + @Test + public void testKafkaWriterTopicTemplateParsing() throws Exception { + String CONFIG = "" + "type=kafka\n" + + "kafka.topic=fallback_topic\n" + + "kafka.topicTemplate=logs_%{namespace}_%{container}\n" + + "kafka.producerConfig.bootstrap.servers=localhost:9092\n"; + PropertiesConfiguration config = new PropertiesConfiguration(); + config.load(new ByteArrayInputStream(CONFIG.getBytes())); + KafkaWriterConfig kafkaWriterConfig = + LogConfigUtils.parseLogStreamWriterConfig(config).getKafkaWriterConfig(); + assertEquals("fallback_topic", kafkaWriterConfig.getTopic()); + assertEquals("logs_%{namespace}_%{container}", kafkaWriterConfig.getTopicTemplate()); + } + + @Test + public void testKafkaWriterWithoutTopicTemplate() throws Exception { + String CONFIG = "" + "type=kafka\n" + + "kafka.topic=plain_topic\n" + + "kafka.producerConfig.bootstrap.servers=localhost:9092\n"; + PropertiesConfiguration config = new PropertiesConfiguration(); + config.load(new ByteArrayInputStream(CONFIG.getBytes())); + KafkaWriterConfig kafkaWriterConfig = + LogConfigUtils.parseLogStreamWriterConfig(config).getKafkaWriterConfig(); + assertFalse(kafkaWriterConfig.isSetTopicTemplate()); + } + + @Test + public void testKafkaWriterTopicTemplateRejectsPodLevelVariables() throws Exception { + String CONFIG = "" + "type=kafka\n" + + "kafka.topic=fallback_topic\n" + + "kafka.topicTemplate=logs_%{podName}\n" + + "kafka.producerConfig.bootstrap.servers=localhost:9092\n"; + PropertiesConfiguration config = new PropertiesConfiguration(); + config.load(new ByteArrayInputStream(CONFIG.getBytes())); + try { + LogConfigUtils.parseLogStreamWriterConfig(config); + fail("topicTemplate with %{podName} should be rejected at config load time"); + } catch (ConfigurationException e) { + assertTrue(e.getMessage().contains("cardinality")); + } + } + + @Test + public void testKafkaWriterTopicTemplateRejectsUnknownVariable() throws Exception { + String CONFIG = "" + "type=kafka\n" + + "kafka.topic=fallback_topic\n" + + "kafka.topicTemplate=logs_%{cluster}\n" + + "kafka.producerConfig.bootstrap.servers=localhost:9092\n"; + PropertiesConfiguration config = new PropertiesConfiguration(); + config.load(new ByteArrayInputStream(CONFIG.getBytes())); + try { + LogConfigUtils.parseLogStreamWriterConfig(config); + fail("topicTemplate with unknown variable should be rejected at config load time"); + } catch (ConfigurationException e) { + assertTrue(e.getMessage().contains("Allowed variables")); + } + } } diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestTopicTemplateResolver.java b/singer/src/test/java/com/pinterest/singer/utils/TestTopicTemplateResolver.java new file mode 100644 index 00000000..6399e55e --- /dev/null +++ b/singer/src/test/java/com/pinterest/singer/utils/TestTopicTemplateResolver.java @@ -0,0 +1,184 @@ +/** + * Copyright 2026 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Optional; + +import org.junit.Test; + +import com.pinterest.singer.common.LogStream; +import com.pinterest.singer.common.SingerLog; +import com.pinterest.singer.thrift.configuration.SingerLogConfig; + +public class TestTopicTemplateResolver { + + private static final String POD_LOG_DIRECTORY = "/var/log/pods"; + private static final String POD_UID = "payments_api-7f8d5b7c6-mnopq_98765432-7654-4321"; + + private LogStream buildPodStream(String podUid, String streamDir) { + SingerLogConfig logConfig = new SingerLogConfig(); + logConfig.setName(podUid + "..container_logs"); + logConfig.setLogDir(streamDir); + logConfig.setLogStreamRegex("0.log"); + SingerLog singerLog = new SingerLog(logConfig, podUid); + singerLog.addMetadata("app", SingerUtils.getByteBuf("paymentservice")); + singerLog.addMetadata("bad-value", SingerUtils.getByteBuf("has spaces!")); + return new LogStream(singerLog, "0.log"); + } + + // ---- validateTemplate ---- + + @Test + public void testValidateAcceptsSupportedVariables() { + TopicTemplateResolver.validateTemplate("logs_%{namespace}_%{container}"); + TopicTemplateResolver.validateTemplate("%{metadata:app}"); + TopicTemplateResolver.validateTemplate("pl.%{namespace}.%{metadata:singer-topic}"); + } + + @Test + public void testValidateRejectsPodLevelIdentifiers() { + for (String variable : new String[] { "podName", "podUid", "pod" }) { + try { + TopicTemplateResolver.validateTemplate("logs_%{" + variable + "}"); + fail("%{" + variable + "} should be rejected"); + } catch (IllegalArgumentException e) { + assertTrue("error should explain cardinality concern for " + variable, + e.getMessage().contains("cardinality")); + } + } + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateRejectsUnknownVariable() { + TopicTemplateResolver.validateTemplate("logs_%{cluster}"); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateRejectsMetadataWithoutKey() { + TopicTemplateResolver.validateTemplate("logs_%{metadata}"); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateRejectsArgumentOnNamespace() { + TopicTemplateResolver.validateTemplate("logs_%{namespace:foo}"); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateRejectsTemplateWithoutVariables() { + TopicTemplateResolver.validateTemplate("static_topic"); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateRejectsIllegalLiteralCharacters() { + TopicTemplateResolver.validateTemplate("logs/%{namespace}"); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateRejectsUnclosedVariable() { + TopicTemplateResolver.validateTemplate("logs_%{namespace"); + } + + @Test(expected = IllegalArgumentException.class) + public void testValidateRejectsEmptyTemplate() { + TopicTemplateResolver.validateTemplate(""); + } + + // ---- resolve ---- + + @Test + public void testResolveNamespaceContainerAndMetadata() { + LogStream stream = buildPodStream(POD_UID, + POD_LOG_DIRECTORY + "/" + POD_UID + "/maincontainer"); + Optional resolved = TopicTemplateResolver + .resolve("logs_%{namespace}_%{container}", stream, POD_LOG_DIRECTORY); + assertEquals("logs_payments_maincontainer", resolved.get()); + + resolved = TopicTemplateResolver + .resolve("pl.%{namespace}.%{metadata:app}", stream, POD_LOG_DIRECTORY); + assertEquals("pl.payments.paymentservice", resolved.get()); + } + + @Test + public void testResolveFailsForMissingMetadataKey() { + LogStream stream = buildPodStream(POD_UID, POD_LOG_DIRECTORY + "/" + POD_UID + "/web"); + assertFalse(TopicTemplateResolver + .resolve("logs_%{metadata:nonexistent}", stream, POD_LOG_DIRECTORY).isPresent()); + } + + @Test + public void testResolveFailsForNonKubernetesStream() { + SingerLogConfig logConfig = new SingerLogConfig(); + logConfig.setName("host_log"); + logConfig.setLogDir("/var/log/hostapp"); + // SingerLog without pod uid == host-level stream + LogStream stream = new LogStream(new SingerLog(logConfig), "app.log"); + assertFalse(TopicTemplateResolver + .resolve("logs_%{namespace}", stream, POD_LOG_DIRECTORY).isPresent()); + } + + @Test + public void testResolveFailsForContainerAtPodRoot() { + // stream directly at the pod root has no container path segment + LogStream stream = buildPodStream(POD_UID, POD_LOG_DIRECTORY + "/" + POD_UID); + assertFalse(TopicTemplateResolver + .resolve("logs_%{container}", stream, POD_LOG_DIRECTORY).isPresent()); + } + + @Test + public void testResolveFailsForMalformedPodDirectoryName() { + // no namespace separator in the pod directory name + LogStream stream = buildPodStream("nounderscore", + POD_LOG_DIRECTORY + "/nounderscore/web"); + assertFalse(TopicTemplateResolver + .resolve("logs_%{namespace}", stream, POD_LOG_DIRECTORY).isPresent()); + } + + @Test + public void testResolveFailsForIllegalResolvedTopic() { + LogStream stream = buildPodStream(POD_UID, POD_LOG_DIRECTORY + "/" + POD_UID + "/web"); + // metadata value contains characters that are illegal in a Kafka topic name + assertFalse(TopicTemplateResolver + .resolve("logs_%{metadata:bad-value}", stream, POD_LOG_DIRECTORY).isPresent()); + } + + @Test + public void testContainerIsFirstSegmentBelowPodRoot() { + // volume-mounted log layout: /var/log/pods//var/log — "container" resolves to the + // first path segment which is only meaningful with the kubelet stdout layout + LogStream stream = buildPodStream(POD_UID, POD_LOG_DIRECTORY + "/" + POD_UID + "/var/log"); + assertEquals("logs_var", + TopicTemplateResolver.resolve("logs_%{container}", stream, POD_LOG_DIRECTORY).get()); + } + + @Test + public void testIsLegalTopicName() { + assertTrue(TopicTemplateResolver.isLegalTopicName("logs_payments.web-1")); + assertFalse(TopicTemplateResolver.isLegalTopicName("")); + assertFalse(TopicTemplateResolver.isLegalTopicName(".")); + assertFalse(TopicTemplateResolver.isLegalTopicName("..")); + assertFalse(TopicTemplateResolver.isLegalTopicName("has space")); + StringBuilder tooLong = new StringBuilder(); + for (int i = 0; i < 250; i++) { + tooLong.append('a'); + } + assertFalse(TopicTemplateResolver.isLegalTopicName(tooLong.toString())); + } +}