1919package reactor .core .publisher ;
2020
2121import scouter .agent .Logger ;
22+ import scouter .agent .util .Tuple ;
2223
2324/**
2425 * @author Gun Lee ([email protected] ) on 2020/08/08 2526 */
2627public class ScouterOptimizableOperatorProxy {
2728
2829 public static final String EMPTY = "" ;
30+ public static final Tuple .StringLongPair EMPTYOBJ = new Tuple .StringLongPair ("" , 0 );
2931 public static boolean accessible = false ;
3032 public static boolean first = true ;
3133
32- public static String nameOnCheckpoint (Object candidate ) {
34+ public static Tuple . StringLongPair nameOnCheckpoint (Object candidate , int maxScanDepth ) {
3335 try {
3436 if (!accessible && first ) {
3537 try {
@@ -42,34 +44,47 @@ public static String nameOnCheckpoint(Object candidate) {
4244 first = false ;
4345 }
4446 if (!accessible ) {
45- return EMPTY ;
47+ return EMPTYOBJ ;
4648 }
4749
4850 if (candidate instanceof OptimizableOperator ) {
49- OptimizableOperator <?, ?> operator = ((OptimizableOperator <?, ?>) candidate ). nextOptimizableSource ( );
50- if (operator == null ) {
51- return EMPTY ;
51+ OptimizableOperator <?, ?> closeAssembly = findCloseAssembly ((OptimizableOperator <?, ?>) candidate , maxScanDepth );
52+ if (closeAssembly == null ) {
53+ return EMPTYOBJ ;
5254 }
53- if (operator instanceof MonoOnAssembly ) {
54- FluxOnAssembly .AssemblySnapshot snapshot = ((MonoOnAssembly ) operator ).stacktrace ;
55+ if (closeAssembly instanceof MonoOnAssembly ) {
56+ FluxOnAssembly .AssemblySnapshot snapshot = ((MonoOnAssembly ) closeAssembly ).stacktrace ;
5557 if (snapshot != null && snapshot .checkpointed ) {
56- return snapshot .cached ;
58+ return new Tuple . StringLongPair ( snapshot .cached , snapshot . hashCode ()) ;
5759 }
58- } else if (operator instanceof FluxOnAssembly ) {
59- FluxOnAssembly .AssemblySnapshot snapshot = ((FluxOnAssembly ) operator ).snapshotStack ;
60+ } else if (closeAssembly instanceof FluxOnAssembly ) {
61+ FluxOnAssembly .AssemblySnapshot snapshot = ((FluxOnAssembly ) closeAssembly ).snapshotStack ;
6062 if (snapshot != null && snapshot .checkpointed ) {
61- return snapshot .cached ;
63+ return new Tuple . StringLongPair ( snapshot .cached , snapshot . hashCode ()) ;
6264 }
6365 }
6466 }
65- return EMPTY ;
67+ return EMPTYOBJ ;
6668 } catch (Throwable e ) {
6769
68- return EMPTY ;
70+ return EMPTYOBJ ;
6971 }
7072 }
7173
72- public static void appendSources4Dump (Object candidate , StringBuilder builder ) {
74+ public static OptimizableOperator <?, ?> findCloseAssembly (OptimizableOperator <?, ?> candidate , int maxScanDepth ) {
75+ OptimizableOperator <?, ?> operator = candidate ;
76+ for (int i = 0 ; i < maxScanDepth ; i ++) {
77+ operator = operator .nextOptimizableSource ();
78+ if (operator == null ) {
79+ return null ;
80+ } else if (operator instanceof MonoOnAssembly || operator instanceof FluxOnAssembly ) {
81+ return operator ;
82+ }
83+ }
84+ return null ;
85+ }
86+
87+ public static void appendSources4Dump (Object candidate , StringBuilder builder , int maxScanDepth ) {
7388 try {
7489 if (!accessible && first ) {
7590 try {
@@ -86,14 +101,14 @@ public static void appendSources4Dump(Object candidate, StringBuilder builder) {
86101 }
87102
88103 if (candidate instanceof OptimizableOperator ) {
89- OptimizableOperator <?, ?> operator = ((OptimizableOperator <?, ?>) candidate ). nextOptimizableSource ( );
90- if (operator == null ) {
104+ OptimizableOperator <?, ?> closeAssembly = findCloseAssembly ((OptimizableOperator <?, ?>) candidate , maxScanDepth );
105+ if (closeAssembly == null ) {
91106 return ;
92107 }
93- String p1 = operator .toString ();
108+ String p1 = closeAssembly .toString ();
94109 builder .append (" (<-) " ).append (p1 );
95110 if (p1 .startsWith ("checkpoint" )) {
96- OptimizableOperator <?, ?> operator2 = operator .nextOptimizableSource ();
111+ OptimizableOperator <?, ?> operator2 = closeAssembly .nextOptimizableSource ();
97112 if (operator2 != null ) {
98113 builder .append (" (<-) " ).append (operator2 .toString ());
99114 }
0 commit comments