I do not use the helper; my code looks like this:
kumo.on('smtp_server_message_received', function(msg)
  -- Guard against SMTP Smuggling
  -- (https://sec-consult.com/blog/detail/smtp-smuggling-spoofing-e-mails-worldwide/).
  -- The first argument names the conformance issues that cause a rejection,
  -- the second names the issues to repair in place (none here).
  local failed = msg:check_fix_conformance('NON_CANONICAL_LINE_ENDINGS', '')

  -- X-campaign is expected to look like "<name>|<0/1>|<0/1>|<0/1>|<0/1>",
  -- optionally followed by a trailing "|". "|" is a literal character in
  -- Lua patterns, not alternation.
  local header_value = msg:get_first_named_header_value('X-campaign')
  if header_value then
    local sigle, reception, delivery, bounce, transient =
      string.match(header_value, "([^|]+)|([01])|([01])|([01])|([01])|?")

    -- Only the 'whatiwant' campaign has its flags copied into message meta.
    -- A successful match guarantees every capture is a non-nil string.
    if sigle == 'whatiwant' then
      local meta_pairs = {
        { 'campaign', sigle },
        { 'test1', reception },
        { 'test2', delivery },
        { 'test3', bounce },
        { 'test4', transient },
      }
      for _, pair in ipairs(meta_pairs) do
        msg:set_meta(pair[1], pair[2])
      end
    end
  end

  -- Refuse the message when the conformance check reported a problem.
  if failed then
    kumo.reject(552, string.format('5.6.0 %s', failed))
  end

  -- Let the queue helper (defined outside this block) set up the queue
  -- for the message.
  queue_helper:apply(msg)

  -- SIGNING MUST COME LAST OR YOU COULD BREAK YOUR DKIM SIGNATURES
  dkim_signer(msg)
end)
-- Decides whether a log record should be queued for delivery.
-- Records whose `queue` field is already 'kafka' are declined (presumably so
-- that the delivery of a log record is not itself logged again -- confirm);
-- every other record is routed to the 'kafka' queue and accepted.
kumo.on('should_enqueue_log_record', function(msg, hook_name)
  local record = msg:get_meta('log_record')

  if record.queue == 'kafka' then
    return false
  end

  msg:set_meta('queue', 'kafka')
  return true
end)
-- Constructor for the custom Lua delivery protocol named 'make.kafka'.
-- Builds one Kafka producer and returns a connection object exposing
-- `send(message)` and `close()`.
kumo.on('make.kafka', function(domain, tenant, campaign)
  local connection = {}
  local kafka_config = {
    ["bootstrap.servers"] = "xxx.xxx.xxxx.xxx:port",
    ["queue.buffering.max.ms"] = 1000,
    ["message.timeout.ms"] = 5000,
    ["retries"] = 1,
    ["retry.backoff.ms"] = 100,
    ["compression.type"] = "snappy",
  }
  local producer = kumo.kafka.build_producer(kafka_config)

  -- Topic name per key. Not referenced by the code visible here; presumably
  -- the keys are log record types and the elided send logic uses this map
  -- to pick a topic -- confirm.
  local topics_map = {
    ['Reception'] = "test1",
    ['Delivery'] = "test2",
    ['Bounce'] = "test3",
    ['TransientFailure'] = "test4",
  }

  -- Fetch a per-message value by meta name.
  -- NOTE(review): the message handed to `send` is a log record, so values set
  -- on the original message are expected to live in the record's captured
  -- `meta` table (this assumes the log hook is configured to capture these
  -- names -- confirm). Falls back to the message's own meta otherwise.
  local function read_meta(message, key)
    local record = message:get_meta('log_record')
    if type(record) == 'table' and type(record.meta) == 'table' then
      local captured = record.meta[key]
      if captured ~= nil then
        return captured
      end
    end
    return message:get_meta(key)
  end

  -- Deliver one message. Returns an SMTP-style disposition string; the
  -- original returned a freshly created empty table instead.
  function connection:send(message)
    -- Read the keys the receive handler actually writes ('campaign' and
    -- 'test1'..'test4'). Local names avoid shadowing the constructor's
    -- `campaign` parameter.
    local msg_campaign = read_meta(message, 'campaign')
    local msg_reception = read_meta(message, 'test1')
    local msg_delivery = read_meta(message, 'test2')
    local msg_bounce = read_meta(message, 'test3')
    local msg_transient = read_meta(message, 'test4')
    print('make.kafka --- campaign, msg_reception, msg_delivery, msg_bounce, msg_transient',msg_campaign, msg_reception, msg_delivery, msg_bounce, msg_transient)
    -- some other code
    return '250 ok'
  end

  -- Release the Kafka producer when the connection is closed.
  function connection:close()
    producer:close()
  end

  return connection
end)
-- Supplies the queue configuration for each scheduled queue.
-- The 'kafka' domain is delivered through the custom Lua protocol whose
-- constructor event is 'make.kafka'; every other domain gets an empty
-- (default) queue configuration.
kumo.on('get_queue_config', function(domain, tenant, campaign, routing_domain)
  print('get_queue_config --- domain, tenant, campaign, routing_domain',domain, tenant, campaign, routing_domain)

  if domain ~= 'kafka' then
    return kumo.make_queue_config {}
  end

  -- Only the constructor name is configured here. Open question carried over
  -- from the original comment: how to pass additional parameters to it.
  local kafka_protocol = {
    custom_lua = {
      constructor = 'make.kafka',
    },
  }
  return kumo.make_queue_config { protocol = kafka_protocol }
end)