Hi, I’m looking into sending logs over to Graylog. Since KumoMTA logs are zstd compressed, I’m scratching my head on how best to get them to Graylog uncompressed.
Do you guys have any tips or preferred ways to do this? Also, any specific log shippers you’d recommend for this scenario?
Not that I’m aware of. Graylog accepts plain-text or JSON logs over UDP/TCP. Logs can be forwarded using Fluentd or Filebeat, but those consume logs from files. I’m looking to use webhooks for specific events and process the raw logs with Graylog.
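Since Graylog takes JSON over TCP/UDP, one option is to map each KumoMTA log record to a GELF 1.1 message before sending it. The sketch below is plain Lua with a tiny hand-rolled JSON encoder for flat values. The record field names it reads (type, id, sender, recipient, queue, timestamp), the to_gelf helper and the _kumo_ field prefix are assumptions for illustration, not part of KumoMTA or Graylog.

```lua
-- Sketch: map a KumoMTA log record (a plain Lua table) to a GELF 1.1 JSON
-- string for a Graylog GELF input. Record field names are assumptions.

-- Escape a string so it can sit inside a JSON string literal.
local function json_escape(s)
  local escaped = s:gsub('[%c"\\]', function(c)
    if c == '"' then return '\\"' end
    if c == '\\' then return '\\\\' end
    if c == '\n' then return '\\n' end
    if c == '\r' then return '\\r' end
    if c == '\t' then return '\\t' end
    return string.format('\\u%04x', c:byte())
  end)
  return escaped
end

-- Encode a scalar: numbers and booleans as-is, everything else as a string.
local function json_value(v)
  local t = type(v)
  if t == 'number' or t == 'boolean' then
    return tostring(v)
  end
  return '"' .. json_escape(tostring(v)) .. '"'
end

-- Build the GELF message. version, host and short_message are required by
-- GELF; extra fields must start with '_' and '_id' is reserved, hence the
-- hypothetical '_kumo_' prefix.
local function to_gelf(record, host)
  local fields = {
    { 'version', '1.1' },
    { 'host', host },
    { 'short_message', string.format('%s %s -> %s', tostring(record.type),
      tostring(record.sender), tostring(record.recipient)) },
  }
  if record.timestamp ~= nil then
    -- GELF timestamps are seconds since the epoch.
    fields[#fields + 1] = { 'timestamp', record.timestamp }
  end
  for _, key in ipairs { 'type', 'id', 'queue', 'sender', 'recipient' } do
    if record[key] ~= nil then
      fields[#fields + 1] = { '_kumo_' .. key, record[key] }
    end
  end
  local parts = {}
  for i, kv in ipairs(fields) do
    parts[i] = '"' .. kv[1] .. '":' .. json_value(kv[2])
  end
  return '{' .. table.concat(parts, ',') .. '}'
end
```

On a GELF TCP input each message is terminated by a null byte, so a sender would write to_gelf(record, host) followed by '\0'; on UDP each datagram carries one message.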
If you do decide to decompress and ship log files, I’d recommend setting the segment duration in configure_local_logs (see the KumoMTA docs) so that you can use simple mtime checks to determine when a segment file is ready to be processed. If the segment duration is not set, the default is to flush based on size, which can be hard to reason about from a simple script.
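The mtime check itself can be very small: a segment whose last write is older than the segment duration should already have been closed. This is a sketch that assumes time-based segments are configured and that the logger closes a segment once it reaches that duration. ready_segments and grace_secs are hypothetical names, the configure_local_logs snippet in the comments assumes the max_segment_duration option (check the docs for the exact name and value format), and collecting the mtimes (via stat, LuaFileSystem, etc.) is left out so the selection logic stays pure.

```lua
-- Sketch: pick the KumoMTA log segments that are safe to decompress and
-- ship, using only modification times. Assumes init.lua configures
-- time-based segments, for example (hypothetical values):
--   kumo.configure_local_logs {
--     log_dir = '/var/log/kumomta',
--     max_segment_duration = '5 minutes',
--   }

-- segments: array of { path = <string>, mtime = <unix seconds> }
-- now: current unix time in seconds
-- segment_secs: configured segment duration in seconds
-- grace_secs: optional extra slack for the final flush (assumption)
local function ready_segments(segments, now, segment_secs, grace_secs)
  local threshold = segment_secs + (grace_secs or 0)
  local ready = {}
  for _, seg in ipairs(segments) do
    -- A segment opened at t0 and last written at t1 (t0 <= t1) is closed by
    -- t0 + duration <= t1 + duration, so once now - t1 exceeds the duration
    -- nothing should be writing to it any more.
    if now - seg.mtime > threshold then
      ready[#ready + 1] = seg.path
    end
  end
  table.sort(ready) -- deterministic processing order
  return ready
end
```

A cron job could feed this the mtimes of the files in the log directory and pass each returned path to zstd -dc before handing the plain output to a shipper.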
Yeah, tailer was originally intended to support that kind of use case, but we realized that most people these days want webhooks. You can try using tailer for this, but I’m not sure it is 100% there. In particular, if there is some issue with the pipeline (e.g. a network hiccup), it’s difficult to reason about whether logs were successfully delivered or not.
If you need to guarantee that your logs make it to where they need to go, we recommend using the webhooks/amqp approach to push them there; that’s durable and resilient and can retry log delivery in the case of failure.
It’s relatively low to start with; this is mainly to watch the behavior of a newly configured server, and later I can move the logs to a better solution like AMQP.
I set up a remote RabbitMQ instance and added the following code to init.lua:
local kumo = require 'kumo'
kumo.on('init', function()
  kumo.configure_log_hook {
    name = 'amqp',
    headers = { 'Subject', 'X-Customer-ID' },
  }
end) -- END OF THE INIT EVENT
kumo.on('should_enqueue_log_record', function(msg)
  local log_record = msg:get_meta 'log_record'
  if log_record.queue ~= 'amqp' then
    msg:set_meta('queue', 'amqp')
    return true
  end
  return false
end)
kumo.on('get_queue_config', function(domain, tenant, campaign, routing_domain)
  if domain == 'amqp' then
    return kumo.make_queue_config {
      protocol = {
        custom_lua = {
          constructor = 'make.amqp',
        },
      },
    }
  end
  return kumo.make_queue_config {}
end)
kumo.on('make.amqp', function(domain, tenant, campaign)
  local connection = {}
  local client = kumo.amqp.build_client 'amqp://user:password@remote_ip:5672'
  function connection:send(message)
    local confirm = client:publish {
      routing_key = 'logging',
      payload = message:get_data(),
    }
    local result = confirm:wait()
    if result.status == 'Ack' or result.status == 'NotRequested' then
      return
    end
    kumo.reject(500, kumo.json_encode(result))
  end
  return connection
end)
But no messages reach the RabbitMQ queue: there are no errors, and debug mode shows no details. The RabbitMQ URI and access were verified by publishing a message from the init handler. Is there anything I’m missing?
Ah, I think that page in the docs needs to be updated a bit; it doesn’t explain things as well as it could, and what you have there doesn’t quite match the current way we do things in log_hooks.lua.
Note the extra hook_name parameter here:
kumo.on('should_enqueue_log_record', function(msg, hook_name)
  if hook_name ~= 'amqp' then
    -- it's not our hook
    return
  end
  local log_record = msg:get_meta 'log_record'
  -- avoid an infinite loop caused by logging that we logged that we logged...
  if log_record.reception_protocol == 'LogRecord' then
    return false
  end
  -- was some other event that we want to log via the amqp hook
  msg:set_meta('queue', 'amqp')
  return true
end)
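To make the three outcomes of that handler explicit, and testable without a running kumod, the decision can be pulled into a plain function. This is a sketch: should_log and HOOK_NAME are hypothetical names, and reading a nil return as "let other hooks' handlers decide" is inferred from the "it's not our hook" branch above rather than taken from the docs.

```lua
-- Sketch: the decision from the should_enqueue_log_record handler above as a
-- plain function, so it can be tested without kumod. Names are hypothetical.
local HOOK_NAME = 'amqp'

-- Returns nil when the event belongs to another hook, false to skip the
-- record, and true to enqueue it for delivery to RabbitMQ.
local function should_log(hook_name, log_record)
  if hook_name ~= HOOK_NAME then
    return nil
  end
  -- Records about delivering our own log messages carry
  -- reception_protocol 'LogRecord'; logging them again would loop forever.
  if log_record.reception_protocol == 'LogRecord' then
    return false
  end
  return true
end

-- In init.lua the handler could delegate to it like this:
-- kumo.on('should_enqueue_log_record', function(msg, hook_name)
--   local decision = should_log(hook_name, msg:get_meta 'log_record')
--   if decision then
--     msg:set_meta('queue', HOOK_NAME)
--   end
--   return decision
-- end)
```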
I’d suggest adding some print statements in a few key places and checking the journal to see if that gives more insight into what is or isn’t happening.
Thanks, now I can see messages being queued in the amqp queue, along with an error:
stack traceback:
  [C]: in local 'poll'
  [string "?"]:5: in method 'send'
  /opt/kumomta/share/policy-extras/shaping.lua:87: in function </opt/kumomta/share/policy-extras/shaping.lua:86>
caused by: error sending request for url (http://127.0.0.1:8008/publish_log_v1): error trying to connect: tcp connect error: Connection refused (os error 111)
I’ll experiment with print statements to get more insight into the context of this error.
This error was due to the TSA service not running. Now I get this instead:
localset-2 insert{name="amqp"}:insert: kumod::queue: insert_ready: failed to resolve queue amqp: MX lookup for amqp failed: NXDOMAIN