--[[
########################################################
  KumoMTA minimal Send Policy
  (Save this as /opt/kumomta/etc/policy/init.lua for systemd automation)
  This config policy defines KumoMTA with a minimal set of modifications
  from default. Please read the docs at https://docs.kumomta.com/
  for detailed configuration instructions.
########################################################
]]
--
local kumo = require 'kumo'
-- NOTE(review): `utils` and `listener_domains` are not referenced in the
-- visible part of this policy; they are kept so the file's imports are
-- unchanged.
local utils = require 'policy-extras.policy_utils'

-- Load the policy helpers to simplify common configuration use cases
local shaping = require 'policy-extras.shaping'
local queue_module = require 'policy-extras.queue'
local listener_domains = require 'policy-extras.listener_domains'
local sources = require 'policy-extras.sources'
local dkim_sign = require 'policy-extras.dkim_sign'
local log_hooks = require 'policy-extras.log_hooks'

-- START SETUP

-- Configure the sending IP addresses that will be used by KumoMTA to
-- connect to remote systems using the sources.lua policy helper.
-- Note that defining sources and pools does nothing without some form of
-- policy in effect to assign messages to the source pools you have defined.
-- WARNING: THIS WILL NOT LOAD WITHOUT THE sources.toml FILE IN PLACE
-- See https://docs.kumomta.com/userguide/configuration/sendingips/
sources:setup { '/opt/kumomta/etc/policy/sources.toml' }

-- Configure DKIM signing. In this case we use the dkim_sign.lua policy helper.
-- WARNING: THIS WILL NOT LOAD WITHOUT the dkim_data.toml FILE IN PLACE
-- See https://docs.kumomta.com/userguide/configuration/dkim/
local dkim_signer =
  dkim_sign:setup { '/opt/kumomta/etc/policy/dkim_data.toml' }

-- Load Traffic Shaping Automation Helper
local shaper = shaping:setup_with_automation {
  publish = { 'http://127.0.0.1:8008' },
  subscribe = { 'http://127.0.0.1:8008' },
  extra_files = { '/opt/kumomta/etc/policy/shaping_custom.toml' },
}

-- Publish log records to a Kafka topic through the log_hooks helper.
-- See https://docs.kumomta.com/userguide/operation/webhooks/
log_hooks:new {
  name = 'kafka',
  log_parameters = {
    headers = { 'Subject', 'X-Customer-ID', 'X-Message-ID' },
  },
  queue_config = {
    retry_interval = '1m',
    max_retry_interval = '20m',
  },
  -- Builds the connection object used to deliver log records.
  -- The returned table exposes `send(message)` and `close()`.
  -- `domain`, `tenant` and `campaign` are currently unused.
  constructor = function(domain, tenant, campaign)
    -- TODO: add per-tenant handling in the future,
    -- e.g. `if tenant == 'xxxx' then ... end`
    local connection = {}

    -- Kafka configuration parameters
    local kafka_config = {
      ['bootstrap.servers'] = 'xxx.xxxx.xxx.xxx:9200',
      ['queue.buffering.max.ms'] = 1000,
      ['message.timeout.ms'] = 5000,
      ['retries'] = 1,
      ['retry.backoff.ms'] = 100,
      ['compression.type'] = 'snappy',
    }

    local producer = kumo.kafka.build_producer(kafka_config)
    local topic = 'kumo-topic'
    -- Total number of send attempts made per log record.
    local max_retries = 3

    -- Sends one log record to Kafka, trying up to `max_retries` times
    -- with a 0.1 second pause between attempts.
    -- Returns a human readable success string; calls kumo.reject(400, ...)
    -- once every attempt has failed.
    function connection:send(message)
      local payload = message:get_data()
      local last_error

      for attempt = 1, max_retries do
        -- pcall yields `true, <results of producer.send>` on success and
        -- `false, <error value>` on failure.
        local ok, partition_or_err = pcall(producer.send, producer, {
          topic = topic,
          payload = payload,
          headers = {
            ['content-type'] = 'text/json',
            ['source'] = 'kumomta',
          },
        })

        if ok then
          -- `string` (lower case) is the Lua standard library table;
          -- `String` is an undefined global and would raise here.
          return string.format(
            'Kafka success: %s',
            tostring(partition_or_err)
          )
        end

        last_error = partition_or_err
        if attempt < max_retries then
          kumo.sleep(0.1)
        end
      end

      kumo.reject(
        400,
        string.format(
          'Kafka error after %d retries: %s',
          max_retries,
          tostring(last_error)
        )
      )
    end

    -- Releases the Kafka producer owned by this connection.
    function connection:close()
      producer:close()
    end

    return connection
  end,
}

-- Configure queue management settings. These are not throttles, but instead
-- control how messages flow through the queues.
-- WARNING: ENSURE THAT WEBHOOKS AND SHAPING ARE SETUP BEFORE THE QUEUE HELPER FOR PROPER OPERATION
-- (this is why the queue helper is the last helper configured in this file)
-- WARNING: THIS WILL NOT LOAD WITHOUT the queues.toml FILE IN PLACE
-- See https://docs.kumomta.com/userguide/configuration/queuemanagement/
local queue_helper =
  queue_module:setup { '/opt/kumomta/etc/policy/queues.toml' }

-- END SETUP

-- START EVENT HANDLERS

--[[ Start of INIT section ]]
--
kumo.on('init', function()
  kumo.start_esmtp_listener {
    listen = '0.0.0.0:25',
    hostname = 'kumomta-test.xxxx.com',
    -- override the default set of relay hosts
    relay_hosts = { '127.0.0.1', '192.168.1.0/24' },
  }

  kumo.start_http_listener {
    listen = '127.0.0.1:8000',
    -- allowed to access any http endpoint without additional auth
    trusted_hosts = { '127.0.0.1', '::1' },
  }

  -- Define the "data" spool location.
  kumo.define_spool {
    name = 'data',
    path = '/var/spool/kumomta/data',
    kind = 'RocksDB',
  }

  -- Define the default "meta" spool location; this is where
  -- message envelope and metadata will be stored.
  kumo.define_spool {
    name = 'meta',
    path = '/var/spool/kumomta/meta',
    kind = 'RocksDB',
  }

  kumo.configure_local_logs {
    log_dir = '/var/log/kumomta',
    -- Flush logs every 10 seconds.
    -- You may wish to set a larger value in your production
    -- configuration; this lower value makes it quicker to see
    -- logs while you are first getting set up.
    max_segment_duration = '10s',
  }

  -- Configure publishing of TSA logs to automation daemon
  shaper.setup_publish()

  -- Configure bounce classification.
  -- See https://docs.kumomta.com/userguide/configuration/bounce/
  kumo.configure_bounce_classifier {
    files = {
      '/opt/kumomta/share/bounce_classifier/iana.toml',
    },
  }
end)
--[[ End of INIT Section ]]

--[[ Start of Non-INIT level config ]]
--
-- PLEASE read https://docs.kumomta.com/ for extensive documentation on customizing this config.
--[[ End of Non-INIT level config ]]

-- Call the Traffic Shaping Automation Helper to configure shaping rules.
kumo.on('get_egress_path_config', shaper.get_egress_path_config)

-- Final steps shared by every message reception handler in this file:
-- assign the queue first, then sign.
local function enqueue_and_sign(msg)
  -- Call the queue helper to set up the queue for the message.
  queue_helper:apply(msg)

  -- SIGNING MUST COME LAST OR YOU COULD BREAK YOUR DKIM SIGNATURES
  dkim_signer(msg)
end

-- Processing of incoming messages via SMTP
kumo.on('smtp_server_message_received', function(msg)
  -- Protect against SMTP Smuggling
  -- (https://sec-consult.com/blog/detail/smtp-smuggling-spoofing-e-mails-worldwide/)
  -- First argument: issues to check for and reject.
  -- Second argument: issues to fix (none here).
  local conformance_error =
    msg:check_fix_conformance('NON_CANONICAL_LINE_ENDINGS', '')

  if conformance_error then
    kumo.reject(552, string.format('5.6.0 %s', conformance_error))
  end

  enqueue_and_sign(msg)
end)

-- Processing of incoming messages via HTTP
kumo.on('http_message_generated', function(msg)
  enqueue_and_sign(msg)
end)

-- END OF EVENT HANDLERS