您现在的位置是:主页 > news > wordpress siren/aso优化服务站

wordpress siren/aso优化服务站

admin2025/5/21 15:06:32news

简介wordpress siren,aso优化服务站,东莞专业网站营销,东莞南城网站开发公司1. Janus启动及线程 Created with Raphal 2.2.0开始......启动Session处理线程启动请求处理线程创建异步请求处理线程池......结束1.1 线程处理代码 主入口main函数位于 janus.c 文件中,下面为线程部分: 其中:需要特别注意~~janus_sessions…

wordpress siren,aso优化服务站,东莞专业网站营销,东莞南城网站开发公司1. Janus启动及线程 Created with Raphal 2.2.0开始......启动Session处理线程启动请求处理线程创建异步请求处理线程池......结束1.1 线程处理代码 主入口main函数位于 janus.c 文件中,下面为线程部分: 其中:需要特别注意~~janus_sessions…

1. Janus启动及线程

Created with Raphaël 2.2.0开始......启动Session处理线程启动请求处理线程创建异步请求处理线程池......结束

1.1 线程处理代码

主入口main函数位于 janus.c 文件中,下面为线程部分:
在这里插入图片描述
其中:需要特别注意~~janus_sessions_watchdog~~、janus_transport_requestsjanus_transport_task几个处理函数

1.2 Session处理函数janus_sessions_watchdog(非本文重点)

在这里插入图片描述

1.3 请求处理函数janus_transport_requests

在这里插入图片描述

1.4 异步请求处理函数janus_transport_task

在这里插入图片描述

2. Janus启动及组件

Created with Raphaël 2.2.0开始......加载并初始化所有EventHandlers加载并初始化所有Plugins加载并初始化所有Transports......结束

2.1 组件回调处理

2.1.1 Plugin回调处理

在这里插入图片描述

2.1.2 Transport回调处理

在这里插入图片描述

2.2 EventHandlers处理代码

在这里插入图片描述
详细代码:

	/* Load event handlers */const char *path = NULL;DIR *dir = NULL;/* Event handlers are disabled by default, though: they need to be enabled in the configuration */item = janus_config_get(config, config_events, janus_config_type_item, "broadcast");gboolean enable_events = FALSE;if(item && item->value)enable_events = janus_is_true(item->value);if(!enable_events) {JANUS_LOG(LOG_WARN, "Event handlers support disabled\n");} else {gchar **disabled_eventhandlers = NULL;path = EVENTDIR;item = janus_config_get(config, config_general, janus_config_type_item, "events_folder");if(item && item->value)path = (char *)item->value;JANUS_LOG(LOG_INFO, "Event handler plugins folder: %s\n", path);dir = opendir(path);if(!dir) {/* Not really fatal, we don't care and go on anyway: event handlers are not fundamental */JANUS_LOG(LOG_FATAL, "\tCouldn't access event handler plugins folder...\n");} else {item = janus_config_get(config, config_events, janus_config_type_item, "stats_period");if(item && item->value) {/* Check if we need to use a larger period for pushing statistics to event handlers */int period = atoi(item->value);if(period < 0) {JANUS_LOG(LOG_WARN, "Invalid event handlers statistics period, using default value (1 second)\n");} else if(period == 0) {janus_ice_set_event_stats_period(0);JANUS_LOG(LOG_WARN, "Disabling event handlers statistics period, no media statistics will be pushed to event handlers\n");} else {janus_ice_set_event_stats_period(period);JANUS_LOG(LOG_INFO, "Setting event handlers statistics period to %d seconds\n", period);}}/* Any event handlers to ignore? */item = janus_config_get(config, config_events, janus_config_type_item, "disable");if(item && item->value)disabled_eventhandlers = g_strsplit(item->value, ",", -1);/* Open the shared objects */struct dirent *eventent = NULL;char eventpath[1024];while((eventent = readdir(dir))) {int len = strlen(eventent->d_name);if (len < 4) {continue;}if (strcasecmp(eventent->d_name+len-strlen(SHLIB_EXT), SHLIB_EXT)) {continue;}/* Check if this event handler has been disabled in the configuration file */if(disabled_eventhandlers != NULL) {gchar *index = disabled_eventhandlers[0];if(index != NULL) {int i=0;gboolean skip = FALSE;while(index != NULL) {while(isspace(*index))index++;if(strlen(index) && !strcmp(index, eventent->d_name)) {JANUS_LOG(LOG_WARN, "Event handler plugin '%s' has been disabled, skipping...\n", eventent->d_name);skip = TRUE;break;}i++;index = disabled_eventhandlers[i];}if(skip)continue;}}JANUS_LOG(LOG_INFO, "Loading event handler plugin '%s'...\n", eventent->d_name);memset(eventpath, 0, 1024);g_snprintf(eventpath, 1024, "%s/%s", path, eventent->d_name);void *event = dlopen(eventpath, RTLD_NOW | RTLD_GLOBAL);if (!event) {JANUS_LOG(LOG_ERR, "\tCouldn't load event handler plugin '%s': %s\n", eventent->d_name, dlerror());} else {create_e *create = (create_e*) dlsym(event, "create");const char *dlsym_error = dlerror();if (dlsym_error) {JANUS_LOG(LOG_ERR, "\tCouldn't load symbol 'create': %s\n", dlsym_error);continue;}janus_eventhandler *janus_eventhandler = create();if(!janus_eventhandler) {JANUS_LOG(LOG_ERR, "\tCouldn't use function 'create'...\n");continue;}/* Are all the mandatory methods and callbacks implemented? */if(!janus_eventhandler->init || !janus_eventhandler->destroy ||!janus_eventhandler->get_api_compatibility ||!janus_eventhandler->get_version ||!janus_eventhandler->get_version_string ||!janus_eventhandler->get_description ||!janus_eventhandler->get_package ||!janus_eventhandler->get_name ||!janus_eventhandler->incoming_event) {JANUS_LOG(LOG_ERR, "\tMissing some mandatory methods/callbacks, skipping this event handler plugin...\n");continue;}if(janus_eventhandler->get_api_compatibility() < JANUS_EVENTHANDLER_API_VERSION) {JANUS_LOG(LOG_ERR, "The '%s' event handler plugin was compiled against an older version of the API (%d < %d), skipping it: update it to enable it again\n",janus_eventhandler->get_package(), janus_eventhandler->get_api_compatibility(), JANUS_EVENTHANDLER_API_VERSION);continue;}janus_eventhandler->init(configs_folder);JANUS_LOG(LOG_VERB, "\tVersion: %d (%s)\n", janus_eventhandler->get_version(), janus_eventhandler->get_version_string());JANUS_LOG(LOG_VERB, "\t   [%s] %s\n", janus_eventhandler->get_package(), janus_eventhandler->get_name());JANUS_LOG(LOG_VERB, "\t   %s\n", janus_eventhandler->get_description());JANUS_LOG(LOG_VERB, "\t   Plugin API version: %d\n", janus_eventhandler->get_api_compatibility());JANUS_LOG(LOG_VERB, "\t   Subscriptions:");if(janus_eventhandler->events_mask == 0) {JANUS_LOG(LOG_VERB, " none");} else {if(janus_flags_is_set(&janus_eventhandler->events_mask, JANUS_EVENT_TYPE_SESSION))JANUS_LOG(LOG_VERB, " sessions");if(janus_flags_is_set(&janus_eventhandler->events_mask, JANUS_EVENT_TYPE_HANDLE))JANUS_LOG(LOG_VERB, " handles");if(janus_flags_is_set(&janus_eventhandler->events_mask, JANUS_EVENT_TYPE_JSEP))JANUS_LOG(LOG_VERB, " jsep");if(janus_flags_is_set(&janus_eventhandler->events_mask, JANUS_EVENT_TYPE_WEBRTC))JANUS_LOG(LOG_VERB, " webrtc");if(janus_flags_is_set(&janus_eventhandler->events_mask, JANUS_EVENT_TYPE_MEDIA))JANUS_LOG(LOG_VERB, " media");if(janus_flags_is_set(&janus_eventhandler->events_mask, JANUS_EVENT_TYPE_PLUGIN))JANUS_LOG(LOG_VERB, " plugins");if(janus_flags_is_set(&janus_eventhandler->events_mask, JANUS_EVENT_TYPE_TRANSPORT))JANUS_LOG(LOG_VERB, " transports");}JANUS_LOG(LOG_VERB, "\n");if(eventhandlers == NULL)eventhandlers = g_hash_table_new(g_str_hash, g_str_equal);g_hash_table_insert(eventhandlers, (gpointer)janus_eventhandler->get_package(), janus_eventhandler);if(eventhandlers_so == NULL)eventhandlers_so = g_hash_table_new(g_str_hash, g_str_equal);g_hash_table_insert(eventhandlers_so, (gpointer)janus_eventhandler->get_package(), event);}}}closedir(dir);if(disabled_eventhandlers != NULL)g_strfreev(disabled_eventhandlers);disabled_eventhandlers = NULL;/* Initialize the event broadcaster */if(janus_events_init(enable_events, (server_name ? server_name : (char *)JANUS_SERVER_NAME), eventhandlers) < 0) {JANUS_LOG(LOG_FATAL, "Error initializing the Event handlers mechanism...\n");exit(1);}}

其中,janus_events_init(enable_events, (server_name ? server_name : (char *)JANUS_SERVER_NAME), eventhandlers)初始化启动了全局的事件处理线程 janus_events_thread
在这里插入图片描述

2.3 Plugins处理代码

在这里插入图片描述

详细代码:

	/* Load plugins */path = PLUGINDIR;item = janus_config_get(config, config_general, janus_config_type_item, "plugins_folder");if(item && item->value)path = (char *)item->value;JANUS_LOG(LOG_INFO, "Plugins folder: %s\n", path);dir = opendir(path);if(!dir) {JANUS_LOG(LOG_FATAL, "\tCouldn't access plugins folder...\n");exit(1);}/* Any plugin to ignore? */gchar **disabled_plugins = NULL;item = janus_config_get(config, config_plugins, janus_config_type_item, "disable");if(item && item->value)disabled_plugins = g_strsplit(item->value, ",", -1);/* Open the shared objects */struct dirent *pluginent = NULL;char pluginpath[1024];while((pluginent = readdir(dir))) {int len = strlen(pluginent->d_name);if (len < 4) {continue;}if (strcasecmp(pluginent->d_name+len-strlen(SHLIB_EXT), SHLIB_EXT)) {continue;}/* Check if this plugins has been disabled in the configuration file */if(disabled_plugins != NULL) {gchar *index = disabled_plugins[0];if(index != NULL) {int i=0;gboolean skip = FALSE;while(index != NULL) {while(isspace(*index))index++;if(strlen(index) && !strcmp(index, pluginent->d_name)) {JANUS_LOG(LOG_WARN, "Plugin '%s' has been disabled, skipping...\n", pluginent->d_name);skip = TRUE;break;}i++;index = disabled_plugins[i];}if(skip)continue;}}JANUS_LOG(LOG_INFO, "Loading plugin '%s'...\n", pluginent->d_name);memset(pluginpath, 0, 1024);g_snprintf(pluginpath, 1024, "%s/%s", path, pluginent->d_name);void *plugin = dlopen(pluginpath, RTLD_NOW | RTLD_GLOBAL);if (!plugin) {JANUS_LOG(LOG_ERR, "\tCouldn't load plugin '%s': %s\n", pluginent->d_name, dlerror());} else {create_p *create = (create_p*) dlsym(plugin, "create");const char *dlsym_error = dlerror();if (dlsym_error) {JANUS_LOG(LOG_ERR, "\tCouldn't load symbol 'create': %s\n", dlsym_error);continue;}janus_plugin *janus_plugin = create();if(!janus_plugin) {JANUS_LOG(LOG_ERR, "\tCouldn't use function 'create'...\n");continue;}/* Are all the mandatory methods and callbacks implemented? */if(!janus_plugin->init || !janus_plugin->destroy ||!janus_plugin->get_api_compatibility ||!janus_plugin->get_version ||!janus_plugin->get_version_string ||!janus_plugin->get_description ||!janus_plugin->get_package ||!janus_plugin->get_name ||!janus_plugin->create_session ||!janus_plugin->query_session ||!janus_plugin->destroy_session ||!janus_plugin->handle_message ||!janus_plugin->setup_media ||!janus_plugin->hangup_media) {JANUS_LOG(LOG_ERR, "\tMissing some mandatory methods/callbacks, skipping this plugin...\n");continue;}if(janus_plugin->get_api_compatibility() < JANUS_PLUGIN_API_VERSION) {JANUS_LOG(LOG_ERR, "The '%s' plugin was compiled against an older version of the API (%d < %d), skipping it: update it to enable it again\n",janus_plugin->get_package(), janus_plugin->get_api_compatibility(), JANUS_PLUGIN_API_VERSION);continue;}if(janus_plugin->init(&janus_handler_plugin, configs_folder) < 0) {JANUS_LOG(LOG_WARN, "The '%s' plugin could not be initialized\n", janus_plugin->get_package());dlclose(plugin);continue;}JANUS_LOG(LOG_VERB, "\tVersion: %d (%s)\n", janus_plugin->get_version(), janus_plugin->get_version_string());JANUS_LOG(LOG_VERB, "\t   [%s] %s\n", janus_plugin->get_package(), janus_plugin->get_name());JANUS_LOG(LOG_VERB, "\t   %s\n", janus_plugin->get_description());JANUS_LOG(LOG_VERB, "\t   Plugin API version: %d\n", janus_plugin->get_api_compatibility());if(!janus_plugin->incoming_rtp && !janus_plugin->incoming_rtcp && !janus_plugin->incoming_data) {JANUS_LOG(LOG_WARN, "The '%s' plugin doesn't implement any callback for RTP/RTCP/data... is this on purpose?\n",janus_plugin->get_package());}if(!janus_plugin->incoming_rtp && !janus_plugin->incoming_rtcp && janus_plugin->incoming_data) {JANUS_LOG(LOG_WARN, "The '%s' plugin will only handle data channels (no RTP/RTCP)... is this on purpose?\n",janus_plugin->get_package());}if(plugins == NULL)plugins = g_hash_table_new(g_str_hash, g_str_equal);g_hash_table_insert(plugins, (gpointer)janus_plugin->get_package(), janus_plugin);if(plugins_so == NULL)plugins_so = g_hash_table_new(g_str_hash, g_str_equal);g_hash_table_insert(plugins_so, (gpointer)janus_plugin->get_package(), plugin);}}closedir(dir);if(disabled_plugins != NULL)g_strfreev(disabled_plugins);disabled_plugins = NULL;

其中,janus_plugin->init(&janus_handler_plugin, configs_folder)初始化设置了插件的回调处理 janus_handler_plugin
在这里插入图片描述

2.4 Transports处理代码

在这里插入图片描述

详细代码:

	/* Load transports */gboolean janus_api_enabled = FALSE, admin_api_enabled = FALSE;path = TRANSPORTDIR;item = janus_config_get(config, config_general, janus_config_type_item, "transports_folder");if(item && item->value)path = (char *)item->value;JANUS_LOG(LOG_INFO, "Transport plugins folder: %s\n", path);dir = opendir(path);if(!dir) {JANUS_LOG(LOG_FATAL, "\tCouldn't access transport plugins folder...\n");exit(1);}/* Any transport to ignore? */gchar **disabled_transports = NULL;item = janus_config_get(config, config_transports, janus_config_type_item, "disable");if(item && item->value)disabled_transports = g_strsplit(item->value, ",", -1);/* Open the shared objects */struct dirent *transportent = NULL;char transportpath[1024];while((transportent = readdir(dir))) {int len = strlen(transportent->d_name);if (len < 4) {continue;}if (strcasecmp(transportent->d_name+len-strlen(SHLIB_EXT), SHLIB_EXT)) {continue;}/* Check if this transports has been disabled in the configuration file */if(disabled_transports != NULL) {gchar *index = disabled_transports[0];if(index != NULL) {int i=0;gboolean skip = FALSE;while(index != NULL) {while(isspace(*index))index++;if(strlen(index) && !strcmp(index, transportent->d_name)) {JANUS_LOG(LOG_WARN, "Transport plugin '%s' has been disabled, skipping...\n", transportent->d_name);skip = TRUE;break;}i++;index = disabled_transports[i];}if(skip)continue;}}JANUS_LOG(LOG_INFO, "Loading transport plugin '%s'...\n", transportent->d_name);memset(transportpath, 0, 1024);g_snprintf(transportpath, 1024, "%s/%s", path, transportent->d_name);void *transport = dlopen(transportpath, RTLD_NOW | RTLD_GLOBAL);if (!transport) {JANUS_LOG(LOG_ERR, "\tCouldn't load transport plugin '%s': %s\n", transportent->d_name, dlerror());} else {create_t *create = (create_t*) dlsym(transport, "create");const char *dlsym_error = dlerror();if (dlsym_error) {JANUS_LOG(LOG_ERR, "\tCouldn't load symbol 'create': %s\n", dlsym_error);continue;}janus_transport *janus_transport = create();if(!janus_transport) {JANUS_LOG(LOG_ERR, "\tCouldn't use function 'create'...\n");continue;}/* Are all the mandatory methods and callbacks implemented? */if(!janus_transport->init || !janus_transport->destroy ||!janus_transport->get_api_compatibility ||!janus_transport->get_version ||!janus_transport->get_version_string ||!janus_transport->get_description ||!janus_transport->get_package ||!janus_transport->get_name ||!janus_transport->send_message ||!janus_transport->is_janus_api_enabled ||!janus_transport->is_admin_api_enabled ||!janus_transport->session_created ||!janus_transport->session_over ||!janus_transport->session_claimed) {JANUS_LOG(LOG_ERR, "\tMissing some mandatory methods/callbacks, skipping this transport plugin...\n");continue;}if(janus_transport->get_api_compatibility() < JANUS_TRANSPORT_API_VERSION) {JANUS_LOG(LOG_ERR, "The '%s' transport plugin was compiled against an older version of the API (%d < %d), skipping it: update it to enable it again\n",janus_transport->get_package(), janus_transport->get_api_compatibility(), JANUS_TRANSPORT_API_VERSION);continue;}if(janus_transport->init(&janus_handler_transport, configs_folder) < 0) {JANUS_LOG(LOG_WARN, "The '%s' plugin could not be initialized\n", janus_transport->get_package());dlclose(transport);continue;}JANUS_LOG(LOG_VERB, "\tVersion: %d (%s)\n", janus_transport->get_version(), janus_transport->get_version_string());JANUS_LOG(LOG_VERB, "\t   [%s] %s\n", janus_transport->get_package(), janus_transport->get_name());JANUS_LOG(LOG_VERB, "\t   %s\n", janus_transport->get_description());JANUS_LOG(LOG_VERB, "\t   Plugin API version: %d\n", janus_transport->get_api_compatibility());JANUS_LOG(LOG_VERB, "\t   Janus API: %s\n", janus_transport->is_janus_api_enabled() ? "enabled" : "disabled");JANUS_LOG(LOG_VERB, "\t   Admin API: %s\n", janus_transport->is_admin_api_enabled() ? "enabled" : "disabled");janus_api_enabled = janus_api_enabled || janus_transport->is_janus_api_enabled();admin_api_enabled = admin_api_enabled || janus_transport->is_admin_api_enabled();if(transports == NULL)transports = g_hash_table_new(g_str_hash, g_str_equal);g_hash_table_insert(transports, (gpointer)janus_transport->get_package(), janus_transport);if(transports_so == NULL)transports_so = g_hash_table_new(g_str_hash, g_str_equal);g_hash_table_insert(transports_so, (gpointer)janus_transport->get_package(), transport);}}closedir(dir);if(disabled_transports != NULL)g_strfreev(disabled_transports);disabled_transports = NULL;

其中,janus_transport->init(&janus_handler_transport, configs_folder)初始化设置了Transport的回调处理 janus_handler_transport
在这里插入图片描述

3. 请求处理过程

在这里插入图片描述

过程如下所述:

  1. Janus客户端连接Transport发起请求;
  2. Transport收到请求后,执行janus_xxx_handler处理,调用Janus主线程的回调函数incoming_request;
  3. Janus主线程的incoming_request将请求加入请求队列;
  4. 请求处理线程请求队列中获取请求信息,执行janus_transport_requests处理;
    • 若请求需同步处理,请求处理线程执行janus_process_incoming_request,调用Plugin的handle_message函数进行处理;
    • 若请求需异步处理,请求处理线程将请求加入任务列表任务处理线程池执行janus_transport_task函数,执行janus_process_incoming_request,调用Plugin的handle_message函数进行处理;
  5. Plugin的handle_message将处理结果加入Plugin消息队列;
  6. Plugin消息处理线程Plugin消息队列获取处理结果,执行janus_yyy_handler处理,调用Janus主线程的回调函数push_event;
  7. Janus主线程的push_event调用Transport的send_message进行处理;
  8. Transport将处理完的结果返回给Janus客户端

3.1 Transport启动服务时设置请求处理函数(以transport/janus_http.c为例)

在这里插入图片描述

janus_http_handler 详细代码:

/* WebServer requests handler */
int janus_http_handler(void *cls, struct MHD_Connection *connection, const char *url, const char *method, const char *version, const char *upload_data, size_t *upload_data_size, void **ptr)
{char *payload = NULL;json_t *root = NULL;struct MHD_Response *response = NULL;int ret = MHD_NO;gchar *session_path = NULL, *handle_path = NULL;gchar **basepath = NULL, **path = NULL;guint64 session_id = 0, handle_id = 0;/* Is this the first round? */int firstround = 0;janus_transport_session *ts = (janus_transport_session *)*ptr;janus_http_msg *msg = NULL;if(ts == NULL) {firstround = 1;JANUS_LOG(LOG_DBG, "Got a HTTP %s request on %s...\n", method, url);JANUS_LOG(LOG_DBG, " ... Just parsing headers for now...\n");msg = g_malloc0(sizeof(janus_http_msg));msg->connection = connection;janus_mutex_init(&msg->wait_mutex);janus_condition_init(&msg->wait_cond);ts = janus_transport_session_create(msg, janus_http_msg_destroy);janus_mutex_lock(&messages_mutex);g_hash_table_insert(messages, ts, ts);janus_mutex_unlock(&messages_mutex);*ptr = ts;MHD_get_connection_values(connection, MHD_HEADER_KIND, &janus_http_headers, msg);ret = MHD_YES;/* Notify handlers about this new transport instance */if(notify_events && gateway->events_is_enabled()) {json_t *info = json_object();json_object_set_new(info, "event", json_string("request"));json_object_set_new(info, "admin_api", json_false());const union MHD_ConnectionInfo *conninfo = MHD_get_connection_info(connection, MHD_CONNECTION_INFO_CLIENT_ADDRESS);if(conninfo != NULL) {janus_network_address addr;janus_network_address_string_buffer addr_buf;if(janus_network_address_from_sockaddr((struct sockaddr *)conninfo->client_addr, &addr) == 0 &&janus_network_address_to_string_buffer(&addr, &addr_buf) == 0) {const char *ip = janus_network_address_string_from_buffer(&addr_buf);json_object_set_new(info, "ip", json_string(ip));}uint16_t port = janus_http_sockaddr_to_port((struct sockaddr *)conninfo->client_addr);json_object_set_new(info, "port", json_integer(port));}gateway->notify_event(&janus_http_transport, ts, info);}} else {JANUS_LOG(LOG_DBG, "Processing HTTP %s request on %s...\n", method, url);msg = (janus_http_msg *)ts->transport_p;}/* Parse request */if (strcasecmp(method, "GET") && strcasecmp(method, "POST") && strcasecmp(method, "OPTIONS")) {ret = janus_http_return_error(ts, 0, NULL, JANUS_ERROR_TRANSPORT_SPECIFIC, "Unsupported method %s", method);goto done;}if (!strcasecmp(method, "OPTIONS")) {response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);janus_http_add_cors_headers(msg, response);ret = MHD_queue_response(connection, MHD_HTTP_OK, response);MHD_destroy_response(response);}/* Get path components */if(strcasecmp(url, ws_path)) {if(strlen(ws_path) > 1) {basepath = g_strsplit(url, ws_path, -1);} else {/* The base path is the web server too itself, we process the url itself */basepath = g_malloc_n(3, sizeof(char *));basepath[0] = g_strdup("/");basepath[1] = g_strdup(url);basepath[2] = NULL;}if(basepath[0] == NULL || basepath[1] == NULL || basepath[1][0] != '/') {JANUS_LOG(LOG_ERR, "Invalid url %s\n", url);response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);janus_http_add_cors_headers(msg, response);ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);MHD_destroy_response(response);}if(firstround) {g_strfreev(basepath);return ret;}path = g_strsplit(basepath[1], "/", -1);if(path == NULL || path[1] == NULL) {JANUS_LOG(LOG_ERR, "Invalid path %s (%s)\n", basepath[1], path ? path[1] : "");response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);janus_http_add_cors_headers(msg, response);ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);MHD_destroy_response(response);g_strfreev(basepath);g_strfreev(path);return ret;}}if(firstround) {g_strfreev(basepath);g_strfreev(path);return ret;}JANUS_LOG(LOG_DBG, " ... parsing request...\n");if(path != NULL && path[1] != NULL && strlen(path[1]) > 0) {session_path = g_strdup(path[1]);JANUS_LOG(LOG_HUGE, "Session: %s\n", session_path);}if(session_path != NULL && path[2] != NULL && strlen(path[2]) > 0) {handle_path = g_strdup(path[2]);JANUS_LOG(LOG_HUGE, "Handle: %s\n", handle_path);}if(session_path != NULL && handle_path != NULL && path[3] != NULL && strlen(path[3]) > 0) {JANUS_LOG(LOG_ERR, "Too many components...\n");response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);janus_http_add_cors_headers(msg, response);ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);MHD_destroy_response(response);goto done;}/* Get payload, if any */if(!strcasecmp(method, "POST")) {JANUS_LOG(LOG_HUGE, "Processing POST data (%s) (%zu bytes)...\n", msg->contenttype, *upload_data_size);if(*upload_data_size != 0) {msg->payload = g_realloc(msg->payload, msg->len+*upload_data_size+1);memcpy(msg->payload+msg->len, upload_data, *upload_data_size);msg->len += *upload_data_size;memset(msg->payload + msg->len, '\0', 1);JANUS_LOG(LOG_DBG, "  -- Data we have now (%zu bytes)\n", msg->len);*upload_data_size = 0;	/* Go on */ret = MHD_YES;goto done;}JANUS_LOG(LOG_DBG, "Done getting payload, we can answer\n");if(msg->payload == NULL) {JANUS_LOG(LOG_ERR, "No payload :-(\n");ret = MHD_NO;goto done;}payload = msg->payload;JANUS_LOG(LOG_HUGE, "%s\n", payload);}/* Is this a generic request for info? */if(session_path != NULL && !strcmp(session_path, "info")) {/* The info REST endpoint, if contacted through a GET, provides information on the Janus core */if(strcasecmp(method, "GET")) {response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);janus_http_add_cors_headers(msg, response);ret = MHD_queue_response(connection, MHD_HTTP_BAD_REQUEST, response);MHD_destroy_response(response);goto done;}/* Turn this into a fake "info" request */method = "POST";char tr[12];janus_http_random_string(12, (char *)&tr);root = json_object();json_object_set_new(root, "janus", json_string("info"));json_object_set_new(root, "transaction", json_string(tr));goto parsingdone;}/* Or maybe a long poll */if(!strcasecmp(method, "GET") || !payload) {session_id = session_path ? g_ascii_strtoull(session_path, NULL, 10) : 0;if(session_id < 1) {JANUS_LOG(LOG_ERR, "Invalid session %s\n", session_path);response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);janus_http_add_cors_headers(msg, response);ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);MHD_destroy_response(response);goto done;}msg->session_id = session_id;/* Since we handle long polls ourselves, the core isn't involved (if not for providing us with events)* A long poll, though, can act as a keepalive, so we pass a fake one to the core to avoid undesirable timeouts *//* First of all, though, API secret and token based authentication may be enabled in the core, so since* we're bypassing it for notifications we'll have to check those ourselves */const char *secret = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "apisecret");const char *token = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "token");gboolean secret_authorized = FALSE, token_authorized = FALSE;if(!gateway->is_api_secret_needed(&janus_http_transport) && !gateway->is_auth_token_needed(&janus_http_transport)) {/* Nothing to check */secret_authorized = TRUE;token_authorized = TRUE;} else {if(gateway->is_api_secret_valid(&janus_http_transport, secret)) {/* API secret is valid */secret_authorized = TRUE;}if(gateway->is_auth_token_valid(&janus_http_transport, token)) {/* Token is valid */token_authorized = TRUE;}/* We consider a request authorized if either the proper API secret or a valid token has been provided */if(!secret_authorized && !token_authorized) {response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);janus_http_add_cors_headers(msg, response);ret = MHD_queue_response(connection, MHD_HTTP_FORBIDDEN, response);MHD_destroy_response(response);goto done;}}/* Ok, go on with the keepalive */char tr[12];janus_http_random_string(12, (char *)&tr);root = json_object();json_object_set_new(root, "janus", json_string("keepalive"));json_object_set_new(root, "session_id", json_integer(session_id));json_object_set_new(root, "transaction", json_string(tr));if(secret)json_object_set_new(root, "apisecret", json_string(secret));if(token)json_object_set_new(root, "token", json_string(token));gateway->incoming_request(&janus_http_transport, ts, (void *)keepalive_id, FALSE, root, NULL);/* Ok, go on */if(handle_path) {char *location = g_malloc(strlen(ws_path) + strlen(session_path) + 2);g_sprintf(location, "%s/%s", ws_path, session_path);JANUS_LOG(LOG_ERR, "Invalid GET to %s, redirecting to %s\n", url, location);response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);MHD_add_response_header(response, "Location", location);janus_http_add_cors_headers(msg, response);ret = MHD_queue_response(connection, 302, response);MHD_destroy_response(response);g_free(location);goto done;}janus_mutex_lock(&sessions_mutex);janus_http_session *session = g_hash_table_lookup(sessions, &session_id);janus_mutex_unlock(&sessions_mutex);if(!session || g_atomic_int_get(&session->destroyed)) {JANUS_LOG(LOG_ERR, "Couldn't find any session %"SCNu64"...\n", session_id);response = MHD_create_response_from_buffer(0, NULL, MHD_RESPMEM_PERSISTENT);janus_http_add_cors_headers(msg, response);ret = MHD_queue_response(connection, MHD_HTTP_NOT_FOUND, response);MHD_destroy_response(response);goto done;}janus_refcount_increase(&ts->ref);janus_refcount_increase(&session->ref);/* How many messages can we send back in a single response? (just one by default) */int max_events = 1;const char *maxev = MHD_lookup_connection_value(connection, MHD_GET_ARGUMENT_KIND, "maxev");if(maxev != NULL) {max_events = atoi(maxev);if(max_events < 1) {JANUS_LOG(LOG_WARN, "Invalid maxev parameter passed (%d), defaulting to 1\n", max_events);max_events = 1;}}JANUS_LOG(LOG_VERB, "Session %"SCNu64" found... returning up to %d messages\n", session_id, max_events);/* Handle GET, taking the first message from the list */json_t *event = g_async_queue_try_pop(session->events);if(event != NULL) {if(max_events == 1) {/* Return just this message and leave */gchar *event_text = json_dumps(event, json_format);json_decref(event);ret = janus_http_return_success(ts, event_text);} else {/* The application is willing to receive more events at the same time, anything to report? */json_t *list = json_array();json_array_append_new(list, event);int events = 1;while(events < max_events) {event = g_async_queue_try_pop(session->events);if(event == NULL)break;json_array_append_new(list, event);events++;}/* Return the array of messages and leave */gchar *list_text = json_dumps(list, json_format);json_decref(list);ret = janus_http_return_success(ts, list_text);}} else {/* Still no message, wait */ret = janus_http_notifier(ts, session, max_events);}janus_refcount_decrease(&session->ref);janus_refcount_decrease(&ts->ref);goto done;}json_error_t error;/* Parse the JSON payload */root = json_loads(payload, 0, &error);if(!root) {ret = janus_http_return_error(ts, 0, NULL, JANUS_ERROR_INVALID_JSON, "JSON error: on line %d: %s", error.line, error.text);goto done;}if(!json_is_object(root)) {ret = janus_http_return_error(ts, 0, NULL, JANUS_ERROR_INVALID_JSON_OBJECT, "JSON error: not an object");json_decref(root);goto done;}parsingdone:/* Check if we have session and handle identifiers, and how they were provided */session_id = json_integer_value(json_object_get(root, "session_id"));if(session_id && session_path && g_ascii_strtoull(session_path, NULL, 10) != session_id) {ret = janus_http_return_error(ts, 0, NULL, JANUS_ERROR_INVALID_REQUEST_PATH, "Conflicting session ID (payload and path)");json_decref(root);goto done;}if(!session_id) {/* No session ID in the JSON object, maybe in the path? */session_id = session_path ? g_ascii_strtoull(session_path, NULL, 10) : 0;if(session_id > 0)json_object_set_new(root, "session_id", json_integer(session_id));}handle_id = json_integer_value(json_object_get(root, "handle_id"));if(handle_id && handle_path && g_ascii_strtoull(handle_path, NULL, 10) != handle_id) {ret = janus_http_return_error(ts, 0, NULL, JANUS_ERROR_INVALID_REQUEST_PATH, "Conflicting handle ID (payload and path)");json_decref(root);goto done;}if(!handle_id) {/* No session ID in the JSON object, maybe in the path? */handle_id = handle_path ? g_ascii_strtoull(handle_path, NULL, 10) : 0;if(handle_id > 0)json_object_set_new(root, "handle_id", json_integer(handle_id));}/* Suspend the connection and pass the ball to the core */JANUS_LOG(LOG_HUGE, "Forwarding request to the core (%p)\n", ts);gateway->incoming_request(&janus_http_transport, ts, ts, FALSE, root, &error);/* Wait for a response (but not forever) */
#ifndef USE_PTHREAD_MUTEXgint64 wakeup = janus_get_monotonic_time() + 10*G_TIME_SPAN_SECOND;janus_mutex_lock(&msg->wait_mutex);while(!msg->got_response) {int res = janus_condition_wait_until(&msg->wait_cond, &msg->wait_mutex, wakeup);if(msg->got_response || !res)break;}janus_mutex_unlock(&msg->wait_mutex);
#elsestruct timeval now;gettimeofday(&now, NULL);struct timespec wakeup;wakeup.tv_sec = now.tv_sec+10;	/* Wait at max 10 seconds for a response */wakeup.tv_nsec = now.tv_usec*1000UL;janus_mutex_lock(&msg->wait_mutex);while(!msg->got_response) {int res = janus_condition_timedwait(&msg->wait_cond, &msg->wait_mutex, &wakeup);if(msg->got_response || res == ETIMEDOUT)break;}janus_mutex_unlock(&msg->wait_mutex);
#endifif(!msg->response) {ret = MHD_NO;} else {char *response_text = json_dumps(msg->response, json_format);json_decref(msg->response);msg->response = NULL;ret = janus_http_return_success(ts, response_text);}done:g_strfreev(basepath);g_strfreev(path);g_free(session_path);g_free(handle_path);return ret;
}

回调 incoming_request ,并等待响应直至返回结果或超时:
在这里插入图片描述

3.2 Janus将请求加入请求队列

在这里插入图片描述

3.3 请求处理线程处理函数janus_transport_requests

在这里插入图片描述

  • 若是管理请求,执行janus_process_incoming_admin_request同步处理;
  • 若不是管理请求
    • janus==message,将请求加入任务队列,采用janus_transport_task异步处理,调用janus_process_incoming_request进行处理;
    • janus!=message,执行janus_process_incoming_request同步处理;

3.4 处理请求函数janus_process_incoming_request

int janus_process_incoming_request(janus_request *request) {int ret = -1;if(request == NULL) {JANUS_LOG(LOG_ERR, "Missing request or payload to process, giving up...\n");return ret;}int error_code = 0;char error_cause[100];json_t *root = request->message;/* Ok, let's start with the ids */guint64 session_id = 0, handle_id = 0;json_t *s = json_object_get(root, "session_id");if(s && json_is_integer(s))session_id = json_integer_value(s);json_t *h = json_object_get(root, "handle_id");if(h && json_is_integer(h))handle_id = json_integer_value(h);janus_session *session = NULL;janus_ice_handle *handle = NULL;/* Get transaction and message request */JANUS_VALIDATE_JSON_OBJECT(root, incoming_request_parameters,error_code, error_cause, FALSE,JANUS_ERROR_MISSING_MANDATORY_ELEMENT, JANUS_ERROR_INVALID_ELEMENT_TYPE);if(error_code != 0) {ret = janus_process_error_string(request, session_id, NULL, error_code, error_cause);goto jsondone;}json_t *transaction = json_object_get(root, "transaction");const gchar *transaction_text = json_string_value(transaction);json_t *message = json_object_get(root, "janus");const gchar *message_text = json_string_value(message);if(session_id == 0 && handle_id == 0) {/* Can only be a 'Create new session', a 'Get info' or a 'Ping/Pong' request */if(!strcasecmp(message_text, "info")) {ret = janus_process_success(request, janus_info(transaction_text));goto jsondone;}if(!strcasecmp(message_text, "ping")) {/* Prepare JSON reply */json_t *reply = janus_create_message("pong", 0, transaction_text);ret = janus_process_success(request, reply);goto jsondone;}if(strcasecmp(message_text, "create")) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);goto jsondone;}/* Make sure we're accepting new sessions */if(!accept_new_sessions) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_NOT_ACCEPTING_SESSIONS, NULL);goto jsondone;}/* Any secret/token to check? */ret = janus_request_check_secret(request, session_id, transaction_text);if(ret != 0) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNAUTHORIZED, NULL);goto jsondone;}session_id = 0;json_t *id = json_object_get(root, "id");if(id != NULL) {/* The application provided the session ID to use */session_id = json_integer_value(id);if(session_id > 0 && (session = janus_session_find(session_id)) != NULL) {/* Session ID already taken */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_SESSION_CONFLICT, "Session ID already in use");goto jsondone;}}/* Handle it */session = janus_session_create(session_id);if(session == NULL) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNKNOWN, "Memory error");goto jsondone;}session_id = session->session_id;/* We increase the counter as this request is using the session */janus_refcount_increase(&session->ref);/* Take note of the request source that originated this session (HTTP, WebSockets, RabbitMQ?) */session->source = janus_request_new(request->transport, request->instance, NULL, FALSE, NULL);/* Notify the source that a new session has been created */request->transport->session_created(request->instance, session->session_id);/* Notify event handlers */if(janus_events_is_enabled()) {/* Session created, add info on the transport that originated it */json_t *transport = json_object();json_object_set_new(transport, "transport", json_string(session->source->transport->get_package()));char id[32];memset(id, 0, sizeof(id));g_snprintf(id, sizeof(id), "%p", session->source->instance);json_object_set_new(transport, "id", json_string(id));janus_events_notify_handlers(JANUS_EVENT_TYPE_SESSION, session_id, "created", transport);}/* Prepare JSON reply */json_t *reply = janus_create_message("success", 0, transaction_text);json_t *data = json_object();json_object_set_new(data, "id", json_integer(session_id));json_object_set_new(reply, "data", data);/* Send the success reply */ret = janus_process_success(request, reply);goto jsondone;}if(session_id < 1) {JANUS_LOG(LOG_ERR, "Invalid session\n");ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_SESSION_NOT_FOUND, NULL);goto jsondone;}if(h && handle_id < 1) {JANUS_LOG(LOG_ERR, "Invalid handle\n");ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_SESSION_NOT_FOUND, NULL);goto jsondone;}/* Go on with the processing */ret = janus_request_check_secret(request, session_id, transaction_text);if(ret != 0) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNAUTHORIZED, NULL);goto jsondone;}/* If we got here, make sure we have a session (and/or a handle) */session = janus_session_find(session_id);if(!session) {JANUS_LOG(LOG_ERR, "Couldn't find any session %"SCNu64"...\n", session_id);ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_SESSION_NOT_FOUND, "No such session %"SCNu64"", session_id);goto jsondone;}/* Update the last activity timer */session->last_activity = janus_get_monotonic_time();handle = NULL;if(handle_id > 0) {handle = janus_session_handles_find(session, handle_id);if(!handle) {JANUS_LOG(LOG_ERR, "Couldn't find any handle %"SCNu64" in session %"SCNu64"...\n", handle_id, session_id);ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_HANDLE_NOT_FOUND, "No such handle %"SCNu64" in session %"SCNu64"", handle_id, session_id);goto jsondone;}}/* What is this? */if(!strcasecmp(message_text, "keepalive")) {/* Just a keep-alive message, reply with an ack */JANUS_LOG(LOG_VERB, "Got a keep-alive on session %"SCNu64"\n", session_id);json_t *reply = janus_create_message("ack", session_id, transaction_text);/* Send the success reply */ret = janus_process_success(request, reply);} else if(!strcasecmp(message_text, "attach")) {if(handle != NULL) {/* Attach is a session-level command */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);goto jsondone;}JANUS_VALIDATE_JSON_OBJECT(root, attach_parameters,error_code, error_cause, FALSE,JANUS_ERROR_MISSING_MANDATORY_ELEMENT, JANUS_ERROR_INVALID_ELEMENT_TYPE);if(error_code != 0) {ret = janus_process_error_string(request, session_id, transaction_text, error_code, error_cause);goto jsondone;}json_t *plugin = json_object_get(root, "plugin");const gchar *plugin_text = json_string_value(plugin);janus_plugin *plugin_t = janus_plugin_find(plugin_text);if(plugin_t == NULL) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_NOT_FOUND, "No such plugin '%s'", plugin_text);goto jsondone;}/* If the auth token mechanism is enabled, we should check if this token can access this plugin */if(janus_auth_is_enabled()) {json_t *token = json_object_get(root, "token");if(token != NULL) {const char *token_value = json_string_value(token);if(token_value && !janus_auth_check_plugin(token_value, plugin_t)) {JANUS_LOG(LOG_ERR, "Token '%s' can't access plugin '%s'\n", token_value, plugin_text);ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNAUTHORIZED_PLUGIN, "Provided token can't access plugin '%s'", plugin_text);goto jsondone;}}}json_t *opaque = json_object_get(root, "opaque_id");const char *opaque_id = opaque ? json_string_value(opaque) : NULL;/* Create handle */handle = janus_ice_handle_create(session, opaque_id);if(handle == NULL) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNKNOWN, "Memory error");goto jsondone;}handle_id = handle->handle_id;/* We increase the counter as this request is using the handle */janus_refcount_increase(&handle->ref);/* Attach to the plugin */int error = 0;if((error = janus_ice_handle_attach_plugin(session, handle, plugin_t)) != 0) {/* TODO Make error struct to pass verbose information */janus_session_handles_remove(session, handle);JANUS_LOG(LOG_ERR, "Couldn't attach to plugin '%s', error '%d'\n", plugin_text, error);ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_ATTACH, "Couldn't attach to plugin: error '%d'", error);goto jsondone;}/* Prepare JSON reply */json_t *reply = janus_create_message("success", session_id, transaction_text);json_t *data = json_object();json_object_set_new(data, "id", json_integer(handle_id));json_object_set_new(reply, "data", data);/* Send the success reply */ret = janus_process_success(request, reply);} else if(!strcasecmp(message_text, "destroy")) {if(handle != NULL) {/* Query is a session-level command */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);goto jsondone;}janus_mutex_lock(&sessions_mutex);g_hash_table_remove(sessions, &session->session_id);janus_mutex_unlock(&sessions_mutex);/* Notify the source that the session has been destroyed */if(session->source && session->source->transport) {session->source->transport->session_over(session->source->instance, session->session_id, FALSE, FALSE);}/* Schedule the session for deletion */janus_session_destroy(session);/* Prepare JSON reply */json_t *reply = janus_create_message("success", session_id, transaction_text);/* Send the success reply */ret = janus_process_success(request, reply);/* Notify event handlers as well */if(janus_events_is_enabled())janus_events_notify_handlers(JANUS_EVENT_TYPE_SESSION, session_id, "destroyed", NULL);} else if(!strcasecmp(message_text, "detach")) {if(handle == NULL) {/* Query is an handle-level command */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);goto jsondone;}if(handle->app == NULL || handle->app_handle == NULL) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_DETACH, "No plugin to detach from");goto jsondone;}int error = janus_session_handles_remove(session, handle);if(error != 0) {/* TODO Make error struct to pass verbose information */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_DETACH, "Couldn't detach from plugin: error '%d'", error);/* TODO Delete handle instance */goto jsondone;}/* Prepare JSON reply */json_t *reply = janus_create_message("success", session_id, transaction_text);/* Send the success reply */ret = janus_process_success(request, reply);} else if(!strcasecmp(message_text, "hangup")) {if(handle == NULL) {/* Query is an handle-level command */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);goto jsondone;}if(handle->app == NULL || handle->app_handle == NULL) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_DETACH, "No plugin attached");goto jsondone;}janus_ice_webrtc_hangup(handle, "Janus API");/* Prepare JSON reply */json_t *reply = janus_create_message("success", session_id, transaction_text);/* Send the success reply */ret = janus_process_success(request, reply);} else if(!strcasecmp(message_text, "claim")) {janus_mutex_lock(&session->mutex);if(session->source != NULL) {/* Notify the old transport that this session is over for them, but has been reclaimed */session->source->transport->session_over(session->source->instance, session->session_id, FALSE, TRUE);janus_request_destroy(session->source);session->source = NULL;}session->source = janus_request_new(request->transport, request->instance, NULL, FALSE, NULL);/* Notify the new transport that it has claimed a session */session->source->transport->session_claimed(session->source->instance, session->session_id);/* Previous transport may be gone, clear flag. */g_atomic_int_set(&session->transport_gone, 0);janus_mutex_unlock(&session->mutex);/* Prepare JSON reply */json_t *reply = json_object();json_object_set_new(reply, "janus", json_string("success"));json_object_set_new(reply, "session_id", json_integer(session_id));json_object_set_new(reply, "transaction", json_string(transaction_text));/* Send the success reply */ret = janus_process_success(request, reply);} else if(!strcasecmp(message_text, "message")) {if(handle == NULL) {/* Query is an handle-level command */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);goto jsondone;}if(handle->app == NULL || handle->app_handle == NULL) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "No plugin to handle this message");goto jsondone;}janus_plugin *plugin_t = (janus_plugin *)handle->app;JANUS_LOG(LOG_VERB, "[%"SCNu64"] There's a message for %s\n", handle->handle_id, plugin_t->get_name());JANUS_VALIDATE_JSON_OBJECT(root, body_parameters,error_code, error_cause, FALSE,JANUS_ERROR_MISSING_MANDATORY_ELEMENT, JANUS_ERROR_INVALID_ELEMENT_TYPE);if(error_code != 0) {ret = janus_process_error_string(request, session_id, transaction_text, error_code, error_cause);goto jsondone;}json_t *body = json_object_get(root, "body");/* Is there an SDP attached? */json_t *jsep = json_object_get(root, "jsep");char *jsep_type = NULL;char *jsep_sdp = NULL, *jsep_sdp_stripped = NULL;gboolean renegotiation = FALSE;if(jsep != NULL) {if(!json_is_object(jsep)) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_JSON_OBJECT, "Invalid jsep object");goto jsondone;}JANUS_VALIDATE_JSON_OBJECT_FORMAT("JSEP error: missing mandatory element (%s)","JSEP error: invalid element type (%s should be %s)",jsep, jsep_parameters, error_code, error_cause, FALSE,JANUS_ERROR_MISSING_MANDATORY_ELEMENT, JANUS_ERROR_INVALID_ELEMENT_TYPE);if(error_code != 0) {ret = janus_process_error_string(request, session_id, transaction_text, error_code, error_cause);goto jsondone;}json_t *type = json_object_get(jsep, "type");jsep_type = g_strdup(json_string_value(type));type = NULL;gboolean do_trickle = TRUE;json_t *jsep_trickle = json_object_get(jsep, "trickle");do_trickle = jsep_trickle ? json_is_true(jsep_trickle) : TRUE;/* Are we still cleaning up from a previous media session? */if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING)) {JANUS_LOG(LOG_VERB, "[%"SCNu64"] Still cleaning up from a previous media session, let's wait a bit...\n", handle->handle_id);gint64 waited = 0;while(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING)) {g_usleep(100000);waited += 100000;if(waited >= 3*G_USEC_PER_SEC) {JANUS_LOG(LOG_VERB, "[%"SCNu64"]   -- Waited 3 seconds, that's enough!\n", handle->handle_id);ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_WEBRTC_STATE, "Still cleaning a previous session");goto jsondone;}}}/* Check if we're renegotiating (if we have an answer, we did an offer/answer round already) */renegotiation = janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_NEGOTIATED);/* Check the JSEP type */janus_mutex_lock(&handle->mutex);int offer = 0;if(!strcasecmp(jsep_type, "offer")) {offer = 1;janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_OFFER);janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_ANSWER);} else if(!strcasecmp(jsep_type, "answer")) {janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_ANSWER);if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_OFFER))janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_NEGOTIATED);offer = 0;} else {/* TODO Handle other message types as well */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_JSEP_UNKNOWN_TYPE, "JSEP error: unknown message type '%s'", jsep_type);g_free(jsep_type);janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);janus_mutex_unlock(&handle->mutex);goto jsondone;}json_t *sdp = json_object_get(jsep, "sdp");jsep_sdp = (char *)json_string_value(sdp);JANUS_LOG(LOG_VERB, "[%"SCNu64"] Remote SDP:\n%s", handle->handle_id, jsep_sdp);/* Is this valid SDP? */char error_str[512];int audio = 0, video = 0, data = 0;janus_sdp *parsed_sdp = janus_sdp_preparse(handle, jsep_sdp, error_str, sizeof(error_str), &audio, &video, &data);if(parsed_sdp == NULL) {/* Invalid SDP */ret = janus_process_error_string(request, session_id, transaction_text, JANUS_ERROR_JSEP_INVALID_SDP, error_str);g_free(jsep_type);janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);janus_mutex_unlock(&handle->mutex);goto jsondone;}/* Notify event handlers */if(janus_events_is_enabled()) {janus_events_notify_handlers(JANUS_EVENT_TYPE_JSEP,session_id, handle_id, handle->opaque_id, "remote", jsep_type, jsep_sdp);}/* FIXME We're only handling single audio/video lines for now... */JANUS_LOG(LOG_VERB, "[%"SCNu64"] Audio %s been negotiated, Video %s been negotiated, SCTP/DataChannels %s been negotiated\n",handle->handle_id,audio ? "has" : "has NOT",video ? "has" : "has NOT",data ? "have" : "have NOT");if(audio > 1) {JANUS_LOG(LOG_WARN, "[%"SCNu64"] More than one audio line? only going to negotiate one...\n", handle->handle_id);}if(video > 1) {JANUS_LOG(LOG_WARN, "[%"SCNu64"] More than one video line? only going to negotiate one...\n", handle->handle_id);}if(data > 1) {JANUS_LOG(LOG_WARN, "[%"SCNu64"] More than one data line? only going to negotiate one...\n", handle->handle_id);}
#ifndef HAVE_SCTPif(data) {JANUS_LOG(LOG_WARN, "[%"SCNu64"]   -- DataChannels have been negotiated, but support for them has not been compiled...\n", handle->handle_id);}
#endif/* We behave differently if it's a new session or an update... */if(!renegotiation) {/* New session */if(offer) {/* Setup ICE locally (we received an offer) */if(janus_ice_setup_local(handle, offer, audio, video, data, do_trickle) < 0) {JANUS_LOG(LOG_ERR, "Error setting ICE locally\n");janus_sdp_destroy(parsed_sdp);g_free(jsep_type);janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNKNOWN, "Error setting ICE locally");janus_mutex_unlock(&handle->mutex);goto jsondone;}} else {/* Make sure we're waiting for an ANSWER in the first place */if(!handle->agent) {JANUS_LOG(LOG_ERR, "Unexpected ANSWER (did we offer?)\n");janus_sdp_destroy(parsed_sdp);g_free(jsep_type);janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNEXPECTED_ANSWER, "Unexpected ANSWER (did we offer?)");janus_mutex_unlock(&handle->mutex);goto jsondone;}}if(janus_sdp_process(handle, parsed_sdp, FALSE) < 0) {JANUS_LOG(LOG_ERR, "Error processing SDP\n");janus_sdp_destroy(parsed_sdp);g_free(jsep_type);janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_JSEP_INVALID_SDP, "Error processing SDP");janus_mutex_unlock(&handle->mutex);goto jsondone;}if(!offer) {/* Set remote candidates now (we received an answer) */if(do_trickle) {janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE);} else {janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE);}janus_request_ice_handle_answer(handle, audio, video, data, jsep_sdp);} else {/* Check if the mid RTP extension is being negotiated */handle->stream->mid_ext_id = janus_rtp_header_extension_get_id(jsep_sdp, JANUS_RTP_EXTMAP_MID);/* Check if the RTP Stream ID extension is being negotiated */handle->stream->rid_ext_id = janus_rtp_header_extension_get_id(jsep_sdp, JANUS_RTP_EXTMAP_RID);handle->stream->ridrtx_ext_id = janus_rtp_header_extension_get_id(jsep_sdp, JANUS_RTP_EXTMAP_REPAIRED_RID);/* Check if the frame marking ID extension is being negotiated */handle->stream->framemarking_ext_id = janus_rtp_header_extension_get_id(jsep_sdp, JANUS_RTP_EXTMAP_FRAME_MARKING);/* Check if transport wide CC is supported */int transport_wide_cc_ext_id = janus_rtp_header_extension_get_id(jsep_sdp, JANUS_RTP_EXTMAP_TRANSPORT_WIDE_CC);handle->stream->do_transport_wide_cc = transport_wide_cc_ext_id > 0 ? TRUE : FALSE;handle->stream->transport_wide_cc_ext_id = transport_wide_cc_ext_id;}} else {/* FIXME This is a renegotiation: we can currently only handle simple changes in media* direction and ICE restarts: anything more complex than that will result in an error */JANUS_LOG(LOG_INFO, "[%"SCNu64"] Negotiation update, checking what changed...\n", handle->handle_id);if(janus_sdp_process(handle, parsed_sdp, TRUE) < 0) {JANUS_LOG(LOG_ERR, "Error processing SDP\n");janus_sdp_destroy(parsed_sdp);g_free(jsep_type);janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNEXPECTED_ANSWER, "Error processing SDP");janus_mutex_unlock(&handle->mutex);goto jsondone;}if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ICE_RESTART)) {JANUS_LOG(LOG_INFO, "[%"SCNu64"] Restarting ICE...\n", handle->handle_id);/* Update remote credentials for ICE */if(handle->stream) {nice_agent_set_remote_credentials(handle->agent, handle->stream->stream_id,handle->stream->ruser, handle->stream->rpass);}/* FIXME We only need to do that for offers: if it's an answer, we did that already */if(offer) {janus_ice_restart(handle);} else {janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_ICE_RESTART);}/* If we're full-trickling, we'll need to resend the candidates later */if(janus_ice_is_full_trickle_enabled()) {janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_RESEND_TRICKLES);}}
#ifdef HAVE_SCTPif(!offer) {/* Were datachannels just added? */if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_DATA_CHANNELS)) {janus_ice_stream *stream = handle->stream;if(stream != NULL && stream->component != NULL&& stream->component->dtls != NULL && stream->component->dtls->sctp == NULL) {/* Create SCTP association as well */JANUS_LOG(LOG_WARN, "[%"SCNu64"] Creating datachannels...\n", handle->handle_id);janus_dtls_srtp_create_sctp(stream->component->dtls);}}}
#endif}char *tmp = handle->remote_sdp;handle->remote_sdp = g_strdup(jsep_sdp);g_free(tmp);janus_mutex_unlock(&handle->mutex);/* Anonymize SDP */if(janus_sdp_anonymize(parsed_sdp) < 0) {/* Invalid SDP */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_JSEP_INVALID_SDP, "JSEP error: invalid SDP");janus_sdp_destroy(parsed_sdp);g_free(jsep_type);janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);goto jsondone;}jsep_sdp_stripped = janus_sdp_write(parsed_sdp);janus_sdp_destroy(parsed_sdp);sdp = NULL;janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);}/* Make sure the app handle is still valid */if(handle->app == NULL || !janus_plugin_session_is_alive(handle->app_handle)) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "No plugin to handle this message");g_free(jsep_type);g_free(jsep_sdp_stripped);janus_flags_clear(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER);goto jsondone;}/* Send the message to the plugin (which must eventually free transaction_text and unref the two objects, body and jsep) */json_incref(body);json_t *body_jsep = NULL;if(jsep_sdp_stripped) {body_jsep = json_pack("{ssss}", "type", jsep_type, "sdp", jsep_sdp_stripped);/* Check if simulcasting is enabled */if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_HAS_VIDEO)) {if(handle->stream && (handle->stream->rid[0] || handle->stream->video_ssrc_peer[1])) {json_t *simulcast = json_object();/* If we have rids, pass those, otherwise pass the SSRCs */if(handle->stream->rid[0]) {json_t *rids = json_array();json_array_append_new(rids, json_string(handle->stream->rid[0]));if(handle->stream->rid[1])json_array_append_new(rids, json_string(handle->stream->rid[1]));if(handle->stream->rid[2])json_array_append_new(rids, json_string(handle->stream->rid[2]));json_object_set_new(simulcast, "rids", rids);json_object_set_new(simulcast, "rid-ext", json_integer(handle->stream->rid_ext_id));} else {json_t *ssrcs = json_array();json_array_append_new(ssrcs, json_integer(handle->stream->video_ssrc_peer[0]));if(handle->stream->video_ssrc_peer[1])json_array_append_new(ssrcs, json_integer(handle->stream->video_ssrc_peer[1]));if(handle->stream->video_ssrc_peer[2])json_array_append_new(ssrcs, json_integer(handle->stream->video_ssrc_peer[2]));json_object_set_new(simulcast, "ssrcs", ssrcs);}if(handle->stream->framemarking_ext_id > 0)json_object_set_new(simulcast, "framemarking-ext", json_integer(handle->stream->framemarking_ext_id));json_object_set_new(body_jsep, "simulcast", simulcast);}}/* Check if this is a renegotiation or update */if(renegotiation)json_object_set_new(body_jsep, "update", json_true());}janus_plugin_result *result = plugin_t->handle_message(handle->app_handle,g_strdup((char *)transaction_text), body, body_jsep);g_free(jsep_type);g_free(jsep_sdp_stripped);if(result == NULL) {/* Something went horribly wrong! */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "Plugin didn't give a result");goto jsondone;}if(result->type == JANUS_PLUGIN_OK) {/* The plugin gave a result already (synchronous request/response) */if(result->content == NULL || !json_is_object(result->content)) {/* Missing content, or not a JSON object */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE,result->content == NULL ?"Plugin didn't provide any content for this synchronous response" :"Plugin returned an invalid JSON response");janus_plugin_result_destroy(result);goto jsondone;}/* Reference the content, as destroying the result instance will decref it */json_incref(result->content);/* Prepare JSON response */json_t *reply = janus_create_message("success", session->session_id, transaction_text);json_object_set_new(reply, "sender", json_integer(handle->handle_id));if(janus_is_opaqueid_in_api_enabled() && handle->opaque_id != NULL)json_object_set_new(reply, "opaque_id", json_string(handle->opaque_id));json_t *plugin_data = json_object();json_object_set_new(plugin_data, "plugin", json_string(plugin_t->get_package()));json_object_set_new(plugin_data, "data", result->content);json_object_set_new(reply, "plugindata", plugin_data);/* Send the success reply */ret = janus_process_success(request, reply);} else if(result->type == JANUS_PLUGIN_OK_WAIT) {/* The plugin received the request but didn't process it yet, send an ack (asynchronous notifications may follow) */json_t *reply = janus_create_message("ack", session_id, transaction_text);if(result->text)json_object_set_new(reply, "hint", json_string(result->text));/* Send the success reply */ret = janus_process_success(request, reply);} else {/* Something went horribly wrong! */ret = janus_process_error_string(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE,(char *)(result->text ? result->text : "Plugin returned a severe (unknown) error"));janus_plugin_result_destroy(result);goto jsondone;}janus_plugin_result_destroy(result);} else if(!strcasecmp(message_text, "trickle")) {if(handle == NULL) {/* Trickle is an handle-level command */ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_REQUEST_PATH, "Unhandled request '%s' at this path", message_text);goto jsondone;}if(handle->app == NULL || !janus_plugin_session_is_alive(handle->app_handle)) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_PLUGIN_MESSAGE, "No plugin to handle this trickle candidate");goto jsondone;}json_t *candidate = json_object_get(root, "candidate");json_t *candidates = json_object_get(root, "candidates");if(candidate == NULL && candidates == NULL) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_MISSING_MANDATORY_ELEMENT, "Missing mandatory element (candidate|candidates)");goto jsondone;}if(candidate != NULL && candidates != NULL) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_JSON, "Can't have both candidate and candidates");goto jsondone;}if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_CLEANING)) {JANUS_LOG(LOG_ERR, "[%"SCNu64"] Received a trickle, but still cleaning a previous session\n", handle->handle_id);ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_WEBRTC_STATE, "Still cleaning a previous session");goto jsondone;}janus_mutex_lock(&handle->mutex);if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE)) {/* It looks like this peer supports Trickle, after all */JANUS_LOG(LOG_VERB, "Handle %"SCNu64" supports trickle even if it didn't negotiate it...\n", handle->handle_id);janus_flags_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_TRICKLE);}/* Is there any stream ready? this trickle may get here before the SDP it relates to */if(handle->stream == NULL) {JANUS_LOG(LOG_WARN, "[%"SCNu64"] No stream, queueing this trickle as it got here before the SDP...\n", handle->handle_id);/* Enqueue this trickle candidate(s), we'll process this later */janus_ice_trickle *early_trickle = janus_ice_trickle_new(transaction_text, candidate ? candidate : candidates);handle->pending_trickles = g_list_append(handle->pending_trickles, early_trickle);/* Send the ack right away, an event will tell the application if the candidate(s) failed */goto trickledone;}/* Is the ICE stack ready already? */if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER) ||!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_OFFER) ||!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_ANSWER)) {const char *cause = NULL;if(janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_PROCESSING_OFFER))cause = "processing the offer";else if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_ANSWER))cause = "waiting for the answer";else if(!janus_flags_is_set(&handle->webrtc_flags, JANUS_ICE_HANDLE_WEBRTC_GOT_OFFER))cause = "waiting for the offer";JANUS_LOG(LOG_VERB, "[%"SCNu64"] Still %s, queueing this trickle to wait until we're done there...\n",handle->handle_id, cause);/* Enqueue this trickle candidate(s), we'll process this later */janus_ice_trickle *early_trickle = janus_ice_trickle_new(transaction_text, candidate ? candidate : candidates);handle->pending_trickles = g_list_append(handle->pending_trickles, early_trickle);/* Send the ack right away, an event will tell the application if the candidate(s) failed */goto trickledone;}if(candidate != NULL) {/* We got a single candidate */int error = 0;const char *error_string = NULL;if((error = janus_ice_trickle_parse(handle, candidate, &error_string)) != 0) {ret = janus_process_error(request, session_id, transaction_text, error, "%s", error_string);janus_mutex_unlock(&handle->mutex);goto jsondone;}} else {/* We got multiple candidates in an array */if(!json_is_array(candidates)) {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_INVALID_ELEMENT_TYPE, "candidates is not an array");janus_mutex_unlock(&handle->mutex);goto jsondone;}JANUS_LOG(LOG_VERB, "Got multiple candidates (%zu)\n", json_array_size(candidates));if(json_array_size(candidates) > 0) {/* Handle remote candidates */size_t i = 0;for(i=0; i<json_array_size(candidates); i++) {json_t *c = json_array_get(candidates, i);/* FIXME We don't care if any trickle fails to parse */janus_ice_trickle_parse(handle, c, NULL);}}}trickledone:janus_mutex_unlock(&handle->mutex);/* We reply right away, not to block the web server... */json_t *reply = janus_create_message("ack", session_id, transaction_text);/* Send the success reply */ret = janus_process_success(request, reply);} else {ret = janus_process_error(request, session_id, transaction_text, JANUS_ERROR_UNKNOWN_REQUEST, "Unknown request '%s'", message_text);}jsondone:/* Done processing */if(handle != NULL)janus_refcount_decrease(&handle->ref);if(session != NULL)janus_refcount_decrease(&session->ref);return ret;
}

3.5 Plugin消息处理函数handle_message(以plugins/janus_videoroom.c为例)

struct janus_plugin_result *janus_videoroom_handle_message(janus_plugin_session *handle, char *transaction, json_t *message, json_t *jsep) {if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))return janus_plugin_result_new(JANUS_PLUGIN_ERROR, g_atomic_int_get(&stopping) ? "Shutting down" : "Plugin not initialized", NULL);/* Pre-parse the message */int error_code = 0;char error_cause[512];json_t *root = message;json_t *response = NULL;janus_mutex_lock(&sessions_mutex);janus_videoroom_session *session = janus_videoroom_lookup_session(handle);if(!session) {janus_mutex_unlock(&sessions_mutex);JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR;g_snprintf(error_cause, 512, "%s", "No session associated with this handle...");goto plugin_response;}/* Increase the reference counter for this session: we'll decrease it after we handle the message */janus_refcount_increase(&session->ref);janus_mutex_unlock(&sessions_mutex);if(g_atomic_int_get(&session->destroyed)) {JANUS_LOG(LOG_ERR, "Session has already been marked as destroyed...\n");error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR;g_snprintf(error_cause, 512, "%s", "Session has already been marked as destroyed...");goto plugin_response;}if(message == NULL) {JANUS_LOG(LOG_ERR, "No message??\n");error_code = JANUS_VIDEOROOM_ERROR_NO_MESSAGE;g_snprintf(error_cause, 512, "%s", "No message??");goto plugin_response;}if(!json_is_object(root)) {JANUS_LOG(LOG_ERR, "JSON error: not an object\n");error_code = JANUS_VIDEOROOM_ERROR_INVALID_JSON;g_snprintf(error_cause, 512, "JSON error: not an object");goto plugin_response;}/* Get the request first */JANUS_VALIDATE_JSON_OBJECT(root, request_parameters,error_code, error_cause, TRUE,JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT);if(error_code != 0)goto plugin_response;json_t *request = json_object_get(root, "request");/* Some requests ('create', 'destroy', 'exists', 'list') can be handled synchronously */const char *request_text = json_string_value(request);/* We have a separate method to process synchronous requests, as those may* arrive from the Admin API as well, and so we handle them the same way */response = janus_videoroom_process_synchronous_request(session, root);if(response != NULL) {/* We got a response, send it back */goto plugin_response;} else if(!strcasecmp(request_text, "join") || !strcasecmp(request_text, "joinandconfigure")|| !strcasecmp(request_text, "configure") || !strcasecmp(request_text, "publish") || !strcasecmp(request_text, "unpublish")|| !strcasecmp(request_text, "start") || !strcasecmp(request_text, "pause") || !strcasecmp(request_text, "switch")|| !strcasecmp(request_text, "leave")) {/* These messages are handled asynchronously */janus_videoroom_message *msg = g_malloc(sizeof(janus_videoroom_message));msg->handle = handle;msg->transaction = transaction;msg->message = root;msg->jsep = jsep;g_async_queue_push(messages, msg);return janus_plugin_result_new(JANUS_PLUGIN_OK_WAIT, NULL, NULL);} else {JANUS_LOG(LOG_VERB, "Unknown request '%s'\n", request_text);error_code = JANUS_VIDEOROOM_ERROR_INVALID_REQUEST;g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);}plugin_response:{if(error_code == 0 && !response) {error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR;g_snprintf(error_cause, 512, "Invalid response");}if(error_code != 0) {/* Prepare JSON error event */json_t *event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "error_code", json_integer(error_code));json_object_set_new(event, "error", json_string(error_cause));response = event;}if(root != NULL)json_decref(root);if(jsep != NULL)json_decref(jsep);g_free(transaction);if(session != NULL)janus_refcount_decrease(&session->ref);return janus_plugin_result_new(JANUS_PLUGIN_OK, NULL, response);}
}

3.6 Plugin消息处理线程函数(以plugins/janus_videoroom.c为例)

/* Thread to handle incoming messages */
static void *janus_videoroom_handler(void *data) {JANUS_LOG(LOG_VERB, "Joining VideoRoom handler thread\n");janus_videoroom_message *msg = NULL;int error_code = 0;char error_cause[512];json_t *root = NULL;while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {msg = g_async_queue_pop(messages);if(msg == &exit_message)break;if(msg->handle == NULL) {janus_videoroom_message_free(msg);continue;}janus_videoroom *videoroom = NULL;janus_videoroom_publisher *participant = NULL;janus_mutex_lock(&sessions_mutex);janus_videoroom_session *session = janus_videoroom_lookup_session(msg->handle);if(!session) {janus_mutex_unlock(&sessions_mutex);JANUS_LOG(LOG_ERR, "No session associated with this handle...\n");janus_videoroom_message_free(msg);continue;}if(g_atomic_int_get(&session->destroyed)) {janus_mutex_unlock(&sessions_mutex);janus_videoroom_message_free(msg);continue;}janus_mutex_unlock(&sessions_mutex);/* Handle request */error_code = 0;root = NULL;if(msg->message == NULL) {JANUS_LOG(LOG_ERR, "No message??\n");error_code = JANUS_VIDEOROOM_ERROR_NO_MESSAGE;g_snprintf(error_cause, 512, "%s", "No message??");goto error;}root = msg->message;/* Get the request first */JANUS_VALIDATE_JSON_OBJECT(root, request_parameters,error_code, error_cause, TRUE,JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT);if(error_code != 0)goto error;json_t *request = json_object_get(root, "request");const char *request_text = json_string_value(request);json_t *event = NULL;gboolean sdp_update = FALSE;if(json_object_get(msg->jsep, "update") != NULL)sdp_update = json_is_true(json_object_get(msg->jsep, "update"));/* 'create' and 'destroy' are handled synchronously: what kind of participant is this session referring to? */if(session->participant_type == janus_videoroom_p_type_none) {JANUS_LOG(LOG_VERB, "Configuring new participant\n");/* Not configured yet, we need to do this now */if(strcasecmp(request_text, "join") && strcasecmp(request_text, "joinandconfigure")) {JANUS_LOG(LOG_ERR, "Invalid request on unconfigured participant\n");error_code = JANUS_VIDEOROOM_ERROR_JOIN_FIRST;g_snprintf(error_cause, 512, "Invalid request on unconfigured participant");goto error;}JANUS_VALIDATE_JSON_OBJECT(root, join_parameters,error_code, error_cause, TRUE,JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT);if(error_code != 0)goto error;janus_mutex_lock(&rooms_mutex);error_code = janus_videoroom_access_room(root, FALSE, TRUE, &videoroom, error_cause, sizeof(error_cause));if(error_code != 0) {janus_mutex_unlock(&rooms_mutex);goto error;}janus_refcount_increase(&videoroom->ref);janus_mutex_lock(&videoroom->mutex);janus_mutex_unlock(&rooms_mutex);json_t *ptype = json_object_get(root, "ptype");const char *ptype_text = json_string_value(ptype);if(!strcasecmp(ptype_text, "publisher")) {JANUS_LOG(LOG_VERB, "Configuring new publisher\n");JANUS_VALIDATE_JSON_OBJECT(root, publisher_parameters,error_code, error_cause, TRUE,JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT);if(error_code != 0) {janus_mutex_unlock(&videoroom->mutex);janus_refcount_decrease(&videoroom->ref);goto error;}/* A token might be required to join */if(videoroom->check_allowed) {json_t *token = json_object_get(root, "token");const char *token_text = token ? json_string_value(token) : NULL;if(token_text == NULL || g_hash_table_lookup(videoroom->allowed, token_text) == NULL) {janus_mutex_unlock(&videoroom->mutex);janus_refcount_decrease(&videoroom->ref);JANUS_LOG(LOG_ERR, "Unauthorized (not in the allowed list)\n");error_code = JANUS_VIDEOROOM_ERROR_UNAUTHORIZED;g_snprintf(error_cause, 512, "Unauthorized (not in the allowed list)");goto error;}}json_t *display = json_object_get(root, "display");const char *display_text = display ? json_string_value(display) : NULL;guint64 user_id = 0;json_t *id = json_object_get(root, "id");if(id) {user_id = json_integer_value(id);if(g_hash_table_lookup(videoroom->participants, &user_id) != NULL) {janus_mutex_unlock(&videoroom->mutex);janus_refcount_decrease(&videoroom->ref);/* User ID already taken */JANUS_LOG(LOG_ERR, "User ID %"SCNu64" already exists\n", user_id);error_code = JANUS_VIDEOROOM_ERROR_ID_EXISTS;g_snprintf(error_cause, 512, "User ID %"SCNu64" already exists", user_id);goto error;}}if(user_id == 0) {/* Generate a random ID */while(user_id == 0) {user_id = janus_random_uint64();if(g_hash_table_lookup(videoroom->participants, &user_id) != NULL) {/* User ID already taken, try another one */user_id = 0;}}}JANUS_LOG(LOG_VERB, "  -- Publisher ID: %"SCNu64"\n", user_id);/* Process the request */json_t *audio = NULL, *video = NULL, *data = NULL,*bitrate = NULL, *record = NULL, *recfile = NULL;if(!strcasecmp(request_text, "joinandconfigure")) {/* Also configure (or publish a new feed) audio/video/bitrate for this new publisher *//* join_parameters were validated earlier. */audio = json_object_get(root, "audio");video = json_object_get(root, "video");data = json_object_get(root, "data");bitrate = json_object_get(root, "bitrate");record = json_object_get(root, "record");recfile = json_object_get(root, "filename");}janus_videoroom_publisher *publisher = g_malloc0(sizeof(janus_videoroom_publisher));publisher->session = session;publisher->room_id = videoroom->room_id;publisher->room = videoroom;videoroom = NULL;publisher->user_id = user_id;publisher->display = display_text ? g_strdup(display_text) : NULL;publisher->sdp = NULL;		/* We'll deal with this later */publisher->audio = FALSE;	/* We'll deal with this later */publisher->video = FALSE;	/* We'll deal with this later */publisher->data = FALSE;	/* We'll deal with this later */publisher->acodec = JANUS_AUDIOCODEC_NONE;	/* We'll deal with this later */publisher->vcodec = JANUS_VIDEOCODEC_NONE;	/* We'll deal with this later */publisher->audio_active = TRUE;publisher->video_active = TRUE;publisher->data_active = TRUE;publisher->recording_active = FALSE;publisher->recording_base = NULL;publisher->arc = NULL;publisher->vrc = NULL;publisher->drc = NULL;janus_mutex_init(&publisher->rec_mutex);publisher->firefox = FALSE;publisher->bitrate = publisher->room->bitrate;publisher->subscribers = NULL;publisher->subscriptions = NULL;janus_mutex_init(&publisher->subscribers_mutex);publisher->audio_pt = -1;	/* We'll deal with this later */publisher->video_pt = -1;	/* We'll deal with this later */publisher->audio_level_extmap_id = 0;publisher->video_orient_extmap_id = 0;publisher->playout_delay_extmap_id = 0;publisher->remb_startup = 4;publisher->remb_latest = 0;publisher->fir_latest = 0;publisher->fir_seq = 0;janus_mutex_init(&publisher->rtp_forwarders_mutex);publisher->rtp_forwarders = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)janus_videoroom_rtp_forwarder_destroy);publisher->srtp_contexts = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, (GDestroyNotify)janus_videoroom_srtp_context_free);publisher->udp_sock = -1;/* Finally, generate a private ID: this is only needed in case the participant* wants to allow the plugin to know which subscriptions belong to them */publisher->pvt_id = 0;while(publisher->pvt_id == 0) {publisher->pvt_id = janus_random_uint32();if(g_hash_table_lookup(publisher->room->private_ids, GUINT_TO_POINTER(publisher->pvt_id)) != NULL) {/* Private ID already taken, try another one */publisher->pvt_id = 0;}g_hash_table_insert(publisher->room->private_ids, GUINT_TO_POINTER(publisher->pvt_id), publisher);}g_atomic_int_set(&publisher->destroyed, 0);janus_refcount_init(&publisher->ref, janus_videoroom_publisher_free);/* In case we also wanted to configure */if(audio) {publisher->audio_active = json_is_true(audio);JANUS_LOG(LOG_VERB, "Setting audio property: %s (room %"SCNu64", user %"SCNu64")\n", publisher->audio_active ? "true" : "false", publisher->room_id, publisher->user_id);}if(video) {publisher->video_active = json_is_true(video);JANUS_LOG(LOG_VERB, "Setting video property: %s (room %"SCNu64", user %"SCNu64")\n", publisher->video_active ? "true" : "false", publisher->room_id, publisher->user_id);}if(data) {publisher->data_active = json_is_true(data);JANUS_LOG(LOG_VERB, "Setting data property: %s (room %"SCNu64", user %"SCNu64")\n", publisher->data_active ? "true" : "false", publisher->room_id, publisher->user_id);}if(bitrate) {publisher->bitrate = json_integer_value(bitrate);JANUS_LOG(LOG_VERB, "Setting video bitrate: %"SCNu32" (room %"SCNu64", user %"SCNu64")\n", publisher->bitrate, publisher->room_id, publisher->user_id);}if(record) {publisher->recording_active = json_is_true(record);JANUS_LOG(LOG_VERB, "Setting record property: %s (room %"SCNu64", user %"SCNu64")\n", publisher->recording_active ? "true" : "false", publisher->room_id, publisher->user_id);}if(recfile) {publisher->recording_base = g_strdup(json_string_value(recfile));JANUS_LOG(LOG_VERB, "Setting recording basename: %s (room %"SCNu64", user %"SCNu64")\n", publisher->recording_base, publisher->room_id, publisher->user_id);}/* Done */janus_mutex_lock(&session->mutex);session->participant_type = janus_videoroom_p_type_publisher;session->participant = publisher;janus_mutex_unlock(&session->mutex);/* Return a list of all available publishers (those with an SDP available, that is) */json_t *list = json_array(), *attendees = NULL;if(publisher->room->notify_joining)attendees = json_array();GHashTableIter iter;gpointer value;janus_refcount_increase(&publisher->ref);g_hash_table_insert(publisher->room->participants, janus_uint64_dup(publisher->user_id), publisher);g_hash_table_iter_init(&iter, publisher->room->participants);while (!g_atomic_int_get(&publisher->room->destroyed) && g_hash_table_iter_next(&iter, NULL, &value)) {janus_videoroom_publisher *p = value;if(p == publisher || !p->sdp || !p->session->started) {/* Check if we're also notifying normal joins and not just publishers */if(p != publisher && publisher->room->notify_joining) {json_t *al = json_object();json_object_set_new(al, "id", json_integer(p->user_id));if(p->display)json_object_set_new(al, "display", json_string(p->display));json_array_append_new(attendees, al);}continue;}json_t *pl = json_object();json_object_set_new(pl, "id", json_integer(p->user_id));if(p->display)json_object_set_new(pl, "display", json_string(p->display));if(p->audio)json_object_set_new(pl, "audio_codec", json_string(janus_audiocodec_name(p->acodec)));if(p->video)json_object_set_new(pl, "video_codec", json_string(janus_videocodec_name(p->vcodec)));if(p->ssrc[0] || p->rid[0])json_object_set_new(pl, "simulcast", json_true());if(p->audio_level_extmap_id > 0)json_object_set_new(pl, "talking", p->talking ? json_true() : json_false());json_array_append_new(list, pl);}event = json_object();json_object_set_new(event, "videoroom", json_string("joined"));json_object_set_new(event, "room", json_integer(publisher->room->room_id));json_object_set_new(event, "description", json_string(publisher->room->room_name));json_object_set_new(event, "id", json_integer(user_id));json_object_set_new(event, "private_id", json_integer(publisher->pvt_id));json_object_set_new(event, "publishers", list);if(attendees != NULL)json_object_set_new(event, "attendees", attendees);/* See if we need to notify about a new participant joined the room (by default, we don't). */janus_videoroom_participant_joining(publisher);/* Also notify event handlers */if(notify_events && gateway->events_is_enabled()) {json_t *info = json_object();json_object_set_new(info, "event", json_string("joined"));json_object_set_new(info, "room", json_integer(publisher->room->room_id));json_object_set_new(info, "id", json_integer(user_id));json_object_set_new(info, "private_id", json_integer(publisher->pvt_id));if(display_text != NULL)json_object_set_new(info, "display", json_string(display_text));gateway->notify_event(&janus_videoroom_plugin, session->handle, info);}janus_mutex_unlock(&publisher->room->mutex);} else if(!strcasecmp(ptype_text, "subscriber") || !strcasecmp(ptype_text, "listener")) {JANUS_LOG(LOG_VERB, "Configuring new subscriber\n");gboolean legacy = !strcasecmp(ptype_text, "listener");if(legacy) {JANUS_LOG(LOG_WARN, "Subscriber is using the legacy 'listener' ptype\n");}/* This is a new subscriber */JANUS_VALIDATE_JSON_OBJECT(root, subscriber_parameters,error_code, error_cause, TRUE,JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT);if(error_code != 0) {janus_mutex_unlock(&videoroom->mutex);goto error;}json_t *feed = json_object_get(root, "feed");guint64 feed_id = json_integer_value(feed);json_t *pvt = json_object_get(root, "private_id");guint64 pvt_id = json_integer_value(pvt);json_t *cpc = json_object_get(root, "close_pc");gboolean close_pc  = cpc ? json_is_true(cpc) : TRUE;json_t *audio = json_object_get(root, "audio");json_t *video = json_object_get(root, "video");json_t *data = json_object_get(root, "data");json_t *offer_audio = json_object_get(root, "offer_audio");json_t *offer_video = json_object_get(root, "offer_video");json_t *offer_data = json_object_get(root, "offer_data");json_t *spatial = json_object_get(root, "spatial_layer");json_t *sc_substream = json_object_get(root, "substream");if(json_integer_value(spatial) < 0 || json_integer_value(spatial) > 2 ||json_integer_value(sc_substream) < 0 || json_integer_value(sc_substream) > 2) {JANUS_LOG(LOG_ERR, "Invalid element (substream/spatial_layer should be 0, 1 or 2)\n");error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT;g_snprintf(error_cause, 512, "Invalid value (substream/spatial_layer should be 0, 1 or 2)");janus_mutex_unlock(&videoroom->mutex);goto error;}json_t *temporal = json_object_get(root, "temporal_layer");json_t *sc_temporal = json_object_get(root, "temporal");if(json_integer_value(temporal) < 0 || json_integer_value(temporal) > 2 ||json_integer_value(sc_temporal) < 0 || json_integer_value(sc_temporal) > 2) {JANUS_LOG(LOG_ERR, "Invalid element (temporal/temporal_layer should be 0, 1 or 2)\n");error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT;g_snprintf(error_cause, 512, "Invalid value (temporal/temporal_layer should be 0, 1 or 2)");janus_mutex_unlock(&videoroom->mutex);goto error;}janus_videoroom_publisher *owner = NULL;janus_videoroom_publisher *publisher = g_hash_table_lookup(videoroom->participants, &feed_id);if(publisher == NULL || g_atomic_int_get(&publisher->destroyed) || publisher->sdp == NULL) {JANUS_LOG(LOG_ERR, "No such feed (%"SCNu64")\n", feed_id);error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED;g_snprintf(error_cause, 512, "No such feed (%"SCNu64")", feed_id);janus_mutex_unlock(&videoroom->mutex);goto error;} else {/* Increase the refcount before unlocking so that nobody can remove and free the publisher in the meantime. */janus_refcount_increase(&publisher->ref);janus_refcount_increase(&publisher->session->ref);/* First of all, let's check if this room requires valid private_id values */if(videoroom->require_pvtid) {/* It does, let's make sure this subscription complies */owner = g_hash_table_lookup(videoroom->private_ids, GUINT_TO_POINTER(pvt_id));if(pvt_id == 0 || owner == NULL) {JANUS_LOG(LOG_ERR, "Unauthorized (this room requires a valid private_id)\n");error_code = JANUS_VIDEOROOM_ERROR_UNAUTHORIZED;g_snprintf(error_cause, 512, "Unauthorized (this room requires a valid private_id)");janus_mutex_unlock(&videoroom->mutex);goto error;}janus_refcount_increase(&owner->ref);janus_refcount_increase(&owner->session->ref);}janus_mutex_unlock(&videoroom->mutex);janus_videoroom_subscriber *subscriber = g_malloc0(sizeof(janus_videoroom_subscriber));subscriber->session = session;subscriber->room_id = videoroom->room_id;subscriber->room = videoroom;videoroom = NULL;subscriber->feed = publisher;subscriber->pvt_id = pvt_id;subscriber->close_pc = close_pc;/* Initialize the subscriber context */janus_rtp_switching_context_reset(&subscriber->context);subscriber->audio_offered = offer_audio ? json_is_true(offer_audio) : TRUE;	/* True by default */subscriber->video_offered = offer_video ? json_is_true(offer_video) : TRUE;	/* True by default */subscriber->data_offered = offer_data ? json_is_true(offer_data) : TRUE;	/* True by default */if((!publisher->audio || !subscriber->audio_offered) &&(!publisher->video || !subscriber->video_offered) &&(!publisher->data || !subscriber->data_offered)) {g_free(subscriber);if (owner) {janus_refcount_decrease(&owner->session->ref);janus_refcount_decrease(&owner->ref);}janus_refcount_decrease(&publisher->session->ref);janus_refcount_decrease(&publisher->ref);JANUS_LOG(LOG_ERR, "Can't offer an SDP with no audio, video or data\n");error_code = JANUS_VIDEOROOM_ERROR_INVALID_SDP;g_snprintf(error_cause, 512, "Can't offer an SDP with no audio, video or data");goto error;}subscriber->audio = audio ? json_is_true(audio) : TRUE;	/* True by default */if(!publisher->audio || !subscriber->audio_offered)subscriber->audio = FALSE;	/* ... unless the publisher isn't sending any audio or we're skipping it */subscriber->video = video ? json_is_true(video) : TRUE;	/* True by default */if(!publisher->video || !subscriber->video_offered)subscriber->video = FALSE;	/* ... unless the publisher isn't sending any video or we're skipping it */subscriber->data = data ? json_is_true(data) : TRUE;	/* True by default */if(!publisher->data || !subscriber->data_offered)subscriber->data = FALSE;	/* ... unless the publisher isn't sending any data or we're skipping it */subscriber->paused = TRUE;	/* We need an explicit start from the subscriber */g_atomic_int_set(&subscriber->destroyed, 0);janus_refcount_init(&subscriber->ref, janus_videoroom_subscriber_free);janus_refcount_increase(&subscriber->ref);	/* The publisher references the new subscriber too *//* Check if a simulcasting-related request is involved */janus_rtp_simulcasting_context_reset(&subscriber->sim_context);subscriber->sim_context.rid_ext_id = publisher->rid_extmap_id;subscriber->sim_context.substream_target = sc_substream ? json_integer_value(sc_substream) : 2;subscriber->sim_context.templayer_target = sc_temporal ? json_integer_value(sc_temporal) : 2;janus_vp8_simulcast_context_reset(&subscriber->vp8_context);/* Check if a VP9 SVC-related request is involved */if(subscriber->room->do_svc) {subscriber->spatial_layer = -1;subscriber->target_spatial_layer = spatial ? json_integer_value(spatial) : 2;subscriber->temporal_layer = -1;subscriber->target_temporal_layer = temporal ? json_integer_value(temporal) : 2;}session->participant = subscriber;janus_mutex_lock(&publisher->subscribers_mutex);publisher->subscribers = g_slist_append(publisher->subscribers, subscriber);janus_mutex_unlock(&publisher->subscribers_mutex);if(owner != NULL) {/* Note: we should refcount these subscription-publisher mappings as well */janus_mutex_lock(&owner->subscribers_mutex);owner->subscriptions = g_slist_append(owner->subscriptions, subscriber);janus_mutex_unlock(&owner->subscribers_mutex);/* Done adding the subscription, owner is safe to be released */janus_refcount_decrease(&owner->session->ref);janus_refcount_decrease(&owner->ref);}event = json_object();json_object_set_new(event, "videoroom", json_string("attached"));json_object_set_new(event, "room", json_integer(subscriber->room_id));json_object_set_new(event, "id", json_integer(feed_id));if(publisher->display)json_object_set_new(event, "display", json_string(publisher->display));if(legacy)json_object_set_new(event, "warning", json_string("Deprecated use of 'listener' ptype, update to the new 'subscriber' ASAP"));session->participant_type = janus_videoroom_p_type_subscriber;JANUS_LOG(LOG_VERB, "Preparing JSON event as a reply\n");/* Negotiate by sending the selected publisher SDP back */janus_mutex_lock(&publisher->subscribers_mutex);if(publisher->sdp != NULL) {/* Check if there's something the original SDP has that we should remove */janus_sdp *offer = janus_sdp_parse(publisher->sdp, NULL, 0);subscriber->sdp = offer;session->sdp_version = 1;subscriber->sdp->o_version = session->sdp_version;if((publisher->audio && !subscriber->audio_offered) ||(publisher->video && !subscriber->video_offered) ||(publisher->data && !subscriber->data_offered)) {JANUS_LOG(LOG_VERB, "Munging SDP offer to adapt it to the subscriber's requirements\n");if(publisher->audio && !subscriber->audio_offered)janus_sdp_mline_remove(offer, JANUS_SDP_AUDIO);if(publisher->video && !subscriber->video_offered)janus_sdp_mline_remove(offer, JANUS_SDP_VIDEO);if(publisher->data && !subscriber->data_offered)janus_sdp_mline_remove(offer, JANUS_SDP_APPLICATION);}char* sdp = janus_sdp_write(offer);json_t *jsep = json_pack("{ssss}", "type", "offer", "sdp", sdp);g_free(sdp);janus_mutex_unlock(&publisher->subscribers_mutex);/* How long will the Janus core take to push the event? */g_atomic_int_set(&session->hangingup, 0);gint64 start = janus_get_monotonic_time();int res = gateway->push_event(msg->handle, &janus_videoroom_plugin, msg->transaction, event, jsep);JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (took %"SCNu64" us)\n", res, janus_get_monotonic_time()-start);json_decref(event);json_decref(jsep);janus_videoroom_message_free(msg);/* Also notify event handlers */if(notify_events && gateway->events_is_enabled()) {json_t *info = json_object();json_object_set_new(info, "event", json_string("subscribing"));json_object_set_new(info, "room", json_integer(subscriber->room_id));json_object_set_new(info, "feed", json_integer(feed_id));json_object_set_new(info, "private_id", json_integer(pvt_id));gateway->notify_event(&janus_videoroom_plugin, session->handle, info);}continue;}janus_mutex_unlock(&publisher->subscribers_mutex);}} else {janus_mutex_unlock(&videoroom->mutex);JANUS_LOG(LOG_ERR, "Invalid element (ptype)\n");error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT;g_snprintf(error_cause, 512, "Invalid element (ptype)");goto error;}} else if(session->participant_type == janus_videoroom_p_type_publisher) {/* Handle this publisher */participant = janus_videoroom_session_get_publisher(session);if(participant == NULL) {JANUS_LOG(LOG_ERR, "Invalid participant instance\n");error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR;g_snprintf(error_cause, 512, "Invalid participant instance");goto error;}if(participant->room == NULL) {janus_refcount_decrease(&participant->ref);JANUS_LOG(LOG_ERR, "No such room\n");error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM;g_snprintf(error_cause, 512, "No such room");goto error;}if(!strcasecmp(request_text, "join") || !strcasecmp(request_text, "joinandconfigure")) {janus_refcount_decrease(&participant->ref);JANUS_LOG(LOG_ERR, "Already in as a publisher on this handle\n");error_code = JANUS_VIDEOROOM_ERROR_ALREADY_JOINED;g_snprintf(error_cause, 512, "Already in as a publisher on this handle");goto error;} else if(!strcasecmp(request_text, "configure") || !strcasecmp(request_text, "publish")) {if(!strcasecmp(request_text, "publish") && participant->sdp) {janus_refcount_decrease(&participant->ref);JANUS_LOG(LOG_ERR, "Can't publish, already published\n");error_code = JANUS_VIDEOROOM_ERROR_ALREADY_PUBLISHED;g_snprintf(error_cause, 512, "Can't publish, already published");goto error;}if(participant->kicked) {janus_refcount_decrease(&participant->ref);JANUS_LOG(LOG_ERR, "Unauthorized, you have been kicked\n");error_code = JANUS_VIDEOROOM_ERROR_UNAUTHORIZED;g_snprintf(error_cause, 512, "Unauthorized, you have been kicked");goto error;}/* Configure (or publish a new feed) audio/video/bitrate for this publisher */JANUS_VALIDATE_JSON_OBJECT(root, publish_parameters,error_code, error_cause, TRUE,JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT);if(error_code != 0) {janus_refcount_decrease(&participant->ref);goto error;}json_t *audio = json_object_get(root, "audio");json_t *audiocodec = json_object_get(root, "audiocodec");json_t *video = json_object_get(root, "video");json_t *videocodec = json_object_get(root, "videocodec");json_t *data = json_object_get(root, "data");json_t *bitrate = json_object_get(root, "bitrate");json_t *keyframe = json_object_get(root, "keyframe");json_t *record = json_object_get(root, "record");json_t *recfile = json_object_get(root, "filename");json_t *display = json_object_get(root, "display");json_t *update = json_object_get(root, "update");if(audio) {gboolean audio_active = json_is_true(audio);if(session->started && audio_active && !participant->audio_active) {/* Audio was just resumed, try resetting the RTP headers for viewers */janus_mutex_lock(&participant->subscribers_mutex);GSList *ps = participant->subscribers;while(ps) {janus_videoroom_subscriber *l = (janus_videoroom_subscriber *)ps->data;if(l)l->context.a_seq_reset = TRUE;ps = ps->next;}janus_mutex_unlock(&participant->subscribers_mutex);}participant->audio_active = audio_active;JANUS_LOG(LOG_VERB, "Setting audio property: %s (room %"SCNu64", user %"SCNu64")\n", participant->audio_active ? "true" : "false", participant->room_id, participant->user_id);}if(audiocodec && json_string_value(json_object_get(msg->jsep, "sdp")) != NULL) {/* The participant would like to use an audio codec in particular */janus_audiocodec acodec = janus_audiocodec_from_name(json_string_value(audiocodec));if(acodec == JANUS_AUDIOCODEC_NONE ||(acodec != participant->room->acodec[0] &&acodec != participant->room->acodec[1] &&acodec != participant->room->acodec[2])) {JANUS_LOG(LOG_ERR, "Participant asked for audio codec '%s', but it's not allowed (room %"SCNu64", user %"SCNu64")\n",json_string_value(audiocodec), participant->room_id, participant->user_id);janus_refcount_decrease(&participant->ref);error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT;g_snprintf(error_cause, 512, "Audio codec unavailable in this room");goto error;}participant->acodec = acodec;JANUS_LOG(LOG_VERB, "Participant asked for audio codec '%s' (room %"SCNu64", user %"SCNu64")\n",json_string_value(audiocodec), participant->room_id, participant->user_id);}if(video) {gboolean video_active = json_is_true(video);if(session->started && video_active && !participant->video_active) {/* Video was just resumed, try resetting the RTP headers for viewers */janus_mutex_lock(&participant->subscribers_mutex);GSList *ps = participant->subscribers;while(ps) {janus_videoroom_subscriber *l = (janus_videoroom_subscriber *)ps->data;if(l)l->context.v_seq_reset = TRUE;ps = ps->next;}janus_mutex_unlock(&participant->subscribers_mutex);}participant->video_active = video_active;JANUS_LOG(LOG_VERB, "Setting video property: %s (room %"SCNu64", user %"SCNu64")\n", participant->video_active ? "true" : "false", participant->room_id, participant->user_id);}if(videocodec && json_string_value(json_object_get(msg->jsep, "sdp")) != NULL) {/* The participant would like to use a video codec in particular */janus_videocodec vcodec = janus_videocodec_from_name(json_string_value(videocodec));if(vcodec == JANUS_VIDEOCODEC_NONE ||(vcodec != participant->room->vcodec[0] &&vcodec != participant->room->vcodec[1] &&vcodec != participant->room->vcodec[2])) {JANUS_LOG(LOG_ERR, "Participant asked for video codec '%s', but it's not allowed (room %"SCNu64", user %"SCNu64")\n",json_string_value(videocodec), participant->room_id, participant->user_id);janus_refcount_decrease(&participant->ref);error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT;g_snprintf(error_cause, 512, "Video codec unavailable in this room");goto error;}participant->vcodec = vcodec;JANUS_LOG(LOG_VERB, "Participant asked for video codec '%s' (room %"SCNu64", user %"SCNu64")\n",json_string_value(videocodec), participant->room_id, participant->user_id);}if(data) {gboolean data_active = json_is_true(data);participant->data_active = data_active;JANUS_LOG(LOG_VERB, "Setting data property: %s (room %"SCNu64", user %"SCNu64")\n", participant->data_active ? "true" : "false", participant->room_id, participant->user_id);}if(bitrate) {participant->bitrate = json_integer_value(bitrate);JANUS_LOG(LOG_VERB, "Setting video bitrate: %"SCNu32" (room %"SCNu64", user %"SCNu64")\n", participant->bitrate, participant->room_id, participant->user_id);/* Send a new REMB */if(session->started)participant->remb_latest = janus_get_monotonic_time();char rtcpbuf[24];janus_rtcp_remb((char *)(&rtcpbuf), 24, participant->bitrate);gateway->relay_rtcp(msg->handle, 1, rtcpbuf, 24);}if(keyframe && json_is_true(keyframe)) {/* Send a FIR */janus_videoroom_reqfir(participant, "Keyframe request");}janus_mutex_lock(&participant->rec_mutex);gboolean prev_recording_active = participant->recording_active;if(record) {participant->recording_active = json_is_true(record);JANUS_LOG(LOG_VERB, "Setting record property: %s (room %"SCNu64", user %"SCNu64")\n", participant->recording_active ? "true" : "false", participant->room_id, participant->user_id);}if(recfile) {participant->recording_base = g_strdup(json_string_value(recfile));JANUS_LOG(LOG_VERB, "Setting recording basename: %s (room %"SCNu64", user %"SCNu64")\n", participant->recording_base, participant->room_id, participant->user_id);}/* Do we need to do something with the recordings right now? */if(participant->recording_active != prev_recording_active) {/* Something changed */if(!participant->recording_active) {/* Not recording (anymore?) */janus_videoroom_recorder_close(participant);} else if(participant->recording_active && participant->sdp) {/* We've started recording, send a PLI/FIR and go on */janus_videoroom_recorder_create(participant, strstr(participant->sdp, "m=audio") != NULL,strstr(participant->sdp, "m=video") != NULL,strstr(participant->sdp, "m=application") != NULL);if(strstr(participant->sdp, "m=video")) {/* Send a FIR */janus_videoroom_reqfir(participant, "Recording video");}}}janus_mutex_unlock(&participant->rec_mutex);if(display) {janus_mutex_lock(&participant->room->mutex);char *old_display = participant->display;char *new_display = g_strdup(json_string_value(display));participant->display = new_display;g_free(old_display);json_t *display_event = json_object();json_object_set_new(display_event, "videoroom", json_string("event"));json_object_set_new(display_event, "id", json_integer(participant->user_id));json_object_set_new(display_event, "display", json_string(participant->display));if(participant->room && !participant->room->destroyed) {janus_videoroom_notify_participants(participant, display_event);}janus_mutex_unlock(&participant->room->mutex);json_decref(display_event);}/* A renegotiation may be taking place */gboolean do_update = update ? json_is_true(update) : FALSE;if(do_update && !sdp_update) {JANUS_LOG(LOG_WARN, "Got an 'update' request, but no SDP update? Ignoring...\n");}/* Done */event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(participant->room_id));json_object_set_new(event, "configured", json_string("ok"));/* Also notify event handlers */if(notify_events && gateway->events_is_enabled()) {json_t *info = json_object();json_object_set_new(info, "event", json_string("configured"));json_object_set_new(info, "room", json_integer(participant->room_id));json_object_set_new(info, "id", json_integer(participant->user_id));json_object_set_new(info, "audio_active", participant->audio_active ? json_true() : json_false());json_object_set_new(info, "video_active", participant->video_active ? json_true() : json_false());json_object_set_new(info, "data_active", participant->data_active ? json_true() : json_false());json_object_set_new(info, "bitrate", json_integer(participant->bitrate));if(participant->arc || participant->vrc || participant->drc) {json_t *recording = json_object();if(participant->arc && participant->arc->filename)json_object_set_new(recording, "audio", json_string(participant->arc->filename));if(participant->vrc && participant->vrc->filename)json_object_set_new(recording, "video", json_string(participant->vrc->filename));if(participant->drc && participant->drc->filename)json_object_set_new(recording, "data", json_string(participant->drc->filename));json_object_set_new(info, "recording", recording);}gateway->notify_event(&janus_videoroom_plugin, session->handle, info);}} else if(!strcasecmp(request_text, "unpublish")) {/* This participant wants to unpublish */if(!participant->sdp) {janus_refcount_decrease(&participant->ref);JANUS_LOG(LOG_ERR, "Can't unpublish, not published\n");error_code = JANUS_VIDEOROOM_ERROR_NOT_PUBLISHED;g_snprintf(error_cause, 512, "Can't unpublish, not published");goto error;}/* Tell the core to tear down the PeerConnection, hangup_media will do the rest */janus_videoroom_hangup_media(session->handle);gateway->close_pc(session->handle);/* Done */event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(participant->room_id));json_object_set_new(event, "unpublished", json_string("ok"));} else if(!strcasecmp(request_text, "leave")) {/* Prepare an event to confirm the request */event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(participant->room_id));json_object_set_new(event, "leaving", json_string("ok"));/* This publisher is leaving, tell everybody */janus_videoroom_leave_or_unpublish(participant, TRUE, FALSE);/* Done */participant->audio_active = FALSE;participant->video_active = FALSE;participant->data_active = FALSE;session->started = FALSE;//~ session->destroy = TRUE;} else {janus_refcount_decrease(&participant->ref);JANUS_LOG(LOG_ERR, "Unknown request '%s'\n", request_text);error_code = JANUS_VIDEOROOM_ERROR_INVALID_REQUEST;g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);goto error;}janus_refcount_decrease(&participant->ref);} else if(session->participant_type == janus_videoroom_p_type_subscriber) {/* Handle this subscriber */janus_videoroom_subscriber *subscriber = (janus_videoroom_subscriber *)session->participant;if(subscriber == NULL) {JANUS_LOG(LOG_ERR, "Invalid subscriber instance\n");error_code = JANUS_VIDEOROOM_ERROR_UNKNOWN_ERROR;g_snprintf(error_cause, 512, "Invalid subscriber instance");goto error;}if(subscriber->room == NULL) {JANUS_LOG(LOG_ERR, "No such room\n");error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM;g_snprintf(error_cause, 512, "No such room");goto error;}if(!strcasecmp(request_text, "join")) {JANUS_LOG(LOG_ERR, "Already in as a subscriber on this handle\n");error_code = JANUS_VIDEOROOM_ERROR_ALREADY_JOINED;g_snprintf(error_cause, 512, "Already in as a subscriber on this handle");goto error;} else if(!strcasecmp(request_text, "start")) {/* Start/restart receiving the publisher streams */if(subscriber->paused && msg->jsep == NULL) {/* This is just resuming a paused stream, reset the RTP sequence numbers */subscriber->context.a_seq_reset = TRUE;subscriber->context.v_seq_reset = TRUE;}subscriber->paused = FALSE;event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(subscriber->room_id));json_object_set_new(event, "started", json_string("ok"));} else if(!strcasecmp(request_text, "configure")) {JANUS_VALIDATE_JSON_OBJECT(root, configure_parameters,error_code, error_cause, TRUE,JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT);if(error_code != 0)goto error;if(subscriber->kicked) {JANUS_LOG(LOG_ERR, "Unauthorized, you have been kicked\n");error_code = JANUS_VIDEOROOM_ERROR_UNAUTHORIZED;g_snprintf(error_cause, 512, "Unauthorized, you have been kicked");goto error;}json_t *audio = json_object_get(root, "audio");json_t *video = json_object_get(root, "video");json_t *data = json_object_get(root, "data");json_t *restart = json_object_get(root, "restart");json_t *update = json_object_get(root, "update");json_t *spatial = json_object_get(root, "spatial_layer");json_t *sc_substream = json_object_get(root, "substream");if(json_integer_value(spatial) < 0 || json_integer_value(spatial) > 2 ||json_integer_value(sc_substream) < 0 || json_integer_value(sc_substream) > 2) {JANUS_LOG(LOG_ERR, "Invalid element (substream/spatial_layer should be 0, 1 or 2)\n");error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT;g_snprintf(error_cause, 512, "Invalid value (substream/spatial_layer should be 0, 1 or 2)");goto error;}json_t *temporal = json_object_get(root, "temporal_layer");json_t *sc_temporal = json_object_get(root, "temporal");if(json_integer_value(temporal) < 0 || json_integer_value(temporal) > 2 ||json_integer_value(sc_temporal) < 0 || json_integer_value(sc_temporal) > 2) {JANUS_LOG(LOG_ERR, "Invalid element (temporal/temporal_layer should be 0, 1 or 2)\n");error_code = JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT;g_snprintf(error_cause, 512, "Invalid value (temporal/temporal_layer should be 0, 1 or 2)");goto error;}/* Update the audio/video/data flags, if set */janus_videoroom_publisher *publisher = subscriber->feed;if(publisher) {if(audio && publisher->audio && subscriber->audio_offered) {gboolean oldaudio = subscriber->audio;gboolean newaudio = json_is_true(audio);if(!oldaudio && newaudio) {/* Audio just resumed, reset the RTP sequence numbers */subscriber->context.a_seq_reset = TRUE;}subscriber->audio = newaudio;}if(video && publisher->video && subscriber->video_offered) {gboolean oldvideo = subscriber->video;gboolean newvideo = json_is_true(video);if(!oldvideo && newvideo) {/* Video just resumed, reset the RTP sequence numbers */subscriber->context.v_seq_reset = TRUE;}subscriber->video = newvideo;if(subscriber->video) {/* Send a FIR */janus_videoroom_reqfir(publisher, "Restoring video for subscriber");}}if(data && publisher->data && subscriber->data_offered)subscriber->data = json_is_true(data);/* Check if a simulcasting-related request is involved */if(sc_substream && (publisher->ssrc[0] != 0 || publisher->rid[0] != NULL)) {subscriber->sim_context.substream_target = json_integer_value(sc_substream);JANUS_LOG(LOG_VERB, "Setting video SSRC to let through (simulcast): %"SCNu32" (index %d, was %d)\n",publisher->ssrc[subscriber->sim_context.substream],subscriber->sim_context.substream_target,subscriber->sim_context.substream);if(subscriber->sim_context.substream_target == subscriber->sim_context.substream) {/* No need to do anything, we're already getting the right substream, so notify the user */json_t *event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(subscriber->room_id));json_object_set_new(event, "substream", json_integer(subscriber->sim_context.substream));gateway->push_event(msg->handle, &janus_videoroom_plugin, NULL, event, NULL);json_decref(event);} else {/* Send a FIR */janus_videoroom_reqfir(publisher, "Simulcasting substream change");}}if(subscriber->feed && subscriber->feed->vcodec == JANUS_VIDEOCODEC_VP8 &&sc_temporal && (publisher->ssrc[0] != 0 || publisher->rid[0] != NULL)) {subscriber->sim_context.templayer_target = json_integer_value(sc_temporal);JANUS_LOG(LOG_VERB, "Setting video temporal layer to let through (simulcast): %d (was %d)\n",subscriber->sim_context.templayer_target, subscriber->sim_context.templayer);if(subscriber->sim_context.templayer_target == subscriber->sim_context.templayer) {/* No need to do anything, we're already getting the right temporal, so notify the user */json_t *event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(subscriber->room_id));json_object_set_new(event, "temporal", json_integer(subscriber->sim_context.templayer));gateway->push_event(msg->handle, &janus_videoroom_plugin, NULL, event, NULL);json_decref(event);} else {/* Send a FIR */janus_videoroom_reqfir(publisher, "Simulcasting temporal layer change");}}}if(subscriber->room->do_svc) {/* Also check if the viewer is trying to configure a layer change */if(spatial) {int spatial_layer = json_integer_value(spatial);if(spatial_layer > 1) {JANUS_LOG(LOG_WARN, "Spatial layer higher than 1, it will be ignored if using EnabledByFlag_2SL3TL\n");}if(spatial_layer == subscriber->spatial_layer) {/* No need to do anything, we're already getting the right spatial layer, so notify the user */json_t *event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(subscriber->room_id));json_object_set_new(event, "spatial_layer", json_integer(subscriber->spatial_layer));gateway->push_event(msg->handle, &janus_videoroom_plugin, NULL, event, NULL);json_decref(event);} else if(spatial_layer != subscriber->target_spatial_layer) {/* Send a FIR to the new RTP forward publisher */janus_videoroom_reqfir(publisher, "Need to downscale spatially");}subscriber->target_spatial_layer = spatial_layer;}if(temporal) {int temporal_layer = json_integer_value(temporal);if(temporal_layer > 2) {JANUS_LOG(LOG_WARN, "Temporal layer higher than 2, will probably be ignored\n");}if(temporal_layer == subscriber->temporal_layer) {/* No need to do anything, we're already getting the right temporal layer, so notify the user */json_t *event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(subscriber->room_id));json_object_set_new(event, "temporal_layer", json_integer(subscriber->temporal_layer));gateway->push_event(msg->handle, &janus_videoroom_plugin, NULL, event, NULL);json_decref(event);}subscriber->target_temporal_layer = temporal_layer;}}event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(subscriber->room_id));json_object_set_new(event, "configured", json_string("ok"));/* The user may be interested in an ICE restart */gboolean do_restart = restart ? json_is_true(restart) : FALSE;gboolean do_update = update ? json_is_true(update) : FALSE;if(sdp_update || do_restart || do_update) {/* Negotiate by sending the selected publisher SDP back, and/or force an ICE restart */if(publisher->sdp != NULL) {char temp_error[512];JANUS_LOG(LOG_VERB, "Munging SDP offer (update) to adapt it to the subscriber's requirements\n");janus_sdp *offer = janus_sdp_parse(publisher->sdp, temp_error, sizeof(temp_error));if(publisher->audio && !subscriber->audio_offered)janus_sdp_mline_remove(offer, JANUS_SDP_AUDIO);if(publisher->video && !subscriber->video_offered)janus_sdp_mline_remove(offer, JANUS_SDP_VIDEO);if(publisher->data && !subscriber->data_offered)janus_sdp_mline_remove(offer, JANUS_SDP_APPLICATION);/* This is an update, check if we need to update */janus_sdp_mtype mtype[3] = { JANUS_SDP_AUDIO, JANUS_SDP_VIDEO, JANUS_SDP_APPLICATION };int i=0;for(i=0; i<3; i++) {janus_sdp_mline *m = janus_sdp_mline_find(subscriber->sdp, mtype[i]);janus_sdp_mline *m_new = janus_sdp_mline_find(offer, mtype[i]);if(m != NULL && m->port > 0 && m->direction != JANUS_SDP_INACTIVE) {/* We have such an m-line and it's active, should it be changed? */if(m_new == NULL || m_new->port == 0 || m_new->direction == JANUS_SDP_INACTIVE) {/* Turn the m-line to inactive */m->port = 0;m->direction = JANUS_SDP_INACTIVE;}} else {/* We don't have such an m-line or it's disabled, should it be added/enabled? */if(m_new != NULL && m_new->port > 0 && m_new->direction != JANUS_SDP_INACTIVE) {if(m != NULL) {m->port = m_new->port;m->direction = m_new->direction;} else {/* Add the new m-line */m = janus_sdp_mline_create(m_new->type, m_new->port, m_new->proto, m_new->direction);subscriber->sdp->m_lines = g_list_append(subscriber->sdp->m_lines, m);}/* Copy/replace the other properties */m->c_ipv4 = m_new->c_ipv4;if(m_new->c_addr && (m->c_addr == NULL || strcmp(m->c_addr, m_new->c_addr))) {g_free(m->c_addr);m->c_addr = g_strdup(m_new->c_addr);}if(m_new->b_name && (m->b_name == NULL || strcmp(m->b_name, m_new->b_name))) {g_free(m->b_name);m->b_name = g_strdup(m_new->b_name);}m->b_value = m_new->b_value;g_list_free_full(m->fmts, (GDestroyNotify)g_free);m->fmts = NULL;GList *fmts = m_new->fmts;while(fmts) {char *fmt = (char *)fmts->data;if(fmt)m->fmts = g_list_append(m->fmts,g_strdup(fmt));fmts = fmts->next;}g_list_free(m->ptypes);m->ptypes = g_list_copy(m_new->ptypes);g_list_free_full(m->attributes, (GDestroyNotify)janus_sdp_attribute_destroy);m->attributes = NULL;GList *attr = m_new->attributes;while(attr) {janus_sdp_attribute *a = (janus_sdp_attribute *)attr->data;janus_sdp_attribute_add_to_mline(m,janus_sdp_attribute_create(a->name, "%s", a->value));attr = attr->next;}}}}janus_sdp_destroy(offer);session->sdp_version++;subscriber->sdp->o_version = session->sdp_version;char *newsdp = janus_sdp_write(subscriber->sdp);JANUS_LOG(LOG_VERB, "Updating subscriber:\n%s\n", newsdp);json_t *jsep = json_pack("{ssss}", "type", "offer", "sdp", newsdp);if(do_restart)json_object_set_new(jsep, "restart", json_true());/* How long will the Janus core take to push the event? */gint64 start = janus_get_monotonic_time();int res = gateway->push_event(msg->handle, &janus_videoroom_plugin, msg->transaction, event, jsep);JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (took %"SCNu64" us)\n", res, janus_get_monotonic_time()-start);json_decref(event);json_decref(jsep);g_free(newsdp);/* Any update in the media directions? */subscriber->audio = publisher->audio && subscriber->audio_offered;subscriber->video = publisher->video && subscriber->video_offered;subscriber->data = publisher->data && subscriber->data_offered;/* Done */janus_videoroom_message_free(msg);continue;}}} else if(!strcasecmp(request_text, "pause")) {/* Stop receiving the publisher streams for a while */subscriber->paused = TRUE;event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(subscriber->room_id));json_object_set_new(event, "paused", json_string("ok"));} else if(!strcasecmp(request_text, "switch")) {/* This subscriber wants to switch to a different publisher */JANUS_VALIDATE_JSON_OBJECT(root, subscriber_parameters,error_code, error_cause, TRUE,JANUS_VIDEOROOM_ERROR_MISSING_ELEMENT, JANUS_VIDEOROOM_ERROR_INVALID_ELEMENT);if(error_code != 0)goto error;json_t *feed = json_object_get(root, "feed");guint64 feed_id = json_integer_value(feed);json_t *audio = json_object_get(root, "audio");json_t *video = json_object_get(root, "video");json_t *data = json_object_get(root, "data");if(!subscriber->room) {JANUS_LOG(LOG_ERR, "Room Destroyed \n");error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM;g_snprintf(error_cause, 512, "No such room ");goto error;}if(g_atomic_int_get(&subscriber->destroyed)) {JANUS_LOG(LOG_ERR, "Room Destroyed (%"SCNu64")\n", subscriber->room_id);error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM;g_snprintf(error_cause, 512, "No such room (%"SCNu64")", subscriber->room_id);goto error;}janus_mutex_lock(&subscriber->room->mutex);janus_videoroom_publisher *publisher = g_hash_table_lookup(subscriber->room->participants, &feed_id);if(publisher == NULL || g_atomic_int_get(&publisher->destroyed) || publisher->sdp == NULL) {JANUS_LOG(LOG_ERR, "No such feed (%"SCNu64")\n", feed_id);error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_FEED;g_snprintf(error_cause, 512, "No such feed (%"SCNu64")", feed_id);janus_mutex_unlock(&subscriber->room->mutex);goto error;}janus_refcount_increase(&publisher->ref);janus_refcount_increase(&publisher->session->ref);janus_mutex_unlock(&subscriber->room->mutex);gboolean paused = subscriber->paused;subscriber->paused = TRUE;/* Unsubscribe from the previous publisher */janus_videoroom_publisher *prev_feed = subscriber->feed;if(prev_feed) {/* ... but make sure the codecs are compliant first */if(publisher->acodec != prev_feed->acodec || publisher->vcodec != prev_feed->vcodec) {janus_refcount_decrease(&publisher->session->ref);janus_refcount_decrease(&publisher->ref);subscriber->paused = paused;JANUS_LOG(LOG_ERR, "The two publishers are not using the same codecs, can't switch\n");error_code = JANUS_VIDEOROOM_ERROR_INVALID_SDP;g_snprintf(error_cause, 512, "The two publishers are not using the same codecs, can't switch");goto error;}/* Go on */janus_mutex_lock(&prev_feed->subscribers_mutex);prev_feed->subscribers = g_slist_remove(prev_feed->subscribers, subscriber);janus_mutex_unlock(&prev_feed->subscribers_mutex);janus_refcount_decrease(&prev_feed->session->ref);g_clear_pointer(&subscriber->feed, janus_videoroom_publisher_dereference);}/* Subscribe to the new one */subscriber->audio = audio ? json_is_true(audio) : TRUE;	/* True by default */if(!publisher->audio)subscriber->audio = FALSE;	/* ... unless the publisher isn't sending any audio */subscriber->video = video ? json_is_true(video) : TRUE;	/* True by default */if(!publisher->video)subscriber->video = FALSE;	/* ... unless the publisher isn't sending any video */subscriber->data = data ? json_is_true(data) : TRUE;	/* True by default */if(!publisher->data)subscriber->data = FALSE;	/* ... unless the publisher isn't sending any data */if(subscriber->room && subscriber->room->do_svc) {/* This subscriber belongs to a room where VP9 SVC has been enabled,* let's assume we're interested in all layers for the time being */subscriber->spatial_layer = -1;subscriber->target_spatial_layer = 2;		/* FIXME Chrome sends 0, 1 and 2 (if using EnabledByFlag_3SL3TL) */subscriber->temporal_layer = -1;subscriber->target_temporal_layer = 2;	/* FIXME Chrome sends 0, 1 and 2 */}janus_mutex_lock(&publisher->subscribers_mutex);publisher->subscribers = g_slist_append(publisher->subscribers, subscriber);janus_mutex_unlock(&publisher->subscribers_mutex);subscriber->feed = publisher;/* Send a FIR to the new publisher */janus_videoroom_reqfir(publisher, "Switching existing subscriber to new publisher");/* Done */subscriber->paused = paused;event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "switched", json_string("ok"));json_object_set_new(event, "room", json_integer(subscriber->room_id));json_object_set_new(event, "id", json_integer(feed_id));if(publisher->display)json_object_set_new(event, "display", json_string(publisher->display));/* Also notify event handlers */if(notify_events && gateway->events_is_enabled()) {json_t *info = json_object();json_object_set_new(info, "event", json_string("switched"));json_object_set_new(info, "room", json_integer(publisher->room_id));json_object_set_new(info, "feed", json_integer(publisher->user_id));gateway->notify_event(&janus_videoroom_plugin, session->handle, info);}} else if(!strcasecmp(request_text, "leave")) {guint64 room_id = subscriber ? subscriber->room_id : 0;/* Tell the core to tear down the PeerConnection, hangup_media will do the rest */janus_videoroom_hangup_media(session->handle);gateway->close_pc(session->handle);/* Send an event back */event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "room", json_integer(room_id));json_object_set_new(event, "left", json_string("ok"));session->started = FALSE;} else {JANUS_LOG(LOG_ERR, "Unknown request '%s'\n", request_text);error_code = JANUS_VIDEOROOM_ERROR_INVALID_REQUEST;g_snprintf(error_cause, 512, "Unknown request '%s'", request_text);goto error;}}/* Prepare JSON event */JANUS_LOG(LOG_VERB, "Preparing JSON event as a reply\n");/* Any SDP or update to handle? */const char *msg_sdp_type = json_string_value(json_object_get(msg->jsep, "type"));const char *msg_sdp = json_string_value(json_object_get(msg->jsep, "sdp"));json_t *msg_simulcast = json_object_get(msg->jsep, "simulcast");if(!msg_sdp) {/* No SDP to send */int ret = gateway->push_event(msg->handle, &janus_videoroom_plugin, msg->transaction, event, NULL);JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));json_decref(event);} else {/* Generate offer or answer */JANUS_LOG(LOG_VERB, "This is involving a negotiation (%s) as well:\n%s\n", msg_sdp_type, msg_sdp);if(sdp_update) {/* Renegotiation: make sure the user provided an offer, and send answer */JANUS_LOG(LOG_VERB, "  -- Updating existing publisher\n");session->sdp_version++;		/* This needs to be increased when it changes */} else {/* New PeerConnection */session->sdp_version = 1;	/* This needs to be increased when it changes */session->sdp_sessid = janus_get_real_time();}const char *type = NULL;if(!strcasecmp(msg_sdp_type, "offer")) {/* We need to answer */type = "answer";} else if(!strcasecmp(msg_sdp_type, "answer")) {/* We got an answer (from a subscriber?), no need to negotiate */g_atomic_int_set(&session->hangingup, 0);int ret = gateway->push_event(msg->handle, &janus_videoroom_plugin, msg->transaction, event, NULL);JANUS_LOG(LOG_VERB, "  >> %d (%s)\n", ret, janus_get_api_error(ret));json_decref(event);janus_videoroom_message_free(msg);continue;} else {/* TODO We don't support anything else right now... */JANUS_LOG(LOG_ERR, "Unknown SDP type '%s'\n", msg_sdp_type);error_code = JANUS_VIDEOROOM_ERROR_INVALID_SDP_TYPE;g_snprintf(error_cause, 512, "Unknown SDP type '%s'", msg_sdp_type);goto error;}if(session->participant_type != janus_videoroom_p_type_publisher) {/* We shouldn't be here, we always offer ourselves */JANUS_LOG(LOG_ERR, "Only publishers send offers\n");error_code = JANUS_VIDEOROOM_ERROR_INVALID_SDP_TYPE;g_snprintf(error_cause, 512, "Only publishers send offers");goto error;} else {/* This is a new publisher: is there room? */participant = janus_videoroom_session_get_publisher(session);janus_videoroom *videoroom = participant->room;int count = 0;GHashTableIter iter;gpointer value;if(!videoroom) {error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM;goto error;}if(g_atomic_int_get(&videoroom->destroyed)) {error_code = JANUS_VIDEOROOM_ERROR_NO_SUCH_ROOM;goto error;}janus_mutex_lock(&videoroom->mutex);g_hash_table_iter_init(&iter, videoroom->participants);while (!g_atomic_int_get(&videoroom->destroyed) && g_hash_table_iter_next(&iter, NULL, &value)) {janus_videoroom_publisher *p = value;if(p != participant && p->sdp)count++;}janus_mutex_unlock(&videoroom->mutex);if(count == videoroom->max_publishers) {participant->audio_active = FALSE;participant->video_active = FALSE;participant->data_active = FALSE;JANUS_LOG(LOG_ERR, "Maximum number of publishers (%d) already reached\n", videoroom->max_publishers);error_code = JANUS_VIDEOROOM_ERROR_PUBLISHERS_FULL;g_snprintf(error_cause, 512, "Maximum number of publishers (%d) already reached", videoroom->max_publishers);goto error;}/* Now prepare the SDP to give back */if(strstr(msg_sdp, "mozilla") || strstr(msg_sdp, "Mozilla")) {participant->firefox = TRUE;}/* Start by parsing the offer */char error_str[512];janus_sdp *offer = janus_sdp_parse(msg_sdp, error_str, sizeof(error_str));if(offer == NULL) {json_decref(event);JANUS_LOG(LOG_ERR, "Error parsing offer: %s\n", error_str);error_code = JANUS_VIDEOROOM_ERROR_INVALID_SDP;g_snprintf(error_cause, 512, "Error parsing offer: %s", error_str);goto error;}GList *temp = offer->m_lines;while(temp) {/* Which media are available? */janus_sdp_mline *m = (janus_sdp_mline *)temp->data;if(m->type == JANUS_SDP_AUDIO && m->port > 0 &&m->direction != JANUS_SDP_RECVONLY && m->direction != JANUS_SDP_INACTIVE) {participant->audio = TRUE;} else if(m->type == JANUS_SDP_VIDEO && m->port > 0 &&m->direction != JANUS_SDP_RECVONLY && m->direction != JANUS_SDP_INACTIVE) {participant->video = TRUE;} else if(m->type == JANUS_SDP_APPLICATION && m->port > 0) {participant->data = TRUE;}if(m->type == JANUS_SDP_AUDIO || m->type == JANUS_SDP_VIDEO) {/* Are the extmaps we care about there? */GList *ma = m->attributes;while(ma) {janus_sdp_attribute *a = (janus_sdp_attribute *)ma->data;if(a->value) {if(videoroom->audiolevel_ext && m->type == JANUS_SDP_AUDIO && strstr(a->value, JANUS_RTP_EXTMAP_AUDIO_LEVEL)) {participant->audio_level_extmap_id = atoi(a->value);} else if(videoroom->videoorient_ext && m->type == JANUS_SDP_VIDEO && strstr(a->value, JANUS_RTP_EXTMAP_VIDEO_ORIENTATION)) {participant->video_orient_extmap_id = atoi(a->value);} else if(videoroom->playoutdelay_ext && m->type == JANUS_SDP_VIDEO && strstr(a->value, JANUS_RTP_EXTMAP_PLAYOUT_DELAY)) {participant->playout_delay_extmap_id = atoi(a->value);} else if(m->type == JANUS_SDP_AUDIO && !strcasecmp(a->name, "fmtp") && strstr(a->value, "useinbandfec=1")) {participant->do_opusfec = videoroom->do_opusfec;}}ma = ma->next;}}temp = temp->next;}/* Prepare an answer now: force the room codecs and recvonly on the Janus side */JANUS_LOG(LOG_VERB, "The publisher %s going to send an audio stream\n", participant->audio ? "is" : "is NOT");JANUS_LOG(LOG_VERB, "The publisher %s going to send a video stream\n", participant->video ? "is" : "is NOT");JANUS_LOG(LOG_VERB, "The publisher %s going to open a data channel\n", participant->data ? "is" : "is NOT");/* Check the codecs we can use, or the ones we should */if(participant->acodec == JANUS_AUDIOCODEC_NONE) {int i=0;for(i=0; i<3; i++) {if(videoroom->acodec[i] == JANUS_AUDIOCODEC_NONE)continue;if(janus_sdp_get_codec_pt(offer, janus_audiocodec_name(videoroom->acodec[i])) != -1) {participant->acodec = videoroom->acodec[i];break;}}}JANUS_LOG(LOG_VERB, "The publisher is going to use the %s audio codec\n", janus_audiocodec_name(participant->acodec));participant->audio_pt = janus_audiocodec_pt(participant->acodec);if(participant->vcodec == JANUS_VIDEOCODEC_NONE) {int i=0;for(i=0; i<3; i++) {if(videoroom->vcodec[i] == JANUS_VIDEOCODEC_NONE)continue;if(janus_sdp_get_codec_pt(offer, janus_videocodec_name(videoroom->vcodec[i])) != -1) {participant->vcodec = videoroom->vcodec[i];break;}}}JANUS_LOG(LOG_VERB, "The publisher is going to use the %s video codec\n", janus_videocodec_name(participant->vcodec));participant->video_pt = janus_videocodec_pt(participant->vcodec);janus_sdp *answer = janus_sdp_generate_answer(offer,JANUS_SDP_OA_AUDIO_CODEC, janus_audiocodec_name(participant->acodec),JANUS_SDP_OA_AUDIO_DIRECTION, JANUS_SDP_RECVONLY,JANUS_SDP_OA_AUDIO_FMTP, participant->do_opusfec ? "useinbandfec=1" : NULL,JANUS_SDP_OA_VIDEO_CODEC, janus_videocodec_name(participant->vcodec),JANUS_SDP_OA_VIDEO_DIRECTION, JANUS_SDP_RECVONLY,JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_MID,JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_RID,JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_REPAIRED_RID,JANUS_SDP_OA_ACCEPT_EXTMAP, JANUS_RTP_EXTMAP_FRAME_MARKING,JANUS_SDP_OA_ACCEPT_EXTMAP, videoroom->audiolevel_ext ? JANUS_RTP_EXTMAP_AUDIO_LEVEL : NULL,JANUS_SDP_OA_ACCEPT_EXTMAP, videoroom->videoorient_ext ? JANUS_RTP_EXTMAP_VIDEO_ORIENTATION : NULL,JANUS_SDP_OA_ACCEPT_EXTMAP, videoroom->playoutdelay_ext ? JANUS_RTP_EXTMAP_PLAYOUT_DELAY : NULL,JANUS_SDP_OA_ACCEPT_EXTMAP, videoroom->transport_wide_cc_ext ? JANUS_RTP_EXTMAP_TRANSPORT_WIDE_CC : NULL,JANUS_SDP_OA_DONE);janus_sdp_destroy(offer);/* Replace the session name */g_free(answer->s_name);char s_name[100];g_snprintf(s_name, sizeof(s_name), "VideoRoom %"SCNu64, videoroom->room_id);answer->s_name = g_strdup(s_name);/* Which media are REALLY available? (some may have been rejected) */participant->audio = FALSE;participant->video = FALSE;participant->data = FALSE;temp = answer->m_lines;while(temp) {janus_sdp_mline *m = (janus_sdp_mline *)temp->data;if(m->type == JANUS_SDP_AUDIO && m->port > 0 && m->direction != JANUS_SDP_INACTIVE) {participant->audio = TRUE;} else if(m->type == JANUS_SDP_VIDEO && m->port > 0 && m->direction != JANUS_SDP_INACTIVE) {participant->video = TRUE;} else if(m->type == JANUS_SDP_APPLICATION && m->port > 0) {participant->data = TRUE;}temp = temp->next;}JANUS_LOG(LOG_VERB, "Per the answer, the publisher %s going to send an audio stream\n", participant->audio ? "is" : "is NOT");JANUS_LOG(LOG_VERB, "Per the answer, the publisher %s going to send a video stream\n", participant->video ? "is" : "is NOT");JANUS_LOG(LOG_VERB, "Per the answer, the publisher %s going to open a data channel\n", participant->data ? "is" : "is NOT");/* Update the event with info on the codecs that we'll be handling */if(event) {if(participant->audio)json_object_set_new(event, "audio_codec", json_string(janus_audiocodec_name(participant->acodec)));if(participant->video)json_object_set_new(event, "video_codec", json_string(janus_videocodec_name(participant->vcodec)));}/* Also add a bandwidth SDP attribute if we're capping the bitrate in the room */janus_sdp_mline *m = janus_sdp_mline_find(answer, JANUS_SDP_VIDEO);if(m != NULL && videoroom->bitrate > 0 && videoroom->bitrate_cap) {if(participant->firefox) {/* Use TIAS (bps) instead of AS (kbps) for the b= attribute, as explained here:* https://github.com/meetecho/janus-gateway/issues/1277#issuecomment-397677746 */m->b_name = g_strdup("TIAS");m->b_value = videoroom->bitrate;} else {m->b_name = g_strdup("AS");m->b_value = videoroom->bitrate/1000;}}/* Generate an SDP string we can send back to the publisher */char *answer_sdp = janus_sdp_write(answer);/* Now turn the SDP into what we'll send subscribers, using the static payload types for making switching easier */offer = janus_sdp_generate_offer(s_name, answer->c_addr,JANUS_SDP_OA_AUDIO, participant->audio,JANUS_SDP_OA_AUDIO_CODEC, janus_audiocodec_name(participant->acodec),JANUS_SDP_OA_AUDIO_PT, janus_audiocodec_pt(participant->acodec),JANUS_SDP_OA_AUDIO_DIRECTION, JANUS_SDP_SENDONLY,JANUS_SDP_OA_AUDIO_FMTP, participant->do_opusfec ? "useinbandfec=1" : NULL,JANUS_SDP_OA_VIDEO, participant->video,JANUS_SDP_OA_VIDEO_CODEC, janus_videocodec_name(participant->vcodec),JANUS_SDP_OA_VIDEO_PT, janus_videocodec_pt(participant->vcodec),JANUS_SDP_OA_VIDEO_DIRECTION, JANUS_SDP_SENDONLY,JANUS_SDP_OA_DATA, participant->data,JANUS_SDP_OA_DONE);/* Add the extmap attributes, if needed */if(participant->audio_level_extmap_id > 0) {janus_sdp_mline *m = janus_sdp_mline_find(offer, JANUS_SDP_AUDIO);if(m != NULL) {janus_sdp_attribute *a = janus_sdp_attribute_create("extmap","%d %s\r\n", participant->audio_level_extmap_id, JANUS_RTP_EXTMAP_AUDIO_LEVEL);janus_sdp_attribute_add_to_mline(m, a);}}if(participant->video_orient_extmap_id > 0) {janus_sdp_mline *m = janus_sdp_mline_find(offer, JANUS_SDP_VIDEO);if(m != NULL) {janus_sdp_attribute *a = janus_sdp_attribute_create("extmap","%d %s\r\n", participant->video_orient_extmap_id, JANUS_RTP_EXTMAP_VIDEO_ORIENTATION);janus_sdp_attribute_add_to_mline(m, a);}}if(participant->playout_delay_extmap_id > 0) {janus_sdp_mline *m = janus_sdp_mline_find(offer, JANUS_SDP_VIDEO);if(m != NULL) {janus_sdp_attribute *a = janus_sdp_attribute_create("extmap","%d %s\r\n", participant->playout_delay_extmap_id, JANUS_RTP_EXTMAP_PLAYOUT_DELAY);janus_sdp_attribute_add_to_mline(m, a);}}/* Is this room recorded, or are we recording this publisher already? */janus_mutex_lock(&participant->rec_mutex);if(videoroom->record || participant->recording_active) {janus_videoroom_recorder_create(participant, participant->audio, participant->video, participant->data);}janus_mutex_unlock(&participant->rec_mutex);/* Generate an SDP string we can offer subscribers later on */char *offer_sdp = janus_sdp_write(offer);if(!sdp_update) {/* Is simulcasting involved */if(msg_simulcast && (participant->vcodec == JANUS_VIDEOCODEC_VP8 ||participant->vcodec == JANUS_VIDEOCODEC_H264)) {JANUS_LOG(LOG_VERB, "Publisher is going to do simulcasting\n");janus_rtp_simulcasting_prepare(msg_simulcast,&participant->rid_extmap_id,&participant->framemarking_ext_id,participant->ssrc, participant->rid);} else {/* No simulcasting involved */int i=0;for(i=0; i<3; i++) {participant->ssrc[i] = 0;g_free(participant->rid[i]);participant->rid[i] = NULL;}}}janus_sdp_destroy(offer);janus_sdp_destroy(answer);/* Send the answer back to the publisher */JANUS_LOG(LOG_VERB, "Handling publisher: turned this into an '%s':\n%s\n", type, answer_sdp);json_t *jsep = json_pack("{ssss}", "type", type, "sdp", answer_sdp);g_free(answer_sdp);/* How long will the Janus core take to push the event? */g_atomic_int_set(&session->hangingup, 0);gint64 start = janus_get_monotonic_time();int res = gateway->push_event(msg->handle, &janus_videoroom_plugin, msg->transaction, event, jsep);JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (took %"SCNu64" us)\n", res, janus_get_monotonic_time()-start);/* Done */if(res != JANUS_OK) {/* TODO Failed to negotiate? We should remove this publisher */g_free(offer_sdp);} else {/* Store the participant's SDP for interested subscribers */g_free(participant->sdp);participant->sdp = offer_sdp;/* We'll wait for the setup_media event before actually telling subscribers */}/* Unless this is an update, in which case schedule a new offer for all viewers */if(sdp_update) {json_t *update = json_object();json_object_set_new(update, "request", json_string("configure"));json_object_set_new(update, "update", json_true());janus_mutex_lock(&participant->subscribers_mutex);GSList *s = participant->subscribers;while(s) {janus_videoroom_subscriber *subscriber = (janus_videoroom_subscriber *)s->data;if(subscriber && subscriber->session && subscriber->session->handle) {/* Enqueue the fake request: this will trigger a renegotiation */janus_videoroom_message *msg = g_malloc(sizeof(janus_videoroom_message));janus_refcount_increase(&subscriber->session->ref);msg->handle = subscriber->session->handle;msg->message = update;msg->transaction = NULL;msg->jsep = NULL;json_incref(update);g_async_queue_push(messages, msg);}s = s->next;}janus_mutex_unlock(&participant->subscribers_mutex);json_decref(update);}json_decref(event);json_decref(jsep);}if(participant != NULL)janus_refcount_decrease(&participant->ref);}janus_videoroom_message_free(msg);continue;error:{/* Prepare JSON error event */json_t *event = json_object();json_object_set_new(event, "videoroom", json_string("event"));json_object_set_new(event, "error_code", json_integer(error_code));json_object_set_new(event, "error", json_string(error_cause));int ret = gateway->push_event(msg->handle, &janus_videoroom_plugin, msg->transaction, event, NULL);JANUS_LOG(LOG_VERB, "  >> Pushing event: %d (%s)\n", ret, janus_get_api_error(ret));json_decref(event);janus_videoroom_message_free(msg);}}JANUS_LOG(LOG_VERB, "Leaving VideoRoom handler thread\n");return NULL;
}

3.7 Janus主线程的push_event

在这里插入图片描述

3.8 Transport的send_message(以transports/janus_http.c为例)

int janus_http_send_message(janus_transport_session *transport, void *request_id, gboolean admin, json_t *message) {JANUS_LOG(LOG_HUGE, "Got a %s API %s to send (%p)\n", admin ? "admin" : "Janus", request_id ? "response" : "event", transport);if(message == NULL) {JANUS_LOG(LOG_ERR, "No message...\n");return -1;}if(request_id == NULL) {/* This is an event, add to the session queue */json_t *s = json_object_get(message, "session_id");if(!s || !json_is_integer(s)) {JANUS_LOG(LOG_ERR, "Can't notify event, no session_id...\n");json_decref(message);return -1;}guint64 session_id = json_integer_value(s);janus_mutex_lock(&sessions_mutex);janus_http_session *session = g_hash_table_lookup(sessions, &session_id);if(session == NULL || g_atomic_int_get(&session->destroyed)) {JANUS_LOG(LOG_ERR, "Can't notify event, no session object...\n");janus_mutex_unlock(&sessions_mutex);json_decref(message);return -1;}g_async_queue_push(session->events, message);janus_mutex_unlock(&sessions_mutex);} else {if(request_id == keepalive_id) {/* It's a response from our fake long-poll related keepalive, ignore */json_decref(message);return 0;}/* This is a response, we need a valid transport instance */if(transport == NULL || transport->transport_p == NULL) {JANUS_LOG(LOG_ERR, "Invalid HTTP instance...\n");json_decref(message);return -1;}/* We have a response */janus_transport_session *session = (janus_transport_session *)transport;janus_mutex_lock(&messages_mutex);if(g_hash_table_lookup(messages, session) == NULL) {janus_mutex_unlock(&messages_mutex);JANUS_LOG(LOG_ERR, "Invalid HTTP connection...\n");json_decref(message);return -1;}janus_http_msg *msg = (janus_http_msg *)transport->transport_p;janus_mutex_unlock(&messages_mutex);if(!msg->connection) {JANUS_LOG(LOG_ERR, "Invalid HTTP connection...\n");json_decref(message);return -1;}janus_mutex_lock(&msg->wait_mutex);msg->response = message;msg->got_response = TRUE;janus_condition_signal(&msg->wait_cond);janus_mutex_unlock(&msg->wait_mutex);}return 0;
}

4. 参考资料

  • webrtc服务器janus 服务器代码分析学习六
    https://blog.csdn.net/bvngh3247/article/details/80988371