Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions adbmysqlwriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>

<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
2 changes: 2 additions & 0 deletions adbpgwriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>

<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
2 changes: 2 additions & 0 deletions adswriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>

<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
2 changes: 2 additions & 0 deletions cassandrareader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>

<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
2 changes: 2 additions & 0 deletions cassandrawriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>

<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
2 changes: 2 additions & 0 deletions clickhousereader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>

<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
2 changes: 2 additions & 0 deletions clickhousewriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>

<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
1 change: 1 addition & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@

<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>
<configuration>
<archive>
<manifest>
Expand Down
2 changes: 2 additions & 0 deletions databendwriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>

<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
2 changes: 2 additions & 0 deletions datahubreader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>

<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
1 change: 1 addition & 0 deletions datahubwriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
1 change: 1 addition & 0 deletions dorisreader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
23 changes: 23 additions & 0 deletions doriswriter/doc/doriswriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,26 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
```

更多信息请参照 Doris 官网:[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)


## Production tuning recommendations

For large-scale loads, tune the following parameters first:

- `maxBatchRows`: start with `50000 ~ 200000`
- `batchSize`: start with `20MB ~ 80MB`
- `flushInterval`: start with `5000 ~ 15000`
- `flushQueueLength`: usually `2 ~ 8`
- `flushWorkerCount`: default `1`; raise to `2 ~ 4` only after validating Doris FE/BE capacity
- `connectTimeout`: `3000 ~ 5000`
- `socketTimeout`: at least several minutes for large batches
- `hostCooldownMs`: `30000 ~ 120000`

### What changed in this production-focused version

- Reused pooled HTTP connections instead of creating a new client for every batch.
- Removed the per-batch `List<byte[]> -> merge -> copy` pattern and switched to direct batch buffer aggregation.
- Reworked flush lifecycle to support deterministic close and worker shutdown.
- Added host cooldown / round-robin selection to avoid repeatedly hitting a bad FE endpoint.
- Reduced CSV escaping overhead from chained `replace()` calls to single-pass append.
- Added configurable timeouts and worker count for better throughput control.
12 changes: 12 additions & 0 deletions doriswriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,17 @@ under the License.
</dependency>
</dependencies>
<build>



<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>



</plugin>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
Expand All @@ -78,6 +88,8 @@ under the License.
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-3</version>

<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ public static DorisCodec createCodec( Keys writerOptions) {
Map<String, Object> props = writerOptions.getLoadProps();
return new DorisCsvCodec (null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator")));
}
if ( Keys.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
return new DorisJsonCodec (writerOptions.getColumns());
}

throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,48 @@
public class DorisCsvCodec extends DorisBaseCodec implements DorisCodec {

private static final long serialVersionUID = 1L;

private final String columnSeparator;

public DorisCsvCodec ( String sp) {
public DorisCsvCodec(String sp) {
this.columnSeparator = DelimiterParser.parse(sp, "\t");
}

@Override
public String codec( Record row) {
StringBuilder sb = new StringBuilder();
public String codec(Record row) {
StringBuilder sb = new StringBuilder(Math.max(64, row.getColumnNumber() * 16));
for (int i = 0; i < row.getColumnNumber(); i++) {
String value = convertionField(row.getColumn(i));
sb.append(null == value ? "\\N" : value);
if (value == null) {
sb.append("\\N");
} else {
appendEscaped(sb, value);
}
if (i < row.getColumnNumber() - 1) {
sb.append(columnSeparator);
}
}
return sb.toString();
}

/**
 * Copies {@code s} into {@code sb} one character at a time, substituting
 * replacement text for four special characters.
 *
 * <ul>
 *   <li>a backslash is written as two backslashes;</li>
 *   <li>a line feed is written as two backslashes followed by {@code n};</li>
 *   <li>a carriage return is written as two backslashes followed by {@code r};</li>
 *   <li>a tab is written as two backslashes followed by {@code t};</li>
 *   <li>every other character is appended unchanged.</li>
 * </ul>
 *
 * <p>NOTE(review): with these replacements a literal backslash followed by
 * the letter {@code n} yields the same output as a real line feed (likewise
 * for {@code r} and {@code t}). Confirm this matches the unescaping the
 * receiving loader performs.
 *
 * @param sb destination buffer that receives the escaped text
 * @param s  value to escape; must not be {@code null}
 */
private void appendEscaped(StringBuilder sb, String s) {
    final int length = s.length();
    for (int pos = 0; pos < length; pos++) {
        final char current = s.charAt(pos);
        if (current == '\\') {
            sb.append("\\\\");
        } else if (current == '\n') {
            sb.append("\\\\n");
        } else if (current == '\r') {
            sb.append("\\\\r");
        } else if (current == '\t') {
            sb.append("\\\\t");
        } else {
            sb.append(current);
        }
    }
}
}

This file was deleted.

Loading