mirror of
https://github.com/slhaf/Partner.git
synced 2026-05-12 16:53:04 +08:00
fix(BuiltinCommand): expire finished command sessions by ttl
This commit is contained in:
@@ -8,6 +8,7 @@ import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry;
|
|||||||
import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec;
|
import work.slhaf.partner.core.action.runner.policy.WrappedLaunchSpec;
|
||||||
|
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
|
import java.time.Duration;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
@@ -21,6 +22,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
|
|||||||
private static final int DEFAULT_READ_LIMIT = 4096;
|
private static final int DEFAULT_READ_LIMIT = 4096;
|
||||||
private static final int SUMMARY_MAX_LINES = 5;
|
private static final int SUMMARY_MAX_LINES = 5;
|
||||||
private static final int SUMMARY_MAX_LENGTH = 2048;
|
private static final int SUMMARY_MAX_LENGTH = 2048;
|
||||||
|
private static final Duration COMMAND_SESSION_TTL = Duration.ofMinutes(10);
|
||||||
|
|
||||||
private final Set<String> basicTags = Set.of("Builtin MetaAction", "System Command Tool");
|
private final Set<String> basicTags = Set.of("Builtin MetaAction", "System Command Tool");
|
||||||
|
|
||||||
@@ -93,6 +95,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
|
|||||||
JSONObject.of("executionId", "Command execution session id.")
|
JSONObject.of("executionId", "Command execution session id.")
|
||||||
);
|
);
|
||||||
Function<Map<String, Object>, String> invoker = params -> {
|
Function<Map<String, Object>, String> invoker = params -> {
|
||||||
|
cleanupExpiredHandles();
|
||||||
String desc = BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "desc");
|
String desc = BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "desc");
|
||||||
List<String> commands = requireCommandArguments(params);
|
List<String> commands = requireCommandArguments(params);
|
||||||
CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(wrapCommands(commands));
|
CommandExecutionService.CommandSession session = commandExecutionService.createSessionTask(wrapCommands(commands));
|
||||||
@@ -145,6 +148,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
Function<Map<String, Object>, String> invoker = params -> {
|
Function<Map<String, Object>, String> invoker = params -> {
|
||||||
|
cleanupExpiredHandles();
|
||||||
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
|
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
|
||||||
return JSONObject.of(
|
return JSONObject.of(
|
||||||
"executionId", handle.executionId,
|
"executionId", handle.executionId,
|
||||||
@@ -196,6 +200,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
Function<Map<String, Object>, String> invoker = params -> {
|
Function<Map<String, Object>, String> invoker = params -> {
|
||||||
|
cleanupExpiredHandles();
|
||||||
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
|
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
|
||||||
String stream = BuiltinActionRegistry.BuiltinActionDefinition.optionalString(params, "stream", "stdout");
|
String stream = BuiltinActionRegistry.BuiltinActionDefinition.optionalString(params, "stream", "stdout");
|
||||||
if (!"stdout".equals(stream) && !"stderr".equals(stream)) {
|
if (!"stdout".equals(stream) && !"stderr".equals(stream)) {
|
||||||
@@ -256,6 +261,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
Function<Map<String, Object>, String> invoker = params -> {
|
Function<Map<String, Object>, String> invoker = params -> {
|
||||||
|
cleanupExpiredHandles();
|
||||||
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
|
CommandHandle handle = requireHandle(BuiltinActionRegistry.BuiltinActionDefinition.requireString(params, "id"));
|
||||||
if (handle.process.isAlive()) {
|
if (handle.process.isAlive()) {
|
||||||
handle.process.destroy();
|
handle.process.destroy();
|
||||||
@@ -308,6 +314,7 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
Function<Map<String, Object>, String> invoker = params -> {
|
Function<Map<String, Object>, String> invoker = params -> {
|
||||||
|
cleanupExpiredHandles();
|
||||||
List<JSONObject> items = commandHandles.values().stream()
|
List<JSONObject> items = commandHandles.values().stream()
|
||||||
.sorted(Comparator.comparing(handle -> handle.startAt))
|
.sorted(Comparator.comparing(handle -> handle.startAt))
|
||||||
.map(handle -> JSONObject.of(
|
.map(handle -> JSONObject.of(
|
||||||
@@ -338,6 +345,16 @@ class BuiltinCommandActionProvider implements BuiltinActionProvider {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cleanupExpiredHandles() {
|
||||||
|
Instant now = Instant.now();
|
||||||
|
commandHandles.entrySet().removeIf(entry -> isExpired(entry.getValue(), now));
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isExpired(CommandHandle handle, Instant now) {
|
||||||
|
Instant exitTime = handle.exitAt;
|
||||||
|
return exitTime != null && !exitTime.plus(COMMAND_SESSION_TTL).isAfter(now);
|
||||||
|
}
|
||||||
|
|
||||||
private WrappedLaunchSpec wrapCommands(List<String> commands) {
|
private WrappedLaunchSpec wrapCommands(List<String> commands) {
|
||||||
return ExecutionPolicyRegistry.INSTANCE.prepare(commands);
|
return ExecutionPolicyRegistry.INSTANCE.prepare(commands);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,139 @@
|
|||||||
|
package work.slhaf.partner.module.action.builtin;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson2.JSONArray;
|
||||||
|
import com.alibaba.fastjson2.JSONObject;
|
||||||
|
import org.junit.jupiter.api.AfterAll;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import work.slhaf.partner.core.action.runner.policy.ExecutionPolicy;
|
||||||
|
import work.slhaf.partner.core.action.runner.policy.ExecutionPolicyRegistry;
|
||||||
|
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
class BuiltinCommandActionProviderTtlTest {
|
||||||
|
|
||||||
|
private static String originalUserHome;
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
static void prepareTestHome() throws IOException {
|
||||||
|
originalUserHome = System.getProperty("user.home");
|
||||||
|
Path tempHome = Files.createTempDirectory("partner-test-home");
|
||||||
|
System.setProperty("user.home", tempHome.toString());
|
||||||
|
ExecutionPolicyRegistry.INSTANCE.updatePolicy(new ExecutionPolicy(
|
||||||
|
ExecutionPolicy.Mode.DIRECT,
|
||||||
|
"direct",
|
||||||
|
ExecutionPolicy.Network.ENABLE,
|
||||||
|
true,
|
||||||
|
Map.of(),
|
||||||
|
null,
|
||||||
|
Set.of(),
|
||||||
|
Set.of()
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterAll
|
||||||
|
static void restoreUserHome() {
|
||||||
|
if (originalUserHome != null) {
|
||||||
|
System.setProperty("user.home", originalUserHome);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testOverviewRemovesExpiredFinishedSessions() throws Exception {
|
||||||
|
BuiltinCommandActionProvider provider = new BuiltinCommandActionProvider();
|
||||||
|
List<BuiltinActionRegistry.BuiltinActionDefinition> definitions = provider.provideBuiltinActions();
|
||||||
|
BuiltinActionRegistry.BuiltinActionDefinition start = requireDefinition(definitions, "builtin::command::start");
|
||||||
|
BuiltinActionRegistry.BuiltinActionDefinition overview = requireDefinition(definitions, "builtin::command::overview");
|
||||||
|
BuiltinActionRegistry.BuiltinActionDefinition inspect = requireDefinition(definitions, "builtin::command::inspect");
|
||||||
|
|
||||||
|
String startResult = start.invoker().apply(Map.of(
|
||||||
|
"desc", "ttl-session",
|
||||||
|
"arg", "sh",
|
||||||
|
"arg1", "-lc",
|
||||||
|
"arg2", "printf 'done'"
|
||||||
|
));
|
||||||
|
String executionId = JSONObject.parseObject(startResult).getString("executionId");
|
||||||
|
waitForInspectExit(inspect, executionId);
|
||||||
|
|
||||||
|
expireHandle(provider, executionId);
|
||||||
|
|
||||||
|
JSONObject overviewResult = JSONObject.parseObject(overview.invoker().apply(Map.of()));
|
||||||
|
JSONArray result = overviewResult.getJSONArray("result");
|
||||||
|
Assertions.assertTrue(result.stream().map(item -> (JSONObject) item)
|
||||||
|
.noneMatch(item -> executionId.equals(item.getString("executionId"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void testInspectRejectsExpiredFinishedSession() throws Exception {
|
||||||
|
BuiltinCommandActionProvider provider = new BuiltinCommandActionProvider();
|
||||||
|
List<BuiltinActionRegistry.BuiltinActionDefinition> definitions = provider.provideBuiltinActions();
|
||||||
|
BuiltinActionRegistry.BuiltinActionDefinition start = requireDefinition(definitions, "builtin::command::start");
|
||||||
|
BuiltinActionRegistry.BuiltinActionDefinition inspect = requireDefinition(definitions, "builtin::command::inspect");
|
||||||
|
|
||||||
|
String startResult = start.invoker().apply(Map.of(
|
||||||
|
"desc", "ttl-session-inspect",
|
||||||
|
"arg", "sh",
|
||||||
|
"arg1", "-lc",
|
||||||
|
"arg2", "printf 'done'"
|
||||||
|
));
|
||||||
|
String executionId = JSONObject.parseObject(startResult).getString("executionId");
|
||||||
|
waitForInspectExit(inspect, executionId);
|
||||||
|
|
||||||
|
expireHandle(provider, executionId);
|
||||||
|
|
||||||
|
Assertions.assertThrows(IllegalArgumentException.class, () -> inspect.invoker().apply(Map.of(
|
||||||
|
"id", executionId
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void expireHandle(BuiltinCommandActionProvider provider, String executionId) throws Exception {
|
||||||
|
Field handlesField = BuiltinCommandActionProvider.class.getDeclaredField("commandHandles");
|
||||||
|
handlesField.setAccessible(true);
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
ConcurrentHashMap<String, Object> handles = (ConcurrentHashMap<String, Object>) handlesField.get(provider);
|
||||||
|
Object handle = handles.get(executionId);
|
||||||
|
Assertions.assertNotNull(handle);
|
||||||
|
|
||||||
|
Field exitCodeField = handle.getClass().getDeclaredField("exitCode");
|
||||||
|
exitCodeField.setAccessible(true);
|
||||||
|
exitCodeField.set(handle, 0);
|
||||||
|
|
||||||
|
Field exitAtField = handle.getClass().getDeclaredField("exitAt");
|
||||||
|
exitAtField.setAccessible(true);
|
||||||
|
exitAtField.set(handle, Instant.now().minus(11, ChronoUnit.MINUTES));
|
||||||
|
}
|
||||||
|
|
||||||
|
private BuiltinActionRegistry.BuiltinActionDefinition requireDefinition(
|
||||||
|
List<BuiltinActionRegistry.BuiltinActionDefinition> definitions,
|
||||||
|
String actionKey
|
||||||
|
) {
|
||||||
|
return definitions.stream()
|
||||||
|
.filter(definition -> actionKey.equals(definition.actionKey()))
|
||||||
|
.findFirst()
|
||||||
|
.orElseThrow(() -> new AssertionError("definition not found: " + actionKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
private JSONObject waitForInspectExit(BuiltinActionRegistry.BuiltinActionDefinition inspectDefinition, String executionId) throws Exception {
|
||||||
|
long deadline = System.currentTimeMillis() + 3000;
|
||||||
|
while (System.currentTimeMillis() < deadline) {
|
||||||
|
JSONObject inspect = JSONObject.parseObject(inspectDefinition.invoker().apply(Map.of(
|
||||||
|
"id", executionId
|
||||||
|
)));
|
||||||
|
if (inspect.get("exitCode") != null) {
|
||||||
|
return inspect;
|
||||||
|
}
|
||||||
|
Thread.sleep(20);
|
||||||
|
}
|
||||||
|
throw new AssertionError("command session did not exit in time");
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user