티스토리 뷰
반응형
ConvertJSONToSQLWithMerge.java
최근에 Oracle 데이터베이스를 목적지로 데이터를 이관하기 위해서 Nifi를 검토하던 중에, Nifi에는 Oracle Merge을 지원하는
프로세서가 없다는 걸 알게되었습니다.
Upsert 형식을 지원하는 프로세서가 있지만, 대상 데이터베이스에는 Oracle이 없습니다.
그래서 구글링 해보니 Github에 커스텀 프로세서 코드가 있어서 사용해 보았습니다.
출처: https://github.com/dawsongzhao1104/nifi
출처: https://mdnice.com/writing/1e7798958878469cbd54e1dfe85126a7
그런데 실제로 테스트를 해보니, 커스컴 프로세서을 인식해서 사용할 수는 있는데, Merge문이 온전하게
생성이 되지 않아서 Github에 있는 소스를 수정해서 적용했습니다.
<< 변경한 소스>>
ConvertJSONToSQLWithMerge.java 에서 generateMerge() 메소드 부분만 수정함.
private String generateMerge(final JsonNode rootNode, final Map<String, String> attributes, final String tableName, final String updateKeys,
final TableSchema schema, final boolean translateFieldNames, final boolean ignoreUnmappedFields, final boolean failUnmappedColumns,
final boolean warningUnmappedColumns, boolean escapeColumnNames, boolean quoteTableName, final String attributePrefix) {
final Set<String> updateKeyNames;
if (updateKeys == null) {
updateKeyNames = schema.getPrimaryKeyColumnNames();
} else {
updateKeyNames = new HashSet<>();
for (final String updateKey : updateKeys.split(",")) {
updateKeyNames.add(updateKey.trim());
}
}
if (updateKeyNames.isEmpty()) {
throw new ProcessException("Target Table: '" + tableName + "' Update Key is empty!");
}
final StringBuilder sqlBuilder = new StringBuilder();
int fieldCount = 0;
sqlBuilder.append("MERGE INTO ");
if (quoteTableName) {
sqlBuilder.append(schema.getQuotedIdentifierString())
.append(tableName)
.append(schema.getQuotedIdentifierString());
} else {
sqlBuilder.append(tableName);
}
sqlBuilder.append(" t1 \n\t USING ( \n\t SELECT ");
// Create a Set of all normalized Update Key names, and ensure that there is a field in the JSON
// for each of the Update Key fields.
StringBuilder sqlOnCause = new StringBuilder("( ");
final Set<String> normalizedFieldNames = getNormalizedColumnNames(rootNode, translateFieldNames);
final Set<String> normalizedUpdateNames = new HashSet<>();
int i = 0;
for (final String normalizedUK : updateKeyNames) {
//final String normalizedUK = normalizeColumnName(uk, translateFieldNames);
i++;
normalizedUpdateNames.add(normalizedUK);
if (!normalizedFieldNames.contains(normalizedUK)) {
String missingColMessage = "JSON does not have a value for the " + (updateKeys == null ? "Primary" : "Update") + "Key column '" + normalizedUK + "'";
if (failUnmappedColumns) {
getLogger().error(missingColMessage);
throw new ProcessException(missingColMessage);
} else if (warningUnmappedColumns) {
getLogger().warn(missingColMessage);
}
}else{
if (i == updateKeyNames.size()) {
sqlOnCause.append(" t1.").append(normalizedUK).append("=").append(" t2.").append(normalizedUK).append(" )");
} else {
sqlOnCause.append(" t1.").append(normalizedUK).append("=").append(" t2.").append(normalizedUK).append(" AND ");
}
}
}
String onCause = sqlOnCause.append("\n\t WHEN MATCHED THEN UPDATE SET ").toString();
// iterate over all of the elements in the JSON, building the SQL statement by adding the column names, as well as
// adding the column value to a "<sql>.args.N.value" attribute and the type of a "<sql>.args.N.type" attribute add the
// columns that we are inserting into
StringBuilder sqlUpdateSet = new StringBuilder();
StringBuilder sqlInsertField = new StringBuilder(" \n\t WHEN NOT MATCHED THEN INSERT( ");
StringBuilder sqlInsertValues = new StringBuilder(" \n\t VALUES(");
// Update Keys Append Insert Statement
for (final String sb : updateKeyNames) {
sqlInsertField.append("\n\t t1.").append(sb).append(",");
sqlInsertValues.append("\n\t t2.").append(sb).append(",");
}
Iterator<String> fieldNames = rootNode.getFieldNames();
while (fieldNames.hasNext()) {
final String fieldName = fieldNames.next();
final String normalizedColName = normalizeColumnName(fieldName, translateFieldNames);
final ColumnDescription desc = schema.getColumns().get(normalizedColName);
if (desc == null) {
if (!ignoreUnmappedFields) {
throw new ProcessException("Cannot map JSON field '" + fieldName + "' to any column in the database.["+schema.toString()+"]");
} else {
continue;
}
}
if (fieldCount++ > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append(" \t ? ");
if(escapeColumnNames){
StringBuilder sb = new StringBuilder();
sb.append(schema.getQuotedIdentifierString())
.append(desc.getColumnName())
.append(schema.getQuotedIdentifierString());
sqlBuilder.append(sb);
if (!normalizedUpdateNames.contains(normalizedColName)) {
sqlUpdateSet.append("\n\t t1.").append(sb).append(" = t2.").append(sb).append(",");
sqlInsertField.append("\n\t t1.").append(sb).append(",");
sqlInsertValues.append("\n\t t2.").append(sb).append(",");
}
} else {
String sb = desc.getColumnName();
sqlBuilder.append(sb);
if (!normalizedUpdateNames.contains(normalizedColName)) {
sqlUpdateSet.append("\n\t t1.").append(sb).append(" = t2.").append(sb).append(",");
sqlInsertField.append("\n\t t1.").append(sb).append(",");
sqlInsertValues.append("\n\t t2.").append(sb).append(",");
}
}
sqlBuilder.append("\n\t");
final int sqlType = desc.getDataType();
attributes.put(attributePrefix + ".args." + fieldCount + ".type", String.valueOf(sqlType));
final Integer colSize = desc.getColumnSize();
final JsonNode fieldNode = rootNode.get(fieldName);
if (!fieldNode.isNull()) {
String fieldValue = createSqlStringValue(fieldNode, colSize, sqlType);
attributes.put(attributePrefix + ".args." + fieldCount + ".value", fieldValue);
}
}
// Set the ON clause based on the Update Key values
sqlBuilder.append(" from dual) t2 \n on ")
.append(onCause)
.append(sqlUpdateSet.deleteCharAt(sqlUpdateSet.length()-1))
.append(sqlInsertField.deleteCharAt(sqlInsertField.length()-1)).append(")")
.append(sqlInsertValues.deleteCharAt(sqlInsertValues.length()-1)).append(")");
return sqlBuilder.toString();
}
소스를 수정하고 pom.xml 에서 현재 설치된 Nifi 버전으로 의존성을 변경해 주고, 빌드를 하면
nifi-cvte-nar-1.0.0.nar 를 얻을 수 있는데, Nifi 설치 디렉토리에 lib 에 넣고, 재기동하면 커스텀 프로세서를 사용할 수 있습니다.
1.26.0 버전으로 생성한 것.
Schema Access Strategy : Use 'Schema Text' Property
Schema Text :
{
"type" : "record"
, "name" : "tb_tmp"
, "namespace" : "tmp.data"
, "fields" : [ {"name" : "CD","type" : [ "string", "null" ]}
, {"name" : "DT","type" : [ "string", "null" ]}
, {"name" : "CDNM","type" : [ "string", "null" ]}
]
}
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- 로니카 BCS
- GKRS
- node.js
- weka
- 오미크론
- 매직 트랙패드2
- 별잉 빛나는 밤
- 카카오 에드
- 유가바이트디비
- VARIDESK
- Pixel Pals
- 르세라핌
- yugabyteDB
- 빈센트 반 고흐
- 화분벌레
- Sybase IQ
- 남설 팔찌
- 루미큐브 종류
- 톡토기
- 홈 오피스
- 고체 향수
- 파나소닉 비데 DL-EH10KWS
- 브리다 정수기
- 배당급
- JMW 헤어드라이기기
- 증권정보포털
- 솔리드 쿨론
- 별이 빚나는 밤
- Life Chair
- 코라나 19
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
글 보관함