Loading megatron/schedules.py +4 −12 Original line number Diff line number Diff line Loading @@ -307,7 +307,6 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat model[model_chunk_id], input_tensor, losses_reduced) output_tensors[model_chunk_id].append(output_tensor) assert_viewless_tensor(output_tensor) # if forward-only, no need to save tensors for a backward pass if forward_only: Loading Loading @@ -341,7 +340,6 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat mpu.set_virtual_pipeline_model_parallel_rank(0) input_tensors[0].append( p2p_communication.recv_forward(tensor_shape, timers=timers)) assert_viewless_tensor(input_tensors[0][-1]) for k in range(num_warmup_microbatches): output_tensor = forward_step_helper(k) Loading Loading @@ -373,7 +371,6 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat tensor_shape=tensor_shape, timers=timers) output_tensor_grads[num_model_chunks-1].append(output_tensor_grad) assert_viewless_tensor(output_tensor_grad) else: input_tensor = \ p2p_communication.send_forward_recv_forward( Loading @@ -382,7 +379,6 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat timers=timers) free_output_tensor(output_tensor, args.deallocate_pipeline_outputs) input_tensors[next_forward_model_chunk_id].append(input_tensor) assert_viewless_tensor(input_tensor) # Run 1F1B in steady state. for k in range(num_microbatches_remaining): Loading Loading @@ -452,18 +448,15 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat # right location. if recv_prev: input_tensors[next_forward_model_chunk_id].append(input_tensor) assert_viewless_tensor(input_tensor) if recv_next: output_tensor_grads[next_backward_model_chunk_id].append( output_tensor_grad) assert_viewless_tensor(output_tensor_grad) # Run cooldown backward passes (flush out pipeline). if not forward_only: if all_warmup_microbatches: output_tensor_grads[num_model_chunks-1].append( p2p_communication.recv_backward(tensor_shape, timers=timers)) assert_viewless_tensor(output_tensor_grads[num_model_chunks-1][-1]) for k in range(num_microbatches_remaining, num_microbatches): input_tensor_grad = backward_step_helper(k) next_backward_model_chunk_id = get_model_chunk_id(k+1, forward=False) Loading @@ -478,7 +471,6 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat input_tensor_grad, recv_next=recv_next, tensor_shape=tensor_shape, timers=timers)) assert_viewless_tensor(output_tensor_grads[next_backward_model_chunk_id][-1]) return losses_reduced Loading Loading @@ -624,8 +616,8 @@ def forward_backward_pipelining_without_interleaving(forward_step_func, data_ite send_forward(output_tensor, send_tensor_shapes, timers=timers) if not forward_only: input_tensors.append(mpu.assert_viewless_tensor(input_tensor)) output_tensors.append(mpu.assert_viewless_tensor(output_tensor)) input_tensors.append(input_tensor) output_tensors.append(output_tensor) free_output_tensor(output_tensor, args.deallocate_pipeline_outputs) # Before running 1F1B, need to receive first forward tensor. Loading Loading @@ -653,8 +645,8 @@ def forward_backward_pipelining_without_interleaving(forward_step_func, data_ite timers=timers) # Add input_tensor and output_tensor to end of list. input_tensors.append(mpu.assert_viewless_tensor(input_tensor)) output_tensors.append(mpu.assert_viewless_tensor(output_tensor)) input_tensors.append(input_tensor) output_tensors.append(output_tensor) free_output_tensor(output_tensor, args.deallocate_pipeline_outputs) # Pop input_tensor and output_tensor from the start of the list for Loading Loading
megatron/schedules.py +4 −12 Original line number Diff line number Diff line Loading @@ -307,7 +307,6 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat model[model_chunk_id], input_tensor, losses_reduced) output_tensors[model_chunk_id].append(output_tensor) assert_viewless_tensor(output_tensor) # if forward-only, no need to save tensors for a backward pass if forward_only: Loading Loading @@ -341,7 +340,6 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat mpu.set_virtual_pipeline_model_parallel_rank(0) input_tensors[0].append( p2p_communication.recv_forward(tensor_shape, timers=timers)) assert_viewless_tensor(input_tensors[0][-1]) for k in range(num_warmup_microbatches): output_tensor = forward_step_helper(k) Loading Loading @@ -373,7 +371,6 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat tensor_shape=tensor_shape, timers=timers) output_tensor_grads[num_model_chunks-1].append(output_tensor_grad) assert_viewless_tensor(output_tensor_grad) else: input_tensor = \ p2p_communication.send_forward_recv_forward( Loading @@ -382,7 +379,6 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat timers=timers) free_output_tensor(output_tensor, args.deallocate_pipeline_outputs) input_tensors[next_forward_model_chunk_id].append(input_tensor) assert_viewless_tensor(input_tensor) # Run 1F1B in steady state. for k in range(num_microbatches_remaining): Loading Loading @@ -452,18 +448,15 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat # right location. if recv_prev: input_tensors[next_forward_model_chunk_id].append(input_tensor) assert_viewless_tensor(input_tensor) if recv_next: output_tensor_grads[next_backward_model_chunk_id].append( output_tensor_grad) assert_viewless_tensor(output_tensor_grad) # Run cooldown backward passes (flush out pipeline). if not forward_only: if all_warmup_microbatches: output_tensor_grads[num_model_chunks-1].append( p2p_communication.recv_backward(tensor_shape, timers=timers)) assert_viewless_tensor(output_tensor_grads[num_model_chunks-1][-1]) for k in range(num_microbatches_remaining, num_microbatches): input_tensor_grad = backward_step_helper(k) next_backward_model_chunk_id = get_model_chunk_id(k+1, forward=False) Loading @@ -478,7 +471,6 @@ def forward_backward_pipelining_with_interleaving(forward_step_func, data_iterat input_tensor_grad, recv_next=recv_next, tensor_shape=tensor_shape, timers=timers)) assert_viewless_tensor(output_tensor_grads[next_backward_model_chunk_id][-1]) return losses_reduced Loading Loading @@ -624,8 +616,8 @@ def forward_backward_pipelining_without_interleaving(forward_step_func, data_ite send_forward(output_tensor, send_tensor_shapes, timers=timers) if not forward_only: input_tensors.append(mpu.assert_viewless_tensor(input_tensor)) output_tensors.append(mpu.assert_viewless_tensor(output_tensor)) input_tensors.append(input_tensor) output_tensors.append(output_tensor) free_output_tensor(output_tensor, args.deallocate_pipeline_outputs) # Before running 1F1B, need to receive first forward tensor. Loading Loading @@ -653,8 +645,8 @@ def forward_backward_pipelining_without_interleaving(forward_step_func, data_ite timers=timers) # Add input_tensor and output_tensor to end of list. input_tensors.append(mpu.assert_viewless_tensor(input_tensor)) output_tensors.append(mpu.assert_viewless_tensor(output_tensor)) input_tensors.append(input_tensor) output_tensors.append(output_tensor) free_output_tensor(output_tensor, args.deallocate_pipeline_outputs) # Pop input_tensor and output_tensor from the start of the list for Loading