kafka sql server 到 elasticserach 數據轉化問題

587
3

kaskade 数据小白Lv1

发表于2023-8-20 18:53

悬赏1

已解决

楼主
我正在做一個測試,由 sql server 用 Kafka 數據傳輸到 elasticsearch。 連接 SQLserver connector 是 "io.debezium.connector.sqlserver.SqlServerConnector", 連接 elasticsearch connector 是 "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",SQL server 那邊是用 cdc_enable 數據表 的做法, 讀出以下測試用的 topic message 。

  "payload": {
    "before": null,
    "after": {
      "id": 2,
      "name": "Man-1692415666",
      "date_birth": 1453,
      "created_at": 1692415666567,
      "updated_at": 1692415666567,
      "created_date": 19588,
      "dept_id": 2,
      "detail": "{\r\n  \"address\": \"XXXX\", \"block\":\"XX\", \"flat\": \"5\"}"
    },


我有四個 字段要做 格式 轉換,date_birth, created_at, updated_at, 要轉換成正常日期格式. detail 要將字符轉成 json 格式. 只是我用來創建連接 到 elasticsearch的設定

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '
{"name": "dst_es_emp_v1",
"config":  {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"connection.url": "http://es:9200",
"tasks.max": "1",
"topics": "v1.DB_Ecommerce.dbo.emp",
"type.name": "_doc",

"key.converter.schemas.enable": "true",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",

"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",

"errors.log.enable": "true",
"errors.log.include.messages": "true",

"behavior.on.null.values": "DELETE",
"behavior.on.malformed.documents": "IGNORE",

"schema.ignore": "true",

"key.ignore":"true",

"transforms": "ExtractFieldAfter,RenameFieldName,ConvertDateBirth",

"transforms.ExtractFieldAfter.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.ExtractFieldAfter.field": "after",

"transforms.RenameFieldName.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameFieldName.renames": "name:Fname",

"transforms.ConvertDateBirth.type": "com.github.larryloi.kafka.connect.smt.ConvertDateBirth"

}
}'


但是我都碰倒 以下問題,
{"error_code":400,"message":"Connector configuration is invalid and contains the following 1 error(s):\nInvalid value com.github.larryloi.kafka.connect.smt.ConvertDateBirth for configuration transforms.ConvertDateBirth.type: Class com.github.larryloi.kafka.connect.smt.ConvertDateBirth could not be found.\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

以下是我的轉換日期測試用代碼,路徑總是找不到
package com.github.larryloi.kafka.connect.smt;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.common.config.ConfigDef;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Map;

public class ConvertDateBirth<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String FIELD_NAME = "date_birth";
    private static final String TARGET_FIELD_NAME = "date_birth_formatted";
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            return record;
        }

        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();

        Field dateBirthField = valueSchema.field(FIELD_NAME);
        if (dateBirthField == null) {
            return record;
        }

        Integer dateBirthEpochDay = value.getInt32(FIELD_NAME);
        if (dateBirthEpochDay == null) {
            return record;
        }

        LocalDate dateBirth = LocalDate.ofEpochDay(dateBirthEpochDay);
        String dateBirthFormatted = FORMATTER.format(dateBirth);

        value.put(TARGET_FIELD_NAME, dateBirthFormatted);

        return record.newRecord(
                record.topic(),
                record.kafkaPartition(),
                record.keySchema(),
                record.key(),
                valueSchema,
                value,
                record.timestamp()
        );
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}



我使用 docker container 的,都確定路徑沒有問題。不知道問題出在那。請大家指教一下。看有沒有更好的解決方法。

最近看过此主题的会员

esen_4ZJINNMK7I

ESENSOFT官方技术支持

chestnut

最佳答案
关帖总结:路径错误导致
3个回答

只看楼主

chestnut 数据领袖Lv6

发表于2023-8-21 10:11

只看该作者

取消 关注该作者的回复

沙发

使用的产品版本及相关数据源配置可以截图看看吗?

kaskade 数据小白Lv1

发表于2023-8-21 22:56

只看该作者

取消 关注该作者的回复

板凳

我自己 compile 的時候 路徑攪錯了。謝謝

chestnut 数据领袖Lv6

发表于2023-8-20 18:53

只看该作者

取消 关注该作者的回复

地板

关帖总结:路径错误导致

登录后可回答问题,请登录注册

快速回复 返回顶部 返回列表

小时

全天响应

分钟

快速处理问题

工程师强势助力

明星产品
解决方案
联系合作

400咨询:400-0011-866

技术支持QQ:400-0011-866(工作日9:00-18:00)

产品建议邮箱yixin@esensoft.com

关注我们

扫TA学习更多干货

一对一专家交流

版权所有© 2006-2024 北京亿信华辰软件有限责任公司 京ICP备07017321号 京公网安备11010802016281号