content[-]
official link how to write a sink to ES
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/elasticsearch/ maven pom.xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7</artifactId>
<version>1.16.3</version>
</dependency>
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.http.HttpHost
import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
import org.apache.http.impl.client.BasicCredentialsProvider
import org.elasticsearch.client.{Requests, RestClientBuilder}
import java.util
case class Event(user: String, url: String, timestamp: Long)
object FlinkToEsTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream : DataStream[Event] = env.fromElements(Event("fei", "./aws", 1000L),
Event("lisa", "./aws", 2000L),
Event("qiao", "./aws", 2000L),
)
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))
// Set up RestClientFactory with credentials
val restClientFactory = new RestClientFactory {
override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
val credentialsProvider = new BasicCredentialsProvider()
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "*******")
)
restClientBuilder.setHttpClientConfigCallback(
httpClientConfigCallback => httpClientConfigCallback.setDefaultCredentialsProvider(credentialsProvider)
)
}
}
// Set up ElasticsearchSink
val elasticsearchSinkBuilder = new ElasticsearchSink.Builder[Event](
httpHosts,
new ElasticsearchSinkFunction[Event] {
override def process(t: Event, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
val data = new util.HashMap[String, String]()
data.put(t.user, t.url)
// http
val indexRequest = Requests.indexRequest()
.index("clicks")
.source(data)
// post http
requestIndexer.add(indexRequest)
}
}
)
elasticsearchSinkBuilder.setRestClientFactory(restClientFactory)
val elasticsearchSink = elasticsearchSinkBuilder.build()
stream.addSink(elasticsearchSink)
env.execute()
}
}
curl -u elastic:***** "http://localhost:9200/_cat/indices?v"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open .internal.alerts-observability.logs.alerts-default-000001 v1ujNMbSS4-2AJw2lVOqbA 1 0 0 0 248b 248b
green open .internal.alerts-observability.uptime.alerts-default-000001 WM2cmqgqTByMvXaXy7Dyqw 1 0 0 0 248b 248b
green open .fleet-file-data-agent-000001 33S-3jICTHiDak6b3bAOlQ 1 0 0 0 248b 248b
green open .fleet-files-agent-000001 T39TUhgUQw6bxCStMLUbhw 1 0 0 0 248b 248b
green open .internal.alerts-observability.slo.alerts-default-000001 uGMy8ddITzCP4ZlhRQDjMg 1 0 0 0 248b 248b
green open .fleet-filedelivery-meta-endpoint-000001 paLT1iPGQKSE9RcNtEb7GA 1 0 0 0 248b 248b
green open .internal.alerts-observability.apm.alerts-default-000001 NGglFYPLTu-e9kDkMg1Pug 1 0 0 0 248b 248b
yellow open movies 9NH9_BNMSU-Au15fJz0M2Q 1 1 5 0 6.1kb 6.1kb
green open metrics-endpoint.metadata_current_default FXmfZmT3QregiMYslqOM1Q 1 0 0 0 248b 248b
green open .internal.alerts-observability.metrics.alerts-default-000001 6rRHeCKNTmW3kvy-ZY2csQ 1 0 0 0 248b 248b
green open .fleet-filedelivery-data-endpoint-000001 6mll61pvQJyNlZgkHhGgIA 1 0 0 0 248b 248b
green open .fleet-files-endpoint-000001 mkvJEmzORJKI3sNnCY3rlw 1 0 0 0 248b 248b
yellow open shakespeare irQyjHhSQZeRX3Q5d7xR1A 1 1 111396 0 17.6mb 17.6mb
yellow open clicks Ph6MyY8YTY64Uw1z_EMQ8g 1 1 7 0 16.4kb 16.4kb
green open .fleet-file-data-endpoint-000001 9YuhHn9NTXKJlxY6hWY_0Q 1 0 0 0 248b 248b
green open .internal.alerts-security.alerts-default-000001 FDQdk9PqR8u6DT8itNyKxA 1 0 0 0 248b 248b
curl -XGET -u elastic:**** http://localhost:9200/clicks/_search?pretty
{
"took" : 1,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 7,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "clicks",
"_id" : "rcNrmI0B3N-OH7cCpNMS",
"_score" : 1.0,
"_source" : {
"fei" : "./aws"
}
},
{
"_index" : "clicks",
"_id" : "rsNrmI0B3N-OH7cCpNMS",
"_score" : 1.0,
"_source" : {
"lisa" : "./aws"
}
},
{
"_index" : "clicks",
"_id" : "r8NzmI0B3N-OH7cC1NPI",
"_score" : 1.0,
"_source" : {
"fei" : "./aws"
}
},
{
"_index" : "clicks",
"_id" : "sMNzmI0B3N-OH7cC1NPI",
"_score" : 1.0,
"_source" : {
"lisa" : "./aws"
}
},
{
"_index" : "clicks",
"_id" : "scN0mI0B3N-OH7cCUdNc",
"_score" : 1.0,
"_source" : {
"fei" : "./aws"
}
},
{
"_index" : "clicks",
"_id" : "ssN0mI0B3N-OH7cCUdNc",
"_score" : 1.0,
"_source" : {
"lisa" : "./aws"
}
},
{
"_index" : "clicks",
"_id" : "s8N0mI0B3N-OH7cCUdNc",
"_score" : 1.0,
"_source" : {
"qiao" : "./aws"
}
}
]
}
}