local kumo = require 'kumo'
local redis = require 'redis'
local utils = require 'policy-extras.policy_utils'
local log_hooks = require 'policy-extras.log_hooks'
local shaping = require 'policy-extras.shaping'

-- Read an environment variable that this policy cannot start without.
-- Raises an error naming the missing variable instead of letting a later
-- string concatenation fail with "attempt to concatenate a nil value".
local function require_env(name)
  local value = os.getenv(name)
  if value == nil then
    error(
      string.format('required environment variable %s is not set', name),
      2
    )
  end
  return value
end

-- Vault connection settings.
local VAULT_PROTOCOL = require_env 'KUMOD_VAULT_PROTOCOL'
local VAULT_HOST = require_env 'KUMOD_VAULT_HOST'
local VAULT_PORT = require_env 'KUMOD_VAULT_PORT'
-- The token is passed through as-is (possibly nil), exactly as before.
local VAULT_TOKEN = os.getenv 'KUMOD_VAULT_TOKEN'
local VAULT_MOUNT = 'secret'
local VAULT_SERVER_ADDRESS =
  string.format('%s://%s:%s', VAULT_PROTOCOL, VAULT_HOST, VAULT_PORT)

-- Redis connection settings; database 0 is always used.
local REDIS_PROTOCOL = require_env 'KUMOD_REDIS_PROTOCOL'
local REDIS_HOST = require_env 'KUMOD_REDIS_HOST'
local REDIS_PORT = require_env 'KUMOD_REDIS_PORT'
local REDIS_USER = require_env 'KUMOD_REDIS_USER'
local REDIS_PASSWORD = require_env 'KUMOD_REDIS_PASSWORD'
local REDIS_SERVER_NODE = string.format(
  '%s://%s:%s@%s:%s/0',
  REDIS_PROTOCOL,
  REDIS_USER,
  REDIS_PASSWORD,
  REDIS_HOST,
  REDIS_PORT
)

-- Credentials accepted by the HTTP listener's basic-auth hook.
local METRICS_USER = os.getenv 'KUMOD_METRICS_USER'
-- The variable name used to be misspelt (KUMOD_MTRICS_PASSWORD). Prefer the
-- correct spelling, but keep honouring the old one so existing deployments
-- do not lose their metrics password.
local METRICS_PASSWORD = os.getenv 'KUMOD_METRICS_PASSWORD'
  or os.getenv 'KUMOD_MTRICS_PASSWORD'

-- Shared Redis connection used by every handler in this file.
local conn = redis.open { node = REDIS_SERVER_NODE }

-- Load the secret stored at `data_path` from the configured Vault server.
-- Returns whatever kumo.secrets.load returns for that path.
function fetch_vault_data(data_path)
  return kumo.secrets.load {
    vault_path = data_path,
    vault_address = VAULT_SERVER_ADDRESS,
    vault_mount = VAULT_MOUNT,
    vault_token = VAULT_TOKEN,
  }
end

-- Extract the tenant id from a "<tenant>-<domain>" string.
-- The string is split on '-' and the first five segments are re-joined with
-- '-', e.g. 'a-b-c-d-e-example.com' yields 'a-b-c-d-e'.
-- Returns nil when the string has five segments or fewer.
function extractTenantUUID(tenant_domain_string)
  local parts = {}
  for part in string.gmatch(tenant_domain_string, '[^%-]+') do
    parts[#parts + 1] = part
  end
  if #parts <= 5 then
    return nil
  end
  return table.concat(parts, '-', 1, 5)
end

-- Return the Redis value stored under 'tenant_webhook_reception:<tenant>'.
function tenant_webhook_reception(tenant)
  return conn:query('GET', 'tenant_webhook_reception:' .. tenant)
end

-- Return the Redis value stored under 'tenant_webhook_delivery:<tenant>'.
function tenant_webhook_delivery(tenant)
  return conn:query('GET', 'tenant_webhook_delivery:' .. tenant)
end

-- Return the Redis value stored under 'tenant_webhook_bounce:<tenant>'.
function tenant_webhook_bounce(tenant)
  return conn:query('GET', 'tenant_webhook_bounce:' .. tenant)
end

-- Load the DKIM key stored in Vault at 'dkim/<domain>/<selector>.key'.
function get_dkim_key(domain, selector)
  local dkim_path = 'dkim/' .. domain .. '/' .. selector .. '.key'
  return fetch_vault_data(dkim_path)
end

-- Memoized variants of the lookups above. DKIM keys are cached for an hour,
-- webhook URLs for ten minutes; each cache holds up to 100 entries.
cached_domain_dkim_key = kumo.memoize(get_dkim_key, {
  name = 'dkim_key',
  ttl = '60 minutes',
  capacity = 100,
})

cached_tenant_webhook_reception = kumo.memoize(tenant_webhook_reception, {
  name = 'webhook_reception',
  ttl = '10 minutes',
  capacity = 100,
})

cached_tenant_webhook_delivery = kumo.memoize(tenant_webhook_delivery, {
  name = 'webhook_delivery',
  ttl = '10 minutes',
  capacity = 100,
})

cached_tenant_webhook_bounce = kumo.memoize(tenant_webhook_bounce, {
  name = 'webhook_bounce',
  ttl = '10 minutes',
  capacity = 100,
})

-- Validate SMTP credentials against the JSON document stored in Vault at
-- 'smtp-credentials/<user>'.
--
-- On success the tenant id, the tenant's allowed sender domains (Redis list
-- 'tenant_domains:<tenant>') and its volume limit (Redis key
-- 'tenant_volume:<tenant>') are stored in the connection metadata under
-- 'tenant', 'tenant_domains' and 'tenant_volume'.
--
-- Returns true when the credentials match, false otherwise.
function auth_check(user, password, conn_meta)
  -- The user name is client-controlled and becomes part of a Vault path;
  -- refuse anything that could climb out of 'smtp-credentials/'.
  if type(user) ~= 'string' or user == '' or user:find('..', 1, true) then
    return false
  end

  -- A missing or unreadable secret must be an authentication failure, not
  -- an unhandled Lua error, so the load is protected.
  local loaded, smtp_credentials_data =
    pcall(fetch_vault_data, 'smtp-credentials/' .. user)
  if not loaded then
    print('auth_check: credential lookup failed: '
      .. tostring(smtp_credentials_data))
    return false
  end
  if not smtp_credentials_data then
    return false
  end

  local parsed, user_data = pcall(kumo.json_parse, smtp_credentials_data)
  if not parsed or type(user_data) ~= 'table' then
    return false
  end

  if user_data.password == nil or user_data.password ~= password then
    return false
  end

  -- Without a tenant id none of the per-tenant Redis keys can be built.
  local tenant = user_data.tenant
  if type(tenant) ~= 'string' or tenant == '' then
    return false
  end

  conn_meta:set_meta('tenant', tenant)

  local tenant_domains =
    conn:query('LRANGE', 'tenant_domains:' .. tenant, 0, -1)
  conn_meta:set_meta('tenant_domains', tenant_domains)

  local tenant_limit_max = conn:query('GET', 'tenant_volume:' .. tenant)
  conn_meta:set_meta('tenant_volume', tenant_limit_max)

  return true
end

-- CALLED ON STARTUP, ALL ENTRIES WITHIN init REQUIRE A SERVER RESTART WHEN CHANGED.
kumo.on('init', function()
  kumo.set_diagnostic_log_filter 'kumod=info'

  kumo.define_spool {
    name = 'data',
    path = '/var/spool/kumomta/data',
    kind = 'RocksDB',
  }

  kumo.define_spool {
    name = 'meta',
    path = '/var/spool/kumomta/meta',
    kind = 'RocksDB',
  }

  -- Local JSON logs, one directory per record type. Only records accepted
  -- by the 'should_log_to_smtp_logs' event are written.
  -- NOTE(review): TransientFailure and Any write to /var/log/kumomta while
  -- the other record types use /var/log/kumo - confirm this is intended.
  kumo.configure_local_logs {
    log_dir = '/var/log/kumo',
    headers = { 'Subject', 'X-Campaign-ID' },
    max_segment_duration = '5 minutes',
    compression_level = 0,
    meta = { 'tenant' },
    filter_event = 'should_log_to_smtp_logs',
    per_record = {
      Reception = {
        log_dir = '/var/log/kumo/reception',
        suffix = '_recv',
        enable = true,
        template = [[{{ log_record | tojson }}]],
      },
      Delivery = {
        log_dir = '/var/log/kumo/delivery',
        suffix = '_delv',
        enable = true,
        template = [[{{ log_record | tojson }}]],
      },
      Bounce = {
        log_dir = '/var/log/kumo/bounce',
        template = [[{{ log_record | tojson }}]],
        suffix = '_bnce',
        enable = true,
      },
      TransientFailure = {
        log_dir = '/var/log/kumomta',
        suffix = '_trns',
        enable = true,
      },
      Any = {
        log_dir = '/var/log/kumomta',
        suffix = '_othr',
        enable = true,
      },
    },
  }

  kumo.configure_bounce_classifier {
    files = { '/opt/kumomta/share/bounce_classifier/iana.toml' },
  }

  -- NOTE(review): every IPv4 address is listed as trusted here, and '::0'
  -- has no prefix length (possibly meant to be '::/0'). Confirm that the
  -- HTTP API is really supposed to be trusted from everywhere.
  kumo.start_http_listener {
    listen = '0.0.0.0:8000',
    trusted_hosts = { '0.0.0.0/0', '::0' },
  }

  kumo.start_esmtp_listener {
    listen = '0.0.0.0:587',
    hostname = 'smtp.mailtinni.com',
    banner = 'Mailtinni ESMTP service ready',
    relay_hosts = { '0.0.0.0/0', '::/0' },
    client_timeout = '5 minutes',
    deferred_spool = false,
    max_messages_per_connection = 10000,
    max_recipients_per_message = 1024,
    max_message_size = 31457280,
    trace_headers = {
      received_header = true,
      supplemental_header = true,
      header_name = 'X-KumoRef',
      include_meta_names = { 'tenant', 'campaign' },
    }
    -- tls_certificate = '/etc/letsencrypt/live/smtp.mailtinni.com/fullchain.pem',
    -- tls_private_key = '/etc/letsencrypt/live/smtp.mailtinni.com/privkey.pem'
  }

  -- One log hook per webhook type; each enables exactly one record type.
  kumo.configure_log_hook {
    name = 'webhook-reception',
    headers = { 'Subject', 'X-Campaign-ID' },
    meta = { 'tenant' },
    per_record = {
      Reception = {
        enable = true,
        template = [[{ "id": "{{ id }}", "type": "{{ type }}", "protocol": "{{ reception_protocol }}", "logtime": {{ timestamp }}, "from": "{{ sender }}", "to": "{{ recipient }}", "headers": {{ headers }} }]],
      },
      Delivery = { enable = false },
      TransientFailure = { enable = false },
      Bounce = { enable = false },
      Any = { enable = false },
    },
  }

  kumo.configure_log_hook {
    name = 'webhook-delivery',
    headers = { 'Subject', 'X-Campaign-ID' },
    meta = { 'tenant' },
    per_record = {
      Delivery = {
        enable = true,
        template = [[{ "id": "{{ id }}", "type": "{{ type }}", "protocol": "{{ delivery_protocol }}", "timestamp": {{ created }}, "from": "{{ sender }}", "to": "{{ recipient }}", "headers": {{ headers }} }]],
      },
      Reception = { enable = false },
      TransientFailure = { enable = false },
      Bounce = { enable = false },
      Any = { enable = false },
    },
  }

  kumo.configure_log_hook {
    name = 'webhook-bounce',
    headers = { 'Subject', 'X-Campaign-ID' },
    meta = { 'tenant' },
    per_record = {
      Bounce = {
        enable = true,
        template = [[{ "id": "{{ id }}", "type": "{{ type }}", "protocol": "{{ delivery_protocol }}", "timestamp": {{ timestamp }}, "from": "{{ sender }}", "to": "{{ recipient }}", "headers": {{ headers }}, "bounce": "{{ bounce_classification }}" }]],
      },
      Reception = { enable = false },
      Delivery = { enable = false },
      TransientFailure = { enable = false },
      Any = { enable = false },
    },
  }

  -- kumo.configure_redis_throttles { node = 'redis://localhost:6379:6379/' }
end) -- END OF THE INIT EVENT

-- Queue names used by the three log hooks configured in 'init'.
local WEBHOOK_QUEUES = {
  ['webhook-reception'] = true,
  ['webhook-delivery'] = true,
  ['webhook-bounce'] = true,
}

-- Memoized webhook lookup to use for each log hook name.
local WEBHOOK_LOOKUPS = {
  ['webhook-reception'] = cached_tenant_webhook_reception,
  ['webhook-delivery'] = cached_tenant_webhook_delivery,
  ['webhook-bounce'] = cached_tenant_webhook_bounce,
}

-- Resolve the webhook for a message: read its 'tenant' meta value, extract
-- the tenant id from it and pass that id to `lookup`.
-- Returns nil when the message has no tenant meta or no id can be extracted.
local function lookup_tenant_webhook(msg, lookup)
  local tenant_domain = msg:get_meta('tenant')
  if not tenant_domain then
    return nil
  end
  local tenant = extractTenantUUID(tenant_domain)
  if not tenant then
    return nil
  end
  return lookup(tenant)
end

-- True when `message_id` has the literal form '<' .. something .. '@' ..
-- domain .. '>'. Plain string comparison is used on purpose: a domain holds
-- '.' and '-', which are magic characters in Lua patterns, so embedding it
-- in a pattern mis-matches (hyphenated domains would never match).
local function message_id_matches_domain(message_id, domain)
  local suffix = '@' .. domain .. '>'
  return #message_id > #suffix + 1
    and message_id:sub(1, 1) == '<'
    and message_id:sub(-#suffix) == suffix
end

-- Build the constructor for a custom_lua webhook queue. The returned
-- function creates a connection object whose send() POSTs the message data
-- as JSON to the URL returned by `lookup` for the message's tenant.
local function make_webhook_constructor(lookup)
  return function(domain, tenant, campaign)
    local connection = {}
    local client = kumo.http.build_client {}

    function connection:send(message)
      local webhook = lookup_tenant_webhook(message, lookup)
      if not webhook then
        -- NOTE(review): as before, nothing is returned when no webhook can
        -- be resolved; confirm how kumod treats an empty send() result.
        return
      end

      local response = client
        :post(webhook)
        :header('Content-Type', 'application/json')
        :body(message:get_data())
        :send()

      local disposition = string.format(
        '%d %s: %s',
        response:status_code(),
        response:status_reason(),
        response:text()
      )

      if response:status_is_success() then
        return disposition
      end

      -- NOTE(review): every non-success response is rejected with 500,
      -- which presumably makes the failure permanent so the queue's
      -- retry_interval is never used - confirm this is the intent.
      kumo.reject(500, disposition)
    end

    return connection
  end
end

kumo.on('smtp_server_auth_plain', function(authz, authc, password, conn_meta)
  return auth_check(authc, password, conn_meta)
end)

-- Basic auth for the HTTP listener: only the metrics credentials taken from
-- the environment are accepted, and an empty password never matches.
kumo.on('http_server_validate_auth_basic', function(user, password)
  -- With unset credentials there is nothing to match against. (Building a
  -- lookup table keyed by a nil METRICS_USER would raise "index is nil".)
  if not METRICS_USER or not METRICS_PASSWORD then
    return false
  end
  if password == '' then
    return false
  end
  return user == METRICS_USER and password == METRICS_PASSWORD
end)

kumo.on('http_message_generated', function(msg)
  error 'inject not allowed'
end)

-- Enforce the per-tenant sender domain allow-list and volume limit.
kumo.on('smtp_server_mail_from', function(sender, conn_meta)
  -- 'tenant' is only set by a successful auth_check; without it the
  -- per-tenant checks below cannot run.
  local tenant = conn_meta:get_meta('tenant')
  if not tenant then
    kumo.reject(530, 'authentication required')
  end

  -- Domain names are case-insensitive, so compare in lower case.
  local sender_domain = string.lower(sender.domain or '')
  local is_allowed = false
  for _, domain in ipairs(conn_meta:get_meta('tenant_domains') or {}) do
    if string.lower(domain) == sender_domain then
      is_allowed = true
      break
    end
  end
  -- NOTE(review): 420 is a transient code, so clients will retry a domain
  -- that is not on the list; a 5xx code may be more appropriate.
  if not is_allowed then
    kumo.reject(420, 'FROM domain not allowed')
  end

  -- A tenant without a stored numeric limit used to crash on the comparison
  -- below; refuse explicitly, and before touching the sent counter.
  local volume = tonumber(conn_meta:get_meta('tenant_volume'))
  if not volume then
    kumo.reject(451, 'tenant volume limit is not configured')
  end

  local sent = conn:query('INCRBY', 'tenant_sent:' .. tenant, 1)
  if tonumber(sent) > volume then
    kumo.reject(420, 'monthly limit reached')
  end
end)

-- Normalise Message-ID, add Return-Path, DKIM-sign with the From domain's
-- key from Vault, and tag the message with tenant and campaign metadata.
kumo.on('smtp_server_message_received', function(msg, conn_meta)
  local from = msg:from_header()
  if not from or not from.domain then
    kumo.reject(550, 'missing or invalid From header')
  end
  local from_domain = from.domain

  -- Keep a client-supplied Message-ID only when it already belongs to the
  -- From domain; otherwise replace it with one derived from the message id.
  local message_id = msg:get_first_named_header_value('Message-ID')
  local keep_existing = message_id
    and message_id_matches_domain(message_id, from_domain)
  if not keep_existing then
    if message_id then
      msg:remove_all_named_headers('Message-ID')
    end
    msg:append_header(
      'Message-ID',
      '<' .. msg:id() .. '@' .. from_domain .. '>'
    )
  end

  msg:append_header('Return-Path', 'bounce@' .. from_domain)

  local dkim_selector = 'dkim1024'

  -- The key lookup is protected so that a missing key ends in the
  -- 'no dkim' rejection below rather than an unhandled Lua error.
  local key_ok, dkim_key =
    pcall(cached_domain_dkim_key, from_domain, dkim_selector)
  if not key_ok or not dkim_key then
    kumo.reject(420, 'no dkim')
  end

  local vault_signer = kumo.dkim.rsa_sha256_signer {
    selector = dkim_selector,
    headers = {
      'From',
      'To',
      'Subject',
      'Date',
      'MIME-Version',
      'Content-Type',
      'Sender',
    },
    algo = 'sha256',
    domain = from_domain,
    key = { key_data = dkim_key },
  }
  if not vault_signer then
    kumo.reject(420, 'no dkim')
  end
  msg:dkim_sign(vault_signer)

  -- The tenant meta is '<tenant id>-<from domain>'; extractTenantUUID
  -- recovers the id from this value later on.
  msg:set_meta('tenant', conn_meta:get_meta('tenant') .. '-' .. from_domain)
  msg:set_meta('campaign', msg:get_first_named_header_value 'X-Campaign-ID')
end)

kumo.on('get_queue_config', function(domain, tenant_domain, campaign)
  -- Webhook queues are delivered by the matching 'make.<queue>' constructor.
  if WEBHOOK_QUEUES[domain] then
    return kumo.make_queue_config {
      protocol = {
        custom_lua = { constructor = 'make.' .. domain },
      },
      max_age = '120 minutes',
      retry_interval = '2 minutes',
      max_retry_interval = '20 minutes',
      egress_pool = 'webhook',
    }
  end

  -- Regular mail leaves through the pool named after the message's tenant
  -- meta value.
  return kumo.make_queue_config {
    max_age = '72 hours',
    retry_interval = '4 hours',
    max_retry_interval = '24 hours',
    egress_pool = tenant_domain,
  }
end)

-- Build an egress pool from the JSON stored in Redis under
-- 'tenant_pools:<pool_name>'; returns nothing when the key is absent.
kumo.on('get_egress_pool', function(pool_name)
  local pool = conn:query('GET', 'tenant_pools:' .. pool_name)
  if not pool then
    return
  end

  local decoded_pool = kumo.json_parse(pool)
  local egress_entries = {}
  -- A pool document without 'sources' yields an empty entry list.
  for _, entry in ipairs(decoded_pool.sources or {}) do
    egress_entries[#egress_entries + 1] = entry
  end

  return kumo.make_egress_pool {
    name = decoded_pool.name,
    entries = egress_entries,
    ttl = '10 minutes',
  }
end)

-- Build an egress source from the JSON stored in Redis under
-- 'tenant_sources:<source_name>'; returns nothing when the key is absent.
kumo.on('get_egress_source', function(source_name)
  local source = conn:query('GET', 'tenant_sources:' .. source_name)
  if not source then
    return
  end

  local decoded_source = kumo.json_parse(source)
  return kumo.make_egress_source {
    name = source_name,
    source_address = decoded_source.ip,
    ehlo_domain = decoded_source.ehlo_domain,
    ttl = '10 minutes',
  }
end)

kumo.on(
  'get_egress_path_config',
  shaping:setup { '/opt/kumomta/etc/shaping.toml' }
)

-- Decide whether a log record is queued for the log hook `hook_name`.
-- Records whose queue is one of the webhook queues are skipped, and a
-- record is only queued when the tenant has a webhook for that hook.
kumo.on('should_enqueue_log_record', function(msg, hook_name)
  local log_record = msg:get_meta 'log_record'
  if WEBHOOK_QUEUES[log_record.queue] then
    return false
  end

  local lookup = WEBHOOK_LOOKUPS[hook_name]
  if not lookup then
    return false
  end

  if lookup_tenant_webhook(msg, lookup) then
    msg:set_meta('queue', hook_name)
    return true
  end
  return false
end)

kumo.on(
  'make.webhook-reception',
  make_webhook_constructor(cached_tenant_webhook_reception)
)

kumo.on(
  'make.webhook-delivery',
  make_webhook_constructor(cached_tenant_webhook_delivery)
)

kumo.on(
  'make.webhook-bounce',
  make_webhook_constructor(cached_tenant_webhook_bounce)
)

-- Only records for messages received over ESMTP go to the local logs.
kumo.on('should_log_to_smtp_logs', function(msg)
  return msg:get_meta 'reception_protocol' == 'ESMTP'
end)