package main import ( "bufio" "context" "deploy/pkg/config" "deploy/pkg/logger" "deploy/pkg/server" "deploy/pkg/tools" "errors" "flag" "fmt" "log" "net/http" "os" "os/exec" "os/signal" "syscall" "time" "github.com/robfig/cron/v3" "github.com/sirupsen/logrus" "gopkg.in/fsnotify.v1" ) var cfgPath string = "./config.yml" func main() { flag.StringVar(&cfgPath, "c", "./config.yml", "config yaml path") flag.Parse() if !tools.CheckExists(cfgPath, false) { log.Fatal(fmt.Errorf("config file not exists")) } // lock for service running // buffer count 5 reloadChan := make(chan struct{}, 5) // lock for main lock lock := make(chan os.Signal, 1) signal.Notify(lock, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) watcher, err := fsnotify.NewWatcher() if err != nil { log.Fatal(err) } defer watcher.Close() go func() { for { select { case event, ok := <-watcher.Events: if !ok { // channel closed return } if event.Op&fsnotify.Write == fsnotify.Write { // send reload channel reloadChan <- struct{}{} } case err, ok := <-watcher.Errors: if !ok { // channel closed return } log.Fatal(err) } } }() if err := watcher.Add(cfgPath); err != nil { log.Fatal(err) } go runService(reloadChan) <-lock close(reloadChan) } func runService(reload <-chan struct{}) { lastReload := time.Now() log := logger.NewLogger(nil) var svc *http.Server var cron *Cron // infinity loop for { cfg, err := config.Load(cfgPath) if err != nil { log.Fatal(err) } if svc == nil { svc = startServer(cfg, log) } if cron == nil { cron = startCron(cfg, log) } select { case _, ok := <-reload: if !ok { // channel closed exit program if err := stopServer(svc); err != nil { log.Fatal(err) } if err := stopCron(cron); err != nil { log.Fatal(err) } return } if time.Since(lastReload).Seconds() < float64(2*time.Second) { break } lastReload = time.Now() if err := stopServer(svc); err != nil { log.Fatal(err) } if err := stopCron(cron); err != nil { log.Fatal(err) } svc = nil cron = nil } } } type Cron struct { *cron.Cron Logger *logrus.Logger } func (c *Cron) setJobs(cfg *config.Config) { for _, v := range cfg.CronJobs { c.AddFunc(v.CronTime, func() { cmd := exec.Command(v.Script) // r, w := io.Pipe() nlog := c.Logger.WithFields(logrus.Fields{"module": "cron", "script": v.Script}) reader, err := cmd.StdoutPipe() if err != nil { nlog.Warnf("setup pipe out fail: %+v\n", err) } errReader, err := cmd.StderrPipe() if err != nil { nlog.Warnf("setup pipe out fail: %+v\n", err) } if err := cmd.Start(); err != nil { nlog.Warnf("start script fail: %+v\n", err) } go func() { in := bufio.NewScanner(reader) for in.Scan() { nlog.Debugf(in.Text()) } if err := in.Err(); err != nil { nlog.Warnf("script error: %+v\n", err) } }() go func() { in := bufio.NewScanner(errReader) for in.Scan() { nlog.Debugf(in.Text()) } if err := in.Err(); err != nil { nlog.Warnf("script error: %+v\n", err) } }() }) } } func startCron(cfg *config.Config, log *logrus.Logger) *Cron { tz, err := time.LoadLocation("Asia/Taipei") if err != nil { panic(err) } c := cron.New( cron.WithLocation(tz), cron.WithParser(cron.NewParser(cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow)), ) c2 := &Cron{Cron: c, Logger: log} c2.setJobs(cfg) c2.Start() return c2 } func stopCron(c *Cron) error { ctx := c.Stop() select { case <-ctx.Done(): return nil case <-time.Tick(time.Second * 5): // skip stop jobs return errors.New("stop jobs fail") } } func startServer(cfg *config.Config, log *logrus.Logger) *http.Server { engine := server.NewServer(log) // set routes if err := engine.SetRoutes(cfg.Listens); err != nil { log.Fatal(err) } svc := &http.Server{ Addr: fmt.Sprintf(":%d", cfg.Server.Port), Handler: engine.Engine, } go func() { if err := svc.ListenAndServe(); err != nil && err != http.ErrServerClosed { log.Fatal(err) } }() return svc } func stopServer(svc *http.Server) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) defer cancel() if err := svc.Shutdown(ctx); err != nil { return err } return nil }