// udprecv: receives one file at a time over UDP.
//
// The program binds a datagram socket on PORT, waits for a FILENAME packet
// announcing a name and a 64-bit size, then collects numbered data packets
// into a staging buffer. Whenever the buffer is full (or the file is
// complete) every missing packet is requested again with RESEND, the buffer
// is written to disk, and the sender is told with START where to resume.

#include <errno.h>
#include <inttypes.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#define PACKET_SIZE 1500
#define PACKET_DATA (PACKET_SIZE - 4)
#define PACKET_HEADER 4
#define BUFFER_SIZE ((0x100000 / PACKET_DATA) * PACKET_DATA)
// Number of data packets that fit in one staging buffer.
#define BUFFER_PACKETS (BUFFER_SIZE / PACKET_DATA)

// For very slow links, send packets as fast as possible
#define ASYNCHRONOUS

// Commands sent by receiver
#define RESEND 0x1
#define START 0x2
#define FILENAME 0x3

char HOSTNAME[1024];
int PORT;
FILE *out;
// Most recently received datagram: 4-byte packet number, then payload.
char data[PACKET_SIZE];
// Staging area for the part of the file starting at byte buffer_start.
char buffer[BUFFER_SIZE];
int read_socket;
int write_socket;
// Bytes of buffer accounted for so far (received or marked as dropped).
int buffer_size;
// File offset of buffer[0].
int64_t buffer_start;
// Packet numbers inside the current buffer that still have to be re-requested.
// One slot per packet of the buffer, so it can never be outgrown.
int dropped_packets[BUFFER_PACKETS];
int total_dropped_packets;

typedef struct
{
    struct timeval start_time;
    struct timeval last_time;
    int64_t bytes_transferred;
    int64_t total_bytes;
} status_t;

status_t status;

// Resets both timestamps to "now" and clears the byte counters.
void init_status(status_t *status)
{
    gettimeofday(&status->start_time, 0);
    gettimeofday(&status->last_time, 0);
    status->bytes_transferred = 0;
    status->total_bytes = 0;
}

// Adds `bytes` to the counters and, when more than two seconds have passed
// since the last report, prints the running total and the rate measured over
// the current reporting interval, then starts a new interval.
// Non-positive counts (for example a failed read) leave the counters alone.
void update_status(status_t *status, int bytes)
{
    struct timeval new_time;

    if(bytes > 0)
    {
        status->bytes_transferred += bytes;
        status->total_bytes += bytes;
    }

    gettimeofday(&new_time, 0);
    if(new_time.tv_sec - status->last_time.tv_sec > 2)
    {
        int64_t elapsed = (int64_t)(new_time.tv_sec - status->start_time.tv_sec);
        // Defensive only: the interval test above normally guarantees > 2.
        if(elapsed < 1)
            elapsed = 1;

        fprintf(stderr,
            "%" PRId64 " bytes received. %" PRId64 " bytes/sec \n",
            status->total_bytes,
            status->bytes_transferred / elapsed);
        // Push out any diagnostics queued on stdout along with the report.
        fflush(stdout);
        gettimeofday(&status->start_time, 0);
        gettimeofday(&status->last_time, 0);
        status->bytes_transferred = 0;
    }
}

// Prints the end-of-transfer marker on stderr.
void stop_status(void)
{
    fprintf(stderr, "\nDone.\n");
}

// Records packet_number as missing from the current buffer.
// The request is refused (with a message) once the list is full instead of
// writing past the end of dropped_packets.
void append_dropped(int packet_number)
{
    if(total_dropped_packets >= BUFFER_PACKETS)
    {
        fprintf(stderr,
            "append_dropped: list full, ignoring packet %d\n",
            packet_number);
        return;
    }
    dropped_packets[total_dropped_packets++] = packet_number;
}

// Decodes a big-endian 32-bit value from the first four bytes of data.
int read_int32(char *data)
{
    uint32_t value =
        ((uint32_t)(unsigned char)data[0] << 24) |
        ((uint32_t)(unsigned char)data[1] << 16) |
        ((uint32_t)(unsigned char)data[2] << 8) |
        (uint32_t)(unsigned char)data[3];
    return (int)value;
}

// Encodes number as a big-endian 32-bit value into the first four bytes of data.
void write_int32(char *data, int number)
{
    // Shift an unsigned copy so negative inputs are well defined.
    uint32_t value = (uint32_t)number;
    data[0] = (char)((value >> 24) & 0xff);
    data[1] = (char)((value >> 16) & 0xff);
    data[2] = (char)((value >> 8) & 0xff);
    data[3] = (char)(value & 0xff);
}

// Reads one datagram from read_socket into the global `data`.
// *packet_number receives the big-endian number stored in the first four
// bytes, or -1 when fewer than four bytes arrived.
// Returns the result of read(): bytes received, or a negative value on error.
int read_packet(int *packet_number)
{
    int bytes_read = read(read_socket, data, PACKET_SIZE);

    *packet_number = -1;
    if(bytes_read > 3)
        *packet_number = read_int32(data);
    if(bytes_read < PACKET_SIZE)
        printf("read_packet: got %d bytes\n", bytes_read);
    return bytes_read;
}

// Waits until read_socket is readable or the timeout expires
// (5 seconds with ASYNCHRONOUS, 1 second otherwise).
// Returns the result of select(): > 0 readable, 0 timeout, < 0 error.
int select_read_socket(void)
{
    fd_set read_set;
    struct timeval tv;

    FD_ZERO(&read_set);
    FD_SET(read_socket, &read_set);
#ifdef ASYNCHRONOUS
    tv.tv_sec = 5;
#else
    tv.tv_sec = 1;
#endif
    tv.tv_usec = 0;
    return select(read_socket + 1, &read_set, 0, 0, &tv);
}

// Sends a 5-byte command (opcode + big-endian packet number) to the sender.
static void send_command(int command, int packet_number)
{
    char request[1 + 4];

    request[0] = (char)command;
    write_int32(request + 1, packet_number);
    if(write(write_socket, request, sizeof request) != (ssize_t)sizeof request)
        perror("send_command: write");
}

// Copies the payload of the datagram held in `data` to its place in `buffer`.
// The payload is trimmed to the number of bytes the file still has at that
// position, so the final packet never pads the output.
// Returns the number of bytes stored, or -1 when the packet is unusable:
// it lies outside the file, outside the current buffer, or carries fewer
// bytes than that position of the file requires.
static int stage_payload(int packet_number, int bytes_read, int64_t file_size)
{
    // 64-bit arithmetic: packet_number * PACKET_DATA overflows int at 2 GB.
    int64_t file_offset = (int64_t)packet_number * PACKET_DATA;
    int64_t remaining = file_size - file_offset;
    int64_t wanted = remaining < PACKET_DATA ? remaining : PACKET_DATA;
    int64_t offset = file_offset - buffer_start;
    int payload = bytes_read - PACKET_HEADER;

    if(packet_number < 0 || wanted <= 0 || payload < wanted)
        return -1;
    if(offset < 0 || offset + wanted > BUFFER_SIZE)
        return -1;

    memcpy(buffer + offset, data + PACKET_HEADER, (size_t)wanted);
    return (int)wanted;
}

// Removes packet_number from the dropped list, if present.
static void forget_dropped(int packet_number)
{
    for(int i = 0; i < total_dropped_packets; i++)
    {
        if(dropped_packets[i] == packet_number)
        {
            // Element-wise pointers, byte-wise length.
            memmove(&dropped_packets[i],
                &dropped_packets[i + 1],
                (size_t)(total_dropped_packets - i - 1) *
                    sizeof dropped_packets[0]);
            total_dropped_packets--;
            return;
        }
    }
}

// Validates a FILENAME packet held in `data` (bytes_read bytes long) and
// extracts the NUL-terminated name and the 64-bit big-endian size after it.
// Returns 1 on success, 0 when the packet is not a well-formed announcement.
static int parse_announcement(int bytes_read,
    char *outfile,
    size_t outfile_size,
    int64_t *file_size)
{
    if(bytes_read < 1 || data[0] != FILENAME)
        return 0;

    const char *name = data + 1;
    size_t available = (size_t)bytes_read - 1;
    const char *terminator = memchr(name, '\0', available);
    if(terminator == NULL)
    {
        fprintf(stderr, "parse_announcement: unterminated file name\n");
        return 0;
    }

    size_t name_length = (size_t)(terminator - name);
    if(name_length == 0 || name_length >= outfile_size)
    {
        fprintf(stderr, "parse_announcement: bad file name length\n");
        return 0;
    }
    // The size field (2 x 32 bits) must lie inside the datagram as well.
    if(available - name_length - 1 < 8)
    {
        fprintf(stderr, "parse_announcement: missing file size\n");
        return 0;
    }

    memcpy(outfile, name, name_length + 1);

    char *ptr = data + 1 + name_length + 1;
    uint32_t sizeh = (uint32_t)read_int32(ptr);
    uint32_t sizel = (uint32_t)read_int32(ptr + 4);
    int64_t size = (int64_t)((((uint64_t)sizeh) << 32) | sizel);
    if(size < 0)
    {
        fprintf(stderr, "parse_announcement: bad file size\n");
        return 0;
    }
    *file_size = size;
    return 1;
}

// Marks every packet from *expected_packet_number up to the end of the
// buffer (or of the file) as dropped, so that they are requested one by one.
static void mark_missing_packets(int64_t file_size, int *expected_packet_number)
{
    while(buffer_size < BUFFER_SIZE && status.total_bytes < file_size)
    {
        int64_t fragment = PACKET_DATA;
        // The last packet of the file may be shorter than a full payload.
        if(status.total_bytes + fragment > file_size)
            fragment = file_size - status.total_bytes;
        buffer_size += (int)fragment;
        status.total_bytes += fragment;
        append_dropped((*expected_packet_number)++);
    }
    printf("%" PRId64 "\n", status.total_bytes);
}

// Handles one data packet that arrived while streaming.
// status.total_bytes counts file bytes accounted for (stored or marked
// dropped), never header bytes, so it can be compared with file_size.
static void accept_stream_packet(int packet_number,
    int bytes_read,
    int64_t file_size,
    int *expected_packet_number)
{
    // A packet that would start at or beyond the end of the file cannot
    // belong to this transfer.
    if((int64_t)packet_number * PACKET_DATA >= file_size)
        return;

    // Packet out of order
    if(packet_number < *expected_packet_number)
    {
        if((int64_t)packet_number * PACKET_DATA < buffer_start)
        {
            printf("packet %d from previous buffer\n", packet_number);
        }
        else
        {
            printf("packet %d out of order\n", packet_number);
            if(stage_payload(packet_number, bytes_read, file_size) >= 0)
                forget_dropped(packet_number);
        }
        // Already accounted for when it was marked dropped; only refresh
        // the progress report.
        update_status(&status, 0);
        return;
    }

    // Skip up to next packet, but never past the end of the buffer.
    while(*expected_packet_number < packet_number &&
        buffer_size <= BUFFER_SIZE - PACKET_DATA)
    {
        buffer_size += PACKET_DATA;
        status.total_bytes += PACKET_DATA;
        append_dropped(*expected_packet_number);
        (*expected_packet_number)++;
    }

    // The buffer filled up before reaching this packet: it belongs to a
    // later buffer and is requested again by the START that follows the flush.
    if(*expected_packet_number != packet_number)
        return;

    // Store next packet. An unusable one is left for the drop recovery.
    int stored = stage_payload(packet_number, bytes_read, file_size);
    if(stored < 0)
        return;
    buffer_size += stored;
    update_status(&status, stored);
    (*expected_packet_number)++;
}

// Requests every dropped packet of the current buffer until each one has
// been received. Like the rest of the protocol this retries without limit.
static void recover_dropped_packets(int64_t file_size)
{
    int i = 0;

    while(i < total_dropped_packets)
    {
        int wanted = dropped_packets[i];

        // Request packet
        send_command(RESEND, wanted);

        // Timed out or failed: ask again.
        if(select_read_socket() <= 0)
            continue;

        int packet_number = -1;
        int bytes_read = read_packet(&packet_number);
        if(bytes_read <= 0 || packet_number != wanted)
            continue;
        if(stage_payload(packet_number, bytes_read, file_size) < 0)
            continue;
        i++;
    }
}

// Receives one announced file into `outfile`.
static void receive_file(const char *outfile, int64_t file_size)
{
    // Packet numbers and buffer offsets are relative to the start of the
    // file, so all transfer state starts from zero for every file.
    int expected_packet_number = 0;
    buffer_start = 0;
    buffer_size = 0;
    total_dropped_packets = 0;

    // NOTE(review): the name comes straight from the network and is used as
    // a path as-is; consider restricting it to a plain file name.
    out = fopen(outfile, "wb");
    if(out == NULL)
    {
        perror("receive_file: fopen");
        return;
    }
    init_status(&status);

    while(1)
    {
#ifdef ASYNCHRONOUS
        if(select_read_socket() > 0)
        {
            int packet_number = -1;
            int bytes_read = read_packet(&packet_number);
            //printf("got packet %d\n", packet_number);
            if(bytes_read > 0 && packet_number >= 0)
                accept_stream_packet(packet_number,
                    bytes_read,
                    file_size,
                    &expected_packet_number);
        }
        else
        {
            printf("dead stream buffer_size=%d\n", buffer_size);
            mark_missing_packets(file_size, &expected_packet_number);
        }
#else
        // Synchronous mode: every packet is requested individually.
        mark_missing_packets(file_size, &expected_packet_number);
#endif

        // Handle dropped packets
        if(buffer_size >= BUFFER_SIZE || status.total_bytes >= file_size)
        {
#ifdef ASYNCHRONOUS
            if(total_dropped_packets)
                printf("%d dropped packets/%d total\n",
                    total_dropped_packets,
                    buffer_size / PACKET_DATA);
#endif
            recover_dropped_packets(file_size);

            if(buffer_size > 0 &&
                fwrite(buffer, 1, (size_t)buffer_size, out) != (size_t)buffer_size)
                perror("receive_file: fwrite");
            buffer_start += buffer_size;
            //printf("buffer_start=%lld buffer_size=%d\n", buffer_start, buffer_size);
            buffer_size = 0;
            total_dropped_packets = 0;

            // Restart transmission
#ifdef ASYNCHRONOUS
            while(status.total_bytes < file_size)
            {
                send_command(START, expected_packet_number);
                if(select_read_socket() > 0)
                    break;
            }
#endif
        }

        if(status.total_bytes >= file_size)
            break;
    }

    if(fclose(out) != 0)
        perror("receive_file: fclose");
    out = 0;
}

// Usage: udprecv <hostname> <port>
// hostname is the sender that RESEND/START commands are addressed to;
// port is used both for listening and for sending commands.
int main(int argc, char *argv[])
{
    if(argc < 3)
    {
        printf("Usage: udprecv <hostname> <port>\n");
        exit(1);
    }

    char outfile[1024];
    int64_t file_size = 0;

    if(strlen(argv[1]) >= sizeof HOSTNAME)
    {
        fprintf(stderr, "main: host name too long\n");
        exit(1);
    }
    memcpy(HOSTNAME, argv[1], strlen(argv[1]) + 1);

    char *port_end = NULL;
    errno = 0;
    long port = strtol(argv[2], &port_end, 10);
    if(errno != 0 || port_end == argv[2] || *port_end != '\0' ||
        port < 1 || port > 65535)
    {
        fprintf(stderr, "main: invalid port '%s'\n", argv[2]);
        exit(1);
    }
    PORT = (int)port;

    printf("HOSTNAME=%s\n"
        "PORT=%d\n"
        "ASYNCHRONOUS=%s\n",
        HOSTNAME,
        PORT,
#ifdef ASYNCHRONOUS
        "yes"
#else
        "no"
#endif
        );

    buffer_start = 0;
    buffer_size = 0;
    total_dropped_packets = 0;

    // Socket the data packets arrive on.
    read_socket = socket(PF_INET, SOCK_DGRAM, 0);
    if(read_socket < 0)
    {
        perror("main: socket");
        exit(1);
    }

    struct sockaddr_in read_addr;
    memset(&read_addr, 0, sizeof read_addr);
    read_addr.sin_family = AF_INET;
    read_addr.sin_port = htons((unsigned short)PORT);
    read_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(read_socket, (struct sockaddr*)&read_addr, sizeof(read_addr)) < 0)
    {
        // Nothing can be received without the bound port.
        perror("main: bind");
        exit(1);
    }

    // Socket the commands are sent on.
    write_socket = socket(PF_INET, SOCK_DGRAM, 0);
    if(write_socket < 0)
    {
        perror("main: socket");
        exit(1);
    }

    struct sockaddr_in write_addr;
    memset(&write_addr, 0, sizeof write_addr);
    write_addr.sin_family = AF_INET;
    write_addr.sin_port = htons((unsigned short)PORT);

    struct hostent *hostinfo = gethostbyname(HOSTNAME);
    if(hostinfo == NULL || hostinfo->h_addrtype != AF_INET ||
        hostinfo->h_addr_list[0] == NULL)
    {
        fprintf (stderr, "main: unknown host %s.\n", HOSTNAME);
        exit(1);
    }
    memcpy(&write_addr.sin_addr,
        hostinfo->h_addr_list[0],
        sizeof write_addr.sin_addr);

    if(connect(write_socket, (struct sockaddr*)&write_addr, sizeof(write_addr)) < 0)
    {
        perror("main: connect");
        exit(1);
    }

    while(1)
    {
        // Filename and size.
        if(select_read_socket() <= 0)
            continue;

        int packet_number = -1;
        int bytes_read = read_packet(&packet_number);
        if(!parse_announcement(bytes_read, outfile, sizeof outfile, &file_size))
            continue;

        printf("Receiving '%s' size=%" PRId64 "\n", outfile, file_size);
        receive_file(outfile, file_size);
    }
}