A wrapper to send the logs of any program to BigQuery

Imagine you have some service running somewhere on a VM

This service has json logs which you want to have in BigQuery

To do so, we usually start fighting with things like Logstash, Fluent Bit, etc.

In the end, all of that could be written as a simple tool that reads a file, stores an offset, parses lines and sends them to BigQuery

But the question is: why do we need to touch the disk and do all that at all?

Why not wrap the service instead and mirror its stdout?

Here is pseudo code of what it might look like:

cmd := exec("/usr/bin/some-service", "arg1")
cmd.Stdout = io.MultiWriter(os.Stdout, mirror) // here is the trick
cmd.Run()

For example, let's take a small HTTP server:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

// main starts a demo HTTP server on :8080 that imitates an application
// writing JSON logs mixed with occasional plain-text output.
//
// A request with a non-empty "name" query parameter logs an info-level JSON
// line to stdout; a request without it logs a warn-level JSON line to stderr
// and a plain-text line to stdout.
func main() {
	fmt.Println("Starting server")

	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		name := r.URL.Query().Get("name")

		if name == "" {
			fmt.Fprintln(os.Stderr, `{"name":"unknown","message":"request","level":"warn"}`)
			if _, err := w.Write([]byte("Hello World!\n")); err != nil {
				log.Println(err)
			}
			fmt.Println("got request without name")
			return
		}

		// name comes straight from the URL, so it must be encoded as a JSON
		// string; pasting it in raw lets a quote or backslash break the log
		// line or inject extra fields.
		encoded, err := json.Marshal(name)
		if err != nil {
			log.Println(err)
			http.Error(w, "bad name", http.StatusBadRequest)
			return
		}
		fmt.Fprintf(os.Stdout, `{"name":%s,"message":"request","level":"info"}`+"\n", encoded)

		if _, err := w.Write([]byte("Hello " + name + "!\n")); err != nil {
			log.Println(err)
		}
	})

	// ListenAndServe blocks for the lifetime of the server and always returns
	// a non-nil error, so nothing placed after this call would run normally.
	log.Fatal(http.ListenAndServe(":8080", nil))
}

By intent, it tries to behave like an app that writes JSON logs but from time to time produces non-JSON output

And here is our mirrorer:

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"os"
	"os/exec"
	"strings"
	"time"
)

// main runs the wrapped command, mirrors its stdout to our own stdout and
// feeds a copy of that stdout into the parse -> process pipeline.
//
// The command is taken from the wrapper's own arguments
// (wrapper /usr/bin/some-service arg1 ...). With no arguments it falls back
// to the demo server started with `go run ./server/server.go`.
//
// When the child exits, the pipeline is shut down in order and main waits
// until process has handled everything that was queued, so the last entries
// are not lost. A failed child still ends the wrapper through log.Fatal.
func main() {
	cmd := exec.Command("go", "run", "./server/server.go")
	if len(os.Args) > 1 {
		cmd = exec.Command(os.Args[1], os.Args[2:]...)
	}

	// TODO: there should be a way to kill the child process if the wrapper crashes
	// cmd.SysProcAttr = &syscall.SysProcAttr{
	// 	Pdeathsig: syscall.SIGTERM,
	// }

	// NOTE: each Write to the PipeWriter blocks until it has satisfied one or
	// more Reads from the PipeReader that fully consume the written data.
	// A bytes.Buffer would give non-blocking writes, but in experiments it
	// crashed from time to time with "signal: broken pipe".
	pr, pw := io.Pipe()
	cmd.Stdout = io.MultiWriter(os.Stdout, pw) // here is the trick: mirror child stdout to ours and to the pipe

	// With a nil Stderr the child's stderr goes to the null device, so its
	// error output would silently disappear.
	cmd.Stderr = os.Stderr

	// NOTE: the pipe is blocking and we do not want to block the child's
	// stdout, so the queue gets a big buffer; it does not take memory until
	// it is used.
	// TODO: return back to 1K or 1M
	// NOTE: 1_000_000_000 ~ 8Gb, 1_000_000 ~ 10Mb
	queue := make(chan map[string]interface{}, 1_000_000)

	go func() {
		parse(pr, queue)
		// If parse stops before EOF, keep consuming the pipe; otherwise every
		// further write by the child would block forever.
		if _, err := io.Copy(io.Discard, pr); err != nil {
			log.Println(err)
		}
		// Nothing more will be queued: let the batching stage flush and stop.
		close(queue)
	}()

	flushed := make(chan struct{})
	go func() {
		defer close(flushed)
		process(queue)
	}()

	// start child process and wait till it ends
	runErr := cmd.Run()

	// Run has copied all child output by now; closing the writer delivers EOF
	// to the reading side. PipeWriter.Close always returns nil.
	pw.Close()
	<-flushed

	if runErr != nil {
		log.Println("child process ended")
		log.Fatal(runErr)
	}
}

// parse reads reader line by line until EOF and sends every line that
// decodes as a JSON object into queue.
//
// Lines that do not start with "{" or fail to decode are skipped silently.
// Entries without a "timestamp" value get the current time. Each entry is a
// freshly allocated map, so the receiver owns it exclusively.
//
// parse does not close queue; that is left to the caller. The send blocks
// when queue is full.
func parse(reader io.Reader, queue chan map[string]interface{}) {
	// Longest accepted line. The bufio.Scanner default of 64 KiB is easy to
	// exceed with a single fat log record.
	const maxLineSize = 1024 * 1024

	scanner := bufio.NewScanner(reader)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineSize)

	for scanner.Scan() {
		line := scanner.Text()

		if !strings.HasPrefix(line, "{") {
			continue
		}

		// Must be a new map for every line: Unmarshal into an existing map
		// keeps its old keys and would hand the same map to the queue again.
		var data map[string]interface{}
		if err := json.Unmarshal([]byte(line), &data); err != nil {
			continue
		}

		if data["timestamp"] == nil {
			data["timestamp"] = time.Now()
		}

		queue <- data
	}

	if err := scanner.Err(); err != nil {
		fmt.Println(err)
		// The scanner gave up before EOF (for example on an overlong line).
		// Keep consuming the input so a blocking writer on the other side is
		// never stalled.
		if _, err := io.Copy(io.Discard, reader); err != nil {
			fmt.Println(err)
		}
	}

	fmt.Println("Should not land here until child process closed")
}

// process consumes ch in batches and pretends to store each batch, printing
// a running total after every one. It returns once the batch stream ends.
func process(ch chan map[string]interface{}) {
	const (
		// Demo value. BigQuery allows up to 1K rows per insert, so a real
		// setup would use 1000 here.
		batchSize = 5
		// Demo value. A real setup would flush every 30 seconds even when
		// fewer than 1K entries have been collected.
		flushEvery = 2 * time.Second
	)

	// NOTE: go get cloud.google.com/go/bigquery, commented out for demo purposes
	// ctx := context.Background()
	// client, err := bigquery.NewClient(ctx, "project-x")
	// if err != nil {
	// 	log.Fatal(err)
	// }
	// defer client.Close()
	// inserter := client.Dataset("dataset1").Table("logs").Inserter()

	total := 0
	batches := batch(ch, batchSize, flushEvery)
	for entries := range batches {
		// err = inserter.Put(ctx, entries)

		// Stand-in for the BigQuery insert; the delay is there to check
		// whether a slow consumer ends up blocking the child's stdout.
		time.Sleep(2 * time.Second)

		total += len(entries)
		fmt.Printf("saved batch of %d entries, %d total\n", len(entries), total)
	}
}

// batch groups the entries received on values into slices and delivers them
// on the returned channel. A slice is delivered as soon as it holds maxItems
// entries or maxTimeout has passed since the slice was started, whichever
// comes first; empty slices are never delivered. After values is closed the
// remaining entries are delivered and the returned channel is closed.
//
// Based on https://elliotchance.medium.com/batch-a-channel-by-size-or-time-in-go-92fa3098f65
func batch(values <-chan map[string]interface{}, maxItems int, maxTimeout time.Duration) chan []map[string]interface{} {
	out := make(chan []map[string]interface{})

	// collect gathers a single batch. open reports whether values can still
	// deliver more entries afterwards.
	collect := func() (items []map[string]interface{}, open bool) {
		deadline := time.After(maxTimeout)
		for {
			select {
			case item, ok := <-values:
				if !ok {
					return items, false
				}
				items = append(items, item)
				if len(items) == maxItems {
					return items, true
				}
			case <-deadline:
				return items, true
			}
		}
	}

	go func() {
		defer close(out)

		for {
			items, open := collect()
			if len(items) > 0 {
				out <- items
			}
			if !open {
				return
			}
		}
	}()

	return out
}

Notes to consider:

  • io.Pipe is blocking, which means that with an unbuffered channel everything would be blocked until the data is processed, and if the child process depends on stdout it would be blocked too; that's why we create a buffered channel
  • from my experiments it seems that it eats a predictable amount of memory, which may be even better, e.g. it won't grow forever and cause an OOM

Having that, we can wrap the call to our service with this wrapper and have all our logs sent directly to BigQuery without touching the disk at all

To check how it works i ran something like:

for i in {1..20}
do
curl "http://localhost:8080/?name=$i"
done

As expected, stdout on both sides was written immediately, and a little later I see the desired output:

Starting server
{"name":"1","message":"request","level":"info"}
...
{"name":"20","message":"request","level":"info"}
saved batch of 5 entries, 5 total
saved batch of 5 entries, 10 total
saved batch of 5 entries, 15 total
saved batch of 5 entries, 20 total

The only thing left is to modify the systemd config to run the wrapper instead of the actual program, and we are done