Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,13 @@ public Map<String, Object> list(@Context GraphManager manager,
limit = NO_LIMIT;
List<Id> idList = ids.stream().map(IdGenerator::of)
.collect(Collectors.toList());
iter = scheduler.tasks(idList);
iter = scheduler.tasks(idList, false);
} else {
if (status == null) {
iter = scheduler.tasks(null, limit, page);
iter = scheduler.tasks(null, limit, page, false);
} else {
iter = scheduler.tasks(parseStatus(status), limit, page);
iter = scheduler.tasks(parseStatus(status), limit, page,
false);
}
}

Expand Down Expand Up @@ -136,12 +137,17 @@ public Map<String, Object> get(@Context GraphManager manager,
@Parameter(description = "The graph name")
@PathParam("graph") String graph,
@Parameter(description = "The task id")
@PathParam("id") long id) {
@PathParam("id") long id,
@Parameter(description = "Whether to load task result")
@DefaultValue("true")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this introduces a new public query parameter, please update the PR checklist/docs status accordingly, or briefly explain why the generated OpenAPI annotation is sufficient and no user-facing docs are needed.

@QueryParam("with_result")
boolean withResult) {
LOG.debug("Graph [{}] get task: {}", graph, id);

TaskScheduler scheduler = graph(manager, graphSpace, graph)
.taskScheduler();
return scheduler.task(IdGenerator.of(id)).asMap();
return scheduler.task(IdGenerator.of(id), withResult)
.asMap(true, withResult);
}

@DELETE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,8 +1326,14 @@ public <V> void save(HugeTask<V> task) {

@Override
public <V> HugeTask<V> task(Id id) {
return this.task(id, true);
}

@Override
public <V> HugeTask<V> task(Id id, boolean withResult) {
return verifyTaskPermission(HugePermission.READ,
this.taskScheduler.task(id));
this.taskScheduler.task(id,
withResult));
}

@Override
Expand All @@ -1336,18 +1342,36 @@ public <V> Iterator<HugeTask<V>> tasks(List<Id> ids) {
this.taskScheduler.tasks(ids));
}

@Override
public <V> Iterator<HugeTask<V>> tasks(List<Id> ids,
boolean withResult) {
return verifyTaskPermission(HugePermission.READ,
this.taskScheduler.tasks(ids,
withResult));
}

@Override
public <V> Iterator<HugeTask<V>> tasks(TaskStatus status,
long limit, String page) {
Iterator<HugeTask<V>> tasks = this.taskScheduler.tasks(status,
limit, page);
limit,
page);
return verifyTaskPermission(HugePermission.READ, tasks);
}

@Override
public <V> Iterator<HugeTask<V>> tasks(TaskStatus status,
long limit, String page,
boolean withResult) {
Iterator<HugeTask<V>> tasks = this.taskScheduler.tasks(
status, limit, page, withResult);
return verifyTaskPermission(HugePermission.READ, tasks);
}

@Override
public <V> HugeTask<V> delete(Id id, boolean force) {
verifyTaskPermission(HugePermission.DELETE,
this.taskScheduler.task(id));
this.taskScheduler.task(id, false));
return this.taskScheduler.delete(id, force);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,14 @@ protected <V> HugeTask<V> deleteFromDB(Id id) {
Iterator<Vertex> vertices = this.tx().queryTaskInfos(id);
HugeVertex vertex = (HugeVertex) QueryResults.one(vertices);
if (vertex == null) {
this.deleteTaskResultFromTx(id);
return null;
}
HugeTask<V> result = HugeTask.fromVertex(vertex);
this.tx().removeVertex(vertex);
HugeTask<V> result = HugeTask.fromVertex(vertex, false);
// Keep the task vertex as a retryable tombstone until its result
// vertex is removed; cronSchedule() can rediscover DELETING tasks.
this.deleteTaskResultFromTx(id);
this.tx().removeTaskVertex(vertex);
return result;
});
}
Expand All @@ -322,6 +326,12 @@ public <V> HugeTask<V> delete(Id id, boolean force) {
this.updateStatus(id, null, TaskStatus.DELETING);
return null;
} else {
HugeTask<?> task = this.taskWithoutResult(id);
if (task != null && task.status() != TaskStatus.DELETING) {
initTaskParams(task);
task.overwriteStatus(TaskStatus.DELETING);
this.save(task);
}
return this.deleteFromDB(id);
}
}
Expand Down Expand Up @@ -587,7 +597,7 @@ private void unlockTask(String taskId, LockResult lockResult) {
}
}

private boolean isLockedTask(String taskId) {
protected boolean isLockedTask(String taskId) {
return MetaManager.instance().isLockedTask(graphSpace,
graphName, taskId);
}
Expand Down Expand Up @@ -629,7 +639,7 @@ public void run() {
// 1. start task can be from schedule() & cronSchedule()
// 2. recheck the status of task, in case one same task
// called by both methods at same time;
HugeTask<Object> queryTask = task(this.task.id());
HugeTask<Object> queryTask = task(this.task.id(), false);
if (queryTask != null &&
!TaskStatus.NEW.equals(queryTask.status())) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.job.GremlinJob;
import org.apache.hugegraph.job.schema.SchemaJob;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.type.define.SerialEnum;
import org.apache.hugegraph.util.Blob;
import org.apache.hugegraph.util.E;
Expand Down Expand Up @@ -653,6 +654,11 @@ public Map<String, Object> asMap() {
}

public synchronized Map<String, Object> asMap(boolean withDetails) {
return this.asMap(withDetails, true);
}

public synchronized Map<String, Object> asMap(boolean withDetails,
boolean withResult) {
E.checkState(this.type != null, "Task type can't be null");
E.checkState(this.name != null, "Task name can't be null");

Expand Down Expand Up @@ -689,15 +695,45 @@ public synchronized Map<String, Object> asMap(boolean withDetails) {
if (this.input != null) {
map.put(Hidden.unHide(P.INPUT), this.input);
}
if (this.result != null) {
if (withResult && this.result != null) {
map.put(Hidden.unHide(P.RESULT), this.result);
}
}

return map;
}

synchronized HugeTask<V> copyWithoutResult() {
HugeTask<V> task = new HugeTask<>(this.id, this.parent, this.callable);
task.type = this.type;
task.name = this.name;
task.dependencies = this.dependencies == null ?
null : InsertionOrderUtil.newSet(this.dependencies);
task.description = this.description;
task.context = this.context;
task.create = this.create;
task.server = this.server;
task.load = this.load;
task.status = this.status;
task.progress = this.progress;
task.update = this.update;
task.retries = this.retries;
task.input = this.input;
task.result = null;
task.scheduler = this.scheduler;
return task;
}

public static <V> HugeTask<V> fromVertex(Vertex vertex) {
return fromVertex(vertex, true);
}

public static <V> HugeTask<V> fromVertex(Vertex vertex,
boolean withResult) {
if (!withResult && vertex instanceof HugeVertex) {
return fromHugeVertex((HugeVertex) vertex);
}

String callableName = vertex.value(P.CALLABLE);
TaskCallable<V> callable;
try {
Expand All @@ -710,11 +746,37 @@ public static <V> HugeTask<V> fromVertex(Vertex vertex) {
for (Iterator<VertexProperty<Object>> iter = vertex.properties();
iter.hasNext(); ) {
VertexProperty<Object> prop = iter.next();
if (!withResult && P.RESULT.equals(prop.key())) {
continue;
}
task.property(prop.key(), prop.value());
}
return task;
}

private static <V> HugeTask<V> fromHugeVertex(HugeVertex vertex) {
String callableName = getPropertyValue(vertex, P.CALLABLE);
TaskCallable<V> callable;
try {
callable = TaskCallable.fromClass(callableName);
} catch (Exception e) {
callable = TaskCallable.empty(e);
}

HugeTask<V> task = new HugeTask<>(vertex.id(), null, callable);
for (String property : P.METADATA_PROPERTIES) {
Object value = getPropertyValue(vertex, property);
if (value != null) {
task.property(property, value);
}
}
return task;
}

private static <V> V getPropertyValue(HugeVertex vertex, String property) {
return vertex.getPropertyValue(vertex.graph().propertyKey(property).id());
}

private static <V> Collector<V, ?, Set<V>> toOrderSet() {
return Collectors.toCollection(InsertionOrderUtil::newSet);
}
Expand Down Expand Up @@ -792,6 +854,11 @@ public static final class P {
public static final String DEPENDENCIES = "~task_dependencies";
public static final String SERVER = "~task_server";

private static final String[] METADATA_PROPERTIES = new String[]{
TYPE, NAME, CALLABLE, DESCRIPTION, CONTEXT, STATUS, PROGRESS,
CREATE, UPDATE, RETRIES, DEPENDENCIES, INPUT, SERVER
};

//public static final String PARENT = hide("parent");
//public static final String CHILDREN = hide("children");

Expand Down
Loading
Loading