Friday, October 12, 2012

CSV Splitter or Filter with Talend Java

The Data Team was in need of a nonexistent Talend behavior.

Something that we could call the CSVSplitter or the CSVFilter: a component that takes a CSV file and outputs only the rows whose lookup column matches certain content.

Of course, you might think at first of combining tFileInputDelimited with a tFilterRow, but that does not work if you do not know the schema. This use case needs a schemaless or dynamic-schema component.

Jump directly to the last section to learn how to get this done from a component, or read on to understand how you can do this from tJavaFlex and later build your own component with similar code.

Here is a project that shows this proof of concept. It parses an inputFile using a delimiter, and outputs only the lines where the lookupColumn has a specific lookupValue (four parameters). Below is a screenshot of the POC. I needed to use the tFilterRow because tJavaFlex will output blank lines when there is no output from code:

This approach has a big advantage: instead of having to create a job, subjob, or project for every schema you need to parse, a single job can take care of all your CSV splitting or filtering needs.

You can test the project with the file below:
person| city
Paul| Miami
John| Boston
Mathew| San Francisco
Craig| Miami
Change the lookupColumn between person and city and change the lookupValue to see how it filters the rows. Change the delimiter to test that as well.
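
To try the filtering idea outside Talend, here is a minimal stand-alone sketch. It is not the code of the job: the class name CsvFilterSketch and the filter method are hypothetical, it splits lines with plain String.split instead of a CSV library, and it assumes fields are never quoted and never contain the delimiter. It also always emits the header first. The point it illustrates is that the lookup column is resolved from the header at run time, so no schema has to be declared.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical stand-alone sketch; not part of the Talend job.
class CsvFilterSketch {

    // Returns the header line plus every data line whose lookupColumn equals lookupValue.
    // Assumption: fields are not quoted and never contain the delimiter.
    static List<String> filter(List<String> lines, char delimiter,
                               String lookupColumn, String lookupValue) {
        List<String> result = new ArrayList<>();
        if (lines.isEmpty()) {
            return result;
        }
        // Quote the delimiter so characters such as '|' are not treated as regex syntax.
        String splitRegex = Pattern.quote(String.valueOf(delimiter));
        // Resolve the column position from the header at run time: no schema needed.
        String[] headers = lines.get(0).split(splitRegex, -1);
        int index = -1;
        for (int i = 0; i < headers.length; i++) {
            if (headers[i].trim().equals(lookupColumn)) {
                index = i;
                break;
            }
        }
        if (index < 0) {
            throw new IllegalArgumentException("Unknown column: " + lookupColumn);
        }
        result.add(lines.get(0));
        for (String line : lines.subList(1, lines.size())) {
            String[] fields = line.split(splitRegex, -1);
            if (index < fields.length && fields[index].trim().equals(lookupValue)) {
                result.add(line); // keep the raw line untouched
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "person| city", "Paul| Miami", "John| Boston",
            "Mathew| San Francisco", "Craig| Miami");
        // Prints the header followed by the two Miami rows.
        for (String line : filter(sample, '|', "city", "Miami")) {
            System.out.println(line);
        }
    }
}
```

Switching the arguments to "person" and "John" returns the header and the single Boston row, which mirrors the experiment suggested above.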

Below is the code for the import, begin, main and end sections, with one additional requirement: parsing starts at a given row (counted from 0) where the header is expected to be.

Import:
import com.csvreader.CsvReader;
import java.io.ByteArrayInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.FileReader;
import java.io.InputStream;
import java.io.InputStreamReader;
Begin:
BufferedReader reader = new BufferedReader(new FileReader(context.inputFile));
ByteArrayOutputStream out = new ByteArrayOutputStream();
int rowNumber = 0;
String line = null;
while ((line = reader.readLine()) != null) {
  if(rowNumber >= context.headerRowNumber) {
    out.write((line + "\n").getBytes());
  }
  rowNumber++;
}
        
InputStream is = new ByteArrayInputStream(out.toByteArray());
CsvReader csvReader = new CsvReader(new InputStreamReader(is));
char delimiter = context.delimiter.charAt(0);
char textQualifier = csvReader.getTextQualifier();
csvReader.setDelimiter(delimiter);
csvReader.readHeaders();

String[] headers = csvReader.getHeaders();
StringBuilder sb = new StringBuilder();
for(int i = 0; i < headers.length; i++ ) {
  String header = headers[i];
  sb.append(textQualifier + header + textQualifier);
  if( i != headers.length - 1 ) {
    sb.append(delimiter);
  }
}
//System.out.println(sb);
int i = 0;
while (csvReader.readRecord()) {
Main:
String lookupValue = csvReader.get(context.lookupColumn);
//System.out.println("'" + context.lookupColumn + "'|'" + context.lookupValue + "'|'" + lookupValue + "'");
if(lookupValue.equals(context.lookupValue)) {
  //System.out.println(csvReader.getRawRecord());
  if( i == 0 ) {
    // First matching record: emit the rebuilt header line ahead of it
    row2.line = sb.toString() + "\n" + csvReader.getRawRecord();
  } else {
    row2.line = csvReader.getRawRecord();
  }
  // Count matches only, so the header is still emitted when the first record does not match
  i++;
}
End:
}
csvReader.close(); 
out.close();
reader.close();
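
The Begin code above copies the file into an in-memory buffer just to drop the lines before the header. For large files, one possible alternative is to consume those lines directly on the BufferedReader and keep reading from the same reader afterwards. The sketch below shows only that skipping step. HeaderSkipSketch and skipLines are hypothetical names, and the input is an inline string rather than context.inputFile. Since the job already builds its CsvReader from a Reader, the positioned reader could presumably be handed to it, but that is untested here.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Hypothetical sketch of skipping the rows that precede the header.
class HeaderSkipSketch {

    // Consumes up to headerRowNumber lines and returns how many were actually skipped.
    static int skipLines(BufferedReader reader, int headerRowNumber) throws IOException {
        int skipped = 0;
        while (skipped < headerRowNumber && reader.readLine() != null) {
            skipped++;
        }
        return skipped;
    }

    public static void main(String[] args) throws IOException {
        // Two junk lines (one of them empty) sit in front of the header row.
        String content = "report generated by some tool\n\nperson| city\nPaul| Miami\n";
        BufferedReader reader = new BufferedReader(new StringReader(content));
        skipLines(reader, 2);                  // headerRowNumber = 2, counted from 0
        System.out.println(reader.readLine()); // the reader now sits on the header row
        reader.close();
    }
}
```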

Putting it all in a Talend Component

I have built a Talend component that encapsulates the logic presented here. It is included in a GitHub project that contains a tutorial on how to build custom Talend components.

To use it, you just need to configure the component to parse a file like the one above. Look at the picture below for a usage example:
