import React  from 'react';
import hljs from 'highlight.js'
import { useEffect } from 'react'

import { Toml, Rust, Bash } from '../CodeSnippets'

export default function RustUniswapEventFetcher() {
    useEffect(() => {
        hljs.highlightAll()
    }, [])
    const anyhow = <b>anyhow</b>
    const futures = <b>futures</b>
    const tokio = <b>tokio</b>
    const hex = <b>hex</b>
    const web3 = <b>web3</b>
    const dotenv = <b>dotenv</b>

    return (
        <section>
            <h1>Uniswap events fetcher with Rust</h1>
            <p>
                In this post we see how to build a Uniswap events fetcher with Rust and provide a mechanism to detect
                block reorganizations. These are moments when the accepted chain turns out to be a different one from
                the one we were originally following, so some of the latest blocks may change. For this we use the{' '}
                <a href="https://docs.rs/web3/latest/web3/">web3 Rust library</a>. It helps a lot with fetching
                blockchain information from a particular provider. In our example we are going to use a web socket
                endpoint from Infura. You can request one at no cost on the Infura page by creating a project.
            </p>
            <p>
                The first thing we are going to code is how to fetch the events from the latest block and print them
                on the console. I will copy a short code snippet and then explain part by part what it does. First of
                all, these are the dependencies we are going to use:
            </p>
            <Toml>{`\
[dependencies]
anyhow = "1.0"
futures = "0.3.14"
tokio = "1.21.2"
hex = "0.4.3"
web3 = "0.18.0"
dotenv = "0.15.0"\
            `}</Toml>
            <p>
                {anyhow} will be used for error handling, {tokio} and {futures} let us work with Rust's asynchronous
                runtime in an easy way, {hex} converts hex string representations into byte arrays and vice
                versa, {web3} allows us to fetch data from the blockchain and process it correctly with the help of
                its types, and {dotenv} is just there to store some secrets like the Infura API key.
            </p>
            <p>Our main function will have the following basic skeleton:</p>
            <Rust>
                {`\
/// Skeleton of the program: connect to the node, load the pool contract, pick the Swap event and
/// subscribe to new block headers. The body of the loop is elided and filled in later.
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // NOTE(review): the variable must already be set in the process environment at this point;
    // nothing in this snippet loads a .env file (e.g. through dotenv) - confirm it is done elsewhere.
    let websocket_infura_endpoint: String = std::env::var("INFURA_WSS_ENDPOINT")?;

    // we create a web socket for listening to new blocks
    let web3 =
        web3::Web3::new(web3::transports::ws::WebSocket::new(&websocket_infura_endpoint).await?);

    // we create a contract structure that enables us to interact with the Uniswap contract at the
    // specified address (the address is written as hex without the 0x prefix)
    let contract_address =
        H160::from_slice(&hex::decode("5777d92f208679db4b9778590fa3cab3ac9e2168").unwrap()[..]);
    let contract = Contract::from_json(
        web3.eth(),
        contract_address,
        include_bytes!("contracts/uniswap_pool_abi.json"),
    )?;

    // we specify that the Uniswap event we are going to listen to is "Swap"; its signature is
    // used later as the topic filter when requesting logs
    let swap_event = contract.abi().events_by_name("Swap")?.first().unwrap();
    let swap_event_signature = swap_event.signature();

    // we subscribe to new blocks
    let mut block_stream = web3.eth_subscribe().subscribe_new_heads().await?;

    // every time a new block is added to the blockchain our endpoint will alert us;
    // the loop ends as soon as the stream finishes or yields an error
    while let Some(Ok(block)) = block_stream.next().await {
        ...
    }

    Ok(())
}`}
            </Rust>
            <p>
                Note that{' '}
                <a href="https://etherscan.io/address/0x5777d92f208679db4b9778590fa3cab3ac9e2168#events">
                    <b>0x5777d92f208679db4b9778590fa3cab3ac9e2168</b>
                </a>{' '}
                is the address of the Uniswap pool on the Ethereum mainnet. We additionally store the ABI file in{' '}
                <b>contracts/uniswap_pool_abi.json</b>, which enables us to decode the events coming from the websocket.
            </p>
            <p>
                The strategy now is to parse, for each incoming block, the event data corresponding to the swaps. The
                approach I found convenient, especially once we implement a block-reorganization protection
                mechanism, is to create a function that fetches the information for a list of block numbers:
            </p>
            <Rust>{`\
/// A fetched block reduced to the data this program keeps: its number, its hash and the parsed
/// logs collected for it.
#[derive(PartialEq, Debug, Clone)]
pub struct Block {
	/// Block number.
	pub number: U64,
	/// Block hash; two fetches of the same number are compared through this field.
	pub hash: H256,
	/// Parsed logs found in this block (possibly empty).
	pub parsed_logs: Vec<ParsedLog>,
}

/// Fetches the requested blocks together with the Swap events each of them contains.
///
/// Arguments:
/// - block_numbers: numbers of the blocks to fetch, in the order they should appear in the queue.
/// - web3: client used to query the node.
/// - contract_address: only logs emitted by this contract are requested.
/// - swap_event_signature: first topic used to filter the logs.
/// - swap_event: ABI description used to decode every raw log.
///
/// Returns one Block per requested number, in the same order.
///
/// Panics if a request fails, if the node does not return a block for a number, if the block has
/// no hash or number, if the returned number differs from the requested one, or if a log cannot
/// be decoded.
pub async fn fetch_block_queue(
    block_numbers: Vec<U64>,
    web3: Web3<WebSocket>,
    contract_address: H160,
    swap_event_signature: H256,
    swap_event: Event,
) -> VecDeque<Block> {
    let mut queue = VecDeque::<Block>::with_capacity(block_numbers.len());

    for block_i in block_numbers {
        let block = web3
            .eth()
            .block(BlockId::Number(BlockNumber::Number(block_i)))
            .await
            .unwrap()
            .unwrap();

        // Validate the header once, before spending a second request on its logs
        let hash = block.hash.expect("could not get block hash");
        assert_eq!(
            block_i,
            block.number.expect("could not get block number"),
            r#""block_i" should equal "number" field of block fetched"#
        );

        // Filtering by hash (instead of by number) ties the logs to exactly the block fetched above
        let swap_logs_in_block = web3
            .eth()
            .logs(
                web3::types::FilterBuilder::default()
                    .block_hash(hash)
                    .address(vec![contract_address])
                    .topics(Some(vec![swap_event_signature]), None, None, None)
                    .build(),
            )
            .await
            .unwrap();

        // Decode every raw log with the ABI and turn it into its printable form
        let parsed_logs = swap_logs_in_block
            .into_iter()
            .map(|log| {
                let log = swap_event
                    .parse_log(RawLog { topics: log.topics, data: log.data.0 })
                    .unwrap();
                parse_log(log)
            })
            .collect();

        queue.push_back(Block { hash, number: block_i, parsed_logs });
    }
    queue
}\
            `}</Rust>
            <p>
                Note that we are creating a queue of <b>Blocks</b>. A Block has a number, a hash and a vector
                containing the parsed logs. Note also that we have called a function named <b>parse_log(...)</b>, which
                turns the raw logs coming from the blockchain into something legible. Finally, the return type of the
                function is a <b>queue</b>, since we are going to use this kind of structure for detecting block
                reorganizations. Let's take a look at the function <b>parse_log(...)</b>:
            </p>
            <Rust>{`\
/// Printable form of one Swap event. Every field is already formatted as text.
#[derive(PartialEq, Clone)]
pub struct ParsedLog {
    /// Address taken from the first event parameter.
    pub sender: String,
    /// Address taken from the second event parameter.
    pub receiver: String,
    /// Either "DAI -> USDC" or "USDC -> DAI".
    pub direction: String,
    /// USDC amount as a decimal string (6 decimals), without sign.
    pub amount_usdc: String,
    /// DAI amount as a decimal string (18 decimals), without sign.
    pub amount_dai: String,
}

/// Hand-written Debug output: a header line, one line per field and a closing brace.
impl fmt::Debug for ParsedLog {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Destructuring makes the compiler complain if a field is ever added and not printed
        let Self { sender, receiver, direction, amount_usdc, amount_dai } = self;

        write!(f, "Parsed Log: {{\\n")?;
        write!(f, " sender: {}\\n", sender)?;
        write!(f, " receiver: {}\\n", receiver)?;
        write!(f, " direction: {}\\n", direction)?;
        write!(f, " amount_usdc: {:}\\n", amount_usdc)?;
        write!(f, " amount_dai: {:}\\n", amount_dai)?;
        write!(f, "}}")
    }
}

/// Converts a decoded Swap event into its printable ParsedLog form.
///
/// Parameters are read by position: 0 and 1 as addresses (sender, receiver), 2 and 3 as signed
/// integers (DAI amount, USDC amount). Panics if a parameter has another type or if the two
/// amounts do not have opposite signs.
pub fn parse_log(log: Log) -> ParsedLog {
    let address_at =
        |idx: usize| address_to_string(log.params[idx].value.clone().into_address().unwrap());
    let int_at = |idx: usize| log.params[idx].value.clone().into_int().unwrap();

    let sender = address_at(0);
    let receiver = address_at(1);
    let raw_dai = int_at(2);
    let raw_usdc = int_at(3);

    // the sign lives in the most significant bit (true = negative, false = positive)
    let dai_is_negative = u256_is_negative(raw_dai);
    let usdc_is_negative = u256_is_negative(raw_usdc);

    // exactly one of the two amounts must be negative
    assert!(dai_is_negative ^ usdc_is_negative);

    // the negative amount is what the pool paid out, i.e. the output of the swap
    let direction = match usdc_is_negative {
        true => "DAI -> USDC",
        false => "USDC -> DAI",
    }
    .to_string();

    // amounts are scaled with the decimals of each token
    ParsedLog {
        sender,
        receiver,
        direction,
        amount_usdc: u256_to_string(raw_usdc, 6),
        amount_dai: u256_to_string(raw_dai, 18),
    }
}\
            `}</Rust>
            <p>
                Now comes the interesting part. <b>sender</b> and <b>receiver</b> are readily available in the events.
                Let's look at the raw values of one of the events fetched by the library:
            </p>
            <Bash>{`\
Log {
    params: [
        LogParam {
            name: "sender",
            value: Address(
                0xe592427a0aece92de3edee1f18e0157c05861564,
            ),
        },
        LogParam {
            name: "recipient",
            value: Address(
                0x60594a405d53811d3bc4766596efd80fd545a270,
            ),
        },
        LogParam {
            name: "amount0",
            value: Int(
                115792089237316195423570985008687907853269984665640564023570442790510814200787,
            ),
        },
        LogParam {
            name: "amount1",
            value: Int(
                15889241460,
            ),
        },
        LogParam {
            name: "sqrtPriceX96",
            value: Uint(
                79229438597859408092457,
            ),
        },
        LogParam {
            name: "liquidity",
            value: Uint(
                551874129406076612606884,
            ),
        },
        LogParam {
            name: "tick",
            value: Int(
                115792089237316195423570985008687907853269984665640564039457584007913129363612,
            ),
        },
    ],
}\
            `}</Bash>
            <p>
                We are interested only in <b>sender</b>, <b>receiver</b>, <b>amount0</b> and <b>amount1</b>.
                Fortunately, sender and receiver are easy to parse. The problem lies mainly with amount0 and amount1,
                whose values make no sense at first glance. This is because Uniswap uses <b>int256</b> to represent
                them, i.e., they are signed and are represented in two's complement. The amount that looks very large
                is in fact negative and means that a trader put in the other amount and got the negative one as output.
                We wish to print the direction of the trade too; for that we can just look at the most significant bit
                of each amount. If that bit is 1 then the amount is negative, otherwise it is positive. Note the assert
                with the <b>xor (^)</b> operation, which checks that one amount is negative while the other is
                positive. We cannot have two positive amounts or two negative amounts in the same trade. Let's look at
                how to convert these numbers into something readable:
            </p>
            <Rust>{`\
/// Tells whether amount is negative when its 256 bits are read as a signed integer.
pub fn u256_is_negative(amount: U256) -> bool {
    // Bit 255 is the most significant bit, which two's complement uses as the sign
    const SIGN_BIT: usize = 255;
    amount.bit(SIGN_BIT)
}

/// Formats a raw token amount as a decimal string with a point placed "decimals" digits from the
/// right.
///
/// The amount is read as a signed 256-bit integer and only its absolute value is printed (no
/// sign). There is always at least one digit on each side of the point.
pub fn u256_to_string(amount: U256, decimals: usize) -> String {
    let amount = if u256_is_negative(amount) {
        // Absolute value of a two's-complement number: invert every bit, then add one
        let mut bytes = [0u8; 32];
        amount.to_big_endian(&mut bytes);
        let inverted = bytes.map(|byte| !byte);
        U256::from_big_endian(&inverted) + U256::one()
    } else {
        amount
    };

    let digits = amount.to_string();

    // Index of the point inside the digits, if there are more digits than decimals
    let point = digits.len().checked_sub(decimals).filter(|&at| at > 0);

    let (integer, fraction) = match point {
        Some(at) if decimals > 0 => (digits[..at].to_string(), digits[at..].to_string()),
        // no decimals at all: everything is the integer part
        Some(_) => (digits, "0".to_string()),
        // fewer digits than decimals: the value is below one, pad the fraction with zeros
        None => (
            "0".to_string(),
            format!("{}{}", "0".repeat(decimals - digits.len()), &digits[..]),
        ),
    };

    format!("{}.{}", integer, fraction)
}\
            `}</Rust>
            <p>
                The function is pretty simple. All it does is first check the sign of the number. If it is
                negative, we need to invert every bit and add 1 to the result to get the absolute value. Then we
                transform the number into a decimal string. <b>amount.to_string()</b> is the magic function that does
                all the hard work here. Knowing the big decimal string value, e.g.:
            </p>
            <p>
                <b>"12920329482938492834923" (string)</b>
            </p>
            <p>
                we can use the <b>decimals</b> of that token to know where the decimal point is, e.g.:
            </p>
            <p>
                <b>decimals = 6</b> &#8594; <b>"12920329482938492.834923" (new string)</b>
            </p>
            <p>
                We then return the correctly formatted string. Here are some useful unit tests that help us understand
                what this function is doing:
            </p>
            <Rust>{`\
/// Checks the placement of the decimal point and the handling of negative (two's-complement) values.
#[test]
fn test_u256_to_string() {
    // (raw amount, decimals of the token, expected text)
    let cases = [
        ("1000000000000", 6, "1000000.000000"),
        ("1000000000001", 6, "1000000.000001"),
        ("1000000000001", 2, "10000000000.01"),
        // no decimals: the fraction is printed as a single zero
        ("1000000000001", 0, "1000000000001.0"),
        // as many decimals as digits: the integer part becomes zero
        ("1000000000001", 13, "0.1000000000001"),
        // more decimals than digits: the fraction is padded with zeros
        ("1000000000001", 15, "0.001000000000001"),
        // amount0 of the sample log: a negative int256, printed as its absolute value
        (
            "115792089237316195423570985008687907853269984665640564023570442790510814200787",
            18,
            "15887.141217402315439149",
        ),
    ];

    for &(raw, decimals, expected) in cases.iter() {
        let m = U256::from_dec_str(raw).unwrap();
        assert_eq!(u256_to_string(m, decimals), expected, "raw = {}, decimals = {}", raw, decimals);
    }

    // all bits set is -1 in two's complement
    assert_eq!(u256_to_string(U256::max_value(), 0), "1.0");
}\
            `}</Rust>
            <p>
                We have built code that is able to fetch events from blocks and print them in a human-readable format.
                Let's finish our while-loop inside the <b>main</b> function:
            </p>
            <Rust>{`\
/// First complete version of the loop: every new block is fetched and printed right away,
/// together with its parsed logs if it has any.
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {

    // placeholder for the setup shown in the skeleton (web3, contract_address, swap_event,
    // swap_event_signature and block_stream are used below)
    ...

    let mut queue = VecDeque::<Block>::new();

    // the loop ends as soon as the stream finishes or yields an error
    while let Some(Ok(block)) = block_stream.next().await {
        let current_block_num = block.number.expect("Error getting the current block number");
        // only the block that was just announced is requested
        let block_numbers = vec![current_block_num];
        queue = fetch_block_queue(
            block_numbers,
            web3.clone(),
            contract_address,
            swap_event_signature,
            swap_event.clone(),
        )
        .await;

        assert_eq!(queue.len(), 1, "queue should have length 1 at this point.",);

        let block = queue.pop_front().expect("fail in popping element from the queue");
        println!("block: {}", block.number);
        // the logs are printed only for blocks that contain at least one
        if block.parsed_logs.len() > 0 {
            println!("{:#?}", block.parsed_logs);
        }

        assert_eq!(queue.len(), 0, "queue should have length 0 at this point.",);
    }

    Ok(())
}\
            `}</Rust>
            <p>
                By running <b>cargo run</b> we should see some behavior like this:
            </p>
            <Bash>{`\
block: 16875638
block: 16875639
block: 16875640
block: 16875641
block: 16875642
block: 16875643
block: 16875644
block: 16875645
[
    Parsed Log: {
     sender: 0xd1742b3c4fbb096990c8950fa635aec75b30781a
     receiver: 0x60594a405d53811d3bc4766596efd80fd545a270
     direction: USDC -> DAI
     amount_usdc: 601.745385
     amount_dai: 601.663417456753298707
    },
]
block: 16875646
[
    Parsed Log: {
     sender: 0xef1c6e67703c7bd7107eed8303fbe6ec2554bf6b
     receiver: 0xef1c6e67703c7bd7107eed8303fbe6ec2554bf6b
     direction: DAI -> USDC
     amount_usdc: 26909.272843
     amount_dai: 26910.990593155414285139
    },
]
block: 16875647
...\
            `}</Bash>
            <p>
                We can check that the values reported are correct by looking into any Uniswap V3 explorer like{' '}
                <a href="https://coinmarketcap.com/dexscan/ethereum/0x5777d92f208679db4b9778590fa3cab3ac9e2168/">
                    this
                </a>
                .
            </p>
            <h3>Check for block reorganization</h3>
            <p>
                One thing that sometimes happens is a block reorganization. We typically fetch data from a specific
                endpoint; in our example it is Infura. Infura is connected to, or runs, its own Ethereum node, which
                validates transactions and provides us with the latest blockchain state. Even though the copies of
                the blockchain on all the nodes in the network are normally the same, it can happen that two blocks
                are mined at the same time; in this case each node has to decide for one or the other. This decision
                is typically made in favor of whichever block arrives first, and network delays play a major role here.
            </p>
            <p>
                As an example, imagine one new block mined in Asia and another mined in America at almost the same time
                (less than 1 ms difference). Then almost all nodes in America will take the American block as valid and
                the nodes in Asia will take the Asian one. Both are valid and correct! Then a new block arrives and this
                time one of the chains becomes longer than the other for a substantial amount of time; at this moment
                the nodes that have the shorter chain will immediately drop it and take the longer one. If you were
                fetching events and printing them from the shorter chain, then these events may not be valid anymore!
            </p>
            <p>
                For this kind of situation (and also for checking balances on exchanges, and so on), block
                reorganization protection mechanisms are added. For most blockchains, a block with x confirmations is
                typically considered very unlikely to change in the future. This can be thought of as the
                probability of 5 consecutive blocks being mined at the same time by different miners, so that two
                chains stay apart for a long time.
            </p>
            <p>
                To guard against that in our code, we can wait for 5 blocks before printing the events of a block, so
                that any reorganization shallower than 5 blocks has already been taken into account. To make this more
                visual:
            </p>
            <Bash>{`\
---------------------------------------------> Time
                              now
blocks :  1 -> 2 -> 3 -> 4 -> 5             -> ... 
          |
          ------------------> print block 1
            `}</Bash>
            <p>
                This is the reason why we have used the <b>queue</b> from the beginning: now, on each new block, we
                can fetch the previous 5 blocks again and compare their hashes. In case they differ, we need to update
                the queue with their corresponding event logs.
            </p>
            <p>In this case the new main function would look like:</p>
            <Rust>{`\
/// Reorganization-aware version of the loop: blocks are kept in a queue and printed only when
/// they reach its front, after being fetched again on every new block.
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // placeholder for the setup shown in the skeleton (web3, contract_address, swap_event,
    // swap_event_signature and block_stream are used below)
    ...

    let mut queue = VecDeque::<Block>::new();

    // The first announced block seeds the queue with the most recent
    // BLOCK_REORG_MAX_DEPTH - 1 blocks, oldest first and ending at the announced one.
    // NOTE(review): BLOCK_REORG_MAX_DEPTH is not defined in this snippet - presumably a constant
    // declared next to main.
    if let Some(Ok(block)) = block_stream.next().await {
        let current_block_num = block.number.expect("Error getting the current block number");
        let block_numbers: Vec<U64> = (0..BLOCK_REORG_MAX_DEPTH - 1)
            .rev()
            .map(|x| current_block_num - U64::from(x))
            .collect();

        queue = fetch_block_queue(
            block_numbers,
            web3.clone(),
            contract_address,
            swap_event_signature,
            swap_event.clone(),
        )
        .await;
    }

    // the loop ends as soon as the stream finishes or yields an error
    while let Some(Ok(block)) = block_stream.next().await {

        let current_block_num = block.number.expect("Error getting the current block number");
        // every block still waiting in the queue is requested again, plus the new one
        let mut block_numbers = queue.iter().map(|block| block.number).collect::<Vec<U64>>();

        block_numbers.push(current_block_num);

        let new_queue = fetch_block_queue(
            block_numbers,
            web3.clone(),
            contract_address,
            swap_event_signature,
            swap_event.clone(),
        )
        .await;

        // the newest block is appended so that both queues cover the same block numbers
        queue.push_back(new_queue[new_queue.len() - 1].clone());

        let reorganizations = check_and_update_queue(&mut queue, &new_queue);
        // the oldest block leaves the queue and is the one reported
        let block = queue.pop_front().expect("fail in popping element from the queue");

        println!("block: {} reorgs: {}", block.number, reorganizations);
        if block.parsed_logs.len() > 0 {
            println!("{:#?}", block.parsed_logs);
        }
    }

    Ok(())
}

/// Reconciles the main queue with a copy of the same blocks fetched at a later moment.
///
/// "queue" holds the blocks as they were first seen (for example blocks 1,2,3,4,5 fetched when
/// block 5 was detected); "new_queue" holds the same block numbers fetched again at block 6 or
/// after. Every entry whose hash changed in between belongs to a reorganized block and is
/// replaced by its new version.
///
/// Returns the number of entries that were replaced.
///
/// Panics if the queues differ in length or in block numbers, or if the hash of the front
/// (oldest) block changed: that block is the next one to be reported, so the reorganization is
/// as deep as the whole queue.
pub fn check_and_update_queue(queue: &mut VecDeque<Block>, new_queue: &VecDeque<Block>) -> u32 {
    assert_eq!(
        queue.len(),
        new_queue.len(),
        "The 2 queues should have the same length to be compared."
    );

    // Two empty queues have nothing to reconcile (and indexing their front would panic)
    if queue.is_empty() {
        return 0;
    }

    assert_eq!(
        queue[0].number, new_queue[0].number,
        "Block number of front element in both queues doesn't coincide"
    );
    if queue[0].hash != new_queue[0].hash {
        println!("queue: {:#?}", queue);
        println!("new_queue: {:#?}", new_queue);
        panic!("A {}-blocks reorganization occurred", queue.len());
    }

    let mut reorganizations = 0;
    for (q, fresh) in queue.iter_mut().zip(new_queue.iter()) {
        assert_eq!(q.number, fresh.number, "Block numbers on both queues doesn't coincide.");

        // Every entry is compared. Stopping at the first equal hash found from the back is not
        // enough: main appends the newest block of "new_queue" to "queue" right before calling,
        // so the last pair is always equal and the older, possibly stale, entries would never
        // be looked at.
        if q.hash != fresh.hash {
            *q = fresh.clone();
            reorganizations += 1;
        }
    }
    reorganizations
}\
            `}</Rust>
            <h3>Conclusions</h3>
            <p>In this post we have shown:</p>
            <ul>
                <li>How to parse with Rust Ethereum event data</li>
                <li>How to detect block reorganizations</li>
            </ul>
        </section>
    )
}
