120 lines
1.9 KiB
Go
120 lines
1.9 KiB
Go
package websocket
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
|
|
"go.fifitido.net/twitch/api/eventsub"
|
|
"go.fifitido.net/twitch/eventsub/websocket/messages"
|
|
"nhooyr.io/websocket"
|
|
)
|
|
|
|
// DefaultEventSubEndpoint is the websocket URL dialed by Transport.Run when
// the transport options do not supply an address.
const DefaultEventSubEndpoint = "wss://eventsub.wss.twitch.tv/ws"
|
|
|
|
// ErrNotConnected is the package's sentinel error for operations that need
// an established connection. Compare against it with errors.Is.
var ErrNotConnected = errors.New("not connected")
|
|
|
|
type Transport struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
opts *TransportOptions
|
|
|
|
sessionId string
|
|
conn *websocket.Conn
|
|
}
|
|
|
|
func New(parentCtx context.Context, opts *TransportOptions) *Transport {
|
|
ctx, cancel := context.WithCancel(parentCtx)
|
|
|
|
if opts == nil {
|
|
opts = &TransportOptions{}
|
|
}
|
|
|
|
return &Transport{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
opts: opts,
|
|
}
|
|
}
|
|
|
|
// ApiTransport implements eventsub.Transport.
|
|
func (t *Transport) ApiTransport() *eventsub.Transport {
|
|
return &eventsub.Transport{
|
|
Method: "websocket",
|
|
SessionID: &t.sessionId,
|
|
}
|
|
}
|
|
|
|
func (t *Transport) Close() error {
|
|
t.sessionId = ""
|
|
|
|
if t.conn != nil {
|
|
if err := t.conn.Close(websocket.StatusNormalClosure, ""); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
t.cancel()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *Transport) readLoop() chan *messages.Message {
|
|
c := make(chan *messages.Message)
|
|
go func(c chan *messages.Message) {
|
|
for {
|
|
_, raw, err := t.conn.Read(t.ctx)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
var msg *messages.Message
|
|
if err := json.Unmarshal(raw, &msg); err != nil {
|
|
return
|
|
}
|
|
|
|
c <- msg
|
|
}
|
|
}(c)
|
|
return c
|
|
}
|
|
|
|
func (t *Transport) Run() error {
|
|
addr := DefaultEventSubEndpoint
|
|
if t.opts.Address != nil {
|
|
addr = *t.opts.Address
|
|
}
|
|
|
|
var err error
|
|
t.conn, _, err = websocket.Dial(t.ctx, addr, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer t.conn.CloseNow()
|
|
|
|
msgC := t.readLoop()
|
|
|
|
for {
|
|
select {
|
|
case msg := <-msgC:
|
|
if err := t.handleMsg(msg); err != nil {
|
|
return err
|
|
}
|
|
case <-t.ctx.Done():
|
|
return t.Close()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *Transport) Start() {
|
|
go func() {
|
|
if err := t.Run(); err != nil {
|
|
panic(err)
|
|
}
|
|
}()
|
|
}
|