diff --git a/singer-commons/src/main/java/com/pinterest/singer/metrics/OpenTsdbStatsPusher.java b/singer-commons/src/main/java/com/pinterest/singer/metrics/OpenTsdbStatsPusher.java index 564086ab..903b7445 100644 --- a/singer-commons/src/main/java/com/pinterest/singer/metrics/OpenTsdbStatsPusher.java +++ b/singer-commons/src/main/java/com/pinterest/singer/metrics/OpenTsdbStatsPusher.java @@ -61,7 +61,12 @@ public void configure(String sourceHostname, long pollMillis) throws IOException { super.configure(sourceHostname, metricsPrefix, destinationHost, destinationPort, pollMillis); this.client = new OpenTsdbClient(destinationHost, destinationPort); - this.converter = new OpenTsdbMetricConverter(metricsPrefix, sourceHostname); + String podName = System.getenv("POD_NAME"); + if (podName != null) { + this.converter = new OpenTsdbMetricConverter(metricsPrefix, "host=" + sourceHostname, "podName=" + podName); + } else { + this.converter = new OpenTsdbMetricConverter(metricsPrefix, sourceHostname); + } } @SuppressWarnings("unchecked") diff --git a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java index 9dfc3411..277c5c16 100644 --- a/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java +++ b/singer/src/main/java/com/pinterest/singer/monitor/LogStreamManager.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -42,15 +43,11 @@ import java.util.stream.Stream; import org.apache.commons.io.FilenameUtils; -import org.apache.commons.io.comparator.CompositeFileComparator; -import org.apache.commons.io.comparator.LastModifiedFileComparator; -import org.apache.commons.io.comparator.NameFileComparator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Ordering; import com.pinterest.singer.common.LogStream; import com.pinterest.singer.common.SingerSettings; import com.pinterest.singer.common.SingerLog; @@ -411,7 +408,6 @@ public LogStream createLogStream(SingerLog singerLog, Path fullAddedPath) throws * @throws LogStreamException If there is an error creating the log stream * @throws IOException If there is an error reading from disk */ - @SuppressWarnings("unchecked") private void createPrefixBasedLogStreams(SingerLog singerLog, SingerLogConfig singerLogConfig, File logDir, File[] files, Path logDirPath) throws IOException { Map nameToLogStream = new HashMap<>(); @@ -430,16 +426,27 @@ private void createPrefixBasedLogStreams(SingerLog singerLog, FileFilter fileFilter = file -> pattern.matcher(file.getName()).matches(); List logFiles = Arrays.asList(logDir.listFiles(fileFilter)); - // Sort the file first by last_modified timestamp and then by name in case two files have - // the same mtime due to precision (mtime is up to seconds). + // snapshot the latest file timestamps + final Map latestFileTimestamps = new HashMap<>(); + for (File file : logFiles) { + latestFileTimestamps.put(file, file.lastModified()); + } + try { - @SuppressWarnings("rawtypes") - Ordering ordering = Ordering.from( - new CompositeFileComparator( - LastModifiedFileComparator.LASTMODIFIED_COMPARATOR, NameFileComparator.NAME_REVERSE)); - logFiles = ordering.sortedCopy(logFiles); - } catch (Exception e) { + // Sort the files by last_modified timestamp and then by name in case two files have + // the same mtime due to precision (mtime is up to seconds). + Comparator comparator = (f1, f2) -> { + int result = Long.compare(latestFileTimestamps.get(f1), latestFileTimestamps.get(f2)); + if (result == 0) { + return f2.getName().compareTo(f1.getName()); + } + return result; + }; + logFiles.sort(comparator); + } catch (IllegalArgumentException e) { Stats.incr(SingerMetrics.LOGSTREAM_SORT_EXCEPTION); + LOG.error("Exception during file sorting for log directory: {}. Files count: {}", + logDir.getAbsolutePath(), logFiles.size(), e); throw e; } @@ -558,8 +565,10 @@ public void stop() { * Reset LogStream manager for testing purpose */ public static void reset() { - instance.stop(); - instance = null; + if (instance != null) { + instance.stop(); + instance = null; + } } public FileSystemEventFetcher getRecursiveDirectoryWatcher() { diff --git a/singer/src/main/java/com/pinterest/singer/monitor/RecursiveFSEventProcessor.java b/singer/src/main/java/com/pinterest/singer/monitor/RecursiveFSEventProcessor.java index 25fba2c5..d0dce73e 100644 --- a/singer/src/main/java/com/pinterest/singer/monitor/RecursiveFSEventProcessor.java +++ b/singer/src/main/java/com/pinterest/singer/monitor/RecursiveFSEventProcessor.java @@ -174,10 +174,12 @@ public void evaluateAndRegisterLogStreamOrWatcher(Path path, String podUid) { // only register new watchers if there are any log configurations for it's // subdirectories if (directoryDepth <= SingerSettings.getLogConfigMap().lastKey().getLeft()) { - // register for recursive watch if this event wasn't for a file - LOG.info("Registering recursive watch for path:" + path); try { - lsm.getRecursiveDirectoryWatcher().registerPath(path); + if (path.toFile().isDirectory()) { + // register for recursive watch if this event wasn't for a file + LOG.info("Registering recursive watch for path:" + path); + lsm.getRecursiveDirectoryWatcher().registerPath(path); + } } catch (IOException e) { LOG.error("Problem with recursive directory watch for path:" + path, e); } diff --git a/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodAllowlist.java b/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodAllowlist.java index 820d6891..fdd8d2fc 100644 --- a/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodAllowlist.java +++ b/singer/src/test/java/com/pinterest/singer/kubernetes/TestPodAllowlist.java @@ -21,15 +21,16 @@ import java.io.File; import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.concurrent.Executors; +import org.apache.commons.io.IOUtils; import org.junit.After; -import org.junit.AfterClass; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -40,6 +41,9 @@ import com.pinterest.singer.thrift.configuration.KubeConfig; import com.pinterest.singer.thrift.configuration.SingerConfig; import com.pinterest.singer.thrift.configuration.SingerLogConfig; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; /** * Tests for the pod allowlist feature which filters log stream initialization @@ -47,34 +51,22 @@ * * Uses the shared HTTP server from TestKubeService and pods-goodresponse.json. */ +@SuppressWarnings("restriction") public class TestPodAllowlist { private SingerConfig config; private KubeConfig kubeConfig; private String podLogPath; private Path tempDir; + private HttpServer server; // Pod directory names from pods-goodresponse.json: namespace_name_uid private static final String POD_NGINX_1 = "default_nginx-deployment-5c689d7589-abcde_12345678-1234-1234-1234-1234567890ab"; private static final String POD_BACKEND = "default_backend-service-7987d5b5c-12345_54321678-9876-5432-9876-5432198765ac"; private static final String POD_DATABASE = "default_database-7f8d5b7c6-mnopq_98765432-7654-4321-6543-987654321098"; - @BeforeClass - public static void beforeClass() throws IOException { - TestKubeService.ensureServerRunning(); - } - - @AfterClass - public static void afterClass() { - TestKubeService.removePodsContext(); - } - @Before public void before() throws IOException { - TestKubeService.removePodsContext(); - TestKubeService.registerGoodResponse(); - - LogStreamManager.getInstance().getSingerLogPaths().clear(); SingerSettings.getFsMonitorMap().clear(); LogStreamManager.reset(); KubeService.reset(); @@ -95,19 +87,26 @@ public void before() throws IOException { podLogPath = tempDir.toAbsolutePath().toString(); kubeConfig.setPodLogDirectory(podLogPath); kubeConfig.setPodMetadataFields(Arrays.asList("name")); + + server = HttpServer.create(new InetSocketAddress(0), 0); + server.start(); + kubeConfig.setKubeletPort(String.valueOf(server.getAddress().getPort())); + registerGoodResponse(); } @After public void after() { - TestKubeService.removePodsContext(); - SingerSettings.getFsMonitorMap().clear(); - if (tempDir != null) { - deleteDirectory(tempDir.toFile()); + if (server != null) { + server.stop(0); + server = null; } LogStreamManager.reset(); KubeService.reset(); PodMetadataFetcher.reset(); SingerSettings.reset(); + if (tempDir != null) { + deleteDirectory(tempDir.toFile()); + } } @Test @@ -117,7 +116,7 @@ public void testAllowlistDisabledWhenMetadataKeyNotConfigured() throws Exception logConfig.setPodAllowlist(Arrays.asList("nginx-deployment-5c689d7589-abcde")); // Only allow this pod config.setLogConfigs(Arrays.asList(logConfig)); - SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + SingerSettings.initializeConfigMap(config); createPodDirectory(POD_BACKEND, "/var/log", "app.log"); @@ -142,7 +141,7 @@ public void testAllowlistMatchAllowsInitialization() throws Exception { "nginx-deployment-5c689d7589-fghij")); config.setLogConfigs(Arrays.asList(logConfig)); - SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + SingerSettings.initializeConfigMap(config); createPodDirectory(POD_NGINX_1, "/var/log", "app.log"); @@ -168,7 +167,7 @@ public void testAllowlistNoMatchSkipsInitialization() throws Exception { "nginx-deployment-5c689d7589-fghij")); config.setLogConfigs(Arrays.asList(logConfig)); - SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + SingerSettings.initializeConfigMap(config); createPodDirectory(POD_BACKEND, "/var/log", "app.log"); @@ -189,7 +188,7 @@ public void testConfigWithoutAllowlistInitializesForAllPods() throws Exception { SingerLogConfig logConfig = createLogConfig("test-log", "/var/log", "app.log"); config.setLogConfigs(Arrays.asList(logConfig)); - SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + SingerSettings.initializeConfigMap(config); createPodDirectory(POD_DATABASE, "/var/log", "app.log"); @@ -218,7 +217,7 @@ public void testMultipleConfigsWithDifferentAllowlists() throws Exception { SingerLogConfig logConfig3 = createLogConfig("log-universal", "/var/log/common", "common.log"); config.setLogConfigs(Arrays.asList(logConfig1, logConfig2, logConfig3)); - SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + SingerSettings.initializeConfigMap(config); new File(podLogPath + "/" + POD_NGINX_1 + "/var/log/nginx").mkdirs(); new File(podLogPath + "/" + POD_NGINX_1 + "/var/log/nginx/nginx.log").createNewFile(); @@ -251,7 +250,7 @@ public void testHostOnlyConfigSkipsPodsInitialization() throws Exception { hostOnlyConfig.setPodAllowlist(Arrays.asList("__HOST__")); config.setLogConfigs(Arrays.asList(hostOnlyConfig)); - SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + SingerSettings.initializeConfigMap(config); createPodDirectory(POD_NGINX_1, "/var/log", "host.log"); @@ -279,7 +278,7 @@ public void testIncludeHostMarkerWithPodIds() throws Exception { "nginx-deployment-5c689d7589-abcde")); config.setLogConfigs(Arrays.asList(mixedConfig)); - SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + SingerSettings.initializeConfigMap(config); LogStreamManager.initializeLogStreams(); LogStreamManager lsm = LogStreamManager.getInstance(); @@ -311,7 +310,7 @@ public void testPrefixMatchingWithPodIds() throws Exception { prefixConfig.setPodAllowlist(Arrays.asList("nginx-")); config.setLogConfigs(Arrays.asList(prefixConfig)); - SingerSettings.getLogConfigMap().putAll(SingerSettings.loadLogConfigMap(config)); + SingerSettings.initializeConfigMap(config); createPodDirectory(POD_NGINX_1, "/var/log", "prefix.log"); @@ -340,6 +339,21 @@ private void createPodDirectory(String podUid, String logDir, String logFile) th new File(podLogPath + "/" + podUid + logDir + "/" + logFile).createNewFile(); } + private void registerGoodResponse() { + server.createContext("/pods", new HttpHandler() { + @Override + public void handle(HttpExchange exchange) throws IOException { + String response = new String( + Files.readAllBytes(new File("src/test/resources/pods-goodresponse.json").toPath()), + "utf-8"); + exchange.getResponseHeaders().add("Content-Type", "text/html"); + exchange.sendResponseHeaders(HttpURLConnection.HTTP_OK, response.length()); + IOUtils.write(response, exchange.getResponseBody()); + exchange.close(); + } + }); + } + private static boolean deleteDirectory(File dir) { if (dir.isDirectory()) { String[] children = dir.list(); diff --git a/singer/src/test/java/com/pinterest/singer/monitor/FileSystemMonitorTest.java b/singer/src/test/java/com/pinterest/singer/monitor/FileSystemMonitorTest.java index eef3673d..5c56594c 100644 --- a/singer/src/test/java/com/pinterest/singer/monitor/FileSystemMonitorTest.java +++ b/singer/src/test/java/com/pinterest/singer/monitor/FileSystemMonitorTest.java @@ -37,7 +37,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.ArrayList; import java.util.Map; import java.util.regex.Pattern; @@ -205,10 +204,12 @@ public void testFilesAlreadyPresent() throws Exception { List logStreams = LogStreamManager.getLogStreamsFor(testDir.toPath(), created[0].toPath()); assertEquals(1, logStreams.size()); verifyFiles(createdFiles, logStreams.get(0)); + assertTrue("LogStream 1 should have consistent sort order", logStreams.get(0).checkConsistency()); List logStreams2 = LogStreamManager.getLogStreamsFor(testDir.toPath(), created2[0].toPath()); assertEquals(1, logStreams2.size()); verifyFiles(createdFiles2, logStreams2.get(0)); + assertTrue("LogStream 2 should have consistent sort order", logStreams2.get(0).checkConsistency()); }