mirror of
https://github.com/Kyren223/eko.git
synced 2025-09-05 21:18:14 +00:00
Fixed bug where server couldn't shutdown unless all clients closed their
connections willingly
This commit is contained in:
@@ -32,6 +32,8 @@ var nodeId int64 = 0
|
||||
|
||||
const CertFile = "EKO_SERVER_CERT_FILE"
|
||||
|
||||
const ReadCheckCancelledInterval = 1 * time.Second
|
||||
|
||||
func getTlsConfig() *tls.Config {
|
||||
path, ok := os.LookupEnv(CertFile)
|
||||
if !ok {
|
||||
@@ -181,9 +183,8 @@ func (s *server) Run() {
|
||||
|
||||
listener, err := tls.Listen("tcp4", ":"+strconv.Itoa(int(s.Port)), getTlsConfig())
|
||||
if err != nil {
|
||||
// TODO: we need certs even for dev
|
||||
slog.Error("error starting server", "error", err)
|
||||
os.Exit(1)
|
||||
assert.Abort("see logs")
|
||||
}
|
||||
|
||||
assert.AddFlush(listener)
|
||||
@@ -260,23 +261,19 @@ func (server *server) handleConnection(conn net.Conn) {
|
||||
for payload := range writeQueue {
|
||||
packet := packet.NewPacket(packet.NewMsgPackEncoder(payload))
|
||||
if _, err := packet.Into(conn); err != nil {
|
||||
// TODO: probably should add this to prevent the
|
||||
// "use of closed connection" error, as it's intended to happen
|
||||
// and once it happens we can just return
|
||||
// if !errors.Is(err, net.ErrClosed) {
|
||||
// log.Println(addr, err)
|
||||
// }
|
||||
slog.ErrorContext(ctx, "error sending packet", "error", err, "packet", packet.LogValue(), "payload", payload)
|
||||
return
|
||||
}
|
||||
slog.InfoContext(ctx, "packet sent", "packet", packet.LogValue(), "payload", payload)
|
||||
}
|
||||
slog.InfoContext(ctx, "writer done")
|
||||
}()
|
||||
|
||||
// Writer closer
|
||||
go func() {
|
||||
writerWg.Wait()
|
||||
sess.CloseWriteQueue() // causes writer to return
|
||||
slog.InfoContext(ctx, "closing writer...")
|
||||
}()
|
||||
|
||||
// Processor
|
||||
@@ -290,6 +287,7 @@ func (server *server) handleConnection(conn net.Conn) {
|
||||
for request := range framer.Out {
|
||||
processPacket(localCtx, sess, request)
|
||||
}
|
||||
slog.InfoContext(ctx, "processor done")
|
||||
}()
|
||||
|
||||
// NOTE: IMPROTANT LEGAL STUFF
|
||||
@@ -299,14 +297,20 @@ func (server *server) handleConnection(conn net.Conn) {
|
||||
// Reader
|
||||
buffer := make([]byte, 512)
|
||||
for {
|
||||
conn.SetReadDeadline(time.Now().Add(ReadCheckCancelledInterval))
|
||||
n, err := conn.Read(buffer)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) {
|
||||
slog.InfoContext(ctx, "closing gracefully")
|
||||
break
|
||||
} else if ne, ok := err.(net.Error); ok && ne.Timeout() {
|
||||
if ctx.Err() != nil {
|
||||
slog.InfoContext(ctx, "reader context done", "error", ctx.Err())
|
||||
} // else continue normally
|
||||
} else {
|
||||
slog.ErrorContext(ctx, "failed reading from buffer", "error", err)
|
||||
slog.ErrorContext(ctx, "failed reading from conn", "error", err)
|
||||
break
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
err = framer.Push(ctx, buffer[:n])
|
||||
@@ -323,6 +327,7 @@ func (server *server) handleConnection(conn net.Conn) {
|
||||
}
|
||||
}
|
||||
close(framer.Out) // stop processing
|
||||
slog.InfoContext(ctx, "reader done, closed framer")
|
||||
|
||||
<-done
|
||||
}
|
||||
|
Reference in New Issue
Block a user