Overview
This node is a Kafka consumer trigger that listens to messages from a specified Kafka topic and emits them into an n8n workflow. It supports advanced options such as using the Confluent Schema Registry for message decoding, JSON parsing of messages, and controlling message processing order (parallel or sequential). This node is useful in scenarios where you want to integrate real-time streaming data from Kafka topics into your automation workflows, such as processing event streams, monitoring logs, or reacting to system notifications.
Practical examples:
- Consuming user activity events from a Kafka topic and triggering downstream workflows for analytics.
- Listening to IoT device telemetry data streamed via Kafka and processing it in real time.
- Integrating Kafka-based messaging systems with other services by transforming and forwarding messages.
Properties
| Name | Meaning |
| --- | --- |
| Topic | Name of the Kafka topic to consume messages from. |
| Group ID | Identifier for the Kafka consumer group. Multiple consumers with the same group ID share the load of reading from the topic. |
| Use Schema Registry | Whether to decode messages using the Confluent Schema Registry. |
| Schema Registry URL | URL of the Confluent Schema Registry service (required if "Use Schema Registry" is enabled). |
| Options | Collection of additional configuration options: |
| - Allow Topic Creation | Whether to allow the topic to be created automatically if it does not yet exist. |
| - Auto Commit Threshold | Number of messages after which the consumer commits offsets automatically. |
| - Auto Commit Interval | Time interval in milliseconds after which the consumer commits offsets automatically. |
| - Heartbeat Interval | Interval in milliseconds for sending heartbeats to keep the consumer session active. Must be lower than the Session Timeout. |
| - Max Number of Requests | Maximum number of unacknowledged requests allowed on a single connection. |
| - Read Messages From Beginning | Whether to start consuming from the beginning of the topic or only from new messages. |
| - JSON Parse Message | Whether to attempt to parse the message value as JSON. |
| - Parallel Processing | Whether to process messages in parallel (true) or sequentially, preserving order (false). |
| - Only Message | If JSON parsing is enabled, whether to return only the parsed message content instead of the full object with metadata. |
| - Return Headers | Whether to include Kafka message headers in the output. |
| - Session Timeout | Time in milliseconds without a heartbeat before the consumer session is considered timed out. |
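These options map closely onto kafkajs consumer settings. The sketch below shows one plausible mapping; the setting names follow the kafkajs consumer API, but the concrete values and the topic name are illustrative, and the exact wiring inside the node is an assumption:

```javascript
// Hypothetical mapping of the node's properties onto kafkajs-style
// consumer settings (values are examples, not defaults).
const consumerConfig = {
  groupId: 'my-consumer-group',   // Group ID
  sessionTimeout: 30000,          // Session Timeout (ms)
  heartbeatInterval: 3000,        // Heartbeat Interval -- must be < sessionTimeout
  maxInFlightRequests: 1,         // Max Number of Requests
  allowAutoTopicCreation: false,  // Allow Topic Creation
};

const runConfig = {
  autoCommitThreshold: 100,       // commit offsets every 100 messages...
  autoCommitInterval: 5000,       // ...or every 5 seconds
};

const subscribeConfig = {
  topic: 'user-activity-events',  // Topic
  fromBeginning: true,            // Read Messages From Beginning
};
```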
Output
The node outputs messages received from the Kafka topic as JSON objects. Each output item contains:
- `message`: The message payload, either as a string or as a parsed JSON object, depending on settings.
- `topic`: The name of the Kafka topic the message was consumed from.
- `headers` (optional): An object containing the Kafka message headers as UTF-8 strings, included if "Return Headers" is enabled.
If "Only Message" is enabled along with JSON parsing, the output will contain just the parsed message content without additional metadata.
The node does not output binary data.
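The shaping of a raw message into an output item can be sketched as follows; the `toOutputItem` helper and its option names are illustrative, not the node's actual internals:

```javascript
// Illustrative sketch: turning a raw Kafka message into the node's
// output item. Function and option names are hypothetical.
function toOutputItem(raw, { jsonParseMessage, onlyMessage, returnHeaders }) {
  let value = raw.value.toString();
  if (jsonParseMessage) {
    try {
      value = JSON.parse(value);
    } catch {
      // Not valid JSON: fall back to the raw string.
    }
  }
  if (jsonParseMessage && onlyMessage) return value; // "Only Message" mode

  const item = { message: value, topic: raw.topic };
  if (returnHeaders) {
    // Kafka header values arrive as Buffers; decode them to UTF-8 strings.
    item.headers = Object.fromEntries(
      Object.entries(raw.headers ?? {}).map(([k, v]) => [k, v.toString('utf8')]),
    );
  }
  return item;
}
```

For example, a message body of `{"id":1}` with JSON parsing and "Only Message" enabled would yield just `{ id: 1 }`.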
Dependencies
- Requires access to a Kafka cluster with brokers specified in credentials.
- Optionally requires access to a Confluent Schema Registry service if schema decoding is enabled.
- Requires valid Kafka authentication credentials (e.g., username/password) if authentication is enabled.
- Uses the `kafkajs` library for Kafka client functionality.
- Uses the `@kafkajs/confluent-schema-registry` library for schema registry integration.
Troubleshooting
- Authentication errors: If authentication is enabled but username or password is missing, the node will throw an error. Ensure all required credentials are provided.
- Schema decoding failures: If schema registry URL is incorrect or the schema registry is unreachable, decoding will fail. Verify the URL and network connectivity.
- Message parsing errors: Enabling JSON parsing may cause errors if messages are not valid JSON. Consider disabling JSON parsing or handling parse errors gracefully.
- Session timeout issues: If heartbeat interval is set higher than session timeout, the consumer session may expire unexpectedly. Ensure heartbeat interval is lower than session timeout.
- Topic subscription problems: If the topic does not exist and "Allow Topic Creation" is disabled, the consumer may fail to subscribe. Enable topic creation or verify topic existence.
- Parallel processing order: When parallel processing is enabled, message order is not guaranteed. Disable parallel processing if message order is critical.
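The parallel-versus-sequential trade-off can be illustrated with a small sketch; the `processBatch` helper is hypothetical, not the node's implementation:

```javascript
// Hypothetical illustration of the "Parallel Processing" option:
// parallel has higher throughput but no ordering guarantee;
// sequential preserves arrival order at the cost of throughput.
async function processBatch(messages, handler, parallel) {
  if (parallel) {
    await Promise.all(messages.map((m) => handler(m))); // completion order varies
  } else {
    for (const m of messages) {
      await handler(m); // strictly one at a time, in order
    }
  }
}
```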