Skip to content

Commit 95bb680

Browse files
committed
fix: do not apply empty snapshots
1 parent d3ab164 commit 95bb680

5 files changed

Lines changed: 32 additions & 14 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ All steps can be chained together for streamlined execution:
101101

102102
```bash
103103
kbridge fetch -b localhost:9092 | \
104-
kbridge calculate -b localhost:9093 -l Offset | \
104+
kbridge calculate -b localhost:9093 -H Offset | \
105105
kbridge apply -b localhost:9093
106106
```
107107

bridge-cli/src/main.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,23 +84,35 @@ async fn run(args: Args) -> Result<(), BridgeError> {
8484
} => {
8585
let topics = kafka_connection.topics.clone();
8686
let client: KafkaBridgeClient = kafka_connection.into();
87-
trace!("applying target offsets");
88-
let snapshot = OffsetSnapshot::from_csv(input)?;
8987

90-
// Handle confirmation in CLI before calling the core library
91-
if !dry_run && !skip_confirmation && !ask_for_confirmation(&snapshot) {
92-
return Err(BridgeError::Message(
93-
"Operation cancelled by user".to_string(),
88+
let snapshot = {
89+
let snapshot = OffsetSnapshot::from_csv(input)?;
90+
snapshot.filter_by_topics(&topics)
91+
};
92+
93+
if snapshot.is_empty() {
94+
// TODO decouple from command error
95+
return Err(BridgeError::ApplyOffsets(
96+
bridge_core::commands::apply_target_offsets::errors::ApplyOffsetsError::NoOffsetsToApply,
9497
));
9598
}
9699

97-
if dry_run {
98-
println!("DRY RUN: Would apply the following offsets:");
99-
print_offset_snapshot(&snapshot);
100-
Ok(())
101-
} else {
102-
client.apply_target_offsets(topics, snapshot, false).await
100+
// Only show confirmation/dry-run output if snapshot has data
101+
// (core library validates if offsets remain after topic filtering)
102+
if !snapshot.is_empty() {
103+
if !dry_run && !skip_confirmation && !ask_for_confirmation(&snapshot) {
104+
return Err(BridgeError::Message(
105+
"Operation cancelled by user".to_string(),
106+
));
107+
}
108+
109+
if dry_run {
110+
eprintln!("DRY RUN: Would apply the following offsets:");
111+
print_offset_snapshot(&snapshot);
112+
}
103113
}
114+
115+
client.apply_target_offsets(topics, snapshot, dry_run).await
104116
}
105117
}
106118
}

bridge-core/src/commands/apply_target_offsets/errors.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,6 @@ pub enum ApplyOffsetsError {
88
KafkaError(#[from] KafkaError),
99
#[error("IO error occurred. Reason: {0}")]
1010
IoError(#[from] io::Error),
11+
#[error("No offsets to apply (input is empty or no matching topics)")]
12+
NoOffsetsToApply,
1113
}

bridge-core/src/commands/apply_target_offsets/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ pub async fn execute(
4040
.or_insert(vec![value.clone()]);
4141
});
4242

43+
if mapped_intermediary_result.is_empty() {
44+
return Err(ApplyOffsetsError::NoOffsetsToApply);
45+
}
46+
4347
if !dry_run {
4448
apply_target_offsets(&mut exporter_base_config, &mapped_intermediary_result).await?;
4549
}

bridge-core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
pub mod client;
2-
mod commands;
2+
pub mod commands;
33
pub mod errors;
44
pub mod kafka;
55
pub mod partition;

0 commit comments

Comments
 (0)