Will reopen after investigating test failures.
IMPORTANT NOTE - With this PR, if the segment granularity is increased between tranquility restarts, tranquility would require Druid to create a segment whose interval differs from the segment-granular interval (why? see the first comment above). However, RealtimePlumber does not support this, and after a discussion with @cheddar we thought it would be a good-to-have feature once Druid moves to support a kappa-like architecture. So the PR might need to wait until this is supported. @gianm What do you think about the PR in general? Does the Appenderator work that you are doing support creating segments with such intervals?
dd9b3bd to dbf3eea
@pjain1 the appenderators do support creation of variable-sized segments.
👍 that's cool. Will wait till the Appenderator stuff is available in Druid |
With this PR tranquility can handle segment granularity changes without losing data, and it also prevents tasks that span the new and old segment intervals from hanging indefinitely. The basic idea is that when we see an event, if its timestamp falls in an existing beam's interval we use that beam; otherwise we try to create a new beam. Implications of the following changes -
To achieve this, the following design changes have been made -
- The `Beam` trait now exposes `getInterval()`, which returns an optional interval, as an interval may not make sense for all implementations of `Beam`.
- The `beams` object in `ClusteredBeam` is now a reverse-sorted list (by interval start) of the beams known to us. It is used while grouping events to check whether the event timestamp falls in the interval of a `Beam`. This should not degrade performance compared to a HashMap lookup, as most of the time the head item in the `beams` list will be the one that can handle the event. Truncating the event timestamp will also no longer be needed.

Question - I am not sure why the `beams` object was a ConcurrentHashMap instead of just a HashMap? As all writes to `beams` are synchronized using `beamWriteMonitor`, I have used a non-thread-safe list for `beams` in this code.

Note - This is the first time I am writing Scala code, so please feel free to point out mistakes.
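
To make the lookup idea concrete, here is a minimal sketch of matching an event timestamp against a list of beams sorted newest first. It is not the code in this PR: `Interval` (a half-open millisecond range), `IntervalBeam`, `BeamLookup` and `BeamLookupExample` are hypothetical stand-ins, and only the `getInterval()` name is taken from the description above.

```scala
// Hypothetical, simplified stand-ins for illustration; not Tranquility's actual types.
final case class Interval(startMillis: Long, endMillis: Long) {
  // Assumed half-open semantics: start inclusive, end exclusive.
  def contains(timestamp: Long): Boolean =
    timestamp >= startMillis && timestamp < endMillis
}

trait Beam {
  // None for implementations where an interval is not meaningful.
  def getInterval(): Option[Interval]
}

final case class IntervalBeam(name: String, interval: Interval) extends Beam {
  def getInterval(): Option[Interval] = Some(interval)
}

object BeamLookup {
  // Sort by interval start, newest first; beams without an interval go last.
  def sortNewestFirst(beams: Seq[Beam]): List[Beam] =
    beams.toList.sortBy(_.getInterval().map(_.startMillis))(Ordering[Option[Long]].reverse)

  // Return the first beam whose interval contains the timestamp, if any.
  // For in-order data this is usually the head of the list.
  def findBeam(sortedBeams: List[Beam], timestamp: Long): Option[Beam] =
    sortedBeams.find(_.getInterval().exists(_.contains(timestamp)))
}

object BeamLookupExample {
  // An hourly beam, followed by a 15-minute beam after a granularity change.
  val oldBeam = IntervalBeam("hourly", Interval(0L, 3600000L))
  val newBeam = IntervalBeam("fifteen-minute", Interval(3600000L, 4500000L))
  val sorted: List[Beam] = BeamLookup.sortNewestFirst(Seq(oldBeam, newBeam))
}
```

In this sketch, `BeamLookup.findBeam(BeamLookupExample.sorted, 3700000L)` matches the newer beam at the head of the list, while a timestamp outside every known interval yields `None`, which is the case where a new beam would be created. No timestamp truncation is involved because the match is done against the beam's own interval.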