Assertion in librdkafka causes kumomta to crash

Looks like memory allocation fails and this brings down whole kumomta instance.
Not sure why is that happening as the system has plenty of free memory.
Any ideas how to debug this?
Jan 28 17:42:56 localhost kumod[3277858]: kumod: rd.h:135: rd_calloc: Assertion `p' failed. Jan 28 17:42:58 localhost systemd[1]: kumomta.service: Main process exited, code=killed, status=6/ABRT Jan 28 17:42:58 localhost systemd[1]: kumomta.service: Failed with result 'signal'.

              total        used        free      shared  buff/cache   available
Mem:      329787468   135994320    34053296    21866664   159739852   169685140
Swap:      67108860     8785524    58323336
Total:    396896328   144779844    92376632```

Read the links above and provide all the information listed.

CentOS Linux release 8.4.2105```

https://gist.github.com/edgarsendernet/1fb22c87a380cf906f50d6e71cea4d16

Have you changed the log level to capture more details from the journal? Troubleshooting KumoMTA - KumoMTA Docs

enabled debug logging, will monitor

OK, so I figured out it’s caused by kafka.build_producer spawning threads and not closing them.

ps -eL 
....snip...
1006124 1375647 ?        00:00:00 rdk:main
1006124 1375648 ?        00:00:00 rdk:broker-1
1006124 1375652 ?        00:00:04 producer pollin
1006124 1375653 ?        00:00:00 rdk:main
1006124 1375654 ?        00:00:00 rdk:broker-1
1006124 1375658 ?        00:00:04 producer pollin
1006124 1375659 ?        00:00:00 rdk:broker1
1006124 1375660 ?        00:00:00 rdk:broker2
1006124 1375661 ?        00:00:00 rdk:broker0
1006124 1375662 ?        00:00:00 rdk:broker1
...snip...```

Under high concurrency it accumulates > 20k threads like this, at which point it crashes.
kumo.kafka.build_producer is called just once at the beginning of init.lua.
I assume this because each lua context starts it’s own kafka producer, each of which start a few threads of it’s own.

Any ideas how to share a single Kafka producer instance?

Remove this section, which creates a new kafka producer for every lua context that is initialized: https://gist.github.com/edgarsendernet/1fb22c87a380cf906f50d6e71cea4d16#file-init-lua-L19-L21

OK, but where to initialize it then?

IIRC the helper takes care of it.

https://docs.kumomta.com/reference/kumo.kafka/build_producer/

Followed this, not sure if I missed something

I was thinking to do that on ‘init’ event, but log hooks depend on pre_init

You appear to have your own custom log hooks module that you didn’t share. Presumably you are implementing a logger using a similar pattern to the official log hooks module. You should create your producer object as part of setting up your custom connection object, and close it when your connection object is closed

true, here it is https://gist.github.com/edgarsendernet/42b87757cfbf806366c100a7c60ca828

to minimize the overhead I’m now passing the producer to all the log hooks like this log_hooks:new_kafka ({ name = 'webhook-kafka-bounces', topic = 'kumomta_bounces', url = '127.0.0.1:9092', log_parameters = { meta = { 'x_campaign', 'x_campaign_type', 'x_tenant', 'x_subscriber_id', 'x_email_id', 'x_mailbox_provider' }, headers = { 'X-Campaign', 'X-Tenant' }, per_record = { Bounce = { enable = true, }, Any = { enable = false } } }, }, kafka_producer)

https://gist.github.com/edgarsendernet/42b87757cfbf806366c100a7c60ca828#file-log_hooks-lua-L267 is where you want to create your producer; store it in your connection object:

local connection = {
  producer = kumo.kafka.build_producer {
    ['bootstrap.servers'] = "127.0.0.1:9092"
  }
}

you can reference it as self.producer in your connection:send method, instead of kafka_producer
then in your close method you must close it:

        function connection:close()
           self.producer:close()
        end