Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rosbag2_compression/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ if(BUILD_TESTING)
)

ament_add_gmock(test_sequential_compression_writer
test/rosbag2_compression/test_sequential_compression_writer.cpp)
test/rosbag2_compression/test_sequential_compression_writer.cpp
test/rosbag2_compression/fake_compressor.cpp)
target_link_libraries(test_sequential_compression_writer
${PROJECT_NAME}
rosbag2_cpp::rosbag2_cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_
#define ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_

#include <memory>
#include <string>

#include "rosbag2_compression/compression_factory.hpp"
#include "fake_compressor.hpp"

class FakeCompressionFactory
: public rosbag2_compression::CompressionFactory
{
public:
FakeCompressionFactory() = delete;

~FakeCompressionFactory() override = default;

explicit FakeCompressionFactory(int & detected_thread_priority)
: detected_thread_priority_(detected_thread_priority) {}

std::shared_ptr<rosbag2_compression::BaseCompressorInterface>
create_compressor(const std::string & /*compression_format*/) override
{
return std::make_shared<FakeCompressor>(detected_thread_priority_);
}

private:
int & detected_thread_priority_;
};

#endif // ROSBAG2_COMPRESSION__FAKE_COMPRESSION_FACTORY_HPP_
18 changes: 17 additions & 1 deletion rosbag2_compression/test/rosbag2_compression/fake_compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,27 @@
// limitations under the License.

#include <string>
#ifdef _WIN32
#include <windows.h>
#else
#include <sys/resource.h>
#endif

#include "pluginlib/class_list_macros.hpp"

#include "fake_compressor.hpp"

FakeCompressor::FakeCompressor(int & detected_thread_priority)
{
#ifndef _WIN32
int cur_nice_value = getpriority(PRIO_PROCESS, 0);
if (cur_nice_value != -1 && errno == 0) {
detected_thread_priority = cur_nice_value;
}
#else
detected_thread_priority = GetThreadPriority(GetCurrentThread());
#endif
}

std::string FakeCompressor::compress_uri(const std::string & uri)
{
return uri + "." + get_compression_identifier();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class FakeCompressor : public rosbag2_compression::BaseCompressorInterface
public:
FakeCompressor() = default;

explicit FakeCompressor(int & detected_thread_priority);

std::string compress_uri(const std::string & uri) override;

void compress_serialized_bag_message(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@
#include "mock_storage_factory.hpp"

#include "mock_compression_factory.hpp"
#include "fake_compression_factory.hpp"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/resource.h>
#endif

using namespace testing; // NOLINT
Expand Down Expand Up @@ -401,82 +399,37 @@ TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_siz
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}


TEST_P(SequentialCompressionWriterTest, writer_sets_nice_value)
TEST_P(SequentialCompressionWriterTest, writer_sets_threads_priority)
{
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";
const uint64_t kCompressionQueueSize = GetParam();
const int wanted_nice_value = 10;

class TestCompressor : public rosbag2_compression::BaseCompressorInterface
{
int & detected_nice_value;

public:
TestCompressor(int & detected_nice_value)
: detected_nice_value(detected_nice_value) {}
virtual std::string compress_uri(const std::string & uri)
{
return uri;
}

virtual void compress_serialized_bag_message(
const rosbag2_storage::SerializedBagMessage * bag_message,
rosbag2_storage::SerializedBagMessage * compressed_message)
{
#ifdef _WIN32
int cur_nice_value = getpriority(PRIO_PROCESS, 0);
if (cur_nice_value != -1 && errno == 0) {

detected_nice_value = cur_nice_value;
}
#ifndef _WIN32
const int wanted_thread_priority = 10;
#else
//FIXME implement windows version
const int wanted_thread_priority = THREAD_MODE_BACKGROUND_BEGIN;
#endif

*compressed_message = *bag_message;
}

/**
* Get the identifier of the compression algorithm.
* This is appended to the extension of the compressed file.
*/
virtual std::string get_compression_identifier() const
{
return "niceTest";
}
};

class FakeFactory : public rosbag2_compression::CompressionFactory
{
int & detected_nice_value;

public:
FakeFactory(int & detected_nice_value)
: detected_nice_value(detected_nice_value) {}

virtual std::shared_ptr<rosbag2_compression::BaseCompressorInterface>
create_compressor(const std::string & /*compression_format*/)
{
return std::make_shared<TestCompressor>(detected_nice_value);
}
};

// queue size should be 0 or at least the number of remaining messages to prevent message loss
rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
kCompressionQueueSize,
kDefaultCompressionQueueThreads,
wanted_nice_value
wanted_thread_priority
};

#ifndef _WIN32
// nice values are in the range from -20 to +19, so this value will never be read
int detected_nice_value = 100;
int detected_thread_priority = 100;
#else
int detected_thread_priority = THREAD_PRIORITY_ERROR_RETURN;
#endif

initializeFakeFileStorage();
initializeWriter(compression_options, std::make_unique<FakeFactory>(detected_nice_value));
initializeWriter(
compression_options,
std::make_unique<FakeCompressionFactory>(detected_thread_priority));

writer_->open(tmp_dir_storage_options_);
writer_->create_topic({test_topic_name, test_topic_type, "", "", ""});
Expand All @@ -490,8 +443,7 @@ TEST_P(SequentialCompressionWriterTest, writer_sets_nice_value)
}
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(detected_nice_value, *compression_options.thread_nice_value);

EXPECT_EQ(detected_thread_priority, *compression_options.thread_nice_value);
EXPECT_EQ(fake_storage_size_, kNumMessagesToWrite);
}

Expand Down