How to update rows with modified data in Jython?

Updated: 2022-12-28 09:00:42

In this transformation the unnecessary data is located at the end of each line, so the task is easy to handle with a regular expression:

^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?

You can check the regular expression and its explanation here: https://regex101.com/r/sAB4SA/2
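To see what the pattern keeps, here is a minimal sketch in plain Python. The sample lines and the helper name `trim_line` are made up for illustration; only the pattern itself comes from the answer above.

```python
import re

# Pattern from the answer: group 1 is everything up to the seconds,
# group 3 is the dot plus at most three fractional digits.
PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')

def trim_line(line):
    """Return the line cut after the seconds and up to three fractional digits."""
    match = PATTERN.search(line)
    if match is None:
        # Assumption: lines without a timestamp are returned unchanged.
        return line
    return match.group(1) + (match.group(3) or '')

# Hypothetical sample lines
print(trim_line('id=7,2022-12-28 09:00:42.123456-05'))  # id=7,2022-12-28 09:00:42.123
print(trim_line('id=8,2022-12-28 09:00:42'))            # id=8,2022-12-28 09:00:42
```

Extra fractional digits (group 4) and the trailing `-NN` part (group 5) are matched but never written back, which is how the unwanted tail is dropped.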

When the file is large, it is better not to load it into memory. The following call loads the whole file into memory:

IOUtils.readLines(inputStream, StandardCharsets.UTF_8)

It is better to iterate line by line.
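The idea of streaming instead of loading everything can be sketched in plain Python. This is only an illustration under assumptions: `copy_transformed` is a hypothetical helper, and `io.StringIO` objects stand in for the real input and output streams.

```python
import io

def copy_transformed(src, dst, transform):
    """Stream lines from src to dst, holding only one line in memory at a time."""
    count = 0
    for line in src:  # a file-like object yields one line per iteration
        dst.write(transform(line.rstrip('\n')) + '\n')
        count += 1
    return count

# In-memory streams stand in for real files here.
source = io.StringIO('first\nsecond\n')
target = io.StringIO()
copy_transformed(source, target, str.upper)
print(target.getvalue())
```

Memory use stays bounded by the longest line rather than by the file size, which is the reason for preferring this over reading all lines into a list.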

So here is the code for the NiFi ExecuteScript processor with the python (Jython) language:

import sys
import re
import traceback
from org.apache.nifi.processor.io import StreamCallback
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter


class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            writer = OutputStreamWriter(outputStream, "UTF-8")
            reader = BufferedReader(InputStreamReader(inputStream, "UTF-8"))
            # Group 1: everything up to the seconds; group 3: up to three fractional digits
            p = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
            line = reader.readLine()
            while line is not None:
                match = p.search(line)
                if match is None:
                    # Assumption: a line without a timestamp is passed through unchanged
                    writer.write(line)
                else:
                    writer.write(match.group(1) + (match.group(3) or ''))
                writer.write('\n')
                line = reader.readLine()
            writer.flush()
            writer.close()
            reader.close()
        except:
            # Log the stack trace, then re-raise so the failure is not swallowed
            traceback.print_exc(file=sys.stdout)
            raise


flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)


Since the question is about NiFi, here are some alternatives that seem easier.

The same code as above, but in Groovy for the NiFi ExecuteScript processor:

import org.apache.nifi.processor.io.StreamCallback

def ff = session.get()
if (!ff) return
ff = session.write(ff, { rawIn, rawOut ->
    // Wrap the raw streams in a UTF-8 reader and writer
    rawIn.withReader("UTF-8") { reader ->
        rawOut.withWriter("UTF-8") { writer ->
            reader.eachLine { line, lineNum ->
                if (lineNum > 1) { // skip the first line (eachLine counts from 1)
                    // Keep group 1 and the optional group 3; drop the rest of the match
                    writer << line.replaceAll(/^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/, '$1$3') << '\n'
                }
            }
        }
    }
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)


ReplaceText processor

If a regular expression is sufficient, the easiest way in NiFi is the ReplaceText processor, which can perform regular-expression replacement line by line.

In this case you don't need to write any code: just build the regular expression and configure the processor correctly.
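As a rough illustration of what a line-by-line regex replace does with this pattern, here is a plain Python sketch. It is not NiFi code: `replace_line` and `replace_text` are hypothetical helpers, and the replacement keeps group 1 and group 3 in the same way the `'$1$3'` replacement does in the Groovy version. In ReplaceText itself the pattern and replacement would presumably go into the Search Value and Replacement Value properties with a line-by-line evaluation mode; check the processor documentation for the exact settings.

```python
import re

SEARCH = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')

def replace_line(line):
    # Keep group 1 and the optional group 3; text after the match is left as-is.
    return SEARCH.sub(lambda m: m.group(1) + (m.group(3) or ''), line)

def replace_text(content):
    # Apply the replacement to each line independently.
    return '\n'.join(replace_line(line) for line in content.split('\n'))

# Hypothetical two-line input
print(replace_text('x,09:00:42.123456-05\ny,09:00:43'))
```

Unlike the search-and-write approach in the Jython script, a replace leaves any text after the matched part in place and passes non-matching lines through unchanged.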