go-twitch/eventsub/websocket/websocket.go

120 lines
1.9 KiB
Go

package websocket
import (
"context"
"encoding/json"
"errors"
"go.fifitido.net/twitch/api/eventsub"
"go.fifitido.net/twitch/eventsub/websocket/messages"
"nhooyr.io/websocket"
)
const (
DefaultEventSubEndpoint = "wss://eventsub.wss.twitch.tv/ws"
)
var (
ErrNotConnected = errors.New("not connected")
)
type Transport struct {
ctx context.Context
cancel context.CancelFunc
opts *TransportOptions
sessionId string
conn *websocket.Conn
}
func New(parentCtx context.Context, opts *TransportOptions) *Transport {
ctx, cancel := context.WithCancel(parentCtx)
if opts == nil {
opts = &TransportOptions{}
}
return &Transport{
ctx: ctx,
cancel: cancel,
opts: opts,
}
}
// ApiTransport implements eventsub.Transport.
func (t *Transport) ApiTransport() *eventsub.Transport {
return &eventsub.Transport{
Method: "websocket",
SessionID: &t.sessionId,
}
}
func (t *Transport) Close() error {
t.sessionId = ""
if t.conn != nil {
if err := t.conn.Close(websocket.StatusNormalClosure, ""); err != nil {
return err
}
}
t.cancel()
return nil
}
func (t *Transport) readLoop() chan *messages.Message {
c := make(chan *messages.Message)
go func(c chan *messages.Message) {
for {
_, raw, err := t.conn.Read(t.ctx)
if err != nil {
return
}
var msg *messages.Message
if err := json.Unmarshal(raw, &msg); err != nil {
return
}
c <- msg
}
}(c)
return c
}
func (t *Transport) Run() error {
addr := DefaultEventSubEndpoint
if t.opts.Address != nil {
addr = *t.opts.Address
}
var err error
t.conn, _, err = websocket.Dial(t.ctx, addr, nil)
if err != nil {
return err
}
defer t.conn.CloseNow()
msgC := t.readLoop()
for {
select {
case msg := <-msgC:
if err := t.handleMsg(msg); err != nil {
return err
}
case <-t.ctx.Done():
return t.Close()
}
}
}
func (t *Transport) Start() {
go func() {
if err := t.Run(); err != nil {
panic(err)
}
}()
}