162 changes: 124 additions & 38 deletions src/cpp/src/rag/text_embedding_pipeline.cpp
@@ -45,6 +45,11 @@ bool has_token_type_ids_input(const T& inputs) {
return false;
}

void set_node_name(std::shared_ptr<ov::Node> node, const std::string& name) {
node->set_friendly_name(name);
node->get_output_tensor(0).set_names({name});
}

/**
* CLS pooling slices first element from seq_length dimension
* [batch_size, seq_length, hidden_size] -> [batch_size, seq_length[0], hidden_size]
@@ -62,12 +67,10 @@ std::shared_ptr<op::Op> get_cls_pooling_op(const ov::Output<ov::Node>& last_hidd
return std::make_shared<op::v15::Squeeze>(slice, squeeze_axis);
}

std::shared_ptr<op::Op> get_mean_pooling_op(std::shared_ptr<Model> model,
const ov::Output<ov::Node>& last_hidden_state_node) {
std::shared_ptr<op::Op> get_mean_pooling_op(const ov::Output<ov::Node>& last_hidden_state_node,
const ov::Output<ov::Node>& attention_mask) {
auto shape_of = std::make_shared<op::v3::ShapeOf>(last_hidden_state_node);

auto attention_mask = model->input("attention_mask").get_node()->outputs()[0];

auto unsqueze_axis = std::make_shared<op::v0::Constant>(ov::element::i64, ov::Shape{1}, std::vector<int64_t>{-1});

auto unsqueze = std::make_shared<op::v0::Unsqueeze>(attention_mask, unsqueze_axis);
@@ -95,8 +98,8 @@ std::shared_ptr<op::Op> get_mean_pooling_op(std::shared_ptr<Model> model,
return std::make_shared<op::v1::Divide>(sum_hidden_state, max_expanded_mask);
}

std::shared_ptr<op::Op> get_last_token_pooling_op(std::shared_ptr<Model> model,
const ov::Output<ov::Node>& last_hidden_state_node,
std::shared_ptr<op::Op> get_last_token_pooling_op(const ov::Output<ov::Node>& last_hidden_state_node,
const ov::Output<ov::Node>& attention_mask,
const TextEmbeddingPipeline::Config& config) {
const auto left_padding = config.padding_side.has_value() && config.padding_side.value() == "left";

@@ -115,8 +118,6 @@ std::shared_ptr<op::Op> get_last_token_pooling_op(std::shared_ptr<Model> model,
return std::make_shared<op::v15::Squeeze>(slice, squeeze_axis);
}

auto attention_mask = model->input("attention_mask").get_node()->outputs()[0];

auto axis_1 = std::make_shared<op::v0::Constant>(ov::element::i64, ov::Shape{1}, std::vector<int64_t>{1});
auto reduce_sum = std::make_shared<op::v1::ReduceSum>(attention_mask, axis_1);
auto subtract_1 = std::make_shared<op::v0::Constant>(ov::element::i64, ov::Shape{1}, std::vector<int64_t>{1});
@@ -125,31 +126,71 @@ std::shared_ptr<op::Op> get_last_token_pooling_op(std::shared_ptr<Model> model,
return std::make_shared<op::v8::Gather>(last_hidden_state_node, subtract, axis_1, 1);
}

std::shared_ptr<op::Op> create_post_ops(const ov::Output<ov::Node>& input,
const ov::Output<ov::Node>& attention_mask,
const TextEmbeddingPipeline::Config& config) {
if (config.pooling_type == TextEmbeddingPipeline::PoolingType::CLS) {
return get_cls_pooling_op(input);
} else if (config.pooling_type == TextEmbeddingPipeline::PoolingType::MEAN) {
return get_mean_pooling_op(input, attention_mask);
} else if (config.pooling_type == TextEmbeddingPipeline::PoolingType::LAST_TOKEN) {
return get_last_token_pooling_op(input, attention_mask, config);
}

OPENVINO_THROW("Pooling type is not supported");
}

std::shared_ptr<op::Op> create_normalize_ops(const ov::Output<ov::Node>& input,
const TextEmbeddingPipeline::Config& config) {
if (config.normalize) {
auto axis_const = std::make_shared<op::v0::Constant>(ov::element::i32, ov::Shape{1}, std::vector{1});
return std::make_shared<op::v0::NormalizeL2>(input, axis_const, 1e-12, op::EpsMode::MAX);
}
return std::dynamic_pointer_cast<op::Op>(input.get_node_shared_ptr());
}

std::shared_ptr<Model> apply_postprocessing(std::shared_ptr<Model> model, const TextEmbeddingPipeline::Config& config) {
ov::preprocess::PrePostProcessor processor(model);

processor.output().postprocess().custom([model, &config](const ov::Output<ov::Node>& node) {
if (config.pooling_type == TextEmbeddingPipeline::PoolingType::CLS) {
return get_cls_pooling_op(node);
} else if (config.pooling_type == TextEmbeddingPipeline::PoolingType::MEAN) {
return get_mean_pooling_op(model, node);
} else if (config.pooling_type == TextEmbeddingPipeline::PoolingType::LAST_TOKEN) {
return get_last_token_pooling_op(model, node, config);
}

OPENVINO_THROW("Pooling type is not supported");
auto attention_mask = model->input("attention_mask").get_node()->outputs()[0];
return create_post_ops(node, attention_mask, config);
});

if (config.normalize) {
processor.output().postprocess().custom([](const ov::Output<ov::Node>& node) {
auto axis_const = std::make_shared<op::v0::Constant>(ov::element::i32, ov::Shape{1}, std::vector{1});
return std::make_shared<op::v0::NormalizeL2>(node, axis_const, 1e-12, op::EpsMode::MAX);
processor.output().postprocess().custom([&config](const ov::Output<ov::Node>& node) {
return create_normalize_ops(node, config);
});
}

return processor.build();
}

std::shared_ptr<ov::Model> create_post_model(std::shared_ptr<ov::Model> model,
const TextEmbeddingPipeline::Config& config,
ov::Dimension::value_type max_prompt_size) {
auto output_node = model->outputs()[0];
auto output_shape = output_node.get_partial_shape();
auto input_param =
std::make_shared<ov::op::v0::Parameter>(output_node.get_element_type(), ov::PartialShape{1, max_prompt_size, output_shape[2]});
set_node_name(input_param, "input_ids");
Collaborator (suggested change): replace
set_node_name(input_param, "input_ids");
with
set_node_name(input_param, "embedding_hidden_state");

auto attention_mask = std::make_shared<ov::op::v0::Parameter>(ov::element::i64, ov::PartialShape{1, max_prompt_size});
set_node_name(attention_mask, "attention_mask");

auto post_output = create_post_ops(input_param, attention_mask, config);
auto post_normalize_output = create_normalize_ops(post_output, config);
OPENVINO_ASSERT(post_normalize_output != nullptr);

auto result_node = std::make_shared<ov::op::v0::Result>(post_normalize_output);
set_node_name(result_node, "last_hidden_state");
auto post_model =
std::make_shared<ov::Model>(ov::OutputVector{result_node}, ov::ParameterVector{input_param, attention_mask});
post_model->set_friendly_name(model->get_friendly_name() + "_post_process");
post_model->validate_nodes_and_infer_types();
return post_model;
}

std::optional<size_t> read_max_position_embeddings(const std::filesystem::path& models_path) {
// config.json not found. Skip parameters initialization from file, use defaults.
const std::filesystem::path& json_path = models_path / "config.json";
@@ -211,32 +252,53 @@ class TextEmbeddingPipeline::TextEmbeddingPipelineImpl {

auto model = core.read_model(models_path / "openvino_model.xml", {}, properties);

const bool should_reshape = m_config.batch_size.has_value() || m_config.max_length.has_value();
if (should_reshape) {
reshape_model(model);
}

if (device == "NPU") {
OPENVINO_ASSERT(!model->is_dynamic(),
"NPU device does not support dynamic shapes. In order to fix model shape, set batch_size, "
"max_length and pad_to_max_length in the configuration.");
}

model = apply_postprocessing(model, m_config);

bool is_seq_len_fixed = true;
if (m_config.max_length) {
m_tokenization_params.insert({max_length.name(), *m_config.max_length});
} else {
is_seq_len_fixed = false;
}

if (m_config.pad_to_max_length) {
m_tokenization_params.insert({pad_to_max_length.name(), *m_config.pad_to_max_length});
is_seq_len_fixed &= m_config.pad_to_max_length.value();
} else {
is_seq_len_fixed = false;
}

if (m_config.padding_side) {
m_tokenization_params.insert({padding_side.name(), *m_config.padding_side});
}

ov::CompiledModel compiled_model = core.compile_model(model, device, properties);
bool should_reshape_non_npu =
(device != "NPU" && (m_config.batch_size.has_value() || m_config.max_length.has_value()));
bool should_reshape_npu = (device == "NPU" && m_config.batch_size.has_value() && is_seq_len_fixed);
if (should_reshape_non_npu || should_reshape_npu) {
reshape_model(model);
}

ov::CompiledModel compiled_model;
if (device == "NPU" && model->is_dynamic()) {
OPENVINO_ASSERT(m_config.max_length.has_value(), "The parameter max_length is not set");

bool is_padding_on_left = m_config.padding_side.has_value() && m_config.padding_side.value() == "left";
if (is_padding_on_left && is_seq_len_fixed &&
config.pooling_type != TextEmbeddingPipeline::PoolingType::MEAN) {
OPENVINO_THROW("Padding on left is only supported for the MEAN pooling type");
}

auto kv_pos = ov::genai::utils::get_kv_axes_pos(model);
utils::KVDesc kv_desc;
std::tie(compiled_model, kv_desc) =
utils::compile_decoder_for_npu_text_embedding(model, properties, kv_pos, m_config);

auto post_model = create_post_model(model, m_config, m_config.max_length.value());
auto post_compiled_model = core.compile_model(post_model, "CPU");
m_post_request = post_compiled_model.create_infer_request();
} else {
model = apply_postprocessing(model, m_config);
compiled_model = core.compile_model(model, device, properties);
}

utils::print_compiled_model_properties(compiled_model, "text embedding model");
m_request = compiled_model.create_infer_request();
@@ -281,9 +343,11 @@ class TextEmbeddingPipeline::TextEmbeddingPipelineImpl {
private:
Tokenizer m_tokenizer;
InferRequest m_request;
InferRequest m_post_request;
Config m_config;
AnyMap m_tokenization_params;
std::optional<size_t> m_max_position_embeddings;
ov::Tensor m_attention_mask;

void reshape_model(std::shared_ptr<Model>& model) {
ov::PartialShape target_shape{ov::Dimension::dynamic(), ov::Dimension::dynamic()};
@@ -321,6 +385,28 @@ class TextEmbeddingPipeline::TextEmbeddingPipelineImpl {
model->reshape(input_name_to_shape);
}

ov::Tensor post_model_infer(ov::Tensor input) {
if (m_post_request) {
m_post_request.set_tensor("input_ids", input);

auto attention_mask_tensor = m_post_request.get_tensor("attention_mask");

std::copy_n(m_attention_mask.data<int64_t>(),
m_attention_mask.get_size(),
attention_mask_tensor.data<int64_t>());
if (m_attention_mask.get_size() < attention_mask_tensor.get_size()) {
Collaborator: When is this the case?

Contributor Author (@mengweiguo, Dec 17, 2025): NPUW can support long context with prefill-chunk. For example, an 8K-token input may not work on NPU, so we split the 8K into eight chunks of 1K each and reshape the model to a 1K input; eight iterations then complete the inference.
For the case of max_length=8K and a real input of 7.5K, the embedding output from NPUW is 8K and the input for the post model is also 8K (attention_mask_tensor=8K), while the real mask is only 7.5K, so the dirty slots need to be filled with 0.

It also explains why the post ops can't be appended to the embedding model: the runtime model does not match the original static model.

I think the post model does not affect asynchronous inference for embedding. The post model currently uses synchronous inference because the model is small enough.

Collaborator: Can we add attention mask normalization for NPUW (7.5K -> 8K) as a step in postprocessing and then merge the whole postprocessing into the model?

Contributor Author: No. The 7.5K value is learned during the inference stage, not during model compilation.

Collaborator: We can get the tensor shape in postprocessing.
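For illustration only, not part of this PR: a minimal sketch of how the reviewer's idea might look if the post model kept a dynamic [1, real_len] attention-mask input and padded it to the hidden-state length inside the graph. The parameter nodes mask_param and hidden_param are hypothetical, and the choice of ops is an assumption.

```cpp
// Hypothetical subgraph: pad a dynamic attention mask up to the sequence length of the
// hidden state inside the post model, so the host-side zero-fill below would not be needed.
// mask_param:   ov::op::v0::Parameter, i64, PartialShape{1, ov::Dimension::dynamic()}
// hidden_param: ov::op::v0::Parameter, e.g. f32, PartialShape{1, max_prompt_size, hidden_size}
auto hidden_shape = std::make_shared<ov::op::v3::ShapeOf>(hidden_param);  // [3], i64
auto mask_shape = std::make_shared<ov::op::v3::ShapeOf>(mask_param);      // [2], i64
auto idx_1 = ov::op::v0::Constant::create(ov::element::i64, ov::Shape{1}, {1});
auto axis_0 = ov::op::v0::Constant::create(ov::element::i64, ov::Shape{}, {0});
auto max_len = std::make_shared<ov::op::v8::Gather>(hidden_shape, idx_1, axis_0);  // padded seq length
auto real_len = std::make_shared<ov::op::v8::Gather>(mask_shape, idx_1, axis_0);   // actual mask length
auto pad_len = std::make_shared<ov::op::v1::Subtract>(max_len, real_len);
auto zero = ov::op::v0::Constant::create(ov::element::i64, ov::Shape{1}, {0});
auto pads_begin = std::make_shared<ov::op::v0::Concat>(ov::OutputVector{zero, zero}, 0);
auto pads_end = std::make_shared<ov::op::v0::Concat>(ov::OutputVector{zero, pad_len}, 0);
auto pad_value = ov::op::v0::Constant::create(ov::element::i64, ov::Shape{}, {0});
auto padded_mask = std::make_shared<ov::op::v1::Pad>(mask_param,
                                                     pads_begin,
                                                     pads_end,
                                                     pad_value,
                                                     ov::op::PadMode::CONSTANT);
// padded_mask could then feed create_post_ops() in place of the attention_mask parameter.
```

This is only a sketch of the discussion above; the PR itself keeps the zero-fill on the host side, as shown in the code that follows.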

std::fill_n(attention_mask_tensor.data<int64_t>() + m_attention_mask.get_size(),
attention_mask_tensor.get_size() - m_attention_mask.get_size(),
0);
}

m_post_request.infer();
return m_post_request.get_tensor("last_hidden_state");
}

return input;
}

void start_embed_async(std::vector<std::string>& texts) {
if (m_config.batch_size.has_value()) {
// if batch_size is set, model shape is fixed
@@ -332,10 +418,11 @@ class TextEmbeddingPipeline::TextEmbeddingPipelineImpl {
}

const auto encoded = m_tokenizer.encode(texts, m_tokenization_params);

m_request.set_tensor("input_ids", encoded.input_ids);
m_request.set_tensor("attention_mask", encoded.attention_mask);

m_attention_mask = encoded.attention_mask;

// fill token_type_ids
// todo: pass token_type_ids from tokenizer
if (has_token_type_ids_input(m_request.get_compiled_model().inputs())) {
@@ -351,9 +438,8 @@ class TextEmbeddingPipeline::TextEmbeddingPipelineImpl {
m_request.wait();

// [batch_size, hidden_size]
const Tensor last_hidden_state = m_request.get_tensor("last_hidden_state");

return to_embedding_result(last_hidden_state);
const auto last_hidden_state = m_request.get_tensor("last_hidden_state");
return to_embedding_result(post_model_infer(last_hidden_state));
};

std::vector<std::string> format_texts(const std::vector<std::string>& texts) {