Wrapper to send logs of any program to BigQuery
Imagine you have some service running somewhere on a VM
This service writes JSON logs which you want to have in BigQuery
To do so we usually start fighting with things like Logstash, Fluent Bit, etc.
which, in the end, could be boiled down to a simple tool that reads a file, stores an offset, parses lines and sends them to BigQuery
But the question is: why do we need to touch the disk at all and do all that?
Why not instead wrap the service and mirror its stdout?
Here is pseudocode of what it might look like:
cmd := exec("/usr/bin/some-service", "arg1")
cmd.Stdout = io.MultiWriter(os.Stdout, mirror) // here is the trick
cmd.Run()
For example, let's have a small HTTP server
package main

import (
	"fmt"
	"log"
	"net/http"
	"os"
)

func main() {
	fmt.Println("Starting server")
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		q := r.URL.Query()
		name := q.Get("name")
		if name != "" {
			fmt.Fprintf(os.Stdout, `{"name":"%s","message":"request","level":"info"}`+"\n", name)
			w.Write([]byte("Hello " + name + "!\n"))
		} else {
			fmt.Fprintln(os.Stderr, `{"name":"unknown","message":"request","level":"warn"}`)
			w.Write([]byte("Hello World!\n"))
			fmt.Println("got request without name")
		}
	})
	// NOTE: print before ListenAndServe, which blocks until the server stops
	fmt.Println("Listening localhost:8080")
	err := http.ListenAndServe(":8080", nil)
	if err != nil {
		log.Fatal(err)
	}
}
By intent it behaves like an app writing JSON logs, but from time to time there is non-JSON output
And here is our mirrorer:
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
	"os/exec"
	"strings"
	"time"
)

func main() {
	// TODO: take the wrapped command from os.Args, i.e. exec.Command(os.Args[1], os.Args[2:]...)
	// cmd := exec.Command("ping", "-c", "2", "www.google.com")
	// cmd := exec.Command("ls", "-lah", ".")
	// cmd := exec.Command("node", "/Users/mac/Desktop/server.js")
	// cmd := exec.Command("./server/server")
	cmd := exec.Command("go", "run", "./server/server.go")
	// TODO: there should be a way to ask to kill the child process in case the wrapper crashes
	// cmd.SysProcAttr = &syscall.SysProcAttr{
	// 	Pdeathsig: syscall.SIGTERM,
	// }
	// NOTE: the simplest way to mirror the child process stdout and stderr
	// cmd.Stdout = os.Stdout
	// cmd.Stderr = os.Stderr
	// NOTE: each Write to the PipeWriter blocks until it has satisfied one or more Reads from the PipeReader that fully consume the written data
	pr, pw := io.Pipe()
	cmd.Stdout = io.MultiWriter(os.Stdout, pw)
	cmd.Stderr = os.Stderr // pass stderr through as-is; only stdout is parsed
	// NOTE: thanks to a buffer we would have non-blocking writes, but from time to time it crashes with a "signal: broken pipe" error
	// buf := bytes.NewBuffer([]byte{})
	// cmd.Stdout = io.MultiWriter(os.Stdout, buf) // here is the trick, mirror child process stdout to our stdout and to a buffer
	// NOTE: because the pipe is blocking and we do not want to block the child's stdout in any case, we create a buffered channel with a very large capacity;
	// the buffer only holds map headers, so the upfront cost is predictable: 1_000_000_000 ~ 8Gb, 1_000_000 ~ 10Mb, and by intent I put a really big number here
	// TODO: bring it back to 1K or 1M
	queue := make(chan map[string]interface{}, 1_000_000)
	go parse(pr, queue)
	go process(queue)
	// start the child process and wait till it ends
	// NOTE: when it ends we exit right away, so the last, not yet flushed batch may be lost
	if err := cmd.Run(); err != nil {
		log.Fatal(err)
	}
	log.Println("child process ended")
}
// parse loops until the reader is closed: it reads lines, tries to parse them as JSON,
// and all successfully parsed entries are then sent into the queue for batching
func parse(reader io.Reader, queue chan map[string]interface{}) {
	scanner := bufio.NewScanner(reader)
	scanner.Split(bufio.ScanLines)
	for scanner.Scan() {
		line := scanner.Text()
		if !strings.HasPrefix(line, "{") {
			// fmt.Printf("# skipped '%s', it does not look like JSON\n", line)
			continue
		}
		// NOTE: a fresh map on every iteration, otherwise entries already sent to the queue would share (and keep) keys from previous lines
		var data map[string]interface{}
		if err := json.Unmarshal([]byte(line), &data); err != nil {
			// fmt.Printf("# unable to parse '%s'\n", line)
			continue
		}
		if data["timestamp"] == nil {
			data["timestamp"] = time.Now()
		}
		queue <- data
	}
	if err := scanner.Err(); err != nil {
		fmt.Println(err)
	}
	fmt.Println("Should not land here until the child process is closed")
}
func process(ch chan map[string]interface{}) {
	// size := 1000 // BigQuery allows up to 1K rows to be inserted at once
	// timeout := 30 // we are going to flush every 30s even if we have fewer than 1K entries
	size := 5 // reduced numbers for demo purposes
	timeout := 2
	// NOTE: go get cloud.google.com/go/bigquery, commented out for demo purposes
	// ctx := context.Background()
	// client, err := bigquery.NewClient(ctx, "project-x")
	// if err != nil {
	// 	log.Fatal(err)
	// }
	// defer client.Close()
	// inserter := client.Dataset("dataset1").Table("logs").Inserter()
	counter := 0
	for batch := range batch(ch, size, time.Duration(timeout)*time.Second) {
		// err = inserter.Put(ctx, batch)
		time.Sleep(2 * time.Second) // pretend we are saving entries to BigQuery; slow on purpose to check that we do not block the child's stdout
		counter += len(batch)
		fmt.Printf("saved batch of %d entries, %d total\n", len(batch), counter)
	}
}
// https://elliotchance.medium.com/batch-a-channel-by-size-or-time-in-go-92fa3098f65
// NOTE: check this out, `goto` still alive :)
func batch(values <-chan map[string]interface{}, maxItems int, maxTimeout time.Duration) chan []map[string]interface{} {
	batches := make(chan []map[string]interface{})
	go func() {
		defer close(batches)
		for keepGoing := true; keepGoing; {
			var batch []map[string]interface{}
			expire := time.After(maxTimeout)
			for {
				select {
				case value, ok := <-values:
					if !ok {
						keepGoing = false
						goto done
					}
					batch = append(batch, value)
					if len(batch) == maxItems {
						goto done
					}
				case <-expire:
					goto done
				}
			}
		done:
			if len(batch) > 0 {
				batches <- batch
			}
		}
	}()
	return batches
}
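Two of the TODOs in the code above can be sketched right away. This is only a sketch, not part of the wrapper yet: it assumes the wrapped command is passed as arguments to the wrapper binary, and Pdeathsig exists only on Linux (hence the build tag):

//go:build linux

package main

import (
	"log"
	"os"
	"os/exec"
	"syscall"
)

func main() {
	if len(os.Args) < 2 {
		log.Fatal("usage: wrapper <command> [args...]")
	}
	// take the wrapped command from our own arguments instead of hardcoding it
	cmd := exec.Command(os.Args[1], os.Args[2:]...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	// ask the kernel to send SIGTERM to the child if the wrapper itself dies (Linux only)
	cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}
	if err := cmd.Run(); err != nil {
		log.Fatal(err)
	}
}

With that the wrapper could be reused for any service without recompiling, e.g. ./wrapper /usr/bin/some-service arg1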
Notes to consider:
io.Pipe is blocking, which means that with an unbuffered channel everything would be blocked until the data is processed, and if the child process depends on stdout it would be blocked as well; that's why we create a buffered channel. From my experiments it seems to eat a predictable amount of memory, which may even be better, e.g. it won't grow forever and cause an OOM
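To see that blocking behavior in isolation, here is a tiny standalone sketch (independent from the wrapper): the Write to the PipeWriter returns only after the Read on the other side has consumed the data, roughly two seconds later:

package main

import (
	"fmt"
	"io"
	"time"
)

func main() {
	pr, pw := io.Pipe()
	go func() {
		time.Sleep(2 * time.Second) // nobody reads for two seconds
		buf := make([]byte, 5)
		n, _ := pr.Read(buf)
		fmt.Println("read:", string(buf[:n]))
	}()
	start := time.Now()
	pw.Write([]byte("hello")) // blocks until the goroutine above consumes the data
	fmt.Println("write returned after", time.Since(start))
}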
Having that, we may wrap the call to our service with this wrapper and have all our logs sent directly to BigQuery without touching the disk at all
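For the actual sending, the commented-out Inserter in process() expects rows that implement bigquery.ValueSaver (a plain map[string]interface{} is not accepted as-is), so a thin adapter is needed. A minimal sketch, assuming the project-x project and a dataset1.logs table from the comments above exist and have a schema matching the log fields:

package main

import (
	"context"
	"log"
	"time"

	"cloud.google.com/go/bigquery"
)

// logRow adapts a parsed log line (map[string]interface{}) to the bigquery.ValueSaver interface
type logRow map[string]bigquery.Value

func (r logRow) Save() (map[string]bigquery.Value, string, error) {
	return r, "", nil // empty insert ID: let the library pick its default for de-duplication
}

// insertBatch converts one batch from the queue and streams it into the table
func insertBatch(ctx context.Context, inserter *bigquery.Inserter, batch []map[string]interface{}) error {
	rows := make([]logRow, 0, len(batch))
	for _, entry := range batch {
		row := logRow{}
		for k, v := range entry {
			row[k] = v
		}
		rows = append(rows, row)
	}
	return inserter.Put(ctx, rows)
}

func main() {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, "project-x") // project name taken from the commented-out code above
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	inserter := client.Dataset("dataset1").Table("logs").Inserter()

	batch := []map[string]interface{}{
		{"name": "1", "message": "request", "level": "info", "timestamp": time.Now()},
	}
	if err := insertBatch(ctx, inserter, batch); err != nil {
		log.Fatal(err)
	}
}

In the wrapper itself, insertBatch would replace the time.Sleep inside the process() loop.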
To check how it works, I ran something like:
for i in {1..20}
do
curl "http://localhost:8080/?name=$i"
done
As expected, stdout on both sides was written immediately, and a little bit later I saw the desired output:
Starting server
{"name":"1","message":"request","level":"info"}
...
{"name":"20","message":"request","level":"info"}
saved batch of 5 entries, 5 total
saved batch of 5 entries, 10 total
saved batch of 5 entries, 15 total
saved batch of 5 entries, 20 total
The only thing left is to modify the systemd config to run the wrapper instead of the actual program, and we are done
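For example (only a sketch, the unit name and paths are made up), the change in the unit file could look like:

# /etc/systemd/system/some-service.service
[Unit]
Description=some-service wrapped with the stdout-to-BigQuery mirrorer
After=network.target

[Service]
# before: ExecStart=/usr/bin/some-service arg1
ExecStart=/usr/local/bin/log-wrapper /usr/bin/some-service arg1
Restart=on-failure

[Install]
WantedBy=multi-user.target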