Wednesday, May 25, 2016

Validating JSON in NiFi with ExecuteScript

My last post alluded to a Groovy script for ExecuteScript that would use JSON Schema Validator to validate incoming flow files (in JSON format) against a JSON Schema. The purpose of that post was to show how to use Groovy Grape to get the JSON Schema Validator dependencies loaded dynamically by the script (versus downloading the JARs and adding them to the Module Directory property).  For this post I want to show the actual JSON validation script (as I presented it on the Apache NiFi "users" mailing list).

I'll use the schema as it was presented to me on the mailing list:
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": { "type": "object"}
  }
}
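This shows that the incoming flow file should contain a JSON object, that it must have certain fields (the "required" values), and what types those values must have (the "properties" entries). One caveat: in JSON Schema the "items" keyword applies to arrays, so on the "tags" object it does not constrain anything; "additionalProperties" is the keyword that would restrict the tag values to strings. I've left the schema as it was given to me. For this script I'll hard-code the schema, but at the end I'll talk a bit about how it could be supplied dynamically for a better user experience.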

Since the schema itself is JSON, we use org.json.JSONObject and org.json.JSONTokener to read it in, then org.everit.json.schema.loader.SchemaLoader to load it as a Schema. We read the flow file with session.read, passing in a closure cast to InputStreamCallback (see my previous post for details), and call schema.validate(). If the JSON is not valid, validate() throws a ValidationException. When that happens, I set a "valid" variable to false, then route to SUCCESS or FAILURE depending on whether the incoming flow file validated against the schema.  The original script is as follows:
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

flowFile = session.get()
if(!flowFile) return

jsonSchema = """
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": { "type": "object"}
  }
}
"""

boolean valid = true
session.read(flowFile, { inputStream ->
   // Read the flow file content as a UTF-8 string
   jsonInput = org.apache.commons.io.IOUtils.toString(inputStream, java.nio.charset.StandardCharsets.UTF_8)
   // Parse the hard-coded schema text and load it into a Schema object
   JSONObject rawSchema = new JSONObject(new JSONTokener(new ByteArrayInputStream(jsonSchema.bytes)))
   Schema schema = SchemaLoader.load(rawSchema)
   try {
      schema.validate(new JSONObject(jsonInput))
   } catch(ve) {
      // Any exception here (a schema violation or unparseable JSON) marks the flow file invalid
      log.error("Doesn't adhere to schema", ve)
      valid = false
   }
} as InputStreamCallback)

session.transfer(flowFile, valid ? REL_SUCCESS : REL_FAILURE)

This is a pretty basic script, and there are several things we could do to improve it:
  • Move the schema load out of the session.read() callback, since loading the schema doesn't depend on the flow file content
  • Allow the user to specify the schema via a dynamic property
  • Do better exception handling and error message reporting
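Here is a rough sketch of what those three changes might look like while staying in ExecuteScript. It rests on some assumptions: "schemaText" is a hypothetical dynamic property name that I made up for this example, the "json.validation.error" attribute name is likewise just an illustration, and your NiFi version has to be one where ExecuteScript binds dynamic properties as script variables (as PropertyValue objects). The everit and org.json classes still need to be available via @Grab or the Module Directory.

```groovy
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.everit.json.schema.Schema
import org.everit.json.schema.ValidationException
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONException
import org.json.JSONObject
import org.json.JSONTokener
import java.nio.charset.StandardCharsets

def flowFile = session.get()
if (!flowFile) return

// Load the schema before touching the content. 'schemaText' is a hypothetical
// dynamic property added to the processor; it is assumed to be bound as a PropertyValue.
Schema schema
try {
    schema = SchemaLoader.load(new JSONObject(new JSONTokener(schemaText.value)))
} catch (e) {
    log.error('Could not load a JSON Schema from the schemaText property', e)
    session.transfer(flowFile, REL_FAILURE)
    return
}

// Collect the reason for a failure so it can be reported after the read completes
String error = null
session.read(flowFile, { inputStream ->
    def json = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    try {
        schema.validate(new JSONObject(json))
    } catch (ValidationException ve) {
        error = "Schema violation: ${ve.message}"
    } catch (JSONException je) {
        error = "Content is not a JSON object: ${je.message}"
    }
} as InputStreamCallback)

if (error) {
    log.error(error)
    // Attribute name is an illustrative choice, not a NiFi convention
    flowFile = session.putAttribute(flowFile, 'json.validation.error', error)
    session.transfer(flowFile, REL_FAILURE)
} else {
    session.transfer(flowFile, REL_SUCCESS)
}
```

Separating the two exception types lets the log and the attribute say whether the content was not JSON at all or was JSON that broke the schema.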
A worthwhile improvement (that would include all of these) is to turn the script into a proper Processor and run it with InvokeScriptedProcessor. That way you could define a custom set of relationships and properties, making it easy for the user to configure and use.
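To give an idea of the shape such a scripted processor could take, here is a minimal, untested sketch for InvokeScriptedProcessor. The class name, the "valid"/"invalid" relationship names, and the "JSON Schema" property are all names I picked for illustration, and the JSON Schema Validator classes are assumed to be on the script's classpath already.

```groovy
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.ValidationContext
import org.apache.nifi.components.ValidationResult
import org.apache.nifi.processor.ProcessContext
import org.apache.nifi.processor.ProcessSessionFactory
import org.apache.nifi.processor.Processor
import org.apache.nifi.processor.ProcessorInitializationContext
import org.apache.nifi.processor.Relationship
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.util.StandardValidators
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

class JsonSchemaValidator implements Processor {
    // Relationship and property names below are illustrative choices
    static final Relationship REL_VALID = new Relationship.Builder()
        .name('valid').description('Content adheres to the schema').build()
    static final Relationship REL_INVALID = new Relationship.Builder()
        .name('invalid').description('Content violates the schema or is not JSON').build()
    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
        .name('JSON Schema').description('The JSON Schema to validate against')
        .required(true).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build()

    def log

    void initialize(ProcessorInitializationContext context) { log = context.logger }
    Set<Relationship> getRelationships() { [REL_VALID, REL_INVALID] as Set }
    List<PropertyDescriptor> getPropertyDescriptors() { [SCHEMA] }
    PropertyDescriptor getPropertyDescriptor(String name) { name == SCHEMA.name ? SCHEMA : null }
    Collection<ValidationResult> validate(ValidationContext context) { [] }
    void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) { }
    String getIdentifier() { null }

    void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        def session = sessionFactory.createSession()
        try {
            def flowFile = session.get()
            if (flowFile) {
                // The schema comes from the processor property rather than being hard-coded
                def schemaText = context.getProperty(SCHEMA).value
                def schema = SchemaLoader.load(new JSONObject(new JSONTokener(schemaText)))
                def failure = null
                session.read(flowFile, { inputStream ->
                    try {
                        schema.validate(new JSONObject(inputStream.getText('UTF-8')))
                    } catch (e) {
                        failure = e
                    }
                } as InputStreamCallback)
                if (failure) log.error('Flow file failed JSON Schema validation', failure)
                session.transfer(flowFile, failure ? REL_INVALID : REL_VALID)
            }
            session.commit()
        } catch (e) {
            // Undo any partial work, penalize the flow file, and let the framework see the error
            session.rollback(true)
            throw e
        }
    }
}

// InvokeScriptedProcessor looks for a variable named "processor"
processor = new JsonSchemaValidator()
```

Unlike ExecuteScript, the script here manages its own session, which is why the commit and rollback calls appear explicitly.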

Of course, the best solution is probably to implement it in Java and contribute it to Apache NiFi under the Jira case NIFI-1893 :)

Cheers!

Tuesday, May 24, 2016

Using Groovy @Grab with ExecuteScript

As we've seen in a previous post, it is possible to add dependencies/modules to your ExecuteScript classpath by using the Module Directory property in the processor configuration dialog.  However, many third-party libraries have (sometimes lots of) transitive dependencies, and downloading all of them and/or setting up your Module Directory can become a pain.

The Groovy world has a solution for such a thing, using Groovy Grape and specifically the @Grab annotation. This instructs Groovy to download the artifact(s) and their dependencies into the Grape cache. It works much like Maven or Ivy, and in fact it is based on Ivy. For this reason, ExecuteScript (when using the Groovy engine) needs the Apache Ivy JAR.

You may be asking why the Ivy JAR is not included with the scripting NAR that has the InvokeScriptedProcessor and ExecuteScript processors in it, or whether we can use the Module Directory to point at the Ivy JAR and be on our way.  I haven't been able to verify this in the Groovy code, but by experimentation it appears that Grape uses the application class loader, and not the current thread context's class loader, to find the Ivy classes it needs. This means the Ivy JAR needs to be in the "original" NiFi classpath (i.e. in the lib/ folder of NiFi) rather than in the NAR or specified in the Module Directory property.

You can download the Apache Ivy JAR here (I tested with Ivy 2.4.0), and place it in your NiFi distribution under lib/, then restart NiFi.
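If you want to confirm the JAR landed where it can be found, a quick diagnostic along these lines can be run once in ExecuteScript. This is only a sketch: it assumes the JARs in lib/ end up on the JVM's system (application) class path, which matches what I observed above but which I have not verified in the NiFi code.

```groovy
// One-off diagnostic: can the application class loader see Apache Ivy?
try {
    // Look up the class without initializing it, using the system class loader only
    Class.forName('org.apache.ivy.Ivy', false, ClassLoader.systemClassLoader)
    log.info('Apache Ivy is visible to the application class loader')
} catch (ClassNotFoundException e) {
    log.warn('Apache Ivy is not visible to the application class loader; ' +
             'check that the Ivy JAR is in lib/ and that NiFi was restarted')
}
```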

Now we can get to the script part :)  Let's say we want to write a Groovy script for ExecuteScript to validate an incoming flow file in JSON format against a JSON Schema. There is a good JSON Schema Validator library for this on GitHub. In this case, the library only has a couple of dependencies, and the only one not available to ExecuteScript is org.json:json, so it wouldn't be a real pain to use Module Directory here, but I wanted to keep the example simple to show the general idea.  We use @Grab to get this dependency (and its transitive dependencies), then import the classes we will end up using to do the JSON Schema validation:
@Grab(group='org.everit.json', module='org.everit.json.schema', version='1.3.0')
import org.everit.json.schema.Schema
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

// Rest of code here
If you don't already have all the artifacts in your Grapes cache, then the very first time this script runs it will be much slower, as it has to download each artifact and its transitive dependencies. Of course, this means your NiFi instance has to be able to reach the internet, and Grape will need to be configured properly for your machine. In many cases the default settings are fine, but check out the Grape documentation as well as the default grapeConfig.xml settings (note that this is an Ivy settings file). NOTE: The default location for the Grapes cache is under the user's home directory in ~/.groovy/grapes. If you are running NiFi as a particular user, you will want to ensure that user has permission to write to that directory.

An alternative to the possibly-long-download solution is to pre-install the grapes on the machine manually. If Groovy is installed on the machine itself (versus the one supplied with ExecuteScript) you can do the following (see the doc for more details):
grape install <groupid> <artifactid> [<version>]
to install the grapes into the cache. Then ExecuteScript with @Grab will be faster out of the gate.
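With Groovy installed locally, you can also check that Grape resolves the dependency and that the validator behaves as expected before involving NiFi at all. The following standalone script is a sketch of that idea; the two sample documents are made up for illustration, and it needs Ivy on Groovy's classpath (a normal Groovy install has it) plus either network access or a populated cache.

```groovy
@Grab(group='org.everit.json', module='org.everit.json.schema', version='1.3.0')
import org.everit.json.schema.Schema
import org.everit.json.schema.ValidationException
import org.everit.json.schema.loader.SchemaLoader
import org.json.JSONObject
import org.json.JSONTokener

String schemaJson = '''
{
  "type": "object",
  "required": ["name", "tags", "timestamp", "fields"],
  "properties": {
    "name": {"type": "string"},
    "timestamp": {"type": "integer"},
    "tags": {"type": "object", "items": {"type": "string"}},
    "fields": { "type": "object"}
  }
}
'''

Schema schema = SchemaLoader.load(new JSONObject(new JSONTokener(schemaJson)))

// Returns null when the document is valid, otherwise the validator's message
def check = { String doc ->
    try {
        schema.validate(new JSONObject(doc))
        return null
    } catch (ValidationException ve) {
        return ve.message
    }
}

// Made-up sample documents: the second has a string where an integer is required
def good = '{"name": "cpu", "timestamp": 1464134400, "tags": {"host": "a"}, "fields": {"load": 0.5}}'
def bad = '{"name": "cpu", "timestamp": "yesterday", "tags": {}, "fields": {}}'

assert check(good) == null
assert check(bad) != null
println "Invalid document rejected: ${check(bad)}"
```

Running it once as the same user that runs NiFi also populates that user's ~/.groovy/grapes cache.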

Hopefully this post has offered a way to make prototyping new capabilities with ExecuteScript and Groovy a little faster and easier.

Cheers!