// File: go-twitch/eventsub/conduit/conduit.go (130 lines, 2.4 KiB, Go)
// Repository viewer timestamp: 2024-03-05 12:14:18 -05:00

package conduit
// Repository viewer timestamp: 2024-03-05 12:33:34 -05:00
import (
"context"
"fmt"
"sync"
"go.fifitido.net/twitch/api"
"go.fifitido.net/twitch/api/conduit"
eventsubapi "go.fifitido.net/twitch/api/eventsub"
"go.fifitido.net/twitch/eventsub"
)
// Conduit groups several eventsub transports and exposes them as a single
// transport whose API method is "conduit".
type Conduit struct {
// api is the client used by Register and Unregister to call the conduit
// endpoints.
// NOTE(review): New does not set this field and nothing else in this file
// assigns it — confirm where it is initialised before calling Register.
api *api.API
// id is the conduit ID stored by Register and reset to "" by Unregister.
// ApiTransport hands out a pointer to this field.
id string
// transports are the underlying transports; Run runs them, Close closes
// them, and Register maps each one to a shard by its index.
transports []eventsub.Transport
// mu is held by Register and Unregister for their whole duration.
mu sync.Mutex
}
// New returns a Conduit holding the given transports. No other field is
// initialised; the conduit ID stays empty until Register is called.
func New(transports ...eventsub.Transport) *Conduit {
	c := new(Conduit)
	c.transports = transports
	return c
}
// AddTransport appends transport to the conduit's list of transports.
//
// The append is performed while holding c.mu, the same mutex Register holds
// while it reads the transport list to build its shard update, so a transport
// cannot be added halfway through a registration.
func (c *Conduit) AddTransport(transport eventsub.Transport) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.transports = append(c.transports, transport)
}
// ApiTransport implements eventsub.Transport.
//
// The returned value has Method "conduit" and a ConduitID that points at the
// conduit's own id field (it is not a copy), so it reflects whatever ID the
// conduit currently holds.
func (c *Conduit) ApiTransport() *eventsubapi.Transport {
	t := new(eventsubapi.Transport)
	t.Method = "conduit"
	t.ConduitID = &c.id
	return t
}
// Run implements eventsub.Transport.
//
// Every transport is run on its own goroutine. Run blocks until either all of
// them have returned nil, in which case it returns nil, or one of them returns
// a non-nil error, in which case that error is returned immediately.
//
// Errors are reported through the return value instead of panicking inside
// the worker goroutine: a panic there cannot be recovered by the caller and
// takes the whole process down. When Run returns an error the remaining
// transports are still running; call Close to stop them.
func (c *Conduit) Run() error {
	transports := c.transports

	// Buffered to the number of workers so that goroutines finishing after
	// Run has already returned never block on the send.
	results := make(chan error, len(transports))
	for _, transport := range transports {
		go func(transport eventsub.Transport) {
			results <- transport.Run()
		}(transport)
	}

	for range transports {
		if err := <-results; err != nil {
			return err
		}
	}
	return nil
}
// Start implements eventsub.Transport.
//
// It launches Run on a new goroutine and returns without waiting for it. If
// Run returns a non-nil error, that goroutine panics with the error.
func (c *Conduit) Start() {
	runOrPanic := func() {
		err := c.Run()
		if err != nil {
			panic(err)
		}
	}
	go runOrPanic()
}
// Close implements eventsub.Transport.
//
// Every transport is closed, in order, even when an earlier one fails, so a
// single failing transport does not leave the others open. The first error
// encountered is returned; nil is returned when all transports close cleanly.
func (c *Conduit) Close() error {
	var firstErr error
	for _, transport := range c.transports {
		if err := transport.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}
// Register creates a conduit through the API, stores its ID on c, and then
// assigns each transport to the shard whose ID is the transport's index.
//
// ctx is used for both API calls, so cancelling it aborts the registration.
// c.mu is held for the whole call.
//
// An error is returned when the API client is missing, when either API call
// fails, when the create response contains no conduit, or when the shard
// update response reports errors. If the failure happens after the conduit was
// created, its ID remains stored on c so Unregister can still delete it.
func (c *Conduit) Register(ctx context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Fail with an error rather than a nil-pointer panic when no API client
	// has been configured.
	if c.api == nil {
		return fmt.Errorf("conduit: api client is not set")
	}

	// NOTE(review): the conduit is created with a single shard, yet one shard
	// update per transport is sent below — confirm whether ShardCount should
	// be len(c.transports).
	createRes, err := c.api.Conduit.CreateConduits(ctx, &conduit.CreateConduitsRequest{
		ShardCount: 1,
	})
	if err != nil {
		return err
	}
	// Guard the index below: an empty response would otherwise panic.
	if len(createRes.Data) == 0 {
		return fmt.Errorf("conduit: create conduits response contained no conduit")
	}

	c.id = createRes.Data[0].ID

	updateReq := &conduit.UpdateConduitShardsRequest{
		ConduitID: c.id,
		Shards:    make([]conduit.ShardUpdate, len(c.transports)),
	}
	for i, transport := range c.transports {
		updateReq.Shards[i] = conduit.ShardUpdate{
			ID:        fmt.Sprint(i),
			Transport: transport.ApiTransport(),
		}
	}

	updateRes, err := c.api.Conduit.UpdateConduitShards(ctx, updateReq)
	if err != nil {
		return err
	}
	if len(updateRes.Errors) > 0 {
		return fmt.Errorf("failed to update shards: %v", updateRes.Errors)
	}

	return nil
}
// Unregister deletes the conduit identified by the stored ID and clears that
// ID on success. When no ID is stored it does nothing and returns nil. If the
// delete call fails, its error is returned and the ID is left unchanged.
// c.mu is held for the whole call.
func (c *Conduit) Unregister(ctx context.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.id != "" {
		err := c.api.Conduit.DeleteConduit(ctx, c.id)
		if err != nil {
			return err
		}
		c.id = ""
	}
	return nil
}