Computer >> 컴퓨터 >  >> 프로그램 작성 >> 데이터 베이스

Logstash를 사용하여 CSV 데이터를 Elasticsearch로 로드

2015년 9월 10일 ObjectRocket.com/blog에서 최초 게시

새로운 Elasticsearch® 인스턴스가 있지만 검색하려는 모든 유용한 데이터가 CSV에 저장되어 있습니까? 파일?문제 없습니다. Logstash®를 사용하면 거의 모든 데이터를 Elasticsearch 인덱스에서 쉽게 검색할 수 있는 데이터로 변환할 수 있습니다.

Logstash를 사용하여 CSV 데이터를 Elasticsearch로 로드

먼저 이러한 예제를 사용하려면 Unix®와 유사한 일부 데이터와 환경이 필요합니다. Windows®는 약간의 조정으로 잘 작동합니다. 이 경우 Davis Vantage Pro2®에서 데이터를 내보내고 싶었습니다. 기상 관측소, .CSV 형식을 지정하고 새 색인을 만듭니다.

로컬 파일에 저장된 다음과 유사한 수백만 줄로 시작했습니다.

$ head -3 /home/erik/weather.csv
HumOut,TempIn,DewPoint,HumIn,WindDir,RainMonth,WindSpeed,RainDay,BatteryVolts,WindChill,Pressure,time,TempOut,WindSpeed10Min,RainRate
76,78.0,78.227017302825,44,109,2.0,2,0.0,1.236328125,90.87261657090625,29.543,2015-06-18T17:49:29Z,86.5,1,0.0
76,78.0,78.227017302825,44,107,2.0,2,0.0,1.236328125,90.87261657090625,29.543,2015-06-18T17:49:45Z,86.5,1,0.0
76,78.0,78.32406784157725,44,107,2.0,0,0.0,1.236328125,90.83340000000001,29.543,2015-06-18T17:50:00Z,86.59999999999999,1,0.0

참고: 이 실험이 작동하려면 데이터 소스가 하나 이상 있어야 합니다.

데이터가 있으면 시작할 수 있습니다. 먼저 Java 버전이 설치되어 있는지 확인하십시오.

$ java -version
openjdk version "1.8.0_51"

모든 자바 가상 머신 (JVM)은 OpenJDK®, Oracle® 등의 작업에 적합합니다.

$ curl -O https://download.elastic.co/logstash/logstash/logstash-1.5.4.tar.gz
$ tar xfz logstash-1.5.4.tar.gz
$ cd logstash-1.5.4
$ mkdir conf

이제 구성 파일을 만들 차례입니다.

먼저 input을 정의합니다. Logstash에 알려주는 섹션 데이터를 찾을 수 있는 위치:

input {
    file {
        path => "/home/erik/weather.csv"
        start_position => beginning

    }
}

이것은 Logstash에 알려줍니다. 파일의 시작 부분에서 로드할 위치와 로드하려는 위치를 지정합니다. 다음으로 필터가 필요합니다. Logstash에는 기본적으로 사용 가능한 필터 플러그인이 많이 있습니다. 이 예에서는 커플을 사용하여 데이터를 구문 분석합니다. 지금까지 Logstash는 파일의 데이터에 대해 아무 것도 알지 못합니다. 다양한 필드를 처리하는 방법에 대한 형식과 기타 세부 사항을 지정해야 합니다.

filter {
    csv {
        columns => [
          "HumOut",
          "TempIn",
          "DewPoint",
          "HumIn",
          "WindDir",
          "RainMonth",
          "WindSpeed",
          "RainDay",
          "BatteryVolts",
          "WindChill",
          "Pressure",
          "time",
          "TempOut",
          "WindSpeed10Min",
          "RainRate"
        ]
        separator => ","
        remove_field => ["message"]
        }
    date {
        match => ["time", "ISO8601"]
    }
    mutate {
        convert => ["TempOut", "float"]
    }
}

열은 자명하지만 여기에 더 자세히 설명되어 있습니다. 먼저, 예제는 message를 제거합니다. 필드는 전체 행을 포함하는 항목입니다. 특정 속성을 검색하기 때문에 필요하지 않습니다. 둘째, time 필드에는 ISO8601-formatted date가 포함됩니다. Elasticsearch 일반 문자열이 아니라는 것을 알고 있습니다. 마지막으로 mutate 함수를 사용하여 TempOut 값을 부동 소수점 숫자로 변환합니다.

이제 다음 코드를 사용하여 데이터를 수집하고 Elasticsearch에 저장한 후 구문 분석합니다.

output {
    elasticsearch {
        protocol => "https"
        host => ["iad1-20999-0.es.objectrocket.com:20999"]
        user => "erik"
        password => "mysupersecretpassword"
        action => "index"
        index => "eriks_weather_index"
    }
    stdout { }
}

마지막으로 호스트와 포트, 인증 데이터, 저장할 인덱스 이름을 설정합니다.

좋아, 발사하자. 작동하는 경우 다음과 유사해야 합니다.

$ bin/logstash -f conf/logstash.conf -v
Logstash startup completed

효과가 있었나요? Elasticsearch에 문의:

$ curl -u erik:mysupersecretpassword 'https://iad1-20999-0.es.objectrocket.com:20999/_cat/indices?v'
health status index               pri rep docs.count store.size pri.store.size
green  open   eriks_weather_index 5   1   294854     95.8mb     48.5mb

문서가 있으므로 하나를 문의하십시오.

$ curl -u erik:mysupersecretpassword 'https://iad1-20999-0.es.objectrocket.com:20999/eriks_weather_index/_search?q=TempOut:>75&pretty&terminate_after=1'

이것은 TempOut이 있는 문서를 찾도록 Elasticsearch에 지시합니다. 75보다 큼(Tempout:>75 ), 사람이 소비할 수 있도록 형식을 지정하고(예쁜) 샤드당 결과를 하나만 반환합니다(terminate_after=1). ). 다음과 같이 반환해야 합니다.

{
  "took" : 4,
  "timed_out" : false,
  "terminated_early" : true,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
     "total" : 5,
     "max_score" : 1.0,
       "hits" : [ {
    "_index" : "eriks_weather_index",
      "_type" : "logs",
      "_id" : "AU-yXZJIJb3HnhKvpdNC",
      "_score" : 1.0,
      "_source":{"@version":"1","@timestamp":"2015-06-22T10:24:23.000Z","host":"kibana","path":"/home/erik/weather.csv","HumOut":"86","TempIn":"79.7","DewPoint":"70.65179649787358","HumIn":"46","WindDir":"161","RainMonth":"2.7","WindSpeed":"0","RainDay":"0.36","BatteryVolts":"1.125","WindChill":"82.41464999999999","Pressure":"29.611","time":"2015-06-22T10:24:23Z","TempOut":75.1,"WindSpeed10Min":"0","RainRate":"0.0"}
    } ]
   } 
}

성공. Logstash는 흩어져 있는 모든 데이터를 Elasticsearch 내에서 쉽게 재생할 수 있는 것으로 변환하는 훌륭한 Swiss Army Knife입니다. 즐기세요!

www.rackspace.com을 방문하여 영업 채팅을 클릭합니다. cinversation을 시작합니다. 의견 사용 탭을 눌러 의견을 작성하거나 질문하십시오.

Rackspace Cloud 서비스 약관을 보려면 여기를 클릭하십시오.