Skip to content

Commit 297da62

Browse files
committed
set refid to be topic to allow for parsing
1 parent c308d3a commit 297da62

File tree

2 files changed

+8
-4
lines changed

2 files changed

+8
-4
lines changed

pkg/mqtt/framer.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (df *framer) key() string {
6161
if len(df.path) == 0 {
6262
return "Value"
6363
}
64-
return strings.Join(df.path, "")
64+
return strings.Join(df.path, "/")
6565
}
6666

6767
func (df *framer) addNil(logger log.Logger) {
@@ -99,14 +99,16 @@ func newFramer() *framer {
9999
return df
100100
}
101101

102-
func (df *framer) toFrame(messages []Message, logger log.Logger) (*data.Frame, error) {
102+
func (df *framer) toFrame(messages []Message, path string, logger log.Logger) (*data.Frame, error) {
103103
// clear the data in the fields
104104
for _, field := range df.fields {
105105
for i := field.Len() - 1; i >= 0; i-- {
106106
field.Delete(i)
107107
}
108108
}
109109

110+
df.path = strings.Split(path, "/")
111+
110112
for _, message := range messages {
111113
// df.iterator = jsoniter.ParseBytes(jsoniter.ConfigDefault, message.Value)
112114
// err := df.next(logger)
@@ -126,7 +128,7 @@ func (df *framer) toFrame(messages []Message, logger log.Logger) (*data.Frame, e
126128
df.extendFields(df.fields[0].Len() - 1)
127129
}
128130

129-
return data.NewFrame("mqtt", df.fields...), nil
131+
return data.NewFrame("mqtt", df.fields...).SetRefID(df.key()), nil
130132
}
131133

132134
func (df *framer) extendFields(idx int) {

pkg/mqtt/topic.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ func (t *Topic) ToDataFrame(logger log.Logger) (*data.Frame, error) {
3737
if t.framer == nil {
3838
t.framer = newFramer()
3939
}
40-
return t.framer.toFrame(t.Messages, logger)
40+
topic, _ := decodeTopic(t.Path, logger)
41+
42+
return t.framer.toFrame(t.Messages, topic, logger)
4143
}
4244

4345
// TopicMap is a thread-safe map of topics

0 commit comments

Comments
 (0)