diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java index 743cbe6156..ee6c296bcf 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkSavepointServiceImpl.java @@ -140,24 +140,38 @@ public String getSavePointPath(FlinkApplication appParam) throws Exception { // 1) properties have the highest priority, read the properties are set: -Dstate.savepoints.dir String savepointPath = getSavepointFromDynamicProps(application.getDynamicProperties()); - if (StringUtils.isNotBlank(savepointPath)) { - return savepointPath; - } // Application conf configuration has the second priority. If it is a streampark|flinksql type // task, see if Application conf is configured when the task is defined, if checkpoints are // configured // and enabled, read `state.savepoints.dir` - savepointPath = getSavepointFromConfig(application); - if (StringUtils.isNotBlank(savepointPath)) { - return savepointPath; + if (StringUtils.isBlank(savepointPath)) { + savepointPath = getSavepointFromConfig(application); } // 3) If the savepoint is not obtained above, try to obtain the savepoint path according to the // deployment type (remote|on yarn) // 3.1) At the remote mode, request the flink webui interface to get the savepoint path // 3.2) At the yarn or k8s mode, then read the savepoint in flink-conf.yml in the bound flink - return getSavepointFromDeployLayer(application); + if (StringUtils.isBlank(savepointPath)) { + savepointPath = getSavepointFromDeployLayer(application); + } + + // 4) Supporting variables + if (StringUtils.isNotBlank(savepointPath)) { + savepointPath = + processPath( + savepointPath, application.getJobName(), application.getId()); + } + return savepointPath; + } + + private String processPath(String path, String jobName, Long jobId) { + if (StringUtils.isNotBlank(path)) { + return path.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName) + .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId.toString()); + } + return path; } @Override