package conduit

import (
	"context"
	"fmt"
	"sync"

	"go.fifitido.net/twitch/api"
	"go.fifitido.net/twitch/api/conduit"
	eventsubapi "go.fifitido.net/twitch/api/eventsub"
	"go.fifitido.net/twitch/eventsub"
)

// Conduit is an eventsub.Transport that drives a set of underlying
// transports and exposes them to the Twitch API as a single "conduit"
// transport.
type Conduit struct {
	// api is the client used for conduit API calls.
	api *api.API
	// id is the conduit ID reported by ApiTransport; it is empty until one
	// has been assigned.
	id string
	// transports are the underlying transports started by Run.
	transports []eventsub.Transport
	// mu is held while transports is modified or snapshotted.
	mu sync.Mutex
}

// New returns a Conduit backed by the given transports.
//
// The transports are copied, so later changes to a slice passed with
// New(ts...) do not affect the Conduit and AddTransport never writes into the
// caller's backing array.
//
// NOTE(review): the api field is not set here; presumably it is assigned
// elsewhere in the package before any API call is made — confirm.
func New(transports ...eventsub.Transport) *Conduit {
	return &Conduit{
		transports: append([]eventsub.Transport(nil), transports...),
	}
}

// AddTransport appends transport to the Conduit's transports.
//
// A transport added after Run has started is not started by that Run call,
// because Run works on the set of transports present when it was called.
func (c *Conduit) AddTransport(transport eventsub.Transport) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.transports = append(c.transports, transport)
}

// ApiTransport implements eventsub.Transport.
//
// The returned descriptor uses the "conduit" method. Its ConduitID points at
// the Conduit's internal id field rather than at a copy, so the value seen
// through the pointer follows later changes to that field. The field is not
// locked here.
func (c *Conduit) ApiTransport() *eventsubapi.Transport {
	return &eventsubapi.Transport{
		Method:    "conduit",
		ConduitID: &c.id,
	}
}

// Run implements eventsub.Transport.
//
// It runs every transport in its own goroutine and blocks until all of them
// have returned. If any transport's Run fails, the error of the
// lowest-indexed failing transport is returned, wrapped with its index; the
// remaining transports are not interrupted, so the error is only reported
// once all of them have stopped.
func (c *Conduit) Run() error {
	// Snapshot the slice header under the lock; AddTransport only appends, so
	// the elements within the snapshot's length are never rewritten.
	c.mu.Lock()
	transports := c.transports
	c.mu.Unlock()

	// Each goroutine writes only its own slot, so no extra locking is needed.
	errs := make([]error, len(transports))

	var wg sync.WaitGroup
	for i, transport := range transports {
		wg.Add(1)
		go func(i int, transport eventsub.Transport) {
			defer wg.Done()
			errs[i] = transport.Run()
		}(i, transport)
	}
	wg.Wait()

	for i, err := range errs {
		if err != nil {
			return fmt.Errorf("running transport %d: %w", i, err)
		}
	}
	return nil
}

// Start implements eventsub.Transport.
//
// It calls Run in a new goroutine and returns immediately. Start has no way
// to report an error to its caller, so an error returned by Run panics in
// that goroutine; call Run directly to handle the error instead.
func (c *Conduit) Start() {
	go func() {
		if err := c.Run(); err != nil {
			panic(err)
		}
	}()
}

// Close implements eventsub.Transport.
func (c *Conduit) Close() error { for _, transport := range c.transports { if err := transport.Close(); err != nil { return err } } return nil } func (c *Conduit) Register(ctx context.Context) error { c.mu.Lock() defer c.mu.Unlock() createRes, err := c.api.Conduit.CreateConduits(context.Background(), &conduit.CreateConduitsRequest{ ShardCount: 1, }) if err != nil { return err } c.id = createRes.Data[0].ID updateReq := &conduit.UpdateConduitShardsRequest{ ConduitID: c.id, Shards: make([]conduit.ShardUpdate, len(c.transports)), } for i, transport := range c.transports { updateReq.Shards[i] = conduit.ShardUpdate{ ID: fmt.Sprint(i), Transport: transport.ApiTransport(), } } updateRes, err := c.api.Conduit.UpdateConduitShards(ctx, updateReq) if err != nil { return err } if len(updateRes.Errors) > 0 { return fmt.Errorf("failed to update shards: %v", updateRes.Errors) } return nil } func (c *Conduit) Unregister(ctx context.Context) error { c.mu.Lock() defer c.mu.Unlock() if c.id == "" { return nil } if err := c.api.Conduit.DeleteConduit(ctx, c.id); err != nil { return err } c.id = "" return nil }