Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion internal/agent/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/amir20/dozzle/internal/container"
"github.com/amir20/dozzle/internal/utils"
"github.com/amir20/dozzle/types"
"github.com/go-faker/faker/v4"
"github.com/go-faker/faker/v4/pkg/options"
"github.com/stretchr/testify/assert"
Expand All @@ -28,6 +29,16 @@ var lis *bufconn.Listener
var certs tls.Certificate
var mockService *MockedClientService

type mockNotificationHandler struct{}

func (m *mockNotificationHandler) HandleNotificationConfig(subscriptions []types.SubscriptionConfig, dispatchers []types.DispatcherConfig) error {
return nil
}

func (m *mockNotificationHandler) GetNotificationStats() []types.SubscriptionStats {
return nil
}

type MockedClientService struct {
mock.Mock
}
Expand Down Expand Up @@ -134,7 +145,7 @@ func init() {

mockService.On("Client").Return(nil)

server, _ := NewServer(mockService, certs, "test", nil)
server, _ := NewServer(mockService, certs, "test", &mockNotificationHandler{})
go server.Serve(lis)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/agent/pb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/agent/pb/rpc_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/agent/pb/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 6 additions & 11 deletions internal/agent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type server struct {
}

func newServer(service ClientService, dozzleVersion string, notificationHandler NotificationConfigHandler) pb.AgentServiceServer {
if notificationHandler == nil {
log.Fatal().Msg("No notification config handler registered")
}

return &server{
service: service,
version: dozzleVersion,
Expand Down Expand Up @@ -390,11 +394,6 @@ func (s *server) ContainerAttach(stream pb.AgentService_ContainerAttachServer) e
}

func (s *server) UpdateNotificationConfig(ctx context.Context, req *pb.UpdateNotificationConfigRequest) (*pb.UpdateNotificationConfigResponse, error) {
if s.notificationConfigHandler == nil {
log.Warn().Msg("No notification config handler registered, ignoring config update")
return &pb.UpdateNotificationConfigResponse{}, nil
}

// Validate request sizes to prevent memory exhaustion
const maxSubscriptions = 1000
const maxDispatchers = 100
Expand Down Expand Up @@ -446,17 +445,13 @@ func (s *server) UpdateNotificationConfig(ctx context.Context, req *pb.UpdateNot
}

func (s *server) GetNotificationStats(ctx context.Context, req *pb.GetNotificationStatsRequest) (*pb.GetNotificationStatsResponse, error) {
if s.notificationConfigHandler == nil {
return &pb.GetNotificationStatsResponse{}, nil
}

stats := s.notificationConfigHandler.GetNotificationStats()

pbStats := make([]*pb.NotificationSubscriptionStats, len(stats))
for i, s := range stats {
pbStat := &pb.NotificationSubscriptionStats{
SubscriptionId: int32(s.SubscriptionID),
TriggerCount: s.TriggerCount,
SubscriptionId: int32(s.SubscriptionID),
TriggerCount: s.TriggerCount,
TriggeredContainerIds: s.TriggeredContainerIDs,
}
if s.LastTriggeredAt != nil {
Expand Down
8 changes: 8 additions & 0 deletions internal/support/docker/multi_host_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ func (m *MultiHostService) broadcastNotificationConfig() {
LogExpression: sub.LogExpression,
ContainerExpression: sub.ContainerExpression,
MetricExpression: sub.MetricExpression,
EventExpression: sub.EventExpression,
Cooldown: sub.Cooldown,
SampleWindow: sub.SampleWindow,
}
Expand All @@ -267,6 +268,7 @@ func (m *MultiHostService) broadcastNotificationConfig() {
Type: d.Type,
URL: d.URL,
Template: d.Template,
Headers: d.Headers,
APIKey: d.APIKey,
Prefix: d.Prefix,
ExpiresAt: d.ExpiresAt,
Expand All @@ -291,6 +293,12 @@ func (m *MultiHostService) broadcastNotificationConfig() {
wg.Wait()
}

// NotificationHandler returns the notification manager as an agent.NotificationConfigHandler.
// This is used in swarm mode to pass the handler to the local agent server.
func (m *MultiHostService) NotificationHandler() *notification.Manager {
return m.notificationManager
}

// AddSubscription adds a subscription to local manager and broadcasts to agents
func (m *MultiHostService) AddSubscription(sub *notification.Subscription) error {
if err := m.notificationManager.AddSubscription(sub); err != nil {
Expand Down
3 changes: 1 addition & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ func main() {
}
// Create client service for agent server in swarm mode
clientService := docker_support.NewDockerClientService(localClient, args.Filter)
// TODO add notification for swarm mode
server, err := agent.NewServer(clientService, certs, args.Version(), nil)
server, err := agent.NewServer(clientService, certs, args.Version(), multiHostService.NotificationHandler())
if err != nil {
log.Fatal().Err(err).Msg("failed to create agent")
}
Expand Down
Loading