How to update rows with modified data in Jython?

Updated: 2022-12-28 08:56:42

In this transformation the unnecessary data is located at the end of each line, so the task is easy to handle with a regular expression:

^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?

Check the regular expression and its explanation here: https://regex101.com/r/sAB4SA/2
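To make the capture groups concrete, here is a minimal sketch in plain Python that applies the same pattern to a few rows. The sample rows and the helper name `trim_line` are hypothetical, chosen only for illustration; the real input from the question may look different.

```python
import re

# Same pattern as above, written as a raw string.
PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')

def trim_line(line):
    """Keep the text up to the seconds plus at most three fractional digits."""
    match = PATTERN.search(line)
    if match is None:
        return line  # no ":dd" part found, leave the line untouched
    # group(1): everything up to the last ":dd"
    # group(3): "." plus one to three digits, or None when there is no fraction
    return match.group(1) + (match.group(3) or '')

# Hypothetical sample rows, not taken from the original question.
samples = [
    "1,2018-01-01 12:34:56.1234567-05",
    "2,2018-01-01 12:34:56-05",
    "3,2018-01-01 12:34:56.12",
]
for row in samples:
    print(trim_line(row))
```

Groups 4 and 5 (the extra fractional digits and the `-dd` suffix) are matched but never written back, which is how the unwanted tail gets dropped.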

If the file is large, it is better not to load it into memory. The following call loads the whole file into memory:

IOUtils.readLines(inputStream, StandardCharsets.UTF_8)

It is better to iterate line by line.

So this code is for the NiFi ExecuteScript processor with the Python (Jython) language:

import sys
import re
import traceback
from org.apache.nifi.processor.io import StreamCallback
from java.io import BufferedReader
from java.io import InputStreamReader
from java.io import OutputStreamWriter


class TransformCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        try:
            writer = OutputStreamWriter(outputStream,"UTF-8")
            reader = BufferedReader(InputStreamReader(inputStream,"UTF-8"))
            line = reader.readLine()
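            # Raw string so the backslashes reach the regex engine unchanged
            p = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')
            while line is not None:
                match = p.search(line)
                if match is None:
                    # No ":dd" part in this line: pass it through unchanged
                    writer.write(line)
                else:
                    # Keep the prefix and at most three fractional digits
                    writer.write(match.group(1) + (match.group(3) or ''))
                writer.write('\n')
                line = reader.readLine()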
            writer.flush()
            writer.close()
            reader.close()
        except:
            traceback.print_exc(file=sys.stdout)
            raise


flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, TransformCallback())

    # Finish by transferring the FlowFile to an output relationship
    session.transfer(flowFile, REL_SUCCESS)
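The line-by-line loop can also be tried outside NiFi before putting it into a processor. The following is only a sketch in plain Python using ordinary file-like objects instead of the Java streams; the function name `transform_stream` and the sample input are hypothetical.

```python
import io
import re

PATTERN = re.compile(r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?')

def transform_stream(reader, writer):
    """Copy reader to writer one line at a time, trimming each line.

    Only the current line is held in memory. Returns the number of lines written.
    """
    count = 0
    for line in reader:  # file objects yield lines lazily
        line = line.rstrip('\r\n')
        match = PATTERN.search(line)
        if match is None:
            writer.write(line)  # nothing to trim
        else:
            writer.write(match.group(1) + (match.group(3) or ''))
        writer.write('\n')
        count += 1
    return count

# Hypothetical input held in memory; a real run would pass open file objects.
source = io.StringIO(u"a,10:20:30.123456-05\nb,10:20:31-05\n")
target = io.StringIO()
transform_stream(source, target)
print(target.getvalue())
```

With real files, `io.open(path, encoding='utf-8')` objects can be passed as `reader` and `writer` in the same way.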

And since the question is about NiFi, here are alternatives that seem easier.

The same logic as above, but in Groovy for the NiFi ExecuteScript processor (note that this version also skips the first line of the input):

def ff = session.get()
if(!ff)return
ff = session.write(ff, {rawIn, rawOut->
    // ## transform streams into reader and writer
    rawIn.withReader("UTF-8"){reader->
        rawOut.withWriter("UTF-8"){writer->
            reader.eachLine{line, lineNum->
                if(lineNum>1) { // # skip the first line
                    // ## use a regular expression to transform each line
                    writer << line.replaceAll( /^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?/ , '$1$3' ) << '\n'
                }
            }
        }
    }
} as StreamCallback)
session.transfer(ff, REL_SUCCESS)

ReplaceText processor

If a regular expression is sufficient, the easiest way in NiFi is the ReplaceText processor, which can apply a regular expression replacement line by line.

In this case you don't need to write any code; just build the regular expression and configure the processor correctly.
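Before entering a search/replacement pair into the processor, it can help to check it on a few lines. The sketch below does that in plain Python (3.5 or later, where an unmatched group in the replacement becomes an empty string). It assumes the pair would go into ReplaceText properties named Search Value and Replacement Value with Evaluation Mode set to Line-by-Line; those property names are given from memory and should be verified against the NiFi documentation. The sample rows are hypothetical.

```python
import re

# Assumed ReplaceText "Search Value": the same pattern used throughout.
SEARCH_VALUE = r'^(.*:\d\d)((\.\d{1,3})(\d*))?(-\d\d)?'
# Python spelling of the Java-style replacement '$1$3'.
REPLACEMENT_VALUE = r'\1\3'

def replace_line(line):
    """Apply the substitution to one line, as a line-by-line replace would."""
    return re.sub(SEARCH_VALUE, REPLACEMENT_VALUE, line, count=1)

# Hypothetical rows for a quick check.
print(replace_line("1,2018-01-01 12:34:56.1234567-05"))
print(replace_line("2,2018-01-01 12:34:56-05"))
print(replace_line("3,2018-01-01 12:34:56.1234-05 tail"))
```

One difference from the group-extraction code earlier: a substitution replaces only the matched part, so any text after the match (like ` tail` in the third row) is kept rather than dropped.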