2024-01-21 18:12:39 +08:00
|
|
|
package metric
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2024-01-21 18:49:51 +08:00
|
|
|
influxdb "github.com/influxdata/influxdb1-client"
|
|
|
|
"net/url"
|
|
|
|
"os"
|
2024-01-21 18:12:39 +08:00
|
|
|
"sort"
|
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
)
|
|
|
|
|
|
|
|
// metric is one measurement sample. The same struct is also used as the
// running aggregate that process folds later samples with an identical name
// and tag set into.
type metric struct {
	// Type selects which fields getField emits (TypeCount or TypeDuration).
	Type string `json:"type"`
	// Timestamp is handed to time.Unix as whole seconds when the point is built.
	Timestamp int64 `json:"timestamp"`
	// Metric is used as the measurement name of the reported point.
	Metric string `json:"metric"`
	// Count is summed across merged samples.
	Count int64 `json:"count"`
	// Value is summed across merged samples.
	Value int64 `json:"value"`
	// Max and Min are seeded from Value by process on the first sample of a
	// series.
	Max int64 `json:"max"`
	Min int64 `json:"min"`
	// Tags are attached to the reported point and take part in the merge key.
	Tags map[string]string `json:"tags"`
}
|
|
|
|
|
|
|
|
type service struct {
|
2024-01-21 18:49:51 +08:00
|
|
|
hostname string
|
|
|
|
serviceName string
|
|
|
|
hostIp string
|
|
|
|
podIp string
|
|
|
|
metrics chan *metric
|
|
|
|
megers map[string]*metric
|
|
|
|
config *Config
|
|
|
|
client *influxdb.Client
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewService(c *Config) *service {
|
|
|
|
hostname, _ := os.Hostname()
|
|
|
|
|
|
|
|
infUrl, _ := url.Parse(c.Address)
|
|
|
|
infCfg := influxdb.NewConfig()
|
|
|
|
infCfg.Username = c.Username
|
|
|
|
infCfg.Password = c.Password
|
|
|
|
infCfg.URL = *infUrl
|
|
|
|
|
|
|
|
client, _ := influxdb.NewClient(infCfg)
|
|
|
|
|
|
|
|
return &service{
|
|
|
|
metrics: make(chan *metric, 100000),
|
|
|
|
hostname: hostname,
|
|
|
|
serviceName: os.Getenv("SERVICE_NAME"),
|
|
|
|
podIp: os.Getenv("POD_IP"),
|
|
|
|
hostIp: os.Getenv("HOST_IP"),
|
|
|
|
config: c,
|
|
|
|
client: client,
|
|
|
|
}
|
2024-01-21 18:12:39 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (s *service) run() {
|
|
|
|
timer := time.NewTicker(time.Duration(s.config.Interval) * time.Second)
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case m := <-s.metrics:
|
|
|
|
s.process(m)
|
|
|
|
|
|
|
|
case <-timer.C:
|
|
|
|
s.report()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *service) sortMap(tag map[string]string) string {
|
|
|
|
arr := make([]string, 0)
|
|
|
|
for k, v := range tag {
|
|
|
|
arr = append(arr, fmt.Sprintf("%s=%s", k, v))
|
|
|
|
}
|
|
|
|
sort.Strings(arr)
|
|
|
|
return strings.Join(arr, ":")
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *service) process(m *metric) {
|
|
|
|
if s.megers == nil {
|
|
|
|
s.megers = make(map[string]*metric)
|
|
|
|
}
|
|
|
|
key := m.Metric
|
|
|
|
if m.Tags != nil {
|
|
|
|
key += "_" + s.sortMap(m.Tags)
|
|
|
|
}
|
|
|
|
|
|
|
|
if v, ok := s.megers[key]; ok {
|
|
|
|
v.Value += m.Value
|
2024-01-21 22:54:16 +08:00
|
|
|
v.Count += m.Count
|
2024-01-21 18:12:39 +08:00
|
|
|
v.Timestamp = m.Timestamp
|
2024-01-21 22:54:16 +08:00
|
|
|
if m.Value > v.Max {
|
|
|
|
v.Max = m.Value
|
|
|
|
}
|
|
|
|
if m.Value < v.Min {
|
|
|
|
v.Value = m.Min
|
|
|
|
}
|
2024-01-21 18:12:39 +08:00
|
|
|
return
|
|
|
|
}
|
2024-01-21 22:54:16 +08:00
|
|
|
|
|
|
|
m.Max = m.Value
|
|
|
|
m.Min = m.Value
|
2024-01-21 18:12:39 +08:00
|
|
|
s.megers[key] = m
|
|
|
|
}
|
|
|
|
|
2024-01-21 18:49:51 +08:00
|
|
|
func (s *service) defaultTags() map[string]string {
|
|
|
|
return map[string]string{
|
|
|
|
"hostname": s.hostname,
|
|
|
|
"host_ip": s.hostIp,
|
|
|
|
"pod_ip": s.podIp,
|
|
|
|
"service_name": s.serviceName,
|
2024-01-21 18:12:39 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *service) report() {
|
|
|
|
if s.megers == nil {
|
|
|
|
return
|
|
|
|
}
|
2024-01-21 18:49:51 +08:00
|
|
|
|
|
|
|
var bp influxdb.BatchPoints
|
2024-01-21 18:12:39 +08:00
|
|
|
for _, v := range s.megers {
|
2024-01-21 18:49:51 +08:00
|
|
|
var p influxdb.Point
|
|
|
|
p.Measurement = v.Metric
|
|
|
|
p.Tags = v.Tags
|
2024-01-21 22:54:16 +08:00
|
|
|
p.Fields = s.getField(v)
|
2024-01-21 18:49:51 +08:00
|
|
|
p.Time = time.Unix(v.Timestamp, 0)
|
|
|
|
bp.Points = append(bp.Points, p)
|
2024-01-21 18:12:39 +08:00
|
|
|
}
|
|
|
|
|
2024-01-21 18:49:51 +08:00
|
|
|
bp.Database = s.config.Database
|
|
|
|
bp.Tags = s.defaultTags()
|
2024-01-21 18:12:39 +08:00
|
|
|
|
2024-02-03 22:20:38 +08:00
|
|
|
s.client.Write(bp)
|
2024-01-21 18:12:39 +08:00
|
|
|
s.megers = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *service) add(m *metric) {
|
|
|
|
select {
|
|
|
|
case s.metrics <- m:
|
|
|
|
default:
|
|
|
|
fmt.Println("chan is full")
|
|
|
|
}
|
|
|
|
}
|
2024-01-21 22:54:16 +08:00
|
|
|
|
|
|
|
func (s *service) getField(m *metric) map[string]interface{} {
|
|
|
|
|
|
|
|
if m.Type == TypeCount {
|
|
|
|
return map[string]interface{}{
|
2024-01-21 23:03:18 +08:00
|
|
|
"count": m.Value,
|
2024-01-21 22:54:16 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if m.Type == TypeDuration {
|
2024-01-21 23:03:18 +08:00
|
|
|
avgMs := int64(float64(m.Value) / float64(m.Count))
|
2024-01-21 22:54:16 +08:00
|
|
|
return map[string]interface{}{
|
|
|
|
"avg": avgMs,
|
|
|
|
"count": m.Count,
|
|
|
|
"max": m.Max,
|
|
|
|
"min": m.Min,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|