Golang功能开发:监听日志文件实时传送到Opensearch
golang写一些功能性的小工具,正好使用了github.com/hpcloud/tail
实现类似filebeat的功能,监听日志文件实时传送到Opensearch。
使用docker-compose搭建Opensearch
创建docker-compose.yaml
文件
cat >docker-compose.yaml<<EOF
version: '3'
services:
opensearch-node1:
image: opensearchproject/opensearch:latest
container_name: opensearch-node1
environment:
- cluster.name=opensearch-cluster # Name the cluster
- node.name=opensearch-node1 # Name the node that will run in this container
- discovery.seed_hosts=opensearch-node1,opensearch-node2 # Nodes to look for when discovering the cluster
- cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2 # Nodes eligibile to serve as cluster manager
- bootstrap.memory_lock=true # Disable JVM heap memory swapping
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # Set min and max JVM heap sizes to at least 50% of system RAM
- "DISABLE_INSTALL_DEMO_CONFIG=true" # Prevents execution of bundled demo script which installs demo certificates and security configurations to OpenSearch
- "DISABLE_SECURITY_PLUGIN=true" # Disables security plugin
ulimits:
memlock:
soft: -1 # Set memlock to unlimited (no soft or hard limit)
hard: -1
nofile:
soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
hard: 65536
volumes:
- opensearch-data1:/usr/share/opensearch/data # Creates volume called opensearch-data1 and mounts it to the container
ports:
- 9200:9200 # REST API
- 9600:9600 # Performance Analyzer
networks:
- opensearch-net # All of the containers will join the same Docker bridge network
opensearch-node2:
image: opensearchproject/opensearch:latest
container_name: opensearch-node2
environment:
- cluster.name=opensearch-cluster # Name the cluster
- node.name=opensearch-node2 # Name the node that will run in this container
- discovery.seed_hosts=opensearch-node1,opensearch-node2 # Nodes to look for when discovering the cluster
- cluster.initial_cluster_manager_nodes=opensearch-node1,opensearch-node2 # Nodes eligibile to serve as cluster manager
- bootstrap.memory_lock=true # Disable JVM heap memory swapping
- "OPENSEARCH_JAVA_OPTS=-Xms512m -Xmx512m" # Set min and max JVM heap sizes to at least 50% of system RAM
- "DISABLE_INSTALL_DEMO_CONFIG=true" # Prevents execution of bundled demo script which installs demo certificates and security configurations to OpenSearch
- "DISABLE_SECURITY_PLUGIN=true" # Disables security plugin
ulimits:
memlock:
soft: -1 # Set memlock to unlimited (no soft or hard limit)
hard: -1
nofile:
soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
hard: 65536
volumes:
- opensearch-data2:/usr/share/opensearch/data # Creates volume called opensearch-data2 and mounts it to the container
networks:
- opensearch-net # All of the containers will join the same Docker bridge network
opensearch-dashboards:
image: opensearchproject/opensearch-dashboards:latest
container_name: opensearch-dashboards
ports:
- 5601:5601 # Map host port 5601 to container port 5601
expose:
- "5601" # Expose port 5601 for web access to OpenSearch Dashboards
environment:
- 'OPENSEARCH_HOSTS=["http://opensearch-node1:9200","http://opensearch-node2:9200"]'
- "DISABLE_SECURITY_DASHBOARDS_PLUGIN=true" # disables security dashboards plugin in OpenSearch Dashboards
networks:
- opensearch-net
volumes:
opensearch-data1:
opensearch-data2:
networks:
opensearch-net:
EOF
在docker-compose.yaml
文件所在目录执行启动命令
docker-compose up -d
可以通过
docker-compose logs -f
查看滚动日志
实现golang代码监听指定文件的内容变动,并传送到Opensearch
使用go mod初始化项目
go mod init
go mod tidy
如下代码可以实现向已安装好的opensearch服务器发送实时日志
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/hpcloud/tail"
"github.com/opensearch-project/opensearch-go"
"github.com/opensearch-project/opensearch-go/opensearchapi"
"log"
"net"
"net/http"
"os"
"strings"
"time"
_ "time"
)
var (
// 字符串列表的方式写入opensearch集群的连接方式
OS_Server = []string{"http://10.122.144.62:9200"}
)
type OSLog struct {
Timestamp int64 `json:"@timestamp"`
Content string `json:"content"`
Host []string `json:"host"`
}
// 获取本机IP列表,去掉本机本地ipv4和ipv6的回环地址
func GetLocalInfo() []string {
var result []string
addrs, _ := net.InterfaceAddrs()
for _, address := range addrs {
if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
string_ip := ipnet.String()
if !strings.HasPrefix(string_ip, "169.254.") &&
!strings.HasPrefix(string_ip, "fe80:") {
result = append(result, string_ip)
}
}
}
return result
}
// 初始化Opensearch的client,监听指定日志文件,并通过client发送日志到opensearch
func SendLog(filename string) {
// create an OpenSearch client
client, err := opensearch.NewClient(opensearch.Config{
Transport: &http.Transport{},
Addresses: OS_Server,
})
if err != nil {
fmt.Println("Error creating the client:", err)
return
}
// Log file configuration
tailConfig := tail.Config{
Location: &tail.SeekInfo{Offset: 0, Whence: os.SEEK_END},
ReOpen: true,
Follow: true,
}
// Open the log file
t, err := tail.TailFile(filename, tailConfig)
if err != nil {
log.Fatalf("Error opening the log file: %s", err)
}
// Create a scanner to read the log file line by line
for line := range t.Lines {
// 此处为将struct格式化为json的方法,感谢chatGPT的友情支持,=。=
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(OSLog{
Timestamp: time.Now().UnixMilli(),
Content: line.Text,
Host: GetLocalInfo(),
})
if err != nil {
log.Fatalf("%s", err)
}
// Create the index request
indexReq := opensearchapi.IndexRequest{
Index: "rpa",
DocumentID: "", // use an empty ID to auto-generate the ID
Body: &buf,
}
// Execute the index request to send the log data to OpenSearch
indexResp, err := indexReq.Do(context.Background(), client)
if err != nil {
log.Fatalf("Error indexing log data: %s", err)
}
fmt.Printf("Indexed log %s\n", indexResp)
}
}
func main() {
SendLog("robot.log")
}
配置OpenSearch的dashboard,以实现正确的索引
如下图是在左侧导航栏Dev Tools
中对我们的日志特定字段的属性设置
put /rpa
{
"mappings": {
"properties": {
"content": {
"type": "text"
},
"@timestamp": {
"type": "date"
}
}
}
}
之后进入导航栏的对应页面,进行Index的配置
注意,如果日志发送的字段有更新,需要重新进入该页面点击刷新按钮
THE END