|
27 | 27 | #include <vector> |
28 | 28 |
|
29 | 29 | #include "base/pegasus_key_schema.h" |
| 30 | +#include "base/pegasus_value_schema.h" |
30 | 31 | #include "common/gpid.h" |
31 | 32 | #include "duplication_internal_types.h" |
32 | 33 | #include "gtest/gtest.h" |
|
43 | 44 | #include "task/task_code.h" |
44 | 45 | #include "utils/blob.h" |
45 | 46 | #include "utils/fail_point.h" |
| 47 | +#include "utils/string_conv.h" |
46 | 48 |
|
47 | | -namespace pegasus { |
48 | | -namespace server { |
| 49 | +namespace pegasus::server { |
49 | 50 |
|
50 | 51 | class pegasus_write_service_test : public pegasus_server_test_base |
51 | 52 | { |
@@ -219,6 +220,32 @@ class pegasus_write_service_test : public pegasus_server_test_base |
219 | 220 | ASSERT_EQ(_write_svc->_impl->_rocksdb_wrapper->_write_batch->Count(), 0); |
220 | 221 | ASSERT_EQ(_write_svc->_impl->_update_responses.size(), 0); |
221 | 222 | } |
| 223 | + |
| 224 | + void db_get(const dsn::blob &raw_key, db_get_context *get_ctx) |
| 225 | + { |
| 226 | + ASSERT_EQ(rocksdb::Status::kOk, |
| 227 | + _write_svc->_impl->_rocksdb_wrapper->get(raw_key.to_string_view(), get_ctx)); |
| 228 | + } |
| 229 | + |
| 230 | + void get_value_from_db(const dsn::blob &raw_key, std::string &user_value) |
| 231 | + { |
| 232 | + db_get_context get_ctx; |
| 233 | + db_get(raw_key, &get_ctx); |
| 234 | + ASSERT_TRUE(get_ctx.found) << "key not found in DB"; |
| 235 | + ASSERT_FALSE(get_ctx.expired) << "key expired in DB"; |
| 236 | + dsn::blob data; |
| 237 | + pegasus_extract_user_data(_write_svc->_impl->_rocksdb_wrapper->_pegasus_data_version, |
| 238 | + std::move(get_ctx.raw_value), |
| 239 | + data); |
| 240 | + user_value = data.to_string(); |
| 241 | + } |
| 242 | + |
| 243 | + void get_value_from_db(const dsn::blob &raw_key, int64_t &user_value) |
| 244 | + { |
| 245 | + std::string data; |
| 246 | + get_value_from_db(raw_key, data); |
| 247 | + ASSERT_TRUE(dsn::buf2int64(data, user_value)); |
| 248 | + } |
222 | 249 | }; |
223 | 250 |
|
224 | 251 | INSTANTIATE_TEST_SUITE_P(, pegasus_write_service_test, ::testing::Values(false, true)); |
@@ -338,5 +365,183 @@ TEST_P(pegasus_write_service_test, illegal_duplicate_request) |
338 | 365 | ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument); |
339 | 366 | } |
340 | 367 |
|
341 | | -} // namespace server |
342 | | -} // namespace pegasus |
| 368 | +TEST_P(pegasus_write_service_test, duplicate_incr) |
| 369 | +{ |
| 370 | + std::string hash_key = "dup_incr_hash"; |
| 371 | + std::string sort_key = "dup_incr_sort"; |
| 372 | + dsn::blob raw_key; |
| 373 | + pegasus::pegasus_generate_key(raw_key, hash_key, sort_key); |
| 374 | + |
| 375 | + dsn::apps::duplicate_request duplicate; |
| 376 | + dsn::apps::duplicate_entry entry; |
| 377 | + entry.timestamp = 1000; |
| 378 | + entry.cluster_id = 2; |
| 379 | + dsn::apps::duplicate_response resp; |
| 380 | + |
| 381 | + // First put base value "0" via duplicate |
| 382 | + { |
| 383 | + dsn::apps::update_request put_req; |
| 384 | + put_req.key = raw_key; |
| 385 | + put_req.value = dsn::blob::create_from_bytes("0"); |
| 386 | + dsn::message_ptr put_msg = pegasus::create_put_request(put_req); |
| 387 | + entry.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; |
| 388 | + entry.raw_message = dsn::move_message_to_blob(put_msg.get()); |
| 389 | + duplicate.entries.clear(); |
| 390 | + duplicate.entries.emplace_back(entry); |
| 391 | + _write_svc->duplicate(1, duplicate, resp); |
| 392 | + ASSERT_EQ(resp.error, rocksdb::Status::kOk); |
| 393 | + } |
| 394 | + |
| 395 | + // Then duplicate INCR with increment=1 |
| 396 | + { |
| 397 | + dsn::apps::incr_request incr_req; |
| 398 | + incr_req.key = raw_key; |
| 399 | + incr_req.increment = 1; |
| 400 | + incr_req.expire_ts_seconds = 0; |
| 401 | + dsn::message_ptr incr_msg = pegasus::create_incr_request(incr_req); |
| 402 | + entry.task_code = dsn::apps::RPC_RRDB_RRDB_INCR; |
| 403 | + entry.raw_message = dsn::move_message_to_blob(incr_msg.get()); |
| 404 | + duplicate.entries.clear(); |
| 405 | + duplicate.entries.emplace_back(entry); |
| 406 | + _write_svc->duplicate(2, duplicate, resp); |
| 407 | + ASSERT_EQ(resp.error, rocksdb::Status::kOk); |
| 408 | + } |
| 409 | + |
| 410 | + int64_t value_after_incr = 0; |
| 411 | + get_value_from_db(raw_key, value_after_incr); |
| 412 | + ASSERT_EQ(value_after_incr, 1); |
| 413 | +} |
| 414 | + |
| 415 | +TEST_P(pegasus_write_service_test, duplicate_check_and_set) |
| 416 | +{ |
| 417 | + std::string hash_key = "dup_cas_hash"; |
| 418 | + std::string sort_key = "dup_cas_sort"; |
| 419 | + std::string check_sort_key = sort_key; |
| 420 | + std::string set_sort_key = sort_key; |
| 421 | + dsn::blob hash_key_blob; |
| 422 | + dsn::blob check_sort_key_blob; |
| 423 | + dsn::blob set_sort_key_blob; |
| 424 | + dsn::blob check_operand; |
| 425 | + dsn::blob set_value; |
| 426 | + hash_key_blob.assign(hash_key.data(), 0, hash_key.size()); |
| 427 | + check_sort_key_blob.assign(check_sort_key.data(), 0, check_sort_key.size()); |
| 428 | + set_sort_key_blob.assign(set_sort_key.data(), 0, set_sort_key.size()); |
| 429 | + check_operand.assign("old", 0, 3); |
| 430 | + set_value.assign("new", 0, 3); |
| 431 | + |
| 432 | + dsn::blob raw_key; |
| 433 | + pegasus::pegasus_generate_key(raw_key, hash_key, sort_key); |
| 434 | + |
| 435 | + dsn::apps::duplicate_request duplicate; |
| 436 | + dsn::apps::duplicate_entry entry; |
| 437 | + entry.timestamp = 1000; |
| 438 | + entry.cluster_id = 2; |
| 439 | + dsn::apps::duplicate_response resp; |
| 440 | + |
| 441 | + // First put "old" via duplicate |
| 442 | + { |
| 443 | + dsn::apps::update_request put_req; |
| 444 | + put_req.key = raw_key; |
| 445 | + put_req.value = dsn::blob::create_from_bytes("old"); |
| 446 | + dsn::message_ptr put_msg = pegasus::create_put_request(put_req); |
| 447 | + entry.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; |
| 448 | + entry.raw_message = dsn::move_message_to_blob(put_msg.get()); |
| 449 | + duplicate.entries.clear(); |
| 450 | + duplicate.entries.emplace_back(entry); |
| 451 | + _write_svc->duplicate(1, duplicate, resp); |
| 452 | + ASSERT_EQ(resp.error, rocksdb::Status::kOk); |
| 453 | + } |
| 454 | + |
| 455 | + // Then duplicate CHECK_AND_SET: check value=="old", set to "new" |
| 456 | + { |
| 457 | + dsn::apps::check_and_set_request cas_req; |
| 458 | + cas_req.hash_key = hash_key_blob; |
| 459 | + cas_req.check_sort_key = check_sort_key_blob; |
| 460 | + cas_req.check_type = dsn::apps::cas_check_type::CT_VALUE_BYTES_EQUAL; |
| 461 | + cas_req.check_operand = check_operand; |
| 462 | + cas_req.set_diff_sort_key = false; |
| 463 | + cas_req.set_value = set_value; |
| 464 | + cas_req.set_expire_ts_seconds = 0; |
| 465 | + cas_req.return_check_value = false; |
| 466 | + dsn::message_ptr cas_msg = pegasus::create_check_and_set_request(cas_req); |
| 467 | + entry.task_code = dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET; |
| 468 | + entry.raw_message = dsn::move_message_to_blob(cas_msg.get()); |
| 469 | + duplicate.entries.clear(); |
| 470 | + duplicate.entries.emplace_back(entry); |
| 471 | + _write_svc->duplicate(2, duplicate, resp); |
| 472 | + ASSERT_EQ(resp.error, rocksdb::Status::kOk); |
| 473 | + } |
| 474 | + |
| 475 | + std::string value_after_cas; |
| 476 | + get_value_from_db(raw_key, value_after_cas); |
| 477 | + ASSERT_EQ(value_after_cas, "new"); |
| 478 | +} |
| 479 | + |
| 480 | +TEST_P(pegasus_write_service_test, duplicate_check_and_mutate) |
| 481 | +{ |
| 482 | + std::string hash_key = "dup_cam_hash"; |
| 483 | + std::string sort_key = "dup_cam_sort"; |
| 484 | + dsn::blob hash_key_blob; |
| 485 | + dsn::blob check_sort_key_blob; |
| 486 | + dsn::blob check_operand; |
| 487 | + dsn::blob mutate_sort_key; |
| 488 | + dsn::blob mutate_value; |
| 489 | + hash_key_blob.assign(hash_key.data(), 0, hash_key.size()); |
| 490 | + check_sort_key_blob.assign(sort_key.data(), 0, sort_key.size()); |
| 491 | + check_operand.assign("old", 0, 3); |
| 492 | + mutate_sort_key.assign(sort_key.data(), 0, sort_key.size()); |
| 493 | + mutate_value.assign("new", 0, 3); |
| 494 | + |
| 495 | + dsn::blob raw_key; |
| 496 | + pegasus::pegasus_generate_key(raw_key, hash_key, sort_key); |
| 497 | + |
| 498 | + dsn::apps::duplicate_request duplicate; |
| 499 | + dsn::apps::duplicate_entry entry; |
| 500 | + entry.timestamp = 1000; |
| 501 | + entry.cluster_id = 2; |
| 502 | + dsn::apps::duplicate_response resp; |
| 503 | + |
| 504 | + // First put "old" via duplicate |
| 505 | + { |
| 506 | + dsn::apps::update_request put_req; |
| 507 | + put_req.key = raw_key; |
| 508 | + put_req.value = dsn::blob::create_from_bytes("old"); |
| 509 | + dsn::message_ptr put_msg = pegasus::create_put_request(put_req); |
| 510 | + entry.task_code = dsn::apps::RPC_RRDB_RRDB_PUT; |
| 511 | + entry.raw_message = dsn::move_message_to_blob(put_msg.get()); |
| 512 | + duplicate.entries.clear(); |
| 513 | + duplicate.entries.emplace_back(entry); |
| 514 | + _write_svc->duplicate(1, duplicate, resp); |
| 515 | + ASSERT_EQ(resp.error, rocksdb::Status::kOk); |
| 516 | + } |
| 517 | + |
| 518 | + // Then duplicate CHECK_AND_MUTATE: check value=="old", mutate with MO_PUT to "new" |
| 519 | + { |
| 520 | + dsn::apps::mutate mutate_op; |
| 521 | + mutate_op.operation = dsn::apps::mutate_operation::MO_PUT; |
| 522 | + mutate_op.sort_key = mutate_sort_key; |
| 523 | + mutate_op.value = mutate_value; |
| 524 | + mutate_op.set_expire_ts_seconds = 0; |
| 525 | + |
| 526 | + dsn::apps::check_and_mutate_request cam_req; |
| 527 | + cam_req.hash_key = hash_key_blob; |
| 528 | + cam_req.check_sort_key = check_sort_key_blob; |
| 529 | + cam_req.check_type = dsn::apps::cas_check_type::CT_VALUE_BYTES_EQUAL; |
| 530 | + cam_req.check_operand = check_operand; |
| 531 | + cam_req.mutate_list = {mutate_op}; |
| 532 | + cam_req.return_check_value = false; |
| 533 | + dsn::message_ptr cam_msg = pegasus::create_check_and_mutate_request(cam_req); |
| 534 | + entry.task_code = dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE; |
| 535 | + entry.raw_message = dsn::move_message_to_blob(cam_msg.get()); |
| 536 | + duplicate.entries.clear(); |
| 537 | + duplicate.entries.emplace_back(entry); |
| 538 | + _write_svc->duplicate(2, duplicate, resp); |
| 539 | + ASSERT_EQ(resp.error, rocksdb::Status::kOk); |
| 540 | + } |
| 541 | + |
| 542 | + std::string value_after_cam; |
| 543 | + get_value_from_db(raw_key, value_after_cam); |
| 544 | + ASSERT_EQ(value_after_cam, "new"); |
| 545 | +} |
| 546 | + |
| 547 | +} // namespace pegasus::server |
0 commit comments