Elasticsearch 5의 가장 멋진 새 기능 중 하나는 일부 Logstash 스타일 처리를 Elasticsearch 클러스터에 추가하는 수집 노드로, 이를 위해 다른 서비스 및/또는 인프라 없이 데이터를 인덱싱하기 전에 변환할 수 있습니다. 얼마 전에 Logstash로 csv 파일을 구문 분석하는 방법에 대한 간단한 블로그를 게시했는데 비교를 위해 해당 버전의 수집 파이프라인을 제공하고 싶습니다.
여기에서 보여드릴 것은 Filebeat를 사용하여 데이터를 수집 파이프라인으로 보내고, 색인을 생성하고, Kibana로 시각화하는 예입니다.
데이터
무료 데이터에 대한 훌륭한 소스가 많이 있지만 ObjectRocket의 우리 대부분은 텍사스 주 오스틴에 있기 때문에 data.austintexas.gov의 일부 데이터를 사용할 것입니다. 식당 검사 데이터 세트는 실제 사례를 제공하기에 충분한 관련 정보가 있는 적절한 크기의 데이터 세트입니다.
다음은 데이터 구조에 대한 아이디어를 제공하기 위해 이 데이터 세트의 몇 줄입니다.
Restaurant Name,Zip Code,Inspection Date,Score,Address,Facility ID,Process Description
Westminster Manor,78731,07/21/2015,96,"4100 JACKSON AVE
AUSTIN, TX 78731
(30.314499, -97.755166)",2800365,Routine Inspection
Wieland Elementary,78660,10/02/2014,100,"900 TUDOR HOUSE RD
AUSTIN, TX 78660
(30.422862, -97.640183)",10051637,Routine Inspection
DOH... 이것은 항목 케이스당 친절하고 친절하며 한 줄로 표시되지는 않겠지만 괜찮습니다. 곧 보게 되겠지만 Filebeat에는 여러 줄 항목을 처리하고 데이터에 묻힌 줄 바꿈을 해결하는 기능이 내장되어 있습니다.
편집 참고 사항:"장애"가 거의 없는 멋진 간단한 예제를 계획했지만 결국 Elastic Stack이 이러한 시나리오를 해결하기 위해 제공하는 도구를 보는 것이 흥미로울 수 있다고 생각했습니다.
Filebeat 설정
첫 번째 단계는 Filebeat가 Elasticsearch 클러스터로 데이터를 배송할 준비를 하는 것입니다. Filebeat를 다운로드하고(ES 클러스터와 동일한 버전 사용) 압축을 풀면 포함된 filebeat.yml 구성 파일을 통해 설정하는 것이 매우 간단합니다. 이 시나리오의 경우 사용 중인 구성은 다음과 같습니다.
filebeat.prospectors:
- input_type: log
paths:
- /Path/To/logs/*.csv
# Ignore the first line with column headings
exclude_lines: ["^Restaurant Name,"]
# Identifies the last two columns as the end of an entry and then prepends the previous lines to it
multiline.pattern: ',\d+,[^\",]+$'
multiline.negate: true
multiline.match: before
#================================ Outputs =====================================
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["https://dfw-xxxxx-0.es.objectrocket.com:xxxxx", "https://dfw-xxxxx-1.es.objectrocket.com:xxxxx", "https://dfw-xxxxx-2.es.objectrocket.com:xxxxx", "https://dfw-xxxxx-3.es.objectrocket.com:xxxxx"]
pipeline: "inspectioncsvs"
# Optional protocol and basic auth credentials.
username: "esuser"
password: "supersecretpassword"
여기에서는 모든 것이 매우 간단합니다. 입력 파일을 가져올 위치와 방법을 지정하는 섹션과 데이터를 전달할 위치를 지정하는 섹션이 있습니다. 특별히 언급할 부분은 여러 줄 비트와 Elasticsearch 구성 부분입니다.
이 데이터 세트의 형식은 매우 엄격하지 않고 큰 따옴표와 여러 줄 바꿈을 일관되게 사용하지 않으므로 숫자 ID와 검사 유형으로 구성된 항목의 끝을 찾는 것이 가장 좋습니다. 많은 변형이나 큰따옴표/줄 바꿈 없이. 거기에서 Filebeat는 일치하지 않는 줄을 대기열에 넣고 패턴과 일치하는 마지막 줄 앞에 추가합니다. 데이터가 더 깨끗하고 항목 형식당 간단한 줄만 사용하는 경우 여러 줄 설정을 거의 무시할 수 있습니다.
Elasticsearch 출력 섹션을 보면 파이프라인:지시문과 함께 사용하려는 파이프라인 이름이 약간 추가된 표준 Elasticsearch 설정입니다. ObjectRocket 서비스를 사용하는 경우 UI의 "연결" 탭에서 출력 스니펫을 가져올 수 있습니다. 여기에는 모든 올바른 호스트가 미리 채워져 있으며 파이프라인 라인을 추가하고 사용자와 비밀번호를 입력하기만 하면 됩니다. . 또한 클러스터의 ACL에 아직 추가하지 않은 경우 시스템 IP가 추가되었는지 확인하십시오.
수집 파이프라인 생성
이제 입력 데이터와 Filebeat를 사용할 준비가 되었으므로 수집 파이프라인을 만들고 조정할 수 있습니다. 파이프라인이 수행해야 하는 주요 작업은 다음과 같습니다.
- csv 콘텐츠를 올바른 필드로 분할
- 검사 점수를 정수로 변환
@timestamp
설정 필드- 다른 데이터 형식 정리
다음은 이 모든 작업을 수행할 수 있는 파이프라인입니다.
PUT _ingest/pipeline/inspectioncsvs
{
"description" : "Convert Restaurant inspections csv data to indexed data",
"processors" : [
{
"grok": {
"field": "message",
"patterns": ["%{REST_NAME:RestaurantName},%{REST_ZIP:ZipCode},%{MONTHNUM2:InspectionMonth}/%{MONTHDAY:InspectionDay}/%{YEAR:InspectionYear},%{NUMBER:Score},\"%{DATA:StreetAddress}\n%{DATA:City},?\\s+%{WORD:State}\\s*%{NUMBER:ZipCode2}\\s*\n\\(?%{DATA:Location}\\)?\",%{NUMBER:FacilityID},%{DATA:InspectionType}$"],
"pattern_definitions": {
"REST_NAME": "%{DATA}|%{QUOTEDSTRING}",
"REST_ZIP": "%{QUOTEDSTRING}|%{NUMBER}"
}
}
},
{
"grok": {
"field": "ZipCode",
"patterns": [".*%{ZIP:ZipCode}\"?$"],
"pattern_definitions": {
"ZIP": "\\d{5}"
}
}
},
{
"convert": {
"field" : "Score",
"type": "integer"
}
},
{
"set": {
"field" : "@timestamp",
"value" : "//"
}
},
{
"date" : {
"field" : "@timestamp",
"formats" : ["yyyy/MM/dd"]
}
}
],
"on_failure" : [
{
"set" : {
"field" : "error",
"value" : " - Error processing message - "
}
}
]
}
Logstash와 달리 수집 파이프라인에는 (이 글을 쓰는 시점에) csv 프로세서/플러그인이 없으므로 csv를 직접 변환해야 합니다. 각 행에는 몇 개의 열만 있기 때문에 무거운 작업을 수행하기 위해 grok 프로세서를 사용했습니다. 더 많은 열이 있는 데이터의 경우 grok 프로세서가 매우 복잡해질 수 있으므로 또 다른 옵션은 분할 프로세서와 간편한 스크립팅을 사용하여 보다 반복적인 방식으로 라인을 처리하는 것입니다. 또한 두 번째 grok 프로세서는 이 데이터 세트에 우편번호가 입력되는 두 가지 다른 방법을 처리하기 위한 것입니다.
디버그 목적으로 모든 오류를 포착하고 실패한 프로세서 유형과 파이프라인을 중단시킨 메시지를 인쇄하는 일반적인 on_failure 섹션을 포함했습니다. 이것은 디버그 방법을 더 쉽게 만듭니다. 오류가 설정된 모든 문서에 대한 색인을 쿼리한 다음 시뮬레이션 API로 디버그할 수 있습니다. 지금 더 자세히...
파이프라인 테스트
수집 파이프라인을 구성했으므로 이제 시뮬레이션 API로 테스트하고 실행해 보겠습니다. 먼저 샘플 문서가 필요합니다. 몇 가지 방법으로 이 작업을 수행할 수 있습니다. 파이프라인 설정 없이 Filebeat를 실행한 다음 Elasticsearch에서 처리되지 않은 문서를 가져오거나 Elasticsearch 섹션을 주석 처리하고 yml 파일에 다음을 추가하여 콘솔 출력이 활성화된 상태에서 Filebeat를 실행할 수 있습니다.
output.console:
pretty: true
다음은 내 환경에서 가져온 샘플 문서입니다.
POST _ingest/pipeline/inspectioncsvs/_simulate
{
"docs" : [
{
"_index": "inspections",
"_type": "log",
"_id": "AVpsUYR_du9kwoEnKsSA",
"_score": 1,
"_source": {
"@timestamp": "2017-03-31T18:22:25.981Z",
"beat": {
"hostname": "systemx",
"name": "RestReviews",
"version": "5.1.1"
},
"input_type": "log",
"message": "Wieland Elementary,78660,10/02/2014,100,\"900 TUDOR HOUSE RD\nAUSTIN, TX 78660\n(30.422862, -97.640183)\",10051637,Routine Inspection",
"offset": 2109798,
"source": "/Path/to/my/logs/Restaurant_Inspection_Scores.csv",
"tags": [
"debug",
"reviews"
],
"type": "log"
}
}
]
}
그리고 응답(설정하려는 필드로 줄였습니다):
{
"docs": [
{
"doc": {
"_id": "AVpsUYR_du9kwoEnKsSA",
"_type": "log",
"_index": "inspections",
"_source": {
"InspectionType": "Routine Inspection",
"ZipCode": "78660",
"InspectionMonth": "10",
"City": "AUSTIN",
"message": "Wieland Elementary,78660,10/02/2014,100,\"900 TUDOR HOUSE RD\nAUSTIN, TX 78660\n(30.422862, -97.640183)\",10051637,Routine Inspection",
"RestaurantName": "Wieland Elementary",
"FacilityID": "10051637",
"Score": 100,
"StreetAddress": "900 TUDOR HOUSE RD",
"State": "TX",
"InspectionDay": "02",
"InspectionYear": "2014",
"ZipCode2": "78660",
"Location": "30.422862, -97.640183"
},
"_ingest": {
"timestamp": "2017-03-31T20:36:59.574+0000"
}
}
}
]
}
파이프라인은 확실히 성공했지만 가장 중요한 것은 모든 데이터가 올바른 위치에 있는 것처럼 보입니다.
파일비트 실행
Filebeat를 실행하기 전에 마지막으로 한 가지 작업을 수행합니다. 수집 파이프라인에 익숙해지기를 원하는 경우 이 부분은 완전히 선택 사항이지만 grok 프로세서에서 Geo-point로 설정한 Location 필드를 사용하려면 filebeat에 매핑을 추가해야 합니다. 속성 섹션에 다음을 추가하여 template.json 파일:
"Location": {
"type": "geo_point"
},
이제 문제가 해결되었으므로 ./filebeat -e -c filebeat.yml -d "elasticsearch"를 실행하여 Filebeat를 실행할 수 있습니다.
데이터 사용
GET /filebeat-*/_count
{}
{
"count": 25081,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
}
}
좋은 징조입니다! 오류가 있는지 살펴보겠습니다.
GET /filebeat-*/_search
{
"query": {
"exists" : { "field" : "error" }
}
}
{
"took": 1,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 0,
"max_score": null,
"hits": []
}
}
또 다른 좋은 징조입니다!
이제 Kibana에서 데이터를 시각화하고 표시할 준비가 되었습니다. 다른 시간에 Kibana 대시보드를 만드는 과정을 살펴볼 수 있지만 날짜, 레스토랑 이름, 점수 및 위치가 있다는 점을 감안할 때 멋진 시각화를 만들 수 있는 충분한 자유가 있습니다.
최종 메모
다시 한 번, 수집 파이프라인은 매우 강력하며 변환을 매우 쉽게 처리할 수 있습니다. 파이프라인의 어딘가에 Logstash를 요구하지 않고 모든 처리를 Elasticsearch로 이동하고 호스트에서 경량 Beat만 사용할 수 있습니다. 그러나 Logstash와 비교하여 수집 노드에는 여전히 약간의 격차가 있습니다. 예를 들어 수집 파이프라인에서 사용할 수 있는 프로세서의 수는 여전히 제한되어 있으므로 CSV 구문 분석과 같은 간단한 작업은 Logstash에서만큼 쉽지 않습니다. Elasticsearch 팀은 정기적으로 새로운 프로세서를 출시하는 것 같으므로 차이점 목록이 계속 줄어들기를 바랍니다.