// Package metric aggregates in-process metrics and periodically writes the
// aggregated values to InfluxDB.
package metric

import (
	"fmt"
	"net/url"
	"os"
	"sort"
	"strings"
	"time"

	influxdb "github.com/influxdata/influxdb1-client"
)

// metric is a single sample, and also the accumulator that samples sharing
// the same name and tag set are merged into between two reports.
type metric struct {
	// Type selects how the metric is rendered by getField; it is compared
	// against TypeCount and TypeDuration.
	Type string `json:"type"`
	// Timestamp is passed to time.Unix as seconds when the point is built.
	Timestamp int64 `json:"timestamp"`
	// Metric is the measurement name.
	Metric string `json:"metric"`
	// Count is summed across merged samples.
	Count int64 `json:"count"`
	// Value is summed across merged samples.
	Value int64 `json:"value"`
	// Max and Min track the largest and smallest single Value merged so far.
	Max int64 `json:"max"`
	Min int64 `json:"min"`
	// Tags are attached to the point and are part of the aggregation key.
	Tags map[string]string `json:"tags"`
}

// service receives metrics on a buffered channel, merges them in memory and
// flushes the merged set to InfluxDB on a fixed interval.
type service struct {
	hostname    string
	serviceName string
	hostIp      string
	podIp       string
	metrics     chan *metric
	// megers holds the aggregates collected since the last report, keyed by
	// metric name plus sorted tags. It is nil when nothing is pending.
	megers map[string]*metric
	config *Config
	// client is nil when the InfluxDB client could not be built; report then
	// discards aggregates instead of writing them.
	client *influxdb.Client
}

// NewService builds a service from c. Host identity is taken from
// os.Hostname and the SERVICE_NAME, POD_IP and HOST_IP environment variables.
//
// Setup failures are reported on stderr rather than returned, because the
// signature has no error result. If the InfluxDB address cannot be parsed or
// the client cannot be created, the returned service has a nil client and
// will drop whatever it aggregates.
func NewService(c *Config) *service {
	hostname, err := os.Hostname()
	if err != nil {
		// Keep going with an empty hostname; it only affects a default tag.
		fmt.Fprintf(os.Stderr, "metric: resolving hostname: %v\n", err)
	}

	s := &service{
		metrics:     make(chan *metric, 100000),
		hostname:    hostname,
		serviceName: os.Getenv("SERVICE_NAME"),
		podIp:       os.Getenv("POD_IP"),
		hostIp:      os.Getenv("HOST_IP"),
		config:      c,
	}

	endpoint, err := url.Parse(c.Address)
	if err != nil {
		fmt.Fprintf(os.Stderr, "metric: parsing InfluxDB address %q: %v\n", c.Address, err)
		return s
	}

	clientCfg := influxdb.NewConfig()
	clientCfg.Username = c.Username
	clientCfg.Password = c.Password
	clientCfg.URL = *endpoint

	client, err := influxdb.NewClient(clientCfg)
	if err != nil {
		fmt.Fprintf(os.Stderr, "metric: creating InfluxDB client: %v\n", err)
		return s
	}
	s.client = client
	return s
}

// run is the service loop: it merges every metric received on the channel
// and flushes the aggregates each time the ticker fires. It never returns.
//
// NOTE(review): time.NewTicker panics on a non-positive duration, so
// config.Interval must convert to at least one second.
func (s *service) run() {
	ticker := time.NewTicker(time.Duration(s.config.Interval) * time.Second)
	for {
		select {
		case m := <-s.metrics:
			s.process(m)
		case <-ticker.C:
			s.report()
		}
	}
}

// sortMap renders tag as "k=v" pairs sorted lexicographically and joined
// with ":", giving a deterministic string regardless of map iteration order.
func (s *service) sortMap(tag map[string]string) string {
	pairs := make([]string, 0, len(tag))
	for k, v := range tag {
		pairs = append(pairs, k+"="+v)
	}
	sort.Strings(pairs)
	return strings.Join(pairs, ":")
}

// process merges m into the pending aggregates. The first metric seen for a
// key is stored as the accumulator itself (the pointer is retained and its
// Max and Min are initialised from Value); later metrics with the same key
// are folded into it. A nil m is ignored.
func (s *service) process(m *metric) {
	if m == nil {
		return
	}
	if s.megers == nil {
		s.megers = make(map[string]*metric)
	}

	// Use len rather than a nil check so that nil and empty tag sets share
	// one aggregate.
	key := m.Metric
	if len(m.Tags) > 0 {
		key += "_" + s.sortMap(m.Tags)
	}

	agg, ok := s.megers[key]
	if !ok {
		m.Max = m.Value
		m.Min = m.Value
		s.megers[key] = m
		return
	}

	agg.Value += m.Value
	agg.Count += m.Count
	agg.Timestamp = m.Timestamp
	if m.Value > agg.Max {
		agg.Max = m.Value
	}
	if m.Value < agg.Min {
		agg.Min = m.Value
	}
}

// defaultTags returns the host identity tags attached to every batch.
func (s *service) defaultTags() map[string]string {
	return map[string]string{
		"hostname":     s.hostname,
		"host_ip":      s.hostIp,
		"pod_ip":       s.podIp,
		"service_name": s.serviceName,
	}
}

// report writes all pending aggregates to InfluxDB as one batch and then
// clears them. The aggregates are cleared even if the write fails, so a
// failed batch is lost rather than retried; the failure is reported on
// stderr.
func (s *service) report() {
	if len(s.megers) == 0 {
		return
	}
	pending := s.megers
	s.megers = nil

	if s.client == nil {
		// No client was built in NewService; nothing can be written.
		return
	}

	var bp influxdb.BatchPoints
	bp.Points = make([]influxdb.Point, 0, len(pending))
	for _, v := range pending {
		fields := s.getField(v)
		if fields == nil {
			// Unknown metric type: a point needs at least one field, so
			// there is nothing to send for it.
			continue
		}
		bp.Points = append(bp.Points, influxdb.Point{
			Measurement: v.Metric,
			Tags:        v.Tags,
			Fields:      fields,
			Time:        time.Unix(v.Timestamp, 0),
		})
	}
	if len(bp.Points) == 0 {
		return
	}
	bp.Database = s.config.Database
	bp.Tags = s.defaultTags()

	if _, err := s.client.Write(bp); err != nil {
		fmt.Fprintf(os.Stderr, "metric: writing %d points to InfluxDB database %q: %v\n",
			len(bp.Points), s.config.Database, err)
	}
}

// add queues m for aggregation without blocking. When the channel buffer is
// full the metric is dropped and a message is printed.
func (s *service) add(m *metric) {
	select {
	case s.metrics <- m:
	default:
		fmt.Println("chan is full")
	}
}

// getField builds the InfluxDB field set for m according to its Type:
//
//   - TypeCount: "count" is the summed Value.
//   - TypeDuration: "avg" is Value divided by Count (truncated; 0 when Count
//     is 0), plus "count", "max" and "min".
//
// It returns nil for any other type.
func (s *service) getField(m *metric) map[string]interface{} {
	switch m.Type {
	case TypeCount:
		return map[string]interface{}{
			"count": m.Value,
		}
	case TypeDuration:
		// Integer division avoids converting Inf/NaN to int64 when Count is
		// zero and keeps full precision for large sums.
		var avg int64
		if m.Count != 0 {
			avg = m.Value / m.Count
		}
		return map[string]interface{}{
			"avg":   avg,
			"count": m.Count,
			"max":   m.Max,
			"min":   m.Min,
		}
	}
	return nil
}