Added better logging for packets and payloads and now the writer is

responsible for packaging into a packet the payload (instead of everyone
who sends to the writer)
This commit is contained in:
2025-07-06 18:20:46 +03:00
parent d447505072
commit bbd4eae95f
6 changed files with 37 additions and 39 deletions

View File

@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"math"
"github.com/vmihailenco/msgpack/v5"
@@ -237,9 +238,20 @@ func (p Packet) Payload() []byte {
}
func (p Packet) String() string {
assert.Never("oops, found someone who uses packet.String(), improve this")
return fmt.Sprintf("Packet(v%v t%v %v [%v bytes...])", p.Version(), p.Encoding().String(), p.Type(), p.PayloadLength())
}
func (p Packet) LogValue() slog.Value {
return slog.GroupValue(
slog.Int("version", int(p.Version())),
slog.String("encoding", p.Encoding().String()),
slog.String("type", p.Type().String()),
slog.Int("payload_length", int(p.PayloadLength())),
slog.Int("total_bytes", len(p.data)),
)
}
func (p Packet) Into(writer io.Writer) (int, error) {
return writer.Write(p.data)
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/kyren223/eko/internal/data"
"github.com/kyren223/eko/internal/packet"
"github.com/kyren223/eko/internal/server/ctxkeys"
"github.com/kyren223/eko/internal/server/session"
"github.com/kyren223/eko/pkg/assert"
"github.com/kyren223/eko/pkg/snowflake"
@@ -1626,7 +1625,7 @@ func Authenticate(ctx context.Context, sess *session.Session, request *packet.Au
// NOTE: as per the protocol, this must be the first message after auth
payload := &packet.UsersInfo{Users: []data.User{user}}
ok := sess.Write(ctx, WrapPayload(payload))
ok := sess.Write(ctx, payload)
if !ok {
// Timeout, send at least this payload, client can request the rest
return payload
@@ -1657,8 +1656,8 @@ func sendInitialAuthPackets(ctx context.Context, sess *session.Session) bool {
success = false
continue
}
slog.InfoContext(ctx, "sending initial auth payload", ctxkeys.Payload.String(), payload, ctxkeys.PayloadType.String(), payload.Type())
ok := sess.Write(ctx, WrapPayload(payload))
slog.InfoContext(ctx, "sending initial auth payload", "payload", payload, "payload_type", payload.Type())
ok := sess.Write(ctx, payload)
if !ok {
success = false
continue

View File

@@ -79,7 +79,7 @@ func NetworkPropagateWithFilter(
context, cancel := context.WithTimeout(context.Background(), timeout)
go func() {
defer cancel()
if ok := session.Write(context, WrapPayload(payload)); !ok {
if ok := session.Write(context, payload); !ok {
log.Println(sess.Addr(), "propagation to", session.Addr(), "failed")
}
}()
@@ -121,7 +121,7 @@ func UserPropagate(
context, cancel := context.WithTimeout(context.Background(), timeout)
go func() {
defer cancel()
if ok := session.Write(context, WrapPayload(payload)); !ok {
if ok := session.Write(context, payload); !ok {
log.Println(sess.Addr(), "propagation to", session.Addr(), "failed")
}
}()
@@ -189,7 +189,3 @@ func getNotifications(ctx context.Context, userId snowflake.ID) (packet.Notifica
}
return items, nil
}
func WrapPayload(payload packet.Payload) packet.Packet {
return packet.NewPacket(packet.NewMsgPackEncoder(payload))
}

View File

@@ -12,21 +12,13 @@ type key int
const (
UserID key = iota
IpAddr
Evicted
EvictedBy
Payload
PayloadType
KeyMax
)
var keyNames = map[key]string{
UserID: "user_id",
IpAddr: "ip_addr",
Evicted: "evicted",
EvictedBy: "evicted_by",
Payload: "payload",
PayloadType: "payload_type",
UserID: "user_id",
IpAddr: "ip_addr",
}
func (k key) String() string {

View File

@@ -83,12 +83,12 @@ func (s *server) AddSession(session *session.Session, userId snowflake.ID, pubKe
slog.Info("closed due to new connection from another location",
ctxkeys.IpAddr.String(), sess.Addr(),
ctxkeys.UserID.String(), sess.ID(),
ctxkeys.EvictedBy.String(), session.Addr(),
"evicted_by", session.Addr(),
)
slog.Info("this session evicted another session",
ctxkeys.IpAddr.String(), session.Addr(),
ctxkeys.UserID.String(), session.ID(),
ctxkeys.Evicted.String(), sess.Addr(),
"evicted", sess.Addr(),
)
}
@@ -102,7 +102,7 @@ func EvictSession(sess *session.Session) {
payload := &packet.Error{
Error: "new connection from another location, closing this one",
}
sess.Write(ctx, api.WrapPayload(payload))
sess.Write(ctx, payload)
sess.Close()
}
@@ -209,7 +209,8 @@ func (server *server) handleConnection(conn net.Conn) {
defer conn.Close() // To unblock reader
writeQueue := sess.Read()
for packet := range writeQueue {
for payload := range writeQueue {
packet := packet.NewPacket(packet.NewMsgPackEncoder(payload))
if _, err := packet.Into(conn); err != nil {
// TODO: probably should add this to prevent the
// "use of closed connection" error, as it's intended to happen
@@ -217,10 +218,10 @@ func (server *server) handleConnection(conn net.Conn) {
// if !errors.Is(err, net.ErrClosed) {
// log.Println(addr, err)
// }
slog.ErrorContext(ctx, "error sending packet", "error", err, "packet", packet)
slog.ErrorContext(ctx, "error sending packet", "error", err, "packet", packet.LogValue(), "payload", payload)
return
}
slog.InfoContext(ctx, "packet sent", "packet", packet)
slog.InfoContext(ctx, "packet sent", "packet", packet.LogValue(), "payload", payload)
}
}()
@@ -253,7 +254,7 @@ func (server *server) handleConnection(conn net.Conn) {
n, err := conn.Read(buffer)
if err != nil {
if errors.Is(err, io.EOF) {
slog.InfoContext(ctx, "closed gracefully")
slog.InfoContext(ctx, "closing gracefully")
} else {
slog.ErrorContext(ctx, "failed reading from buffer", "error", err)
}
@@ -267,7 +268,7 @@ func (server *server) handleConnection(conn net.Conn) {
}
if err != nil {
writerWg.Add(1)
sess.Write(ctx, api.WrapPayload(&packet.Error{Error: err.Error()}))
sess.Write(ctx, &packet.Error{Error: err.Error()})
writerWg.Done()
slog.WarnContext(ctx, "received malformed packet", "error", err)
break
@@ -290,15 +291,14 @@ func processPacket(ctx context.Context, sess *session.Session, pkt packet.Packet
// Nil is ok if responses were handled manually using sess.Write()
if response != nil {
ok := sess.Write(ctx, api.WrapPayload(response))
ok := sess.Write(ctx, response)
assert.Assert(ok, "context is never done and write will panic if queue is closed")
}
}
func processRequest(ctx context.Context, sess *session.Session, request packet.Payload) packet.Payload {
slog.InfoContext(ctx, "processing request",
ctxkeys.PayloadType.String(),
request.Type(), ctxkeys.Payload.String(), request,
"request", request, "request_type", request.Type(),
)
if !sess.IsTosAccepted() {
@@ -429,8 +429,7 @@ func timeout[T packet.Payload](
return response
case <-ctx.Done():
slog.WarnContext(ctx, "request timeout",
ctxkeys.Payload.String(), request,
ctxkeys.PayloadType.String(), request.Type(),
"request", request, "request_type", request.Type(),
)
return &packet.Error{Error: "request timeout"}
}
@@ -448,5 +447,5 @@ func sendTosInfo(ctx context.Context, sess *session.Session) bool {
PrivacyPolicy: privacy,
Date: date,
}
return sess.Write(ctx, api.WrapPayload(payload))
return sess.Write(ctx, payload)
}

View File

@@ -32,7 +32,7 @@ type Session struct {
addr *net.TCPAddr
cancel context.CancelFunc
writeQueue chan packet.Packet
writeQueue chan packet.Payload
writerWg *sync.WaitGroup
writeMu sync.RWMutex
@@ -57,7 +57,7 @@ func NewSession(
manager: manager,
addr: addr,
cancel: cancel,
writeQueue: make(chan packet.Packet, WriteQueueSize),
writeQueue: make(chan packet.Payload, WriteQueueSize),
writerWg: writerWg,
writeMu: sync.RWMutex{},
issuedTime: time.Time{},
@@ -117,7 +117,7 @@ func (s *Session) Challenge() []byte {
return s.challenge
}
func (s *Session) Write(ctx context.Context, pkt packet.Packet) bool {
func (s *Session) Write(ctx context.Context, payload packet.Payload) bool {
s.writerWg.Add(1)
defer s.writerWg.Done()
@@ -125,14 +125,14 @@ func (s *Session) Write(ctx context.Context, pkt packet.Packet) bool {
defer s.writeMu.RUnlock()
select {
case s.writeQueue <- pkt:
case s.writeQueue <- payload:
return true
case <-ctx.Done():
return false
}
}
func (s *Session) Read() <-chan packet.Packet {
func (s *Session) Read() <-chan packet.Payload {
s.writeMu.RLock()
defer s.writeMu.RUnlock()
return s.writeQueue