티스토리 뷰

반응형

특정 테이블 (tc_ymd)를 조회한 결과를 json으로 BATCH_SIZE (100건) 씩 Flow File로 만들어서 다음 프로세서로

전달하는 예제입니다.

지난번에 포스팅 했던  Json 데이터를 Oracle Merge문으로 변환하는 프로세서로 전달하기 위해서 만들어 봤습니다.

Groovy Script로 작성한 이유는 조회한 데이터 일부를 특정 처리 (ex. 암호화)를 한 후에  다음 프로세서에 전달하기

위해서 입니다.

 

* tc_ymd.groovy

import org.apache.nifi.dbcp.DBCPService
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback
import org.slf4j.LoggerFactory
import org.apache.commons.io.IOUtils
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.logging.ComponentLog
import groovy.json.JsonOutput

import javax.crypto.Cipher
import javax.crypto.KeyGenerator
import javax.crypto.SecretKey
import javax.crypto.spec.SecretKeySpec
import java.util.Base64
import java.nio.charset.StandardCharsets
import java.sql.*



def log = log as ComponentLog

//log.error("\n============== DEMO TC_YMD ETL ======================\n")

class AESUtil {

    static String encrypt(String data, SecretKey key) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
        cipher.init(Cipher.ENCRYPT_MODE, key)
        byte[] encryptedBytes = cipher.doFinal(data.getBytes("UTF-8"))
        return Base64.getEncoder().encodeToString(encryptedBytes)
    }

    static String decrypt(String encryptedData, SecretKey key) throws Exception {
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding")
        cipher.init(Cipher.DECRYPT_MODE, key)
        byte[] decodedBytes = Base64.getDecoder().decode(encryptedData)
        byte[] decryptedBytes = cipher.doFinal(decodedBytes)
        return new String(decryptedBytes, "UTF-8")
    }

    static SecretKey generateKey() throws Exception {
        KeyGenerator keyGen = KeyGenerator.getInstance("AES")
        keyGen.init(128) // AES-128
        return keyGen.generateKey()
    }

}

def BATCH_SIZE = 100
def aes_key = context.getProperty('aes_key').evaluateAttributeExpressions().getValue()
def source_db_name = context.getProperty('source_db_name').evaluateAttributeExpressions().getValue()
def st_dt = context.getProperty('st_dt').evaluateAttributeExpressions().getValue()
def ed_dt = context.getProperty('ed_dt').evaluateAttributeExpressions().getValue()

SecretKey secretKey = new SecretKeySpec(aes_key.getBytes(StandardCharsets.UTF_8), "AES")
   
// SQL 쿼리
def sb = new StringBuilder()

sb.append("SELECT * FROM TC_YMD  \n")
   
def conn
def stmt
def rs

try {

   def lookup = context.controllerServiceLookup
   def dbcpServiceId = lookup.getControllerServiceIdentifiers(ControllerService).find { 
       cs -> lookup.getControllerServiceName(cs) == source_db_name
   }
   def dbcpService = lookup.getControllerService(dbcpServiceId)
   //def dbcpService = context.controllerServiceLookup.getControllerService("43c54e78-0191-1000-9a98-fd1fe94dc5e5")
   //log.error("=== " + ed_dt)

   conn = dbcpService.getConnection()    
   stmt = conn.createStatement()
   stmt.setFetchSize(BATCH_SIZE)
   rs = stmt.executeQuery(sb.toString())

   def metadata = rs.getMetaData()
   def columnCount = metadata.columnCount
      
   // 결과 행을 CSV 파일로 작성
   //?: ''  값이 NULL이면  '' 대임..  기본으로는 null 값이 지정됨.    
   def resultList = []
   int batchCount = 0;
   while (rs.next()) {
      
      def row = [:]
       for (int i = 1; i <= columnCount; i++) {
           def columnName = metadata.getColumnName(i)    
           def columnValue = rs.getString(i) ?: ''
                              
           row[columnName] = columnValue
      }
         
      resultList << row
       
      batchCount++;
      
      if (batchCount >= BATCH_SIZE) {       
          def jsonOutput = JsonOutput.toJson(resultList)
          
          def flowFile = session.create()
          session.write(flowFile, new StreamCallback() {
                           @Override
                           void process(InputStream s_in, OutputStream s_out) throws IOException {
                               s_out.write(jsonOutput.toString().getBytes("UTF-8"))
                           }
                       })
          session.transfer(flowFile, REL_SUCCESS)
          
          resultList.clear()
          batchCount = 0;
      }                  
   }    

   if (!resultList.isEmpty()) {        
     def jsonOutput = JsonOutput.toJson(resultList)
     def flowFile = session.create()    
     session.write(flowFile, new StreamCallback() {
                     @Override
                     void process(InputStream s_in, OutputStream s_out) throws IOException {
                         s_out.write(jsonOutput.toString().getBytes("UTF-8"))
                     }
                 })  
     session.transfer(flowFile, REL_SUCCESS)     

     resultList.clear()     
   }

} catch (Exception e) {
  def flowFile = session.create()
  session.write(flowFile, new StreamCallback() {
                     @Override
                     void process(InputStream s_in, OutputStream s_out) throws IOException {
                         s_out.write(e.getMessage().getBytes("UTF-8"))
                     }
                 }) 
  session.transfer(flowFile, REL_SUCCESS)  
} finally {
  // "?." Safe Navigation Operator 연산자로 객체가 null인지 확인하고, null아니면 메서드를 호출함.
  rs?.close()
  stmt?.close()
  conn?.close()
  
  // 파일 전송 후 세션 커밋
  session.commit()
}

 

 

* 어제 일자를  "yyyyMMdd" 포멧으로 가져오는 EL 표현식

${now():toNumber():minus(86400000):format('yyyyMMdd')}

 

 

반응형