Programming Interface
customizableKafkareader
Inherits from DataXReader
If you want to implement the interfaces corresponding to job and task, you can do so.
/**
* @author greenstone road
*/
public class KafkaReader extends Reader {
public static class Job extends {
private Configuration originalConfig = null;
@Override
public void init() {
= ();
();
}
@Override
public void destroy() {
}
@Override
public List<Configuration> split(int adviceNumber) {
List<Configuration> configurations = new ArrayList<>(adviceNumber);
for (int i=0; i<adviceNumber; i++) {
(());
}
return configurations;
}
private void validateParameter() {
(Key.BOOTSTRAP_SERVERS, KafkaReaderErrorCode.REQUIRED_VALUE);
(, KafkaReaderErrorCode.REQUIRED_VALUE);
}
}
public static class Task extends {
private static final Logger logger = ();
private Consumer<String, String> consumer;
private String topic;
private Configuration conf;
private int maxPollRecords;
private String fieldDelimiter;
private String readType;
private List<> columnTypes;
@Override
public void destroy() {
("consumer close");
if ((consumer)) {
();
}
}
@Override
public void init() {
= ();
= ();
= (Key.MAX_POLL_RECORDS, 500);
fieldDelimiter = (Key.FIELD_DELIMITER, "\t", null);
readType = (Key.READ_TYPE, (), null);
if (!().equalsIgnoreCase(readType)
&& !().equalsIgnoreCase(readType)) {
throw (KafkaReaderErrorCode.REQUIRED_VALUE,
("You have provided an incorrect configuration file,unsupportedreadType[%s]", readType));
}
if (().equalsIgnoreCase(readType)) {
List<String> columnTypeList = (Key.COLUMN_TYPE, );
if ((columnTypeList)) {
throw (KafkaReaderErrorCode.REQUIRED_VALUE,
("You have provided an incorrect configuration file,readTypebeJSONhour[%s]be必填参数,Empty or white space is not allowed .", Key.COLUMN_TYPE));
}
convertColumnType(columnTypeList);
}
Properties props = new Properties();
(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, (Key.BOOTSTRAP_SERVERS));
(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, (Key.KEY_DESERIALIZER, "", null));
(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, (Key.VALUE_DESERIALIZER, "", null));
(ConsumerConfig.GROUP_ID_CONFIG, (Key.GROUP_ID, KafkaReaderErrorCode.REQUIRED_VALUE));
(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
Configuration saslConf = ();
if ((saslConf)) {
("The configuration enables theSASLaccreditation");
(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, (Key.SASL_SECURITY_PROTOCOL, KafkaReaderErrorCode.REQUIRED_VALUE));
(SaslConfigs.SASL_MECHANISM, (Key.SASL_MECHANISM, KafkaReaderErrorCode.REQUIRED_VALUE));
String userName = (Key.SASL_USERNAME, KafkaReaderErrorCode.REQUIRED_VALUE);
String password = (Key.SASL_PASSWORD, KafkaReaderErrorCode.REQUIRED_VALUE);
(SaslConfigs.SASL_JAAS_CONFIG, (" required username=\"%s\" password=\"%s\";", userName, password));
}
consumer = new KafkaConsumer<>(props);
}
@Override
public void startRead(RecordSender recordSender) {
((topic));
int pollTimeoutMs = (Key.POLL_TIMEOUT_MS, 1000);
int retries = (, 5);
if (retries < 0) {
("joinGroupSuccessRetries Configuration error[{}], Reset to default[5]", retries);
retries = 5;
}
/**
* consumer 每次都be新创建,number onepollhour会重新加入消费者组,The joining process will take placeRebalance,(indicates contrast) Rebalance would result in the same Group All the consumers inside can't work
* the reason why poll The process of pulling,even iftopicI don't know if I can pull in the data.,on account of consumer In the process of joining a consumer group
* kafka-clients There is no correspondingAPI、The event mechanism to know consumer 成功加入消费者组的确切hour间
* So add retry
*/
ConsumerRecords<String, String> records = ((pollTimeoutMs));
int i = 0;
if ((records)) {
for (; i < retries; i++) {
records = ((pollTimeoutMs));
("(prefix indicating ordinal number, e.g. first, number two etc) {} retry,Get the number of message records[{}]", i + 1, ());
if (!(records)) {
break;
}
}
}
if (i >= retries) {
("retry {} after the second (meeting, class etc),Still haven't gotten the message.,请确认be否有数据、配置be否正确", retries);
return;
}
transferRecord(recordSender, records);
do {
records = ((pollTimeoutMs));
transferRecord(recordSender, records);
} while (!(records) && () >= maxPollRecords);
}
private void transferRecord(RecordSender recordSender, ConsumerRecords<String, String> records) {
if ((records)) {
return;
}
for (ConsumerRecord<String, String> record : records) {
Record sendRecord = ();
String msgValue = ();
if (().equalsIgnoreCase(readType)) {
transportJsonToRecord(sendRecord, msgValue);
} else if (().equalsIgnoreCase(readType)) {
// readType = text,Handle all as string types
String[] columnValues = (fieldDelimiter);
for (String columnValue : columnValues) {
(new StringColumn(columnValue));
}
}
(sendRecord);
}
();
}
private void convertColumnType(List<String> columnTypeList) {
columnTypes = new ArrayList<>();
for (String columnType : columnTypeList) {
switch (()) {
case "STRING":
();
break;
case "LONG":
();
break;
case "DOUBLE":
();
case "DATE":
();
break;
case "BOOLEAN":
();
break;
case "BYTES":
();
break;
default:
throw (KafkaReaderErrorCode.ILLEGAL_PARAM,
("The configuration file you provided is incorrect,dataxData types not supported[%s]", columnType));
}
}
}
private void transportJsonToRecord(Record sendRecord, String msgValue) {
List<KafkaColumn> kafkaColumns = (msgValue, );
if (() != ()) {
throw (KafkaReaderErrorCode.ILLEGAL_PARAM,
("The configuration file you provided is incorrect,readTypebeJSONhour[%scolumns=%d]together with[jsoncolumns=%d]Mismatch between the number of", Key.COLUMN_TYPE, (), ()));
}
for (int i=0; i<(); i++) {
KafkaColumn kafkaColumn = (i);
switch ((i)) {
case STRING:
(i, new StringColumn(()));
break;
case LONG:
(i, new LongColumn(()));
break;
case DOUBLE:
(i, new DoubleColumn(()));
break;
case DATE:
// 暂只支持hour间戳
(i, new DateColumn((())));
break;
case BOOL:
(i, new BoolColumn(()));
break;
case BYTES:
(i, new BytesColumn(().getBytes(StandardCharsets.UTF_8)));
break;
default:
throw (KafkaReaderErrorCode.ILLEGAL_PARAM,
("The configuration file you provided is incorrect,dataxData types not supported[%s]", (i)));
}
}
}
}
}
Focus on the Task interface implementation
-
init: reads the configuration items and creates the Consumer instance.
-
startWrite: pull data from Topic and write to Channel via RecordSender.
Here are a few details to keep in mind
- Consumer is newly created every time, when pulling data, if the consumer hasn't joined to the specified consumer group, it will join to the consumer group first, the joining process will be Rebalance, and Rebalance will lead to all the consumers in the same consumer group can't work, at this time, even if there is a message in the Topic that can be pulled, it can't be pulled. Even if there is a message in the topic that can be pulled, it cannot be pulled. Therefore, a retry mechanism is introduced to try to ensure that the consumer can pull the message normally when the synchronization task is pulled.
- Once a Consumer pulls a message, it pulls the message in a loop, and if the amount of data pulled at any one time is less than the maxPollRecords, then the Topic has been pulled and the loop is terminated; this is different from the regular use (where the Consumer will always be actively pulling or passively receiving)
- Two read formats are supported:
text
、json
Please see the configuration file description below for details - To ensure the integrity of the data written to the Channel, you need to configure the data type of the columns (DataX data types)
-
destroy:
Closing a Consumer Instance