Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Acknowledgment of Configuration Updates in Watcher Mode. #91 #95

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,23 @@ type Type struct {
server *http.Server
}

// ConfigMetadata manges the config related data.
type ConfigMetadata struct {
WholeConf any
SuccessReloadCount *int
}

// New creates a new Benthos HTTP API.
func New(
version string,
dateBuilt string,
conf Config,
wholeConf any,
ConfigMetadata ConfigMetadata,
log log.Modular,
stats metrics.Type,
opts ...OptFunc,
) (*Type, error) {
log.Info("HTTP SERVER HAS BEEN EXECUTED After Create Manager")
gMux := mux.NewRouter()
server := &http.Server{Addr: conf.Address}

Expand Down Expand Up @@ -103,15 +110,17 @@ func New(
}

handlePrintJSONConfig := func(w http.ResponseWriter, r *http.Request) {
t.log.Info("handlePrintJson executed")
var g any
var err error
if node, ok := wholeConf.(yaml.Node); ok {
if node, ok := ConfigMetadata.WholeConf.(yaml.Node); ok {
err = node.Decode(&g)
} else {
g = node
}
var resBytes []byte
if err == nil {
t.log.Info("API INOFOOOOOOOO CHEKC\n %+v \n", g)
resBytes, err = json.Marshal(g)
}
if err != nil {
Expand All @@ -122,7 +131,7 @@ func New(
}

handlePrintYAMLConfig := func(w http.ResponseWriter, r *http.Request) {
resBytes, err := yaml.Marshal(wholeConf)
resBytes, err := yaml.Marshal(ConfigMetadata.WholeConf)
if err != nil {
w.WriteHeader(http.StatusBadGateway)
return
Expand All @@ -146,6 +155,10 @@ func New(
}
}

handleConfigAcknowledgement := func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "{\"success_reload_count\":\"%v\"}", *ConfigMetadata.SuccessReloadCount)
}

if t.conf.DebugEndpoints {
t.RegisterEndpoint(
"/debug/config/json", "DEBUG: Returns the loaded config as JSON.",
Expand Down Expand Up @@ -200,6 +213,7 @@ func New(

t.RegisterEndpoint("/ping", "Ping me.", handlePing)
t.RegisterEndpoint("/version", "Returns the service version.", handleVersion)
t.RegisterEndpoint("/config/ack", "Returns the count of success watcher", handleConfigAcknowledgement)
t.RegisterEndpoint("/endpoints", "Returns this map of endpoints.", handleEndpoints)

// If we want to expose a stats endpoint we register the endpoints.
Expand Down
26 changes: 21 additions & 5 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ func TestAPIEnableCORS(t *testing.T) {
conf := api.NewConfig()
conf.CORS.Enabled = true
conf.CORS.AllowedOrigins = []string{"*"}

s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
configMetadata := api.ConfigMetadata{
WholeConf: nil,
SuccessReloadCount: nil,
}
s, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop())
require.NoError(t, err)

handler := s.Handler()
Expand All @@ -41,7 +44,11 @@ func TestAPIEnableCORSOrigins(t *testing.T) {
conf.CORS.Enabled = true
conf.CORS.AllowedOrigins = []string{"foo", "bar"}

s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
configMetadata := api.ConfigMetadata{
WholeConf: nil,
SuccessReloadCount: nil,
}
s, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop())
require.NoError(t, err)

handler := s.Handler()
Expand Down Expand Up @@ -80,8 +87,12 @@ func TestAPIEnableCORSOrigins(t *testing.T) {
func TestAPIEnableCORSNoHeaders(t *testing.T) {
conf := api.NewConfig()
conf.CORS.Enabled = true
configMetadata := api.ConfigMetadata{
WholeConf: nil,
SuccessReloadCount: nil,
}
_, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop())

_, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
require.Error(t, err)
assert.Contains(t, err.Error(), "must specify at least one allowed origin")
}
Expand Down Expand Up @@ -164,7 +175,12 @@ func TestAPIBasicAuth(t *testing.T) {
conf.BasicAuth.PasswordHash = tc.correctPass
conf.BasicAuth.Salt = "EzrwNJYw2wkErVVV1P36FQ=="

s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
configMetadata := api.ConfigMetadata{
WholeConf: nil,
SuccessReloadCount: nil,
}
s, err := api.New("", "", conf, configMetadata, log.Noop(), metrics.Noop())

if ok := tc.expectedErr(t, err); !(ok && err == nil) {
return
}
Expand Down
12 changes: 10 additions & 2 deletions internal/cli/common/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ func CreateManager(
logger log.Modular,
streamsMode bool,
conf config.Type,
successReloadCount *int,
mgrOpts ...manager.OptFunc,
) (stoppableMgr *StoppableManager, err error) {
logger.Info("Create Manager Is Executed")
var stats *metrics.Namespaced
var trac trace.TracerProvider
defer func() {
Expand Down Expand Up @@ -88,7 +90,13 @@ func CreateManager(
}

var httpServer *api.Type
if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats); err != nil {
logger.Info("New Method is calling which is having /debug endpoint")
logger.Info("This is Saint NOde %+v", sanitNode)
configMetadata := api.ConfigMetadata{
WholeConf: sanitNode,
SuccessReloadCount: successReloadCount,
}
if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, configMetadata, logger, stats); err != nil {
err = fmt.Errorf("failed to initialise API: %w", err)
return
}
Expand Down Expand Up @@ -285,7 +293,7 @@ func (s *StoppableManager) Stop(ctx context.Context) error {
return err
}
if err := s.mgr.CloseObservability(ctx); err != nil {
s.mgr.Logger().Error("Failed to cleanly close observability components: %s", err)
s.mgr.Logger().Error("Failed to cleanly close observability components: %w", err)
}
return nil
}
28 changes: 21 additions & 7 deletions internal/cli/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (e *ErrExitCode) Unwrap() error {
// RunService runs a service command (either the default or the streams
// subcommand).
func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {

mainPath, inferredMainPath, confReader := ReadConfig(c, cliOpts, streamsMode)

conf, pConf, lints, err := confReader.Read()
Expand All @@ -49,6 +50,8 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
}()

logger, err := CreateLogger(c, cliOpts, conf, streamsMode)
logger.Info("RUN SERVICE CALLED Here is the intial conf %+v", conf)

if err != nil {
return fmt.Errorf("failed to create logger: %w", err)
}
Expand All @@ -63,6 +66,7 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
}

strict := !cliOpts.RootFlags.GetChilled(c)
logger.Info("This is the Stict Value %v ", strict)
for _, lint := range lints {
if strict {
logger.With("lint", lint).Error("Config lint error")
Expand All @@ -73,8 +77,9 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
if strict && len(lints) > 0 {
return errors.New(cliOpts.ExecTemplate("shutting down due to linter errors, to prevent shutdown run {{.ProductName}} with --chilled"))
}

stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf)
//Success Watcher Count Is Used to for to get count of the config which was updated with the watcher flag.
successReloadCount := 0
stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf, &successReloadCount)
if err != nil {
return err
}
Expand All @@ -90,9 +95,10 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
watching := cliOpts.RootFlags.GetWatcher(c)
if streamsMode {
enableStreamsAPI := !c.Bool("no-api")
stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager())
stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager(), &successReloadCount)
} else {
stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager())
logger.Info("InitMode Get Initiated... strict:%v", strict)
stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager(), &successReloadCount)
}
if err != nil {
return err
Expand Down Expand Up @@ -133,6 +139,7 @@ func initStreamsMode(
strict, watching, enableAPI bool,
confReader *config.Reader,
mgr *manager.Type,
successReloadCount *int,
) (RunningStream, error) {
logger := mgr.Logger()
streamMgr := strmmgr.New(mgr, strmmgr.OptAPIEnabled(enableAPI))
Expand Down Expand Up @@ -181,7 +188,7 @@ func initStreamsMode(
}

if watching {
if err := confReader.BeginFileWatching(mgr, strict); err != nil {
if err := confReader.BeginFileWatching(mgr, strict, successReloadCount); err != nil {
return nil, fmt.Errorf("failed to create stream config watcher: %w", err)
}
}
Expand All @@ -194,9 +201,10 @@ func initNormalMode(
strict, watching bool,
confReader *config.Reader,
mgr *manager.Type,
successReloadCount *int,
) (newStream RunningStream, stoppedChan chan struct{}, err error) {
logger := mgr.Logger()

logger.Info("Init Mode Has been started...")
stoppedChan = make(chan struct{})
var closeOnce sync.Once
streamInit := func() (RunningStream, error) {
Expand All @@ -219,19 +227,25 @@ func initNormalMode(
logger.Info(opts.ExecTemplate("Launching a {{.ProductName}} instance, use CTRL+C to close"))

if err := confReader.SubscribeConfigChanges(func(newStreamConf *config.Type) error {
mgr.Logger().Info("SbscribeConfigChanges is called")
ctx, done := context.WithTimeout(context.Background(), 30*time.Second)
defer done()
// NOTE: We're ignoring observability field changes for now.
return stoppableStream.Replace(ctx, func() (RunningStream, error) {

conf.Config = newStreamConf.Config
mgr.Logger().Info("Here is the Final Updated Config")
mgr.Logger().Info("%+v", conf.Config)
// TODO HERE Starte WITH SanitNode Logic to Encode
return streamInit()
})
}); err != nil {
return nil, nil, fmt.Errorf("failed to create config file watcher: %w", err)
}

if watching {
if err := confReader.BeginFileWatching(mgr, strict); err != nil {
logger.Info("Inside the Watching Before Begin FileWatching.....")
if err := confReader.BeginFileWatching(mgr, strict, successReloadCount); err != nil {
return nil, nil, fmt.Errorf("failed to create config file watcher: %w", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/cli/studio/pull_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (r *PullRunner) bootstrapConfigReader(ctx context.Context) (bootstrapErr er
tmpTracingSummary.SetEnabled(false)

stopMgrTmp, err := common.CreateManager(
r.cliContext, r.cliOpts, r.logger, false, conf,
r.cliContext, r.cliOpts, r.logger, false, conf, nil,
manager.OptSetEnvironment(tmpEnv),
manager.OptSetBloblangEnvironment(bloblEnv),
manager.OptSetFS(sessFS))
Expand Down Expand Up @@ -413,13 +413,13 @@ func (r *PullRunner) Sync(ctx context.Context) {
}
}
for _, res := range diff.AddResources {
if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name); err != nil {
if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name, nil); err != nil {
r.logger.Error("Failed to reflect resource file '%v' update: %v", res.Name, err)
runErr = err
}
}
if diff.MainConfig != nil {
if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name); err != nil {
if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name, nil); err != nil {
r.logger.Error("Failed to reflect main config file '%v' update: %v", diff.MainConfig.Name, err)
runErr = err
}
Expand Down
9 changes: 8 additions & 1 deletion internal/config/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (r *Reader) readMain(mainPath string) (conf Type, pConf *docs.ParsedConfig,
// TriggerMainUpdate attempts to re-read the main configuration file, trigger
// the provided main update func, and apply changes to resources to the provided
// manager as appropriate.
func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string) error {
func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string, successReloadCount *int) error {
conf, _, lints, err := r.readMain(newPath)
if errors.Is(err, fs.ErrNotExist) {
if r.mainPath != newPath {
Expand Down Expand Up @@ -416,6 +416,13 @@ func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPat
mgr.Logger().Error("Failed to apply updated config: %v", err)
return err
}

// Success Watcher Count denotes if the configuration in the benthos gets updated with the watcher
// flag then success watcher count gets increased
if successReloadCount != nil {
*successReloadCount = *successReloadCount + 1
mgr.Logger().Info("Success Reload Count: %v, For Normal Config", *successReloadCount)
}
mgr.Logger().Info("Updated main config")
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions internal/config/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ processor_resources:
assert.False(t, testMgr.ProbeProcessor("c"))
assert.False(t, testMgr.ProbeProcessor("d"))

require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml"))
require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml", nil))

// Wait for the config watcher to reload the config
select {
Expand Down Expand Up @@ -226,10 +226,10 @@ processor_resources:
return nil
}))

require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml"))
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml"))
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml", nil))
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml", nil))

require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml"))
require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml", nil))

assert.Equal(t, "fooin", conf.Input.Label)
assert.Equal(t, "fooout", conf.Output.Label)
Expand Down
7 changes: 6 additions & 1 deletion internal/config/resource_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (r *Reader) readResource(path string) (conf manager.ResourceConfig, lints [

// TriggerResourceUpdate attempts to re-read a resource configuration file and
// apply changes to the provided manager as appropriate.
func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string) error {
func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string, successReloadCount *int) error {
newResConf, lints, err := r.readResource(path)
if errors.Is(err, fs.ErrNotExist) {
return r.TriggerResourceDelete(mgr, path)
Expand Down Expand Up @@ -273,6 +273,11 @@ func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, pa
}

r.resourceFileInfo[path] = newInfo

if successReloadCount != nil {
*successReloadCount = *successReloadCount + 1
mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *successReloadCount)
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/config/resource_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ processor_resources:
// Watch for configuration changes.
testMgr, err := manager.New(conf.ResourceConfig)
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

tCtx, done := context.WithTimeout(context.Background(), time.Second*30)
defer done()
Expand Down Expand Up @@ -175,7 +175,7 @@ processor_resources:
// Watch for configuration changes.
testMgr, err := manager.New(conf.ResourceConfig)
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

tCtx, done := context.WithTimeout(context.Background(), time.Second*30)
defer done()
Expand Down
7 changes: 6 additions & 1 deletion internal/config/stream_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (r *Reader) findStreamPathWalkedDir(streamPath string) (dir string) {

// TriggerStreamUpdate attempts to re-read a stream configuration file, and
// trigger the provided stream update func.
func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string) error {
func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string, successReloadCount *int) error {
if r.streamUpdateFn == nil {
return nil
}
Expand Down Expand Up @@ -236,5 +236,10 @@ func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path
return err
}
mgr.Logger().Info("Updated stream %v config from file.", info.id)

if successReloadCount != nil {
*successReloadCount = *successReloadCount + 1
mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *successReloadCount)
}
return nil
}
Loading