Hi, I’m trying to send batches to kafka, but I think that the messages are still being sent one by one. This is the hook I’m using:
log_hooks:new {
name = "kafka",
batch_size = 1000,
constructor = function(domain, tenant, campaign)
local connection = {}
local producer = kumo.kafka.build_producer {
["bootstrap.servers"] = "<kafka_server>",
["queue.buffering.max.ms"] = 1000,
}
local topics_map = {
['Reception'] = "pmta_accounting_received",
['Delivery'] = "pmta_accounting_delivered",
['Bounce'] = "pmta_accounting_bounced",
['TransientFailure'] = "pmta_accounting_transient",
['Feedback'] = "kumo_accounting_feedback",
['AdminBounce'] = "kumo_accounting_admin_bounced",
['Expiration'] = "kumo_accounting_expired",
['OOB'] = "kumo_accounting_oob",
}
function connection:send_batch(messages)
for _, message in ipairs(messages) do
local log_record = message:get_meta('log_record')
local topic = topics_map[log_record.type] or "pmta_other"
local status, partition, offset = pcall(producer.send, producer, {
topic = topic,
payload = message:get_data(),
})
if not status then
kumo.reject(400, 'kafka error' .. tostring(partition))
end
end
return string.format('%d messages sent', #messages)
end
function connection:close()
producer:close()
end
return connection
end,
}```
with ["queue.buffering.max.ms"] = 1000 it's suppose to wait for 1s before sending the messages as batches but producer:send waits for the result. What do I need to change to make this configuration work ?