From 658a9e9efe6a770fede40da3dc6bd662458d155e Mon Sep 17 00:00:00 2001 From: fei Date: Wed, 8 Apr 2026 11:44:31 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E5=A4=84=E7=90=86csv=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=E5=AD=97=E6=AE=B5=E5=86=85=E5=AE=B9=E5=8C=85=E5=90=AB=E5=88=B6?= =?UTF-8?q?=E8=A1=A8=E7=AC=A6=E4=B8=8E=E5=9B=9E=E8=BD=A6=E7=AC=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 处理csv格式字段内容包含制表符与回车符 --- .../writer/doriswriter/DorisCsvCodec.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java index 518aa3043f..6147c240b7 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java @@ -13,15 +13,31 @@ public DorisCsvCodec ( String sp) { } @Override - public String codec( Record row) { + public String codec( Record row) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < row.getColumnNumber(); i++) { String value = convertionField(row.getColumn(i)); - sb.append(null == value ? "\\N" : value); + if (value == null) { + sb.append("\\N"); + } else { + // 只转义 会破坏格式的字符,不包引号 + sb.append(escapeValue(value)); + } + if (i < row.getColumnNumber() - 1) { sb.append(columnSeparator); } } return sb.toString(); } + // ========================================== + // 转义函数:处理 \n \r \t \ 等特殊字符 + // 不包引号!完全满足你的要求 + // ========================================== + private String escapeValue(String s) { + return s.replace("\\", "\\\\") // 反斜杠必须转义 + .replace("\n", "\\\\n") // 换行 + .replace("\r", "\\\\r") // 回车 + .replace("\t", "\\\\t"); // 制表符 + } } From ca323d75cf345fbd53e3059960f88a83e8fc444e Mon Sep 17 00:00:00 2001 From: ogre Date: Fri, 17 Apr 2026 10:48:18 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E6=95=88=E7=8E=87?= =?UTF-8?q?=E9=97=AE=E9=A2=98=201=E3=80=81=E5=87=8F=E5=B0=91=E5=86=85?= =?UTF-8?q?=E5=AD=98=E6=8B=B7=E8=B4=9D=202=E3=80=81httpclient=20=E9=95=BF?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=203=E3=80=81flush=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doriswriter/doc/doriswriter.md | 23 ++ doriswriter/pom.xml | 10 + .../writer/doriswriter/DorisCodecFactory.java | 4 +- .../writer/doriswriter/DorisCsvCodec.java | 41 ++- .../writer/doriswriter/DorisJsonCodec.java | 33 -- .../doriswriter/DorisStreamLoadObserver.java | 310 +++++++++--------- .../doriswriter/DorisWriterManager.java | 220 +++++++------ .../datax/plugin/writer/doriswriter/Keys.java | 102 ++++-- .../writer/doriswriter/WriterTuple.java | 51 ++- .../main/resources/plugin_job_template.json | 14 +- 10 files changed, 466 insertions(+), 342 deletions(-) delete mode 100644 doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md index 2070113b4d..9414ec9e50 100644 --- a/doriswriter/doc/doriswriter.md +++ b/doriswriter/doc/doriswriter.md @@ -177,3 +177,26 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter ``` 更多信息请参照 Doris 官网:[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual) + + +## Production tuning recommendations + +For large-scale loads, prefer tuning around these parameters first: + +- `maxBatchRows`: start with `50000 ~ 200000` +- `batchSize`: start with `20MB ~ 80MB` +- `flushInterval`: start with `5000 ~ 15000` +- `flushQueueLength`: usually `2 ~ 8` +- `flushWorkerCount`: default `1`; raise to `2 ~ 4` only after validating Doris FE/BE capacity +- `connectTimeout`: `3000 ~ 5000` +- `socketTimeout`: at least several minutes for large batches +- `hostCooldownMs`: `30000 ~ 120000` + +### What changed in this production-focused version + +- Reused pooled HTTP connections instead of creating a new client for every batch. +- Removed the per-batch `List -> merge -> copy` pattern and switched to direct batch buffer aggregation. +- Reworked flush lifecycle to support deterministic close and worker shutdown. +- Added host cooldown / round-robin selection to avoid repeatedly hitting a bad FE endpoint. +- Reduced CSV escaping overhead from chained `replace()` calls to single-pass append. +- Added configurable timeouts and worker count for better throughput control. diff --git a/doriswriter/pom.xml b/doriswriter/pom.xml index aa1e6ff080..2887434c0b 100644 --- a/doriswriter/pom.xml +++ b/doriswriter/pom.xml @@ -65,7 +65,16 @@ under the License. + + + + + maven-assembly-plugin + + 2.2-beta-3 + + maven-compiler-plugin @@ -78,6 +87,7 @@ under the License. maven-assembly-plugin + 2.2-beta-3 src/main/assembly/package.xml diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodecFactory.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodecFactory.java index 22c4b4099b..a059cfd9eb 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodecFactory.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodecFactory.java @@ -11,9 +11,7 @@ public static DorisCodec createCodec( Keys writerOptions) { Map props = writerOptions.getLoadProps(); return new DorisCsvCodec (null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator"))); } - if ( Keys.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) { - return new DorisJsonCodec (writerOptions.getColumns()); - } + throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties."); } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java index 6147c240b7..314296e55d 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java @@ -5,39 +5,48 @@ public class DorisCsvCodec extends DorisBaseCodec implements DorisCodec { private static final long serialVersionUID = 1L; - private final String columnSeparator; - public DorisCsvCodec ( String sp) { + public DorisCsvCodec(String sp) { this.columnSeparator = DelimiterParser.parse(sp, "\t"); } @Override - public String codec( Record row) { - StringBuilder sb = new StringBuilder(); + public String codec(Record row) { + StringBuilder sb = new StringBuilder(Math.max(64, row.getColumnNumber() * 16)); for (int i = 0; i < row.getColumnNumber(); i++) { String value = convertionField(row.getColumn(i)); if (value == null) { sb.append("\\N"); } else { - // 只转义 会破坏格式的字符,不包引号 - sb.append(escapeValue(value)); + appendEscaped(sb, value); } - if (i < row.getColumnNumber() - 1) { sb.append(columnSeparator); } } return sb.toString(); } - // ========================================== - // 转义函数:处理 \n \r \t \ 等特殊字符 - // 不包引号!完全满足你的要求 - // ========================================== - private String escapeValue(String s) { - return s.replace("\\", "\\\\") // 反斜杠必须转义 - .replace("\n", "\\\\n") // 换行 - .replace("\r", "\\\\r") // 回车 - .replace("\t", "\\\\t"); // 制表符 + + private void appendEscaped(StringBuilder sb, String s) { + for (int i = 0; i < s.length(); i++) { + char ch = s.charAt(i); + switch (ch) { + case '\\': + sb.append("\\\\"); + break; + case '\n': + sb.append("\\\\n"); + break; + case '\r': + sb.append("\\\\r"); + break; + case '\t': + sb.append("\\\\t"); + break; + default: + sb.append(ch); + } + } } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java deleted file mode 100644 index 68abd9eb40..0000000000 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java +++ /dev/null @@ -1,33 +0,0 @@ -package com.alibaba.datax.plugin.writer.doriswriter; - -import com.alibaba.datax.common.element.Record; -import com.alibaba.fastjson2.JSON; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class DorisJsonCodec extends DorisBaseCodec implements DorisCodec { - - private static final long serialVersionUID = 1L; - - private final List fieldNames; - - public DorisJsonCodec ( List fieldNames) { - this.fieldNames = fieldNames; - } - - @Override - public String codec( Record row) { - if (null == fieldNames) { - return ""; - } - Map rowMap = new HashMap<> (fieldNames.size()); - int idx = 0; - for (String fieldName : fieldNames) { - rowMap.put(fieldName, convertionField(row.getColumn(idx))); - idx++; - } - return JSON.toJSONString(rowMap); - } -} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java index e1f6e0eed6..52e1da3187 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java @@ -13,223 +13,227 @@ import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; -public class DorisStreamLoadObserver { +public class DorisStreamLoadObserver implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class); - private Keys options; - - private long pos; private static final String RESULT_FAILED = "Fail"; private static final String RESULT_LABEL_EXISTED = "Label Already Exists"; - private static final String LAEBL_STATE_VISIBLE = "VISIBLE"; - private static final String LAEBL_STATE_COMMITTED = "COMMITTED"; + private static final String LABEL_STATE_VISIBLE = "VISIBLE"; + private static final String LABEL_STATE_COMMITTED = "COMMITTED"; private static final String RESULT_LABEL_PREPARE = "PREPARE"; private static final String RESULT_LABEL_ABORTED = "ABORTED"; private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; + private final Keys options; + private final String basicAuthHeader; + private final List hosts; + private final AtomicInteger hostIndex = new AtomicInteger(0); + private final Map hostBlacklistUntil = new ConcurrentHashMap(); + private final RequestConfig requestConfig; + private final CloseableHttpClient httpClient; - public DorisStreamLoadObserver ( Keys options){ + public DorisStreamLoadObserver(Keys options) { this.options = options; + this.basicAuthHeader = buildBasicAuthHeader(options.getUsername(), options.getPassword()); + this.hosts = new ArrayList(); + for (String host : options.getLoadUrlList()) { + if (host.startsWith("http://") || host.startsWith("https://")) { + this.hosts.add(host); + } else { + this.hosts.add("http://" + host); + } + } + + PoolingHttpClientConnectionManager connManager = new PoolingHttpClientConnectionManager(); + connManager.setDefaultMaxPerRoute(Math.max(8, options.getFlushWorkerCount() * 4)); + connManager.setMaxTotal(Math.max(16, options.getFlushWorkerCount() * 8)); + + this.requestConfig = RequestConfig.custom() + .setConnectTimeout(options.getConnectTimeout()) + .setSocketTimeout(options.getSocketTimeout()) + .setConnectionRequestTimeout(options.getConnectionRequestTimeout()) + .setRedirectsEnabled(true) + .build(); + + HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setConnectionManager(connManager) + .setDefaultRequestConfig(requestConfig) + .setRedirectStrategy(new DefaultRedirectStrategy() { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }) + .disableAutomaticRetries(); + this.httpClient = httpClientBuilder.build(); } public void streamLoad(WriterTuple data) throws Exception { String host = getLoadHost(); - if(host == null){ - throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration."); + if (host == null) { + throw new IOException("loadUrl cannot be empty, or no Doris FE/BE host is currently available."); } - String loadUrl = new StringBuilder(host) - .append("/api/") - .append(options.getDatabase()) - .append("/") - .append(options.getTable()) - .append("/_stream_load") - .toString(); - LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel()); - Map loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue())); - LOG.info("StreamLoad response :{}",JSON.toJSONString(loadResult)); + String loadUrl = host + "/api/" + options.getDatabase() + "/" + options.getTable() + "/_stream_load"; + Map loadResult = put(loadUrl, data); + LOG.info("StreamLoad response: {}", JSON.toJSONString(loadResult)); final String keyStatus = "Status"; - if (null == loadResult || !loadResult.containsKey(keyStatus)) { + if (loadResult == null || !loadResult.containsKey(keyStatus)) { + markHostFailure(host); throw new IOException("Unable to flush data to Doris: unknown result status."); } - LOG.debug("StreamLoad response:{}",JSON.toJSONString(loadResult)); - if (RESULT_FAILED.equals(loadResult.get(keyStatus))) { - throw new IOException( - new StringBuilder("Failed to flush data to Doris.\n").append(JSON.toJSONString(loadResult)).toString() - ); - } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { - LOG.debug("StreamLoad response:{}",JSON.toJSONString(loadResult)); + Object status = loadResult.get(keyStatus); + if (RESULT_FAILED.equals(status)) { + markHostFailure(host); + throw new DorisWriterExcetion("Failed to flush data to Doris. " + JSON.toJSONString(loadResult), loadResult); + } + if (RESULT_LABEL_EXISTED.equals(status)) { checkStreamLoadState(host, data.getLabel()); + } else { + markHostSuccess(host); } } private void checkStreamLoadState(String host, String label) throws IOException { int idx = 0; - while(true) { + while (true) { try { - TimeUnit.SECONDS.sleep(Math.min(++idx, 5)); + Thread.sleep(1000L * Math.min(++idx, 5)); } catch (InterruptedException ex) { - break; + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while checking stream load label state.", ex); } - try (CloseableHttpClient httpclient = HttpClients.createDefault()) { - HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString()); - httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword())); - httpGet.setHeader("Connection", "close"); - - try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { - HttpEntity respEntity = getHttpEntity(resp); - if (respEntity == null) { - throw new IOException(String.format("Failed to flush data to Doris, Error " + - "could not get the final state of label[%s].\n", label), null); - } - Map result = (Map)JSON.parse(EntityUtils.toString(respEntity)); - String labelState = (String)result.get("data"); - if (null == labelState) { - throw new IOException(String.format("Failed to flush data to Doris, Error " + - "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null); - } - LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState)); - switch(labelState) { - case LAEBL_STATE_VISIBLE: - case LAEBL_STATE_COMMITTED: - return; - case RESULT_LABEL_PREPARE: - continue; - case RESULT_LABEL_ABORTED: - throw new DorisWriterExcetion (String.format("Failed to flush data to Doris, Error " + - "label[%s] state[%s]\n", label, labelState), null, true); - case RESULT_LABEL_UNKNOWN: - default: - throw new IOException(String.format("Failed to flush data to Doris, Error " + - "label[%s] state[%s]\n", label, labelState), null); - } + + HttpGet httpGet = new HttpGet(host + "/api/" + options.getDatabase() + "/get_load_state?label=" + label); + httpGet.setHeader("Authorization", basicAuthHeader); + httpGet.setConfig(requestConfig); + try (CloseableHttpResponse resp = httpClient.execute(httpGet)) { + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) { + throw new IOException(String.format("Failed to get final state for label[%s].", label)); + } + Map result = (Map) JSON.parse(EntityUtils.toString(respEntity, StandardCharsets.UTF_8)); + String labelState = result == null ? null : (String) result.get("data"); + if (labelState == null) { + throw new IOException(String.format("Failed to get final state of label[%s]. response[%s]", label, JSON.toJSONString(result))); + } + LOG.info("Checking label[{}] state[{}]", label, labelState); + switch (labelState) { + case LABEL_STATE_VISIBLE: + case LABEL_STATE_COMMITTED: + markHostSuccess(host); + return; + case RESULT_LABEL_PREPARE: + continue; + case RESULT_LABEL_ABORTED: + markHostFailure(host); + throw new DorisWriterExcetion(String.format("Failed to flush data to Doris, label[%s] state[%s]", label, labelState), result, true); + case RESULT_LABEL_UNKNOWN: + default: + markHostFailure(host); + throw new IOException(String.format("Failed to flush data to Doris, label[%s] state[%s]", label, labelState)); } } } } - private byte[] addRows(List rows, int totalBytes) { + private Map put(String loadUrl, WriterTuple data) throws IOException { + LOG.info("Executing stream load: url='{}', rows={}, size={}, label={}", loadUrl, data.getRows(), data.getBytes(), data.getLabel()); + HttpPut httpPut = new HttpPut(loadUrl); + httpPut.removeHeaders(HttpHeaders.CONTENT_LENGTH); + httpPut.removeHeaders(HttpHeaders.TRANSFER_ENCODING); if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { - Map props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps()); - byte[] lineDelimiter = DelimiterParser.parse((String)props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); - ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); - for (byte[] row : rows) { - bos.put(row); - bos.put(lineDelimiter); - } - return bos.array(); - } - - if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) { - ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1)); - bos.put("[".getBytes(StandardCharsets.UTF_8)); - byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); - boolean isFirstElement = true; - for (byte[] row : rows) { - if (!isFirstElement) { - bos.put(jsonDelimiter); - } - bos.put(row); - isFirstElement = false; - } - bos.put("]".getBytes(StandardCharsets.UTF_8)); - return bos.array(); - } - throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:"); - } - private Map put(String loadUrl, String label, byte[] data) throws IOException { - LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length)); - final HttpClientBuilder httpClientBuilder = HttpClients.custom() - .setRedirectStrategy(new DefaultRedirectStrategy () { - @Override - protected boolean isRedirectable(String method) { - return true; - } - }); - try ( CloseableHttpClient httpclient = httpClientBuilder.build()) { - HttpPut httpPut = new HttpPut(loadUrl); - httpPut.removeHeaders(HttpHeaders.CONTENT_LENGTH); - httpPut.removeHeaders(HttpHeaders.TRANSFER_ENCODING); List cols = options.getColumns(); - if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { + if (cols != null && !cols.isEmpty()) { httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); } - if (null != options.getLoadProps()) { - for (Map.Entry entry : options.getLoadProps().entrySet()) { - httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); - } + } + Map loadProps = options.getLoadProps(); + if (loadProps != null) { + for (Map.Entry entry : loadProps.entrySet()) { + httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); } - httpPut.setHeader("Expect", "100-continue"); - httpPut.setHeader("label", label); - httpPut.setHeader("two_phase_commit", "false"); - httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword())); - httpPut.setEntity(new ByteArrayEntity(data)); - httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); - try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) { - HttpEntity respEntity = getHttpEntity(resp); - if (respEntity == null) - return null; - return (Map)JSON.parse(EntityUtils.toString(respEntity)); + } + httpPut.setHeader("Expect", "100-continue"); + httpPut.setHeader("label", data.getLabel()); + httpPut.setHeader("two_phase_commit", "false"); + httpPut.setHeader("Authorization", basicAuthHeader); + httpPut.setEntity(new ByteArrayEntity(data.getData())); + httpPut.setConfig(requestConfig); + try (CloseableHttpResponse resp = httpClient.execute(httpPut)) { + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) { + return null; } + return (Map) JSON.parse(EntityUtils.toString(respEntity, StandardCharsets.UTF_8)); } } - private String getBasicAuthHeader(String username, String password) { - String auth = username + ":" + password; - byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); - return new StringBuilder("Basic ").append(new String(encodedAuth)).toString(); - } - - private HttpEntity getHttpEntity(CloseableHttpResponse resp) { + private HttpEntity getHttpEntity(CloseableHttpResponse resp) throws IOException { int code = resp.getStatusLine().getStatusCode(); - if (200 != code) { - LOG.warn("Request failed with code:{}", code); - return null; + if (code < 200 || code >= 300) { + String responseText = resp.getEntity() == null ? "" : EntityUtils.toString(resp.getEntity(), StandardCharsets.UTF_8); + throw new IOException("Request failed with code=" + code + ", response=" + responseText); } HttpEntity respEntity = resp.getEntity(); - if (null == respEntity) { - LOG.warn("Request failed with empty response."); + if (respEntity == null) { + LOG.warn("Request succeeded but response body is empty."); return null; } return respEntity; } private String getLoadHost() { - List hostList = options.getLoadUrlList(); - Collections.shuffle(hostList); - String host = new StringBuilder("http://").append(hostList.get((0))).toString(); - if (checkConnection(host)){ - return host; + if (hosts.isEmpty()) { + return null; + } + long now = System.currentTimeMillis(); + int size = hosts.size(); + int start = Math.abs(hostIndex.getAndIncrement()); + for (int i = 0; i < size; i++) { + String host = hosts.get((start + i) % size); + Long blockedUntil = hostBlacklistUntil.get(host); + if (blockedUntil == null || blockedUntil <= now) { + return host; + } } - return null; + String fallback = hosts.get(start % size); + LOG.warn("All Doris hosts are in cooldown; fallback to host {}", fallback); + return fallback; } - private boolean checkConnection(String host) { - try { - URL url = new URL(host); - HttpURLConnection co = (HttpURLConnection) url.openConnection(); - co.setConnectTimeout(5000); - co.connect(); - co.disconnect(); - return true; - } catch (Exception e1) { - e1.printStackTrace(); - return false; - } + private void markHostFailure(String host) { + hostBlacklistUntil.put(host, System.currentTimeMillis() + options.getHostCooldownMs()); + } + + private void markHostSuccess(String host) { + hostBlacklistUntil.remove(host); + } + + private String buildBasicAuthHeader(String username, String password) { + String auth = username + ":" + password; + byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); + return "Basic " + new String(encodedAuth, StandardCharsets.UTF_8); + } + + @Override + public void close() throws IOException { + httpClient.close(); } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java index f0ba6b5283..f2c473f681 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java @@ -13,7 +13,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; public class DorisWriterManager { @@ -22,160 +21,154 @@ public class DorisWriterManager { private final DorisStreamLoadObserver visitor; private final Keys options; - private final List buffer = new ArrayList<> (); - private int batchCount = 0; - private long batchSize = 0; + private final LinkedBlockingDeque flushQueue; + private final List flushWorkers = new ArrayList(); + private final ScheduledExecutorService scheduler; + private final FastByteArrayOutputStream batchBuffer; + private final byte[] lineDelimiter; + private final byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); + private final byte[] jsonArrayPrefix = "[".getBytes(StandardCharsets.UTF_8); + private final byte[] jsonArraySuffix = "]".getBytes(StandardCharsets.UTF_8); + private volatile boolean closed = false; private volatile Exception flushException; - private final LinkedBlockingDeque< WriterTuple > flushQueue; - private ScheduledExecutorService scheduler; - private ScheduledFuture scheduledFuture; + private int batchCount = 0; + private long batchSize = 0; - public DorisWriterManager( Keys options) { + public DorisWriterManager(Keys options) { this.options = options; - this.visitor = new DorisStreamLoadObserver (options); - flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength()); - this.startScheduler(); - this.startAsyncFlushing(); + this.visitor = new DorisStreamLoadObserver(options); + this.flushQueue = new LinkedBlockingDeque(options.getFlushQueueLength()); + this.batchBuffer = new FastByteArrayOutputStream((int) Math.min(Integer.MAX_VALUE - 8L, Math.max(options.getInitialBufferSize(), options.getBatchSize()))); + this.lineDelimiter = DelimiterParser.parse(options.getLineDelimiter(), "\n").getBytes(StandardCharsets.UTF_8); + this.scheduler = Executors.newSingleThreadScheduledExecutor( + new BasicThreadFactory.Builder().namingPattern("doris-interval-flush-%d").daemon(true).build()); + startAsyncFlushing(); + startScheduler(); } - public void startScheduler() { - stopScheduler(); - this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Doris-interval-flush").daemon(true).build()); - this.scheduledFuture = this.scheduler.schedule(() -> { - synchronized (DorisWriterManager.this) { - if (!closed) { + private void startScheduler() { + this.scheduler.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + synchronized (DorisWriterManager.this) { + if (closed || batchCount == 0) { + return; + } try { String label = createBatchLabel(); - LOG.info(String.format("Doris interval Sinking triggered: label[%s].", label)); - if (batchCount == 0) { - startScheduler(); - } - flush(label, false); + LOG.debug("Doris interval flush triggered: rows[{}] bytes[{}] label[{}].", batchCount, batchSize, label); + flushInternal(label); } catch (Exception e) { flushException = e; } } } - }, options.getFlushInterval(), TimeUnit.MILLISECONDS); - } - - public void stopScheduler() { - if (this.scheduledFuture != null) { - scheduledFuture.cancel(false); - this.scheduler.shutdown(); - } + }, options.getFlushInterval(), options.getFlushInterval(), TimeUnit.MILLISECONDS); } - public final synchronized void writeRecord(String record) throws IOException { + public synchronized void writeRecord(String record) throws IOException { checkFlushException(); + if (closed) { + throw new IOException("DorisWriterManager is already closed."); + } try { byte[] bts = record.getBytes(StandardCharsets.UTF_8); - buffer.add(bts); + if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) { + if (batchCount > 0) { + batchBuffer.write(jsonDelimiter); + } + batchBuffer.write(bts); + } else { + batchBuffer.write(bts); + batchBuffer.write(lineDelimiter); + } batchCount++; batchSize += bts.length; - if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) { + if (batchCount >= options.getBatchRows() || batchBuffer.size() >= options.getBatchSize()) { String label = createBatchLabel(); - LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label)); - flush(label, false); + LOG.debug("Doris buffer flush triggered: rows[{}] bytes[{}] label[{}].", batchCount, batchBuffer.size(), label); + flushInternal(label); } } catch (Exception e) { throw new IOException("Writing records to Doris failed.", e); } } - public synchronized void flush(String label, boolean waitUtilDone) throws Exception { - checkFlushException(); - if (batchCount == 0) { - if (waitUtilDone) { - waitAsyncFlushingDone(); - } + public synchronized void close() { + if (closed) { + checkFlushException(); return; } - flushQueue.put(new WriterTuple (label, batchSize, new ArrayList<>(buffer))); - if (waitUtilDone) { - // wait the last flush - waitAsyncFlushingDone(); - } - buffer.clear(); - batchCount = 0; - batchSize = 0; - } - - public synchronized void close() { - if (!closed) { - closed = true; - try { - String label = createBatchLabel(); - if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label)); - flush(label, true); - } catch (Exception e) { - throw new RuntimeException("Writing records to Doris failed.", e); + closed = true; + scheduler.shutdown(); + try { + if (batchCount > 0) { + flushInternal(createBatchLabel()); } + for (int i = 0; i < options.getFlushWorkerCount(); i++) { + flushQueue.put(WriterTuple.poisonPill()); + } + for (Thread worker : flushWorkers) { + worker.join(); + } + visitor.close(); + } catch (Exception e) { + throw new RuntimeException("Writing records to Doris failed.", e); } checkFlushException(); } public String createBatchLabel() { StringBuilder sb = new StringBuilder(); - if (! Strings.isNullOrEmpty(options.getLabelPrefix())) { + if (!Strings.isNullOrEmpty(options.getLabelPrefix())) { sb.append(options.getLabelPrefix()); } - return sb.append(UUID.randomUUID().toString()) - .toString(); + return sb.append(UUID.randomUUID().toString()).toString(); } private void startAsyncFlushing() { - // start flush thread - Thread flushThread = new Thread(new Runnable(){ - public void run() { - while(true) { - try { - asyncFlush(); - } catch (Exception e) { - flushException = e; + for (int i = 0; i < options.getFlushWorkerCount(); i++) { + Thread flushThread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + try { + WriterTuple flushData = flushQueue.take(); + if (flushData.isPoison()) { + return; + } + asyncFlush(flushData); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (Exception e) { + flushException = e; + return; + } } } - } - }); - flushThread.setDaemon(true); - flushThread.start(); - } - - private void waitAsyncFlushingDone() throws InterruptedException { - // wait previous flushings - for (int i = 0; i <= options.getFlushQueueLength(); i++) { - flushQueue.put(new WriterTuple ("", 0l, null)); + }, "doris-streamload-worker-" + i); + flushThread.setDaemon(true); + flushThread.start(); + flushWorkers.add(flushThread); } - checkFlushException(); } - private void asyncFlush() throws Exception { - WriterTuple flushData = flushQueue.take(); - if (Strings.isNullOrEmpty(flushData.getLabel())) { - return; - } - stopScheduler(); - LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); + private void asyncFlush(WriterTuple flushData) throws Exception { + LOG.debug("Async stream load start: rows[{}] bytes[{}] label[{}].", flushData.getRows(), flushData.getBytes(), flushData.getLabel()); for (int i = 0; i <= options.getMaxRetries(); i++) { try { - // flush to Doris with stream load visitor.streamLoad(flushData); - LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel())); - startScheduler(); - break; + LOG.info("Async stream load finished: rows[{}] bytes[{}] label[{}].", flushData.getRows(), flushData.getBytes(), flushData.getLabel()); + return; } catch (Exception e) { - LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e); + LOG.warn("Failed to flush batch data to Doris, retry times = {}, label = {}", i, flushData.getLabel(), e); if (i >= options.getMaxRetries()) { throw new IOException(e); } - if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) { - String newLabel = createBatchLabel(); - LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel)); - flushData.setLabel(newLabel); - } try { - Thread.sleep(1000l * Math.min(i + 1, 10)); + Thread.sleep(1000L * Math.min(i + 1, 10)); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); throw new IOException("Unable to flush, interrupted while doing another attempt", e); @@ -184,6 +177,31 @@ private void asyncFlush() throws Exception { } } + private void flushInternal(String label) throws InterruptedException { + checkFlushException(); + if (batchCount == 0) { + return; + } + byte[] payload = buildPayload(); + flushQueue.put(new WriterTuple(label, payload.length, batchCount, payload)); + batchBuffer.reset(); + batchCount = 0; + batchSize = 0; + } + + private byte[] buildPayload() { + if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) { + FastByteArrayOutputStream out = new FastByteArrayOutputStream(batchBuffer.size() + 2); + out.write(jsonArrayPrefix); + if (batchBuffer.size() > 0) { + out.write(batchBuffer.toByteArray()); + } + out.write(jsonArraySuffix); + return out.toByteArray(); + } + return batchBuffer.toByteArray(); + } + private void checkFlushException() { if (flushException != null) { throw new RuntimeException("Writing records to Doris failed.", flushException); diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java index e460e76b83..95da6573f6 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java @@ -5,18 +5,30 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; import java.io.Serializable; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class Keys implements Serializable { - private static final long serialVersionUID = 1l; + private static final long serialVersionUID = 1L; private static final int MAX_RETRIES = 3; - private static final int BATCH_ROWS = 500000; - private static final long DEFAULT_FLUSH_INTERVAL = 30000; + private static final int BATCH_ROWS = 100000; + private static final long DEFAULT_FLUSH_INTERVAL = 10000; + private static final long DEFAULT_MAX_BATCH_SIZE = 50L * 1024 * 1024; + private static final int DEFAULT_FLUSH_QUEUE_LENGTH = 4; + private static final int DEFAULT_CONNECT_TIMEOUT = 5000; + private static final int DEFAULT_SOCKET_TIMEOUT = 600000; + private static final int DEFAULT_CONNECTION_REQUEST_TIMEOUT = 5000; + private static final long DEFAULT_HOST_COOLDOWN_MS = 30000L; + private static final int DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024; + private static final int DEFAULT_FLUSH_WORKER_COUNT = 1; private static final String LOAD_PROPS_FORMAT = "format"; + private static final String LOAD_PROPS_LINE_DELIMITER = "line_delimiter"; + private static final String LOAD_PROPS_COLUMN_SEPARATOR = "column_separator"; + public enum StreamLoadFormat { CSV, JSON; } @@ -36,23 +48,27 @@ public enum StreamLoadFormat { private static final String LOAD_URL = "loadUrl"; private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength"; private static final String LOAD_PROPS = "loadProps"; + private static final String MAX_RETRY_COUNT = "maxRetries"; + private static final String CONNECT_TIMEOUT = "connectTimeout"; + private static final String SOCKET_TIMEOUT = "socketTimeout"; + private static final String CONNECTION_REQUEST_TIMEOUT = "connectionRequestTimeout"; + private static final String HOST_COOLDOWN_MS = "hostCooldownMs"; + private static final String INITIAL_BUFFER_SIZE = "initialBufferSize"; + private static final String FLUSH_WORKER_COUNT = "flushWorkerCount"; private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_"; - private static final long DEFAULT_MAX_BATCH_SIZE = 90 * 1024 * 1024; //default 90M - private final Configuration options; private List infoSchemaColumns; - private List userSetColumns; - private boolean isWildcardColumn; + private final List userSetColumns; + private final boolean isWildcardColumn; - public Keys ( Configuration options) { + public Keys(Configuration options) { this.options = options; - this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList()); - if (1 == options.getList(COLUMN, String.class).size() && "*".trim().equals(options.getList(COLUMN, String.class).get(0))) { - this.isWildcardColumn = true; - } + List configuredColumns = options.getList(COLUMN, String.class); + this.userSetColumns = configuredColumns == null ? Collections.emptyList() : configuredColumns.stream().map(str -> str.replace("`", "")).collect(Collectors.toList()); + this.isWildcardColumn = configuredColumns != null && configuredColumns.size() == 1 && "*".equals(configuredColumns.get(0).trim()); } public void doPretreatment() { @@ -82,7 +98,7 @@ public String getPassword() { public String getLabelPrefix() { String label = options.getString(LABEL_PREFIX); - return null == label ? DEFAULT_LABEL_PREFIX : label; + return label == null ? DEFAULT_LABEL_PREFIX : label; } public List getLoadUrlList() { @@ -90,10 +106,7 @@ public List getLoadUrlList() { } public List getColumns() { - if (isWildcardColumn) { - return this.infoSchemaColumns; - } - return this.userSetColumns; + return isWildcardColumn ? this.infoSchemaColumns : this.userSetColumns; } public boolean isWildcardColumn() { @@ -117,32 +130,63 @@ public Map getLoadProps() { } public int getMaxRetries() { - return MAX_RETRIES; + Integer retries = options.getInt(MAX_RETRY_COUNT); + return retries == null ? MAX_RETRIES : retries; } public int getBatchRows() { Integer rows = options.getInt(MAX_BATCH_ROWS); - return null == rows ? BATCH_ROWS : rows; + return rows == null ? BATCH_ROWS : rows; } public long getBatchSize() { Long size = options.getLong(MAX_BATCH_SIZE); - return null == size ? DEFAULT_MAX_BATCH_SIZE : size; + return size == null ? DEFAULT_MAX_BATCH_SIZE : size; } public long getFlushInterval() { Long interval = options.getLong(FLUSH_INTERVAL); - return null == interval ? DEFAULT_FLUSH_INTERVAL : interval; + return interval == null ? DEFAULT_FLUSH_INTERVAL : interval; } public int getFlushQueueLength() { Integer len = options.getInt(FLUSH_QUEUE_LENGTH); - return null == len ? 1 : len; + return len == null ? DEFAULT_FLUSH_QUEUE_LENGTH : len; + } + + public int getConnectTimeout() { + Integer timeout = options.getInt(CONNECT_TIMEOUT); + return timeout == null ? DEFAULT_CONNECT_TIMEOUT : timeout; + } + + public int getSocketTimeout() { + Integer timeout = options.getInt(SOCKET_TIMEOUT); + return timeout == null ? DEFAULT_SOCKET_TIMEOUT : timeout; + } + + public int getConnectionRequestTimeout() { + Integer timeout = options.getInt(CONNECTION_REQUEST_TIMEOUT); + return timeout == null ? DEFAULT_CONNECTION_REQUEST_TIMEOUT : timeout; + } + + public long getHostCooldownMs() { + Long value = options.getLong(HOST_COOLDOWN_MS); + return value == null ? DEFAULT_HOST_COOLDOWN_MS : value; + } + + public int getInitialBufferSize() { + Integer size = options.getInt(INITIAL_BUFFER_SIZE); + return size == null ? DEFAULT_INITIAL_BUFFER_SIZE : size; + } + + public int getFlushWorkerCount() { + Integer count = options.getInt(FLUSH_WORKER_COUNT); + return count == null ? DEFAULT_FLUSH_WORKER_COUNT : Math.max(1, count); } public StreamLoadFormat getStreamLoadFormat() { Map loadProps = getLoadProps(); - if (null == loadProps) { + if (loadProps == null) { return StreamLoadFormat.CSV; } if (loadProps.containsKey(LOAD_PROPS_FORMAT) @@ -152,6 +196,18 @@ public StreamLoadFormat getStreamLoadFormat() { return StreamLoadFormat.CSV; } + public String getLineDelimiter() { + Map loadProps = getLoadProps(); + Object value = loadProps == null ? null : loadProps.get(LOAD_PROPS_LINE_DELIMITER); + return value == null ? "\n" : String.valueOf(value); + } + + public String getColumnSeparator() { + Map loadProps = getLoadProps(); + Object value = loadProps == null ? null : loadProps.get(LOAD_PROPS_COLUMN_SEPARATOR); + return value == null ? "\t" : String.valueOf(value); + } + private void validateStreamLoadUrl() { List urlList = getLoadUrlList(); for (String host : urlList) { diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/WriterTuple.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/WriterTuple.java index 32e0b341b8..12d84fc3e2 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/WriterTuple.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/WriterTuple.java @@ -1,20 +1,49 @@ package com.alibaba.datax.plugin.writer.doriswriter; -import java.util.List; - public class WriterTuple { - private String label; - private Long bytes; - private List rows; + private final String label; + private final long bytes; + private final int rows; + private final byte[] data; + private final boolean poison; - public WriterTuple ( String label, Long bytes, List rows){ + public WriterTuple(String label, long bytes, int rows, byte[] data) { this.label = label; - this.rows = rows; this.bytes = bytes; + this.rows = rows; + this.data = data; + this.poison = false; + } + + private WriterTuple(boolean poison) { + this.label = ""; + this.bytes = 0L; + this.rows = 0; + this.data = null; + this.poison = poison; + } + + public static WriterTuple poisonPill() { + return new WriterTuple(true); + } + + public String getLabel() { + return label; + } + + public long getBytes() { + return bytes; + } + + public int getRows() { + return rows; + } + + public byte[] getData() { + return data; } - public String getLabel() { return label; } - public void setLabel(String label) { this.label = label; } - public Long getBytes() { return bytes; } - public List getRows() { return rows; } + public boolean isPoison() { + return poison; + } } diff --git a/doriswriter/src/main/resources/plugin_job_template.json b/doriswriter/src/main/resources/plugin_job_template.json index 0187e53965..c01fde4752 100644 --- a/doriswriter/src/main/resources/plugin_job_template.json +++ b/doriswriter/src/main/resources/plugin_job_template.json @@ -6,8 +6,18 @@ "column": [], "preSql": [], "postSql": [], - "beLoadUrl": [], "loadUrl": [], + "maxBatchRows": 100000, + "batchSize": 52428800, + "flushInterval": 10000, + "flushQueueLength": 4, + "flushWorkerCount": 1, + "maxRetries": 3, + "connectTimeout": 5000, + "socketTimeout": 600000, + "connectionRequestTimeout": 5000, + "hostCooldownMs": 30000, + "initialBufferSize": 1048576, "loadProps": {}, "connection": [ { @@ -17,4 +27,4 @@ } ] } -} \ No newline at end of file +} From 05fb0d2187fb59a50e536bd424709ed454fbc63e Mon Sep 17 00:00:00 2001 From: ogre Date: Fri, 17 Apr 2026 10:48:28 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E6=95=88=E7=8E=87?= =?UTF-8?q?=E9=97=AE=E9=A2=98=201=E3=80=81=E5=87=8F=E5=B0=91=E5=86=85?= =?UTF-8?q?=E5=AD=98=E6=8B=B7=E8=B4=9D=202=E3=80=81httpclient=20=E9=95=BF?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=203=E3=80=81flush=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../FastByteArrayOutputStream.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/FastByteArrayOutputStream.java diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/FastByteArrayOutputStream.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/FastByteArrayOutputStream.java new file mode 100644 index 0000000000..a16d90237b --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/FastByteArrayOutputStream.java @@ -0,0 +1,51 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import java.util.Arrays; + +/** + * Lightweight growable byte buffer to reduce intermediate object creation. + */ +public class FastByteArrayOutputStream { + private byte[] buffer; + private int count; + + public FastByteArrayOutputStream(int initialCapacity) { + this.buffer = new byte[Math.max(initialCapacity, 1024)]; + } + + public void write(byte[] b) { + write(b, 0, b.length); + } + + public void write(byte[] b, int off, int len) { + if (len <= 0) { + return; + } + ensureCapacity(count + len); + System.arraycopy(b, off, buffer, count, len); + count += len; + } + + public int size() { + return count; + } + + public void reset() { + count = 0; + } + + public byte[] toByteArray() { + return Arrays.copyOf(buffer, count); + } + + private void ensureCapacity(int minCapacity) { + if (minCapacity <= buffer.length) { + return; + } + int newCapacity = buffer.length; + while (newCapacity < minCapacity) { + newCapacity = newCapacity << 1; + } + buffer = Arrays.copyOf(buffer, newCapacity); + } +} From b5a136d59cb62f655c291495914228f9db97dc2b Mon Sep 17 00:00:00 2001 From: hisfei Date: Tue, 9 Jun 2026 15:55:09 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E6=95=88=E7=8E=87?= =?UTF-8?q?=E9=97=AE=E9=A2=98=20=E4=BF=AE=E5=A4=8D=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=97=AE=E9=A2=98=EF=BC=8C=E4=BF=AE=E5=A4=8Dmongodbreader?= =?UTF-8?q?=E5=85=BC=E5=AE=B9=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- adbmysqlwriter/pom.xml | 2 + adbpgwriter/pom.xml | 2 + adswriter/pom.xml | 2 + cassandrareader/pom.xml | 2 + cassandrawriter/pom.xml | 2 + clickhousereader/pom.xml | 2 + clickhousewriter/pom.xml | 2 + core/pom.xml | 1 + databendwriter/pom.xml | 2 + datahubreader/pom.xml | 2 + datahubwriter/pom.xml | 1 + dorisreader/pom.xml | 1 + doriswriter/pom.xml | 6 +- drdsreader/pom.xml | 1 + drdswriter/pom.xml | 1 + elasticsearchwriter/pom.xml | 1 + ftpreader/pom.xml | 1 + ftpwriter/pom.xml | 1 + gaussdbreader/pom.xml | 1 + gaussdbwriter/pom.xml | 1 + gdbreader/pom.xml | 1 + gdbwriter/pom.xml | 1 + hbase094xreader/pom.xml | 1 + hbase094xwriter/pom.xml | 1 + hbase11xreader/pom.xml | 1 + hbase11xsqlreader/pom.xml | 1 + hbase11xsqlwriter/pom.xml | 1 + hbase11xwriter/pom.xml | 1 + hbase20xsqlreader/pom.xml | 1 + hbase20xsqlwriter/pom.xml | 1 + hdfsreader/pom.xml | 1 + hdfswriter/pom.xml | 1 + hologresjdbcwriter/pom.xml | 1 + kingbaseesreader/pom.xml | 1 + kingbaseeswriter/pom.xml | 1 + kuduwriter/pom.xml | 1 + loghubreader/pom.xml | 1 + loghubwriter/pom.xml | 1 + milvuswriter/pom.xml | 1 + mongodbreader/pom.xml | 1 + .../reader/mongodbreader/MongoDBReader.java | 497 +++++++++++++++--- mongodbwriter/pom.xml | 1 + mysqlreader/pom.xml | 1 + mysqlwriter/pom.xml | 1 + neo4jwriter/pom.xml | 1 + obhbasereader/pom.xml | 2 + obhbasewriter/pom.xml | 1 + oceanbasev10reader/pom.xml | 1 + oceanbasev10writer/pom.xml | 1 + ocswriter/pom.xml | 1 + odpsreader/pom.xml | 1 + odpswriter/pom.xml | 1 + opentsdbreader/pom.xml | 1 + oraclereader/pom.xml | 1 + oraclewriter/pom.xml | 1 + oscarwriter/pom.xml | 1 + ossreader/pom.xml | 1 + osswriter/pom.xml | 1 + otsreader/pom.xml | 1 + otsstreamreader/pom.xml | 1 + otswriter/pom.xml | 1 + pom.xml | 1 + postgresqlreader/pom.xml | 1 + postgresqlwriter/pom.xml | 1 + rdbmsreader/pom.xml | 1 + rdbmswriter/pom.xml | 1 + selectdbwriter/pom.xml | 1 + sqlserverreader/pom.xml | 1 + sqlserverwriter/pom.xml | 1 + starrocksreader/pom.xml | 1 + starrockswriter/pom.xml | 1 + streamreader/pom.xml | 1 + streamwriter/pom.xml | 1 + sybasereader/pom.xml | 1 + sybasewriter/pom.xml | 1 + tdenginereader/pom.xml | 1 + tdenginewriter/pom.xml | 1 + transformer/pom.xml | 1 + tsdbreader/pom.xml | 1 + tsdbwriter/pom.xml | 1 + txtfilereader/pom.xml | 3 +- txtfilewriter/pom.xml | 3 +- 82 files changed, 513 insertions(+), 84 deletions(-) mode change 100644 => 100755 mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java diff --git a/adbmysqlwriter/pom.xml b/adbmysqlwriter/pom.xml index 6ffcab8530..b1bbf61563 100755 --- a/adbmysqlwriter/pom.xml +++ b/adbmysqlwriter/pom.xml @@ -58,6 +58,8 @@ maven-assembly-plugin +2.2-beta-3 + src/main/assembly/package.xml diff --git a/adbpgwriter/pom.xml b/adbpgwriter/pom.xml index c2800c7b94..c2469805cb 100644 --- a/adbpgwriter/pom.xml +++ b/adbpgwriter/pom.xml @@ -92,6 +92,8 @@ maven-assembly-plugin +2.2-beta-3 + src/main/assembly/package.xml diff --git a/adswriter/pom.xml b/adswriter/pom.xml index 41271de98c..7ab7d454ae 100644 --- a/adswriter/pom.xml +++ b/adswriter/pom.xml @@ -102,6 +102,8 @@ maven-assembly-plugin +2.2-beta-3 + src/main/assembly/package.xml diff --git a/cassandrareader/pom.xml b/cassandrareader/pom.xml index 8aa38936d3..86cb136480 100644 --- a/cassandrareader/pom.xml +++ b/cassandrareader/pom.xml @@ -112,6 +112,8 @@ maven-assembly-plugin +2.2-beta-3 + src/main/assembly/package.xml diff --git a/cassandrawriter/pom.xml b/cassandrawriter/pom.xml index 7d7cfbd7d3..b15183bd1b 100644 --- a/cassandrawriter/pom.xml +++ b/cassandrawriter/pom.xml @@ -103,6 +103,8 @@ maven-assembly-plugin +2.2-beta-3 + src/main/assembly/package.xml diff --git a/clickhousereader/pom.xml b/clickhousereader/pom.xml index 4b095796db..98653125f4 100644 --- a/clickhousereader/pom.xml +++ b/clickhousereader/pom.xml @@ -68,6 +68,8 @@ maven-assembly-plugin +2.2-beta-3 + src/main/assembly/package.xml diff --git a/clickhousewriter/pom.xml b/clickhousewriter/pom.xml index 76c5fb1fde..48a15e3372 100644 --- a/clickhousewriter/pom.xml +++ b/clickhousewriter/pom.xml @@ -67,6 +67,8 @@ maven-assembly-plugin +2.2-beta-3 + src/main/assembly/package.xml diff --git a/core/pom.xml b/core/pom.xml index 7685001b3e..8b8c14520f 100755 --- a/core/pom.xml +++ b/core/pom.xml @@ -123,6 +123,7 @@ maven-assembly-plugin +2.2-beta-3 diff --git a/databendwriter/pom.xml b/databendwriter/pom.xml index b99ca5d804..e84d321b31 100644 --- a/databendwriter/pom.xml +++ b/databendwriter/pom.xml @@ -80,6 +80,8 @@ maven-assembly-plugin +2.2-beta-3 + src/main/assembly/package.xml diff --git a/datahubreader/pom.xml b/datahubreader/pom.xml index c0022b44a9..98df477ed9 100644 --- a/datahubreader/pom.xml +++ b/datahubreader/pom.xml @@ -58,6 +58,8 @@ maven-assembly-plugin +2.2-beta-3 + src/main/assembly/package.xml diff --git a/datahubwriter/pom.xml b/datahubwriter/pom.xml index 1ee1fe9b0a..47eaad9189 100644 --- a/datahubwriter/pom.xml +++ b/datahubwriter/pom.xml @@ -58,6 +58,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/dorisreader/pom.xml b/dorisreader/pom.xml index 15a025b6c4..c99bd7017e 100755 --- a/dorisreader/pom.xml +++ b/dorisreader/pom.xml @@ -60,6 +60,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/doriswriter/pom.xml b/doriswriter/pom.xml index 2887434c0b..b5691551dc 100644 --- a/doriswriter/pom.xml +++ b/doriswriter/pom.xml @@ -71,8 +71,9 @@ under the License. maven-assembly-plugin +2.2-beta-3 + - 2.2-beta-3 @@ -87,7 +88,8 @@ under the License. maven-assembly-plugin - 2.2-beta-3 +2.2-beta-3 + src/main/assembly/package.xml diff --git a/drdsreader/pom.xml b/drdsreader/pom.xml index e38884ab96..f5acb4d32a 100755 --- a/drdsreader/pom.xml +++ b/drdsreader/pom.xml @@ -62,6 +62,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/drdswriter/pom.xml b/drdswriter/pom.xml index 35a7f28d17..c7c93e2165 100755 --- a/drdswriter/pom.xml +++ b/drdswriter/pom.xml @@ -62,6 +62,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/elasticsearchwriter/pom.xml b/elasticsearchwriter/pom.xml index 8699c6e599..f74a43b8c2 100644 --- a/elasticsearchwriter/pom.xml +++ b/elasticsearchwriter/pom.xml @@ -68,6 +68,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/ftpreader/pom.xml b/ftpreader/pom.xml index 57bf889d72..147299a6d3 100755 --- a/ftpreader/pom.xml +++ b/ftpreader/pom.xml @@ -70,6 +70,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/ftpwriter/pom.xml b/ftpwriter/pom.xml index bf7ce83de4..5b4c52bbb0 100644 --- a/ftpwriter/pom.xml +++ b/ftpwriter/pom.xml @@ -72,6 +72,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/gaussdbreader/pom.xml b/gaussdbreader/pom.xml index ad2e0ba058..2b67490838 100644 --- a/gaussdbreader/pom.xml +++ b/gaussdbreader/pom.xml @@ -64,6 +64,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/gaussdbwriter/pom.xml b/gaussdbwriter/pom.xml index 9da02effad..42d273908f 100644 --- a/gaussdbwriter/pom.xml +++ b/gaussdbwriter/pom.xml @@ -64,6 +64,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/gdbreader/pom.xml b/gdbreader/pom.xml index a226a21f88..03c04bd53c 100644 --- a/gdbreader/pom.xml +++ b/gdbreader/pom.xml @@ -96,6 +96,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/gdbwriter/pom.xml b/gdbwriter/pom.xml index 584a1c0987..31a30f456a 100644 --- a/gdbwriter/pom.xml +++ b/gdbwriter/pom.xml @@ -82,6 +82,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hbase094xreader/pom.xml b/hbase094xreader/pom.xml index d996ba1a7d..6b263f1cbd 100644 --- a/hbase094xreader/pom.xml +++ b/hbase094xreader/pom.xml @@ -80,6 +80,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hbase094xwriter/pom.xml b/hbase094xwriter/pom.xml index 47e6db4ccf..4fd78d3766 100644 --- a/hbase094xwriter/pom.xml +++ b/hbase094xwriter/pom.xml @@ -93,6 +93,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hbase11xreader/pom.xml b/hbase11xreader/pom.xml index 9a82b85263..52ff566944 100644 --- a/hbase11xreader/pom.xml +++ b/hbase11xreader/pom.xml @@ -99,6 +99,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hbase11xsqlreader/pom.xml b/hbase11xsqlreader/pom.xml index bf9e73f170..2a48b0e503 100644 --- a/hbase11xsqlreader/pom.xml +++ b/hbase11xsqlreader/pom.xml @@ -91,6 +91,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hbase11xsqlwriter/pom.xml b/hbase11xsqlwriter/pom.xml index bc4b61cd10..c3e6ee97c8 100644 --- a/hbase11xsqlwriter/pom.xml +++ b/hbase11xsqlwriter/pom.xml @@ -133,6 +133,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hbase11xwriter/pom.xml b/hbase11xwriter/pom.xml index 9b86c27d0b..ab0f2a8ef8 100644 --- a/hbase11xwriter/pom.xml +++ b/hbase11xwriter/pom.xml @@ -108,6 +108,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hbase20xsqlreader/pom.xml b/hbase20xsqlreader/pom.xml index 818123f319..8cf92870b5 100644 --- a/hbase20xsqlreader/pom.xml +++ b/hbase20xsqlreader/pom.xml @@ -89,6 +89,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hbase20xsqlwriter/pom.xml b/hbase20xsqlwriter/pom.xml index 5a2843e113..4c0424e507 100644 --- a/hbase20xsqlwriter/pom.xml +++ b/hbase20xsqlwriter/pom.xml @@ -83,6 +83,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hdfsreader/pom.xml b/hdfsreader/pom.xml index f90e71eeb1..9dca49976c 100644 --- a/hdfsreader/pom.xml +++ b/hdfsreader/pom.xml @@ -164,6 +164,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hdfswriter/pom.xml b/hdfswriter/pom.xml index 5d82520faa..26a35a5c37 100644 --- a/hdfswriter/pom.xml +++ b/hdfswriter/pom.xml @@ -135,6 +135,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/hologresjdbcwriter/pom.xml b/hologresjdbcwriter/pom.xml index a908dfed86..5c4bba8db1 100644 --- a/hologresjdbcwriter/pom.xml +++ b/hologresjdbcwriter/pom.xml @@ -69,6 +69,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/kingbaseesreader/pom.xml b/kingbaseesreader/pom.xml index 6e844c10eb..eb063fa3f4 100644 --- a/kingbaseesreader/pom.xml +++ b/kingbaseesreader/pom.xml @@ -66,6 +66,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/kingbaseeswriter/pom.xml b/kingbaseeswriter/pom.xml index 284c8c5e94..1430b30207 100644 --- a/kingbaseeswriter/pom.xml +++ b/kingbaseeswriter/pom.xml @@ -63,6 +63,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/kuduwriter/pom.xml b/kuduwriter/pom.xml index 5d78be4cb7..bca5e707c3 100644 --- a/kuduwriter/pom.xml +++ b/kuduwriter/pom.xml @@ -61,6 +61,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/loghubreader/pom.xml b/loghubreader/pom.xml index b2f52f3da1..e1ca6fd980 100644 --- a/loghubreader/pom.xml +++ b/loghubreader/pom.xml @@ -52,6 +52,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/loghubwriter/pom.xml b/loghubwriter/pom.xml index d43b7286d8..9452d8a27e 100644 --- a/loghubwriter/pom.xml +++ b/loghubwriter/pom.xml @@ -52,6 +52,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/milvuswriter/pom.xml b/milvuswriter/pom.xml index 16c00560de..95cea4c2d3 100644 --- a/milvuswriter/pom.xml +++ b/milvuswriter/pom.xml @@ -103,6 +103,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/mongodbreader/pom.xml b/mongodbreader/pom.xml index 97356dc779..16f6766f13 100644 --- a/mongodbreader/pom.xml +++ b/mongodbreader/pom.xml @@ -63,6 +63,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java old mode 100644 new mode 100755 index 4d129a5af7..44549514a6 --- a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java +++ b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java @@ -1,11 +1,6 @@ package com.alibaba.datax.plugin.reader.mongodbreader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.Iterator; -import java.util.List; - +import java.util.*; import com.alibaba.datax.common.element.BoolColumn; import com.alibaba.datax.common.element.DateColumn; import com.alibaba.datax.common.element.DoubleColumn; @@ -18,18 +13,21 @@ import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.plugin.reader.mongodbreader.util.CollectionSplitUtil; import com.alibaba.datax.plugin.reader.mongodbreader.util.MongoUtil; -import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONArray; -import com.alibaba.fastjson2.JSONObject; - -import com.google.common.base.Joiner; +import com.alibaba.fastjson2.*; +import com.alibaba.datax.common.element.Column; import com.google.common.base.Strings; import com.mongodb.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; +import org.bson.BsonTimestamp; +import org.bson.BsonUndefined; import org.bson.Document; +import org.bson.types.Binary; +import org.bson.types.Code; +import org.bson.types.CodeWithScope; import org.bson.types.ObjectId; +import org.bson.types.Symbol; /** * Created by jianying.wcj on 2015/3/19 0019. @@ -49,7 +47,7 @@ public static class Job extends Reader.Job { @Override public List split(int adviceNumber) { - return CollectionSplitUtil.doSplit(originalConfig,adviceNumber,mongoClient); + return CollectionSplitUtil.doSplit(originalConfig, adviceNumber, mongoClient); } @Override @@ -57,10 +55,10 @@ public void init() { this.originalConfig = super.getPluginJobConf(); this.userName = originalConfig.getString(KeyConstant.MONGO_USER_NAME, originalConfig.getString(KeyConstant.MONGO_USERNAME)); this.password = originalConfig.getString(KeyConstant.MONGO_USER_PASSWORD, originalConfig.getString(KeyConstant.MONGO_PASSWORD)); - String database = originalConfig.getString(KeyConstant.MONGO_DB_NAME, originalConfig.getString(KeyConstant.MONGO_DATABASE)); - String authDb = originalConfig.getString(KeyConstant.MONGO_AUTHDB, database); - if(!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) { - this.mongoClient = MongoUtil.initCredentialMongoClient(originalConfig,userName,password,authDb); + String database = originalConfig.getString(KeyConstant.MONGO_DB_NAME, originalConfig.getString(KeyConstant.MONGO_DATABASE)); + String authDb = originalConfig.getString(KeyConstant.MONGO_AUTHDB, database); + if (!Strings.isNullOrEmpty(this.userName) && !Strings.isNullOrEmpty(this.password)) { + this.mongoClient = MongoUtil.initCredentialMongoClient(originalConfig, userName, password, authDb); } else { this.mongoClient = MongoUtil.initMongoClient(originalConfig); } @@ -72,6 +70,352 @@ public void destroy() { } } + /** + * 递归将 BSON 值转换为纯 JSON 兼容对象,避免 MongoDB Extended JSON 中的类型包装。 + * + * Document.get(key) 返回的完整 Java 类型映射(基于 MongoDB Java Driver 5.x BsonTypeClassMap): + * ObjectId -> org.bson.types.ObjectId → 转为 24位十六进制字符串 + * Date -> java.util.Date → 转为 ISO-8601 字符串 + * Binary -> org.bson.types.Binary → 转为 Base64 字符串 + * BsonTimestamp -> org.bson.BsonTimestamp → 转为 ISO-8601 字符串 + * Decimal128 -> org.bson.types.Decimal128 → 转为 BigDecimal 的字符串 + * Code -> org.bson.types.Code → 转为代码字符串 + * CodeWithScope -> org.bson.types.CodeWithScope → 转为 JSON 对象 {code:..., scope:...} + * Symbol -> org.bson.types.Symbol → 转为字符串 + * BsonUndefined -> org.bson.BsonUndefined → 转为 null + * MinKey -> org.bson.types.MinKey → 转为字符串 "MinKey" + * MaxKey -> org.bson.types.MaxKey → 转为字符串 "MaxKey" + * BsonDbPointer -> org.bson.BsonDbPointer → 转为 JSON 对象 {namespace:..., id:...} + * Document -> org.bson.Document → 递归转换为 Map + * List -> java.util.List → 递归转换每个元素 + * 其他基本类型(String,Integer,Long,Double,Boolean)→ 原样返回 + */ + private static Object bsonValueToPlainObject(Object value) { + if (value == null) { + return null; + } + // ObjectId → 24位十六进制字符串(避免 {"$oid": "..."}) + if (value instanceof ObjectId) { + return ((ObjectId) value).toHexString(); + } + // Date → ISO-8601 字符串(避免 {"$date": "..."}) + if (value instanceof java.util.Date) { + return ((java.util.Date) value).toInstant().toString(); + } + // Binary → Base64 字符串(注意:Document API 返回 org.bson.types.Binary,不是 org.bson.BsonBinary) + if (value instanceof Binary) { + return Base64.getEncoder().encodeToString(((Binary) value).getData()); + } + // BsonTimestamp → ISO-8601 字符串(MongoDB 内部时间戳,含 time 秒 + inc 计数器) + if (value instanceof BsonTimestamp) { + long epochSeconds = ((BsonTimestamp) value).getTime(); + return new java.util.Date(epochSeconds * 1000L).toInstant().toString(); + } + // Decimal128 → BigDecimal 字符串(避免科学计数法丢失精度) + if (value instanceof BoolColumn) { + return value.toString(); + } + // Code (JavaScript) → 代码字符串 + if (value instanceof Code) { + return ((Code) value).getCode(); + } + // CodeWithScope (JavaScript with scope) → JSON 对象 + if (value instanceof CodeWithScope) { + CodeWithScope cws = (CodeWithScope) value; + Map map = new LinkedHashMap<>(); + map.put("code", cws.getCode()); + map.put("scope", bsonValueToPlainObject(cws.getScope())); + return map; + } + // Symbol → 字符串 + if (value instanceof Symbol) { + return ((Symbol) value).getSymbol(); + } + // BsonUndefined → null + if (value instanceof BsonUndefined) { + return null; + } + // MinKey → 字符串标识 + if (value instanceof org.bson.types.MinKey) { + return "MinKey"; + } + // MaxKey → 字符串标识 + if (value instanceof org.bson.types.MaxKey) { + return "MaxKey"; + } + // BsonDbPointer → JSON 对象 + if (value instanceof org.bson.BsonDbPointer) { + org.bson.BsonDbPointer ptr = (org.bson.BsonDbPointer) value; + Map map = new LinkedHashMap<>(); + map.put("namespace", ptr.getNamespace()); + map.put("id", ptr.getId().toHexString()); + return map; + } + // BsonRegularExpression → 字符串 + if (value instanceof org.bson.BsonRegularExpression) { + return ((org.bson.BsonRegularExpression) value).getPattern(); + } + // Document 递归转换为 Map + if (value instanceof Document) { + Map map = new LinkedHashMap<>(); + for (Map.Entry entry : ((Document) value).entrySet()) { + map.put(entry.getKey(), bsonValueToPlainObject(entry.getValue())); + } + return map; + } + // List 递归转换每个元素 + if (value instanceof List) { + List list = new ArrayList<>(); + for (Object item : (List) value) { + list.add(bsonValueToPlainObject(item)); + } + return list; + } + // 其他基本类型(String, Integer, Long, Double, Boolean 等)直接返回,fastjson2 可正确序列化 + return value; + } + + /** + * 将 BSON Document 转换为干净的 JSON 字符串(不含 MongoDB Extended JSON 类型包装) + */ + private static String documentToCleanJson(Document doc) { + Object plainObj = bsonValueToPlainObject(doc); + return JSON.toJSONString(plainObj); + } + + /** + * 将 List 转换为干净的 JSON 数组字符串 + */ + private static String listToCleanJson(List list) { + Object plainObj = bsonValueToPlainObject(list); + return JSON.toJSONString(plainObj); + } + + /** + * 安全地将任意 BSON 值转为 String。 + * 解决:Document.getString() 内部强转 (String)get(key),遇到 ObjectId 等非 String 类型会抛 ClassCastException。 + */ + private static String safeToString(Object value) { + if (value == null) { + return null; + } + if (value instanceof String) { + return (String) value; + } + // ObjectId → hex 字符串 + if (value instanceof ObjectId) { + return ((ObjectId) value).toHexString(); + } + if (value instanceof BoolColumn) { + return ((BoolColumn) value).asString(); + } + // 其他类型(Integer, Long, Double, Boolean 等)→ String.valueOf + // 复杂类型走 bsonValueToPlainObject 清洗 + Object plain = bsonValueToPlainObject(value); + if (plain instanceof String) { + return (String) plain; + } + return JSON.toJSONString(plain); + } + + /** + * 安全地将任意 BSON 值转为 Long。 + * 解决:Document.getInteger()/getLong() 遇到类型不匹配会抛 ClassCastException。 + * 支持:Integer → Long, Long → Long, Double → Long(截断), String → Long(解析), Number → Long + */ + private static Long safeToLong(Object value) { + if (value == null) { + return null; + } + if (value instanceof Long) { + return (Long) value; + } + if (value instanceof Integer) { + return ((Integer) value).longValue(); + } + if (value instanceof Number) { + return ((Number) value).longValue(); + } + if (value instanceof String) { + try { + return Long.parseLong((String) value); + } catch (NumberFormatException e) { + return null; + } + } + + return null; + } + + /** + * 安全地将任意 BSON 值转为 Double。 + * 支持:Double, Integer, Long, Number, String, Decimal128 + */ + private static Double safeToDouble(Object value) { + if (value == null) { + return null; + } + if (value instanceof Double) { + return (Double) value; + } + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + if (value instanceof String) { + try { + return Double.parseDouble((String) value); + } catch (NumberFormatException e) { + return null; + } + } + + return null; + } + + /** + * 安全地将任意 BSON 值转为 java.util.Date。 + * 支持:Date, BsonTimestamp, Long(毫秒时间戳), String(ISO格式) + */ + private static java.util.Date safeToDate(Object value) { + if (value == null) { + return null; + } + if (value instanceof java.util.Date) { + return (java.util.Date) value; + } + if (value instanceof BsonTimestamp) { + return new java.util.Date(((BsonTimestamp) value).getTime() * 1000L); + } + if (value instanceof Long) { + return new java.util.Date((Long) value); + } + if (value instanceof String) { + try { + return java.util.Date.from(java.time.Instant.parse((String) value)); + } catch (Exception e) { + return null; + } + } + return null; + } + + /** + * 安全地将任意 BSON 值转为 ObjectId 的 hex 字符串。 + * 解决:Document.getObjectId() 遇到 String 类型的 ObjectId 会抛 ClassCastException。 + * 支持:ObjectId → hex, String(24位hex) → 直接返回 + */ + private static String safeToObjectIdHex(Object value) { + if (value == null) { + return null; + } + if (value instanceof ObjectId) { + return ((ObjectId) value).toHexString(); + } + if (value instanceof String) { + String str = (String) value; + // 如果是 24 位 hex 字符串,直接返回 + if (str.matches("^[0-9a-fA-F]{24}$")) { + return str; + } + return str; + } + // 其他类型兜底 + Object plain = bsonValueToPlainObject(value); + if (plain instanceof String) { + return (String) plain; + } + return JSON.toJSONString(plain); + } + + /** + * 根据运行时实际 BSON 类型自动判断并转换为合适的 DataX Column。 + * 解决 MongoDB schemaless 特性导致同一字段在不同文档中可能是不同类型的问题 + * (例如某文档中是 ObjectId,另一文档中是 String)。 + * + * 自动映射规则(基于 Document.get(key) 返回的 Java 类型): + * null → StringColumn(null) + * String → StringColumn + * ObjectId → StringColumn(hex字符串),不再产生 {"$oid":"..."} + * Integer → LongColumn + * Long → LongColumn + * Double → DoubleColumn + * Decimal128 → StringColumn(plain字符串,保留精度) + * Boolean → StringColumn("true"/"false") + * Date → DateColumn + * BsonTimestamp → DateColumn + * Document → StringColumn(干净JSON字符串,递归清洗所有嵌套BSON类型) + * List → StringColumn(干净JSON数组字符串,递归清洗) + * Binary → StringColumn(Base64) + * 其他 → StringColumn(经bsonValueToPlainObject清洗后的字符串) + */ + private static Column autoDetectColumn(Object value) { + if (value == null) { + return new StringColumn(null); + } + // String → 直接作为字符串 + if (value instanceof String) { + return new StringColumn((String) value); + } + // ObjectId → 转 hex 字符串,不再产生 {"$oid":"..."} + if (value instanceof ObjectId) { + return new StringColumn(((ObjectId) value).toHexString()); + } + // Integer → LongColumn + if (value instanceof Integer) { + return new LongColumn(((Integer) value).longValue()); + } + // Long → LongColumn + if (value instanceof Long) { + return new LongColumn((Long) value); + } + // Double → DoubleColumn + if (value instanceof Double) { + return new DoubleColumn((Double) value); + } + + // Boolean → 字符串 + if (value instanceof Boolean) { + return new StringColumn(value.toString()); + } + // Date → DateColumn + if (value instanceof java.util.Date) { + return new DateColumn((java.util.Date) value); + } + // BsonTimestamp → DateColumn + if (value instanceof BsonTimestamp) { + return new DateColumn(new java.util.Date(((BsonTimestamp) value).getTime() * 1000L)); + } + // Document → 干净 JSON 字符串(递归清洗所有嵌套的 ObjectId 等) + if (value instanceof Document) { + return new StringColumn(documentToCleanJson((Document) value)); + } + // List → 干净 JSON 数组字符串(递归清洗) + if (value instanceof List) { + return new StringColumn(listToCleanJson((List) value)); + } + // Binary → Base64 字符串 + if (value instanceof Binary) { + return new StringColumn(Base64.getEncoder().encodeToString(((Binary) value).getData())); + } + // 其他所有类型:走 bsonValueToPlainObject 清洗后输出字符串 + return new StringColumn(valueToCleanJsonString(value)); + } + + /** + * 安全获取字段的 JSON 字符串表示,对任意 BSON 类型都进行清洗转换。 + * 用于 default 分支和兜底场景,避免 Document.toJson() 产生 Extended JSON。 + */ + private static String valueToCleanJsonString(Object value) { + if (value == null) { + return null; + } + Object plain = bsonValueToPlainObject(value); + // 如果转换结果是 String,直接返回,不再被 fastjson2 加引号 + if (plain instanceof String) { + return (String) plain; + } + return JSON.toJSONString(plain); + } public static class Task extends Reader.Task { @@ -96,16 +440,15 @@ public static class Task extends Reader.Task { @Override public void startRead(RecordSender recordSender) { - if(lowerBound== null || upperBound == null || - mongoClient == null || database == null || - collection == null || mongodbColumnMeta == null) { + if (lowerBound == null || upperBound == null || + mongoClient == null || database == null || + collection == null || mongodbColumnMeta == null) { throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE, - MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription()); + MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription()); } MongoDatabase db = mongoClient.getDatabase(database); MongoCollection col = db.getCollection(this.collection); - MongoCursor dbCursor = null; Document filter = new Document(); if (lowerBound.equals("min")) { if (!upperBound.equals("max")) { @@ -116,72 +459,66 @@ public void startRead(RecordSender recordSender) { } else { filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$gte", isObjectId ? new ObjectId(lowerBound.toString()) : lowerBound).append("$lt", isObjectId ? new ObjectId(upperBound.toString()) : upperBound)); } - if(!Strings.isNullOrEmpty(query)) { + if (!Strings.isNullOrEmpty(query)) { Document queryFilter = Document.parse(query); filter = new Document("$and", Arrays.asList(filter, queryFilter)); } - dbCursor = col.find(filter).iterator(); - while (dbCursor.hasNext()) { - Document item = dbCursor.next(); - Record record = recordSender.createRecord(); - Iterator columnItera = mongodbColumnMeta.iterator(); - while (columnItera.hasNext()) { - JSONObject column = (JSONObject)columnItera.next(); - Object tempCol = item.get(column.getString(KeyConstant.COLUMN_NAME)); - if (tempCol == null) { - if (KeyConstant.isDocumentType(column.getString(KeyConstant.COLUMN_TYPE))) { - String[] name = column.getString(KeyConstant.COLUMN_NAME).split("\\."); - if (name.length > 1) { - Object obj; - Document nestedDocument = item; - for (String str : name) { - obj = nestedDocument.get(str); - if (obj instanceof Document) { - nestedDocument = (Document) obj; - } - } - - if (null != nestedDocument) { - Document doc = nestedDocument; - tempCol = doc.get(name[name.length - 1]); - } - } - } - } - if (tempCol == null) { - //continue; 这个不能直接continue会导致record到目的端错位 - record.addColumn(new StringColumn(null)); - }else if (tempCol instanceof Double) { - //TODO deal with Double.isNaN() - record.addColumn(new DoubleColumn((Double) tempCol)); - } else if (tempCol instanceof Boolean) { - record.addColumn(new BoolColumn((Boolean) tempCol)); - } else if (tempCol instanceof Date) { - record.addColumn(new DateColumn((Date) tempCol)); - } else if (tempCol instanceof Integer) { - record.addColumn(new LongColumn((Integer) tempCol)); - }else if (tempCol instanceof Long) { - record.addColumn(new LongColumn((Long) tempCol)); - } else { - if(KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) { - String splitter = column.getString(KeyConstant.COLUMN_SPLITTER); - if(Strings.isNullOrEmpty(splitter)) { - throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE, - MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription()); - } else { - ArrayList array = (ArrayList)tempCol; - String tempArrayStr = Joiner.on(splitter).join(array); - record.addColumn(new StringColumn(tempArrayStr)); - } - } else { - record.addColumn(new StringColumn(tempCol.toString())); + + MongoCursor dbCursor = null; + try { + dbCursor = col.find(filter).iterator(); + while (dbCursor.hasNext()) { + Document item = dbCursor.next(); + Record record = recordSender.createRecord(); + Iterator columnItera = mongodbColumnMeta.iterator(); + while (columnItera.hasNext()) { + JSONObject column = (JSONObject) columnItera.next(); + String columnName = column.getString(KeyConstant.COLUMN_NAME); + + Object rawValue = item.get(columnName); + + switch (column.getString(KeyConstant.COLUMN_TYPE).toLowerCase()) { + case "string": + record.addColumn(new StringColumn(safeToString(rawValue))); + break; + case "int": + case "int32": + case "integer": + record.addColumn(new LongColumn(safeToLong(rawValue))); + break; + case "int64": + case "long": + record.addColumn(new LongColumn(safeToLong(rawValue))); + break; + case "double": + record.addColumn(new DoubleColumn(safeToDouble(rawValue))); + break; + case "date": + record.addColumn(new DateColumn(safeToDate(rawValue))); + break; + case "objectid": + record.addColumn(new StringColumn(safeToObjectIdHex(rawValue))); + break; + case "auto": + record.addColumn(autoDetectColumn(rawValue)); + break; + default: + // 未识别的类型也走自动检测,彻底避免类型不匹配 + record.addColumn(autoDetectColumn(rawValue)); + break; } } + recordSender.sendToWriter(record); + } + } finally { + // 确保游标关闭,防止资源泄漏 + if (dbCursor != null) { + dbCursor.close(); } - recordSender.sendToWriter(record); } } + @Override public void init() { this.readerSliceConfig = super.getPluginJobConf(); @@ -189,8 +526,8 @@ public void init() { this.password = readerSliceConfig.getString(KeyConstant.MONGO_USER_PASSWORD, readerSliceConfig.getString(KeyConstant.MONGO_PASSWORD)); this.database = readerSliceConfig.getString(KeyConstant.MONGO_DB_NAME, readerSliceConfig.getString(KeyConstant.MONGO_DATABASE)); this.authDb = readerSliceConfig.getString(KeyConstant.MONGO_AUTHDB, this.database); - if(!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) { - mongoClient = MongoUtil.initCredentialMongoClient(readerSliceConfig,userName,password,authDb); + if (!Strings.isNullOrEmpty(userName) && !Strings.isNullOrEmpty(password)) { + mongoClient = MongoUtil.initCredentialMongoClient(readerSliceConfig, userName, password, authDb); } else { mongoClient = MongoUtil.initMongoClient(readerSliceConfig); } diff --git a/mongodbwriter/pom.xml b/mongodbwriter/pom.xml index ac6d9394d9..5acf4b6222 100644 --- a/mongodbwriter/pom.xml +++ b/mongodbwriter/pom.xml @@ -67,6 +67,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/mysqlreader/pom.xml b/mysqlreader/pom.xml index 621326ae27..f9e07e45d1 100755 --- a/mysqlreader/pom.xml +++ b/mysqlreader/pom.xml @@ -60,6 +60,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/mysqlwriter/pom.xml b/mysqlwriter/pom.xml index 1c3891f5d5..d68b118fb6 100755 --- a/mysqlwriter/pom.xml +++ b/mysqlwriter/pom.xml @@ -58,6 +58,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/neo4jwriter/pom.xml b/neo4jwriter/pom.xml index 2ff0f55021..fb5c048b50 100644 --- a/neo4jwriter/pom.xml +++ b/neo4jwriter/pom.xml @@ -79,6 +79,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/obhbasereader/pom.xml b/obhbasereader/pom.xml index 7c8ffb1e5c..cf9aaad395 100755 --- a/obhbasereader/pom.xml +++ b/obhbasereader/pom.xml @@ -131,6 +131,8 @@ maven-assembly-plugin +2.2-beta-3 + src/main/assembly/package.xml diff --git a/obhbasewriter/pom.xml b/obhbasewriter/pom.xml index b6a29f92c2..815bbf703c 100644 --- a/obhbasewriter/pom.xml +++ b/obhbasewriter/pom.xml @@ -160,6 +160,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/oceanbasev10reader/pom.xml b/oceanbasev10reader/pom.xml index 66965320fd..93a36a8d14 100644 --- a/oceanbasev10reader/pom.xml +++ b/oceanbasev10reader/pom.xml @@ -76,6 +76,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/oceanbasev10writer/pom.xml b/oceanbasev10writer/pom.xml index d19864012f..f2b3a576e5 100644 --- a/oceanbasev10writer/pom.xml +++ b/oceanbasev10writer/pom.xml @@ -290,6 +290,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/ocswriter/pom.xml b/ocswriter/pom.xml index 51e75b74d9..7b94e255d7 100644 --- a/ocswriter/pom.xml +++ b/ocswriter/pom.xml @@ -65,6 +65,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/odpsreader/pom.xml b/odpsreader/pom.xml index 3f2c4acbd4..f7ab2f410c 100755 --- a/odpsreader/pom.xml +++ b/odpsreader/pom.xml @@ -116,6 +116,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/odpswriter/pom.xml b/odpswriter/pom.xml index c253e3fc7e..4f6a55c891 100755 --- a/odpswriter/pom.xml +++ b/odpswriter/pom.xml @@ -108,6 +108,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/opentsdbreader/pom.xml b/opentsdbreader/pom.xml index b10fba0207..943a89280b 100644 --- a/opentsdbreader/pom.xml +++ b/opentsdbreader/pom.xml @@ -127,6 +127,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/oraclereader/pom.xml b/oraclereader/pom.xml index d60e5ebf6f..ffa878206a 100755 --- a/oraclereader/pom.xml +++ b/oraclereader/pom.xml @@ -62,6 +62,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/oraclewriter/pom.xml b/oraclewriter/pom.xml index 1e8d027432..e8ece82755 100755 --- a/oraclewriter/pom.xml +++ b/oraclewriter/pom.xml @@ -59,6 +59,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/oscarwriter/pom.xml b/oscarwriter/pom.xml index 06249a26e9..7d2f6ffb61 100644 --- a/oscarwriter/pom.xml +++ b/oscarwriter/pom.xml @@ -68,6 +68,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/ossreader/pom.xml b/ossreader/pom.xml index d27b6a3dfb..9305b3e60e 100755 --- a/ossreader/pom.xml +++ b/ossreader/pom.xml @@ -82,6 +82,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/osswriter/pom.xml b/osswriter/pom.xml index ac4029e0e9..c0db638c86 100644 --- a/osswriter/pom.xml +++ b/osswriter/pom.xml @@ -124,6 +124,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/otsreader/pom.xml b/otsreader/pom.xml index dad538bf1d..2ea3e3db20 100644 --- a/otsreader/pom.xml +++ b/otsreader/pom.xml @@ -86,6 +86,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/otsstreamreader/pom.xml b/otsstreamreader/pom.xml index db75ba1e09..61c3130b95 100644 --- a/otsstreamreader/pom.xml +++ b/otsstreamreader/pom.xml @@ -91,6 +91,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/otswriter/pom.xml b/otswriter/pom.xml index f393d76c41..003900fd14 100644 --- a/otswriter/pom.xml +++ b/otswriter/pom.xml @@ -80,6 +80,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/pom.xml b/pom.xml index 1b364a7546..697c592aee 100644 --- a/pom.xml +++ b/pom.xml @@ -277,6 +277,7 @@ maven-assembly-plugin +2.2-beta-3 datax diff --git a/postgresqlreader/pom.xml b/postgresqlreader/pom.xml index 410d10a227..2442e9598c 100755 --- a/postgresqlreader/pom.xml +++ b/postgresqlreader/pom.xml @@ -64,6 +64,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/postgresqlwriter/pom.xml b/postgresqlwriter/pom.xml index 1c9edaf63c..578b741ebe 100755 --- a/postgresqlwriter/pom.xml +++ b/postgresqlwriter/pom.xml @@ -61,6 +61,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/rdbmsreader/pom.xml b/rdbmsreader/pom.xml index d75c38402a..6f1e9b0918 100755 --- a/rdbmsreader/pom.xml +++ b/rdbmsreader/pom.xml @@ -86,6 +86,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/rdbmswriter/pom.xml b/rdbmswriter/pom.xml index a74838b750..f783aa0418 100755 --- a/rdbmswriter/pom.xml +++ b/rdbmswriter/pom.xml @@ -86,6 +86,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/selectdbwriter/pom.xml b/selectdbwriter/pom.xml index fd2a19f7d6..f1be248ece 100644 --- a/selectdbwriter/pom.xml +++ b/selectdbwriter/pom.xml @@ -75,6 +75,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/sqlserverreader/pom.xml b/sqlserverreader/pom.xml index 326f1ce51d..635b8f3fdd 100755 --- a/sqlserverreader/pom.xml +++ b/sqlserverreader/pom.xml @@ -52,6 +52,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/sqlserverwriter/pom.xml b/sqlserverwriter/pom.xml index 6f52c14c96..6bd5424874 100644 --- a/sqlserverwriter/pom.xml +++ b/sqlserverwriter/pom.xml @@ -56,6 +56,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/starrocksreader/pom.xml b/starrocksreader/pom.xml index a8b049ea9c..46d0777720 100644 --- a/starrocksreader/pom.xml +++ b/starrocksreader/pom.xml @@ -74,6 +74,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/starrockswriter/pom.xml b/starrockswriter/pom.xml index 73a5142240..ecd7ce1a61 100755 --- a/starrockswriter/pom.xml +++ b/starrockswriter/pom.xml @@ -134,6 +134,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/streamreader/pom.xml b/streamreader/pom.xml index 7d186076d2..946778fa35 100755 --- a/streamreader/pom.xml +++ b/streamreader/pom.xml @@ -62,6 +62,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/streamwriter/pom.xml b/streamwriter/pom.xml index 2fa95d7bb3..a0f9e6d21b 100755 --- a/streamwriter/pom.xml +++ b/streamwriter/pom.xml @@ -56,6 +56,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/sybasereader/pom.xml b/sybasereader/pom.xml index 9ff7ce6695..3f87d1e0f0 100644 --- a/sybasereader/pom.xml +++ b/sybasereader/pom.xml @@ -88,6 +88,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/sybasewriter/pom.xml b/sybasewriter/pom.xml index 821969fc50..dcadc57159 100644 --- a/sybasewriter/pom.xml +++ b/sybasewriter/pom.xml @@ -78,6 +78,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/tdenginereader/pom.xml b/tdenginereader/pom.xml index 075a2789c0..46047aeacf 100644 --- a/tdenginereader/pom.xml +++ b/tdenginereader/pom.xml @@ -84,6 +84,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml index a7564e6bad..a7833f9318 100644 --- a/tdenginewriter/pom.xml +++ b/tdenginewriter/pom.xml @@ -88,6 +88,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/transformer/pom.xml b/transformer/pom.xml index cbfab8ea71..0a8a148b4b 100644 --- a/transformer/pom.xml +++ b/transformer/pom.xml @@ -46,6 +46,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/tsdbreader/pom.xml b/tsdbreader/pom.xml index 4b3f58c633..59bfccfca3 100644 --- a/tsdbreader/pom.xml +++ b/tsdbreader/pom.xml @@ -117,6 +117,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/tsdbwriter/pom.xml b/tsdbwriter/pom.xml index 9f99712340..1f352802c7 100644 --- a/tsdbwriter/pom.xml +++ b/tsdbwriter/pom.xml @@ -114,6 +114,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml diff --git a/txtfilereader/pom.xml b/txtfilereader/pom.xml index 30671d76d6..ec8baba0b7 100755 --- a/txtfilereader/pom.xml +++ b/txtfilereader/pom.xml @@ -56,6 +56,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml @@ -75,4 +76,4 @@ - + diff --git a/txtfilewriter/pom.xml b/txtfilewriter/pom.xml index 30a6cf6178..40713f9950 100755 --- a/txtfilewriter/pom.xml +++ b/txtfilewriter/pom.xml @@ -57,6 +57,7 @@ maven-assembly-plugin +2.2-beta-3 src/main/assembly/package.xml @@ -75,4 +76,4 @@ - + From 0149471f4a9ec8684947ae16f2f221a090713afe Mon Sep 17 00:00:00 2001 From: hisfei Date: Wed, 10 Jun 2026 18:02:41 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=20mongodbreader=E5=85=BC?= =?UTF-8?q?=E5=AE=B9=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- mk.sh | 1 + .../reader/mongodbreader/MongoDBReader.java | 30 +++++++++++++++---- .../util/CollectionSplitUtil.java | 30 +++++++++++-------- 3 files changed, 44 insertions(+), 17 deletions(-) create mode 100644 mk.sh diff --git a/mk.sh b/mk.sh new file mode 100644 index 0000000000..e2e7820af7 --- /dev/null +++ b/mk.sh @@ -0,0 +1 @@ +/Users/fei/Applications/"IntelliJ IDEA.app"/Contents/plugins/maven/lib/maven3/bin/mvn -U clean package assembly:assembly -Dmaven.test.skip=true \ No newline at end of file diff --git a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java index 44549514a6..b5a2c65ab7 100755 --- a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java +++ b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java @@ -190,7 +190,26 @@ private static String listToCleanJson(List list) { Object plainObj = bsonValueToPlainObject(list); return JSON.toJSONString(plainObj); } - + /** + * 安全地将分片边界值转为 MongoDB 查询所需的类型。 + * 如果 isObjectId=true 且 bound 是合法的 24 位 hex 字符串,则转为 ObjectId; + * 否则直接返回 bound 的字符串形式,不强行转换,避免 ClassCastException。 + */ + private static Object safeBoundValue(Object bound, boolean isObjectId) { + if (bound == null) { + return null; + } + if (isObjectId) { + String str = bound.toString(); + // 只有确认为 24 位合法 hex 字符串才转 ObjectId + if (str.matches("^[0-9a-fA-F]{24}$")) { + return new ObjectId(str); + } + // 不是合法 hex,说明 _id 实际不是 ObjectId 类型,直接用原值 + return str; + } + return bound; + } /** * 安全地将任意 BSON 值转为 String。 * 解决:Document.getString() 内部强转 (String)get(key),遇到 ObjectId 等非 String 类型会抛 ClassCastException。 @@ -452,13 +471,14 @@ public void startRead(RecordSender recordSender) { Document filter = new Document(); if (lowerBound.equals("min")) { if (!upperBound.equals("max")) { - filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$lt", isObjectId ? new ObjectId(upperBound.toString()) : upperBound)); + filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$lt", safeBoundValue(upperBound, isObjectId))); } } else if (upperBound.equals("max")) { - filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$gte", isObjectId ? new ObjectId(lowerBound.toString()) : lowerBound)); + filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$gte", safeBoundValue(lowerBound, isObjectId))); } else { - filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$gte", isObjectId ? new ObjectId(lowerBound.toString()) : lowerBound).append("$lt", isObjectId ? new ObjectId(upperBound.toString()) : upperBound)); - } + filter.append(KeyConstant.MONGO_PRIMARY_ID, + new Document("$gte", safeBoundValue(lowerBound, isObjectId)) + .append("$lt", safeBoundValue(upperBound, isObjectId))); } if (!Strings.isNullOrEmpty(query)) { Document queryFilter = Document.parse(query); filter = new Document("$and", Arrays.asList(filter, queryFilter)); diff --git a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java index a66578f8da..4f7afbb546 100644 --- a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java +++ b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java @@ -50,6 +50,20 @@ public static List doSplit( return confList; } + /** + * 安全地将 _id 值转为可序列化的分片点字符串。 + * ObjectId → hex 字符串;其他类型 → toString();null → null。 + * 不再使用 (ObjectId) 强转,避免 schemaless 集合中 _id 类型不一致导致 ClassCastException。 + */ + private static Object idToSplitPoint(Object id) { + if (id == null) { + return null; + } + if (id instanceof ObjectId) { + return ((ObjectId) id).toHexString(); + } + return id.toString(); + } private static boolean isPrimaryIdObjectId(MongoClient mongoClient, String dbName, String collName) { MongoDatabase database = mongoClient.getDatabase(dbName); @@ -126,12 +140,8 @@ private static List doSplitCollection(int adviceNumber, MongoClient mongo for (int i = 0; i < splitKeys.size(); i++) { Document splitKey = splitKeys.get(i); Object id = splitKey.get(KeyConstant.MONGO_PRIMARY_ID); - if (isObjectId) { - ObjectId oid = (ObjectId)id; - splitPoints.add(oid.toHexString()); - } else { - splitPoints.add(id); - } + // 安全转换:不再强转 (ObjectId),用 instanceof 判断 + splitPoints.add(idToSplitPoint(id)); } } else { int skipCount = chunkDocCount; @@ -140,12 +150,8 @@ private static List doSplitCollection(int adviceNumber, MongoClient mongo for (int i = 0; i < splitPointCount; i++) { Document doc = col.find().skip(skipCount).limit(chunkDocCount).first(); Object id = doc.get(KeyConstant.MONGO_PRIMARY_ID); - if (isObjectId) { - ObjectId oid = (ObjectId)id; - splitPoints.add(oid.toHexString()); - } else { - splitPoints.add(id); - } + // 安全转换:不再强转 (ObjectId),用 instanceof 判断 + splitPoints.add(idToSplitPoint(id)); skipCount += chunkDocCount; } }