Kafka

Kafka stream data source

Properties

Properties supported by this source are listed below (* indicates required fields).
Name *
Name of the data source
Description
Description of the data source
Connection
Pre-defined Kafka connection
Processing Mode
Select for batch processing; leave unselected for streaming. When 'Batch' is selected the switch value is true; when 'Streaming' is selected it is false. Default: false
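For reference, this property typically corresponds to choosing between Spark's batch and streaming readers for the Kafka format. A minimal PySpark sketch under that assumption (the broker address and topic name are placeholders, not values from this document):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-source-sketch").getOrCreate()

# Streaming mode (Processing Mode switch = false): an unbounded readStream.
streaming_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder broker
    .option("subscribe", "demo-topic")                  # placeholder topic
    .load())

# Batch mode (Processing Mode switch = true): a bounded read over current offsets.
batch_df = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "demo-topic")
    .load())
```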
External Schema
External schema to impose on the Kafka value. It can be a JSON or AVRO schema string, or a path to a JSON or AVRO schema file.
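As an illustration, one common way an external schema is applied is by parsing the Kafka value column with Spark's from_json. A hedged sketch continuing the streaming_df from the example above; the payload fields here are invented for the example, and the product may apply the schema differently:

```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical external schema describing the JSON payload carried in the Kafka value.
payload_schema = StructType([
    StructField("orderId", StringType()),
    StructField("amount", LongType()),
])

# Kafka exposes the value as binary; cast to string and parse with the schema.
parsed_df = (streaming_df
    .select(F.from_json(F.col("value").cast("string"), payload_schema).alias("payload"))
    .select("payload.*"))
```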
Schema
Source schema to assist during the design of the pipeline
Subscription Type *
The topic name / pattern to subscribe to. Example: subscribe
Include Kafka Headers
Check to include the Kafka headers in the row. Default: false
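In Spark's Kafka integration this usually surfaces as the includeHeaders option, which adds a headers column of key/value pairs to each row. A small sketch under that assumption (broker and topic are placeholders):

```python
headers_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")  # placeholder
    .option("subscribe", "demo-topic")                  # placeholder
    .option("includeHeaders", "true")                   # Include Kafka Headers = true
    .load())
# Each row now also carries a 'headers' array of (key, value) structs.
```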
Watermark Field Name
Field name to be used as the watermark. If unspecified in streaming mode, the default field name is 'tempWatermark'. Example: myConsumerWatermark
Watermark Value
Watermark value setting. Example: 10 seconds, 2 minutes
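As a point of reference, a watermark field and value like these typically end up in a Spark withWatermark call on a timestamp column. A hedged sketch, assuming the source exposes the configured field (here the default 'tempWatermark') based on the Kafka record timestamp and applies the configured delay to it:

```python
from pyspark.sql import functions as F

# Assumption: the watermark column is derived from the Kafka 'timestamp' column
# and the configured Watermark Value ("10 seconds") is the allowed lateness.
watermarked_df = (streaming_df
    .withColumn("tempWatermark", F.col("timestamp"))
    .withWatermark("tempWatermark", "10 seconds"))
```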
Schema Mode
This mode determines how the source behaves when the schema is evaluated. Default: PERMISSIVE
Starting Offsets by Timestamp
The starting timestamps when a query is started: a JSON string specifying a starting timestamp for each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If no matching offset exists, the query fails immediately to prevent unintended reads from that partition (a current limitation that will be addressed in the future). Spark simply passes the timestamp to KafkaConsumer.offsetsForTimes and does not interpret or reason about the value; refer to the KafkaConsumer.offsetsForTimes javadoc for details. Also note that the meaning of the timestamp can vary according to the Kafka configuration (log.message.timestamp.type); refer to the Kafka documentation for further details. Note: this option requires Kafka 0.10.1.0 or higher. Note: startingOffsetsByTimestamp takes precedence over startingOffsets. Note: for streaming queries, this only applies when a new query is started; resuming always picks up from where the query left off, and partitions newly discovered during a query start at earliest. Example: {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}}
Starting Offsets if Any
The starting point when a query is started: either "earliest" (from the earliest offsets), "latest" (from the latest offsets), or a JSON string specifying a starting offset for each TopicPartition. In the JSON, -2 can be used as an offset to refer to earliest and -1 to latest. Note: for batch queries, latest (either implicitly or by using -1 in the JSON) is not allowed. For streaming queries, this only applies when a new query is started; resuming always picks up from where the query left off, and partitions newly discovered during a query start at earliest. Default is "latest" for streaming and "earliest" for batch. Example: earliest
Ending Offsets by Timestamp
The end point when a batch query is ended: a JSON string specifying an ending timestamp for each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If no matching offset exists, the offset is set to latest. Spark simply passes the timestamp to KafkaConsumer.offsetsForTimes and does not interpret or reason about the value; refer to the KafkaConsumer.offsetsForTimes javadoc for details. Also note that the meaning of the timestamp can vary according to the Kafka configuration (log.message.timestamp.type); refer to the Kafka documentation for further details. Note: this option requires Kafka 0.10.1.0 or higher. Note: endingOffsetsByTimestamp takes precedence over endingOffsets. Example: {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}}
Ending Offsets if Any
The end point when a batch query is ended: either "latest", which refers to the latest offsets, or a JSON string specifying an ending offset for each TopicPartition. In the JSON, -1 can be used as an offset to refer to latest; -2 (earliest) is not allowed. For example, latest or the JSON string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}. Example: latest
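Putting the offset properties together, a hedged batch-read sketch assuming they map onto Spark's startingOffsets / startingOffsetsByTimestamp / endingOffsets / endingOffsetsByTimestamp options (broker is a placeholder; the topic and partition values are the examples above):

```python
batch_offsets_df = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")     # placeholder
    .option("subscribe", "topicA,topicB")
    # Start from the earliest offsets (a per-partition JSON string also works).
    .option("startingOffsets", "earliest")
    # End the batch at explicit per-partition offsets; -1 means "latest".
    .option("endingOffsets", '{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}')
    .load())

# The timestamp-based variants use the same JSON shape and take precedence, e.g.:
#   .option("startingOffsetsByTimestamp",
#           '{"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}}')
```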
Fail Query When Data is Lost
Whether to fail the query when it is possible that data was lost (for example, topics are deleted or offsets are out of range). This may be a false alarm; you can disable it when it does not work as expected. Default: false
Poll data timeout in milliseconds
The timeout in milliseconds to poll data from Kafka in executors. Example: 512. Default: 512
Number of times to retry fetch
Number of times to retry before giving up fetching Kafka offsets. Example: 3. Default: 3
Number of milliseconds to wait to retry fetch
Milliseconds to wait before retrying to fetch Kafka offsets. Example: 5. Default: 10
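The data-loss flag and these three tuning knobs commonly correspond to the Spark Kafka options failOnDataLoss, kafkaConsumer.pollTimeoutMs, fetchOffset.numRetries and fetchOffset.retryIntervalMs. A hedged sketch using the example values above (broker and topic are placeholders):

```python
tuned_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
    .option("subscribe", "demo-topic")                   # placeholder
    .option("failOnDataLoss", "false")                   # Fail Query When Data is Lost
    .option("kafkaConsumer.pollTimeoutMs", "512")        # poll data timeout
    .option("fetchOffset.numRetries", "3")               # retries for offset fetch
    .option("fetchOffset.retryIntervalMs", "10")         # wait between retries
    .load())
```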
Maximum number of offsets to fetch per trigger
Rate limit on the maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume. Example: 2
Minimum number of partitions to read
Desired minimum number of partitions to read from Kafka. By default, Spark has a 1:1 mapping of Kafka topicPartitions to Spark partitions consuming from Kafka. If you set this option to a value greater than the number of topicPartitions, Spark will divide large Kafka partitions into smaller pieces. Note that this configuration is a hint: the number of Spark tasks will be approximately minPartitions; it can be less or more depending on rounding errors or Kafka partitions that did not receive any new data. Example: 5
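For reference, this property and the trigger rate limit above typically correspond to Spark's minPartitions and maxOffsetsPerTrigger options. A hedged sketch using the example values (broker and topic are placeholders):

```python
throttled_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
    .option("subscribe", "demo-topic")                   # placeholder
    .option("maxOffsetsPerTrigger", "2")   # cap offsets processed per trigger
    .option("minPartitions", "5")          # hint: split work into ~5 Spark tasks
    .load())
```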
Group ID Prefix
Prefix of consumer group identifiers (group.id) that are generated by structured streaming queries. If "kafka.group.id" is set, this option will be ignored. Example: kwartile
Group ID
The Kafka group id to use in the Kafka consumer while reading from Kafka. Use this with caution. By default, each query generates a unique group id for reading data. This ensures that each Kafka source has its own consumer group that does not face interference from any other consumer, and can therefore read all of the partitions of its subscribed topics. In some scenarios (for example, Kafka group-based authorization), you may want to use a specific authorized group id to read data. You can optionally set the group id; however, do this with extreme caution, as it can cause unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the same group id are likely to interfere with each other, causing each query to read only part of the data. This may also occur when queries are started or restarted in quick succession. To minimize such issues, set the Kafka consumer session timeout (option "kafka.session.timeout.ms") to be very small. When this is set, the option "groupIdPrefix" will be ignored. Example: ABCD123
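In Spark, the previous two properties usually surface as the groupIdPrefix and kafka.group.id options. A hedged sketch with the example values above (broker and topic are placeholders):

```python
grouped_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
    .option("subscribe", "demo-topic")                   # placeholder
    .option("groupIdPrefix", "kwartile")   # ignored when kafka.group.id is set
    .option("kafka.group.id", "ABCD123")   # use with caution, see the note above
    .load())
```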
Acks
Acks setting. Example: 1
Normalize Column Names
Normalizes column names by replacing the special characters ,;{}()&/\n\t= and space with the given string. Example: _
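A minimal sketch of the kind of renaming this property describes, assuming "_" as the replacement string (the product performs this internally; the helper name here is illustrative):

```python
import re
from pyspark.sql import DataFrame

# Characters listed in the property description, replaced in every column name.
SPECIAL = r"[ ,;{}()&/\n\t=]"

def normalize_columns(df: DataFrame, replacement: str = "_") -> DataFrame:
    """Rename all columns, replacing the special characters above."""
    return df.toDF(*[re.sub(SPECIAL, replacement, c) for c in df.columns])

# e.g. normalize_columns(parsed_df, "_") turns a column named "order id" into "order_id".
```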