-
kaskade 数据小白Lv1
发表于2023-8-20 18:53
悬赏1
已解决
楼主
我正在做一個測試,由 sql server 用 Kafka 數據傳輸到 elasticsearch。 連接 SQLserver connector 是 "io.debezium.connector.sqlserver.SqlServerConnector", 連接 elasticsearch connector 是 "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",SQL server 那邊是用 cdc_enable 數據表 的做法, 讀出以下測試用的 topic message 。
"payload": {
"before": null,
"after": {
"id": 2,
"name": "Man-1692415666",
"date_birth": 1453,
"created_at": 1692415666567,
"updated_at": 1692415666567,
"created_date": 19588,
"dept_id": 2,
"detail": "{\r\n \"address\": \"XXXX\", \"block\":\"XX\", \"flat\": \"5\"}"
},
我有四個 字段要做 格式 轉換,date_birth, created_at, updated_at, 要轉換成正常日期格式. detail 要將字符轉成 json 格式. 只是我用來創建連接 到 elasticsearch的設定
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{"name": "dst_es_emp_v1",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://es:9200",
"tasks.max": "1",
"topics": "v1.DB_Ecommerce.dbo.emp",
"type.name": "_doc",
"key.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"behavior.on.null.values": "DELETE",
"behavior.on.malformed.documents": "IGNORE",
"schema.ignore": "true",
"key.ignore":"true",
"transforms": "ExtractFieldAfter,RenameFieldName,ConvertDateBirth",
"transforms.ExtractFieldAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractFieldAfter.field": "after",
"transforms.RenameFieldName.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameFieldName.renames": "name:Fname",
"transforms.ConvertDateBirth.type": "com.github.larryloi.kafka.connect.smt.ConvertDateBirth"
}
}'
但是我都碰到以下問題,
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value com.github.larryloi.kafka.connect.smt.ConvertDateBirth for configuration transforms.ConvertDateBirth.type: Class com.github.larryloi.kafka.connect.smt.ConvertDateBirth could not be found.\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
以下是我的轉換日期測試用代碼,路徑總是找不到
package com.github.larryloi.kafka.connect.smt;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Map;
/**
 * Single Message Transform that reads the epoch-day integer in {@code date_birth}
 * and appends a {@code date_birth_formatted} string field ("yyyy-MM-dd") to the
 * record value. Records without a value, without the field, or with a null
 * {@code date_birth} pass through untouched.
 */
public class ConvertDateBirth<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String FIELD_NAME = "date_birth";
    private static final String TARGET_FIELD_NAME = "date_birth_formatted";
    // DateTimeFormatter is immutable and thread-safe, so one shared instance is fine.
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record;
        }
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        // valueSchema can be null for schemaless records; skip those too.
        if (valueSchema == null || valueSchema.field(FIELD_NAME) == null) {
            return record;
        }
        Integer dateBirthEpochDay = value.getInt32(FIELD_NAME);
        if (dateBirthEpochDay == null) {
            return record;
        }
        // BUG FIX: the original called value.put(TARGET_FIELD_NAME, ...) on a Struct whose
        // schema does not contain that field — Connect Structs are bound to their schema and
        // Struct.put throws a DataException for unknown field names. It also forwarded the
        // OLD schema to newRecord, so the added field could never survive. We must build an
        // extended schema and copy the data into a fresh Struct instead of mutating the input.
        Schema updatedSchema = buildUpdatedSchema(valueSchema);
        Struct updatedValue = new Struct(updatedSchema);
        for (Field field : valueSchema.fields()) {
            updatedValue.put(field.name(), value.get(field));
        }
        updatedValue.put(TARGET_FIELD_NAME,
                FORMATTER.format(LocalDate.ofEpochDay(dateBirthEpochDay)));
        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                updatedSchema,
                updatedValue,
                record.timestamp());
    }

    /**
     * Copies every field of {@code source} and appends the optional string field
     * {@value #TARGET_FIELD_NAME}.
     */
    private Schema buildUpdatedSchema(Schema source) {
        SchemaBuilder builder = SchemaBuilder.struct()
                .name(source.name())
                .version(source.version())
                .doc(source.doc());
        for (Field field : source.fields()) {
            builder.field(field.name(), field.schema());
        }
        // Optional so tombstone-adjacent / partial records still validate.
        builder.field(TARGET_FIELD_NAME, Schema.OPTIONAL_STRING_SCHEMA);
        return builder.build();
    }

    @Override
    public ConfigDef config() {
        // No configurable options: field names are fixed constants.
        return new ConfigDef();
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Nothing to configure.
    }
}
我使用 docker container 的,都確定路徑沒有問題。不知道問題出在哪裡。請大家指教一下,看有沒有更好的解決方法。
"payload": {
"before": null,
"after": {
"id": 2,
"name": "Man-1692415666",
"date_birth": 1453,
"created_at": 1692415666567,
"updated_at": 1692415666567,
"created_date": 19588,
"dept_id": 2,
"detail": "{\r\n \"address\": \"XXXX\", \"block\":\"XX\", \"flat\": \"5\"}"
},
我有四個 字段要做 格式 轉換,date_birth, created_at, updated_at, 要轉換成正常日期格式. detail 要將字符轉成 json 格式. 只是我用來創建連接 到 elasticsearch的設定
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{"name": "dst_es_emp_v1",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://es:9200",
"tasks.max": "1",
"topics": "v1.DB_Ecommerce.dbo.emp",
"type.name": "_doc",
"key.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"behavior.on.null.values": "DELETE",
"behavior.on.malformed.documents": "IGNORE",
"schema.ignore": "true",
"key.ignore":"true",
"transforms": "ExtractFieldAfter,RenameFieldName,ConvertDateBirth",
"transforms.ExtractFieldAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractFieldAfter.field": "after",
"transforms.RenameFieldName.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameFieldName.renames": "name:Fname",
"transforms.ConvertDateBirth.type": "com.github.larryloi.kafka.connect.smt.ConvertDateBirth"
}
}'
但是我都碰到以下問題,
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value com.github.larryloi.kafka.connect.smt.ConvertDateBirth for configuration transforms.ConvertDateBirth.type: Class com.github.larryloi.kafka.connect.smt.ConvertDateBirth could not be found.\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}
以下是我的轉換日期測試用代碼,路徑總是找不到
package com.github.larryloi.kafka.connect.smt;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.common.config.ConfigDef;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Map;
public class ConvertDateBirth<R extends ConnectRecord<R>> implements Transformation<R> {
private static final String FIELD_NAME = "date_birth";
private static final String TARGET_FIELD_NAME = "date_birth_formatted";
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
@Override
public R apply(R record) {
if (record.value() == null) {
return record;
}
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
Field dateBirthField = valueSchema.field(FIELD_NAME);
if (dateBirthField == null) {
return record;
}
Integer dateBirthEpochDay = value.getInt32(FIELD_NAME);
if (dateBirthEpochDay == null) {
return record;
}
LocalDate dateBirth = LocalDate.ofEpochDay(dateBirthEpochDay);
String dateBirthFormatted = FORMATTER.format(dateBirth);
value.put(TARGET_FIELD_NAME, dateBirthFormatted);
return record.newRecord(
record.topic(),
record.kafkaPartition(),
record.keySchema(),
record.key(),
valueSchema,
value,
record.timestamp()
);
}
@Override
public ConfigDef config() {
return new ConfigDef();
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
我使用 docker container 的,都確定路徑沒有問題。不知道問題出在哪裡。請大家指教一下,看有沒有更好的解決方法。