且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

在 Nifi 中从 Avro Schema 创建一个 Postgresql 表

更新时间:2022-12-10 12:35:46

我可以在nifi v1.5+中推荐ExecuteGroovyScript处理器

I can suggest ExecuteGroovyScript processor in nifi v1.5+

定义新属性 SQL.mydb - 系统将提示您将其值链接到数据库 (DBCPConnectionPool)

define new property SQL.mydb - you will be prompted to link its value to a database (DBCPConnectionPool)

选择要建表的数据库

并使用此脚本(假设 avro 架构在流文件内容中)

and use this script (assume avro schema is in the flow file content)

import groovy.json.JsonSlurper

def ff = session.get()
if(!ff)return

//parse avro schema from flow file content
def schema = ff.read().withReader("UTF-8"){ new JsonSlurper().parse(it) }

//define type mapping
def typeMap = [
    "string"            : "varchar(255)",
    "long"              : "numeric(10)",
    [ "null", "string" ]: "varchar(255)",
    [ "null", "long" ]  : "numeric(10)",
]

assert schema.name && schema.name=~/^\w.*/

//build create table statement
def createTable = "create table ${schema.name} (" +
    schema.fields.collect{ "\n  ${it.name.padRight(39)} ${typeMap[it.type]}" }.join(',') +
    "\n)"

//execute statement through the custom defined property
//SQL.mydb references http://docs.groovy-lang.org/2.4.10/html/api/groovy/sql/Sql.html object
SQL.mydb.execute(createTable as String) //important to cast to String

//transfer flow file to success
REL_SUCCESS << ff