티스토리 뷰
반응형
특정 테이블 (tc_ymd)를 조회한 결과를 json으로 BATCH_SIZE (100건) 씩 Flow File로 만들어서 다음 프로세서로
전달하는 예제입니다.
지난번에 포스팅 했던 Json 데이터를 Oracle Merge문으로 변환하는 프로세서로 전달하기 위해서 만들어 봤습니다.
Groovy Script로 작성한 이유는 조회한 데이터 일부를 특정 처리 (ex. 암호화)를 한 후에 다음 프로세서에 전달하기
위해서 입니다.
* tc_ymd.groovy
import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.slf4j.LoggerFactory
import org.apache.commons.io.IOUtils
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.logging.ComponentLog
import groovy.json.JsonOutput
import javax.crypto.Cipher
import javax.crypto.KeyGenerator
import javax.crypto.SecretKey
import javax.crypto.spec.SecretKeySpec
import java.util.Base64
import java.nio.charset.StandardCharsets
import java.sql.*
def log = log as ComponentLog
//log.error("\n============== DEMO TC_YMD ETL ======================\n")
class AESUtil {
static String encrypt(String data, SecretKey key) throws Exception {
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
cipher.init(Cipher.ENCRYPT_MODE, key)
byte[] encryptedBytes = cipher.doFinal(data.getBytes("UTF-8"))
return Base64.getEncoder().encodeToString(encryptedBytes)
}
static String decrypt(String encryptedData, SecretKey key) throws Exception {
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
cipher.init(Cipher.DECRYPT_MODE, key)
byte[] decodedBytes = Base64.getDecoder().decode(encryptedData)
byte[] decryptedBytes = cipher.doFinal(decodedBytes)
return new String(decryptedBytes, "UTF-8")
}
static SecretKey generateKey() throws Exception {
KeyGenerator keyGen = KeyGenerator.getInstance("AES")
keyGen.init(128) // AES-128
return keyGen.generateKey()
}
}
def BATCH_SIZE = 100
def aes_key = context.getProperty('aes_key').evaluateAttributeExpressions().getValue()
def source_db_name = context.getProperty('source_db_name').evaluateAttributeExpressions().getValue()
def st_dt = context.getProperty('st_dt').evaluateAttributeExpressions().getValue()
def ed_dt = context.getProperty('ed_dt').evaluateAttributeExpressions().getValue()
SecretKey secretKey = new SecretKeySpec(aes_key.getBytes(StandardCharsets.UTF_8), "AES")
// SQL 쿼리
def sb = new StringBuilder()
sb.append("SELECT * FROM TC_YMD \n")
def conn
def stmt
def rs
try {
def lookup = context.controllerServiceLookup
def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find {
cs -> lookup.getControllerServiceName(cs) == source_db_name
}
def dbcpService = lookup.getControllerService(dbcpServiceId)
//def dbcpService = context.controllerServiceLookup.getControllerService("43c54e78-0191-1000-9a98-fd1fe94dc5e5")
//log.error("=== " + ed_dt)
conn = dbcpService.getConnection()
stmt = conn.createStatement()
stmt.setFetchSize(BATCH_SIZE)
rs = stmt.executeQuery(sb.toString())
def metadata = rs.getMetaData()
def columnCount = metadata.columnCount
// 결과 행을 CSV 파일로 작성
//?: '' 값이 NULL이면 '' 대임.. 기본으로는 null 값이 지정됨.
def resultList = []
int batchCount = 0;
while (rs.next()) {
def row = [:]
for (int i = 1; i <= columnCount; i++) {
def columnName = metadata.getColumnName(i)
def columnValue = rs.getString(i) ?: ''
row[columnName] = columnValue
}
resultList << row
batchCount++;
if (batchCount >= BATCH_SIZE) {
def jsonOutput = JsonOutput.toJson(resultList)
def flowFile = session.create()
session.write(flowFile, new StreamCallback() {
@Override
void process(InputStream s_in, OutputStream s_out) throws IOException {
s_out.write(jsonOutput.toString().getBytes("UTF-8"))
}
})
session.transfer(flowFile, REL_SUCCESS)
resultList.clear()
batchCount = 0;
}
}
if (!resultList.isEmpty()) {
def jsonOutput = JsonOutput.toJson(resultList)
def flowFile = session.create()
session.write(flowFile, new StreamCallback() {
@Override
void process(InputStream s_in, OutputStream s_out) throws IOException {
s_out.write(jsonOutput.toString().getBytes("UTF-8"))
}
})
session.transfer(flowFile, REL_SUCCESS)
resultList.clear()
}
} catch (Exception e) {
def flowFile = session.create()
session.write(flowFile, new StreamCallback() {
@Override
void process(InputStream s_in, OutputStream s_out) throws IOException {
s_out.write(e.getMessage().getBytes("UTF-8"))
}
})
session.transfer(flowFile, REL_SUCCESS)
} finally {
// "?." Safe Navigation Operator 연산자로 객체가 null인지 확인하고, null아니면 메서드를 호출함.
rs?.close()
stmt?.close()
conn?.close()
// 파일 전송 후 세션 커밋
session.commit()
}
* 어제 일자를 "yyyyMMdd" 포멧으로 가져오는 EL 표현식
${now():toNumber():minus(86400000):format('yyyyMMdd')}
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
- Total
- Today
- Yesterday
링크
TAG
- 빈센트 반 고흐
- weka
- GKRS
- Pixel Pals
- 고체 향수
- 증권정보포털
- 로니카 BCS
- 별잉 빛나는 밤
- 솔리드 쿨론
- 배당급
- 별이 빚나는 밤
- 유가바이트디비
- 브리다 정수기
- 파나소닉 비데 DL-EH10KWS
- yugabyteDB
- 오미크론
- 매직 트랙패드2
- 화분벌레
- 남설 팔찌
- 루미큐브 종류
- 르세라핌
- node.js
- JMW 헤어드라이기기
- Life Chair
- 카카오 에드
- 톡토기
- VARIDESK
- 홈 오피스
- 코라나 19
- Sybase IQ
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
글 보관함