这篇文章上次修改于 499 天前,可能其部分内容已经发生变化,如有疑问可询问作者。

最近在学习 Golang,就做了这个小程序来练手:

go语言部分创建了3个文件:

  • worker.go -- 主程序
  • taskWorker.go -- 具体业务处理程序
  • myLog.go -- 日志程序

先看 worker.go 主程序


package main import ( "flag" "fmt" "github.com/benmanns/goworker" "os" ) var logfile string //日志文件 func main(){ fmt.Println("ansyWork start success") var pidFile string flag.StringVar( &pidFile, "pid", "/var/log/ansyTask.pid", "pid file" ) //接收-pid参数 为pid保存的文件路径 flag.StringVar( &logfile, "log", "/var/log/ansyTask/ansyTask.log", "log file" ) //接收-log参数 为log日志保存的文件路径 if !flag.Parsed() { flag.Parse() } //获取当前进程的pid,将pid保存到pidFile pid := os.Getpid() err := WritePid( pidFile, pid ) if err != nil { fmt.Println("Error:", err ) os.Exit(1) } //运行gowoker if err := goworker.Work(); err != nil { fmt.Println("Error:", err) } goworker.Close() }

 

下面是业务处理taskWorker.go 程序:

package main

import (
   "encoding/json"
 "fmt"
 "github.com/benmanns/goworker"
 "io"
 "io/ioutil"
 "os"
 "os/exec"
)

// init performs the business-handler registration at package initialisation:
// ansyTaskWorker is registered with goworker under the name "AnsyTask".
func init() {
   goworker.Register("AnsyTask", ansyTaskWorker)
}


// copyAndCapture reads r until it is exhausted, echoing every chunk to w and
// accumulating the same bytes in memory.
//
// It returns everything that was read together with the first read error.
// io.EOF is the normal end of the stream and is reported as a nil error.
func copyAndCapture(w io.Writer, r io.Reader) ([]byte, error) {
	var out []byte
	buf := make([]byte, 1024)
	for {
		n, err := r.Read(buf)
		if n > 0 {
			chunk := buf[:n]
			out = append(out, chunk...)
			// Echo to the writer the caller asked for. The echo is best
			// effort: a failed write must not stop us from draining r,
			// otherwise the producer could block on a full pipe.
			_, _ = w.Write(chunk)
		}
		if err != nil {
			// Read returns io.EOF at the end of the stream, which is not an
			// error for us.
			if err == io.EOF {
				err = nil
			}
			return out, err
		}
	}
}


// WritePid writes pid, in decimal and followed by a newline, to the file
// called name. The file is created with mode 0666 if needed and any previous
// content is replaced. The error from the write is returned unchanged.
func WritePid(name string, pid int) error {
	content := fmt.Sprintln(pid)
	return ioutil.WriteFile(name, []byte(content), 0666)
}

// ansyTaskWorker executes one asynchronous task.
//
// args[0] carries the task description as a JSON-style object whose string
// fields are read as follows:
//
//	type     script type, "php" or "shell"
//	dir      working directory the command is run in
//	mainFile file to execute
//	cmdArgs  optional argument passed to the script
//	phpbin   php executable (type "php" only)
//	action   controller-method-action argument (type "php" only)
//
// An error is returned when the job cannot be launched at all: no arguments,
// an unsupported type, or a failure creating the pipes or starting the
// process. Once the command has been started, its exit error and its output
// are only logged and nil is returned.
func ansyTaskWorker(queue string, args ...interface{}) error {
	if len(args) == 0 {
		return fmt.Errorf("ansyTask: job from queue %q has no arguments", queue)
	}
	infoLog(fmt.Sprintf("From Redis Key : %s; Args: %v\n", queue, args[0]))

	params, err := decodeTaskParams(args[0])
	if err != nil {
		return err
	}

	cmd, err := buildTaskCommand(params)
	if err != nil {
		return err
	}
	cmd.Dir = params["dir"]

	stdoutIn, err := cmd.StdoutPipe()
	if err != nil {
		return fmt.Errorf("ansyTask: opening stdout pipe: %v", err)
	}
	stderrIn, err := cmd.StderrPipe()
	if err != nil {
		stdoutIn.Close()
		return fmt.Errorf("ansyTask: opening stderr pipe: %v", err)
	}
	if err := cmd.Start(); err != nil {
		return fmt.Errorf("ansyTask: starting command: %v", err)
	}

	var stdout, stderr []byte
	var errStdout, errStderr error

	// Drain both pipes concurrently so the child can never block on a full
	// pipe: stdout in a helper goroutine, stderr in this one.
	stdoutDone := make(chan struct{})
	go func() {
		defer close(stdoutDone)
		stdout, errStdout = copyAndCapture(os.Stdout, stdoutIn)
	}()
	stderr, errStderr = copyAndCapture(os.Stderr, stderrIn)
	<-stdoutDone

	// Wait closes the pipes, so it must only run after both readers are done.
	if err := cmd.Wait(); err != nil {
		errorLog(err)
	}

	if errStderr != nil || errStdout != nil {
		errorLog(errStdout, errStderr)
	}

	outStr, errStr := string(stdout), string(stderr)
	infoLog(fmt.Sprintf("\nout:\n%s\n", outStr))
	if errStr != "" {
		errorLog(fmt.Sprintf("\nerr:\n%s\n", errStr))
	}
	return nil
}

// decodeTaskParams converts a job payload into a map of string parameters by
// round-tripping it through JSON.
func decodeTaskParams(payload interface{}) (map[string]string, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("ansyTask: encoding job payload: %v", err)
	}
	params := make(map[string]string)
	if err := json.Unmarshal(data, &params); err != nil {
		// Fields that are not strings are skipped by Unmarshal; keep going
		// with whatever could be decoded and leave a trace in the log.
		errorLog(fmt.Sprintf("ansyTask: decoding job payload: %v", err))
	}
	return params, nil
}

// buildTaskCommand creates the command for the task described by params, or
// returns an error when the task type is neither "php" nor "shell".
func buildTaskCommand(params map[string]string) (*exec.Cmd, error) {
	mainFile := params["mainFile"]
	cmdArgs := params["cmdArgs"] // empty when the key is absent
	switch taskType := params["type"]; taskType {
	case "php":
		return exec.Command(params["phpbin"], mainFile, params["action"], cmdArgs), nil
	case "shell":
		return exec.Command("/bin/bash", mainFile, cmdArgs), nil
	default:
		return nil, fmt.Errorf("ansyTask: unsupported task type %q", taskType)
	}
}

 

下面是日志程序 myLog.go:

package main

import (
   "fmt"
 "io"
 "log"
 "os"
 "time"
)


// init configures the standard logger: every entry shows the date plus the
// source file name and line number, and is prefixed with the program tag.
func init() {
	const stdFlags = log.Ldate | log.Lshortfile
	log.SetFlags(stdFlags)
	log.SetPrefix("【ansyTask】")
}

// infoLog records an informational entry. The entry is written to standard
// output and appended to today's log file with the prefix "Info:".
func infoLog(msg ...interface{}) {
	writeLog(os.Stdout, "Info:", msg)
}

// errorLog records an error entry. The entry is written to standard error and
// appended to today's log file with the prefix "Error:".
func errorLog(msg ...interface{}) {
	writeLog(os.Stderr, "Error:", msg)
}

// warnLog records a warning entry. The entry is written to standard error and
// appended to today's log file with the prefix "Warn:".
func warnLog(msg ...interface{}) {
	writeLog(os.Stderr, "Warn:", msg)
}

// writeLog formats msg like fmt.Sprintln and writes it, tagged with prefix,
// date, time and source position, to both console and today's log file.
//
// The log file is the global logfile path followed by "." and the current
// date as zero-padded YYYYMMDD, so every calendar day maps to exactly one
// file name. If the file cannot be opened the process is terminated through
// log.Fatalln.
func writeLog(console io.Writer, prefix string, msg []interface{}) {
	name := logfile + "." + time.Now().Format("20060102")
	f, err := os.OpenFile(name, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		log.Fatalln("打开日志文件失败:", err)
	}
	// Only register the Close once the open is known to have succeeded.
	defer f.Close()

	logger := log.New(io.MultiWriter(console, f), prefix, log.Ldate|log.Ltime|log.Lshortfile)
	// Call depth 2 makes Lshortfile report the position inside
	// infoLog/errorLog/warnLog rather than inside this helper.
	_ = logger.Output(2, fmt.Sprintln(msg...))
}

 源代码可在github上查看

github地址:https://github.com/wyz9527/ansyWorker