import re
import base64
import os

import requests
import gradio as gr
from sentence_transformers import SentenceTransformer, models

HF_TOKEN = os.environ.get("HF_TOKEN")
GITHUB_TOKEN = os.environ.get("GITHUB_TOKEN")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")

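# --- GitHub API helpers ---
# Thin wrappers around the GitHub REST API; requests are authenticated with
# GITHUB_TOKEN when it is set.
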
def extract_repo_info(github_url: str):
    """Extract the (owner, repo) pair from a GitHub repository URL."""
    pattern = r"github\.com/([^/]+)/([^/]+)"
    match = re.search(pattern, github_url)
    if match:
        owner = match.group(1)
        repo = match.group(2).replace('.git', '')
        return owner, repo
    raise ValueError("Invalid GitHub URL provided.")

def get_repo_metadata(owner: str, repo: str):
    """Fetch repository metadata (including the default branch) from the GitHub API."""
    headers = {'Authorization': f'token {GITHUB_TOKEN}'} if GITHUB_TOKEN else {}
    repo_url = f"https://api.github.com/repos/{owner}/{repo}"
    response = requests.get(repo_url, headers=headers)
    return response.json()

def get_repo_tree(owner: str, repo: str, branch: str):
    """Fetch the full recursive file tree for the given branch."""
    headers = {'Authorization': f'token {GITHUB_TOKEN}'} if GITHUB_TOKEN else {}
    tree_url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/{branch}?recursive=1"
    response = requests.get(tree_url, headers=headers)
    return response.json()

def get_file_content(owner: str, repo: str, file_path: str):
    """Fetch a single file's content; returns None if no inline content is available."""
    headers = {'Authorization': f'token {GITHUB_TOKEN}'} if GITHUB_TOKEN else {}
    content_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{file_path}"
    response = requests.get(content_url, headers=headers)
    data = response.json()
    if 'content' in data:
        return base64.b64decode(data['content']).decode('utf-8', errors='replace')
    return None

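# --- Text preprocessing and embedding helpers ---
# preprocess_text normalises whitespace; the embedding helpers build a
# CodeBERTa-based SentenceTransformer encoder with mean pooling over tokens.
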
def preprocess_text(text: str) -> str:
    """Trim the input and collapse runs of whitespace into single spaces."""
    cleaned_text = text.strip()
    cleaned_text = re.sub(r'\s+', ' ', cleaned_text)
    return cleaned_text

def load_embedding_model(model_name: str = 'huggingface/CodeBERTa-small-v1') -> SentenceTransformer:
    """Build a SentenceTransformer from a transformer backbone plus mean pooling."""
    transformer_model = models.Transformer(model_name)
    pooling_model = models.Pooling(transformer_model.get_word_embedding_dimension(), pooling_mode_mean_tokens=True)
    model = SentenceTransformer(modules=[transformer_model, pooling_model])
    return model

def generate_embedding(text: str, model_name: str = 'huggingface/CodeBERTa-small-v1') -> list:
    """Encode preprocessed text into a dense embedding vector."""
    processed_text = preprocess_text(text)
    model = load_embedding_model(model_name)
    embedding = model.encode(processed_text)
    return embedding.tolist()

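# --- Prompt construction ---
# Queries that sound like they want depth get a "thorough, 500+ word"
# instruction; everything else is answered concisely.
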
def is_detailed_query(query: str) -> bool:
    """Return True if the query asks for a detailed or thorough answer."""
    keywords = ["detail", "detailed", "thorough", "in depth", "comprehensive", "extensive"]
    return any(keyword in query.lower() for keyword in keywords)

def generate_prompt(query: str, context_snippets: list) -> str:
    """Assemble the LLM prompt from repository context and the user's query."""
    context = "\n\n".join(context_snippets)
    if is_detailed_query(query):
        instruction = "provide an extremely detailed and thorough explanation of at least 500 words."
    else:
        instruction = "answer concisely."

    prompt = (
        f"Below is some context from a GitHub repository:\n\n"
        f"{context}\n\n"
        f"Based on the above, {instruction}\n{query}\n"
        f"Answer:"
    )
    return prompt

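# --- LLM call ---
# Uses the google-genai client to query Gemini 2.0 Flash; requires GEMINI_API_KEY.
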
def get_gemini_flash_response(prompt: str) -> str:
    """Send the prompt to Gemini 2.0 Flash and return the generated text."""
    from google import genai
    from google.genai import types

    client = genai.Client(api_key=GEMINI_API_KEY)
    response = client.models.generate_content(
        model="gemini-2.0-flash",
        contents=[prompt],
        config=types.GenerateContentConfig(
            # Note: 500 output tokens can truncate the "at least 500 words"
            # answers requested for detailed queries.
            max_output_tokens=500,
            temperature=0.1
        )
    )
    return response.text

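# --- Backend glue for the Gradio UI ---
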
def get_file_content_for_choice(github_url: str, file_path: str):
    """Return (content, file_path) for the selected file, or an error string."""
    try:
        owner, repo = extract_repo_info(github_url)
    except Exception as e:
        return str(e)
    content = get_file_content(owner, repo, file_path)
    return content, file_path

def chat_with_file(github_url: str, file_path: str, user_query: str):
    """Answer a query about a single repository file via the LLM."""
    result = get_file_content_for_choice(github_url, file_path)
    if isinstance(result, str):
        return result  # An error occurred while resolving the repository or file.
    file_content, selected_file = result
    if file_content is None:
        return f"Could not retrieve content for {selected_file}."

    # Truncate the whitespace-normalised file so the prompt stays small.
    preprocessed = preprocess_text(file_content)
    context_snippet = preprocessed[:5000]

    prompt = generate_prompt(user_query, [context_snippet])
    llm_response = get_gemini_flash_response(prompt)

    return f"File: {selected_file}\n\nLLM Response:\n{llm_response}"

def load_repo_contents_backend(github_url: str):
    """Return the list of file paths on the default branch, or an error string."""
    try:
        owner, repo = extract_repo_info(github_url)
    except Exception as e:
        return f"Error: {str(e)}"
    repo_data = get_repo_metadata(owner, repo)
    default_branch = repo_data.get("default_branch", "main")
    tree_data = get_repo_tree(owner, repo, default_branch)
    if "tree" not in tree_data:
        return "Error: Could not fetch repository tree."
    file_list = [item["path"] for item in tree_data["tree"] if item["type"] == "blob"]
    return file_list

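# --- Gradio interface ---
# Left column: repository URL, file picker, and file preview.
# Right column: chat with the selected file.
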
with gr.Blocks() as demo:
    gr.Markdown("# RepoChat - Chat with Repository Files")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Repository Information")
            github_url_input = gr.Textbox(label="GitHub Repository URL", placeholder="https://github.com/username/repository")
            load_repo_btn = gr.Button("Load Repository Contents")

            file_dropdown = gr.Dropdown(label="Select a File", interactive=True, value="", choices=[])

            # Shows the raw content of the currently selected file.
            repo_content_output = gr.Chatbot(label="File Content")

        with gr.Column(scale=2):
            gr.Markdown("### Chat Interface")
            chat_query_input = gr.Textbox(label="Your Query", placeholder="Type your query here")

            # Shows the LLM's answers about the selected file.
            chat_output = gr.Chatbot(label="Chat Conversation")

    chat_btn = gr.Button("Send Query")

    # Populate the file dropdown when a repository is loaded.
    def update_file_dropdown(github_url):
        files = load_repo_contents_backend(github_url)
        if isinstance(files, str):
            print("Error loading files:", files)
            return gr.update(choices=[], value="")
        print("Files loaded:", files)
        return gr.update(choices=files, value="")

    load_repo_btn.click(fn=update_file_dropdown, inputs=[github_url_input], outputs=[file_dropdown])

    # Display the selected file's content whenever the dropdown changes.
    def update_repo_content(github_url, file_choice):
        if not file_choice:
            return [("System", "No file selected.")]
        result = get_file_content_for_choice(github_url, file_choice)
        if isinstance(result, str):
            return [("System", result)]
        content, _ = result
        if content is None:
            return [("System", "Could not retrieve file content.")]
        return [("File Content", content)]

    file_dropdown.change(fn=update_repo_content, inputs=[github_url_input, file_dropdown], outputs=[repo_content_output])

    # Run the user's query against the selected file and show it as a chat turn.
    def process_chat(github_url, file_choice, chat_query):
        if not file_choice:
            return [(chat_query, "Please select a file first.")]
        response = chat_with_file(github_url, file_choice, chat_query)
        return [(chat_query, response)]

    chat_btn.click(fn=process_chat, inputs=[github_url_input, file_dropdown, chat_query_input], outputs=[chat_output])

demo.launch(share=True)