diff --git a/apiserver/controllers/controllers.go b/apiserver/controllers/controllers.go index 25e429f7..ec918ce2 100644 --- a/apiserver/controllers/controllers.go +++ b/apiserver/controllers/controllers.go @@ -25,20 +25,29 @@ import ( gErrors "garm/errors" runnerParams "garm/params" "garm/runner" + wsWriter "garm/websocket" + "github.com/gorilla/websocket" "github.com/pkg/errors" ) -func NewAPIController(r *runner.Runner, auth *auth.Authenticator) (*APIController, error) { +func NewAPIController(r *runner.Runner, auth *auth.Authenticator, hub *wsWriter.Hub) (*APIController, error) { return &APIController{ r: r, auth: auth, + hub: hub, + upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 16384, + }, }, nil } type APIController struct { - r *runner.Runner - auth *auth.Authenticator + r *runner.Runner + auth *auth.Authenticator + hub *wsWriter.Hub + upgrader websocket.Upgrader } func handleError(w http.ResponseWriter, err error) { @@ -124,6 +133,42 @@ func (a *APIController) CatchAll(w http.ResponseWriter, r *http.Request) { } } +func (a *APIController) WSHandler(writer http.ResponseWriter, req *http.Request) { + ctx := req.Context() + if !auth.IsAdmin(ctx) { + writer.WriteHeader(http.StatusForbidden) + writer.Write([]byte("you need admin level access to view logs")) + return + } + + if a.hub == nil { + handleError(writer, gErrors.NewBadRequestError("log streamer is disabled")) + return + } + + conn, err := a.upgrader.Upgrade(writer, req, nil) + if err != nil { + log.Printf("error upgrading to websockets: %v", err) + return + } + + // TODO (gsamfira): Handle ExpiresAt. Right now, if a client uses + // a valid token to authenticate, and keeps the websocket connection + // open, it will allow that client to stream logs via websockets + // until the connection is broken. We need to forcefully disconnect + // the client once the token expires. + client, err := wsWriter.NewClient(conn, a.hub) + if err != nil { + log.Printf("failed to create new client: %v", err) + return + } + if err := a.hub.Register(client); err != nil { + log.Printf("failed to register new client: %v", err) + return + } + client.Go() +} + // NotFoundHandler is returned when an invalid URL is acccessed func (a *APIController) NotFoundHandler(w http.ResponseWriter, r *http.Request) { apiErr := params.APIErrorResponse{ diff --git a/apiserver/routers/routers.go b/apiserver/routers/routers.go index f9a88e48..58b3fb6f 100644 --- a/apiserver/routers/routers.go +++ b/apiserver/routers/routers.go @@ -17,7 +17,6 @@ package routers import ( "io" "net/http" - "os" gorillaHandlers "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -40,16 +39,16 @@ func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddl // FirstRunHandler firstRunRouter := apiSubRouter.PathPrefix("/first-run").Subrouter() - firstRunRouter.Handle("/", log(os.Stdout, http.HandlerFunc(han.FirstRunHandler))).Methods("POST", "OPTIONS") + firstRunRouter.Handle("/", log(logWriter, http.HandlerFunc(han.FirstRunHandler))).Methods("POST", "OPTIONS") // Instance callback callbackRouter := apiSubRouter.PathPrefix("/callbacks").Subrouter() - callbackRouter.Handle("/status/", log(os.Stdout, http.HandlerFunc(han.InstanceStatusMessageHandler))).Methods("POST", "OPTIONS") - callbackRouter.Handle("/status", log(os.Stdout, http.HandlerFunc(han.InstanceStatusMessageHandler))).Methods("POST", "OPTIONS") + callbackRouter.Handle("/status/", log(logWriter, http.HandlerFunc(han.InstanceStatusMessageHandler))).Methods("POST", "OPTIONS") + callbackRouter.Handle("/status", log(logWriter, http.HandlerFunc(han.InstanceStatusMessageHandler))).Methods("POST", "OPTIONS") callbackRouter.Use(instanceMiddleware.Middleware) // Login authRouter := apiSubRouter.PathPrefix("/auth").Subrouter() - authRouter.Handle("/{login:login\\/?}", log(os.Stdout, http.HandlerFunc(han.LoginHandler))).Methods("POST", "OPTIONS") + authRouter.Handle("/{login:login\\/?}", log(logWriter, http.HandlerFunc(han.LoginHandler))).Methods("POST", "OPTIONS") authRouter.Use(initMiddleware.Middleware) apiRouter := apiSubRouter.PathPrefix("").Subrouter() @@ -60,117 +59,119 @@ func NewAPIRouter(han *controllers.APIController, logWriter io.Writer, authMiddl // Pools // /////////// // List all pools - apiRouter.Handle("/pools/", log(os.Stdout, http.HandlerFunc(han.ListAllPoolsHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/pools", log(os.Stdout, http.HandlerFunc(han.ListAllPoolsHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/pools/", log(logWriter, http.HandlerFunc(han.ListAllPoolsHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/pools", log(logWriter, http.HandlerFunc(han.ListAllPoolsHandler))).Methods("GET", "OPTIONS") // Get one pool - apiRouter.Handle("/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.GetPoolByIDHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.GetPoolByIDHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/pools/{poolID}/", log(logWriter, http.HandlerFunc(han.GetPoolByIDHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/pools/{poolID}", log(logWriter, http.HandlerFunc(han.GetPoolByIDHandler))).Methods("GET", "OPTIONS") // Delete one pool - apiRouter.Handle("/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.DeletePoolByIDHandler))).Methods("DELETE", "OPTIONS") - apiRouter.Handle("/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.DeletePoolByIDHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/pools/{poolID}/", log(logWriter, http.HandlerFunc(han.DeletePoolByIDHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/pools/{poolID}", log(logWriter, http.HandlerFunc(han.DeletePoolByIDHandler))).Methods("DELETE", "OPTIONS") // Update one pool - apiRouter.Handle("/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.UpdatePoolByIDHandler))).Methods("PUT", "OPTIONS") - apiRouter.Handle("/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.UpdatePoolByIDHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/pools/{poolID}/", log(logWriter, http.HandlerFunc(han.UpdatePoolByIDHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/pools/{poolID}", log(logWriter, http.HandlerFunc(han.UpdatePoolByIDHandler))).Methods("PUT", "OPTIONS") // List pool instances - apiRouter.Handle("/pools/{poolID}/instances/", log(os.Stdout, http.HandlerFunc(han.ListPoolInstancesHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/pools/{poolID}/instances", log(os.Stdout, http.HandlerFunc(han.ListPoolInstancesHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/pools/{poolID}/instances/", log(logWriter, http.HandlerFunc(han.ListPoolInstancesHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/pools/{poolID}/instances", log(logWriter, http.HandlerFunc(han.ListPoolInstancesHandler))).Methods("GET", "OPTIONS") ///////////// // Runners // ///////////// // Get instance - apiRouter.Handle("/instances/{instanceName}/", log(os.Stdout, http.HandlerFunc(han.GetInstanceHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/instances/{instanceName}", log(os.Stdout, http.HandlerFunc(han.GetInstanceHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/instances/{instanceName}/", log(logWriter, http.HandlerFunc(han.GetInstanceHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/instances/{instanceName}", log(logWriter, http.HandlerFunc(han.GetInstanceHandler))).Methods("GET", "OPTIONS") // Delete runner - apiRouter.Handle("/instances/{instanceName}/", log(os.Stdout, http.HandlerFunc(han.DeleteInstanceHandler))).Methods("DELETE", "OPTIONS") - apiRouter.Handle("/instances/{instanceName}", log(os.Stdout, http.HandlerFunc(han.DeleteInstanceHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/instances/{instanceName}/", log(logWriter, http.HandlerFunc(han.DeleteInstanceHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/instances/{instanceName}", log(logWriter, http.HandlerFunc(han.DeleteInstanceHandler))).Methods("DELETE", "OPTIONS") // List runners - apiRouter.Handle("/instances/", log(os.Stdout, http.HandlerFunc(han.ListAllInstancesHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/instances", log(os.Stdout, http.HandlerFunc(han.ListAllInstancesHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/instances/", log(logWriter, http.HandlerFunc(han.ListAllInstancesHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/instances", log(logWriter, http.HandlerFunc(han.ListAllInstancesHandler))).Methods("GET", "OPTIONS") ///////////////////// // Repos and pools // ///////////////////// // Get pool - apiRouter.Handle("/repositories/{repoID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.GetRepoPoolHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/repositories/{repoID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.GetRepoPoolHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/pools/{poolID}/", log(logWriter, http.HandlerFunc(han.GetRepoPoolHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/pools/{poolID}", log(logWriter, http.HandlerFunc(han.GetRepoPoolHandler))).Methods("GET", "OPTIONS") // Delete pool - apiRouter.Handle("/repositories/{repoID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.DeleteRepoPoolHandler))).Methods("DELETE", "OPTIONS") - apiRouter.Handle("/repositories/{repoID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.DeleteRepoPoolHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/pools/{poolID}/", log(logWriter, http.HandlerFunc(han.DeleteRepoPoolHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/pools/{poolID}", log(logWriter, http.HandlerFunc(han.DeleteRepoPoolHandler))).Methods("DELETE", "OPTIONS") // Update pool - apiRouter.Handle("/repositories/{repoID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.UpdateRepoPoolHandler))).Methods("PUT", "OPTIONS") - apiRouter.Handle("/repositories/{repoID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.UpdateRepoPoolHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/pools/{poolID}/", log(logWriter, http.HandlerFunc(han.UpdateRepoPoolHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/pools/{poolID}", log(logWriter, http.HandlerFunc(han.UpdateRepoPoolHandler))).Methods("PUT", "OPTIONS") // List pools - apiRouter.Handle("/repositories/{repoID}/pools/", log(os.Stdout, http.HandlerFunc(han.ListRepoPoolsHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/repositories/{repoID}/pools", log(os.Stdout, http.HandlerFunc(han.ListRepoPoolsHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/pools/", log(logWriter, http.HandlerFunc(han.ListRepoPoolsHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/pools", log(logWriter, http.HandlerFunc(han.ListRepoPoolsHandler))).Methods("GET", "OPTIONS") // Create pool - apiRouter.Handle("/repositories/{repoID}/pools/", log(os.Stdout, http.HandlerFunc(han.CreateRepoPoolHandler))).Methods("POST", "OPTIONS") - apiRouter.Handle("/repositories/{repoID}/pools", log(os.Stdout, http.HandlerFunc(han.CreateRepoPoolHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/pools/", log(logWriter, http.HandlerFunc(han.CreateRepoPoolHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/pools", log(logWriter, http.HandlerFunc(han.CreateRepoPoolHandler))).Methods("POST", "OPTIONS") // Repo instances list - apiRouter.Handle("/repositories/{repoID}/instances/", log(os.Stdout, http.HandlerFunc(han.ListRepoInstancesHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/repositories/{repoID}/instances", log(os.Stdout, http.HandlerFunc(han.ListRepoInstancesHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/instances/", log(logWriter, http.HandlerFunc(han.ListRepoInstancesHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/instances", log(logWriter, http.HandlerFunc(han.ListRepoInstancesHandler))).Methods("GET", "OPTIONS") // Get repo - apiRouter.Handle("/repositories/{repoID}/", log(os.Stdout, http.HandlerFunc(han.GetRepoByIDHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/repositories/{repoID}", log(os.Stdout, http.HandlerFunc(han.GetRepoByIDHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/", log(logWriter, http.HandlerFunc(han.GetRepoByIDHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}", log(logWriter, http.HandlerFunc(han.GetRepoByIDHandler))).Methods("GET", "OPTIONS") // Update repo - apiRouter.Handle("/repositories/{repoID}/", log(os.Stdout, http.HandlerFunc(han.UpdateRepoHandler))).Methods("PUT", "OPTIONS") - apiRouter.Handle("/repositories/{repoID}", log(os.Stdout, http.HandlerFunc(han.UpdateRepoHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/", log(logWriter, http.HandlerFunc(han.UpdateRepoHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}", log(logWriter, http.HandlerFunc(han.UpdateRepoHandler))).Methods("PUT", "OPTIONS") // Delete repo - apiRouter.Handle("/repositories/{repoID}/", log(os.Stdout, http.HandlerFunc(han.DeleteRepoHandler))).Methods("DELETE", "OPTIONS") - apiRouter.Handle("/repositories/{repoID}", log(os.Stdout, http.HandlerFunc(han.DeleteRepoHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}/", log(logWriter, http.HandlerFunc(han.DeleteRepoHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/repositories/{repoID}", log(logWriter, http.HandlerFunc(han.DeleteRepoHandler))).Methods("DELETE", "OPTIONS") // List repos - apiRouter.Handle("/repositories/", log(os.Stdout, http.HandlerFunc(han.ListReposHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/repositories", log(os.Stdout, http.HandlerFunc(han.ListReposHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories/", log(logWriter, http.HandlerFunc(han.ListReposHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/repositories", log(logWriter, http.HandlerFunc(han.ListReposHandler))).Methods("GET", "OPTIONS") // Create repo - apiRouter.Handle("/repositories/", log(os.Stdout, http.HandlerFunc(han.CreateRepoHandler))).Methods("POST", "OPTIONS") - apiRouter.Handle("/repositories", log(os.Stdout, http.HandlerFunc(han.CreateRepoHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/repositories/", log(logWriter, http.HandlerFunc(han.CreateRepoHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/repositories", log(logWriter, http.HandlerFunc(han.CreateRepoHandler))).Methods("POST", "OPTIONS") ///////////////////////////// // Organizations and pools // ///////////////////////////// // Get pool - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.GetOrgPoolHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.GetOrgPoolHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(logWriter, http.HandlerFunc(han.GetOrgPoolHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(logWriter, http.HandlerFunc(han.GetOrgPoolHandler))).Methods("GET", "OPTIONS") // Delete pool - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.DeleteOrgPoolHandler))).Methods("DELETE", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.DeleteOrgPoolHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(logWriter, http.HandlerFunc(han.DeleteOrgPoolHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(logWriter, http.HandlerFunc(han.DeleteOrgPoolHandler))).Methods("DELETE", "OPTIONS") // Update pool - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(os.Stdout, http.HandlerFunc(han.UpdateOrgPoolHandler))).Methods("PUT", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(os.Stdout, http.HandlerFunc(han.UpdateOrgPoolHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}/", log(logWriter, http.HandlerFunc(han.UpdateOrgPoolHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/{poolID}", log(logWriter, http.HandlerFunc(han.UpdateOrgPoolHandler))).Methods("PUT", "OPTIONS") // List pools - apiRouter.Handle("/organizations/{orgID}/pools/", log(os.Stdout, http.HandlerFunc(han.ListOrgPoolsHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/pools", log(os.Stdout, http.HandlerFunc(han.ListOrgPoolsHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/", log(logWriter, http.HandlerFunc(han.ListOrgPoolsHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools", log(logWriter, http.HandlerFunc(han.ListOrgPoolsHandler))).Methods("GET", "OPTIONS") // Create pool - apiRouter.Handle("/organizations/{orgID}/pools/", log(os.Stdout, http.HandlerFunc(han.CreateOrgPoolHandler))).Methods("POST", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/pools", log(os.Stdout, http.HandlerFunc(han.CreateOrgPoolHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools/", log(logWriter, http.HandlerFunc(han.CreateOrgPoolHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/pools", log(logWriter, http.HandlerFunc(han.CreateOrgPoolHandler))).Methods("POST", "OPTIONS") // Repo instances list - apiRouter.Handle("/organizations/{orgID}/instances/", log(os.Stdout, http.HandlerFunc(han.ListOrgInstancesHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}/instances", log(os.Stdout, http.HandlerFunc(han.ListOrgInstancesHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/instances/", log(logWriter, http.HandlerFunc(han.ListOrgInstancesHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/instances", log(logWriter, http.HandlerFunc(han.ListOrgInstancesHandler))).Methods("GET", "OPTIONS") // Get org - apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.GetOrgByIDHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.GetOrgByIDHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/", log(logWriter, http.HandlerFunc(han.GetOrgByIDHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}", log(logWriter, http.HandlerFunc(han.GetOrgByIDHandler))).Methods("GET", "OPTIONS") // Update org - apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.UpdateOrgHandler))).Methods("PUT", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.UpdateOrgHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/", log(logWriter, http.HandlerFunc(han.UpdateOrgHandler))).Methods("PUT", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}", log(logWriter, http.HandlerFunc(han.UpdateOrgHandler))).Methods("PUT", "OPTIONS") // Delete org - apiRouter.Handle("/organizations/{orgID}/", log(os.Stdout, http.HandlerFunc(han.DeleteOrgHandler))).Methods("DELETE", "OPTIONS") - apiRouter.Handle("/organizations/{orgID}", log(os.Stdout, http.HandlerFunc(han.DeleteOrgHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}/", log(logWriter, http.HandlerFunc(han.DeleteOrgHandler))).Methods("DELETE", "OPTIONS") + apiRouter.Handle("/organizations/{orgID}", log(logWriter, http.HandlerFunc(han.DeleteOrgHandler))).Methods("DELETE", "OPTIONS") // List orgs - apiRouter.Handle("/organizations/", log(os.Stdout, http.HandlerFunc(han.ListOrgsHandler))).Methods("GET", "OPTIONS") - apiRouter.Handle("/organizations", log(os.Stdout, http.HandlerFunc(han.ListOrgsHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations/", log(logWriter, http.HandlerFunc(han.ListOrgsHandler))).Methods("GET", "OPTIONS") + apiRouter.Handle("/organizations", log(logWriter, http.HandlerFunc(han.ListOrgsHandler))).Methods("GET", "OPTIONS") // Create org - apiRouter.Handle("/organizations/", log(os.Stdout, http.HandlerFunc(han.CreateOrgHandler))).Methods("POST", "OPTIONS") - apiRouter.Handle("/organizations", log(os.Stdout, http.HandlerFunc(han.CreateOrgHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/organizations/", log(logWriter, http.HandlerFunc(han.CreateOrgHandler))).Methods("POST", "OPTIONS") + apiRouter.Handle("/organizations", log(logWriter, http.HandlerFunc(han.CreateOrgHandler))).Methods("POST", "OPTIONS") // Credentials and providers - apiRouter.Handle("/credentials/", log(os.Stdout, http.HandlerFunc(han.ListCredentials))).Methods("GET", "OPTIONS") - apiRouter.Handle("/credentials", log(os.Stdout, http.HandlerFunc(han.ListCredentials))).Methods("GET", "OPTIONS") - apiRouter.Handle("/providers/", log(os.Stdout, http.HandlerFunc(han.ListProviders))).Methods("GET", "OPTIONS") - apiRouter.Handle("/providers", log(os.Stdout, http.HandlerFunc(han.ListProviders))).Methods("GET", "OPTIONS") + apiRouter.Handle("/credentials/", log(logWriter, http.HandlerFunc(han.ListCredentials))).Methods("GET", "OPTIONS") + apiRouter.Handle("/credentials", log(logWriter, http.HandlerFunc(han.ListCredentials))).Methods("GET", "OPTIONS") + apiRouter.Handle("/providers/", log(logWriter, http.HandlerFunc(han.ListProviders))).Methods("GET", "OPTIONS") + apiRouter.Handle("/providers", log(logWriter, http.HandlerFunc(han.ListProviders))).Methods("GET", "OPTIONS") + // Websocket log writer + apiRouter.Handle("/{ws:ws\\/?}", log(logWriter, http.HandlerFunc(han.WSHandler))).Methods("GET") return router } diff --git a/cmd/garm-cli/cmd/log.go b/cmd/garm-cli/cmd/log.go new file mode 100644 index 00000000..51730275 --- /dev/null +++ b/cmd/garm-cli/cmd/log.go @@ -0,0 +1,91 @@ +package cmd + +import ( + "fmt" + "log" + "net/http" + "net/url" + "os" + "os/signal" + "time" + + "github.com/gorilla/websocket" + "github.com/spf13/cobra" +) + +var logCmd = &cobra.Command{ + Use: "debug-log", + SilenceUsage: true, + Short: "Stream garm log", + Long: `Stream all garm logging to the terminal.`, + RunE: func(cmd *cobra.Command, args []string) error { + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + parsedURL, err := url.Parse(mgr.BaseURL) + if err != nil { + return err + } + + wsScheme := "ws" + if parsedURL.Scheme == "https" { + wsScheme = "wss" + } + u := url.URL{Scheme: wsScheme, Host: parsedURL.Host, Path: "/api/v1/ws"} + log.Printf("connecting to %s", u.String()) + + header := http.Header{} + header.Add("Authorization", fmt.Sprintf("Bearer %s", mgr.Token)) + + c, _, err := websocket.DefaultDialer.Dial(u.String(), header) + if err != nil { + log.Fatal("dial:", err) + } + defer c.Close() + + done := make(chan struct{}) + + go func() { + defer close(done) + for { + _, message, err := c.ReadMessage() + if err != nil { + log.Print("read:", err) + return + } + log.Print(message) + } + }() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for { + select { + case <-done: + return nil + case t := <-ticker.C: + err := c.WriteMessage(websocket.TextMessage, []byte(t.String())) + if err != nil { + return err + } + case <-interrupt: + // Cleanly close the connection by sending a close message and then + // waiting (with timeout) for the server to close the connection. + err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + return err + } + select { + case <-done: + case <-time.After(time.Second): + } + return nil + } + } + }, +} + +func init() { + rootCmd.AddCommand(logCmd) +} diff --git a/cmd/garm/main.go b/cmd/garm/main.go index 6fd06b6b..f4738193 100644 --- a/cmd/garm/main.go +++ b/cmd/garm/main.go @@ -18,6 +18,7 @@ import ( "context" "flag" "fmt" + "io" "log" "net" "net/http" @@ -31,6 +32,7 @@ import ( "garm/database/common" "garm/runner" "garm/util" + "garm/websocket" "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -75,7 +77,19 @@ func main() { if err != nil { log.Fatalf("fetching log writer: %+v", err) } - log.SetOutput(logWriter) + + var writers []io.Writer = []io.Writer{ + logWriter, + } + var hub *websocket.Hub + if cfg.Default.EnableLogStreamer { + hub = websocket.NewHub(ctx) + hub.Start() + defer hub.Stop() + writers = append(writers, hub) + } + + log.SetOutput(io.MultiWriter(writers...)) db, err := database.NewDatabase(ctx, cfg.Database) if err != nil { @@ -98,7 +112,7 @@ func main() { } authenticator := auth.NewAuthenticator(cfg.JWTAuth, db) - controller, err := controllers.NewAPIController(runner, authenticator) + controller, err := controllers.NewAPIController(runner, authenticator, hub) if err != nil { log.Fatalf("failed to create controller: %+v", err) } diff --git a/config/config.go b/config/config.go index 49bc4080..dc72937f 100644 --- a/config/config.go +++ b/config/config.go @@ -73,7 +73,7 @@ var ( // DefaultConfigDir is the default path on disk to the config dir. The config // file will probably be in the same folder, but it is not mandatory. DefaultConfigDir = "/etc/garm" - + // DefaultUserGroups are the groups the default user will be part of. DefaultUserGroups = []string{ "sudo", "adm", "cdrom", "dialout", @@ -167,7 +167,8 @@ type Default struct { ConfigDir string `toml:"config_dir,omitempty" json:"config-dir,omitempty"` CallbackURL string `toml:"callback_url" json:"callback-url"` // LogFile is the location of the log file. - LogFile string `toml:"log_file,omitempty" json:"log-file"` + LogFile string `toml:"log_file,omitempty" json:"log-file"` + EnableLogStreamer bool `toml:"enable_log_streamer"` } func (d *Default) Validate() error { diff --git a/go.mod b/go.mod index 006c34e9..316f65d9 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/google/uuid v1.3.0 github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.5.0 github.com/jedib0t/go-pretty/v6 v6.3.1 github.com/juju/clock v0.0.0-20220704231616-a2b96c8eeb27 github.com/juju/retry v0.0.0-20220204093819-62423bf33287 @@ -23,12 +24,12 @@ require ( golang.org/x/crypto v0.0.0-20220321153916-2c7772ba3064 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f + gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 gopkg.in/yaml.v3 v3.0.1 gorm.io/driver/mysql v1.3.3 gorm.io/driver/sqlite v1.3.2 gorm.io/gorm v1.23.4 - gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 ) require ( @@ -40,7 +41,6 @@ require ( github.com/go-sql-driver/mysql v1.6.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/google/go-querystring v1.1.0 // indirect - github.com/gorilla/websocket v1.5.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect diff --git a/runner/runner.go b/runner/runner.go index 8b681aa4..63664b7c 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -212,6 +212,9 @@ type Runner struct { } func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredentials, error) { + if !auth.IsAdmin(ctx) { + return nil, runnerErrors.ErrUnauthorized + } ret := []params.GithubCredentials{} for _, val := range r.config.Github { @@ -224,6 +227,9 @@ func (r *Runner) ListCredentials(ctx context.Context) ([]params.GithubCredential } func (r *Runner) ListProviders(ctx context.Context) ([]params.Provider, error) { + if !auth.IsAdmin(ctx) { + return nil, runnerErrors.ErrUnauthorized + } ret := []params.Provider{} for _, val := range r.providers { diff --git a/websocket/client.go b/websocket/client.go new file mode 100644 index 00000000..962da1e0 --- /dev/null +++ b/websocket/client.go @@ -0,0 +1,99 @@ +package websocket + +import ( + "log" + "time" + + "github.com/google/uuid" + + "github.com/gorilla/websocket" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Maximum message size allowed from peer. + maxMessageSize = 1024 +) + +func NewClient(conn *websocket.Conn, hub *Hub) (*Client, error) { + clientID := uuid.New() + return &Client{ + id: clientID.String(), + conn: conn, + hub: hub, + send: make(chan []byte, 100), + }, nil +} + +type Client struct { + id string + conn *websocket.Conn + // Buffered channel of outbound messages. + send chan []byte + + hub *Hub +} + +func (c *Client) Go() { + go c.clientReader() + go c.clientWriter() +} + +// clientReader waits for options changes from the client. The client can at any time +// change the log level and binary name it watches. +func (c *Client) clientReader() { + defer func() { + c.hub.unregister <- c + c.conn.Close() + }() + c.conn.SetReadLimit(maxMessageSize) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + for { + mt, _, err := c.conn.ReadMessage() + if err != nil { + break + } + if mt == websocket.CloseMessage { + break + } + } +} + +// clientWriter +func (c *Client) clientWriter() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil { + log.Printf("error sending message: %v", err) + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} diff --git a/websocket/websocket.go b/websocket/websocket.go new file mode 100644 index 00000000..a482fdcd --- /dev/null +++ b/websocket/websocket.go @@ -0,0 +1,110 @@ +package websocket + +import ( + "context" + "fmt" + "time" +) + +func NewHub(ctx context.Context) *Hub { + return &Hub{ + clients: map[string]*Client{}, + broadcast: make(chan []byte, 100), + register: make(chan *Client, 100), + unregister: make(chan *Client, 100), + ctx: ctx, + closed: make(chan struct{}), + quit: make(chan struct{}), + } +} + +type Hub struct { + ctx context.Context + closed chan struct{} + quit chan struct{} + // Registered clients. + clients map[string]*Client + + // Inbound messages from the clients. + broadcast chan []byte + + // Register requests from the clients. + register chan *Client + + // Unregister requests from clients. + unregister chan *Client +} + +func (h *Hub) run() { + for { + select { + case <-h.quit: + close(h.closed) + return + case <-h.ctx.Done(): + close(h.closed) + return + case client := <-h.register: + if client != nil { + h.clients[client.id] = client + } + case client := <-h.unregister: + if client != nil { + if _, ok := h.clients[client.id]; ok { + delete(h.clients, client.id) + close(client.send) + } + } + case message := <-h.broadcast: + for id, client := range h.clients { + if client == nil { + continue + } + + select { + case client.send <- message: + case <-time.After(5 * time.Second): + close(client.send) + delete(h.clients, id) + } + } + } + } +} + +func (h *Hub) Register(client *Client) error { + h.register <- client + return nil +} + +func (h *Hub) Write(msg []byte) (int, error) { + select { + case <-time.After(5 * time.Second): + return 0, fmt.Errorf("timed out sending message to client") + case h.broadcast <- msg: + + } + return len(msg), nil +} + +func (h *Hub) Start() error { + go h.run() + return nil +} + +func (h *Hub) Stop() error { + close(h.quit) + select { + case <-h.closed: + return nil + case <-time.After(60 * time.Second): + return fmt.Errorf("timed out waiting for hub stop") + } +} + +func (h *Hub) Wait() { + select { + case <-h.closed: + case <-time.After(60 * time.Second): + } +}