Данный Python скрипт создает события в PagerDuty используя APIv2.
За основу был взят следующий скрипт.
Для начала нужно создать "Routing Key", он же "Integration Key", не путать с "API Access Key", который можно использовать для любых API вызовов, нам же нужен только ключ от определенного сервиса.
Переходим в настройки сервиса, в моем случае он называется "AWS", и переходим к вкладке "Integrations"
Добавляем новую интеграцию — "Add a new integration"
Задаем имя и выбираем "Integration Type" -> "Use our API directly" -> "Events API v2" и получаем "Integration Key"
main.py:
import requests import json from typing import Any, Dict, List, Optional routing_key = 'abc123' api_url = 'https://events.pagerduty.com/v2/enqueue' def create_event( summary: str, severity: str, source: str = 'aws', action: str = 'trigger', dedup_key: Optional[str] = None, custom_details: Optional[Any] = None, group: Optional[str] = None, component: Optional[str] = None, class_type: Optional[str] = None, images: Optional[List[Any]] = None, links: Optional[List[Any]] = None ) -> Dict: payload = { "summary": summary, "severity": severity, "source": source, } if custom_details is not None: payload["custom_details"] = custom_details if component: payload["component"] = component if group: payload["group"] = group if class_type: payload["class"] = class_type actions = ('trigger', 'acknowledge', 'resolve') if action not in actions: raise ValueError("Event action must be one of: %s" % ', '.join(actions)) data = { "event_action": action, "routing_key": routing_key, "payload": payload, } if dedup_key: data["dedup_key"] = dedup_key elif action != 'trigger': raise ValueError( "The dedup_key property is required for event_action=%s events, and it must \ be a string." % action ) if images is not None: data["images"] = images if links is not None: data["links"] = links response = requests.post(api_url, json = data) print(response.content) def main(): create_event(summary = "Instance is down", severity = 'critical', custom_details = "Instance ID: i-12345") if __name__ == '__main__': main()
Замените значение переменной "routing_key" на значение "Integration Key"
В данном примере будет создано событие с уровнем "critical", заголовком "Instance is down" и в деталях будет указано "Instance ID: i-12345"
Так же скрипт можно использовать для создания событий на основе AWS CloudWatch Alarms, не используя AWS интеграцию в PagerDuty. Для этого создайте SNS топик, Lambda функцию и укажите SNS топик как источник для Lambda функции. После чего можно создавать CloudWatch Alarm и в качестве действия при срабатывании указываем SNS топик.
main.py (Lambda + SNS):
import requests import json from typing import Any, Dict, List, Optional routing_key = 'abc123' api_url = 'https://events.pagerduty.com/v2/enqueue' def get_message_info(): subject = event["Records"][0]["Sns"]["Subject"] message = event["Records"][0]["Sns"]["Message"] return subject, message def create_event( summary: str, severity: str, source: str = 'cloudwatch', action: str = 'trigger', dedup_key: Optional[str] = None, custom_details: Optional[Any] = None, group: Optional[str] = None, component: Optional[str] = None, class_type: Optional[str] = None, images: Optional[List[Any]] = None, links: Optional[List[Any]] = None ) -> Dict: payload = { "summary": summary, "severity": severity, "source": source, } if custom_details is not None: payload["custom_details"] = custom_details if component: payload["component"] = component if group: payload["group"] = group if class_type: payload["class"] = class_type actions = ('trigger', 'acknowledge', 'resolve') if action not in actions: raise ValueError("Event action must be one of: %s" % ', '.join(actions)) data = { "event_action": action, "routing_key": routing_key, "payload": payload, } if dedup_key: data["dedup_key"] = dedup_key elif action != 'trigger': raise ValueError( "The dedup_key property is required for event_action=%s events, and it must \ be a string." % action ) if images is not None: data["images"] = images if links is not None: data["links"] = links response = requests.post(api_url, json = data) print(response.content) def main(event, context): info = get_message_info() create_event(summary = info[0], severity='critical', custom_details = info[1]) if __name__ == '__main__': main()
Python пакет "requests" придется упаковывать в zip архив с Lambda функцией, так как в Lambda его нет. Для этого можно воспользоваться "virtualenv".