PagerDuty — Python скрипт для создания событий

Данный Python скрипт создает события в PagerDuty используя APIv2.

За основу был взят следующий скрипт.

Для начала нужно создать "Routing Key", он же "Integration Key", не путать с "API Access Key", который можно использовать для любых API вызовов, нам же нужен только ключ от определенного сервиса.

Переходим в настройки сервиса, в моем случае он называется "AWS", и переходим к вкладке  "Integrations"

 

Добавляем новую интеграцию — "Add a new integration"

 

Задаем имя и выбираем "Integration Type" -> "Use our API directly" -> "Events API v2" и получаем "Integration Key"

main.py:

import requests
import json

from typing import Any, Dict, List, Optional

routing_key = 'abc123'
api_url = 'https://events.pagerduty.com/v2/enqueue'

def create_event(
    summary: str,
    severity: str,
    source: str = 'aws',
    action: str = 'trigger',
    dedup_key: Optional[str] = None,
    custom_details: Optional[Any] = None,
    group: Optional[str] = None,
    component: Optional[str] = None,
    class_type: Optional[str] = None,
    images: Optional[List[Any]] = None,
    links: Optional[List[Any]] = None
) -> Dict:
    payload = {
        "summary": summary,
        "severity": severity,
        "source": source,
    }
    if custom_details is not None:
        payload["custom_details"] = custom_details
    if component:
        payload["component"] = component
    if group:
        payload["group"] = group
    if class_type:
        payload["class"] = class_type

    actions = ('trigger', 'acknowledge', 'resolve')
    if action not in actions:
        raise ValueError("Event action must be one of: %s" % ', '.join(actions))
    data = {
        "event_action": action,
        "routing_key": routing_key,
        "payload": payload,
    }
    if dedup_key:
        data["dedup_key"] = dedup_key
    elif action != 'trigger':
        raise ValueError(
            "The dedup_key property is required for event_action=%s events, and it must \
            be a string."
            % action
        )
    if images is not None:
        data["images"] = images
    if links is not None:
        data["links"] = links

    response = requests.post(api_url, json = data)
    print(response.content)

def main():
    create_event(summary = "Instance is down", severity = 'critical', custom_details = "Instance ID: i-12345")
  
if __name__ == '__main__':
    main()

 

Замените значение переменной "routing_key" на значение "Integration Key"

В данном примере будет создано событие с уровнем "critical", заголовком "Instance is down" и в деталях будет указано "Instance ID: i-12345"

Так же скрипт можно использовать для создания событий на основе AWS CloudWatch Alarms, не используя AWS интеграцию в PagerDuty. Для этого создайте SNS топик, Lambda функцию и укажите SNS топик как источник для Lambda функции. После чего можно создавать CloudWatch Alarm и в качестве действия при срабатывании указываем SNS топик.

main.py (Lambda + SNS):

import requests
import json

from typing import Any, Dict, List, Optional

routing_key = 'abc123'
api_url = 'https://events.pagerduty.com/v2/enqueue'

def get_message_info():
    subject = event["Records"][0]["Sns"]["Subject"]
    message = event["Records"][0]["Sns"]["Message"]
    return subject, message

def create_event(
    summary: str,
    severity: str,
    source: str = 'cloudwatch',
    action: str = 'trigger',
    dedup_key: Optional[str] = None,
    custom_details: Optional[Any] = None,
    group: Optional[str] = None,
    component: Optional[str] = None,
    class_type: Optional[str] = None,
    images: Optional[List[Any]] = None,
    links: Optional[List[Any]] = None
) -> Dict:
    payload = {
        "summary": summary,
        "severity": severity,
        "source": source,
    }
    if custom_details is not None:
        payload["custom_details"] = custom_details
    if component:
        payload["component"] = component
    if group:
        payload["group"] = group
    if class_type:
        payload["class"] = class_type

    actions = ('trigger', 'acknowledge', 'resolve')
    if action not in actions:
        raise ValueError("Event action must be one of: %s" % ', '.join(actions))
    data = {
        "event_action": action,
        "routing_key": routing_key,
        "payload": payload,
    }
    if dedup_key:
        data["dedup_key"] = dedup_key
    elif action != 'trigger':
        raise ValueError(
            "The dedup_key property is required for event_action=%s events, and it must \
            be a string."
            % action
        )
    if images is not None:
        data["images"] = images
    if links is not None:
        data["links"] = links

    response = requests.post(api_url, json = data)
    print(response.content)

def main(event, context):
    info = get_message_info()
    create_event(summary = info[0], severity='critical', custom_details = info[1])
  
if __name__ == '__main__':
    main()

 

Python пакет "requests" придется упаковывать в zip архив с Lambda функцией, так как в Lambda его нет. Для этого можно воспользоваться "virtualenv".

0 0 vote
Рейтинг статьи

Метки: Метки

Подписаться
Уведомление о
guest
0 комментариев
Inline Feedbacks
View all comments