Apache NiFi / Cassandra - How to load a CSV into a Cassandra table

Updated: 2023-02-02 21:41:07

I have various CSV files incoming several times per day, storing timeseries data from sensors that are part of sensor stations. Each CSV is named after the sensor station and sensor id it is coming from, for instance "station1_sensor2.csv". At the moment, the data is stored like this:

> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;

I have created a Cassandra table to store them and to be able to query them for various identified tasks. The Cassandra table looks like this:

cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

        CREATE TABLE sensor_data (
        station_id text, // id of the station
        sensor_id text,  // id of the sensor
        tps timestamp,   // timestamp of the measure
        val float,       // measured value
        PRIMARY KEY ((station_id, sensor_id), tps)
        );
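
For reference, this primary key allows time-range queries for a given station/sensor pair, along the lines of the following (the time bounds here are only an example):

        SELECT tps, val FROM sensor_data
        WHERE station_id = 'station1' AND sensor_id = 'sensor2'
          AND tps >= '2016-05-04' AND tps < '2016-05-05';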

I would like to use Apache NiFi to automatically store the data from the CSV files into this Cassandra table, but I can't find an example or scheme to do it right. I have tried the "PutCassandraQL" processor, but I am struggling without any clear example. So, any help on how to execute a Cassandra put query with Apache NiFi to insert the data into the table would be appreciated!

TL;DR I have a NiFi 1.0 template to accomplish this on Gist and in the NiFi Wiki.

NiFi encourages very modular design, so let's break this down into smaller tasks. I'll describe a possible flow and explain what each processor is used for in terms of your use case:

  1. Read in the CSV file. This can be done with GetFile, or preferably ListFile -> FetchFile. In my example I am using a scripting processor to create a flow file in-line, containing your example data from above. This makes my template portable for others to use.

  2. Parse the filename to get the station and sensor fields. This uses NiFi Expression Language to get the parts of the filename before the underscore (for station) and after the underscore (minus the CSV extension) for sensor; see the expression sketch after this list.

  3. Split the single CSV flow file into one flow file per line. This is done so we can create individual CQL INSERT statements later.

  4. Extract the column values from each line. I used ExtractText and a regular expression for this (see the sketch after this list); if you have very complicated logic, you may want to check out a scripting processor such as ExecuteScript.

  5. Alter the timestamp. IIRC, CQL doesn't accept microseconds in timestamp literals. You can either try to parse out the microseconds (probably best done in an ExecuteScript processor) or just re-format the timestamp; an Expression Language sketch follows this list. Note that re-formatting, since the microseconds couldn't be parsed, causes all fractional seconds to be truncated in my example.

  6. Build a CQL INSERT statement. At this point the data (in my template, anyway) is all in flow file attributes, and the original content can be replaced with a CQL INSERT statement (which is the way PutCassandraQL expects it). You can keep the data in attributes (using UpdateAttribute to name them correctly, see the PutCassandraQL doc) and use a prepared statement (sketched after this list), but IMHO it's simpler to write an explicit CQL statement. At the time of this writing, PutCassandraQL is not caching PreparedStatements, so it is actually less performant right now to do things that way.

  7. Execute the CQL statements with PutCassandraQL.
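
For step 2, the Expression Language in an UpdateAttribute processor can look something like the sketch below. The attribute names match the list further down; I won't swear these are the exact expressions in the template, but they show the substringBefore/substringAfterLast approach against a filename like "station1_sensor2.csv":

    station.name = ${filename:substringBefore('_')}
    sensor.name  = ${filename:substringAfterLast('_'):substringBefore('.csv')}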
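
For step 4, each line looks like "timestamp;value;", so a single ExtractText property with two capture groups is enough. As a sketch only (the property name "column" is an assumption; ExtractText then adds attributes such as column.1 and column.2 for the capture groups):

    column = ^([^;]+);([^;]+);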
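
For step 5, one simple way to drop the microseconds is plain string manipulation rather than date parsing. Assuming the raw timestamp was captured into column.1 as sketched above, something like this in UpdateAttribute produces a value CQL will accept (fractional seconds are simply cut off, as noted in step 5):

    tps = ${column.1:substringBefore('.')}+0000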
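
If you do go the prepared-statement route from step 6 instead, PutCassandraQL reads parameters from attributes named cql.args.N.type and cql.args.N.value (see its documentation). Roughly, and only as an illustration against this table definition, the flow file would look like:

    content: insert into sensor_data (station_id, sensor_id, tps, val) values (?, ?, ?, ?)
    cql.args.1.type = text        cql.args.1.value = ${station.name}
    cql.args.2.type = text        cql.args.2.value = ${sensor.name}
    cql.args.3.type = timestamp   cql.args.3.value = ${tps}
    cql.args.4.type = float       cql.args.4.value = ${column.2}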

I didn't go into detail about the names of my attributes and such, but by the time the flow gets to ReplaceText, I have the following attributes:

  • station.name: Contains the name of the station parsed from the filename
  • sensor.name: Contains the name of the sensor parsed from the filename
  • tps: Contains the updated timestamp value
  • column.2: Contains (presumably) the value of the sensor reading

The ReplaceText sets the content to the following (using Expression Language to fill in the values):

insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})
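
For the first sample row in the question, after Expression Language substitution the content ends up looking roughly like this:

    insert into sensor_data (station_id, sensor_id, tps, val) values ('station1', 'sensor2', '2016-05-04 03:02:01+0000', 0)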

Hopefully that helps, please let me know if you have any questions or issues. Cheers!