设置通知侦听器
May 11, 2023About 1 min
设置通知侦听器
本主题介绍如何为云眼特性标帜(Feature Flag)AB实验 Python SDK 设置和删除通知侦听器。
通知侦听器触发您在 SDK 中触发某些操作时定义的回调函数。
最常见的用例是将所有特性标帜(Feature Flag)决策的流发送到分析提供商或内部数据仓库,以将其与您拥有的有关用户的其他数据联接。
通知侦听器类型
有关通知侦听器类型和用例的更多信息,请参阅通知侦听器。
有关代码示例,请参阅以下部分。
添加和删除所有通知侦听器
下面的示例代码演示如何添加侦听器、删除侦听器、删除特定类型的所有侦听器(例如所有决策侦听器)以及删除所有侦听器。
Python
from eyeofcloud.helpers import enums
# Remove Notification Listener
eyeofcloud_client.notification_center.remove_notification_listener(notification_id)
# Remove all Notification Listeners
eyeofcloud_client.notification_center.clear_all_notification_listeners()
# Remove all Notification Listeners of a certain type
eyeofcloud_client.clear_notification_listeners(enums.NotificationTypes.DECISION);
设置每种类型的通知侦听器
下面的示例代码演示如何设置每种类型的通知侦听器。
Python
from eyeofcloud.helpers import enums
#import your third-party analytics integration here
#######################################
# SET UP DECISION NOTIFICATION LISTENER
#######################################
def on_decision(decision_type, user_id, attributes, decision_info):
# Add a DECISION Notification Listener for type FLAG
if decision_type == 'flag':
# Access information about feature flag, for example, key and enabled status
print(decision_info.get('flag_key'))
print(decision_info.get('enabled'))
print(decision_info.get('decision_event_dispatched'))
#Send data to analytics provider here
notification_id = eyeofcloud_client.notification_center.add_notification_listener(
enums.NotificationTypes.DECISION, on_decision)
#######################################
# SET UP LOG EVENT NOTIFICATION LISTENER
#######################################
def on_log_event(logEvent):
#process the logEvent object here (send to analytics provider, audit/inspect data)
eyeofcloud_client.notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event)
#######################################
# SET UP EYEOFCLOUD CONFIG NOTIFICATION LISTENER
#######################################
# listen to EYEOFCLOUD_CONFIG_UPDATE to get updated data
def on_config_update_listener(*args):
config = eyeofcloud_client.get_eyeofcloud_config()
eyeofcloud_client.notification_center.add_notification_listener(
enums.NotificationTypes.EYEOFCLOUD_CONFIG_UPDATE, on_config_update_listener)
#######################################
# SET UP TRACK LISTENER
#######################################
def on_track(event):
#process the event here (send to analytics provider, audit/inspect data)
eyeofcloud_client.notification_center.add_notification_listener(
enums.NotificationTypes.TRACK, on_track
)