Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ public Map<String, Object> list(@Context GraphManager manager,
limit = NO_LIMIT;
List<Id> idList = ids.stream().map(IdGenerator::of)
.collect(Collectors.toList());
iter = scheduler.tasks(idList);
iter = scheduler.tasks(idList, false);
} else {
if (status == null) {
iter = scheduler.tasks(null, limit, page);
iter = scheduler.tasks(null, limit, page, false);
} else {
iter = scheduler.tasks(parseStatus(status), limit, page);
iter = scheduler.tasks(parseStatus(status), limit, page,
false);
}
}

Expand Down Expand Up @@ -136,12 +137,17 @@ public Map<String, Object> get(@Context GraphManager manager,
@Parameter(description = "The graph name")
@PathParam("graph") String graph,
@Parameter(description = "The task id")
@PathParam("id") long id) {
@PathParam("id") long id,
@Parameter(description = "Whether to load task result")
@DefaultValue("true")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this introduces a new public query parameter, please update the PR checklist/docs status accordingly, or briefly explain why the generated OpenAPI annotation is sufficient and no user-facing docs are needed.

@QueryParam("with_result")
boolean withResult) {
LOG.debug("Graph [{}] get task: {}", graph, id);

TaskScheduler scheduler = graph(manager, graphSpace, graph)
.taskScheduler();
return scheduler.task(IdGenerator.of(id)).asMap();
return scheduler.task(IdGenerator.of(id), withResult)
.asMap(true, withResult);
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,8 +1326,14 @@ public <V> void save(HugeTask<V> task) {

@Override
public <V> HugeTask<V> task(Id id) {
return this.task(id, true);
}

@Override
public <V> HugeTask<V> task(Id id, boolean withResult) {
return verifyTaskPermission(HugePermission.READ,
this.taskScheduler.task(id));
this.taskScheduler.task(id,
withResult));
}

@Override
Expand All @@ -1336,18 +1342,36 @@ public <V> Iterator<HugeTask<V>> tasks(List<Id> ids) {
this.taskScheduler.tasks(ids));
}

@Override
public <V> Iterator<HugeTask<V>> tasks(List<Id> ids,
boolean withResult) {
return verifyTaskPermission(HugePermission.READ,
this.taskScheduler.tasks(ids,
withResult));
}

@Override
public <V> Iterator<HugeTask<V>> tasks(TaskStatus status,
long limit, String page) {
Iterator<HugeTask<V>> tasks = this.taskScheduler.tasks(status,
limit, page);
limit,
page);
return verifyTaskPermission(HugePermission.READ, tasks);
}

@Override
public <V> Iterator<HugeTask<V>> tasks(TaskStatus status,
long limit, String page,
boolean withResult) {
Iterator<HugeTask<V>> tasks = this.taskScheduler.tasks(
status, limit, page, withResult);
return verifyTaskPermission(HugePermission.READ, tasks);
}

@Override
public <V> HugeTask<V> delete(Id id, boolean force) {
verifyTaskPermission(HugePermission.DELETE,
this.taskScheduler.task(id));
this.taskScheduler.task(id, false));
return this.taskScheduler.delete(id, force);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,9 @@ protected <V> HugeTask<V> deleteFromDB(Id id) {
if (vertex == null) {
return null;
}
HugeTask<V> result = HugeTask.fromVertex(vertex);
this.tx().removeVertex(vertex);
HugeTask<V> result = HugeTask.fromVertex(vertex, false);
this.deleteTaskResultFromTx(id);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make the cleanup expectation explicit here. This path now deletes ~taskresult before deleting ~task, but TaskTransaction/TaskAndResultTransaction run with auto-commit and removeTaskVertex() commits in afterWrite(). If the second delete fails after this call succeeds, we can leave a live task whose result has already been removed. Could you either make the two removals failure-safe/atomic, or add a targeted test/explanation showing why this intermediate state is acceptable/recoverable?

this.tx().removeTaskVertex(vertex);
return result;
});
}
Expand Down Expand Up @@ -629,7 +630,7 @@ public void run() {
// 1. start task can be from schedule() & cronSchedule()
// 2. recheck the status of task, in case one same task
// called by both methods at same time;
HugeTask<Object> queryTask = task(this.task.id());
HugeTask<Object> queryTask = task(this.task.id(), false);
if (queryTask != null &&
!TaskStatus.NEW.equals(queryTask.status())) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.job.GremlinJob;
import org.apache.hugegraph.job.schema.SchemaJob;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.type.define.SerialEnum;
import org.apache.hugegraph.util.Blob;
import org.apache.hugegraph.util.E;
Expand Down Expand Up @@ -653,6 +654,11 @@ public Map<String, Object> asMap() {
}

public synchronized Map<String, Object> asMap(boolean withDetails) {
return this.asMap(withDetails, true);
}

public synchronized Map<String, Object> asMap(boolean withDetails,
boolean withResult) {
E.checkState(this.type != null, "Task type can't be null");
E.checkState(this.name != null, "Task name can't be null");

Expand Down Expand Up @@ -689,15 +695,45 @@ public synchronized Map<String, Object> asMap(boolean withDetails) {
if (this.input != null) {
map.put(Hidden.unHide(P.INPUT), this.input);
}
if (this.result != null) {
if (withResult && this.result != null) {
map.put(Hidden.unHide(P.RESULT), this.result);
}
}

return map;
}

synchronized HugeTask<V> copyWithoutResult() {
HugeTask<V> task = new HugeTask<>(this.id, this.parent, this.callable);
task.type = this.type;
task.name = this.name;
task.dependencies = this.dependencies == null ?
null : InsertionOrderUtil.newSet(this.dependencies);
task.description = this.description;
task.context = this.context;
task.create = this.create;
task.server = this.server;
task.load = this.load;
task.status = this.status;
task.progress = this.progress;
task.update = this.update;
task.retries = this.retries;
task.input = this.input;
task.result = null;
task.scheduler = this.scheduler;
return task;
}

public static <V> HugeTask<V> fromVertex(Vertex vertex) {
return fromVertex(vertex, true);
}

public static <V> HugeTask<V> fromVertex(Vertex vertex,
boolean withResult) {
if (!withResult && vertex instanceof HugeVertex) {
return fromHugeVertex((HugeVertex) vertex);
}

String callableName = vertex.value(P.CALLABLE);
TaskCallable<V> callable;
try {
Expand All @@ -710,11 +746,37 @@ public static <V> HugeTask<V> fromVertex(Vertex vertex) {
for (Iterator<VertexProperty<Object>> iter = vertex.properties();
iter.hasNext(); ) {
VertexProperty<Object> prop = iter.next();
if (!withResult && P.RESULT.equals(prop.key())) {
continue;
}
task.property(prop.key(), prop.value());
}
return task;
}

private static <V> HugeTask<V> fromHugeVertex(HugeVertex vertex) {
String callableName = getPropertyValue(vertex, P.CALLABLE);
TaskCallable<V> callable;
try {
callable = TaskCallable.fromClass(callableName);
} catch (Exception e) {
callable = TaskCallable.empty(e);
}

HugeTask<V> task = new HugeTask<>(vertex.id(), null, callable);
for (String property : P.METADATA_PROPERTIES) {
Object value = getPropertyValue(vertex, property);
if (value != null) {
task.property(property, value);
}
}
return task;
}

private static <V> V getPropertyValue(HugeVertex vertex, String property) {
return vertex.getPropertyValue(vertex.graph().propertyKey(property).id());
}

private static <V> Collector<V, ?, Set<V>> toOrderSet() {
return Collectors.toCollection(InsertionOrderUtil::newSet);
}
Expand Down Expand Up @@ -792,6 +854,11 @@ public static final class P {
public static final String DEPENDENCIES = "~task_dependencies";
public static final String SERVER = "~task_server";

private static final String[] METADATA_PROPERTIES = new String[]{
TYPE, NAME, CALLABLE, DESCRIPTION, CONTEXT, STATUS, PROGRESS,
CREATE, UPDATE, RETRIES, DEPENDENCIES, INPUT, SERVER
};

//public static final String PARENT = hide("parent");
//public static final String CHILDREN = hide("children");

Expand Down
Loading
Loading