Elasticsearch Analyze() Not Compatible With Spark In Python?
Solution 1:
The Elasticsearch client is not serializable, so Spark cannot pickle it on the driver and ship it to the executors. Instead, create one client instance per partition and use it to process that partition's elements:
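from elasticsearch import Elasticsearch
def get_tokens(part):
    # Built inside the partition, so the client is never pickled (older clients default to localhost:9200)
    es = Elasticsearch()
    # One analyze call per element; keep only the first token of each result
    yield [es.indices.analyze(text=x)['tokens'][0] for x in part]
# sc: the existing SparkContext, e.g. the one the pyspark shell provides
rdd = sc.parallelize([['the quick brown fox'], ['brown quick dog']], numSlices=2)
rdd.mapPartitions(get_tokens).collect()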
This should give the following result:
Out[17]:
[[{u'end_offset': 3,
   u'position': 1,
   u'start_offset': 0,
   u'token': u'the',
   u'type': u'<ALPHANUM>'}],
 [{u'end_offset': 5,
   u'position': 1,
   u'start_offset': 0,
   u'token': u'brown',
   u'type': u'<ALPHANUM>'}]]
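To see why the client has to be created inside the partition function, here is a Spark-free sketch. FakeClient is a hypothetical stand-in for a network client: like real clients it holds state that pickle cannot handle (here a threading.Lock), so pickling it fails, whereas a function that builds its own client per partition never needs to pickle one. The list comprehension at the end only simulates rdd.mapPartitions(get_tokens).collect() in plain Python; it is not how Spark actually runs the job.

```python
import pickle
import threading


class FakeClient:
    """Hypothetical stand-in for a network client. Like many real clients it
    holds a lock (and, in real life, sockets), which pickle cannot serialize."""

    def __init__(self):
        self._lock = threading.Lock()
        self.calls = 0

    def analyze(self, text):
        # Pretend analysis: whitespace tokenization, counting each "REST call"
        with self._lock:
            self.calls += 1
        return {"tokens": [{"token": t} for t in text.split()]}


def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


clients_created = []


def get_tokens(part):
    # One client per partition, created where the data is processed,
    # so the client itself never has to be serialized.
    client = FakeClient()
    clients_created.append(client)
    yield [client.analyze(x)["tokens"][0] for x in part]


# Plain-Python simulation of rdd.mapPartitions(get_tokens).collect()
partitions = [["the quick brown fox"], ["brown quick dog"]]
result = [out for part in partitions for out in get_tokens(iter(part))]

print(is_picklable(FakeClient()))  # False
print(result)  # [[{'token': 'the'}], [{'token': 'brown'}]]
```

Each simulated partition gets its own FakeClient, mirroring how the real get_tokens creates one Elasticsearch client per partition on the executor.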
Note that for large datasets this approach will be very inefficient, because it makes a separate REST call to Elasticsearch for every element in the dataset.
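One way to reduce the number of round trips, sketched under an assumption: if you have some way to analyze a whole list of texts in a single request (the analyze_batch callable below is hypothetical, not an elasticsearch-py method), you can chunk each partition and make one call per chunk instead of one per element. The chunking helper is plain Python and works on the iterator that mapPartitions passes in; the stub at the bottom only counts calls.

```python
from itertools import islice


def batches(iterable, size):
    """Yield successive lists of at most `size` items from any iterable."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def analyze_partition_batched(part, analyze_batch, size=100):
    # `analyze_batch` is a hypothetical callable that analyzes a whole list
    # of texts in one request and returns one result per input text.
    for chunk in batches(part, size):
        yield from analyze_batch(chunk)


# Usage with a stub that records how many texts each "request" carried
calls = []


def fake_analyze_batch(texts):
    calls.append(len(texts))
    return [t.split() for t in texts]


texts = ["a b", "c d", "e f", "g h", "i j"]
out = list(analyze_partition_batched(texts, fake_analyze_batch, size=2))
print(calls)  # [2, 2, 1]
```

With Spark this would be wired up as rdd.mapPartitions(lambda p: analyze_partition_batched(p, my_batch_fn)), where my_batch_fn (also hypothetical) creates or reuses a client inside the partition. Five elements now cost three requests instead of five; larger chunk sizes cut the count further.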