--[[
########################################################
  KumoMTA minimal Send Policy
  (Save this as /opt/kumomta/etc/policy/init.lua for systemd automation)
  This config policy defines KumoMTA with a minimal
  set of modifications from default.
  Please read the docs at https://docs.kumomta.com/
  For detailed configuration instructions.
########################################################
]]

local kumo = require 'kumo'
local shaping = require 'policy-extras.shaping'
local redis = require 'redis'
local sqlite = require 'sqlite'
local sources = require 'policy-extras.sources'
sources:setup { '/opt/kumomta/etc/policy/sources.toml' }
local queue_module = require 'policy-extras.queue'
local listener_domains = require 'policy-extras.listener_domains'

-- The call needs explicit parentheses: Lua only allows the paren-less call
-- form with a string literal or table constructor. Without them `client`
-- would be bound to the build_client function itself and the URI discarded.
local client = kumo.amqp.build_client(os.getenv("RABBITMQ_CONNECTION"))
local conn = redis.open { node = os.getenv("REDIS_CONNECTION") }
local producer = kumo.kafka.build_producer {
  ['bootstrap.servers'] = os.getenv("KAFKA_CONNECTION"),
  ['message.timeout.ms'] = '5000', -- 5 seconds
  ['request.timeout.ms'] = '3000', -- 3 seconds
  ['linger.ms'] = '0', -- Send immediately
  ['queue.buffering.max.messages'] = '10000',
  ["message.max.bytes"] = '1000000000',
}
-- NOTE(review): `db` is not referenced anywhere in the visible part of this
-- file; the open call is kept because it may be relied upon elsewhere.
local db = sqlite.open '/opt/psql/Mailer.db'
-- Routing key used when publishing log records to RabbitMQ.
local rabitmqqueue = 'transaction_email_events'

kumo.on('get_listener_domain', listener_domains:setup {
  '/opt/kumomta/etc/listener_domains.toml',
})

kumo.on('get_egress_path_config', shaping:setup {
  '/opt/kumomta/etc/shaping.json',
  '/opt/kumomta/etc/policy/shaping.toml',
})

--- Publish a restriction/DNS-check style event for a customer.
-- Builds a JSON payload from the arguments and sends it to the
-- 'transaction_restrct_email' Kafka topic.
-- @param cid customer id placed in the payload
-- @param message value stored as `messageType` in the payload
-- @param domain optional domain; an empty string is sent when nil/false
function addToKafka(cid, message, domain)
  local dnscheckdata = {
    messageType = message,
    cid = cid,
    domain = domain or "",
  }
  addToKafkaTopics('transaction_restrct_email', kumo.json_encode(dnscheckdata))
end

--- Send a payload to a Kafka topic without letting failures propagate.
-- @param topic Kafka topic name
-- @param data payload to send (callers in this file pass JSON strings)
-- @return true when the send call did not raise, false otherwise
function addToKafkaTopics(topic, data)
  local success, err = pcall(function()
    producer:send {
      topic = topic,
      payload = data,
      timeout = '10 seconds',
      required_acks = 0, -- Set to 0 for "fire and forget" mode
    }
  end)
  if not success then
    -- tostring: the caught error value is not guaranteed to be a string,
    -- and concatenating a non-string would itself raise.
    print("Error adding to Kafka: " .. tostring(err))
    return false
  end
  return true
end

--- Record an authentication-related event on the 'transaction_request_log'
-- Kafka topic.
-- NOTE(review): the plaintext password is included in the payload; confirm
-- this is intended, since it ends up in the Kafka topic.
-- @param authz identity to record (callers in this file pass `authc`)
-- @param typeval numeric event type (1 and 3 are used in this file)
-- @param message short description of the event
-- @param password password supplied by the client
-- @return true when the record was handed to Kafka, false otherwise
function addToRequestLog(authz, typeval, message, password)
  local success, result = pcall(function()
    local transaction_log = {
      authz = authz,
      typeval = typeval,
      message = message,
      password = password,
    }
    return addToKafkaTopics('transaction_request_log', kumo.json_encode(transaction_log))
  end)
  if not success then
    print("Request log error to Kafka: " .. tostring(result))
    return false
  end
  -- Propagate the outcome of the send instead of always reporting success.
  return result
end

--- Handler for the 'smtp_server_auth_plain' event.
-- Looks up the credentials JSON stored in Redis under 'trans_authdetails'
-- and compares the base64 encoding of the supplied password with the stored
-- value for `authc`.
-- NOTE(review): stored credentials are compared as base64 of the plaintext,
-- which is an encoding rather than a hash; consider hashing at the source.
-- @param authz authorization identity (unused here)
-- @param authc authentication identity used as the lookup key
-- @param password password supplied by the client
-- @param conn_meta connection metadata (unused here)
-- @return true when the credentials match, false otherwise
function handleSmtpServerAuthPlain(authz, authc, password, conn_meta)
  -- Attempt to log the request, but continue even if it fails
  pcall(function()
    addToRequestLog(authc, 1, "request", password)
  end)

  local AUTH_DETAILS = conn:query('GET', 'trans_authdetails')
  if not AUTH_DETAILS then
    -- No credential data available: nothing can match, so fail closed.
    return false
  end
  local details = kumo.json_parse(AUTH_DETAILS)

  if password ~= '' and details[authc]
    and details[authc].password == kumo.encode.base64_encode(password) then
    -- Successful login
    conn:query('SET', 'failed_attempts_' .. authc, 0) -- Reset failed attempts on success
    pcall(function()
      addToRequestLog(authc, 3, "login success", password)
    end)
    return true
  end

  -- Explicit failure result; the original fell through without one and was
  -- missing its closing `end`.
  return false
end

kumo.on('init', function()
  -- kumo.set_diagnostic_log_filter 'kumod=debug'
  kumo.start_esmtp_listener {
    listen = '0.0.0.0:587',
    banner = 'Welcome to Mailer',
    hostname = os.getenv("HOST_NAME"),
    tls_certificate = os.getenv("TLS_CERTIFICATE"),
    tls_private_key = os.getenv("TLS_PRIVATE_KEY"),
    max_recipients_per_message = 100,
    trace_headers = {
      received_header = true,
      supplemental_header = true,
      header_name = 'X-Mailer',
    },
  }

  kumo.start_http_listener {
    listen = '127.0.0.1:8000',
    hostname = os.getenv("HOST_NAME"),
    use_tls = false,
    tls_certificate = os.getenv("TLS_CERTIFICATE"),
    tls_private_key = os.getenv("TLS_PRIVATE_KEY"),
  }

  kumo.define_spool {
    name = 'data',
    path = '/var/spool/kumomta/data',
    kind = 'RocksDB',
  }

  kumo.define_spool {
    name = 'meta',
    path = '/var/spool/kumomta/meta',
    kind = 'RocksDB',
  }

  kumo.configure_local_logs {
    log_dir = '/var/log/kumomta',
    headers = { 'Subject', 'X-Tracking-Id' },
    meta = {
      'received_from',
      'tenant',
      'received',
      'domain',
      'routing_domain',
    },
  }

  kumo.configure_bounce_classifier {
    files = {
      '/opt/kumomta/share/bounce_classifier/iana.toml',
    },
  }

  -- Log records for this hook are routed to the 'amqp' queue by the
  -- 'should_enqueue_log_record' handler below.
  kumo.configure_log_hook {
    name = 'amqp',
    headers = { 'Subject', 'X-Tracking-Id' },
    meta = {
      'received_from',
      'tenant',
      'received',
      'domain',
      'cid',
      'pool_id',
      'server_id',
      'email_domain',
      'received_at',
      'from_email',
      'routing_domain',
    },
  }
end)

-- Constructor for the custom Lua delivery protocol referenced from
-- 'get_queue_config': returns an object whose send method publishes the
-- message data to RabbitMQ and waits for the broker confirmation.
kumo.on('make.amqp', function(domain, tenant, campaign)
  local connection = {}

  function connection:send(message)
    local confirm = client:publish {
      routing_key = rabitmqqueue,
      payload = message:get_data(),
      exchange = 'transaction_email_events_exchange',
    }
    local result = confirm:wait()

    if result.status == 'Ack' or result.status == 'NotRequested' then
      return result.status
    end
    -- Any other confirmation status is reported as a failure.
    kumo.reject(500, kumo.json_encode(result))
  end

  return connection
end)

-- Rejects MAIL FROM unless the session is authenticated and checkQuota
-- returns "success" for the authenticated user.
-- NOTE(review): checkQuota is not defined in the visible part of this file;
-- it is assumed to be provided elsewhere.
kumo.on('smtp_server_mail_from', function(sender, msg)
  if not msg:get_meta('authz_id') then
    kumo.reject(400, "Username and password is empty")
  end

  local AUTH_DETAILS = conn:query('GET', 'trans_authdetails')
  local user = msg:get_meta('authz_id')
  local details = kumo.json_parse(AUTH_DETAILS)
  local cid = details[user] and details[user].cid or ""

  local permission = checkQuota(details, user, cid)
  if permission ~= "success" then
    kumo.reject(400, permission)
  end
end)

-- Decides whether a log record is queued for the 'amqp' log hook.
kumo.on('should_enqueue_log_record', function(msg, hook_name)
  if hook_name ~= 'amqp' then
    return
  end

  local log_record = msg:get_meta('log_record')
  -- Skip records whose reception protocol is 'LogRecord', i.e. records
  -- about log messages themselves.
  if log_record.reception_protocol == 'LogRecord' then
    return false
  end

  msg:set_meta('queue', 'amqp')
  return true
end)

kumo.on('smtp_server_auth_plain', handleSmtpServerAuthPlain)

-- The 'amqp' queue is delivered through the 'make.amqp' Lua constructor;
-- every other queue gets the default configuration.
kumo.on('get_queue_config', function(domain, tenant, campaign, routing_domain)
  if domain == 'amqp' then
    return kumo.make_queue_config {
      protocol = {
        custom_lua = {
          constructor = 'make.amqp',
        },
      },
    }
  end
  return kumo.make_queue_config {}
end)

local queue_helper = queue_module:setup({ '/opt/kumomta/etc/policy/queues.toml' })

--- Handler for the 'smtp_server_message_received' event.
-- Rejects unauthenticated messages, applies the queue helper and then
-- DKIM-signs the message.
-- @param msg the received message object
function handleSmtpServerMessage(msg)
  local user = msg:get_meta('authz_id')
  if not user then
    msg:set_meta('queue', 'null')
    kumo.reject(400, "Username and password is empty")
  end

  queue_helper:apply(msg)
  -- signMessage only takes the message; the previous call also passed
  -- undefined globals (sendingDomain, details, cid), which were always nil.
  signMessage(msg)
end

--- DKIM-sign a message and set its routing domain.
-- Signer settings are read from the DKIM_DOMAIN, DKIM_SELETOR and DKIM_KEY
-- environment variables.
-- @param msg the message object to sign
function signMessage(msg)
  local signer = kumo.dkim.rsa_sha256_signer {
    domain = os.getenv("DKIM_DOMAIN"),
    -- "DKIM_SELETOR" is the historical (misspelled) variable name and is
    -- still honoured first; the correctly spelled name is a fallback.
    selector = os.getenv("DKIM_SELETOR") or os.getenv("DKIM_SELECTOR"),
    headers = { 'From', 'To', 'Subject' },
    key = os.getenv("DKIM_KEY"),
  }
  msg:dkim_sign(signer)
  msg:set_meta("routing_domain", "smtp.eu.mlrcd.com")
end

kumo.on('smtp_server_message_received', handleSmtpServerMessage)