
Kafka Lz4 Trigger

Consume messages from a Kafka topic

Overview

This node is a Kafka trigger designed to consume messages from a Kafka topic using LZ4 compression. It connects to a specified Kafka topic and consumer group, listens for incoming messages, and emits them as workflow data in n8n. The node supports advanced features such as decoding messages with Avro schemas or Confluent Schema Registry, JSON parsing of message payloads, and returning message headers.

Common scenarios where this node is beneficial include:

  • Real-time processing of event streams from Kafka topics.
  • Integrating Kafka message consumption into automation workflows.
  • Decoding complex message formats like Avro or schema-registry-managed messages.
  • Handling high-throughput Kafka topics with options for parallel processing and offset commit control.

Practical example:

  • Consuming user activity events from a Kafka topic, decoding the Avro-encoded messages, and triggering downstream workflows for analytics or alerting.
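Under the hood, this kind of consumption boils down to a kafkajs consumer with an LZ4 codec registered before the client is created. Below is a minimal standalone sketch of that setup, assuming the community kafkajs-lz4 codec package (its exact export shape is an assumption); broker, topic, and group names are placeholders:

```ts
import { Kafka, CompressionTypes, CompressionCodecs } from 'kafkajs';
// Assumption: kafkajs-lz4 provides the codec; kafkajs ships no LZ4 codec itself.
import LZ4 from 'kafkajs-lz4';

// Register the LZ4 codec so message batches compressed with LZ4 can be decoded.
CompressionCodecs[CompressionTypes.LZ4] = new LZ4().codec;

const kafka = new Kafka({ clientId: 'n8n-trigger', brokers: ['broker1:9092'] });
const consumer = kafka.consumer({ groupId: 'analytics-group' });

async function start() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'user-activity' });
  await consumer.run({
    // The node emits each decompressed message as one workflow item.
    eachMessage: async ({ topic, message }) => {
      console.log(topic, message.value?.toString());
    },
  });
}

start().catch(console.error);
```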

Properties

  • Topic: Name of the Kafka topic or queue to consume messages from.
  • Group ID: Identifier for the Kafka consumer group. Multiple consumers with the same group ID share the load of consuming messages.
  • Use Schema Registry: Whether to decode messages using Confluent Schema Registry.
  • Schema Registry URL: URL of the Confluent Schema Registry service (required if "Use Schema Registry" is enabled).
  • Use Avro Decode: Whether to decode messages using an Avro schema directly.
  • Avro Schema: JSON definition of the Avro schema used for decoding messages (required if "Use Avro Decode" is enabled).
  • Options: Collection of additional configuration options (see the kafkajs sketch after this list):
      - Allow Topic Creation: Whether to allow the topic to be created automatically if it does not yet exist.
      - Auto Commit Threshold: Number of messages after which the consumer commits offsets automatically.
      - Auto Commit Interval: Time interval in milliseconds after which the consumer commits offsets automatically.
      - Heartbeat Interval: Interval in milliseconds for sending heartbeats to keep the consumer session active. Must be less than Session Timeout.
      - Max Number of Requests: Maximum number of unacknowledged requests allowed on a single connection.
      - Read Messages From Beginning: Whether to start reading from the beginning of the topic or only consume new messages.
      - JSON Parse Message: Whether to attempt parsing the message payload as JSON.
      - Parallel Processing: Whether to process messages in parallel or sequentially to preserve order.
      - Only Message: If JSON parsing is enabled, whether to return only the parsed message content instead of the full object with metadata.
      - Return Headers: Whether to include Kafka message headers in the output.
      - Session Timeout: Time in milliseconds the broker waits without receiving a heartbeat before considering the consumer dead and rebalancing the group.
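Most of these options correspond to standard kafkajs consumer settings. A hedged sketch of how such a configuration might look in kafkajs terms (values are illustrative, not the node's defaults):

```ts
import { Kafka } from 'kafkajs';

async function main() {
  const kafka = new Kafka({ clientId: 'n8n-trigger', brokers: ['broker1:9092'] }); // placeholders

  const consumer = kafka.consumer({
    groupId: 'my-consumer-group',
    sessionTimeout: 30000,        // "Session Timeout" (ms)
    heartbeatInterval: 3000,      // "Heartbeat Interval" (ms); must be < sessionTimeout
    maxInFlightRequests: 1,       // "Max Number of Requests"
    allowAutoTopicCreation: true, // "Allow Topic Creation"
  });

  await consumer.connect();

  // "Read Messages From Beginning" maps to fromBeginning at subscribe time.
  await consumer.subscribe({ topic: 'user-activity', fromBeginning: true });

  await consumer.run({
    autoCommitThreshold: 100, // "Auto Commit Threshold": commit every 100 resolved messages
    autoCommitInterval: 5000, // "Auto Commit Interval": or every 5 s, whichever comes first
    eachMessage: async ({ message }) => {
      // "JSON Parse Message": try JSON.parse, fall back to the raw string.
      const raw = message.value?.toString() ?? '';
      let payload: unknown = raw;
      try {
        payload = JSON.parse(raw);
      } catch {
        // not JSON; keep the raw string
      }
      console.log(payload);
    },
  });
}

main().catch(console.error);
```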

Output

The node outputs an array of JSON objects representing consumed Kafka messages. Each output item contains:

  • message: The decoded message payload. This can be a string, parsed JSON object, or Avro-decoded object depending on configuration.
  • topic: The Kafka topic name from which the message was consumed.
  • headers (optional): An object containing Kafka message headers as key-value pairs (strings), included if "Return Headers" is enabled.

If "Only Message" is enabled along with JSON parsing, the output will contain just the parsed message content without additional metadata.

The node does not output binary data.
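As a reference for downstream nodes, the item shape can be sketched as a TypeScript type (the names are descriptive, derived from the description above, not part of the node's API):

```ts
// Shape of one output item when "Only Message" is disabled.
interface KafkaTriggerItem {
  message: string | Record<string, unknown>; // raw string, parsed JSON, or Avro-decoded object
  topic: string;                             // topic the message was consumed from
  headers?: Record<string, string>;          // present only if "Return Headers" is enabled
}

// With "Only Message" enabled (requires JSON parsing), the item is just the payload:
type OnlyMessageItem = Record<string, unknown>;
```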

Dependencies

  • Requires a Kafka cluster reachable through the brokers specified in the credentials.
  • Supports optional integration with Confluent Schema Registry for schema-based decoding; requires the registry URL.
  • Uses username and password from the configured Kafka credentials (SASL) when authentication is enabled.
  • Relies on the kafkajs library with an LZ4 compression codec.
  • Optionally uses avsc for Avro decoding (see the sketch after this list).
  • Requires n8n Kafka credentials configured with broker addresses, client ID, SSL, and authentication details as needed.
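A hedged sketch of what decoding with these libraries looks like; the @kafkajs/confluent-schema-registry client shown for registry decoding is an assumption, and the example schema and registry URL are placeholders:

```ts
import avro from 'avsc';
import { SchemaRegistry } from '@kafkajs/confluent-schema-registry';

// Direct Avro decode ("Use Avro Decode"): build a type from the schema JSON
// supplied in the "Avro Schema" property (example schema shown here).
const type = avro.Type.forSchema({
  type: 'record',
  name: 'UserEvent',
  fields: [
    { name: 'userId', type: 'string' },
    { name: 'action', type: 'string' },
  ],
});

// Round-trip a sample record to demonstrate decoding a raw Kafka buffer.
const buf = type.toBuffer({ userId: 'u1', action: 'click' });
console.log(type.fromBuffer(buf)); // { userId: 'u1', action: 'click' }

// Registry-based decode ("Use Schema Registry"): the client reads the schema ID
// embedded in each message and fetches the matching schema from the registry.
async function decodeWithRegistry(value: Buffer) {
  const registry = new SchemaRegistry({ host: 'http://schema-registry:8081' }); // placeholder URL
  return registry.decode(value);
}
```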

Troubleshooting

  • Authentication errors: Ensure username and password are provided if authentication is enabled in credentials.
  • Schema decoding failures: Verify the correctness of the Avro schema JSON or the Schema Registry URL and connectivity (a quick schema validation sketch follows this list).
  • Message parsing errors: If JSON parsing fails, check that the message payload is valid JSON or disable JSON parsing.
  • Timeouts or disconnects: Adjust session timeout and heartbeat interval settings to values appropriate for your Kafka cluster.
  • Topic subscription issues: Confirm the topic name is correct and exists in the Kafka cluster; enable "Allow Topic Creation" if you want to create topics on demand.
  • Parallel processing order issues: Disable parallel processing if message order must be preserved.
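For the schema-decoding case above, one quick check is to compile the schema with avsc before configuring the node; forSchema throws if the definition is structurally invalid (the schema string here is a placeholder):

```ts
import avro from 'avsc';

const schemaJson =
  '{"type":"record","name":"UserEvent","fields":[{"name":"userId","type":"string"}]}';

try {
  // forSchema rejects structurally invalid Avro schema definitions.
  avro.Type.forSchema(JSON.parse(schemaJson));
  console.log('Schema is valid');
} catch (err) {
  console.error('Invalid Avro schema:', err);
}
```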
