diff --git a/conf/globalConfig/longJob.xml b/conf/globalConfig/longJob.xml index 9a89c741a37..bb3611190b2 100644 --- a/conf/globalConfig/longJob.xml +++ b/conf/globalConfig/longJob.xml @@ -7,4 +7,18 @@ 259200 java.lang.Long + + longJob + progress.notification.enabled + enable long job progress notification to SNS for UI + true + java.lang.Boolean + + + longJob + progress.notification.timeliness + batch delay in seconds for long job progress notification (0 = no delay) + 1 + java.lang.Integer + \ No newline at end of file diff --git a/core/src/main/java/org/zstack/core/progress/ProgressReportService.java b/core/src/main/java/org/zstack/core/progress/ProgressReportService.java index 0ec682d80eb..edb339fbb0e 100755 --- a/core/src/main/java/org/zstack/core/progress/ProgressReportService.java +++ b/core/src/main/java/org/zstack/core/progress/ProgressReportService.java @@ -6,6 +6,7 @@ import org.springframework.transaction.annotation.Transactional; import org.zstack.core.Platform; import org.zstack.core.cloudbus.CloudBus; +import org.zstack.core.componentloader.PluginRegistry; import org.zstack.core.cloudbus.MessageSafe; import org.zstack.core.config.GlobalConfig; import org.zstack.core.config.GlobalConfigUpdateExtensionPoint; @@ -376,7 +377,7 @@ public static void createSubTaskProgress(String fmt, Object... args) { vo.setManagementUuid(Platform.getManagementServerId()); vo.setTaskName(ThreadContext.get(Constants.THREAD_CONTEXT_TASK_NAME)); - Platform.getComponentLoader().getComponent(DatabaseFacade.class).persist(vo); + persistProgress(vo, TaskType.Task); // use content as the subtask name ThreadContext.put(Constants.THREAD_CONTEXT_TASK_NAME, vo.getContent()); @@ -411,6 +412,14 @@ private static void persistProgress(TaskProgressVO vo, TaskType type) { } Platform.getComponentLoader().getComponent(DatabaseFacade.class).persist(vo); + + for (ProgressUpdateExtensionPoint ext : Platform.getComponentLoader().getComponent(PluginRegistry.class).getExtensionList(ProgressUpdateExtensionPoint.class)) { + try { + ext.afterProgressPersisted(vo); + } catch (Throwable t) { + logger.warn("ProgressUpdateExtensionPoint.afterProgressPersisted failed", t); + } + } } private static void taskProgress(TaskType type, String fmt, Object... args) { diff --git a/header/src/main/java/org/zstack/header/core/progress/ProgressUpdateExtensionPoint.java b/header/src/main/java/org/zstack/header/core/progress/ProgressUpdateExtensionPoint.java new file mode 100644 index 00000000000..dc9b9121af2 --- /dev/null +++ b/header/src/main/java/org/zstack/header/core/progress/ProgressUpdateExtensionPoint.java @@ -0,0 +1,9 @@ +package org.zstack.header.core.progress; + +/** + * Extension point invoked after a task progress record is persisted. + * Used to trigger downstream notifications (e.g. Long Job progress to SNS) without coupling to progress storage. + */ +public interface ProgressUpdateExtensionPoint { + void afterProgressPersisted(TaskProgressVO vo); +} diff --git a/header/src/main/java/org/zstack/header/longjob/LongJobProgressNotificationMessage.java b/header/src/main/java/org/zstack/header/longjob/LongJobProgressNotificationMessage.java new file mode 100644 index 00000000000..802f3b2b3fa --- /dev/null +++ b/header/src/main/java/org/zstack/header/longjob/LongJobProgressNotificationMessage.java @@ -0,0 +1,105 @@ +package org.zstack.header.longjob; + +import org.zstack.header.core.progress.TaskProgressInventory; +import org.zstack.header.core.progress.TaskProgressVO; + +import java.io.Serializable; + +/** + * Notification message for Long Job state change and progress update. + * Reuses LongJobInventory and TaskProgressInventory for UI consistency. + * + *

Webhook/BFF contract: the payload should include a 0-100 progress number. + * BFF uses {@link #getProgress()} (Integer) directly; full taskProgress is optional for future use. + */ +public class LongJobProgressNotificationMessage implements Serializable { + public enum EventType { + STATE_CHANGED, + PROGRESS_UPDATED + } + + private LongJobInventory longJob; + /** Latest progress 0-100, derived from type=Progress latest record content. For BFF webhook. */ + private Integer progress; + /** Full progress detail; optional, BFF current version only needs {@link #getProgress()}. */ + private TaskProgressInventory taskProgress; + private EventType eventType; + private Long timestamp; + + public LongJobInventory getLongJob() { + return longJob; + } + + public void setLongJob(LongJobInventory longJob) { + this.longJob = longJob; + } + + /** Progress 0-100 for BFF webhook; from latest type=Progress content. */ + public Integer getProgress() { + return progress; + } + + public void setProgress(Integer progress) { + this.progress = progress; + } + + /** Full task progress inventory; optional for BFF. */ + public TaskProgressInventory getTaskProgress() { + return taskProgress; + } + + public void setTaskProgress(TaskProgressInventory taskProgress) { + this.taskProgress = taskProgress; + } + + public EventType getEventType() { + return eventType; + } + + public void setEventType(EventType eventType) { + this.eventType = eventType; + } + + public Long getTimestamp() { + return timestamp; + } + + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + + public static LongJobProgressNotificationMessage stateChanged(LongJobVO vo) { + LongJobProgressNotificationMessage msg = new LongJobProgressNotificationMessage(); + msg.longJob = LongJobInventory.valueOf(vo); + msg.eventType = EventType.STATE_CHANGED; + msg.timestamp = System.currentTimeMillis(); + return msg; + } + + public static LongJobProgressNotificationMessage progressUpdated(LongJobVO vo, TaskProgressVO progressVO) { + LongJobProgressNotificationMessage msg = new LongJobProgressNotificationMessage(); + msg.longJob = LongJobInventory.valueOf(vo); + int percent = parseProgressContent(progressVO.getContent()); + msg.progress = percent; + TaskProgressInventory inv = new TaskProgressInventory(progressVO); + if (progressVO.getContent() != null) { + inv.setContent(progressVO.getContent()); + } + msg.taskProgress = inv; + msg.eventType = EventType.PROGRESS_UPDATED; + msg.timestamp = System.currentTimeMillis(); + return msg; + } + + private static int parseProgressContent(String content) { + if (content == null || content.isEmpty()) { + return 0; + } + try { + int v = Integer.parseInt(content.trim()); + return Math.min(100, Math.max(0, v)); + } catch (NumberFormatException e) { + return 0; + } + } +} diff --git a/longjob/src/main/java/org/zstack/longjob/LongJobGlobalConfig.java b/longjob/src/main/java/org/zstack/longjob/LongJobGlobalConfig.java index 5d8d1af175e..58635bf08ca 100644 --- a/longjob/src/main/java/org/zstack/longjob/LongJobGlobalConfig.java +++ b/longjob/src/main/java/org/zstack/longjob/LongJobGlobalConfig.java @@ -13,4 +13,7 @@ public class LongJobGlobalConfig { @GlobalConfigValidation public static GlobalConfig LONG_JOB_DEFAULT_TIMEOUT = new GlobalConfig(CATEGORY, "longJob.api.timeout"); + + public static GlobalConfig LONG_JOB_PROGRESS_NOTIFICATION_ENABLED = new GlobalConfig(CATEGORY, "progress.notification.enabled"); + public static GlobalConfig LONG_JOB_NOTIFICATION_TIMELINESS = new GlobalConfig(CATEGORY, "progress.notification.timeliness"); } diff --git a/longjob/src/main/java/org/zstack/longjob/sns/LongJobTaskTracker.java b/longjob/src/main/java/org/zstack/longjob/sns/LongJobTaskTracker.java new file mode 100644 index 00000000000..c6155ea4faa --- /dev/null +++ b/longjob/src/main/java/org/zstack/longjob/sns/LongJobTaskTracker.java @@ -0,0 +1,36 @@ +package org.zstack.longjob.sns; + +import org.zstack.core.progress.TaskTracker; +import org.zstack.header.longjob.LongJobVO; + +/** + * Task tracker for Long Job state change and progress update notifications. + * Used by LongJobProgressNotification to receive tasks and publish to SNS. + */ +public class LongJobTaskTracker extends TaskTracker { + public static final String TASK_NAME = "longjob-progress"; + + public static final String PARAM_STATE = "state"; + public static final String PARAM_PROGRESS = "progress"; + public static final String PARAM_LONG_JOB_UUID = "longJobUuid"; + public static final String PARAM_JOB_NAME = "jobName"; + + public enum EventType { + STATE_CHANGED, + PROGRESS_UPDATED + } + + public LongJobTaskTracker(String resourceUuid) { + super(resourceUuid); + } + + @Override + protected String getResourceType() { + return LongJobVO.class.getSimpleName(); + } + + @Override + protected String getTaskName() { + return TASK_NAME; + } +}