1616
1717package com .linecorp .decaton .centraldogma ;
1818
19+ import java .io .IOException ;
20+ import java .io .UncheckedIOException ;
1921import java .util .List ;
2022import java .util .Optional ;
2123import java .util .concurrent .ConcurrentHashMap ;
3840import com .linecorp .centraldogma .common .Change ;
3941import com .linecorp .centraldogma .common .ChangeConflictException ;
4042import com .linecorp .centraldogma .common .PathPattern ;
41- import com .linecorp .centraldogma .common .Query ;
4243import com .linecorp .centraldogma .common .Revision ;
44+ import com .linecorp .decaton .centraldogma .internal .DecatonPropertyFileFormat ;
4345import com .linecorp .decaton .processor .runtime .DynamicProperty ;
4446import com .linecorp .decaton .processor .runtime .ProcessorProperties ;
4547import com .linecorp .decaton .processor .runtime .Property ;
4850
4951/**
5052 * A {@link PropertySupplier} implementation with Central Dogma backend.
51- *
53+ * <p>
5254 * This implementation maps property's {@link PropertyDefinition#name()} as the absolute field name in the file
5355 * on Central Dogma.
54- *
56+ * <p>
57+ * You can use json or yaml format for the property file.
58+ * You cannot nest keys in both formats. Keys must be top-level fields.
59+ * <p>
5560 * An example JSON format would be look like:
56- * {@code
61+ * <pre> {@code
5762 * {
5863 * "decaton.partition.concurrency": 10,
5964 * "decaton.ignore.keys": [
6267 * ],
6368 * "decaton.processing.rate.per.partition": 50
6469 * }
65- * }
70+ * }</pre>
71+ *
72+ * An example YAML format would be look like:
73+ * <pre>{@code
74+ * decaton.partition.concurrency: 10
75+ * decaton.ignore.keys:
76+ * - "123456"
77+ * - "79797979"
78+ * decaton.processing.rate.per.partition: 50
79+ * }</pre>
6680 */
6781public class CentralDogmaPropertySupplier implements PropertySupplier , AutoCloseable {
6882 private static final Logger logger = LoggerFactory .getLogger (CentralDogmaPropertySupplier .class );
@@ -73,7 +87,6 @@ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoClose
7387 private static final ObjectMapper objectMapper = new ObjectMapper ();
7488
7589 private final Watcher <JsonNode > rootWatcher ;
76-
7790 private final ConcurrentMap <String , DynamicProperty <?>> cachedProperties = new ConcurrentHashMap <>();
7891
7992 /**
@@ -94,7 +107,9 @@ public CentralDogmaPropertySupplier(CentralDogma centralDogma, String projectNam
94107 * @param fileName the name of the file containing properties as top-level fields.
95108 */
96109 public CentralDogmaPropertySupplier (CentralDogmaRepository centralDogmaRepository , String fileName ) {
97- rootWatcher = centralDogmaRepository .watcher (Query .ofJsonPath (fileName )).start ();
110+ DecatonPropertyFileFormat configFile = DecatonPropertyFileFormat .of (fileName );
111+ this .rootWatcher = configFile .createWatcher (centralDogmaRepository , fileName );
112+
98113 try {
99114 rootWatcher .awaitInitialValue (INITIAL_VALUE_TIMEOUT_SECS , TimeUnit .SECONDS );
100115 } catch (InterruptedException e ) {
@@ -103,6 +118,20 @@ public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepositor
103118 } catch (TimeoutException e ) {
104119 throw new RuntimeException (e );
105120 }
121+
122+ rootWatcher .watch (node -> {
123+ for (ConcurrentHashMap .Entry <String , DynamicProperty <?>> cachedProperty : cachedProperties .entrySet ()) {
124+ if (node .has (cachedProperty .getKey ())) {
125+ try {
126+ setValue (cachedProperty .getValue (), node .get (cachedProperty .getKey ()));
127+ } catch (Exception e ) {
128+ // Catching Exception instead of RuntimeException, since
129+ // Kotlin-implemented DynamicProperty would throw checked exceptions
130+ logger .warn ("Failed to set value updatedfrom CentralDogma for {}" , cachedProperty .getKey (), e );
131+ }
132+ }
133+ }
134+ });
106135 }
107136
108137 // visible for testing
@@ -129,25 +158,13 @@ public <T> Optional<Property<T>> getProperty(PropertyDefinition<T> definition) {
129158 // for most use cases though, this cache is only filled/read once.
130159 final DynamicProperty <?> cachedProp = cachedProperties .computeIfAbsent (definition .name (), name -> {
131160 DynamicProperty <T > prop = new DynamicProperty <>(definition );
132- Watcher <JsonNode > child = rootWatcher .newChild (jsonNode -> jsonNode .path (definition .name ()));
133- child .watch (node -> {
134- try {
135- setValue (prop , node );
136- } catch (Exception e ) {
137- // Catching Exception instead of RuntimeException, since
138- // Kotlin-implemented DynamicProperty would throw checked exceptions
139- logger .warn ("Failed to set value updated from CentralDogma for {}" , definition .name (), e );
140- }
141- });
142161 try {
143- JsonNode node = child .initialValueFuture ().join ().value (); //doesn't fail since it's a child watcher
144- setValue (prop , node );
162+ setValue (prop , rootWatcher .latestValue ().get (definition .name ()));
145163 } catch (Exception e ) {
146164 // Catching Exception instead of RuntimeException, since
147165 // Kotlin-implemented DynamicProperty would throw checked exceptions
148166 logger .warn ("Failed to set initial value from CentralDogma for {}" , definition .name (), e );
149167 }
150-
151168 return prop ;
152169 });
153170
@@ -175,8 +192,7 @@ public void close() {
175192 public static CentralDogmaPropertySupplier register (CentralDogma centralDogma , String project ,
176193 String repository , String filename ) {
177194 final CentralDogmaRepository centralDogmaRepository = centralDogma .forRepo (project , repository );
178- createPropertyFile (centralDogmaRepository , filename , ProcessorProperties .defaultProperties ());
179- return new CentralDogmaPropertySupplier (centralDogmaRepository , filename );
195+ return register (centralDogmaRepository , filename );
180196 }
181197
182198 /**
@@ -215,6 +231,7 @@ public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, S
215231 public static CentralDogmaPropertySupplier register (CentralDogmaRepository centralDogmaRepository ,
216232 String filename ,
217233 PropertySupplier supplier ) {
234+
218235 List <Property <?>> properties = ProcessorProperties .defaultProperties ().stream ().map (defaultProperty -> {
219236 Optional <? extends Property <?>> prop = supplier .getProperty (defaultProperty .definition ());
220237 if (prop .isPresent ()) {
@@ -236,12 +253,18 @@ private static void createPropertyFile(CentralDogmaRepository centralDogmaReposi
236253 long remainingTime = remainingTime (PROPERTY_CREATION_TIMEOUT_MILLIS , startedTime );
237254
238255 JsonNode jsonNodeProperties = convertPropertyListToJsonNode (properties );
256+ Change <?> upsert ;
257+ try {
258+ upsert = DecatonPropertyFileFormat .of (fileName ).createUpsertChange (fileName , jsonNodeProperties );
259+ } catch (IOException e ) {
260+ throw new UncheckedIOException (e );
261+ }
239262
240263 while (!fileExists && remainingTime > 0 ) {
241264 try {
242265 centralDogmaRepository
243266 .commit (String .format ("[CentralDogmaPropertySupplier] Property file created: %s" , fileName ),
244- Change . ofJsonUpsert ( fileName , jsonNodeProperties ) )
267+ upsert )
245268 .push (baseRevision )
246269 .get (remainingTime , TimeUnit .MILLISECONDS );
247270 logger .info ("New property file {} registered on Central Dogma" , fileName );
0 commit comments