Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ public void configure(String sourceHostname,
long pollMillis) throws IOException {
super.configure(sourceHostname, metricsPrefix, destinationHost, destinationPort, pollMillis);
this.client = new OpenTsdbClient(destinationHost, destinationPort);
this.converter = new OpenTsdbMetricConverter(metricsPrefix, sourceHostname);
String podName = System.getenv("POD_NAME");
if (podName != null) {
this.converter = new OpenTsdbMetricConverter(metricsPrefix, "host=" + sourceHostname, "podName=" + podName);
} else {
this.converter = new OpenTsdbMetricConverter(metricsPrefix, sourceHostname);
}
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -42,15 +43,11 @@
import java.util.stream.Stream;

import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.comparator.CompositeFileComparator;
import org.apache.commons.io.comparator.LastModifiedFileComparator;
import org.apache.commons.io.comparator.NameFileComparator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import com.pinterest.singer.common.LogStream;
import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.common.SingerLog;
Expand Down Expand Up @@ -411,7 +408,6 @@ public LogStream createLogStream(SingerLog singerLog, Path fullAddedPath) throws
* @throws LogStreamException If there is an error creating the log stream
* @throws IOException If there is an error reading from disk
*/
@SuppressWarnings("unchecked")
private void createPrefixBasedLogStreams(SingerLog singerLog,
SingerLogConfig singerLogConfig, File logDir, File[] files, Path logDirPath) throws IOException {
Map<String, LogStream> nameToLogStream = new HashMap<>();
Expand All @@ -430,16 +426,27 @@ private void createPrefixBasedLogStreams(SingerLog singerLog,
FileFilter fileFilter = file -> pattern.matcher(file.getName()).matches();
List<File> logFiles = Arrays.asList(logDir.listFiles(fileFilter));

// Sort the file first by last_modified timestamp and then by name in case two files have
// the same mtime due to precision (mtime is up to seconds).
// snapshot the latest file timestamps
final Map<File, Long> latestFileTimestamps = new HashMap<>();
for (File file : logFiles) {
latestFileTimestamps.put(file, file.lastModified());
}

try {
@SuppressWarnings("rawtypes")
Ordering ordering = Ordering.from(
new CompositeFileComparator(
LastModifiedFileComparator.LASTMODIFIED_COMPARATOR, NameFileComparator.NAME_REVERSE));
logFiles = ordering.sortedCopy(logFiles);
} catch (Exception e) {
// Sort the files by last_modified timestamp and then by name in case two files have
// the same mtime due to precision (mtime is up to seconds).
Comparator<File> comparator = (f1, f2) -> {
int result = Long.compare(latestFileTimestamps.get(f1), latestFileTimestamps.get(f2));
if (result == 0) {
return f2.getName().compareTo(f1.getName());
}
return result;
};
logFiles.sort(comparator);
} catch (IllegalArgumentException e) {
Stats.incr(SingerMetrics.LOGSTREAM_SORT_EXCEPTION);
LOG.error("Exception during file sorting for log directory: {}. Files count: {}",
logDir.getAbsolutePath(), logFiles.size(), e);
throw e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,12 @@ public void evaluateAndRegisterLogStreamOrWatcher(Path path, String podUid) {
// only register new watchers if there are any log configurations for it's
// subdirectories
if (directoryDepth <= SingerSettings.getLogConfigMap().lastKey().getLeft()) {
// register for recursive watch if this event wasn't for a file
LOG.info("Registering recursive watch for path:" + path);
try {
lsm.getRecursiveDirectoryWatcher().registerPath(path);
if (path.toFile().isDirectory()) {
// register for recursive watch if this event wasn't for a file
LOG.info("Registering recursive watch for path:" + path);
lsm.getRecursiveDirectoryWatcher().registerPath(path);
}
} catch (IOException e) {
LOG.error("Problem with recursive directory watch for path:" + path, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public static void beforeClass() throws IOException {
@AfterClass
public static void afterClass() {
TestKubeService.removePodsContext();
// Stop the HTTP server that was created by ensureServerRunning()
// Give it 1 second to complete any pending exchanges
if (TestKubeService.server != null) {
TestKubeService.server.stop(1);
TestKubeService.server = null;
}
}

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.ArrayList;
import java.util.Map;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -205,10 +204,12 @@ public void testFilesAlreadyPresent() throws Exception {
List<LogStream> logStreams = LogStreamManager.getLogStreamsFor(testDir.toPath(), created[0].toPath());
assertEquals(1, logStreams.size());
verifyFiles(createdFiles, logStreams.get(0));
assertTrue("LogStream 1 should have consistent sort order", logStreams.get(0).checkConsistency());

List<LogStream> logStreams2 = LogStreamManager.getLogStreamsFor(testDir.toPath(), created2[0].toPath());
assertEquals(1, logStreams2.size());
verifyFiles(createdFiles2, logStreams2.get(0));
assertTrue("LogStream 2 should have consistent sort order", logStreams2.get(0).checkConsistency());
}


Expand Down
Loading