
Scala Examples for org.elasticsearch.common.settings.Settings

The following examples show how to use org.elasticsearch.common.settings.Settings. They are extracted from open source projects.

Example 1

import java.nio.file.Files

import org.apache.commons.io.FileUtils
import org.apache.spark.SparkContext
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.node.NodeBuilder
import org.apache.spark.elasticsearch._

object Tryout {
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext("local[2]", "SparkES")

    val dataDir = Files.createTempDirectory("elasticsearch").toFile
    dataDir.deleteOnExit()

    val settings = Settings.settingsBuilder()
      .put("path.home", dataDir.getAbsolutePath)
      .put("path.logs", s"${dataDir.getAbsolutePath}/logs")
      .put("path.data", s"${dataDir.getAbsolutePath}/data")
      .put("index.store.fs.memory.enabled", true)
      .put("index.number_of_shards", 1)
      .put("index.number_of_replicas", 0)
      .put("cluster.name", "SparkES")
      .build()

    val node = NodeBuilder.nodeBuilder().settings(settings).node()
    val client = node.client()

    sparkContext
      .parallelize(Seq(
        ESDocument(ESMetadata("2", "type1", "index1"), """{"name": "John Smith"}"""),
        ESDocument(ESMetadata("1", "type1", "index1"), """{"name": "Sergey Shumov"}""")
      ), 2)
      .saveToES(Seq("localhost"), "SparkES")

    client.admin().cluster().prepareHealth("index1").setWaitForGreenStatus().get()

    val documents = sparkContext.esRDD(
      Seq("localhost"), "SparkES", Seq("index1"), Seq("type1"), "name:sergey")

    println(documents.count())
    documents.foreach(println)

    sparkContext.stop()

    client.close()
    node.close()

    FileUtils.deleteQuietly(dataDir)
  }
}

Example 2

package org.apache.spark.elasticsearch

import org.elasticsearch.common.settings.Settings
import org.scalatest.FunSuite

class Tests extends FunSuite with SparkSuite with ElasticSearchSuite {
  test("Reads documents from multiple shards") {
    val client = es.client
    val indexName = "index-with-multiple-shards"

    client.admin().indices().prepareCreate(indexName)
      .setSettings(Settings.settingsBuilder()
        .put("index.number_of_replicas", 0)
        .put("index.number_of_shards", 2)
        .build()
      )
      .get()

    for (i <- 1 to 1000) {
      client.prepareIndex(indexName, "foo", i.toString).setSource("{}").get()
    }

    client.admin().cluster().prepareHealth(indexName).setWaitForGreenStatus().get()
    client.admin().indices().prepareRefresh(indexName).get()

    val rdd = sparkContext.esRDD(Seq("localhost"), es.clusterName, Seq(indexName), Seq("foo"), "*")

    assert(rdd.partitions.length == 2)
    assert(rdd.collect().map(_.metadata.id).sorted.toList == (1 to 1000).map(_.toString).sorted.toList)
  }

  test("Writes documents to ElasticSearch") {
    val client = es.client
    val indexName = "index1"

    sparkContext.parallelize(Seq(1, 2, 3, 4))
      .map(id => ESDocument(ESMetadata(id.toString, "foo", indexName), "{}"))
      .saveToES(Seq("localhost"), es.clusterName)

    client.admin().cluster().prepareHealth(indexName).setWaitForGreenStatus().get()
    client.admin().indices().prepareRefresh(indexName).get()

    assert(client.prepareGet(indexName, "foo", "1").get().isExists)
    assert(client.prepareGet(indexName, "foo", "2").get().isExists)
    assert(client.prepareGet(indexName, "foo", "3").get().isExists)
    assert(client.prepareGet(indexName, "foo", "4").get().isExists)
  }
}

Example 3

package org.apache.spark.elasticsearch

import java.nio.file.Files
import java.util.UUID

import org.apache.commons.io.FileUtils
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.node.{NodeBuilder, Node}

class LocalElasticSearch(val clusterName: String = UUID.randomUUID().toString) {
  lazy val node = buildNode()
  lazy val client = node.client()
  val dataDir = Files.createTempDirectory("elasticsearch").toFile

  private var started = false

  def buildNode(): Node = {
    val settings = Settings.settingsBuilder()
      .put("path.home", dataDir.getAbsolutePath)
      .put("path.logs", s"${dataDir.getAbsolutePath}/logs")
      .put("path.data", s"${dataDir.getAbsolutePath}/data")
      .put("index.store.fs.memory.enabled", true)
      .put("index.number_of_shards", 1)
      .put("index.number_of_replicas", 0)
      .put("cluster.name", clusterName)
      .build()

    val instance = NodeBuilder.nodeBuilder().settings(settings).node()
    started = true
    instance
  }

  def close(): Unit = {
    if (started) {
      client.close()
      node.close()
    }

    try {
      FileUtils.forceDelete(dataDir)
    } catch {
      case e: Exception =>
    }
  }
}

Example 4

package es.alvsanand.spark_recommender.recommender import java.net.InetAddress import akka.actor.ActorSystem import com.mongodb.casbah.{MongoClient, MongoClientURI} import es.alvsanand.spark_recommender.model._ import es.alvsanand.spark_recommender.utils.{ESConfig, Logging, MongoConfig} import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.transport.InetSocketTransportAddress import org.elasticsearch.transport.client.PreBuiltTransportClient import spray.httpx.SprayJsonSupport import spray.json.{DefaultJsonProtocol, NullOptions} import spray.routing.SimpleRoutingApp object RecommenderControllerProtocol extends DefaultJsonProtocol with NullOptions with SprayJsonSupport { implicit val productRecommendationRequestFormat = jsonFormat1(ProductRecommendationRequest) implicit val userRecommendationRequestFormat = jsonFormat1(UserRecommendationRequest) implicit val searchRecommendationRequestFormat = jsonFormat1(SearchRecommendationRequest) implicit val productHybridRecommendationRequestFormat = jsonFormat1(ProductHybridRecommendationRequest) implicit val recommendationFormat = jsonFormat2(Recommendation) implicit val hybridRecommendationFormat = jsonFormat3(HybridRecommendation) } object RecommenderController extends SimpleRoutingApp with Logging{ val ES_HOST_PORT_REGEX = "(.+):(\\d+)".r import RecommenderControllerProtocol._ implicit val system = ActorSystem("ActorSystem") def run(serverPort: Int)(implicit mongoConf: MongoConfig, esConf: ESConfig): Unit = { implicit val mongoClient = MongoClient(MongoClientURI(mongoConf.uri)) implicit val esClient = new PreBuiltTransportClient(Settings.EMPTY) esConf.transportHosts.split(";") .foreach { case ES_HOST_PORT_REGEX(host: String, port: String) => esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port.toInt)) } logger.info("Launching REST serves[port=%d]".format(serverPort)) startServer(interface = "localhost", port = serverPort) { path("recs" / "cf" / "pro") { post( entity(as[ProductRecommendationRequest]) { request => complete { RecommenderService.getCollaborativeFilteringRecommendations(request).toStream } } ) } ~ path("recs" / "cf" / "usr") { post( entity(as[UserRecommendationRequest]) { request => complete { RecommenderService.getCollaborativeFilteringRecommendations(request).toStream } } ) } ~ path("recs" / "cb" / "mrl") { post( entity(as[ProductRecommendationRequest]) { request => complete { RecommenderService.getContentBasedMoreLikeThisRecommendations(request).toStream } } ) } ~ path("recs" / "cb" / "sch") { post( entity(as[SearchRecommendationRequest]) { request => complete { RecommenderService.getContentBasedSearchRecommendations(request).toStream } } ) } ~ path("recs" / "hy" / "pro") { post( entity(as[ProductHybridRecommendationRequest]) { request => complete { RecommenderService.getHybridRecommendations(request).toStream } } ) } } } }

Example 5

package com.mfglabs.stream package extensions.elasticsearch import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl.Sink import org.elasticsearch.index.query.QueryBuilders import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.{Millis, Minutes, Span} import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec} import scala.concurrent.duration._ import scala.util.Try import org.elasticsearch.common.settings.Settings import org.elasticsearch.node.Node class ElasticExtensionsSpec extends FlatSpec with Matchers with ScalaFutures with BeforeAndAfterAll { implicit override val patienceConfig = PatienceConfig(timeout = Span(1, Minutes), interval = Span(100, Millis)) implicit val as = ActorSystem() implicit val fm = ActorMaterializer() implicit val blockingEc = ExecutionContextForBlockingOps(scala.concurrent.ExecutionContext.Implicits.global) val settings = Settings.builder() .put("path.data", "target/elasticsearch-data") .put("path.home", "/") .put("transport.type", "local") .put("http.enabled", false) .build(); lazy val node = new Node(settings).start(); implicit lazy val client = node.client() val index = "test" val `type` = "type" "EsStream" should "execute a query a get the result as a stream" in { Try(client.admin.indices().prepareDelete(index).get()) val toIndex = for (i <- 1 to 5002) yield (i, s"""{i: $i}""") toIndex.foreach { case (i, json) => client.prepareIndex(index, `type`).setSource(json).setId(i.toString).get() } client.admin.indices.prepareRefresh(index).get() // to be sure that the data is indexed val res = EsStream.queryAsStream(QueryBuilders.matchAllQuery(), index, `type`, 1 minutes, 50) .runWith(Sink.seq) .futureValue res.sorted shouldEqual toIndex.map(_._2).sorted } override def afterAll(): Unit = { client.close() node.close() } }

Example 6

package com.github.jparkie.spark.elasticsearch.transport import com.github.jparkie.spark.elasticsearch.conf.SparkEsTransportClientConf import org.apache.spark.Logging import org.elasticsearch.client.Client import org.elasticsearch.client.transport.TransportClient import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.transport.InetSocketTransportAddress import scala.collection.mutable private[elasticsearch] trait SparkEsTransportClientManager extends Serializable with Logging { @transient private[transport] val internalTransportClients = mutable.HashMap.empty[SparkEsTransportClientConf, TransportClient] private[transport] def buildTransportSettings(clientConf: SparkEsTransportClientConf): Settings = { val esSettingsBuilder = Settings.builder() clientConf.transportSettings foreach { currentSetting => esSettingsBuilder.put(currentSetting._1, currentSetting._2) } esSettingsBuilder.build() } private[transport] def buildTransportClient(clientConf: SparkEsTransportClientConf, esSettings: Settings): TransportClient = { import SparkEsTransportClientConf._ val esClient = TransportClient.builder() .settings(esSettings) .build() getTransportAddresses(clientConf.transportAddresses, clientConf.transportPort) foreach { inetSocketAddress => esClient.addTransportAddresses(new InetSocketTransportAddress(inetSocketAddress)) } sys.addShutdownHook { logInfo("Closed Elasticsearch Transport Client.") esClient.close() } logInfo(s"Connected to the following Elasticsearch nodes: ${esClient.connectedNodes()}.") esClient } def closeTransportClient(clientConf: SparkEsTransportClientConf): Unit = synchronized { internalTransportClients.remove(clientConf) match { case Some(transportClient) => transportClient.close() case None => logError(s"No TransportClient for $clientConf.") } } } object SparkEsTransportClientManager extends SparkEsTransportClientManager

Example 7

package ch.epfl.bluebrain.nexus.commons.es.server.embed import java.nio.file.Files import java.util.Arrays._ import akka.http.scaladsl.model.Uri import ch.epfl.bluebrain.nexus.commons.es.server.embed.ElasticServer.MyNode import ch.epfl.bluebrain.nexus.util.{ActorSystemFixture, Randomness} import org.apache.commons.io.FileUtils import org.elasticsearch.common.settings.Settings import org.elasticsearch.index.reindex.ReindexPlugin import org.elasticsearch.node.Node import org.elasticsearch.painless.PainlessPlugin import org.elasticsearch.plugins.Plugin import org.elasticsearch.transport.Netty4Plugin import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.BeforeAndAfterAll import scala.jdk.CollectionConverters._ import scala.util.Try // $COVERAGE-OFF$ abstract class ElasticServer extends ActorSystemFixture("ElasticServer") with AnyWordSpecLike with BeforeAndAfterAll with Randomness { override protected def beforeAll(): Unit = { super.beforeAll() startElastic() } override protected def afterAll(): Unit = { stopElastic() super.afterAll() } val startPort = freePort() val endPort = startPort + 100 val esUri = Uri(s"http://localhost:$startPort") implicit val ec = system.dispatcher private val clusterName = "elasticsearch" private val dataDir = Files.createTempDirectory("elasticsearch_data_").toFile private val settings = Settings .builder() .put("path.home", dataDir.toString) .put("http.port", s"$startPort-$endPort") .put("http.cors.enabled", true) .put("cluster.name", clusterName) .put("http.type", "netty4") .build private lazy val node = new MyNode(settings, asList(classOf[Netty4Plugin], classOf[PainlessPlugin], classOf[ReindexPlugin])) def startElastic(): Unit = { node.start() () } def stopElastic(): Unit = { node.close() Try(FileUtils.forceDelete(dataDir)) () } } object ElasticServer extends Randomness { import java.util import org.elasticsearch.node.InternalSettingsPreparer private class MyNode(preparedSettings: Settings, classpathPlugins: util.Collection[Class[_ <: Plugin]]) extends Node( InternalSettingsPreparer .prepareEnvironment(preparedSettings, Map.empty[String, String].asJava, null, () => "elasticsearch"), classpathPlugins, true ) {} } // $COVERAGE-ON$

Example 8

package com.klibisz.elastiknn

import java.util

import com.klibisz.elastiknn.mapper.VectorMapper
import com.klibisz.elastiknn.query._
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.index.mapper.Mapper
import org.elasticsearch.plugins.SearchPlugin.QuerySpec
import org.elasticsearch.plugins._

class ElastiKnnPlugin(settings: Settings) extends Plugin with IngestPlugin with SearchPlugin with ActionPlugin with MapperPlugin {

  override def getQueries: util.List[SearchPlugin.QuerySpec[_]] = util.Arrays.asList(
    new QuerySpec(KnnQueryBuilder.NAME, KnnQueryBuilder.Reader, KnnQueryBuilder.Parser)
  )

  override def getMappers: util.Map[String, Mapper.TypeParser] = {
    import VectorMapper._
    new util.HashMap[String, Mapper.TypeParser] {
      put(sparseBoolVector.CONTENT_TYPE, new sparseBoolVector.TypeParser)
      put(denseFloatVector.CONTENT_TYPE, new denseFloatVector.TypeParser)
    }
  }
}

Example 9

package storage.es import com.sksamuel.elastic4s.{ElasticsearchClientUri, TcpClient} import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s.bulk.RichBulkResponse import java.io.File import javax.inject.{Inject, Singleton} import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.xcontent.XContentType import play.api.{Configuration, Logger} import play.api.inject.ApplicationLifecycle import scala.io.Source import scala.concurrent.{Await, Future, ExecutionContext} import scala.concurrent.duration._ import scala.util.{Try, Success, Failure} private def loadMappings(existingMappings: Seq[String] = Seq.empty[String]): Seq[(String, String)] = new File("conf/es-mappings").listFiles.toSeq.filter(_.getName.endsWith(".json")) .foldLeft(Seq.empty[(Int, (String, String))])((mappings, file) => { val number = file.getName.substring(0, 2).toInt val name = file.getName.substring(3, file.getName.lastIndexOf('.')) if (existingMappings.contains(name)) { mappings } else { val json = Source.fromFile(file).getLines.mkString("\n") mappings :+ (number, (name, json)) } }).sortBy(_._1).map(_._2) def countTotalDocs()(implicit ctx: ExecutionContext): Future[Long] = client execute { search(ES.RECOGITO) limit 0 } map { _.totalHits } }

Example 10

package com.spotify.scio.elasticsearch import java.net.InetSocketAddress import org.elasticsearch.action.admin.indices.create.CreateIndexResponse import org.elasticsearch.client.AdminClient import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.transport.InetSocketTransportAddress import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.transport.client.PreBuiltTransportClient import scala.util.Try object IndexAdmin { private def adminClient[A](esOptions: ElasticsearchOptions)(f: AdminClient => A): Try[A] = { val settings: Settings = Settings.builder.put("cluster.name", esOptions.clusterName).build val transportAddresses: Seq[InetSocketTransportAddress] = esOptions.servers .map(addr => new InetSocketTransportAddress(addr)) val client = new PreBuiltTransportClient(settings) .addTransportAddresses(transportAddresses: _*) val result = Try(f(client.admin())) client.close() result } private def ensureIndex( index: String, mappingSource: String, client: AdminClient ): CreateIndexResponse = client .indices() .prepareCreate(index) .setSource(mappingSource, XContentType.JSON) .get() }

Example 11

package com.spotify.scio.elasticsearch import java.net.InetSocketAddress import org.elasticsearch.action.admin.indices.create.CreateIndexResponse import org.elasticsearch.client.AdminClient import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.transport.TransportAddress import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.transport.client.PreBuiltTransportClient import scala.util.Try object IndexAdmin { private def adminClient[A](esOptions: ElasticsearchOptions)(f: AdminClient => A): Try[A] = { val settings: Settings = Settings.builder.put("cluster.name", esOptions.clusterName).build val transportAddresses: Seq[TransportAddress] = esOptions.servers .map(addr => new TransportAddress(addr)) val client = new PreBuiltTransportClient(settings) .addTransportAddresses(transportAddresses: _*) val result = Try(f(client.admin())) client.close() result } private def ensureIndex( index: String, mappingSource: String, client: AdminClient ): CreateIndexResponse = client .indices() .prepareCreate(index) .setSource(mappingSource, XContentType.JSON) .get() }
Source: https://www.programcreek.com/scala/org.elasticsearch.common.settings.Settings

Elasticsearch

The Alpakka Elasticsearch connector provides Akka Streams integration for Elasticsearch.

For more information about Elasticsearch, please visit the Elasticsearch documentation.

Artifacts


The table below shows the direct dependencies of this module, and the dependency tree below it lists all libraries it depends on transitively.

Direct dependencies
Organization | Artifact | Version
com.fasterxml.jackson.core | jackson-core | 2.11.4
com.fasterxml.jackson.core | jackson-databind | 2.11.4
com.typesafe.akka | akka-http-spray-json_2.12 | 10.1.11
com.typesafe.akka | akka-http_2.12 | 10.1.11
com.typesafe.akka | akka-stream_2.12 | 2.6.14
org.scala-lang | scala-library | 2.12.11
Dependency tree
com.fasterxml.jackson.core jackson-core 2.11.4 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-databind 2.11.4 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.11.4 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.11.4 The Apache Software License, Version 2.0 com.typesafe.akka akka-http-spray-json_2.12 10.1.11 Apache-2.0 com.typesafe.akka akka-http_2.12 10.1.11 Apache-2.0 com.typesafe.akka akka-http-core_2.12 10.1.11 Apache-2.0 com.typesafe.akka akka-parsing_2.12 10.1.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 io.spray spray-json_2.12 1.3.5 Apache 2 org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 com.typesafe.akka akka-http_2.12 10.1.11 Apache-2.0 com.typesafe.akka akka-http-core_2.12 10.1.11 Apache-2.0 com.typesafe.akka akka-parsing_2.12 10.1.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 com.typesafe.akka akka-stream_2.12 2.6.14 Apache-2.0 com.typesafe.akka akka-actor_2.12 2.6.14 Apache-2.0 com.typesafe config 1.4.0 Apache-2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.12 2.6.14 Apache-2.0 com.typesafe ssl-config-core_2.12 0.4.2 Apache-2.0 com.typesafe config 1.4.0 Apache-2.0 org.scala-lang.modules scala-parser-combinators_2.12 1.1.2 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0 org.reactivestreams reactive-streams 1.0.3 CC0 org.scala-lang scala-library 2.12.11 Apache-2.0 org.scala-lang scala-library 2.12.11 Apache-2.0

Elasticsearch connection

The connection and credentials to authenticate with are configured with ElasticsearchConnectionSettings (a minimal sketch follows the parameter table below).

Parameter Default Description
baseUrl Empty The base URL of Elasticsearch. Should not include a trailing slash.
username None The username to authenticate with
password None The password to authenticate with
headers None List of headers that should be sent with the http request.
connectionContext None The connectionContext that will be used with the http request. This can be used for TLS Auth instead of basic auth (username/password) by setting the SSLContext within the connectionContext.
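A minimal Scala sketch of building the connection settings, assuming the ElasticsearchConnectionSettings API of recent Alpakka releases (the URL and credentials are placeholders). The sketches further down this page reuse this connectionSettings value and the implicit ActorSystem:

import akka.actor.ActorSystem
import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings

implicit val system: ActorSystem = ActorSystem()

// Base URL without a trailing slash; the credentials are placeholders.
val connectionSettings =
  ElasticsearchConnectionSettings("http://localhost:9200")
    .withCredentials("elastic", "changeme")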

Elasticsearch parameters

Any API method that allows reading from and writing to Elasticsearch takes an instance of ElasticsearchParams.

ElasticsearchParams has to be constructed based on the Elasticsearch API version that you’re targeting:

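For example, a hedged sketch (assuming the ElasticsearchParams factories of recent Alpakka releases; the index and type names are placeholders):

import akka.stream.alpakka.elasticsearch.ElasticsearchParams

// Elasticsearch 7 and later: an index name is enough.
val paramsV7 = ElasticsearchParams.V7("source")
// Elasticsearch 5/6: an index name plus a mapping type.
val paramsV5 = ElasticsearchParams.V5("source", "_doc")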

Elasticsearch as Source and Sink

You can stream messages from or to Elasticsearch using the ElasticsearchSource, ElasticsearchFlow, or the ElasticsearchSink.


With typed source

Use ElasticsearchSource.typed and ElasticsearchSink.create to create the source and sink. In Scala the data is converted to and from JSON by Spray JSON; in Java it is converted by Jackson’s ObjectMapper.

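A hedged Scala sketch of a typed round trip, reusing connectionSettings from the connection sketch above (the Book case class, index names and query are illustrative assumptions):

import akka.Done
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.scaladsl.{ElasticsearchSink, ElasticsearchSource}
import akka.stream.scaladsl.{Sink, Source}
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat
import scala.concurrent.Future

case class Book(title: String)
implicit val bookFormat: JsonFormat[Book] = jsonFormat1(Book)

val sourceSettings = ElasticsearchSourceSettings(connectionSettings)
val writeSettings = ElasticsearchWriteSettings(connectionSettings)

// Read every Book document from the "source" index.
val books: Future[Seq[Book]] =
  ElasticsearchSource
    .typed[Book](ElasticsearchParams.V7("source"), """{"match_all": {}}""", sourceSettings)
    .map(_.source) // ReadResult[Book] => Book
    .runWith(Sink.seq)

// Index Book documents into the "sink" index.
val written: Future[Done] =
  Source(List(Book("Book 1"), Book("Book 2")))
    .map(book => WriteMessage.createIndexMessage(book.title, book))
    .runWith(ElasticsearchSink.create[Book](ElasticsearchParams.V7("sink"), writeSettings))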

With JSON source

Use ElasticsearchSource.create and ElasticsearchSink.create to create source and sink.

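A hedged sketch of the untyped variant, which hands back raw spray-json objects (reusing the settings defined in the sketches above):

import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSource
import akka.stream.scaladsl.Sink

// Each ReadResult carries the document id and its source as a spray-json JsObject.
val rawDocuments =
  ElasticsearchSource
    .create(ElasticsearchParams.V7("source"), """{"match_all": {}}""", sourceSettings)
    .map(readResult => readResult.id -> readResult.source)
    .runWith(Sink.seq)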

Writing to Elasticsearch

In the above examples, WriteMessage is used as the input to ElasticsearchSink and ElasticsearchFlow. This means requesting an index operation from Elasticsearch. It’s possible to request other operations using the following message types:

Message factory Description
WriteMessage.createIndexMessage Create a new document. If an id is specified and it already exists, replace the document and increment its version.
WriteMessage.createCreateMessage Create a new document. If the id already exists, the WriteResult will contain an error.
WriteMessage.createUpdateMessage Update an existing document. If there is no document with the specified id, do nothing.
WriteMessage.createUpsertMessage Update an existing document. If there is no document with the specified id, create a new document.
WriteMessage.createDeleteMessage Delete an existing document. If there is no document with the specified id, do nothing.
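A sketch of these message factories, reusing the Book type from the typed sketch above (the ids and values are illustrative):

import akka.stream.alpakka.elasticsearch.WriteMessage

val book = Book("Book 1")

val indexMsg = WriteMessage.createIndexMessage("1", book)   // index, replacing an existing id
val createMsg = WriteMessage.createCreateMessage("2", book) // fails if the id already exists
val updateMsg = WriteMessage.createUpdateMessage("1", book) // update an existing document
val upsertMsg = WriteMessage.createUpsertMessage("3", book) // update or create
val deleteMsg = WriteMessage.createDeleteMessage[Book]("1") // delete by id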

Source configuration

We can configure the source with ElasticsearchSourceSettings (a sketch follows the table below).

Parameter Default Description
connection The connection details and credentials to authenticate against Elasticsearch. See Elasticsearch connection.
bufferSize 10 ElasticsearchSource retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size.
includeDocumentVersion false Tell Elasticsearch to return the documents _version property with the search results. See Version and Optimistic Concurrency Control to know about this property.
scrollDuration 5 min ElasticsearchSource retrieves messages from Elasticsearch by scroll scan. This parameter is used as the scroll value. See Time units for supported units.
apiVersion V7 Currently supports V5 and V7 (see below)
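A hedged sketch of tuning these settings (method names as assumed from recent Alpakka releases; connectionSettings comes from the connection sketch above):

import akka.stream.alpakka.elasticsearch.{ApiVersion, ElasticsearchSourceSettings}
import scala.concurrent.duration._

val tunedSourceSettings =
  ElasticsearchSourceSettings(connectionSettings)
    .withBufferSize(100)              // used as the scroll size
    .withScrollDuration(5.minutes)    // scroll keep-alive
    .withIncludeDocumentVersion(true) // expose _version on each ReadResult
    .withApiVersion(ApiVersion.V7)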

Sink and flow configuration

Sinks and flows are configured with ElasticsearchWriteSettings (a sketch follows the table below).

Parameter Default Description
connection The connection details and credentials to authenticate against Elasticsearch. See Elasticsearch connection.
bufferSize 10 Flow and Sink batch messages to bulk requests when back-pressure applies.
versionType None If set, uses the chosen versionType to index documents. See Version types for accepted settings.
retryLogic No retries See below
apiVersion V7 Currently supports V5 and V7 (see below)
allowExplicitIndex True When set to False, the index name will be included in the URL instead of on each document (see below)
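A hedged sketch of the write settings (method names assumed from recent Alpakka releases; retryLogic is shown separately in the retry section below):

import akka.stream.alpakka.elasticsearch.{ApiVersion, ElasticsearchWriteSettings}

val tunedWriteSettings =
  ElasticsearchWriteSettings(connectionSettings)
    .withBufferSize(20)            // bulk request size
    .withVersionType("internal")
    .withApiVersion(ApiVersion.V7)
    .withAllowExplicitIndex(false) // keep the index name in the URL only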

Retry logic

A bulk request might fail partially for some reason. To retry failed writes to Elasticsearch, a retry logic can be specified (a sketch follows the tables below).

The provided implementations are RetryAtFixedRate and RetryWithBackoff:

RetryAtFixedRate
Parameter Description
maxRetries The stage fails, if it gets this number of consecutive failures.
retryInterval Failing writes are retried after this duration.

RetryWithBackoff
Parameter Description
maxRetries The stage fails, if it gets this number of consecutive failures.
minBackoff Initial backoff for failing writes.
maxBackoff Maximum backoff for failing writes.
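A hedged sketch of both variants (assuming the RetryAtFixedRate and RetryWithBackoff factories; the counts and durations are placeholders):

import akka.stream.alpakka.elasticsearch.{ElasticsearchWriteSettings, RetryAtFixedRate, RetryWithBackoff}
import scala.concurrent.duration._

// Retry a failing bulk write up to 5 times, one second apart.
val fixedRateSettings =
  ElasticsearchWriteSettings(connectionSettings)
    .withRetryLogic(RetryAtFixedRate(5, 1.second))

// Retry up to 5 times with exponential backoff between 1 and 30 seconds.
val backoffSettings =
  ElasticsearchWriteSettings(connectionSettings)
    .withRetryLogic(RetryWithBackoff(5, 1.second, 30.seconds))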

In case of write failures the order of messages downstream is guaranteed to be preserved.

Supported API versions

To support reading and writing to multiple versions of Elasticsearch, an ApiVersion can be specified.

This will be used to: 1. transform the bulk request into a format understood by the corresponding Elasticsearch server. 2. determine whether to include the index type mapping in the API calls. See removal of types

Currently V5 and V7 are supported specifically, but this parameter does not need to match the server version exactly (for example, either V5 or V7 should work with Elasticsearch 6.x).

Allow explicit index

When using the bulk API, Elasticsearch will reject requests that have an explicit index in the request body if explicit index names are not allowed. See URL-based access control.

Elasticsearch as Flow

You can also build flow stages with ElasticsearchFlow. The API is similar to creating Sinks.

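A hedged sketch of a write flow that surfaces the per-document WriteResult (reusing the Book type and settings from the sketches above):

import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
import akka.stream.alpakka.elasticsearch.{ElasticsearchParams, WriteMessage}
import akka.stream.scaladsl.{Sink, Source}

val flowResults =
  Source(List(Book("Book 1"), Book("Book 2")))
    .map(book => WriteMessage.createIndexMessage(book.title, book))
    .via(ElasticsearchFlow.create[Book](ElasticsearchParams.V7("sink"), writeSettings))
    .map(result => result.message -> result.success) // inspect each WriteResult
    .runWith(Sink.seq)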

Storing documents from Strings

Elasticsearch requires the documents to be properly formatted JSON. If your data is available as JSON in Strings, you may use the predefined StringMessageWriter to avoid any conversions. For any other JSON technologies, implement a MessageWriter.


Passing data through ElasticsearchFlow

When streaming documents from Kafka, you might want to commit to Kafka AFTER the document has been written to Elastic.

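A hedged sketch of the pass-through variant; the KafkaOffset type here is purely illustrative and stands in for a real committable offset:

import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
import akka.stream.alpakka.elasticsearch.{ElasticsearchParams, WriteMessage}
import akka.stream.scaladsl.{Sink, Source}

// Stand-in for a Kafka committable offset.
final case class KafkaOffset(partition: Int, offset: Long)

val fromKafka = List(
  (KafkaOffset(0, 0L), Book("Book 1")),
  (KafkaOffset(0, 1L), Book("Book 2"))
)

Source(fromKafka)
  .map { case (offset, book) =>
    WriteMessage.createIndexMessage(book.title, book).withPassThrough(offset)
  }
  .via(ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset](
    ElasticsearchParams.V7("sink"), writeSettings))
  .map(result => result.message.passThrough) // only now commit the offset to Kafka
  .runWith(Sink.ignore)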

Specifying custom index-name for every document

When working with index-patterns using wildcards, you might need to specify a custom index-name for each document:

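A hedged sketch, overriding the index per message with withIndexName (the index names are placeholders; Book and writeSettings come from the sketches above):

import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import akka.stream.alpakka.elasticsearch.{ElasticsearchParams, WriteMessage}
import akka.stream.scaladsl.Source

Source(List(Book("Book 1"), Book("Book 2")))
  .map { book =>
    WriteMessage
      .createIndexMessage(book.title, book)
      .withIndexName("books-2021-10") // overrides the index given in ElasticsearchParams
  }
  .runWith(ElasticsearchSink.create[Book](ElasticsearchParams.V7("books"), writeSettings))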

Specifying custom metadata for every document

In some cases you might want to specify custom metadata per document you are inserting; this can be done like so:

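A hedged sketch using withCustomMetadata; whether a given key (such as pipeline) is honoured depends on the Elasticsearch bulk API:

import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink
import akka.stream.alpakka.elasticsearch.{ElasticsearchParams, WriteMessage}
import akka.stream.scaladsl.Source

Source(List(Book("Book 1")))
  .map { book =>
    WriteMessage
      .createIndexMessage(book.title, book)
      .withCustomMetadata(Map("pipeline" -> "my-ingest-pipeline")) // attached to this document's bulk metadata
  }
  .runWith(ElasticsearchSink.create[Book](ElasticsearchParams.V7("books"), writeSettings))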

More custom searching

The easiest way of using the Elasticsearch source is to just specify the query parameter. Sometimes you need more control, like specifying which fields to return. In such cases you can use searchParams instead:

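A hedged sketch with searchParams, restricting the returned fields (the keys mirror the Elasticsearch search API; Book and sourceSettings come from the sketches above):

import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSource
import akka.stream.alpakka.elasticsearch.ElasticsearchParams
import akka.stream.scaladsl.Sink

val titlesOnly =
  ElasticsearchSource
    .typed[Book](
      ElasticsearchParams.V7("source"),
      searchParams = Map(
        "query" -> """{"match_all": {}}""",
        "_source" -> """["title"]""" // fetch only the title field
      ),
      sourceSettings
    )
    .map(_.source)
    .runWith(Sink.seq)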

Routing

Support for custom routing is available through the routing key. Add this key and the respective value to the searchParams map to route your search directly to the shard that holds the document you are looking for and enjoy improved response times.

Sort

Support for sort is available through the sort key in the searchParams map. If no sort is given, the source will sort by _doc to maximize performance, as indicated by the Elasticsearch documentation.


Source: https://doc.akka.io/docs/alpakka/current/elasticsearch.html

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs.

-- Spark website

Spark provides fast iterative/functional-like capabilities over large data sets, typically by caching data in memory. As opposed to the rest of the libraries mentioned in this documentation, Apache Spark is a computing framework that is not tied to Map/Reduce itself; however, it does integrate with Hadoop, mainly through HDFS. elasticsearch-hadoop allows Elasticsearch to be used in Spark in two ways: through the dedicated support available since 2.1 or through the Map/Reduce bridge since 2.0. Spark 2.0 is supported in elasticsearch-hadoop since version 5.0.

Installation

Just like other libraries, elasticsearch-hadoop needs to be available in Spark’s classpath. As Spark has multiple deployment modes, this can translate to the target classpath, whether it is on only one node (as is the case with the local mode, which will be used throughout the documentation) or per-node depending on the desired infrastructure.

elasticsearch-hadoop provides native integration between Elasticsearch and Apache Spark, in the form of an RDD (Resilient Distributed Dataset) (or, to be precise, a Pair RDD) that can read data from Elasticsearch. The RDD is offered in two flavors: one for Scala (which returns the data as Tuple2 with Scala collections) and one for Java (which returns the data as Tuple2 containing java.util collections).

Whenever possible, consider using the native integration as it offers the best performance and maximum flexibility.

Configuration

To configure elasticsearch-hadoop for Apache Spark, one can set the various properties described in the Configuration chapter in the SparkConf object:

import org.apache.spark.SparkConf val conf = new SparkConf().setAppName(appName).setMaster(master) conf.set("es.index.auto.create", "true")
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); conf.set("es.index.auto.create", "true");

Command-line. For those that want to set the properties through the command-line (either directly or by loading them from a file), note that Spark only accepts those that start with the "spark." prefix and will ignore the rest (and depending on the version a warning might be thrown). To work around this limitation, define the elasticsearch-hadoop properties by appending the spark. prefix (thus they become spark.es.*) and elasticsearch-hadoop will automatically resolve them:

$ ./bin/spark-submit --conf spark.es.resource=index/type ...

Notice how the es.resource property became spark.es.resource.

Writing data to Elasticsearch

With elasticsearch-hadoop, any RDD can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the RDD type needs to be a Map (whether a Scala or a Java one), a JavaBean or a Scala case class. When that is not the case, one can easily transform the data in Spark or plug in their own custom ValueWriter.

Scala

When using Scala, simply import the org.elasticsearch.spark package which, through the pimp my library pattern, enriches any RDD API with saveToEs methods:

import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") sc.makeRDD( Seq(numbers, airports) ).saveToEs("spark/docs")

Spark Scala imports

elasticsearch-hadoop Scala imports

start Spark through its Scala API

creates an ad-hoc RDD based on the collection specified; any other RDD (in Java or Scala) can be passed in

index the content (namely the two documents, numbers and airports) in Elasticsearch under spark/docs

Scala users might be tempted to use Seq and the -> notation for declaring root objects (that is, the JSON document) instead of using a Map. While similar, the first notation results in slightly different types that cannot be matched to a JSON document: Seq is an ordered sequence (in other words, a list) while -> creates a Tuple, which is more or less an ordered, fixed number of elements. As such, a list of lists cannot be used as a document since it cannot be mapped to a JSON object; however, it can be used freely within one. Hence why in the example above Map(k -> v) was used instead of Seq(k -> v).

As an alternative to the implicit import above, one can use elasticsearch-hadoop Spark support in Scala through EsSpark in the org.elasticsearch.spark.rdd package, which acts as a utility class allowing explicit method invocations. Additionally, instead of Maps (which are convenient but require one mapping per instance due to their difference in structure), use a case class:

import org.apache.spark.SparkContext import org.elasticsearch.spark.rdd.EsSpark // define a case class case class Trip(departure: String, arrival: String) val upcomingTrip = Trip("OTP", "SFO") val lastWeekTrip = Trip("MUC", "OTP") val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) EsSpark.saveToEs(rdd, "spark/docs")

EsSpark import

Define a case class named Trip

Create an RDD around the Trip instances

Index the RDD explicitly through EsSpark

For cases where the id (or other metadata fields like ttl or timestamp) of the document needs to be specified, one can do so by setting the appropriate mapping, namely es.mapping.id. Following the previous example, to indicate to Elasticsearch to use the field id as the document id, update the RDD configuration (it is also possible to set the property on the SparkConf though due to its global effect it is discouraged):

EsSpark.saveToEs(rdd, "spark/docs", Map("es.mapping.id" -> "id"))
Java

Java users have a dedicated class that provides a similar functionality to EsSpark, namely JavaEsSpark in the org.elasticsearch.spark.rdd.api.java package (a package similar to Spark’s Java API):

import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2); Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); JavaEsSpark.saveToEs(javaRDD, "spark/docs");

Spark Java imports

elasticsearch-hadoop Java imports

start Spark through its Java API

to simplify the example, use Guava(a dependency of Spark) * methods for simple , creation

create a simple over the two collections; any other (in Java or Scala) can be passed in

index the content (namely the two documents (numbers and airports)) in Elasticsearch under

The code can be further simplified by using Java 5 static imports. Additionally, the Map (whose mapping is dynamic due to its loose structure) can be replaced with a JavaBean:

public class TripBean implements Serializable { private String departure, arrival; public TripBean(String departure, String arrival) { setDeparture(departure); setArrival(arrival); } public TripBean() {} public String getDeparture() { return departure; } public String getArrival() { return arrival; } public void setDeparture(String dep) { departure = dep; } public void setArrival(String arr) { arrival = arr; } }
import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... TripBean upcoming = new TripBean("OTP", "SFO"); TripBean lastWeek = new TripBean("MUC", "OTP"); JavaRDD<TripBean> javaRDD = jsc.parallelize( ImmutableList.of(upcoming, lastWeek)); saveToEs(javaRDD, "spark/docs");

statically import

define an containing instances ( is a )

call method without having to type again

Setting the document id (or other metadata fields like or ) is similar to its Scala counterpart, though potentially a bit more verbose depending on whether you are using the JDK classes or some other utilities (like Guava):

JavaEsSpark.saveToEs(javaRDD, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));

Writing existing JSON to Elasticsearch

For cases where the data in the RDD is already in JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. As such, in this case, elasticsearch-hadoop expects either an RDD containing String or byte arrays (byte[]/Array[Byte]), assuming each entry represents a JSON document. If the RDD does not have the proper signature, the saveJsonToEs methods cannot be applied (in Scala they will not be available).

Scala
val json1 = """{"reason" : "business", "airport" : "SFO"}""" val json2 = """{"participants" : 5, "airport" : "OTP"}""" new SparkContext(conf).makeRDD(Seq(json1, json2)) .saveJsonToEs("spark/json-trips")

example of an entry within the - the JSON is written as is, without any transformation, it should not contains breakline character like \n or \r\n

index the JSON data through the dedicated method

Java
String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}"; String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}"; JavaSparkContext jsc = ... JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips");

example of an entry within the - the JSON is written as is, without any transformation, it should not contains breakline character like \n or \r\n

notice the signature

index the JSON data through the dedicated method

Writing to dynamic/multi-resources

For cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content) one can use the es.resource.write field, which accepts a pattern that is resolved from the document content at runtime. Following the aforementioned media example, one could configure it as follows:

Scala
val game = Map( "media_type"->"game", "title" -> "FF VI", "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") sc.makeRDD(Seq(game, book, cd)).saveToEs("my-collection-{media_type}/doc")

Document key used for splitting the data. Any field can be declared (but make sure it is available in all documents)

Save each object based on its resource pattern, in this example based on

For each document/object about to be written, elasticsearch-hadoop will extract the field and use its value to determine the target resource.

Java

As expected, things in Java are strikingly similar:

Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994"); Map<String, ?> book = ... Map<String, ?> cd = ... JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd)); saveToEs(javaRDD, "my-collection-{media_type}/doc");

Save each object based on its resource pattern, in this example

Handling document metadata

Elasticsearch allows each document to have its own metadata. As explained above, through the various mapping options one can customize these parameters so that their values are extracted from their belonging document. Furthermore, one can even include/exclude what parts of the data are sent back to Elasticsearch. In Spark, elasticsearch-hadoop extends this functionality allowing metadata to be supplied outside the document itself through the use of pair RDDs. In other words, for RDDs containing a key-value tuple, the metadata can be extracted from the key and the value used as the document source.

The metadata is described through the Metadata Java enum within the org.elasticsearch.spark.rdd package, which identifies its type - ID, TTL, VERSION, etc. Thus an RDD’s keys can be a Map containing the Metadata for each document and its associated values. If the key is not of type Map, elasticsearch-hadoop will consider the object as representing the document id and use it accordingly. This sounds more complicated than it is, so let us see some examples.

Scala

Pair s, or simply put s with the signature can take advantage of the methods that are available either through the implicit import of package or object. To manually specify the id for each document, simply pass in the (not of type ) in your :

val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // instance of SparkContext val sc = ... val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) airportsRDD.saveToEsWithMeta("airports/2015")

is a key-value pair ; it is created from a of s

The key of each tuple within the represents the id of its associated value/document; in other words, document has id , and

Since is a pair , it has the method available. This tells elasticsearch-hadoop to pay special attention to the keys and use them as metadata, in this case as document ids. If would have been used instead, then elasticsearch-hadoop would consider the tuple, that is both the key and the value, as part of the document.

When more than just the id needs to be specified, one should use a with keys of type :

import org.elasticsearch.spark.rdd.Metadata._ val otp = Map("iata" -> "OTP", "name" -> "Otopeni") val muc = Map("iata" -> "MUC", "name" -> "Munich") val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // metadata for each document // note it's not required for them to have the same structure val otpMeta = Map(ID -> 1, TTL -> "3h") val mucMeta = Map(ID -> 2, VERSION -> "23") val sfoMeta = Map(ID -> 3) // instance of SparkContext val sc = ... val airportsRDD = sc.makeRDD( Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) airportsRDD.saveToEsWithMeta("airports/2015")

Import the enum

The metadata used for document. In this case, with a value of 1 and with a value of

The metadata used for document. In this case, with a value of 2 and with a value of

The metadata used for document. In this case, with a value of 3

The metadata and the documents are assembled into a pair

The is saved accordingly using the method

Java

In a similar fashion, on the Java side, provides methods that are applied to (the equivalent in Java of ). Thus to save documents based on their ids one can use:

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs(ImmutableList.of( new Tuple2<Object, Object>(1, otp), new Tuple2<Object, Object>(2, jfk))); JavaEsSpark.saveToEsWithMeta(pairRDD, target);

Create a by using Scala class wrapped around the document id and the document itself

Tuple for the first document wrapped around the id () and the doc () itself

Tuple for the second document wrapped around the id () and

The is saved accordingly using the keys as a id and the values as documents

When more than just the id needs to be specified, one can choose to use a populated with keys of type :

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import org.elasticsearch.spark.rdd.Metadata; import static org.elasticsearch.spark.rdd.Metadata.*; // data to be saved Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni"); Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d"); Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object> of(ID, "2", VERSION, "23"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD<?, ?> pairRdd = jsc.parallelizePairs<(ImmutableList.of( new Tuple2<Object, Object>(otpMeta, otp), new Tuple2<Object, Object>(sfoMeta, sfo))); JavaEsSpark.saveToEsWithMeta(pairRDD, target);

describing the document metadata that can be declared

static import for the to refer to its values in short format (, , etc…​)

Metadata for document

Metadata for document

Tuple between (as the value) and its metadata (as the key)

Tuple associating and its metadata

invoked over the containing documents and their respective metadata

Reading data from Elasticsearch

For reading, one should define the Elasticsearch RDD that streams data from Elasticsearch to Spark.

Scala

Similar to writing, the org.elasticsearch.spark package enriches the SparkContext API with esRDD methods:

import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) val RDD = sc.esRDD("radio/artists")

Spark Scala imports

elasticsearch-hadoop Scala imports

start Spark through its Scala API

a dedicated for Elasticsearch is created for index

The esRDD method can be overloaded to specify an additional query or even a configuration Map (overriding SparkConf):

... import org.elasticsearch.spark._ ... val conf = ... val sc = new SparkContext(conf) sc.esRDD("radio/artists", "?q=me*")

create an streaming all the documents matching from index

The documents from Elasticsearch are returned, by default, as a Tuple2 containing as the first element the document id and as the second element the actual document represented through Scala collections, namely one Map[String, Any] where the keys represent the field names and the values their respective values.

Java

Java users have a dedicated JavaPairRDD that works the same as its Scala counterpart; however, the returned values (the second element) are the documents as native java.util collections.

import org.apache.spark.api.java.JavaSparkContext; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); JavaPairRDD<String, Map<String, Object>> esRDD = JavaEsSpark.esRDD(jsc, "radio/artists");

Spark Java imports

elasticsearch-hadoop Java imports

start Spark through its Java API

a dedicated for Elasticsearch is created for index

In a similar fashion one can use the overloaded methods to specify a query or pass a object for advanced configuration. Let us see how this looks, but this time around using Java static imports. Further more, let us discard the documents ids and retrieve only the values:

import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*; ... JavaRDD<Map<String, Object>> rdd = esRDD(jsc, "radio/artists", "?q=me*") .values();

statically import class

create an streaming all the documents starting with from index . Note the method does not have to be fully qualified due to the static import

return only values of the - hence why the result is of type and not

By using the API, one gets a hold of Spark’s dedicated which are better suited in Java environments than the base (due to its Scala signatures). Moreover, the dedicated returns Elasticsearch documents as proper Java collections so one does not have to deal with Scala collections (which is typically the case with s). This is particularly powerful when using Java 8, which we strongly recommend as its lambda expressions make collection processing extremely concise.

To wit, let us assume one wants to filter the documents from the and return only those that contain a value that contains (please ignore the fact one can and should do the filtering directly through Elasticsearch).

In versions prior to Java 8, the code would look something like this:

JavaRDD<Map<String, Object>> esRDD = esRDD(jsc, "radio/artists", "?q=me*").values(); JavaRDD<Map<String, Object>> filtered = esRDD.filter( new Function<Map<String, Object>, Boolean>() { @Override public Boolean call(Map<String, Object> map) throws Exception { returns map.contains("mega"); } });

with Java 8, the filtering becomes a one liner:

JavaRDD<Map<String, Object>> esRDD = esRDD(jsc, "radio/artists", "?q=me*").values(); JavaRDD<Map<String, Object>> filtered = esRDD.filter(doc -> doc.contains("mega"));
Reading data in JSON format

In cases where the results from Elasticsearch need to be in JSON format (typically to be sent down the wire to some other system), one can use the dedicated esJsonRDD methods. In this case, the connector will return the documents content as it is received from Elasticsearch without any processing, as an RDD[(String, String)] in Scala or a JavaPairRDD[String, String] in Java, with the keys representing the document id and the value its actual content in JSON format.

Type conversion

When dealing with multi-value/array fields, please see this section and in particular these configuration options. IMPORTANT: If automatic index creation is used, please review this section for more information.

elasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch types (and back) as shown in the table below:

Table 5. Scala Types Conversion Table

Scala typeElasticsearch type

empty

according to the table

case class

(see )

in addition, the following implied conversion applies for Java types:

Table 6. Java Types Conversion Table

Java typeElasticsearch type

or (depending on size)

( format)

( format)

( format)

(BASE64)

Java Bean

(see )

The conversion is done as a best effort; built-in Java and Scala types are guaranteed to be properly converted, however there are no guarantees for user types whether in Java or Scala. As mentioned in the tables above, when a class is encountered in Scala or in Java, the converters will try to its content and save it as an . Note this works only for top-level user objects - if the user object has other user objects nested in, the conversion is likely to fail since the converter does not perform nested . This is done on purpose since the converter has to serialize and deserialize the data and user types introduce ambiguity due to data loss; this can be addressed through some type of mapping however that takes the project way too close to the realm of ORMs and arguably introduces too much complexity for little to no gain; thanks to the processing functionality in Spark and the plugability in elasticsearch-hadoop one can easily transform objects into other types, if needed with minimal effort and maximum control.

Geo typesIt is worth mentioning that rich data types available only in Elasticsearch, such as or are supported by converting their structure into the primitives available in the table above. For example, based on its storage a might be returned as a or a .

Spark Streaming support

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

-- Spark website

Spark Streaming is an extension on top of the core Spark functionality that allows near real time processing of stream data. Spark Streaming works around the idea of DStreams, or Discretized Streams. DStreams operate by collecting newly arrived records into a small RDD and executing it. This repeats every few seconds with a new RDD in a process called microbatching. The DStream API includes many of the same processing operations as the RDD API, plus a few other streaming specific methods. elasticsearch-hadoop provides native integration with Spark Streaming as of version 5.0.

When using the elasticsearch-hadoop Spark Streaming support, Elasticsearch can be targeted as an output location to index data into from a Spark Streaming job in the same way that one might persist the results from an RDD. Though, unlike RDDs, you are unable to read data out of Elasticsearch using a DStream due to the continuous nature of it.

Spark Streaming support provides special optimizations to allow for conservation of network resources on Spark executors when running jobs with very small processing windows. For this reason, one should prefer to use this integration instead of invoking saveToEs on RDDs returned from the foreachRDD call on DStreams.

Writing to Elasticsearch

Like RDDs, any DStream can be saved to Elasticsearch as long as its content can be translated into documents. In practice this means the DStream type needs to be a Map (either a Scala or a Java one), a JavaBean or a Scala case class. When that is not the case, one can easily transform the data in Spark or plug in their own custom ValueWriter.

Scala

When using Scala, simply import the org.elasticsearch.spark.streaming package which, through the pimp my library pattern, enriches the DStream API with saveToEs methods:

import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.StreamingContext._ import org.elasticsearch.spark.streaming._ ... val conf = ... val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") val rdd = sc.makeRDD(Seq(numbers, airports)) val microbatches = mutable.Queue(rdd) ssc.queueStream(microbatches).saveToEs("spark/docs") ssc.start() ssc.awaitTermination()

Spark and Spark Streaming Scala imports

elasticsearch-hadoop Spark Streaming imports

start Spark through its Scala API

start SparkStreaming context by passing it the SparkContext. The microbatches will be processed every second.

creates an ad-hoc based on the collection specified; any other (in Java or Scala) can be passed in. Create a queue of `RDD`s to signify the microbatches to perform.

Create a out of the

Start the spark Streaming Job and wait for it to eventually finish.

As an alternative to the implicit import above, one can use elasticsearch-hadoop Spark Streaming support in Scala through in the package which acts as a utility class allowing explicit method invocations. Additionally instead of s (which are convenient but require one mapping per instance due to their difference in structure), use a case class :

import org.apache.spark.SparkContext import org.elasticsearch.spark.streaming.EsSparkStreaming // define a case class case class Trip(departure: String, arrival: String) val upcomingTrip = Trip("OTP", "SFO") val lastWeekTrip = Trip("MUC", "OTP") val rdd = sc.makeRDD(Seq(upcomingTrip, lastWeekTrip)) val microbatches = mutable.Queue(rdd) val dstream = ssc.queueStream(microbatches) EsSparkStreaming.saveToEs(dstream, "spark/docs") ssc.start()

import

Define a case class named

Create a around the of instances

Configure the to be indexed explicitly through

Start the streaming process

Once a SparkStreamingContext is started, no new s can be added or configured. Once a context has stopped, it cannot be restarted. There can only be one active SparkStreamingContext at a time per JVM. Also note that when stopping a SparkStreamingContext programmatically, it stops the underlying SparkContext unless instructed not to.

For cases where the id (or other metadata fields like or ) of the document needs to be specified, one can do so by setting the appropriate mapping namely . Following the previous example, to indicate to Elasticsearch to use the field as the document id, update the configuration (it is also possible to set the property on the though due to its global effect it is discouraged):

EsSparkStreaming.saveToEs(dstream, "spark/docs", Map("es.mapping.id" -> "id"))
Java

Java users have a dedicated class that provides a similar functionality to EsSparkStreaming, namely JavaEsSparkStreaming in the org.elasticsearch.spark.streaming.api.java package (a package similar to Spark’s Java API):

import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.SparkConf; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming; ... SparkConf conf = ... JavaSparkContext jsc = new JavaSparkContext(conf); JavaStreamingContext jssc = new JavaSparkStreamingContext(jsc, Seconds.apply(1)); Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2); Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); Queue<JavaRDD<Map<String, ?>>> microbatches = new LinkedList<>(); microbatches.add(javaRDD); JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches); JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs"); jssc.start()

Spark and Spark Streaming Java imports

elasticsearch-hadoop Java imports

start Spark and Spark Streaming through its Java API. The microbatches will be processed every second.

to simplify the example, use Guava(a dependency of Spark) * methods for simple , creation

create a simple over the microbatch; any other s (in Java or Scala) can be passed in

index the content (namely the two documents (numbers and airports)) in Elasticsearch under

execute the streaming job.

The code can be further simplified by using Java 5 static imports. Additionally, the Map (whose mapping is dynamic due to its loose structure) can be replaced with a JavaBean:

public class TripBean implements Serializable { private String departure, arrival; public TripBean(String departure, String arrival) { setDeparture(departure); setArrival(arrival); } public TripBean() {} public String getDeparture() { return departure; } public String getArrival() { return arrival; } public void setDeparture(String dep) { departure = dep; } public void setArrival(String arr) { arrival = arr; } }
import static org.elasticsearch.spark.rdd.api.java.JavaEsSparkStreaming; ... TripBean upcoming = new TripBean("OTP", "SFO"); TripBean lastWeek = new TripBean("MUC", "OTP"); JavaRDD<TripBean> javaRDD = jsc.parallelize(ImmutableList.of(upcoming, lastWeek)); Queue<JavaRDD<TripBean>> microbatches = new LinkedList<JavaRDD<TripBean>>(); microbatches.add(javaRDD); JavaDStream<TripBean> javaDStream = jssc.queueStream(microbatches); saveToEs(javaDStream, "spark/docs"); jssc.start()

statically import

define a containing instances ( is a )

call method without having to type again

run that Streaming job

Setting the document id (or other metadata fields like or ) is similar to its Scala counterpart, though potentially a bit more verbose depending on whether you are using the JDK classes or some other utilities (like Guava):

JavaEsSparkStreaming.saveToEs(javaDStream, "spark/docs", ImmutableMap.of("es.mapping.id", "id"));

Writing Existing JSON to Elasticsearch

For cases where the data being streamed by the DStream is already serialized as JSON, elasticsearch-hadoop allows direct indexing without applying any transformation; the data is taken as is and sent directly to Elasticsearch. As such, in this case, elasticsearch-hadoop expects either a DStream containing String or byte arrays (byte[]/Byte[]), assuming each entry represents a JSON document. If the DStream does not have the proper signature, the saveJsonToEs methods cannot be applied (in Scala they will not be available).

Scala

val json1 = """{"reason" : "business", "airport" : "SFO"}"""
val json2 = """{"participants" : 5, "airport" : "OTP"}"""

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

val rdd = sc.makeRDD(Seq(json1, json2))
val microbatch = mutable.Queue(rdd)

ssc.queueStream(microbatch).saveJsonToEs("spark/json-trips")

ssc.start()

example of an entry within the DStream - the JSON is written as is, without any transformation

configure the stream to index the JSON data through the dedicated saveJsonToEs method

start the streaming job

Java

String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}";
String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}";

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...

JavaRDD<String> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2));
Queue<JavaRDD<String>> microbatches = new LinkedList<JavaRDD<String>>();
microbatches.add(stringRDD);
JavaDStream<String> stringDStream = jssc.queueStream(microbatches);

JavaEsSparkStreaming.saveJsonToEs(stringDStream, "spark/json-trips");

jssc.start();

example of an entry within the DStream - the JSON is written as is, without any transformation

creating an RDD, placing it into a queue, and creating a DStream out of the queued RDDs, treating each RDD as a microbatch

notice the JavaDStream<String> signature

configure the stream to index the JSON data through the dedicated saveJsonToEs method

launch stream job

Writing to dynamic/multi-resources

For cases when the data being written to Elasticsearch needs to be indexed under different buckets (based on the data content) one can use the es.resource.write field which accepts a pattern that is resolved from the document content, at runtime. Following the aforementioned media example, one could configure it as follows:

Scala

val game = Map("media_type" -> "game", "title" -> "FF VI", "year" -> "1994")
val book = Map("media_type" -> "book", "title" -> "Harry Potter", "year" -> "2010")
val cd = Map("media_type" -> "music", "title" -> "Surfing With The Alien")

val batch = sc.makeRDD(Seq(game, book, cd))
val microbatches = mutable.Queue(batch)

ssc.queueStream(microbatches).saveToEs("my-collection-{media_type}/doc")
ssc.start()

Document key used for splitting the data. Any field can be declared (but make sure it is available in all documents)

Save each object based on its resource pattern, in this example based on media_type

For each document/object about to be written, elasticsearch-hadoop will extract the media_type field and use its value to determine the target resource.

Java

As expected, things in Java are strikingly similar:

Map<String, ?> game = ImmutableMap.of("media_type", "game", "title", "FF VI", "year", "1994");
Map<String, ?> book = ...
Map<String, ?> cd = ...

JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(game, book, cd));
Queue<JavaRDD<Map<String, ?>>> microbatches = ...
JavaDStream<Map<String, ?>> javaDStream = jssc.queueStream(microbatches);

saveToEs(javaDStream, "my-collection-{media_type}/doc");
jssc.start();

Save each object based on its resource pattern, in this example my-collection-{media_type}/doc

Handling document metadata

Elasticsearch allows each document to have its own metadata. As explained above, through the various mapping options one can customize these parameters so that their values are extracted from their belonging document. Furthermore, one can even include/exclude what parts of the data are sent back to Elasticsearch. In Spark, elasticsearch-hadoop extends this functionality allowing metadata to be supplied outside the document itself through the use of pair RDDs.

This is no different in Spark Streaming. For DStreams containing a key-value tuple, the metadata can be extracted from the key and the value used as the document source.

The metadata is described through the Metadata Java enum within the org.elasticsearch.spark.rdd package which identifies its type - id, ttl, version, etc. Thus a DStream's keys can be a Map containing the Metadata for each document and its associated values. If the key is not of type Map, elasticsearch-hadoop will consider the object as representing the document id and use it accordingly. This sounds more complicated than it is, so let us see some examples.

Scala

Pair DStreams, or simply put DStreams with the signature DStream[(K,V)], can take advantage of the saveToEsWithMeta methods that are available either through the implicit import of the org.elasticsearch.spark.streaming package or the EsSparkStreaming object. To manually specify the id for each document, simply pass in the Object (not of type Map) in your DStream:

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// instance of SparkContext
val sc = ...
// instance of StreamingContext
val ssc = ...

val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
val microbatches = mutable.Queue(airportsRDD)

ssc.queueStream(microbatches)
  .saveToEsWithMeta("airports/2015")
ssc.start()

airportsRDD is a key-value pair RDD; it is created from a Seq of tuples

The key of each tuple within the Seq represents the id of its associated value/document; in other words, document otp has id 1, muc 2 and sfo 3

We construct a DStream which inherits the type signature of the RDD

Since the resulting DStream is a pair DStream, it has the saveToEsWithMeta method available. This tells elasticsearch-hadoop to pay special attention to the keys and use them as metadata, in this case as document ids. If saveToEs would have been used instead, then elasticsearch-hadoop would consider the tuple, that is both the key and the value, as part of the document.

When more than just the id needs to be specified, one should use a Map with keys of type Metadata:

import org.elasticsearch.spark.rdd.Metadata._

val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
val muc = Map("iata" -> "MUC", "name" -> "Munich")
val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

// metadata for each document
// note it's not required for them to have the same structure
val otpMeta = Map(ID -> 1, TTL -> "3h")
val mucMeta = Map(ID -> 2, VERSION -> "23")
val sfoMeta = Map(ID -> 3)

// instance of SparkContext
val sc = ...
// instance of StreamingContext
val ssc = ...

val airportsRDD = sc.makeRDD(
  Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo)))
val microbatches = mutable.Queue(airportsRDD)

ssc.queueStream(microbatches)
  .saveToEsWithMeta("airports/2015")
ssc.start()

Import the Metadata enum

The metadata used for the otp document. In this case, ID with a value of 1 and TTL with a value of 3h

The metadata used for the muc document. In this case, ID with a value of 2 and VERSION with a value of 23

The metadata used for the sfo document. In this case, ID with a value of 3

The metadata and the documents are assembled into a pair RDD

The DStream inherits the signature from the RDD, becoming a pair DStream

The DStream is configured to index the data accordingly using the saveToEsWithMeta method

Java

In a similar fashion, on the Java side, JavaEsSparkStreaming provides saveToEsWithMeta methods that are applied to JavaPairDStream (the equivalent in Java of DStream[(K,V)]).

This tends to involve a little more work due to the Java API’s limitations. For instance, you cannot create a JavaPairDStream directly from a queue of JavaPairRDDs. Instead, you must create a regular JavaDStream of Tuple2 objects and convert the JavaDStream into a JavaPairDStream. This sounds complex, but it’s a simple work around for a limitation of the API.

First, we’ll create a pair function, that takes a Tuple2 object in, and returns it right back to the framework:

public static class ExtractTuples implements PairFunction<Tuple2<Object, Object>, Object, Object>, Serializable {
    @Override
    public Tuple2<Object, Object> call(Tuple2<Object, Object> tuple2) throws Exception {
        return tuple2;
    }
}

Then we’ll apply the pair function to a JavaDStream of Tuple2s to create a JavaPairDStream and save it:

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC");

JavaSparkContext jsc = ...
JavaStreamingContext jssc = ...

// create an RDD of tuples between the id and the docs
JavaRDD<Tuple2<?, ?>> rdd = jsc.parallelize(
    ImmutableList.of(
        new Tuple2<Object, Object>(1, otp),
        new Tuple2<Object, Object>(2, jfk)));

Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ...
JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches);

JavaPairDStream<?, ?> pairDStream = dStream.mapToPair(new ExtractTuples());

JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target);

jssc.start();

Create a regular RDD of Scala Tuple2s wrapped around the document id and the document itself

Tuple for the first document wrapped around the id (1) and the doc (otp) itself

Tuple for the second document wrapped around the id (2) and jfk

Assemble a regular JavaDStream out of the tuple RDD

Transform the JavaDStream into a JavaPairDStream by passing our identity function to the mapToPair method. This will allow the type to be converted to a JavaPairDStream. This function could be replaced by anything in your job that would extract both the id and the document to be indexed from a single entry.

The JavaPairDStream is configured to index the data accordingly using the keys as document ids and the values as documents

When more than just the id needs to be specified, one can choose to use a Map populated with keys of type Metadata. We’ll use the same typing trick to repack the JavaDStream as a JavaPairDStream:

import org.elasticsearch.spark.streaming.api.java.JavaEsSparkStreaming;
import org.elasticsearch.spark.rdd.Metadata;

import static org.elasticsearch.spark.rdd.Metadata.*;

// data to be saved
Map<String, ?> otp = ImmutableMap.of("iata", "OTP", "name", "Otopeni");
Map<String, ?> sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran");

// metadata for each document
// note it's not required for them to have the same structure
Map<Metadata, Object> otpMeta = ImmutableMap.<Metadata, Object>of(ID, 1, TTL, "1d");
Map<Metadata, Object> sfoMeta = ImmutableMap.<Metadata, Object>of(ID, "2", VERSION, "23");

JavaSparkContext jsc = ...

// create a pair RDD between the id and the docs
JavaRDD<Tuple2<?, ?>> pairRdd = jsc.parallelize(ImmutableList.of(
    new Tuple2<Object, Object>(otpMeta, otp),
    new Tuple2<Object, Object>(sfoMeta, sfo)));

Queue<JavaRDD<Tuple2<?, ?>>> microbatches = ...
JavaDStream<Tuple2<?, ?>> dStream = jssc.queueStream(microbatches);

JavaPairDStream<?, ?> pairDStream = dStream.mapToPair(new ExtractTuples());

JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target);

jssc.start();

Metadata enum describing the document metadata that can be declared

static import for the enum to refer to its values in short format (ID, TTL, etc.)

Metadata for the otp document

Metadata for the sfo document

Tuple between the document otp (as the value) and its metadata (as the key)

Tuple associating sfo and its metadata

Create a JavaDStream out of the JavaRDD

Repack the JavaDStream into a JavaPairDStream by mapping the identity function over it.

saveToEsWithMeta invoked over the JavaPairDStream containing documents and their respective metadata

Spark Streaming Type Conversion

The elasticsearch-hadoop Spark Streaming support leverages the same type mapping as the regular Spark type mapping. The mappings are repeated here for consistency:

Table 7. Scala Types Conversion Table

Scala type | Elasticsearch type

Nil | empty array

Some[T] | T according to the table

case class | object (see Java Bean)

in addition, the following implied conversion applies for Java types:

Table 8. Java Types Conversion Table

Java type | Elasticsearch type

Number | int or long (depending on size)

java.util.Calendar, java.util.Date, java.sql.Timestamp | date (string format)

byte[] | string (BASE64)

Java Bean | object (see Map)

Geo types: It is worth re-mentioning that rich data types available only in Elasticsearch, such as geo_point or geo_shape, are supported by converting their structure into the primitives available in the table above. For example, based on its storage a geo_point might be returned as a String or a Traversable.

Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as distributed SQL query engine.

-- Spark website

On top of the core Spark support, elasticsearch-hadoop also provides integration with Spark SQL. In other words, Elasticsearch becomes a native source for Spark SQL so that data can be indexed and queried from Spark SQL transparently.

Spark SQL works with structured data - in other words, all entries are expected to have the same structure (same number of fields, of the same type and name). Using unstructured data (documents with different structures) is not supported and will cause problems. For such cases, use PairRDDs.

Supported Spark SQL versions

Spark SQL, while becoming a mature component, is still going through significant changes between releases. Spark SQL became a stable component in version 1.3, however it is not backwards compatible with the previous releases. Furthermore, Spark 2.0 introduced significant changes which broke backwards compatibility through the Dataset API. elasticsearch-hadoop supports both Spark SQL 1.3-1.6 and Spark SQL 2.0 through two different jars: the Spark 1.x artifacts support Spark SQL 1.3-1.6 (or higher) while the Spark 2.0 artifact supports Spark SQL 2.0. In other words, unless you are using Spark 2.0, use the Spark 1.x artifact.

Spark SQL support is available under the org.elasticsearch.spark.sql package.

API differences. From the elasticsearch-hadoop user's perspective, the differences between Spark SQL 1.3-1.6 and Spark 2.0 are fairly consolidated. This document describes at length the differences which are briefly mentioned below:

DataFrame vs Dataset
The core unit of Spark SQL in 1.3+ is a DataFrame. This API remains in Spark 2.0 however underneath it is based on a Dataset.
Unified API vs dedicated Java/Scala APIs
In Spark SQL 2.0, the APIs are further unified by introducing SparkSession and by using the same backing code for both `Dataset`s, `DataFrame`s and `RDD`s.

As conceptually, a DataFrame is a Dataset[Row], the documentation below will focus on Spark SQL 1.3-1.6.

Writing DataFrames (Spark SQL 1.3+) to Elasticsearch

With elasticsearch-hadoop, DataFrames (or any Dataset for that matter) can be indexed to Elasticsearch.

Scala

In Scala, simply import the org.elasticsearch.spark.sql package which enriches the DataFrame class with saveToEs methods; while these have the same signature as the ones in the org.elasticsearch.spark package, they are designed for DataFrame implementations:

// reusing the example from Spark SQL documentation

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext._

import org.elasticsearch.spark.sql._

...

// sc = existing SparkContext
val sqlContext = new SQLContext(sc)

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

// create DataFrame
val people = sc.textFile("people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1), p(2).trim.toInt))
  .toDF()

people.saveToEs("spark/people")

Spark SQL package import

elasticsearch-hadoop Spark package import

Read a text file as normal and map it to a DataFrame (using the Person case class)

Index the resulting DataFrame to Elasticsearch through the saveToEs method

By default, elasticsearch-hadoop will ignore null values in favor of not writing any field at all. Since a DataFrame is meant to be treated as structured tabular data, you can enable writing nulls as null valued fields for DataFrame objects only by toggling the es.spark.dataframe.write.null setting to true.
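Shown below is a minimal sketch of enabling that toggle on the Spark configuration; the application name is a made-up placeholder and the setting name follows the note above, so verify it against your elasticsearch-hadoop version.

import org.apache.spark.{SparkConf, SparkContext}

// write null valued fields for DataFrames instead of dropping them
val conf = new SparkConf()
  .setAppName("es-write-nulls")  // hypothetical application name
  .set("es.spark.dataframe.write.null", "true")
val sc = new SparkContext(conf)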

Java

In a similar fashion, for Java usage the dedicated org.elasticsearch.spark.sql.api.java package provides similar functionality through the JavaEsSparkSQL class:

import org.apache.spark.sql.api.java.*;

import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

...

DataFrame people = ...
JavaEsSparkSQL.saveToEs(people, "spark/people");

Spark SQL Java imports

elasticsearch-hadoop Spark SQL Java imports

index the DataFrame in Elasticsearch under spark/people

Again, with Java 5 static imports this can be further simplified to:

import static org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL.*;
...
saveToEs(people, "spark/people");

statically import JavaEsSparkSQL

call the saveToEs method without having to type JavaEsSparkSQL again

For maximum control over the mapping of your DataFrame in Elasticsearch, it is highly recommended to create the mapping before hand. See this chapter for more information.

Writing existing JSON to Elasticsearch

When using Spark SQL, if the input data is in JSON format, simply convert it to a DataFrame (in Spark SQL 1.3) or a Dataset (for Spark SQL 2.0) (as described in the Spark documentation) through the SQLContext/SparkSession JSON reading methods.
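As a rough sketch of that flow, the snippet below reads a JSON file into a DataFrame and indexes it; the file name and the spark/people resource are assumptions used purely for illustration.

import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._

// sc is an existing SparkContext
val sqlContext = new SQLContext(sc)

// parse the JSON file into a DataFrame (one JSON document per line)
val people = sqlContext.read.json("people.json")

// index the resulting rows into Elasticsearch
people.saveToEs("spark/people")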

Using pure SQL to read from Elasticsearch

The index and its mapping have to exist prior to creating the temporary table

Spark SQL 1.2 introduced a new API for reading from external data sources, which is supported by elasticsearch-hadoop, simplifying the SQL configuration needed for interacting with Elasticsearch. Furthermore, behind the scenes it understands the operations executed by Spark and thus can optimize the data and queries made (such as filtering or pruning), improving performance.

Data Sources in Spark SQL

When using Spark SQL, elasticsearch-hadoop allows access to Elasticsearch through the SQLContext load method. In other words, to create a DataFrame/Dataset backed by Elasticsearch in a declarative manner:

// Spark 1.3 style
val sql = new SQLContext...

val df = sql.load(
  "spark/index",
  "org.elasticsearch.spark.sql")

experimental load method for arbitrary data sources

path or resource to load - in this case the index/type in Elasticsearch

the data source provider - org.elasticsearch.spark.sql

In Spark 1.4, one would use the following similar API calls:

// Spark 1.4 style
val df = sql.read
  .format("org.elasticsearch.spark.sql")
  .load("spark/index")

experimental method for arbitrary data sources

the data source provider - org.elasticsearch.spark.sql

path or resource to load - in this case the index/type in Elasticsearch

In Spark 1.5, this can be further simplified to:

// Spark 1.5 style
val df = sql.read.format("es")
  .load("spark/index")

Use es as an alias instead of the full package name for the provider

Whatever API is used, once created, the DataFrame can be accessed freely to manipulate the data.

The sources declaration also allows specific options to be passed in, namely:

Name | Default value | Description

path | required | Elasticsearch index/type

pushdown | true | Whether to translate (push-down) Spark SQL into Elasticsearch Query DSL

strict | false | Whether to use exact (not analyzed) matching or not (analyzed)

Usable in Spark 1.6 or higher:

double.filtering | true | Whether to tell Spark to apply its own filtering on the filters pushed down

Both options are explained in the next section. To specify the options (including the generic elasticsearch-hadoop ones), one simply passes a Map to the aforementioned methods:

For example:

val sql = new SQLContext...

// options for Spark 1.3 need to include the target path/resource
val options13 = Map("path" -> "spark/index",
                    "pushdown" -> "true",
                    "es.nodes" -> "someNode", "es.port" -> "9200")

// Spark 1.3 style
val spark13DF = sql.load("org.elasticsearch.spark.sql", options13)

// options for Spark 1.4 - the path/resource is specified separately
val options = Map("pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200")

// Spark 1.4 style
val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
                        .options(options)
                        .load("spark/index")

pushdown option - specific to Spark data sources

es.nodes configuration option

pass the options when defining/loading the source

sqlContext.sql(
  "CREATE TEMPORARY TABLE myIndex " +
  "USING org.elasticsearch.spark.sql " +
  "OPTIONS (resource 'spark/index', nodes 'someNode')")

Spark’s temporary table name

USING clause identifying the data source provider, in this case org.elasticsearch.spark.sql

elasticsearch-hadoop configuration options, the mandatory one being resource. The es. prefix is fixed due to the SQL parser

Do note that due to the SQL parser, the . (among other common characters used for delimiting) is not allowed; the connector tries to work around it by appending the es. prefix automatically, however this works only for specifying the configuration options with only one . (like es.nodes above). Because of this, if properties with multiple dots are needed, one should use the SQLContext load or read methods above and pass the properties as a Map.

Push-Down operations

An important hidden feature of using elasticsearch-hadoop as a Spark source is that the connector understands the operations performed within the DataFrame/SQL and, by default, will translate them into the appropriate QueryDSL. In other words, the connector pushes down the operations directly at the source, where the data is efficiently filtered out so that only the required data is streamed back to Spark. This significantly increases query performance and minimizes the CPU, memory and I/O on both the Spark and Elasticsearch clusters as only the needed data is returned (as opposed to returning the data in bulk only to be processed and discarded by Spark). Note the push down operations apply even when one specifies a query - the connector will enhance it according to the specified SQL.

As a side note, elasticsearch-hadoop supports all the `Filter`s available in Spark (1.3.0 and higher) while retaining backwards binary-compatibility with Spark 1.3.0, pushing down to the full extent the SQL operations to Elasticsearch without any user interference.

Operators that have been optimized as pushdown filters:

SQL syntax | ES 1.x/2.x syntax | ES 5.x syntax

= null , is_null | missing | must_not.exists

= (strict) | term | term

= (not strict) | match | match

> , < , >= , <= | range | range

is_not_null | exists | exists

in (strict) | terms | terms

in (not strict) | or.filters | bool.should

and | and.filters | bool.filter

or | or.filters | bool.should [bool.filter]

not | not.filter | bool.must_not

StringStartsWith | wildcard(arg*) | wildcard(arg*)

StringEndsWith | wildcard(*arg) | wildcard(*arg)

StringContains | wildcard(*arg*) | wildcard(*arg*)

EqualNullSafe (strict) | term | term

EqualNullSafe (not strict) | match | match

To wit, consider the following Spark SQL:

// as a DataFrame
val df = sqlContext.read.format("org.elasticsearch.spark.sql").load("spark/trips")

df.printSchema()
// root
//|-- departure: string (nullable = true)
//|-- arrival: string (nullable = true)
//|-- days: long (nullable = true)

val filter = df.filter(df("arrival").equalTo("OTP").and(df("days").gt(3)))

or in pure SQL:

CREATE TEMPORARY TABLE trips
USING org.elasticsearch.spark.sql
OPTIONS (path "spark/trips")

SELECT departure FROM trips WHERE arrival = "OTP" and days > 3

The connector translates the query into:

{ "query" : { "filtered" : { "query" : { "match_all" : {} }, "filter" : { "and" : [{ "query" : { "match" : { "arrival" : "OTP" } } }, { "days" : { "gt" : 3 } } ] } } } }

Furthermore, the pushdown filters can work on analyzed terms (the default) or can be configured to be strict and provide exact matches (working only on not-analyzed fields). Unless one manually specifies the mapping, it is highly recommended to leave the defaults as they are. This and other topics are discussed at length in the Elasticsearch Reference Documentation.

Note that double.filtering, available since elasticsearch-hadoop 2.2 for Spark 1.6 or higher, allows filters that are already pushed down to Elasticsearch to be processed/evaluated by Spark as well (default) or not. Turning this feature off, especially when dealing with large data sizes, speeds things up. However one should pay attention to the semantics, as turning this off might return different results (depending on how the data is indexed, analyzed vs not_analyzed). In general, when turning strict on, one can disable double.filtering as well.
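A minimal sketch of wiring these options into the Spark 1.4+ reader is shown below; the option names pushdown, strict and double.filtering follow the options table above, and the spark/trips index is an assumption.

val df = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("pushdown", "true")           // translate Spark SQL filters into Query DSL
  .option("strict", "true")             // use exact (not analyzed) matching
  .option("double.filtering", "false")  // do not re-apply pushed-down filters in Spark (1.6+)
  .load("spark/trips")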

Data Sources as tables

Available since Spark SQL 1.2, one can also access a data source by declaring it as a Spark temporary table (backed by elasticsearch-hadoop):

sqlContext.sql(
  "CREATE TEMPORARY TABLE myIndex " +
  "USING org.elasticsearch.spark.sql " +
  "OPTIONS (resource 'spark/index', " +
          "scroll_size '20')")

Spark’s temporary table name

USING clause identifying the data source provider, in this case org.elasticsearch.spark.sql

elasticsearch-hadoop configuration options, the mandatory one being resource. One can use the es prefix or skip it for convenience.

Since using . can cause syntax exceptions, one should replace it instead with the _ style. Thus, in this example es.scroll.size becomes scroll_size (as the leading es. can be removed). Do note this only works in Spark 1.3 as Spark 1.4 has a stricter parser. See the chapter above for more information.

Once defined, the schema is picked up automatically. So one can issue queries, right away:

val all = sqlContext.sql("SELECT * FROM myIndex WHERE id <= 10")

As elasticsearch-hadoop is aware of the queries being made, it can optimize the requests done to Elasticsearch. For example, given the following query:

val names = sqlContext.sql("SELECT name FROM myIndex WHERE id >=1 AND id <= 10")

it knows only the name and id fields are required (the first to be returned to the user, the second for Spark’s internal filtering) and thus will ask only for this data, making the queries quite efficient.

Reading DataFrames (Spark SQL 1.3) from Elasticsearch

As you might have guessed, one can define a DataFrame backed by Elasticsearch documents. Or even better, have them backed by a query result, effectively creating dynamic, real-time views over your data.

Scala

Through the org.elasticsearch.spark.sql package, esDF methods are available on the SQLContext API:

import org.apache.spark.sql.SQLContext

import org.elasticsearch.spark.sql._

...

val sql = new SQLContext(sc)

val people = sql.esDF("spark/people")

// check the associated schema
println(people.schema.treeString)
// root
// |-- name: string (nullable = true)
// |-- surname: string (nullable = true)
// |-- age: long (nullable = true)

Spark SQL Scala imports

elasticsearch-hadoop SQL Scala imports

create a DataFrame backed by the spark/people index in Elasticsearch

the associated schema discovered from Elasticsearch

notice how the age field was transformed into a long when using the default Elasticsearch mapping as discussed in the Mapping and Types chapter.

And just as with the Spark core support, additional parameters can be specified such as a query. This is quite a powerful concept as one can filter the data at the source (Elasticsearch) and use Spark only on the results:

// get only the Smiths
val smiths = sqlContext.esDF("spark/people", "?q=Smith")

Elasticsearch query whose results comprise the DataFrame

Controlling the schema. In some cases, especially when the index in Elasticsearch contains a lot of fields, it is desirable to create a DataFrame that contains only a subset of them. While one can modify the DataFrame (by working on its backing RDD) through the official Spark API or through dedicated queries, elasticsearch-hadoop allows the user to specify what fields to include and exclude from Elasticsearch when creating the DataFrame.

Through the es.read.field.include and es.read.field.exclude properties, one can indicate what fields to include or exclude from the index mapping. The syntax is similar to that of Elasticsearch include/exclude. Multiple values can be specified by using a comma. By default, no value is specified meaning all properties/fields are included and no properties/fields are excluded.

For example:

# include
es.read.field.include = *name, address.*
# exclude
es.read.field.exclude = *.created

Due to the way SparkSQL works with a schema, elasticsearch-hadoop needs to be aware of what fields are returned from Elasticsearch before executing the actual queries. While one can restrict the fields manually through the underlying Elasticsearch query, elasticsearch-hadoop is unaware of this and the results are likely to be different or worse, errors will occur. Use the properties above instead, which Elasticsearch will properly use alongside the user query.

Java

For Java users, a dedicated API exists through JavaEsSparkSQL. It is strikingly similar to the Scala API, however it allows configuration options to be passed in through Java collections instead of Scala ones; other than that using the two is exactly the same:

import org.apache.spark.sql.SQLContext;

import org.elasticsearch.spark.sql.api.java.JavaEsSparkSQL;

...

SQLContext sql = new SQLContext(sc);

DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people");

Spark SQL import

elasticsearch-hadoop import

create a Java DataFrame backed by an Elasticsearch index

Better yet, the DataFrame can be backed by a query result:

DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people", "?q=Smith");

Elasticsearch query backing the elasticsearch-hadoop DataFrame

Spark SQL Type conversion

When dealing with multi-value/array fields, please see this section and in particular these configuration options. IMPORTANT: If automatic index creation is used, please review this section for more information.

elasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch types (and back) as shown in the table below:

While Spark SQL DataTypes have an equivalent in both Scala and Java and thus the RDD conversion can apply, there are slightly different semantics - in particular with the java.sql types due to the way Spark SQL handles them:

Table 9. Spark SQL 1.3+ Conversion Table

Spark SQL DataType | Elasticsearch type

BinaryType | string (BASE64)

DateType | date (string format)

TimestampType | long (unix time)

Geo Types Conversion Table. In addition to the table above, for Spark SQL 1.3 or higher, elasticsearch-hadoop performs automatic schema detection for geo types, namely Elasticsearch geo_point and geo_shape. Since each type allows multiple formats (geo_point accepts latitude and longitude to be specified in 4 different ways, while geo_shape allows a variety of types (currently 9)) and the mapping does not provide such information, elasticsearch-hadoop will sample the determined geo fields at startup and retrieve an arbitrary document that contains all the relevant fields; it will parse it and thus determine the necessary schema (so for example it can tell whether a geo_point is specified as a String or as an array).

Since Spark SQL is strongly-typed, each geo field needs to have the same format across all documents. Shy of that, the returned data will not fit the detected schema and thus lead to errors.

Spark Structured Streaming support

Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.

-- Spark documentation

Released as an experimental feature in Spark 2.0, Spark Structured Streaming provides a unified streaming and batch interface built into the Spark SQL integration. As of elasticsearch-hadoop 6.0, we provide native functionality to index streaming data into Elasticsearch.

Like Spark SQL, Structured Streaming works with structured data. All entries are expected to have the same structure (same number of fields, of the same type and name). Using unstructured data (documents with different structures) is not supported and will cause problems. For such cases, use DStreams.

Supported Spark Structured Streaming versions

Spark Structured Streaming is considered generally available as of Spark v2.2.0. As such, elasticsearch-hadoop support for Structured Streaming (available in elasticsearch-hadoop 6.0+) is only compatible with Spark versions 2.2.0 and onward. Similar to Spark SQL before it, Structured Streaming may be subject to significant changes between releases before its interfaces are considered stable.

Spark Structured Streaming support is available under the org.elasticsearch.spark.sql and org.elasticsearch.spark.sql.streaming packages. It shares a unified interface with Spark SQL in the form of the Dataset[_] api. Clients can interact with streaming Datasets in almost exactly the same way as regular batch Datasets with only a few exceptions.

Writing Streaming Datasets (Spark SQL 2.0+) to Elasticsearch

With elasticsearch-hadoop, Stream-backed Datasets can be indexed to Elasticsearch.

Scala

In Scala, to save your streaming based Datasets and DataFrames to Elasticsearch, simply configure the stream to write out using the "es" format, like so:

import org.apache.spark.sql.SparkSession

...

val spark = SparkSession.builder()
  .appName("EsStreamingExample")
  .getOrCreate()

// required for the Dataset transformations below
import spark.implicits._

// case class used to define the DataFrame
case class Person(name: String, surname: String, age: Int)

// create DataFrame
val people = spark.readStream
  .textFile("/path/to/people/files/*")
  .map(_.split(","))
  .map(p => Person(p(0), p(1), p(2).trim.toInt))

people.writeStream
  .option("checkpointLocation", "/save/location")
  .format("es")
  .start("spark/people")

Spark SQL import

Create SparkSession

Instead of calling read, call readStream to get an instance of DataStreamReader

Read a directory of text files continuously and convert them into Person objects

Provide a location to save the offsets and commit logs for the streaming query

Start the stream using the "es" format to index the contents of the Dataset continuously to Elasticsearch

Spark makes no type-based differentiation between batch and streaming based Datasets. While you may be able to import the org.elasticsearch.spark.sql package to add saveToEs methods to your Dataset or DataFrame, it will throw an illegal argument exception if those methods are called on streaming based Datasets or DataFrames.

Java

In a similar fashion, the "es" format is available for Java usage as well:

import org.apache.spark.sql.SparkSession;

...

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

// java bean style class
public static class PersonBean {
  private String name;
  private String surname;
  private int age;

  ...
}

Dataset<PersonBean> people = spark.readStream()
  .textFile("/path/to/people/files/*")
  .map(new MapFunction<String, PersonBean>() {
    @Override
    public PersonBean call(String value) throws Exception {
      return someFunctionThatParsesStringToJavaBeans(value.split(","));
    }
  }, Encoders.<PersonBean>bean(PersonBean.class));

people.writeStream()
  .option("checkpointLocation", "/save/location")
  .format("es")
  .start("spark/people");

Spark SQL Java imports. Can use the same session class as Scala

Create SparkSession. Can also use the legacy SQLContext api

We create a java bean class to be used as our data format

Use the readStream() method to get a DataStreamReader to begin building our stream

Convert our string data into our PersonBean

Set a place to save the state of our stream

Using the "es" format, we continuously index the Dataset in Elasticsearch under spark/people

Writing existing JSON to Elasticsearch

When using Spark SQL, if the input data is in JSON format, simply convert it to a Dataset (for Spark SQL 2.0) (as described in the Spark documentation) through the DataStreamReader's json format.
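A minimal sketch of that approach is shown below; the input path, the two-field schema and the spark/json-trips resource are assumptions, and note that streaming JSON sources require the schema to be declared up front.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("EsJsonStreaming").getOrCreate()

// streaming JSON sources need an explicit schema
val schema = new StructType()
  .add("reason", StringType)
  .add("airport", StringType)

// parse each incoming file as JSON using the DataStreamReader
val trips = spark.readStream
  .schema(schema)
  .json("/path/to/json/files/*")

trips.writeStream
  .option("checkpointLocation", "/save/location")
  .format("es")
  .start("spark/json-trips")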

Sink commit log in Spark Structured Streaming

Spark Structured Streaming advertises an end-to-end fault-tolerant exactly-once processing model that is made possible through the usage of offset checkpoints and maintaining commit logs for each streaming query. When executing a streaming query, most sources and sinks require you to specify a "checkpointLocation" in order to persist the state of your job. In the event of an interruption, launching a new streaming query with the same checkpoint location will recover the state of the job and pick up where it left off. We maintain a commit log for elasticsearch-hadoop’s Elasticsearch sink implementation in a special directory under the configured checkpoint location:

$> ls /path/to/checkpoint/location
metadata  offsets/  sinks/
$> ls /path/to/checkpoint/location/sinks
elasticsearch/
$> ls /path/to/checkpoint/location/sinks/elasticsearch
12.compact 13 14 15 16 17 18

Each file in the commit log directory corresponds to a batch id that has been committed. The log implementation periodically compacts the logs down to avoid clutter. You can set the location for the log directory a number of ways:

  1. Set the explicit log location with es.spark.sql.streaming.sink.log.path (see below).
  2. If that is not set, then the path specified by checkpointLocation will be used.
  3. If that is not set, then a path will be constructed by combining the value of spark.sql.streaming.checkpointLocation from the SparkSession with the Dataset's given query name.
  4. If no query name is present, then a random UUID will be used in the above case instead of the query name
  5. If none of the above settings are provided then the start call will throw an exception

Here is a list of configurations that affect the behavior of Elasticsearch’s commit log:

es.spark.sql.streaming.sink.log.enabled (default true)
Enables or disables the commit log for a streaming job. By default, the log is enabled, and output batches with the same batch id will be skipped to avoid double-writes. When this is set to false, the commit log is disabled, and all outputs will be sent to Elasticsearch, regardless if they have been sent in a previous execution.
es.spark.sql.streaming.sink.log.path
Sets the location to store the log data for this streaming query. If this value is not set, then the Elasticsearch sink will store its commit logs under the path given in checkpointLocation. Any HDFS Client compatible URI is acceptable.
es.spark.sql.streaming.sink.log.cleanupDelay (default 10m)
The commit log is managed through Spark’s HDFS Client. Some HDFS compatible filesystems (like Amazon’s S3) propagate file changes in an asynchronous manner. To get around this, after a set of log files have been compacted, the client will wait for this amount of time before cleaning up the old files.
es.spark.sql.streaming.sink.log.deletion (default true)
Determines if the log should delete old logs that are no longer needed. After every batch is committed, the client will check to see if there are any commit logs that have been compacted and are safe to be removed. If set to false, the log will skip this cleanup step, leaving behind a commit file for each batch.
es.spark.sql.streaming.sink.log.compactInterval (default 10)
Sets the number of batches to process before compacting the log files. By default, every 10 batches the commit log will be compacted down into a single file that contains all previously committed batch ids.

Spark Structured Streaming Type conversion

Structured Streaming uses the exact same type conversion rules as the Spark SQL integration.

When dealing with multi-value/array fields, please see this section and in particular these configuration options.

If automatic index creation is used, please review this section for more information.

elasticsearch-hadoop automatically converts Spark built-in types to Elasticsearch types as shown in the table below:

While Spark SQL DataTypes have an equivalent in both Scala and Java and thus the RDD conversion can apply, there are slightly different semantics - in particular with the java.sql types due to the way Spark SQL handles them:

Table 10. Spark SQL 1.3+ Conversion Table

Spark SQL DataType | Elasticsearch type

BinaryType | string (BASE64)

DateType | date (string format)

TimestampType | long (unix time)

Using the Map/Reduce layer

Another way of using Spark with Elasticsearch is through the Map/Reduce layer, that is by leveraging the dedicated Input/OutputFormat in elasticsearch-hadoop. However, unless one is stuck on elasticsearch-hadoop 2.0, we strongly recommend using the native integration as it offers significantly better performance and flexibility.

Configuration

Through elasticsearch-hadoop, Spark can integrate with Elasticsearch through its dedicated InputFormat, and in case of writing, through OutputFormat. These are described at length in the Map/Reduce chapter so please refer to that for an in-depth explanation.

In short, one needs to set up a basic Hadoop Configuration object with the target Elasticsearch cluster and index, potentially a query, and she’s good to go.

From Spark’s perspective, the only thing required is setting up serialization - Spark relies by default on Java serialization which is convenient but fairly inefficient. This is the reason why Hadoop itself introduced its own serialization mechanism and its own types - namely Writables. As such, InputFormat and OutputFormats are required to return Writables which, out of the box, Spark does not understand. The good news is, one can easily enable a different serialization (Kryo) which handles the conversion automatically and also does this quite efficiently.

SparkConf sc = new SparkConf(); //.setMaster("local");
sc.set("spark.serializer", KryoSerializer.class.getName());

// needed only when using the Java API
JavaSparkContext jsc = new JavaSparkContext(sc);

Enable the Kryo serialization support with Spark

Or if you prefer Scala

val sc = new SparkConf(...)
sc.set("spark.serializer", classOf[KryoSerializer].getName)

Enable the Kryo serialization support with Spark

Note that the Kryo serialization is used as a work-around for dealing with Writable types; one can choose to convert the types directly (from Writable to Serializable types) - which is fine, however for getting started the one liner above seems to be the most effective.

Reading data from Elasticsearch

To read data, simply pass in the org.elasticsearch.hadoop.mr.EsInputFormat class - since it supports both the old and the new Map/Reduce APIs, you are free to use either method on SparkContext, hadoopRDD (which we recommend for conciseness reasons) or newAPIHadoopRDD. Whichever you choose, stick with it to avoid confusion and problems down the road.

Old (org.apache.hadoop.mapred) API

JobConf conf = new JobConf();
conf.set("es.resource", "radio/artists");
conf.set("es.query", "?q=me*");

JavaPairRDD esRDD = jsc.hadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class);
long docCount = esRDD.count();

Create the Hadoop JobConf object (use the old API)

Configure the source (index)

Setup the query (optional)

Create a Spark RDD on top of Elasticsearch through EsInputFormat - the key represents the doc id, the value the doc itself

The Scala version is below:

val conf = new JobConf()
conf.set("es.resource", "radio/artists")
conf.set("es.query", "?q=me*")

val esRDD = sc.hadoopRDD(conf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()

Create the Hadoop JobConf object (use the old API)

Configure the source (index)

Setup the query (optional)

Create a Spark RDD on top of Elasticsearch through EsInputFormat

New (org.apache.hadoop.mapreduce) API

As expected, the API version is strikingly similar - replace hadoopRDD with newAPIHadoopRDD and JobConf with Configuration. That’s about it.

Configuration conf = new Configuration();
conf.set("es.resource", "radio/artists");
conf.set("es.query", "?q=me*");

JavaPairRDD esRDD = jsc.newAPIHadoopRDD(conf, EsInputFormat.class, Text.class, MapWritable.class);
long docCount = esRDD.count();

Create the Hadoop Configuration object (use the new API)

Configure the source (index)

Setup the query (optional)

Create a Spark RDD on top of Elasticsearch through EsInputFormat - the key represents the doc id, the value the doc itself

The Scala version is below:

val conf = new Configuration()
conf.set("es.resource", "radio/artists")
conf.set("es.query", "?q=me*")

val esRDD = sc.newAPIHadoopRDD(conf,
  classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])
val docCount = esRDD.count()

Create the Hadoop Configuration object (use the new API)

Configure the source (index)

Setup the query (optional)

Create a Spark RDD on top of Elasticsearch through EsInputFormat

Using the connector from PySpark

Thanks to its Map/Reduce layer, elasticsearch-hadoop can be used from PySpark as well to both read and write data to Elasticsearch. To wit, below is a snippet from the Spark documentation (make sure to switch to the Python snippet):

$ ./bin/pyspark --driver-class-path=/path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}   # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
    "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
>>> rdd.first()   # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

Also, the SQL loader can be used as well:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type")
df.printSchema()
Source: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html

sumologic / elasticsearch-client


Elasticsearch Client

The Sumo Logic Elasticsearch library provides Elasticsearch bindings with a Scala DSL. Unlike other Scala libraries like elastic4s this library targets the REST API. The REST API has a two primary advantages:

  1. Ability to upgrade Elasticsearch without the need to atomically also upgrade the client.
  2. Ability to use hosted Elasticsearch such as the version provided by AWS.

This project is currently targeted at Elasticsearch 6.0.x. For ES 2.3 compatibility see version 3 ( branch).

Along with a basic Elasticsearch client (elasticsearch-core), helper functionality for using Elasticsearch with Akka (elasticsearch-akka) and AWS (elasticsearch-aws) is also provided. The goal of the DSL is to keep it as simple as possible, occasionally sacrificing some end-user boilerplate to maintain a DSL that is easy to modify and add to. The DSL attempts to be type-safe in that it should be impossible to create an invalid Elasticsearch query. Rather than be as compact as possible, the DSL aims to closely reflect the JSON it generates when reasonable. This makes it easier to discover how to access functionality than a traditional maximally compact DSL.

Install / Download

The library components are offered a la carte:

  • elasticsearch-core contains the basic Elasticsearch client and typesafe DSL
  • elasticsearch-aws contains utilities for using AWS Hosted Elasticsearch.
  • elasticsearch-akka contains Actors to use with Akka & Akka Streams

Usage

All client methods return futures that can be composed to perform multiple actions.

Basic Usage

val restClient = new RestlasticSearchClient(new StaticEndpoint(new Endpoint(host, port)))
val index = Index("index-name")
val tpe = Type("type")

val indexFuture = for {
  _ <- restClient.createIndex(index)
  indexResult <- restClient.index(index, tpe, Document("docId", Map("text" -> "Hello World!")))
} yield indexResult

Await.result(indexFuture, 10.seconds)

// Need to wait for a flush. In ElasticsearchIntegrationTest, you can just call "refresh()"
Thread.sleep(2000)

restClient.query(index, tpe, QueryRoot(TermQuery("text", "Hello World!"))).map { res =>
  println(res.sourceAsMap) // List(Map(text -> Hello World))
}

https://github.com/SumoLogic/elasticsearch-client/blob/master/elasticsearch-core/src/test/scala/com/sumologic/elasticsearch/restlastic/RestlasticSearchClientTest.scala provides other basic examples.

Using the BulkIndexerActor

NOTE: to use the BulkIndexerActor you must add a dependency to elasticsearch-akka.

The BulkIndexer actor provides a single-document style interface that actually delegates to the bulk API. This allows you to keep your code simple while still getting the performance benefits of bulk inserts and updates. The BulkIndexerActor has two configurable parameters:

  • maxDocuments: The number of documents at which a bulk request will be flushed
  • flushDuration: If the limit is not hit, it will flush after flushDuration.
val restClient: RestlasticSearchClient = ...
val (index, tpe) = (Index("i"), Type("t"))

// Designed for potentially dynamic configuration:
val config = new BulkConfig(
  flushDuration = () => FiniteDuration(2, TimeUnit.Seconds),
  maxDocuments = () => 2000)

val bulkActor = context.actorOf(BulkIndexerActor.props(restClient, config))

val sess = BulkSession.create()
val resultFuture = bulkActor ? CreateRequest(sess, index, tpe, Document("id", Map("k" -> "v")))
val resultFuture2 = bulkActor ? CreateRequest(sess, index, tpe, Document("id", Map("k" -> "v")))

// The resultFuture contains a `sessionId` you can use to match up replies with requests assuming you do not use
// the ask pattern as above.
// The two requests will be batched into a single bulk request and sent to Elasticsearch

You can also use the Bulk api directly via the REST client:

restClient.bulkIndex(index, tpe, Seq(doc1, doc2, doc3))

Usage With AWS

One common way to configure AWS Elasticsearch is with IAM roles. This requires you to sign every request you send to Elasticsearch with your user key. The elasticsearch-aws module includes a request signer for this purpose:

import com.sumologic.elasticsearch.util.AwsRequestSigner
import com.amazonaws.auth.AWSCredentials

val awsCredentials = _ // Credentials for the AWS user that has permissions to access Elasticsearch
val signer = new AwsRequestSigner(awsCredentials, "REGION", "es")

// You can also create your own dynamic endpoint class based off runtime configuration or the AWS API.
val endpoint = new StaticEndpoint(new Endpoint("es.blahblahblah.amazon.com", 443))

val restClient = new RestlasticSearchClient(endpoint, Some(signer))

restClient will now sign every request automatically with your AWS credentials.

Contributing

Sumo Logic Elasticsearch uses Maven and the Maven GPG Plug-in for builds and testing. After cloning the repository make sure you have a GPG key created. Then run .

[Dev] Building

To build project in default Scala version:

To build project in any supported Scala version:

[Dev] Testing

Tests in this project are run against local Elasticsearch servers es23 and es63.

For testing, change your consumer or to depend on the version generated. Make sure, your consumer can resolve artifacts from a local repository.

[Dev] Managing Scala versions

This project supports multiple versions of Scala. Supported versions are listed in .

  • - list of supported versions (Gradle prevents building with versions from outside this list)
  • - default version of Scala used for building - can be overridden with

[Dev] How to release new version

  1. Make sure you have all credentials - access to vault in 1Password.
    1. Can login as https://oss.sonatype.org/index.html
    2. Can import and verify the signing key ( from vault):
    3. Have nexus and signing credentials in
  2. Remove suffix from in
  3. Make a release branch with Scala version and project version, ex. :
  4. Perform a release in selected Scala versions (make sure both commands pass without any errors, otherwise go to the link below, drop created repo(s) and try again):
  5. Go to https://oss.sonatype.org/index.html#stagingRepositories, search for com.sumologic, close and release your repo (there should only be one). NOTE: If you had to login, reload the URL. It doesn't take you to the right page post-login
  6. Update the and with the new version and set upcoming snapshot in , ex.
  7. Commit the change and push as a PR:
Source: https://index.scala-lang.org/sumologic/elasticsearch-client

Elasticsearch scala


ElasticSearch is a real-time distributed search and analytics engine built on top of Apache Lucene. It is used for full-text search, structured search and analytics.

Lucene is just a library and to leverage its power you need to use Java. Integrating Lucene directly with your application is a very complex task.

Elastic Search uses the indexing and searching capabilities of Lucene but hides the complexities behind a simple RESTful API.

In this post we will learn to perform basic CRUD operations using Elastic search transport client in Scala with sbt as our build-tool.

Let us start by downloading Elasticsearch from here and unzipping it.

Execute bin/elasticsearch from the installation directory to run Elasticsearch in the foreground.

Test it out by opening another terminal window and running curl http://localhost:9200.

To start with the coding part, create a new sbt project and add the following dependency in the build.sbt file.
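The original snippet is not shown; a minimal sketch of the dependency, assuming an Elasticsearch 2.x transport client (match the version to your server), would be:

// build.sbt - the version below is an assumption
libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.4.4"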

Next, we need to create a client that will talk to the Elasticsearch server.
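A sketch of creating such a transport client, assuming an ES 2.x server on localhost with the default cluster name elasticsearch:

import java.net.InetAddress

import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.transport.InetSocketTransportAddress

// settings must carry the cluster name if it differs from the default "elasticsearch"
val settings = Settings.settingsBuilder()
  .put("cluster.name", "elasticsearch")
  .build()

// the transport client talks to the cluster over port 9300
val client = TransportClient.builder()
  .settings(settings)
  .build()
  .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300))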

Once the client is created we can query the Elastic search server.

The following example inserts a JSON document into an index called library, under a type called books.

An index is like a database and a type is like a table in Elastic search.

Let's create our first JSON document and add it to Elasticsearch with the following code:
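A sketch of indexing the document into the library/books index and type described below; the extra pages field is an illustrative assumption.

// JSON document for the book; note the (deliberately wrong) title "Elastic"
val bookJson = """{"title": "Elastic", "pages": 700}"""

// index name "library", type "books", id "1"
val indexResponse = client.prepareIndex("library", "books", "1")
  .setSource(bookJson)
  .get()

println(indexResponse.isCreated)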

The prepareIndex method takes 3 arguments: index name, type, and id. The id argument is optional. If you do not specify an id, Elasticsearch will automatically generate an id for the document.

Note that the title of the book is Elastic and not Elasticsearch. Let's correct this by executing an update on the document:
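A sketch of such an update using the transport client's prepareUpdate; the field name follows the indexing example above.

// partial document update: only the title field is changed
val updateResponse = client.prepareUpdate("library", "books", "1")
  .setDoc("""{"title": "Elasticsearch"}""")
  .get()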

Let's search for our document and see whether the document is updated or not.
Execute the following code to search for a document:

client.prepareSearch("library").setTypes("books")
  .setQuery(QueryBuilders.termQuery("_id", "1")).get()

The id that we specified while adding the document is stored as "_id".
If you do not specify the setQuery method then Elasticsearch will get all the documents in the type books.

Finally to delete a document execute the following code:
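A sketch of the delete call, targeting the document indexed above:

// remove the document by index, type and id
val deleteResponse = client.prepareDelete("library", "books", "1").get()
println(deleteResponse.isFound)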

Elasticsearch also provides a bulk API used to insert multiple documents into the Elasticsearch server in a single API call.
To use the bulk API create a file in the following format:

Now let's create a bulk request and add the following documents to the request:
Open an InputStream and read the JSON file you just created and store the data in a list named fileData.
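A sketch of building the bulk request from that list, assuming fileData is a List[String] where each element is one JSON document:

// add one index operation per JSON document read from the file
val bulkRequest = client.prepareBulk()
fileData.foreach { json =>
  bulkRequest.add(client.prepareIndex("library", "books").setSource(json))
}

// a single API call sends all documents to the server
val bulkResponse = bulkRequest.get()
if (bulkResponse.hasFailures) println(bulkResponse.buildFailureMessage())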

We are done with the CRUD operations. You can read more from the Elasticsearch docs.
Get the source code from here.

Happy Searching!!!!


Source: https://blog.knoldus.com/introduction-to-elasticsearch-in-scala/

elastic4s - Elasticsearch Scala Client


Elastic4s is a concise, idiomatic, reactive, type safe Scala client for Elasticsearch. The official Elasticsearch Java client can of course be used in Scala, but due to Java's syntax it is more verbose and it naturally doesn't support classes in the core Scala core library nor Scala idioms such as typeclass support.

Elastic4s's DSL allows you to construct your requests programatically, with syntactic and semantic errors manifested at compile time, and uses standard Scala futures to enable you to easily integrate into an asynchronous workflow. The aim of the DSL is that requests are written in a builder-like way, while staying broadly similar to the Java API or Rest API. Each request is an immutable object, so you can create requests and safely reuse them, or further copy them for derived requests. Because each request is strongly typed your IDE or editor can use the type information to show you what operations are available for any request type.

Elastic4s supports Scala collections so you don't have to do tedious conversions from your Scala domain classes into Java collections. It also allows you to index and read classes directly using typeclasses so you don't have to set fields or json documents manually. These typeclasses are generated using your favourite json library - modules exist for Jackson, Circe, Json4s, PlayJson and Spray Json. The client also uses standard Scala durations to avoid the use of strings or primitives for duration lengths.

Key points

  • Type safe concise DSL
  • Integrates with standard Scala futures or other effects libraries
  • Uses Scala collections library over Java collections
  • Returns options where the java methods would return null
  • Uses Scala durations instead of strings/longs for time values
  • Supports typeclasses for indexing, updating, and search backed by Jackson, Circe, Json4s, PlayJson and Spray Json implementations
  • Supports Java and Scala HTTP clients such as Akka-Http
  • Provides reactive-streams implementation
  • Provides a testkit subproject ideal for your tests

Release

Current Elastic4s versions support Scala 2.12 and 2.13. Scala 2.10 support has been dropped starting with 5.0.x and Scala 2.11 has been dropped starting with 7.2.0. For releases that are compatible with earlier versions of Elasticsearch, search maven central.

Source: https://github.com/sksamuel/elastic4s


I wanted to learn ElasticSearch using the Scala library Elastic4s.

First let us look at the SBT imports
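A sketch of what such a build.sbt could contain; the module names and versions are assumptions for an elastic4s 5.x setup.

libraryDependencies ++= Seq(
  "com.sksamuel.elastic4s" %% "elastic4s-core" % "5.6.0",
  "com.sksamuel.elastic4s" %% "elastic4s-tcp"  % "5.6.0",
  "com.sksamuel.elastic4s" %% "elastic4s-http" % "5.6.0",
  // log4j entries that surface errors from the underlying client
  "org.apache.logging.log4j" % "log4j-api"  % "2.8.2",
  "org.apache.logging.log4j" % "log4j-core" % "2.8.2"
)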

Here the log4j entries are a real life saver. I would have missed many issues with my application had I not been prudent enough to import them. Next we need to configure the log4j logger so that we can troubleshoot our application easily. Create a file called log4j.xml in

Installing elastic search on my remote server was pretty easy

  1. brew install elasticsearch
  2. search for a file called elasticsearch.yml and add the following line to it network.host: myremote-server
  3. brew services start elasticsearch

It's important to set the bind address, otherwise you will not be able to connect remotely to Elasticsearch. I always install all server products remotely so that they don't slow my MBP down.

In order to ensure that our product is up and running, point your browser to http://myremote-server:9200. You should see the following output

Make a note of your cluster name because this is needed. A lot of documentation on the internet assumes that your cluster name is elasticsearch and that is why programs fail to connect to Elasticsearch.

Now we will write two applications. One using TcpClient and other using HttpClient to interact with the ElasticSearch server.

TcpClient.

In order to connect to elasticsearch using the TcpClient, we must be aware of our cluster name.
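A sketch of the TCP connection; the host, port and cluster name are placeholders, and the factory methods assume an elastic4s 5.x-style TcpClient (they vary between releases).

import com.sksamuel.elastic4s.{ElasticsearchClientUri, TcpClient}
import org.elasticsearch.common.settings.Settings

// the cluster.name must match the value reported at http://myremote-server:9200
val settings = Settings.builder()
  .put("cluster.name", "my-cluster-name")
  .build()

val client = TcpClient.transport(settings, ElasticsearchClientUri("elasticsearch://myremote-server:9300"))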

Note that I had to specify the name of the cluster in the properties; without this the application would not have worked.

Now we need to complete 3 tasks. We need to create an index, insert a document in that index and finally query the document. To complete each task you will need to familiarize yourself with the Elastic4s DSL. The pattern to use the DSL is pretty consistent: you first create the DSL command, then execute the command using the client.

  • Create Index DSL.
  • Insert a document
  • Query the document

Now all we need to do is to tie in these 3 DSL commands into our application and execute them.

We will wrap these in functions and then invoke those functions in a monadic way: a function to create the index, a function to insert a document in the index, and a function to query the document.

Now we tie these together as
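shown in the sketch below, where the index, type and field names are placeholders and the DSL calls assume an elastic4s 5.x TCP client:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

import com.sksamuel.elastic4s.ElasticDsl._

// each function builds a DSL command and executes it with the client, returning a Future
def createBooksIndex() = client.execute { createIndex("books") }

def insertBook(title: String) = client.execute {
  indexInto("books" / "book").fields("title" -> title)
}

def findBook(title: String) = client.execute {
  search("books" / "book").query(termQuery("title", title))
}

// compose the three steps monadically with a for comprehension
val program = for {
  _      <- createBooksIndex()
  _      <- insertBook("Dune")
  result <- findBook("Dune")
} yield result

println(Await.result(program, 10.seconds))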

HttpClient.

Establishing a connection via the HttpClient is a little easier because we can connect just with the server name and port (without knowing the cluster name).
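A sketch of the HTTP connection; the host and port are placeholders and the classes assume elastic4s 5.x.

import com.sksamuel.elastic4s.ElasticsearchClientUri
import com.sksamuel.elastic4s.http.HttpClient

// no cluster name needed: the HTTP client only needs host and port
val httpClient = HttpClient(ElasticsearchClientUri("myremote-server", 9200))

// with this client, use the HTTP DSL: import com.sksamuel.elastic4s.http.ElasticDsl._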

The process of using the DSL is the same. We just have to be careful that we import the Http DSL; I had imported the previous TcpClient DSL at first and my application had tons of compiler errors.

Once again we write our 3 functions to create the index, insert documents in the index and then query, and finally we can connect all the 3 functions by means of a simple for statement.

The whole application can be found at my github.


Source: https://abhsrivastava.github.io/2017/09/30/Elastic4s/

