Kafka batching

Hi, I’m trying to send batches to Kafka, but I think the messages are still being sent one by one. This is the hook I’m using:

```
log_hooks:new {
  name = "kafka",
  batch_size = 1000,
  constructor = function(domain, tenant, campaign)
    local connection = {}
    local producer = kumo.kafka.build_producer {
      ["bootstrap.servers"] = "<kafka_server>",
      ["queue.buffering.max.ms"] = 1000,
    }
    local topics_map = {
      ['Reception'] = "pmta_accounting_received",
      ['Delivery'] = "pmta_accounting_delivered",
      ['Bounce'] = "pmta_accounting_bounced",
      ['TransientFailure'] = "pmta_accounting_transient",
      ['Feedback'] = "kumo_accounting_feedback",
      ['AdminBounce'] = "kumo_accounting_admin_bounced",
      ['Expiration'] = "kumo_accounting_expired",
      ['OOB'] = "kumo_accounting_oob",
    }

    function connection:send_batch(messages)
      for _, message in ipairs(messages) do
        local log_record = message:get_meta('log_record')
        local topic = topics_map[log_record.type] or "pmta_other"
        local status, partition, offset = pcall(producer.send, producer, {
          topic = topic,
          payload = message:get_data(),
        })
        if not status then
          kumo.reject(400, 'kafka error: ' .. tostring(partition))
        end
      end
      return string.format('%d messages sent', #messages)
    end

    function connection:close()
      producer:close()
    end

    return connection
  end,
}
```
With ["queue.buffering.max.ms"] = 1000, it’s supposed to wait 1s before sending the messages as batches, but producer:send waits for the result. What do I need to change to make this configuration work?

I don’t know whether the batching code was tested with Kafka, so the buffering option may not behave predictably. I’ll check with @free-spirited-yorksh to see if it’s usable in this context.

I did not test it with Kafka, but I did with webhooks, and the underlying code should be the same.
An interesting artifact of this change is that batch_size really becomes the ‘max’ batch size, and you also get two other settings:

```
min_batch_size = 500, -- the low target for the batch
max_batch_latency = "30s", -- the amount of time to pause while collecting logs; this gives the batch more time to fill
```
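A minimal sketch of where these fields go in the hook definition, assuming a build that includes the min_batch_size/max_batch_latency change. The values are the ones quoted above, the comments are my reading of the description rather than of the implementation, and make_kafka_connection is a hypothetical name standing in for the existing constructor function from the original hook:

```lua
log_hooks:new {
  name = "kafka",
  -- Upper bound on the number of log records handed to one send_batch call.
  batch_size = 1000,
  -- Low target: the batch may keep filling until it reaches this size...
  min_batch_size = 500,
  -- ...or until this much time has been spent collecting it.
  max_batch_latency = "30s",
  -- Hypothetical name: the same constructor function as in the original hook.
  constructor = make_kafka_connection,
}
```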

Also, you will want to set connection_limit for the log hook queue to 1 in order to get the largest batches. The default of 32 will spread the batch over 32 connections, which is probably not what you want.
For example:

```
  mx_rollup = false
  connection_limit = 1
  max_deliveries_per_connection = 100000
  max_connection_rate = "1000/s"
```
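These look like shaping.toml entries. If you configure egress paths in Lua instead, a rough equivalent might look like the sketch below. It assumes the site name is kafka.log_hook (the SITE shown by kcli queue-summary later in this thread), and it leaves out mx_rollup, which as far as I know is a shaping-file option rather than an egress path field. If your policy already uses the shaping helper for this event, keep the TOML form there instead.

```lua
-- Sketch only: egress settings for the log hook's site, assuming the site
-- name reported by kcli queue-summary ("kafka.log_hook") is what arrives here.
kumo.on('get_egress_path_config', function(routing_domain, egress_source, site_name)
  if site_name == 'kafka.log_hook' then
    return kumo.make_egress_path {
      connection_limit = 1, -- a single connection, so batches are not split up
      max_deliveries_per_connection = 100000,
      max_connection_rate = '1000/s',
    }
  end
  -- Every other site keeps the default egress path settings.
  return kumo.make_egress_path {}
end)
```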

To clarify what Tom wrote: starting with commit 979e69f (“add min_batch_size and max_batch_latency to lua delivery handler”, KumoCorp/kumomta on GitHub), you can now specify min_batch_size and max_batch_latency to directly control batching.

In earlier builds, you would need to constrain connection_limit to 1 and play a slightly dirty trick: add a kumo.sleep call to your send_batch implementation to encourage collecting a large enough batch.
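For those earlier builds, the trick might look roughly like this. It is a sketch that lives inside the constructor from the original hook (so producer and topics_map are the locals defined there), and the 1-second pause is an arbitrary example value, not a recommendation:

```lua
function connection:send_batch(messages)
  -- Deliberate pause: while this single connection sleeps, more log records
  -- pile up in the queue, so the next batch handed to send_batch is larger.
  kumo.sleep(1)
  for _, message in ipairs(messages) do
    local log_record = message:get_meta('log_record')
    local topic = topics_map[log_record.type] or "pmta_other"
    local ok, err = pcall(producer.send, producer, {
      topic = topic,
      payload = message:get_data(),
    })
    if not ok then
      kumo.reject(400, 'kafka error: ' .. tostring(err))
    end
  end
  return string.format('%d messages sent', #messages)
end
```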

You should not need to downsize the connection_limit when running with builds that support min_batch_size and max_batch_latency.

@free-spirited-yorksh works fast and I need to keep up 😄

Just noticed that your send_batch method:

```
    function connection:send_batch(messages)
      for _, message in ipairs(messages) do
        local log_record = message:get_meta('log_record')
        local topic = topics_map[log_record.type] or "pmta_other"
        local status, partition, offset = pcall(producer.send, producer, {
          topic = topic,
          payload = message:get_data(),
        })
        if not status then
          kumo.reject(400, 'kafka error: ' .. tostring(partition))
        end
      end
      return string.format('%d messages sent', #messages)
    end
```

is collecting a batch inside kumo in the messages variable, but then iterates over that list and sends a single message at a time to Kafka.

You’ll want to tweak that to something like this, where you call producer:send just once per batch:

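```
-- One producer:send per batch: the topic has to be chosen once for the
-- whole batch, since all the logs end up in the same Kafka message.
local topic = "pmta_other"
local payload = {}
for _, message in ipairs(messages) do
   -- get_data() is the serialized log record
   table.insert(payload, message:get_data())
end
producer:send {
  topic = topic,
  -- the payload is a string, so join the records one per line
  payload = table.concat(payload, "\n"),
}
```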

That’s not really what I’m looking for: this puts an array of logs inside a single message, sent to a single topic. I need to keep the logs separate, one log per message.

```
mx_rollup = false
connection_limit = 1
max_deliveries_per_connection = 2000
max_message_rate = "10000/s"
max_ready = 2048
```

I had this configuration for this queue. I also tried with all the new settings and still got the same results. Setting

```
  batch_size = 50,
  min_batch_size = 10,
  max_batch_latency = "1s",
```
and still keeping ["queue.buffering.max.ms"] = 1000, I get:

```
for i in {0..100}; do date && /opt/kumomta/sbin/kcli queue-summary; echo ; sleep 10; done
Thu Nov 28 16:52:16 CET 2024
SITE SOURCE PROTO D T C Q

SCHEDULED QUEUE COUNT

Thu Nov 28 16:52:26 CET 2024
SITE SOURCE PROTO D T C Q
kafka.log_hook unspecified lua:make.kafka.log_hook 0 0 1 2,048

SCHEDULED QUEUE COUNT
kafka.log_hook 324

Thu Nov 28 16:52:36 CET 2024
SITE SOURCE PROTO D T C Q
kafka.log_hook unspecified lua:make.kafka.log_hook 11 0 1 2,048

SCHEDULED QUEUE COUNT
kafka.log_hook 891

Thu Nov 28 16:53:17 CET 2024
SITE SOURCE PROTO D T C Q
kafka.log_hook unspecified lua:make.kafka.log_hook 11 0 1 2,048

SCHEDULED QUEUE COUNT
kafka.log_hook 891

Thu Nov 28 16:53:27 CET 2024
SITE SOURCE PROTO D T C Q
kafka.log_hook unspecified lua:make.kafka.log_hook 61 0 1 2,048

SCHEDULED QUEUE COUNT
kafka.log_hook 841

Thu Nov 28 16:54:07 CET 2024
SITE SOURCE PROTO D T C Q
kafka.log_hook unspecified lua:make.kafka.log_hook 61 0 1 2,048

SCHEDULED QUEUE COUNT
kafka.log_hook 841

Thu Nov 28 16:54:17 CET 2024
SITE SOURCE PROTO D T C Q
kafka.log_hook unspecified lua:make.kafka.log_hook 111 0 1 1,998

SCHEDULED QUEUE COUNT
kafka.log_hook 841
```
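One way to read these numbers, assuming each awaited producer:send sits in librdkafka’s queue for the full queue.buffering.max.ms = 1000 before its delivery report comes back (only one message is ever in flight, so there is nothing to batch it with): with connection_limit = 1 and batch_size = 50, a single send_batch call takes roughly

$$
t_{\text{batch}} \approx N_{\text{batch}} \times t_{\text{linger}} = 50 \times 1\,\text{s} = 50\,\text{s}
$$

That lines up with D rising by 50 roughly every 50 seconds (11 at 16:52:36, 61 at 16:53:27, 111 at 16:54:17), i.e. about one message per second, which is consistent with the messages still being sent one at a time.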

I think it’s due to the way the Kafka module sends messages. According to the docs, the client should be able to do some internal batching: https://docs.rs/rdkafka/latest/rdkafka/producer/index.html#producer-configuration. Maybe we can make a new function that takes an array of messages, does all the sends, and then returns either ok or an array of failed messages?
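Whatever shape the new function takes, the Lua side would still produce one record per log, so each log stays its own Kafka message with its own topic. A minimal pure-Lua sketch of that preparation step; build_kafka_records and its default_topic argument are hypothetical names, and each record uses the same topic/payload fields the existing producer:send call takes:

```lua
-- Hypothetical helper: turn a kumo batch into a list of per-log Kafka records.
-- Nothing is merged: one input message becomes exactly one output record.
local function build_kafka_records(messages, topics_map, default_topic)
  local records = {}
  for _, message in ipairs(messages) do
    local log_record = message:get_meta('log_record')
    records[#records + 1] = {
      -- Route by log record type, falling back to a catch-all topic.
      topic = topics_map[log_record.type] or default_topic,
      -- The serialized log record, exactly as producer:send receives it today.
      payload = message:get_data(),
    }
  end
  return records
end
```

Inside send_batch this would be called as build_kafka_records(messages, topics_map, "pmta_other").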

Maybe there should be a send_batch method added that queues up the futures for the sends and then uses FuturesOrdered to wait for them all?

The current await for the completion of each send might be getting in the way of the batching that the client is trying to do.

Alternatively, maybe we don’t need another method at all, just a field in the send parameters that tells the existing method not to wait for the FutureRecord?

But then you wouldn’t know whether any given send succeeded.

I’m in favor of the first option; we’ll get some feedback if a message isn’t sent successfully.

Do you want to take a run at that? It’s essentially a copy and paste of the send implementation in kumomta/crates/mod-kafka/src/lib.rs (KumoCorp/kumomta at commit 858b0324485e6c289e7f6c6c39b5405d3bcb313c on GitHub), changing it to:

```
methods.add_async_method("send_batch", |lua, this, value: Vec<Value>| async move {
```

Have it iterate over the array of Value, convert each one to a Record, produce the FutureRecords and push_back those into a FuturesOrdered (from futures::stream), then wait for the results there and return them in some useful form to the caller.
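On the Lua side, send_batch could then hand the whole list over in one call. This is a sketch against an API that does not exist yet: producer:send_batch and its return value (assumed here to be a list of error strings for the records that failed, empty on success) are hypothetical, pending the Rust implementation described above. producer and topics_map are the locals from the original hook’s constructor.

```lua
function connection:send_batch(messages)
  -- One record per log record, each with its own topic.
  local records = {}
  for _, message in ipairs(messages) do
    local log_record = message:get_meta('log_record')
    table.insert(records, {
      topic = topics_map[log_record.type] or "pmta_other",
      payload = message:get_data(),
    })
  end
  -- HYPOTHETICAL API: enqueue every record first, then await all the
  -- delivery results together (e.g. via FuturesOrdered on the Rust side).
  local failures = producer:send_batch(records)
  if #failures > 0 then
    kumo.reject(400, string.format('kafka error: %d of %d records failed: %s',
      #failures, #records, tostring(failures[1])))
  end
  return string.format('%d messages sent', #messages)
end
```

Returning the failures rather than a single ok/error matches the idea of returning either ok or an array of failed messages.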

I’ll see what I can do to implement it, thanks!

It might not be the FutureRecord you need to put into the FuturesOrdered, but rather the return value from the producer.send call.

Either way, I think it should be fairly simple to try it out and see what works.