Add conduit abstraction

This commit is contained in:
Evan Fiordeliso 2024-03-05 12:33:34 -05:00
parent 9279a1ac3c
commit 39f46618ab
4 changed files with 154 additions and 23 deletions

View File

@ -3,11 +3,16 @@ package conduit
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "io"
"net/http" "net/http"
"net/url" "net/url"
) )
type CreateConduitsRequest struct {
// The number of shards to create for this conduit.
ShardCount int `json:"shard_count"`
}
type CreateConduitsResponse struct { type CreateConduitsResponse struct {
Data []ConduitData `json:"data"` Data []ConduitData `json:"data"`
} }
@ -15,10 +20,20 @@ type CreateConduitsResponse struct {
// Creates a new conduit. // Creates a new conduit.
// //
// Requires an app access token. // Requires an app access token.
func (c *Conduit) CreateConduits(ctx context.Context, shareCount int) (*CreateConduitsResponse, error) { func (c *Conduit) CreateConduits(ctx context.Context, body *CreateConduitsRequest) (*CreateConduitsResponse, error) {
endpoint := c.baseUrl.ResolveReference(&url.URL{Path: "eventsub/conduits", RawQuery: "share_count=" + fmt.Sprint(shareCount)}) endpoint := c.baseUrl.ResolveReference(&url.URL{Path: "eventsub/conduits"})
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint.String(), nil) r, w := io.Pipe()
go func() {
if err := json.NewEncoder(w).Encode(body); err != nil {
w.CloseWithError(err)
} else {
w.Close()
}
}()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint.String(), r)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -1,6 +1,6 @@
package conduit package conduit
import "time" import "go.fifitido.net/twitch/api/eventsub"
type ConduitData struct { type ConduitData struct {
// Conduit ID. // Conduit ID.
@ -18,7 +18,7 @@ type Shard struct {
Status Status `json:"status"` Status Status `json:"status"`
// The transport details used to send the notifications. // The transport details used to send the notifications.
Transport *Transport `json:"transport"` Transport *eventsub.Transport `json:"transport"`
} }
type Status string type Status string
@ -58,19 +58,3 @@ const (
// websocket_network_error — The Twitch WebSocket server experienced a network error writing the message to the client. // websocket_network_error — The Twitch WebSocket server experienced a network error writing the message to the client.
WebsocketnetworkError Status = "websocket_network_error" WebsocketnetworkError Status = "websocket_network_error"
) )
type Transport struct {
// The transport method. Possible values are:
//
// webhook, websocket
Method string `json:"method"`
// The callback URL where the notifications are sent. Included only if method is set to webhook.
Callback *string `json:"callback,omitempty"`
// The UTC date and time that the WebSocket connection was established. Included only if method is set to websocket.
ConnectedAt *time.Time `json:"connected_at,omitempty"`
// The UTC date and time that the WebSocket connection was lost. Included only if method is set to websocket.
DisconnectedAt *time.Time `json:"disconnected_at,omitempty"`
}

View File

@ -6,6 +6,8 @@ import (
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
"go.fifitido.net/twitch/api/eventsub"
) )
type UpdateConduitShardsRequest struct { type UpdateConduitShardsRequest struct {
@ -21,7 +23,9 @@ type ShardUpdate struct {
ID string `json:"id"` ID string `json:"id"`
// The transport details that you want Twitch to use when sending you notifications. // The transport details that you want Twitch to use when sending you notifications.
Transport *Transport `json:"transport"` //
// Does not support conduit transport type.
Transport *eventsub.Transport `json:"transport"`
} }
type UpdateConduitShardsResponse struct { type UpdateConduitShardsResponse struct {

View File

@ -1 +1,129 @@
package conduit package conduit
import (
"context"
"fmt"
"sync"
"go.fifitido.net/twitch/api"
"go.fifitido.net/twitch/api/conduit"
eventsubapi "go.fifitido.net/twitch/api/eventsub"
"go.fifitido.net/twitch/eventsub"
)
type Conduit struct {
api *api.API
id string
transports []eventsub.Transport
mu sync.Mutex
}
func New(transports ...eventsub.Transport) *Conduit {
return &Conduit{
transports: transports,
}
}
func (c *Conduit) AddTransport(transport eventsub.Transport) {
c.transports = append(c.transports, transport)
}
// ApiTransport implements eventsub.Transport
func (c *Conduit) ApiTransport() *eventsubapi.Transport {
return &eventsubapi.Transport{
Method: "conduit",
ConduitID: &c.id,
}
}
// Run implements eventsub.Transport.
func (c *Conduit) Run() error {
var wg sync.WaitGroup
for _, transport := range c.transports {
wg.Add(1)
go func(transport eventsub.Transport) {
defer wg.Done()
if err := transport.Run(); err != nil {
panic(err)
}
}(transport)
}
wg.Wait()
return nil
}
// Start implements eventsub.Transport.
func (c *Conduit) Start() {
go func() {
if err := c.Run(); err != nil {
panic(err)
}
}()
}
// Close implements eventsub.Transport.
func (c *Conduit) Close() error {
for _, transport := range c.transports {
if err := transport.Close(); err != nil {
return err
}
}
return nil
}
func (c *Conduit) Register(ctx context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
createRes, err := c.api.Conduit.CreateConduits(context.Background(), &conduit.CreateConduitsRequest{
ShardCount: 1,
})
if err != nil {
return err
}
c.id = createRes.Data[0].ID
updateReq := &conduit.UpdateConduitShardsRequest{
ConduitID: c.id,
Shards: make([]conduit.ShardUpdate, len(c.transports)),
}
for i, transport := range c.transports {
updateReq.Shards[i] = conduit.ShardUpdate{
ID: fmt.Sprint(i),
Transport: transport.ApiTransport(),
}
}
updateRes, err := c.api.Conduit.UpdateConduitShards(ctx, updateReq)
if err != nil {
return err
}
if len(updateRes.Errors) > 0 {
return fmt.Errorf("failed to update shards: %v", updateRes.Errors)
}
return nil
}
func (c *Conduit) Unregister(ctx context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
if c.id == "" {
return nil
}
if err := c.api.Conduit.DeleteConduit(ctx, c.id); err != nil {
return err
}
c.id = ""
return nil
}