content[-]

official link how to write a sink to ES

https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/elasticsearch/ maven pom.xml

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7</artifactId>
            <version>1.16.3</version>
        </dependency>
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.http.HttpHost
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.elasticsearch.client.{Requests, RestClientBuilder}
import java.util

case class Event(user: String, url: String, timestamp: Long)

object FlinkToEsTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream : DataStream[Event] = env.fromElements(Event("fei", "./aws", 1000L),
      Event("lisa", "./aws", 2000L),
      Event("qiao", "./aws", 2000L),
    )

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // Set up RestClientFactory with credentials
    val restClientFactory = new RestClientFactory {
      override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
        val credentialsProvider = new BasicCredentialsProvider()
        credentialsProvider.setCredentials(
          AuthScope.ANY,
          new UsernamePasswordCredentials("elastic", "*******")
        )
        restClientBuilder.setHttpClientConfigCallback(
          httpClientConfigCallback => httpClientConfigCallback.setDefaultCredentialsProvider(credentialsProvider)
        )
      }
    }

    // Set up ElasticsearchSink
    val elasticsearchSinkBuilder = new ElasticsearchSink.Builder[Event](
      httpHosts,
      new ElasticsearchSinkFunction[Event] {
        override def process(t: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
          val data = new util.HashMap[String, String]()
          data.put(t.user, t.url)

          // http
          val indexRequest = Requests.indexRequest()
            .index("clicks")
            .source(data)

          //  post http

          requestIndexer.add(indexRequest)
        }
      }

    )

    elasticsearchSinkBuilder.setRestClientFactory(restClientFactory)
    val elasticsearchSink = elasticsearchSinkBuilder.build()

    stream.addSink(elasticsearchSink)

    env.execute()

  }
}

curl -u  elastic:***** "http://localhost:9200/_cat/indices?v"
health status index                                                        uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .internal.alerts-observability.logs.alerts-default-000001    v1ujNMbSS4-2AJw2lVOqbA   1   0          0            0       248b           248b
green  open   .internal.alerts-observability.uptime.alerts-default-000001  WM2cmqgqTByMvXaXy7Dyqw   1   0          0            0       248b           248b
green  open   .fleet-file-data-agent-000001                                33S-3jICTHiDak6b3bAOlQ   1   0          0            0       248b           248b
green  open   .fleet-files-agent-000001                                    T39TUhgUQw6bxCStMLUbhw   1   0          0            0       248b           248b
green  open   .internal.alerts-observability.slo.alerts-default-000001     uGMy8ddITzCP4ZlhRQDjMg   1   0          0            0       248b           248b
green  open   .fleet-filedelivery-meta-endpoint-000001                     paLT1iPGQKSE9RcNtEb7GA   1   0          0            0       248b           248b
green  open   .internal.alerts-observability.apm.alerts-default-000001     NGglFYPLTu-e9kDkMg1Pug   1   0          0            0       248b           248b
yellow open   movies                                                       9NH9_BNMSU-Au15fJz0M2Q   1   1          5            0      6.1kb          6.1kb
green  open   metrics-endpoint.metadata_current_default                    FXmfZmT3QregiMYslqOM1Q   1   0          0            0       248b           248b
green  open   .internal.alerts-observability.metrics.alerts-default-000001 6rRHeCKNTmW3kvy-ZY2csQ   1   0          0            0       248b           248b
green  open   .fleet-filedelivery-data-endpoint-000001                     6mll61pvQJyNlZgkHhGgIA   1   0          0            0       248b           248b
green  open   .fleet-files-endpoint-000001                                 mkvJEmzORJKI3sNnCY3rlw   1   0          0            0       248b           248b
yellow open   shakespeare                                                  irQyjHhSQZeRX3Q5d7xR1A   1   1     111396            0     17.6mb         17.6mb
yellow open   clicks                                                       Ph6MyY8YTY64Uw1z_EMQ8g   1   1          7            0     16.4kb         16.4kb
green  open   .fleet-file-data-endpoint-000001                             9YuhHn9NTXKJlxY6hWY_0Q   1   0          0            0       248b           248b
green  open   .internal.alerts-security.alerts-default-000001              FDQdk9PqR8u6DT8itNyKxA   1   0          0            0       248b           248b

curl -XGET -u elastic:**** http://localhost:9200/clicks/_search?pretty

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 7,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "clicks",
        "_id" : "rcNrmI0B3N-OH7cCpNMS",
        "_score" : 1.0,
        "_source" : {
          "fei" : "./aws"
        }
      },
      {
        "_index" : "clicks",
        "_id" : "rsNrmI0B3N-OH7cCpNMS",
        "_score" : 1.0,
        "_source" : {
          "lisa" : "./aws"
        }
      },
      {
        "_index" : "clicks",
        "_id" : "r8NzmI0B3N-OH7cC1NPI",
        "_score" : 1.0,
        "_source" : {
          "fei" : "./aws"
        }
      },
      {
        "_index" : "clicks",
        "_id" : "sMNzmI0B3N-OH7cC1NPI",
        "_score" : 1.0,
        "_source" : {
          "lisa" : "./aws"
        }
      },
      {
        "_index" : "clicks",
        "_id" : "scN0mI0B3N-OH7cCUdNc",
        "_score" : 1.0,
        "_source" : {
          "fei" : "./aws"
        }
      },
      {
        "_index" : "clicks",
        "_id" : "ssN0mI0B3N-OH7cCUdNc",
        "_score" : 1.0,
        "_source" : {
          "lisa" : "./aws"
        }
      },
      {
        "_index" : "clicks",
        "_id" : "s8N0mI0B3N-OH7cCUdNc",
        "_score" : 1.0,
        "_source" : {
          "qiao" : "./aws"
        }
      }
    ]
  }
}