Skip to content

Commit 4a802a9

Browse files
author
Richard Patel
committed
add tx success metric
1 parent 482fb15 commit 4a802a9

File tree

6 files changed

+183
-12
lines changed

6 files changed

+183
-12
lines changed

balance.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.uber.org/zap"
1212
)
1313

14+
// balanceScraper retrieves the Pyth publisher balances on request.
1415
type balanceScraper struct {
1516
*prometheus.GaugeVec
1617

@@ -28,6 +29,7 @@ func newBalanceScraper(publishers []solana.PublicKey, rpcURL string, log *zap.Lo
2829
}
2930
}
3031

32+
// Collect gets invoked by the Prometheus exporter when a new scrape is requested.
3133
func (b *balanceScraper) Collect(metrics chan<- prometheus.Metric) {
3234
const collectTimeout = 5 * time.Second
3335
ctx, cancel := context.WithTimeout(context.Background(), collectTimeout)

logger.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,6 @@ func getLogger() *zap.Logger {
1818
if err != nil {
1919
panic("no logger: " + err.Error())
2020
}
21+
logger.Info("Log level", zap.Stringer("level", flagLogLevel))
2122
return logger
2223
}

main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,14 @@ func main() {
134134
balances := newBalanceScraper(publishKeys.pubkeys, *rpcURL, log.Named("balances"))
135135
metrics.Registry.MustRegister(balances)
136136

137+
// Create tx tailer.
138+
txs := newTxScraper(*rpcURL, log.Named("txs"), publishKeys.pubkeys)
139+
group.Go(func() error {
140+
const scrapeInterval = 5 * time.Second
141+
txs.run(ctx, scrapeInterval)
142+
return nil
143+
})
144+
137145
if err := group.Wait(); err != nil {
138146
log.Fatal("App crashed", zap.Error(err))
139147
}

metrics/metrics.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,17 @@ var factory = promauto.With(Registry)
2525

2626
// On-chain transaction execution status.
2727
const (
28-
txStatusSuccess = "success"
29-
txStatusFailed = "failed"
28+
TxStatusSuccess = "success"
29+
TxStatusFailed = "failed"
3030
)
3131

3232
var (
33-
// RPC request stats
34-
rpcRequestsTotal = factory.NewCounterVec(prometheus.CounterOpts{
33+
RpcRequestsTotal = factory.NewCounter(prometheus.CounterOpts{
3534
Namespace: Namespace,
3635
Subsystem: SubsystemExporter,
3736
Name: "rpc_requests_total",
3837
Help: "Number of outgoing RPC requests from pyth_exporter to RPC nodes",
39-
}, []string{})
38+
})
4039
WsActiveConns = factory.NewGauge(prometheus.GaugeOpts{
4140
Namespace: Namespace,
4241
Subsystem: SubsystemExporter,
@@ -88,13 +87,7 @@ var (
8887
Help: "Last observed slot for Pyth publisher",
8988
}, []string{labelProduct, labelPublisher})
9089

91-
// Publisher Observers
92-
metricTxFeesTotal = factory.NewCounterVec(prometheus.CounterOpts{
93-
Namespace: Namespace,
94-
Name: "tx_fees_total",
95-
Help: "Approximate amount of SOL in lamports spent on Pyth publishing",
96-
}, []string{labelPublisher})
97-
metricTxCount = factory.NewCounterVec(prometheus.CounterOpts{
90+
TxCount = factory.NewCounterVec(prometheus.CounterOpts{
9891
Namespace: Namespace,
9992
Name: "txs_total",
10093
Help: "Approximate number of Pyth transactions sent",

price.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"go.blockdaemon.com/pyth_exporter/pyth"
99
)
1010

11+
// priceScraper scrapes prices out of the on-chain Pyth price accounts.
1112
type priceScraper struct {
1213
productKeys []solana.PublicKey
1314
publishKeys []solana.PublicKey

txs.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/gagliardetto/solana-go"
9+
"github.com/gagliardetto/solana-go/rpc"
10+
"go.blockdaemon.com/pyth_exporter/metrics"
11+
"go.uber.org/zap"
12+
)
13+
14+
// txScraper polls publisher transactions.
15+
type txScraper struct {
16+
tailers []*txTailer
17+
log *zap.Logger
18+
rpc *rpc.Client
19+
}
20+
21+
func newTxScraper(rpcURL string, log *zap.Logger, publishers []solana.PublicKey) *txScraper {
22+
scraper := &txScraper{
23+
rpc: rpc.New(rpcURL),
24+
log: log,
25+
tailers: make([]*txTailer, len(publishers)),
26+
}
27+
28+
for i, pubkey := range publishers {
29+
scraper.tailers[i] = newTxTailer(scraper, pubkey)
30+
}
31+
32+
return scraper
33+
}
34+
35+
func (s *txScraper) run(ctx context.Context, interval time.Duration) {
36+
ticker := time.NewTicker(interval)
37+
defer ticker.Stop()
38+
39+
for {
40+
wait:
41+
select {
42+
case <-ctx.Done():
43+
return
44+
case <-ticker.C:
45+
break wait
46+
}
47+
48+
s.poll(ctx, interval)
49+
}
50+
}
51+
52+
func (s *txScraper) poll(ctx context.Context, interval time.Duration) {
53+
ctx, cancel := context.WithTimeout(ctx, interval)
54+
defer cancel()
55+
56+
var wg sync.WaitGroup
57+
wg.Add(len(s.tailers))
58+
for _, tailer := range s.tailers {
59+
go s.pollOne(ctx, &wg, tailer)
60+
}
61+
wg.Wait()
62+
}
63+
64+
func (s *txScraper) pollOne(ctx context.Context, wg *sync.WaitGroup, tailer *txTailer) {
65+
defer wg.Done()
66+
for {
67+
isEnd, err := tailer.poll(ctx)
68+
if isEnd {
69+
break
70+
}
71+
if err != nil {
72+
s.log.Warn("Failed to poll account txs", zap.Error(err))
73+
}
74+
}
75+
}
76+
77+
// txTailer "tails" the transaction log of an account.
78+
type txTailer struct {
79+
*txScraper
80+
81+
pubkey solana.PublicKey
82+
pubkeyStr string
83+
lastSlot uint64
84+
lastSig solana.Signature
85+
}
86+
87+
func newTxTailer(scraper *txScraper, pubkey solana.PublicKey) *txTailer {
88+
return &txTailer{
89+
txScraper: scraper,
90+
pubkey: pubkey,
91+
pubkeyStr: pubkey.String(),
92+
}
93+
}
94+
95+
func (t *txTailer) refreshLastSig(ctx context.Context) error {
96+
oneInt := 1
97+
sigs, err := t.rpc.GetSignaturesForAddressWithOpts(ctx, t.pubkey, &rpc.GetSignaturesForAddressOpts{
98+
Limit: &oneInt,
99+
})
100+
if err != nil {
101+
return err
102+
}
103+
metrics.RpcRequestsTotal.Inc()
104+
105+
if len(sigs) == 0 {
106+
return nil // empty account
107+
}
108+
t.lastSlot = sigs[0].Slot
109+
t.lastSig = sigs[0].Signature
110+
return nil
111+
}
112+
113+
// poll retrieves the latest transactions.
114+
func (t *txTailer) poll(ctx context.Context) (end bool, err error) {
115+
t.log.Debug("Polling new txs",
116+
zap.String("publisher", t.pubkeyStr),
117+
zap.Stringer("last_sig", t.lastSig))
118+
119+
// Get starting sig if none.
120+
if t.lastSig.IsZero() {
121+
return true, t.refreshLastSig(ctx)
122+
}
123+
124+
// Get sigs since last check.
125+
var tailLimit = 100
126+
sigs, err := t.rpc.GetSignaturesForAddressWithOpts(ctx, t.pubkey, &rpc.GetSignaturesForAddressOpts{
127+
Limit: &tailLimit,
128+
Until: t.lastSig,
129+
})
130+
if err != nil {
131+
return true, err
132+
}
133+
metrics.RpcRequestsTotal.Inc()
134+
135+
if len(sigs) == 0 {
136+
return true, nil
137+
}
138+
139+
// Iteration is newest to latest.
140+
// So write down the first sig as the newest sig, so we can later continue.
141+
stopSlot := t.lastSlot
142+
if sigs[0].Slot > t.lastSlot {
143+
t.lastSlot = sigs[0].Slot
144+
t.lastSig = sigs[0].Signature
145+
}
146+
147+
// If the number of returned sigs matches exactly our requested limit, there is probably more.
148+
end = len(sigs) != tailLimit
149+
150+
// Scroll through page.
151+
for len(sigs) > 0 && sigs[0].Slot > stopSlot {
152+
sig := sigs[0]
153+
154+
var status string
155+
if sig.Err != nil {
156+
status = metrics.TxStatusFailed
157+
} else {
158+
status = metrics.TxStatusSuccess
159+
}
160+
metrics.TxCount.WithLabelValues(t.pubkeyStr, status).Inc()
161+
162+
sigs = sigs[1:]
163+
}
164+
165+
return end, nil
166+
}

0 commit comments

Comments
 (0)