Enviando documentos via Bulk Api no ElasticSearch

Olá, já derrubaram algo em produção hoje?

Hoje eu vou falar sobre como importar documentos para o ElasticSearch utilizando a [Bulk Api] (https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.htm).

Para quem não conhece a Bulk Api serve para enviar ou remover documentos em massa no ElasticSearch, diminuindo assim a sobrecarga do sistema e melhorando a velocidade de indexação.

Agora falando sobre caso de uso, o meu é basicamente indexar arquivos de access logs que eu recebo da minha CDN na empresa em que trabalho, hoje os logs vem nesse formato:

{ "version": "v5", "time": "2020-09-30T21:25:04+00:00", "client": "0619h", "configuration": "1570140334", "host": "xxx.xxx.xxx", "request_time": "1.140", "request_method": "GET", "upstream_cache_status": "REVALIDATED", "status": "200", "proxy_status": "-", "upstream_status": "304", "upstream_bytes_received": "298", "scheme": "https", "request_uri": "/xxx", "sent_http_content_type": "image/svg+xml", "server_protocol": "HTTP/2.0", "request_length": "58", "bytes_sent": "1699", "upstream_connect_time": "0.140", "upstream_header_time": "1.139", "upstream_response_time": "1.139", "tcpinfo_rtt": "109355", "remote_addr": "000.000.000.000", "remote_port": "37040", "country": "Brazil", "state": "Rio de Janeiro", "ssl_protocol": "TLSv1.3", "ssl_cipher": "TLS_AES_256_GCM_SHA384", "http_user_agent": "Mozilla/5.0 (Linux; Android 9; Redmi Note 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.127 Mobile Safari/537.36", "http_referrer": "https://xxx.xxx.xxx/xxx.html", "requestPath": "/home1", "requestQuery": "", "headers": "-" }

Isto é, é um json por linha, todos os jsons estão "flat" em uma linha só, agora nós precisamos criar um cabeçalho para CADA LINHA DESSE JSON, isto mesmo, api de Bulk Import do ElasticSearch espera que o arquivo siga este formato:

{ "index" : { "_index" : "NOMEDOINDICE"}
{ "exemplo1": "arquivojson"...}

Ou seja, a primeira linha fica o cabeçalho identificando qual índice você quer colocar este documento e na linha seguinte o documento json em questão.

Para facilitar a minha vida eu criei uma função em shell script que aplica essa tag em todas as linhas de um arquivo onde cada linha é um objeto json:

function insert_tag() {
    while IFS=$'\n' read -r p || [ -n "$p" ]
    do
      printf '{ "index" : { "_index" : "NOMEDOINDICE"}\n'
      printf '%s\n' "$p"
    done < "$1"
}

Essa função recebe como parâmetro o nome do arquivo com os objetos json e joga em stdout o arquivo já processado e com os cabeçalhos.

$ insert_tag ex.json > ex_comtags.json

Após inserir a “tag” vamos enviar isto para o ElasticSearch (Assumindo que seu elasticsearch esteja acessível em localhost:9200, caso seu endereço seja diferente altere no curl a seguir):

$ curl -s -H "Content-Type: application/x-ndjson" -XPOST "localhost:9200/_bulk" --data-binary @ex_comtags.json

Devemos receber um json de retorno indicando se teve sucesso ou não ao inserir o documento:

    {
      "index": {
        "_index": "azion-01102020",
        "_type": "_doc",
        "_id": "T_PB5HQBQstMer5wsUf5",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 1,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 1110061,
        "_primary_term": 1,
        "status": 201
      }
    }

Preste atenção no objeto "_shards" e na propriedade successful (poderia ser status não é mesmo?) o valor deve ser 1, caso esteja diferente o retorno vai trazer o motivo pelo qual o seu documento não foi indexado.

Desta forma você consegue escrever scripts para automatizar a inserção de documentos sem sobrecarregar o seu servidor.

Caso tenham problemas ao fazer isso não hesitem em perguntar nos comentários, vou continuar a escrever bastante sobre o ElasticSearch então caso tenham dúvidas ou queiram saber algo é só mandar a sugestão.

Até mais,
Leandro Ferreira

Show Comments