outpost/ldap: regularly pre-heat flow executor cache to increase bind performance

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer 2021-08-21 16:17:30 +02:00
parent ecf35cfd1d
commit ff24bc8cb8
8 changed files with 59 additions and 7 deletions

View File

@ -56,12 +56,6 @@ func NewAPIController(akURL url.URL, token string) *APIController {
log := log.WithField("logger", "authentik.outpost.ak-api-controller") log := log.WithField("logger", "authentik.outpost.ak-api-controller")
akConfig, _, err := apiClient.RootApi.RootConfigRetrieve(context.Background()).Execute()
if err != nil {
log.WithError(err).Error("Failed to fetch global configuration")
return nil
}
// Because we don't know the outpost UUID, we simply do a list and pick the first // Because we don't know the outpost UUID, we simply do a list and pick the first
// The service account this token belongs to should only have access to a single outpost // The service account this token belongs to should only have access to a single outpost
outposts, _, err := apiClient.OutpostsApi.OutpostsInstancesList(context.Background()).Execute() outposts, _, err := apiClient.OutpostsApi.OutpostsInstancesList(context.Background()).Execute()
@ -73,6 +67,15 @@ func NewAPIController(akURL url.URL, token string) *APIController {
outpost := outposts.Results[0] outpost := outposts.Results[0]
doGlobalSetup(outpost.Config) doGlobalSetup(outpost.Config)
log.WithField("name", outpost.Name).Debug("Fetched outpost configuration")
akConfig, _, err := apiClient.RootApi.RootConfigRetrieve(context.Background()).Execute()
if err != nil {
log.WithError(err).Error("Failed to fetch global configuration")
return nil
}
log.Debug("Fetched global configuration")
ac := &APIController{ ac := &APIController{
Client: apiClient, Client: apiClient,
GlobalConfig: akConfig, GlobalConfig: akConfig,
@ -121,5 +124,9 @@ func (a *APIController) StartBackgorundTasks() error {
a.logger.Debug("Starting Interval updater...") a.logger.Debug("Starting Interval updater...")
a.startIntervalUpdater() a.startIntervalUpdater()
}() }()
go func() {
a.logger.Debug("Starting periodical timer...")
a.startPeriodicalTasks()
}()
return nil return nil
} }

View File

@ -39,7 +39,7 @@ func (ac *APIController) initWS(akURL url.URL, outpostUUID strfmt.UUID) {
} }
ws.Dial(fmt.Sprintf(pathTemplate, scheme, akURL.Host, outpostUUID.String()), header) ws.Dial(fmt.Sprintf(pathTemplate, scheme, akURL.Host, outpostUUID.String()), header)
ac.logger.WithField("logger", "authentik.outpost.ak-ws").WithField("outpost", outpostUUID.String()).Debug("connecting to authentik") ac.logger.WithField("logger", "authentik.outpost.ak-ws").WithField("outpost", outpostUUID.String()).Debug("Connecting to authentik")
ac.wsConn = ws ac.wsConn = ws
// Send hello message with our version // Send hello message with our version

View File

@ -3,4 +3,5 @@ package ak
type Outpost interface { type Outpost interface {
Start() error Start() error
Refresh() error Refresh() error
TimerFlowCacheExpiry()
} }

View File

@ -0,0 +1,15 @@
package ak
import (
"time"
)
func (a *APIController) startPeriodicalTasks() {
go a.Server.TimerFlowCacheExpiry()
go func() {
for range time.Tick(time.Duration(a.GlobalConfig.CacheTimeoutFlows) * time.Second) {
a.logger.WithField("timer", "cache-timeout").Debug("Running periodical tasks")
a.Server.TimerFlowCacheExpiry()
}
}()
}

View File

@ -118,6 +118,15 @@ func (fe *FlowExecutor) getAnswer(stage StageComponent) string {
return "" return ""
} }
// WarmUp Ensure authentik's flow cache is warmed up
func (fe *FlowExecutor) WarmUp() error {
defer fe.sp.Finish()
gcsp := sentry.StartSpan(fe.Context, "authentik.outposts.flow_executor.get_challenge")
req := fe.api.FlowsApi.FlowsExecutorGet(gcsp.Context(), fe.flowSlug).Query(fe.Params.Encode())
_, _, err := req.Execute()
return err
}
func (fe *FlowExecutor) Execute() (bool, error) { func (fe *FlowExecutor) Execute() (bool, error) {
return fe.solveFlowChallenge(1) return fe.solveFlowChallenge(1)
} }

View File

@ -48,3 +48,10 @@ func (ls *LDAPServer) Bind(bindDN string, bindPW string, conn net.Conn) (ldap.LD
req.log.WithField("request", "bind").Warning("No provider found for request") req.log.WithField("request", "bind").Warning("No provider found for request")
return ldap.LDAPResultOperationsError, nil return ldap.LDAPResultOperationsError, nil
} }
func (ls *LDAPServer) TimerFlowCacheExpiry() {
for _, p := range ls.providers {
ls.log.WithField("flow", p.flowSlug).Debug("Pre-heating flow cache")
p.TimerFlowCacheExpiry()
}
}

View File

@ -118,3 +118,14 @@ func (pi *ProviderInstance) delayDeleteUserInfo(dn string) {
} }
}() }()
} }
func (pi *ProviderInstance) TimerFlowCacheExpiry() {
fe := outpost.NewFlowExecutor(context.Background(), pi.flowSlug, pi.s.ac.Client.GetConfig(), log.Fields{})
fe.Params.Add("goauthentik.io/outpost/ldap", "true")
fe.Params.Add("goauthentik.io/outpost/ldap-warmup", "true")
err := fe.WarmUp()
if err != nil {
pi.log.WithError(err).Warning("failed to warm up flow cache")
}
}

View File

@ -60,6 +60,8 @@ func (s *Server) ServeHTTP() {
s.logger.Printf("closing %s", listener.Addr()) s.logger.Printf("closing %s", listener.Addr())
} }
func (s *Server) TimerFlowCacheExpiry() {}
func (s *Server) Handler(w http.ResponseWriter, r *http.Request) { func (s *Server) Handler(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/akprox/ping" { if r.URL.Path == "/akprox/ping" {
w.WriteHeader(204) w.WriteHeader(204)