Skip to main content
Version: Next

Kafka Connector

The Kafka Producer Connector is an outbound Connector that allows you to connect your BPMN service with Apache Kafka to produce messages.

Prerequisites

To use the Kafka Producer Connector, you must have a Kafka instance with a configured bootstrap server.

note

Use Camunda secrets to avoid exposing your sensitive data as plain text. To learn more, see managing secrets.

Create a Kafka Producer Connector task

You can apply a Connector to a task or event via the append menu. For example:

  • From the canvas: Select an element and click the Change element icon to change an existing element, or use the append feature to add a new element to the diagram.
  • From the properties panel: Navigate to the Template section and click Select.
  • From the side palette: Click the Create element icon.

change element

After you have applied a Connector to your element, follow the configuration steps or see using Connectors to learn more.

Make your Kafka Producer Connector for publishing messages executable

To make your Kafka Producer Connector for publishing messages executable, complete the following sections.

Authentication

(Optional) Set the relevant credentials in the Authentication section. For example, {{secrets.MY_KAFKA_USERNAME}}.

Schema

In the Kafka section:

  1. Select the schema strategy for your messages.
    • Select No schema, Inline schema for Avro serialization.
    • Select Schema registry if you have a Confluent Schema Registry.
  2. Set the URL of the bootstrap server(s). If more than one server is required, use comma-separated values.
  3. Set the topic name.
  4. (Optional) Set producer configuration values in the Headers field. Only UTF-8 strings are supported as header values.
  5. (Optional) Set producer configuration values in the Additional properties field.
info

The appendix provides more information about:

Additionally, to learn more about supported producer configurations, see the official Kafka documentation.

Message

In the Message section, set the Key and the Value that will be sent to Kafka topic.

Schema strategies

caution

Use Schema strategies with caution, as this is an alpha feature. Functionality may not be comprehensive and could change.

This Connector supports different schema strategies, offering a compact, fast, and binary data exchange format for Kafka messages.

When using a schema strategy, each message is serialized according to a specific schema written in JSON format. This schema defines the Kafka message structure, ensuring the data conforms to a predefined format, and enables schema evolution strategies.

info

To learn more about Schema strategies, refer to the official documentation:

No schema

Select No schema to send messages without a schema. This option is suitable for simple messages that do not require a schema.

Inline schema

Select Inline schema to send messages with an Avro schema.

  • This option is suitable for messages that require a schema and that are not (or do not need to be) registered in a schema registry.
  • Enter the Avro schema that defines the message structure into the Schema field that appears in the Message section.

Schema registry

Select Schema registry to send messages with a schema registered in a schema registry.

  • This option is suitable for messages that require a schema and that are registered in a schema registry.
  • You must provide:
    • The schema registry URL in the Kafka section.
    • The schema itself (that defines the message structure) in the Message section.
    • The credentials for the schema registry (if required). Refer to the Schema Registry documentation for more information.

Example Avro schema and data

The following is an example Avro schema and data:

Avro schema:

{
"doc": "Sample schema to help you get started.",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
},
{
"name": "emails",
"type": {
"items": "string",
"type": "array"
}
}
],
"name": "sampleRecord",
"namespace": "com.mycorp.mynamespace",
"type": "record"
}

Kafka message

  • Key: employee1

  • Value:

    {
    "name": "John Doe",
    "age": 29,
    "emails": ["johndoe@example.com"]
    }

Kafka Producer Connector response

The Kafka Producer Connector returns metadata for a record that has been acknowledged by the Kafka instance.

The following fields are available in the response variable:

  • timestamp: The timestamp of the message.
  • offset: The message offset.
  • partition: The message partition.
  • topic: The topic name.
info

For more information on these fields, refer to the official Kafka documentation.

You can use an output mapping to map the response:

  1. Use Result Variable to store the response in a process variable. For example, myResultVariable.

  2. Use Result Expression to map fields from the response into process variables. For example:

    = {
    "messageAcknowledgedAt": response.timestamp
    }

Appendix and FAQ

What mechanism is used to authenticate against Kafka?

If the fields Username and Password are not empty, by default the Kafka Producer Connector enables the credentials-based SASL SSL authentication and the following properties are set:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='<Your Username>'   password='<Your Password>';
security.protocol=SASL_SSL
sasl.mechanism=PLAIN

If any of the fields are not populated, you must configure your security method for your Kafka configuration. You can do this using the Additional properties field.

What are default Kafka Producer client properties?

  • Authentication properties (only if both Username and Password are not empty):

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='<Your Username>'   password='<Your Password>';
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
  • Bootstrap server property:

    bootstrap.servers=<bootstrap server(s) from BPMN>
  • Message properties:

    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
  • Miscellaneous properties:

    session.timeout.ms=45000
    client.dns.lookup=use_all_dns_ips
    acks=all
    delivery.timeout.ms=45000

What is the precedence of client properties loading?

Properties loading consists of three steps:

  1. Construct client properties from the BPMN diagram: authentication, bootstrap server, message properties.
  2. Load miscellaneous properties.
  3. Load and override properties from the field Additional properties.

How do I set or override additional client properties?

The following example sets a new client property client.id and overrides the SASL mechanism to SCRAM SHA-256 instead of plain text:

= {
"client.id":"MyDemoClientId",
"sasl.mechanism":"SCRAM-SHA-256"
}