티스토리 뷰

반응형
ConvertJSONToSQLWithMerge.java

최근에 Oracle 데이터베이스를 목적지로 데이터를 이관하기 위해서 Nifi를 검토하던 중에, Nifi에는 Oracle Merge을 지원하는

프로세서가 없다는 걸 알게되었습니다.

Upsert 형식을 지원하는 프로세서가 있지만, 대상 데이터베이스에는 Oracle이 없습니다.

그래서 구글링 해보니 Github에 커스텀 프로세서 코드가 있어서 사용해 보았습니다.

출처: https://github.com/dawsongzhao1104/nifi
출처: https://mdnice.com/writing/1e7798958878469cbd54e1dfe85126a7

 

그런데 실제로 테스트를 해보니, 커스컴 프로세서을 인식해서 사용할 수는 있는데, Merge문이 온전하게

생성이 되지 않아서 Github에 있는 소스를 수정해서 적용했습니다.

 

<< 변경한 소스>>

ConvertJSONToSQLWithMerge.java  에서 generateMerge() 메소드 부분만 수정함.


    private String generateMerge(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
                                  final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
                                  final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName, final String attributePrefix) {

        final Set<String> updateKeyNames;
        if (updateKeys == null) {
            updateKeyNames = schema.getPrimaryKeyColumnNames();
        } else {
            updateKeyNames = new HashSet<>();
            for (final String updateKey : updateKeys.split(",")) {
                updateKeyNames.add(updateKey.trim());
            }
        }

        if (updateKeyNames.isEmpty()) {
            throw new ProcessException("Target Table: '" + tableName + "' Update Key is empty!");
        }

        final StringBuilder sqlBuilder = new StringBuilder();
        int fieldCount = 0;
        sqlBuilder.append("MERGE INTO ");
        if (quoteTableName) {
            sqlBuilder.append(schema.getQuotedIdentifierString())
                    .append(tableName)
                    .append(schema.getQuotedIdentifierString());
        } else {
            sqlBuilder.append(tableName);
        }

        sqlBuilder.append(" t1 \n\t USING ( \n\t SELECT ");


        // Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
        // for each of the Update Key fields.
        StringBuilder sqlOnCause = new StringBuilder("( ");

        final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
        final Set<String> normalizedUpdateNames = new HashSet<>();

        int i =  0;

        for (final String normalizedUK : updateKeyNames) {
            //final String normalizedUK  = normalizeColumnName(uk, translateFieldNames);
            i++;
            normalizedUpdateNames.add(normalizedUK);

            if (!normalizedFieldNames.contains(normalizedUK)) {
                String missingColMessage = "JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + normalizedUK + "'";
                if (failUnmappedColumns) {
                    getLogger().error(missingColMessage);
                    throw new ProcessException(missingColMessage);
                } else if (warningUnmappedColumns) {
                    getLogger().warn(missingColMessage);
                }
            }else{
                if (i == updateKeyNames.size()) {
                    sqlOnCause.append(" t1.").append(normalizedUK).append("=").append(" t2.").append(normalizedUK).append(" )");
                } else {
                    sqlOnCause.append(" t1.").append(normalizedUK).append("=").append(" t2.").append(normalizedUK).append(" AND ");
                }
            }
        }

        String onCause = sqlOnCause.append("\n\t WHEN MATCHED THEN UPDATE SET ").toString();

        // iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
        // adding the column value to a "<sql>.args.N.value" attribute and the type of a "<sql>.args.N.type" attribute add the
        // columns that we are inserting into
        StringBuilder sqlUpdateSet = new StringBuilder();
        StringBuilder sqlInsertField =  new StringBuilder(" \n\t WHEN NOT MATCHED THEN INSERT( ");
        StringBuilder sqlInsertValues =  new StringBuilder(" \n\t VALUES(");

        // Update Keys Append Insert Statement
        for (final String sb : updateKeyNames) {
            sqlInsertField.append("\n\t t1.").append(sb).append(",");
            sqlInsertValues.append("\n\t t2.").append(sb).append(",");
        }

        Iterator<String> fieldNames = rootNode.getFieldNames();
        while (fieldNames.hasNext()) {
            final String fieldName = fieldNames.next();

            final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
            final ColumnDescription desc = schema.getColumns().get(normalizedColName);
            if (desc == null) {
                if (!ignoreUnmappedFields) {
                    throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database.["+schema.toString()+"]");
                } else {
                    continue;
                }
            }

            if (fieldCount++ > 0) {
                sqlBuilder.append(", ");
            }

            sqlBuilder.append(" \t ? ");
            if(escapeColumnNames){
                StringBuilder sb = new StringBuilder();
                sb.append(schema.getQuotedIdentifierString())
                        .append(desc.getColumnName())
                        .append(schema.getQuotedIdentifierString());

                sqlBuilder.append(sb);
                if (!normalizedUpdateNames.contains(normalizedColName)) {
                    sqlUpdateSet.append("\n\t t1.").append(sb).append(" = t2.").append(sb).append(",");
                    sqlInsertField.append("\n\t t1.").append(sb).append(",");
                    sqlInsertValues.append("\n\t t2.").append(sb).append(",");
                }

            } else {
                String sb = desc.getColumnName();
                sqlBuilder.append(sb);

                if (!normalizedUpdateNames.contains(normalizedColName)) {
                    sqlUpdateSet.append("\n\t t1.").append(sb).append(" = t2.").append(sb).append(",");
                    sqlInsertField.append("\n\t t1.").append(sb).append(",");
                    sqlInsertValues.append("\n\t t2.").append(sb).append(",");
                }
            }

            sqlBuilder.append("\n\t");

            final int sqlType = desc.getDataType();
            attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));

            final Integer colSize = desc.getColumnSize();

            final JsonNode fieldNode = rootNode.get(fieldName);
            if (!fieldNode.isNull()) {
                String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType);
                attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
            }
        }

        // Set the ON  clause based on the Update Key values
        sqlBuilder.append(" from dual) t2 \n on ")
                .append(onCause)
                .append(sqlUpdateSet.deleteCharAt(sqlUpdateSet.length()-1))
                .append(sqlInsertField.deleteCharAt(sqlInsertField.length()-1)).append(")")
                .append(sqlInsertValues.deleteCharAt(sqlInsertValues.length()-1)).append(")");

        return sqlBuilder.toString();
    }

 

 

소스를 수정하고 pom.xml 에서 현재 설치된 Nifi 버전으로 의존성을 변경해 주고, 빌드를 하면

nifi-cvte-nar-1.0.0.nar 를 얻을 수 있는데, Nifi 설치 디렉토리에 lib 에 넣고, 재기동하면 커스텀 프로세서를 사용할 수 있습니다.

1.26.0  버전으로 생성한 것.

 

nifi-cvte-nar-1.0.0.nar
1.93MB

 

 

 

 

 

 

 

 

Schema Access Strategy : Use 'Schema Text' Property

Schema Text : 
{
   "type" : "record"
 , "name" : "tb_tmp"
 , "namespace" : "tmp.data"
 , "fields" : [ {"name" : "CD","type" : [ "string", "null" ]}
                    , {"name" : "DT","type" : [ "string", "null" ]}
                    , {"name" : "CDNM","type" : [ "string", "null" ]}
                    ]
}​

 

 

 

반응형