11 changes: 10 additions & 1 deletion src/mysql-util/src/decoding.rs
@@ -29,9 +29,18 @@ pub fn pack_mysql_row(
row: MySqlRow,
table_desc: &MySqlTableDesc,
) -> Result<Row, MySqlError> {
let mut packer = row_container.packer();
let row_values = row.unwrap();

pack_mysql_row_from_values(row_container, row_values, table_desc)
}

pub fn pack_mysql_row_from_values(
row_container: &mut Row,
row_values: Vec<Value>,
table_desc: &MySqlTableDesc,
) -> Result<Row, MySqlError> {
let mut packer = row_container.packer();

for values in table_desc.columns.iter().zip_longest(row_values) {
let (col_desc, value) = match values {
EitherOrBoth::Both(col_desc, value) => (col_desc, value),
115 changes: 115 additions & 0 deletions src/storage/src/source/mysql.rs
@@ -50,15 +50,23 @@
//! The error streams from both of those operators are published to the source status and also
//! trigger a restart of the dataflow.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::fmt;
use std::io;
use std::io::Error;
use std::rc::Rc;

use differential_dataflow::AsCollection;
use differential_dataflow::containers::TimelyStack;
use itertools::Itertools;

use mysql_common::io::ParseBuf;
use mysql_common::packets::NullBitmap;
use mysql_common::proto::MyDeserialize;
use mysql_common::proto::MySerialize;
use mysql_common::value::ServerSide;
use mz_mysql_util::quote_identifier;
use mz_ore::cast::CastFrom;
use mz_repr::Diff;
@@ -399,10 +407,117 @@ async fn return_definite_error(
()
}

/// Like `return_definite_error`, but for use where the data handle expects serialized rows
/// instead of `SourceMessage`s. The two functions are only necessary until both the snapshot
/// and replication workflows have been updated to support distributed row decoding.
async fn return_definite_error_rows(
err: DefiniteError,
outputs: &[usize],
data_handle: &AsyncOutputHandle<
GtidPartition,
CapacityContainerBuilder<
Vec<(
(usize, Result<(Vec<u8>, MySqlTableDesc), DataflowError>),
GtidPartition,
Diff,
)>,
>,
>,
data_cap_set: &CapabilitySet<GtidPartition>,
definite_error_handle: &AsyncOutputHandle<
GtidPartition,
CapacityContainerBuilder<Vec<ReplicationError>>,
>,
definite_error_cap_set: &CapabilitySet<GtidPartition>,
) {
for output_index in outputs {
let update = (
(*output_index, Err(err.clone().into())),
GtidPartition::new_range(Uuid::minimum(), Uuid::maximum(), GtidState::MAX),
Diff::ONE,
);
data_handle.give(&data_cap_set[0], update);
}
definite_error_handle.give(
&definite_error_cap_set[0],
ReplicationError::Definite(Rc::new(err)),
);
()
}

async fn validate_mysql_repl_settings(conn: &mut mysql_async::Conn) -> Result<(), MySqlError> {
ensure_gtid_consistency(conn).await?;
ensure_full_row_binlog_format(conn).await?;
ensure_replication_commit_order(conn).await?;

Ok(())
}

/// Serializes a `mysql_async::Row` into `buffer` using the binary protocol layout:
/// the column definition packets, then a server-side NULL bitmap, then the values.
fn serialize_mysql_row(buffer: &mut Vec<u8>, row: mysql_async::Row) {
    for col in row.columns_ref() {
        col.serialize(buffer);
    }
    let mut bitmap = NullBitmap::<ServerSide>::new(row.columns_ref().len());
    let vals = row.unwrap();
    for (i, val) in vals.iter().enumerate() {
        bitmap.set(i, matches!(val, mysql_async::Value::NULL));
    }
    buffer.extend(bitmap.as_ref());
    for val in &vals {
        val.serialize(buffer);
    }
}

/// Deserializes a row encoded by `serialize_mysql_row` back into `mysql_async::Value`s,
/// using the column definitions and NULL bitmap encoded at the front of the buffer.
fn deserialize_mysql_row(
    buffer: &[u8],
    desc: &MySqlTableDesc,
) -> Result<Vec<mysql_async::Value>, Error> {
    let mut buf = ParseBuf(buffer);
    let mut columns = Vec::with_capacity(desc.columns.len());
    for _ in 0..desc.columns.len() {
        let col = mysql_common::packets::Column::deserialize((), &mut buf)?;
        columns.push(col);
    }
    let bitmap = NullBitmap::<ServerSide, Cow<[u8]>>::deserialize(columns.len(), &mut buf)?;
    let mut values = Vec::with_capacity(desc.columns.len());
    for (column_index, column) in columns.iter().enumerate() {
        if bitmap.is_null(column_index) {
            values.push(mysql_async::Value::NULL);
            continue;
        }
        let val =
            mysql_common::value::ValueDeserializer::<mysql_common::value::BinValue>::deserialize(
                (column.column_type(), column.flags()),
                &mut buf,
            )?
            .0;
        // `Value::serialize` writes integers as full 8-byte little-endian values, while the
        // binary-protocol deserializer reads only the width implied by the column type, so
        // skip the leftover padding bytes for narrow integer types.
        match column.column_type() {
            mysql_common::constants::ColumnType::MYSQL_TYPE_TINY => {
                buf.eat_buf(7);
            }
            mysql_common::constants::ColumnType::MYSQL_TYPE_SHORT
            | mysql_common::constants::ColumnType::MYSQL_TYPE_YEAR => {
                buf.eat_buf(6);
            }
            mysql_common::constants::ColumnType::MYSQL_TYPE_LONG => {
                buf.eat_buf(4);
            }
            _ => {}
        }
        values.push(val);
    }
    Ok(values)
}
132 changes: 78 additions & 54 deletions src/storage/src/source/mysql/snapshot.rs
@@ -93,9 +93,8 @@ use futures::TryStreamExt;
use itertools::Itertools;
use mysql_async::prelude::Queryable;
use mysql_async::{IsolationLevel, Row as MySqlRow, TxOpts};
use mz_mysql_util::{
ER_NO_SUCH_TABLE, MySqlError, pack_mysql_row, query_sys_var, quote_identifier,
};
use mz_mysql_util::decoding::pack_mysql_row_from_values;
use mz_mysql_util::{ER_NO_SUCH_TABLE, MySqlError, query_sys_var, quote_identifier};
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
@@ -106,23 +105,24 @@ use mz_storage_types::sources::MySqlSourceConnection;
use mz_storage_types::sources::mysql::{GtidPartition, gtid_set_frontier};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::{CapabilitySet, Concat};
use timely::dataflow::operators::{CapabilitySet, Concat, Operator};
use timely::dataflow::{Scope, Stream};
use timely::progress::Timestamp;
use tracing::{error, trace};

use crate::metrics::source::mysql::MySqlSnapshotMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::mysql::deserialize_mysql_row;
use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};
use crate::statistics::SourceStatistics;

use super::schemas::verify_schemas;
use super::{
DefiniteError, MySqlTableName, ReplicationError, RewindRequest, SourceOutputInfo,
TransientError, return_definite_error, validate_mysql_repl_settings,
TransientError, return_definite_error_rows, serialize_mysql_row, validate_mysql_repl_settings,
};

/// Renders the snapshot dataflow. See the module documentation for more information.
@@ -141,7 +141,7 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
let mut builder =
AsyncOperatorBuilder::new(format!("MySqlSnapshotReader({})", config.id), scope.clone());

let (raw_handle, raw_data) = builder.new_output::<AccountedStackBuilder<_>>();
let (raw_handle, raw_data) = builder.new_output::<CapacityContainerBuilder<_>>();
let (rewinds_handle, rewinds) = builder.new_output::<CapacityContainerBuilder<_>>();
// Captures DefiniteErrors that affect the entire source, including all outputs
let (definite_error_handle, definite_errors) =
@@ -259,7 +259,7 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
trace!(%id, "timely-{worker_id} received unknown table error from \
lock query");
let err = DefiniteError::TableDropped(message);
return Ok(return_definite_error(
return Ok(return_definite_error_rows(
err,
&all_outputs,
&raw_handle,
@@ -282,7 +282,7 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
let err = DefiniteError::UnsupportedGtidState(err.to_string());
// If we received a GTID Set with non-consecutive intervals this breaks all
// our assumptions, so there is nothing else we can do.
return Ok(return_definite_error(
return Ok(return_definite_error_rows(
err,
&all_outputs,
&raw_handle,
@@ -308,7 +308,7 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
// Verify the MySQL system settings are correct for consistent row-based replication using GTIDs
match validate_mysql_repl_settings(&mut conn).await {
Err(err @ MySqlError::InvalidSystemSetting { .. }) => {
return Ok(return_definite_error(
return Ok(return_definite_error_rows(
DefiniteError::ServerConfigurationError(err.to_string()),
&all_outputs,
&raw_handle,
@@ -367,16 +367,14 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
let mut removed_outputs = BTreeSet::new();
for (output, err) in errored_outputs {
// Publish the error for this table and stop ingesting it
raw_handle
.give_fueled(
&data_cap_set[0],
(
(output.output_index, Err(err.clone().into())),
GtidPartition::minimum(),
Diff::ONE,
),
)
.await;
raw_handle.give(
&data_cap_set[0],
(
(output.output_index, Err(err.clone().into())),
GtidPartition::minimum(),
Diff::ONE,
),
);
trace!(%id, "timely-{worker_id} stopping snapshot of output {output:?} \
due to schema mismatch");
removed_outputs.insert(output.output_index);
@@ -408,8 +406,6 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
}

// Read the snapshot data from the tables
let mut final_row = Row::default();

let mut snapshot_staged_total = 0;
for (table, outputs) in &reader_snapshot_table_info {
let mut snapshot_staged = 0;
@@ -418,33 +414,15 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
let mut results = tx.exec_stream(query, ()).await?;
while let Some(row) = results.try_next().await? {
let row: MySqlRow = row;
let mut row_bytes = Vec::new();
serialize_mysql_row(&mut row_bytes, row);
snapshot_staged += 1;
for (output, row_val) in outputs.iter().repeat_clone(row) {
let event = match pack_mysql_row(&mut final_row, row_val, &output.desc)
{
Ok(row) => Ok(SourceMessage {
key: Row::default(),
value: row,
metadata: Row::default(),
}),
// Produce a DefiniteError in the stream for any rows that fail to decode
Err(err @ MySqlError::ValueDecodeError { .. }) => {
Err(DataflowError::from(DefiniteError::ValueDecodeError(
err.to_string(),
)))
}
Err(err) => Err(err)?,
};
raw_handle
.give_fueled(
&data_cap_set[0],
(
(output.output_index, event),
GtidPartition::minimum(),
Diff::ONE,
),
)
.await;
for (output, row_buf) in outputs.iter().repeat_clone(row_bytes) {
let update = (output.output_index, Ok((row_buf, output.desc.clone())));
raw_handle.give(
&data_cap_set[0],
(update, GtidPartition::minimum(), Diff::ONE),
);
}
// This overcounting maintains existing behavior but will be removed once readers no longer rely on the value.
snapshot_staged_total += u64::cast_from(outputs.len());
@@ -494,12 +472,58 @@ pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(

let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));

(
raw_data.as_collection(),
rewinds,
errors,
button.press_on_drop(),
)
let mut final_row = Row::default();
let mut next_worker = (0..u64::cast_from(scope.peers()))
// Round-robin in batches of 1000 records to avoid creating tiny containers when
// there are a small number of updates and a large number of workers.
.flat_map(|w| std::iter::repeat_n(w, 1000))
.cycle();
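// For example, with 4 workers the routing sequence is 1000 zeros, then 1000 ones,
// 1000 twos, 1000 threes, and then it cycles back to worker 0.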
let round_robin = Exchange::new(move |_| next_worker.next().unwrap());
let snapshot_updates = raw_data
.unary(round_robin, "mysqlCastSnapshotRows", |_, _| {
move |input, output| {
input.for_each_time(|time, data| {
let mut session = output.session(&time);
for ((output_index, event), time, diff) in data.flat_map(|data| data.drain(..))
{
let event = event.and_then(|(row_bytes, output_desc)| {
    // Produce a DefiniteError in the stream for any rows that fail to
    // deserialize or decode, rather than panicking the dataflow.
    let row_val = deserialize_mysql_row(&row_bytes, &output_desc)
        .map_err(|err| {
            DataflowError::from(DefiniteError::ValueDecodeError(
                err.to_string(),
            ))
        })?;
    match pack_mysql_row_from_values(
        &mut final_row,
        row_val,
        &output_desc,
    ) {
        Ok(row) => Ok(SourceMessage {
            key: Row::default(),
            value: row,
            metadata: Row::default(),
        }),
        Err(err) => Err(DataflowError::from(
            DefiniteError::ValueDecodeError(err.to_string()),
        )),
    }
});

session.give(((output_index, event), time, diff));
}
});
}
})
.as_collection();

(snapshot_updates, rewinds, errors, button.press_on_drop())
}

/// Fetches the size of the snapshot on this worker and emits the appropriate metrics and statistics