Add in-progress requests metric and return an error when the

user hits the rate limit
This commit is contained in:
2025-07-15 19:25:43 +03:00
parent afee78d625
commit 8f58905364
3 changed files with 10 additions and 21 deletions

View File

@@ -21,6 +21,7 @@ var (
ErrInternalError = packet.Error{Error: "internal server error"}
ErrPermissionDenied = packet.Error{Error: "permission denied"}
ErrNotImplemented = packet.Error{Error: "not implemented yet"}
ErrRateLimited = packet.Error{Error: "rate limited"}
ErrSuccess = packet.Error{Error: "success"}
DefaultBanReason = ""

View File

@@ -13,11 +13,11 @@ var RequestsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
Help: "The total number of processed requests",
}, []string{"request_type", "dropped"})
// var RequestsInProgress = promauto.NewCounterVec(prometheus.CounterOpts{
// Namespace: namespace,
// Name: "requests_in_progress_total",
// Help: "The total number of in-progress requests",
// }, []string{"request_type"})
var RequestsInProgress = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Name: "requests_in_progress_total",
Help: "The total number of in-progress requests",
}, []string{"request_type"})
var RequestProcessingDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
@@ -26,20 +26,6 @@ var RequestProcessingDuration = promauto.NewHistogramVec(prometheus.HistogramOpt
NativeHistogramBucketFactor: 1.00271,
}, []string{"request_type", "dropped"})
// var RequestProcessingDuration = promauto.NewSummaryVec(prometheus.SummaryOpts{
// Namespace: namespace,
// Name: "request_processing_duration_seconds",
// Help: "The duration in seconds it took to process a request",
// Objectives: map[float64]float64{
// 0.01: 0.001,
// 0.50: 0.005,
// 0.90: 0.009,
// 0.95: 0.0095,
// 0.99: 0.0099,
// },
// MaxAge: 1 * time.Hour,
// }, []string{"request_type"})
var ConnectionsEstablished = promauto.NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Name: "connections_established_total",

View File

@@ -300,22 +300,23 @@ func (server *server) handleConnection(conn net.Conn) {
// will still have a time limit upper bound, from timeout()
for request := range framer.Out {
metrics.RequestsInProgress.WithLabelValues(request.Type().String()).Inc()
start := time.Now().UTC()
success := processPacket(localCtx, sess, request)
duration := time.Since(start)
metrics.RequestsInProgress.WithLabelValues(request.Type().String()).Dec()
labels := prometheus.Labels{
"request_type": request.Type().String(),
"dropped": strconv.FormatBool(!success),
}
metrics.RequestsProcessed.With(labels).Inc()
metrics.RequestProcessingDuration.With(labels).Observe(float64(duration.Seconds()))
if success {
slog.InfoContext(ctx, "processed request", "request_type", request.Type().String(), "duration", duration.String(), "duration_ns", duration.Nanoseconds())
} else {
slog.InfoContext(ctx, "dropped request", "request_type", request.Type().String(), "duration", duration.String(), "duration_ns", duration.Nanoseconds())
}
}
slog.InfoContext(ctx, "processor done")
}()
@@ -370,6 +371,7 @@ func (server *server) handleConnection(conn net.Conn) {
func processPacket(ctx context.Context, sess *session.Session, pkt packet.Packet) bool {
tokens := TokensPerRequest(pkt.Type())
if !sess.RateLimiter().Take(tokens) {
_ = sess.Write(ctx, &api.ErrRateLimited)
return false // Rate limit was hit
}