Return path: Source handling of return path
Open a return path, and handle messages that are received upon it. Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com> Reviewed-by: Juan Quintela <quintela@redhat.com> Signed-off-by: Juan Quintela <quintela@redhat.com>
This commit is contained in:
		
							parent
							
								
									f6844b99ce
								
							
						
					
					
						commit
						70b2047774
					
				| 
						 | 
				
			
			@ -80,6 +80,14 @@ struct MigrationState
 | 
			
		|||
 | 
			
		||||
    int state;
 | 
			
		||||
    MigrationParams params;
 | 
			
		||||
 | 
			
		||||
    /* State related to return path */
 | 
			
		||||
    struct {
 | 
			
		||||
        QEMUFile     *from_dst_file;
 | 
			
		||||
        QemuThread    rp_thread;
 | 
			
		||||
        bool          error;
 | 
			
		||||
    } rp_state;
 | 
			
		||||
 | 
			
		||||
    double mbps;
 | 
			
		||||
    int64_t total_time;
 | 
			
		||||
    int64_t downtime;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -696,6 +696,11 @@ static void migrate_fd_cancel(MigrationState *s)
 | 
			
		|||
    QEMUFile *f = migrate_get_current()->file;
 | 
			
		||||
    trace_migrate_fd_cancel();
 | 
			
		||||
 | 
			
		||||
    if (s->rp_state.from_dst_file) {
 | 
			
		||||
        /* shutdown the rp socket, so causing the rp thread to shutdown */
 | 
			
		||||
        qemu_file_shutdown(s->rp_state.from_dst_file);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    do {
 | 
			
		||||
        old_state = s->state;
 | 
			
		||||
        if (!migration_is_setup_or_active(old_state)) {
 | 
			
		||||
| 
						 | 
				
			
			@ -1030,6 +1035,154 @@ int64_t migrate_xbzrle_cache_size(void)
 | 
			
		|||
    return s->xbzrle_cache_size;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* migration thread support */
 | 
			
		||||
/*
 | 
			
		||||
 * Something bad happened to the RP stream, mark an error
 | 
			
		||||
 * The caller shall print or trace something to indicate why
 | 
			
		||||
 */
 | 
			
		||||
static void mark_source_rp_bad(MigrationState *s)
 | 
			
		||||
{
 | 
			
		||||
    s->rp_state.error = true;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
static struct rp_cmd_args {
 | 
			
		||||
    ssize_t     len; /* -1 = variable */
 | 
			
		||||
    const char *name;
 | 
			
		||||
} rp_cmd_args[] = {
 | 
			
		||||
    [MIG_RP_MSG_INVALID]        = { .len = -1, .name = "INVALID" },
 | 
			
		||||
    [MIG_RP_MSG_SHUT]           = { .len =  4, .name = "SHUT" },
 | 
			
		||||
    [MIG_RP_MSG_PONG]           = { .len =  4, .name = "PONG" },
 | 
			
		||||
    [MIG_RP_MSG_MAX]            = { .len = -1, .name = "MAX" },
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Handles messages sent on the return path towards the source VM
 | 
			
		||||
 *
 | 
			
		||||
 */
 | 
			
		||||
static void *source_return_path_thread(void *opaque)
 | 
			
		||||
{
 | 
			
		||||
    MigrationState *ms = opaque;
 | 
			
		||||
    QEMUFile *rp = ms->rp_state.from_dst_file;
 | 
			
		||||
    uint16_t header_len, header_type;
 | 
			
		||||
    const int max_len = 512;
 | 
			
		||||
    uint8_t buf[max_len];
 | 
			
		||||
    uint32_t tmp32, sibling_error;
 | 
			
		||||
    int res;
 | 
			
		||||
 | 
			
		||||
    trace_source_return_path_thread_entry();
 | 
			
		||||
    while (!ms->rp_state.error && !qemu_file_get_error(rp) &&
 | 
			
		||||
           migration_is_setup_or_active(ms->state)) {
 | 
			
		||||
        trace_source_return_path_thread_loop_top();
 | 
			
		||||
        header_type = qemu_get_be16(rp);
 | 
			
		||||
        header_len = qemu_get_be16(rp);
 | 
			
		||||
 | 
			
		||||
        if (header_type >= MIG_RP_MSG_MAX ||
 | 
			
		||||
            header_type == MIG_RP_MSG_INVALID) {
 | 
			
		||||
            error_report("RP: Received invalid message 0x%04x length 0x%04x",
 | 
			
		||||
                    header_type, header_len);
 | 
			
		||||
            mark_source_rp_bad(ms);
 | 
			
		||||
            goto out;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if ((rp_cmd_args[header_type].len != -1 &&
 | 
			
		||||
            header_len != rp_cmd_args[header_type].len) ||
 | 
			
		||||
            header_len > max_len) {
 | 
			
		||||
            error_report("RP: Received '%s' message (0x%04x) with"
 | 
			
		||||
                    "incorrect length %d expecting %zu",
 | 
			
		||||
                    rp_cmd_args[header_type].name, header_type, header_len,
 | 
			
		||||
                    (size_t)rp_cmd_args[header_type].len);
 | 
			
		||||
            mark_source_rp_bad(ms);
 | 
			
		||||
            goto out;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        /* We know we've got a valid header by this point */
 | 
			
		||||
        res = qemu_get_buffer(rp, buf, header_len);
 | 
			
		||||
        if (res != header_len) {
 | 
			
		||||
            error_report("RP: Failed reading data for message 0x%04x"
 | 
			
		||||
                         " read %d expected %d",
 | 
			
		||||
                         header_type, res, header_len);
 | 
			
		||||
            mark_source_rp_bad(ms);
 | 
			
		||||
            goto out;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        /* OK, we have the message and the data */
 | 
			
		||||
        switch (header_type) {
 | 
			
		||||
        case MIG_RP_MSG_SHUT:
 | 
			
		||||
            sibling_error = be32_to_cpup((uint32_t *)buf);
 | 
			
		||||
            trace_source_return_path_thread_shut(sibling_error);
 | 
			
		||||
            if (sibling_error) {
 | 
			
		||||
                error_report("RP: Sibling indicated error %d", sibling_error);
 | 
			
		||||
                mark_source_rp_bad(ms);
 | 
			
		||||
            }
 | 
			
		||||
            /*
 | 
			
		||||
             * We'll let the main thread deal with closing the RP
 | 
			
		||||
             * we could do a shutdown(2) on it, but we're the only user
 | 
			
		||||
             * anyway, so there's nothing gained.
 | 
			
		||||
             */
 | 
			
		||||
            goto out;
 | 
			
		||||
 | 
			
		||||
        case MIG_RP_MSG_PONG:
 | 
			
		||||
            tmp32 = be32_to_cpup((uint32_t *)buf);
 | 
			
		||||
            trace_source_return_path_thread_pong(tmp32);
 | 
			
		||||
            break;
 | 
			
		||||
 | 
			
		||||
        default:
 | 
			
		||||
            break;
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    if (rp && qemu_file_get_error(rp)) {
 | 
			
		||||
        trace_source_return_path_thread_bad_end();
 | 
			
		||||
        mark_source_rp_bad(ms);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    trace_source_return_path_thread_end();
 | 
			
		||||
out:
 | 
			
		||||
    ms->rp_state.from_dst_file = NULL;
 | 
			
		||||
    qemu_fclose(rp);
 | 
			
		||||
    return NULL;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
__attribute__ (( unused )) /* Until later in patch series */
 | 
			
		||||
static int open_return_path_on_source(MigrationState *ms)
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
    ms->rp_state.from_dst_file = qemu_file_get_return_path(ms->file);
 | 
			
		||||
    if (!ms->rp_state.from_dst_file) {
 | 
			
		||||
        return -1;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    trace_open_return_path_on_source();
 | 
			
		||||
    qemu_thread_create(&ms->rp_state.rp_thread, "return path",
 | 
			
		||||
                       source_return_path_thread, ms, QEMU_THREAD_JOINABLE);
 | 
			
		||||
 | 
			
		||||
    trace_open_return_path_on_source_continue();
 | 
			
		||||
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
__attribute__ (( unused )) /* Until later in patch series */
 | 
			
		||||
/* Returns 0 if the RP was ok, otherwise there was an error on the RP */
 | 
			
		||||
static int await_return_path_close_on_source(MigrationState *ms)
 | 
			
		||||
{
 | 
			
		||||
    /*
 | 
			
		||||
     * If this is a normal exit then the destination will send a SHUT and the
 | 
			
		||||
     * rp_thread will exit, however if there's an error we need to cause
 | 
			
		||||
     * it to exit.
 | 
			
		||||
     */
 | 
			
		||||
    if (qemu_file_get_error(ms->file) && ms->rp_state.from_dst_file) {
 | 
			
		||||
        /*
 | 
			
		||||
         * shutdown(2), if we have it, will cause it to unblock if it's stuck
 | 
			
		||||
         * waiting for the destination.
 | 
			
		||||
         */
 | 
			
		||||
        qemu_file_shutdown(ms->rp_state.from_dst_file);
 | 
			
		||||
        mark_source_rp_bad(ms);
 | 
			
		||||
    }
 | 
			
		||||
    trace_await_return_path_close_on_source_joining();
 | 
			
		||||
    qemu_thread_join(&ms->rp_state.rp_thread);
 | 
			
		||||
    trace_await_return_path_close_on_source_close();
 | 
			
		||||
    return ms->rp_state.error;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * migration_completion: Used by migration_thread when there's not much left.
 | 
			
		||||
 *   The caller 'breaks' the loop when this returns.
 | 
			
		||||
| 
						 | 
				
			
			@ -1074,8 +1227,10 @@ fail:
 | 
			
		|||
    migrate_set_state(s, MIGRATION_STATUS_ACTIVE, MIGRATION_STATUS_FAILED);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/* migration thread support */
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * Master migration thread on the source VM.
 | 
			
		||||
 * It drives the migration and pumps the data down the outgoing channel.
 | 
			
		||||
 */
 | 
			
		||||
static void *migration_thread(void *opaque)
 | 
			
		||||
{
 | 
			
		||||
    MigrationState *s = opaque;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										10
									
								
								trace-events
								
								
								
								
							
							
						
						
									
										10
									
								
								trace-events
								
								
								
								
							| 
						 | 
				
			
			@ -1426,12 +1426,22 @@ flic_no_device_api(int err) "flic: no Device Contral API support %d"
 | 
			
		|||
flic_reset_failed(int err) "flic: reset failed %d"
 | 
			
		||||
 | 
			
		||||
# migration.c
 | 
			
		||||
await_return_path_close_on_source_close(void) ""
 | 
			
		||||
await_return_path_close_on_source_joining(void) ""
 | 
			
		||||
migrate_set_state(int new_state) "new state %d"
 | 
			
		||||
migrate_fd_cleanup(void) ""
 | 
			
		||||
migrate_fd_error(void) ""
 | 
			
		||||
migrate_fd_cancel(void) ""
 | 
			
		||||
migrate_pending(uint64_t size, uint64_t max) "pending size %" PRIu64 " max %" PRIu64
 | 
			
		||||
migrate_send_rp_message(int msg_type, uint16_t len) "%d: len %d"
 | 
			
		||||
open_return_path_on_source(void) ""
 | 
			
		||||
open_return_path_on_source_continue(void) ""
 | 
			
		||||
source_return_path_thread_bad_end(void) ""
 | 
			
		||||
source_return_path_thread_end(void) ""
 | 
			
		||||
source_return_path_thread_entry(void) ""
 | 
			
		||||
source_return_path_thread_loop_top(void) ""
 | 
			
		||||
source_return_path_thread_pong(uint32_t val) "%x"
 | 
			
		||||
source_return_path_thread_shut(uint32_t val) "%x"
 | 
			
		||||
migrate_transferred(uint64_t tranferred, uint64_t time_spent, double bandwidth, uint64_t size) "transferred %" PRIu64 " time_spent %" PRIu64 " bandwidth %g max_size %" PRId64
 | 
			
		||||
migrate_state_too_big(void) ""
 | 
			
		||||
migrate_global_state_post_load(const char *state) "loaded state: %s"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue