Skip to content
This repository was archived by the owner on Dec 5, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,35 @@ jobs:
- name: Check out code
uses: actions/checkout@v4

- name: unit tests
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.23'

- name: Cache Go modules
uses: actions/cache@v4
with:
path: |
~/.cache/go-build
~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: |
${{ runner.os }}-go-

- name: Install dependencies
run: go mod download

- name: Run unit tests
run: make test

integration:
runs-on: ubuntu-latest
strategy:
matrix:
storage: [memory, postgres, kafka]
fail-fast: false # Continue testing other storage types even if one fails

name: integration-${{ matrix.storage }}

steps:
- uses: actions/checkout@v4
Expand Down Expand Up @@ -47,11 +71,8 @@ jobs:
- name: Install dependencies
run: go mod download

- name: Run unit tests
run: make test

- name: Build bridge
run: make build

- name: Run integration tests
run: make integration-test
- name: Run integration tests with ${{ matrix.storage }} storage
run: make integration-test-${{ matrix.storage }}
16 changes: 14 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
GOFMT_FILES?=$$(find . -name '*.go' | grep -v vendor | grep -v yacc | grep -v .git)

.PHONY: all imports fmt test test-bridge-sdk clean-bridge-sdk test-all integration-test run stop clean logs status
.PHONY: all imports fmt test test-bridge-sdk clean-bridge-sdk test-all integration-test integration-test-memory integration-test-postgres integration-test-kafka run stop clean logs status

all: imports fmt test

Expand Down Expand Up @@ -30,7 +30,19 @@ test-all: test test-bridge-sdk
@echo "All tests completed successfully!"

integration-test:
@./scripts/integration-test.sh
@./scripts/integration-test.sh memory

integration-test-memory:
@echo "🧪 Running integration tests with memory storage..."
@./scripts/integration-test.sh memory

integration-test-postgres:
@echo "🧪 Running integration tests with PostgreSQL storage..."
@./scripts/integration-test.sh postgres

integration-test-kafka:
@echo "🧪 Running integration tests with Kafka storage..."
@./scripts/integration-test.sh kafka

run:
@echo "Starting bridge environment with nginx load balancer and 3 bridge instances..."
Expand Down
73 changes: 72 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,30 @@ make build
./callmebridge
```

### PostgreSQL Storage
```bash
# Start PostgreSQL and set environment
export STORAGE_TYPE="postgres"
export POSTGRES_URI="postgres://user:pass@localhost/bridge"
./callmebridge
```

### Kafka Storage
```bash
# Start Kafka (using Docker Compose)
docker-compose -f docker-compose.kafka.yml up -d kafka

# Or use the provided script
./scripts/run-with-kafka.sh
```

### Full Multi-Instance Setup with Kafka
```bash
# Start the entire stack with load balancer
docker-compose -f docker-compose.kafka.yml up
# Access via http://localhost:8080 (nginx load balancer)
```

## 📋Requirements

- Go 1.23+
Expand All @@ -23,9 +47,56 @@ Configure using environment variables:

```bash
PORT=8081 # Server port
POSTGRES_URI="postgres://user:pass@host/dbname" # Database connection

# Storage Configuration
STORAGE_TYPE="memory" # Storage type: memory, postgres, kafka
POSTGRES_URI="postgres://user:pass@host/dbname" # PostgreSQL connection (for postgres storage)

# Kafka Configuration (for kafka storage)
KAFKA_BROKERS="localhost:9092,localhost:9093" # Comma-separated Kafka broker addresses
KAFKA_TOPIC="bridge-messages" # Kafka topic name (default: bridge-messages)
KAFKA_CONSUMER_GROUP="bridge-consumer" # Kafka consumer group (default: bridge-consumer)

# Other Configuration
WEBHOOK_URL="" # Webhook URL for message forwarding
COPY_TO_URL="" # URL to copy messages to
CORS_ENABLE=false # Enable CORS
HEARTBEAT_INTERVAL=10 # Heartbeat interval in seconds
RPS_LIMIT=1000 # Rate limit (requests per second)
CONNECTIONS_LIMIT=200 # Maximum concurrent connections
SELF_SIGNED_TLS=false # Use self-signed TLS certificate
```

### Storage Types

**Memory Storage** (default)
- Fast, in-memory storage
- Data is lost on restart
- Best for development and testing

**PostgreSQL Storage**
- Persistent storage with automatic migrations
- Supports high availability setups
- Automatic cleanup of expired messages

### Kafka Storage

**Kafka Storage** uses modern **KRaft mode** (Kafka Raft metadata mode) instead of Zookeeper:

✅ **KRaft Benefits:**
- **Simpler architecture**: No separate Zookeeper cluster needed
- **Faster startup**: Reduced coordination overhead
- **Better performance**: Lower latency for metadata operations
- **Easier scaling**: Single service to manage and scale
- **Production-ready**: Available since Kafka 3.3.0+

**Features:**
- Distributed, scalable message storage
- Hybrid approach: messages stored in Kafka with in-memory caching
- Automatic message expiration and cleanup
- Supports multiple bridge instances
- Built-in health checks and monitoring

## 🛠️API Endpoints

### Bridge Endpoints
Expand Down
21 changes: 17 additions & 4 deletions cmd/bridge/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,28 @@ func main() {
log.Info("Bridge is running")
config.LoadConfig()

dbConn, err := storage.NewStorage(config.Config.DbURI)
storageConfig := storage.StorageConfig{
Type: config.Config.StorageType,
PostgresURI: config.Config.DbURI,
KafkaBrokers: config.Config.KafkaBrokers,
KafkaTopic: config.Config.KafkaTopic,
KafkaConsumerGroup: config.Config.KafkaConsumerGroup,
}

dbConn, err := storage.NewStorage(storageConfig)

if err != nil {
log.Fatalf("failed to create storage: %v", err)
}
if _, ok := dbConn.(*storage.MemStorage); ok {
log.Info("Using in-memory storage")
} else {

// Log which storage type is being used
switch storageConfig.Type {
case "kafka":
log.Info("Using Kafka storage")
case "postgres", "pg":
log.Info("Using PostgreSQL storage")
default:
log.Info("Using in-memory storage")
}

healthMetric.Set(1)
Expand Down
145 changes: 145 additions & 0 deletions docker-compose.kafka.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
version: '3.8'

services:
kafka:
image: confluentinc/cp-kafka:7.4.0
hostname: kafka
container_name: kafka
ports:
- "9092:9092"
- "9093:9093"
- "9997:9997"
environment:
# KRaft mode configuration (no Zookeeper needed!)
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_LISTENERS: 'PLAINTEXT://kafka:29092,CONTROLLER://kafka:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093'
KAFKA_PROCESS_ROLES: 'broker,controller'

# Topic and replication settings
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_NUM_PARTITIONS: 3

# Auto-create topics
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

# JMX for monitoring
KAFKA_JMX_PORT: 9997
KAFKA_JMX_HOSTNAME: localhost

# Performance tuning
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_LOG_RETENTION_MS: 3600000 # 1 hour retention for bridge messages

# KRaft cluster metadata
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
volumes:
- kafka_data:/var/lib/kafka/data
healthcheck:
test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
interval: 10s
timeout: 5s
retries: 5
networks:
- bridge_network

bridge1:
build: .
environment:
PORT: 8081
STORAGE_TYPE: "kafka"
KAFKA_BROKERS: "kafka:29092"
KAFKA_TOPIC: "bridge-messages"
KAFKA_CONSUMER_GROUP: "bridge-consumer-1"
CORS_ENABLE: "true"
HEARTBEAT_INTERVAL: 10
RPS_LIMIT: 1000
CONNECTIONS_LIMIT: 200
depends_on:
kafka:
condition: service_healthy
networks:
- bridge_network
restart: unless-stopped

bridge2:
build: .
environment:
PORT: 8081
STORAGE_TYPE: "kafka"
KAFKA_BROKERS: "kafka:29092"
KAFKA_TOPIC: "bridge-messages"
KAFKA_CONSUMER_GROUP: "bridge-consumer-2"
CORS_ENABLE: "true"
HEARTBEAT_INTERVAL: 10
RPS_LIMIT: 1000
CONNECTIONS_LIMIT: 200
depends_on:
kafka:
condition: service_healthy
networks:
- bridge_network
restart: unless-stopped

bridge3:
build: .
environment:
PORT: 8081
STORAGE_TYPE: "kafka"
KAFKA_BROKERS: "kafka:29092"
KAFKA_TOPIC: "bridge-messages"
KAFKA_CONSUMER_GROUP: "bridge-consumer-3"
CORS_ENABLE: "true"
HEARTBEAT_INTERVAL: 10
RPS_LIMIT: 1000
CONNECTIONS_LIMIT: 200
depends_on:
kafka:
condition: service_healthy
networks:
- bridge_network
restart: unless-stopped

nginx:
image: nginx:alpine
ports:
- "8080:80"
volumes:
- ./nginx.conf:/etc/nginx/conf.d/default.conf
depends_on:
- bridge1
- bridge2
- bridge3
networks:
- bridge_network
restart: unless-stopped

# Kafka UI for monitoring (now without Zookeeper dependency)
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka
ports:
- "8090:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
# No Zookeeper configuration needed for KRaft mode
networks:
- bridge_network

volumes:
kafka_data:
driver: local

networks:
bridge_network:
driver: bridge
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ require (
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/labstack/gommon v0.4.2 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pierrec/lz4/v4 v4.1.16 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.63.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/segmentio/kafka-go v0.4.48 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
go.uber.org/atomic v1.11.0 // indirect
Expand Down
Loading
Loading