Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/computer-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ jobs:
TRAVIS_DIR: computer-dist/src/assembly/travis
BSP_ETCD_URL: http://localhost:2579
KUBERNETES_VERSION: 1.20.1
# TODO: adapt the HugeGraph Server/Loader version to 1.5.0 (EdgeID has 5 parts now)
# NOTE: Remember to adaptor/update the version before new release
GRAPH_ENV_VERSION: 1.3.0
GRAPH_ENV_VERSION: 1.7.0

steps:
- name: Checkout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.hugegraph.computer.core.output.hg.exceptions.WriteBackException;
import org.apache.hugegraph.computer.core.output.hg.metrics.LoadSummary;
import org.apache.hugegraph.computer.core.output.hg.metrics.Printer;
import org.apache.hugegraph.computer.core.util.HugeClientUtil;
import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.driver.HugeClientBuilder;
import org.apache.hugegraph.structure.graph.Vertex;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
Expand Down Expand Up @@ -58,7 +58,8 @@ public TaskManager(Config config) {
String graph = config.get(ComputerOptions.HUGEGRAPH_GRAPH_NAME);
String username = config.get(ComputerOptions.HUGEGRAPH_USERNAME);
String password = config.get(ComputerOptions.HUGEGRAPH_PASSWORD);
this.client = new HugeClientBuilder(url, graph).configUser(username, password).build();
this.client = HugeClientUtil.newHugeClient(url, graph, username,
password);
// Try to make all batch threads running and don't wait for producer
this.batchSemaphore = new Semaphore(this.batchSemaphoreNum());
/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.computer.core.util;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.driver.HugeClientBuilder;
import org.apache.hugegraph.rest.RestResult;
import org.apache.hugegraph.structure.schema.EdgeLabel;
import org.apache.hugegraph.util.JsonUtil;

import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.DeserializationConfig;
import com.fasterxml.jackson.databind.deser.BeanDeserializerBuilder;
import com.fasterxml.jackson.databind.deser.BeanDeserializerModifier;
import com.fasterxml.jackson.databind.module.SimpleModule;

public final class HugeClientUtil {

private static final AtomicBoolean COMPATIBILITY_REGISTERED =
new AtomicBoolean(false);
private static final String EDGE_LABEL_TYPE = "edgelabel_type";
private static final String EDGE_LABEL_PARENT_LABEL = "parent_label";
private static final String EDGE_LABEL_LINKS = "links";

public static HugeClient newHugeClient(String url, String graph,
String username, String password) {
registerCompatibilityModule();
return new HugeClientBuilder(url, graph).configUser(username, password)
.build();
}

public static HugeClient newHugeClient(String url, String graph,
String username, String password,
int timeout) {
registerCompatibilityModule();
return new HugeClientBuilder(url, graph).configUser(username, password)
.configTimeout(timeout)
.build();
}

public static void registerCompatibilityModule() {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Important: please make the completion flag reflect completed registration, not registration start.

COMPATIBILITY_REGISTERED.compareAndSet(false, true) flips the guard before the compatibility module is registered into both global mappers. A concurrent caller entering during that window will return immediately and can create/use a client before RestResult and JsonUtil are both configured.

Please make the registration completion the synchronization boundary, for example:

Suggested change
public static void registerCompatibilityModule() {
public static void registerCompatibilityModule() {
if (COMPATIBILITY_REGISTERED.get()) {
return;
}
synchronized (HugeClientUtil.class) {
if (COMPATIBILITY_REGISTERED.get()) {
return;
}
RestResult.registerModule(newCompatibilityModule());
JsonUtil.registerModule(newCompatibilityModule());
COMPATIBILITY_REGISTERED.set(true);
}
}

if (!COMPATIBILITY_REGISTERED.compareAndSet(false, true)) {
return;
}
RestResult.registerModule(newCompatibilityModule());
JsonUtil.registerModule(newCompatibilityModule());
}

private static SimpleModule newCompatibilityModule() {
SimpleModule module = new SimpleModule(
"hugegraph-computer-client-compatibility");
module.setDeserializerModifier(new BeanDeserializerModifier() {

@Override
public BeanDeserializerBuilder updateBuilder(
DeserializationConfig config, BeanDescription beanDesc,
BeanDeserializerBuilder builder) {
if (EdgeLabel.class.equals(beanDesc.getBeanClass())) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Please avoid making current java-client fields globally ignorable.

This compatibility module ignores edgelabel_type, parent_label, and links for every EdgeLabel deserialization. That works for the pinned 1.3 client because those fields are unknown there, but in current java-client 1.7 they are real fields: sourceLabel() / targetLabel() derive from links.

A more version-tolerant approach would ignore unknown fields rather than explicitly dropping known current fields, for example with an @JsonIgnoreProperties(ignoreUnknown = true) mix-in, or by conditionally adding ignorables only when the runtime EdgeLabel class does not expose those fields. The tests should prove both sides: old client does not fail on new server fields, and current client semantics do not lose links.

addEdgeLabelIgnorable(builder, EDGE_LABEL_TYPE,
"edgeLabelType");
addEdgeLabelIgnorable(builder, EDGE_LABEL_PARENT_LABEL,
"parentLabel");
addEdgeLabelIgnorable(builder, EDGE_LABEL_LINKS, "links");
}
return builder;
}
});
return module;
}

private static void addEdgeLabelIgnorable(BeanDeserializerBuilder builder,
String jsonProperty,
String methodName) {
if (shouldIgnoreEdgeLabelProperty(EdgeLabel.class, methodName)) {
builder.addIgnorable(jsonProperty);
}
}

static boolean shouldIgnoreEdgeLabelProperty(Class<?> edgeLabelClass,
String methodName) {
try {
edgeLabelClass.getMethod(methodName);
return false;
} catch (NoSuchMethodException ignored) {
return true;
}
}

private HugeClientUtil() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@
import org.apache.hugegraph.computer.core.graph.value.NullValue;
import org.apache.hugegraph.computer.core.graph.value.StringValue;
import org.apache.hugegraph.computer.core.graph.value.Value;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.SplicingIdGenerator;

public final class HugeConverter {

private static final int LEGACY_EDGE_ID_PARTS = 4;
private static final int DIRECTIONAL_EDGE_ID_PARTS = 6;

private static final GraphFactory GRAPH_FACTORY =
ComputerContext.instance().graphFactory();

Expand Down Expand Up @@ -96,4 +101,19 @@ public static Properties convertProperties(
}
return properties;
}

public static String convertEdgeName(Edge edge) {
E.checkArgumentNotNull(edge, "The edge can't be null");
String edgeId = edge.id();
if (edgeId == null) {
return edge.name();
}

String[] parts = SplicingIdGenerator.split(edgeId);
if (parts.length >= LEGACY_EDGE_ID_PARTS &&
parts.length <= DIRECTIONAL_EDGE_ID_PARTS) {
return parts[parts.length - 2];
}
return edge.name();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.hugegraph.computer.core.input.InputSplit;
import org.apache.hugegraph.computer.core.input.VertexFetcher;
import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
import org.apache.hugegraph.computer.core.util.HugeClientUtil;
import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.driver.HugeClientBuilder;

public class HugeGraphFetcher implements GraphFetcher {

Expand All @@ -39,7 +39,8 @@ public HugeGraphFetcher(Config config, InputSplitRpcService rpcService) {
String graph = config.get(ComputerOptions.HUGEGRAPH_GRAPH_NAME);
String username = config.get(ComputerOptions.HUGEGRAPH_USERNAME);
String password = config.get(ComputerOptions.HUGEGRAPH_PASSWORD);
this.client = new HugeClientBuilder(url, graph).configUser(username, password).build();
this.client = HugeClientUtil.newHugeClient(url, graph, username,
password);
this.vertexFetcher = new HugeVertexFetcher(config, this.client);
this.edgeFetcher = new HugeEdgeFetcher(config, this.client);
this.rpcService = rpcService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.input.InputSplit;
import org.apache.hugegraph.computer.core.input.InputSplitFetcher;
import org.apache.hugegraph.computer.core.util.HugeClientUtil;
import org.apache.hugegraph.driver.HugeClient;
import org.apache.hugegraph.driver.HugeClientBuilder;
import org.apache.hugegraph.structure.graph.Shard;
import org.apache.hugegraph.util.E;

Expand All @@ -41,9 +41,8 @@ public HugeInputSplitFetcher(Config config) {
String username = config.get(ComputerOptions.HUGEGRAPH_USERNAME);
String password = config.get(ComputerOptions.HUGEGRAPH_PASSWORD);
int timeout = config.get(ComputerOptions.INPUT_SPLIT_FETCH_TIMEOUT);
this.client = new HugeClientBuilder(url, graph).configUser(username, password)
.configTimeout(timeout)
.build();
this.client = HugeClientUtil.newHugeClient(url, graph, username,
password, timeout);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ private Edge convert(org.apache.hugegraph.structure.graph.Edge edge) {
Properties properties = HugeConverter.convertProperties(
edge.properties());
Edge computerEdge = graphFactory.createEdge(edge.label(),
edge.name(), targetId
HugeConverter.convertEdgeName(edge),
targetId
);
computerEdge.label(edge.label());
computerEdge.properties(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import org.apache.hugegraph.computer.core.graph.value.StringValue;
import org.apache.hugegraph.computer.core.graph.value.ValueType;
import org.apache.hugegraph.computer.suite.unit.UnitTestBase;
import org.apache.hugegraph.structure.graph.Edge;
import org.apache.hugegraph.testutil.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import com.google.common.collect.ImmutableList;

Expand Down Expand Up @@ -126,4 +128,74 @@ public void testConvertProperties() {
Assert.assertEquals(properties,
HugeConverter.convertProperties(rawProperties));
}

@Test
public void testConvertEdgeNameWithLegacyFourPartEdgeId() {
Edge edge = Mockito.mock(Edge.class);
Mockito.when(edge.id()).thenReturn(
"S1:178201>5>参数标准!3BA0>S4:239464");
Mockito.when(edge.name()).thenReturn("stale_client_name");

Assert.assertEquals("参数标准!3BA0",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithFivePartEdgeId() {
Edge edge = new Edge("belong_to_el_defect");
edge.id("S1:178201>5>5>参数标准!3BA0>S4:239464");

Assert.assertEquals("参数标准!3BA0",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithSixPartEdgeId() {
Edge edge = new Edge("belong_to_el_defect");
edge.id("S1:178201>O>5>5>参数标准!3BA0>S4:239464");

Assert.assertEquals("参数标准!3BA0",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithSixPartInEdgeId() {
Edge edge = new Edge("belong_to_el_defect");
edge.id("S4:239464>I>5>5>参数标准!3BA0>S1:178201");

Assert.assertEquals("参数标准!3BA0",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithNullEdgeId() {
Edge edge = Mockito.mock(Edge.class);
Mockito.when(edge.id()).thenReturn(null);
Mockito.when(edge.name()).thenReturn("fallback_name");

Assert.assertEquals("fallback_name",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithUnknownEdgeIdFormat() {
Edge edge = Mockito.mock(Edge.class);
Mockito.when(edge.id()).thenReturn(
"S1:178201>bad>edge");
Mockito.when(edge.name()).thenReturn("fallback_name");

Assert.assertEquals("fallback_name",
HugeConverter.convertEdgeName(edge));

Mockito.when(edge.id()).thenReturn(
"S1:178201>O>5>5>参数标准!3BA0>S4:239464>extra");
Assert.assertEquals("fallback_name",
HugeConverter.convertEdgeName(edge));
}

@Test
public void testConvertEdgeNameWithNullEdge() {
Assert.assertThrows(IllegalArgumentException.class,
() -> HugeConverter.convertEdgeName(null));
}
}
Loading