Spark-NLP

Antonio Ercole De Luca
2 min read · Dec 10, 2019

Today I played a bit with Spark NLP. The idea is to use it to extract relationships between named entities and their part-of-speech tags in a very scalable way.

First, I installed Spark NLP and Python 3 (with a Jupyter Notebook), and used Apache Spark 2.4.0.
I solved a configuration problem by setting the JAVA_HOME environment variable:

export JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64/"
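
The same variable can also be set from inside the notebook with Python, before the Spark session is created; a minimal sketch, reusing the JDK path above (adjust it to your installation):

import os

# Set JAVA_HOME before the SparkSession is created; the path below is the
# same Java 8 location as above and may differ on your machine.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/"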

Using Python, I imported all the required classes and functions with:

import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.types import StructType, StructField
from sparknlp import DocumentAssembler
from sparknlp.annotator import SentenceDetector, Tokenizer, WordEmbeddingsModel, PerceptronModel, NerCrfModel

I started the Spark session, specifying the spark-nlp package:

spark = SparkSession.builder \
    .appName("ex1") \
    .master("local[*]") \
    .config("spark.driver.memory", "8G") \
    .config("spark.driver.maxResultSize", "2G") \
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:2.3.1") \
    .config("spark.kryoserializer.buffer.max", "500m") \
    .getOrCreate()
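
Depending on the installed version, the spark-nlp Python package also provides a small helper that builds an equivalent session; a minimal sketch, assuming sparknlp.start() is available in your version:

import sparknlp

# Convenience helper from the spark-nlp Python package (assumed available in
# the installed version): returns a SparkSession already configured with the
# spark-nlp dependency.
spark = sparknlp.start()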

I read the textual dataset with:

parts = spark.read.parquet("data/part*.snappy.parquet")
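
If this Parquet dataset is not available, any DataFrame with a single text column works as a stand-in; a minimal sketch with made-up sentences (the column name matches what the DocumentAssembler below expects):

# Hypothetical replacement for the Parquet dataset: a tiny DataFrame with a
# "text" column, enough to run the pipeline end to end.
parts = spark.createDataFrame(
    [("John Smith works at Google in London.",),
     ("The Eiffel Tower was built in Paris in 1889.",)],
    ["text"])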

Then, I created a Spark ML Pipeline using the following annotators:

  • DocumentAssembler
  • SentenceDetector
  • Tokenizer
  • WordEmbeddingsModel (word embeddings, GloVe)
  • PerceptronModel (part of speech)
  • NerCrfModel (named entity recognition)

documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

sentenceDetector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

wordEmbeddingsModel = WordEmbeddingsModel \
    .pretrained() \
    .setInputCols(["document", "token"]) \
    .setOutputCol("word_embeddings")

perceptronModel = PerceptronModel \
    .pretrained() \
    .setInputCols(["token", "document"]) \
    .setOutputCol("pos")

nerCrfModel = NerCrfModel \
    .pretrained() \
    .setInputCols(["document", "token", "pos", "word_embeddings"]) \
    .setOutputCol("ne")

pipeline = Pipeline() \
    .setStages([
        documentAssembler,
        sentenceDetector,
        tokenizer,
        wordEmbeddingsModel,
        perceptronModel,
        nerCrfModel,
    ])

df = pipeline.fit(parts).transform(parts)

I printed the transformed DataFrame, keeping only the token, Named Entity Recognition (NER), and Part of Speech (POS) result columns.

tok_ne_pos_df = df \
    .select("token.result", "ne.result", "pos.result") \
    .toDF("token_result_arr", "ne_result_arr", "pos_result_arr")

tok_ne_pos_df.show()

The result attributes of NER and POS are arrays with one element per token; I want to relate each named entity that was found to its part-of-speech tag. To keep the code as efficient as possible, I tried to stick to Spark ML and UDFs only, flattening the arrays into one row per token (as sketched below) before filtering and grouping.
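
A minimal sketch of that flattening step (one possible implementation, not necessarily the original one; the output column names are chosen to match the filter below and use the ArrayType/StructType imports from the top):

# One possible flattening step: zip the three per-document arrays into an
# array of (token, ne, pos) structs with a UDF, then explode it so that each
# row holds a single token together with its NER and POS tags.
zip_results = F.udf(
    lambda tokens, nes, poss: list(zip(tokens, nes, poss)),
    ArrayType(StructType([
        StructField("token_result", StringType()),
        StructField("ne_result", StringType()),
        StructField("pos_result", StringType()),
    ])))

tok_ne_pos_df = tok_ne_pos_df \
    .select(F.explode(zip_results(
        "token_result_arr", "ne_result_arr", "pos_result_arr")).alias("t")) \
    .select("t.token_result", "t.ne_result", "t.pos_result")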

A final filter and aggregation before printing the results:

# exclude tokens that are not part of any named entity
tok_ne_pos_df = tok_ne_pos_df.filter(F.col("ne_result") != 'O')

tok_ne_pos_df \
    .groupBy("ne_result", "pos_result") \
    .count() \
    .sort(F.col("count").desc()) \
    .show()

Here’s the output:

+---------+----------+-----+
|ne_result|pos_result|count|
+---------+----------+-----+
|    I-PER|       NNP| 1445|
|    I-LOC|       NNP| 1022|
|    I-ORG|       NNP|  962|
|   I-MISC|       NNP|  390|
|   I-MISC|        JJ|  157|
|    I-ORG|        IN|   23|
|    I-ORG|        JJ|   15|
|    I-ORG|        CC|   12|
|    I-ORG|      NNPS|   10|
|   I-MISC|      NNPS|   10|
|    I-PER|        NN|    7|
|    I-LOC|      NNPS|    6|
|   I-MISC|        CD|    6|
|    I-ORG|        NN|    5|
|    I-ORG|        CD|    4|
|    I-ORG|        DT|    3|
|   I-MISC|        NN|    3|
|    I-ORG|       NNS|    3|
|    I-ORG|         (|    2|
|    I-ORG|         )|    2|
+---------+----------+-----+
only showing top 20 rows

As one would expect, the named entities found are overwhelmingly tagged as proper nouns (NNP). More documentation and examples can be found on the Spark NLP website and in its GitHub repository.
