Making Peace with Logstash, Part 2: Parsing a CSV


February 23, 2018 by Mike Hillwig

In my last post, I went over the basics of connecting Logstash to flight performance data from the US government. Connecting to the data was simple enough. Lots of data comes in CSV format. Whether you’re getting it from a log file, from Filebeat, or through a third-party provider, CSV tends to be the lowest common denominator. Someday, JSON will rule the world and XML will be banished, but until then, we live with CSV.

For this exercise, I added one section: a FILTER section with only one plugin in it, csv. This is brilliantly simple.

I also changed the OUTPUT section slightly to include more debugging information, which is really helpful when developing or troubleshooting. My new configuration looks like this:

input {
  file {
    path => ["/Users/mikehillwig/elastic/flights/*.csv"]
    # Don't persist read positions, so every run re-reads the files from the top
    sincedb_path => "/dev/null"
    start_position => "beginning"
  }
}

filter {
  csv {
    # Column names copied from the first (header) line of the CSV
    columns => [
      "Year", "Quarter", "Month", "DayofMonth", "DayOfWeek", "FlightDate", "UniqueCarrier", "AirlineID", "Carrier", "TailNum", "FlightNum", "OriginAirportID",
      "OriginAirportSeqID", "OriginCityMarketID", "Origin", "OriginCityName", "OriginState", "OriginStateFips", "OriginStateName", "OriginWac",
      "DestAirportID","DestAirportSeqID","DestCityMarketID","Dest","DestCityName","DestState","DestStateFips","DestStateName","DestWac",
      "CRSDepTime","DepTime","DepDelay","DepDelayMinutes","DepDel15","DepartureDelayGroups","DepTimeBlk","TaxiOut","WheelsOff","WheelsOn","TaxiIn",
      "CRSArrTime","ArrTime","ArrDelay","ArrDelayMinutes","ArrDel15","ArrivalDelayGroups","ArrTimeBlk","Cancelled","CancellationCode","Diverted",
      "CRSElapsedTime","ActualElapsedTime","AirTime","Flights","Distance","DistanceGroup","CarrierDelay","WeatherDelay","NASDelay","SecurityDelay",
      "LateAircraftDelay","FirstDepTime","TotalAddGTime","LongestAddGTime","DivAirportLandings","DivReachedDest","DivActualElapsedTime","DivArrDelay",
      "DivDistance","Div1Airport","Div1AirportID","Div1AirportSeqID","Div1WheelsOn","Div1TotalGTime","Div1LongestGTime","Div1WheelsOff","Div1TailNum",
      "Div2Airport","Div2AirportID","Div2AirportSeqID","Div2WheelsOn","Div2TotalGTime","Div2LongestGTime","Div2WheelsOff","Div2TailNum","Div3Airport",
      "Div3AirportID","Div3AirportSeqID","Div3WheelsOn","Div3TotalGTime","Div3LongestGTime","Div3WheelsOff","Div3TailNum","Div4Airport","Div4AirportID",
      "Div4AirportSeqID","Div4WheelsOn","Div4TotalGTime","Div4LongestGTime","Div4WheelsOff","Div4TailNum","Div5Airport","Div5AirportID","Div5AirportSeqID",
      "Div5WheelsOn","Div5TotalGTime","Div5LongestGTime","Div5WheelsOff","Div5TailNum"]
    # "," is the csv filter's default separator; it is spelled out here for clarity
    separator => ","
  }
}

output {
  # rubydebug pretty-prints every field of each event
  stdout { codec => rubydebug }
}
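Typing out a column list this long by hand is the tedious part. One way to avoid it, sketched here under my own assumptions, is a small Python helper that reads the CSV's header row and prints a ready-to-paste columns line. The name columns_setting is hypothetical and not part of Logstash; the sketch assumes a plain comma-separated header and drops empty names, such as the one a trailing comma at the end of the header would produce.

```python
import csv
import io


def columns_setting(header_line: str) -> str:
    """Turn a CSV header row into a Logstash `columns => [...]` line (hypothetical helper)."""
    # csv.reader strips the quotes around names like "Year","Quarter",...
    names = next(csv.reader(io.StringIO(header_line)))
    # Drop empty names, such as the one a trailing comma in the header produces.
    names = [name for name in names if name]
    quoted = ", ".join(f'"{name}"' for name in names)
    return f"columns => [{quoted}]"


if __name__ == "__main__":
    # Paste the first line of the CSV here (shortened for the example).
    header = '"Year","Quarter","Month","DayofMonth",'
    print(columns_setting(header))
    # prints: columns => ["Year", "Quarter", "Month", "DayofMonth"]
```

Fed the real file's first line, it should reproduce the 109 names in the configuration above, which also makes it easy to regenerate the list if the file format ever changes.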

Wow. That was easy. The hardest part was turning the first line of the CSV into the list of column names. Look at this output!

Mikes-MacBook-Pro:logstash-6.1.2 mikehillwig$ bin/logstash -f ../flights/flights.conf
Sending Logstash's logs to /Users/mikehillwig/elastic/logstash-6.1.2/logs which is now configured via log4j2.properties
[2018-02-19T10:32:58,119][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/Users/mikehillwig/elastic/logstash-6.1.2/modules/netflow/configuration"}
[2018-02-19T10:32:58,141][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/Users/mikehillwig/elastic/logstash-6.1.2/modules/fb_apache/configuration"}
[2018-02-19T10:32:58,363][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-02-19T10:32:58,864][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.1.2"}
[2018-02-19T10:32:59,219][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-02-19T10:33:04,438][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500, :thread=>"#"}
[2018-02-19T10:33:04,750][INFO ][logstash.pipeline ] Pipeline started {"pipeline.id"=>"main"}
[2018-02-19T10:33:04,872][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
{
"Distance" => "817.00",
"DivDistance" => nil,
"Div4AirportID" => nil,
"DepTimeBlk" => "1500-1559",
"Div5TotalGTime" => nil,
"ArrTimeBlk" => "1700-1759",
"Origin" => "PDX",
"DistanceGroup" => "4",
"Div3Airport" => "",
"DestCityName" => "Burbank, CA",
"@timestamp" => 2018-02-19T15:33:05.190Z,
"TaxiIn" => "4.00",
"Div4WheelsOff" => "",
"DepTime" => "1535",
"ActualElapsedTime" => "134.00",
"SecurityDelay" => nil,
"Div5LongestGTime" => nil,
"Div1WheelsOn" => "",
"Div5WheelsOff" => "",
"UniqueCarrier" => "AS",
"CancellationCode" => "",
"Div5TailNum" => "",
"DestState" => "CA",
"DestAirportSeqID" => "1080003",
"DepDelayMinutes" => "0.00",
"Div3TotalGTime" => nil,
"Div4AirportSeqID" => nil,
"Div4TailNum" => "",
"Div3AirportID" => nil,
"DestStateFips" => "06",
"Div2AirportSeqID" => nil,
"NASDelay" => nil,
"Quarter" => "4",
"WheelsOn" => "1745",
"path" => "/Users/mikehillwig/elastic/flights/On_Time_On_Time_Performance_2017_12.csv",
"CRSElapsedTime" => "130.00",
"Div2LongestGTime" => nil,
"Div2TailNum" => "",
"DestWac" => "91",
"Div3LongestGTime" => nil,
"DestAirportID" => "10800",
"DivAirportLandings" => "0",
"Div1LongestGTime" => nil,
"LongestAddGTime" => nil,
"TailNum" => "N613AS",
"host" => "Mikes-MacBook-Pro.local",
"Div4Airport" => "",
"Div1WheelsOff" => "",
"Month" => "12",
"Div2AirportID" => nil,
"DayofMonth" => "2",
"OriginStateName" => "Oregon",
"FlightNum" => "490",
"Div4TotalGTime" => nil,
"OriginAirportID" => "14057",
"DepDelay" => "-3.00",
"Div5AirportID" => nil,
"OriginCityMarketID" => "34057",
"ArrDelayMinutes" => "1.00",
"WeatherDelay" => nil,
"OriginStateFips" => "41",
"DepartureDelayGroups" => "-1",
"CarrierDelay" => nil,
"TotalAddGTime" => nil,
"Div1AirportID" => nil,
"Cancelled" => "0.00",
"Div3TailNum" => "",
"TaxiOut" => "12.00",
"message" => "2017,4,12,2,6,2017-12-02,\"AS\",19930,\"AS\",\"N613AS\",\"490\",14057,1405702,34057,\"PDX\",\"Portland, OR\",\"OR\",\"41\",\"Oregon\",92,10800,1080003,32575,\"BUR\",\"Burbank, CA\",\"CA\",\"06\",\"California\",91,\"1538\",\"1535\",-3.00,0.00,0.00,-1,\"1500-1559\",12.00,\"1547\",\"1745\",4.00,\"1748\",\"1749\",1.00,1.00,0.00,0,\"1700-1759\",0.00,\"\",0.00,130.00,134.00,118.00,1.00,817.00,4,,,,,,\"\",,,0,,,,,\"\",,,\"\",,,\"\",\"\",\"\",,,\"\",,,\"\",\"\",\"\",,,\"\",,,\"\",\"\",\"\",,,\"\",,,\"\",\"\",\"\",,,\"\",,,\"\",\"\",",
"Diverted" => "0.00",
"Div1Airport" => "",
"Div3WheelsOn" => "",
"OriginCityName" => "Portland, OR",
"column110" => nil,
"FirstDepTime" => "",
"FlightDate" => "2017-12-02",
"Div5AirportSeqID" => nil,
"Div2WheelsOn" => "",
"Div3AirportSeqID" => nil,
"Flights" => "1.00",
"DivArrDelay" => nil,
"LateAircraftDelay" => nil,
"Div2Airport" => "",
"AirlineID" => "19930",
"Div4LongestGTime" => nil,
"DestStateName" => "California",
"Div4WheelsOn" => "",
"ArrTime" => "1749",
"Div1TailNum" => "",
"CRSDepTime" => "1538",
"WheelsOff" => "1547",
"Div1TotalGTime" => nil,
"Year" => "2017",
"DestCityMarketID" => "32575",
"Dest" => "BUR",
"ArrDelay" => "1.00",
"OriginWac" => "92",
"Div3WheelsOff" => "",
"OriginAirportSeqID" => "1405702",
"ArrDel15" => "0.00",
"AirTime" => "118.00",
"Div2TotalGTime" => nil,
"CRSArrTime" => "1748",
"DivReachedDest" => nil,
"DivActualElapsedTime" => nil,
"Div5Airport" => "",
"Carrier" => "AS",
"Div5WheelsOn" => "",
"ArrivalDelayGroups" => "0",
"OriginState" => "OR",
"@version" => "1",
"Div1AirportSeqID" => nil,
"Div2WheelsOff" => "",
"DepDel15" => "0.00",
"DayOfWeek" => "6"
}
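Two details in that event are worth a closer look. First, the rows shown here end with a trailing comma, so each row has one more field than there are names in columns, and the extra value lands under a generated name, column110. Second, empty fields come through two ways: an unquoted empty field (,,) shows up as nil, while a quoted empty string ("") shows up as "". The Python sketch below is a rough model of that behavior as the output suggests it, not Logstash's actual implementation; split_csv_line and to_event are hypothetical names, and the parser handles only the simple quoting this file uses.

```python
from typing import Dict, List, Optional


def split_csv_line(line: str, sep: str = ",", quote: str = '"') -> List[Optional[str]]:
    """Split one CSV line. Unquoted empty fields become None; a quoted "" stays ""."""
    fields: List[Optional[str]] = []
    buf: List[str] = []
    quoted = False     # did the current field contain a quoted section?
    in_quotes = False  # are we inside a quoted section right now?
    i = 0
    while i < len(line):
        ch = line[i]
        if in_quotes:
            if ch == quote and i + 1 < len(line) and line[i + 1] == quote:
                buf.append(quote)  # a doubled quote inside quotes is an escaped quote
                i += 1
            elif ch == quote:
                in_quotes = False  # closing quote
            else:
                buf.append(ch)     # separators inside quotes are literal text
        elif ch == quote:
            in_quotes = quoted = True
        elif ch == sep:
            fields.append("".join(buf) if buf or quoted else None)
            buf, quoted = [], False
        else:
            buf.append(ch)
        i += 1
    fields.append("".join(buf) if buf or quoted else None)
    return fields


def to_event(line: str, columns: List[str]) -> Dict[str, Optional[str]]:
    """Pair values with configured names; values past the end get positional names."""
    event: Dict[str, Optional[str]] = {}
    for i, value in enumerate(split_csv_line(line)):
        # Use the 1-based position for unnamed values, mirroring "column110" in the output.
        name = columns[i] if i < len(columns) else f"column{i + 1}"
        event[name] = value
    return event


if __name__ == "__main__":
    cols = ["Year", "Origin", "OriginCityName", "CancellationCode"]
    print(to_event('2017,"PDX","Portland, OR","",', cols))
    # prints: {'Year': '2017', 'Origin': 'PDX', 'OriginCityName': 'Portland, OR', 'CancellationCode': '', 'column5': None}
```

With the 109 configured names and a trailing comma, the same logic yields a 110th field named column110 with no value, matching the event above.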

As I was writing this, I thought I’d play with the autodetect_column_names setting. Unfortunately, it wasn’t an option for this particular file. Logstash threw the error :exception=>java.lang.ArrayIndexOutOfBoundsException: -1, which leads me to guess that my file is too wide for this setting. This file is staggeringly wide, with 109 columns. If you have a narrower file, this could be a really cool option: if someone adds or removes a column from the CSV, your configuration will be a lot easier to maintain. Alas, it’s not an option in this situation.

[2018-02-19T10:36:38,386][WARN ][logstash.filters.csv ] Error parsing csv {:field=>"message", :source=>"\"Year\",\"Quarter\",\"Month\",\"DayofMonth\",\"DayOfWeek\",\"FlightDate\",\"UniqueCarrier\",\"AirlineID\",\"Carrier\",\"TailNum\",\"FlightNum\",\"OriginAirportID\",\"OriginAirportSeqID\",\"OriginCityMarketID\",\"Origin\",\"OriginCityName\",\"OriginState\",\"OriginStateFips\",\"OriginStateName\",\"OriginWac\",\"DestAirportID\",\"DestAirportSeqID\",\"DestCityMarketID\",\"Dest\",\"DestCityName\",\"DestState\",\"DestStateFips\",\"DestStateName\",\"DestWac\",\"CRSDepTime\",\"DepTime\",\"DepDelay\",\"DepDelayMinutes\",\"DepDel15\",\"DepartureDelayGroups\",\"DepTimeBlk\",\"TaxiOut\",\"WheelsOff\",\"WheelsOn\",\"TaxiIn\",\"CRSArrTime\",\"ArrTime\",\"ArrDelay\",\"ArrDelayMinutes\",\"ArrDel15\",\"ArrivalDelayGroups\",\"ArrTimeBlk\",\"Cancelled\",\"CancellationCode\",\"Diverted\",\"CRSElapsedTime\",\"ActualElapsedTime\",\"AirTime\",\"Flights\",\"Distance\",\"DistanceGroup\",\"CarrierDelay\",\"WeatherDelay\",\"NASDelay\",\"SecurityDelay\",\"LateAircraftDelay\",\"FirstDepTime\",\"TotalAddGTime\",\"LongestAddGTime\",\"DivAirportLandings\",\"DivReachedDest\",\"DivActualElapsedTime\",\"DivArrDelay\",\"DivDistance\",\"Div1Airport\",\"Div1AirportID\",\"Div1AirportSeqID\",\"Div1WheelsOn\",\"Div1TotalGTime\",\"Div1LongestGTime\",\"Div1WheelsOff\",\"Div1TailNum\",\"Div2Airport\",\"Div2AirportID\",\"Div2AirportSeqID\",\"Div2WheelsOn\",\"Div2TotalGTime\",\"Div2LongestGTime\",\"Div2WheelsOff\",\"Div2TailNum\",\"Div3Airport\",\"Div3AirportID\",\"Div3AirportSeqID\",\"Div3WheelsOn\",\"Div3TotalGTime\",\"Div3LongestGTime\",\"Div3WheelsOff\",\"Div3TailNum\",\"Div4Airport\",\"Div4AirportID\",\"Div4AirportSeqID\",\"Div4WheelsOn\",\"Div4TotalGTime\",\"Div4LongestGTime\",\"Div4WheelsOff\",\"Div4TailNum\",\"Div5Airport\",\"Div5AirportID\",\"Div5AirportSeqID\",\"Div5WheelsOn\",\"Div5TotalGTime\",\"Div5LongestGTime\",\"Div5WheelsOff\",\"Div5TailNum\",", 
:exception=>java.lang.ArrayIndexOutOfBoundsException: -1}
[2018-02-19T10:36:38,386][WARN ][logstash.filters.csv ] Error parsing csv {:field=>"message", :source=>"2017,4,12,1,5,2017-12-01,\"AS\",19930,\"AS\",\"N615AS\",\"951\",14893,1489302,33192,\"SMF\",\"Sacramento, CA\",\"CA\",\"06\",\"California\",91,14057,1405702,34057,\"PDX\",\"Portland, OR\",\"OR\",\"41\",\"Oregon\",92,\"0920\",\"0905\",-15.00,0.00,0.00,-1,\"0900-0959\",13.00,\"0918\",\"1036\",3.00,\"1105\",\"1039\",-26.00,0.00,0.00,-2,\"1100-1159\",0.00,\"\",0.00,105.00,94.00,78.00,1.00,479.00,2,,,,,,\"\",,,0,,,,,\"\",,,\"\",,,\"\",\"\",\"\",,,\"\",,,\"\",\"\",\"\",,,\"\",,,\"\",\"\",\"\",,,\"\",,,\"\",\"\",\"\",,,\"\",,,\"\",\"\",", :exception=>java.lang.ArrayIndexOutOfBoundsException: -1}
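One thing that may be worth ruling out before writing the option off: the csv filter documentation states that autodetect_column_names only works with a single pipeline worker (for example, bin/logstash -w 1 -f ../flights/flights.conf), and the earlier run started with four workers. Whether that explains this particular exception isn't clear. The toy Python model below (AutodetectColumns is a hypothetical class, not Logstash code) shows why the setting depends on ordering: whichever line reaches the filter first becomes the header, and with several workers, events aren't guaranteed to arrive in file order.

```python
import csv
from typing import Dict, List, Optional


class AutodetectColumns:
    """Toy model of header autodetection: the first line seen supplies the names."""

    def __init__(self) -> None:
        self.columns: Optional[List[str]] = None

    def process(self, line: str) -> Optional[Dict[str, str]]:
        values = next(csv.reader([line]))
        if self.columns is None:
            # First line seen: treat it as the header and emit no event.
            self.columns = values
            return None
        # zip() stops at the shorter list, so any extra values are dropped in this model.
        return dict(zip(self.columns, values))


if __name__ == "__main__":
    in_order = AutodetectColumns()
    for line in ['"Year","Origin"', '2017,"PDX"']:
        print(in_order.process(line))

    # If a data row happens to arrive first, every later row is mislabeled.
    out_of_order = AutodetectColumns()
    for line in ['2017,"PDX"', '"Year","Origin"']:
        print(out_of_order.process(line))
```

In order, the second line becomes {'Year': '2017', 'Origin': 'PDX'}; out of order, the header row itself gets labeled with the data values. A single worker keeps events in file order, which is presumably why the documentation asks for it.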

Even without autodetecting the column names, I’m certainly on the path to getting this right. My challenge here is that I have a ton of data that I know I’ll never use. Next time, we’ll trim some fat.