null

Профессиональное оповещение об ошибках в Apache NiFi

Ранее в моем блоге уже публиковалось несколько статей об Apache NiFi, сейчас же мы обратим внимание на то, как можно организовать процесс оповещения об ошибках, возникших в результате работы потока обработки данных. В приведенном фрагменте потока мы будем отправлять сообщения об ошибках по электронной почте, принимая в расчет несколько особенностей. Во-первых, мы не можем инициировать отправку письма на каждую ошибку, поскольку в нагруженных системах, где файлов много, ошибки могут быть массовым явлением (особенно, если какой-нибудь сервис в цепи не отдает данные по причине недоступности). Это приведет к огромному числу писем и засорению почтового ящика получателя. Во-вторых, по той же причине мы не можем отправлять информацию об ошибках мгновенно, необходим некоторый промежуток времени между отправками писем с набором ошибок. Таким образом, появляется необходимость каким-либо образом агрегировать информацию о возникших ошибках и отправлять ее единым сообщением в заданный промежуток времени (пусть будет каждый час). Покажем, как это можно сделать.

Допустим, есть поток данных, содержащий процессоры GenerateFlowFile, InvokeHTTP, SplitJson - создание файла потока, получение данных из стороннего источника, обработка данных соответственно. На каждом этапе, на котором мы хотим отловить ошибки, мы добавляем переход по отношению failure в процессоры ReplaceText, внутри которых мы описываем подробности проблемы.

В процессорах ReplaceText, используемых для указания подробностей ошибки, используем Replacement Strategy "Prepend", если хотим добавить текст перед Flow File'ом, сохранив его содержимое. В самом же тексте на замену, как пример, можно указать следующее:

____${now():format('yyyy-MM-dd_HH:mm:ss')}_____

An error code ${invokehttp.status.code} was received.

Response from API:
${invokehttp.response.body}

Exception: ${invokehttp.java.exception.class} - ${invokehttp.java.exception.message}

В примере выше первая строка служит "разделителем" ошибок. В конечном итоге несколько таких ошибок будут отправлены по почте в едином файле. Остальные строки служат для дополнительной информации и содержат в себе некоторые полезные атрибуты, связанные с ответом на HTTP-запрос (код ошибки, исключение, возникшее при работе процессора, и т.д.). Конкретное сообщение будет зависеть от конкретных нужд, потребностей и места, где ошибка произошла.

Далее от всех ReplaceText'ов в случае 'success' необходим переход к специальному выходу из группы (output port), который приведет нас в другую группу, из которой отправляется почта. В данном случае это сделано через воронки (funnel):

Теперь перейдем к группе, где происходит агрегация всех этих ошибок, их слияние в один файл:

Самым важным здесь является процессор MergeContent. В настройках процессора во вкладке Scheduling необходимо указать Run schedule - время между запусками процессора. В нашем случае это будет "60 min", чтобы процессор запускался ровно раз каждый час. В течение же этого часа будет накапливаться очередь с сообщениями об ошибках по типу тех, что указаны ранее. Когда наступит обозначенное время, процессор последовательно объединит всю очередь в один FlowFile, доступный для отправки по почте. Для этого в свойствах процессора достаточно изменить лишь "Maximum number of Bins", чтобы все ошибки помещались в один файл, остальные свойства можно оставить по умолчанию.

Далее идет опциональное обновление имени файла, если оно необходимо (атрибут filename). И в конечном итоге через процессор PutEmail письмо с информацией об ошибках отправляется адресату. Параметры SMTP-сервера и email адреса задаются индивидуально, но наша рекомендация - поменять в свойствах процессора "Attach File" с false, используемого по умолчанию, на true. В таком случае файл с ошибками, который может быть большим, будет отправлен не в теле письма, а вложением, что упростит дальнейшую работу.

Цель достигнута!

 

Вперед