Shipping KumoMTA Logs to Graylog

Hi, I’m looking into sending logs over to Graylog. Since KumoMTA logs are zstd compressed, I’m scratching my head on how best to get them to Graylog uncompressed.

Do you guys have any tips or preferred ways to do this? Also, any specific log shippers you’d recommend for this scenario?

Thanks

I’m not familiar, does it accept webhooks or AMQP?

Not that I’m aware of. It accepts plain/JSON logs over UDP/TCP. Logs can be forwarded using fluentd/filebeat, but they consume logs from files. I’m looking to use webhooks for specific events and process raw logs with Graylog

I don’t really know anything about Graylog, but found this:

Graylog supports AMQP as a transport for various inputs such as GELF, syslog, and Raw/Plaintext inputs

on this page: Sending in log data

So it sounds like you can use AMQP to deliver individual log records to it.

Routing Messages via AMQP - KumoMTA Docs discusses how to go about that in kumomta

Otherwise, I suggest a script that takes the rotated compressed logs, decompresses them, and then ships them where they need to go

Thanks, I’ll look at it

This is the option I had in mind to work around and will give also a try, thanks :slightly_smiling_face:

if you do decide to decompress and ship log files, I’d recommend setting configure_local_logs - KumoMTA Docs so that you can use some simple mtime checks to determine when a segment file is ready to be processed. If the segment duration is not set, then the default is to flush based on size, which can be hard to reason about from a simple script

I was thinking using tailer --tail and directly pipe the uncompressed logs

What volume are you expecting to move per hour? Not sure about trailer for large volumes of log data.

Yeah, tailer was originally intended to support that kind of use case, but we realized that most people these days want webhooks. You can try using tailer for this; I’m not sure if it is 100% there. In particular: if there is some issue with the pipeline, (eg: network hiccup), it’s difficult to reason about whether logs were successfully delivered or not.

If you need to guarantee that your logs make it to where they need to go, we recommend using the webhooks/amqp approach to push them there; that’s durable and resilient and can retry log delivery in the case of failure.

relatively low to get started, this mainly to watch the behavior of newly configured server and later I can move the logs to a better solution like AMQP

Thanks :raising_hands:t2: , that answers my question. The solution will be AMQP and yes, having the webhooks/amqp is great to have.

I did setup a remote RabbitMQ and added the following code to init.lua

local kumo = require 'kumo'

kumo.on('init', function()
  kumo.configure_log_hook {
    name = 'amqp',
    headers = { 'Subject', 'X-Customer-ID' },
  }
end) -- END OF THE INIT EVENT


kumo.on('should_enqueue_log_record', function(msg)
  local log_record = msg:get_meta 'log_record'
  if log_record.queue ~= 'amqp' then
    msg:set_meta('queue', 'amqp')
    return true
  end
  return false
end)

kumo.on('get_queue_config', function(domain, tenant, campaign, routing_domain)
  if domain == 'amqp' then
    return kumo.make_queue_config {
      protocol = {
        custom_lua = {
          constructor = 'make.amqp',
        },
      },
    }
  end
  return kumo.make_queue_config {}
end)

kumo.on('make.amqp', function(domain, tenant, campaign)
  local connection = {}
  local client = kumo.amqp.build_client 'amqp://user:password@remote_ip:5672'
  function connection:send(message)
    local confirm = client:publish {
      routing_key = 'logging',
      payload = message:get_data(),
    }
    local result = confirm:wait()

    if result.status == 'Ack' or result.status == 'NotRequested' then
      return
    end

    kumo.reject(500, kumo.json_encode(result))
  end

  return connection
end)

But no message is sent to the RabbitMQ queue, no error, and no details are in debug mode. RabbitMQ URI and access were validated by publishing a message from the init handler. Is there anything missing?

The code is different from https://docs.kumomta.com/userguide/policy/amqp/ as it is missing the send method

Ah, I think that page in the docs needs to be update a bit; it doesn’t explain things as well as it could, and what you have there doesn’t quite match the current way we do things in log_hooks.lua.

Note the extra hook_name parameter here:

kumo.on('should_enqueue_log_record', function(msg, hook_name)
  if hook_name ~= 'amqp' then
    -- it's not our hook
    return
  end
  local log_record = msg:get_meta 'log_record'

  -- avoid an infinite loop caused by logging that we logged that we logged...
  if log_record.reception_protocol == 'LogRecord' then
    return false
  end

  -- was some other event that we want to log via the webhook
  msg:set_meta('queue', 'amqp')
  return true
end)

I’d suggest adding some print statements in a few key places and checking the journal to see if that gives more insight into what is or isn’t happening.

Thanks, now I can see messages being queued in amqp queue and an error:

stack traceback:
    [C]: in local 'poll'
    [string "?"]:5: in method 'send'
    /opt/kumomta/share/policy-extras/shaping.lua:87: in function </opt/kumomta/share/policy-extras/shaping.lua:86>
caused by: error sending request for url (http://127.0.0.1:8008/publish_log_v1): error trying to connect: tcp connect error: Connection refused (os error 111)

I will play with print statement to get more insights about the context of this error.

So get_queue_config is never called, is it due to the queue helper (from the example config)? are they exclusive?

local queue_helper =
    queue_module:setup { '/opt/kumomta/etc/policy/queues.toml' }

this error was due to tsa service being dead, now localset-2 insert{name="amqp"}:insert: kumod::queue: insert_ready: failed to resolve queue amqp: MX lookup for amqp failed: NXDOMAIN

the order matters; you want to register your hook before you setup the queue helper