From 39f46618ab3fea007dcac3399eae5f521de33db8 Mon Sep 17 00:00:00 2001 From: Evan Fiordeliso Date: Tue, 5 Mar 2024 12:33:34 -0500 Subject: [PATCH] Add conduit abstraction --- api/conduit/create_conduits.go | 23 ++++- api/conduit/models.go | 20 +---- api/conduit/update_conduit_shards.go | 6 +- eventsub/conduit/conduit.go | 128 +++++++++++++++++++++++++++ 4 files changed, 154 insertions(+), 23 deletions(-) diff --git a/api/conduit/create_conduits.go b/api/conduit/create_conduits.go index aced715..0e4ac07 100644 --- a/api/conduit/create_conduits.go +++ b/api/conduit/create_conduits.go @@ -3,11 +3,16 @@ package conduit import ( "context" "encoding/json" - "fmt" + "io" "net/http" "net/url" ) +type CreateConduitsRequest struct { + // The number of shards to create for this conduit. + ShardCount int `json:"shard_count"` +} + type CreateConduitsResponse struct { Data []ConduitData `json:"data"` } @@ -15,10 +20,20 @@ type CreateConduitsResponse struct { // Creates a new conduit. // // Requires an app access token. -func (c *Conduit) CreateConduits(ctx context.Context, shareCount int) (*CreateConduitsResponse, error) { - endpoint := c.baseUrl.ResolveReference(&url.URL{Path: "eventsub/conduits", RawQuery: "share_count=" + fmt.Sprint(shareCount)}) +func (c *Conduit) CreateConduits(ctx context.Context, body *CreateConduitsRequest) (*CreateConduitsResponse, error) { + endpoint := c.baseUrl.ResolveReference(&url.URL{Path: "eventsub/conduits"}) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint.String(), nil) + r, w := io.Pipe() + + go func() { + if err := json.NewEncoder(w).Encode(body); err != nil { + w.CloseWithError(err) + } else { + w.Close() + } + }() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint.String(), r) if err != nil { return nil, err } diff --git a/api/conduit/models.go b/api/conduit/models.go index d4e2ec2..3f6ff7a 100644 --- a/api/conduit/models.go +++ b/api/conduit/models.go @@ -1,6 +1,6 @@ package conduit -import "time" +import "go.fifitido.net/twitch/api/eventsub" type ConduitData struct { // Conduit ID. @@ -18,7 +18,7 @@ type Shard struct { Status Status `json:"status"` // The transport details used to send the notifications. - Transport *Transport `json:"transport"` + Transport *eventsub.Transport `json:"transport"` } type Status string @@ -58,19 +58,3 @@ const ( // websocket_network_error — The Twitch WebSocket server experienced a network error writing the message to the client. WebsocketnetworkError Status = "websocket_network_error" ) - -type Transport struct { - // The transport method. Possible values are: - // - // webhook, websocket - Method string `json:"method"` - - // The callback URL where the notifications are sent. Included only if method is set to webhook. - Callback *string `json:"callback,omitempty"` - - // The UTC date and time that the WebSocket connection was established. Included only if method is set to websocket. - ConnectedAt *time.Time `json:"connected_at,omitempty"` - - // The UTC date and time that the WebSocket connection was lost. Included only if method is set to websocket. - DisconnectedAt *time.Time `json:"disconnected_at,omitempty"` -} diff --git a/api/conduit/update_conduit_shards.go b/api/conduit/update_conduit_shards.go index 1efd40b..5edb4d0 100644 --- a/api/conduit/update_conduit_shards.go +++ b/api/conduit/update_conduit_shards.go @@ -6,6 +6,8 @@ import ( "io" "net/http" "net/url" + + "go.fifitido.net/twitch/api/eventsub" ) type UpdateConduitShardsRequest struct { @@ -21,7 +23,9 @@ type ShardUpdate struct { ID string `json:"id"` // The transport details that you want Twitch to use when sending you notifications. - Transport *Transport `json:"transport"` + // + // Does not support conduit transport type. + Transport *eventsub.Transport `json:"transport"` } type UpdateConduitShardsResponse struct { diff --git a/eventsub/conduit/conduit.go b/eventsub/conduit/conduit.go index c4dde5c..fc2f4dd 100644 --- a/eventsub/conduit/conduit.go +++ b/eventsub/conduit/conduit.go @@ -1 +1,129 @@ package conduit + +import ( + "context" + "fmt" + "sync" + + "go.fifitido.net/twitch/api" + "go.fifitido.net/twitch/api/conduit" + eventsubapi "go.fifitido.net/twitch/api/eventsub" + "go.fifitido.net/twitch/eventsub" +) + +type Conduit struct { + api *api.API + + id string + transports []eventsub.Transport + + mu sync.Mutex +} + +func New(transports ...eventsub.Transport) *Conduit { + return &Conduit{ + transports: transports, + } +} + +func (c *Conduit) AddTransport(transport eventsub.Transport) { + c.transports = append(c.transports, transport) +} + +// ApiTransport implements eventsub.Transport +func (c *Conduit) ApiTransport() *eventsubapi.Transport { + return &eventsubapi.Transport{ + Method: "conduit", + ConduitID: &c.id, + } +} + +// Run implements eventsub.Transport. +func (c *Conduit) Run() error { + var wg sync.WaitGroup + + for _, transport := range c.transports { + wg.Add(1) + go func(transport eventsub.Transport) { + defer wg.Done() + if err := transport.Run(); err != nil { + panic(err) + } + }(transport) + } + + wg.Wait() + return nil +} + +// Start implements eventsub.Transport. +func (c *Conduit) Start() { + go func() { + if err := c.Run(); err != nil { + panic(err) + } + }() +} + +// Close implements eventsub.Transport. +func (c *Conduit) Close() error { + for _, transport := range c.transports { + if err := transport.Close(); err != nil { + return err + } + } + return nil +} + +func (c *Conduit) Register(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + + createRes, err := c.api.Conduit.CreateConduits(context.Background(), &conduit.CreateConduitsRequest{ + ShardCount: 1, + }) + + if err != nil { + return err + } + + c.id = createRes.Data[0].ID + updateReq := &conduit.UpdateConduitShardsRequest{ + ConduitID: c.id, + Shards: make([]conduit.ShardUpdate, len(c.transports)), + } + + for i, transport := range c.transports { + updateReq.Shards[i] = conduit.ShardUpdate{ + ID: fmt.Sprint(i), + Transport: transport.ApiTransport(), + } + } + + updateRes, err := c.api.Conduit.UpdateConduitShards(ctx, updateReq) + if err != nil { + return err + } + + if len(updateRes.Errors) > 0 { + return fmt.Errorf("failed to update shards: %v", updateRes.Errors) + } + + return nil +} + +func (c *Conduit) Unregister(ctx context.Context) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.id == "" { + return nil + } + + if err := c.api.Conduit.DeleteConduit(ctx, c.id); err != nil { + return err + } + + c.id = "" + return nil +}