Skip to content

Commit d6cab25

Browse files
authored
[Fix-4263] Fix web socket bug in data studio (#4268)
1 parent 02cdefa commit d6cab25

File tree

3 files changed

+17
-77
lines changed

3 files changed

+17
-77
lines changed

dinky-admin/src/main/java/org/dinky/aop/UdfClassLoaderAspect.java

Lines changed: 0 additions & 68 deletions
This file was deleted.

dinky-admin/src/main/java/org/dinky/ws/handler/ScheduleMessageEventHandler.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.dinky.ws.handler;
2121

22+
import org.dinky.assertion.Asserts;
2223
import org.dinky.ws.GlobalWebSocketTopic;
2324
import org.dinky.ws.WsSendEvent;
2425

@@ -50,11 +51,13 @@ public void run() {
5051
@Override
5152
public void run() {
5253
Map<String, ?> data = autoMessageSend();
53-
WsSendEvent event = WsSendEvent.builder()
54-
.topic(topic)
55-
.paramsAndData(data)
56-
.build();
57-
applicationEventPublisher.publishEvent(event);
54+
if (Asserts.isNotNullMap(data)) {
55+
WsSendEvent event = WsSendEvent.builder()
56+
.topic(topic)
57+
.paramsAndData(data)
58+
.build();
59+
applicationEventPublisher.publishEvent(event);
60+
}
5861
}
5962
},
6063
0,

dinky-admin/src/main/java/org/dinky/ws/handler/TaskRunInstance.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.HashMap;
2626
import java.util.Map;
2727
import java.util.Set;
28+
import java.util.concurrent.ConcurrentSkipListSet;
2829
import java.util.concurrent.TimeUnit;
2930

3031
import org.springframework.stereotype.Service;
@@ -34,7 +35,8 @@
3435
@Slf4j
3536
@Service
3637
public class TaskRunInstance extends ScheduleMessageEventHandler {
37-
// todo 这里需要优化下,应该触发hook调用
38+
39+
private Set<Integer> runningJobIds = new ConcurrentSkipListSet<>();
3840

3941
@Override
4042
public Map<String, Object> firstSubscribe(Set<String> allParams) {
@@ -51,11 +53,14 @@ public GlobalWebSocketTopic getTopic() {
5153
@Override
5254
public Map<String, Object> autoMessageSend() {
5355
Set<Integer> currentMonitorTaskIds = FlinkJobThreadPool.getInstance().getCurrentMonitorTaskIds();
54-
Map<String, Object> result = new HashMap<>();
55-
if (!currentMonitorTaskIds.isEmpty()) {
56+
if (!runningJobIds.equals(currentMonitorTaskIds)) {
57+
runningJobIds.clear();
58+
runningJobIds.addAll(currentMonitorTaskIds);
59+
Map<String, Object> result = new HashMap<>();
5660
result.put("RunningTaskId", currentMonitorTaskIds);
61+
return result;
5762
}
58-
return result;
63+
return new HashMap<>();
5964
}
6065

6166
@Override

0 commit comments

Comments
 (0)